395 lines
17 KiB
Python
395 lines
17 KiB
Python
# foodie_weekly_thread.py
|
|
import json
|
|
import os
|
|
import logging
|
|
import random
|
|
import signal
|
|
import sys
|
|
import fcntl
|
|
import time
|
|
import re
|
|
from datetime import datetime, timedelta, timezone
|
|
from openai import OpenAI
|
|
from foodie_utils import AUTHORS, SUMMARY_MODEL, load_json_file, save_json_file, update_system_activity
|
|
from foodie_config import X_API_CREDENTIALS, RECENT_POSTS_FILE
|
|
from dotenv import load_dotenv
|
|
import shutil
|
|
|
|
# Load settings from a .env file into the process environment (python-dotenv);
# OPENAI_API_KEY is read from the environment further down in this module.
load_dotenv()
|
|
|
|
# Identity of this script as reported to update_system_activity().
SCRIPT_NAME = "foodie_weekly_thread"

# Filesystem locations used by this script.
LOCK_FILE = "/home/shane/foodie_automator/locks/foodie_weekly_thread.lock"
LOG_FILE = "/home/shane/foodie_automator/logs/foodie_weekly_thread.log"
WEEKLY_THREADS_FILE = "/home/shane/foodie_automator/weekly_threads.json"

# Log entries older than this many days are dropped when logging starts.
LOG_PRUNE_DAYS = 30

# OpenAI requests are attempted up to MAX_RETRIES times; the pause between
# attempts is RETRY_BACKOFF * 2**attempt seconds.
MAX_RETRIES = 3
RETRY_BACKOFF = 2
|
|
|
|
def setup_logging():
    """Prune the log file, then configure root logging to file and console.

    Entries older than ``LOG_PRUNE_DAYS`` days are removed from ``LOG_FILE``
    before any handler is attached. A line that does not start with a
    ``YYYY-MM-DD HH:MM:SS`` timestamp is treated as a continuation of the
    entry above it (for example a traceback written with ``exc_info=True``)
    and is kept or dropped together with that entry. Such lines appearing
    before the first timestamped entry are discarded and counted as malformed.

    Exits the process with status 1 if logging cannot be set up.
    """
    try:
        os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
        malformed_count = 0
        if os.path.exists(LOG_FILE):
            with open(LOG_FILE, 'r') as f:
                lines = f.readlines()
            # NOTE(review): timestamps are interpreted as UTC here; confirm the
            # formatter below is not writing local time.
            cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS)
            pruned_lines = []
            keep_entry = None  # None until the first timestamped line is seen
            for line in lines:
                try:
                    timestamp = datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
                except ValueError:
                    if keep_entry is None:
                        # Orphan line with no owning entry.
                        malformed_count += 1
                    elif keep_entry:
                        # Continuation of an entry that is being kept.
                        pruned_lines.append(line)
                    continue
                keep_entry = timestamp > cutoff
                if keep_entry:
                    pruned_lines.append(line)
            with open(LOG_FILE, 'w') as f:
                f.writelines(pruned_lines)

        logging.basicConfig(
            filename=LOG_FILE,
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        logging.getLogger().addHandler(console_handler)
        logging.getLogger("openai").setLevel(logging.WARNING)
        logging.info("Logging initialized for foodie_weekly_thread.py")
        # Reported only now: calling logging.info() before basicConfig() would
        # install a default handler and turn the basicConfig() call above into
        # a no-op, leaving the log file unconfigured.
        if malformed_count > 0:
            logging.info(f"Skipped {malformed_count} malformed log lines during pruning")
    except Exception as e:
        print(f"Failed to setup logging: {e}")
        sys.exit(1)
|
|
|
|
def acquire_lock():
    """Take an exclusive, non-blocking lock on ``LOCK_FILE``.

    Returns:
        The open file object that holds the lock. The caller must keep it
        open while the lock is needed and unlock/close it afterwards.

    Exits the process with status 0 if the lock is already held elsewhere.
    """
    os.makedirs(os.path.dirname(LOCK_FILE), exist_ok=True)
    # Open without truncating: mode 'w' would erase the PID written by the
    # instance currently holding the lock before we know whether we can take it.
    lock_fd = open(LOCK_FILE, 'a+')
    try:
        fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        lock_fd.close()
        logging.info("Another instance of foodie_weekly_thread.py is running")
        sys.exit(0)
    # The lock is ours, so replacing the recorded PID is now safe.
    lock_fd.seek(0)
    lock_fd.truncate()
    lock_fd.write(str(os.getpid()))
    lock_fd.flush()
    return lock_fd
|
|
|
|
def signal_handler(sig, frame):
    """Mark the script as stopped and exit with status 0.

    Args:
        sig: Number of the signal that was delivered (not used).
        frame: Stack frame that was interrupted (not used).
    """
    logging.info("Received termination signal, marking script as stopped...")
    update_system_activity(SCRIPT_NAME, "stopped")
    # Same effect as sys.exit(0): unwinds through any active finally blocks.
    raise SystemExit(0)
|
|
|
|
# Route both termination signals through the same shutdown handler.
for _signum in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_signum, signal_handler)
|
|
|
|
# Initialize the OpenAI client used by the tweet generators.
# The key is validated before the client is constructed so that a missing key
# is always reported with the explicit message below, regardless of how the
# client constructor itself reacts to a missing key.
try:
    _openai_api_key = os.getenv("OPENAI_API_KEY")
    if not _openai_api_key:
        logging.error("OPENAI_API_KEY is not set in environment variables")
        raise ValueError("OPENAI_API_KEY is required")
    client = OpenAI(api_key=_openai_api_key)
except Exception as e:
    logging.error(f"Failed to initialize OpenAI client: {e}", exc_info=True)
    sys.exit(1)
|
|
|
|
def generate_intro_tweet(author):
    """Generate the opening tweet of an author's weekly thread.

    Args:
        author: Author record; only ``author["username"]`` is read, to look
            up the author's entry in ``X_API_CREDENTIALS``.

    Returns:
        The generated tweet (model output truncated to 280 characters), a
        canned fallback tweet if every attempt fails or yields empty text,
        or ``None`` when the author has no X credentials.
    """
    credentials = X_API_CREDENTIALS.get(author["username"])
    if not credentials:
        logging.error(f"No X credentials found for {author['username']}")
        return None
    author_handle = credentials["x_username"]
    logging.debug(f"Generating intro tweet for {author_handle}")

    prompt = (
        f"Generate a concise tweet (under 200 characters) for {author_handle}. "
        f"Introduce a thread of their top 10 foodie posts of the week on InsiderFoodie.com. "
        f"Make it engaging, create curiosity, and include a call to action to visit InsiderFoodie.com or follow {author_handle}. "
        f"Avoid using the word 'elevate'—use humanized language like 'level up' or 'bring to life'. "
        f"Strictly exclude emojis, hashtags, or reward-driven incentives (e.g., giveaways)."
    )

    for attempt in range(MAX_RETRIES):
        try:
            response = client.chat.completions.create(
                model=SUMMARY_MODEL,
                messages=[
                    {"role": "system", "content": "You are a social media expert crafting engaging tweets."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=150,
                temperature=0.7
            )
            tweet = response.choices[0].message.content.strip()
            tweet = re.sub(r'[\U0001F000-\U0001FFFF]', '', tweet)  # Remove emojis
            if not tweet.strip():
                # An empty string would be read by the caller as a hard
                # failure; treat it as a failed attempt so it is retried.
                raise ValueError("Model returned an empty tweet")
            if len(tweet) > 280:
                tweet = tweet[:277] + "..."
            logging.debug(f"Generated intro tweet: {tweet}")
            return tweet
        except Exception as e:
            logging.warning(f"Failed to generate intro tweet for {author['username']} (attempt {attempt + 1}): {e}")
            if attempt < MAX_RETRIES - 1:
                # Exponential backoff between attempts.
                time.sleep(RETRY_BACKOFF * (2 ** attempt))

    # Every attempt failed: fall back to a fixed message.
    logging.error(f"Failed to generate intro tweet after {MAX_RETRIES} attempts")
    fallback = (
        f"Top 10 foodie posts this week by {author_handle}! Visit InsiderFoodie.com and follow {author_handle} for more."
    )
    logging.info(f"Using fallback intro tweet: {fallback}")
    return fallback
|
|
|
|
def generate_final_cta(author):
    """Generate the closing call-to-action tweet of an author's weekly thread.

    Args:
        author: Author record; only ``author["username"]`` is read, to look
            up the author's entry in ``X_API_CREDENTIALS``.

    Returns:
        The generated tweet (model output truncated to 280 characters), a
        canned fallback tweet if every attempt fails or yields empty text,
        or ``None`` when the author has no X credentials.
    """
    credentials = X_API_CREDENTIALS.get(author["username"])
    if not credentials:
        logging.error(f"No X credentials found for {author['username']}")
        return None
    author_handle = credentials["x_username"]
    logging.debug(f"Generating final CTA tweet for {author_handle}")

    prompt = (
        f"Generate a concise tweet (under 200 characters) for {author_handle}. "
        f"Conclude a thread of their top 10 foodie posts of the week on InsiderFoodie.com. "
        f"Make it engaging, value-driven, in the style of Neil Patel. "
        f"Include a call to action to visit InsiderFoodie.com and follow {author_handle}. "
        f"Mention that top 10 foodie trends are shared every Monday. "
        f"Avoid using the word 'elevate'—use humanized language like 'level up' or 'bring to life'. "
        f"Strictly exclude emojis, hashtags, or reward-driven incentives (e.g., giveaways)."
    )

    for attempt in range(MAX_RETRIES):
        try:
            response = client.chat.completions.create(
                model=SUMMARY_MODEL,
                messages=[
                    {"role": "system", "content": "You are a social media expert crafting engaging tweets."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=150,
                temperature=0.7
            )
            tweet = response.choices[0].message.content.strip()
            tweet = re.sub(r'[\U0001F000-\U0001FFFF]', '', tweet)  # Remove emojis
            if not tweet.strip():
                # Treat an empty reply as a failed attempt so it is retried
                # before the fallback text is used.
                raise ValueError("Model returned an empty tweet")
            if len(tweet) > 280:
                tweet = tweet[:277] + "..."
            logging.debug(f"Generated final CTA tweet: {tweet}")
            return tweet
        except Exception as e:
            logging.warning(f"Failed to generate final CTA tweet for {author['username']} (attempt {attempt + 1}): {e}")
            if attempt < MAX_RETRIES - 1:
                # Exponential backoff between attempts.
                time.sleep(RETRY_BACKOFF * (2 ** attempt))

    # Every attempt failed: fall back to a fixed message.
    logging.error(f"Failed to generate final CTA tweet after {MAX_RETRIES} attempts")
    fallback = (
        f"Want more foodie insights? Visit insiderfoodie.com and follow {author_handle} "
        f"for top 10 foodie trends every Monday."
    )
    logging.info(f"Using fallback final CTA tweet: {fallback}")
    return fallback
|
|
|
|
def load_recent_posts():
    """Load posts from ``RECENT_POSTS_FILE``, dropping invalid and duplicate entries.

    A post is kept only if it has ``title``, ``url``, ``author_username`` and
    ``timestamp`` fields and its timestamp parses with
    ``datetime.fromisoformat`` (a trailing ``Z`` is accepted). Two posts are
    duplicates when title, URL and author all match; the first one wins.

    Returns:
        List of unique, valid posts in their original order; an empty list
        when nothing usable was loaded.
    """
    logging.debug(f"Attempting to load posts from {RECENT_POSTS_FILE}")
    posts = load_json_file(RECENT_POSTS_FILE)

    if not posts:
        logging.warning(f"No valid posts loaded from {RECENT_POSTS_FILE}")
        return []

    required_fields = ["title", "url", "author_username", "timestamp"]

    # Deduplicate posts
    unique_posts = {}
    for post in posts:
        try:
            if not all(key in post for key in required_fields):
                logging.warning(f"Skipping invalid post: missing fields {post}")
                continue
            # Validation only: raises if the timestamp is not ISO formatted.
            datetime.fromisoformat(post["timestamp"].replace('Z', '+00:00'))
            key = (post["title"], post["url"], post["author_username"])
            if key not in unique_posts:
                unique_posts[key] = post
            else:
                logging.debug(f"Skipping duplicate post: {post['title']}")
        except (KeyError, ValueError, TypeError, AttributeError) as e:
            # TypeError/AttributeError cover entries that are not mappings,
            # non-string timestamps and unhashable field values, so one bad
            # record cannot abort the whole load.
            logging.warning(f"Skipping post due to invalid format: {e}")
            continue

    deduped_posts = list(unique_posts.values())
    logging.info(f"Loaded {len(deduped_posts)} unique posts from {RECENT_POSTS_FILE}")
    return deduped_posts
|
|
|
|
def filter_posts_for_week(posts, start_date, end_date):
    """Return the posts whose timestamp lies within ``[start_date, end_date]``.

    Args:
        posts: Iterable of posts, each providing ``title`` and an ISO
            formatted ``timestamp``.
        start_date: Inclusive lower bound (datetime).
        end_date: Inclusive upper bound (datetime).

    Returns:
        List of matching posts in input order. Posts that lack a field, have
        an unparsable timestamp, or cannot be compared with the bounds are
        skipped with a warning instead of raising.
    """
    filtered_posts = []
    for post in posts:
        try:
            title = post["title"]
            post_date = datetime.fromisoformat(post["timestamp"])
            if post_date.tzinfo is None and start_date.tzinfo is not None:
                # Naive and aware datetimes cannot be compared. Assumes naive
                # timestamps were recorded in UTC - TODO confirm with the writer
                # of the posts file.
                post_date = post_date.replace(tzinfo=timezone.utc)
            logging.debug(f"Checking post: title={title}, timestamp={post_date}, in range {start_date} to {end_date}")
            if start_date <= post_date <= end_date:
                filtered_posts.append(post)
                logging.debug(f"Included post: {title}")
            else:
                logging.debug(f"Excluded post: {title} (timestamp {post_date} outside range)")
        except (KeyError, ValueError, TypeError, AttributeError) as e:
            logging.warning(f"Skipping post due to invalid format: {e}")
            continue
    logging.info(f"Filtered to {len(filtered_posts)} posts for the week")
    return filtered_posts
|
|
|
|
def _post_timestamp(post):
    """Parse a post's timestamp as an aware datetime (naive values are taken as UTC)."""
    parsed = datetime.fromisoformat(post["timestamp"].replace('Z', '+00:00'))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _format_thread_tweet(index, post):
    """Build the numbered tweet for one post, trimming the title to fit 280 characters."""
    title = post['title']
    url = post['url']
    tweet = f"{index}. {title} Read more: {url}"
    if len(tweet) > 280:
        # Cap the title at 200 characters, and shrink it further when a long
        # URL would still push the tweet past the limit.
        overhead = len(f"{index}. ... Read more: {url}")
        keep = min(200, max(0, 280 - overhead))
        tweet = f"{index}. {title[:keep]}... Read more: {url}"
    return tweet


def generate_weekly_thread():
    """Build each author's weekly thread and save all of them to ``WEEKLY_THREADS_FILE``.

    Does nothing unless the current UTC day is Monday. The window covers the
    seven full days before the run date. For every author in ``AUTHORS`` with
    posts in that window, the two most recent posts become numbered tweets,
    framed by a generated intro tweet and a closing call-to-action. An existing
    output file is copied to the backups directory before being overwritten.
    """
    logging.info("Starting foodie_weekly_thread.py")

    # Check if today is Monday
    today = datetime.now(timezone.utc)
    if today.weekday() != 0:  # 0 = Monday
        logging.info(f"Today is not Monday (weekday: {today.weekday()}), skipping weekly thread")
        return

    # Date range: 00:00 seven days ago through the last microsecond of yesterday.
    start_date = (today - timedelta(days=7)).replace(hour=0, minute=0, second=0, microsecond=0)
    end_date = (today - timedelta(days=1)).replace(hour=23, minute=59, second=59, microsecond=999999)
    logging.info(f"Fetching posts from {start_date} to {end_date}")

    # load_recent_posts() validates the required fields and removes duplicates,
    # so a malformed record is skipped there instead of raising KeyError here.
    deduped_posts = load_recent_posts()
    logging.info(f"Filtered to {len(deduped_posts)} unique posts after deduplication")

    weekly_posts = filter_posts_for_week(deduped_posts, start_date, end_date)
    if not weekly_posts:
        logging.warning(f"No posts found within the week range {start_date} to {end_date}, exiting generate_weekly_thread")
        return

    # Group posts by author; posts from unknown authors are ignored.
    posts_by_author = {author["username"]: [] for author in AUTHORS}
    for post in weekly_posts:
        username = post["author_username"]
        if username in posts_by_author:
            posts_by_author[username].append(post)

    # Generate thread content for each author
    thread_content = []
    timestamp = datetime.now(timezone.utc).isoformat()

    for author in AUTHORS:
        username = author["username"]
        author_posts = posts_by_author.get(username, [])
        if not author_posts:
            logging.info(f"No posts found for {username}, skipping")
            continue

        try:
            # Select top 2 posts (to fit within 3-tweet limit: lead + 2 posts).
            # Sorting happens inside the try so that a bad timestamp only
            # affects this author.
            author_posts = sorted(author_posts, key=_post_timestamp, reverse=True)
            selected_posts = author_posts[:2]
            logging.info(f"Found {len(author_posts)} posts for {username}, selected {len(selected_posts)}")

            # Generate intro tweet
            intro_tweet = generate_intro_tweet(author)
            if not intro_tweet:
                logging.error(f"Failed to generate intro tweet for {username}, skipping")
                continue

            # Generate thread tweets (up to 2)
            thread_tweets = []
            for i, post in enumerate(selected_posts, 1):
                thread_tweet = _format_thread_tweet(i, post)
                thread_tweets.append(thread_tweet)
                logging.info(f"Generated thread tweet {i} for {username}: {thread_tweet}")

            # Generate final CTA tweet
            final_cta = generate_final_cta(author)
            if not final_cta:
                logging.error(f"Failed to generate final CTA tweet for {username}, using fallback")
                final_cta = (
                    f"Want more foodie insights? Visit insiderfoodie.com and follow {X_API_CREDENTIALS[username]['x_username']} "
                    f"for top 10 foodie trends every Monday."
                )

            # Collect thread content for this author
            author_thread = {
                "username": username,
                "x_handle": X_API_CREDENTIALS[username]["x_username"],
                "intro_tweet": intro_tweet,
                "thread_tweets": thread_tweets,
                "final_cta": final_cta,
                "timestamp": timestamp
            }
            thread_content.append(author_thread)
            logging.info(f"Generated thread content for {username}")

        except Exception as e:
            logging.error(f"Error generating thread content for {username}: {e}", exc_info=True)
            continue

    # Save thread content to file, overwriting any existing content
    if thread_content:
        try:
            # Backup existing file before overwriting
            if os.path.exists(WEEKLY_THREADS_FILE):
                backup_dir = "/home/shane/foodie_automator/backups"
                os.makedirs(backup_dir, exist_ok=True)
                backup_file = f"{backup_dir}/weekly_threads_{timestamp.replace(':', '-')}.json"
                shutil.copy(WEEKLY_THREADS_FILE, backup_file)
                logging.info(f"Backed up existing {WEEKLY_THREADS_FILE} to {backup_file}")

            # Save new thread content, overwriting the file
            thread_data = {
                "week_start": start_date.isoformat(),
                "week_end": end_date.isoformat(),
                "timestamp": timestamp,
                "threads": thread_content
            }
            save_json_file(WEEKLY_THREADS_FILE, thread_data)
            logging.info(f"Saved thread content for {len(thread_content)} authors to {WEEKLY_THREADS_FILE}")
        except Exception as e:
            logging.error(f"Failed to save thread content to {WEEKLY_THREADS_FILE}: {e}")
    else:
        logging.warning("No thread content generated, nothing to save")

    logging.info("Completed foodie_weekly_thread.py")
|
|
|
|
def main():
    """Run the weekly thread generator under an exclusive process lock.

    Acquires the lock, sets up logging, records the script as running,
    generates the threads and records it as stopped. Any exception is logged,
    the script is recorded as stopped and the process exits with status 1.
    The lock is always released and the lock file removed on the way out.
    """
    lock_fd = None
    try:
        lock_fd = acquire_lock()
        setup_logging()
        update_system_activity(SCRIPT_NAME, "running", os.getpid())  # Record start
        generate_weekly_thread()
        update_system_activity(SCRIPT_NAME, "stopped")  # Record stop
    except Exception as e:
        logging.error(f"Fatal error in main: {e}", exc_info=True)
        print(f"Fatal error: {e}")
        update_system_activity(SCRIPT_NAME, "stopped")  # Record stop on error
        sys.exit(1)
    finally:
        if lock_fd:
            # Unlink the file while the lock is still held. Removing it after
            # unlocking lets a second process lock the old file just before it
            # is deleted, after which a third process can create and lock a
            # fresh file and run concurrently.
            try:
                os.remove(LOCK_FILE)
            except FileNotFoundError:
                pass
            fcntl.flock(lock_fd, fcntl.LOCK_UN)
            lock_fd.close()
|
|
|
|
# Run the generator only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()