# foodie_engagement_tweet.py import json import logging import random import signal import sys import fcntl import os import time from datetime import datetime, timedelta, timezone import tweepy from openai import OpenAI from foodie_utils import post_tweet, load_post_counts, save_post_counts from foodie_config import ( AUTHORS, SUMMARY_MODEL, X_API_CREDENTIALS, AUTHOR_BACKGROUNDS_FILE, PERSONA_CONFIGS, ENGAGEMENT_REFERENCE_DATE_FILE ) from dotenv import load_dotenv load_dotenv() REFERENCE_DATE_FILE = ENGAGEMENT_REFERENCE_DATE_FILE LOCK_FILE = "/home/shane/foodie_automator/locks/foodie_engagement_tweet.lock" LOG_FILE = "/home/shane/foodie_automator/logs/foodie_engagement_tweet.log" LOG_PRUNE_DAYS = 30 MAX_RETRIES = 3 RETRY_BACKOFF = 2 def setup_logging(): """Initialize logging with pruning of old logs.""" try: os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) if os.path.exists(LOG_FILE): with open(LOG_FILE, 'r') as f: lines = f.readlines() cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS) pruned_lines = [] malformed_count = 0 for line in lines: if len(line) < 19 or not line[:19].replace('-', '').replace(':', '').replace(' ', '').isdigit(): malformed_count += 1 continue try: timestamp = datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc) if timestamp > cutoff: pruned_lines.append(line) except ValueError: malformed_count += 1 continue if malformed_count > 0: logging.info(f"Skipped {malformed_count} malformed log lines during pruning") with open(LOG_FILE, 'w') as f: f.writelines(pruned_lines) logging.basicConfig( filename=LOG_FILE, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logging.getLogger().addHandler(console_handler) logging.getLogger("openai").setLevel(logging.WARNING) logging.getLogger("tweepy").setLevel(logging.WARNING) logging.info("Logging initialized for foodie_engagement_tweet.py") except Exception as e: print(f"Failed to setup logging: {e}") sys.exit(1) def acquire_lock(): """Acquire a lock to prevent concurrent runs.""" os.makedirs(os.path.dirname(LOCK_FILE), exist_ok=True) lock_fd = open(LOCK_FILE, 'w') try: fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) lock_fd.write(str(os.getpid())) lock_fd.flush() return lock_fd except IOError: logging.info("Another instance of foodie_engagement_tweet.py is running") sys.exit(0) def signal_handler(sig, frame): """Handle termination signals gracefully.""" logging.info("Received termination signal, exiting...") sys.exit(0) signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) # Initialize OpenAI client try: client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) if not os.getenv("OPENAI_API_KEY"): logging.error("OPENAI_API_KEY is not set in environment variables") raise ValueError("OPENAI_API_KEY is required") except Exception as e: logging.error(f"Failed to initialize OpenAI client: {e}", exc_info=True) sys.exit(1) # Load author backgrounds try: with open(AUTHOR_BACKGROUNDS_FILE, 'r') as f: AUTHOR_BACKGROUNDS = json.load(f) logging.debug(f"Loaded author backgrounds: {[bg['username'] for bg in AUTHOR_BACKGROUNDS]}") except Exception as e: logging.error(f"Failed to load author_backgrounds.json: {e}", exc_info=True) AUTHOR_BACKGROUNDS = [] sys.exit(1) def validate_twitter_credentials(author): """Validate Twitter API credentials for a specific author.""" username = author["username"] credentials = X_API_CREDENTIALS.get(username) if not credentials: logging.error(f"No X credentials found for {username}") return False for attempt in range(MAX_RETRIES): try: twitter_client = tweepy.Client( consumer_key=credentials["api_key"], consumer_secret=credentials["api_secret"], access_token=credentials["access_token"], access_token_secret=credentials["access_token_secret"] ) user = twitter_client.get_me() logging.info(f"Credentials valid for {username} (handle: {credentials['x_username']})") return True except tweepy.TweepyException as e: logging.warning(f"Failed to validate credentials for {username} (attempt {attempt + 1}): {e}") if attempt < MAX_RETRIES - 1: time.sleep(RETRY_BACKOFF * (2 ** attempt)) else: logging.error(f"Credentials invalid for {username} after {MAX_RETRIES} attempts") return False return False def get_reference_date(): """Load or initialize the reference date for the 2-day interval.""" os.makedirs(os.path.dirname(REFERENCE_DATE_FILE), exist_ok=True) if os.path.exists(REFERENCE_DATE_FILE): try: with open(REFERENCE_DATE_FILE, 'r') as f: data = json.load(f) reference_date = datetime.fromisoformat(data["reference_date"]).replace(tzinfo=timezone.utc) logging.info(f"Loaded reference date: {reference_date.date()}") return reference_date except (json.JSONDecodeError, KeyError, ValueError) as e: logging.error(f"Failed to load reference date from {REFERENCE_DATE_FILE}: {e}. Initializing new date.") reference_date = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) try: with open(REFERENCE_DATE_FILE, 'w') as f: json.dump({"reference_date": reference_date.isoformat()}, f) logging.info(f"Initialized reference date: {reference_date.date()}") except Exception as e: logging.error(f"Failed to save reference date to {REFERENCE_DATE_FILE}: {e}. Using current date.") return reference_date def generate_engagement_tweet(author): """Generate an engagement tweet using author background themes and persona.""" username = author["username"] if not validate_twitter_credentials(author): logging.error(f"Skipping tweet generation for {username} due to invalid credentials") return None credentials = X_API_CREDENTIALS.get(username) author_handle = credentials["x_username"] persona = author["persona"] persona_config = PERSONA_CONFIGS.get(persona, PERSONA_CONFIGS["Visionary Editor"]) # Case-insensitive lookup for background background = next( (bg for bg in AUTHOR_BACKGROUNDS if bg["username"].lower() == username.lower()), {} ) if not background or "engagement_themes" not in background: logging.warning(f"No background or engagement themes found for {username}, using default theme") theme = "food trends" else: theme = random.choice(background["engagement_themes"]) logging.debug(f"Selected engagement theme '{theme}' for {username}") base_prompt = persona_config["x_prompt"].format( description=persona_config["description"], tone=persona_config["tone"] ) prompt = ( f"{base_prompt}\n\n" f"Generate an engagement tweet for {author_handle} asking a question about {theme} to engage the public. " f"Keep it under 230 characters to ensure room for the URL. " f"Use {persona_config['tone']}. " f"Include a call to action to follow {author_handle} or like the tweet, and mention InsiderFoodie.com with a link to https://insiderfoodie.com. " f"Avoid using the word 'elevate'—use more humanized language like 'level up' or 'bring to life'. " f"Do not include emojis, hashtags, or reward-driven incentives (e.g., giveaways). " f"Return only the tweet text." ) for attempt in range(MAX_RETRIES): try: response = client.chat.completions.create( model=SUMMARY_MODEL, messages=[ {"role": "system", "content": "You are a social media expert crafting engaging tweets."}, {"role": "user", "content": prompt} ], max_tokens=80, # Reduced to ensure shorter tweets temperature=0.7 ) tweet = response.choices[0].message.content.strip() # Ensure tweet length is within limits (accounting for URL) url_length = 23 # Twitter shortens URLs if len(tweet) > (280 - url_length): tweet = tweet[:(280 - url_length - 3)] + "..." logging.debug(f"Generated engagement tweet for {username}: {tweet}") return tweet except Exception as e: logging.warning(f"Failed to generate engagement tweet for {username} (attempt {attempt + 1}): {e}") if attempt < MAX_RETRIES - 1: time.sleep(RETRY_BACKOFF * (2 ** attempt)) else: logging.error(f"Failed to generate engagement tweet after {MAX_RETRIES} attempts") fallback = ( f"What's the hottest {theme}? Share and follow {author_handle} for more on InsiderFoodie.com! " f"https://insiderfoodie.com" ) if len(fallback) > (280 - url_length): fallback = fallback[:(280 - url_length - 3)] + "..." logging.info(f"Using fallback engagement tweet: {fallback}") return fallback return None def post_engagement_tweet(): """Post engagement tweets for authors every 2 days.""" try: logging.info("Starting foodie_engagement_tweet.py") print("Starting foodie_engagement_tweet.py") reference_date = get_reference_date() current_date = datetime.now(timezone.utc) days_since_reference = (current_date - reference_date).days logging.info(f"Days since reference date ({reference_date.date()}): {days_since_reference}") print(f"Days since reference date ({reference_date.date()}): {days_since_reference}") if days_since_reference % 2 == 0: logging.info("Today is an engagement tweet day (every 2 days). Posting...") print("Today is an engagement tweet day (every 2 days). Posting...") post_counts = load_post_counts() for author in AUTHORS: username = author["username"] try: author_count = next((entry for entry in post_counts if entry["username"] == username), None) if not author_count: logging.warning(f"No post count entry for {username}, initializing new entry") author_count = { "username": username, "month": datetime.now(timezone.utc).strftime("%Y-%m"), "monthly_count": 0, "day": datetime.now(timezone.utc).strftime("%Y-%m-%d"), "daily_count": 0 } post_counts.append(author_count) save_post_counts(post_counts) if author_count["monthly_count"] >= 500: logging.warning(f"Monthly post limit (500) reached for {username}, skipping") continue if author_count["daily_count"] >= 20: logging.warning(f"Daily post limit (20) reached for {username}, skipping") continue tweet = generate_engagement_tweet(author) if not tweet: logging.error(f"Failed to generate engagement tweet for {username}, skipping") continue logging.info(f"Posting engagement tweet for {username}: {tweet}") print(f"Posting engagement tweet for {username}: {tweet}") if post_tweet(author, tweet): logging.info(f"Successfully posted engagement tweet for {username}") author_count["monthly_count"] += 1 author_count["daily_count"] += 1 save_post_counts(post_counts) else: logging.warning(f"Failed to post engagement tweet for {username}") except Exception as e: logging.error(f"Error posting engagement tweet for {username}: {e}", exc_info=True) continue else: logging.info(f"Today is not an engagement tweet day (every 2 days). Days since reference: {days_since_reference}. Skipping...") print(f"Today is not an engagement tweet day (every 2 days). Days since reference: {days_since_reference}. Skipping...") logging.info("Completed foodie_engagement_tweet.py") print("Completed foodie_engagement_tweet.py") except Exception as e: logging.error(f"Unexpected error in post_engagement_tweet: {e}", exc_info=True) print(f"Error in post_engagement_tweet: {e}") def main(): """Main function to run the script.""" lock_fd = None try: lock_fd = acquire_lock() setup_logging() post_engagement_tweet() except Exception as e: logging.error(f"Fatal error in main: {e}", exc_info=True) print(f"Fatal error: {e}") sys.exit(1) finally: if lock_fd: fcntl.flock(lock_fd, fcntl.LOCK_UN) lock_fd.close() os.remove(LOCK_FILE) if os.path.exists(LOCK_FILE) else None if __name__ == "__main__": main()