You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
287 lines
13 KiB
287 lines
13 KiB
import json |
|
import os |
|
from datetime import datetime, timedelta, timezone |
|
import logging |
|
import random |
|
from openai import OpenAI |
|
from foodie_utils import post_tweet, AUTHORS, SUMMARY_MODEL |
|
from foodie_config import X_API_CREDENTIALS |
|
from dotenv import load_dotenv |
|
import tweepy |
|
|
|
load_dotenv() |
|
|
|
# Logging configuration
LOG_FILE = "/home/shane/foodie_automator/foodie_weekly_thread.log"
LOG_PRUNE_DAYS = 30


def setup_logging():
    """Configure root logging to LOG_FILE (DEBUG) plus a console handler.

    Before installing handlers, prune LOG_FILE of entries older than
    LOG_PRUNE_DAYS. A line is kept only if its first 19 characters parse
    as a 'YYYY-MM-DD HH:MM:SS' timestamp newer than the cutoff; lines
    without a leading timestamp (e.g. traceback continuations) are dropped.
    """
    # Bug fix: logging.basicConfig(filename=...) raises FileNotFoundError
    # on a fresh machine if the directory is missing — create it up front.
    os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)

    if os.path.exists(LOG_FILE):
        with open(LOG_FILE, 'r') as f:
            lines = f.readlines()
        # Timestamps in the log are written as UTC (see datefmt below).
        cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS)
        pruned_lines = []
        for line in lines:
            try:
                timestamp = datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
                if timestamp > cutoff:
                    pruned_lines.append(line)
            except ValueError:
                # No parseable timestamp prefix: drop the line.
                continue
        with open(LOG_FILE, 'w') as f:
            f.writelines(pruned_lines)

    logging.basicConfig(
        filename=LOG_FILE,
        level=logging.DEBUG,
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    # Mirror log output to stderr so cron/manual runs are visible live.
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    logging.getLogger().addHandler(console_handler)
    logging.info("Logging initialized for foodie_weekly_thread.py")


setup_logging()
|
|
|
# Initialize OpenAI client.
# Bug fix: validate the key BEFORE constructing the client — the original
# built OpenAI(api_key=None) first, then raised, leaving a useless object.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    logging.error("OPENAI_API_KEY is not set in environment variables")
    raise ValueError("OPENAI_API_KEY is required")
client = OpenAI(api_key=OPENAI_API_KEY)
|
|
|
# Validate X_API_CREDENTIALS and test API access
def validate_twitter_credentials():
    """Verify every author's X/Twitter credentials with a live get_me() call.

    Returns:
        list: the credential dicts that authenticated successfully.

    Raises:
        ValueError: if no author has working credentials (nothing could
        be posted, so the script should stop at import time).
    """
    logging.info("Validating Twitter API credentials for all authors")
    valid_credentials = []
    for author in AUTHORS:
        credentials = next((cred for cred in X_API_CREDENTIALS if cred["username"] == author["username"]), None)
        if not credentials:
            logging.error(f"No X credentials found for {author['username']} in X_API_CREDENTIALS")
            print(f"No X credentials found for {author['username']}")
            continue
        logging.debug(f"Testing credentials for {author['username']} (handle: {credentials['x_username']})")
        try:
            # Bug fix: named tw_client so it no longer shadows the
            # module-level OpenAI `client`.
            tw_client = tweepy.Client(
                consumer_key=credentials["api_key"],
                consumer_secret=credentials["api_secret"],
                access_token=credentials["access_token"],
                access_token_secret=credentials["access_token_secret"]
            )
            # Test API access by fetching the user's profile
            user = tw_client.get_me()
            logging.info(f"Credentials valid for {author['username']} (handle: {credentials['x_username']}, user_id: {user.data.id})")
            print(f"Credentials valid for {author['username']} (handle: {credentials['x_username']})")
            valid_credentials.append(credentials)
        except tweepy.TweepyException as e:
            logging.error(f"Failed to validate credentials for {author['username']} (handle: {credentials['x_username']}): {e}")
            # Surface the raw API response body when tweepy attaches one.
            if hasattr(e, 'response') and e.response:
                logging.error(f"Twitter API response: {e.response.text}")
            print(f"Failed to validate credentials for {author['username']}: {e}")
    if not valid_credentials:
        logging.error("No valid Twitter credentials found for any author")
        raise ValueError("No valid Twitter credentials found")
    return valid_credentials


# Run credential validation
validate_twitter_credentials()
|
|
|
RECENT_POSTS_FILE = "/home/shane/foodie_automator/recent_posts.json"


def load_recent_posts():
    """Load and deduplicate posts from the JSON-lines recent-posts file.

    Every non-empty line must be a JSON object carrying title, url,
    author_username and an ISO-8601 timestamp; malformed lines and
    duplicates (same title/url/author) are skipped with a log message.

    Returns:
        list: unique post dicts in file order (empty on any failure).
    """
    logging.debug(f"Attempting to load posts from {RECENT_POSTS_FILE}")

    # Guard clauses: bail out early when the file can't be read at all.
    if not os.path.exists(RECENT_POSTS_FILE):
        logging.error(f"Recent posts file {RECENT_POSTS_FILE} does not exist")
        return []
    if not os.access(RECENT_POSTS_FILE, os.R_OK):
        logging.error(f"Cannot read {RECENT_POSTS_FILE} due to permission issues")
        return []

    # Keyed on (title, url, author) — dict insertion order preserves
    # the original file order of first occurrences.
    unique_posts = {}
    try:
        with open(RECENT_POSTS_FILE, 'r') as f:
            lines = f.readlines()
        logging.debug(f"Read {len(lines)} lines from {RECENT_POSTS_FILE}")

        for i, raw in enumerate(lines, 1):
            line = raw.strip()
            if not line:
                logging.debug(f"Skipping empty line {i} in {RECENT_POSTS_FILE}")
                continue
            try:
                entry = json.loads(line)
            except json.JSONDecodeError as e:
                logging.warning(f"Skipping invalid JSON at line {i}: {e}")
                continue
            if any(field not in entry for field in ("title", "url", "author_username", "timestamp")):
                logging.warning(f"Skipping invalid entry at line {i}: missing fields {entry}")
                continue
            try:
                datetime.fromisoformat(entry["timestamp"])
            except ValueError:
                logging.warning(f"Skipping entry at line {i}: invalid timestamp {entry['timestamp']}")
                continue
            key = (entry["title"], entry["url"], entry["author_username"])
            if key in unique_posts:
                logging.debug(f"Skipping duplicate entry at line {i}: {entry['title']}")
                continue
            unique_posts[key] = entry
        logging.info(f"Loaded {len(unique_posts)} unique posts from {RECENT_POSTS_FILE} (after deduplication)")
    except Exception as e:
        logging.error(f"Failed to load {RECENT_POSTS_FILE}: {e}")

    posts = list(unique_posts.values())
    if not posts:
        logging.warning(f"No valid posts loaded from {RECENT_POSTS_FILE}")
    return posts
|
|
|
def filter_posts_for_week(posts, start_date, end_date):
    """Return the posts whose timestamp falls within [start_date, end_date].

    Args:
        posts: list of post dicts with an ISO-8601 "timestamp" field.
        start_date / end_date: timezone-aware inclusive bounds.

    Returns:
        list: matching post dicts, original order preserved.

    Posts with a missing or unparsable timestamp are skipped with a warning.
    """
    filtered_posts = []
    logging.debug(f"Filtering {len(posts)} posts for range {start_date} to {end_date}")

    for post in posts:
        try:
            timestamp = datetime.fromisoformat(post["timestamp"])
            # Bug fix: entries written without a UTC offset parse as naive
            # datetimes; comparing those against the aware bounds raised an
            # uncaught TypeError. Assume naive timestamps are UTC.
            if timestamp.tzinfo is None:
                timestamp = timestamp.replace(tzinfo=timezone.utc)
            logging.debug(f"Checking post '{post['title']}' with timestamp {timestamp}")
            if start_date <= timestamp <= end_date:
                filtered_posts.append(post)
            else:
                logging.debug(f"Post '{post['title']}' timestamp {timestamp} outside range")
        except (KeyError, ValueError) as e:
            # KeyError added for robustness against posts lacking "timestamp".
            logging.warning(f"Skipping post with invalid timestamp: {post.get('title', 'Unknown')} - {e}")
    logging.info(f"Filtered to {len(filtered_posts)} posts within week range")
    return filtered_posts
|
|
|
def generate_intro_tweet(author):
    """Generate the lead tweet for an author's weekly thread via OpenAI.

    Args:
        author: author dict with a "username" key matching X_API_CREDENTIALS.

    Returns:
        str: tweet text capped at 280 characters; a canned fallback tweet
        when the API call fails; or None when no credentials exist for
        the author.
    """
    credentials = next((cred for cred in X_API_CREDENTIALS if cred["username"] == author["username"]), None)
    if not credentials:
        logging.error(f"No X credentials found for {author['username']}")
        return None
    author_handle = credentials["x_username"]
    logging.debug(f"Generating intro tweet for {author_handle}")

    prompt = (
        f"Generate a concise tweet (under 280 characters) for {author_handle}. "
        f"Introduce a thread of their top 10 foodie posts of the week on InsiderFoodie.com. "
        f"Make it engaging, create curiosity, and include a call to action to visit InsiderFoodie.com, follow {author_handle}, or like the thread. "
        f"Avoid using the word 'elevate'—use more humanized language like 'level up' or 'bring to life'. "
        f"Do not include emojis, hashtags, or reward-driven incentives (e.g., giveaways)."
    )

    try:
        response = client.chat.completions.create(
            model=SUMMARY_MODEL,
            messages=[
                {"role": "system", "content": "You are a social media expert crafting engaging tweets."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=100,
            temperature=0.7
        )
        tweet = response.choices[0].message.content.strip()
        # Hard cap at X's 280-character limit; ellipsis keeps the cut visible.
        if len(tweet) > 280:
            tweet = tweet[:277] + "..."
        logging.debug(f"Generated intro tweet: {tweet}")
        return tweet
    except Exception as e:
        logging.error(f"Failed to generate intro tweet for {author['username']}: {e}")
        # Bug fix: corrected user-facing typo "This weeks" -> "This week's".
        fallback = (
            f"This week's top 10 foodie finds by {author_handle} Check out the best on InsiderFoodie.com "
            f"Follow {author_handle} for more and like this thread to stay in the loop Visit us at https://insiderfoodie.com"
        )
        logging.info(f"Using fallback intro tweet: {fallback}")
        return fallback
|
|
|
def post_weekly_thread():
    """Post each author's weekly "top 10" thread to X.

    Computes the previous Monday-through-Sunday window in UTC, loads and
    filters the recent posts, and for each author in AUTHORS with posts
    that week: posts an intro tweet, then replies to it with up to 10
    numbered title/link tweets forming a thread. Failures for one author
    are logged and skipped so the remaining authors still post.
    """
    logging.info("Entering post_weekly_thread")
    print("Entering post_weekly_thread")

    today = datetime.now(timezone.utc)
    days_to_monday = today.weekday()
    # Monday 00:00:00 UTC of the PREVIOUS week...
    start_date = (today - timedelta(days=days_to_monday + 7)).replace(hour=0, minute=0, second=0, microsecond=0)
    # ...through that week's Sunday at 23:59:59.
    end_date = start_date + timedelta(days=6, hours=23, minutes=59, seconds=59)

    logging.info(f"Fetching posts from {start_date} to {end_date}")
    print(f"Fetching posts from {start_date} to {end_date}")

    all_posts = load_recent_posts()
    print(f"Loaded {len(all_posts)} posts from recent_posts.json")
    logging.info(f"Loaded {len(all_posts)} posts from recent_posts.json")

    if not all_posts:
        logging.warning("No posts loaded, exiting post_weekly_thread")
        print("No posts loaded, exiting post_weekly_thread")
        return

    weekly_posts = filter_posts_for_week(all_posts, start_date, end_date)
    print(f"Filtered to {len(weekly_posts)} posts for the week")
    logging.info(f"Filtered to {len(weekly_posts)} posts for the week")

    if not weekly_posts:
        logging.warning("No posts found within the week range, exiting post_weekly_thread")
        print("No posts found within the week range, exiting post_weekly_thread")
        return

    # Group the week's posts by author. Idiom fix: setdefault replaces the
    # manual "if key not in dict: dict[key] = []" initialization.
    posts_by_author = {}
    for post in weekly_posts:
        posts_by_author.setdefault(post["author_username"], []).append(post)
    logging.debug(f"Grouped posts by author: {list(posts_by_author.keys())}")

    for author in AUTHORS:
        author_posts = posts_by_author.get(author["username"], [])
        logging.info(f"Processing author {author['username']} with {len(author_posts)} posts")
        print(f"Processing author {author['username']} with {len(author_posts)} posts")

        if not author_posts:
            logging.info(f"No posts found for {author['username']} this week")
            print(f"No posts found for {author['username']} this week")
            continue

        # Newest first; posts missing a timestamp sort last.
        author_posts.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
        top_posts = author_posts[:10]
        logging.info(f"Selected {len(top_posts)} top posts for {author['username']}")
        print(f"Selected {len(top_posts)} top posts for {author['username']}")

        intro_tweet = generate_intro_tweet(author)
        if not intro_tweet:
            logging.error(f"Failed to generate intro tweet for {author['username']}, skipping")
            continue
        logging.info(f"Posting intro tweet for {author['username']}: {intro_tweet}")
        print(f"Posting intro tweet for {author['username']}: {intro_tweet}")

        intro_response = post_tweet(author, intro_tweet)
        if not intro_response:
            logging.error(f"Failed to post intro tweet for {author['username']}, skipping thread")
            print(f"Failed to post intro tweet for {author['username']}")
            continue

        intro_tweet_id = intro_response.get("id")
        logging.debug(f"Intro tweet posted with ID {intro_tweet_id}")

        # Each numbered reply targets the intro tweet, forming a thread.
        for i, post in enumerate(top_posts, 1):
            post_tweet_content = f"{i}. {post['title']} Link: {post['url']}"
            logging.info(f"Posting thread reply {i} for {author['username']}: {post_tweet_content}")
            print(f"Posting thread reply {i} for {author['username']}: {post_tweet_content}")
            reply_response = post_tweet(author, post_tweet_content, reply_to_id=intro_tweet_id)
            if not reply_response:
                # Best-effort: a failed reply is logged but the thread continues.
                logging.error(f"Failed to post thread reply {i} for {author['username']}")
            else:
                logging.debug(f"Thread reply {i} posted with ID {reply_response.get('id')}")

        logging.info(f"Successfully posted weekly thread for {author['username']}")
        print(f"Successfully posted weekly thread for {author['username']}")
|
|
|
if __name__ == "__main__":
    # Script entry point: run the weekly thread job once, with a top-level
    # catch so the completion messages are always emitted.
    print("Starting foodie_weekly_thread.py")
    logging.info("Starting foodie_weekly_thread.py")
    try:
        post_weekly_thread()
    except Exception as e:
        # logging.exception == logging.error(..., exc_info=True): logs the
        # same message plus the full traceback.
        logging.exception(f"Unexpected error in post_weekly_thread: {e}")
    print("Completed foodie_weekly_thread.py")
    logging.info("Completed foodie_weekly_thread.py")