update realtime rate limit for X

This commit is contained in:
2025-05-08 13:35:41 +10:00
parent 3405572ab0
commit 167506ef30
7 changed files with 222 additions and 145 deletions
+61 -28
View File
@@ -199,7 +199,6 @@ def post_tweet(author, tweet, reply_to_id=None):
from foodie_config import X_API_CREDENTIALS
import logging
import tweepy
from datetime import datetime, timezone
credentials = X_API_CREDENTIALS.get(author["username"])
if not credentials:
@@ -212,15 +211,6 @@ def post_tweet(author, tweet, reply_to_id=None):
if reply_to_id:
logging.debug(f"Replying to tweet ID: {reply_to_id}")
post_counts = load_post_counts()
author_count = next((entry for entry in post_counts if entry["username"] == author["username"]), None)
if author_count["monthly_count"] >= 500:
logging.warning(f"Monthly post limit (500) reached for {author['username']}")
return False
if author_count["daily_count"] >= 15: # Updated daily limit
logging.warning(f"Daily post limit (15) reached for {author['username']}")
return False
try:
client = tweepy.Client(
consumer_key=credentials["api_key"],
@@ -232,9 +222,6 @@ def post_tweet(author, tweet, reply_to_id=None):
text=tweet,
in_reply_to_tweet_id=reply_to_id
)
author_count["monthly_count"] += 1
author_count["daily_count"] += 1
save_post_counts(post_counts)
logging.info(f"Posted tweet for {author['username']} (handle: {credentials['x_username']}): {tweet}")
logging.debug(f"Tweet ID: {response.data['id']}")
return {"id": response.data["id"]}
@@ -1170,11 +1157,61 @@ def select_best_author(content, interest_score):
logging.error(f"Error in select_best_author: {e}")
return random.choice(list(PERSONA_CONFIGS.keys()))
def check_rate_limit(response):
"""Extract rate limit information from Twitter API response headers."""
try:
remaining = int(response.get('x-rate-limit-remaining', 0))
reset = int(response.get('x-rate-limit-reset', 0))
return remaining, reset
except (ValueError, TypeError) as e:
logging.warning(f"Failed to parse rate limit headers: {e}")
return None, None
def check_author_rate_limit(author):
"""Check the rate limit for a specific author by making a lightweight API call."""
credentials = X_API_CREDENTIALS.get(author["username"])
if not credentials:
logging.error(f"No X credentials found for {author['username']}")
return False, None, None
try:
client = tweepy.Client(
consumer_key=credentials["api_key"],
consumer_secret=credentials["api_secret"],
access_token=credentials["access_token"],
access_token_secret=credentials["access_token_secret"],
return_type=dict
)
# Use a lightweight endpoint to check rate limits (e.g., /users/me)
response = client.get_me()
remaining, reset = check_rate_limit(response)
if remaining is None or reset is None:
logging.warning(f"Could not determine rate limit for {author['username']}. Assuming rate limit is not hit.")
return True, None, None
if remaining <= 0:
reset_time = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(reset))
logging.info(f"Author {author['username']} is rate-limited. Remaining: {remaining}, Reset at: {reset_time}")
return False, remaining, reset
logging.debug(f"Author {author['username']} can post. Remaining: {remaining}, Reset at: {time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(reset))}")
return True, remaining, reset
except tweepy.TweepyException as e:
logging.error(f"Failed to check rate limit for {author['username']}: {e}")
if e.response and e.response.status_code == 429:
remaining, reset = check_rate_limit(e.response)
if remaining is not None and reset is not None:
reset_time = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(reset))
logging.info(f"Author {author['username']} is rate-limited. Remaining: {remaining}, Reset at: {reset_time}")
return False, remaining, reset
logging.warning(f"Assuming {author['username']} is rate-limited due to error.")
return False, None, None
except Exception as e:
logging.error(f"Unexpected error checking rate limit for {author['username']}: {e}", exc_info=True)
return False, None, None
def get_next_author_round_robin():
"""Select the next author in a round-robin fashion, respecting daily tweet limits."""
"""Select the next author in a round-robin fashion, ensuring they are not rate-limited."""
last_author_file = "/home/shane/foodie_automator/last_author.json"
authors = [author["username"] for author in AUTHORS]
post_counts = load_post_counts()
# Load the last used author
try:
@@ -1188,20 +1225,17 @@ def get_next_author_round_robin():
logging.warning(f"Failed to load last author from {last_author_file}: {e}. Starting from first author.")
last_index = -1
# Find the next author who hasn't reached the daily limit
# Find the next author who is not rate-limited
start_index = (last_index + 1) % len(authors)
for i in range(len(authors)):
current_index = (start_index + i) % len(authors)
username = authors[current_index]
author_count = next((entry for entry in post_counts if entry["username"] == username), None)
if not author_count:
logging.error(f"No post count entry for {username}, skipping")
continue
if author_count["daily_count"] >= 15: # Updated daily limit
logging.info(f"Author {username} has reached daily limit ({author_count['daily_count']}/15), skipping")
continue
if author_count["monthly_count"] >= 500:
logging.info(f"Author {username} has reached monthly limit ({author_count['monthly_count']}/500), skipping")
author = next(author for author in AUTHORS if author["username"] == username)
# Check if the author can post based on rate limits
can_post, remaining, reset = check_author_rate_limit(author)
if not can_post:
logging.info(f"Skipping author {username} due to rate limit.")
continue
# Save the current index as the last used author
@@ -1212,10 +1246,9 @@ def get_next_author_round_robin():
except Exception as e:
logging.warning(f"Failed to save last author to {last_author_file}: {e}")
# Return the selected author
return next(author for author in AUTHORS if author["username"] == username)
return author
logging.warning("No authors available within daily/monthly limits. Selecting a random author as fallback.")
logging.warning("No authors available due to rate limits. Selecting a random author as fallback.")
return random.choice(AUTHORS)
def prepare_post_data(summary, title, main_topic=None):