You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

433 lines
20 KiB

# foodie_weekly_thread.py
import json
import os
import logging
import random
import signal
import sys
import fcntl
import time
from datetime import datetime, timedelta, timezone
import tweepy
from openai import OpenAI
from foodie_utils import post_tweet, AUTHORS, SUMMARY_MODEL
from foodie_config import X_API_CREDENTIALS
from dotenv import load_dotenv
load_dotenv()
LOCK_FILE = "/home/shane/foodie_automator/locks/foodie_weekly_thread.lock"
LOG_FILE = "/home/shane/foodie_automator/logs/foodie_weekly_thread.log"
LOG_PRUNE_DAYS = 30
MAX_RETRIES = 3
RETRY_BACKOFF = 2
RECENT_POSTS_FILE = "/home/shane/foodie_automator/recent_posts.json"
def setup_logging():
"""Initialize logging with pruning of old logs."""
try:
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
if os.path.exists(LOG_FILE):
with open(LOG_FILE, 'r') as f:
lines = f.readlines()
cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS)
pruned_lines = []
malformed_count = 0
for line in lines:
if len(line) < 19 or not line[:19].replace('-', '').replace(':', '').replace(' ', '').isdigit():
malformed_count += 1
continue
try:
timestamp = datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
if timestamp > cutoff:
pruned_lines.append(line)
except ValueError:
malformed_count += 1
continue
if malformed_count > 0:
logging.info(f"Skipped {malformed_count} malformed log lines during pruning")
with open(LOG_FILE, 'w') as f:
f.writelines(pruned_lines)
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logging.getLogger().addHandler(console_handler)
logging.getLogger("tweepy").setLevel(logging.WARNING)
logging.info("Logging initialized for foodie_weekly_thread.py")
except Exception as e:
print(f"Failed to setup logging: {e}")
sys.exit(1)
def acquire_lock():
"""Acquire a lock to prevent concurrent runs."""
os.makedirs(os.path.dirname(LOCK_FILE), exist_ok=True)
lock_fd = open(LOCK_FILE, 'w')
try:
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
lock_fd.write(str(os.getpid()))
lock_fd.flush()
return lock_fd
except IOError:
logging.info("Another instance of foodie_weekly_thread.py is running")
sys.exit(0)
def signal_handler(sig, frame):
"""Handle termination signals gracefully."""
logging.info("Received termination signal, exiting...")
sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# Initialize OpenAI client
try:
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
if not os.getenv("OPENAI_API_KEY"):
logging.error("OPENAI_API_KEY is not set in environment variables")
raise ValueError("OPENAI_API_KEY is required")
except Exception as e:
logging.error(f"Failed to initialize OpenAI client: {e}", exc_info=True)
sys.exit(1)
def validate_twitter_credentials():
"""Validate Twitter API credentials for all authors."""
logging.info("Validating Twitter API credentials for all authors")
valid_credentials = []
for author in AUTHORS:
credentials = X_API_CREDENTIALS.get(author["username"])
if not credentials:
logging.error(f"No X credentials found for {author['username']} in X_API_CREDENTIALS")
continue
for attempt in range(MAX_RETRIES):
try:
twitter_client = tweepy.Client(
consumer_key=credentials["api_key"],
consumer_secret=credentials["api_secret"],
access_token=credentials["access_token"],
access_token_secret=credentials["access_token_secret"]
)
user = twitter_client.get_me()
logging.info(f"Credentials valid for {author['username']} (handle: {credentials['x_username']})")
valid_credentials.append(credentials)
break
except tweepy.TweepyException as e:
logging.error(f"Failed to validate credentials for {author['username']} (attempt {attempt + 1}): {e}")
if attempt < MAX_RETRIES - 1:
time.sleep(RETRY_BACKOFF * (2 ** attempt))
else:
logging.error(f"Credentials invalid for {author['username']} after {MAX_RETRIES} attempts")
if not valid_credentials:
logging.error("No valid Twitter credentials found for any author")
raise ValueError("No valid Twitter credentials found")
return valid_credentials
def load_recent_posts():
"""Load and deduplicate posts from recent_posts.json."""
posts = []
unique_posts = {}
logging.debug(f"Attempting to load posts from {RECENT_POSTS_FILE}")
if not os.path.exists(RECENT_POSTS_FILE):
logging.error(f"Recent posts file {RECENT_POSTS_FILE} does not exist")
return posts
if not os.access(RECENT_POSTS_FILE, os.R_OK):
logging.error(f"Cannot read {RECENT_POSTS_FILE} due to permission issues")
return posts
try:
with open(RECENT_POSTS_FILE, 'r') as f:
lines = f.readlines()
logging.debug(f"Read {len(lines)} lines from {RECENT_POSTS_FILE}")
for i, line in enumerate(lines, 1):
if not line.strip():
logging.debug(f"Skipping empty line {i} in {RECENT_POSTS_FILE}")
continue
try:
entry = json.loads(line.strip())
required_fields = ["title", "url", "author_username", "timestamp"]
if not all(key in entry for key in required_fields):
logging.warning(f"Skipping invalid entry at line {i}: missing fields {entry}")
continue
try:
datetime.fromisoformat(entry["timestamp"])
except ValueError:
logging.warning(f"Skipping entry at line {i}: invalid timestamp {entry['timestamp']}")
continue
key = (entry["title"], entry["url"], entry["author_username"])
if key in unique_posts:
logging.debug(f"Skipping duplicate entry at line {i}: {entry['title']}")
continue
unique_posts[key] = entry
posts.append(entry)
except json.JSONDecodeError as e:
logging.warning(f"Skipping invalid JSON at line {i}: {e}")
continue
logging.info(f"Loaded {len(posts)} unique posts from {RECENT_POSTS_FILE} (after deduplication)")
except Exception as e:
logging.error(f"Failed to load {RECENT_POSTS_FILE}: {e}", exc_info=True)
return posts
if not posts:
logging.warning(f"No valid posts loaded from {RECENT_POSTS_FILE}")
return posts
def filter_posts_for_week(posts, start_date, end_date):
"""Filter posts within the specified week."""
filtered_posts = []
logging.debug(f"Filtering {len(posts)} posts for range {start_date} to {end_date}")
for post in posts:
try:
timestamp = datetime.fromisoformat(post["timestamp"])
logging.debug(f"Checking post '{post['title']}' with timestamp {timestamp}")
if start_date <= timestamp <= end_date:
filtered_posts.append(post)
else:
logging.debug(f"Post '{post['title']}' timestamp {timestamp} outside range")
except ValueError as e:
logging.warning(f"Skipping post with invalid timestamp: {post.get('title', 'Unknown')} - {e}")
logging.info(f"Filtered to {len(filtered_posts)} posts within week range")
return filtered_posts
def generate_intro_tweet(author):
"""Generate an intro tweet for the weekly thread."""
credentials = next((cred for cred in X_API_CREDENTIALS if cred["username"] == author["username"]), None)
if not credentials:
logging.error(f"No X credentials found for {author['username']}")
return None
author_handle = credentials["x_username"]
logging.debug(f"Generating intro tweet for {author_handle}")
prompt = (
f"Generate a concise tweet (under 280 characters) for {author_handle}. "
f"Introduce a thread of their top 10 foodie posts of the week on InsiderFoodie.com. "
f"Make it engaging, create curiosity, and include a call to action to visit InsiderFoodie.com, follow {author_handle}, or like the thread. "
f"Avoid using the word 'elevate'—use more humanized language like 'level up' or 'bring to life'. "
f"Do not include emojis, hashtags, or reward-driven incentives (e.g., giveaways)."
)
for attempt in range(MAX_RETRIES):
try:
response = client.chat.completions.create(
model=SUMMARY_MODEL,
messages=[
{"role": "system", "content": "You are a social media expert crafting engaging tweets."},
{"role": "user", "content": prompt}
],
max_tokens=100,
temperature=0.7
)
tweet = response.choices[0].message.content.strip()
if len(tweet) > 280:
tweet = tweet[:277] + "..."
logging.debug(f"Generated intro tweet: {tweet}")
return tweet
except Exception as e:
logging.warning(f"Failed to generate intro tweet for {author['username']} (attempt {attempt + 1}): {e}")
if attempt < MAX_RETRIES - 1:
time.sleep(RETRY_BACKOFF * (2 ** attempt))
else:
logging.error(f"Failed to generate intro tweet after {MAX_RETRIES} attempts")
fallback = (
f"This week's top 10 foodie finds by {author_handle}! Check out the best on InsiderFoodie.com. "
f"Follow {author_handle} for more and like this thread to stay in the loop! Visit us at https://insiderfoodie.com"
)
logging.info(f"Using fallback intro tweet: {fallback}")
return fallback
def generate_final_cta(author):
"""Generate a final CTA tweet for the weekly thread using GPT."""
credentials = next((cred for cred in X_API_CREDENTIALS if cred["username"] == author["username"]), None)
if not credentials:
logging.error(f"No X credentials found for {author['username']}")
return None
author_handle = credentials["x_username"]
logging.debug(f"Generating final CTA tweet for {author_handle}")
prompt = (
f"Generate a concise tweet (under 280 characters) for {author_handle}. "
f"Conclude a thread of their top 10 foodie posts of the week on InsiderFoodie.com. "
f"Make it engaging, value-driven, and urgent, in the style of Neil Patel. "
f"Include a call to action to visit InsiderFoodie.com and follow {author_handle}. "
f"Mention that the top 10 foodie trends are shared every Monday. "
f"Avoid using the word 'elevate'—use humanized language like 'level up' or 'bring to life'. "
f"Do not include emojis, hashtags, or reward-driven incentives (e.g., giveaways)."
)
for attempt in range(MAX_RETRIES):
try:
response = client.chat.completions.create(
model=SUMMARY_MODEL,
messages=[
{"role": "system", "content": "You are a social media expert crafting engaging tweets."},
{"role": "user", "content": prompt}
],
max_tokens=100,
temperature=0.7
)
tweet = response.choices[0].message.content.strip()
if len(tweet) > 280:
tweet = tweet[:277] + "..."
logging.debug(f"Generated final CTA tweet: {tweet}")
return tweet
except Exception as e:
logging.warning(f"Failed to generate final CTA tweet for {author['username']} (attempt {attempt + 1}): {e}")
if attempt < MAX_RETRIES - 1:
time.sleep(RETRY_BACKOFF * (2 ** attempt))
else:
logging.error(f"Failed to generate final CTA tweet after {MAX_RETRIES} attempts")
fallback = (
f"Want more foodie insights like these? Check out insiderfoodie.com and follow {author_handle} "
f"for the world’s top 10 foodie trends every Monday. Don’t miss out!"
)
logging.info(f"Using fallback final CTA tweet: {fallback}")
return fallback
def post_weekly_thread():
"""Post weekly threads for each author."""
try:
logging.info("Starting foodie_weekly_thread.py")
print("Starting foodie_weekly_thread.py")
valid_credentials = validate_twitter_credentials()
if not valid_credentials:
logging.error("No valid Twitter credentials found, exiting")
return
today = datetime.now(timezone.utc)
days_to_monday = today.weekday()
start_date = (today - timedelta(days=days_to_monday + 7)).replace(hour=0, minute=0, second=0, microsecond=0)
end_date = start_date + timedelta(days=6, hours=23, minutes=59, seconds=59)
logging.info(f"Fetching posts from {start_date} to {end_date}")
print(f"Fetching posts from {start_date} to {end_date}")
all_posts = load_recent_posts()
logging.info(f"Loaded {len(all_posts)} posts from recent_posts.json")
print(f"Loaded {len(all_posts)} posts from recent_posts.json")
if not all_posts:
logging.warning("No posts loaded, exiting post_weekly_thread")
print("No posts loaded, exiting post_weekly_thread")
return
weekly_posts = filter_posts_for_week(all_posts, start_date, end_date)
logging.info(f"Filtered to {len(weekly_posts)} posts for the week")
print(f"Filtered to {len(weekly_posts)} posts for the week")
if not weekly_posts:
logging.warning("No posts found within the week range, exiting post_weekly_thread")
print("No posts found within the week range, exiting post_weekly_thread")
return
posts_by_author = {}
for post in weekly_posts:
author = post["author_username"]
if author not in posts_by_author:
posts_by_author[author] = []
posts_by_author[author].append(post)
logging.debug(f"Grouped posts by author: {list(posts_by_author.keys())}")
for author in AUTHORS:
try:
author_posts = posts_by_author.get(author["username"], [])
logging.info(f"Processing author {author['username']} with {len(author_posts)} posts")
print(f"Processing author {author['username']} with {len(author_posts)} posts")
if not author_posts:
logging.info(f"No posts found for {author['username']} this week")
print(f"No posts found for {author['username']} this week")
continue
author_posts.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
top_posts = author_posts[:10]
logging.info(f"Selected {len(top_posts)} top posts for {author['username']}")
print(f"Selected {len(top_posts)} top posts for {author['username']}")
intro_tweet = generate_intro_tweet(author)
if not intro_tweet:
logging.error(f"Failed to generate intro tweet for {author['username']}, skipping")
continue
logging.info(f"Posting intro tweet for {author['username']}: {intro_tweet}")
print(f"Posting intro tweet for {author['username']}: {intro_tweet}")
intro_response = post_tweet(author, intro_tweet)
if not intro_response:
logging.error(f"Failed to post intro tweet for {author['username']}, skipping thread")
print(f"Failed to post intro tweet for {author['username']}")
continue
intro_tweet_id = intro_response.get("id")
last_tweet_id = intro_tweet_id
logging.debug(f"Intro tweet posted with ID {intro_tweet_id}")
for i, post in enumerate(top_posts, 1):
try:
post_tweet_content = f"{i}. {post['title']} Link: {post['url']}"
logging.info(f"Posting thread reply {i} for {author['username']}: {post_tweet_content}")
print(f"Posting thread reply {i} for {author['username']}: {post_tweet_content}")
reply_response = post_tweet(author, post_tweet_content, reply_to_id=last_tweet_id)
if not reply_response:
logging.error(f"Failed to post thread reply {i} for {author['username']}")
else:
last_tweet_id = reply_response.get("id")
logging.debug(f"Thread reply {i} posted with ID {last_tweet_id}")
except Exception as e:
logging.error(f"Error posting thread reply {i} for {author['username']}: {e}", exc_info=True)
continue
# Post final CTA tweet
if last_tweet_id and top_posts: # Ensure there's a valid thread to reply to
try:
final_cta = generate_final_cta(author)
if not final_cta:
logging.error(f"Failed to generate final CTA tweet for {author['username']}, skipping")
continue
logging.info(f"Posting final CTA tweet for {author['username']}: {final_cta}")
print(f"Posting final CTA tweet for {author['username']}: {final_cta}")
cta_response = post_tweet(author, final_cta, reply_to_id=last_tweet_id)
if not cta_response:
logging.error(f"Failed to post final CTA tweet for {author['username']}")
else:
logging.debug(f"Final CTA tweet posted with ID {cta_response.get('id')}")
except Exception as e:
logging.error(f"Error posting final CTA tweet for {author['username']}: {e}", exc_info=True)
logging.info(f"Successfully posted weekly thread for {author['username']}")
print(f"Successfully posted weekly thread for {author['username']}")
except Exception as e:
logging.error(f"Error processing author {author['username']}: {e}", exc_info=True)
continue
logging.info("Completed foodie_weekly_thread.py")
print("Completed foodie_weekly_thread.py")
except Exception as e:
logging.error(f"Unexpected error in post_weekly_thread: {e}", exc_info=True)
print(f"Error in post_weekly_thread: {e}")
def main():
"""Main function to run the script."""
lock_fd = None
try:
lock_fd = acquire_lock()
setup_logging()
post_weekly_thread()
except Exception as e:
logging.error(f"Fatal error in main: {e}", exc_info=True)
print(f"Fatal error: {e}")
sys.exit(1)
finally:
if lock_fd:
fcntl.flock(lock_fd, fcntl.LOCK_UN)
lock_fd.close()
os.remove(LOCK_FILE) if os.path.exists(LOCK_FILE) else None
if __name__ == "__main__":
main()