"""Post article and personal tweets to X on behalf of each configured author.

For every author in ``AUTHORS`` the script tweets up to two of the author's
recent articles, then one personal tweet. Tweet text is generated through the
OpenAI chat completions API and posted with tweepy. Per-author monthly post
counts are persisted in ``X_POST_COUNTS_FILE``.
"""

import json
import logging
import os
import random
import signal
import sys
import time
from datetime import datetime, timedelta, timezone

import tweepy
from dotenv import load_dotenv
from openai import OpenAI

from foodie_config import OPENAI_API_KEY, AUTHORS, LIGHT_TASK_MODEL
from foodie_utils import load_json_file
from foodie_x_config import (
    X_API_CREDENTIALS,
    X_PERSONA_PROMPTS,
    AUTHOR_BACKGROUNDS_FILE,
    X_POST_COUNTS_FILE,
    RECENT_POSTS_FILE,
)

load_dotenv()

LOG_FILE = "/home/shane/foodie_automator/foodie_x_poster.log"
LOG_PRUNE_DAYS = 30
LOG_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Per-author cap checked before every post.
MONTHLY_POST_LIMIT = 450
# Generated tweets longer than this are cut and suffixed with "...".
TWEET_MAX_CHARS = 280

PERSONAL_FALLBACK_TWEET = (
    "Loving my gig at InsiderFoodie, dishing out food trends! #FoodieLife"
)


def _prune_log_file(path, max_age_days):
    """Rewrite *path* keeping only records newer than *max_age_days* days.

    Lines that do not start with a timestamp (traceback continuation lines,
    blank lines) are kept or dropped together with the record they follow,
    instead of raising ``ValueError`` and aborting start-up.
    """
    # logging writes %(asctime)s in local time by default, so the cutoff is
    # computed as a naive local datetime as well.
    cutoff = datetime.now() - timedelta(days=max_age_days)
    kept_lines = []
    keep_record = True  # unparseable lines before the first record are kept
    with open(path, 'r') as f:
        for line in f:
            try:
                stamp = datetime.strptime(line[:19], LOG_TIME_FORMAT)
            except ValueError:
                pass  # continuation line: inherit the previous decision
            else:
                keep_record = stamp > cutoff
            if keep_record:
                kept_lines.append(line)
    with open(path, 'w') as f:
        f.writelines(kept_lines)


def setup_logging():
    """Prune the existing log file, then log to both the file and the console."""
    if os.path.exists(LOG_FILE):
        _prune_log_file(LOG_FILE, LOG_PRUNE_DAYS)

    logging.basicConfig(
        filename=LOG_FILE,
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        datefmt=LOG_TIME_FORMAT
    )
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    logging.getLogger().addHandler(console_handler)
    logging.info("Logging initialized for foodie_x_poster.py")


setup_logging()

client = OpenAI(api_key=OPENAI_API_KEY)

try:
    with open(AUTHOR_BACKGROUNDS_FILE, 'r') as f:
        AUTHOR_BACKGROUNDS = json.load(f)
except Exception as e:
    logging.error(f"Failed to load author_backgrounds.json: {e}")
    sys.exit(1)


def _current_month():
    """Return the current UTC month as ``YYYY-MM``."""
    return datetime.now(timezone.utc).strftime("%Y-%m")


def _write_json_lines(path, items):
    """Overwrite *path* with one JSON document per line, one per item."""
    with open(path, 'w') as f:
        for item in items:
            json.dump(item, f)
            f.write('\n')


def load_post_counts():
    """Load the per-author post counts, resetting any entry from a past month.

    Returns:
        A list of ``{"username", "count", "month"}`` dicts. When the counts
        file yields nothing, a zeroed entry is created for every author in
        ``AUTHORS``.
    """
    current_month = _current_month()
    counts = load_json_file(X_POST_COUNTS_FILE)
    if not counts:
        counts = [
            {"username": author["username"], "count": 0, "month": current_month}
            for author in AUTHORS
        ]
    for entry in counts:
        # .get(): an entry without a "month" key is treated as stale.
        if entry.get("month") != current_month:
            entry["count"] = 0
            entry["month"] = current_month
    return counts


def save_post_counts(counts):
    """Persist *counts* to ``X_POST_COUNTS_FILE`` as JSON lines."""
    _write_json_lines(X_POST_COUNTS_FILE, counts)
    logging.info(f"Saved post counts to {X_POST_COUNTS_FILE}")


# True while a tweet is being sent and its count saved; read by signal_handler.
is_posting = False
# Set by signal_handler when a signal arrives mid-post; honoured by main().
shutdown_requested = False


def signal_handler(sig, frame):
    """Handle SIGTERM/SIGINT: exit now, or defer the exit if a post is in flight."""
    global shutdown_requested
    logging.info("Received termination signal, checking if safe to exit...")
    if is_posting:
        # Record the request so the exit actually happens once posting is done.
        shutdown_requested = True
        logging.info("Currently posting, will exit after completion.")
    else:
        logging.info("Safe to exit immediately.")
        sys.exit(0)


signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)


def _exit_if_shutdown_requested():
    """Exit the process if a termination signal was deferred during a post."""
    if shutdown_requested:
        logging.info("Posting finished, exiting after deferred termination signal.")
        sys.exit(0)


def get_recent_posts_for_author(username):
    """Return the recent posts whose ``author_username`` equals *username*."""
    posts = load_json_file(RECENT_POSTS_FILE) or []
    return [post for post in posts if post["author_username"] == username]


def delete_used_post(post_title):
    """Remove every recent post titled *post_title* and rewrite the posts file."""
    posts = load_json_file(RECENT_POSTS_FILE) or []
    remaining = [post for post in posts if post["title"] != post_title]
    _write_json_lines(RECENT_POSTS_FILE, remaining)
    logging.info(f"Deleted post '{post_title}' from recent_posts.json")


def _truncate_tweet(tweet):
    """Cut *tweet* to ``TWEET_MAX_CHARS`` characters, ending with "..." if cut."""
    if len(tweet) > TWEET_MAX_CHARS:
        return tweet[:TWEET_MAX_CHARS - 3] + "..."
    return tweet


def _request_tweet(system_prompt, user_prompt):
    """Ask the chat model for a tweet and return it stripped and length-capped.

    Any exception raised by the API call propagates; callers supply the
    fallback text.
    """
    response = client.chat.completions.create(
        model=LIGHT_TASK_MODEL,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        max_tokens=100,
        temperature=0.9
    )
    return _truncate_tweet(response.choices[0].message.content.strip())


def generate_article_tweet(author, post, persona):
    """Generate a tweet promoting *post* in the voice of *persona*.

    Args:
        author: Author dict; ``author["username"]`` is used for logging.
        post: Post dict providing ``title`` and ``url``.
        persona: Key into ``X_PERSONA_PROMPTS``.

    Returns:
        The generated tweet, or a canned fallback tweet if generation fails.
    """
    prompt = X_PERSONA_PROMPTS[persona].replace(
        "For article tweets, include the article title, a quirky hook, and the URL.",
        f"Generate an article tweet including the title '{post['title']}', a quirky hook, and the URL '{post['url']}'."
    )
    try:
        tweet = _request_tweet(prompt, f"Generate tweet for {post['title']}.")
        logging.info(f"Generated article tweet for {author['username']}: {tweet}")
        return tweet
    except Exception as e:
        logging.error(f"Failed to generate article tweet for {author['username']}: {e}")
        return f"This trend is fire! Check out {post['title']} at {post['url']} #Foodie"


def _age_from_dob(dob):
    """Return the age in whole years for a ``YYYY-MM-DD`` date of birth.

    Falls back to a plain year difference when *dob* is not a full ISO date.
    """
    today = datetime.now(timezone.utc).date()
    try:
        born = datetime.strptime(dob, "%Y-%m-%d").date()
    except (TypeError, ValueError):
        return today.year - int(str(dob).split('-')[0])
    # Subtract one year if this year's birthday has not happened yet.
    birthday_pending = (today.month, today.day) < (born.month, born.day)
    return today.year - born.year - birthday_pending


def generate_personal_tweet(author, persona):
    """Generate a personal tweet for *author* in the voice of *persona*.

    Randomly picks between a reflection on the author's role and a story
    built from the author's entry in ``AUTHOR_BACKGROUNDS``.

    Returns:
        The generated tweet, or ``PERSONAL_FALLBACK_TWEET`` when the author
        has no background entry or generation fails.
    """
    background = next((bg for bg in AUTHOR_BACKGROUNDS if bg["username"] == author["username"]), {})
    if not background:
        logging.warning(f"No background found for {author['username']}")
        return PERSONAL_FALLBACK_TWEET

    # Get DOB and calculate age
    dob = author.get('dob', '1980-01-01')
    age = _age_from_dob(dob)

    is_role_reflection = random.choice([True, False])
    if is_role_reflection:
        content = f"Reflect on your role at InsiderFoodie as {author['persona']}. Mention you're {age} years old."
    else:
        content = (
            f"Share a personal story about your background, considering you were born on {dob} and are {age} years old. "
            f"Hometown: {background['hometown']}, Cultural influences: {background['cultural_influences']}, "
            f"Early memory: {background['early_memory']}, Career path: {background['career_path']}."
        )

    prompt = X_PERSONA_PROMPTS[persona].replace(
        "For personal tweets, reflect on your role at InsiderFoodie or background.",
        content
    )
    try:
        tweet = _request_tweet(prompt, f"Generate personal tweet for {author['username']}.")
        logging.info(f"Generated personal tweet for {author['username']}: {tweet}")
        return tweet
    except Exception as e:
        logging.error(f"Failed to generate personal tweet for {author['username']}: {e}")
        return PERSONAL_FALLBACK_TWEET


def post_tweet(author, tweet):
    """Post *tweet* to X for *author* and bump the author's monthly count.

    Returns:
        True if the tweet was posted and the count saved; False when the
        author has no credentials, the monthly limit is reached, or posting
        or saving raised an exception.
    """
    global is_posting
    username = author["username"]

    credentials = next((cred for cred in X_API_CREDENTIALS if cred["username"] == username), None)
    if not credentials:
        logging.error(f"No X credentials found for {username}")
        return False

    post_counts = load_post_counts()
    author_count = next((entry for entry in post_counts if entry["username"] == username), None)
    if author_count is None:
        # The counts file predates this author; start tracking from zero.
        author_count = {"username": username, "count": 0, "month": _current_month()}
        post_counts.append(author_count)
    if author_count["count"] >= MONTHLY_POST_LIMIT:
        logging.warning(f"Post limit reached for {username} this month")
        return False

    try:
        # Named x_client so it does not shadow the module-level OpenAI client.
        x_client = tweepy.Client(
            consumer_key=credentials["api_key"],
            consumer_secret=credentials["api_secret"],
            access_token=credentials["access_token"],
            access_token_secret=credentials["access_token_secret"]
        )
        is_posting = True
        x_client.create_tweet(text=tweet)
        author_count["count"] += 1
        save_post_counts(post_counts)
    except Exception as e:
        logging.error(f"Failed to post tweet for {username}: {e}")
        return False
    finally:
        # Cleared only after the count is saved, so a signal arriving between
        # the post and the save is deferred rather than losing the count.
        is_posting = False

    logging.info(f"Posted tweet for {username}: {tweet}")
    return True


def main():
    """Run one posting pass over all authors.

    Returns:
        A random number of seconds (600-1800) for the caller to sleep.
    """
    logging.info("***** X Poster Launched *****")
    for author in AUTHORS:
        posts = get_recent_posts_for_author(author["username"])
        if not posts:
            logging.info(f"No recent posts for {author['username']}, skipping")
            continue

        article_tweets = 0
        for post in posts[:2]:
            tweet = generate_article_tweet(author, post, author["persona"])
            if post_tweet(author, tweet):
                delete_used_post(post["title"])
                article_tweets += 1
            _exit_if_shutdown_requested()
            # Space posts one to two hours apart.
            time.sleep(random.uniform(3600, 7200))
            if article_tweets >= 2:
                break

        tweet = generate_personal_tweet(author, author["persona"])
        post_tweet(author, tweet)
        _exit_if_shutdown_requested()

    logging.info("X posting completed")
    return random.randint(600, 1800)


if __name__ == "__main__":
    sleep_time = main()
    logging.info(f"Sleeping for {sleep_time} seconds")
    time.sleep(sleep_time)