add lock files and update weekly tweet to include last tweet to follow

This commit is contained in:
2025-05-06 09:40:04 +10:00
parent 331979ca9e
commit 028dfc3fc8
6 changed files with 1481 additions and 904 deletions
+316 -170
View File
@@ -1,94 +1,134 @@
# foodie_weekly_thread.py
import json
import os
from datetime import datetime, timedelta, timezone
import logging
import random
import signal
import sys
import fcntl
import time
from datetime import datetime, timedelta, timezone
import tweepy
from openai import OpenAI
from foodie_utils import post_tweet, AUTHORS, SUMMARY_MODEL
from foodie_config import X_API_CREDENTIALS
from dotenv import load_dotenv
import tweepy
load_dotenv()
# Logging configuration
LOG_FILE = "/home/shane/foodie_automator/foodie_weekly_thread.log"
LOCK_FILE = "/home/shane/foodie_automator/locks/foodie_weekly_thread.lock"
LOG_FILE = "/home/shane/foodie_automator/logs/foodie_weekly_thread.log"
LOG_PRUNE_DAYS = 30
MAX_RETRIES = 3
RETRY_BACKOFF = 2
RECENT_POSTS_FILE = "/home/shane/foodie_automator/recent_posts.json"
def setup_logging():
if os.path.exists(LOG_FILE):
with open(LOG_FILE, 'r') as f:
lines = f.readlines()
cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS)
pruned_lines = []
for line in lines:
try:
timestamp = datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
if timestamp > cutoff:
pruned_lines.append(line)
except ValueError:
continue
with open(LOG_FILE, 'w') as f:
f.writelines(pruned_lines)
logging.basicConfig(
filename=LOG_FILE,
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logging.getLogger().addHandler(console_handler)
logging.info("Logging initialized for foodie_weekly_thread.py")
"""Initialize logging with pruning of old logs."""
try:
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
if os.path.exists(LOG_FILE):
with open(LOG_FILE, 'r') as f:
lines = f.readlines()
cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS)
pruned_lines = []
malformed_count = 0
for line in lines:
if len(line) < 19 or not line[:19].replace('-', '').replace(':', '').replace(' ', '').isdigit():
malformed_count += 1
continue
try:
timestamp = datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
if timestamp > cutoff:
pruned_lines.append(line)
except ValueError:
malformed_count += 1
continue
if malformed_count > 0:
logging.info(f"Skipped {malformed_count} malformed log lines during pruning")
with open(LOG_FILE, 'w') as f:
f.writelines(pruned_lines)
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logging.getLogger().addHandler(console_handler)
logging.getLogger("tweepy").setLevel(logging.WARNING)
logging.info("Logging initialized for foodie_weekly_thread.py")
except Exception as e:
print(f"Failed to setup logging: {e}")
sys.exit(1)
setup_logging()
def acquire_lock():
"""Acquire a lock to prevent concurrent runs."""
os.makedirs(os.path.dirname(LOCK_FILE), exist_ok=True)
lock_fd = open(LOCK_FILE, 'w')
try:
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
lock_fd.write(str(os.getpid()))
lock_fd.flush()
return lock_fd
except IOError:
logging.info("Another instance of foodie_weekly_thread.py is running")
sys.exit(0)
def signal_handler(sig, frame):
"""Handle termination signals gracefully."""
logging.info("Received termination signal, exiting...")
sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# Initialize OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
if not os.getenv("OPENAI_API_KEY"):
logging.error("OPENAI_API_KEY is not set in environment variables")
raise ValueError("OPENAI_API_KEY is required")
try:
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
if not os.getenv("OPENAI_API_KEY"):
logging.error("OPENAI_API_KEY is not set in environment variables")
raise ValueError("OPENAI_API_KEY is required")
except Exception as e:
logging.error(f"Failed to initialize OpenAI client: {e}", exc_info=True)
sys.exit(1)
# Validate X_API_CREDENTIALS and test API access
def validate_twitter_credentials():
"""Validate Twitter API credentials for all authors."""
logging.info("Validating Twitter API credentials for all authors")
valid_credentials = []
for author in AUTHORS:
credentials = next((cred for cred in X_API_CREDENTIALS if cred["username"] == author["username"]), None)
if not credentials:
logging.error(f"No X credentials found for {author['username']} in X_API_CREDENTIALS")
print(f"No X credentials found for {author['username']}")
continue
logging.debug(f"Testing credentials for {author['username']} (handle: {credentials['x_username']})")
try:
client = tweepy.Client(
consumer_key=credentials["api_key"],
consumer_secret=credentials["api_secret"],
access_token=credentials["access_token"],
access_token_secret=credentials["access_token_secret"]
)
# Test API access by fetching the user's profile
user = client.get_me()
logging.info(f"Credentials valid for {author['username']} (handle: {credentials['x_username']}, user_id: {user.data.id})")
print(f"Credentials valid for {author['username']} (handle: {credentials['x_username']})")
valid_credentials.append(credentials)
except tweepy.TweepyException as e:
logging.error(f"Failed to validate credentials for {author['username']} (handle: {credentials['x_username']}): {e}")
if hasattr(e, 'response') and e.response:
logging.error(f"Twitter API response: {e.response.text}")
print(f"Failed to validate credentials for {author['username']}: {e}")
for attempt in range(MAX_RETRIES):
try:
twitter_client = tweepy.Client(
consumer_key=credentials["api_key"],
consumer_secret=credentials["api_secret"],
access_token=credentials["access_token"],
access_token_secret=credentials["access_token_secret"]
)
user = twitter_client.get_me()
logging.info(f"Credentials valid for {author['username']} (handle: {credentials['x_username']})")
valid_credentials.append(credentials)
break
except tweepy.TweepyException as e:
logging.error(f"Failed to validate credentials for {author['username']} (attempt {attempt + 1}): {e}")
if attempt < MAX_RETRIES - 1:
time.sleep(RETRY_BACKOFF * (2 ** attempt))
else:
logging.error(f"Credentials invalid for {author['username']} after {MAX_RETRIES} attempts")
if not valid_credentials:
logging.error("No valid Twitter credentials found for any author")
raise ValueError("No valid Twitter credentials found")
return valid_credentials
# Run credential validation
validate_twitter_credentials()
RECENT_POSTS_FILE = "/home/shane/foodie_automator/recent_posts.json"
def load_recent_posts():
"""Load and deduplicate posts from recent_posts.json."""
posts = []
unique_posts = {}
logging.debug(f"Attempting to load posts from {RECENT_POSTS_FILE}")
@@ -131,13 +171,15 @@ def load_recent_posts():
continue
logging.info(f"Loaded {len(posts)} unique posts from {RECENT_POSTS_FILE} (after deduplication)")
except Exception as e:
logging.error(f"Failed to load {RECENT_POSTS_FILE}: {e}")
logging.error(f"Failed to load {RECENT_POSTS_FILE}: {e}", exc_info=True)
return posts
if not posts:
logging.warning(f"No valid posts loaded from {RECENT_POSTS_FILE}")
return posts
def filter_posts_for_week(posts, start_date, end_date):
"""Filter posts within the specified week."""
filtered_posts = []
logging.debug(f"Filtering {len(posts)} posts for range {start_date} to {end_date}")
@@ -155,6 +197,7 @@ def filter_posts_for_week(posts, start_date, end_date):
return filtered_posts
def generate_intro_tweet(author):
"""Generate an intro tweet for the weekly thread."""
credentials = next((cred for cred in X_API_CREDENTIALS if cred["username"] == author["username"]), None)
if not credentials:
logging.error(f"No X credentials found for {author['username']}")
@@ -170,118 +213,221 @@ def generate_intro_tweet(author):
f"Do not include emojis, hashtags, or reward-driven incentives (e.g., giveaways)."
)
try:
response = client.chat.completions.create(
model=SUMMARY_MODEL,
messages=[
{"role": "system", "content": "You are a social media expert crafting engaging tweets."},
{"role": "user", "content": prompt}
],
max_tokens=100,
temperature=0.7
)
tweet = response.choices[0].message.content.strip()
if len(tweet) > 280:
tweet = tweet[:277] + "..."
logging.debug(f"Generated intro tweet: {tweet}")
return tweet
except Exception as e:
logging.error(f"Failed to generate intro tweet for {author['username']}: {e}")
fallback = (
f"This weeks top 10 foodie finds by {author_handle} Check out the best on InsiderFoodie.com "
f"Follow {author_handle} for more and like this thread to stay in the loop Visit us at https://insiderfoodie.com"
)
logging.info(f"Using fallback intro tweet: {fallback}")
return fallback
for attempt in range(MAX_RETRIES):
try:
response = client.chat.completions.create(
model=SUMMARY_MODEL,
messages=[
{"role": "system", "content": "You are a social media expert crafting engaging tweets."},
{"role": "user", "content": prompt}
],
max_tokens=100,
temperature=0.7
)
tweet = response.choices[0].message.content.strip()
if len(tweet) > 280:
tweet = tweet[:277] + "..."
logging.debug(f"Generated intro tweet: {tweet}")
return tweet
except Exception as e:
logging.warning(f"Failed to generate intro tweet for {author['username']} (attempt {attempt + 1}): {e}")
if attempt < MAX_RETRIES - 1:
time.sleep(RETRY_BACKOFF * (2 ** attempt))
else:
logging.error(f"Failed to generate intro tweet after {MAX_RETRIES} attempts")
fallback = (
f"This week's top 10 foodie finds by {author_handle}! Check out the best on InsiderFoodie.com. "
f"Follow {author_handle} for more and like this thread to stay in the loop! Visit us at https://insiderfoodie.com"
)
logging.info(f"Using fallback intro tweet: {fallback}")
return fallback
def generate_final_cta(author):
"""Generate a final CTA tweet for the weekly thread using GPT."""
credentials = next((cred for cred in X_API_CREDENTIALS if cred["username"] == author["username"]), None)
if not credentials:
logging.error(f"No X credentials found for {author['username']}")
return None
author_handle = credentials["x_username"]
logging.debug(f"Generating final CTA tweet for {author_handle}")
prompt = (
f"Generate a concise tweet (under 280 characters) for {author_handle}. "
f"Conclude a thread of their top 10 foodie posts of the week on InsiderFoodie.com. "
f"Make it engaging, value-driven, and urgent, in the style of Neil Patel. "
f"Include a call to action to visit InsiderFoodie.com and follow {author_handle}. "
f"Mention that the top 10 foodie trends are shared every Monday. "
f"Avoid using the word 'elevate'—use humanized language like 'level up' or 'bring to life'. "
f"Do not include emojis, hashtags, or reward-driven incentives (e.g., giveaways)."
)
for attempt in range(MAX_RETRIES):
try:
response = client.chat.completions.create(
model=SUMMARY_MODEL,
messages=[
{"role": "system", "content": "You are a social media expert crafting engaging tweets."},
{"role": "user", "content": prompt}
],
max_tokens=100,
temperature=0.7
)
tweet = response.choices[0].message.content.strip()
if len(tweet) > 280:
tweet = tweet[:277] + "..."
logging.debug(f"Generated final CTA tweet: {tweet}")
return tweet
except Exception as e:
logging.warning(f"Failed to generate final CTA tweet for {author['username']} (attempt {attempt + 1}): {e}")
if attempt < MAX_RETRIES - 1:
time.sleep(RETRY_BACKOFF * (2 ** attempt))
else:
logging.error(f"Failed to generate final CTA tweet after {MAX_RETRIES} attempts")
fallback = (
f"Want more foodie insights like these? Check out insiderfoodie.com and follow {author_handle} "
f"for the worlds top 10 foodie trends every Monday. Dont miss out!"
)
logging.info(f"Using fallback final CTA tweet: {fallback}")
return fallback
def post_weekly_thread():
logging.info("Entering post_weekly_thread")
print("Entering post_weekly_thread")
today = datetime.now(timezone.utc)
days_to_monday = today.weekday()
start_date = (today - timedelta(days=days_to_monday + 7)).replace(hour=0, minute=0, second=0, microsecond=0)
end_date = start_date + timedelta(days=6, hours=23, minutes=59, seconds=59)
logging.info(f"Fetching posts from {start_date} to {end_date}")
print(f"Fetching posts from {start_date} to {end_date}")
all_posts = load_recent_posts()
print(f"Loaded {len(all_posts)} posts from recent_posts.json")
logging.info(f"Loaded {len(all_posts)} posts from recent_posts.json")
if not all_posts:
logging.warning("No posts loaded, exiting post_weekly_thread")
print("No posts loaded, exiting post_weekly_thread")
return
weekly_posts = filter_posts_for_week(all_posts, start_date, end_date)
print(f"Filtered to {len(weekly_posts)} posts for the week")
logging.info(f"Filtered to {len(weekly_posts)} posts for the week")
if not weekly_posts:
logging.warning("No posts found within the week range, exiting post_weekly_thread")
print("No posts found within the week range, exiting post_weekly_thread")
return
posts_by_author = {}
for post in weekly_posts:
author = post["author_username"]
if author not in posts_by_author:
posts_by_author[author] = []
posts_by_author[author].append(post)
logging.debug(f"Grouped posts by author: {list(posts_by_author.keys())}")
for author in AUTHORS:
author_posts = posts_by_author.get(author["username"], [])
logging.info(f"Processing author {author['username']} with {len(author_posts)} posts")
print(f"Processing author {author['username']} with {len(author_posts)} posts")
if not author_posts:
logging.info(f"No posts found for {author['username']} this week")
print(f"No posts found for {author['username']} this week")
continue
author_posts.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
top_posts = author_posts[:10]
logging.info(f"Selected {len(top_posts)} top posts for {author['username']}")
print(f"Selected {len(top_posts)} top posts for {author['username']}")
intro_tweet = generate_intro_tweet(author)
if not intro_tweet:
logging.error(f"Failed to generate intro tweet for {author['username']}, skipping")
continue
logging.info(f"Posting intro tweet for {author['username']}: {intro_tweet}")
print(f"Posting intro tweet for {author['username']}: {intro_tweet}")
intro_response = post_tweet(author, intro_tweet)
if not intro_response:
logging.error(f"Failed to post intro tweet for {author['username']}, skipping thread")
print(f"Failed to post intro tweet for {author['username']}")
continue
intro_tweet_id = intro_response.get("id")
logging.debug(f"Intro tweet posted with ID {intro_tweet_id}")
for i, post in enumerate(top_posts, 1):
post_tweet_content = f"{i}. {post['title']} Link: {post['url']}"
logging.info(f"Posting thread reply {i} for {author['username']}: {post_tweet_content}")
print(f"Posting thread reply {i} for {author['username']}: {post_tweet_content}")
reply_response = post_tweet(author, post_tweet_content, reply_to_id=intro_tweet_id)
if not reply_response:
logging.error(f"Failed to post thread reply {i} for {author['username']}")
else:
logging.debug(f"Thread reply {i} posted with ID {reply_response.get('id')}")
logging.info(f"Successfully posted weekly thread for {author['username']}")
print(f"Successfully posted weekly thread for {author['username']}")
if __name__ == "__main__":
print("Starting foodie_weekly_thread.py")
logging.info("Starting foodie_weekly_thread.py")
"""Post weekly threads for each author."""
try:
post_weekly_thread()
logging.info("Starting foodie_weekly_thread.py")
print("Starting foodie_weekly_thread.py")
valid_credentials = validate_twitter_credentials()
if not valid_credentials:
logging.error("No valid Twitter credentials found, exiting")
return
today = datetime.now(timezone.utc)
days_to_monday = today.weekday()
start_date = (today - timedelta(days=days_to_monday + 7)).replace(hour=0, minute=0, second=0, microsecond=0)
end_date = start_date + timedelta(days=6, hours=23, minutes=59, seconds=59)
logging.info(f"Fetching posts from {start_date} to {end_date}")
print(f"Fetching posts from {start_date} to {end_date}")
all_posts = load_recent_posts()
logging.info(f"Loaded {len(all_posts)} posts from recent_posts.json")
print(f"Loaded {len(all_posts)} posts from recent_posts.json")
if not all_posts:
logging.warning("No posts loaded, exiting post_weekly_thread")
print("No posts loaded, exiting post_weekly_thread")
return
weekly_posts = filter_posts_for_week(all_posts, start_date, end_date)
logging.info(f"Filtered to {len(weekly_posts)} posts for the week")
print(f"Filtered to {len(weekly_posts)} posts for the week")
if not weekly_posts:
logging.warning("No posts found within the week range, exiting post_weekly_thread")
print("No posts found within the week range, exiting post_weekly_thread")
return
posts_by_author = {}
for post in weekly_posts:
author = post["author_username"]
if author not in posts_by_author:
posts_by_author[author] = []
posts_by_author[author].append(post)
logging.debug(f"Grouped posts by author: {list(posts_by_author.keys())}")
for author in AUTHORS:
try:
author_posts = posts_by_author.get(author["username"], [])
logging.info(f"Processing author {author['username']} with {len(author_posts)} posts")
print(f"Processing author {author['username']} with {len(author_posts)} posts")
if not author_posts:
logging.info(f"No posts found for {author['username']} this week")
print(f"No posts found for {author['username']} this week")
continue
author_posts.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
top_posts = author_posts[:10]
logging.info(f"Selected {len(top_posts)} top posts for {author['username']}")
print(f"Selected {len(top_posts)} top posts for {author['username']}")
intro_tweet = generate_intro_tweet(author)
if not intro_tweet:
logging.error(f"Failed to generate intro tweet for {author['username']}, skipping")
continue
logging.info(f"Posting intro tweet for {author['username']}: {intro_tweet}")
print(f"Posting intro tweet for {author['username']}: {intro_tweet}")
intro_response = post_tweet(author, intro_tweet)
if not intro_response:
logging.error(f"Failed to post intro tweet for {author['username']}, skipping thread")
print(f"Failed to post intro tweet for {author['username']}")
continue
intro_tweet_id = intro_response.get("id")
last_tweet_id = intro_tweet_id
logging.debug(f"Intro tweet posted with ID {intro_tweet_id}")
for i, post in enumerate(top_posts, 1):
try:
post_tweet_content = f"{i}. {post['title']} Link: {post['url']}"
logging.info(f"Posting thread reply {i} for {author['username']}: {post_tweet_content}")
print(f"Posting thread reply {i} for {author['username']}: {post_tweet_content}")
reply_response = post_tweet(author, post_tweet_content, reply_to_id=last_tweet_id)
if not reply_response:
logging.error(f"Failed to post thread reply {i} for {author['username']}")
else:
last_tweet_id = reply_response.get("id")
logging.debug(f"Thread reply {i} posted with ID {last_tweet_id}")
except Exception as e:
logging.error(f"Error posting thread reply {i} for {author['username']}: {e}", exc_info=True)
continue
# Post final CTA tweet
if last_tweet_id and top_posts: # Ensure there's a valid thread to reply to
try:
final_cta = generate_final_cta(author)
if not final_cta:
logging.error(f"Failed to generate final CTA tweet for {author['username']}, skipping")
continue
logging.info(f"Posting final CTA tweet for {author['username']}: {final_cta}")
print(f"Posting final CTA tweet for {author['username']}: {final_cta}")
cta_response = post_tweet(author, final_cta, reply_to_id=last_tweet_id)
if not cta_response:
logging.error(f"Failed to post final CTA tweet for {author['username']}")
else:
logging.debug(f"Final CTA tweet posted with ID {cta_response.get('id')}")
except Exception as e:
logging.error(f"Error posting final CTA tweet for {author['username']}: {e}", exc_info=True)
logging.info(f"Successfully posted weekly thread for {author['username']}")
print(f"Successfully posted weekly thread for {author['username']}")
except Exception as e:
logging.error(f"Error processing author {author['username']}: {e}", exc_info=True)
continue
logging.info("Completed foodie_weekly_thread.py")
print("Completed foodie_weekly_thread.py")
except Exception as e:
logging.error(f"Unexpected error in post_weekly_thread: {e}", exc_info=True)
print("Completed foodie_weekly_thread.py")
logging.info("Completed foodie_weekly_thread.py")
print(f"Error in post_weekly_thread: {e}")
def main():
"""Main function to run the script."""
lock_fd = None
try:
lock_fd = acquire_lock()
setup_logging()
post_weekly_thread()
except Exception as e:
logging.error(f"Fatal error in main: {e}", exc_info=True)
print(f"Fatal error: {e}")
sys.exit(1)
finally:
if lock_fd:
fcntl.flock(lock_fd, fcntl.LOCK_UN)
lock_fd.close()
os.remove(LOCK_FILE) if os.path.exists(LOCK_FILE) else None
if __name__ == "__main__":
main()