Shane 7 months ago
parent ae194b502f
commit 82f4a1d8b1
  1. 3
      foodie_automator_google.py
  2. 3
      foodie_automator_reddit.py
  3. 87
      foodie_automator_rss.py
  4. 12
      foodie_engagement_tweet.py
  5. 321
      foodie_utils.py
  6. 39
      foodie_weekly_thread.py
  7. 2
      foodie_x_poster.py

@ -28,8 +28,7 @@ from foodie_utils import (
is_interesting, generate_title_from_summary, summarize_with_gpt4o, is_interesting, generate_title_from_summary, summarize_with_gpt4o,
generate_category_from_summary, post_to_wp, prepare_post_data, generate_category_from_summary, post_to_wp, prepare_post_data,
select_best_author, smart_image_and_filter, get_flickr_image, select_best_author, smart_image_and_filter, get_flickr_image,
get_next_author_round_robin, fetch_duckduckgo_news_context, get_next_author_round_robin, check_author_rate_limit
check_author_rate_limit
) )
from foodie_hooks import get_dynamic_hook, get_viral_share_prompt from foodie_hooks import get_dynamic_hook, get_viral_share_prompt
from dotenv import load_dotenv from dotenv import load_dotenv

@ -28,8 +28,7 @@ from foodie_utils import (
is_interesting, generate_title_from_summary, summarize_with_gpt4o, is_interesting, generate_title_from_summary, summarize_with_gpt4o,
generate_category_from_summary, post_to_wp, prepare_post_data, generate_category_from_summary, post_to_wp, prepare_post_data,
select_best_author, smart_image_and_filter, get_flickr_image, select_best_author, smart_image_and_filter, get_flickr_image,
get_next_author_round_robin, fetch_duckduckgo_news_context, get_next_author_round_robin, check_author_rate_limit
check_author_rate_limit
) )
from foodie_hooks import get_dynamic_hook, get_viral_share_prompt from foodie_hooks import get_dynamic_hook, get_viral_share_prompt
import fcntl import fcntl

@ -253,7 +253,7 @@ def fetch_duckduckgo_news_context(title, hours=24):
logging.error(f"Failed to fetch DuckDuckGo News context for '{title}' after {MAX_RETRIES} attempts") logging.error(f"Failed to fetch DuckDuckGo News context for '{title}' after {MAX_RETRIES} attempts")
return title return title
def curate_from_rss(posted_titles_data, posted_titles, used_images_data, used_images): def curate_from_rss():
try: try:
logging.debug(f"Using {len(posted_titles)} posted titles and {len(used_images)} used images") logging.debug(f"Using {len(posted_titles)} posted titles and {len(used_images)} used images")
@ -278,13 +278,12 @@ def curate_from_rss(posted_titles_data, posted_titles, used_images_data, used_im
attempts += 1 attempts += 1
continue continue
# Check author availability before GPT calls # Select author
author = get_next_author_round_robin() author = get_next_author_round_robin()
if not author: if not author:
logging.info(f"Skipping article '{title}' due to tweet rate limits for all authors") logging.info(f"Skipping article '{title}' due to tweet rate limits for all authors")
attempts += 1 attempts += 1
continue continue
author_username = author["username"] author_username = author["username"]
logging.info(f"Selected author via round-robin: {author_username}") logging.info(f"Selected author via round-robin: {author_username}")
@ -362,7 +361,9 @@ def curate_from_rss(posted_titles_data, posted_titles, used_images_data, used_im
f'<a href="https://x.com/intent/tweet?url={{post_url}}&text={share_text_encoded}" target="_blank"><i class="tsi tsi-twitter"></i></a> ' f'<a href="https://x.com/intent/tweet?url={{post_url}}&text={share_text_encoded}" target="_blank"><i class="tsi tsi-twitter"></i></a> '
f'<a href="https://www.facebook.com/sharer/sharer.php?u={{post_url}}" target="_blank"><i class="tsi tsi-facebook"></i></a></p>' f'<a href="https://www.facebook.com/sharer/sharer.php?u={{post_url}}" target="_blank"><i class="tsi tsi-facebook"></i></a></p>'
) )
post_data["content"] = f"{final_summary}\n\n{share_links_template}"
# Embed placeholder share links; update after getting post_url
post_data["content"] = f"{final_summary}\n\n{share_links_template.format(post_url='{post_url}', share_text=share_text_encoded)}"
global is_posting global is_posting
is_posting = True is_posting = True
@ -382,49 +383,57 @@ def curate_from_rss(posted_titles_data, posted_titles, used_images_data, used_im
) )
if not post_id: if not post_id:
logging.warning(f"Failed to post to WordPress for '{title}', using original URL: {original_url}") logging.warning(f"Failed to post to WordPress for '{title}', using original URL: {original_url}")
post_url = original_url # Fallback to original article URL post_url = original_url
else: else:
logging.info(f"Posted to WordPress for {author_username}: {post_url}") logging.info(f"Posted to WordPress for {author_username}: {post_url}")
# Update post with actual post_url # Update content with actual post_url
post_url_encoded = quote(post_url) post_url_encoded = quote(post_url)
share_links = share_links_template.format(post_url=post_url_encoded) post_data["content"] = f"{final_summary}\n\n{share_links_template.format(post_url=post_url_encoded, share_text=share_text_encoded)}"
post_data["content"] = f"{final_summary}\n\n{share_links}" if post_id:
post_data["post_id"] = post_id post_to_wp(
if post_id: post_data=post_data,
post_to_wp( category=category,
post_data=post_data, link=link,
category=category, author=author,
link=link, image_url=None, # Skip image re-upload
author=author, original_source=original_source,
image_url=None, # Skip image re-upload image_source=image_source,
original_source=original_source, uploader=uploader,
image_source=image_source, page_url=page_url,
uploader=uploader, interest_score=interest_score,
page_url=page_url, post_id=post_id,
interest_score=interest_score, should_post_tweet=False
post_id=post_id, )
should_post_tweet=False
) timestamp = datetime.now(timezone.utc).isoformat()
save_json_file(POSTED_TITLES_FILE, title, timestamp)
posted_titles.add(title)
logging.info(f"Successfully saved '{title}' to {POSTED_TITLES_FILE}")
if image_url:
save_json_file(USED_IMAGES_FILE, image_url, timestamp)
used_images.add(image_url)
logging.info(f"Saved image '{image_url}' to {USED_IMAGES_FILE}")
logging.info(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id or 'N/A'}) from RSS *****")
return post_data, category, random.randint(0, 1800)
except Exception as e: except Exception as e:
logging.error(f"Failed to post to WordPress for '{title}': {e}", exc_info=True) logging.error(f"Failed to post to WordPress for '{title}': {e}", exc_info=True)
post_url = original_url # Fallback to original article URL post_url = original_url
timestamp = datetime.now(timezone.utc).isoformat()
save_json_file(POSTED_TITLES_FILE, title, timestamp)
posted_titles.add(title)
logging.info(f"Successfully saved '{title}' to {POSTED_TITLES_FILE}")
if image_url:
save_json_file(USED_IMAGES_FILE, image_url, timestamp)
used_images.add(image_url)
logging.info(f"Saved image '{image_url}' to {USED_IMAGES_FILE}")
attempts += 1
finally: finally:
is_posting = False is_posting = False
timestamp = datetime.now(timezone.utc).isoformat()
save_json_file(POSTED_TITLES_FILE, title, timestamp)
posted_titles.add(title)
logging.info(f"Successfully saved '{title}' to {POSTED_TITLES_FILE}")
if image_url:
save_json_file(USED_IMAGES_FILE, image_url, timestamp)
used_images.add(image_url)
logging.info(f"Saved image '{image_url}' to {USED_IMAGES_FILE}")
logging.info(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id or 'N/A'}) from RSS *****")
return post_data, category, random.randint(0, 1800)
logging.info("No interesting RSS article found after attempts") logging.info("No interesting RSS article found after attempts")
return None, None, random.randint(600, 1800) return None, None, random.randint(600, 1800)
except Exception as e: except Exception as e:

@ -8,7 +8,7 @@ import fcntl
import os import os
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from openai import OpenAI from openai import OpenAI
from foodie_utils import post_tweet, AUTHORS, SUMMARY_MODEL, check_author_rate_limit from foodie_utils import post_tweet, AUTHORS, SUMMARY_MODEL, check_author_rate_limit, load_json_file
from foodie_config import X_API_CREDENTIALS, AUTHOR_BACKGROUNDS_FILE from foodie_config import X_API_CREDENTIALS, AUTHOR_BACKGROUNDS_FILE
from dotenv import load_dotenv from dotenv import load_dotenv
@ -162,13 +162,9 @@ def post_engagement_tweet():
for author in AUTHORS: for author in AUTHORS:
# Check if the author can post before generating the tweet # Check if the author can post before generating the tweet
if check_author_rate_limit(author): can_post, remaining, reset = check_author_rate_limit(author)
reset_time = datetime.fromtimestamp( if not can_post:
load_json_file('/home/shane/foodie_automator/rate_limit_info.json', default={}) reset_time = datetime.fromtimestamp(reset, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S') if reset else "Unknown"
.get(author['username'], {})
.get('tweet_reset', time.time()),
tz=timezone.utc
).strftime('%Y-%m-%d %H:%M:%S')
logging.info(f"Skipping engagement tweet for {author['username']} due to rate limit. Reset at: {reset_time}") logging.info(f"Skipping engagement tweet for {author['username']} due to rate limit. Reset at: {reset_time}")
continue continue

@ -163,34 +163,32 @@ def generate_article_tweet(author, post, persona):
def post_tweet(author, tweet, reply_to_id=None): def post_tweet(author, tweet, reply_to_id=None):
""" """
Post a tweet with real-time X API rate limit checking. Post a tweet after checking real-time X API rate limits.
Updates rate_limit_info.json with tweet-specific limits. Updates rate_limit_info.json with API-provided data.
""" """
from foodie_config import X_API_CREDENTIALS from foodie_config import X_API_CREDENTIALS
import logging
import tweepy import tweepy
logger = logging.getLogger(__name__)
credentials = X_API_CREDENTIALS.get(author["username"]) credentials = X_API_CREDENTIALS.get(author["username"])
if not credentials: if not credentials:
logging.error(f"No X credentials found for {author['username']}") logger.error(f"No X credentials found for {author['username']}")
return False return False
logging.debug(f"Attempting to post tweet for {author['username']} (handle: {credentials['x_username']})") # Check rate limit before posting
logging.debug(f"Credentials: api_key={credentials['api_key'][:4]}..., access_token={credentials['access_token'][:4]}...") if check_author_rate_limit(author):
logging.debug(f"Tweet content: {tweet}") logger.error(f"Cannot post tweet for {author['username']}: Rate limit exceeded")
return False
logger.debug(f"Attempting to post tweet for {author['username']} (handle: {credentials['x_username']})")
logger.debug(f"Tweet content: {tweet}")
if reply_to_id: if reply_to_id:
logging.debug(f"Replying to tweet ID: {reply_to_id}") logger.debug(f"Replying to tweet ID: {reply_to_id}")
rate_limit_file = '/home/shane/foodie_automator/rate_limit_info.json' rate_limit_file = '/home/shane/foodie_automator/rate_limit_info.json'
rate_limit_info = load_json_file(rate_limit_file, default={}) rate_limit_info = load_json_file(rate_limit_file, default={})
username = author["username"] username = author["username"]
if username not in rate_limit_info:
rate_limit_info[username] = {
'tweet_remaining': 17,
'tweet_reset': time.time()
}
try: try:
client = tweepy.Client( client = tweepy.Client(
consumer_key=credentials["api_key"], consumer_key=credentials["api_key"],
@ -203,34 +201,38 @@ def post_tweet(author, tweet, reply_to_id=None):
in_reply_to_tweet_id=reply_to_id in_reply_to_tweet_id=reply_to_id
) )
tweet_id = response.data['id'] tweet_id = response.data['id']
logging.info(f"Successfully posted tweet {tweet_id} for {author['username']} (handle: {credentials['x_username']}): {tweet}") logger.info(f"Successfully posted tweet {tweet_id} for {author['username']} (handle: {credentials['x_username']}): {tweet}")
# Update rate limit info with fresh API data
remaining, reset = get_x_rate_limit_status(author)
if remaining is not None and reset is not None:
rate_limit_info[username] = {
'tweet_remaining': max(0, remaining - 1), # Account for this tweet
'tweet_reset': reset
}
save_json_file(rate_limit_file, rate_limit_info)
logger.info(f"Updated rate limit for {username}: {rate_limit_info[username]['tweet_remaining']} remaining, reset at {datetime.fromtimestamp(reset, tz=timezone.utc)}")
else:
logger.warning(f"Failed to update rate limit info for {username} after posting")
# Update tweet rate limits (local decrement, headers on 429)
rate_limit_info[username]['tweet_remaining'] = max(0, rate_limit_info[username]['tweet_remaining'] - 1)
save_json_file(rate_limit_file, rate_limit_info)
logging.info(f"Updated tweet rate limit for {username}: {rate_limit_info[username]['tweet_remaining']} remaining, reset at {datetime.fromtimestamp(rate_limit_info[username]['tweet_reset'], tz=timezone.utc)}")
return {"id": tweet_id} return {"id": tweet_id}
except tweepy.TweepyException as e: except tweepy.TweepyException as e:
logging.error(f"Failed to post tweet for {author['username']} (handle: {credentials['x_username']}): {e}") logger.error(f"Failed to post tweet for {author['username']} (handle: {credentials['x_username']}): {e}")
if hasattr(e, 'response') and e.response and e.response.status_code == 429: if hasattr(e, 'response') and e.response and e.response.status_code == 429:
headers = e.response.headers remaining, reset = get_x_rate_limit_status(author)
user_remaining = headers.get('x-user-limit-24hour-remaining', 0) if remaining is None:
user_reset = headers.get('x-user-limit-24hour-reset', time.time() + 86400) remaining = 0
try: reset = time.time() + 86400
user_remaining = int(user_remaining) rate_limit_info[username] = {
user_reset = int(user_reset) 'tweet_remaining': remaining,
except (ValueError, TypeError): 'tweet_reset': reset
user_remaining = 0 }
user_reset = time.time() + 86400
rate_limit_info[username]['tweet_remaining'] = user_remaining
rate_limit_info[username]['tweet_reset'] = user_reset
save_json_file(rate_limit_file, rate_limit_info) save_json_file(rate_limit_file, rate_limit_info)
logging.info(f"Rate limit exceeded for {username}: {user_remaining} remaining, reset at {datetime.fromtimestamp(user_reset, tz=timezone.utc)}") logger.info(f"Rate limit exceeded for {username}: {remaining} remaining, reset at {datetime.fromtimestamp(reset, tz=timezone.utc)}")
return False return False
except Exception as e: except Exception as e:
logging.error(f"Unexpected error posting tweet for {author['username']} (handle: {credentials['x_username']}): {e}", exc_info=True) logger.error(f"Unexpected error posting tweet for {author['username']} (handle: {credentials['x_username']}): {e}", exc_info=True)
return False return False
def select_best_persona(interest_score, content=""): def select_best_persona(interest_score, content=""):
@ -712,13 +714,14 @@ def get_wp_tag_id(tag_name, wp_base_url, wp_username, wp_password):
logging.error(f"Failed to get WP tag ID for '{tag_name}': {e}") logging.error(f"Failed to get WP tag ID for '{tag_name}': {e}")
return None return None
def post_to_wp(post_data, category, link, author, image_url, original_source, image_source, uploader, page_url, interest_score, post_id=None, should_post_tweet=True): def post_to_wp(post_data, category, link, author, image_url, original_source, image_source="Pixabay", uploader=None, page_url=None, interest_score=4, post_id=None, should_post_tweet=True):
""" """
Post or update content to WordPress, optionally tweeting the post. Post or update content to WordPress, optionally tweeting the post.
""" """
import logging import logging
import requests import requests
from foodie_config import X_API_CREDENTIALS # Removed WP_CREDENTIALS import base64
from foodie_config import X_API_CREDENTIALS
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -728,7 +731,7 @@ def post_to_wp(post_data, category, link, author, image_url, original_source, im
wp_password = author.get("password") wp_password = author.get("password")
if not all([wp_url, wp_username, wp_password]): if not all([wp_url, wp_username, wp_password]):
logger.error(f"Missing WordPress credentials for author: {author.get('username', 'unknown')}") logger.error(f"Missing WordPress credentials for author: {wp_username or 'unknown'}")
return None, None return None, None
# Ensure wp_url ends with '/wp-json/wp/v2' # Ensure wp_url ends with '/wp-json/wp/v2'
@ -737,61 +740,129 @@ def post_to_wp(post_data, category, link, author, image_url, original_source, im
else: else:
wp_base_url = wp_url wp_base_url = wp_url
endpoint = f"{wp_base_url}/posts" # Hardcoded author ID map from old working version
if post_id: author_id_map = {
endpoint += f"/{post_id}" "owenjohnson": 10,
"javiermorales": 2,
headers = { "aishapatel": 3,
"Authorization": "Basic " + base64.b64encode(f"{wp_username}:{wp_password}".encode()).decode(), "trangnguyen": 12,
"Content-Type": "application/json" "keishareid": 13,
"lilamoreau": 7
} }
author_id = author_id_map.get(wp_username, 5) # Default to ID 5 if username not found
try:
headers = {
"Authorization": "Basic " + base64.b64encode(f"{wp_username}:{wp_password}".encode()).decode(),
"Content-Type": "application/json"
}
# Test authentication
auth_test = requests.get(f"{wp_base_url}/users/me", headers=headers)
auth_test.raise_for_status()
logger.info(f"Auth test passed for {wp_username}: {auth_test.json()['id']}")
# Get or create category ID # Get or create category ID
category_id = get_wp_category_id(category, wp_base_url, wp_username, wp_password) category_id = get_wp_category_id(category, wp_base_url, wp_username, wp_password)
if not category_id:
category_id = create_wp_category(category, wp_base_url, wp_username, wp_password)
if not category_id: if not category_id:
logger.warning(f"Failed to get or create category '{category}', using default") category_id = create_wp_category(category, wp_base_url, wp_username, wp_password)
category_id = 1 # Fallback to default category if not category_id:
logger.warning(f"Failed to get or create category '{category}', using default")
payload = { category_id = 1 # Fallback to 'Uncategorized'
"title": post_data["title"], else:
"content": post_data["content"], logger.info(f"Created new category '{category}' with ID {category_id}")
"status": post_data["status"], else:
"author": wp_username, # Use username directly logger.info(f"Found existing category '{category}' with ID {category_id}")
"categories": [category_id]
} # Handle tags
tags = [1] # Default tag ID (e.g., 'uncategorized')
if interest_score >= 9:
picks_tag_id = get_wp_tag_id("Picks", wp_base_url, wp_username, wp_password)
if picks_tag_id and picks_tag_id not in tags:
tags.append(picks_tag_id)
logger.info(f"Added 'Picks' tag (ID: {picks_tag_id}) due to high interest score: {interest_score}")
# Format content with <p> tags
content = post_data["content"]
if content is None:
logger.error(f"Post content is None for title '{post_data['title']}' - using fallback")
content = "Content unavailable. Check the original source for details."
formatted_content = "\n".join(f"<p>{para}</p>" for para in content.split('\n') if para.strip())
# Upload image before posting
image_id = None
if image_url:
logger.info(f"Attempting image upload for '{post_data['title']}', URL: {image_url}, source: {image_source}")
image_id = upload_image_to_wp(image_url, post_data["title"], wp_base_url, wp_username, wp_password, image_source, uploader, page_url)
if not image_id:
logger.info(f"Flickr upload failed for '{post_data['title']}', falling back to Pixabay")
pixabay_query = post_data["title"][:50]
image_url, image_source, uploader, page_url = get_image(pixabay_query)
if image_url:
image_id = upload_image_to_wp(image_url, post_data["title"], wp_base_url, wp_username, wp_password, image_source, uploader, page_url)
if not image_id:
logger.warning(f"All image uploads failed for '{post_data['title']}' - posting without image")
# Build payload
payload = {
"title": post_data["title"],
"content": formatted_content,
"status": post_data["status"],
"categories": [category_id],
"tags": tags,
"author": author_id,
"meta": {
"original_link": link,
"original_source": original_source,
"interest_score": interest_score
}
}
if image_id:
payload["featured_media"] = image_id
logger.info(f"Set featured image for post '{post_data['title']}': Media ID={image_id}")
try: # Set endpoint for creating or updating post
endpoint = f"{wp_base_url}/posts/{post_id}" if post_id else f"{wp_base_url}/posts"
logger.debug(f"Sending POST to {endpoint} with payload: {json.dumps(payload, indent=2)}")
response = requests.post(endpoint, headers=headers, json=payload) response = requests.post(endpoint, headers=headers, json=payload)
if response.status_code != 201 and response.status_code != 200:
logger.error(f"WordPress API error: {response.status_code} - {response.text}")
response.raise_for_status() response.raise_for_status()
post_id = response.json().get("id")
post_url = response.json().get("link") post_info = response.json()
if not isinstance(post_info, dict) or "id" not in post_info:
raise ValueError(f"Invalid WP response: {post_info}")
post_id = post_info["id"]
post_url = post_info["link"]
logger.info(f"{'Updated' if post_id else 'Posted'} WordPress post: {post_data['title']} (ID: {post_id})") logger.info(f"{'Updated' if post_id else 'Posted'} WordPress post: {post_data['title']} (ID: {post_id})")
if image_url and not post_id: # Only upload image for new posts # Save to recent posts
media_id = upload_image_to_wp(image_url, post_data["title"], wp_base_url, wp_username, wp_password, image_source, uploader, page_url) timestamp = datetime.now(timezone.utc).isoformat()
if media_id: save_post_to_recent(post_data["title"], post_url, wp_username, timestamp)
requests.post(
f"{wp_base_url}/posts/{post_id}", # Post tweet if enabled
headers=headers, if should_post_tweet:
json={"featured_media": media_id}
)
logger.info(f"Set featured image (Media ID: {media_id}) for post {post_id}")
if should_post_tweet and post_url:
credentials = X_API_CREDENTIALS.get(post_data["author"]) credentials = X_API_CREDENTIALS.get(post_data["author"])
if credentials: if credentials:
tweet_text = f"{post_data['title']}\n{post_url}" tweet_text = f"{post_data['title']}\n{post_url}"
if post_tweet(author, tweet_text): # Updated signature if post_tweet(author, tweet_text):
logger.info(f"Successfully tweeted for post: {post_data['title']}") logger.info(f"Successfully tweeted for post: {post_data['title']}")
else: else:
logger.warning(f"Failed to tweet for post: {post_data['title']}") logger.warning(f"Failed to tweet for post: {post_data['title']}")
return post_id, post_url return post_id, post_url
except requests.exceptions.HTTPError as e:
logger.error(f"Failed to {'update' if post_id else 'post'} WordPress post: {post_data['title']}: {e} - Response: {e.response.text}", exc_info=True)
return None, None
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"Failed to {'update' if post_id else 'post'} WordPress post: {post_data['title']}: {e}", exc_info=True) logger.error(f"Failed to {'update' if post_id else 'post'} WordPress post: {post_data['title']}: {e}", exc_info=True)
return None, None return None, None
except Exception as e:
logger.error(f"Failed to {'update' if post_id else 'post'} WordPress post: {post_data['title']}: {e}", exc_info=True)
return None, None
# Configure Flickr API with credentials # Configure Flickr API with credentials
flickr_api.set_keys(api_key=FLICKR_API_KEY, api_secret=FLICKR_API_SECRET) flickr_api.set_keys(api_key=FLICKR_API_KEY, api_secret=FLICKR_API_SECRET)
@ -1102,41 +1173,52 @@ def check_rate_limit(response):
def check_author_rate_limit(author, max_tweets=17, tweet_window_seconds=86400): def check_author_rate_limit(author, max_tweets=17, tweet_window_seconds=86400):
""" """
Check if an author is rate-limited for tweets based on X API limits. Check if an author is rate-limited for tweets using real-time X API data.
Returns (can_post, remaining, reset_timestamp) where can_post is False if rate-limited.
Caches API results in memory for the current script run.
""" """
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
rate_limit_file = '/home/shane/foodie_automator/rate_limit_info.json' rate_limit_file = '/home/shane/foodie_automator/rate_limit_info.json'
rate_limit_info = load_json_file(rate_limit_file, default={})
username = author['username']
if username not in rate_limit_info or not isinstance(rate_limit_info[username].get('tweet_reset'), (int, float)):
rate_limit_info[username] = {
'tweet_remaining': max_tweets,
'tweet_reset': time.time()
}
logger.info(f"Initialized tweet rate limit for {username}: {max_tweets} tweets available")
info = rate_limit_info[username]
current_time = time.time() current_time = time.time()
# Reset tweet limits if window expired or invalid # In-memory cache for rate limit status (reset per script run)
if current_time >= info.get('tweet_reset', 0) or info.get('tweet_reset', 0) < 1000000000: if not hasattr(check_author_rate_limit, "cache"):
info['tweet_remaining'] = max_tweets check_author_rate_limit.cache = {}
info['tweet_reset'] = current_time + tweet_window_seconds
logger.info(f"Reset tweet rate limit for {username}: {max_tweets} tweets available")
save_json_file(rate_limit_file, rate_limit_info)
if info.get('tweet_remaining', 0) <= 0: username = author['username']
reset_time = datetime.fromtimestamp(info['tweet_reset'], tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S') cache_key = f"{username}_{int(current_time // 60)}" # Cache for 1 minute
logger.info(f"Author {username} is tweet rate-limited. Remaining: {info['tweet_remaining']}, Reset at: {reset_time}")
return True
logger.info(f"Tweet rate limit for {username}: {info['tweet_remaining']} tweets remaining") if cache_key in check_author_rate_limit.cache:
return False remaining, reset = check_author_rate_limit.cache[cache_key]
logger.debug(f"Using cached rate limit for {username}: {remaining} remaining, reset at {datetime.fromtimestamp(reset, tz=timezone.utc)}")
else:
remaining, reset = get_x_rate_limit_status(author)
if remaining is None or reset is None:
# Fallback: Load from rate_limit_info.json or assume rate-limited
rate_limit_info = load_json_file(rate_limit_file, default={})
if username not in rate_limit_info or current_time >= rate_limit_info.get(username, {}).get('tweet_reset', 0):
rate_limit_info[username] = {
'tweet_remaining': 0, # Conservative assumption
'tweet_reset': current_time + tweet_window_seconds
}
save_json_file(rate_limit_file, rate_limit_info)
remaining = rate_limit_info[username].get('tweet_remaining', 0)
reset = rate_limit_info[username].get('tweet_reset', current_time + tweet_window_seconds)
logger.warning(f"X API rate limit check failed for {username}, using fallback: {remaining} remaining")
check_author_rate_limit.cache[cache_key] = (remaining, reset)
can_post = remaining > 0
if not can_post:
reset_time = datetime.fromtimestamp(reset, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
logger.info(f"Author {username} is rate-limited. Remaining: {remaining}, Reset at: {reset_time}")
else:
logger.info(f"Rate limit for {username}: {remaining}/{max_tweets} tweets remaining")
return can_post, remaining, reset
def get_next_author_round_robin(): def get_next_author_round_robin():
""" """
Select the next author using round-robin, respecting tweet rate limits. Select the next author using round-robin, respecting real-time X API rate limits.
Returns None if no author is available. Returns None if no author is available.
""" """
from foodie_config import AUTHORS from foodie_config import AUTHORS
@ -1154,6 +1236,49 @@ def get_next_author_round_robin():
logger.warning("No authors available due to tweet rate limits.") logger.warning("No authors available due to tweet rate limits.")
return None return None
def get_x_rate_limit_status(author):
"""
Query X API for the user's tweet rate limit status.
Returns (remaining, reset_timestamp) or (None, None) if the query fails.
"""
from foodie_config import X_API_CREDENTIALS
import tweepy
logger = logging.getLogger(__name__)
credentials = X_API_CREDENTIALS.get(author["username"])
if not credentials:
logger.error(f"No X credentials for {author['username']}")
return None, None
try:
client = tweepy.Client(
consumer_key=credentials["api_key"],
consumer_secret=credentials["api_secret"],
access_token=credentials["access_token"],
access_token_secret=credentials["access_token_secret"]
)
# Tweepy v2 doesn't directly expose rate limit status, so use API v1.1 for rate limit check
api = tweepy.API(
tweepy.OAuth1UserHandler(
consumer_key=credentials["api_key"],
consumer_secret=credentials["api_secret"],
access_token=credentials["access_token"],
access_token_secret=credentials["access_token_secret"]
)
)
rate_limits = api.rate_limit_status()
tweet_limits = rate_limits["resources"]["statuses"]["/statuses/update"]
remaining = tweet_limits["remaining"]
reset = tweet_limits["reset"]
logger.info(f"X API rate limit for {author['username']}: {remaining} remaining, reset at {datetime.fromtimestamp(reset, tz=timezone.utc)}")
return remaining, reset
except tweepy.TweepyException as e:
logger.error(f"Failed to fetch X rate limit for {author['username']}: {e}")
return None, None
except Exception as e:
logger.error(f"Unexpected error fetching X rate limit for {author['username']}: {e}", exc_info=True)
return None, None
def prepare_post_data(summary, title, main_topic=None): def prepare_post_data(summary, title, main_topic=None):
try: try:
logging.info(f"Preparing post data for summary: {summary[:100]}...") logging.info(f"Preparing post data for summary: {summary[:100]}...")

@ -99,28 +99,23 @@ def validate_twitter_credentials():
logging.info("Validating Twitter API credentials for all authors") logging.info("Validating Twitter API credentials for all authors")
valid_credentials = [] valid_credentials = []
for author in AUTHORS: for author in AUTHORS:
credentials = X_API_CREDENTIALS.get(author["username"])
if not credentials:
logging.error(f"No X credentials found for {author['username']} in X_API_CREDENTIALS")
continue
for attempt in range(MAX_RETRIES): for attempt in range(MAX_RETRIES):
try: try:
twitter_client = tweepy.Client( remaining, reset = get_x_rate_limit_status(author)
consumer_key=credentials["api_key"], if remaining is not None and reset is not None:
consumer_secret=credentials["api_secret"], logging.info(f"Credentials valid for {author['username']} (handle: {X_API_CREDENTIALS[author['username']]['x_username']})")
access_token=credentials["access_token"], valid_credentials.append(X_API_CREDENTIALS[author['username']])
access_token_secret=credentials["access_token_secret"] break
) else:
user = twitter_client.get_me() logging.error(f"Rate limit check failed for {author['username']} (attempt {attempt + 1})")
logging.info(f"Credentials valid for {author['username']} (handle: {credentials['x_username']})") if attempt < MAX_RETRIES - 1:
valid_credentials.append(credentials) time.sleep(RETRY_BACKOFF * (2 ** attempt))
break except Exception as e:
except tweepy.TweepyException as e:
logging.error(f"Failed to validate credentials for {author['username']} (attempt {attempt + 1}): {e}") logging.error(f"Failed to validate credentials for {author['username']} (attempt {attempt + 1}): {e}")
if attempt < MAX_RETRIES - 1: if attempt < MAX_RETRIES - 1:
time.sleep(RETRY_BACKOFF * (2 ** attempt)) time.sleep(RETRY_BACKOFF * (2 ** attempt))
else: else:
logging.error(f"Credentials invalid for {author['username']} after {MAX_RETRIES} attempts") logging.error(f"Credentials invalid for {author['username']} after {MAX_RETRIES} attempts")
if not valid_credentials: if not valid_credentials:
logging.error("No valid Twitter credentials found for any author") logging.error("No valid Twitter credentials found for any author")
raise ValueError("No valid Twitter credentials found") raise ValueError("No valid Twitter credentials found")
@ -319,13 +314,9 @@ def post_weekly_thread():
continue continue
# Check if the author can post before generating the thread # Check if the author can post before generating the thread
if check_author_rate_limit(author): can_post, remaining, reset = check_author_rate_limit(author)
reset_time = datetime.fromtimestamp( if not can_post:
load_json_file('/home/shane/foodie_automator/rate_limit_info.json', default={}) reset_time = datetime.fromtimestamp(reset, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S') if reset else "Unknown"
.get(username, {})
.get('tweet_reset', time.time()),
tz=timezone.utc
).strftime('%Y-%m-%d %H:%M:%S')
logging.info(f"Skipping weekly thread for {username} due to rate limit. Reset at: {reset_time}") logging.info(f"Skipping weekly thread for {username} due to rate limit. Reset at: {reset_time}")
continue continue

@ -102,7 +102,7 @@ def main():
# Check if the author can post before generating the tweet # Check if the author can post before generating the tweet
can_post, remaining, reset = check_author_rate_limit(author) can_post, remaining, reset = check_author_rate_limit(author)
if not can_post: if not can_post:
reset_time = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(reset)) if reset else "Unknown" reset_time = datetime.fromtimestamp(reset, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S') if reset else "Unknown"
logging.info(f"Skipping engagement tweet for {author['username']} due to rate limit. Remaining: {remaining}, Reset at: {reset_time}") logging.info(f"Skipping engagement tweet for {author['username']} due to rate limit. Remaining: {remaining}, Reset at: {reset_time}")
continue continue

Loading…
Cancel
Save