You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
450 lines
21 KiB
450 lines
21 KiB
# foodie_engagement_tweet.py |
|
import json |
|
import logging |
|
import random |
|
import signal |
|
import sys |
|
import fcntl |
|
import os |
|
import time |
|
import re |
|
import unicodedata |
|
from datetime import datetime, timedelta, timezone |
|
import tweepy |
|
from openai import OpenAI |
|
from foodie_utils import post_tweet, load_post_counts, save_post_counts |
|
from foodie_config import ( |
|
AUTHORS, SUMMARY_MODEL, X_API_CREDENTIALS, AUTHOR_BACKGROUNDS_FILE, |
|
PERSONA_CONFIGS, ENGAGEMENT_REFERENCE_DATE_FILE |
|
) |
|
from dotenv import load_dotenv |
|
|
|
# Load variables from a .env file into the process environment before any
# of them are read below.
load_dotenv()

# JSON file that stores the reference date used for the every-2-days cadence.
REFERENCE_DATE_FILE = ENGAGEMENT_REFERENCE_DATE_FILE
# Lock file that prevents two instances of this script from running at once.
LOCK_FILE = "/home/shane/foodie_automator/locks/foodie_engagement_tweet.lock"
LOG_FILE = "/home/shane/foodie_automator/logs/foodie_engagement_tweet.log"
LOG_PRUNE_DAYS = 30  # Log entries older than this many days are pruned at startup
MAX_RETRIES = 3  # Attempts made for each external API call
RETRY_BACKOFF = 2  # Base delay in seconds; doubled after every failed attempt
URL = "https://insiderfoodie.com"
URL_SHORTENED_LENGTH = 23  # Twitter's shortened URL length
# Year injected into the prompt. Derived from the clock instead of a literal
# so the prompt does not go stale when the calendar year changes.
CURRENT_YEAR = str(datetime.now(timezone.utc).year)
|
|
|
def _prune_log_lines(lines, cutoff):
    """Return ``(kept_lines, orphan_count)`` for the given log lines.

    A line starting with a ``YYYY-MM-DD HH:MM:SS`` timestamp opens a new log
    entry and is kept when that timestamp is later than *cutoff*. Lines
    without a timestamp (for example traceback lines) follow the decision made
    for the entry they belong to, so multi-line entries stay intact.
    Timestamp-less lines that appear before any timestamped entry cannot be
    dated; they are dropped and counted in ``orphan_count``.
    """
    kept_lines = []
    orphan_count = 0
    keep_current_entry = None  # None until the first timestamped line is seen
    for line in lines:
        try:
            timestamp = datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S')
        except ValueError:
            if keep_current_entry is None:
                orphan_count += 1
            elif keep_current_entry:
                kept_lines.append(line)
            continue
        keep_current_entry = timestamp > cutoff
        if keep_current_entry:
            kept_lines.append(line)
    return kept_lines, orphan_count


def setup_logging():
    """Configure console and file logging, pruning old entries from LOG_FILE.

    Entries older than LOG_PRUNE_DAYS days are removed from LOG_FILE before
    the file handler is attached. If anything goes wrong while preparing the
    log file, logging stays console-only and the failure is reported there.
    """
    log_format = '%(asctime)s - %(levelname)s - %(message)s'
    date_format = '%Y-%m-%d %H:%M:%S'
    root_logger = logging.getLogger()

    # Logging calls made before this function runs (at import time and while
    # acquiring the lock) implicitly install a default root handler, after
    # which logging.basicConfig() would silently do nothing. Start from a
    # clean root logger and attach the handlers explicitly.
    for existing_handler in list(root_logger.handlers):
        root_logger.removeHandler(existing_handler)
        existing_handler.close()
    root_logger.setLevel(logging.DEBUG)

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(log_format))
    console_handler.setLevel(logging.DEBUG)  # Show DEBUG messages in console too
    root_logger.addHandler(console_handler)

    logging.getLogger("openai").setLevel(logging.WARNING)
    logging.getLogger("tweepy").setLevel(logging.WARNING)

    try:
        # Ensure the logs directory exists
        os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)

        # Prune before the file handler opens the file so the rewrite cannot
        # discard anything logged during this run.
        orphan_count = 0
        if os.path.exists(LOG_FILE):
            with open(LOG_FILE, 'r') as f:
                lines = f.readlines()
            # asctime is rendered in local time by default, so the cutoff is
            # a naive local datetime as well.
            cutoff = datetime.now() - timedelta(days=LOG_PRUNE_DAYS)
            kept_lines, orphan_count = _prune_log_lines(lines, cutoff)
            if len(kept_lines) != len(lines):
                with open(LOG_FILE, 'w') as f:
                    f.writelines(kept_lines)

        file_handler = logging.FileHandler(LOG_FILE)
        file_handler.setFormatter(logging.Formatter(log_format, datefmt=date_format))
        root_logger.addHandler(file_handler)
    except Exception as e:
        # Console handler is already attached; report and carry on without a file.
        logging.error(f"Failed to setup file logging to {LOG_FILE}: {e}. Falling back to console-only logging.")
        return

    if orphan_count > 0:
        logging.info(f"Skipped {orphan_count} malformed log lines during pruning")
    logging.info("Logging initialized for foodie_engagement_tweet.py")
|
|
|
def acquire_lock():
    """Acquire an exclusive, non-blocking lock on LOCK_FILE.

    Returns:
        The open file object holding the lock; the caller must keep it open
        for as long as the lock is needed. The holder's PID is written to it.

    Exits the process with status 0 when the lock cannot be obtained, which
    is treated as another instance already running.
    """
    os.makedirs(os.path.dirname(LOCK_FILE), exist_ok=True)
    # Open in append mode: mode 'w' would truncate the file (erasing the PID
    # written by the instance that holds the lock) before we know whether we
    # can take the lock at all.
    lock_fd = open(LOCK_FILE, 'a')
    try:
        fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        lock_fd.close()
        logging.info("Another instance of foodie_engagement_tweet.py is running")
        sys.exit(0)

    # The lock is ours now, so it is safe to replace the recorded PID.
    lock_fd.truncate(0)
    lock_fd.write(str(os.getpid()))
    lock_fd.flush()
    return lock_fd
|
|
|
def signal_handler(sig, frame):
    """Log the termination request and end the process with exit status 0.

    Both arguments are supplied by the signal machinery and are not used.
    """
    logging.info("Received termination signal, exiting...")
    raise SystemExit(0)


# Route both termination signals through the same handler.
for _handled_signal in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_handled_signal, signal_handler)
|
|
|
# Initialize OpenAI client.
# The key is read once and validated before the client is constructed, so a
# missing key is always reported with the explicit message below. The script
# cannot do anything useful without the client, hence the exit.
try:
    _openai_api_key = os.getenv("OPENAI_API_KEY")
    if not _openai_api_key:
        logging.error("OPENAI_API_KEY is not set in environment variables")
        raise ValueError("OPENAI_API_KEY is required")
    client = OpenAI(api_key=_openai_api_key)
except Exception as e:
    logging.error(f"Failed to initialize OpenAI client: {e}", exc_info=True)
    sys.exit(1)
|
|
|
def _load_author_backgrounds(path):
    """Read the author-background JSON at *path*.

    Returns:
        ``(by_username, entries)`` where ``entries`` is the list exactly as
        stored in the file and ``by_username`` maps each entry's normalized
        username (stripped, lower-cased, NFC) to that entry.

    Raises:
        FileNotFoundError: *path* does not exist.
        ValueError: the document is not a list, or an entry is not a dict
            with a string ``username``.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Author backgrounds file not found at {path}")
    # Read as UTF-8 explicitly so non-ASCII usernames do not depend on the
    # locale's default encoding.
    with open(path, 'r', encoding='utf-8') as f:
        entries = json.load(f)
    if not isinstance(entries, list):
        raise ValueError(
            f"Invalid format in {path}: Expected a list, got {type(entries)}. "
            "Author backgrounds must be a list"
        )

    by_username = {}
    for bg in entries:
        if not isinstance(bg, dict) or "username" not in bg:
            raise ValueError(
                f"Invalid entry in {path}: Missing 'username' key in {bg}. "
                "Each author background must have a 'username' key"
            )
        username = bg["username"]
        if not isinstance(username, str):
            raise ValueError(
                f"Invalid username type in {path}: {username} (type: {type(username)}). "
                "Username must be a string"
            )
        # Normalize the username to handle encoding differences
        cleaned_username = unicodedata.normalize('NFC', username.strip().lower())
        by_username[cleaned_username] = bg
        logging.debug(f"Added to AUTHOR_BACKGROUNDS: key='{cleaned_username}', value={bg}")
    return by_username, entries


# Load author backgrounds into a dictionary for faster lookup; the original
# list is kept for the fallback lookup. The script exits when loading fails.
try:
    AUTHOR_BACKGROUNDS, AUTHOR_BACKGROUNDS_LIST = _load_author_backgrounds(AUTHOR_BACKGROUNDS_FILE)
    loaded_usernames = list(AUTHOR_BACKGROUNDS.keys())
    logging.debug(f"Loaded author backgrounds: {loaded_usernames}")
except Exception as e:
    logging.error(f"Failed to load author_backgrounds.json: {e}", exc_info=True)
    sys.exit(1)
|
|
|
def validate_twitter_credentials(author):
    """Check that the X API credentials configured for *author* are accepted.

    Looks the credentials up in X_API_CREDENTIALS by ``author["username"]``
    and calls ``get_me()`` with them, retrying up to MAX_RETRIES times with
    exponentially growing pauses when tweepy raises.

    Returns:
        True when a call succeeds; False when no credentials are configured
        or every attempt fails.
    """
    username = author["username"]
    creds = X_API_CREDENTIALS.get(username)
    if not creds:
        logging.error(f"No X credentials found for {username}")
        return False

    for attempt_number in range(1, MAX_RETRIES + 1):
        try:
            api = tweepy.Client(
                consumer_key=creds["api_key"],
                consumer_secret=creds["api_secret"],
                access_token=creds["access_token"],
                access_token_secret=creds["access_token_secret"],
            )
            # Only success or failure of the call matters; the result is unused.
            api.get_me()
            logging.info(f"Credentials valid for {username} (handle: {creds['x_username']})")
            return True
        except tweepy.TweepyException as e:
            logging.warning(f"Failed to validate credentials for {username} (attempt {attempt_number}): {e}")
            if attempt_number == MAX_RETRIES:
                logging.error(f"Credentials invalid for {username} after {MAX_RETRIES} attempts")
                return False
            # Pause doubles with each failed attempt.
            time.sleep(RETRY_BACKOFF * (2 ** (attempt_number - 1)))

    # Reached only when MAX_RETRIES allows no attempt at all.
    return False
|
|
|
# Compiled once at import time instead of on every call.
_EMOJI_PATTERN = re.compile(
    "["
    "\U0001F000-\U0001F2FF"  # Mahjong/cards, enclosed symbols, regional indicators (flags)
    "\U0001F300-\U0001F5FF"  # Symbols & Pictographs
    "\U0001F600-\U0001F64F"  # Emoticons
    "\U0001F680-\U0001F6FF"  # Transport & Map Symbols
    "\U0001F700-\U0001F77F"  # Alchemical Symbols
    "\U0001F780-\U0001F7FF"  # Geometric Shapes Extended
    "\U0001F800-\U0001F8FF"  # Supplemental Arrows-C
    "\U0001F900-\U0001F9FF"  # Supplemental Symbols and Pictographs
    "\U0001FA00-\U0001FA6F"  # Chess Symbols
    "\U0001FA70-\U0001FAFF"  # Symbols and Pictographs Extended-A
    "\U00002300-\U000023FF"  # Miscellaneous Technical (watch, hourglass, media buttons)
    "\U00002600-\U000026FF"  # Miscellaneous Symbols
    "\U00002700-\U000027BF"  # Dingbats
    "\U00002B00-\U00002BFF"  # Miscellaneous Symbols and Arrows (star, circles, arrows)
    "\U0000FE00-\U0000FE0F"  # Variation Selectors
    "\U000020E3"             # Combining Enclosing Keycap
    "\U0000200D"             # Zero Width Joiner
    "\U0000200C"             # Zero Width Non-Joiner
    "]+",
    flags=re.UNICODE,
)


def remove_emojis(text):
    """Return *text* with emoji characters removed.

    Besides the pictographic blocks, this strips variation selectors,
    zero-width joiners/non-joiners, keycap marks and regional-indicator
    (flag) characters. Surrounding whitespace is left untouched, so removing
    an emoji between two spaces leaves both spaces in place.
    """
    return _EMOJI_PATTERN.sub("", text)
|
|
|
def get_reference_date():
    """Load the reference date for the 2-day interval, creating it if needed.

    Reads ``reference_date`` (an ISO-8601 string) from REFERENCE_DATE_FILE.
    When the file is missing or its content is unusable, today's UTC midnight
    becomes the new reference date and is written back to the file.

    Returns:
        A timezone-aware ``datetime`` in UTC.
    """
    # dirname is '' for a bare filename, and os.makedirs('') raises.
    reference_dir = os.path.dirname(REFERENCE_DATE_FILE)
    if reference_dir:
        os.makedirs(reference_dir, exist_ok=True)

    if os.path.exists(REFERENCE_DATE_FILE):
        try:
            with open(REFERENCE_DATE_FILE, 'r') as f:
                data = json.load(f)
            stored = datetime.fromisoformat(data["reference_date"])
            if stored.tzinfo is None:
                # Naive values are taken to be UTC.
                reference_date = stored.replace(tzinfo=timezone.utc)
            else:
                # Convert rather than relabel so the instant is preserved.
                reference_date = stored.astimezone(timezone.utc)
            logging.info(f"Loaded reference date: {reference_date.date()}")
            return reference_date
        except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
            # TypeError covers a payload that is not a mapping or whose value
            # is not a string.
            logging.error(f"Failed to load reference date from {REFERENCE_DATE_FILE}: {e}. Initializing new date.")

    reference_date = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    try:
        with open(REFERENCE_DATE_FILE, 'w') as f:
            json.dump({"reference_date": reference_date.isoformat()}, f)
        logging.info(f"Initialized reference date: {reference_date.date()}")
    except Exception as e:
        # Best effort: the run continues with the in-memory date.
        logging.error(f"Failed to save reference date to {REFERENCE_DATE_FILE}: {e}. Using current date.")
    return reference_date
|
|
|
def _lookup_author_background(username, username_cleaned):
    """Return the background entry matching the author, or ``{}`` if none does."""
    logging.debug(f"Looking up background for username: raw='{username}', cleaned='{username_cleaned}'")
    background = AUTHOR_BACKGROUNDS.get(username_cleaned, {})

    # Debug comparison
    if username_cleaned in AUTHOR_BACKGROUNDS:
        logging.debug(f"Direct key check: '{username_cleaned}' found in AUTHOR_BACKGROUNDS keys")
    else:
        logging.debug(f"Direct key check: '{username_cleaned}' NOT found in AUTHOR_BACKGROUNDS keys")
        # Byte-level dump of one known key helps spot invisible encoding differences.
        sample_key = next(iter(AUTHOR_BACKGROUNDS), None)
        if sample_key is not None:
            logging.debug(
                f"Byte-level comparison sample: "
                f"username_cleaned bytes = {list(username_cleaned.encode('utf-8'))}, "
                f"sample background key bytes = {list(sample_key.encode('utf-8'))}"
            )

    if background:
        return background

    # Fallback lookup over the raw list if the dictionary lookup fails
    logging.debug(f"Dictionary lookup failed for '{username_cleaned}', attempting fallback lookup")
    for bg in AUTHOR_BACKGROUNDS_LIST:
        bg_username = bg.get("username", "")
        if not isinstance(bg_username, str):
            logging.warning(f"Skipping background entry with non-string username: {bg_username} (type: {type(bg_username)})")
            continue
        bg_username_cleaned = unicodedata.normalize('NFC', bg_username.strip().lower())
        logging.debug(
            f"Fallback comparison: "
            f"author username (cleaned) = '{username_cleaned}', "
            f"background username (cleaned) = '{bg_username_cleaned}'"
        )
        if bg_username_cleaned == username_cleaned:
            logging.debug(f"Fallback lookup succeeded for '{username_cleaned}'")
            return bg
    return {}


def _finalize_tweet_text(text, username):
    """Return *text* ending in exactly one URL and fitting in 280 characters.

    Every occurrence of URL (in any letter case) is removed from the text and
    a single URL is appended after one space. The URL is counted as
    URL_SHORTENED_LENGTH characters; longer text is cut and ends in "...".
    """
    url_pattern = re.compile(re.escape(URL), re.IGNORECASE)
    if len(url_pattern.findall(text)) > 1:
        logging.warning(f"Generated tweet for {username} contains duplicate URLs: {text}")
    body = url_pattern.sub("", text).strip()

    # Room for the text: 280 minus the shortened URL and the space before it.
    max_body_length = 280 - URL_SHORTENED_LENGTH - 1
    if len(body) > max_body_length:
        total_length = len(body) + 1 + URL_SHORTENED_LENGTH
        logging.warning(f"Tweet for {username} exceeds 280 characters ({total_length}), truncating")
        body = body[:max_body_length - 3].rstrip() + "..."
    return f"{body} {URL}".strip()


def _counted_tweet_length(tweet):
    """Return the length of *tweet* with its URL counted at the shortened length."""
    return len(tweet) - len(URL) + URL_SHORTENED_LENGTH


def generate_engagement_tweet(author):
    """Generate an engagement tweet using author background themes and persona.

    Args:
        author: mapping with at least ``username`` and ``persona`` keys.

    Returns:
        The tweet text ending in URL, a canned fallback tweet when every
        OpenAI attempt fails, or None when the author's X credentials do not
        validate.
    """
    username = author["username"]
    if not validate_twitter_credentials(author):
        logging.error(f"Skipping tweet generation for {username} due to invalid credentials")
        return None

    credentials = X_API_CREDENTIALS.get(username)
    author_handle = credentials["x_username"]
    persona = author["persona"]
    persona_config = PERSONA_CONFIGS.get(persona, PERSONA_CONFIGS["Visionary Editor"])

    # Normalize and lookup background
    username_cleaned = unicodedata.normalize('NFC', username.strip().lower())
    background = _lookup_author_background(username, username_cleaned)

    # Only a non-empty list of themes can be sampled; anything else falls
    # back to the default theme instead of raising.
    themes = background.get("engagement_themes") if background else None
    if isinstance(themes, (list, tuple)) and themes:
        theme = random.choice(themes)
        logging.debug(f"Selected engagement theme '{theme}' for {username}")
    else:
        available_usernames = list(AUTHOR_BACKGROUNDS.keys())
        logging.warning(
            f"No background or engagement themes found for {username}. "
            f"Attempted username (cleaned): {username_cleaned}. "
            f"Available usernames: {available_usernames}. Using default theme."
        )
        theme = "food trends"

    base_prompt = persona_config["x_prompt"].format(
        description=persona_config["description"],
        tone=persona_config["tone"]
    )
    prompt = (
        f"{base_prompt}\n\n"
        f"Generate an engagement tweet for {author_handle} asking a question about {theme} to engage the public. "
        f"The current year is {CURRENT_YEAR}, and all references to the year should use {CURRENT_YEAR}. "
        f"Keep it under 230 characters to ensure room for the URL. "
        f"Use {persona_config['tone']}. "
        f"Include a call to action to follow {author_handle} or like the tweet, followed by the URL {URL} (do not mention InsiderFoodie.com separately in the text). "
        f"Strictly avoid using any emojis, hashtags, or reward-driven incentives (e.g., giveaways)—do not include them under any circumstances. "
        f"Return only the tweet text."
    )

    for attempt in range(MAX_RETRIES):
        try:
            response = client.chat.completions.create(
                model=SUMMARY_MODEL,
                messages=[
                    {"role": "system", "content": "You are a social media expert crafting engaging tweets."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=80,
                temperature=0.7
            )
            content = response.choices[0].message.content
            # An empty completion would otherwise become a URL-only tweet;
            # treat it as a failed attempt instead.
            if not content or not content.strip():
                raise ValueError("OpenAI returned an empty tweet")
            # Remove emojis as a safeguard, then normalize URL and length
            tweet = _finalize_tweet_text(remove_emojis(content.strip()), username)
            logging.debug(f"Final tweet for {username} (length {_counted_tweet_length(tweet)}): {tweet}")
            return tweet
        except Exception as e:
            logging.warning(f"Failed to generate engagement tweet for {username} (attempt {attempt + 1}): {e}")
            if attempt < MAX_RETRIES - 1:
                time.sleep(RETRY_BACKOFF * (2 ** attempt))
            else:
                logging.error(f"Failed to generate engagement tweet after {MAX_RETRIES} attempts")
                fallback = (
                    f"What's the hottest {theme}? Share and follow {author_handle} for more! {URL}"
                )
                # Same emoji and length safeguards as for generated tweets
                fallback = _finalize_tweet_text(remove_emojis(fallback), username)
                logging.info(f"Using fallback engagement tweet: {fallback}")
                return fallback

    # Reached only when MAX_RETRIES allows no attempt at all.
    return None
|
|
|
def _post_for_author(author, username, post_counts):
    """Generate and post one engagement tweet for *author*, honouring post limits."""
    author_count = None
    for entry in post_counts:
        if entry["username"] == username:
            author_count = entry
            break

    if not author_count:
        logging.warning(f"No post count entry for {username}, initializing new entry")
        author_count = {
            "username": username,
            "month": datetime.now(timezone.utc).strftime("%Y-%m"),
            "monthly_count": 0,
            "day": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
            "daily_count": 0
        }
        post_counts.append(author_count)
        save_post_counts(post_counts)

    if author_count["monthly_count"] >= 500:
        logging.warning(f"Monthly post limit (500) reached for {username}, skipping")
        return
    if author_count["daily_count"] >= 20:
        logging.warning(f"Daily post limit (20) reached for {username}, skipping")
        return

    tweet = generate_engagement_tweet(author)
    if not tweet:
        logging.error(f"Failed to generate engagement tweet for {username}, skipping")
        return

    posting_message = f"Posting engagement tweet for {username}: {tweet}"
    logging.info(posting_message)
    print(posting_message)

    if not post_tweet(author, tweet):
        logging.warning(f"Failed to post engagement tweet for {username}")
        return

    logging.info(f"Successfully posted engagement tweet for {username}")
    author_count["monthly_count"] += 1
    author_count["daily_count"] += 1
    save_post_counts(post_counts)


def post_engagement_tweet():
    """Post engagement tweets for authors every 2 days.

    A run posts only when the whole number of days elapsed since the stored
    reference date is even. Each author in AUTHORS is handled independently:
    an error for one author is logged and the loop moves on to the next.
    Progress is both logged and printed; unexpected errors are logged and
    printed rather than raised.
    """
    def announce(message):
        # Mirror the message to the log and to stdout, in that order.
        logging.info(message)
        print(message)

    try:
        announce("Starting foodie_engagement_tweet.py")

        reference_date = get_reference_date()
        elapsed = datetime.now(timezone.utc) - reference_date
        days_since_reference = elapsed.days
        announce(f"Days since reference date ({reference_date.date()}): {days_since_reference}")

        if days_since_reference % 2 != 0:
            announce(f"Today is not an engagement tweet day (every 2 days). Days since reference: {days_since_reference}. Skipping...")
        else:
            announce("Today is an engagement tweet day (every 2 days). Posting...")

            post_counts = load_post_counts()

            for author in AUTHORS:
                username = author["username"]
                try:
                    _post_for_author(author, username, post_counts)
                except Exception as e:
                    logging.error(f"Error posting engagement tweet for {username}: {e}", exc_info=True)

        announce("Completed foodie_engagement_tweet.py")
    except Exception as e:
        logging.error(f"Unexpected error in post_engagement_tweet: {e}", exc_info=True)
        print(f"Error in post_engagement_tweet: {e}")
|
|
|
def main():
    """Run the script once under the single-instance lock.

    Takes the lock, configures logging and posts the engagement tweets. Any
    exception is logged and printed and the process exits with status 1. The
    lock is released and LOCK_FILE deleted on the way out whenever the lock
    was obtained.
    """
    held_lock = None
    try:
        held_lock = acquire_lock()
        setup_logging()
        post_engagement_tweet()
    except Exception as exc:
        logging.error(f"Fatal error in main: {exc}", exc_info=True)
        print(f"Fatal error: {exc}")
        raise SystemExit(1)
    finally:
        if held_lock is not None:
            fcntl.flock(held_lock, fcntl.LOCK_UN)
            held_lock.close()
            if os.path.exists(LOCK_FILE):
                os.remove(LOCK_FILE)


if __name__ == "__main__":
    main()