Initial commit of foodie automator scripts

Branch: my-fix-branch
Author: Shane, 8 months ago
Commit: d4f098639e
  1. foodie_automator_google.py (294 lines)
  2. foodie_automator_reddit.py (330 lines)
  3. foodie_automator_rss.py (330 lines)
  4. foodie_config.py (162 lines)
  5. foodie_hooks.py (44 lines)
  6. foodie_utils.py (952 lines)
  7. requirements.txt (8 lines)

@@ -0,0 +1,294 @@ foodie_automator_google.py
import requests
import random
import time
import logging
import re
import os
import json
from datetime import datetime, timedelta, timezone
from openai import OpenAI
from urllib.parse import quote
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException
from duckduckgo_search import DDGS
from foodie_config import (
AUTHORS, RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS,
SUMMARY_PERSONA_PROMPTS, CATEGORIES, CTAS, get_clean_source_name, OPENAI_API_KEY
)
from foodie_utils import (
load_json_file, save_json_file, get_image, generate_image_query,
upload_image_to_wp, select_best_persona, determine_paragraph_count, is_interesting,
generate_title_from_summary, summarize_with_gpt4o, generate_category_from_summary, post_to_wp,
prepare_post_data, smart_image_and_filter, insert_link_naturally, get_flickr_image_via_ddg
)
from foodie_hooks import get_dynamic_hook, select_best_cta
logger = logging.getLogger()
logger.setLevel(logging.INFO)
file_handler = logging.FileHandler('/tmp/foodie_automator_google_trends.log', mode='a')
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(file_handler)
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(console_handler)
logging.info("Logging initialized for foodie_automator_google.py")
client = OpenAI(api_key=OPENAI_API_KEY)
POSTED_TITLES_FILE = '/home/shane/foodie_automator/posted_google_titles.json'
USED_IMAGES_FILE = '/home/shane/foodie_automator/used_images.json'
EXPIRATION_HOURS = 24
IMAGE_EXPIRATION_DAYS = 7
posted_titles_data = load_json_file(POSTED_TITLES_FILE, EXPIRATION_HOURS)
posted_titles = set(entry["title"] for entry in posted_titles_data)
used_images = set(entry["title"] for entry in load_json_file(USED_IMAGES_FILE, IMAGE_EXPIRATION_DAYS) if "title" in entry)
def parse_search_volume(volume_text):
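"""Convert a Google Trends search-volume label such as '200K+' or '1M+' into a number; returns 0 if parsing fails."""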
try:
volume_part = volume_text.split('\n')[0].lower().strip().replace('+', '')
if 'k' in volume_part:
volume = float(volume_part.replace('k', '')) * 1000
elif 'm' in volume_part:
volume = float(volume_part.replace('m', '')) * 1000000
else:
volume = float(volume_part)
return volume
except (ValueError, AttributeError) as e:
logging.warning(f"Could not parse search volume from '{volume_text}': {e}")
return 0
def scrape_google_trends(geo='US'):
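"""Scrape the Google Trends 'trending now' table (category 5, last 24 hours) with headless Chrome; returns trends with >=20K search volume sorted by volume, or [] after three failed attempts."""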
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/125.0.0.0 Safari/537.36")
driver = webdriver.Chrome(options=chrome_options)
try:
for attempt in range(3):
try:
time.sleep(random.uniform(2, 5))
url = f"https://trends.google.com/trending?geo={geo}&hours=24&sort=search-volume&category=5"
logging.info(f"Navigating to {url} (attempt {attempt + 1})")
driver.get(url)
logging.info("Waiting for page to load...")
WebDriverWait(driver, 60).until(
EC.presence_of_element_located((By.TAG_NAME, "tbody"))
)
break
except TimeoutException:
logging.warning(f"Timeout on attempt {attempt + 1} for geo={geo}")
if attempt == 2:
logging.error(f"Failed after 3 attempts for geo={geo}")
return []
time.sleep(5)
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(2)
trends = []
rows = driver.find_elements(By.XPATH, "//tbody/tr")
logging.info(f"Found {len(rows)} rows in tbody for geo={geo}")
cutoff_date = datetime.now(timezone.utc) - timedelta(hours=24)
for row in rows:
try:
columns = row.find_elements(By.TAG_NAME, "td")
if len(columns) >= 3:
title = columns[1].text.strip()
search_volume_text = columns[2].text.strip()
search_volume = parse_search_volume(search_volume_text)
logging.info(f"Parsed trend: {title} with search volume: {search_volume}")
if title and search_volume >= 20000:
link = f"https://trends.google.com/trends/explore?q={quote(title)}&geo={geo}"
trends.append({
"title": title,
"link": link,
"search_volume": search_volume
})
logging.info(f"Added trend: {title} with search volume: {search_volume}")
else:
logging.info(f"Skipping trend: {title} (volume: {search_volume} < 20K or no title)")
else:
logging.info(f"Skipping row with insufficient columns: {len(columns)}")
except Exception as e:
logging.warning(f"Row processing error: {e}")
continue
if trends:
trends.sort(key=lambda x: x["search_volume"], reverse=True)
logging.info(f"Extracted {len(trends)} trends for geo={geo}: {[t['title'] for t in trends]}")
print(f"Raw trends fetched for geo={geo}: {[t['title'] for t in trends]}")
else:
logging.warning(f"No valid trends found with search volume >= 20K for geo={geo}")
return trends
finally:
driver.quit()
logging.info(f"Chrome driver closed for geo={geo}")
def fetch_duckduckgo_news_context(trend_title, hours=24):
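"""Fetch up to five DuckDuckGo News headlines for the trend, keep those published within the lookback window, and join them into a context string; falls back to the trend title on error."""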
try:
with DDGS() as ddgs:
results = ddgs.news(f"{trend_title} news", timelimit="d", max_results=5)
titles = []
for r in results:
try:
date_str = r["date"]
if '+00:00' in date_str:
dt = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S+00:00").replace(tzinfo=timezone.utc)
else:
dt = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
if dt > (datetime.now(timezone.utc) - timedelta(hours=hours)):
titles.append(r["title"].lower())
except ValueError as e:
logging.warning(f"Date parsing failed for '{date_str}': {e}")
continue
context = " ".join(titles) if titles else "No recent news found within 24 hours"
logging.info(f"DuckDuckGo News context for '{trend_title}': {context}")
return context
except Exception as e:
logging.warning(f"DuckDuckGo News context fetch failed for '{trend_title}': {e}")
return trend_title
def curate_from_google_trends(geo_list=['US']):
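"""For each region, try up to ten unposted trends: filter, score with is_interesting, summarize, attach an image and CTA, and publish to WordPress; returns (post_data, category, sleep_seconds)."""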
original_source = '<a href="https://trends.google.com/">Google Trends</a>'
for geo in geo_list:
trends = scrape_google_trends(geo=geo)
if not trends:
print(f"No trends available for geo={geo}")
logging.info(f"No trends available for geo={geo}")
continue
attempts = 0
max_attempts = 10
while attempts < max_attempts and trends:
trend = trends.pop(0) # Take highest-volume trend
title = trend["title"]
link = trend["link"]
search_volume = trend["search_volume"]
print(f"Trying Trend: {title} with search volume: {search_volume} for geo={geo}")
logging.info(f"Trying Trend: {title} with search volume: {search_volume} for geo={geo}")
if title in posted_titles:
print(f"Skipping already posted trend: {title}")
logging.info(f"Skipping already posted trend: {title}")
attempts += 1
continue
image_query, relevance_keywords, skip = smart_image_and_filter(title, "")
if skip:
print(f"Skipping unwanted trend: {title}")
logging.info(f"Skipping unwanted trend: {title}")
attempts += 1
continue
context = fetch_duckduckgo_news_context(title)
scoring_content = f"{title}\n\n{context}"
interest_score = is_interesting(scoring_content)
logging.info(f"Interest score for '{title}' in geo={geo}: {interest_score}")
if interest_score < 6:
print(f"Trend Interest Too Low: {interest_score}")
logging.info(f"Trend Interest Too Low: {interest_score}")
attempts += 1
continue
num_paragraphs = determine_paragraph_count(interest_score)
extra_prompt = (
f"Generate exactly {num_paragraphs} paragraphs. "
f"Do not mention Google Trends, Google, or include any links. "
f"Summarize as a standalone food industry trend, focusing on '{title}' and its context."
)
final_summary = summarize_with_gpt4o(
scoring_content,
source_name="Google Trends",
source_url=link,
interest_score=interest_score,
extra_prompt=extra_prompt
)
if not final_summary:
logging.info(f"Summary failed for '{title}'")
attempts += 1
continue
final_summary = insert_link_naturally(final_summary, "Google Trends", link)
post_data, author, category, image_url, image_source, uploader, pixabay_url = prepare_post_data(final_summary, title)
if not post_data:
attempts += 1
continue
image_url, image_source, uploader, page_url = get_flickr_image_via_ddg(image_query, relevance_keywords)
if not image_url:
image_url, image_source, uploader, page_url = get_image(image_query)
hook = get_dynamic_hook(post_data["title"]).strip()
cta = select_best_cta(post_data["title"], final_summary, post_url=None)
post_data["content"] = f"{final_summary}\n\n{cta}"
post_id, post_url = post_to_wp(
post_data=post_data,
category=category,
link=link,
author=author,
image_url=image_url,
original_source=original_source,
image_source=image_source,
uploader=uploader,
pixabay_url=pixabay_url,
interest_score=interest_score
)
if post_id:
cta = select_best_cta(post_data["title"], final_summary, post_url=post_url)
post_data["content"] = f"{final_summary}\n\n{cta}"
post_to_wp(
post_data=post_data,
category=category,
link=link,
author=author,
image_url=image_url,
original_source=original_source,
image_source=image_source,
uploader=uploader,
pixabay_url=pixabay_url,
interest_score=interest_score,
post_id=post_id
)
timestamp = datetime.now(timezone.utc).isoformat()
save_json_file(POSTED_TITLES_FILE, title, timestamp)
posted_titles.add(title)
logging.info(f"Successfully saved '{title}' to {POSTED_TITLES_FILE}")
if image_url:
save_json_file(USED_IMAGES_FILE, image_url, timestamp)
logging.info(f"Saved image '{image_url}' to {USED_IMAGES_FILE}")
print(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id}) from trend for geo={geo} *****")
logging.info(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id}) from trend for geo={geo} *****")
return post_data, category, random.randint(0, 1800)
print(f"No interesting trend found for geo={geo}")
logging.info(f"No interesting trend found for geo={geo}")
print(f"No interesting trend found across regions {geo_list}")
logging.info(f"No interesting trend found across regions {geo_list}")
return None, None, random.randint(600, 1200)
def run_google_trends_automator():
logging.info("***** Google Trends Automator Launched *****")
geo_list = ['US', 'GB', 'AU']
post_data, category, sleep_time = curate_from_google_trends(geo_list=geo_list)
print(f"Sleeping for {sleep_time}s")
logging.info(f"Completed run with sleep time: {sleep_time} seconds")
time.sleep(sleep_time)
return post_data, category, sleep_time
if __name__ == "__main__":
run_google_trends_automator()

@@ -0,0 +1,330 @@ foodie_automator_reddit.py
import requests
import random
import time
import logging
import os
import json
from datetime import datetime, timedelta, timezone
from openai import OpenAI
from urllib.parse import quote
from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
import praw
from foodie_config import (
AUTHORS, RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS,
SUMMARY_PERSONA_PROMPTS, CATEGORIES, CTAS, get_clean_source_name,
REDDIT_CLIENT_ID, REDDIT_CLIENT_SECRET, REDDIT_USER_AGENT, LIGHT_TASK_MODEL, OPENAI_API_KEY
)
from foodie_utils import (
load_json_file, save_json_file, get_image, generate_image_query,
upload_image_to_wp, determine_paragraph_count, insert_link_naturally,
summarize_with_gpt4o, generate_category_from_summary, post_to_wp,
prepare_post_data, select_best_author, smart_image_and_filter, get_flickr_image_via_ddg
)
from foodie_hooks import get_dynamic_hook, select_best_cta
LOG_FILE = "/home/shane/foodie_automator/foodie_automator_reddit.log"
LOG_PRUNE_DAYS = 30
def setup_logging():
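"""Prune log lines older than LOG_PRUNE_DAYS, then configure file and console logging."""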
if os.path.exists(LOG_FILE):
with open(LOG_FILE, 'r') as f:
lines = f.readlines()
cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS)
pruned_lines = []
for line in lines:
try:
timestamp = datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
if timestamp > cutoff:
pruned_lines.append(line)
except ValueError:
logging.warning(f"Skipping malformed log line: {line.strip()[:50]}...")
continue
with open(LOG_FILE, 'w') as f:
f.writelines(pruned_lines)
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("prawcore").setLevel(logging.WARNING)
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logging.getLogger().addHandler(console_handler)
logging.info("Logging initialized for foodie_automator_reddit.py")
setup_logging()
POSTED_TITLES_FILE = '/home/shane/foodie_automator/posted_reddit_titles.json'
USED_IMAGES_FILE = '/home/shane/foodie_automator/used_images.json'
EXPIRATION_HOURS = 24
IMAGE_EXPIRATION_DAYS = 7
posted_titles_data = load_json_file(POSTED_TITLES_FILE, EXPIRATION_HOURS)
posted_titles = set(entry["title"] for entry in posted_titles_data if "title" in entry)
used_images_data = load_json_file(USED_IMAGES_FILE, IMAGE_EXPIRATION_DAYS)
used_images = set(entry["title"] for entry in used_images_data if "title" in entry)
client = OpenAI(api_key=OPENAI_API_KEY)
def is_interesting_reddit(title, summary, upvotes, comment_count, top_comments):
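"""Score a Reddit post 0-10 with LIGHT_TASK_MODEL, then add an engagement boost from upvotes and comment counts (capped at 10); returns 0 on error."""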
try:
content = f"Title: {title}\n\nContent: {summary}"
if top_comments:
joined_comments = "\n".join(top_comments)
content += f"\n\nTop Comments:\n{joined_comments}"
response = client.chat.completions.create(
model=LIGHT_TASK_MODEL,
messages=[
{"role": "system", "content": (
"Rate this Reddit post from 0-10 based on rarity, buzzworthiness, and engagement potential for food lovers, covering food topics (skip recipes). "
"Score 8-10 for rare, highly shareable ideas (e.g., unique dishes or restaurant trends). "
"Score 5-7 for fresh, engaging updates with broad appeal. Score below 5 for common or unremarkable content. "
"Consider comments for added context (e.g., specific locations or unique details). "
"Return only a number."
)},
{"role": "user", "content": content}
],
max_tokens=5
)
base_score = int(response.choices[0].message.content.strip()) if response.choices[0].message.content.strip().isdigit() else 0
engagement_boost = 0
if upvotes >= 500:
engagement_boost += 3
elif upvotes >= 100:
engagement_boost += 2
elif upvotes >= 50:
engagement_boost += 1
if comment_count >= 100:
engagement_boost += 2
elif comment_count >= 20:
engagement_boost += 1
final_score = min(base_score + engagement_boost, 10)
logging.info(f"Reddit Interest Score: {final_score} (base: {base_score}, upvotes: {upvotes}, comments: {comment_count}, top_comments: {len(top_comments)}) for '{title}'")
print(f"Interest Score for '{title[:50]}...': {final_score} (base: {base_score}, upvotes: {upvotes}, comments: {comment_count})")
return final_score
except Exception as e:
logging.error(f"Reddit interestingness scoring failed: {e}")
print(f"Reddit Interest Error: {e}")
return 0
def get_top_comments(post_url, reddit, limit=3):
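"""Return the bodies of up to `limit` top comments on a submission, skipping deleted ones; returns [] on error."""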
try:
submission = reddit.submission(url=post_url)
submission.comments.replace_more(limit=0)
submission.comment_sort = 'top'
top_comments = [comment.body for comment in submission.comments[:limit] if not comment.body.startswith('[deleted]')]
logging.info(f"Fetched {len(top_comments)} top comments for {post_url}")
return top_comments
except Exception as e:
logging.error(f"Failed to fetch comments for {post_url}: {e}")
return []
def fetch_reddit_posts():
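"""Collect the past day's top posts from the configured food subreddits via PRAW, keeping only posts newer than EXPIRATION_HOURS."""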
reddit = praw.Reddit(
client_id=REDDIT_CLIENT_ID,
client_secret=REDDIT_CLIENT_SECRET,
user_agent=REDDIT_USER_AGENT
)
feeds = ['FoodPorn', 'restaurant', 'FoodIndustry', 'food']
articles = []
cutoff_date = datetime.now(timezone.utc) - timedelta(hours=EXPIRATION_HOURS)
logging.info(f"Starting fetch with cutoff date: {cutoff_date}")
for subreddit_name in feeds:
try:
subreddit = reddit.subreddit(subreddit_name)
for submission in subreddit.top(time_filter='day', limit=100):
pub_date = datetime.fromtimestamp(submission.created_utc, tz=timezone.utc)
if pub_date < cutoff_date:
logging.info(f"Skipping old post: {submission.title} (Published: {pub_date})")
continue
articles.append({
"title": submission.title,
"link": f"https://www.reddit.com{submission.permalink}",
"summary": submission.selftext,
"feed_title": get_clean_source_name(subreddit_name),
"pub_date": pub_date,
"upvotes": submission.score,
"comment_count": submission.num_comments
})
logging.info(f"Fetched {len(articles)} posts from r/{subreddit_name}")
except Exception as e:
logging.error(f"Failed to fetch Reddit feed r/{subreddit_name}: {e}")
logging.info(f"Total Reddit posts fetched: {len(articles)}")
return articles
def curate_from_reddit():
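"""Work through fetched posts by upvote count: skip posted or filtered items, score with is_interesting_reddit, summarize with top comments, attach an image and CTA, and publish to WordPress; returns (post_data, category, sleep_seconds)."""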
articles = fetch_reddit_posts()
if not articles:
print("No Reddit posts available")
logging.info("No Reddit posts available")
return None, None, random.randint(600, 1800)
# Sort by upvotes descending
articles.sort(key=lambda x: x["upvotes"], reverse=True)
reddit = praw.Reddit(
client_id=REDDIT_CLIENT_ID,
client_secret=REDDIT_CLIENT_SECRET,
user_agent=REDDIT_USER_AGENT
)
attempts = 0
max_attempts = 10
while attempts < max_attempts and articles:
article = articles.pop(0) # Take highest-upvote post
title = article["title"]
link = article["link"]
summary = article["summary"]
source_name = "Reddit"
original_source = '<a href="https://www.reddit.com/">Reddit</a>'
if title in posted_titles:
print(f"Skipping already posted post: {title}")
logging.info(f"Skipping already posted post: {title}")
attempts += 1
continue
print(f"Trying Reddit Post: {title} from {source_name}")
logging.info(f"Trying Reddit Post: {title} from {source_name}")
image_query, relevance_keywords, skip = smart_image_and_filter(title, summary)
if skip or any(keyword in title.lower() or keyword in summary.lower() for keyword in RECIPE_KEYWORDS + ["homemade"]):
print(f"Skipping filtered Reddit post: {title}")
logging.info(f"Skipping filtered Reddit post: {title}")
attempts += 1
continue
top_comments = get_top_comments(link, reddit, limit=3)
interest_score = is_interesting_reddit(
title,
summary,
article["upvotes"],
article["comment_count"],
top_comments
)
logging.info(f"Interest Score: {interest_score} for '{title}'")
if interest_score < 6:
print(f"Reddit Interest Too Low: {interest_score}")
logging.info(f"Reddit Interest Too Low: {interest_score}")
attempts += 1
continue
num_paragraphs = determine_paragraph_count(interest_score)
extra_prompt = (
f"Generate exactly {num_paragraphs} paragraphs. "
f"FOCUS: Summarize ONLY the provided content, explicitly mentioning '{title}' and sticking to its specific topic and details. "
"Incorporate relevant insights from these top comments if available: {', '.join(top_comments) if top_comments else 'None'}. "
"Do NOT introduce unrelated concepts unless in the content or comments. "
"If brief, expand on the core idea with relevant context about its appeal or significance."
)
content_to_summarize = f"{title}\n\n{summary}"
if top_comments:
joined_comments = "\n".join(top_comments)
content_to_summarize += f"\n\nTop Comments:\n{joined_comments}"
final_summary = summarize_with_gpt4o(
content_to_summarize,
source_name,
link,
interest_score=interest_score,
extra_prompt=extra_prompt
)
if not final_summary:
logging.info(f"Summary failed for '{title}'")
attempts += 1
continue
final_summary = insert_link_naturally(final_summary, source_name, link)
post_data, author, category, image_url, image_source, uploader, pixabay_url = prepare_post_data(final_summary, title)
if not post_data:
attempts += 1
continue
image_url, image_source, uploader, page_url = get_flickr_image_via_ddg(image_query, relevance_keywords)
if not image_url:
image_url, image_source, uploader, page_url = get_image(image_query)
hook = get_dynamic_hook(post_data["title"]).strip()
cta = select_best_cta(post_data["title"], final_summary, post_url=None)
post_data["content"] = f"{final_summary}\n\n{cta}"
post_id, post_url = post_to_wp(
post_data=post_data,
category=category,
link=link,
author=author,
image_url=image_url,
original_source=original_source,
image_source=image_source,
uploader=uploader,
pixabay_url=pixabay_url,
interest_score=interest_score
)
if post_id:
cta = select_best_cta(post_data["title"], final_summary, post_url=post_url)
post_data["content"] = f"{final_summary}\n\n{cta}"
post_to_wp(
post_data=post_data,
category=category,
link=link,
author=author,
image_url=image_url,
original_source=original_source,
image_source=image_source,
uploader=uploader,
pixabay_url=pixabay_url,
interest_score=interest_score,
post_id=post_id
)
timestamp = datetime.now(timezone.utc).isoformat()
save_json_file(POSTED_TITLES_FILE, title, timestamp)
posted_titles.add(title)
logging.info(f"Successfully saved '{title}' to {POSTED_TITLES_FILE} with timestamp {timestamp}")
if image_url:
save_json_file(USED_IMAGES_FILE, image_url, timestamp)
used_images.add(image_url)
logging.info(f"Saved image '{image_url}' to {USED_IMAGES_FILE} with timestamp {timestamp}")
print(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id}) from Reddit *****")
print(f"Actual post URL: {post_url}")
logging.info(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id}) from Reddit *****")
logging.info(f"Actual post URL: {post_url}")
return post_data, category, random.randint(0, 1800)
attempts += 1
logging.info(f"WP posting failed for '{post_data['title']}'")
print("No interesting Reddit post found after attempts")
logging.info("No interesting Reddit post found after attempts")
return None, None, random.randint(600, 1800)
def run_reddit_automator():
print(f"{datetime.now(timezone.utc)} - INFO - ***** Reddit Automator Launched *****")
logging.info("***** Reddit Automator Launched *****")
post_data, category, sleep_time = curate_from_reddit()
if not post_data:
print(f"No postable Reddit article found - sleeping for {sleep_time} seconds")
logging.info(f"No postable Reddit article found - sleeping for {sleep_time} seconds")
else:
print(f"Completed Reddit run with sleep time: {sleep_time} seconds")
logging.info(f"Completed Reddit run with sleep time: {sleep_time} seconds")
print(f"Sleeping for {sleep_time}s")
time.sleep(sleep_time)
return post_data, category, sleep_time
if __name__ == "__main__":
run_reddit_automator()

@@ -0,0 +1,330 @@ foodie_automator_rss.py
import requests
import random
import time
import logging
import os
import json
import email.utils
from datetime import datetime, timedelta, timezone
from bs4 import BeautifulSoup
from openai import OpenAI
from urllib.parse import quote
from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
from foodie_config import (
RSS_FEEDS, RSS_FEED_NAMES, AUTHORS, RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS,
PRODUCT_KEYWORDS, SUMMARY_PERSONA_PROMPTS, CATEGORIES, get_clean_source_name
)
from foodie_utils import (
load_json_file, save_json_file, get_image, generate_image_query,
upload_image_to_wp, determine_paragraph_count, insert_link_naturally, is_interesting,
generate_title_from_summary, summarize_with_gpt4o, generate_category_from_summary, post_to_wp,
prepare_post_data, select_best_author, smart_image_and_filter
)
from foodie_hooks import get_dynamic_hook, select_best_cta
import feedparser
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any, Optional
LOG_FILE = "/home/shane/foodie_automator/foodie_automator_rss.log"
LOG_PRUNE_DAYS = 30
MAX_WORKERS = 5 # Number of concurrent workers for parallel processing
RATE_LIMIT_DELAY = 1 # Delay between API calls in seconds
FEED_TIMEOUT = 30 # Timeout for feed requests in seconds
MAX_RETRIES = 3 # Maximum number of retries for failed requests
def setup_logging():
"""Configure logging with rotation and cleanup."""
if os.path.exists(LOG_FILE):
with open(LOG_FILE, 'r') as f:
lines = f.readlines()
cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS)
pruned_lines = []
for line in lines:
try:
timestamp = datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
if timestamp > cutoff:
pruned_lines.append(line)
except ValueError:
logging.warning(f"Skipping malformed log line: {line.strip()[:50]}...")
continue
with open(LOG_FILE, 'w') as f:
f.writelines(pruned_lines)
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
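# Module-level state used by curate_from_rss below; mirrors the Google and Reddit
# automators and the file paths defined in foodie_config.py.
setup_logging()
POSTED_TITLES_FILE = '/home/shane/foodie_automator/posted_rss_titles.json'
USED_IMAGES_FILE = '/home/shane/foodie_automator/used_images.json'
EXPIRATION_HOURS = 24
IMAGE_EXPIRATION_DAYS = 7
posted_titles_data = load_json_file(POSTED_TITLES_FILE, EXPIRATION_HOURS)
posted_titles = set(entry["title"] for entry in posted_titles_data if "title" in entry)
used_images_data = load_json_file(USED_IMAGES_FILE, IMAGE_EXPIRATION_DAYS)
used_images = set(entry["title"] for entry in used_images_data if "title" in entry)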
def create_http_session() -> requests.Session:
"""Create and configure an HTTP session with retry logic."""
session = requests.Session()
retry_strategy = Retry(
total=MAX_RETRIES,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET", "POST"]
)
adapter = HTTPAdapter(
max_retries=retry_strategy,
pool_connections=10,
pool_maxsize=10
)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
def fetch_feed(feed_url: str, session: requests.Session) -> Optional[feedparser.FeedParserDict]:
"""Fetch and parse an RSS feed with error handling and retries."""
try:
response = session.get(feed_url, timeout=FEED_TIMEOUT)
response.raise_for_status()
feed = feedparser.parse(response.content)
if feed.bozo:
logging.warning(f"Feed parsing error for {feed_url}: {feed.bozo_exception}")
return None
return feed
except Exception as e:
logging.error(f"Error fetching feed {feed_url}: {str(e)}")
return None
def is_interesting_rss(title: str, summary: str, pub_date: datetime) -> bool:
"""Enhanced content filtering with improved scoring."""
try:
# Basic validation
if not title or not summary:
return False
# Check if content is too old
if datetime.now(timezone.utc) - pub_date > timedelta(days=7):
return False
# Calculate interest score
score = 0
# Title analysis
title_lower = title.lower()
if any(keyword in title_lower for keyword in RECIPE_KEYWORDS):
score += 3
if any(keyword in title_lower for keyword in PROMO_KEYWORDS):
score += 2
if any(keyword in title_lower for keyword in HOME_KEYWORDS):
score += 1
# Content analysis
summary_lower = summary.lower()
if len(summary.split()) < 100:
score -= 2
if any(keyword in summary_lower for keyword in PRODUCT_KEYWORDS):
score += 1
return score >= 4
except Exception as e:
logging.error(f"Error in is_interesting_rss: {str(e)}")
return False
def fetch_rss_feeds() -> List[Dict[str, Any]]:
"""Fetch RSS feeds with parallel processing and improved error handling."""
session = create_http_session()
articles = []
try:
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = []
for feed_url in RSS_FEEDS:
future = executor.submit(process_feed, feed_url, session)
futures.append(future)
for future in as_completed(futures):
try:
feed_articles = future.result()
articles.extend(feed_articles)
except Exception as e:
logging.error(f"Error processing feed: {str(e)}")
continue
return articles
except Exception as e:
logging.error(f"Error in fetch_rss_feeds: {str(e)}")
return []
def process_feed(feed_url: str, session: requests.Session) -> List[Dict[str, Any]]:
"""Process a single RSS feed and extract articles."""
try:
feed = fetch_feed(feed_url, session)
if not feed:
return []
articles = []
for entry in feed.entries:
try:
pub_date = datetime.fromtimestamp(time.mktime(entry.published_parsed), tz=timezone.utc)
article = {
"title": entry.title,
"link": entry.link,
"summary": entry.summary if hasattr(entry, 'summary') else entry.description,
"feed_title": get_clean_source_name(feed.feed.title),
"pub_date": pub_date
}
if is_interesting_rss(article["title"], article["summary"], pub_date):
articles.append(article)
time.sleep(RATE_LIMIT_DELAY)
except Exception as e:
logging.warning(f"Error processing entry: {str(e)}")
continue
return articles
except Exception as e:
logging.error(f"Error processing feed {feed_url}: {str(e)}")
return []
def parse_date(date_str):
try:
parsed_date = email.utils.parsedate_to_datetime(date_str)
if parsed_date.tzinfo is None:
parsed_date = parsed_date.replace(tzinfo=timezone.utc)
return parsed_date
except Exception as e:
logging.error(f"Failed to parse date '{date_str}': {e}")
return datetime.now(timezone.utc)
def curate_from_rss():
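"""Work through fetched RSS articles: skip posted or filtered items, score with is_interesting, summarize, attach an image and CTA, and publish to WordPress; returns (post_data, category, sleep_seconds)."""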
articles = fetch_rss_feeds()
if not articles:
print("No RSS articles available")
logging.info("No RSS articles available")
return None, None, random.randint(600, 1800)
attempts = 0
max_attempts = 10
while attempts < max_attempts and articles:
article = articles.pop(0) # Take newest article
title = article["title"]
link = article["link"]
summary = article["summary"]
content = article["content"]
feed_url = article["feed_title"]
source_name = feed_url[0] if isinstance(feed_url, tuple) and len(feed_url) > 0 else feed_url
original_source = f'<a href="{link}">{source_name}</a>'
if title in posted_titles:
print(f"Skipping already posted article: {title}")
logging.info(f"Skipping already posted article: {title}")
attempts += 1
continue
print(f"Trying RSS Article: {title} from {source_name}")
logging.info(f"Trying RSS Article: {title} from {source_name}")
image_query, relevance_keywords, skip = smart_image_and_filter(title, summary)
if skip:
print(f"Skipping filtered RSS article: {title}")
logging.info(f"Skipping filtered RSS article: {title}")
attempts += 1
continue
# Score using title, summary, and content
scoring_content = f"{title}\n\n{summary}\n\nContent: {content}"
interest_score = is_interesting(scoring_content)
logging.info(f"Interest score for '{title}': {interest_score}")
if interest_score < 6:
print(f"RSS Interest Too Low: {interest_score}")
logging.info(f"RSS Interest Too Low: {interest_score}")
attempts += 1
continue
num_paragraphs = determine_paragraph_count(interest_score)
extra_prompt = (
f"Generate exactly {num_paragraphs} paragraphs. "
f"FOCUS: Summarize ONLY the provided content, explicitly mentioning '{title}' and sticking to its specific topic and details. "
f"Do NOT introduce unrelated concepts. Expand on the core idea with relevant context about its appeal or significance."
)
content_to_summarize = scoring_content
final_summary = summarize_with_gpt4o(
content_to_summarize,
source_name,
link,
interest_score=interest_score,
extra_prompt=extra_prompt
)
if not final_summary:
logging.info(f"Summary failed for '{title}'")
attempts += 1
continue
final_summary = insert_link_naturally(final_summary, source_name, link)
post_data, author, category, image_url, image_source, uploader, pixabay_url = prepare_post_data(final_summary, title)
if not post_data:
attempts += 1
continue
hook = get_dynamic_hook(post_data["title"]).strip()
cta = select_best_cta(post_data["title"], final_summary, post_url=None)
post_data["content"] = f"{final_summary}\n\n{cta}"
post_id, post_url = post_to_wp(
post_data=post_data,
category=category,
link=link,
author=author,
image_url=image_url,
original_source=original_source,
image_source=image_source,
uploader=uploader,
pixabay_url=pixabay_url,
interest_score=interest_score
)
if post_id:
cta = select_best_cta(post_data["title"], final_summary, post_url=post_url)
post_data["content"] = f"{final_summary}\n\n{cta}"
post_to_wp(
post_data=post_data,
category=category,
link=link,
author=author,
image_url=image_url,
original_source=original_source,
image_source=image_source,
uploader=uploader,
pixabay_url=pixabay_url,
interest_score=interest_score,
post_id=post_id
)
timestamp = datetime.now(timezone.utc).isoformat()
save_json_file(POSTED_TITLES_FILE, title, timestamp)
posted_titles.add(title)
logging.info(f"Successfully saved '{title}' to {POSTED_TITLES_FILE}")
if image_url:
save_json_file(USED_IMAGES_FILE, image_url, timestamp)
used_images.add(image_url)
logging.info(f"Saved image '{image_url}' to {USED_IMAGES_FILE}")
print(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id}) from RSS *****")
logging.info(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id}) from RSS *****")
return post_data, category, random.randint(0, 1800)
attempts += 1
logging.info(f"WP posting failed for '{post_data['title']}'")
print("No interesting RSS article found after attempts")
logging.info("No interesting RSS article found after attempts")
return None, None, random.randint(600, 1800)
def run_rss_automator():
print(f"{datetime.now(timezone.utc)} - INFO - ***** RSS Automator Launched *****")
logging.info("***** RSS Automator Launched *****")
post_data, category, sleep_time = curate_from_rss()
print(f"Sleeping for {sleep_time}s")
logging.info(f"Completed run with sleep time: {sleep_time} seconds")
time.sleep(sleep_time)
return post_data, category, sleep_time
if __name__ == "__main__":
run_rss_automator()

@@ -0,0 +1,162 @@ foodie_config.py
# foodie_config.py
# Constants shared across all automator scripts
OPENAI_API_KEY = "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"
PIXABAY_API_KEY = "14836528-999c19a033d77d463113b1fb8"
AUTHORS = [
{
"url": "https://insiderfoodie.com",
"username": "shanehill",
"password": "LKfH JF0x CnnU SSxK s9f1 993x",
"persona": "Visionary Editor",
"bio": "I oversee worldwide dining shifts, obsessed with the big picture. My edits deliver precise takes—charting the future of food with confidence."
},
{
"url": "https://insiderfoodie.com",
"username": "javiermorales",
"password": "r46q z0JX QL1q ztbH Tifk Cn28",
"persona": "Foodie Critic",
"bio": "I judge food scenes worldwide, wielding a fearless pen. My takes expose what shines and what flops—no compromise, just truth."
},
{
"url": "https://insiderfoodie.com",
"username": "aishapatel",
"password": "NyCa SOXd 5EVf bVvW KIoz wC0C",
"persona": "Trend Scout",
"bio": "I scout global food trends, obsessed with what’s emerging. My sharp predictions map the industry’s path—always one step ahead."
},
{
"url": "https://insiderfoodie.com",
"username": "liennguyen",
"password": "Xorz sdpp T08J 8buz cCba BGzW",
"persona": "Culture Connoisseur",
"bio": "I trace worldwide dining traditions, weaving past into present. My words uncover the soul of flavor—connecting cultures bite by bite."
},
{
"url": "https://insiderfoodie.com",
"username": "keishawashington",
"password": "PMjv bKMb FmUc bzZG ZV1f ZzpK",
"persona": "African-American Soul Food Sage",
"bio": "I bring soul food’s legacy to life, blending history with modern vibes. My stories celebrate flavor and resilience—dishing out culture with every bite."
},
{
"url": "https://insiderfoodie.com",
"username": "lilamoreau",
"password": "e3nv Vsg4 L9wv RgL6 dHkm T3UD",
"persona": "Global Street Food Nomad",
"bio": "I roam the globe chasing street eats, from stalls to trucks. My tales uncover bold flavors and gritty trends shaping food on the go."
}
]
POSTED_RSS_TITLES_FILE = '/home/shane/foodie_automator/posted_rss_titles.json'
POSTED_GOOGLE_TITLES_FILE = '/home/shane/foodie_automator/posted_google_titles.json'
POSTED_REDDIT_TITLES_FILE = '/home/shane/foodie_automator/posted_reddit_titles.json'
USED_IMAGES_FILE = '/home/shane/foodie_automator/used_images.json'
EXPIRATION_DAYS = 3
IMAGE_EXPIRATION_DAYS = 7
RSS_FEEDS = [
"https://www.eater.com/rss/full.xml",
"https://modernrestaurantmanagement.com/feed/",
"https://thespoon.tech/feed/",
"https://www.nrn.com/rss.xml",
"https://rss.nytimes.com/services/xml/rss/nyt/DiningandWine.xml",
"https://www.bakingbusiness.com/rss/articles",
"https://www.theguardian.com/food/rss"
]
RSS_FEED_NAMES = {
"https://www.eater.com/rss/full.xml": ("Eater", "https://www.eater.com/"),
"https://modernrestaurantmanagement.com/feed/": ("Modern Restaurant Management", "https://modernrestaurantmanagement.com/"),
"https://thespoon.tech/feed/": ("The Spoon", "https://thespoon.tech/"),
"https://www.nrn.com/rss.xml": ("Nation's Restaurant News", "https://www.nrn.com/"),
"https://rss.nytimes.com/services/xml/rss/nyt/DiningandWine.xml": ("The New York Times", "https://www.nytimes.com/section/food"),
"https://www.bakingbusiness.com/rss/articles": ("Baking Business", "https://www.bakingbusiness.com/"),
"https://www.theguardian.com/food/rss": ("The Guardian Food", "https://www.theguardian.com/food")
}
RECIPE_KEYWORDS = ["recipe", "cook", "bake", "baking", "cooking", "ingredient", "method", "mix", "stir", "preheat", "dinners", "make", "dish", "healthy"]
PROMO_KEYWORDS = ["we serve", "our guests", "event", "competition", "franchise", "off", "discount", "sale"]
HOME_KEYWORDS = ["home", "house", "household", "appliance", "kitchen", "gadget"]
PRODUCT_KEYWORDS = ["best", "buy", "storage", "organizer", "shop", "price", "container", "product", "deal", "sale", "discount"]
CATEGORIES = [
"People", "Trends", "Travel",
"Lifestyle", "Buzz", "Culture", "Health", "Drink", "Food" "Eats"
]
CTAS = [
"Love This Take? Share It On <a href='{share_url}'><i class=\"tsi tsi-twitter\"></i></a>!",
"Dig This Scoop? Post It On <a href='{share_url}'><i class=\"tsi tsi-twitter\"></i></a>!",
"Wild For This? Spread It On <a href='{share_url}'><i class=\"tsi tsi-twitter\"></i></a>!",
"Crave This Read? Tweet It On <a href='{share_url}'><i class=\"tsi tsi-twitter\"></i></a>!",
"Buzzing Over This? Share On <a href='{share_url}'><i class=\"tsi tsi-twitter\"></i></a>!"
]
SUMMARY_PERSONA_PROMPTS = {
"Visionary Editor": (
"You’re a commanding food editor with a borderless view. Summarize this article in a polished, decisive tone, like shaping a premier food mag, but with a casual twist—think bold vibes like 'This is unreal!'. "
"Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. "
"Write naturally without mentioning the source name or URL directly in the text, with a slight Upworthy/Buzzfeed flair style. "
"Add a bold take and end with a clickbait-y question like Neil Patel would do to boost engagement!"
),
"Foodie Critic": (
"You’re a seasoned foodie reviewer with a sharp eye. Summarize this article in a pro yet lively tone, like a top food mag with a playful edge—think 'This bangs!'. "
"Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. "
"Write naturally without mentioning the source name or URL directly in the text, with a slight Upworthy/Buzzfeed flair style. "
"Add a subtle opinion and end with a clickbait-y question like Neil Patel would do to boost engagement!"
),
"Trend Scout": (
"You’re a forward-thinking editor obsessed with trends. Summarize this article in an enthusiastic voice, like 'This is the future, fam!'. "
"Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. "
"Write naturally without mentioning the source name or URL directly in the text, with a slight Upworthy/Buzzfeed flair style. "
"Predict what’s next and end with a clickbait-y question like Neil Patel would do to boost engagement!"
),
"Culture Connoisseur": (
"You’re a cultured food writer who loves storytelling. Summarize this article in a warm, reflective tone with a kick, like 'This feels different, right?'. "
"Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. "
"Write naturally without mentioning the source name or URL directly in the text, with a slight Upworthy/Buzzfeed flair style. "
"Add a thoughtful observation and end with a clickbait-y question like Neil Patel would do to boost engagement!"
),
"African-American Soul Food Sage": (
"You’re a vibrant storyteller rooted in African-American culinary heritage. Summarize this article in a soulful tone, like 'This got that heat, y’all!'. "
"Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. "
"Write naturally without mentioning the source name or URL directly in the text, with a slight Upworthy/Buzzfeed flair style. "
"Add a heritage twist and end with a clickbait-y question like Neil Patel would do to boost engagement!"
),
"Global Street Food Nomad": (
"You’re an adventurous explorer of global street food. Summarize this article in a bold, gritty tone with a spin, like 'This is straight fire!'. "
"Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. "
"Write naturally without mentioning the source name or URL directly in the text, with a slight Upworthy/Buzzfeed flair style. "
"Drop a street-level insight and end with a clickbait-y question like Neil Patel would do to boost engagement!"
)
}
REDDIT_CLIENT_ID = "GtoZmrM8VyrxMvb7gBLrLg"
REDDIT_CLIENT_SECRET = "YGTx69ZzvMn329pZj2qiEEXW82aeSA"
REDDIT_USER_AGENT = "foodie_trends_bot by /u/AskShaneHill"
REDDIT_SUBREDDITS = [
"food",
"FoodPorn",
"spicy"
]
FAST_FOOD_KEYWORDS = [
"mcdonald", "burger king", "wendy", "taco bell", "kfc",
"subway", "domino", "pizza hut", "chipotle", "dunkin",
"starbucks", "sonic", "arby", "jack in the box", "popeyes",
"fast food", "chain", "drive-thru"
]
SUMMARY_MODEL = "gpt-4o" # or "gpt-4.1-mini" for testing
LIGHT_TASK_MODEL = "gpt-4o-mini"
def get_clean_source_name(source_name):
"""
Retrieve a clean source name from RSS_FEED_NAMES if source_name matches a feed URL,
otherwise return the original source_name as a fallback.
"""
for feed_url, (clean_name, _) in RSS_FEED_NAMES.items():
if feed_url == source_name:
return clean_name
return source_name

@@ -0,0 +1,44 @@ foodie_hooks.py
from foodie_config import OPENAI_API_KEY, LIGHT_TASK_MODEL
from openai import OpenAI
import logging
import random
from urllib.parse import quote
client = OpenAI(api_key=OPENAI_API_KEY)
def get_dynamic_hook(article_title):
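"""Generate a short, catchy tweet hook for the article title with LIGHT_TASK_MODEL; falls back to a canned hook on error."""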
try:
response = client.chat.completions.create(
model=LIGHT_TASK_MODEL,
messages=[
{"role": "system", "content": (
"Generate a short, catchy hook (under 100 characters) for a tweet based on this article title about food topics. "
"Make it bold and quirky with Upworthy/Buzzfeed flair (e.g., 'This food twist is wild!'), avoiding clichés like 'game-changer'. "
"Do not include emojis in the hook. "
"Return only the hook text."
)},
{"role": "user", "content": article_title}
],
max_tokens=30
)
hook = response.choices[0].message.content.strip().replace('**', '')
logging.info(f"Generated dynamic hook: {hook}")
return hook
except Exception as e:
logging.error(f"Dynamic hook generation failed: {e}")
return "This food scoop will blow your mind!"
def select_best_cta(article_title, article_summary, post_url):
# Use the provided post_url if available, otherwise a placeholder to be updated later
share_url_base = post_url if post_url else "https://insiderfoodie.com/placeholder"
share_url = f"https://x.com/intent/tweet?url={quote(share_url_base)}&text={quote(get_dynamic_hook(article_title))}"
cta_options = [
f"Can’t Get Enough? Share This Now On <a href='{share_url}'><i class='tsi tsi-twitter'></i></a>!",
f"Obsessed Yet? Spread the Word On <a href='{share_url}'><i class='tsi tsi-twitter'></i></a>!",
f"This Blew Your Mind, Right? Tweet It On <a href='{share_url}'><i class='tsi tsi-twitter'></i></a>!",
f"Ready to Spill the Tea? Share On <a href='{share_url}'><i class='tsi tsi-twitter'></i></a>!",
f"Too Wild to Keep Quiet? Post It On <a href='{share_url}'><i class='tsi tsi-twitter'></i></a>!"
]
selected_cta = random.choice(cta_options)
logging.info(f"Selected random CTA: {selected_cta}")
return selected_cta

@@ -0,0 +1,952 @@ foodie_utils.py
import base64
import json
import logging
import os
import random
import re
from PIL import Image
import pytesseract
import io
import tempfile
import requests
import time
from datetime import datetime, timedelta
from openai import OpenAI
from urllib.parse import quote
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from foodie_config import (
RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS, SUMMARY_PERSONA_PROMPTS,
get_clean_source_name, AUTHORS, LIGHT_TASK_MODEL, SUMMARY_MODEL, OPENAI_API_KEY, PIXABAY_API_KEY
)
client = OpenAI(api_key=OPENAI_API_KEY)
def load_json_file(filename, expiration_days=None):
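"""Load newline-delimited JSON entries with 'title' and 'timestamp' keys, skipping malformed lines; optionally drops entries older than expiration_days."""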
data = []
if os.path.exists(filename):
try:
with open(filename, 'r') as f:
lines = f.readlines()
for i, line in enumerate(lines, 1):
if line.strip():
try:
entry = json.loads(line.strip())
if not isinstance(entry, dict) or "title" not in entry or "timestamp" not in entry:
logging.warning(f"Skipping malformed entry in {filename} at line {i}: {entry}")
continue
data.append(entry)
except json.JSONDecodeError as e:
logging.warning(f"Skipping invalid JSON line in {filename} at line {i}: {e}")
loaded_count = len(data)
if expiration_days:
cutoff = (datetime.now() - timedelta(days=expiration_days)).isoformat()
data = [entry for entry in data if entry["timestamp"] > cutoff]
logging.info(f"Loaded {loaded_count} entries from {filename}, {len(data)} valid after expiration check")
except Exception as e:
logging.error(f"Failed to load {filename}: {e}")
data = [] # Reset to empty on failure
return data
def save_json_file(filename, key, value):
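"""Write a {'title': key, 'timestamp': value} entry to the newline-delimited JSON file, replacing any existing entry with the same title and pruning entries older than PRUNE_INTERVAL_DAYS."""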
entry = {"title": key, "timestamp": value}
PRUNE_INTERVAL_DAYS = 180
try:
data = load_json_file(filename, expiration_days=PRUNE_INTERVAL_DAYS)
# Remove duplicates by title
data = [item for item in data if item["title"] != key]
data.append(entry)
with open(filename, 'w') as f:
for item in data:
json.dump(item, f)
f.write('\n')
logging.info(f"Saved '{key}' to {filename}")
print(f"DEBUG: Saved '{key}' to {filename}")
loaded_data = load_json_file(filename, expiration_days=PRUNE_INTERVAL_DAYS)
logging.info(f"Pruned {filename} to {len(loaded_data)} entries (older than {PRUNE_INTERVAL_DAYS} days removed)")
except Exception as e:
logging.error(f"Failed to save or prune {filename}: {e}")
def select_best_persona(interest_score, content=""):
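"""Pick a summary persona: keyword matches in the content take priority, otherwise fall back to bands of the interest score."""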
logging.info("Using select_best_persona with interest_score and content")
personas = ["Visionary Editor", "Foodie Critic", "Trend Scout", "Culture Connoisseur"]
content_lower = content.lower()
if any(kw in content_lower for kw in ["tech", "ai", "innovation", "sustainability"]):
return random.choice(["Trend Scout", "Visionary Editor"])
elif any(kw in content_lower for kw in ["review", "critic", "taste", "flavor"]):
return "Foodie Critic"
elif any(kw in content_lower for kw in ["culture", "tradition", "history"]):
return "Culture Connoisseur"
if interest_score >= 8:
return random.choice(personas[:2])
elif interest_score >= 6:
return random.choice(personas[2:])
return random.choice(personas)
def get_image(search_query):
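"""Search Pixabay with a shortened query first, then the full query, filtering out off-topic tags; returns (image_url, source, uploader, page_url) or four Nones if every query fails."""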
api_key = "14836528-999c19a033d77d463113b1fb8"
base_url = "https://pixabay.com/api/"
queries = [search_query.split()[:2], search_query.split()]
for query in queries:
short_query = " ".join(query)
params = {
"key": api_key,
"q": short_query,
"image_type": "photo",
"safesearch": True,
"per_page": 20
}
try:
logging.info(f"Fetching Pixabay image for query '{short_query}'")
response = requests.get(base_url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
if not data.get("hits"):
logging.warning(f"No image hits for query '{short_query}'")
continue
valid_images = [
hit for hit in data["hits"]
if all(tag not in hit.get("tags", "").lower() for tag in ["dog", "cat", "family", "child", "baby"])
]
if not valid_images:
logging.warning(f"No valid images for query '{short_query}' after filtering")
continue
image = random.choice(valid_images)
image_url = image["webformatURL"]
image_source = "Pixabay"
uploader = image.get("user", "Unknown")
pixabay_url = image["pageURL"]
logging.info(f"Fetched image URL: {image_url} by {uploader} for query '{short_query}'")
print(f"DEBUG: Image selected for query '{short_query}': {image_url}")
return image_url, image_source, uploader, pixabay_url
except requests.exceptions.RequestException as e:
logging.error(f"Image fetch failed for query '{short_query}': {e}")
continue
logging.error(f"All Pixabay image queries failed: {queries}")
return None, None, None, None
def generate_image_query(content):
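"""Ask LIGHT_TASK_MODEL for image search and relevance keywords as JSON; falls back to keywords extracted from the content itself if the response is invalid."""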
try:
response = client.chat.completions.create(
model=LIGHT_TASK_MODEL,
messages=[
{"role": "system", "content": (
"From this content (title and summary), generate two sets of 2-3 concise keywords for an image search about restaurant/food industry trends:\n"
"1. Search keywords: For finding images (e.g., 'AI restaurant technology'). Focus on key themes like technology, sustainability, dining, or specific food concepts.\n"
"2. Relevance keywords: For filtering relevant images (e.g., 'ai tech dining'). Focus on core concepts to ensure match.\n"
"Avoid vague terms like 'trends', 'future', or unrelated words like 'dog', 'family'. "
"Return as JSON: {'search': 'keyword1 keyword2', 'relevance': 'keyword3 keyword4'}"
)},
{"role": "user", "content": content}
],
max_tokens=100
)
raw_result = response.choices[0].message.content.strip()
logging.info(f"Raw GPT image query response: '{raw_result}'")
print(f"DEBUG: Raw GPT image query response: '{raw_result}'")
cleaned_result = re.sub(r'```json\s*|\s*```', '', raw_result).strip()
result = json.loads(cleaned_result)
if not isinstance(result, dict) or "search" not in result or "relevance" not in result or len(result["search"].split()) < 2:
logging.warning(f"Invalid image query format: {result}, using fallback")
words = re.findall(r'\w+', content.lower())
filtered_words = [w for w in words if w not in RECIPE_KEYWORDS + PROMO_KEYWORDS + ['trends', 'future', 'dog', 'family']]
search = " ".join(filtered_words[:3]) or "restaurant innovation"
relevance = filtered_words[3:6] or ["dining", "tech"]
result = {"search": search, "relevance": " ".join(relevance)}
logging.info(f"Generated image query: {result}")
print(f"DEBUG: Image query from content: {result}")
return result["search"], result["relevance"].split()
except json.JSONDecodeError as e:
logging.error(f"JSON parsing failed for image query: {e}, raw response: '{raw_result}'")
words = re.findall(r'\w+', content.lower())
filtered_words = [w for w in words if w not in RECIPE_KEYWORDS + PROMO_KEYWORDS + ['trends', 'future', 'dog', 'family']]
search = " ".join(filtered_words[:3]) or "restaurant innovation"
relevance = filtered_words[3:6] or ["dining", "tech"]
logging.info(f"Fallback image query: {{'search': '{search}', 'relevance': '{' '.join(relevance)}'}}")
return search, relevance
except Exception as e:
logging.error(f"Image query generation failed: {e}")
print(f"Image Query Error: {e}")
return None, None
def smart_image_and_filter(title, summary):
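"""Use LIGHT_TASK_MODEL to derive an image query, relevance keywords, and a KEEP/SKIP decision for the article; falls back to generic food keywords when the response cannot be parsed."""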
try:
content = f"{title}\n\n{summary}"
prompt = (
"Analyze this article title and summary. Extract key entities (brands, locations, cuisines, or topics) "
"for an image search about food industry trends or viral content. Prioritize specific terms if present, "
"otherwise focus on the main theme. "
"Return 'SKIP' if the article is about home appliances, recipes, promotions, or contains 'homemade', else 'KEEP'. "
"Return as JSON: {'image_query': 'specific term', 'relevance': ['keyword1', 'keyword2'], 'action': 'KEEP' or 'SKIP'}"
)
response = client.chat.completions.create(
model=LIGHT_TASK_MODEL,
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": content}
],
max_tokens=100
)
raw_result = response.choices[0].message.content.strip()
logging.info(f"Raw GPT smart image/filter response: '{raw_result}'")
# Clean and parse JSON
cleaned_result = re.sub(r'```json\s*|\s*```', '', raw_result).strip()
try:
result = json.loads(cleaned_result)
except json.JSONDecodeError as e:
logging.warning(f"JSON parsing failed: {e}, raw: '{cleaned_result}'. Using fallback.")
return "food trends", ["cuisine", "dining"], False
if not isinstance(result, dict) or "image_query" not in result or "relevance" not in result or "action" not in result:
logging.warning(f"Invalid GPT response format: {result}, using fallback")
return "food trends", ["cuisine", "dining"], False
image_query = result["image_query"]
relevance_keywords = result["relevance"]
skip_flag = result["action"] == "SKIP" or "homemade" in title.lower() or "homemade" in summary.lower()
logging.info(f"Smart image query: {image_query}, Relevance: {relevance_keywords}, Skip: {skip_flag}")
if not image_query or len(image_query.split()) < 2:
logging.warning(f"Image query '{image_query}' too vague, using fallback")
return "food trends", ["cuisine", "dining"], skip_flag
return image_query, relevance_keywords, skip_flag
except Exception as e:
logging.error(f"Smart image/filter failed: {e}, using fallback")
return "food trends", ["cuisine", "dining"], False
def upload_image_to_wp(image_url, post_title, wp_base_url, wp_username, wp_password, image_source="Pixabay", uploader=None, pixabay_url=None):
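"""Download the image and upload it to the WordPress media library with an attribution caption; returns the media ID or None on failure."""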
try:
safe_title = post_title.encode('ascii', 'ignore').decode('ascii').replace(' ', '_')[:50]
headers = {
"Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}",
"Content-Disposition": f"attachment; filename={safe_title}.jpg",
"Content-Type": "image/jpeg"
}
image_headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
logging.info(f"Fetching image from {image_url} for '{post_title}'")
image_response = requests.get(image_url, headers=image_headers, timeout=10)
image_response.raise_for_status()
response = requests.post(
f"{wp_base_url}/media",
headers=headers,
data=image_response.content
)
response.raise_for_status()
image_id = response.json()["id"]
caption = f'<a href="{pixabay_url}">{image_source}</a> by {uploader}' if pixabay_url and uploader else image_source
requests.post(
f"{wp_base_url}/media/{image_id}",
headers={"Authorization": headers["Authorization"], "Content-Type": "application/json"},
json={"caption": caption}
)
logging.info(f"Uploaded image '{safe_title}.jpg' to WP (ID: {image_id}) with caption '{caption}'")
return image_id
except Exception as e:
logging.error(f"Image upload to WP failed for '{post_title}': {e}")
return None
def determine_paragraph_count(interest_score):
if interest_score >= 9:
return 5
elif interest_score >= 7:
return 4
return 3
def is_interesting(summary):
try:
response = client.chat.completions.create(
model=LIGHT_TASK_MODEL,
messages=[
{"role": "system", "content": (
"Rate this content from 0-10 based on its rarity, buzzworthiness, and engagement potential for food lovers, covering a wide range of food topics (skip recipes). "
"Score 8-10 for rare, highly shareable ideas that grab attention. "
"Score 5-7 for fresh, engaging updates with broad appeal. Score below 5 for common or unremarkable content. "
"Return only a number."
)},
{"role": "user", "content": f"Content: {summary}"}
],
max_tokens=5
)
raw_score = response.choices[0].message.content.strip()
score = int(raw_score) if raw_score.isdigit() else 0
print(f"Interest Score for '{summary[:50]}...': {score} (raw: {raw_score})")
logging.info(f"Interest Score: {score} (raw: {raw_score})")
return score
except Exception as e:
logging.error(f"Interestingness scoring failed: {e}")
print(f"Interest Error: {e}")
return 0
def generate_title_from_summary(summary):
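"""Generate a clickbait-style title under 100 characters from the summary, retrying up to three times and rejecting titles that are too long or contain banned words; returns None if all attempts fail."""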
banned_words = ["elevate", "elevating", "elevated"]
for attempt in range(3):
try:
response = client.chat.completions.create(
model=LIGHT_TASK_MODEL,
messages=[
{"role": "system", "content": (
"Generate a concise, engaging title (under 100 characters) based on this summary, covering food topics. "
"Craft it with Upworthy/Buzzfeed flair—think ‘you won’t believe this’ or ‘this is nuts’—for food insiders. "
"Avoid quotes, emojis, special characters, or the words 'elevate', 'elevating', 'elevated'. "
"End with a question to spark shares."
)},
{"role": "user", "content": f"Summary: {summary}"}
],
max_tokens=30
)
title = response.choices[0].message.content.strip().replace('"', '').replace("'", "")
if ':' in title:
title = title.split(':', 1)[1].strip()
if len(title) > 100 or any(word in title.lower() for word in banned_words):
reason = "length" if len(title) > 100 else "banned word"
print(f"Rejected title (attempt {attempt + 1}/3): '{title}' due to {reason}")
logging.info(f"Rejected title (attempt {attempt + 1}/3): '{title}' due to {reason}")
continue
logging.info(f"Generated title: {title}")
return title
except Exception as e:
logging.error(f"Title generation failed (attempt {attempt + 1}/3): {e}")
print(f"Title Error: {e}")
print("Failed to generate valid title after 3 attempts")
logging.info("Failed to generate valid title after 3 attempts")
return None
def summarize_with_gpt4o(content, source_name, link, interest_score=0, extra_prompt=""):
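"""Summarize the content with SUMMARY_MODEL using the persona prompt chosen by select_best_persona; returns the summary text or None on failure."""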
try:
persona = select_best_persona(interest_score, content)
prompt = SUMMARY_PERSONA_PROMPTS.get(persona, "Write a concise, engaging summary that captures the essence of the content for food lovers.")
logging.info(f"Using {persona} with interest_score and content")
full_prompt = (
f"{prompt}\n\n"
f"{extra_prompt}\n\n"
f"Content to summarize:\n{content}\n\n"
f"Source: {source_name}\n"
f"Link: {link}"
)
response = client.chat.completions.create(
model=SUMMARY_MODEL,
messages=[
{"role": "system", "content": full_prompt},
{"role": "user", "content": content}
],
max_tokens=1000,
temperature=0.7
)
summary = response.choices[0].message.content.strip()
logging.info(f"Processed summary (Persona: {persona}): {summary}")
return summary
except Exception as e:
logging.error(f"Summary generation failed with model {SUMMARY_MODEL}: {e}")
return None
def smart_image_and_filter(title, summary):
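    """Ask the model for an image search query, relevance keywords, and a KEEP/SKIP decision; returns (image_query, relevance_keywords, skip_flag) with a generic fallback on errors."""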
try:
content = f"{title}\n\n{summary}"
prompt = (
'Analyze this article title and summary. Extract key entities (brands, locations, cuisines, or topics) '
'for an image search about food industry trends or viral content. Prioritize specific terms if present, '
'otherwise focus on the main theme. '
'Return "SKIP" if the article is about home appliances, recipes, promotions, or contains "homemade", else "KEEP". '
'Return as JSON with double quotes: {"image_query": "specific term", "relevance": ["keyword1", "keyword2"], "action": "KEEP" or "SKIP"}'
)
response = client.chat.completions.create(
model=LIGHT_TASK_MODEL,
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": content}
],
max_tokens=100
)
raw_result = response.choices[0].message.content.strip()
logging.info(f"Raw GPT smart image/filter response: '{raw_result}'")
cleaned_result = re.sub(r'```json\s*|\s*```', '', raw_result).strip()
try:
result = json.loads(cleaned_result)
except json.JSONDecodeError as e:
logging.warning(f"JSON parsing failed: {e}, raw: '{cleaned_result}'. Using fallback.")
return "food trends", ["cuisine", "dining"], False
if not isinstance(result, dict) or "image_query" not in result or "relevance" not in result or "action" not in result:
logging.warning(f"Invalid GPT response format: {result}, using fallback")
return "food trends", ["cuisine", "dining"], False
image_query = result["image_query"]
relevance_keywords = result["relevance"]
skip_flag = result["action"] == "SKIP" or "homemade" in title.lower() or "homemade" in summary.lower()
logging.info(f"Smart image query: {image_query}, Relevance: {relevance_keywords}, Skip: {skip_flag}")
if not image_query or len(image_query.split()) < 2:
logging.warning(f"Image query '{image_query}' too vague, using fallback")
return "food trends", ["cuisine", "dining"], skip_flag
return image_query, relevance_keywords, skip_flag
except Exception as e:
logging.error(f"Smart image/filter failed: {e}, using fallback")
return "food trends", ["cuisine", "dining"], False
def select_paragraphs(paragraphs, target_count, persona, original_content):
"""Select or generate paragraphs to match target_count, preserving key content."""
if len(paragraphs) == target_count and all(60 <= len(p.split()) <= 80 for p in paragraphs):
return paragraphs
# Score paragraphs by food-related keywords
keywords = ["food", "dish", "trend", "menu", "cuisine", "flavor", "taste", "eat", "dining", "restaurant"]
scores = []
for para in paragraphs:
score = sum(para.lower().count(kw) for kw in keywords)
word_count = len(para.split())
# Penalize paragraphs outside word range
score -= abs(word_count - 70) # Favor ~70 words
scores.append(score)
# Handle too many paragraphs
if len(paragraphs) > target_count:
# Keep last paragraph unless it's low-scoring
if scores[-1] >= min(scores[:-1]) or len(paragraphs) == target_count + 1:
selected_indices = sorted(range(len(paragraphs)-1), key=lambda i: scores[i], reverse=True)[:target_count-1] + [len(paragraphs)-1]
else:
selected_indices = sorted(range(len(paragraphs)), key=lambda i: scores[i], reverse=True)[:target_count]
selected = [paragraphs[i] for i in sorted(selected_indices)]
else:
selected = paragraphs[:]
# Handle word count adjustments or too few paragraphs
adjusted = []
for para in selected:
word_count = len(para.split())
if word_count < 60 or word_count > 80:
# Rephrase to fit 60-80 words
rephrase_prompt = (
f"Rephrase this paragraph to exactly 60-80 words, keeping the same tone as a {persona} and all key ideas: '{para}'"
)
try:
response = client.chat.completions.create(
model=SUMMARY_MODEL,
messages=[
{"role": "system", "content": rephrase_prompt},
{"role": "user", "content": para}
],
max_tokens=150,
temperature=0.7
)
new_para = response.choices[0].message.content.strip()
if 60 <= len(new_para.split()) <= 80:
adjusted.append(new_para)
else:
adjusted.append(para) # Fallback to original if rephrase fails
except Exception as e:
logging.warning(f"Rephrasing failed for paragraph: {e}")
adjusted.append(para)
else:
adjusted.append(para)
# Generate additional paragraphs if needed
while len(adjusted) < target_count:
extra_prompt = (
f"Generate one additional paragraph (60-80 words) in the style of a {persona}, "
f"based on this content: '{original_content[:200]}...'. Match the tone of: '{adjusted[-1] if adjusted else 'This trend is fire!'}'"
)
try:
response = client.chat.completions.create(
model=SUMMARY_MODEL,
messages=[
{"role": "system", "content": extra_prompt},
{"role": "user", "content": original_content}
],
max_tokens=150,
temperature=0.7
)
new_para = response.choices[0].message.content.strip()
if 60 <= len(new_para.split()) <= 80:
adjusted.append(new_para)
else:
adjusted.append("This trend is sparking buzz across menus!") # Fallback
except Exception as e:
logging.warning(f"Extra paragraph generation failed: {e}")
adjusted.append("This vibe is shaking up the food scene!")
return adjusted[:target_count]
def insert_link_naturally(summary, source_name, source_url):
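    """Weave exactly one HTML link to the source into the summary via the model, falling back to a rule-based insertion at a sentence boundary."""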
import re
try:
prompt = (
"Take this summary and insert a single HTML link naturally into one paragraph (randomly chosen). "
"Use the format '<a href=\"{source_url}\">{source_name}</a>' and weave it into the text seamlessly, "
"e.g., 'The latest scoop from {source_name} reveals...' or '{source_name} uncovers this wild shift.' "
"Vary the phrasing creatively to avoid repetition (don’t always use 'dives into'). "
"Place the link at a sentence boundary (after a period, not within numbers like '6.30am' or '1.5'). "
"Maintain the original tone and flow, ensuring the link reads as part of the sentence, not standalone. "
"Return the modified summary with exactly one link, no extra formatting or newlines beyond the original.\n\n"
"Summary:\n{summary}\n\n"
"Source Name: {source_name}\nSource URL: {source_url}"
).format(summary=summary, source_name=source_name, source_url=source_url)
response = client.chat.completions.create(
model=LIGHT_TASK_MODEL,
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": "Insert the link naturally into the summary."}
],
max_tokens=1000,
temperature=0.7
)
new_summary = response.choices[0].message.content.strip()
link_pattern = f'<a href="{source_url}">{source_name}</a>'
if new_summary and new_summary.count(link_pattern) == 1:
logging.info(f"Summary with naturally embedded link: {new_summary}")
return new_summary
logging.warning(f"GPT failed to insert link correctly: {new_summary}. Using fallback.")
except Exception as e:
logging.error(f"Link insertion failed: {e}")
# Fallback: Protect times and insert at sentence end
    time_pattern = r'\b\d{1,2}\.\d{2}(?:am|pm)\b'  # Matches 6.30am, 12.15pm
    placeholder = '\x00'  # Control char placeholder so legitimate '@' signs in the text are left untouched
    protected_summary = re.sub(time_pattern, lambda m: m.group(0).replace('.', placeholder), summary)
paragraphs = protected_summary.split('\n')
if not paragraphs or all(not p.strip() for p in paragraphs):
logging.error("No valid paragraphs to insert link.")
return summary
target_para = random.choice([p for p in paragraphs if p.strip()])
phrases = [
f"The scoop from {link_pattern} spills the details",
f"{link_pattern} uncovers this wild shift",
f"This gem via {link_pattern} drops some truth",
f"{link_pattern} breaks down the buzz"
]
insertion_phrase = random.choice(phrases)
# Find sentence boundary, avoiding protected times
sentences = re.split(r'(?<=[.!?])\s+', target_para)
insertion_point = -1
for i, sent in enumerate(sentences):
        if sent.strip() and placeholder not in sent:  # Avoid sentences with protected times
insertion_point = sum(len(s) + 1 for s in sentences[:i+1])
break
if insertion_point == -1:
insertion_point = len(target_para) # Append if no good boundary
# Add space after insertion phrase
new_para = f"{target_para[:insertion_point]} {insertion_phrase}. {target_para[insertion_point:]}".strip()
paragraphs[paragraphs.index(target_para)] = new_para
new_summary = '\n'.join(paragraphs)
# Restore periods in times
    new_summary = new_summary.replace(placeholder, '.')
logging.info(f"Fallback summary with link: {new_summary}")
return new_summary
def generate_category_from_summary(summary):
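    """Pick one of the site's fixed categories for the summary; defaults to 'Trends'."""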
try:
if not isinstance(summary, str) or not summary.strip():
logging.warning(f"Invalid summary for category generation: {summary}. Defaulting to 'Trends'.")
return "Trends"
response = client.chat.completions.create(
model=LIGHT_TASK_MODEL,
messages=[
{"role": "system", "content": (
"Based on this summary, select the most relevant category from: Food, Culture, Trends, Health, Lifestyle, Drink, Eats. "
"Return only the category name."
)},
{"role": "user", "content": summary}
],
max_tokens=10
)
category = response.choices[0].message.content.strip()
logging.info(f"Generated category: {category}")
return category if category in ["Food", "Culture", "Trends", "Health", "Lifestyle", "Drink", "Eats"] else "Trends"
except Exception as e:
logging.error(f"Category generation failed: {e}")
return "Trends"
def get_wp_category_id(category_name, wp_base_url, wp_username, wp_password):
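    """Look up an existing WordPress category ID by name; returns None if not found."""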
try:
headers = {"Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}"}
response = requests.get(f"{wp_base_url}/categories", headers=headers, params={"search": category_name})
response.raise_for_status()
categories = response.json()
for cat in categories:
if cat["name"].lower() == category_name.lower():
return cat["id"]
return None
except Exception as e:
logging.error(f"Failed to get WP category ID for '{category_name}': {e}")
return None
def create_wp_category(category_name, wp_base_url, wp_username, wp_password):
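    """Create a WordPress category and return its ID, or None on failure."""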
try:
headers = {
"Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}",
"Content-Type": "application/json"
}
payload = {"name": category_name}
response = requests.post(f"{wp_base_url}/categories", headers=headers, json=payload)
response.raise_for_status()
return response.json()["id"]
except Exception as e:
logging.error(f"Failed to create WP category '{category_name}': {e}")
return None
def get_wp_tag_id(tag_name, wp_base_url, wp_username, wp_password):
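    """Look up an existing WordPress tag ID by name; returns None if not found."""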
try:
headers = {"Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}"}
response = requests.get(f"{wp_base_url}/tags", headers=headers, params={"search": tag_name})
response.raise_for_status()
tags = response.json()
for tag in tags:
if tag["name"].lower() == tag_name.lower():
return tag["id"]
return None
except Exception as e:
logging.error(f"Failed to get WP tag ID for '{tag_name}': {e}")
return None
def post_to_wp(post_data, category, link, author, image_url, original_source, image_source="Pixabay", uploader=None, pixabay_url=None, interest_score=4, post_id=None):
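    """Create (or update, when post_id is given) a WordPress post with category, tags, and optional featured image; returns (post_id, post_url) or (None, None) on failure."""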
wp_base_url = "https://insiderfoodie.com/wp-json/wp/v2"
logging.info(f"Starting post_to_wp for '{post_data['title']}', image_source: {image_source}")
if not isinstance(author, dict) or "username" not in author or "password" not in author:
raise ValueError(f"Invalid author data: {author}. Expected a dictionary with 'username' and 'password' keys.")
wp_username = author["username"]
wp_password = author["password"]
if not isinstance(interest_score, int):
logging.error(f"Invalid interest_score type: {type(interest_score)}, value: '{interest_score}'. Defaulting to 4.")
interest_score = 4
elif interest_score < 0 or interest_score > 10:
logging.warning(f"interest_score out of valid range (0-10): {interest_score}. Clamping to 4.")
interest_score = min(max(interest_score, 0), 10)
try:
headers = {
"Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}",
"Content-Type": "application/json"
}
auth_test = requests.get(f"{wp_base_url}/users/me", headers=headers)
auth_test.raise_for_status()
logging.info(f"Auth test passed for {wp_username}: {auth_test.json()['id']}")
category_id = get_wp_category_id(category, wp_base_url, wp_username, wp_password)
if not category_id:
category_id = create_wp_category(category, wp_base_url, wp_username, wp_password)
logging.info(f"Created new category '{category}' with ID {category_id}")
else:
logging.info(f"Found existing category '{category}' with ID {category_id}")
tags = [1]
if interest_score >= 9:
picks_tag_id = get_wp_tag_id("Picks", wp_base_url, wp_username, wp_password)
if picks_tag_id and picks_tag_id not in tags:
tags.append(picks_tag_id)
logging.info(f"Added 'Picks' tag (ID: {picks_tag_id}) to post due to high interest score: {interest_score}")
content = post_data["content"]
if content is None:
logging.error(f"Post content is None for title '{post_data['title']}' - using fallback")
content = "Content unavailable. Check the original source for details."
formatted_content = "\n".join(f"<p>{para}</p>" for para in content.split('\n') if para.strip())
author_id_map = {
"shanehill": 5,
"javiermorales": 2,
"aishapatel": 3,
"liennguyen": 4,
"keishawashington": 6,
"lilamoreau": 7
}
author_id = author_id_map.get(author["username"], 5)
payload = {
"title": post_data["title"],
"content": formatted_content,
"status": "publish",
"categories": [category_id],
"tags": tags,
"author": author_id,
"meta": {
"original_link": link,
"original_source": original_source,
"interest_score": interest_score
}
}
if image_url and not post_id:
logging.info(f"Attempting image upload for '{post_data['title']}', URL: {image_url}, source: {image_source}")
image_id = upload_image_to_wp(image_url, post_data["title"], wp_base_url, wp_username, wp_password, image_source, uploader, pixabay_url)
if not image_id:
logging.info(f"Flickr upload failed for '{post_data['title']}', falling back to Pixabay")
pixabay_query = post_data["title"][:50]
image_url, image_source, uploader, pixabay_url = get_image(pixabay_query)
if image_url:
image_id = upload_image_to_wp(image_url, post_data["title"], wp_base_url, wp_username, wp_password, image_source, uploader, pixabay_url)
if image_id:
payload["featured_media"] = image_id
else:
logging.warning(f"All image uploads failed for '{post_data['title']}' - posting without image")
endpoint = f"{wp_base_url}/posts/{post_id}" if post_id else f"{wp_base_url}/posts"
method = requests.post # Use POST for both create and update (WP API handles it)
logging.debug(f"Sending WP request to {endpoint} with payload: {json.dumps(payload, indent=2)}")
response = method(endpoint, headers=headers, json=payload)
response.raise_for_status()
post_info = response.json()
logging.debug(f"WP response: {json.dumps(post_info, indent=2)}")
if not isinstance(post_info, dict) or "id" not in post_info:
raise ValueError(f"Invalid WP response: {post_info}")
post_id = post_info["id"]
post_url = post_info["link"]
logging.info(f"Posted/Updated by {author['username']}: {post_data['title']} (ID: {post_id})")
return post_id, post_url
except requests.exceptions.RequestException as e:
logging.error(f"WP API request failed: {e} - Response: {e.response.text if e.response else 'No response'}")
print(f"WP Error: {e}")
return None, None
except KeyError as e:
logging.error(f"WP payload error - Missing key: {e} - Author data: {author}")
print(f"WP Error: {e}")
return None, None
except Exception as e:
logging.error(f"WP posting failed: {e}")
print(f"WP Error: {e}")
return None, None
def get_flickr_image_via_ddg(search_query, relevance_keywords):
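    """Find a commercially licensed Flickr image via DuckDuckGo, rejecting text-heavy results by tag/title keywords and OCR; returns (image_url, "Flickr", uploader, page_url) or all Nones. relevance_keywords is currently unused."""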
try:
with DDGS() as ddgs:
results = ddgs.images(
f"{search_query} flickr site:flickr.com -poster -infographic -chart -graph -data -stats -text -typography",
license_image="sharecommercially",
max_results=30
)
if not results:
logging.warning(f"No Flickr images found via DDG for query '{search_query}'")
return None, None, None, None
headers = {'User-Agent': 'InsiderFoodieBot/1.0 (https://insiderfoodie.com; contact@insiderfoodie.com)'}
candidates = []
for r in results:
image_url = r.get("image", "")
page_url = r.get("url", "")
if not image_url or "live.staticflickr.com" not in image_url:
continue
try:
response = requests.get(page_url, headers=headers, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
tags_elem = soup.find_all('a', class_='tag')
tags = [tag.text.strip().lower() for tag in tags_elem] if tags_elem else []
title_elem = soup.find('h1', class_='photo-title')
title = title_elem.text.strip().lower() if title_elem else r.get("title", "").lower()
exclude_keywords = [
"poster", "infographic", "chart", "graph", "data", "stats", "text", "typography",
"design", "advertisement", "illustration", "diagram", "layout", "print"
]
matched_keywords = [kw for kw in exclude_keywords if kw in tags or kw in title]
if matched_keywords:
logging.info(f"Skipping text-heavy image: {image_url} (tags: {tags}, title: {title}, matched: {matched_keywords})")
continue
uploader = soup.find('a', class_='owner-name')
uploader = uploader.text.strip() if uploader else "Flickr User"
candidates.append({
"image_url": image_url,
"page_url": page_url,
"uploader": uploader,
"tags": tags,
"title": title
})
except requests.exceptions.RequestException as e:
logging.info(f"Skipping unavailable image: {image_url} (page: {page_url}, error: {e})")
continue
if not candidates:
logging.warning(f"No valid candidate images after filtering for '{search_query}'")
return None, None, None, None
result = random.choice(candidates)
image_url = result["image_url"]
# OCR check on the selected image
temp_file = None
try:
img_response = requests.get(image_url, headers=headers, timeout=10)
img_response.raise_for_status()
with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file:
temp_file.write(img_response.content)
temp_path = temp_file.name
img = Image.open(temp_path)
text = pytesseract.image_to_string(img)
char_count = len(text.strip())
logging.info(f"OCR processed {image_url}: {char_count} characters detected")
if char_count > 200:
logging.info(f"Skipping text-heavy image (OCR): {image_url} (char_count: {char_count})")
return None, None, None, None # Fall back to Pixabay
# Success: Save and return
flickr_data = {
"title": search_query,
"image_url": image_url,
"source": "Flickr",
"uploader": result["uploader"],
"page_url": result["page_url"],
"timestamp": datetime.now().isoformat(),
"ocr_chars": char_count
}
flickr_file = "/home/shane/foodie_automator/flickr_images.json"
with open(flickr_file, 'a') as f:
json.dump(flickr_data, f)
f.write('\n')
logging.info(f"Saved Flickr image to {flickr_file}: {image_url}")
logging.info(f"Fetched Flickr image URL: {image_url} by {result['uploader']} for query '{search_query}' (tags: {result['tags']})")
print(f"DEBUG: Flickr image selected: {image_url}")
return image_url, "Flickr", result["uploader"], result["page_url"]
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
logging.warning(f"Rate limit hit for {image_url}. Falling back to Pixabay.")
return None, None, None, None
else:
logging.warning(f"Download failed for {image_url}: {e}")
return None, None, None, None
except Exception as e:
logging.warning(f"OCR processing failed for {image_url}: {e}")
return None, None, None, None
finally:
if temp_file and os.path.exists(temp_path):
os.unlink(temp_path)
except Exception as e:
logging.error(f"Flickr/DDG image fetch failed for '{search_query}': {e}")
return None, None, None, None
def select_best_author(summary):
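    """Have the model pick the most suitable author username for the summary; defaults to shanehill."""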
try:
response = client.chat.completions.create(
model=LIGHT_TASK_MODEL,
messages=[
{"role": "system", "content": (
"Based on this restaurant/food industry trend summary, pick the most suitable author from: "
"shanehill, javiermorales, aishapatel, liennguyen, keishawashington, lilamoreau. "
"Consider their expertise: shanehill (global dining trends), javiermorales (food critique), "
"aishapatel (emerging food trends), liennguyen (cultural dining), keishawashington (soul food heritage), "
"lilamoreau (global street food). Return only the username."
)},
{"role": "user", "content": summary}
],
max_tokens=20
)
author = response.choices[0].message.content.strip()
valid_authors = ["shanehill", "javiermorales", "aishapatel", "liennguyen", "keishawashington", "lilamoreau"]
logging.info(f"Selected author: {author}")
return author if author in valid_authors else "shanehill"
except Exception as e:
logging.error(f"Author selection failed: {e}")
return "shanehill"
def prepare_post_data(final_summary, original_title, context_info=""):
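    """Build title, author, category, and image for a post; returns seven Nones if title or image-query generation fails."""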
innovative_title = generate_title_from_summary(final_summary)
if not innovative_title:
logging.info(f"Title generation failed for '{original_title}' {context_info}")
return None, None, None, None, None, None, None
# Note: This function still uses generate_image_query, but curate_from_rss overrides it with smart_image_and_filter
search_query, relevance_keywords = generate_image_query(f"{innovative_title}\n\n{final_summary}")
if not search_query:
logging.info(f"Image query generation failed for '{innovative_title}' {context_info}")
return None, None, None, None, None, None, None
logging.info(f"Fetching Flickr image for query: '{search_query}' {context_info}")
image_url, image_source, uploader, page_url = get_flickr_image_via_ddg(search_query, relevance_keywords)
if not image_url:
logging.info(f"Flickr fetch failed for '{search_query}' - falling back to Pixabay {context_info}")
image_query, _ = generate_image_query(f"{innovative_title}\n\n{final_summary}")
image_url, image_source, uploader, page_url = get_image(image_query)
if not image_url:
logging.info(f"Pixabay fetch failed for title '{innovative_title}' - falling back to summary {context_info}")
image_query, _ = generate_image_query(f"{final_summary}")
image_url, image_source, uploader, page_url = get_image(image_query)
if not image_url:
logging.info(f"Image fetch failed again for '{original_title}' - proceeding without image {context_info}")
post_data = {"title": innovative_title, "content": final_summary}
selected_username = select_best_author(final_summary)
author = next((a for a in AUTHORS if a["username"] == selected_username), None)
if not author:
logging.error(f"Author '{selected_username}' not found in AUTHORS, defaulting to shanehill")
author = {"username": "shanehill", "password": "LKfH JF0x CnnU SSxK s9f1 993x"}
category = generate_category_from_summary(final_summary)
return post_data, author, category, image_url, image_source, uploader, page_url

@ -0,0 +1,8 @@
requests==2.32.3
selenium==4.26.1
duckduckgo_search==6.2.11
openai==1.46.1
praw==7.7.1
beautifulsoup4==4.12.3
Pillow==10.4.0
pytesseract==0.3.13