add max tweet to author limit

main
Shane 7 months ago
parent 4adaa3442c
commit 2158c780ca
  1. 49
      foodie_automator_google.py
  2. 49
      foodie_automator_reddit.py
  3. 65
      foodie_automator_rss.py
  4. 105
      foodie_engagement_tweet.py
  5. 52
      foodie_utils.py
  6. 57
      foodie_weekly_thread.py

@ -314,36 +314,27 @@ def curate_from_google_trends(geo_list=['US']):
final_summary = insert_link_naturally(final_summary, source_name, link)
# Balanced author selection
x_post_counts = load_json_file('/home/shane/foodie_automator/x_post_counts.json', expiration_hours=24*30)
monthly_counts = {entry['username']: entry['monthly_count'] for entry in x_post_counts}
low_post_authors = [u for u, c in monthly_counts.items() if c < 3]
if low_post_authors:
author_username = random.choice(low_post_authors)
author = next(a for a in AUTHORS if a['username'] == author_username)
logging.info(f"Prioritizing low-post author: {author_username}")
post_data = {
"title": generate_title_from_summary(final_summary),
"content": final_summary,
"status": "publish",
"author": author_username,
"categories": [generate_category_from_summary(final_summary)]
}
category = post_data["categories"][0]
image_url, image_source, uploader, page_url = get_flickr_image(image_query, relevance_keywords, main_topic)
# Use round-robin author selection
author = get_next_author_round_robin()
author_username = author["username"]
logging.info(f"Selected author via round-robin: {author_username}")
post_data = {
"title": generate_title_from_summary(final_summary),
"content": final_summary,
"status": "publish",
"author": author_username,
"categories": [generate_category_from_summary(final_summary)]
}
category = post_data["categories"][0]
image_url, image_source, uploader, page_url = get_flickr_image(image_query, relevance_keywords, main_topic)
if not image_url:
image_url, image_source, uploader, page_url = get_image(image_query)
if not image_url:
image_url, image_source, uploader, page_url = get_image(image_query)
if not image_url:
logging.warning(f"All image uploads failed for '{title}' - posting without image")
image_source = None
uploader = None
page_url = None
else:
post_data, author, category, image_url, image_source, uploader, page_url = prepare_post_data(final_summary, title, main_topic)
if not post_data:
attempts += 1
continue
logging.warning(f"All image uploads failed for '{title}' - posting without image")
image_source = None
uploader = None
page_url = None
hook = get_dynamic_hook(post_data["title"]).strip()

@ -346,36 +346,27 @@ def curate_from_reddit():
final_summary = insert_link_naturally(final_summary, source_name, link)
# Balanced author selection
x_post_counts = load_json_file('/home/shane/foodie_automator/x_post_counts.json', expiration_hours=24*30)
monthly_counts = {entry['username']: entry['monthly_count'] for entry in x_post_counts}
low_post_authors = [u for u, c in monthly_counts.items() if c < 3]
if low_post_authors:
author_username = random.choice(low_post_authors)
author = next(a for a in AUTHORS if a['username'] == author_username)
logging.info(f"Prioritizing low-post author: {author_username}")
post_data = {
"title": generate_title_from_summary(final_summary),
"content": final_summary,
"status": "publish",
"author": author_username,
"categories": [generate_category_from_summary(final_summary)]
}
category = post_data["categories"][0]
image_url, image_source, uploader, page_url = get_flickr_image(image_query, relevance_keywords, main_topic)
# Use round-robin author selection
author = get_next_author_round_robin()
author_username = author["username"]
logging.info(f"Selected author via round-robin: {author_username}")
post_data = {
"title": generate_title_from_summary(final_summary),
"content": final_summary,
"status": "publish",
"author": author_username,
"categories": [generate_category_from_summary(final_summary)]
}
category = post_data["categories"][0]
image_url, image_source, uploader, page_url = get_flickr_image(image_query, relevance_keywords, main_topic)
if not image_url:
image_url, image_source, uploader, page_url = get_image(image_query)
if not image_url:
image_url, image_source, uploader, page_url = get_image(image_query)
if not image_url:
logging.warning(f"All image uploads failed for '{title}' - posting without image")
image_source = None
uploader = None
page_url = None
else:
post_data, author, category, image_url, image_source, uploader, page_url = prepare_post_data(final_summary, title, main_topic)
if not post_data:
attempts += 1
continue
logging.warning(f"All image uploads failed for '{title}' - posting without image")
image_source = None
uploader = None
page_url = None
hook = get_dynamic_hook(post_data["title"]).strip()

@ -27,7 +27,8 @@ from foodie_utils import (
upload_image_to_wp, determine_paragraph_count, insert_link_naturally,
is_interesting, generate_title_from_summary, summarize_with_gpt4o,
generate_category_from_summary, post_to_wp, prepare_post_data,
select_best_author, smart_image_and_filter, get_flickr_image
select_best_author, smart_image_and_filter, get_flickr_image,
get_next_author_round_robin # Add this line
)
from foodie_hooks import get_dynamic_hook, get_viral_share_prompt
from dotenv import load_dotenv
@ -335,43 +336,31 @@ def curate_from_rss():
final_summary = insert_link_naturally(final_summary, source_name, link)
# Insert balanced author selection logic here
x_post_counts = load_json_file('/home/shane/foodie_automator/x_post_counts.json', expiration_hours=24*30)
monthly_counts = {entry['username']: entry['monthly_count'] for entry in x_post_counts}
low_post_authors = [u for u, c in monthly_counts.items() if c < 3] # Authors with <3 posts
if low_post_authors:
author_username = random.choice(low_post_authors)
author = next(a for a in AUTHORS if a['username'] == author_username)
logging.info(f"Prioritizing low-post author: {author_username}")
post_data = {
"title": generate_title_from_summary(final_summary),
"content": final_summary,
"status": "publish",
"author": author_username,
"categories": [generate_category_from_summary(final_summary)]
}
category = post_data["categories"][0]
image_url, image_source, uploader, page_url = get_flickr_image(image_query, relevance_keywords, main_topic)
# Use round-robin author selection
author = get_next_author_round_robin()
author_username = author["username"]
logging.info(f"Selected author via round-robin: {author_username}")
post_data = {
"title": generate_title_from_summary(final_summary),
"content": final_summary,
"status": "publish",
"author": author_username,
"categories": [generate_category_from_summary(final_summary)]
}
category = post_data["categories"][0]
image_url, image_source, uploader, page_url = get_flickr_image(image_query, relevance_keywords, main_topic)
if not image_url:
print(f"Flickr image fetch failed for '{image_query}', trying fallback")
logging.warning(f"Flickr image fetch failed for '{image_query}', trying fallback")
image_url, image_source, uploader, page_url = get_image(image_query)
if not image_url:
print(f"Flickr image fetch failed for '{image_query}', trying fallback")
logging.warning(f"Flickr image fetch failed for '{image_query}', trying fallback")
image_url, image_source, uploader, page_url = get_image(image_query)
if not image_url:
print(f"All image uploads failed for '{title}' - posting without image")
logging.warning(f"All image uploads failed for '{title}' - posting without image")
image_source = None
uploader = None
page_url = None
else:
post_data, author, category, image_url, image_source, uploader, page_url = prepare_post_data(final_summary, title, main_topic)
if not post_data:
print(f"Post data preparation failed for '{title}'")
logging.info(f"Post data preparation failed for '{title}'")
attempts += 1
continue
print(f"All image uploads failed for '{title}' - posting without image")
logging.warning(f"All image uploads failed for '{title}' - posting without image")
image_source = None
uploader = None
page_url = None
# ... (rest of the function: image fetching, posting logic, etc.)
hook = get_dynamic_hook(post_data["title"]).strip()
share_prompt = get_viral_share_prompt(post_data["title"], final_summary)
@ -462,10 +451,6 @@ def curate_from_rss():
print("No interesting RSS article found after attempts")
logging.info("No interesting RSS article found after attempts")
return None, None, random.randint(600, 1800)
except Exception as e:
print(f"Unexpected error in curate_from_rss: {e}")
logging.error(f"Unexpected error in curate_from_rss: {e}", exc_info=True)
return None, None, random.randint(600, 1800)
def run_rss_automator():
lock_fd = None

@ -16,7 +16,6 @@ load_dotenv()
LOCK_FILE = "/home/shane/foodie_automator/locks/foodie_engagement_tweet.lock"
LOG_FILE = "/home/shane/foodie_automator/logs/foodie_engagement_tweet.log"
REFERENCE_DATE_FILE = "/home/shane/foodie_automator/engagement_reference_date.json"
LOG_PRUNE_DAYS = 30
MAX_RETRIES = 3
RETRY_BACKOFF = 2
@ -101,29 +100,6 @@ except Exception as e:
logging.error(f"Failed to load author_backgrounds.json: {e}", exc_info=True)
sys.exit(1)
def get_reference_date():
"""Load or initialize the reference date for the 2-day interval."""
os.makedirs(os.path.dirname(REFERENCE_DATE_FILE), exist_ok=True)
if os.path.exists(REFERENCE_DATE_FILE):
try:
with open(REFERENCE_DATE_FILE, 'r') as f:
data = json.load(f)
reference_date = datetime.fromisoformat(data["reference_date"]).replace(tzinfo=timezone.utc)
logging.info(f"Loaded reference date: {reference_date.date()}")
return reference_date
except (json.JSONDecodeError, KeyError, ValueError) as e:
logging.error(f"Failed to load reference date from {REFERENCE_DATE_FILE}: {e}. Initializing new date.")
# Initialize with current date (start of day)
reference_date = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
try:
with open(REFERENCE_DATE_FILE, 'w') as f:
json.dump({"reference_date": reference_date.isoformat()}, f)
logging.info(f"Initialized reference date: {reference_date.date()}")
except Exception as e:
logging.error(f"Failed to save reference date to {REFERENCE_DATE_FILE}: {e}. Using current date.")
return reference_date
def generate_engagement_tweet(author):
"""Generate an engagement tweet using author background themes."""
credentials = X_API_CREDENTIALS.get(author["username"])
@ -180,61 +156,46 @@ def generate_engagement_tweet(author):
return template
def post_engagement_tweet():
"""Post engagement tweets for authors every 2 days."""
"""Post engagement tweets for authors daily."""
try:
logging.info("Starting foodie_engagement_tweet.py")
print("Starting foodie_engagement_tweet.py")
# Get reference date
reference_date = get_reference_date()
current_date = datetime.now(timezone.utc)
days_since_reference = (current_date - reference_date).days
logging.info(f"Days since reference date ({reference_date.date()}): {days_since_reference}")
print(f"Days since reference date ({reference_date.date()}): {days_since_reference}")
# Post only if the number of days since the reference date is divisible by 2
if days_since_reference % 2 == 0:
logging.info("Today is an engagement tweet day (every 2 days). Posting...")
print("Today is an engagement tweet day (every 2 days). Posting...")
# Load post counts to check limits
post_counts = load_post_counts()
# Load post counts to check limits
post_counts = load_post_counts()
for author in AUTHORS:
try:
# Check post limits
author_count = next((entry for entry in post_counts if entry["username"] == author["username"]), None)
if not author_count:
logging.error(f"No post count entry for {author['username']}, skipping")
continue
if author_count["monthly_count"] >= 500:
logging.warning(f"Monthly post limit (500) reached for {author['username']}, skipping")
continue
if author_count["daily_count"] >= 20:
logging.warning(f"Daily post limit (20) reached for {author['username']}, skipping")
continue
tweet = generate_engagement_tweet(author)
if not tweet:
logging.error(f"Failed to generate engagement tweet for {author['username']}, skipping")
continue
for author in AUTHORS:
try:
# Check post limits
author_count = next((entry for entry in post_counts if entry["username"] == author["username"]), None)
if not author_count:
logging.error(f"No post count entry for {author['username']}, skipping")
continue
if author_count["monthly_count"] >= 500:
logging.warning(f"Monthly post limit (500) reached for {author['username']}, skipping")
continue
if author_count["daily_count"] >= 15:
logging.warning(f"Daily post limit (15) reached for {author['username']}, skipping")
continue
logging.info(f"Posting engagement tweet for {author['username']}: {tweet}")
print(f"Posting engagement tweet for {author['username']}: {tweet}")
if post_tweet(author, tweet):
logging.info(f"Successfully posted engagement tweet for {author['username']}")
# Update post counts
author_count["monthly_count"] += 1
author_count["daily_count"] += 1
save_post_counts(post_counts)
else:
logging.warning(f"Failed to post engagement tweet for {author['username']}")
except Exception as e:
logging.error(f"Error posting engagement tweet for {author['username']}: {e}", exc_info=True)
tweet = generate_engagement_tweet(author)
if not tweet:
logging.error(f"Failed to generate engagement tweet for {author['username']}, skipping")
continue
else:
logging.info(f"Today is not an engagement tweet day (every 2 days). Days since reference: {days_since_reference}. Skipping...")
print(f"Today is not an engagement tweet day (every 2 days). Days since reference: {days_since_reference}. Skipping...")
logging.info(f"Posting engagement tweet for {author['username']}: {tweet}")
print(f"Posting engagement tweet for {author['username']}: {tweet}")
if post_tweet(author, tweet):
logging.info(f"Successfully posted engagement tweet for {author['username']}")
# Update post counts
author_count["monthly_count"] += 1
author_count["daily_count"] += 1
save_post_counts(post_counts)
else:
logging.warning(f"Failed to post engagement tweet for {author['username']}")
except Exception as e:
logging.error(f"Error posting engagement tweet for {author['username']}: {e}", exc_info=True)
continue
logging.info("Completed foodie_engagement_tweet.py")
print("Completed foodie_engagement_tweet.py")

@ -217,8 +217,8 @@ def post_tweet(author, tweet, reply_to_id=None):
if author_count["monthly_count"] >= 500:
logging.warning(f"Monthly post limit (500) reached for {author['username']}")
return False
if author_count["daily_count"] >= 20:
logging.warning(f"Daily post limit (20) reached for {author['username']}")
if author_count["daily_count"] >= 15: # Updated daily limit
logging.warning(f"Daily post limit (15) reached for {author['username']}")
return False
try:
@ -1170,6 +1170,54 @@ def select_best_author(content, interest_score):
logging.error(f"Error in select_best_author: {e}")
return random.choice(list(PERSONA_CONFIGS.keys()))
def get_next_author_round_robin():
"""Select the next author in a round-robin fashion, respecting daily tweet limits."""
last_author_file = "/home/shane/foodie_automator/last_author.json"
authors = [author["username"] for author in AUTHORS]
post_counts = load_post_counts()
# Load the last used author
try:
if os.path.exists(last_author_file):
with open(last_author_file, 'r') as f:
last_data = json.load(f)
last_index = last_data.get("last_index", -1)
else:
last_index = -1
except Exception as e:
logging.warning(f"Failed to load last author from {last_author_file}: {e}. Starting from first author.")
last_index = -1
# Find the next author who hasn't reached the daily limit
start_index = (last_index + 1) % len(authors)
for i in range(len(authors)):
current_index = (start_index + i) % len(authors)
username = authors[current_index]
author_count = next((entry for entry in post_counts if entry["username"] == username), None)
if not author_count:
logging.error(f"No post count entry for {username}, skipping")
continue
if author_count["daily_count"] >= 15: # Updated daily limit
logging.info(f"Author {username} has reached daily limit ({author_count['daily_count']}/15), skipping")
continue
if author_count["monthly_count"] >= 500:
logging.info(f"Author {username} has reached monthly limit ({author_count['monthly_count']}/500), skipping")
continue
# Save the current index as the last used author
try:
with open(last_author_file, 'w') as f:
json.dump({"last_index": current_index}, f)
logging.info(f"Selected author {username} (index {current_index}) in round-robin order")
except Exception as e:
logging.warning(f"Failed to save last author to {last_author_file}: {e}")
# Return the selected author
return next(author for author in AUTHORS if author["username"] == username)
logging.warning("No authors available within daily/monthly limits. Selecting a random author as fallback.")
return random.choice(AUTHORS)
def prepare_post_data(summary, title, main_topic=None):
try:
logging.info(f"Preparing post data for summary: {summary[:100]}...")

@ -270,11 +270,16 @@ def generate_final_cta(author):
return fallback
def post_weekly_thread():
"""Generate and post a weekly thread of top posts for each author."""
"""Generate and post a weekly thread of top posts for each author on Mondays."""
logging.info("Starting foodie_weekly_thread.py")
# Calculate date range: 7 days prior to run date
# Check if today is Monday
today = datetime.now(timezone.utc)
if today.weekday() != 0: # 0 = Monday
logging.info(f"Today is not Monday (weekday: {today.weekday()}), skipping weekly thread")
return
# Calculate date range: 7 days prior to run date
start_date = (today - timedelta(days=7)).replace(hour=0, minute=0, second=0, microsecond=0)
end_date = (today - timedelta(days=1)).replace(hour=23, minute=59, second=59, microsecond=999999)
logging.info(f"Fetching posts from {start_date} to {end_date}")
@ -305,6 +310,9 @@ def post_weekly_thread():
if username in posts_by_author:
posts_by_author[username].append(post)
# Load post counts to check limits
post_counts = load_post_counts()
# Post threads for each author
for author in AUTHORS:
username = author["username"]
@ -313,29 +321,44 @@ def post_weekly_thread():
logging.info(f"No posts found for {username}, skipping")
continue
# Select top 10 posts (or fewer if less than 10)
author_posts = sorted(author_posts, key=lambda x: datetime.fromisoformat(x["timestamp"]), reverse=True)[:10]
# Check daily limit (each thread will use 3 tweets: lead + 2 thread tweets)
author_count = next((entry for entry in post_counts if entry["username"] == username), None)
if not author_count:
logging.error(f"No post count entry for {username}, skipping")
continue
if author_count["daily_count"] >= 15:
logging.warning(f"Daily post limit (15) reached for {username}, skipping")
continue
if author_count["daily_count"] + 3 > 15:
logging.warning(f"Posting thread for {username} would exceed daily limit (current: {author_count['daily_count']}, needed: 3), skipping")
continue
if author_count["monthly_count"] >= 500:
logging.warning(f"Monthly post limit (500) reached for {username}, skipping")
continue
# Select top 2 posts (to fit within 3-tweet limit: lead + 2 posts)
author_posts = sorted(author_posts, key=lambda x: datetime.fromisoformat(x["timestamp"]), reverse=True)[:2]
logging.info(f"Selected {len(author_posts)} posts for {username}")
# Generate and post thread
try:
# Post lead tweet
lead_tweet = (
f"Top foodie finds this week from {author['name']} (@{author['x_username']})! "
f"Check out these {len(author_posts)} posts on InsiderFoodie.com 🍽"
)
lead_response = post_tweet(author, lead_tweet)
intro_tweet = generate_intro_tweet(author)
if not intro_tweet:
logging.error(f"Failed to generate intro tweet for {username}, skipping")
continue
lead_response = post_tweet(author, intro_tweet)
if not lead_response:
logging.error(f"Failed to post lead tweet for {username}, skipping")
continue
lead_tweet_id = lead_response["id"]
logging.info(f"Posted lead tweet for {username}: {lead_tweet}")
logging.info(f"Posted lead tweet for {username}: {intro_tweet}")
# Post thread tweets
# Post thread tweets (up to 2)
for i, post in enumerate(author_posts, 1):
thread_tweet = (
f"{i}. {post['title']} "
f"Read more: {post['url']} #FoodieThread"
f"Read more: {post['url']}"
)
thread_response = post_tweet(author, thread_tweet, reply_to_id=lead_tweet_id)
if thread_response:
@ -344,11 +367,11 @@ def post_weekly_thread():
else:
logging.warning(f"Failed to post thread tweet {i} for {username}")
# Post engagement tweet
engagement_tweet = generate_engagement_tweet(author)
if engagement_tweet:
post_tweet(author, engagement_tweet, reply_to_id=lead_tweet_id)
logging.info(f"Posted engagement tweet for {username}: {engagement_tweet}")
# Post final CTA tweet
final_cta = generate_final_cta(author)
if final_cta:
post_tweet(author, final_cta, reply_to_id=lead_tweet_id)
logging.info(f"Posted final CTA tweet for {username}: {final_cta}")
except Exception as e:
logging.error(f"Error posting thread for {username}: {e}", exc_info=True)
continue

Loading…
Cancel
Save