# Repository file view: foodie-automator/foodie_automator_google.py
# 2025-05-03 16:23:06 +10:00 - 284 lines - 12 KiB - Python

# foodie_automator_google.py
import requests
import random
import time
import logging
import re
import os
import json
import signal
import sys
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Optional, Tuple
from openai import OpenAI
from urllib.parse import quote
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException, WebDriverException
from duckduckgo_search import DDGS
from foodie_config import (
AUTHORS, RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS,
PERSONA_CONFIGS, CATEGORIES, get_clean_source_name, X_API_CREDENTIALS,
FILE_PATHS, EXPIRATION_DAYS, IMAGE_EXPIRATION_DAYS
)
from foodie_utils import (
load_json_file, save_json_file, get_image, generate_image_query,
upload_image_to_wp, select_best_persona, determine_paragraph_count,
is_interesting, generate_title_from_summary, summarize_with_gpt4o,
generate_category_from_summary, post_to_wp, prepare_post_data,
smart_image_and_filter, insert_link_naturally, get_flickr_image
)
from foodie_hooks import get_dynamic_hook, get_viral_share_prompt
from dotenv import load_dotenv
# Load environment variables (python-dotenv). This runs at import time, before
# GoogleTrendsScraper.__init__ reads OPENAI_API_KEY via os.getenv.
load_dotenv()
# Global state
# is_posting is set to True by run_google_trends_automator() for the duration of
# the post_to_wp() call; the signal handler installed by GoogleTrendsScraper
# reads it to decide whether it may exit immediately.
is_posting = False
# Module-level logger; its handlers are attached in GoogleTrendsScraper.setup_logging().
logger = logging.getLogger(__name__)
class GoogleTrendsScraper:
    """Scrape Google Trends and turn a qualifying trend into post data.

    The scraper drives a headless Chrome instance (Selenium) to read the
    trending table, filters rows by search volume, and then runs each trend
    through the project's filtering/scoring/summarising helpers until one of
    them yields post data.
    """

    # Guards setup_logging() so that creating more than one scraper in a
    # process does not attach duplicate handlers to the module logger.
    _logging_configured = False

    def __init__(self):
        """Configure logging and signal handling, then load dedupe state."""
        # The Chrome driver is created lazily by scrape_google_trends().
        self.driver = None
        self.setup_logging()
        self.setup_signal_handlers()
        # NOTE(review): the client is stored but not used by any method of
        # this class; presumably kept for callers - confirm before removing.
        self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        self.posted_titles = self.load_posted_titles()
        self.used_images = self.load_used_images()

    def setup_logging(self) -> None:
        """Configure logging for the scraper.

        Attaches a file handler (the posted-titles path with a ``.log``
        suffix, opened in append mode) and a console handler to the module
        logger. Handlers are attached at most once per process.
        """
        logger.setLevel(logging.INFO)
        if GoogleTrendsScraper._logging_configured:
            return
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        file_handler = logging.FileHandler(
            FILE_PATHS["posted_google_titles"].with_suffix('.log'), mode='a'
        )
        console_handler = logging.StreamHandler()
        for handler in (file_handler, console_handler):
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        GoogleTrendsScraper._logging_configured = True
        logger.info("Logging initialized for Google Trends scraper")

    def setup_signal_handlers(self) -> None:
        """Set up signal handlers for graceful shutdown (SIGTERM and SIGINT)."""
        def signal_handler(sig, frame):
            logger.info("Received termination signal, checking if safe to exit...")
            if is_posting:
                # NOTE(review): this branch only logs. Nothing in this class
                # records the request, so the process keeps running after the
                # post completes unless the caller handles it.
                logger.info("Currently posting, will exit after completion.")
            else:
                logger.info("Safe to exit immediately.")
                sys.exit(0)

        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)

    def load_posted_titles(self) -> set:
        """Load and return the set of posted titles.

        Returns an empty set (after logging the error) if the file cannot be
        loaded or an entry has no ``"title"`` key.
        """
        try:
            data = load_json_file(FILE_PATHS["posted_google_titles"], EXPIRATION_DAYS)
            return {entry["title"] for entry in data}
        except Exception as e:
            logger.error(f"Error loading posted titles: {e}")
            return set()

    def load_used_images(self) -> set:
        """Load and return the set of used images.

        Entries without a ``"title"`` key are ignored. Returns an empty set
        (after logging the error) if the file cannot be loaded.
        """
        try:
            data = load_json_file(FILE_PATHS["used_images"], IMAGE_EXPIRATION_DAYS)
            # NOTE(review): images are keyed by "title" here - confirm that is
            # the field the used-images file actually stores.
            return {entry["title"] for entry in data if "title" in entry}
        except Exception as e:
            logger.error(f"Error loading used images: {e}")
            return set()

    def parse_search_volume(self, volume_text: str) -> float:
        """Parse search volume from text into a numeric value.

        Only the first line of *volume_text* is considered. A trailing ``+``
        and thousands separators are ignored, and a ``k``/``m``/``b`` suffix
        (case-insensitive) scales the number, e.g. ``"20K+"`` -> ``20000.0``
        and ``"1,000+"`` -> ``1000.0``.

        Returns 0.0 (after logging a warning) when the text cannot be parsed.
        """
        multipliers = {'k': 1_000, 'm': 1_000_000, 'b': 1_000_000_000}
        try:
            volume_part = (
                volume_text.split('\n')[0]
                .lower()
                .replace('+', '')
                .replace(',', '')
                .strip()
            )
            suffix = volume_part[-1:]
            if suffix in multipliers:
                return float(volume_part[:-1]) * multipliers[suffix]
            return float(volume_part)
        except (ValueError, AttributeError) as e:
            logger.warning(f"Could not parse search volume from '{volume_text}': {e}")
            return 0.0

    def setup_driver(self) -> None:
        """Set up the Chrome WebDriver with appropriate options."""
        chrome_options = Options()
        for argument in (
            "--headless",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/125.0.0.0 Safari/537.36",
        ):
            chrome_options.add_argument(argument)
        self.driver = webdriver.Chrome(options=chrome_options)

    def _load_trends_page(self, geo: str) -> bool:
        """Open the trending page for *geo*; return True once its table is present.

        Makes up to three attempts, retrying only on ``TimeoutException``.
        Other WebDriver errors propagate to the caller.
        """
        url = f"https://trends.google.com/trending?geo={geo}&hours=24&sort=search-volume&category=5"
        for attempt in range(3):
            try:
                # Randomised pause before each navigation.
                time.sleep(random.uniform(2, 5))
                logger.info(f"Navigating to {url} (attempt {attempt + 1})")
                self.driver.get(url)
                logger.info("Waiting for page to load...")
                WebDriverWait(self.driver, 60).until(
                    EC.presence_of_element_located((By.TAG_NAME, "tbody"))
                )
                return True
            except TimeoutException:
                logger.warning(f"Timeout on attempt {attempt + 1} for geo={geo}")
                if attempt == 2:
                    logger.error(f"Failed after 3 attempts for geo={geo}")
                    return False
                time.sleep(5)
        return False

    def _parse_trend_row(self, row, geo: str) -> Optional[Dict]:
        """Convert one table row into a trend dict, or None if it does not qualify.

        A row qualifies when it has at least three cells, a non-empty title
        in the second cell and a parsed search volume of at least 20000 in
        the third.
        """
        columns = row.find_elements(By.TAG_NAME, "td")
        if len(columns) < 3:
            return None
        title = columns[1].text.strip()
        search_volume = self.parse_search_volume(columns[2].text.strip())
        if not title or search_volume < 20000:
            return None
        return {
            "title": title,
            "link": f"https://trends.google.com/trends/explore?q={quote(title)}&geo={geo}",
            "search_volume": search_volume,
        }

    def scrape_google_trends(self, geo: str = 'US') -> List[Dict]:
        """Scrape Google Trends for the specified region.

        Returns a list of ``{"title", "link", "search_volume"}`` dicts sorted
        by search volume (highest first). The list is empty when the page
        never loads or no row qualifies, and may be partial when a WebDriver
        error interrupts the scrape. The driver is always closed before
        returning, so every call starts a fresh browser.
        """
        if not self.driver:
            self.setup_driver()
        trends: List[Dict] = []
        try:
            if not self._load_trends_page(geo):
                return []
            # Scroll to the bottom and give the page a moment before reading rows.
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(2)
            rows = self.driver.find_elements(By.XPATH, "//tbody/tr")
            logger.info(f"Found {len(rows)} rows in tbody for geo={geo}")
            for row in rows:
                try:
                    trend = self._parse_trend_row(row, geo)
                except Exception as e:
                    # Best effort: one bad row must not abort the whole scrape.
                    logger.warning(f"Row processing error: {e}")
                    continue
                if trend is None:
                    continue
                trends.append(trend)
                logger.info(f"Added trend: {trend['title']} with search volume: {trend['search_volume']}")
            if trends:
                trends.sort(key=lambda x: x["search_volume"], reverse=True)
                logger.info(f"Extracted {len(trends)} trends for geo={geo}")
            else:
                logger.warning(f"No valid trends found with search volume >= 20K for geo={geo}")
        except WebDriverException as e:
            logger.error(f"WebDriver error: {e}")
        finally:
            if self.driver:
                try:
                    self.driver.quit()
                finally:
                    # Drop the reference even if quit() fails, so the next
                    # call creates a new driver instead of reusing a dead one.
                    self.driver = None
                logger.info(f"Chrome driver closed for geo={geo}")
        return trends

    @staticmethod
    def _parse_news_date(date_str: str) -> datetime:
        """Parse an ISO-8601 timestamp with a UTC offset or trailing ``Z``.

        Raises ValueError for unparseable text and for timestamps that carry
        no timezone information.
        """
        normalized = date_str.strip()
        if normalized.endswith("Z"):
            # datetime.fromisoformat() only accepts "Z" from Python 3.11 on.
            normalized = normalized[:-1] + "+00:00"
        parsed = datetime.fromisoformat(normalized)
        if parsed.tzinfo is None:
            raise ValueError("timestamp has no timezone information")
        return parsed

    def fetch_duckduckgo_news_context(self, trend_title: str, hours: int = 24) -> str:
        """Fetch news context for a trend from DuckDuckGo.

        Returns the lower-cased headlines published within the last *hours*
        hours joined by spaces, a "no recent news" message when none qualify,
        or *trend_title* itself when the lookup fails.
        """
        try:
            cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
            titles = []
            with DDGS() as ddgs:
                results = ddgs.news(f"{trend_title} news", timelimit="d", max_results=5)
                for r in results:
                    date_str = r.get("date", "")
                    try:
                        published = self._parse_news_date(date_str)
                    except (ValueError, TypeError, AttributeError) as e:
                        # Skip just this result instead of losing the whole batch.
                        logger.warning(f"Date parsing failed for '{date_str}': {e}")
                        continue
                    headline = r.get("title")
                    if headline and published > cutoff:
                        titles.append(headline.lower())
            context = " ".join(titles) if titles else f"No recent news found within {hours} hours"
            logger.info(f"DuckDuckGo News context for '{trend_title}': {context}")
            return context
        except Exception as e:
            logger.warning(f"DuckDuckGo News context fetch failed for '{trend_title}': {e}")
            return trend_title

    def curate_from_google_trends(self, geo_list: Optional[List[str]] = None) -> Tuple[Optional[Dict], Optional[str], int]:
        """Curate content from Google Trends for multiple regions.

        Args:
            geo_list: Region codes to scrape; defaults to ``['US']``.

        Returns:
            ``(post_data, author, sleep_seconds)`` for the first trend that
            passes every filter, or ``(None, None, sleep_seconds)`` when none
            does. ``sleep_seconds`` is a random value between 600 and 1800.
        """
        if geo_list is None:
            geo_list = ['US']

        all_trends = []
        for geo in geo_list:
            all_trends.extend(self.scrape_google_trends(geo=geo))
        if not all_trends:
            logger.info("No Google Trends data available")
            return None, None, random.randint(600, 1800)

        for trend in all_trends:
            title = trend["title"]
            if title in self.posted_titles:
                logger.info(f"Skipping already posted trend: {title}")
                continue
            logger.info(f"Processing Google Trend: {title}")
            # NOTE(review): scraped trends carry no "summary" key, so the
            # summary is always empty and scoring sees only the title.
            # fetch_duckduckgo_news_context() is not called anywhere in this
            # class - confirm whether it was meant to supply context here.
            summary = trend.get("summary", "")
            _image_query, _relevance_keywords, skip = smart_image_and_filter(title, summary)
            if skip:
                logger.info(f"Skipping filtered Google Trend: {title}")
                continue
            scoring_content = f"{title}\n\n{summary}"
            interest_score = is_interesting(scoring_content)
            if interest_score < 6:
                logger.info(f"Google Trends Interest Too Low: {interest_score}")
                continue
            num_paragraphs = determine_paragraph_count(interest_score)
            extra_prompt = (
                f"Generate exactly {num_paragraphs} paragraphs.\n"
                f"FOCUS: Summarize ONLY the provided content, explicitly mentioning '{title}' and sticking to its specific topic and details.\n"
                f"Do NOT introduce unrelated concepts.\n"
                f"Expand on the core idea with relevant context about its appeal or significance in food trends.\n"
                f"Do not include emojis in the summary."
            )
            final_summary = summarize_with_gpt4o(
                scoring_content,
                "Google Trends",
                trend["link"],
                interest_score=interest_score,
                extra_prompt=extra_prompt
            )
            if not final_summary:
                logger.info(f"Summary failed for '{title}'")
                continue
            final_summary = insert_link_naturally(final_summary, "Google Trends", trend["link"])
            # Only the first two values returned by prepare_post_data are used here.
            post_data, author, *_ = prepare_post_data(final_summary, title)
            if post_data and author:
                # Remember the trend so the next pass in this process does not
                # select it again. NOTE(review): this is in-memory only; the
                # posted-titles file is not written by this class.
                self.posted_titles.add(title)
                return post_data, author, random.randint(600, 1800)
        return None, None, random.randint(600, 1800)
def run_google_trends_automator():
    """Run the Google Trends automator forever.

    Each cycle asks the scraper for a curated post, publishes it to WordPress
    when one is available, then sleeps for the interval the scraper returned.
    Any exception in a cycle is logged and followed by a five-minute pause.
    """
    global is_posting

    trends_scraper = GoogleTrendsScraper()
    while True:
        try:
            payload, persona, pause_seconds = trends_scraper.curate_from_google_trends()
            if payload and persona:
                # Flag the publish window so the signal handler will not exit mid-post.
                is_posting = True
                try:
                    post_to_wp(payload, persona)
                    logger.info(f"Successfully posted: {payload['title']}")
                finally:
                    is_posting = False
            time.sleep(pause_seconds)
        except Exception as err:
            logger.error(f"Error in Google Trends automator: {err}")
            time.sleep(300)  # Wait 5 minutes before retrying


if __name__ == "__main__":
    run_google_trends_automator()