main
Shane 7 months ago
parent 167506ef30
commit 753934db4f
  1. 208
      foodie_utils.py
  2. 1
      requirements.txt

@ -20,12 +20,15 @@ from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import tweepy
import flickr_api
from filelock import FileLock
from foodie_config import (
RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS, PERSONA_CONFIGS,
get_clean_source_name, AUTHORS, LIGHT_TASK_MODEL, SUMMARY_MODEL, X_API_CREDENTIALS,
FLICKR_API_KEY, FLICKR_API_SECRET, PIXABAY_API_KEY, RECENT_POSTS_FILE, USED_IMAGES_FILE, IMAGE_EXPIRATION_DAYS
)
# Index of the most recently used author for round-robin selection;
# -1 means no author has been selected yet (see get_next_author_round_robin).
last_author_index = -1
load_dotenv()
# OpenAI client built from the environment; requires OPENAI_API_KEY to be set.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Timeout (seconds) for image uploads.
IMAGE_UPLOAD_TIMEOUT = 30 # Added to fix NameError
@ -78,69 +81,41 @@ def load_json_file(file_path, expiration_hours=None):
logging.error(f"Failed to load JSON file {file_path}: {e}")
return []
def save_json_file(filename, data):
    """Save data to a JSON file with locking to prevent corruption, without resetting on error.

    Appends `data` (a single entry or a list of entries) to the JSON array stored
    in `filename`, skipping entries already present. The whole read-modify-write
    cycle is guarded by a FileLock so concurrent writers cannot corrupt the file.

    Args:
        filename: Path of the JSON file to update.
        data: A JSON-serializable value, or a list of such values, to append.

    Raises:
        Exception: Re-raises any unexpected failure after logging it; a corrupt
            (non-JSON) existing file is NOT overwritten — the write is skipped
            to preserve the on-disk data.
    """
    # Sidecar lock file next to the data file serializes concurrent writers.
    lock = FileLock(f"{filename}.lock")
    try:
        with lock:
            # Read existing data
            existing_data = []
            try:
                if os.path.exists(filename):
                    with open(filename, 'r') as f:
                        existing_data = json.load(f)
                    if not isinstance(existing_data, list):
                        logging.warning(f"Data in {filename} is not a list. Resetting to empty list.")
                        existing_data = []
            except (json.JSONDecodeError, FileNotFoundError) as e:
                # If the file is corrupted, log the error and skip writing to preserve existing data
                if isinstance(e, json.JSONDecodeError):
                    logging.error(f"Invalid JSON in {filename}: {e}. Skipping write to preserve existing data.")
                    return
                logging.warning(f"File {filename} not found: {e}. Starting with empty list.")
            # Append new data if it's not already present
            if isinstance(data, list):
                existing_data.extend([item for item in data if item not in existing_data])
            else:
                if data not in existing_data:
                    existing_data.append(data)
            # Write back to file
            with open(filename, 'w') as f:
                json.dump(existing_data, f, indent=2)
            logging.info(f"Saved data to {filename}")
    except Exception as e:
        logging.error(f"Failed to save to {filename}: {e}", exc_info=True)
        raise
def generate_article_tweet(author, post, persona):
title = post["title"]
@ -1127,35 +1102,29 @@ def get_flickr_image(search_query, relevance_keywords, main_topic):
def select_best_author(content, interest_score):
    """Pick the author whose persona best fits the content, penalizing prolific posters.

    Scores every author in AUTHORS: persona prompts mentioning "trend" get +2,
    "recipe" gets +1, and each post already made this month subtracts 0.5.
    Falls back to a random author when no score wins or on any error.

    Args:
        content: The content being posted (currently unused in scoring; kept
            for interface compatibility with callers).
        interest_score: Base numeric score the persona adjustments build on.

    Returns:
        The selected author's username (str).
    """
    try:
        x_post_counts = load_json_file('/home/shane/foodie_automator/x_post_counts.json', expiration_hours=24*30)
        monthly_counts = {entry['username']: entry['monthly_count'] for entry in x_post_counts}
        best_score = -1
        best_author = None
        for author in AUTHORS:
            persona = PERSONA_CONFIGS.get(author["username"], {})
            prompt = persona.get("prompt", "")
            current_score = interest_score
            if "trend" in prompt.lower():
                current_score += 2
            elif "recipe" in prompt.lower():
                current_score += 1
            # Penalize authors with high post counts.
            # NOTE: must key by username — `author` is a dict here and dicts are
            # unhashable, so monthly_counts.get(author, 0) raised TypeError and
            # silently forced the random fallback via the except below.
            post_count = monthly_counts.get(author["username"], 0)
            current_score -= post_count * 0.5
            if current_score > best_score:
                best_score = current_score
                best_author = author["username"]
        if not best_author:
            best_author = random.choice([author["username"] for author in AUTHORS])
        logging.info(f"Selected author: {best_author} with adjusted score: {best_score}")
        return best_author
    except Exception as e:
        logging.error(f"Error in select_best_author: {e}")
        return random.choice([author["username"] for author in AUTHORS])
def check_rate_limit(response):
"""Extract rate limit information from Twitter API response headers."""
@ -1168,88 +1137,63 @@ def check_rate_limit(response):
return None, None
def check_author_rate_limit(author):
    """Check if the author can post based on Twitter API rate limits.

    Makes a lightweight authenticated call (get_me) with the author's X
    credentials and inspects the rate-limit headers via check_rate_limit().

    Args:
        author: Author dict; must contain a "username" key matching an entry
            in X_API_CREDENTIALS.

    Returns:
        Tuple (can_post: bool, remaining: int, reset: int). On any failure or
        missing data this conservatively returns (False, 0, 0) — the author is
        treated as rate-limited rather than risking a 429 on the real post.
    """
    from foodie_config import X_API_CREDENTIALS
    import tweepy
    credentials = X_API_CREDENTIALS.get(author["username"])
    if not credentials:
        logging.error(f"No X credentials found for {author['username']}")
        return False, 0, 0
    try:
        client = tweepy.Client(
            consumer_key=credentials["api_key"],
            consumer_secret=credentials["api_secret"],
            access_token=credentials["access_token"],
            access_token_secret=credentials["access_token_secret"]
        )
        # Make a lightweight API call to check rate limits
        response = client.get_me()
        remaining, reset = check_rate_limit(response)
        if remaining is None or reset is None:
            # Headers absent or unparsable: fail closed.
            logging.warning(f"Could not determine rate limits for {author['username']}. Assuming rate-limited.")
            return False, 0, 0
        can_post = remaining > 0
        if not can_post:
            logging.info(f"Author {author['username']} is rate-limited. Remaining: {remaining}, Reset at: {reset}")
        return can_post, remaining, reset
    except tweepy.TweepyException as e:
        logging.error(f"Failed to check rate limits for {author['username']}: {e}")
        return False, 0, 0
    except Exception as e:
        logging.error(f"Unexpected error checking rate limits for {author['username']}: {e}", exc_info=True)
        return False, 0, 0
def get_next_author_round_robin():
    """Select the next author in round-robin fashion, respecting rate limits.

    Advances the module-level `last_author_index` cursor through AUTHORS,
    returning the first author whose rate-limit check passes. The cursor is
    in-memory only, so rotation state resets when the process restarts.

    Returns:
        The selected author dict, or None when AUTHORS is empty or every
        author is currently rate-limited (callers must handle None by
        skipping the post).
    """
    global last_author_index
    authors = AUTHORS
    num_authors = len(authors)
    if num_authors == 0:
        logging.error("No authors available in AUTHORS list.")
        return None
    # Try each author in round-robin order
    for i in range(num_authors):
        last_author_index = (last_author_index + 1) % num_authors
        author = authors[last_author_index]
        can_post, remaining, reset = check_author_rate_limit(author)
        if can_post:
            logging.info(f"Author {author['username']} can post")
            return author
        else:
            # reset may be 0/None on failure; render a placeholder instead of epoch 0.
            reset_time = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(reset)) if reset else "Unknown"
            logging.info(f"Skipping author {author['username']} due to rate limit. Remaining: {remaining}, Reset at: {reset_time}")
    # If no authors are available, return None instead of falling back
    logging.warning("No authors available due to rate limits. Skipping posting.")
    return None
def prepare_post_data(summary, title, main_topic=None):
try:

@ -11,3 +11,4 @@ webdriver-manager==4.0.2
tweepy==4.14.0
python-dotenv==1.0.1
flickr-api==0.7.1
filelock==3.16.1
Loading…
Cancel
Save