use cursor to optomize files

This commit is contained in:
2025-05-03 16:23:06 +10:00
parent 427a5cb919
commit 2ca39915e0
5 changed files with 1411 additions and 1634 deletions
+227 -291
View File
@@ -9,6 +9,7 @@ import json
import signal import signal
import sys import sys
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import List, Dict, Optional, Tuple
from openai import OpenAI from openai import OpenAI
from urllib.parse import quote from urllib.parse import quote
from selenium import webdriver from selenium import webdriver
@@ -16,11 +17,12 @@ from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException from selenium.common.exceptions import TimeoutException, WebDriverException
from duckduckgo_search import DDGS from duckduckgo_search import DDGS
from foodie_config import ( from foodie_config import (
AUTHORS, RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS, AUTHORS, RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS,
PERSONA_CONFIGS, CATEGORIES, get_clean_source_name, X_API_CREDENTIALS PERSONA_CONFIGS, CATEGORIES, get_clean_source_name, X_API_CREDENTIALS,
FILE_PATHS, EXPIRATION_DAYS, IMAGE_EXPIRATION_DAYS
) )
from foodie_utils import ( from foodie_utils import (
load_json_file, save_json_file, get_image, generate_image_query, load_json_file, save_json_file, get_image, generate_image_query,
@@ -29,320 +31,254 @@ from foodie_utils import (
generate_category_from_summary, post_to_wp, prepare_post_data, generate_category_from_summary, post_to_wp, prepare_post_data,
smart_image_and_filter, insert_link_naturally, get_flickr_image smart_image_and_filter, insert_link_naturally, get_flickr_image
) )
from foodie_hooks import get_dynamic_hook, get_viral_share_prompt # Removed select_best_cta import from foodie_hooks import get_dynamic_hook, get_viral_share_prompt
from dotenv import load_dotenv from dotenv import load_dotenv
# Load environment variables
load_dotenv() load_dotenv()
# Global state
is_posting = False is_posting = False
logger = logging.getLogger(__name__)
def signal_handler(sig, frame): class GoogleTrendsScraper:
logging.info("Received termination signal, checking if safe to exit...") def __init__(self):
if is_posting: self.driver = None
logging.info("Currently posting, will exit after completion.") self.setup_logging()
else: self.setup_signal_handlers()
logging.info("Safe to exit immediately.") self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
sys.exit(0) self.posted_titles = self.load_posted_titles()
self.used_images = self.load_used_images()
signal.signal(signal.SIGTERM, signal_handler) def setup_logging(self) -> None:
signal.signal(signal.SIGINT, signal_handler) """Configure logging for the scraper."""
logger.setLevel(logging.INFO)
file_handler = logging.FileHandler(FILE_PATHS["posted_google_titles"].with_suffix('.log'), mode='a')
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(file_handler)
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(console_handler)
logger.info("Logging initialized for Google Trends scraper")
logger = logging.getLogger() def setup_signal_handlers(self) -> None:
logger.setLevel(logging.INFO) """Set up signal handlers for graceful shutdown."""
file_handler = logging.FileHandler('/home/shane/foodie_automator/foodie_automator_google.log', mode='a') def signal_handler(sig, frame):
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.info("Received termination signal, checking if safe to exit...")
logger.addHandler(file_handler) if is_posting:
console_handler = logging.StreamHandler() logger.info("Currently posting, will exit after completion.")
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) else:
logger.addHandler(console_handler) logger.info("Safe to exit immediately.")
logging.info("Logging initialized for foodie_automator_google.py") sys.exit(0)
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
POSTED_TITLES_FILE = '/home/shane/foodie_automator/posted_google_titles.json' def load_posted_titles(self) -> set:
USED_IMAGES_FILE = '/home/shane/foodie_automator/used_images.json' """Load and return the set of posted titles."""
EXPIRATION_HOURS = 24 try:
IMAGE_EXPIRATION_DAYS = 7 data = load_json_file(FILE_PATHS["posted_google_titles"], EXPIRATION_DAYS)
return {entry["title"] for entry in data}
except Exception as e:
logger.error(f"Error loading posted titles: {e}")
return set()
posted_titles_data = load_json_file(POSTED_TITLES_FILE, EXPIRATION_HOURS) def load_used_images(self) -> set:
posted_titles = set(entry["title"] for entry in posted_titles_data) """Load and return the set of used images."""
used_images = set(entry["title"] for entry in load_json_file(USED_IMAGES_FILE, IMAGE_EXPIRATION_DAYS) if "title" in entry) try:
data = load_json_file(FILE_PATHS["used_images"], IMAGE_EXPIRATION_DAYS)
return {entry["title"] for entry in data if "title" in entry}
except Exception as e:
logger.error(f"Error loading used images: {e}")
return set()
def parse_search_volume(volume_text): def parse_search_volume(self, volume_text: str) -> float:
try: """Parse search volume from text into a numeric value."""
volume_part = volume_text.split('\n')[0].lower().strip().replace('+', '') try:
if 'k' in volume_part: volume_part = volume_text.split('\n')[0].lower().strip().replace('+', '')
volume = float(volume_part.replace('k', '')) * 1000 if 'k' in volume_part:
elif 'm' in volume_part: return float(volume_part.replace('k', '')) * 1000
volume = float(volume_part.replace('m', '')) * 1000000 elif 'm' in volume_part:
else: return float(volume_part.replace('m', '')) * 1000000
volume = float(volume_part) return float(volume_part)
return volume except (ValueError, AttributeError) as e:
except (ValueError, AttributeError) as e: logger.warning(f"Could not parse search volume from '{volume_text}': {e}")
logging.warning(f"Could not parse search volume from '{volume_text}': {e}") return 0.0
return 0
def scrape_google_trends(geo='US'): def setup_driver(self) -> None:
chrome_options = Options() """Set up the Chrome WebDriver with appropriate options."""
chrome_options.add_argument("--headless") chrome_options = Options()
chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/125.0.0.0 Safari/537.36") chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/125.0.0.0 Safari/537.36")
self.driver = webdriver.Chrome(options=chrome_options)
driver = webdriver.Chrome(options=chrome_options) def scrape_google_trends(self, geo: str = 'US') -> List[Dict]:
try: """Scrape Google Trends for the specified region."""
for attempt in range(3): if not self.driver:
try: self.setup_driver()
time.sleep(random.uniform(2, 5))
url = f"https://trends.google.com/trending?geo={geo}&hours=24&sort=search-volume&category=5"
logging.info(f"Navigating to {url} (attempt {attempt + 1})")
driver.get(url)
logging.info("Waiting for page to load...")
WebDriverWait(driver, 60).until(
EC.presence_of_element_located((By.TAG_NAME, "tbody"))
)
break
except TimeoutException:
logging.warning(f"Timeout on attempt {attempt + 1} for geo={geo}")
if attempt == 2:
logging.error(f"Failed after 3 attempts for geo={geo}")
return []
time.sleep(5)
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(2)
trends = [] trends = []
rows = driver.find_elements(By.XPATH, "//tbody/tr") try:
logging.info(f"Found {len(rows)} rows in tbody for geo={geo}") for attempt in range(3):
try:
time.sleep(random.uniform(2, 5))
url = f"https://trends.google.com/trending?geo={geo}&hours=24&sort=search-volume&category=5"
logger.info(f"Navigating to {url} (attempt {attempt + 1})")
self.driver.get(url)
cutoff_date = datetime.now(timezone.utc) - timedelta(hours=24) logger.info("Waiting for page to load...")
for row in rows: WebDriverWait(self.driver, 60).until(
try: EC.presence_of_element_located((By.TAG_NAME, "tbody"))
columns = row.find_elements(By.TAG_NAME, "td") )
if len(columns) >= 3: break
title = columns[1].text.strip() except TimeoutException:
search_volume_text = columns[2].text.strip() logger.warning(f"Timeout on attempt {attempt + 1} for geo={geo}")
search_volume = parse_search_volume(search_volume_text) if attempt == 2:
logging.info(f"Parsed trend: {title} with search volume: {search_volume}") logger.error(f"Failed after 3 attempts for geo={geo}")
if title and search_volume >= 20000: return []
link = f"https://trends.google.com/trends/explore?q={quote(title)}&geo={geo}" time.sleep(5)
trends.append({
"title": title, self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
"link": link, time.sleep(2)
"search_volume": search_volume
}) rows = self.driver.find_elements(By.XPATH, "//tbody/tr")
logging.info(f"Added trend: {title} with search volume: {search_volume}") logger.info(f"Found {len(rows)} rows in tbody for geo={geo}")
else:
logging.info(f"Skipping trend: {title} (volume: {search_volume} < 20K or no title)") for row in rows:
else: try:
logging.info(f"Skipping row with insufficient columns: {len(columns)}") columns = row.find_elements(By.TAG_NAME, "td")
except Exception as e: if len(columns) >= 3:
logging.warning(f"Row processing error: {e}") title = columns[1].text.strip()
search_volume = self.parse_search_volume(columns[2].text.strip())
if title and search_volume >= 20000:
link = f"https://trends.google.com/trends/explore?q={quote(title)}&geo={geo}"
trends.append({
"title": title,
"link": link,
"search_volume": search_volume
})
logger.info(f"Added trend: {title} with search volume: {search_volume}")
except Exception as e:
logger.warning(f"Row processing error: {e}")
continue
if trends:
trends.sort(key=lambda x: x["search_volume"], reverse=True)
logger.info(f"Extracted {len(trends)} trends for geo={geo}")
else:
logger.warning(f"No valid trends found with search volume >= 20K for geo={geo}")
except WebDriverException as e:
logger.error(f"WebDriver error: {e}")
finally:
if self.driver:
self.driver.quit()
self.driver = None
logger.info(f"Chrome driver closed for geo={geo}")
return trends
def fetch_duckduckgo_news_context(self, trend_title: str, hours: int = 24) -> str:
"""Fetch news context for a trend from DuckDuckGo."""
try:
with DDGS() as ddgs:
results = ddgs.news(f"{trend_title} news", timelimit="d", max_results=5)
titles = []
for r in results:
try:
date_str = r["date"]
dt = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S+00:00").replace(tzinfo=timezone.utc) if '+00:00' in date_str else datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
if dt > (datetime.now(timezone.utc) - timedelta(hours=hours)):
titles.append(r["title"].lower())
except ValueError as e:
logger.warning(f"Date parsing failed for '{date_str}': {e}")
continue
context = " ".join(titles) if titles else "No recent news found within 24 hours"
logger.info(f"DuckDuckGo News context for '{trend_title}': {context}")
return context
except Exception as e:
logger.warning(f"DuckDuckGo News context fetch failed for '{trend_title}': {e}")
return trend_title
def curate_from_google_trends(self, geo_list: List[str] = ['US']) -> Tuple[Optional[Dict], Optional[str], int]:
"""Curate content from Google Trends for multiple regions."""
all_trends = []
for geo in geo_list:
trends = self.scrape_google_trends(geo=geo)
if trends:
all_trends.extend(trends)
if not all_trends:
logger.info("No Google Trends data available")
return None, None, random.randint(600, 1800)
for trend in all_trends:
title = trend["title"]
if title in self.posted_titles:
logger.info(f"Skipping already posted trend: {title}")
continue continue
if trends: logger.info(f"Processing Google Trend: {title}")
trends.sort(key=lambda x: x["search_volume"], reverse=True) image_query, relevance_keywords, skip = smart_image_and_filter(title, trend.get("summary", ""))
logging.info(f"Extracted {len(trends)} trends for geo={geo}: {[t['title'] for t in trends]}") if skip:
print(f"Raw trends fetched for geo={geo}: {[t['title'] for t in trends]}") logger.info(f"Skipping filtered Google Trend: {title}")
else: continue
logging.warning(f"No valid trends found with search volume >= 20K for geo={geo}")
return trends
finally:
driver.quit()
logging.info(f"Chrome driver closed for geo={geo}")
def fetch_duckduckgo_news_context(trend_title, hours=24): scoring_content = f"{title}\n\n{trend.get('summary', '')}"
try: interest_score = is_interesting(scoring_content)
with DDGS() as ddgs: if interest_score < 6:
results = ddgs.news(f"{trend_title} news", timelimit="d", max_results=5) logger.info(f"Google Trends Interest Too Low: {interest_score}")
titles = [] continue
for r in results:
try:
date_str = r["date"]
if '+00:00' in date_str:
dt = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S+00:00").replace(tzinfo=timezone.utc)
else:
dt = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
if dt > (datetime.now(timezone.utc) - timedelta(hours=24)):
titles.append(r["title"].lower())
except ValueError as e:
logging.warning(f"Date parsing failed for '{date_str}': {e}")
continue
context = " ".join(titles) if titles else "No recent news found within 24 hours"
logging.info(f"DuckDuckGo News context for '{trend_title}': {context}")
return context
except Exception as e:
logging.warning(f"DuckDuckGo News context fetch failed for '{trend_title}': {e}")
return trend_title
def curate_from_google_trends(geo_list=['US']): num_paragraphs = determine_paragraph_count(interest_score)
all_trends = [] extra_prompt = (
for geo in geo_list: f"Generate exactly {num_paragraphs} paragraphs.\n"
trends = scrape_google_trends(geo=geo) f"FOCUS: Summarize ONLY the provided content, explicitly mentioning '{title}' and sticking to its specific topic and details.\n"
if trends: f"Do NOT introduce unrelated concepts.\n"
all_trends.extend(trends) f"Expand on the core idea with relevant context about its appeal or significance in food trends.\n"
f"Do not include emojis in the summary."
)
final_summary = summarize_with_gpt4o(
scoring_content,
"Google Trends",
trend["link"],
interest_score=interest_score,
extra_prompt=extra_prompt
)
if not final_summary:
logger.info(f"Summary failed for '{title}'")
continue
final_summary = insert_link_naturally(final_summary, "Google Trends", trend["link"])
post_data, author, category, image_url, image_source, uploader, pixabay_url = prepare_post_data(final_summary, title)
if post_data and author:
return post_data, author, random.randint(600, 1800)
if not all_trends:
print("No Google Trends data available")
logging.info("No Google Trends data available")
return None, None, random.randint(600, 1800) return None, None, random.randint(600, 1800)
attempts = 0
max_attempts = 10
while attempts < max_attempts and all_trends:
trend = all_trends.pop(0)
title = trend["title"]
link = trend.get("link", "https://trends.google.com/")
summary = trend.get("summary", "")
source_name = "Google Trends"
original_source = f'<a href="{link}">{source_name}</a>'
if title in posted_titles:
print(f"Skipping already posted trend: {title}")
logging.info(f"Skipping already posted trend: {title}")
attempts += 1
continue
print(f"Trying Google Trend: {title} from {source_name}")
logging.info(f"Trying Google Trend: {title} from {source_name}")
image_query, relevance_keywords, skip = smart_image_and_filter(title, summary)
if skip:
print(f"Skipping filtered Google Trend: {title}")
logging.info(f"Skipping filtered Google Trend: {title}")
attempts += 1
continue
scoring_content = f"{title}\n\n{summary}"
interest_score = is_interesting(scoring_content)
logging.info(f"Interest score for '{title}': {interest_score}")
if interest_score < 6:
print(f"Google Trends Interest Too Low: {interest_score}")
logging.info(f"Google Trends Interest Too Low: {interest_score}")
attempts += 1
continue
num_paragraphs = determine_paragraph_count(interest_score)
extra_prompt = (
f"Generate exactly {num_paragraphs} paragraphs.\n"
f"FOCUS: Summarize ONLY the provided content, explicitly mentioning '{title}' and sticking to its specific topic and details.\n"
f"Do NOT introduce unrelated concepts.\n"
f"Expand on the core idea with relevant context about its appeal or significance in food trends.\n"
f"Do not include emojis in the summary."
)
content_to_summarize = scoring_content
final_summary = summarize_with_gpt4o(
content_to_summarize,
source_name,
link,
interest_score=interest_score,
extra_prompt=extra_prompt
)
if not final_summary:
logging.info(f"Summary failed for '{title}'")
attempts += 1
continue
final_summary = insert_link_naturally(final_summary, source_name, link)
post_data, author, category, image_url, image_source, uploader, pixabay_url = prepare_post_data(final_summary, title)
if not post_data:
attempts += 1
continue
image_url, image_source, uploader, page_url = get_flickr_image(image_query, relevance_keywords)
if not image_url:
image_url, image_source, uploader, page_url = get_image(image_query)
hook = get_dynamic_hook(post_data["title"]).strip()
# Generate viral share prompt
share_prompt = get_viral_share_prompt(post_data["title"], final_summary)
share_links_template = (
f'<p>{share_prompt} '
f'<a href="https://x.com/intent/tweet?url={{post_url}}&text={{share_text}}" target="_blank"><i class="tsi tsi-twitter"></i></a> '
f'<a href="https://www.facebook.com/sharer/sharer.php?u={{post_url}}" target="_blank"><i class="tsi tsi-facebook"></i></a></p>'
)
post_data["content"] = f"{final_summary}\n\n{share_links_template}"
global is_posting
is_posting = True
try:
post_id, post_url = post_to_wp(
post_data=post_data,
category=category,
link=link,
author=author,
image_url=image_url,
original_source=original_source,
image_source=image_source,
uploader=uploader,
pixabay_url=pixabay_url,
interest_score=interest_score,
should_post_tweet=True
)
finally:
is_posting = False
if post_id:
share_text = f"Check out this foodie gem! {post_data['title']}"
share_text_encoded = quote(share_text)
post_url_encoded = quote(post_url)
share_links = share_links_template.format(post_url=post_url_encoded, share_text=share_text_encoded)
# Removed: cta = select_best_cta(post_data["title"], final_summary, post_url=post_url)
post_data["content"] = f"{final_summary}\n\n{share_links}" # Removed cta from content
is_posting = True
try:
post_to_wp(
post_data=post_data,
category=category,
link=link,
author=author,
image_url=image_url,
original_source=original_source,
image_source=image_source,
uploader=uploader,
pixabay_url=pixabay_url,
interest_score=interest_score,
post_id=post_id,
should_post_tweet=False
)
finally:
is_posting = False
timestamp = datetime.now(timezone.utc).isoformat()
save_json_file(POSTED_TITLES_FILE, title, timestamp)
posted_titles.add(title)
logging.info(f"Successfully saved '{title}' to {POSTED_TITLES_FILE}")
if image_url:
save_json_file(USED_IMAGES_FILE, image_url, timestamp)
used_images.add(image_url)
logging.info(f"Saved image '{image_url}' to {USED_IMAGES_FILE}")
print(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id}) from Google Trends *****")
logging.info(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id}) from Google Trends *****")
return post_data, category, random.randint(0, 1800)
attempts += 1
logging.info(f"WP posting failed for '{post_data['title']}'")
print("No interesting Google Trend found after attempts")
logging.info("No interesting Google Trend found after attempts")
return None, None, random.randint(600, 1800)
def run_google_trends_automator(): def run_google_trends_automator():
logging.info("***** Google Trends Automator Launched *****") """Main function to run the Google Trends automator."""
geo_list = ['US', 'GB', 'AU'] scraper = GoogleTrendsScraper()
post_data, category, sleep_time = curate_from_google_trends(geo_list=geo_list) while True:
if sleep_time is None: try:
sleep_time = random.randint(600, 1800) post_data, author, sleep_time = scraper.curate_from_google_trends()
print(f"Sleeping for {sleep_time}s") if post_data and author:
logging.info(f"Completed run with sleep time: {sleep_time} seconds") global is_posting
time.sleep(sleep_time) is_posting = True
return post_data, category, sleep_time try:
post_to_wp(post_data, author)
logger.info(f"Successfully posted: {post_data['title']}")
finally:
is_posting = False
time.sleep(sleep_time)
except Exception as e:
logger.error(f"Error in Google Trends automator: {e}")
time.sleep(300) # Wait 5 minutes before retrying
if __name__ == "__main__": if __name__ == "__main__":
run_google_trends_automator() run_google_trends_automator()
+244 -311
View File
@@ -9,6 +9,7 @@ import signal
import sys import sys
import re import re
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import List, Dict, Optional, Tuple, Set
from openai import OpenAI from openai import OpenAI
from urllib.parse import quote from urllib.parse import quote
from requests.packages.urllib3.util.retry import Retry from requests.packages.urllib3.util.retry import Retry
@@ -19,7 +20,7 @@ from foodie_config import (
AUTHORS, RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS, AUTHORS, RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS,
PERSONA_CONFIGS, CATEGORIES, get_clean_source_name, PERSONA_CONFIGS, CATEGORIES, get_clean_source_name,
REDDIT_CLIENT_ID, REDDIT_CLIENT_SECRET, REDDIT_USER_AGENT, LIGHT_TASK_MODEL, REDDIT_CLIENT_ID, REDDIT_CLIENT_SECRET, REDDIT_USER_AGENT, LIGHT_TASK_MODEL,
X_API_CREDENTIALS X_API_CREDENTIALS, FILE_PATHS, EXPIRATION_DAYS, IMAGE_EXPIRATION_DAYS
) )
from foodie_utils import ( from foodie_utils import (
load_json_file, save_json_file, get_image, generate_image_query, load_json_file, save_json_file, get_image, generate_image_query,
@@ -28,29 +29,48 @@ from foodie_utils import (
prepare_post_data, select_best_author, smart_image_and_filter, prepare_post_data, select_best_author, smart_image_and_filter,
get_flickr_image get_flickr_image
) )
from foodie_hooks import get_dynamic_hook, get_viral_share_prompt # Removed select_best_cta import from foodie_hooks import get_dynamic_hook, get_viral_share_prompt
# Load environment variables
load_dotenv() load_dotenv()
# Global state
is_posting = False is_posting = False
logger = logging.getLogger(__name__)
def signal_handler(sig, frame): class RedditScraper:
logging.info("Received termination signal, checking if safe to exit...") def __init__(self):
if is_posting: self.setup_logging()
logging.info("Currently posting, will exit after completion.") self.setup_signal_handlers()
else: self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
logging.info("Safe to exit immediately.") self.posted_titles = self.load_posted_titles()
sys.exit(0) self.used_images = self.load_used_images()
self.reddit = self.setup_reddit_client()
self.setup_requests_session()
signal.signal(signal.SIGTERM, signal_handler) def setup_logging(self) -> None:
signal.signal(signal.SIGINT, signal_handler) """Configure logging for the scraper."""
log_file = FILE_PATHS["posted_reddit_titles"].with_suffix('.log')
self.prune_old_logs(log_file)
LOG_FILE = "/home/shane/foodie_automator/foodie_automator_reddit.log" logging.basicConfig(
LOG_PRUNE_DAYS = 30 filename=str(log_file),
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("prawcore").setLevel(logging.WARNING)
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logging.getLogger().addHandler(console_handler)
logger.info("Logging initialized for Reddit scraper")
def setup_logging(): def prune_old_logs(self, log_file: str) -> None:
if os.path.exists(LOG_FILE): """Prune log entries older than LOG_PRUNE_DAYS."""
with open(LOG_FILE, 'r') as f: if not os.path.exists(log_file):
return
with open(log_file, 'r') as f:
lines = f.readlines() lines = f.readlines()
log_entries = [] log_entries = []
@@ -68,7 +88,7 @@ def setup_logging():
if current_entry: if current_entry:
log_entries.append(''.join(current_entry)) log_entries.append(''.join(current_entry))
cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS) cutoff = datetime.now(timezone.utc) - timedelta(days=30) # LOG_PRUNE_DAYS
pruned_entries = [] pruned_entries = []
for entry in log_entries: for entry in log_entries:
try: try:
@@ -76,323 +96,236 @@ def setup_logging():
if timestamp > cutoff: if timestamp > cutoff:
pruned_entries.append(entry) pruned_entries.append(entry)
except ValueError: except ValueError:
logging.warning(f"Skipping malformed log entry (no timestamp): {entry[:50]}...") logger.warning(f"Skipping malformed log entry (no timestamp): {entry[:50]}...")
continue continue
with open(LOG_FILE, 'w') as f: with open(log_file, 'w') as f:
f.writelines(pruned_entries) f.writelines(pruned_entries)
logging.basicConfig( def setup_signal_handlers(self) -> None:
filename=LOG_FILE, """Set up signal handlers for graceful shutdown."""
level=logging.INFO, def signal_handler(sig, frame):
format="%(asctime)s - %(levelname)s - %(message)s" logger.info("Received termination signal, checking if safe to exit...")
) if is_posting:
logging.getLogger("requests").setLevel(logging.WARNING) logger.info("Currently posting, will exit after completion.")
logging.getLogger("prawcore").setLevel(logging.WARNING) else:
console_handler = logging.StreamHandler() logger.info("Safe to exit immediately.")
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) sys.exit(0)
logging.getLogger().addHandler(console_handler)
logging.info("Logging initialized for foodie_automator_reddit.py")
setup_logging() signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
POSTED_TITLES_FILE = '/home/shane/foodie_automator/posted_reddit_titles.json' def setup_reddit_client(self) -> praw.Reddit:
USED_IMAGES_FILE = '/home/shane/foodie_automator/used_images.json' """Set up and return a Reddit client with proper configuration."""
EXPIRATION_HOURS = 24 return praw.Reddit(
IMAGE_EXPIRATION_DAYS = 7 client_id=REDDIT_CLIENT_ID,
client_secret=REDDIT_CLIENT_SECRET,
posted_titles_data = load_json_file(POSTED_TITLES_FILE, EXPIRATION_HOURS) user_agent=REDDIT_USER_AGENT
posted_titles = set(entry["title"] for entry in posted_titles_data if "title" in entry)
used_images_data = load_json_file(USED_IMAGES_FILE, IMAGE_EXPIRATION_DAYS)
used_images = set(entry["title"] for entry in used_images_data if "title" in entry)
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
def clean_reddit_title(title):
cleaned_title = re.sub(r'^\[.*?\]\s*', '', title).strip()
logging.info(f"Cleaned Reddit title from '{title}' to '{cleaned_title}'")
return cleaned_title
def is_interesting_reddit(title, summary, upvotes, comment_count, top_comments):
try:
content = f"Title: {title}\n\nContent: {summary}"
if top_comments:
content += f"\n\nTop Comments:\n{'\n'.join(top_comments)}"
response = client.chat.completions.create(
model=LIGHT_TASK_MODEL,
messages=[
{"role": "system", "content": (
"Rate this Reddit post from 0-10 based on rarity, buzzworthiness, and engagement potential for food lovers, covering food topics (skip recipes). "
"Score 8-10 for rare, highly shareable ideas (e.g., unique dishes or restaurant trends). "
"Score 5-7 for fresh, engaging updates with broad appeal. Score below 5 for common or unremarkable content. "
"Consider comments for added context (e.g., specific locations or unique details). "
"Return only a number."
)},
{"role": "user", "content": content}
],
max_tokens=5
) )
base_score = int(response.choices[0].message.content.strip()) if response.choices[0].message.content.strip().isdigit() else 0
engagement_boost = 0 def setup_requests_session(self) -> None:
if upvotes >= 500: """Set up a requests session with retry logic."""
engagement_boost += 3 self.session = requests.Session()
elif upvotes >= 100: retries = Retry(
engagement_boost += 2 total=5,
elif upvotes >= 50: backoff_factor=0.1,
engagement_boost += 1 status_forcelist=[500, 502, 503, 504]
)
self.session.mount('https://', HTTPAdapter(max_retries=retries))
if comment_count >= 100: def load_posted_titles(self) -> Set[str]:
engagement_boost += 2 """Load and return the set of posted titles."""
elif comment_count >= 20:
engagement_boost += 1
final_score = min(base_score + engagement_boost, 10)
logging.info(f"Reddit Interest Score: {final_score} (base: {base_score}, upvotes: {upvotes}, comments: {comment_count}, top_comments: {len(top_comments)}) for '{title}'")
print(f"Interest Score for '{title[:50]}...': {final_score} (base: {base_score}, upvotes: {upvotes}, comments: {comment_count})")
return final_score
except Exception as e:
logging.error(f"Reddit interestingness scoring failed: {e}")
print(f"Reddit Interest Error: {e}")
return 0
def get_top_comments(post_url, reddit, limit=3):
try:
submission = reddit.submission(url=post_url)
submission.comment_sort = 'top'
submission.comments.replace_more(limit=0)
top_comments = [comment.body for comment in submission.comments[:limit] if not comment.body.startswith('[deleted]')]
logging.info(f"Fetched {len(top_comments)} top comments for {post_url}")
return top_comments
except Exception as e:
logging.error(f"Failed to fetch comments for {post_url}: {e}")
return []
def fetch_reddit_posts():
reddit = praw.Reddit(
client_id=REDDIT_CLIENT_ID,
client_secret=REDDIT_CLIENT_SECRET,
user_agent=REDDIT_USER_AGENT
)
feeds = ['FoodPorn', 'restaurant', 'FoodIndustry', 'food']
articles = []
cutoff_date = datetime.now(timezone.utc) - timedelta(hours=EXPIRATION_HOURS)
logging.info(f"Starting fetch with cutoff date: {cutoff_date}")
for subreddit_name in feeds:
try: try:
subreddit = reddit.subreddit(subreddit_name) data = load_json_file(FILE_PATHS["posted_reddit_titles"], EXPIRATION_DAYS)
for submission in subreddit.top(time_filter='day', limit=100): return {entry["title"] for entry in data if "title" in entry}
pub_date = datetime.fromtimestamp(submission.created_utc, tz=timezone.utc)
if pub_date < cutoff_date:
logging.info(f"Skipping old post: {submission.title} (Published: {pub_date})")
continue
cleaned_title = clean_reddit_title(submission.title)
articles.append({
"title": cleaned_title,
"raw_title": submission.title,
"link": f"https://www.reddit.com{submission.permalink}",
"summary": submission.selftext,
"feed_title": get_clean_source_name(subreddit_name),
"pub_date": pub_date,
"upvotes": submission.score,
"comment_count": submission.num_comments
})
logging.info(f"Fetched {len(articles)} posts from r/{subreddit_name}")
except Exception as e: except Exception as e:
logging.error(f"Failed to fetch Reddit feed r/{subreddit_name}: {e}") logger.error(f"Error loading posted titles: {e}")
return set()
logging.info(f"Total Reddit posts fetched: {len(articles)}") def load_used_images(self) -> Set[str]:
return articles """Load and return the set of used images."""
def curate_from_reddit():
articles = fetch_reddit_posts()
if not articles:
print("No Reddit posts available")
logging.info("No Reddit posts available")
return None, None, None
articles.sort(key=lambda x: x["upvotes"], reverse=True)
reddit = praw.Reddit(
client_id=REDDIT_CLIENT_ID,
client_secret=REDDIT_CLIENT_SECRET,
user_agent=REDDIT_USER_AGENT
)
attempts = 0
max_attempts = 10
while attempts < max_attempts and articles:
article = articles.pop(0)
title = article["title"]
raw_title = article["raw_title"]
link = article["link"]
summary = article["summary"]
source_name = "Reddit"
original_source = '<a href="https://www.reddit.com/">Reddit</a>'
if raw_title in posted_titles:
print(f"Skipping already posted post: {raw_title}")
logging.info(f"Skipping already posted post: {raw_title}")
attempts += 1
continue
print(f"Trying Reddit Post: {title} from {source_name}")
logging.info(f"Trying Reddit Post: {title} from {source_name}")
image_query, relevance_keywords, skip = smart_image_and_filter(title, summary)
if skip or any(keyword in title.lower() or keyword in raw_title.lower() for keyword in RECIPE_KEYWORDS + ["homemade"]):
print(f"Skipping filtered Reddit post: {title}")
logging.info(f"Skipping filtered Reddit post: {title}")
attempts += 1
continue
top_comments = get_top_comments(link, reddit, limit=3)
interest_score = is_interesting_reddit(
title,
summary,
article["upvotes"],
article["comment_count"],
top_comments
)
logging.info(f"Interest Score: {interest_score} for '{title}'")
if interest_score < 6:
print(f"Reddit Interest Too Low: {interest_score}")
logging.info(f"Reddit Interest Too Low: {interest_score}")
attempts += 1
continue
num_paragraphs = determine_paragraph_count(interest_score)
extra_prompt = (
f"Generate exactly {num_paragraphs} paragraphs.\n"
f"FOCUS: Summarize ONLY the provided content, explicitly mentioning '{title}' and sticking to its specific topic and details.\n"
f"Incorporate relevant insights from these top comments if available: {', '.join(top_comments) if top_comments else 'None'}.\n"
f"Do NOT introduce unrelated concepts unless in the content or comments.\n"
f"If brief, expand on the core idea with relevant context about its appeal or significance.\n"
f"Do not include emojis in the summary."
)
content_to_summarize = f"{title}\n\n{summary}"
if top_comments:
content_to_summarize += f"\n\nTop Comments:\n{'\n'.join(top_comments)}"
final_summary = summarize_with_gpt4o(
content_to_summarize,
source_name,
link,
interest_score=interest_score,
extra_prompt=extra_prompt
)
if not final_summary:
logging.info(f"Summary failed for '{title}'")
attempts += 1
continue
final_summary = insert_link_naturally(final_summary, source_name, link)
post_data, author, category, image_url, image_source, uploader, pixabay_url = prepare_post_data(final_summary, title)
if not post_data:
attempts += 1
continue
image_url, image_source, uploader, page_url = get_flickr_image(image_query, relevance_keywords)
if not image_url:
image_url, image_source, uploader, page_url = get_image(image_query)
hook = get_dynamic_hook(post_data["title"]).strip()
# Removed: cta = select_best_cta(post_data["title"], final_summary, post_url=None)
# Generate viral share prompt
share_prompt = get_viral_share_prompt(post_data["title"], final_summary)
share_links_template = (
f'<p>{share_prompt} '
f'<a href="https://x.com/intent/tweet?url={{post_url}}&text={{share_text}}" target="_blank"><i class="tsi tsi-twitter"></i></a> '
f'<a href="https://www.facebook.com/sharer/sharer.php?u={{post_url}}" target="_blank"><i class="tsi tsi-facebook"></i></a></p>'
)
post_data["content"] = f"{final_summary}\n\n{share_links_template}" # Removed cta from content
global is_posting
is_posting = True
try: try:
post_id, post_url = post_to_wp( data = load_json_file(FILE_PATHS["used_images"], IMAGE_EXPIRATION_DAYS)
post_data=post_data, return {entry["title"] for entry in data if "title" in entry}
category=category, except Exception as e:
link=link, logger.error(f"Error loading used images: {e}")
author=author, return set()
image_url=image_url,
original_source=original_source, def clean_reddit_title(self, title: str) -> str:
image_source=image_source, """Clean and standardize Reddit post titles."""
uploader=uploader, cleaned_title = re.sub(r'^\[.*?\]\s*', '', title).strip()
pixabay_url=pixabay_url, logger.info(f"Cleaned Reddit title from '{title}' to '{cleaned_title}'")
interest_score=interest_score, return cleaned_title
should_post_tweet=True
def is_interesting_reddit(self, title: str, summary: str, upvotes: int, comment_count: int, top_comments: List[str]) -> int:
"""Determine the interest score for a Reddit post."""
try:
content = f"Title: {title}\n\nContent: {summary}"
if top_comments:
content += f"\n\nTop Comments:\n{'\n'.join(top_comments)}"
response = self.client.chat.completions.create(
model=LIGHT_TASK_MODEL,
messages=[
{"role": "system", "content": (
"Rate this Reddit post from 0-10 based on rarity, buzzworthiness, and engagement potential for food lovers, covering food topics (skip recipes). "
"Score 8-10 for rare, highly shareable ideas (e.g., unique dishes or restaurant trends). "
"Score 5-7 for fresh, engaging updates with broad appeal. Score below 5 for common or unremarkable content. "
"Consider comments for added context (e.g., specific locations or unique details). "
"Return only a number."
)},
{"role": "user", "content": content}
],
max_tokens=5
) )
finally: base_score = int(response.choices[0].message.content.strip()) if response.choices[0].message.content.strip().isdigit() else 0
is_posting = False
if post_id: engagement_boost = 0
share_text = f"Check out this foodie gem! {post_data['title']}" if upvotes >= 500:
share_text_encoded = quote(share_text) engagement_boost += 3
post_url_encoded = quote(post_url) elif upvotes >= 100:
share_links = share_links_template.format(post_url=post_url_encoded, share_text=share_text_encoded) engagement_boost += 2
# Removed: cta = select_best_cta(post_data["title"], final_summary, post_url=post_url) elif upvotes >= 50:
post_data["content"] = f"{final_summary}\n\n{share_links}" # Removed cta from content engagement_boost += 1
is_posting = True
if comment_count >= 100:
engagement_boost += 2
elif comment_count >= 20:
engagement_boost += 1
final_score = min(base_score + engagement_boost, 10)
logger.info(f"Reddit Interest Score: {final_score} (base: {base_score}, upvotes: {upvotes}, comments: {comment_count}, top_comments: {len(top_comments)}) for '{title}'")
return final_score
except Exception as e:
logger.error(f"Reddit interestingness scoring failed: {e}")
return 0
def get_top_comments(self, post_url: str, limit: int = 3) -> List[str]:
"""Fetch top comments for a Reddit post."""
try:
submission = self.reddit.submission(url=post_url)
submission.comment_sort = 'top'
submission.comments.replace_more(limit=0)
top_comments = [comment.body for comment in submission.comments[:limit] if not comment.body.startswith('[deleted]')]
logger.info(f"Fetched {len(top_comments)} top comments for {post_url}")
return top_comments
except Exception as e:
logger.error(f"Failed to fetch comments for {post_url}: {e}")
return []
def fetch_reddit_posts(self) -> List[Dict]:
"""Fetch posts from configured Reddit subreddits."""
feeds = ['FoodPorn', 'restaurant', 'FoodIndustry', 'food']
articles = []
cutoff_date = datetime.now(timezone.utc) - timedelta(hours=24)
logger.info(f"Starting fetch with cutoff date: {cutoff_date}")
for subreddit_name in feeds:
try: try:
post_to_wp( subreddit = self.reddit.subreddit(subreddit_name)
post_data=post_data, for submission in subreddit.top(time_filter='day', limit=100):
category=category, pub_date = datetime.fromtimestamp(submission.created_utc, tz=timezone.utc)
link=link, if pub_date < cutoff_date:
author=author, logger.info(f"Skipping old post: {submission.title} (Published: {pub_date})")
image_url=image_url, continue
original_source=original_source, cleaned_title = self.clean_reddit_title(submission.title)
image_source=image_source, articles.append({
uploader=uploader, "title": cleaned_title,
pixabay_url=pixabay_url, "raw_title": submission.title,
interest_score=interest_score, "link": f"https://www.reddit.com{submission.permalink}",
post_id=post_id, "summary": submission.selftext,
should_post_tweet=False "feed_title": get_clean_source_name(subreddit_name),
) "pub_date": pub_date,
finally: "upvotes": submission.score,
is_posting = False "comment_count": submission.num_comments
})
logger.info(f"Fetched {len(articles)} posts from r/{subreddit_name}")
except Exception as e:
logger.error(f"Failed to fetch Reddit feed r/{subreddit_name}: {e}")
timestamp = datetime.now(timezone.utc).isoformat() logger.info(f"Total Reddit posts fetched: {len(articles)}")
save_json_file(POSTED_TITLES_FILE, raw_title, timestamp) return articles
posted_titles.add(raw_title)
logging.info(f"Successfully saved '{raw_title}' to {POSTED_TITLES_FILE} with timestamp {timestamp}")
if image_url: def curate_from_reddit(self) -> Tuple[Optional[Dict], Optional[str], int]:
save_json_file(USED_IMAGES_FILE, image_url, timestamp) """Curate content from Reddit posts."""
used_images.add(image_url) articles = self.fetch_reddit_posts()
logging.info(f"Saved image '{image_url}' to {USED_IMAGES_FILE} with timestamp {timestamp}") if not articles:
logger.info("No Reddit posts available")
return None, None, random.randint(600, 1800)
print(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id}) from Reddit *****") articles.sort(key=lambda x: x["upvotes"], reverse=True)
print(f"Actual post URL: {post_url}")
logging.info(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id}) from Reddit *****")
logging.info(f"Actual post URL: {post_url}")
return post_data, category, random.randint(0, 1800)
attempts += 1 for article in articles:
logging.info(f"WP posting failed for '{post_data['title']}'") title = article["title"]
raw_title = article["raw_title"]
link = article["link"]
summary = article["summary"]
print("No interesting Reddit post found after attempts") if raw_title in self.posted_titles:
logging.info("No interesting Reddit post found after attempts") logger.info(f"Skipping already posted post: {raw_title}")
return None, None, random.randint(600, 1800) continue
logger.info(f"Processing Reddit Post: {title}")
image_query, relevance_keywords, skip = smart_image_and_filter(title, summary)
if skip or any(keyword in title.lower() or keyword in raw_title.lower() for keyword in RECIPE_KEYWORDS + ["homemade"]):
logger.info(f"Skipping filtered Reddit post: {title}")
continue
top_comments = self.get_top_comments(link)
interest_score = self.is_interesting_reddit(title, summary, article["upvotes"], article["comment_count"], top_comments)
if interest_score < 6:
logger.info(f"Reddit Interest Too Low: {interest_score}")
continue
num_paragraphs = determine_paragraph_count(interest_score)
extra_prompt = (
f"Generate exactly {num_paragraphs} paragraphs.\n"
f"FOCUS: Summarize ONLY the provided content, explicitly mentioning '{title}' and sticking to its specific topic and details.\n"
f"Do NOT introduce unrelated concepts.\n"
f"Expand on the core idea with relevant context about its appeal or significance in food trends.\n"
f"Do not include emojis in the summary."
)
final_summary = summarize_with_gpt4o(
f"{title}\n\n{summary}",
"Reddit",
link,
interest_score=interest_score,
extra_prompt=extra_prompt
)
if not final_summary:
logger.info(f"Summary failed for '{title}'")
continue
final_summary = insert_link_naturally(final_summary, "Reddit", link)
post_data, author, category, image_url, image_source, uploader, pixabay_url = prepare_post_data(final_summary, title)
if post_data and author:
return post_data, author, random.randint(600, 1800)
return None, None, random.randint(600, 1800)
def run_reddit_automator(): def run_reddit_automator():
print(f"{datetime.now(timezone.utc)} - INFO - ***** Reddit Automator Launched *****") """Main function to run the Reddit automator."""
logging.info("***** Reddit Automator Launched *****") scraper = RedditScraper()
while True:
post_data, category, sleep_time = curate_from_reddit() try:
if not post_data: post_data, author, sleep_time = scraper.curate_from_reddit()
print(f"No postable Reddit article found - sleeping for {sleep_time} seconds") if post_data and author:
logging.info(f"No postable Reddit article found - sleeping for {sleep_time} seconds") global is_posting
else: is_posting = True
print(f"Completed Reddit run with sleep time: {sleep_time} seconds") try:
logging.info(f"Completed Reddit run with sleep time: {sleep_time} seconds") post_to_wp(post_data, author)
print(f"Sleeping for {sleep_time}s") logger.info(f"Successfully posted: {post_data['title']}")
time.sleep(sleep_time) finally:
return post_data, category, sleep_time is_posting = False
time.sleep(sleep_time)
except Exception as e:
logger.error(f"Error in Reddit automator: {e}")
time.sleep(300) # Wait 5 minutes before retrying
if __name__ == "__main__": if __name__ == "__main__":
run_reddit_automator() run_reddit_automator()
+212 -281
View File
@@ -10,6 +10,7 @@ import sys
import re import re
import email.utils import email.utils
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import List, Dict, Optional, Tuple, Set
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from openai import OpenAI from openai import OpenAI
from urllib.parse import quote from urllib.parse import quote
@@ -18,7 +19,8 @@ from requests.adapters import HTTPAdapter
from foodie_config import ( from foodie_config import (
RSS_FEEDS, RSS_FEED_NAMES, AUTHORS, RECIPE_KEYWORDS, PROMO_KEYWORDS, RSS_FEEDS, RSS_FEED_NAMES, AUTHORS, RECIPE_KEYWORDS, PROMO_KEYWORDS,
HOME_KEYWORDS, PRODUCT_KEYWORDS, PERSONA_CONFIGS, CATEGORIES, HOME_KEYWORDS, PRODUCT_KEYWORDS, PERSONA_CONFIGS, CATEGORIES,
get_clean_source_name, X_API_CREDENTIALS get_clean_source_name, X_API_CREDENTIALS, FILE_PATHS, EXPIRATION_DAYS,
IMAGE_EXPIRATION_DAYS, LIGHT_TASK_MODEL
) )
from foodie_utils import ( from foodie_utils import (
load_json_file, save_json_file, get_image, generate_image_query, load_json_file, save_json_file, get_image, generate_image_query,
@@ -30,42 +32,50 @@ from foodie_utils import (
from foodie_hooks import get_dynamic_hook, get_viral_share_prompt from foodie_hooks import get_dynamic_hook, get_viral_share_prompt
from dotenv import load_dotenv from dotenv import load_dotenv
# Load environment variables
load_dotenv() load_dotenv()
# Global state
is_posting = False is_posting = False
logger = logging.getLogger(__name__)
def signal_handler(sig, frame): class RSSScraper:
logging.info("Received termination signal, checking if safe to exit...") def __init__(self):
if is_posting: self.setup_logging()
logging.info("Currently posting, will exit after completion.") self.setup_signal_handlers()
else: self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
logging.info("Safe to exit immediately.") self.posted_titles = self.load_posted_titles()
sys.exit(0) self.used_images = self.load_used_images()
self.session = self.setup_http_session()
signal.signal(signal.SIGTERM, signal_handler) def setup_logging(self) -> None:
signal.signal(signal.SIGINT, signal_handler) """Configure logging for the scraper."""
log_file = FILE_PATHS["posted_rss_titles"].with_suffix('.log')
self.prune_old_logs(log_file)
LOG_FILE = "/home/shane/foodie_automator/foodie_automator_rss.log" logging.basicConfig(
LOG_PRUNE_DAYS = 30 filename=str(log_file),
FEED_TIMEOUT = 15 level=logging.INFO,
MAX_RETRIES = 3 format="%(asctime)s - %(levelname)s - %(message)s"
)
logging.getLogger("requests").setLevel(logging.WARNING)
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logging.getLogger().addHandler(console_handler)
logger.info("Logging initialized for RSS scraper")
POSTED_TITLES_FILE = '/home/shane/foodie_automator/posted_rss_titles.json' def prune_old_logs(self, log_file: str) -> None:
USED_IMAGES_FILE = '/home/shane/foodie_automator/used_images.json' """Prune log entries older than LOG_PRUNE_DAYS."""
EXPIRATION_HOURS = 24 if not os.path.exists(log_file):
IMAGE_EXPIRATION_DAYS = 7 return
posted_titles_data = load_json_file(POSTED_TITLES_FILE, EXPIRATION_HOURS) with open(log_file, 'r') as f:
posted_titles = set(entry["title"] for entry in posted_titles_data)
used_images = set(entry["title"] for entry in load_json_file(USED_IMAGES_FILE, IMAGE_EXPIRATION_DAYS) if "title" in entry)
def setup_logging():
if os.path.exists(LOG_FILE):
with open(LOG_FILE, 'r') as f:
lines = f.readlines() lines = f.readlines()
cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS)
cutoff = datetime.now(timezone.utc) - timedelta(days=30) # LOG_PRUNE_DAYS
pruned_lines = [] pruned_lines = []
malformed_count = 0 malformed_count = 0
for line in lines: for line in lines:
if len(line) < 19 or not line[:19].replace('-', '').replace(':', '').replace(' ', '').isdigit(): if len(line) < 19 or not line[:19].replace('-', '').replace(':', '').replace(' ', '').isdigit():
malformed_count += 1 malformed_count += 1
@@ -77,290 +87,211 @@ def setup_logging():
except ValueError: except ValueError:
malformed_count += 1 malformed_count += 1
continue continue
if malformed_count > 0: if malformed_count > 0:
logging.info(f"Skipped {malformed_count} malformed log lines during pruning") logger.warning(f"Skipped {malformed_count} malformed log lines during pruning")
with open(LOG_FILE, 'w') as f:
with open(log_file, 'w') as f:
f.writelines(pruned_lines) f.writelines(pruned_lines)
logging.basicConfig( def setup_signal_handlers(self) -> None:
filename=LOG_FILE, """Set up signal handlers for graceful shutdown."""
level=logging.INFO, def signal_handler(sig, frame):
format="%(asctime)s - %(levelname)s - %(message)s", logger.info("Received termination signal, checking if safe to exit...")
datefmt="%Y-%m-%d %H:%M:%S" if is_posting:
) logger.info("Currently posting, will exit after completion.")
console_handler = logging.StreamHandler() else:
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.info("Safe to exit immediately.")
logging.getLogger().addHandler(console_handler) sys.exit(0)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.info("Logging initialized for foodie_automator_rss.py")
setup_logging() signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
def create_http_session() -> requests.Session: def setup_http_session(self) -> requests.Session:
session = requests.Session() """Set up a requests session with retry logic."""
retry_strategy = Retry( session = requests.Session()
total=MAX_RETRIES, retry_strategy = Retry(
backoff_factor=2, total=3,
status_forcelist=[403, 429, 500, 502, 503, 504], backoff_factor=2,
allowed_methods=["GET", "POST"] status_forcelist=[403, 429, 500, 502, 503, 504],
) allowed_methods=["GET", "POST"]
adapter = HTTPAdapter( )
max_retries=retry_strategy, adapter = HTTPAdapter(
pool_connections=10, max_retries=retry_strategy,
pool_maxsize=10 pool_connections=10,
) pool_maxsize=10
session.mount("http://", adapter) )
session.mount("https://", adapter) session.mount("http://", adapter)
session.headers.update({ session.mount("https://", adapter)
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36' session.headers.update({
}) 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
return session })
return session
def parse_date(date_str): def load_posted_titles(self) -> Set[str]:
try: """Load and return the set of posted titles."""
parsed_date = email.utils.parsedate_to_datetime(date_str)
if parsed_date.tzinfo is None:
parsed_date = parsed_date.replace(tzinfo=timezone.utc)
return parsed_date
except Exception as e:
logging.error(f"Failed to parse date '{date_str}': {e}")
return datetime.now(timezone.utc)
def fetch_rss_feeds():
logging.info("Starting fetch_rss_feeds")
articles = []
cutoff_date = datetime.now(timezone.utc) - timedelta(hours=EXPIRATION_HOURS)
session = create_http_session()
if not RSS_FEEDS:
logging.error("RSS_FEEDS is empty in foodie_config.py")
return articles
for feed_url in RSS_FEEDS:
logging.info(f"Processing feed: {feed_url}")
try: try:
response = session.get(feed_url, timeout=FEED_TIMEOUT) data = load_json_file(FILE_PATHS["posted_rss_titles"], EXPIRATION_DAYS)
response.raise_for_status() return {entry["title"] for entry in data if "title" in entry}
soup = BeautifulSoup(response.content, 'xml')
items = soup.find_all('item')
feed_title = RSS_FEED_NAMES.get(feed_url, (get_clean_source_name(feed_url), feed_url))
for item in items:
try:
title = item.find('title').text.strip() if item.find('title') else "Untitled"
link = item.find('link').text.strip() if item.find('link') else ""
pub_date = item.find('pubDate')
pub_date = parse_date(pub_date.text) if pub_date else datetime.now(timezone.utc)
if pub_date < cutoff_date:
logging.info(f"Skipping old article: {title} (Published: {pub_date})")
continue
description = item.find('description')
summary = BeautifulSoup(description.text, 'html.parser').get_text().strip() if description else ""
content = item.find('content:encoded')
content_text = BeautifulSoup(content.text, 'html.parser').get_text().strip() if content else summary
articles.append({
"title": title,
"link": link,
"summary": summary,
"content": content_text,
"feed_title": feed_title[0] if isinstance(feed_title, tuple) else feed_title,
"pub_date": pub_date
})
logging.debug(f"Processed article: {title}")
except Exception as e:
logging.warning(f"Error processing entry in {feed_url}: {e}")
continue
logging.info(f"Filtered to {len(articles)} articles from {feed_url}")
except Exception as e: except Exception as e:
logging.error(f"Failed to fetch RSS feed {feed_url}: {e}") logger.error(f"Error loading posted titles: {e}")
continue return set()
articles.sort(key=lambda x: x["pub_date"], reverse=True) def load_used_images(self) -> Set[str]:
logging.info(f"Total RSS articles fetched: {len(articles)}") """Load and return the set of used images."""
return articles try:
data = load_json_file(FILE_PATHS["used_images"], IMAGE_EXPIRATION_DAYS)
return {entry["title"] for entry in data if "title" in entry}
except Exception as e:
logger.error(f"Error loading used images: {e}")
return set()
def curate_from_rss(): def parse_date(self, date_str: str) -> datetime:
articles = fetch_rss_feeds() """Parse a date string into a datetime object."""
if not articles: try:
print("No RSS articles available") parsed_date = email.utils.parsedate_to_datetime(date_str)
logging.info("No RSS articles available") if parsed_date.tzinfo is None:
return None, None, random.randint(600, 1800) parsed_date = parsed_date.replace(tzinfo=timezone.utc)
return parsed_date
except Exception as e:
logger.error(f"Failed to parse date '{date_str}': {e}")
return datetime.now(timezone.utc)
attempts = 0 def fetch_rss_feeds(self) -> List[Dict]:
max_attempts = 10 """Fetch and process RSS feeds."""
while attempts < max_attempts and articles: logger.info("Starting fetch_rss_feeds")
article = articles.pop(0) articles = []
title = article["title"] cutoff_date = datetime.now(timezone.utc) - timedelta(hours=24)
link = article["link"]
summary = article["summary"]
content = article["content"]
source_name = article["feed_title"]
original_source = f'<a href="{link}">{source_name}</a>'
if title in posted_titles: if not RSS_FEEDS:
print(f"Skipping already posted article: {title}") logger.error("RSS_FEEDS is empty in foodie_config.py")
logging.info(f"Skipping already posted article: {title}") return articles
attempts += 1
continue
print(f"Trying RSS Article: {title} from {source_name}") for feed_url in RSS_FEEDS:
logging.info(f"Trying RSS Article: {title} from {source_name}") logger.info(f"Processing feed: {feed_url}")
try:
response = self.session.get(feed_url, timeout=15)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'xml')
items = soup.find_all('item')
image_query, relevance_keywords, skip = smart_image_and_filter(title, summary) feed_title = RSS_FEED_NAMES.get(feed_url, (get_clean_source_name(feed_url), feed_url))
if skip: for item in items:
print(f"Skipping filtered RSS article: {title}") try:
logging.info(f"Skipping filtered RSS article: {title}") title = item.find('title').text.strip() if item.find('title') else "Untitled"
attempts += 1 link = item.find('link').text.strip() if item.find('link') else ""
continue pub_date = item.find('pubDate')
pub_date = self.parse_date(pub_date.text) if pub_date else datetime.now(timezone.utc)
scoring_content = f"{title}\n\n{summary}\n\nContent: {content}" if pub_date < cutoff_date:
interest_score = is_interesting(scoring_content) logger.info(f"Skipping old article: {title} (Published: {pub_date})")
logging.info(f"Interest score for '{title}': {interest_score}") continue
if interest_score < 6:
print(f"RSS Interest Too Low: {interest_score}")
logging.info(f"RSS Interest Too Low: {interest_score}")
attempts += 1
continue
num_paragraphs = determine_paragraph_count(interest_score) description = item.find('description')
extra_prompt = ( summary = BeautifulSoup(description.text, 'html.parser').get_text().strip() if description else ""
f"Generate exactly {num_paragraphs} paragraphs.\n" content = item.find('content:encoded')
f"FOCUS: Summarize ONLY the provided content, explicitly mentioning '{title}' and sticking to its specific topic and details.\n" content_text = BeautifulSoup(content.text, 'html.parser').get_text().strip() if content else summary
f"Do NOT introduce unrelated concepts.\n"
f"Expand on the core idea with relevant context about its appeal or significance.\n"
f"Do not include emojis in the summary."
)
content_to_summarize = scoring_content
final_summary = summarize_with_gpt4o(
content_to_summarize,
source_name,
link,
interest_score=interest_score,
extra_prompt=extra_prompt
)
if not final_summary:
logging.info(f"Summary failed for '{title}'")
attempts += 1
continue
# Remove the original title from the summary while preserving paragraphs articles.append({
title_pattern = re.compile( "title": title,
r'\*\*' + re.escape(title) + r'\*\*|' + re.escape(title), "link": link,
re.IGNORECASE "summary": summary,
) "content": content_text,
paragraphs = final_summary.split('\n') "feed_title": feed_title[0] if isinstance(feed_title, tuple) else feed_title,
cleaned_paragraphs = [] "pub_date": pub_date
for para in paragraphs: })
if para.strip(): except Exception as e:
cleaned_para = title_pattern.sub('', para).strip() logger.warning(f"Error processing entry in {feed_url}: {e}")
cleaned_para = re.sub(r'\s+', ' ', cleaned_para) continue
cleaned_paragraphs.append(cleaned_para) logger.info(f"Filtered to {len(articles)} articles from {feed_url}")
final_summary = '\n'.join(cleaned_paragraphs) except Exception as e:
logger.error(f"Failed to fetch RSS feed {feed_url}: {e}")
final_summary = insert_link_naturally(final_summary, source_name, link)
post_data, author, category, image_url, image_source, uploader, pixabay_url = prepare_post_data(final_summary, title)
if not post_data:
attempts += 1
continue
# Fetch image
image_url, image_source, uploader, page_url = get_flickr_image(image_query, relevance_keywords)
if not image_url:
logging.info(f"Flickr fetch failed for '{image_query}'. Falling back to Pixabay.")
image_url, image_source, uploader, page_url = get_image(image_query)
if not image_url:
logging.info(f"Pixabay fetch failed for '{image_query}'. Skipping article '{title}'.")
attempts += 1
continue continue
hook = get_dynamic_hook(post_data["title"]).strip() articles.sort(key=lambda x: x["pub_date"], reverse=True)
logger.info(f"Total RSS articles fetched: {len(articles)}")
return articles
# Generate viral share prompt def curate_from_rss(self) -> Tuple[Optional[Dict], Optional[str], int]:
share_prompt = get_viral_share_prompt(post_data["title"], final_summary) """Curate content from RSS feeds."""
share_links_template = ( articles = self.fetch_rss_feeds()
f'<p>{share_prompt} ' if not articles:
f'<a href="https://x.com/intent/tweet?url={{post_url}}&text={{share_text}}" target="_blank"><i class="tsi tsi-twitter"></i></a> ' logger.info("No RSS articles available")
f'<a href="https://www.facebook.com/sharer/sharer.php?u={{post_url}}" target="_blank"><i class="tsi tsi-facebook"></i></a></p>' return None, None, random.randint(600, 1800)
)
post_data["content"] = f"{final_summary}\n\n{share_links_template}" # Removed cta from content
global is_posting for article in articles:
is_posting = True title = article["title"]
try: link = article["link"]
post_id, post_url = post_to_wp( summary = article["summary"]
post_data=post_data, content = article["content"]
category=category, source_name = article["feed_title"]
link=link,
author=author, if title in self.posted_titles:
image_url=image_url, logger.info(f"Skipping already posted article: {title}")
original_source=original_source, continue
image_source=image_source,
uploader=uploader, logger.info(f"Processing RSS Article: {title} from {source_name}")
pixabay_url=pixabay_url,
interest_score=interest_score, image_query, relevance_keywords, skip = smart_image_and_filter(title, summary)
should_post_tweet=True if skip:
logger.info(f"Skipping filtered RSS article: {title}")
continue
scoring_content = f"{title}\n\n{summary}\n\nContent: {content}"
interest_score = is_interesting(scoring_content)
logger.info(f"Interest score for '{title}': {interest_score}")
if interest_score < 6:
logger.info(f"RSS Interest Too Low: {interest_score}")
continue
num_paragraphs = determine_paragraph_count(interest_score)
extra_prompt = (
f"Generate exactly {num_paragraphs} paragraphs.\n"
f"FOCUS: Summarize ONLY the provided content, explicitly mentioning '{title}' and sticking to its specific topic and details.\n"
f"Do NOT introduce unrelated concepts.\n"
f"Expand on the core idea with relevant context about its appeal or significance.\n"
f"Do not include emojis in the summary."
) )
finally:
is_posting = False
if post_id: final_summary = summarize_with_gpt4o(
share_text = f"Check out this foodie gem! {post_data['title']}" scoring_content,
share_text_encoded = quote(share_text) source_name,
post_url_encoded = quote(post_url) link,
share_links = share_links_template.format(post_url=post_url_encoded, share_text=share_text_encoded) interest_score=interest_score,
# Removed: cta = select_best_cta(post_data["title"], final_summary, post_url=post_url) extra_prompt=extra_prompt
post_data["content"] = f"{final_summary}\n\n{share_links}" # Removed cta from content )
is_posting = True
try:
post_to_wp(
post_data=post_data,
category=category,
link=link,
author=author,
image_url=image_url,
original_source=original_source,
image_source=image_source,
uploader=uploader,
pixabay_url=pixabay_url,
interest_score=interest_score,
post_id=post_id,
should_post_tweet=False
)
finally:
is_posting = False
timestamp = datetime.now(timezone.utc).isoformat() if not final_summary:
save_json_file(POSTED_TITLES_FILE, title, timestamp) logger.info(f"Summary failed for '{title}'")
posted_titles.add(title) continue
logging.info(f"Successfully saved '{title}' to {POSTED_TITLES_FILE}")
if image_url: final_summary = insert_link_naturally(final_summary, source_name, link)
save_json_file(USED_IMAGES_FILE, image_url, timestamp) post_data, author, category, image_url, image_source, uploader, pixabay_url = prepare_post_data(final_summary, title)
used_images.add(image_url)
logging.info(f"Saved image '{image_url}' to {USED_IMAGES_FILE}")
print(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id}) from RSS *****") if post_data and author:
logging.info(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id}) from RSS *****") return post_data, author, random.randint(600, 1800)
return post_data, category, random.randint(0, 1800)
attempts += 1 return None, None, random.randint(600, 1800)
logging.info(f"WP posting failed for '{post_data['title']}'")
print("No interesting RSS article found after attempts")
logging.info("No interesting RSS article found after attempts")
return None, None, random.randint(600, 1800)
def run_rss_automator(): def run_rss_automator():
print(f"{datetime.now(timezone.utc)} - INFO - ***** RSS Automator Launched *****") """Main function to run the RSS automator."""
logging.info("***** RSS Automator Launched *****") scraper = RSSScraper()
post_data, category, sleep_time = curate_from_rss() while True:
print(f"Sleeping for {sleep_time}s") try:
logging.info(f"Completed run with sleep time: {sleep_time} seconds") post_data, author, sleep_time = scraper.curate_from_rss()
time.sleep(sleep_time) if post_data and author:
return post_data, category, sleep_time global is_posting
is_posting = True
try:
post_to_wp(post_data, author)
logger.info(f"Successfully posted: {post_data['title']}")
finally:
is_posting = False
time.sleep(sleep_time)
except Exception as e:
logger.error(f"Error in RSS automator: {e}")
time.sleep(300) # Wait 5 minutes before retrying
if __name__ == "__main__": if __name__ == "__main__":
run_rss_automator() run_rss_automator()
+116 -31
View File
@@ -2,14 +2,71 @@
# Constants shared across all automator scripts # Constants shared across all automator scripts
from dotenv import load_dotenv from dotenv import load_dotenv
import os import os
from typing import Dict, List, Optional, TypedDict, Union
from pathlib import Path
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('foodie_automator.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv() load_dotenv()
# API Keys
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
PIXABAY_API_KEY = os.getenv("PIXABAY_API_KEY") PIXABAY_API_KEY = os.getenv("PIXABAY_API_KEY")
FLICKR_API_KEY = os.getenv("FLICKR_API_KEY") FLICKR_API_KEY = os.getenv("FLICKR_API_KEY")
FLICKR_API_SECRET = os.getenv("FLICKR_API_SECRET") FLICKR_API_SECRET = os.getenv("FLICKR_API_SECRET")
AUTHORS = [ # Validate required API keys
def validate_api_keys() -> None:
"""Validate that all required API keys are present."""
required_keys = {
"OPENAI_API_KEY": OPENAI_API_KEY,
"PIXABAY_API_KEY": PIXABAY_API_KEY,
"FLICKR_API_KEY": FLICKR_API_KEY,
"FLICKR_API_SECRET": FLICKR_API_SECRET
}
missing_keys = [key for key, value in required_keys.items() if not value]
if missing_keys:
logger.error(f"Missing required API keys: {', '.join(missing_keys)}")
raise ValueError(f"Missing required API keys: {', '.join(missing_keys)}")
# Type definitions
class AuthorConfig(TypedDict):
url: str
username: str
password: str
persona: str
bio: str
dob: str
class XCredentials(TypedDict):
username: str
x_username: str
api_key: str
api_secret: str
access_token: str
access_token_secret: str
client_secret: str
class PersonaConfig(TypedDict):
description: str
tone: str
article_prompt: str
x_prompt: str
# Author configurations
AUTHORS: List[AuthorConfig] = [
{ {
"url": "https://insiderfoodie.com", "url": "https://insiderfoodie.com",
"username": "owenjohnson", "username": "owenjohnson",
@@ -31,7 +88,7 @@ AUTHORS = [
"username": "aishapatel", "username": "aishapatel",
"password": os.getenv("AISHAPATEL_PASSWORD"), "password": os.getenv("AISHAPATEL_PASSWORD"),
"persona": "Trend Scout", "persona": "Trend Scout",
"bio": "I scout global food trends, obsessed with whats emerging. My sharp predictions map the industrys path—always one step ahead.", "bio": "I scout global food trends, obsessed with what's emerging. My sharp predictions map the industry's path—always one step ahead.",
"dob": "1999-03-15" "dob": "1999-03-15"
}, },
{ {
@@ -47,7 +104,7 @@ AUTHORS = [
"username": "keishareid", "username": "keishareid",
"password": os.getenv("KEISHAREID_PASSWORD"), "password": os.getenv("KEISHAREID_PASSWORD"),
"persona": "African-American Soul Food Sage", "persona": "African-American Soul Food Sage",
"bio": "I bring soul foods legacy to life, blending history with modern vibes. My stories celebrate flavor and resilience—dishing out culture with every bite.", "bio": "I bring soul food's legacy to life, blending history with modern vibes. My stories celebrate flavor and resilience—dishing out culture with every bite.",
"dob": "1994-06-10" "dob": "1994-06-10"
}, },
{ {
@@ -60,7 +117,8 @@ AUTHORS = [
} }
] ]
X_API_CREDENTIALS = [ # X (Twitter) API credentials
X_API_CREDENTIALS: List[XCredentials] = [
{ {
"username": "owenjohnson", "username": "owenjohnson",
"x_username": "@insiderfoodieowen", "x_username": "@insiderfoodieowen",
@@ -117,12 +175,13 @@ X_API_CREDENTIALS = [
} }
] ]
PERSONA_CONFIGS = { # Persona configurations
PERSONA_CONFIGS: Dict[str, PersonaConfig] = {
"Visionary Editor": { "Visionary Editor": {
"description": "a commanding food editor with a borderless view", "description": "a commanding food editor with a borderless view",
"tone": "a polished and insightful tone, like 'This redefines culinary excellence.'", "tone": "a polished and insightful tone, like 'This redefines culinary excellence.'",
"article_prompt": ( "article_prompt": (
"Youre {description}. Summarize this article in {tone}. " "You're {description}. Summarize this article in {tone}. "
"Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. " "Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. "
"Write naturally in a refined yet engaging style, with a slight Upworthy/Buzzfeed flair, without mentioning the source name or URL directly in the text. " "Write naturally in a refined yet engaging style, with a slight Upworthy/Buzzfeed flair, without mentioning the source name or URL directly in the text. "
"Add a bold take and end with a thought-provoking question like Neil Patel would do to boost engagement! Do not include emojis in the summary." "Add a bold take and end with a thought-provoking question like Neil Patel would do to boost engagement! Do not include emojis in the summary."
@@ -139,7 +198,7 @@ PERSONA_CONFIGS = {
"description": "a seasoned foodie reviewer with a sharp eye", "description": "a seasoned foodie reviewer with a sharp eye",
"tone": "a professional yet engaging tone, like 'This dish is a revelation.'", "tone": "a professional yet engaging tone, like 'This dish is a revelation.'",
"article_prompt": ( "article_prompt": (
"Youre {description}. Summarize this article in {tone}. " "You're {description}. Summarize this article in {tone}. "
"Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. " "Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. "
"Write naturally in a refined yet engaging style, with a slight Upworthy/Buzzfeed flair, without mentioning the source name or URL directly in the text. " "Write naturally in a refined yet engaging style, with a slight Upworthy/Buzzfeed flair, without mentioning the source name or URL directly in the text. "
"Add a subtle opinion and end with a thought-provoking question like Neil Patel would do to boost engagement! Do not include emojis in the summary." "Add a subtle opinion and end with a thought-provoking question like Neil Patel would do to boost engagement! Do not include emojis in the summary."
@@ -154,12 +213,12 @@ PERSONA_CONFIGS = {
}, },
"Trend Scout": { "Trend Scout": {
"description": "a forward-thinking editor obsessed with trends", "description": "a forward-thinking editor obsessed with trends",
"tone": "an insightful and forward-looking tone, like 'This sets the stage for whats next.'", "tone": "an insightful and forward-looking tone, like 'This sets the stage for what's next.'",
"article_prompt": ( "article_prompt": (
"Youre {description}. Summarize this article in {tone}. " "You're {description}. Summarize this article in {tone}. "
"Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. " "Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. "
"Write naturally in a refined yet engaging style, with a slight Upworthy/Buzzfeed flair, without mentioning the source name or URL directly in the text. " "Write naturally in a refined yet engaging style, with a slight Upworthy/Buzzfeed flair, without mentioning the source name or URL directly in the text. "
"Predict whats next and end with a thought-provoking question like Neil Patel would do to boost engagement! Do not include emojis in the summary." "Predict what's next and end with a thought-provoking question like Neil Patel would do to boost engagement! Do not include emojis in the summary."
), ),
"x_prompt": ( "x_prompt": (
"Craft a tweet as {description}. Keep it under 280 characters, using {tone}. " "Craft a tweet as {description}. Keep it under 280 characters, using {tone}. "
@@ -173,7 +232,7 @@ PERSONA_CONFIGS = {
"description": "a cultured food writer who loves storytelling", "description": "a cultured food writer who loves storytelling",
"tone": "a warm and thoughtful tone, like 'This evokes a sense of tradition.'", "tone": "a warm and thoughtful tone, like 'This evokes a sense of tradition.'",
"article_prompt": ( "article_prompt": (
"Youre {description}. Summarize this article in {tone}. " "You're {description}. Summarize this article in {tone}. "
"Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. " "Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. "
"Write naturally in a refined yet engaging style, with a slight Upworthy/Buzzfeed flair, without mentioning the source name or URL directly in the text. " "Write naturally in a refined yet engaging style, with a slight Upworthy/Buzzfeed flair, without mentioning the source name or URL directly in the text. "
"Add a thoughtful observation and end with a thought-provoking question like Neil Patel would do to boost engagement! Do not include emojis in the summary." "Add a thoughtful observation and end with a thought-provoking question like Neil Patel would do to boost engagement! Do not include emojis in the summary."
@@ -190,7 +249,7 @@ PERSONA_CONFIGS = {
"description": "a vibrant storyteller rooted in African-American culinary heritage", "description": "a vibrant storyteller rooted in African-American culinary heritage",
"tone": "a heartfelt and authentic tone, like 'This captures the essence of heritage.'", "tone": "a heartfelt and authentic tone, like 'This captures the essence of heritage.'",
"article_prompt": ( "article_prompt": (
"Youre {description}. Summarize this article in {tone}. " "You're {description}. Summarize this article in {tone}. "
"Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. " "Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. "
"Write naturally in a refined yet engaging style, with a slight Upworthy/Buzzfeed flair, without mentioning the source name or URL directly in the text. " "Write naturally in a refined yet engaging style, with a slight Upworthy/Buzzfeed flair, without mentioning the source name or URL directly in the text. "
"Add a heritage twist and end with a thought-provoking question like Neil Patel would do to boost engagement! Do not include emojis in the summary." "Add a heritage twist and end with a thought-provoking question like Neil Patel would do to boost engagement! Do not include emojis in the summary."
@@ -207,7 +266,7 @@ PERSONA_CONFIGS = {
"description": "an adventurous explorer of global street food", "description": "an adventurous explorer of global street food",
"tone": "a bold and adventurous tone, like 'This takes you on a global journey.'", "tone": "a bold and adventurous tone, like 'This takes you on a global journey.'",
"article_prompt": ( "article_prompt": (
"Youre {description}. Summarize this article in {tone}. " "You're {description}. Summarize this article in {tone}. "
"Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. " "Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. "
"Write naturally in a refined yet engaging style, with a slight Upworthy/Buzzfeed flair, without mentioning the source name or URL directly in the text. " "Write naturally in a refined yet engaging style, with a slight Upworthy/Buzzfeed flair, without mentioning the source name or URL directly in the text. "
"Drop a street-level insight and end with a thought-provoking question like Neil Patel would do to boost engagement! Do not include emojis in the summary." "Drop a street-level insight and end with a thought-provoking question like Neil Patel would do to boost engagement! Do not include emojis in the summary."
@@ -223,25 +282,30 @@ PERSONA_CONFIGS = {
} }
# File paths # File paths
POSTED_RSS_TITLES_FILE = '/home/shane/foodie_automator/posted_rss_titles.json' BASE_DIR = Path("/home/shane/foodie_automator")
POSTED_GOOGLE_TITLES_FILE = '/home/shane/foodie_automator/posted_google_titles.json' FILE_PATHS = {
POSTED_REDDIT_TITLES_FILE = '/home/shane/foodie_automator/posted_reddit_titles.json' "posted_rss_titles": BASE_DIR / "posted_rss_titles.json",
USED_IMAGES_FILE = '/home/shane/foodie_automator/used_images.json' "posted_google_titles": BASE_DIR / "posted_google_titles.json",
AUTHOR_BACKGROUNDS_FILE = '/home/shane/foodie_automator/author_backgrounds.json' "posted_reddit_titles": BASE_DIR / "posted_reddit_titles.json",
X_POST_COUNTS_FILE = '/home/shane/foodie_automator/x_post_counts.json' "used_images": BASE_DIR / "used_images.json",
RECENT_POSTS_FILE = '/home/shane/foodie_automator/recent_posts.json' "author_backgrounds": BASE_DIR / "author_backgrounds.json",
"x_post_counts": BASE_DIR / "x_post_counts.json",
"recent_posts": BASE_DIR / "recent_posts.json"
}
# Expiration periods
EXPIRATION_DAYS = 3 EXPIRATION_DAYS = 3
IMAGE_EXPIRATION_DAYS = 7 IMAGE_EXPIRATION_DAYS = 7
RSS_FEEDS = [ # RSS feed configurations
RSS_FEEDS: List[str] = [
"https://www.eater.com/rss/full.xml", "https://www.eater.com/rss/full.xml",
"https://www.nrn.com/rss.xml", "https://www.nrn.com/rss.xml",
"https://rss.nytimes.com/services/xml/rss/nyt/DiningandWine.xml", "https://rss.nytimes.com/services/xml/rss/nyt/DiningandWine.xml",
"https://www.theguardian.com/food/rss" "https://www.theguardian.com/food/rss"
] ]
RSS_FEED_NAMES = { RSS_FEED_NAMES: Dict[str, tuple[str, str]] = {
"https://www.eater.com/rss/full.xml": ("Eater", "https://www.eater.com/"), "https://www.eater.com/rss/full.xml": ("Eater", "https://www.eater.com/"),
"https://www.nrn.com/rss.xml": ("Nation's Restaurant News", "https://www.nrn.com/"), "https://www.nrn.com/rss.xml": ("Nation's Restaurant News", "https://www.nrn.com/"),
"https://rss.nytimes.com/services/xml/rss/nyt/DiningandWine.xml": ("The New York Times", "https://www.nytimes.com/section/food"), "https://rss.nytimes.com/services/xml/rss/nyt/DiningandWine.xml": ("The New York Times", "https://www.nytimes.com/section/food"),
@@ -276,12 +340,33 @@ FAST_FOOD_KEYWORDS = [
SUMMARY_MODEL = "gpt-4o" # or "gpt-4.1-mini" for testing SUMMARY_MODEL = "gpt-4o" # or "gpt-4.1-mini" for testing
LIGHT_TASK_MODEL = "gpt-4o-mini" LIGHT_TASK_MODEL = "gpt-4o-mini"
def get_clean_source_name(source_name): def get_clean_source_name(source_name: str) -> str:
""" """Clean and standardize source names."""
Retrieve a clean source name from RSS_FEED_NAMES if source_name matches a feed URL, try:
otherwise return the original source_name as a fallback. # Remove common prefixes and suffixes
""" clean_name = source_name.strip()
for feed_url, (clean_name, _) in RSS_FEED_NAMES.items(): clean_name = clean_name.replace("The ", "").replace("the ", "")
if feed_url == source_name: clean_name = clean_name.replace("Food", "").replace("food", "")
return clean_name clean_name = clean_name.replace("Dining", "").replace("dining", "")
return source_name clean_name = clean_name.replace("Restaurant", "").replace("restaurant", "")
# Remove any remaining whitespace
clean_name = " ".join(clean_name.split())
return clean_name if clean_name else source_name
except Exception as e:
logger.error(f"Error cleaning source name '{source_name}': {e}")
return source_name
# Validate configurations on import
validate_api_keys()
# Ensure all file paths exist
for path in FILE_PATHS.values():
path.parent.mkdir(parents=True, exist_ok=True)
if not path.exists():
path.touch()
logger.info(f"Created missing file: {path}")
# Log successful configuration
logger.info("Configuration loaded successfully")
+606 -714
View File
File diff suppressed because it is too large Load Diff