# foodie_weekly_thread.py import json import os import logging import random import signal import sys import fcntl import time import re from datetime import datetime, timedelta, timezone from openai import OpenAI from foodie_utils import AUTHORS, SUMMARY_MODEL, load_json_file, save_json_file, update_system_activity from foodie_config import X_API_CREDENTIALS, RECENT_POSTS_FILE from dotenv import load_dotenv import shutil load_dotenv() SCRIPT_NAME = "foodie_weekly_thread" LOCK_FILE = "/home/shane/foodie_automator/locks/foodie_weekly_thread.lock" LOG_FILE = "/home/shane/foodie_automator/logs/foodie_weekly_thread.log" WEEKLY_THREADS_FILE = "/home/shane/foodie_automator/weekly_threads.json" LOG_PRUNE_DAYS = 30 MAX_RETRIES = 3 RETRY_BACKOFF = 2 def setup_logging(): """Initialize logging with pruning of old logs.""" try: os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) if os.path.exists(LOG_FILE): with open(LOG_FILE, 'r') as f: lines = f.readlines() cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS) pruned_lines = [] malformed_count = 0 for line in lines: if len(line) < 19 or not line[:19].replace('-', '').replace(':', '').replace(' ', '').isdigit(): malformed_count += 1 continue try: timestamp = datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc) if timestamp > cutoff: pruned_lines.append(line) except ValueError: malformed_count += 1 continue if malformed_count > 0: logging.info(f"Skipped {malformed_count} malformed log lines during pruning") with open(LOG_FILE, 'w') as f: f.writelines(pruned_lines) logging.basicConfig( filename=LOG_FILE, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logging.getLogger().addHandler(console_handler) logging.getLogger("openai").setLevel(logging.WARNING) logging.info("Logging initialized for foodie_weekly_thread.py") except Exception as e: print(f"Failed to setup logging: {e}") sys.exit(1) def acquire_lock(): """Acquire a lock to prevent concurrent runs.""" os.makedirs(os.path.dirname(LOCK_FILE), exist_ok=True) lock_fd = open(LOCK_FILE, 'w') try: fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) lock_fd.write(str(os.getpid())) lock_fd.flush() return lock_fd except IOError: logging.info("Another instance of foodie_weekly_thread.py is running") sys.exit(0) def signal_handler(sig, frame): """Handle termination signals gracefully.""" logging.info("Received termination signal, marking script as stopped...") update_system_activity(SCRIPT_NAME, "stopped") sys.exit(0) signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) # Initialize OpenAI client try: client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) if not os.getenv("OPENAI_API_KEY"): logging.error("OPENAI_API_KEY is not set in environment variables") raise ValueError("OPENAI_API_KEY is required") except Exception as e: logging.error(f"Failed to initialize OpenAI client: {e}", exc_info=True) sys.exit(1) def generate_intro_tweet(author): """Generate an intro tweet for the weekly thread.""" credentials = X_API_CREDENTIALS.get(author["username"]) if not credentials: logging.error(f"No X credentials found for {author['username']}") return None author_handle = credentials["x_username"] logging.debug(f"Generating intro tweet for {author_handle}") prompt = ( f"Generate a concise tweet (under 200 characters) for {author_handle}. " f"Introduce a thread of their top 10 foodie posts of the week on InsiderFoodie.com. " f"Make it engaging, create curiosity, and include a call to action to visit InsiderFoodie.com or follow {author_handle}. " f"Avoid using the word 'elevate'—use humanized language like 'level up' or 'bring to life'. " f"Strictly exclude emojis, hashtags, or reward-driven incentives (e.g., giveaways)." ) for attempt in range(MAX_RETRIES): try: response = client.chat.completions.create( model=SUMMARY_MODEL, messages=[ {"role": "system", "content": "You are a social media expert crafting engaging tweets."}, {"role": "user", "content": prompt} ], max_tokens=150, temperature=0.7 ) tweet = response.choices[0].message.content.strip() tweet = re.sub(r'[\U0001F000-\U0001FFFF]', '', tweet) # Remove emojis if len(tweet) > 280: tweet = tweet[:277] + "..." logging.debug(f"Generated intro tweet: {tweet}") return tweet except Exception as e: logging.warning(f"Failed to generate intro tweet for {author['username']} (attempt {attempt + 1}): {e}") if attempt < MAX_RETRIES - 1: time.sleep(RETRY_BACKOFF * (2 ** attempt)) else: logging.error(f"Failed to generate intro tweet after {MAX_RETRIES} attempts") fallback = ( f"Top 10 foodie posts this week by {author_handle}! Visit InsiderFoodie.com and follow {author_handle} for more." ) logging.info(f"Using fallback intro tweet: {fallback}") return fallback def generate_final_cta(author): """Generate a final CTA tweet for the weekly thread using GPT.""" credentials = X_API_CREDENTIALS.get(author["username"]) if not credentials: logging.error(f"No X credentials found for {author['username']}") return None author_handle = credentials["x_username"] logging.debug(f"Generating final CTA tweet for {author_handle}") prompt = ( f"Generate a concise tweet (under 200 characters) for {author_handle}. " f"Conclude a thread of their top 10 foodie posts of the week on InsiderFoodie.com. " f"Make it engaging, value-driven, in the style of Neil Patel. " f"Include a call to action to visit InsiderFoodie.com and follow {author_handle}. " f"Mention that top 10 foodie trends are shared every Monday. " f"Avoid using the word 'elevate'—use humanized language like 'level up' or 'bring to life'. " f"Strictly exclude emojis, hashtags, or reward-driven incentives (e.g., giveaways)." ) for attempt in range(MAX_RETRIES): try: response = client.chat.completions.create( model=SUMMARY_MODEL, messages=[ {"role": "system", "content": "You are a social media expert crafting engaging tweets."}, {"role": "user", "content": prompt} ], max_tokens=150, temperature=0.7 ) tweet = response.choices[0].message.content.strip() tweet = re.sub(r'[\U0001F000-\U0001FFFF]', '', tweet) # Remove emojis if len(tweet) > 280: tweet = tweet[:277] + "..." logging.debug(f"Generated final CTA tweet: {tweet}") return tweet except Exception as e: logging.warning(f"Failed to generate final CTA tweet for {author['username']} (attempt {attempt + 1}): {e}") if attempt < MAX_RETRIES - 1: time.sleep(RETRY_BACKOFF * (2 ** attempt)) else: logging.error(f"Failed to generate final CTA tweet after {MAX_RETRIES} attempts") fallback = ( f"Want more foodie insights? Visit insiderfoodie.com and follow {author_handle} " f"for top 10 foodie trends every Monday." ) logging.info(f"Using fallback final CTA tweet: {fallback}") return fallback def load_recent_posts(): """Load and deduplicate posts from recent_posts.json.""" logging.debug(f"Attempting to load posts from {RECENT_POSTS_FILE}") posts = load_json_file(RECENT_POSTS_FILE) if not posts: logging.warning(f"No valid posts loaded from {RECENT_POSTS_FILE}") return [] # Deduplicate posts unique_posts = {} for post in posts: try: required_fields = ["title", "url", "author_username", "timestamp"] if not all(key in post for key in required_fields): logging.warning(f"Skipping invalid post: missing fields {post}") continue datetime.fromisoformat(post["timestamp"].replace('Z', '+00:00')) key = (post["title"], post["url"], post["author_username"]) if key not in unique_posts: unique_posts[key] = post else: logging.debug(f"Skipping duplicate post: {post['title']}") except (KeyError, ValueError) as e: logging.warning(f"Skipping post due to invalid format: {e}") continue deduped_posts = list(unique_posts.values()) logging.info(f"Loaded {len(deduped_posts)} unique posts from {RECENT_POSTS_FILE}") return deduped_posts def filter_posts_for_week(posts, start_date, end_date): """Filter posts within the given week range.""" filtered_posts = [] for post in posts: try: post_date = datetime.fromisoformat(post["timestamp"]) logging.debug(f"Checking post: title={post['title']}, timestamp={post_date}, in range {start_date} to {end_date}") if start_date <= post_date <= end_date: filtered_posts.append(post) logging.debug(f"Included post: {post['title']}") else: logging.debug(f"Excluded post: {post['title']} (timestamp {post_date} outside range)") except (KeyError, ValueError) as e: logging.warning(f"Skipping post due to invalid format: {e}") continue logging.info(f"Filtered to {len(filtered_posts)} posts for the week") return filtered_posts def generate_weekly_thread(): """Generate weekly thread content for each author and save to file on Mondays.""" logging.info("Starting foodie_weekly_thread.py") # Check if today is Monday today = datetime.now(timezone.utc) if today.weekday() != 0: # 0 = Monday logging.info(f"Today is not Monday (weekday: {today.weekday()}), skipping weekly thread") return # Calculate date range: 7 days prior to run date start_date = (today - timedelta(days=7)).replace(hour=0, minute=0, second=0, microsecond=0) end_date = (today - timedelta(days=1)).replace(hour=23, minute=59, second=59, microsecond=999999) logging.info(f"Fetching posts from {start_date} to {end_date}") # Load and filter posts recent_posts = load_json_file(RECENT_POSTS_FILE) logging.info(f"Loaded {len(recent_posts)} posts from {RECENT_POSTS_FILE}") # Deduplicate posts seen = set() deduped_posts = [] for post in recent_posts: key = (post["title"], post["url"], post["author_username"]) if key not in seen: seen.add(key) deduped_posts.append(post) logging.info(f"Filtered to {len(deduped_posts)} unique posts after deduplication") weekly_posts = filter_posts_for_week(deduped_posts, start_date, end_date) if not weekly_posts: logging.warning(f"No posts found within the week range {start_date} to {end_date}, exiting generate_weekly_thread") return # Group posts by author posts_by_author = {author["username"]: [] for author in AUTHORS} for post in weekly_posts: username = post["author_username"] if username in posts_by_author: posts_by_author[username].append(post) # Generate thread content for each author thread_content = [] timestamp = datetime.now(timezone.utc).isoformat() for author in AUTHORS: username = author["username"] author_posts = posts_by_author.get(username, []) if not author_posts: logging.info(f"No posts found for {username}, skipping") continue # Select top 2 posts (to fit within 3-tweet limit: lead + 2 posts) author_posts = sorted(author_posts, key=lambda x: datetime.fromisoformat(x["timestamp"]), reverse=True) selected_posts = author_posts[:2] logging.info(f"Found {len(author_posts)} posts for {username}, selected {len(selected_posts)}") # Generate thread content try: # Generate intro tweet intro_tweet = generate_intro_tweet(author) if not intro_tweet: logging.error(f"Failed to generate intro tweet for {username}, skipping") continue # Generate thread tweets (up to 2) thread_tweets = [] for i, post in enumerate(selected_posts, 1): thread_tweet = ( f"{i}. {post['title']} " f"Read more: {post['url']}" ) if len(thread_tweet) > 280: thread_tweet = f"{i}. {post['title'][:200]}... Read more: {post['url']}" thread_tweets.append(thread_tweet) logging.info(f"Generated thread tweet {i} for {username}: {thread_tweet}") # Generate final CTA tweet final_cta = generate_final_cta(author) if not final_cta: logging.error(f"Failed to generate final CTA tweet for {username}, using fallback") final_cta = ( f"Want more foodie insights? Visit insiderfoodie.com and follow {X_API_CREDENTIALS[username]['x_username']} " f"for top 10 foodie trends every Monday." ) # Collect thread content for this author author_thread = { "username": username, "x_handle": X_API_CREDENTIALS[username]["x_username"], "intro_tweet": intro_tweet, "thread_tweets": thread_tweets, "final_cta": final_cta, "timestamp": timestamp } thread_content.append(author_thread) logging.info(f"Generated thread content for {username}") except Exception as e: logging.error(f"Error generating thread content for {username}: {e}", exc_info=True) continue # Save thread content to file, overwriting any existing content if thread_content: try: # Backup existing file before overwriting if os.path.exists(WEEKLY_THREADS_FILE): backup_dir = "/home/shane/foodie_automator/backups" os.makedirs(backup_dir, exist_ok=True) backup_file = f"{backup_dir}/weekly_threads_{timestamp.replace(':', '-')}.json" shutil.copy(WEEKLY_THREADS_FILE, backup_file) logging.info(f"Backed up existing {WEEKLY_THREADS_FILE} to {backup_file}") # Save new thread content, overwriting the file thread_data = { "week_start": start_date.isoformat(), "week_end": end_date.isoformat(), "timestamp": timestamp, "threads": thread_content } save_json_file(WEEKLY_THREADS_FILE, thread_data) logging.info(f"Saved thread content for {len(thread_content)} authors to {WEEKLY_THREADS_FILE}") except Exception as e: logging.error(f"Failed to save thread content to {WEEKLY_THREADS_FILE}: {e}") else: logging.warning("No thread content generated, nothing to save") logging.info("Completed foodie_weekly_thread.py") def main(): """Main function to run the script.""" lock_fd = None try: lock_fd = acquire_lock() setup_logging() update_system_activity(SCRIPT_NAME, "running", os.getpid()) # Record start generate_weekly_thread() update_system_activity(SCRIPT_NAME, "stopped") # Record stop except Exception as e: logging.error(f"Fatal error in main: {e}", exc_info=True) print(f"Fatal error: {e}") update_system_activity(SCRIPT_NAME, "stopped") # Record stop on error sys.exit(1) finally: if lock_fd: fcntl.flock(lock_fd, fcntl.LOCK_UN) lock_fd.close() os.remove(LOCK_FILE) if os.path.exists(LOCK_FILE) else None if __name__ == "__main__": main()