# Listing metadata (from the code viewer this file was copied from): 284 lines, 12 KiB, Python
# foodie_automator_google.py
|
|
import requests
|
|
import random
|
|
import time
|
|
import logging
|
|
import re
|
|
import os
|
|
import json
|
|
import signal
|
|
import sys
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import List, Dict, Optional, Tuple
|
|
from openai import OpenAI
|
|
from urllib.parse import quote
|
|
from selenium import webdriver
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.webdriver.chrome.options import Options
|
|
from selenium.common.exceptions import TimeoutException, WebDriverException
|
|
from duckduckgo_search import DDGS
|
|
from foodie_config import (
|
|
AUTHORS, RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS,
|
|
PERSONA_CONFIGS, CATEGORIES, get_clean_source_name, X_API_CREDENTIALS,
|
|
FILE_PATHS, EXPIRATION_DAYS, IMAGE_EXPIRATION_DAYS
|
|
)
|
|
from foodie_utils import (
|
|
load_json_file, save_json_file, get_image, generate_image_query,
|
|
upload_image_to_wp, select_best_persona, determine_paragraph_count,
|
|
is_interesting, generate_title_from_summary, summarize_with_gpt4o,
|
|
generate_category_from_summary, post_to_wp, prepare_post_data,
|
|
smart_image_and_filter, insert_link_naturally, get_flickr_image
|
|
)
|
|
from foodie_hooks import get_dynamic_hook, get_viral_share_prompt
|
|
from dotenv import load_dotenv
|
|
|
|
# Populate os.environ from a local .env file before anything reads it
# (e.g. OPENAI_API_KEY, which the scraper fetches with os.getenv).
load_dotenv()

# Module-wide state shared by the scraper and the posting loop.
logger = logging.getLogger(__name__)
# True only while a post is being published; the signal handler reads it to
# decide whether the process may exit immediately.
is_posting = False
|
|
|
|
class GoogleTrendsScraper:
    """Scrape Google Trends and curate one trending topic into post data.

    Workflow:
      1. ``scrape_google_trends`` opens the Google Trends "trending" page for
         a region in headless Chrome and collects the table rows whose search
         volume reaches a threshold.
      2. ``curate_from_google_trends`` filters, scores and summarises those
         trends with helpers imported from ``foodie_utils`` and returns the
         first trend that produces usable post data.

    Attributes:
        driver: The active Selenium Chrome driver, or ``None`` when no browser
            session is open (one is created per scrape and closed afterwards).
        shutdown_requested: Set to ``True`` by the signal handler when a
            termination signal arrives while a post is being published, so
            the posting loop can exit once publishing has finished.
        client: OpenAI client created from the ``OPENAI_API_KEY`` env var.
        posted_titles: Titles that were already posted/curated (de-duplication).
        used_images: URLs of images that were already used.
    """

    # Leading number with an optional magnitude suffix: "20k", "1.5m", "1,200".
    _VOLUME_PATTERN = re.compile(r"^\s*([\d.,]+)\s*([kmb])?")
    _VOLUME_MULTIPLIERS = {"": 1, "k": 1000, "m": 1000000, "b": 1000000000}

    def __init__(self):
        self.driver = None
        # Must exist before the signal handlers are installed: they write to it.
        self.shutdown_requested = False
        self.setup_logging()
        self.setup_signal_handlers()
        self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        self.posted_titles = self.load_posted_titles()
        self.used_images = self.load_used_images()

    def setup_logging(self) -> None:
        """Configure the module logger with a file handler and a console handler.

        Handlers are attached only once: the logger is module-global, so adding
        them on every instantiation would duplicate each log line.
        """
        logger.setLevel(logging.INFO)
        if not logger.handlers:
            formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

            # The log file sits next to the posted-titles file, with a .log suffix.
            # NOTE(review): relies on FILE_PATHS values being pathlib.Path objects.
            file_handler = logging.FileHandler(
                FILE_PATHS["posted_google_titles"].with_suffix('.log'), mode='a'
            )
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

            console_handler = logging.StreamHandler()
            console_handler.setFormatter(formatter)
            logger.addHandler(console_handler)
        logger.info("Logging initialized for Google Trends scraper")

    def setup_signal_handlers(self) -> None:
        """Install SIGTERM/SIGINT handlers for a graceful shutdown.

        If no post is being published the process exits immediately. While a
        post is in flight the request is recorded in ``shutdown_requested``
        instead, so that publishing is not interrupted half-way.
        """
        def signal_handler(sig, frame):
            logger.info("Received termination signal, checking if safe to exit...")
            if is_posting:
                # Remember the request; exiting here would abort the post.
                self.shutdown_requested = True
                logger.info("Currently posting, will exit after completion.")
            else:
                logger.info("Safe to exit immediately.")
                sys.exit(0)

        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)

    def load_posted_titles(self) -> set:
        """Return the set of already-posted titles (empty set on any load error)."""
        try:
            data = load_json_file(FILE_PATHS["posted_google_titles"], EXPIRATION_DAYS)
            # Skip malformed entries instead of discarding the whole history
            # (same guard as load_used_images).
            return {entry["title"] for entry in data if "title" in entry}
        except Exception as e:
            logger.error(f"Error loading posted titles: {e}")
            return set()

    def load_used_images(self) -> set:
        """Return the set of already-used image URLs (empty set on any load error)."""
        try:
            data = load_json_file(FILE_PATHS["used_images"], IMAGE_EXPIRATION_DAYS)
            return {entry["url"] for entry in data if "url" in entry}
        except Exception as e:
            logger.error(f"Error loading used images: {e}")
            return set()

    def parse_search_volume(self, volume_text: str) -> float:
        """Convert a search-volume label such as ``"20K+"`` into a number.

        Only the first line of ``volume_text`` is considered. Thousands
        separators (``"1,500+"``) and the suffixes ``k``/``m``/``b`` are
        understood, case-insensitively; anything after the number is ignored.

        Returns:
            The parsed volume, or ``0.0`` (with a warning logged) when the text
            does not start with a number or is not a string.
        """
        try:
            first_line = volume_text.split('\n')[0].strip().lower()
            match = self._VOLUME_PATTERN.match(first_line)
            if match is None:
                raise ValueError("no leading number found")
            number, suffix = match.groups()
            return float(number.replace(',', '')) * self._VOLUME_MULTIPLIERS[suffix or ""]
        except (ValueError, AttributeError) as e:
            logger.warning(f"Could not parse search volume from '{volume_text}': {e}")
            return 0.0

    def setup_driver(self) -> None:
        """Create a headless Chrome WebDriver and store it in ``self.driver``."""
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/125.0.0.0 Safari/537.36")
        self.driver = webdriver.Chrome(options=chrome_options)

    def scrape_google_trends(self, geo: str = 'US', min_search_volume: float = 20000,
                             max_attempts: int = 3) -> List[Dict]:
        """Scrape the Google Trends "trending" table for one region.

        Args:
            geo: Region code placed in the ``geo`` query parameter.
            min_search_volume: Rows with a lower parsed search volume are dropped.
            max_attempts: How many times the page load is tried before giving up.

        Returns:
            A list of ``{"title", "link", "search_volume"}`` dicts sorted by
            search volume (highest first); empty when the page never loaded,
            no row qualified, or a WebDriver error occurred.

        The browser session is always closed before returning.
        """
        if not self.driver:
            self.setup_driver()

        trends = []
        try:
            # NOTE(review): category=5 is presumably Google's "Food & Drink"
            # category - confirm against the Trends UI.
            url = f"https://trends.google.com/trending?geo={geo}&hours=24&sort=search-volume&category=5"
            for attempt in range(1, max_attempts + 1):
                try:
                    # Random pause before each request.
                    time.sleep(random.uniform(2, 5))
                    logger.info(f"Navigating to {url} (attempt {attempt})")
                    self.driver.get(url)

                    logger.info("Waiting for page to load...")
                    WebDriverWait(self.driver, 60).until(
                        EC.presence_of_element_located((By.TAG_NAME, "tbody"))
                    )
                    break
                except TimeoutException:
                    logger.warning(f"Timeout on attempt {attempt} for geo={geo}")
                    if attempt < max_attempts:
                        time.sleep(5)
            else:
                # Every attempt timed out (the loop never hit ``break``).
                logger.error(f"Failed after {max_attempts} attempts for geo={geo}")
                return []

            # Scroll to the bottom and give the page a moment to settle before
            # the rows are read.
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(2)

            rows = self.driver.find_elements(By.XPATH, "//tbody/tr")
            logger.info(f"Found {len(rows)} rows in tbody for geo={geo}")

            for row in rows:
                try:
                    columns = row.find_elements(By.TAG_NAME, "td")
                    if len(columns) < 3:
                        continue
                    # Column 1 holds the trend title, column 2 the volume label.
                    title = columns[1].text.strip()
                    search_volume = self.parse_search_volume(columns[2].text.strip())
                    if not title or search_volume < min_search_volume:
                        continue
                    link = f"https://trends.google.com/trends/explore?q={quote(title)}&geo={geo}"
                    trends.append({
                        "title": title,
                        "link": link,
                        "search_volume": search_volume
                    })
                    logger.info(f"Added trend: {title} with search volume: {search_volume}")
                except Exception as e:
                    # One broken row must not abort the whole scrape.
                    logger.warning(f"Row processing error: {e}")
                    continue

            if trends:
                trends.sort(key=lambda x: x["search_volume"], reverse=True)
                logger.info(f"Extracted {len(trends)} trends for geo={geo}")
            else:
                logger.warning(
                    f"No valid trends found with search volume >= {min_search_volume:g} for geo={geo}"
                )
        except WebDriverException as e:
            logger.error(f"WebDriver error: {e}")
        finally:
            if self.driver:
                try:
                    self.driver.quit()
                except WebDriverException as e:
                    logger.warning(f"Error while closing Chrome driver: {e}")
                finally:
                    # Never keep a reference to a dead session, even if quit() failed.
                    self.driver = None
                logger.info(f"Chrome driver closed for geo={geo}")

        return trends

    @staticmethod
    def _parse_news_date(date_str: str) -> datetime:
        """Parse an ISO-8601 timestamp into a timezone-aware datetime.

        Accepts a trailing ``Z`` as well as any ``+HH:MM`` offset; a timestamp
        without offset is treated as UTC. Raises ``ValueError`` for malformed
        text and ``AttributeError``/``TypeError`` for non-string input.
        """
        normalized = date_str.strip()
        if normalized.endswith(("Z", "z")):
            # datetime.fromisoformat only understands "Z" from Python 3.11 on.
            normalized = normalized[:-1] + "+00:00"
        parsed = datetime.fromisoformat(normalized)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def fetch_duckduckgo_news_context(self, trend_title: str, hours: int = 24) -> str:
        """Build a short news context for a trend from DuckDuckGo News headlines.

        Args:
            trend_title: The trend to search news for.
            hours: Only headlines published within this many hours are kept.

        Returns:
            The lower-cased recent headlines joined by spaces, a "no recent
            news" message when none qualify, or ``trend_title`` itself when
            the search fails.
        """
        # Ask DuckDuckGo for the smallest window (day/week/month) covering ``hours``.
        if hours <= 24:
            timelimit = "d"
        elif hours <= 24 * 7:
            timelimit = "w"
        else:
            timelimit = "m"

        try:
            with DDGS() as ddgs:
                results = ddgs.news(f"{trend_title} news", timelimit=timelimit, max_results=5)
                cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
                titles = []
                for r in results:
                    date_str = None
                    try:
                        date_str = r["date"]
                        published = self._parse_news_date(date_str)
                    except (KeyError, AttributeError, TypeError, ValueError) as e:
                        # A single result with a missing/odd date is skipped,
                        # the remaining results are still used.
                        logger.warning(f"Date parsing failed for '{date_str}': {e}")
                        continue
                    if published > cutoff:
                        titles.append(r["title"].lower())
                context = " ".join(titles) if titles else f"No recent news found within {hours} hours"
                logger.info(f"DuckDuckGo News context for '{trend_title}': {context}")
                return context
        except Exception as e:
            # Best effort: fall back to the bare title.
            logger.warning(f"DuckDuckGo News context fetch failed for '{trend_title}': {e}")
            return trend_title

    def curate_from_google_trends(self, geo_list: Optional[List[str]] = None) -> Tuple[Optional[Dict], Optional[str], int]:
        """Pick the first postable trend across the given regions.

        Args:
            geo_list: Region codes to scrape; defaults to ``['US']``.

        Returns:
            ``(post_data, author, sleep_seconds)``. ``post_data`` and ``author``
            are ``None`` when nothing could be curated. ``sleep_seconds`` is a
            random delay between 600 and 1800 seconds.
        """
        geos = ['US'] if geo_list is None else geo_list

        all_trends = []
        for geo in geos:
            trends = self.scrape_google_trends(geo=geo)
            if trends:
                all_trends.extend(trends)

        if not all_trends:
            logger.info("No Google Trends data available")
            return None, None, random.randint(600, 1800)

        for trend in all_trends:
            title = trend["title"]
            if title in self.posted_titles:
                logger.info(f"Skipping already posted trend: {title}")
                continue

            logger.info(f"Processing Google Trend: {title}")
            # NOTE(review): the dicts built by scrape_google_trends carry no
            # "summary" key, so the summary is always "" here;
            # fetch_duckduckgo_news_context looks like the intended source of
            # context - confirm.
            summary = trend.get("summary", "")
            # Only the skip flag of the (image_query, relevance_keywords, skip)
            # result is needed here.
            _, _, skip = smart_image_and_filter(title, summary)
            if skip:
                logger.info(f"Skipping filtered Google Trend: {title}")
                continue

            scoring_content = f"{title}\n\n{summary}"
            interest_score = is_interesting(scoring_content)
            if interest_score < 6:
                logger.info(f"Google Trends Interest Too Low: {interest_score}")
                continue

            num_paragraphs = determine_paragraph_count(interest_score)
            extra_prompt = (
                f"Generate exactly {num_paragraphs} paragraphs.\n"
                f"FOCUS: Summarize ONLY the provided content, explicitly mentioning '{title}' and sticking to its specific topic and details.\n"
                f"Do NOT introduce unrelated concepts.\n"
                f"Expand on the core idea with relevant context about its appeal or significance in food trends.\n"
                f"Do not include emojis in the summary."
            )

            final_summary = summarize_with_gpt4o(
                scoring_content,
                "Google Trends",
                trend["link"],
                interest_score=interest_score,
                extra_prompt=extra_prompt
            )

            if not final_summary:
                logger.info(f"Summary failed for '{title}'")
                continue

            final_summary = insert_link_naturally(final_summary, "Google Trends", trend["link"])
            # prepare_post_data also returns category, image_url, image_source,
            # uploader and pixabay_url; only the first two values are used.
            post_data, author, *_ = prepare_post_data(final_summary, title)

            if post_data and author:
                # Remember the title so the next cycle does not curate the same
                # trend again. NOTE(review): this is in-memory only; nothing in
                # this class writes FILE_PATHS["posted_google_titles"] - confirm
                # the publishing helper persists it.
                self.posted_titles.add(title)
                return post_data, author, random.randint(600, 1800)

        return None, None, random.randint(600, 1800)
|
|
|
|
def run_google_trends_automator():
    """Run the Google Trends automator forever.

    Each cycle asks the scraper for one curated post, publishes it with
    ``post_to_wp`` when one is available, then sleeps for the delay the
    scraper returned. Any error in a cycle is logged with its traceback and
    followed by a five-minute pause before the next attempt.

    ``is_posting`` is raised around the publish call so the signal handler can
    tell whether exiting right now would interrupt a post.
    """
    # Declared once at the top: the flag is rebound inside the loop below.
    global is_posting

    scraper = GoogleTrendsScraper()
    while True:
        try:
            post_data, author, sleep_time = scraper.curate_from_google_trends()
            if post_data and author:
                is_posting = True
                try:
                    post_to_wp(post_data, author)
                    logger.info(f"Successfully posted: {post_data['title']}")
                finally:
                    # Always lower the flag, even when publishing raised.
                    is_posting = False

                # Honour a termination request that was deferred while the post
                # was in flight, if the scraper exposes one. SystemExit is not
                # an Exception subclass, so the handler below lets it through.
                if getattr(scraper, "shutdown_requested", False):
                    logger.info("Posting finished after termination signal, exiting.")
                    sys.exit(0)
            time.sleep(sleep_time)
        except Exception as e:
            # logger.exception keeps the original message and adds the traceback.
            logger.exception(f"Error in Google Trends automator: {e}")
            time.sleep(300)  # Wait 5 minutes before retrying
|
|
|
|
# Start the endless scrape-and-post loop only when executed as a script,
# never on import.
if __name__ == "__main__":
    run_google_trends_automator()