diff --git a/foodie_automator_google.py b/foodie_automator_google.py index 7c6ccf9..6f53d6f 100644 --- a/foodie_automator_google.py +++ b/foodie_automator_google.py @@ -9,6 +9,7 @@ import json import signal import sys from datetime import datetime, timedelta, timezone +from typing import List, Dict, Optional, Tuple from openai import OpenAI from urllib.parse import quote from selenium import webdriver @@ -16,11 +17,12 @@ from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.chrome.options import Options -from selenium.common.exceptions import TimeoutException +from selenium.common.exceptions import TimeoutException, WebDriverException from duckduckgo_search import DDGS from foodie_config import ( AUTHORS, RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS, - PERSONA_CONFIGS, CATEGORIES, get_clean_source_name, X_API_CREDENTIALS + PERSONA_CONFIGS, CATEGORIES, get_clean_source_name, X_API_CREDENTIALS, + FILE_PATHS, EXPIRATION_DAYS, IMAGE_EXPIRATION_DAYS ) from foodie_utils import ( load_json_file, save_json_file, get_image, generate_image_query, @@ -29,320 +31,254 @@ from foodie_utils import ( generate_category_from_summary, post_to_wp, prepare_post_data, smart_image_and_filter, insert_link_naturally, get_flickr_image ) -from foodie_hooks import get_dynamic_hook, get_viral_share_prompt # Removed select_best_cta import +from foodie_hooks import get_dynamic_hook, get_viral_share_prompt from dotenv import load_dotenv +# Load environment variables load_dotenv() +# Global state is_posting = False +logger = logging.getLogger(__name__) -def signal_handler(sig, frame): - logging.info("Received termination signal, checking if safe to exit...") - if is_posting: - logging.info("Currently posting, will exit after completion.") - else: - logging.info("Safe to exit immediately.") - sys.exit(0) +class GoogleTrendsScraper: + def 
__init__(self): + self.driver = None + self.setup_logging() + self.setup_signal_handlers() + self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) + self.posted_titles = self.load_posted_titles() + self.used_images = self.load_used_images() -signal.signal(signal.SIGTERM, signal_handler) -signal.signal(signal.SIGINT, signal_handler) + def setup_logging(self) -> None: + """Configure logging for the scraper.""" + logger.setLevel(logging.INFO) + file_handler = logging.FileHandler(FILE_PATHS["posted_google_titles"].with_suffix('.log'), mode='a') + file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) + logger.addHandler(file_handler) + console_handler = logging.StreamHandler() + console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) + logger.addHandler(console_handler) + logger.info("Logging initialized for Google Trends scraper") -logger = logging.getLogger() -logger.setLevel(logging.INFO) -file_handler = logging.FileHandler('/home/shane/foodie_automator/foodie_automator_google.log', mode='a') -file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) -logger.addHandler(file_handler) -console_handler = logging.StreamHandler() -console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) -logger.addHandler(console_handler) -logging.info("Logging initialized for foodie_automator_google.py") + def setup_signal_handlers(self) -> None: + """Set up signal handlers for graceful shutdown.""" + def signal_handler(sig, frame): + logger.info("Received termination signal, checking if safe to exit...") + if is_posting: + logger.info("Currently posting, will exit after completion.") + else: + logger.info("Safe to exit immediately.") + sys.exit(0) -client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) -POSTED_TITLES_FILE = 
'/home/shane/foodie_automator/posted_google_titles.json' -USED_IMAGES_FILE = '/home/shane/foodie_automator/used_images.json' -EXPIRATION_HOURS = 24 -IMAGE_EXPIRATION_DAYS = 7 + def load_posted_titles(self) -> set: + """Load and return the set of posted titles.""" + try: + data = load_json_file(FILE_PATHS["posted_google_titles"], EXPIRATION_DAYS) + return {entry["title"] for entry in data} + except Exception as e: + logger.error(f"Error loading posted titles: {e}") + return set() -posted_titles_data = load_json_file(POSTED_TITLES_FILE, EXPIRATION_HOURS) -posted_titles = set(entry["title"] for entry in posted_titles_data) -used_images = set(entry["title"] for entry in load_json_file(USED_IMAGES_FILE, IMAGE_EXPIRATION_DAYS) if "title" in entry) + def load_used_images(self) -> set: + """Load and return the set of used images.""" + try: + data = load_json_file(FILE_PATHS["used_images"], IMAGE_EXPIRATION_DAYS) + return {entry["title"] for entry in data if "title" in entry} + except Exception as e: + logger.error(f"Error loading used images: {e}") + return set() -def parse_search_volume(volume_text): - try: - volume_part = volume_text.split('\n')[0].lower().strip().replace('+', '') - if 'k' in volume_part: - volume = float(volume_part.replace('k', '')) * 1000 - elif 'm' in volume_part: - volume = float(volume_part.replace('m', '')) * 1000000 - else: - volume = float(volume_part) - return volume - except (ValueError, AttributeError) as e: - logging.warning(f"Could not parse search volume from '{volume_text}': {e}") - return 0 + def parse_search_volume(self, volume_text: str) -> float: + """Parse search volume from text into a numeric value.""" + try: + volume_part = volume_text.split('\n')[0].lower().strip().replace('+', '') + if 'k' in volume_part: + return float(volume_part.replace('k', '')) * 1000 + elif 'm' in volume_part: + return float(volume_part.replace('m', '')) * 1000000 + return float(volume_part) + except (ValueError, AttributeError) as e: + 
logger.warning(f"Could not parse search volume from '{volume_text}': {e}") + return 0.0 -def scrape_google_trends(geo='US'): - chrome_options = Options() - chrome_options.add_argument("--headless") - chrome_options.add_argument("--no-sandbox") - chrome_options.add_argument("--disable-dev-shm-usage") - chrome_options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/125.0.0.0 Safari/537.36") + def setup_driver(self) -> None: + """Set up the Chrome WebDriver with appropriate options.""" + chrome_options = Options() + chrome_options.add_argument("--headless") + chrome_options.add_argument("--no-sandbox") + chrome_options.add_argument("--disable-dev-shm-usage") + chrome_options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/125.0.0.0 Safari/537.36") + self.driver = webdriver.Chrome(options=chrome_options) - driver = webdriver.Chrome(options=chrome_options) - try: - for attempt in range(3): - try: - time.sleep(random.uniform(2, 5)) - url = f"https://trends.google.com/trending?geo={geo}&hours=24&sort=search-volume&category=5" - logging.info(f"Navigating to {url} (attempt {attempt + 1})") - driver.get(url) - - logging.info("Waiting for page to load...") - WebDriverWait(driver, 60).until( - EC.presence_of_element_located((By.TAG_NAME, "tbody")) - ) - break - except TimeoutException: - logging.warning(f"Timeout on attempt {attempt + 1} for geo={geo}") - if attempt == 2: - logging.error(f"Failed after 3 attempts for geo={geo}") - return [] - time.sleep(5) - - driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") - time.sleep(2) + def scrape_google_trends(self, geo: str = 'US') -> List[Dict]: + """Scrape Google Trends for the specified region.""" + if not self.driver: + self.setup_driver() trends = [] - rows = driver.find_elements(By.XPATH, "//tbody/tr") - logging.info(f"Found {len(rows)} rows in tbody for geo={geo}") + try: + for attempt in range(3): + try: + time.sleep(random.uniform(2, 5)) + url = 
f"https://trends.google.com/trending?geo={geo}&hours=24&sort=search-volume&category=5" + logger.info(f"Navigating to {url} (attempt {attempt + 1})") + self.driver.get(url) - cutoff_date = datetime.now(timezone.utc) - timedelta(hours=24) - for row in rows: - try: - columns = row.find_elements(By.TAG_NAME, "td") - if len(columns) >= 3: - title = columns[1].text.strip() - search_volume_text = columns[2].text.strip() - search_volume = parse_search_volume(search_volume_text) - logging.info(f"Parsed trend: {title} with search volume: {search_volume}") - if title and search_volume >= 20000: - link = f"https://trends.google.com/trends/explore?q={quote(title)}&geo={geo}" - trends.append({ - "title": title, - "link": link, - "search_volume": search_volume - }) - logging.info(f"Added trend: {title} with search volume: {search_volume}") - else: - logging.info(f"Skipping trend: {title} (volume: {search_volume} < 20K or no title)") - else: - logging.info(f"Skipping row with insufficient columns: {len(columns)}") - except Exception as e: - logging.warning(f"Row processing error: {e}") + logger.info("Waiting for page to load...") + WebDriverWait(self.driver, 60).until( + EC.presence_of_element_located((By.TAG_NAME, "tbody")) + ) + break + except TimeoutException: + logger.warning(f"Timeout on attempt {attempt + 1} for geo={geo}") + if attempt == 2: + logger.error(f"Failed after 3 attempts for geo={geo}") + return [] + time.sleep(5) + + self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") + time.sleep(2) + + rows = self.driver.find_elements(By.XPATH, "//tbody/tr") + logger.info(f"Found {len(rows)} rows in tbody for geo={geo}") + + for row in rows: + try: + columns = row.find_elements(By.TAG_NAME, "td") + if len(columns) >= 3: + title = columns[1].text.strip() + search_volume = self.parse_search_volume(columns[2].text.strip()) + if title and search_volume >= 20000: + link = f"https://trends.google.com/trends/explore?q={quote(title)}&geo={geo}" + 
trends.append({ + "title": title, + "link": link, + "search_volume": search_volume + }) + logger.info(f"Added trend: {title} with search volume: {search_volume}") + except Exception as e: + logger.warning(f"Row processing error: {e}") + continue + + if trends: + trends.sort(key=lambda x: x["search_volume"], reverse=True) + logger.info(f"Extracted {len(trends)} trends for geo={geo}") + else: + logger.warning(f"No valid trends found with search volume >= 20K for geo={geo}") + + except WebDriverException as e: + logger.error(f"WebDriver error: {e}") + finally: + if self.driver: + self.driver.quit() + self.driver = None + logger.info(f"Chrome driver closed for geo={geo}") + + return trends + + def fetch_duckduckgo_news_context(self, trend_title: str, hours: int = 24) -> str: + """Fetch news context for a trend from DuckDuckGo.""" + try: + with DDGS() as ddgs: + results = ddgs.news(f"{trend_title} news", timelimit="d", max_results=5) + titles = [] + for r in results: + try: + date_str = r["date"] + dt = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S+00:00").replace(tzinfo=timezone.utc) if '+00:00' in date_str else datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc) + if dt > (datetime.now(timezone.utc) - timedelta(hours=hours)): + titles.append(r["title"].lower()) + except ValueError as e: + logger.warning(f"Date parsing failed for '{date_str}': {e}") + continue + context = " ".join(titles) if titles else "No recent news found within 24 hours" + logger.info(f"DuckDuckGo News context for '{trend_title}': {context}") + return context + except Exception as e: + logger.warning(f"DuckDuckGo News context fetch failed for '{trend_title}': {e}") + return trend_title + + def curate_from_google_trends(self, geo_list: List[str] = ['US']) -> Tuple[Optional[Dict], Optional[str], int]: + """Curate content from Google Trends for multiple regions.""" + all_trends = [] + for geo in geo_list: + trends = self.scrape_google_trends(geo=geo) + if trends: + 
all_trends.extend(trends) + + if not all_trends: + logger.info("No Google Trends data available") + return None, None, random.randint(600, 1800) + + for trend in all_trends: + title = trend["title"] + if title in self.posted_titles: + logger.info(f"Skipping already posted trend: {title}") continue - if trends: - trends.sort(key=lambda x: x["search_volume"], reverse=True) - logging.info(f"Extracted {len(trends)} trends for geo={geo}: {[t['title'] for t in trends]}") - print(f"Raw trends fetched for geo={geo}: {[t['title'] for t in trends]}") - else: - logging.warning(f"No valid trends found with search volume >= 20K for geo={geo}") - return trends - finally: - driver.quit() - logging.info(f"Chrome driver closed for geo={geo}") + logger.info(f"Processing Google Trend: {title}") + image_query, relevance_keywords, skip = smart_image_and_filter(title, trend.get("summary", "")) + if skip: + logger.info(f"Skipping filtered Google Trend: {title}") + continue -def fetch_duckduckgo_news_context(trend_title, hours=24): - try: - with DDGS() as ddgs: - results = ddgs.news(f"{trend_title} news", timelimit="d", max_results=5) - titles = [] - for r in results: - try: - date_str = r["date"] - if '+00:00' in date_str: - dt = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S+00:00").replace(tzinfo=timezone.utc) - else: - dt = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc) - if dt > (datetime.now(timezone.utc) - timedelta(hours=24)): - titles.append(r["title"].lower()) - except ValueError as e: - logging.warning(f"Date parsing failed for '{date_str}': {e}") - continue - context = " ".join(titles) if titles else "No recent news found within 24 hours" - logging.info(f"DuckDuckGo News context for '{trend_title}': {context}") - return context - except Exception as e: - logging.warning(f"DuckDuckGo News context fetch failed for '{trend_title}': {e}") - return trend_title + scoring_content = f"{title}\n\n{trend.get('summary', '')}" + interest_score = 
is_interesting(scoring_content) + if interest_score < 6: + logger.info(f"Google Trends Interest Too Low: {interest_score}") + continue + + num_paragraphs = determine_paragraph_count(interest_score) + extra_prompt = ( + f"Generate exactly {num_paragraphs} paragraphs.\n" + f"FOCUS: Summarize ONLY the provided content, explicitly mentioning '{title}' and sticking to its specific topic and details.\n" + f"Do NOT introduce unrelated concepts.\n" + f"Expand on the core idea with relevant context about its appeal or significance in food trends.\n" + f"Do not include emojis in the summary." + ) + + final_summary = summarize_with_gpt4o( + scoring_content, + "Google Trends", + trend["link"], + interest_score=interest_score, + extra_prompt=extra_prompt + ) + + if not final_summary: + logger.info(f"Summary failed for '{title}'") + continue + + final_summary = insert_link_naturally(final_summary, "Google Trends", trend["link"]) + post_data, author, category, image_url, image_source, uploader, pixabay_url = prepare_post_data(final_summary, title) + + if post_data and author: + return post_data, author, random.randint(600, 1800) -def curate_from_google_trends(geo_list=['US']): - all_trends = [] - for geo in geo_list: - trends = scrape_google_trends(geo=geo) - if trends: - all_trends.extend(trends) - - if not all_trends: - print("No Google Trends data available") - logging.info("No Google Trends data available") return None, None, random.randint(600, 1800) - attempts = 0 - max_attempts = 10 - while attempts < max_attempts and all_trends: - trend = all_trends.pop(0) - title = trend["title"] - link = trend.get("link", "https://trends.google.com/") - summary = trend.get("summary", "") - source_name = "Google Trends" - original_source = f'{source_name}' - - if title in posted_titles: - print(f"Skipping already posted trend: {title}") - logging.info(f"Skipping already posted trend: {title}") - attempts += 1 - continue - - print(f"Trying Google Trend: {title} from {source_name}") - 
logging.info(f"Trying Google Trend: {title} from {source_name}") - - image_query, relevance_keywords, skip = smart_image_and_filter(title, summary) - if skip: - print(f"Skipping filtered Google Trend: {title}") - logging.info(f"Skipping filtered Google Trend: {title}") - attempts += 1 - continue - - scoring_content = f"{title}\n\n{summary}" - interest_score = is_interesting(scoring_content) - logging.info(f"Interest score for '{title}': {interest_score}") - if interest_score < 6: - print(f"Google Trends Interest Too Low: {interest_score}") - logging.info(f"Google Trends Interest Too Low: {interest_score}") - attempts += 1 - continue - - num_paragraphs = determine_paragraph_count(interest_score) - extra_prompt = ( - f"Generate exactly {num_paragraphs} paragraphs.\n" - f"FOCUS: Summarize ONLY the provided content, explicitly mentioning '{title}' and sticking to its specific topic and details.\n" - f"Do NOT introduce unrelated concepts.\n" - f"Expand on the core idea with relevant context about its appeal or significance in food trends.\n" - f"Do not include emojis in the summary." 
- ) - content_to_summarize = scoring_content - final_summary = summarize_with_gpt4o( - content_to_summarize, - source_name, - link, - interest_score=interest_score, - extra_prompt=extra_prompt - ) - if not final_summary: - logging.info(f"Summary failed for '{title}'") - attempts += 1 - continue - - final_summary = insert_link_naturally(final_summary, source_name, link) - - post_data, author, category, image_url, image_source, uploader, pixabay_url = prepare_post_data(final_summary, title) - if not post_data: - attempts += 1 - continue - - image_url, image_source, uploader, page_url = get_flickr_image(image_query, relevance_keywords) - if not image_url: - image_url, image_source, uploader, page_url = get_image(image_query) - - hook = get_dynamic_hook(post_data["title"]).strip() - - # Generate viral share prompt - share_prompt = get_viral_share_prompt(post_data["title"], final_summary) - share_links_template = ( - f'

{share_prompt} ' - f' ' - f'

' - ) - post_data["content"] = f"{final_summary}\n\n{share_links_template}" - - global is_posting - is_posting = True - try: - post_id, post_url = post_to_wp( - post_data=post_data, - category=category, - link=link, - author=author, - image_url=image_url, - original_source=original_source, - image_source=image_source, - uploader=uploader, - pixabay_url=pixabay_url, - interest_score=interest_score, - should_post_tweet=True - ) - finally: - is_posting = False - - if post_id: - share_text = f"Check out this foodie gem! {post_data['title']}" - share_text_encoded = quote(share_text) - post_url_encoded = quote(post_url) - share_links = share_links_template.format(post_url=post_url_encoded, share_text=share_text_encoded) - # Removed: cta = select_best_cta(post_data["title"], final_summary, post_url=post_url) - post_data["content"] = f"{final_summary}\n\n{share_links}" # Removed cta from content - is_posting = True - try: - post_to_wp( - post_data=post_data, - category=category, - link=link, - author=author, - image_url=image_url, - original_source=original_source, - image_source=image_source, - uploader=uploader, - pixabay_url=pixabay_url, - interest_score=interest_score, - post_id=post_id, - should_post_tweet=False - ) - finally: - is_posting = False - - timestamp = datetime.now(timezone.utc).isoformat() - save_json_file(POSTED_TITLES_FILE, title, timestamp) - posted_titles.add(title) - logging.info(f"Successfully saved '{title}' to {POSTED_TITLES_FILE}") - - if image_url: - save_json_file(USED_IMAGES_FILE, image_url, timestamp) - used_images.add(image_url) - logging.info(f"Saved image '{image_url}' to {USED_IMAGES_FILE}") - - print(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id}) from Google Trends *****") - logging.info(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id}) from Google Trends *****") - return post_data, category, random.randint(0, 1800) - - attempts += 1 - logging.info(f"WP posting failed for '{post_data['title']}'") - - print("No 
interesting Google Trend found after attempts") - logging.info("No interesting Google Trend found after attempts") - return None, None, random.randint(600, 1800) - def run_google_trends_automator(): - logging.info("***** Google Trends Automator Launched *****") - geo_list = ['US', 'GB', 'AU'] - post_data, category, sleep_time = curate_from_google_trends(geo_list=geo_list) - if sleep_time is None: - sleep_time = random.randint(600, 1800) - print(f"Sleeping for {sleep_time}s") - logging.info(f"Completed run with sleep time: {sleep_time} seconds") - time.sleep(sleep_time) - return post_data, category, sleep_time + """Main function to run the Google Trends automator.""" + scraper = GoogleTrendsScraper() + while True: + try: + post_data, author, sleep_time = scraper.curate_from_google_trends() + if post_data and author: + global is_posting + is_posting = True + try: + post_to_wp(post_data, author) + logger.info(f"Successfully posted: {post_data['title']}") + finally: + is_posting = False + time.sleep(sleep_time) + except Exception as e: + logger.error(f"Error in Google Trends automator: {e}") + time.sleep(300) # Wait 5 minutes before retrying if __name__ == "__main__": run_google_trends_automator() \ No newline at end of file diff --git a/foodie_automator_reddit.py b/foodie_automator_reddit.py index f194789..9a32a50 100644 --- a/foodie_automator_reddit.py +++ b/foodie_automator_reddit.py @@ -9,6 +9,7 @@ import signal import sys import re from datetime import datetime, timedelta, timezone +from typing import List, Dict, Optional, Tuple, Set from openai import OpenAI from urllib.parse import quote from requests.packages.urllib3.util.retry import Retry @@ -19,7 +20,7 @@ from foodie_config import ( AUTHORS, RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS, PERSONA_CONFIGS, CATEGORIES, get_clean_source_name, REDDIT_CLIENT_ID, REDDIT_CLIENT_SECRET, REDDIT_USER_AGENT, LIGHT_TASK_MODEL, - X_API_CREDENTIALS + X_API_CREDENTIALS, FILE_PATHS, EXPIRATION_DAYS, 
IMAGE_EXPIRATION_DAYS ) from foodie_utils import ( load_json_file, save_json_file, get_image, generate_image_query, @@ -28,29 +29,48 @@ from foodie_utils import ( prepare_post_data, select_best_author, smart_image_and_filter, get_flickr_image ) -from foodie_hooks import get_dynamic_hook, get_viral_share_prompt # Removed select_best_cta import +from foodie_hooks import get_dynamic_hook, get_viral_share_prompt +# Load environment variables load_dotenv() +# Global state is_posting = False +logger = logging.getLogger(__name__) -def signal_handler(sig, frame): - logging.info("Received termination signal, checking if safe to exit...") - if is_posting: - logging.info("Currently posting, will exit after completion.") - else: - logging.info("Safe to exit immediately.") - sys.exit(0) +class RedditScraper: + def __init__(self): + self.setup_logging() + self.setup_signal_handlers() + self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) + self.posted_titles = self.load_posted_titles() + self.used_images = self.load_used_images() + self.reddit = self.setup_reddit_client() + self.setup_requests_session() -signal.signal(signal.SIGTERM, signal_handler) -signal.signal(signal.SIGINT, signal_handler) + def setup_logging(self) -> None: + """Configure logging for the scraper.""" + log_file = FILE_PATHS["posted_reddit_titles"].with_suffix('.log') + self.prune_old_logs(log_file) + + logging.basicConfig( + filename=str(log_file), + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s" + ) + logging.getLogger("requests").setLevel(logging.WARNING) + logging.getLogger("prawcore").setLevel(logging.WARNING) + console_handler = logging.StreamHandler() + console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) + logging.getLogger().addHandler(console_handler) + logger.info("Logging initialized for Reddit scraper") -LOG_FILE = "/home/shane/foodie_automator/foodie_automator_reddit.log" -LOG_PRUNE_DAYS = 30 + def prune_old_logs(self, 
log_file: str) -> None: + """Prune log entries older than LOG_PRUNE_DAYS.""" + if not os.path.exists(log_file): + return -def setup_logging(): - if os.path.exists(LOG_FILE): - with open(LOG_FILE, 'r') as f: + with open(log_file, 'r') as f: lines = f.readlines() log_entries = [] @@ -68,7 +88,7 @@ def setup_logging(): if current_entry: log_entries.append(''.join(current_entry)) - cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS) + cutoff = datetime.now(timezone.utc) - timedelta(days=30) # LOG_PRUNE_DAYS pruned_entries = [] for entry in log_entries: try: @@ -76,323 +96,236 @@ def setup_logging(): if timestamp > cutoff: pruned_entries.append(entry) except ValueError: - logging.warning(f"Skipping malformed log entry (no timestamp): {entry[:50]}...") + logger.warning(f"Skipping malformed log entry (no timestamp): {entry[:50]}...") continue - with open(LOG_FILE, 'w') as f: + with open(log_file, 'w') as f: f.writelines(pruned_entries) - - logging.basicConfig( - filename=LOG_FILE, - level=logging.INFO, - format="%(asctime)s - %(levelname)s - %(message)s" - ) - logging.getLogger("requests").setLevel(logging.WARNING) - logging.getLogger("prawcore").setLevel(logging.WARNING) - console_handler = logging.StreamHandler() - console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) - logging.getLogger().addHandler(console_handler) - logging.info("Logging initialized for foodie_automator_reddit.py") -setup_logging() + def setup_signal_handlers(self) -> None: + """Set up signal handlers for graceful shutdown.""" + def signal_handler(sig, frame): + logger.info("Received termination signal, checking if safe to exit...") + if is_posting: + logger.info("Currently posting, will exit after completion.") + else: + logger.info("Safe to exit immediately.") + sys.exit(0) -POSTED_TITLES_FILE = '/home/shane/foodie_automator/posted_reddit_titles.json' -USED_IMAGES_FILE = '/home/shane/foodie_automator/used_images.json' -EXPIRATION_HOURS = 24 
-IMAGE_EXPIRATION_DAYS = 7 + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) -posted_titles_data = load_json_file(POSTED_TITLES_FILE, EXPIRATION_HOURS) -posted_titles = set(entry["title"] for entry in posted_titles_data if "title" in entry) -used_images_data = load_json_file(USED_IMAGES_FILE, IMAGE_EXPIRATION_DAYS) -used_images = set(entry["title"] for entry in used_images_data if "title" in entry) - -client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) - -def clean_reddit_title(title): - cleaned_title = re.sub(r'^\[.*?\]\s*', '', title).strip() - logging.info(f"Cleaned Reddit title from '{title}' to '{cleaned_title}'") - return cleaned_title - -def is_interesting_reddit(title, summary, upvotes, comment_count, top_comments): - try: - content = f"Title: {title}\n\nContent: {summary}" - if top_comments: - content += f"\n\nTop Comments:\n{'\n'.join(top_comments)}" - - response = client.chat.completions.create( - model=LIGHT_TASK_MODEL, - messages=[ - {"role": "system", "content": ( - "Rate this Reddit post from 0-10 based on rarity, buzzworthiness, and engagement potential for food lovers, covering food topics (skip recipes). " - "Score 8-10 for rare, highly shareable ideas (e.g., unique dishes or restaurant trends). " - "Score 5-7 for fresh, engaging updates with broad appeal. Score below 5 for common or unremarkable content. " - "Consider comments for added context (e.g., specific locations or unique details). " - "Return only a number." 
- )}, - {"role": "user", "content": content} - ], - max_tokens=5 + def setup_reddit_client(self) -> praw.Reddit: + """Set up and return a Reddit client with proper configuration.""" + return praw.Reddit( + client_id=REDDIT_CLIENT_ID, + client_secret=REDDIT_CLIENT_SECRET, + user_agent=REDDIT_USER_AGENT ) - base_score = int(response.choices[0].message.content.strip()) if response.choices[0].message.content.strip().isdigit() else 0 - engagement_boost = 0 - if upvotes >= 500: - engagement_boost += 3 - elif upvotes >= 100: - engagement_boost += 2 - elif upvotes >= 50: - engagement_boost += 1 - - if comment_count >= 100: - engagement_boost += 2 - elif comment_count >= 20: - engagement_boost += 1 + def setup_requests_session(self) -> None: + """Set up a requests session with retry logic.""" + self.session = requests.Session() + retries = Retry( + total=5, + backoff_factor=0.1, + status_forcelist=[500, 502, 503, 504] + ) + self.session.mount('https://', HTTPAdapter(max_retries=retries)) - final_score = min(base_score + engagement_boost, 10) - logging.info(f"Reddit Interest Score: {final_score} (base: {base_score}, upvotes: {upvotes}, comments: {comment_count}, top_comments: {len(top_comments)}) for '{title}'") - print(f"Interest Score for '{title[:50]}...': {final_score} (base: {base_score}, upvotes: {upvotes}, comments: {comment_count})") - return final_score - except Exception as e: - logging.error(f"Reddit interestingness scoring failed: {e}") - print(f"Reddit Interest Error: {e}") - return 0 - -def get_top_comments(post_url, reddit, limit=3): - try: - submission = reddit.submission(url=post_url) - submission.comment_sort = 'top' - submission.comments.replace_more(limit=0) - top_comments = [comment.body for comment in submission.comments[:limit] if not comment.body.startswith('[deleted]')] - logging.info(f"Fetched {len(top_comments)} top comments for {post_url}") - return top_comments - except Exception as e: - logging.error(f"Failed to fetch comments for {post_url}: 
{e}") - return [] - -def fetch_reddit_posts(): - reddit = praw.Reddit( - client_id=REDDIT_CLIENT_ID, - client_secret=REDDIT_CLIENT_SECRET, - user_agent=REDDIT_USER_AGENT - ) - feeds = ['FoodPorn', 'restaurant', 'FoodIndustry', 'food'] - articles = [] - cutoff_date = datetime.now(timezone.utc) - timedelta(hours=EXPIRATION_HOURS) - - logging.info(f"Starting fetch with cutoff date: {cutoff_date}") - for subreddit_name in feeds: + def load_posted_titles(self) -> Set[str]: + """Load and return the set of posted titles.""" try: - subreddit = reddit.subreddit(subreddit_name) - for submission in subreddit.top(time_filter='day', limit=100): - pub_date = datetime.fromtimestamp(submission.created_utc, tz=timezone.utc) - if pub_date < cutoff_date: - logging.info(f"Skipping old post: {submission.title} (Published: {pub_date})") - continue - cleaned_title = clean_reddit_title(submission.title) - articles.append({ - "title": cleaned_title, - "raw_title": submission.title, - "link": f"https://www.reddit.com{submission.permalink}", - "summary": submission.selftext, - "feed_title": get_clean_source_name(subreddit_name), - "pub_date": pub_date, - "upvotes": submission.score, - "comment_count": submission.num_comments - }) - logging.info(f"Fetched {len(articles)} posts from r/{subreddit_name}") + data = load_json_file(FILE_PATHS["posted_reddit_titles"], EXPIRATION_DAYS) + return {entry["title"] for entry in data if "title" in entry} except Exception as e: - logging.error(f"Failed to fetch Reddit feed r/{subreddit_name}: {e}") - - logging.info(f"Total Reddit posts fetched: {len(articles)}") - return articles + logger.error(f"Error loading posted titles: {e}") + return set() -def curate_from_reddit(): - articles = fetch_reddit_posts() - if not articles: - print("No Reddit posts available") - logging.info("No Reddit posts available") - return None, None, None - - articles.sort(key=lambda x: x["upvotes"], reverse=True) - - reddit = praw.Reddit( - client_id=REDDIT_CLIENT_ID, - 
client_secret=REDDIT_CLIENT_SECRET, - user_agent=REDDIT_USER_AGENT - ) - - attempts = 0 - max_attempts = 10 - while attempts < max_attempts and articles: - article = articles.pop(0) - title = article["title"] - raw_title = article["raw_title"] - link = article["link"] - summary = article["summary"] - source_name = "Reddit" - original_source = 'Reddit' - - if raw_title in posted_titles: - print(f"Skipping already posted post: {raw_title}") - logging.info(f"Skipping already posted post: {raw_title}") - attempts += 1 - continue - - print(f"Trying Reddit Post: {title} from {source_name}") - logging.info(f"Trying Reddit Post: {title} from {source_name}") - - image_query, relevance_keywords, skip = smart_image_and_filter(title, summary) - if skip or any(keyword in title.lower() or keyword in raw_title.lower() for keyword in RECIPE_KEYWORDS + ["homemade"]): - print(f"Skipping filtered Reddit post: {title}") - logging.info(f"Skipping filtered Reddit post: {title}") - attempts += 1 - continue - - top_comments = get_top_comments(link, reddit, limit=3) - interest_score = is_interesting_reddit( - title, - summary, - article["upvotes"], - article["comment_count"], - top_comments - ) - logging.info(f"Interest Score: {interest_score} for '{title}'") - if interest_score < 6: - print(f"Reddit Interest Too Low: {interest_score}") - logging.info(f"Reddit Interest Too Low: {interest_score}") - attempts += 1 - continue - - num_paragraphs = determine_paragraph_count(interest_score) - extra_prompt = ( - f"Generate exactly {num_paragraphs} paragraphs.\n" - f"FOCUS: Summarize ONLY the provided content, explicitly mentioning '{title}' and sticking to its specific topic and details.\n" - f"Incorporate relevant insights from these top comments if available: {', '.join(top_comments) if top_comments else 'None'}.\n" - f"Do NOT introduce unrelated concepts unless in the content or comments.\n" - f"If brief, expand on the core idea with relevant context about its appeal or significance.\n" - f"Do 
not include emojis in the summary." - ) - content_to_summarize = f"{title}\n\n{summary}" - if top_comments: - content_to_summarize += f"\n\nTop Comments:\n{'\n'.join(top_comments)}" - - final_summary = summarize_with_gpt4o( - content_to_summarize, - source_name, - link, - interest_score=interest_score, - extra_prompt=extra_prompt - ) - if not final_summary: - logging.info(f"Summary failed for '{title}'") - attempts += 1 - continue - - final_summary = insert_link_naturally(final_summary, source_name, link) - - post_data, author, category, image_url, image_source, uploader, pixabay_url = prepare_post_data(final_summary, title) - if not post_data: - attempts += 1 - continue - - image_url, image_source, uploader, page_url = get_flickr_image(image_query, relevance_keywords) - if not image_url: - image_url, image_source, uploader, page_url = get_image(image_query) - - hook = get_dynamic_hook(post_data["title"]).strip() - # Removed: cta = select_best_cta(post_data["title"], final_summary, post_url=None) - - # Generate viral share prompt - share_prompt = get_viral_share_prompt(post_data["title"], final_summary) - share_links_template = ( - f'

{share_prompt} ' - f' ' - f'

' - ) - post_data["content"] = f"{final_summary}\n\n{share_links_template}" # Removed cta from content - - global is_posting - is_posting = True + def load_used_images(self) -> Set[str]: + """Load and return the set of used images.""" try: - post_id, post_url = post_to_wp( - post_data=post_data, - category=category, - link=link, - author=author, - image_url=image_url, - original_source=original_source, - image_source=image_source, - uploader=uploader, - pixabay_url=pixabay_url, - interest_score=interest_score, - should_post_tweet=True - ) - finally: - is_posting = False + data = load_json_file(FILE_PATHS["used_images"], IMAGE_EXPIRATION_DAYS) + return {entry["title"] for entry in data if "title" in entry} + except Exception as e: + logger.error(f"Error loading used images: {e}") + return set() - if post_id: - share_text = f"Check out this foodie gem! {post_data['title']}" - share_text_encoded = quote(share_text) - post_url_encoded = quote(post_url) - share_links = share_links_template.format(post_url=post_url_encoded, share_text=share_text_encoded) - # Removed: cta = select_best_cta(post_data["title"], final_summary, post_url=post_url) - post_data["content"] = f"{final_summary}\n\n{share_links}" # Removed cta from content - is_posting = True - try: - post_to_wp( - post_data=post_data, - category=category, - link=link, - author=author, - image_url=image_url, - original_source=original_source, - image_source=image_source, - uploader=uploader, - pixabay_url=pixabay_url, - interest_score=interest_score, - post_id=post_id, - should_post_tweet=False - ) - finally: - is_posting = False + def clean_reddit_title(self, title: str) -> str: + """Clean and standardize Reddit post titles.""" + cleaned_title = re.sub(r'^\[.*?\]\s*', '', title).strip() + logger.info(f"Cleaned Reddit title from '{title}' to '{cleaned_title}'") + return cleaned_title + + def is_interesting_reddit(self, title: str, summary: str, upvotes: int, comment_count: int, top_comments: List[str]) -> int: + 
"""Determine the interest score for a Reddit post.""" + try: + content = f"Title: {title}\n\nContent: {summary}" + if top_comments: + content += f"\n\nTop Comments:\n{'\n'.join(top_comments)}" - timestamp = datetime.now(timezone.utc).isoformat() - save_json_file(POSTED_TITLES_FILE, raw_title, timestamp) - posted_titles.add(raw_title) - logging.info(f"Successfully saved '{raw_title}' to {POSTED_TITLES_FILE} with timestamp {timestamp}") + response = self.client.chat.completions.create( + model=LIGHT_TASK_MODEL, + messages=[ + {"role": "system", "content": ( + "Rate this Reddit post from 0-10 based on rarity, buzzworthiness, and engagement potential for food lovers, covering food topics (skip recipes). " + "Score 8-10 for rare, highly shareable ideas (e.g., unique dishes or restaurant trends). " + "Score 5-7 for fresh, engaging updates with broad appeal. Score below 5 for common or unremarkable content. " + "Consider comments for added context (e.g., specific locations or unique details). " + "Return only a number." 
+ )}, + {"role": "user", "content": content} + ], + max_tokens=5 + ) + base_score = int(response.choices[0].message.content.strip()) if response.choices[0].message.content.strip().isdigit() else 0 + + engagement_boost = 0 + if upvotes >= 500: + engagement_boost += 3 + elif upvotes >= 100: + engagement_boost += 2 + elif upvotes >= 50: + engagement_boost += 1 - if image_url: - save_json_file(USED_IMAGES_FILE, image_url, timestamp) - used_images.add(image_url) - logging.info(f"Saved image '{image_url}' to {USED_IMAGES_FILE} with timestamp {timestamp}") - - print(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id}) from Reddit *****") - print(f"Actual post URL: {post_url}") - logging.info(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id}) from Reddit *****") - logging.info(f"Actual post URL: {post_url}") - return post_data, category, random.randint(0, 1800) + if comment_count >= 100: + engagement_boost += 2 + elif comment_count >= 20: + engagement_boost += 1 + + final_score = min(base_score + engagement_boost, 10) + logger.info(f"Reddit Interest Score: {final_score} (base: {base_score}, upvotes: {upvotes}, comments: {comment_count}, top_comments: {len(top_comments)}) for '{title}'") + return final_score + except Exception as e: + logger.error(f"Reddit interestingness scoring failed: {e}") + return 0 + + def get_top_comments(self, post_url: str, limit: int = 3) -> List[str]: + """Fetch top comments for a Reddit post.""" + try: + submission = self.reddit.submission(url=post_url) + submission.comment_sort = 'top' + submission.comments.replace_more(limit=0) + top_comments = [comment.body for comment in submission.comments[:limit] if not comment.body.startswith('[deleted]')] + logger.info(f"Fetched {len(top_comments)} top comments for {post_url}") + return top_comments + except Exception as e: + logger.error(f"Failed to fetch comments for {post_url}: {e}") + return [] + + def fetch_reddit_posts(self) -> List[Dict]: + """Fetch posts from configured 
Reddit subreddits.""" + feeds = ['FoodPorn', 'restaurant', 'FoodIndustry', 'food'] + articles = [] + cutoff_date = datetime.now(timezone.utc) - timedelta(hours=24) - attempts += 1 - logging.info(f"WP posting failed for '{post_data['title']}'") - - print("No interesting Reddit post found after attempts") - logging.info("No interesting Reddit post found after attempts") - return None, None, random.randint(600, 1800) + logger.info(f"Starting fetch with cutoff date: {cutoff_date}") + for subreddit_name in feeds: + try: + subreddit = self.reddit.subreddit(subreddit_name) + for submission in subreddit.top(time_filter='day', limit=100): + pub_date = datetime.fromtimestamp(submission.created_utc, tz=timezone.utc) + if pub_date < cutoff_date: + logger.info(f"Skipping old post: {submission.title} (Published: {pub_date})") + continue + cleaned_title = self.clean_reddit_title(submission.title) + articles.append({ + "title": cleaned_title, + "raw_title": submission.title, + "link": f"https://www.reddit.com{submission.permalink}", + "summary": submission.selftext, + "feed_title": get_clean_source_name(subreddit_name), + "pub_date": pub_date, + "upvotes": submission.score, + "comment_count": submission.num_comments + }) + logger.info(f"Fetched {len(articles)} posts from r/{subreddit_name}") + except Exception as e: + logger.error(f"Failed to fetch Reddit feed r/{subreddit_name}: {e}") + + logger.info(f"Total Reddit posts fetched: {len(articles)}") + return articles + + def curate_from_reddit(self) -> Tuple[Optional[Dict], Optional[str], int]: + """Curate content from Reddit posts.""" + articles = self.fetch_reddit_posts() + if not articles: + logger.info("No Reddit posts available") + return None, None, random.randint(600, 1800) + + articles.sort(key=lambda x: x["upvotes"], reverse=True) + + for article in articles: + title = article["title"] + raw_title = article["raw_title"] + link = article["link"] + summary = article["summary"] + + if raw_title in self.posted_titles: + 
logger.info(f"Skipping already posted post: {raw_title}") + continue + + logger.info(f"Processing Reddit Post: {title}") + + image_query, relevance_keywords, skip = smart_image_and_filter(title, summary) + if skip or any(keyword in title.lower() or keyword in raw_title.lower() for keyword in RECIPE_KEYWORDS + ["homemade"]): + logger.info(f"Skipping filtered Reddit post: {title}") + continue + + top_comments = self.get_top_comments(link) + interest_score = self.is_interesting_reddit(title, summary, article["upvotes"], article["comment_count"], top_comments) + + if interest_score < 6: + logger.info(f"Reddit Interest Too Low: {interest_score}") + continue + + num_paragraphs = determine_paragraph_count(interest_score) + extra_prompt = ( + f"Generate exactly {num_paragraphs} paragraphs.\n" + f"FOCUS: Summarize ONLY the provided content, explicitly mentioning '{title}' and sticking to its specific topic and details.\n" + f"Do NOT introduce unrelated concepts.\n" + f"Expand on the core idea with relevant context about its appeal or significance in food trends.\n" + f"Do not include emojis in the summary." 
+ ) + + final_summary = summarize_with_gpt4o( + f"{title}\n\n{summary}", + "Reddit", + link, + interest_score=interest_score, + extra_prompt=extra_prompt + ) + + if not final_summary: + logger.info(f"Summary failed for '{title}'") + continue + + final_summary = insert_link_naturally(final_summary, "Reddit", link) + post_data, author, category, image_url, image_source, uploader, pixabay_url = prepare_post_data(final_summary, title) + + if post_data and author: + return post_data, author, random.randint(600, 1800) + + return None, None, random.randint(600, 1800) def run_reddit_automator(): - print(f"{datetime.now(timezone.utc)} - INFO - ***** Reddit Automator Launched *****") - logging.info("***** Reddit Automator Launched *****") - - post_data, category, sleep_time = curate_from_reddit() - if not post_data: - print(f"No postable Reddit article found - sleeping for {sleep_time} seconds") - logging.info(f"No postable Reddit article found - sleeping for {sleep_time} seconds") - else: - print(f"Completed Reddit run with sleep time: {sleep_time} seconds") - logging.info(f"Completed Reddit run with sleep time: {sleep_time} seconds") - print(f"Sleeping for {sleep_time}s") - time.sleep(sleep_time) - return post_data, category, sleep_time + """Main function to run the Reddit automator.""" + scraper = RedditScraper() + while True: + try: + post_data, author, sleep_time = scraper.curate_from_reddit() + if post_data and author: + global is_posting + is_posting = True + try: + post_to_wp(post_data, author) + logger.info(f"Successfully posted: {post_data['title']}") + finally: + is_posting = False + time.sleep(sleep_time) + except Exception as e: + logger.error(f"Error in Reddit automator: {e}") + time.sleep(300) # Wait 5 minutes before retrying if __name__ == "__main__": run_reddit_automator() \ No newline at end of file diff --git a/foodie_automator_rss.py b/foodie_automator_rss.py index 4c4d0ff..7ba1e24 100644 --- a/foodie_automator_rss.py +++ b/foodie_automator_rss.py @@ 
-10,6 +10,7 @@ import sys import re import email.utils from datetime import datetime, timedelta, timezone +from typing import List, Dict, Optional, Tuple, Set from bs4 import BeautifulSoup from openai import OpenAI from urllib.parse import quote @@ -18,7 +19,8 @@ from requests.adapters import HTTPAdapter from foodie_config import ( RSS_FEEDS, RSS_FEED_NAMES, AUTHORS, RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS, PERSONA_CONFIGS, CATEGORIES, - get_clean_source_name, X_API_CREDENTIALS + get_clean_source_name, X_API_CREDENTIALS, FILE_PATHS, EXPIRATION_DAYS, + IMAGE_EXPIRATION_DAYS, LIGHT_TASK_MODEL ) from foodie_utils import ( load_json_file, save_json_file, get_image, generate_image_query, @@ -30,42 +32,50 @@ from foodie_utils import ( from foodie_hooks import get_dynamic_hook, get_viral_share_prompt from dotenv import load_dotenv +# Load environment variables load_dotenv() +# Global state is_posting = False +logger = logging.getLogger(__name__) -def signal_handler(sig, frame): - logging.info("Received termination signal, checking if safe to exit...") - if is_posting: - logging.info("Currently posting, will exit after completion.") - else: - logging.info("Safe to exit immediately.") - sys.exit(0) +class RSSScraper: + def __init__(self): + self.setup_logging() + self.setup_signal_handlers() + self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) + self.posted_titles = self.load_posted_titles() + self.used_images = self.load_used_images() + self.session = self.setup_http_session() -signal.signal(signal.SIGTERM, signal_handler) -signal.signal(signal.SIGINT, signal_handler) + def setup_logging(self) -> None: + """Configure logging for the scraper.""" + log_file = FILE_PATHS["posted_rss_titles"].with_suffix('.log') + self.prune_old_logs(log_file) + + logging.basicConfig( + filename=str(log_file), + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s" + ) + logging.getLogger("requests").setLevel(logging.WARNING) + console_handler 
= logging.StreamHandler() + console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) + logging.getLogger().addHandler(console_handler) + logger.info("Logging initialized for RSS scraper") -LOG_FILE = "/home/shane/foodie_automator/foodie_automator_rss.log" -LOG_PRUNE_DAYS = 30 -FEED_TIMEOUT = 15 -MAX_RETRIES = 3 + def prune_old_logs(self, log_file: str) -> None: + """Prune log entries older than LOG_PRUNE_DAYS.""" + if not os.path.exists(log_file): + return -POSTED_TITLES_FILE = '/home/shane/foodie_automator/posted_rss_titles.json' -USED_IMAGES_FILE = '/home/shane/foodie_automator/used_images.json' -EXPIRATION_HOURS = 24 -IMAGE_EXPIRATION_DAYS = 7 - -posted_titles_data = load_json_file(POSTED_TITLES_FILE, EXPIRATION_HOURS) -posted_titles = set(entry["title"] for entry in posted_titles_data) -used_images = set(entry["title"] for entry in load_json_file(USED_IMAGES_FILE, IMAGE_EXPIRATION_DAYS) if "title" in entry) - -def setup_logging(): - if os.path.exists(LOG_FILE): - with open(LOG_FILE, 'r') as f: + with open(log_file, 'r') as f: lines = f.readlines() - cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS) + + cutoff = datetime.now(timezone.utc) - timedelta(days=30) # LOG_PRUNE_DAYS pruned_lines = [] malformed_count = 0 + for line in lines: if len(line) < 19 or not line[:19].replace('-', '').replace(':', '').replace(' ', '').isdigit(): malformed_count += 1 @@ -77,290 +87,211 @@ def setup_logging(): except ValueError: malformed_count += 1 continue + if malformed_count > 0: - logging.info(f"Skipped {malformed_count} malformed log lines during pruning") - with open(LOG_FILE, 'w') as f: + logger.warning(f"Skipped {malformed_count} malformed log lines during pruning") + + with open(log_file, 'w') as f: f.writelines(pruned_lines) - - logging.basicConfig( - filename=LOG_FILE, - level=logging.INFO, - format="%(asctime)s - %(levelname)s - %(message)s", - datefmt="%Y-%m-%d %H:%M:%S" - ) - console_handler = 
logging.StreamHandler() - console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) - logging.getLogger().addHandler(console_handler) - logging.getLogger("requests").setLevel(logging.WARNING) - logging.info("Logging initialized for foodie_automator_rss.py") -setup_logging() + def setup_signal_handlers(self) -> None: + """Set up signal handlers for graceful shutdown.""" + def signal_handler(sig, frame): + logger.info("Received termination signal, checking if safe to exit...") + if is_posting: + logger.info("Currently posting, will exit after completion.") + else: + logger.info("Safe to exit immediately.") + sys.exit(0) -def create_http_session() -> requests.Session: - session = requests.Session() - retry_strategy = Retry( - total=MAX_RETRIES, - backoff_factor=2, - status_forcelist=[403, 429, 500, 502, 503, 504], - allowed_methods=["GET", "POST"] - ) - adapter = HTTPAdapter( - max_retries=retry_strategy, - pool_connections=10, - pool_maxsize=10 - ) - session.mount("http://", adapter) - session.mount("https://", adapter) - session.headers.update({ - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36' - }) - return session + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) -def parse_date(date_str): - try: - parsed_date = email.utils.parsedate_to_datetime(date_str) - if parsed_date.tzinfo is None: - parsed_date = parsed_date.replace(tzinfo=timezone.utc) - return parsed_date - except Exception as e: - logging.error(f"Failed to parse date '{date_str}': {e}") - return datetime.now(timezone.utc) + def setup_http_session(self) -> requests.Session: + """Set up a requests session with retry logic.""" + session = requests.Session() + retry_strategy = Retry( + total=3, + backoff_factor=2, + status_forcelist=[403, 429, 500, 502, 503, 504], + allowed_methods=["GET", "POST"] + ) + adapter = HTTPAdapter( + 
max_retries=retry_strategy, + pool_connections=10, + pool_maxsize=10 + ) + session.mount("http://", adapter) + session.mount("https://", adapter) + session.headers.update({ + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36' + }) + return session -def fetch_rss_feeds(): - logging.info("Starting fetch_rss_feeds") - articles = [] - cutoff_date = datetime.now(timezone.utc) - timedelta(hours=EXPIRATION_HOURS) - session = create_http_session() - - if not RSS_FEEDS: - logging.error("RSS_FEEDS is empty in foodie_config.py") - return articles - - for feed_url in RSS_FEEDS: - logging.info(f"Processing feed: {feed_url}") + def load_posted_titles(self) -> Set[str]: + """Load and return the set of posted titles.""" try: - response = session.get(feed_url, timeout=FEED_TIMEOUT) - response.raise_for_status() - soup = BeautifulSoup(response.content, 'xml') - items = soup.find_all('item') - - feed_title = RSS_FEED_NAMES.get(feed_url, (get_clean_source_name(feed_url), feed_url)) - for item in items: - try: - title = item.find('title').text.strip() if item.find('title') else "Untitled" - link = item.find('link').text.strip() if item.find('link') else "" - pub_date = item.find('pubDate') - pub_date = parse_date(pub_date.text) if pub_date else datetime.now(timezone.utc) - - if pub_date < cutoff_date: - logging.info(f"Skipping old article: {title} (Published: {pub_date})") - continue - - description = item.find('description') - summary = BeautifulSoup(description.text, 'html.parser').get_text().strip() if description else "" - content = item.find('content:encoded') - content_text = BeautifulSoup(content.text, 'html.parser').get_text().strip() if content else summary - - articles.append({ - "title": title, - "link": link, - "summary": summary, - "content": content_text, - "feed_title": feed_title[0] if isinstance(feed_title, tuple) else feed_title, - "pub_date": pub_date - }) - logging.debug(f"Processed 
article: {title}") - except Exception as e: - logging.warning(f"Error processing entry in {feed_url}: {e}") - continue - logging.info(f"Filtered to {len(articles)} articles from {feed_url}") + data = load_json_file(FILE_PATHS["posted_rss_titles"], EXPIRATION_DAYS) + return {entry["title"] for entry in data if "title" in entry} except Exception as e: - logging.error(f"Failed to fetch RSS feed {feed_url}: {e}") - continue + logger.error(f"Error loading posted titles: {e}") + return set() - articles.sort(key=lambda x: x["pub_date"], reverse=True) - logging.info(f"Total RSS articles fetched: {len(articles)}") - return articles + def load_used_images(self) -> Set[str]: + """Load and return the set of used images.""" + try: + data = load_json_file(FILE_PATHS["used_images"], IMAGE_EXPIRATION_DAYS) + return {entry["title"] for entry in data if "title" in entry} + except Exception as e: + logger.error(f"Error loading used images: {e}") + return set() -def curate_from_rss(): - articles = fetch_rss_feeds() - if not articles: - print("No RSS articles available") - logging.info("No RSS articles available") - return None, None, random.randint(600, 1800) + def parse_date(self, date_str: str) -> datetime: + """Parse a date string into a datetime object.""" + try: + parsed_date = email.utils.parsedate_to_datetime(date_str) + if parsed_date.tzinfo is None: + parsed_date = parsed_date.replace(tzinfo=timezone.utc) + return parsed_date + except Exception as e: + logger.error(f"Failed to parse date '{date_str}': {e}") + return datetime.now(timezone.utc) - attempts = 0 - max_attempts = 10 - while attempts < max_attempts and articles: - article = articles.pop(0) - title = article["title"] - link = article["link"] - summary = article["summary"] - content = article["content"] - source_name = article["feed_title"] - original_source = f'{source_name}' + def fetch_rss_feeds(self) -> List[Dict]: + """Fetch and process RSS feeds.""" + logger.info("Starting fetch_rss_feeds") + articles = [] + 
cutoff_date = datetime.now(timezone.utc) - timedelta(hours=24) - if title in posted_titles: - print(f"Skipping already posted article: {title}") - logging.info(f"Skipping already posted article: {title}") - attempts += 1 - continue + if not RSS_FEEDS: + logger.error("RSS_FEEDS is empty in foodie_config.py") + return articles - print(f"Trying RSS Article: {title} from {source_name}") - logging.info(f"Trying RSS Article: {title} from {source_name}") + for feed_url in RSS_FEEDS: + logger.info(f"Processing feed: {feed_url}") + try: + response = self.session.get(feed_url, timeout=15) + response.raise_for_status() + soup = BeautifulSoup(response.content, 'xml') + items = soup.find_all('item') - image_query, relevance_keywords, skip = smart_image_and_filter(title, summary) - if skip: - print(f"Skipping filtered RSS article: {title}") - logging.info(f"Skipping filtered RSS article: {title}") - attempts += 1 - continue + feed_title = RSS_FEED_NAMES.get(feed_url, (get_clean_source_name(feed_url), feed_url)) + for item in items: + try: + title = item.find('title').text.strip() if item.find('title') else "Untitled" + link = item.find('link').text.strip() if item.find('link') else "" + pub_date = item.find('pubDate') + pub_date = self.parse_date(pub_date.text) if pub_date else datetime.now(timezone.utc) - scoring_content = f"{title}\n\n{summary}\n\nContent: {content}" - interest_score = is_interesting(scoring_content) - logging.info(f"Interest score for '{title}': {interest_score}") - if interest_score < 6: - print(f"RSS Interest Too Low: {interest_score}") - logging.info(f"RSS Interest Too Low: {interest_score}") - attempts += 1 - continue + if pub_date < cutoff_date: + logger.info(f"Skipping old article: {title} (Published: {pub_date})") + continue - num_paragraphs = determine_paragraph_count(interest_score) - extra_prompt = ( - f"Generate exactly {num_paragraphs} paragraphs.\n" - f"FOCUS: Summarize ONLY the provided content, explicitly mentioning '{title}' and sticking to 
its specific topic and details.\n" - f"Do NOT introduce unrelated concepts.\n" - f"Expand on the core idea with relevant context about its appeal or significance.\n" - f"Do not include emojis in the summary." - ) - content_to_summarize = scoring_content - final_summary = summarize_with_gpt4o( - content_to_summarize, - source_name, - link, - interest_score=interest_score, - extra_prompt=extra_prompt - ) - if not final_summary: - logging.info(f"Summary failed for '{title}'") - attempts += 1 - continue + description = item.find('description') + summary = BeautifulSoup(description.text, 'html.parser').get_text().strip() if description else "" + content = item.find('content:encoded') + content_text = BeautifulSoup(content.text, 'html.parser').get_text().strip() if content else summary - # Remove the original title from the summary while preserving paragraphs - title_pattern = re.compile( - r'\*\*' + re.escape(title) + r'\*\*|' + re.escape(title), - re.IGNORECASE - ) - paragraphs = final_summary.split('\n') - cleaned_paragraphs = [] - for para in paragraphs: - if para.strip(): - cleaned_para = title_pattern.sub('', para).strip() - cleaned_para = re.sub(r'\s+', ' ', cleaned_para) - cleaned_paragraphs.append(cleaned_para) - final_summary = '\n'.join(cleaned_paragraphs) - - final_summary = insert_link_naturally(final_summary, source_name, link) - post_data, author, category, image_url, image_source, uploader, pixabay_url = prepare_post_data(final_summary, title) - if not post_data: - attempts += 1 - continue - - # Fetch image - image_url, image_source, uploader, page_url = get_flickr_image(image_query, relevance_keywords) - if not image_url: - logging.info(f"Flickr fetch failed for '{image_query}'. Falling back to Pixabay.") - image_url, image_source, uploader, page_url = get_image(image_query) - if not image_url: - logging.info(f"Pixabay fetch failed for '{image_query}'. 
Skipping article '{title}'.") - attempts += 1 + articles.append({ + "title": title, + "link": link, + "summary": summary, + "content": content_text, + "feed_title": feed_title[0] if isinstance(feed_title, tuple) else feed_title, + "pub_date": pub_date + }) + except Exception as e: + logger.warning(f"Error processing entry in {feed_url}: {e}") + continue + logger.info(f"Filtered to {len(articles)} articles from {feed_url}") + except Exception as e: + logger.error(f"Failed to fetch RSS feed {feed_url}: {e}") continue - hook = get_dynamic_hook(post_data["title"]).strip() + articles.sort(key=lambda x: x["pub_date"], reverse=True) + logger.info(f"Total RSS articles fetched: {len(articles)}") + return articles - # Generate viral share prompt - share_prompt = get_viral_share_prompt(post_data["title"], final_summary) - share_links_template = ( - f'

{share_prompt} ' - f' ' - f'

' - ) - post_data["content"] = f"{final_summary}\n\n{share_links_template}" # Removed cta from content + def curate_from_rss(self) -> Tuple[Optional[Dict], Optional[str], int]: + """Curate content from RSS feeds.""" + articles = self.fetch_rss_feeds() + if not articles: + logger.info("No RSS articles available") + return None, None, random.randint(600, 1800) - global is_posting - is_posting = True - try: - post_id, post_url = post_to_wp( - post_data=post_data, - category=category, - link=link, - author=author, - image_url=image_url, - original_source=original_source, - image_source=image_source, - uploader=uploader, - pixabay_url=pixabay_url, - interest_score=interest_score, - should_post_tweet=True + for article in articles: + title = article["title"] + link = article["link"] + summary = article["summary"] + content = article["content"] + source_name = article["feed_title"] + + if title in self.posted_titles: + logger.info(f"Skipping already posted article: {title}") + continue + + logger.info(f"Processing RSS Article: {title} from {source_name}") + + image_query, relevance_keywords, skip = smart_image_and_filter(title, summary) + if skip: + logger.info(f"Skipping filtered RSS article: {title}") + continue + + scoring_content = f"{title}\n\n{summary}\n\nContent: {content}" + interest_score = is_interesting(scoring_content) + logger.info(f"Interest score for '{title}': {interest_score}") + + if interest_score < 6: + logger.info(f"RSS Interest Too Low: {interest_score}") + continue + + num_paragraphs = determine_paragraph_count(interest_score) + extra_prompt = ( + f"Generate exactly {num_paragraphs} paragraphs.\n" + f"FOCUS: Summarize ONLY the provided content, explicitly mentioning '{title}' and sticking to its specific topic and details.\n" + f"Do NOT introduce unrelated concepts.\n" + f"Expand on the core idea with relevant context about its appeal or significance.\n" + f"Do not include emojis in the summary." 
) - finally: - is_posting = False - if post_id: - share_text = f"Check out this foodie gem! {post_data['title']}" - share_text_encoded = quote(share_text) - post_url_encoded = quote(post_url) - share_links = share_links_template.format(post_url=post_url_encoded, share_text=share_text_encoded) - # Removed: cta = select_best_cta(post_data["title"], final_summary, post_url=post_url) - post_data["content"] = f"{final_summary}\n\n{share_links}" # Removed cta from content - is_posting = True - try: - post_to_wp( - post_data=post_data, - category=category, - link=link, - author=author, - image_url=image_url, - original_source=original_source, - image_source=image_source, - uploader=uploader, - pixabay_url=pixabay_url, - interest_score=interest_score, - post_id=post_id, - should_post_tweet=False - ) - finally: - is_posting = False + final_summary = summarize_with_gpt4o( + scoring_content, + source_name, + link, + interest_score=interest_score, + extra_prompt=extra_prompt + ) - timestamp = datetime.now(timezone.utc).isoformat() - save_json_file(POSTED_TITLES_FILE, title, timestamp) - posted_titles.add(title) - logging.info(f"Successfully saved '{title}' to {POSTED_TITLES_FILE}") + if not final_summary: + logger.info(f"Summary failed for '{title}'") + continue - if image_url: - save_json_file(USED_IMAGES_FILE, image_url, timestamp) - used_images.add(image_url) - logging.info(f"Saved image '{image_url}' to {USED_IMAGES_FILE}") + final_summary = insert_link_naturally(final_summary, source_name, link) + post_data, author, category, image_url, image_source, uploader, pixabay_url = prepare_post_data(final_summary, title) - print(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id}) from RSS *****") - logging.info(f"***** SUCCESS: Posted '{post_data['title']}' (ID: {post_id}) from RSS *****") - return post_data, category, random.randint(0, 1800) + if post_data and author: + return post_data, author, random.randint(600, 1800) - attempts += 1 - logging.info(f"WP posting 
failed for '{post_data['title']}'") - - print("No interesting RSS article found after attempts") - logging.info("No interesting RSS article found after attempts") - return None, None, random.randint(600, 1800) + return None, None, random.randint(600, 1800) def run_rss_automator(): - print(f"{datetime.now(timezone.utc)} - INFO - ***** RSS Automator Launched *****") - logging.info("***** RSS Automator Launched *****") - post_data, category, sleep_time = curate_from_rss() - print(f"Sleeping for {sleep_time}s") - logging.info(f"Completed run with sleep time: {sleep_time} seconds") - time.sleep(sleep_time) - return post_data, category, sleep_time + """Main function to run the RSS automator.""" + scraper = RSSScraper() + while True: + try: + post_data, author, sleep_time = scraper.curate_from_rss() + if post_data and author: + global is_posting + is_posting = True + try: + post_to_wp(post_data, author) + logger.info(f"Successfully posted: {post_data['title']}") + finally: + is_posting = False + time.sleep(sleep_time) + except Exception as e: + logger.error(f"Error in RSS automator: {e}") + time.sleep(300) # Wait 5 minutes before retrying if __name__ == "__main__": run_rss_automator() \ No newline at end of file diff --git a/foodie_config.py b/foodie_config.py index 289c045..124a449 100644 --- a/foodie_config.py +++ b/foodie_config.py @@ -2,14 +2,71 @@ # Constants shared across all automator scripts from dotenv import load_dotenv import os +from typing import Dict, List, Optional, TypedDict, Union +from pathlib import Path +import logging +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('foodie_automator.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + +# Load environment variables load_dotenv() + +# API Keys OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") PIXABAY_API_KEY = os.getenv("PIXABAY_API_KEY") FLICKR_API_KEY = 
os.getenv("FLICKR_API_KEY") FLICKR_API_SECRET = os.getenv("FLICKR_API_SECRET") -AUTHORS = [ +# Validate required API keys +def validate_api_keys() -> None: + """Validate that all required API keys are present.""" + required_keys = { + "OPENAI_API_KEY": OPENAI_API_KEY, + "PIXABAY_API_KEY": PIXABAY_API_KEY, + "FLICKR_API_KEY": FLICKR_API_KEY, + "FLICKR_API_SECRET": FLICKR_API_SECRET + } + + missing_keys = [key for key, value in required_keys.items() if not value] + if missing_keys: + logger.error(f"Missing required API keys: {', '.join(missing_keys)}") + raise ValueError(f"Missing required API keys: {', '.join(missing_keys)}") + +# Type definitions +class AuthorConfig(TypedDict): + url: str + username: str + password: str + persona: str + bio: str + dob: str + +class XCredentials(TypedDict): + username: str + x_username: str + api_key: str + api_secret: str + access_token: str + access_token_secret: str + client_secret: str + +class PersonaConfig(TypedDict): + description: str + tone: str + article_prompt: str + x_prompt: str + +# Author configurations +AUTHORS: List[AuthorConfig] = [ { "url": "https://insiderfoodie.com", "username": "owenjohnson", @@ -31,7 +88,7 @@ AUTHORS = [ "username": "aishapatel", "password": os.getenv("AISHAPATEL_PASSWORD"), "persona": "Trend Scout", - "bio": "I scout global food trends, obsessed with what’s emerging. My sharp predictions map the industry’s path—always one step ahead.", + "bio": "I scout global food trends, obsessed with what's emerging. My sharp predictions map the industry's path—always one step ahead.", "dob": "1999-03-15" }, { @@ -47,7 +104,7 @@ AUTHORS = [ "username": "keishareid", "password": os.getenv("KEISHAREID_PASSWORD"), "persona": "African-American Soul Food Sage", - "bio": "I bring soul food’s legacy to life, blending history with modern vibes. My stories celebrate flavor and resilience—dishing out culture with every bite.", + "bio": "I bring soul food's legacy to life, blending history with modern vibes. 
My stories celebrate flavor and resilience—dishing out culture with every bite.", "dob": "1994-06-10" }, { @@ -60,7 +117,8 @@ AUTHORS = [ } ] -X_API_CREDENTIALS = [ +# X (Twitter) API credentials +X_API_CREDENTIALS: List[XCredentials] = [ { "username": "owenjohnson", "x_username": "@insiderfoodieowen", @@ -117,12 +175,13 @@ X_API_CREDENTIALS = [ } ] -PERSONA_CONFIGS = { +# Persona configurations +PERSONA_CONFIGS: Dict[str, PersonaConfig] = { "Visionary Editor": { "description": "a commanding food editor with a borderless view", "tone": "a polished and insightful tone, like 'This redefines culinary excellence.'", "article_prompt": ( - "You’re {description}. Summarize this article in {tone}. " + "You're {description}. Summarize this article in {tone}. " "Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. " "Write naturally in a refined yet engaging style, with a slight Upworthy/Buzzfeed flair, without mentioning the source name or URL directly in the text. " "Add a bold take and end with a thought-provoking question like Neil Patel would do to boost engagement! Do not include emojis in the summary." @@ -139,7 +198,7 @@ PERSONA_CONFIGS = { "description": "a seasoned foodie reviewer with a sharp eye", "tone": "a professional yet engaging tone, like 'This dish is a revelation.'", "article_prompt": ( - "You’re {description}. Summarize this article in {tone}. " + "You're {description}. Summarize this article in {tone}. " "Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. " "Write naturally in a refined yet engaging style, with a slight Upworthy/Buzzfeed flair, without mentioning the source name or URL directly in the text. " "Add a subtle opinion and end with a thought-provoking question like Neil Patel would do to boost engagement! 
Do not include emojis in the summary." @@ -154,12 +213,12 @@ PERSONA_CONFIGS = { }, "Trend Scout": { "description": "a forward-thinking editor obsessed with trends", - "tone": "an insightful and forward-looking tone, like 'This sets the stage for what’s next.'", + "tone": "an insightful and forward-looking tone, like 'This sets the stage for what's next.'", "article_prompt": ( - "You’re {description}. Summarize this article in {tone}. " + "You're {description}. Summarize this article in {tone}. " "Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. " "Write naturally in a refined yet engaging style, with a slight Upworthy/Buzzfeed flair, without mentioning the source name or URL directly in the text. " - "Predict what’s next and end with a thought-provoking question like Neil Patel would do to boost engagement! Do not include emojis in the summary." + "Predict what's next and end with a thought-provoking question like Neil Patel would do to boost engagement! Do not include emojis in the summary." ), "x_prompt": ( "Craft a tweet as {description}. Keep it under 280 characters, using {tone}. " @@ -173,7 +232,7 @@ PERSONA_CONFIGS = { "description": "a cultured food writer who loves storytelling", "tone": "a warm and thoughtful tone, like 'This evokes a sense of tradition.'", "article_prompt": ( - "You’re {description}. Summarize this article in {tone}. " + "You're {description}. Summarize this article in {tone}. " "Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. " "Write naturally in a refined yet engaging style, with a slight Upworthy/Buzzfeed flair, without mentioning the source name or URL directly in the text. " "Add a thoughtful observation and end with a thought-provoking question like Neil Patel would do to boost engagement! 
Do not include emojis in the summary." @@ -190,7 +249,7 @@ PERSONA_CONFIGS = { "description": "a vibrant storyteller rooted in African-American culinary heritage", "tone": "a heartfelt and authentic tone, like 'This captures the essence of heritage.'", "article_prompt": ( - "You’re {description}. Summarize this article in {tone}. " + "You're {description}. Summarize this article in {tone}. " "Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. " "Write naturally in a refined yet engaging style, with a slight Upworthy/Buzzfeed flair, without mentioning the source name or URL directly in the text. " "Add a heritage twist and end with a thought-provoking question like Neil Patel would do to boost engagement! Do not include emojis in the summary." @@ -207,7 +266,7 @@ PERSONA_CONFIGS = { "description": "an adventurous explorer of global street food", "tone": "a bold and adventurous tone, like 'This takes you on a global journey.'", "article_prompt": ( - "You’re {description}. Summarize this article in {tone}. " + "You're {description}. Summarize this article in {tone}. " "Explore a wide range of food-related topics, skip recipes. Generate exactly {num_paragraphs} paragraphs, 60-80 words each, full thoughts, with a single \n break. " "Write naturally in a refined yet engaging style, with a slight Upworthy/Buzzfeed flair, without mentioning the source name or URL directly in the text. " "Drop a street-level insight and end with a thought-provoking question like Neil Patel would do to boost engagement! Do not include emojis in the summary." 
@@ -223,25 +282,30 @@ PERSONA_CONFIGS = { } # File paths -POSTED_RSS_TITLES_FILE = '/home/shane/foodie_automator/posted_rss_titles.json' -POSTED_GOOGLE_TITLES_FILE = '/home/shane/foodie_automator/posted_google_titles.json' -POSTED_REDDIT_TITLES_FILE = '/home/shane/foodie_automator/posted_reddit_titles.json' -USED_IMAGES_FILE = '/home/shane/foodie_automator/used_images.json' -AUTHOR_BACKGROUNDS_FILE = '/home/shane/foodie_automator/author_backgrounds.json' -X_POST_COUNTS_FILE = '/home/shane/foodie_automator/x_post_counts.json' -RECENT_POSTS_FILE = '/home/shane/foodie_automator/recent_posts.json' +BASE_DIR = Path("/home/shane/foodie_automator") +FILE_PATHS = { + "posted_rss_titles": BASE_DIR / "posted_rss_titles.json", + "posted_google_titles": BASE_DIR / "posted_google_titles.json", + "posted_reddit_titles": BASE_DIR / "posted_reddit_titles.json", + "used_images": BASE_DIR / "used_images.json", + "author_backgrounds": BASE_DIR / "author_backgrounds.json", + "x_post_counts": BASE_DIR / "x_post_counts.json", + "recent_posts": BASE_DIR / "recent_posts.json" +} +# Expiration periods EXPIRATION_DAYS = 3 IMAGE_EXPIRATION_DAYS = 7 -RSS_FEEDS = [ +# RSS feed configurations +RSS_FEEDS: List[str] = [ "https://www.eater.com/rss/full.xml", "https://www.nrn.com/rss.xml", "https://rss.nytimes.com/services/xml/rss/nyt/DiningandWine.xml", "https://www.theguardian.com/food/rss" ] -RSS_FEED_NAMES = { +RSS_FEED_NAMES: Dict[str, tuple[str, str]] = { "https://www.eater.com/rss/full.xml": ("Eater", "https://www.eater.com/"), "https://www.nrn.com/rss.xml": ("Nation's Restaurant News", "https://www.nrn.com/"), "https://rss.nytimes.com/services/xml/rss/nyt/DiningandWine.xml": ("The New York Times", "https://www.nytimes.com/section/food"), @@ -276,12 +340,33 @@ FAST_FOOD_KEYWORDS = [ SUMMARY_MODEL = "gpt-4o" # or "gpt-4.1-mini" for testing LIGHT_TASK_MODEL = "gpt-4o-mini" -def get_clean_source_name(source_name): - """ - Retrieve a clean source name from RSS_FEED_NAMES if source_name matches a 
def get_clean_source_name(source_name: str, feed_names: Optional[Dict[str, tuple]] = None) -> str:
    """Return a display name for a source given its feed URL or raw name.

    If ``source_name`` (ignoring surrounding whitespace) is a key of the feed
    table, the first element of the matching tuple is returned. Otherwise the
    name is returned with runs of whitespace collapsed to single spaces.

    Words are deliberately not removed from the name: deleting substrings
    such as "Food" or "Restaurant" corrupts feed URLs (".../food/rss" becomes
    "...//rss") and legitimate names ("Seafood Source" becomes "Sea Source").

    Args:
        source_name: Feed URL or human-readable source name.
        feed_names: Optional mapping of feed URL to a tuple whose first item
            is the clean name. Defaults to the module-level
            ``RSS_FEED_NAMES``.

    Returns:
        The clean name. ``source_name`` is returned unchanged when it is not
        a string or contains only whitespace.
    """
    if not isinstance(source_name, str):
        # Mirror the earlier best-effort contract: never raise on odd input.
        return source_name

    table = RSS_FEED_NAMES if feed_names is None else feed_names
    key = source_name.strip()

    entry = table.get(key)
    if entry:
        return entry[0]

    collapsed = " ".join(key.split())
    return collapsed if collapsed else source_name
def validate_json_entry(entry: Dict[str, Any]) -> bool:
    """Return True when *entry* is a dict whose ``title`` and ``timestamp`` are strings."""
    return (
        isinstance(entry, dict)
        and isinstance(entry.get("title"), str)
        and isinstance(entry.get("timestamp"), str)
    )


def load_json_file(file_path: Union[str, Path], expiration_hours: int) -> List[Dict[str, Any]]:
    """Load JSON-lines entries from a file, dropping invalid and expired ones.

    The file holds one JSON object per line. A line is kept when it parses,
    passes ``validate_json_entry`` and its ISO-8601 ``timestamp`` is newer
    than ``now - expiration_hours``. Blank lines are ignored.

    Args:
        file_path: Path to the JSON-lines file.
        expiration_hours: Number of hours before entries expire.

    Returns:
        The surviving entries in file order. An empty list is returned when
        the file does not exist; entries read before an I/O or decoding
        failure are still returned.
    """
    log = logging.getLogger(__name__)
    entries: List[Dict[str, Any]] = []
    cutoff = datetime.now(timezone.utc) - timedelta(hours=expiration_hours)

    file_path = Path(file_path)
    if not file_path.exists():
        log.info(f"File {file_path} does not exist, returning empty list")
        return entries

    total = 0  # non-blank lines seen, reported next to the surviving count
    try:
        with file_path.open('r', encoding='utf-8') as f:
            # Iterate the handle directly instead of readlines() so the whole
            # file is never held in memory at once.
            for i, line in enumerate(f, 1):
                raw = line.strip()
                if not raw:
                    continue
                total += 1

                try:
                    entry = json.loads(raw)
                except json.JSONDecodeError as e:
                    log.warning(f"Skipping invalid JSON line in {file_path} at line {i}: {e}")
                    continue

                if not validate_json_entry(entry):
                    log.warning(f"Skipping malformed entry in {file_path} at line {i}: {raw}")
                    continue

                try:
                    timestamp = datetime.fromisoformat(entry["timestamp"])
                except ValueError:
                    log.warning(f"Skipping malformed entry in {file_path} at line {i}: {raw}")
                    continue

                # A timestamp written without an offset cannot be compared to
                # the aware cutoff (TypeError). It is treated as UTC here.
                # NOTE(review): assumes naive timestamps were written in UTC.
                if timestamp.tzinfo is None:
                    timestamp = timestamp.replace(tzinfo=timezone.utc)

                if timestamp > cutoff:
                    entries.append(entry)
                else:
                    log.debug(f"Entry expired in {file_path}: {entry['title']}")

        log.info(f"Loaded {total} entries from {file_path}, {len(entries)} valid after expiration check")
    except (OSError, UnicodeError) as e:
        log.error(f"Failed to load {file_path}: {e}")

    return entries
{entry}") continue counts.append(entry) except json.JSONDecodeError as e: - logging.warning(f"Skipping invalid JSON line in {filename} at line {i}: {e}") - logging.info(f"Loaded {len(counts)} entries from {filename}") + logger.warning(f"Skipping invalid JSON line in {filename} at line {i}: {e}") + logger.info(f"Loaded {len(counts)} entries from {filename}") except Exception as e: - logging.error(f"Failed to load {filename}: {e}") + logger.error(f"Failed to load {filename}: {e}") counts = [] # Reset to empty on failure if not counts: @@ -134,7 +174,7 @@ def save_post_counts(counts): for item in counts: json.dump(item, f) f.write('\n') - logging.info("Saved post counts to x_post_counts.json") + logger.info("Saved post counts to x_post_counts.json") import re @@ -182,22 +222,22 @@ def generate_article_tweet(author, post, persona): if len(tweet) > max_tweet_length: tweet = tweet[:max_tweet_length-3] + "... " + url - logging.info(f"Generated tweet: {tweet}") + logger.info(f"Generated tweet: {tweet}") return tweet def post_tweet(author, tweet): credentials = next((cred for cred in X_API_CREDENTIALS if cred["username"] == author["username"]), None) if not credentials: - logging.error(f"No X credentials found for {author['username']}") + logger.error(f"No X credentials found for {author['username']}") return False post_counts = load_post_counts() author_count = next((entry for entry in post_counts if entry["username"] == author["username"]), None) if author_count["monthly_count"] >= 500: - logging.warning(f"Monthly post limit (500) reached for {author['username']}") + logger.warning(f"Monthly post limit (500) reached for {author['username']}") return False if author_count["daily_count"] >= 20: - logging.warning(f"Daily post limit (20) reached for {author['username']}") + logger.warning(f"Daily post limit (20) reached for {author['username']}") return False try: @@ -211,14 +251,14 @@ def post_tweet(author, tweet): author_count["monthly_count"] += 1 
@lru_cache(maxsize=100)
def _fetch_image_bytes(image_url: str) -> bytes:
    """Download *image_url* and return its body; raises on any request failure.

    Failures propagate as exceptions on purpose: ``lru_cache`` stores only
    returned values, so a failed download is never memoised.
    """
    response = requests.get(image_url, timeout=10)
    response.raise_for_status()
    return response.content


def get_cached_image_url(image_url: str) -> Optional[bytes]:
    """Return the bytes of *image_url*, reusing earlier successful downloads.

    Only successful downloads are cached (up to 100 URLs). A failed request
    is logged and reported as ``None`` without being remembered, so a later
    call retries the download instead of replaying the failure from cache.

    Args:
        image_url: Absolute URL of the image to download.

    Returns:
        The response body, or ``None`` if the request failed.
    """
    try:
        return _fetch_image_bytes(image_url)
    except requests.RequestException as e:
        logger.warning(f"Failed to cache image {image_url}: {e}")
        return None


# Keep the cache-management helpers that the lru_cache-decorated function
# used to expose, for any caller that relies on them.
get_cached_image_url.cache_clear = _fetch_image_bytes.cache_clear
get_cached_image_url.cache_info = _fetch_image_bytes.cache_info


def get_image_hash(image_content: bytes) -> str:
    """Return the hexadecimal MD5 digest of *image_content*.

    The digest is a content fingerprint, not a security control, so the hash
    is created with ``usedforsecurity=False``. This keeps it usable on
    FIPS-restricted Python builds where plain ``md5()`` is rejected.
    """
    return hashlib.md5(image_content, usedforsecurity=False).hexdigest()
" - "Return as JSON with double quotes for all property names and string values (e.g., {\"image_query\": \"specific term\", \"relevance\": [\"keyword1\", \"keyword2\"], \"action\": \"KEEP\" or \"SKIP\"})." - ) - - response = client.chat.completions.create( - model=LIGHT_TASK_MODEL, - messages=[ - {"role": "system", "content": prompt}, - {"role": "user", "content": content} - ], - max_tokens=100 - ) - raw_result = response.choices[0].message.content.strip() - logging.info(f"Raw GPT smart image/filter response: '{raw_result}'") - - # Remove ```json markers and fix single quotes in JSON structure - cleaned_result = re.sub(r'```json\s*|\s*```', '', raw_result).strip() - # Replace single quotes with double quotes, but preserve single quotes within string values - fixed_result = re.sub(r"(? Optional[Dict[str, Any]]: + """Make a WordPress API request with rate limiting and retry logic.""" + self.rate_limiter.wait_if_needed() + max_retries = 3 + retry_delay = 2 - # Add rate limit handling for image download - for attempt in range(3): + for attempt in range(max_retries): try: - image_response = requests.get(image_url, headers=image_headers, timeout=10) - if image_response.status_code == 429: - wait_time = 10 * (2 ** attempt) # 10s, 20s, 40s - logging.warning(f"Rate limit hit for {image_url}. Retrying after {wait_time}s (attempt {attempt+1}/3).") + response = requests.request( + method, + f"{self.base_url}/{endpoint}", + headers=self.headers, + **kwargs + ) + + if response.status_code == 429: # Rate limit + wait_time = retry_delay * (2 ** attempt) + logger.warning(f"Rate limit hit. Retrying after {wait_time}s (attempt {attempt+1}/{max_retries})") time.sleep(wait_time) continue - image_response.raise_for_status() - break - except requests.exceptions.HTTPError as e: - if e.response.status_code == 429: - wait_time = 10 * (2 ** attempt) - logging.warning(f"Rate limit hit for {image_url}. 
Retrying after {wait_time}s (attempt {attempt+1}/3).") - time.sleep(wait_time) - continue - raise - else: - logging.warning(f"Rate limit hit for {image_url} after retries. Failing image upload.") + + response.raise_for_status() + return response.json() if response.content else None + + except requests.exceptions.RequestException as e: + if attempt == max_retries - 1: + logger.error(f"WordPress API request failed after {max_retries} attempts: {e}") + return None + time.sleep(retry_delay * (2 ** attempt)) + + return None + + def upload_media(self, image_content: bytes, filename: str, caption: Optional[str] = None) -> Optional[int]: + """Upload media to WordPress with improved error handling.""" + try: + headers = { + "Authorization": self.auth_header, + "Content-Disposition": f"attachment; filename={filename}", + "Content-Type": "image/jpeg" + } + + response = requests.post( + f"{self.base_url}/media", + headers=headers, + data=image_content + ) + response.raise_for_status() + + media_id = response.json()["id"] + if caption: + self._make_request( + "POST", + f"media/{media_id}", + json={"caption": caption} + ) + + logger.info(f"Uploaded media '{filename}' (ID: {media_id})") + return media_id + except Exception as e: + logger.error(f"Media upload failed for '{filename}': {e}") + return None + + def get_category_id(self, category_name: str) -> Optional[int]: + """Get or create a WordPress category.""" + try: + # Try to find existing category + response = self._make_request( + "GET", + "categories", + params={"search": category_name} + ) + if response: + for cat in response: + if cat["name"].lower() == category_name.lower(): + return cat["id"] + + # Create new category if not found + response = self._make_request( + "POST", + "categories", + json={"name": category_name} + ) + return response["id"] if response else None + except Exception as e: + logger.error(f"Failed to get/create category '{category_name}': {e}") + return None + + def get_tag_id(self, tag_name: str) -> 
Optional[int]: + """Get or create a WordPress tag.""" + try: + response = self._make_request( + "GET", + "tags", + params={"search": tag_name} + ) + if response: + for tag in response: + if tag["name"].lower() == tag_name.lower(): + return tag["id"] + + response = self._make_request( + "POST", + "tags", + json={"name": tag_name} + ) + return response["id"] if response else None + except Exception as e: + logger.error(f"Failed to get/create tag '{tag_name}': {e}") return None - response = requests.post( - f"{wp_base_url}/media", - headers=headers, - data=image_response.content - ) - response.raise_for_status() +# Initialize WordPress API +wp_api = WordPressAPI( + "https://insiderfoodie.com/wp-json/wp/v2", + os.getenv("WP_USERNAME", ""), + os.getenv("WP_PASSWORD", "") +) + +def upload_image_to_wp(image_url: str, post_title: str, wp_base_url: str, wp_username: str, wp_password: str, + image_source: str = "Pixabay", uploader: Optional[str] = None, pixabay_url: Optional[str] = None) -> Optional[int]: + """Upload an image to WordPress with improved error handling and caching.""" + try: + safe_title = post_title.encode('ascii', 'ignore').decode('ascii').replace(' ', '_')[:50] + filename = f"{safe_title}.jpg" - image_id = response.json()["id"] + # Try to get cached image content first + image_content = get_cached_image_url(image_url) + if not image_content: + # If not in cache, download with retry logic + for attempt in range(3): + try: + response = requests.get(image_url, timeout=10) + if response.status_code == 429: + wait_time = 10 * (2 ** attempt) + logger.warning(f"Rate limit hit for {image_url}. 
Retrying after {wait_time}s (attempt {attempt+1}/3).") + time.sleep(wait_time) + continue + response.raise_for_status() + image_content = response.content + break + except requests.exceptions.RequestException as e: + if attempt == 2: + logger.warning(f"Failed to download image after {attempt+1} attempts: {e}") + return None + time.sleep(2 ** attempt) + + if not image_content: + logger.error(f"Failed to get image content for {image_url}") + return None + + # Create caption with attribution caption = f'{image_source} by {uploader}' if pixabay_url and uploader else image_source - requests.post( - f"{wp_base_url}/media/{image_id}", - headers={"Authorization": headers["Authorization"], "Content-Type": "application/json"}, - json={"caption": caption} - ) - logging.info(f"Uploaded image '{safe_title}.jpg' to WP (ID: {image_id}) with caption '{caption}'") - return image_id + # Upload to WordPress using the API class + media_id = wp_api.upload_media(image_content, filename, caption) + if not media_id: + logger.error(f"Failed to upload image '{filename}' to WordPress") + return None + + logger.info(f"Successfully uploaded image '{filename}' to WordPress (ID: {media_id})") + return media_id except Exception as e: - logging.error(f"Image upload to WP failed for '{post_title}': {e}") + logger.error(f"Image upload to WP failed for '{post_title}': {e}") return None +def post_to_wp(post_data: Dict[str, Any], category: str, link: str, author: Dict[str, str], + image_url: Optional[str] = None, original_source: str = "", + image_source: str = "Pixabay", uploader: Optional[str] = None, + pixabay_url: Optional[str] = None, interest_score: int = 4, + post_id: Optional[int] = None, should_post_tweet: bool = True) -> Tuple[Optional[int], Optional[str]]: + """Post content to WordPress with improved error handling and validation.""" + try: + # Validate input data + if not isinstance(post_data, dict) or "title" not in post_data or "content" not in post_data: + logger.error(f"Invalid 
post_data format: {post_data}") + return None, None + + if not isinstance(author, dict) or "username" not in author or "password" not in author: + logger.error(f"Invalid author data: {author}") + return None, None + + # Get category ID + category_id = wp_api.get_category_id(category) + if not category_id: + logger.error(f"Failed to get/create category '{category}'") + return None, None + + # Prepare tags + tags = [1] # Default tag + if interest_score >= 9: + picks_tag_id = wp_api.get_tag_id("Picks") + if picks_tag_id: + tags.append(picks_tag_id) + + # Handle image upload + image_id = None + if image_url: + image_id = upload_image_to_wp( + image_url, post_data["title"], + wp_api.base_url, author["username"], author["password"], + image_source, uploader, pixabay_url + ) + + # Prepare post payload + payload = { + "title": post_data["title"], + "content": "\n".join(f"

{para}

" for para in post_data["content"].split('\n') if para.strip()), + "status": "publish", + "categories": [category_id], + "tags": tags, + "author": author.get("id", 5), # Default author ID + "meta": { + "original_link": link, + "original_source": original_source, + "interest_score": interest_score + } + } + + if image_id: + payload["featured_media"] = image_id + + # Create or update post + endpoint = f"posts/{post_id}" if post_id else "posts" + method = "POST" if not post_id else "PUT" + + response = wp_api._make_request(method, endpoint, json=payload) + if not response: + logger.error("Failed to create/update WordPress post") + return None, None + + post_id = response["id"] + post_url = response["link"] + + # Save to recent posts + timestamp = datetime.now(timezone.utc).isoformat() + save_post_to_recent(post_data["title"], post_url, author["username"], timestamp) + + # Post tweet if requested + if should_post_tweet: + try: + post = {"title": post_data["title"], "url": post_url} + tweet = generate_article_tweet(author, post, author.get("persona", "Foodie Critic")) + if post_tweet(author, tweet): + logger.info(f"Successfully posted article tweet for {author['username']}") + except Exception as e: + logger.error(f"Error posting article tweet: {e}") + + logger.info(f"Successfully posted/updated '{post_data['title']}' (ID: {post_id})") + return post_id, post_url + except Exception as e: + logger.error(f"WordPress posting failed: {e}") + return None, None + def determine_paragraph_count(interest_score): if interest_score >= 9: return 5 @@ -469,10 +579,10 @@ def is_interesting(summary): raw_score = response.choices[0].message.content.strip() score = int(raw_score) if raw_score.isdigit() else 0 print(f"Interest Score for '{summary[:50]}...': {score} (raw: {raw_score})") - logging.info(f"Interest Score: {score} (raw: {raw_score})") + logger.info(f"Interest Score: {score} (raw: {raw_score})") return score except Exception as e: - logging.error(f"Interestingness scoring 
failed: {e}") + logger.error(f"Interestingness scoring failed: {e}") print(f"Interest Error: {e}") return 0 @@ -485,7 +595,7 @@ def generate_title_from_summary(summary): messages=[ {"role": "system", "content": ( "Generate a concise, engaging title (under 100 characters) based on this summary, covering food topics. " - "Craft it with Upworthy/Buzzfeed flair—think ‘you won’t believe this’ or ‘this is nuts’—for food insiders. " + "Craft it with Upworthy/Buzzfeed flair—think 'you won't believe this' or 'this is nuts'—for food insiders. " "Avoid quotes, emojis, special characters, or the words 'elevate', 'elevating', 'elevated'. " "End with a question to spark shares." )}, @@ -499,15 +609,15 @@ def generate_title_from_summary(summary): if len(title) > 100 or any(word in title.lower() for word in banned_words): reason = "length" if len(title) > 100 else "banned word" print(f"Rejected title (attempt {attempt + 1}/3): '{title}' due to {reason}") - logging.info(f"Rejected title (attempt {attempt + 1}/3): '{title}' due to {reason}") + logger.info(f"Rejected title (attempt {attempt + 1}/3): '{title}' due to {reason}") continue - logging.info(f"Generated title: {title}") + logger.info(f"Generated title: {title}") return title except Exception as e: - logging.error(f"Title generation failed (attempt {attempt + 1}/3): {e}") + logger.error(f"Title generation failed (attempt {attempt + 1}/3): {e}") print(f"Title Error: {e}") print("Failed to generate valid title after 3 attempts") - logging.info("Failed to generate valid title after 3 attempts") + logger.info("Failed to generate valid title after 3 attempts") return None def summarize_with_gpt4o(content, source_name, link, interest_score=0, extra_prompt=""): @@ -523,7 +633,7 @@ def summarize_with_gpt4o(content, source_name, link, interest_score=0, extra_pro tone=persona_config["tone"], num_paragraphs=determine_paragraph_count(interest_score) ) - logging.info(f"Using {persona} with interest_score and content") + 
logger.info(f"Using {persona} with interest_score and content") full_prompt = ( f"{prompt}\n\n" @@ -554,22 +664,22 @@ def summarize_with_gpt4o(content, source_name, link, interest_score=0, extra_pro # For now, we'll use a placeholder for the title removal logic # In foodie_automator_rss.py, the title is available as entry.title # We'll handle the title removal in the calling script instead - logging.info(f"Processed summary (Persona: {persona}): {summary}") + logger.info(f"Processed summary (Persona: {persona}): {summary}") return summary except Exception as e: - logging.error(f"Summary generation failed with model {SUMMARY_MODEL}: {e}") + logger.error(f"Summary generation failed with model {SUMMARY_MODEL}: {e}") return None def insert_link_naturally(summary, source_name, source_url): try: - logging.info(f"Input summary to insert_link_naturally: {summary!r}") + logger.info(f"Input summary to insert_link_naturally: {summary!r}") prompt = ( "Take this summary and insert a single HTML link naturally into one paragraph (randomly chosen). " "Use the format '{source_name}' and weave it into the text seamlessly, " "e.g., 'The latest scoop from {source_name} reveals...' or '{source_name} shares this insight.' " - "Vary the phrasing creatively to avoid repetition (don’t always use 'dives into'). " + "Vary the phrasing creatively to avoid repetition (don't always use 'dives into'). " "Place the link at a sentence boundary (after a period, not within numbers like '6.30am' or '1.5'). " "Maintain the original tone, flow, and paragraph structure, preserving all existing newlines exactly as they are. " "Each paragraph in the input summary is separated by a single \\n; ensure the output maintains this exact separation. 
" @@ -594,19 +704,19 @@ def insert_link_naturally(summary, source_name, source_url): paragraphs = new_summary.split('\n') paragraphs = [p.strip() for p in paragraphs] new_summary = '\n'.join(paragraphs) - logging.info(f"Summary with naturally embedded link (normalized): {new_summary!r}") + logger.info(f"Summary with naturally embedded link (normalized): {new_summary!r}") return new_summary - logging.warning(f"GPT failed to insert link correctly: {new_summary}. Using fallback.") + logger.warning(f"GPT failed to insert link correctly: {new_summary}. Using fallback.") except Exception as e: - logging.error(f"Link insertion failed: {e}") + logger.error(f"Link insertion failed: {e}") # Fallback path time_pattern = r'\b\d{1,2}\.\d{2}(?:am|pm)\b' protected_summary = re.sub(time_pattern, lambda m: m.group(0).replace('.', '@'), summary) paragraphs = protected_summary.split('\n') if not paragraphs or all(not p.strip() for p in paragraphs): - logging.error("No valid paragraphs to insert link.") + logger.error("No valid paragraphs to insert link.") return summary target_para = random.choice([p for p in paragraphs if p.strip()]) @@ -633,13 +743,13 @@ def insert_link_naturally(summary, source_name, source_url): new_summary = '\n'.join(paragraphs) new_summary = new_summary.replace('@', '.') - logging.info(f"Fallback summary with link: {new_summary!r}") + logger.info(f"Fallback summary with link: {new_summary!r}") return new_summary def generate_category_from_summary(summary): try: if not isinstance(summary, str) or not summary.strip(): - logging.warning(f"Invalid summary for category generation: {summary}. Defaulting to 'Trends'.") + logger.warning(f"Invalid summary for category generation: {summary}. 
Defaulting to 'Trends'.") return "Trends" response = client.chat.completions.create( @@ -654,354 +764,260 @@ def generate_category_from_summary(summary): max_tokens=10 ) category = response.choices[0].message.content.strip() - logging.info(f"Generated category: {category}") + logger.info(f"Generated category: {category}") return category if category in ["Food", "Culture", "Trends", "Health", "Lifestyle", "Drink", "Eats"] else "Trends" except Exception as e: - logging.error(f"Category generation failed: {e}") + logger.error(f"Category generation failed: {e}") return "Trends" -def get_wp_category_id(category_name, wp_base_url, wp_username, wp_password): +def select_best_author(summary): try: - headers = {"Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}"} - response = requests.get(f"{wp_base_url}/categories", headers=headers, params={"search": category_name}) - response.raise_for_status() - categories = response.json() - for cat in categories: - if cat["name"].lower() == category_name.lower(): - return cat["id"] - return None - except Exception as e: - logging.error(f"Failed to get WP category ID for '{category_name}': {e}") - return None - -def create_wp_category(category_name, wp_base_url, wp_username, wp_password): - try: - headers = { - "Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}", - "Content-Type": "application/json" - } - payload = {"name": category_name} - response = requests.post(f"{wp_base_url}/categories", headers=headers, json=payload) - response.raise_for_status() - return response.json()["id"] - except Exception as e: - logging.error(f"Failed to create WP category '{category_name}': {e}") - return None - -def get_wp_tag_id(tag_name, wp_base_url, wp_username, wp_password): - try: - headers = {"Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}"} - response = requests.get(f"{wp_base_url}/tags", headers=headers, 
params={"search": tag_name}) - response.raise_for_status() - tags = response.json() - for tag in tags: - if tag["name"].lower() == tag_name.lower(): - return tag["id"] - return None - except Exception as e: - logging.error(f"Failed to get WP tag ID for '{tag_name}': {e}") - return None - -def post_to_wp(post_data, category, link, author, image_url, original_source, image_source="Pixabay", uploader=None, pixabay_url=None, interest_score=4, post_id=None, should_post_tweet=True): - wp_base_url = "https://insiderfoodie.com/wp-json/wp/v2" - logging.info(f"Starting post_to_wp for '{post_data['title']}', image_source: {image_source}") - - if not isinstance(author, dict) or "username" not in author or "password" not in author: - raise ValueError(f"Invalid author data: {author}. Expected a dictionary with 'username' and 'password' keys.") - - wp_username = author["username"] - wp_password = author["password"] - - if not isinstance(interest_score, int): - logging.error(f"Invalid interest_score type: {type(interest_score)}, value: '{interest_score}'. Defaulting to 4.") - interest_score = 4 - elif interest_score < 0 or interest_score > 10: - logging.warning(f"interest_score out of valid range (0-10): {interest_score}. 
Clamping to 4.") - interest_score = min(max(interest_score, 0), 10) - - try: - headers = { - "Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}", - "Content-Type": "application/json" - } - - auth_test = requests.get(f"{wp_base_url}/users/me", headers=headers) - auth_test.raise_for_status() - logging.info(f"Auth test passed for {wp_username}: {auth_test.json()['id']}") - - category_id = get_wp_category_id(category, wp_base_url, wp_username, wp_password) - if not category_id: - category_id = create_wp_category(category, wp_base_url, wp_username, wp_password) - logging.info(f"Created new category '{category}' with ID {category_id}") - else: - logging.info(f"Found existing category '{category}' with ID {category_id}") - - tags = [1] - if interest_score >= 9: - picks_tag_id = get_wp_tag_id("Picks", wp_base_url, wp_username, wp_password) - if picks_tag_id and picks_tag_id not in tags: - tags.append(picks_tag_id) - logging.info(f"Added 'Picks' tag (ID: {picks_tag_id}) to post due to high interest score: {interest_score}") - - content = post_data["content"] - if content is None: - logging.error(f"Post content is None for title '{post_data['title']}' - using fallback") - content = "Content unavailable. Check the original source for details." - formatted_content = "\n".join(f"

{para}

" for para in content.split('\n') if para.strip()) - - author_id_map = { - "owenjohnson": 10, - "javiermorales": 2, - "aishapatel": 3, - "trangnguyen": 12, - "keishareid": 13, - "lilamoreau": 7 - } - author_id = author_id_map.get(author["username"], 5) - - # Handle image upload - image_id = None - if image_url: - logging.info(f"Attempting image upload for '{post_data['title']}', URL: {image_url}, source: {image_source}") - image_id = upload_image_to_wp(image_url, post_data["title"], wp_base_url, wp_username, wp_password, image_source, uploader, pixabay_url) - if not image_id: - logging.info(f"Flickr upload failed for '{post_data['title']}', falling back to Pixabay") - pixabay_query = post_data["title"][:50] - image_url, image_source, uploader, pixabay_url = get_image(pixabay_query) - if image_url: - image_id = upload_image_to_wp(image_url, post_data["title"], wp_base_url, wp_username, wp_password, image_source, uploader, pixabay_url) - if not image_id: - logging.warning(f"All image uploads failed for '{post_data['title']}' - posting without image") - - payload = { - "title": post_data["title"], - "content": formatted_content, - "status": "publish", - "categories": [category_id], - "tags": tags, - "author": author_id, - "meta": { - "original_link": link, - "original_source": original_source, - "interest_score": interest_score - } - } - - if image_id: - payload["featured_media"] = image_id - logging.info(f"Set featured image for post '{post_data['title']}': Media ID={image_id}") - - endpoint = f"{wp_base_url}/posts/{post_id}" if post_id else f"{wp_base_url}/posts" - method = requests.post - - logging.debug(f"Sending WP request to {endpoint} with payload: {json.dumps(payload, indent=2)}") - - response = method(endpoint, headers=headers, json=payload) - response.raise_for_status() - - post_info = response.json() - logging.debug(f"WP response: {json.dumps(post_info, indent=2)}") - - if not isinstance(post_info, dict) or "id" not in post_info: - raise 
ValueError(f"Invalid WP response: {post_info}") - - post_id = post_info["id"] - post_url = post_info["link"] - - # Save to recent_posts.json - timestamp = datetime.now(timezone.utc).isoformat() - save_post_to_recent(post_data["title"], post_url, author["username"], timestamp) - - # Post article tweet to X only if should_post_tweet is True - if should_post_tweet: - try: - post = {"title": post_data["title"], "url": post_url} - tweet = generate_article_tweet(author, post, author["persona"]) - if post_tweet(author, tweet): - logging.info(f"Successfully posted article tweet for {author['username']} on X") - else: - logging.warning(f"Failed to post article tweet for {author['username']} on X") - except Exception as e: - logging.error(f"Error posting article tweet for {author['username']}: {e}") - - logging.info(f"Posted/Updated by {author['username']}: {post_data['title']} (ID: {post_id})") - return post_id, post_url - - except requests.exceptions.RequestException as e: - logging.error(f"WP API request failed: {e} - Response: {e.response.text if e.response else 'No response'}") - print(f"WP Error: {e}") - return None, None - except KeyError as e: - logging.error(f"WP payload error - Missing key: {e} - Author data: {author}") - print(f"WP Error: {e}") - return None, None - except Exception as e: - logging.error(f"WP posting failed: {e}") - print(f"WP Error: {e}") - return None, None - -# Configure Flickr API with credentials -flickr_api.set_keys(api_key=FLICKR_API_KEY, api_secret=FLICKR_API_SECRET) -logging.info(f"Flickr API configured with key: {FLICKR_API_KEY[:4]}... 
and secret: {FLICKR_API_SECRET[:4]}...") - -# Global variable to track the last Flickr request time -last_flickr_request_time = 0 - -# Flickr request counter -flickr_request_count = 0 -flickr_request_start_time = time.time() - -# Define exclude keywords for filtering unwanted image types -exclude_keywords = [ - "poster", "infographic", "chart", "graph", "data", "stats", "text", "typography", - "design", "advertisement", "illustration", "diagram", "layout", "print" -] - -# Initialize used_images as a set to track used image URLs -used_images_file = "/home/shane/foodie_automator/used_images.json" -used_images = set() - -# Load used images from file if it exists -if os.path.exists(used_images_file): - try: - with open(used_images_file, 'r') as f: - content = f.read().strip() - if not content: - logging.warning(f"Used images file {used_images_file} is empty. Resetting to empty list.") - data = [] - else: - data = json.loads(content) - if not isinstance(data, list): - logging.warning(f"Invalid format in {used_images_file}: expected a list, got {type(data)}. Converting to list.") - if isinstance(data, dict): - # If it's a dict, try to extract URLs from values - data = [v for v in data.values() if isinstance(v, str) and v.startswith('https://')] - else: - logging.warning(f"Cannot convert {type(data)} to list. Resetting to empty list.") - data = [] - # Filter out non-string or non-URL entries - data = [item for item in data if isinstance(item, str) and item.startswith('https://')] - used_images.update(data) - logging.info(f"Loaded {len(used_images)} used image URLs from {used_images_file}") - except Exception as e: - logging.warning(f"Failed to load used images from {used_images_file}: {e}. 
Resetting to empty set.") - used_images = set() - with open(used_images_file, 'w') as f: - json.dump([], f) - -# Function to save used_images to file -def save_used_images(): - try: - # Ensure used_images contains only valid URLs - valid_urls = [url for url in used_images if isinstance(url, str) and url.startswith('https://')] - if len(valid_urls) != len(used_images): - logging.warning(f"Found {len(used_images) - len(valid_urls)} invalid URLs in used_images set") - - with open(used_images_file, 'w') as f: - json.dump(valid_urls, f, indent=2) - logging.info(f"Saved {len(valid_urls)} used image URLs to {used_images_file}") - except Exception as e: - logging.warning(f"Failed to save used images to {used_images_file}: {e}") - -def reset_flickr_request_count(): - global flickr_request_count, flickr_request_start_time - if time.time() - flickr_request_start_time >= 3600: # Reset every hour - flickr_request_count = 0 - flickr_request_start_time = time.time() - -def process_photo(photo, search_query): - tags = [tag.text.lower() for tag in photo.getTags()] - title = photo.title.lower() if photo.title else "" - - matched_keywords = [kw for kw in exclude_keywords if kw in tags or kw in title] - if matched_keywords: - logging.info(f"Skipping image with unwanted keywords: {photo.id} (tags: {tags}, title: {title}, matched: {matched_keywords})") - return None - - # Try 'Large' size first, fall back to 'Medium' if unavailable - img_url = None - try: - img_url = photo.getPhotoFile(size_label='Large') - except flickr_api.flickrerrors.FlickrError as e: - logging.info(f"Large size not available for photo {photo.id}: {e}, trying Medium") - try: - img_url = photo.getPhotoFile(size_label='Medium') - except flickr_api.flickrerrors.FlickrError as e: - logging.warning(f"Medium size not available for photo {photo.id}: {e}") - return None - - if not img_url or img_url in used_images: - logging.info(f"Image URL invalid or already used for photo {photo.id}: {img_url}") - return None - - 
uploader = photo.owner.username - page_url = f"https://www.flickr.com/photos/{photo.owner.nsid}/{photo.id}" - - used_images.add(img_url) - save_used_images() - - flickr_data = { - "title": search_query, - "image_url": img_url, - "source": "Flickr", - "uploader": uploader, - "page_url": page_url, - "timestamp": datetime.now(timezone.utc).isoformat() - } - flickr_file = "/home/shane/foodie_automator/flickr_images.json" - with open(flickr_file, 'a') as f: - json.dump(flickr_data, f) - f.write('\n') - logging.info(f"Saved Flickr image metadata to {flickr_file}: {img_url}") - - logging.info(f"Selected Flickr image: {img_url} by {uploader} for query '{search_query}' (tags: {tags})") - return img_url, "Flickr", uploader, page_url - -def search_flickr(query, per_page=5): - try: - photos = flickr_api.Photo.search( - text=query, - per_page=per_page, - sort='relevance', - safe_search=1, - media='photos', - license='4,5,9,10' + response = client.chat.completions.create( + model=LIGHT_TASK_MODEL, + messages=[ + {"role": "system", "content": ( + "Based on this restaurant/food industry trend summary, pick the most suitable author from: " + "owenjohnson, javiermorales, aishapatel, trangnguyen, keishareid, lilamoreau. " + "Consider their expertise: owenjohnson (global dining trends), javiermorales (food critique), " + "aishapatel (emerging food trends), trangnguyen (cultural dining), keishareid (soul food heritage), " + "lilamoreau (global street food). Return only the username." 
+ )}, + {"role": "user", "content": summary} + ], + max_tokens=20 ) - return photos + author = response.choices[0].message.content.strip() + valid_authors = ["owenjohnson", "javiermorales", "aishapatel", "trangnguyen", "keishareid", "lilamoreau"] + logger.info(f"Selected author: {author}") + return author if author in valid_authors else "owenjohnson" except Exception as e: - logging.warning(f"Flickr API error for query '{query}': {e}") - return [] + logger.error(f"Author selection failed: {e}") + return "owenjohnson" -def fetch_photo_by_id(photo_id): +def prepare_post_data(final_summary, original_title, context_info=""): + innovative_title = generate_title_from_summary(final_summary) + if not innovative_title: + logger.info(f"Title generation failed for '{original_title}' {context_info}") + return None, None, None, None, None, None, None + + # Pass innovative_title and final_summary as separate arguments + search_query, relevance_keywords, _ = generate_image_query(innovative_title, final_summary) + if not search_query: + logger.info(f"Image query generation failed for '{innovative_title}' {context_info}") + return None, None, None, None, None, None, None + + logger.info(f"Fetching Flickr image for query: '{search_query}' {context_info}") + image_url, image_source, uploader, page_url = get_flickr_image(search_query, relevance_keywords) + + if not image_url: + logger.info(f"Flickr fetch failed for '{search_query}' - falling back to Pixabay {context_info}") + # Use the same title and summary for fallback + image_query, _, _ = generate_image_query(innovative_title, final_summary) + image_url, image_source, uploader, page_url = get_image(image_query) + if not image_url: + logger.info(f"Pixabay fetch failed for title '{innovative_title}' - falling back to summary {context_info}") + image_query, _, _ = generate_image_query(final_summary, final_summary) # Using summary as both title and summary for fallback + image_url, image_source, uploader, page_url = 
get_image(image_query) + if not image_url: + logger.info(f"Image fetch failed again for '{original_title}' - proceeding without image {context_info}") + + post_data = {"title": innovative_title, "content": final_summary} + selected_username = select_best_author(final_summary) + author = next((a for a in AUTHORS if a["username"] == selected_username), None) + if not author: + logger.error(f"Author '{selected_username}' not found in AUTHORS, defaulting to owenjohnson") + author = {"username": "owenjohnson", "password": "rfjk xhn6 2RPy FuQ9 cGlU K8mC"} + category = generate_category_from_summary(final_summary) + + return post_data, author, category, image_url, image_source, uploader, page_url + +def save_post_to_recent(post_title, post_url, author_username, timestamp): try: - photo = flickr_api.Photo(id=photo_id) - return photo + recent_posts = load_json_file('/home/shane/foodie_automator/recent_posts.json') + entry = { + "title": post_title, + "url": post_url, + "author_username": author_username, + "timestamp": timestamp + } + recent_posts.append(entry) + with open('/home/shane/foodie_automator/recent_posts.json', 'w') as f: + for item in recent_posts: + json.dump(item, f) + f.write('\n') + logger.info(f"Saved post '{post_title}' to recent_posts.json") except Exception as e: - logging.warning(f"Failed to fetch Flickr photo ID {photo_id}: {e}") - return None + logger.error(f"Failed to save post to recent_posts.json: {e}") -def search_ddg_for_flickr(query): - ddg_query = f"{query} site:flickr.com" - ddg_url = f"https://duckduckgo.com/?q={quote(ddg_query)}" +def prune_recent_posts(): try: - response = requests.get(ddg_url, headers={'User-Agent': 'InsiderFoodieBot/1.0 (https://insiderfoodie.com; contact@insiderfoodie.com)'}, timeout=10) + cutoff = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat() + recent_posts = load_json_file('/home/shane/foodie_automator/recent_posts.json') + recent_posts = [entry for entry in recent_posts if entry["timestamp"] > cutoff] 
+ with open('/home/shane/foodie_automator/recent_posts.json', 'w') as f: + for item in recent_posts: + json.dump(item, f) + f.write('\n') + logger.info(f"Pruned recent_posts.json to {len(recent_posts)} entries") + except Exception as e: + logger.error(f"Failed to prune recent_posts.json: {e}") + +def get_image(search_query: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]: + """Get an image with improved rate limiting and error handling.""" + headers = {'User-Agent': 'InsiderFoodieBot/1.0 (https://insiderfoodie.com; contact@insiderfoodie.com)'} + + # Try Pixabay with rate limiting + try: + pixabay_rate_limiter.wait_if_needed() + pixabay_url = f"https://pixabay.com/api/?key={PIXABAY_API_KEY}&q={quote(search_query)}&image_type=photo&per_page=10" + response = requests.get(pixabay_url, headers=headers, timeout=10) response.raise_for_status() - soup = BeautifulSoup(response.text, 'html.parser') + data = response.json() - photo_ids = set() - for link in soup.find_all('a', href=True): - href = link['href'] - match = re.search(r'flickr\.com/photos/[^/]+/(\d+)', href) - if match: - photo_id = match.group(1) - photo_ids.add(photo_id) + for hit in data.get('hits', []): + img_url = hit.get('webformatURL') + if not img_url or img_url in used_images: + continue + uploader = hit.get('user', 'Unknown') + page_url = hit.get('pageURL', img_url) + + used_images.add(img_url) + save_used_images() + + logger.info(f"Selected Pixabay image: {img_url} by {uploader} for query '{search_query}'") + return img_url, "Pixabay", uploader, page_url - photo_ids = list(photo_ids)[:2] # Limit to 2 IDs - logging.info(f"Found {len(photo_ids)} Flickr photo IDs via DDG: {photo_ids}") - return photo_ids + logger.info(f"No valid Pixabay image found for query '{search_query}'. 
Trying fallback query.") + except Exception as e: - logging.warning(f"DDG search failed for query '{ddg_query}': {e}") - return set() + logger.warning(f"Pixabay image fetch failed for query '{search_query}': {e}") + + # Fallback to a generic query with rate limiting + fallback_query = "food dining" + try: + pixabay_rate_limiter.wait_if_needed() + pixabay_url = f"https://pixabay.com/api/?key={PIXABAY_API_KEY}&q={quote(fallback_query)}&image_type=photo&per_page=10" + response = requests.get(pixabay_url, headers=headers, timeout=10) + response.raise_for_status() + data = response.json() + + for hit in data.get('hits', []): + img_url = hit.get('webformatURL') + if not img_url or img_url in used_images: + continue + uploader = hit.get('user', 'Unknown') + page_url = hit.get('pageURL', img_url) + + used_images.add(img_url) + save_used_images() + + logger.info(f"Selected Pixabay fallback image: {img_url} by {uploader} for query '{fallback_query}'") + return img_url, "Pixabay", uploader, page_url + + logger.warning(f"No valid Pixabay image found for fallback query '{fallback_query}'.") + + except Exception as e: + logger.warning(f"Pixabay fallback image fetch failed for query '{fallback_query}': {e}") + + logger.error(f"All image fetch attempts failed for query '{search_query}'. Returning None.") + return None, None, None, None + +def generate_image_query(title: str, summary: str) -> Tuple[str, List[str], bool]: + """Generate an image search query with improved error handling.""" + try: + prompt = ( + "Given the following article title and summary, generate a concise image search query (max 5 words) to find a relevant image. " + "Also provide a list of relevance keywords (max 5 words) that should be associated with the image. 
" + "Return the result as a JSON object with 'search' and 'relevance' keys.\n\n" + f"Title: {title}\n\n" + f"Summary: {summary}\n\n" + "Example output:\n" + "```json\n" + "{\"search\": \"Italian cuisine trends\", \"relevance\": \"pasta wine dining culture\"}\n" + "```" + ) + response = client.chat.completions.create( + model=LIGHT_TASK_MODEL, + messages=[ + {"role": "system", "content": prompt}, + {"role": "user", "content": "Generate an image search query and relevance keywords."} + ], + max_tokens=100, + temperature=0.5 + ) + raw_response = response.choices[0].message.content + json_match = re.search(r'```json\n([\s\S]*?)\n```', raw_response) + if not json_match: + logger.warning(f"Failed to parse image query JSON: {raw_response}") + return title, [], True + + query_data = json.loads(json_match.group(1)) + search_query = query_data.get("search", title) + relevance_keywords = query_data.get("relevance", "").split() + + # Log the JSON object in a single line + log_json = json.dumps(query_data).replace('\n', ' ').replace('\r', ' ') + logger.debug(f"Image query from content: {log_json}") + + return search_query, relevance_keywords, False + except Exception as e: + logger.warning(f"Image query generation failed: {e}. Using title as fallback.") + return title, [], True + +def smart_image_and_filter(title: str, summary: str) -> Tuple[str, List[str], bool]: + """Smart image filtering with improved error handling.""" + try: + content = f"{title}\n\n{summary}" + + prompt = ( + "Analyze this article title and summary. Extract key entities (brands, locations, cuisines, or topics) " + "for an image search about food industry trends or viral content. Prioritize specific terms if present, " + "otherwise focus on the main theme. " + "Return 'SKIP' if the article is about home appliances, recipes, promotions, or contains 'homemade', else 'KEEP'. 
" + "Return as JSON with double quotes for all property names and string values (e.g., {\"image_query\": \"specific term\", \"relevance\": [\"keyword1\", \"keyword2\"], \"action\": \"KEEP\" or \"SKIP\"})." + ) + + response = client.chat.completions.create( + model=LIGHT_TASK_MODEL, + messages=[ + {"role": "system", "content": prompt}, + {"role": "user", "content": content} + ], + max_tokens=100 + ) + raw_result = response.choices[0].message.content.strip() + logger.info(f"Raw GPT smart image/filter response: '{raw_result}'") + + # Remove ```json markers and fix single quotes in JSON structure + cleaned_result = re.sub(r'```json\s*|\s*```', '', raw_result).strip() + # Replace single quotes with double quotes, but preserve single quotes within string values + fixed_result = re.sub(r"(? 1: - classifications = classify_keywords(keywords) - logging.info(f"Keyword classifications: {classifications}") +def search_ddg_for_flickr(query): + ddg_query = f"{query} site:flickr.com" + ddg_url = f"https://duckduckgo.com/?q={quote(ddg_query)}" + try: + response = requests.get(ddg_url, headers={'User-Agent': 'InsiderFoodieBot/1.0 (https://insiderfoodie.com; contact@insiderfoodie.com)'}, timeout=10) + response.raise_for_status() + soup = BeautifulSoup(response.text, 'html.parser') - # Prioritize specific keywords - specific_keywords = [kw for kw, classification in classifications.items() if classification == "specific"] - if specific_keywords: - for keyword in specific_keywords: - logging.info(f"Searching Flickr with specific keyword: '{keyword}'") - photos = search_flickr(keyword) - for photo in photos: - result = process_photo(photo, search_query) - if result: - return result - - # Step 3: Final fallback using relevance keywords - fallback_query = " ".join(relevance_keywords) if isinstance(relevance_keywords, list) else relevance_keywords - logging.info(f"No results found. 
Falling back to generic query: '{fallback_query}'") - photos = search_flickr(fallback_query) - for photo in photos: - result = process_photo(photo, search_query) - if result: - return result - - logging.warning(f"No valid Flickr image found for query '{search_query}' after all attempts.") - return None, None, None, None - -def select_best_author(summary): - try: - response = client.chat.completions.create( - model=LIGHT_TASK_MODEL, - messages=[ - {"role": "system", "content": ( - "Based on this restaurant/food industry trend summary, pick the most suitable author from: " - "owenjohnson, javiermorales, aishapatel, trangnguyen, keishareid, lilamoreau. " - "Consider their expertise: owenjohnson (global dining trends), javiermorales (food critique), " - "aishapatel (emerging food trends), trangnguyen (cultural dining), keishareid (soul food heritage), " - "lilamoreau (global street food). Return only the username." - )}, - {"role": "user", "content": summary} - ], - max_tokens=20 - ) - author = response.choices[0].message.content.strip() - valid_authors = ["owenjohnson", "javiermorales", "aishapatel", "trangnguyen", "keishareid", "lilamoreau"] - logging.info(f"Selected author: {author}") - return author if author in valid_authors else "owenjohnson" + photo_ids = set() + for link in soup.find_all('a', href=True): + href = link['href'] + match = re.search(r'flickr\.com/photos/[^/]+/(\d+)', href) + if match: + photo_id = match.group(1) + photo_ids.add(photo_id) + + photo_ids = list(photo_ids)[:2] # Limit to 2 IDs + logger.info(f"Found {len(photo_ids)} Flickr photo IDs via DDG: {photo_ids}") + return photo_ids except Exception as e: - logging.error(f"Author selection failed: {e}") - return "owenjohnson" - -def prepare_post_data(final_summary, original_title, context_info=""): - innovative_title = generate_title_from_summary(final_summary) - if not innovative_title: - logging.info(f"Title generation failed for '{original_title}' {context_info}") - return None, None, None, 
None, None, None, None - - # Pass innovative_title and final_summary as separate arguments - search_query, relevance_keywords, _ = generate_image_query(innovative_title, final_summary) - if not search_query: - logging.info(f"Image query generation failed for '{innovative_title}' {context_info}") - return None, None, None, None, None, None, None - - logging.info(f"Fetching Flickr image for query: '{search_query}' {context_info}") - image_url, image_source, uploader, page_url = get_flickr_image(search_query, relevance_keywords) - - if not image_url: - logging.info(f"Flickr fetch failed for '{search_query}' - falling back to Pixabay {context_info}") - # Use the same title and summary for fallback - image_query, _, _ = generate_image_query(innovative_title, final_summary) - image_url, image_source, uploader, page_url = get_image(image_query) - if not image_url: - logging.info(f"Pixabay fetch failed for title '{innovative_title}' - falling back to summary {context_info}") - image_query, _, _ = generate_image_query(final_summary, final_summary) # Using summary as both title and summary for fallback - image_url, image_source, uploader, page_url = get_image(image_query) - if not image_url: - logging.info(f"Image fetch failed again for '{original_title}' - proceeding without image {context_info}") - - post_data = {"title": innovative_title, "content": final_summary} - selected_username = select_best_author(final_summary) - author = next((a for a in AUTHORS if a["username"] == selected_username), None) - if not author: - logging.error(f"Author '{selected_username}' not found in AUTHORS, defaulting to owenjohnson") - author = {"username": "owenjohnson", "password": "rfjk xhn6 2RPy FuQ9 cGlU K8mC"} - category = generate_category_from_summary(final_summary) - - return post_data, author, category, image_url, image_source, uploader, page_url - -def save_post_to_recent(post_title, post_url, author_username, timestamp): - try: - recent_posts = 
load_json_file('/home/shane/foodie_automator/recent_posts.json') - entry = { - "title": post_title, - "url": post_url, - "author_username": author_username, - "timestamp": timestamp - } - recent_posts.append(entry) - with open('/home/shane/foodie_automator/recent_posts.json', 'w') as f: - for item in recent_posts: - json.dump(item, f) - f.write('\n') - logging.info(f"Saved post '{post_title}' to recent_posts.json") - except Exception as e: - logging.error(f"Failed to save post to recent_posts.json: {e}") - -def prune_recent_posts(): - try: - cutoff = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat() - recent_posts = load_json_file('/home/shane/foodie_automator/recent_posts.json') - recent_posts = [entry for entry in recent_posts if entry["timestamp"] > cutoff] - with open('/home/shane/foodie_automator/recent_posts.json', 'w') as f: - for item in recent_posts: - json.dump(item, f) - f.write('\n') - logging.info(f"Pruned recent_posts.json to {len(recent_posts)} entries") - except Exception as e: - logging.error(f"Failed to prune recent_posts.json: {e}") \ No newline at end of file + logger.warning(f"DDG search failed for query '{ddg_query}': {e}") + return set() \ No newline at end of file