import json
import os
from datetime import datetime, timedelta, timezone
import logging
from openai import OpenAI
from foodie_utils import post_tweet, AUTHORS, SUMMARY_MODEL
from foodie_config import X_API_CREDENTIALS
from dotenv import load_dotenv
load_dotenv()
# Logging configuration
LOG_FILE = "/home/shane/foodie_automator/foodie_weekly_thread.log"
LOG_PRUNE_DAYS = 30
def setup_logging():
    # Prune log entries older than LOG_PRUNE_DAYS. Each kept line must start with a
    # timestamp matching the datefmt below ('%Y-%m-%d %H:%M:%S' = first 19 characters);
    # lines that don't parse (e.g., traceback continuations) are dropped.
    if os.path.exists(LOG_FILE):
        with open(LOG_FILE, 'r') as f:
            lines = f.readlines()
        cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS)
        pruned_lines = []
        for line in lines:
            try:
                timestamp = datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
                if timestamp > cutoff:
                    pruned_lines.append(line)
            except ValueError:
                continue
        with open(LOG_FILE, 'w') as f:
            f.writelines(pruned_lines)

    # Set up logging to file and console
    logging.basicConfig(
        filename=LOG_FILE,
        level=logging.DEBUG,  # DEBUG for detailed output
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    logging.getLogger().addHandler(console_handler)
    logging.info("Logging initialized for foodie_weekly_thread.py")
setup_logging()
# Validate the API key before constructing the client (OpenAI() raises a less
# descriptive error if the key is missing)
if not os.getenv("OPENAI_API_KEY"):
    logging.error("OPENAI_API_KEY is not set in environment variables")
    raise ValueError("OPENAI_API_KEY is required")
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Validate X_API_CREDENTIALS
if not X_API_CREDENTIALS:
    logging.error("X_API_CREDENTIALS is empty in foodie_config.py")
    raise ValueError("X_API_CREDENTIALS is required")
RECENT_POSTS_FILE = "/home/shane/foodie_automator/recent_posts.json"
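# Assumed format: recent_posts.json is JSON Lines, one object per line, e.g.
#   {"title": "Best ramen spots", "url": "https://insiderfoodie.com/...", "author_username": "shane", "timestamp": "2025-01-08T14:30:00+00:00"}
# (illustrative values; only the four required fields checked below are guaranteed)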
def load_recent_posts():
    posts = []
    unique_posts = {}  # Track unique posts by (title, URL, author)
    logging.debug(f"Attempting to load posts from {RECENT_POSTS_FILE}")

    # Check that the file exists and is readable
    if not os.path.exists(RECENT_POSTS_FILE):
        logging.error(f"Recent posts file {RECENT_POSTS_FILE} does not exist")
        return posts
    if not os.access(RECENT_POSTS_FILE, os.R_OK):
        logging.error(f"Cannot read {RECENT_POSTS_FILE} due to permission issues")
        return posts

    try:
        with open(RECENT_POSTS_FILE, 'r') as f:
            lines = f.readlines()
        logging.debug(f"Read {len(lines)} lines from {RECENT_POSTS_FILE}")
        for i, line in enumerate(lines, 1):
            if not line.strip():
                logging.debug(f"Skipping empty line {i} in {RECENT_POSTS_FILE}")
                continue
            try:
                entry = json.loads(line.strip())
                # Validate required fields
                required_fields = ["title", "url", "author_username", "timestamp"]
                if not all(key in entry for key in required_fields):
                    logging.warning(f"Skipping invalid entry at line {i}: missing fields {entry}")
                    continue
                # Validate timestamp format
                try:
                    datetime.fromisoformat(entry["timestamp"])
                except ValueError:
                    logging.warning(f"Skipping entry at line {i}: invalid timestamp {entry['timestamp']}")
                    continue
                # Deduplicate based on title, URL, and author
                key = (entry["title"], entry["url"], entry["author_username"])
                if key in unique_posts:
                    logging.debug(f"Skipping duplicate entry at line {i}: {entry['title']}")
                    continue
                unique_posts[key] = entry
                posts.append(entry)
            except json.JSONDecodeError as e:
                logging.warning(f"Skipping invalid JSON at line {i}: {e}")
                continue
        logging.info(f"Loaded {len(posts)} unique posts from {RECENT_POSTS_FILE} (after deduplication)")
    except Exception as e:
        logging.error(f"Failed to load {RECENT_POSTS_FILE}: {e}")

    if not posts:
        logging.warning(f"No valid posts loaded from {RECENT_POSTS_FILE}")
    return posts
def filter_posts_for_week(posts, start_date, end_date):
    filtered_posts = []
    logging.debug(f"Filtering {len(posts)} posts for range {start_date} to {end_date}")
    for post in posts:
        try:
            timestamp = datetime.fromisoformat(post["timestamp"])
            # Treat naive timestamps as UTC so comparing them against the
            # timezone-aware window can't raise a TypeError
            if timestamp.tzinfo is None:
                timestamp = timestamp.replace(tzinfo=timezone.utc)
            logging.debug(f"Checking post '{post['title']}' with timestamp {timestamp}")
            if start_date <= timestamp <= end_date:
                filtered_posts.append(post)
            else:
                logging.debug(f"Post '{post['title']}' timestamp {timestamp} outside range")
        except ValueError as e:
            logging.warning(f"Skipping post with invalid timestamp: {post.get('title', 'Unknown')} - {e}")
    logging.info(f"Filtered to {len(filtered_posts)} posts within week range")
    return filtered_posts
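# Note: the range check above is inclusive on both ends, so a post stamped exactly
# at end_date (Sunday 23:59:59 UTC, as computed in post_weekly_thread) still counts.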
def generate_intro_tweet(author):
    credentials = next((cred for cred in X_API_CREDENTIALS if cred["username"] == author["username"]), None)
    if not credentials:
        logging.error(f"No X credentials found for {author['username']}")
        return None
    author_handle = credentials["x_username"]
    logging.debug(f"Generating intro tweet for {author_handle}")
    prompt = (
        f"Generate a concise tweet (under 280 characters) for {author_handle}. "
        f"Introduce a thread of their top 10 foodie posts of the week on InsiderFoodie.com. "
        f"Make it engaging, create curiosity, and include a call to action to visit InsiderFoodie.com, follow {author_handle}, or like the thread. "
        f"Avoid using the word 'elevate'; use more humanized language like 'level up' or 'bring to life'. "
        f"Do not include emojis, hashtags, or reward-driven incentives (e.g., giveaways)."
    )
    try:
        response = client.chat.completions.create(
            model=SUMMARY_MODEL,
            messages=[
                {"role": "system", "content": "You are a social media expert crafting engaging tweets."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=100,
            temperature=0.7
        )
        tweet = response.choices[0].message.content.strip()
        # Hard-truncate anything over the 280-character limit
        if len(tweet) > 280:
            tweet = tweet[:277] + "..."
        logging.debug(f"Generated intro tweet: {tweet}")
        return tweet
    except Exception as e:
        logging.error(f"Failed to generate intro tweet for {author['username']}: {e}")
        fallback = (
            f"This week's top 10 foodie finds by {author_handle}. Check out the best on InsiderFoodie.com. "
            f"Follow {author_handle} for more, and like this thread to stay in the loop. Visit us at https://insiderfoodie.com"
        )
        logging.info(f"Using fallback intro tweet: {fallback}")
        return fallback
def post_weekly_thread():
    logging.info("Entering post_weekly_thread")
    print("Entering post_weekly_thread")
    today = datetime.now(timezone.utc)
    # Target the previous full week (Monday through Sunday)
    days_to_monday = today.weekday()  # 0 for Monday, 1 for Tuesday, etc.
    start_date = (today - timedelta(days=days_to_monday + 7)).replace(hour=0, minute=0, second=0, microsecond=0)
    end_date = start_date + timedelta(days=6, hours=23, minutes=59, seconds=59)
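    # Worked example (illustrative): if today is Wednesday 2025-01-15 UTC, weekday() == 2,
    # so start_date = 2025-01-15 - 9 days = Monday 2025-01-06 00:00:00 UTC and
    # end_date = Sunday 2025-01-12 23:59:59 UTC, i.e. the previous full week.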
logging.info(f"Fetching posts from {start_date} to {end_date}")
print(f"Fetching posts from {start_date} to {end_date}")
all_posts = load_recent_posts()
print(f"Loaded {len(all_posts)} posts from recent_posts.json")
logging.info(f"Loaded {len(all_posts)} posts from recent_posts.json")
if not all_posts:
logging.warning("No posts loaded, exiting post_weekly_thread")
print("No posts loaded, exiting post_weekly_thread")
return
weekly_posts = filter_posts_for_week(all_posts, start_date, end_date)
print(f"Filtered to {len(weekly_posts)} posts for the week")
logging.info(f"Filtered to {len(weekly_posts)} posts for the week")
if not weekly_posts:
logging.warning("No posts found within the week range, exiting post_weekly_thread")
print("No posts found within the week range, exiting post_weekly_thread")
return
posts_by_author = {}
for post in weekly_posts:
author = post["author_username"]
if author not in posts_by_author:
posts_by_author[author] = []
posts_by_author[author].append(post)
logging.debug(f"Grouped posts by author: {list(posts_by_author.keys())}")
for author in AUTHORS:
author_posts = posts_by_author.get(author["username"], [])
logging.info(f"Processing author {author['username']} with {len(author_posts)} posts")
print(f"Processing author {author['username']} with {len(author_posts)} posts")
if not author_posts:
logging.info(f"No posts found for {author['username']} this week")
print(f"No posts found for {author['username']} this week")
continue
author_posts.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
top_posts = author_posts[:10]
logging.info(f"Selected {len(top_posts)} top posts for {author['username']}")
print(f"Selected {len(top_posts)} top posts for {author['username']}")
intro_tweet = generate_intro_tweet(author)
if not intro_tweet:
logging.error(f"Failed to generate intro tweet for {author['username']}, skipping")
continue
logging.info(f"Posting intro tweet for {author['username']}: {intro_tweet}")
print(f"Posting intro tweet for {author['username']}: {intro_tweet}")
intro_response = post_tweet(author, intro_tweet)
if not intro_response:
logging.error(f"Failed to post intro tweet for {author['username']}")
print(f"Failed to post intro tweet for {author['username']}")
continue
intro_tweet_id = intro_response.get("id")
logging.debug(f"Intro tweet posted with ID {intro_tweet_id}")
for i, post in enumerate(top_posts, 1):
post_tweet_content = f"{i}. {post['title']} Link: {post['url']}"
logging.info(f"Posting thread reply {i} for {author['username']}: {post_tweet_content}")
print(f"Posting thread reply {i} for {author['username']}: {post_tweet_content}")
reply_response = post_tweet(author, post_tweet_content, reply_to_id=intro_tweet_id)
if not reply_response:
logging.error(f"Failed to post thread reply {i} for {author['username']}")
else:
logging.debug(f"Thread reply {i} posted with ID {reply_response.get('id')}")
logging.info(f"Successfully posted weekly thread for {author['username']}")
print(f"Successfully posted weekly thread for {author['username']}")
if __name__ == "__main__":
    print("Starting foodie_weekly_thread.py")
    logging.info("Starting foodie_weekly_thread.py")
    try:
        post_weekly_thread()
    except Exception as e:
        logging.error(f"Unexpected error in post_weekly_thread: {e}", exc_info=True)
    print("Completed foodie_weekly_thread.py")
    logging.info("Completed foodie_weekly_thread.py")