# foodie_engagement_tweet.py
import json
import logging
import random
import signal
import sys
import fcntl
import os
import time
from datetime import datetime, timedelta, timezone

import tweepy
from openai import OpenAI
from dotenv import load_dotenv

from foodie_utils import post_tweet, load_post_counts, save_post_counts
from foodie_config import (
    AUTHORS,
    SUMMARY_MODEL,
    X_API_CREDENTIALS,
    AUTHOR_BACKGROUNDS_FILE,
    PERSONA_CONFIGS,
    ENGAGEMENT_REFERENCE_DATE_FILE
)

load_dotenv()

REFERENCE_DATE_FILE = ENGAGEMENT_REFERENCE_DATE_FILE
LOCK_FILE = "/home/shane/foodie_automator/locks/foodie_engagement_tweet.lock"
LOG_FILE = "/home/shane/foodie_automator/logs/foodie_engagement_tweet.log"
LOG_PRUNE_DAYS = 30
MAX_RETRIES = 3
RETRY_BACKOFF = 2
URL = "https://insiderfoodie.com"
URL_SHORTENED_LENGTH = 23  # Twitter's shortened URL length


def setup_logging():
    """Initialize logging with pruning of old logs."""
    try:
        os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
        malformed_count = 0
        if os.path.exists(LOG_FILE):
            with open(LOG_FILE, 'r') as f:
                lines = f.readlines()
            cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS)
            pruned_lines = []
            for line in lines:
                if len(line) < 19 or not line[:19].replace('-', '').replace(':', '').replace(' ', '').isdigit():
                    malformed_count += 1
                    continue
                try:
                    timestamp = datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
                    if timestamp > cutoff:
                        pruned_lines.append(line)
                except ValueError:
                    malformed_count += 1
                    continue
            with open(LOG_FILE, 'w') as f:
                f.writelines(pruned_lines)
        logging.basicConfig(
            filename=LOG_FILE,
            level=logging.DEBUG,  # Changed to DEBUG to show more details
            format='%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        logging.getLogger().addHandler(console_handler)
        logging.getLogger("openai").setLevel(logging.WARNING)
        logging.getLogger("tweepy").setLevel(logging.WARNING)
        # Report skipped lines only after handlers exist; calling logging.info() before
        # basicConfig() would attach a default handler and prevent the file handler setup.
        if malformed_count > 0:
            logging.info(f"Skipped {malformed_count} malformed log lines during pruning")
        logging.info("Logging initialized for foodie_engagement_tweet.py")
    except Exception as e:
        print(f"Failed to setup logging: {e}")
        sys.exit(1)


def acquire_lock():
    """Acquire a lock to prevent concurrent runs."""
    os.makedirs(os.path.dirname(LOCK_FILE), exist_ok=True)
    lock_fd = open(LOCK_FILE, 'w')
    try:
        fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        lock_fd.write(str(os.getpid()))
        lock_fd.flush()
        return lock_fd
    except IOError:
        logging.info("Another instance of foodie_engagement_tweet.py is running")
        sys.exit(0)


def signal_handler(sig, frame):
    """Handle termination signals gracefully."""
    logging.info("Received termination signal, exiting...")
    sys.exit(0)


signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)

# Initialize OpenAI client (check the key before constructing the client so a
# missing key produces a clear error)
try:
    if not os.getenv("OPENAI_API_KEY"):
        logging.error("OPENAI_API_KEY is not set in environment variables")
        raise ValueError("OPENAI_API_KEY is required")
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
except Exception as e:
    logging.error(f"Failed to initialize OpenAI client: {e}", exc_info=True)
    sys.exit(1)

# Load author backgrounds
try:
    if not os.path.exists(AUTHOR_BACKGROUNDS_FILE):
        logging.error(f"Author backgrounds file not found at {AUTHOR_BACKGROUNDS_FILE}")
        raise FileNotFoundError(f"Author backgrounds file not found at {AUTHOR_BACKGROUNDS_FILE}")
    with open(AUTHOR_BACKGROUNDS_FILE, 'r') as f:
        AUTHOR_BACKGROUNDS = json.load(f)
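    # Note: the validation below only requires each entry to be a dict with a
    # "username" key; generate_engagement_tweet() additionally looks for an optional
    # "engagement_themes" list per author and falls back to a default theme if absent.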
    if not isinstance(AUTHOR_BACKGROUNDS, list):
        logging.error(f"Invalid format in {AUTHOR_BACKGROUNDS_FILE}: Expected a list, got {type(AUTHOR_BACKGROUNDS)}")
        raise ValueError("Author backgrounds must be a list")
    for bg in AUTHOR_BACKGROUNDS:
        if "username" not in bg:
            logging.error(f"Invalid entry in {AUTHOR_BACKGROUNDS_FILE}: Missing 'username' key in {bg}")
            raise ValueError("Each author background must have a 'username' key")
    loaded_usernames = [bg["username"] for bg in AUTHOR_BACKGROUNDS]
    logging.debug(f"Loaded author backgrounds: {loaded_usernames}")
except Exception as e:
    logging.error(f"Failed to load author_backgrounds.json: {e}", exc_info=True)
    AUTHOR_BACKGROUNDS = []
    sys.exit(1)


def validate_twitter_credentials(author):
    """Validate Twitter API credentials for a specific author."""
    username = author["username"]
    credentials = X_API_CREDENTIALS.get(username)
    if not credentials:
        logging.error(f"No X credentials found for {username}")
        return False
    for attempt in range(MAX_RETRIES):
        try:
            twitter_client = tweepy.Client(
                consumer_key=credentials["api_key"],
                consumer_secret=credentials["api_secret"],
                access_token=credentials["access_token"],
                access_token_secret=credentials["access_token_secret"]
            )
            user = twitter_client.get_me()
            logging.info(f"Credentials valid for {username} (handle: {credentials['x_username']})")
            return True
        except tweepy.TweepyException as e:
            logging.warning(f"Failed to validate credentials for {username} (attempt {attempt + 1}): {e}")
            if attempt < MAX_RETRIES - 1:
                time.sleep(RETRY_BACKOFF * (2 ** attempt))
            else:
                logging.error(f"Credentials invalid for {username} after {MAX_RETRIES} attempts")
                return False
    return False


def get_reference_date():
    """Load or initialize the reference date for the 2-day interval."""
    os.makedirs(os.path.dirname(REFERENCE_DATE_FILE), exist_ok=True)
    if os.path.exists(REFERENCE_DATE_FILE):
        try:
            with open(REFERENCE_DATE_FILE, 'r') as f:
                data = json.load(f)
            reference_date = datetime.fromisoformat(data["reference_date"]).replace(tzinfo=timezone.utc)
            logging.info(f"Loaded reference date: {reference_date.date()}")
            return reference_date
        except (json.JSONDecodeError, KeyError, ValueError) as e:
            logging.error(f"Failed to load reference date from {REFERENCE_DATE_FILE}: {e}. Initializing new date.")
    reference_date = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    try:
        with open(REFERENCE_DATE_FILE, 'w') as f:
            json.dump({"reference_date": reference_date.isoformat()}, f)
        logging.info(f"Initialized reference date: {reference_date.date()}")
    except Exception as e:
        logging.error(f"Failed to save reference date to {REFERENCE_DATE_FILE}: {e}. Using current date.")
    return reference_date
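
# The reference date anchors the every-other-day cadence: post_engagement_tweet()
# only posts when the whole number of days since this date is even.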


def generate_engagement_tweet(author):
    """Generate an engagement tweet using author background themes and persona."""
    username = author["username"]
    if not validate_twitter_credentials(author):
        logging.error(f"Skipping tweet generation for {username} due to invalid credentials")
        return None
    credentials = X_API_CREDENTIALS.get(username)
    author_handle = credentials["x_username"]
    persona = author["persona"]
    persona_config = PERSONA_CONFIGS.get(persona, PERSONA_CONFIGS["Visionary Editor"])

    # Case-insensitive lookup for background with whitespace stripping
    username_cleaned = username.strip().lower()
    background = next(
        (bg for bg in AUTHOR_BACKGROUNDS if bg["username"].strip().lower() == username_cleaned),
        {}
    )
    if not background or "engagement_themes" not in background:
        available_usernames = [bg["username"] for bg in AUTHOR_BACKGROUNDS]
        logging.warning(
            f"No background or engagement themes found for {username}. "
            f"Attempted username (cleaned): {username_cleaned}. "
            f"Available usernames: {available_usernames}. Using default theme."
        )
        theme = "food trends"
    else:
        theme = random.choice(background["engagement_themes"])
    logging.debug(f"Selected engagement theme '{theme}' for {username}")

    base_prompt = persona_config["x_prompt"].format(
        description=persona_config["description"],
        tone=persona_config["tone"]
    )
    prompt = (
        f"{base_prompt}\n\n"
        f"Generate an engagement tweet for {author_handle} asking a question about {theme} to engage the public. "
        f"Keep it under 230 characters to ensure room for the URL. "
        f"Use {persona_config['tone']}. "
        f"Include a call to action to follow {author_handle} or like the tweet, followed by the URL {URL} "
        f"(do not mention InsiderFoodie.com separately in the text). "
        f"Avoid using the word 'elevate'—use more humanized language like 'level up' or 'bring to life'. "
        f"Do not include emojis, hashtags, or reward-driven incentives (e.g., giveaways). "
        f"Return only the tweet text."
    )

    for attempt in range(MAX_RETRIES):
        try:
            response = client.chat.completions.create(
                model=SUMMARY_MODEL,
                messages=[
                    {"role": "system", "content": "You are a social media expert crafting engaging tweets."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=80,
                temperature=0.7
            )
            tweet = response.choices[0].message.content.strip()

            # Check for duplicate URLs and remove if present
            url_count = tweet.lower().count(URL.lower())
            if url_count > 1:
                logging.warning(f"Generated tweet for {username} contains duplicate URLs: {tweet}")
                # Keep only the last occurrence of the URL
                last_url_pos = tweet.rfind(URL)
                tweet = tweet[:last_url_pos].replace(URL, "").strip() + " " + URL
                logging.debug(f"Revised tweet after removing duplicate URL: {tweet}")

            # Ensure the URL is at the end of the tweet
            if not tweet.endswith(URL):
                tweet = tweet.replace(URL, "").strip() + " " + URL
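
            # X wraps every link with t.co, so a URL counts as a fixed
            # URL_SHORTENED_LENGTH (23) characters toward the 280-character limit
            # regardless of its real length; the length budget below uses that
            # figure instead of len(URL).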
            # Calculate tweet length considering Twitter's URL shortening
            tweet_without_url = tweet.replace(URL, "")
            total_length = len(tweet_without_url) + URL_SHORTENED_LENGTH
            if total_length > 280:
                logging.warning(f"Tweet for {username} exceeds 280 characters ({total_length}), truncating")
                # Reserve 4 characters for the ellipsis plus the space before the URL ("... ")
                tweet_without_url = tweet_without_url[:(280 - URL_SHORTENED_LENGTH - 4)]
                tweet = tweet_without_url + "... " + URL
                total_length = len(tweet_without_url) + 4 + URL_SHORTENED_LENGTH
            logging.debug(f"Final tweet for {username} (length {total_length}): {tweet}")
            return tweet
        except Exception as e:
            logging.warning(f"Failed to generate engagement tweet for {username} (attempt {attempt + 1}): {e}")
            if attempt < MAX_RETRIES - 1:
                time.sleep(RETRY_BACKOFF * (2 ** attempt))
            else:
                logging.error(f"Failed to generate engagement tweet after {MAX_RETRIES} attempts")
                fallback = (
                    f"What's the hottest {theme}? Share and follow {author_handle} for more! {URL}"
                )
                # Ensure fallback tweet is within length limits
                tweet_without_url = fallback.replace(URL, "")
                total_length = len(tweet_without_url) + URL_SHORTENED_LENGTH
                if total_length > 280:
                    tweet_without_url = tweet_without_url[:(280 - URL_SHORTENED_LENGTH - 4)]
                    fallback = tweet_without_url + "... " + URL
                logging.info(f"Using fallback engagement tweet: {fallback}")
                return fallback
    return None


def post_engagement_tweet():
    """Post engagement tweets for authors every 2 days."""
    try:
        logging.info("Starting foodie_engagement_tweet.py")
        print("Starting foodie_engagement_tweet.py")
        reference_date = get_reference_date()
        current_date = datetime.now(timezone.utc)
        days_since_reference = (current_date - reference_date).days
        logging.info(f"Days since reference date ({reference_date.date()}): {days_since_reference}")
        print(f"Days since reference date ({reference_date.date()}): {days_since_reference}")

        if days_since_reference % 2 == 0:
            logging.info("Today is an engagement tweet day (every 2 days). Posting...")
            print("Today is an engagement tweet day (every 2 days). Posting...")
            post_counts = load_post_counts()
            for author in AUTHORS:
                username = author["username"]
                try:
                    author_count = next((entry for entry in post_counts if entry["username"] == username), None)
                    if not author_count:
                        logging.warning(f"No post count entry for {username}, initializing new entry")
                        author_count = {
                            "username": username,
                            "month": datetime.now(timezone.utc).strftime("%Y-%m"),
                            "monthly_count": 0,
                            "day": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
                            "daily_count": 0
                        }
                        post_counts.append(author_count)
                        save_post_counts(post_counts)
                    if author_count["monthly_count"] >= 500:
                        logging.warning(f"Monthly post limit (500) reached for {username}, skipping")
                        continue
                    if author_count["daily_count"] >= 20:
                        logging.warning(f"Daily post limit (20) reached for {username}, skipping")
                        continue
                    tweet = generate_engagement_tweet(author)
                    if not tweet:
                        logging.error(f"Failed to generate engagement tweet for {username}, skipping")
                        continue
                    logging.info(f"Posting engagement tweet for {username}: {tweet}")
                    print(f"Posting engagement tweet for {username}: {tweet}")
                    if post_tweet(author, tweet):
                        logging.info(f"Successfully posted engagement tweet for {username}")
                        author_count["monthly_count"] += 1
                        author_count["daily_count"] += 1
                        save_post_counts(post_counts)
                    else:
                        logging.warning(f"Failed to post engagement tweet for {username}")
                except Exception as e:
                    logging.error(f"Error posting engagement tweet for {username}: {e}", exc_info=True)
                    continue
        else:
            logging.info(f"Today is not an engagement tweet day (every 2 days). Days since reference: {days_since_reference}. Skipping...")
            print(f"Today is not an engagement tweet day (every 2 days). Days since reference: {days_since_reference}. Skipping...")

        logging.info("Completed foodie_engagement_tweet.py")
        print("Completed foodie_engagement_tweet.py")
    except Exception as e:
        logging.error(f"Unexpected error in post_engagement_tweet: {e}", exc_info=True)
        print(f"Error in post_engagement_tweet: {e}")


def main():
    """Main function to run the script."""
    lock_fd = None
    try:
        lock_fd = acquire_lock()
        setup_logging()
        post_engagement_tweet()
    except Exception as e:
        logging.error(f"Fatal error in main: {e}", exc_info=True)
        print(f"Fatal error: {e}")
        sys.exit(1)
    finally:
        if lock_fd:
            fcntl.flock(lock_fd, fcntl.LOCK_UN)
            lock_fd.close()
            if os.path.exists(LOCK_FILE):
                os.remove(LOCK_FILE)


if __name__ == "__main__":
    main()