1228 lines
52 KiB
Python
1228 lines
52 KiB
Python
import base64
|
|
import json
|
|
import logging
|
|
import os
|
|
import random
|
|
import re
|
|
from PIL import Image
|
|
import pytesseract
|
|
import io
|
|
import tempfile
|
|
import requests
|
|
import time
|
|
import openai
|
|
from dotenv import load_dotenv
|
|
from datetime import datetime, timezone, timedelta
|
|
from openai import OpenAI
|
|
from urllib.parse import quote
|
|
from bs4 import BeautifulSoup
|
|
from requests.adapters import HTTPAdapter
|
|
from requests.packages.urllib3.util.retry import Retry
|
|
import tweepy
|
|
import flickr_api
|
|
from foodie_config import (
|
|
RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS, PERSONA_CONFIGS,
|
|
get_clean_source_name, AUTHORS, LIGHT_TASK_MODEL, SUMMARY_MODEL, X_API_CREDENTIALS,
|
|
FLICKR_API_KEY, FLICKR_API_SECRET, PIXABAY_API_KEY
|
|
)
|
|
from typing import List, Dict, Any, Optional, Union, Tuple
|
|
from pathlib import Path
|
|
from functools import lru_cache
|
|
import hashlib
|
|
from rate_limiter import RateLimiter
|
|
from wordpress_xmlrpc import Client
|
|
from wordpress_xmlrpc.methods.media import UploadFile
|
|
from wordpress_xmlrpc.methods.posts import NewPost
|
|
|
|
# Configure logging: INFO and above go to both a log file and the console.
logging.basicConfig(
    handlers=[
        logging.FileHandler('foodie_automator.log'),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Load environment variables before the OpenAI key is read from the environment.
load_dotenv()
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Initialize global variables
used_images = set()
pixabay_rate_limiter = RateLimiter(time_window=3600, max_requests=100)  # 100 requests per hour
flickr_rate_limiter = RateLimiter(time_window=3600, max_requests=3600)  # 3600 requests per hour

# Absolute paths of the JSON-lines state files used by this module.
FILE_PATHS = {
    "posted_rss_titles": "/home/shane/foodie_automator/posted_rss_titles.json",
    "posted_reddit_titles": "/home/shane/foodie_automator/posted_reddit_titles.json",
    "used_images": "/home/shane/foodie_automator/used_images.json",
    "recent_posts": "/home/shane/foodie_automator/recent_posts.json",
    "x_post_counts": "/home/shane/foodie_automator/x_post_counts.json",
}

USED_IMAGES_FILE = FILE_PATHS["used_images"]
|
|
|
|
def validate_json_entry(entry: Dict[str, Any]) -> bool:
    """Return True when ``entry`` is a dict whose "title" and "timestamp" values are both strings."""
    if not isinstance(entry, dict):
        return False
    for key in ("title", "timestamp"):
        # Each required key must be present and hold a string.
        if key not in entry or not isinstance(entry[key], str):
            return False
    return True
|
|
|
|
def load_json_file(file_path: Union[str, Path], expiration_hours: int) -> List[Dict[str, Any]]:
    """
    Load and validate JSON entries from a file, filtering by expiration time.

    The file is read as one JSON object per line. Blank lines are ignored;
    lines that are not valid JSON, fail ``validate_json_entry`` or carry an
    unparsable timestamp are skipped with a warning.

    Args:
        file_path: Path to the JSON file
        expiration_hours: Number of hours before entries expire

    Returns:
        List of valid entries that haven't expired. An empty (or partial) list
        is returned when the file is missing or cannot be read.
    """
    entries: List[Dict[str, Any]] = []
    cutoff = datetime.now(timezone.utc) - timedelta(hours=expiration_hours)
    file_path = Path(file_path)

    if not file_path.exists():
        logger.info(f"File {file_path} does not exist, returning empty list")
        return entries

    try:
        with file_path.open('r', encoding='utf-8') as f:
            lines = f.readlines()

        total = 0
        for i, raw_line in enumerate(lines, 1):
            line = raw_line.strip()
            if not line:
                # A trailing or blank line is not an error.
                continue
            total += 1

            try:
                entry = json.loads(line)
            except json.JSONDecodeError as e:
                logger.warning(f"Skipping invalid JSON line in {file_path} at line {i}: {e}")
                continue

            if not validate_json_entry(entry):
                logger.warning(f"Skipping malformed entry in {file_path} at line {i}: {line}")
                continue

            try:
                timestamp = datetime.fromisoformat(entry["timestamp"])
            except (ValueError, TypeError):
                logger.warning(f"Skipping malformed entry in {file_path} at line {i}: {line}")
                continue

            # The cutoff is timezone-aware; comparing it with a naive datetime
            # raises TypeError. NOTE(review): naive timestamps are assumed to
            # be UTC here - confirm against the writers of these files.
            if timestamp.tzinfo is None:
                timestamp = timestamp.replace(tzinfo=timezone.utc)

            if timestamp > cutoff:
                entries.append(entry)
            else:
                logger.debug(f"Entry expired in {file_path}: {entry['title']}")

        logger.info(f"Loaded {total} entries from {file_path}, {len(entries)} valid after expiration check")
        return entries
    except Exception as e:
        # Best-effort loader: callers always get a list, never an exception.
        logger.error(f"Failed to load {file_path}: {e}")
        return entries
|
|
|
|
def save_json_file(file_path, title, timestamp):
    """
    Append a ``{"title", "timestamp"}`` entry to a JSON-lines file and prune expired entries.

    Files whose path contains "posted_" keep entries for 24 hours; every other
    file keeps them for 7 days. The file is rewritten in full on each call.

    Args:
        file_path: Target file (``str`` or ``Path``).
        title: Title to record.
        timestamp: ISO-8601 timestamp string for the entry.

    Failures are logged and swallowed; nothing is returned.
    """
    try:
        # str() so that Path inputs (accepted by load_json_file) work with `in`.
        expiration_hours = 24 if "posted_" in str(file_path) else 7 * 24  # 24 hours for titles, 7 days for images
        cutoff = datetime.now(timezone.utc) - timedelta(hours=expiration_hours)

        entries = load_json_file(file_path, expiration_hours)
        entries.append({"title": title, "timestamp": timestamp})

        def _is_fresh(entry):
            parsed = datetime.fromisoformat(entry["timestamp"])
            # Naive values cannot be compared with the aware cutoff.
            # NOTE(review): naive timestamps are assumed to be UTC - confirm.
            if parsed.tzinfo is None:
                parsed = parsed.replace(tzinfo=timezone.utc)
            return parsed > cutoff

        # Prune entries older than expiration period
        pruned_entries = [e for e in entries if _is_fresh(e)]

        # Written as UTF-8 to match the encoding load_json_file reads with.
        with open(file_path, 'w', encoding='utf-8') as f:
            f.writelines(json.dumps(e) + '\n' for e in pruned_entries)

        logger.info(f"Saved '{title}' to {file_path}")
        logger.info(f"Pruned {file_path} to {len(pruned_entries)} entries (older than {expiration_hours//24} days removed)")
    except Exception as e:
        logger.error(f"Failed to save to {file_path}: {e}")
|
|
|
|
def load_post_counts():
    """
    Load per-author X post counters from the "x_post_counts" file.

    Each non-blank line must be a JSON object with the keys username, month,
    monthly_count, day and daily_count; other lines are skipped with a warning.
    If nothing usable is loaded, a zeroed entry is created for every author in
    AUTHORS. Counters whose month/day differ from the current UTC month/day are
    reset to zero before the list is returned.
    """
    required_keys = ("username", "month", "monthly_count", "day", "daily_count")
    filename = FILE_PATHS["x_post_counts"]
    counts = []

    if os.path.exists(filename):
        try:
            with open(filename, 'r') as f:
                raw_lines = f.readlines()
            for line_no, raw in enumerate(raw_lines, 1):
                text = raw.strip()
                if not text:
                    continue
                try:
                    entry = json.loads(text)
                except json.JSONDecodeError as e:
                    logger.warning(f"Skipping invalid JSON line in {filename} at line {line_no}: {e}")
                    continue
                # Check for expected fields in x_post_counts.json
                if isinstance(entry, dict) and all(key in entry for key in required_keys):
                    counts.append(entry)
                else:
                    logger.warning(f"Skipping malformed entry in {filename} at line {line_no}: {entry}")
            logger.info(f"Loaded {len(counts)} entries from {filename}")
        except Exception as e:
            logger.error(f"Failed to load {filename}: {e}")
            counts = []  # Reset to empty on failure

    if not counts:
        # Nothing usable on disk: start every known author at zero.
        counts = []
        for author in AUTHORS:
            counts.append({
                "username": author["username"],
                "month": datetime.now(timezone.utc).strftime("%Y-%m"),
                "monthly_count": 0,
                "day": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
                "daily_count": 0
            })

    current_month = datetime.now(timezone.utc).strftime("%Y-%m")
    current_day = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    for record in counts:
        # Roll counters over when the stored period is not the current one.
        if record["month"] != current_month:
            record["month"] = current_month
            record["monthly_count"] = 0
        if record["day"] != current_day:
            record["day"] = current_day
            record["daily_count"] = 0
    return counts
|
|
|
|
def save_post_counts(counts):
    """Overwrite the "x_post_counts" file with one JSON object per line, one per item in ``counts``."""
    target = FILE_PATHS["x_post_counts"]
    with open(target, 'w') as f:
        for record in counts:
            f.write(json.dumps(record))
            f.write('\n')
    logger.info("Saved post counts to x_post_counts.json")
|
|
|
|
import re
|
|
|
|
# Emoji / pictograph code points stripped from generated tweets. The ranges are
# listed explicitly; a single span such as U+24C2..U+1F251 would also delete
# CJK, Hangul and most other non-Latin text.
_TWEET_EMOJI_RE = re.compile(
    r'['
    r'\U0001F1E6-\U0001F1FF'  # regional indicators (flags)
    r'\U0001F200-\U0001F251'  # enclosed ideographic supplement
    r'\U0001F300-\U0001F5FF'  # symbols & pictographs
    r'\U0001F600-\U0001F64F'  # emoticons
    r'\U0001F680-\U0001F6FF'  # transport & map
    r'\U0001F700-\U0001F8FF'  # alchemical, geometric ext., supplemental arrows
    r'\U0001F900-\U0001F9FF'  # supplemental symbols & pictographs
    r'\U0001FA00-\U0001FAFF'  # chess symbols, symbols & pictographs ext.-A
    r'\U00002600-\U000026FF'  # miscellaneous symbols
    r'\U00002700-\U000027BF'  # dingbats
    r'\U00002B00-\U00002BFF'  # misc symbols and arrows (star, circle, ...)
    r'\U000024C2\U00003030\U0000303D\U00003297\U00003299'
    r'\U0000FE0F\U0000200D'   # variation selector / zero-width joiner
    r']'
)
_TWEET_READ_MORE_RE = re.compile(r'\[Read more\]\(.*?\)|\bRead more\b')
_TWEET_TRAILING_URL_RE = re.compile(r'\s*https?://\S+\s*$')


def generate_article_tweet(author, post, persona):
    """
    Ask the summary model for a tweet promoting ``post`` and normalise the result.

    Args:
        author: Mapping with at least a "username" key.
        post: Mapping with "title" and "url" keys.
        persona: Persona name interpolated into the prompt.

    Returns:
        The tweet text followed by a single space and the raw post URL. The
        text part is capped so that text + space + a 23-character URL weight
        stays within 280 characters.

    Errors raised by the OpenAI client are not caught here.
    """
    title = post["title"]
    url = post["url"]
    author_handle = f"@{author['username']}"

    prompt = (
        f"Craft a sharp tweet (under 230 characters) for {author_handle} with the voice of '{persona}'. "
        f"Distill the essence of the article '{title}' into a concise, engaging message. "
        f"Include the raw URL '{url}' at the end. "
        f"Do not wrap the tweet in quotation marks. "
        f"Make it bold, spark curiosity, and invite engagement with a human touch. "
        f"Swap 'elevate' for dynamic terms like 'ignite' or 'unleash'. "
        f"Absolutely do not include hashtags, emojis, or phrases like '[Read more]' or 'Read more'. "
        f"Skip any extra fluff or formatting around the URL—just append the raw URL after a space. "
        f"Example: 'Love food trends? Check this out! {url}'"
    )

    response = client.chat.completions.create(
        model=SUMMARY_MODEL,
        messages=[
            {"role": "system", "content": "You are a social media viral expert crafting engaging tweets."},
            {"role": "user", "content": prompt}
        ],
        max_tokens=80,
        temperature=0.7
    )

    # The completion content may be None; treat that as an empty tweet body.
    text = (response.choices[0].message.content or "").strip()

    # Post-generation check: strip emojis and "[Read more]"-style phrases.
    text = _TWEET_EMOJI_RE.sub('', text).strip()
    text = _TWEET_READ_MORE_RE.sub('', text).strip()

    # Strip leading or trailing quotation marks
    text = text.strip('"\'')

    # Separate the URL from the prose so the length budget is applied to the
    # prose only and truncation can never cut through (or duplicate) the URL.
    if url:
        text = text.replace(url, ' ')
    text = _TWEET_TRAILING_URL_RE.sub('', text)
    text = re.sub(r'\s{2,}', ' ', text).strip().strip('"\'').strip()

    # Ensure tweet fits within 280 characters, accounting for URL (Twitter shortens to 23 chars)
    url_length = 23
    max_tweet_length = 280 - url_length - 1  # Subtract 1 for the space before URL
    if len(text) > max_tweet_length:
        text = text[:max_tweet_length - 3].rstrip() + "..."

    tweet = f"{text} {url}".strip()

    logger.info(f"Generated tweet: {tweet}")
    return tweet
|
|
|
|
def post_tweet(author, tweet):
    """
    Post ``tweet`` to X on behalf of ``author`` and update the stored post counters.

    Args:
        author: Mapping with at least a "username" key, matched against
            X_API_CREDENTIALS and the stored post counts.
        tweet: Text to post.

    Returns:
        True when the tweet was created; False when credentials are missing,
        the monthly (500) or daily (20) limit is reached, or posting fails.
    """
    username = author["username"]

    credentials = next((cred for cred in X_API_CREDENTIALS if cred["username"] == username), None)
    if not credentials:
        logger.error(f"No X credentials found for {username}")
        return False

    post_counts = load_post_counts()
    author_count = next((entry for entry in post_counts if entry["username"] == username), None)
    if author_count is None:
        # The counts file may predate this author; start a fresh counter
        # instead of failing on a None lookup.
        now = datetime.now(timezone.utc)
        author_count = {
            "username": username,
            "month": now.strftime("%Y-%m"),
            "monthly_count": 0,
            "day": now.strftime("%Y-%m-%d"),
            "daily_count": 0
        }
        post_counts.append(author_count)

    if author_count["monthly_count"] >= 500:
        logger.warning(f"Monthly post limit (500) reached for {username}")
        return False
    if author_count["daily_count"] >= 20:
        logger.warning(f"Daily post limit (20) reached for {username}")
        return False

    try:
        # Named x_client so it does not shadow the module-level OpenAI `client`.
        x_client = tweepy.Client(
            consumer_key=credentials["api_key"],
            consumer_secret=credentials["api_secret"],
            access_token=credentials["access_token"],
            access_token_secret=credentials["access_token_secret"]
        )
        x_client.create_tweet(text=tweet)
    except Exception as e:
        logger.error(f"Failed to post tweet for {username}: {e}")
        return False

    author_count["monthly_count"] += 1
    author_count["daily_count"] += 1
    try:
        save_post_counts(post_counts)
    except Exception as e:
        # The tweet is already live; reporting failure here would invite a
        # duplicate post on retry, so only log the bookkeeping problem.
        logger.error(f"Tweet posted for {username} but saving post counts failed: {e}")

    logger.info(f"Posted tweet for {username}: {tweet}")
    return True
|
|
|
|
def select_best_persona(interest_score, content=""):
    """
    Pick a writing persona from content keywords, falling back to the interest score.

    Keyword rules are checked in order (first match wins):
      * tech / AI / innovation / sustainability -> "Trend Scout" or "Visionary Editor"
      * review / critic / taste / flavor        -> "Foodie Critic"
      * culture / tradition / history           -> "Culture Connoisseur"
    Without a keyword match: score >= 8 picks one of the first two personas,
    score >= 6 one of the last two, anything else any persona.

    Args:
        interest_score: Numeric interest score.
        content: Text to scan for keywords; None is treated as empty.

    Returns:
        The persona name.
    """
    logger.info("Using select_best_persona with interest_score and content")
    personas = ["Visionary Editor", "Foodie Critic", "Trend Scout", "Culture Connoisseur"]
    content_lower = (content or "").lower()

    def _mentions(keywords):
        return any(kw in content_lower for kw in keywords)

    # "ai" must match as a whole word: as a bare substring it also hits
    # "thai", "said", "main", ... and would route almost everything here.
    mentions_ai = re.search(r"\bai\b", content_lower) is not None

    if mentions_ai or _mentions(("tech", "innovation", "sustainability")):
        return random.choice(["Trend Scout", "Visionary Editor"])
    if _mentions(("review", "critic", "taste", "flavor")):
        return "Foodie Critic"
    if _mentions(("culture", "tradition", "history")):
        return "Culture Connoisseur"

    if interest_score >= 8:
        return random.choice(personas[:2])
    if interest_score >= 6:
        return random.choice(personas[2:])
    return random.choice(personas)
|
|
|
|
# Add caching for API responses
|
|
@lru_cache(maxsize=100)
def _download_image_bytes(image_url: str) -> bytes:
    """Download ``image_url`` and return the response body; raises on network or HTTP errors."""
    response = requests.get(image_url, timeout=10)
    response.raise_for_status()
    return response.content


def get_cached_image_url(image_url: str) -> Optional[bytes]:
    """
    Return the bytes of ``image_url``, caching successful downloads.

    Only successful downloads are memoised: the cached helper raises on
    failure, and lru_cache stores nothing when the wrapped call raises, so a
    transient error does not turn into a permanently cached ``None``.

    Args:
        image_url: URL of the image to fetch.

    Returns:
        The image bytes, or None when the download fails (a warning is logged).
    """
    try:
        return _download_image_bytes(image_url)
    except Exception as e:
        logger.warning(f"Failed to cache image {image_url}: {e}")
        return None


# Keep the lru_cache management helpers reachable under the public name.
get_cached_image_url.cache_info = _download_image_bytes.cache_info
get_cached_image_url.cache_clear = _download_image_bytes.cache_clear
|
|
|
|
def get_image_hash(image_content: bytes) -> str:
    """Return the hexadecimal MD5 digest of the given image bytes."""
    digest = hashlib.md5()
    digest.update(image_content)
    return digest.hexdigest()
|
|
|
|
class WordPressAPI:
    """
    Small client for a WordPress REST endpoint using HTTP Basic authentication.

    Requests made through ``_make_request`` and ``upload_media`` wait on a
    RateLimiter (100 requests per 60-unit window, as configured in __init__)
    and use a default timeout so a stalled server cannot hang the process.
    """

    # Default per-request timeout in seconds (used when the caller gives none).
    DEFAULT_TIMEOUT = 30

    def __init__(self, base_url: str, username: str, password: str):
        """
        Args:
            base_url: REST base URL; a trailing slash is removed.
            username: Account used for Basic auth.
            password: Password used for Basic auth.
        """
        self.base_url = base_url.rstrip('/')
        token = base64.b64encode(f'{username}:{password}'.encode()).decode()
        self.auth_header = f"Basic {token}"
        self.headers = {
            "Authorization": self.auth_header,
            "Content-Type": "application/json"
        }
        self.rate_limiter = RateLimiter(max_requests=100, time_window=60)
        logger.info(f"WordPress API configured for {base_url}")

    def _make_request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Make a WordPress API request with rate limiting and retry logic.

        Up to three attempts are made. HTTP 429 responses and request errors
        are retried with exponential backoff (2s, 4s); no sleep follows the
        final attempt.

        Args:
            method: HTTP method name.
            endpoint: Path appended to the base URL.
            **kwargs: Passed to ``requests.request`` (``timeout`` defaults to
                DEFAULT_TIMEOUT).

        Returns:
            The decoded JSON body, or None for an empty body or after all
            attempts failed.
        """
        self.rate_limiter.wait_if_needed()
        max_retries = 3
        retry_delay = 2
        kwargs.setdefault("timeout", self.DEFAULT_TIMEOUT)
        url = f"{self.base_url}/{endpoint}"

        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            wait_time = retry_delay * (2 ** attempt)
            try:
                response = requests.request(method, url, headers=self.headers, **kwargs)

                if response.status_code == 429:  # Rate limit
                    if is_last_attempt:
                        logger.error(f"WordPress API still rate limited after {max_retries} attempts")
                        return None
                    logger.warning(f"Rate limit hit. Retrying after {wait_time}s (attempt {attempt+1}/{max_retries})")
                    time.sleep(wait_time)
                    continue

                response.raise_for_status()
                return response.json() if response.content else None

            except requests.exceptions.RequestException as e:
                if is_last_attempt:
                    logger.error(f"WordPress API request failed after {max_retries} attempts: {e}")
                    return None
                time.sleep(wait_time)

        return None

    def upload_media(self, image_content: bytes, filename: str, caption: Optional[str] = None) -> Optional[int]:
        """
        Upload media to WordPress with improved error handling.

        Args:
            image_content: Raw image bytes, sent with Content-Type image/jpeg.
            filename: Name sent in the Content-Disposition header.
            caption: Optional caption set on the media item after upload.

        Returns:
            The new media ID, or None when the upload fails.
        """
        try:
            headers = {
                "Authorization": self.auth_header,
                "Content-Disposition": f"attachment; filename={filename}",
                "Content-Type": "image/jpeg"
            }

            # Uploads count against the same request budget as other calls.
            self.rate_limiter.wait_if_needed()
            response = requests.post(
                f"{self.base_url}/media",
                headers=headers,
                data=image_content,
                timeout=self.DEFAULT_TIMEOUT
            )
            response.raise_for_status()

            media_id = response.json()["id"]
            if caption:
                self._make_request(
                    "POST",
                    f"media/{media_id}",
                    json={"caption": caption}
                )

            logger.info(f"Uploaded media '{filename}' (ID: {media_id})")
            return media_id
        except Exception as e:
            logger.error(f"Media upload failed for '{filename}': {e}")
            return None

    def _get_or_create_term(self, endpoint: str, label: str, name: str) -> Optional[int]:
        """
        Return the ID of the term called ``name`` at ``endpoint``, creating it if absent.

        Args:
            endpoint: REST collection path ("categories" or "tags").
            label: Singular noun used in the error log ("category" / "tag").
            name: Term name, matched case-insensitively against search results.
        """
        try:
            matches = self._make_request("GET", endpoint, params={"search": name})
            if matches:
                wanted = name.lower()
                for term in matches:
                    if term["name"].lower() == wanted:
                        return term["id"]

            # Create the term if no exact (case-insensitive) match was found.
            created = self._make_request("POST", endpoint, json={"name": name})
            return created["id"] if created else None
        except Exception as e:
            logger.error(f"Failed to get/create {label} '{name}': {e}")
            return None

    def get_category_id(self, category_name: str) -> Optional[int]:
        """Get or create a WordPress category; returns its ID or None on failure."""
        return self._get_or_create_term("categories", "category", category_name)

    def get_tag_id(self, tag_name: str) -> Optional[int]:
        """Get or create a WordPress tag; returns its ID or None on failure."""
        return self._get_or_create_term("tags", "tag", tag_name)
|
|
|
|
# Module-level WordPress REST client; credentials come from WP_USERNAME / WP_PASSWORD
# and default to empty strings when the variables are unset.
wp_api = WordPressAPI(
    base_url="https://insiderfoodie.com/wp-json/wp/v2",
    username=os.getenv("WP_USERNAME", ""),
    password=os.getenv("WP_PASSWORD", ""),
)
|
|
|
|
def upload_image_to_wp(image_url: str, post_title: str, wp_base_url: str, wp_username: str, wp_password: str,
                       image_source: str = "Pixabay", uploader: Optional[str] = None, pixabay_url: Optional[str] = None) -> Optional[int]:
    """
    Upload an image to WordPress with improved error handling and caching.

    Args:
        image_url: URL the image is downloaded from.
        post_title: Used to derive the uploaded file name.
        wp_base_url, wp_username, wp_password: Kept for call compatibility;
            the upload itself goes through the module-level ``wp_api`` client.
        image_source: Name of the image provider, used in the caption.
        uploader: Optional uploader name for the attribution caption.
        pixabay_url: Optional page URL the attribution caption links to.

    Returns:
        The WordPress media ID, or None when download or upload fails.
    """
    import html  # local import: only needed to escape the caption

    try:
        # Keep only characters that are safe in a file name / HTTP header;
        # titles routinely contain '?', '/', quotes and similar.
        ascii_title = post_title.encode('ascii', 'ignore').decode('ascii')
        safe_title = re.sub(r'[^A-Za-z0-9_-]+', '_', ascii_title).strip('_')[:50] or "image"
        filename = f"{safe_title}.jpg"

        # Try to get cached image content first
        image_content = get_cached_image_url(image_url)
        if not image_content:
            # If not in cache, download with retry logic
            max_attempts = 3
            for attempt in range(max_attempts):
                is_last_attempt = attempt == max_attempts - 1
                try:
                    response = requests.get(image_url, timeout=10)
                    if response.status_code == 429:
                        if is_last_attempt:
                            # No point sleeping when no attempt follows.
                            break
                        wait_time = 10 * (2 ** attempt)
                        logger.warning(f"Rate limit hit for {image_url}. Retrying after {wait_time}s (attempt {attempt+1}/3).")
                        time.sleep(wait_time)
                        continue
                    response.raise_for_status()
                    image_content = response.content
                    break
                except requests.exceptions.RequestException as e:
                    if is_last_attempt:
                        logger.warning(f"Failed to download image after {attempt+1} attempts: {e}")
                        return None
                    time.sleep(2 ** attempt)

        if not image_content:
            logger.error(f"Failed to get image content for {image_url}")
            return None

        # Create caption with attribution. Uploader names and URLs come from
        # external services, so they are escaped before being placed in HTML.
        if pixabay_url and uploader:
            caption = (
                f'<a href="{html.escape(pixabay_url, quote=True)}">{html.escape(image_source)}</a>'
                f' by {html.escape(uploader)}'
            )
        else:
            caption = image_source

        # Upload to WordPress using the API class
        media_id = wp_api.upload_media(image_content, filename, caption)
        if not media_id:
            logger.error(f"Failed to upload image '{filename}' to WordPress")
            return None

        logger.info(f"Successfully uploaded image '{filename}' to WordPress (ID: {media_id})")
        return media_id
    except Exception as e:
        logger.error(f"Image upload to WP failed for '{post_title}': {e}")
        return None
|
|
|
|
def post_to_wp(
    post_data: Dict[str, Any],
    category: str,
    link: str,
    author: Dict[str, str],
    image_url: Optional[str] = None,
    original_source: Optional[str] = None,
    image_source: Optional[str] = None,
    uploader: Optional[str] = None,
    pixabay_url: Optional[str] = None,
    interest_score: Optional[int] = None
) -> Tuple[Optional[int], Optional[str]]:
    """
    Post content to WordPress with proper attribution and formatting.

    The author's password is read from the environment variable
    ``<USERNAME>_PASSWORD`` (username upper-cased). A failed image download
    or upload is logged and the post is still created without an image.

    Args:
        post_data: The post content and metadata ("title" and "content" keys)
        category: The post category
        link: The original article link
        author: The author information ("username" and "id" keys are read)
        image_url: Optional image URL
        original_source: Optional original source name
        image_source: Optional image source
        uploader: Optional image uploader
        pixabay_url: Optional Pixabay image URL
        interest_score: Optional interest score

    Returns:
        Tuple of (post_id, post_url) or (None, None) if failed
    """
    try:
        # Get WordPress credentials from environment
        wp_url = "https://insiderfoodie.com/xmlrpc.php"  # Updated XML-RPC endpoint
        wp_username = author["username"]
        wp_password = os.getenv(f"{wp_username.upper()}_PASSWORD")

        if not wp_password:
            logger.error(f"Missing WordPress password for author {wp_username}")
            return None, None

        # Initialize WordPress API client
        wp = Client(
            wp_url,
            wp_username,
            wp_password
        )

        # Upload featured image if provided
        featured_image_id = None
        if image_url:
            # Built before the try block so the except handler can always
            # reference it, even when the download itself fails.
            image_filename = f"{post_data['title'].replace(' ', '_')}.jpg"
            try:
                # Download image
                response = requests.get(image_url, timeout=30)
                response.raise_for_status()

                # NOTE(review): python-wordpress-xmlrpc documents UploadFile as
                # taking a dict with 'name', 'type' and 'bits' keys; this
                # payload shape should be confirmed against the library.
                media_data = {
                    'file': (image_filename, response.content, 'image/jpeg'),
                    'title': post_data['title'],
                    'caption': f"Image source: {image_source}\nUploader: {uploader}\nURL: {pixabay_url}" if image_source else None
                }

                media = wp.call(UploadFile(media_data))
                featured_image_id = media['id']

            except Exception as e:
                logger.error(f"Failed to upload image '{image_filename}' to WordPress: {e}")
                # Continue without image

        # Prepare post data
        # NOTE(review): python-wordpress-xmlrpc documents NewPost as taking a
        # WordPressPost object; confirm that this plain dict is accepted.
        post = {
            'title': post_data['title'],
            'content': post_data['content'],
            'status': 'publish',
            'categories': [category],
            'author': author['id'],
            'featured_media': featured_image_id,
            'meta': {
                'original_source': original_source,
                'original_link': link,
                'interest_score': interest_score
            }
        }

        # Create post
        result = wp.call(NewPost(post))

        # Accept both a mapping carrying 'id' and a bare post ID.
        post_id = None
        if isinstance(result, dict):
            post_id = result.get('id')
        elif isinstance(result, (str, int)) and not isinstance(result, bool) and result:
            post_id = result

        if post_id is not None:
            post_url = f"https://insiderfoodie.com/?p={post_id}"
            logger.info(f"Successfully posted to WordPress (ID: {post_id})")
            return post_id, post_url

        logger.error("Failed to create WordPress post")
        return None, None

    except Exception as e:
        logger.error(f"WordPress API request failed: {e}")
        return None, None
|
|
|
|
def determine_paragraph_count(interest_score):
    """Map an interest score to a paragraph count: 5 for >= 9, 4 for >= 7, otherwise 3."""
    # Thresholds are checked from highest to lowest; the first one met wins.
    for threshold, paragraphs in ((9, 5), (7, 4)):
        if interest_score >= threshold:
            return paragraphs
    return 3
|
|
|
|
def is_interesting(summary):
    """
    Ask the light-task model to rate ``summary`` from 0 to 10.

    The first integer found in the reply is used, so replies such as "8",
    "8/10" or "Score: 8" all yield 8; the result is capped at 10.

    Args:
        summary: Text to rate.

    Returns:
        The score as an int, or 0 when the reply holds no number or the
        request fails.
    """
    try:
        response = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": (
                    "Rate this content from 0-10 based on its rarity, buzzworthiness, and engagement potential for food lovers, covering a wide range of food topics (skip recipes). "
                    "Score 8-10 for rare, highly shareable ideas that grab attention. "
                    "Score 5-7 for fresh, engaging updates with broad appeal. Score below 5 for common or unremarkable content. "
                    "Return only a number."
                )},
                {"role": "user", "content": f"Content: {summary}"}
            ],
            max_tokens=5
        )
        raw_score = (response.choices[0].message.content or "").strip()
        # str.isdigit() rejects "8.", "8/10", "7.5", ...; take the first integer.
        number = re.search(r'\d+', raw_score)
        score = min(int(number.group()), 10) if number else 0
        print(f"Interest Score for '{summary[:50]}...': {score} (raw: {raw_score})")
        logger.info(f"Interest Score: {score} (raw: {raw_score})")
        return score
    except Exception as e:
        logger.error(f"Interestingness scoring failed: {e}")
        print(f"Interest Error: {e}")
        return 0
|
|
|
|
def generate_title_from_summary(summary):
    """
    Generate a title for ``summary`` with the light-task model, retrying up to three times.

    Quotes are removed from the reply and, if it contains a colon, only the
    text after the first colon is kept. A title longer than 100 characters or
    containing a banned word is rejected and another attempt is made.

    Returns:
        The accepted title, or None when all three attempts fail.
    """
    banned_words = ["elevate", "elevating", "elevated"]
    quote_remover = {ord('"'): None, ord("'"): None}
    system_prompt = (
        "Generate a concise, engaging title (under 100 characters) based on this summary, covering food topics. "
        "Craft it with Upworthy/Buzzfeed flair—think 'you won't believe this' or 'this is nuts'—for food insiders. "
        "Avoid quotes, emojis, special characters, or the words 'elevate', 'elevating', 'elevated'. "
        "End with a question to spark shares."
    )

    for attempt_no in range(1, 4):
        try:
            response = client.chat.completions.create(
                model=LIGHT_TASK_MODEL,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": f"Summary: {summary}"}
                ],
                max_tokens=30
            )
            title = response.choices[0].message.content.strip().translate(quote_remover)

            # Keep only what follows the first colon, when there is one.
            _, colon, remainder = title.partition(':')
            if colon:
                title = remainder.strip()

            too_long = len(title) > 100
            lowered = title.lower()
            has_banned_word = any(word in lowered for word in banned_words)
            if not (too_long or has_banned_word):
                logger.info(f"Generated title: {title}")
                return title

            reason = "length" if too_long else "banned word"
            print(f"Rejected title (attempt {attempt_no}/3): '{title}' due to {reason}")
            logger.info(f"Rejected title (attempt {attempt_no}/3): '{title}' due to {reason}")
        except Exception as e:
            logger.error(f"Title generation failed (attempt {attempt_no}/3): {e}")
            print(f"Title Error: {e}")

    print("Failed to generate valid title after 3 attempts")
    logger.info("Failed to generate valid title after 3 attempts")
    return None
|
|
|
|
def summarize_with_gpt4o(content, source_name, link, interest_score=0, extra_prompt=""):
    """
    Summarise ``content`` with the summary model in the voice of a selected persona.

    The persona comes from ``select_best_persona``; its prompt template is
    filled with the persona description, tone and a paragraph count derived
    from ``interest_score``. Removing the original article title from the
    summary is left to the caller.

    Args:
        content: Text to summarise.
        source_name: Source name appended to the prompt.
        link: Source link appended to the prompt.
        interest_score: Score used for persona and paragraph-count selection.
        extra_prompt: Additional instructions inserted into the prompt.

    Returns:
        The summary text, or None when anything fails (the error is logged).
    """
    try:
        persona = select_best_persona(interest_score, content)
        fallback_config = {
            "article_prompt": "Write a concise, engaging summary that captures the essence of the content for food lovers.",
            "description": "a generic food writer",
            "tone": "an engaging tone"
        }
        persona_config = PERSONA_CONFIGS.get(persona, fallback_config)
        paragraph_count = determine_paragraph_count(interest_score)
        prompt = persona_config["article_prompt"].format(
            description=persona_config["description"],
            tone=persona_config["tone"],
            num_paragraphs=paragraph_count
        )
        logger.info(f"Using {persona} with interest_score and content")

        prompt_parts = [
            f"{prompt}\n\n",
            "Do not include the article title in the summary.\n\n",
            f"{extra_prompt}\n\n",
            "Avoid using the word 'elevate'—use more humanized language like 'level up' or 'bring to life'.\n",
            f"Content to summarize:\n{content}\n\n",
            f"Source: {source_name}\n",
            f"Link: {link}",
        ]
        full_prompt = "".join(prompt_parts)

        messages = [
            {"role": "system", "content": full_prompt},
            {"role": "user", "content": content}
        ]
        response = client.chat.completions.create(
            model=SUMMARY_MODEL,
            messages=messages,
            max_tokens=1000,
            temperature=0.7
        )

        summary = response.choices[0].message.content.strip()
        logger.info(f"Processed summary (Persona: {persona}): {summary}")
        return summary

    except Exception as e:
        logger.error(f"Summary generation failed with model {SUMMARY_MODEL}: {e}")
        return None
|
|
|
|
def insert_link_naturally(summary, source_name, source_url):
    """
    Insert exactly one ``<a href=source_url>source_name</a>`` link into ``summary``.

    The light-task model is asked to weave the link in. Its answer is accepted
    only when it contains the exact link markup once; each line is then
    stripped and the text returned. Otherwise a local fallback adds a short
    attribution sentence to one randomly chosen non-empty paragraph, after the
    first sentence that does not contain a clock time such as "6.30am".

    Args:
        summary: Newline-separated paragraphs.
        source_name: Link text.
        source_url: Link target.

    Returns:
        The summary with the link inserted, or ``summary`` unchanged when it
        has no non-empty paragraph.
    """
    if not isinstance(summary, str) or not summary.strip():
        # Nothing to attach a link to; also avoids a pointless API call.
        logger.error("No valid paragraphs to insert link.")
        return summary

    link_pattern = f'<a href="{source_url}">{source_name}</a>'

    try:
        logger.info(f"Input summary to insert_link_naturally: {summary!r}")

        prompt = (
            "Take this summary and insert a single HTML link naturally into one paragraph (randomly chosen). "
            "Use the format '<a href=\"{source_url}\">{source_name}</a>' and weave it into the text seamlessly, "
            "e.g., 'The latest scoop from {source_name} reveals...' or '{source_name} shares this insight.' "
            "Vary the phrasing creatively to avoid repetition (don't always use 'dives into'). "
            "Place the link at a sentence boundary (after a period, not within numbers like '6.30am' or '1.5'). "
            "Maintain the original tone, flow, and paragraph structure, preserving all existing newlines exactly as they are. "
            "Each paragraph in the input summary is separated by a single \\n; ensure the output maintains this exact separation. "
            "Do not add or remove newlines beyond the original summary structure. "
            "Return the modified summary with exactly one link.\n\n"
            "Summary:\n{summary}\n\n"
            "Source Name: {source_name}\nSource URL: {source_url}"
        ).format(summary=summary, source_name=source_name, source_url=source_url)

        response = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": prompt},
                {"role": "user", "content": "Insert the link naturally into the summary."}
            ],
            max_tokens=1000,
            temperature=0.7
        )
        new_summary = (response.choices[0].message.content or "").strip()
        if new_summary and new_summary.count(link_pattern) == 1:
            new_summary = '\n'.join(p.strip() for p in new_summary.split('\n'))
            logger.info(f"Summary with naturally embedded link (normalized): {new_summary!r}")
            return new_summary

        logger.warning(f"GPT failed to insert link correctly: {new_summary}. Using fallback.")
    except Exception as e:
        logger.error(f"Link insertion failed: {e}")

    # Fallback path
    # Sentences holding a clock time are detected with a regex; the text is
    # never rewritten with placeholder characters, so genuine '@' characters
    # (handles, e-mail addresses) survive untouched.
    time_pattern = re.compile(r'\b\d{1,2}\.\d{2}(?:am|pm)\b')
    paragraphs = summary.split('\n')
    candidates = [i for i, p in enumerate(paragraphs) if p.strip()]
    target_index = random.choice(candidates)

    phrases = [
        f"Learn more from {link_pattern}",
        f"{link_pattern} shares this insight",
        f"Discover more at {link_pattern}",
        f"Check out {link_pattern} for details"
    ]
    insertion_phrase = random.choice(phrases)

    sentences = re.split(r'(?<=[.!?])\s+', paragraphs[target_index].strip())
    # Insert after the first sentence without a clock time; if every sentence
    # has one, append the phrase at the end of the paragraph.
    after = next(
        (i for i, sent in enumerate(sentences) if sent.strip() and not time_pattern.search(sent)),
        len(sentences) - 1
    )
    sentences.insert(after + 1, f"{insertion_phrase}.")
    paragraphs[target_index] = ' '.join(sentences)

    new_summary = '\n'.join(paragraphs)
    logger.info(f"Fallback summary with link: {new_summary!r}")
    return new_summary
|
|
|
|
def generate_category_from_summary(summary):
    """
    Ask the light-task model to choose a category for ``summary``.

    The reply is matched case-insensitively, ignoring surrounding whitespace,
    quotes and trailing punctuation, against the allowed categories.

    Args:
        summary: Text to categorise.

    Returns:
        One of Food, Culture, Trends, Health, Lifestyle, Drink, Eats. "Trends"
        is returned for invalid input, an unrecognised reply or any error.
    """
    valid_categories = ["Food", "Culture", "Trends", "Health", "Lifestyle", "Drink", "Eats"]
    try:
        if not isinstance(summary, str) or not summary.strip():
            logger.warning(f"Invalid summary for category generation: {summary}. Defaulting to 'Trends'.")
            return "Trends"

        response = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": (
                    "Based on this summary, select the most relevant category from: Food, Culture, Trends, Health, Lifestyle, Drink, Eats. "
                    "Return only the category name."
                )},
                {"role": "user", "content": summary}
            ],
            max_tokens=10
        )
        category = (response.choices[0].message.content or "").strip()
        logger.info(f"Generated category: {category}")

        # Tolerate replies such as "Food.", "food" or '"Food"'.
        normalized = category.strip(" \t\r\n.\"'`*").lower()
        by_lowercase = {name.lower(): name for name in valid_categories}
        return by_lowercase.get(normalized, "Trends")
    except Exception as e:
        logger.error(f"Category generation failed: {e}")
        return "Trends"
|
|
|
|
def select_best_author(summary):
    """
    Ask the light-task model which author username best fits ``summary``.

    The reply is lower-cased and stripped of surrounding whitespace, quotes,
    a leading "@" and trailing punctuation before being validated.

    Args:
        summary: Text used to choose the author.

    Returns:
        One of the six known usernames; "owenjohnson" when the reply is not
        recognised or the request fails.
    """
    valid_authors = ["owenjohnson", "javiermorales", "aishapatel", "trangnguyen", "keishareid", "lilamoreau"]
    try:
        response = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": (
                    "Based on this restaurant/food industry trend summary, pick the most suitable author from: "
                    "owenjohnson, javiermorales, aishapatel, trangnguyen, keishareid, lilamoreau. "
                    "Consider their expertise: owenjohnson (global dining trends), javiermorales (food critique), "
                    "aishapatel (emerging food trends), trangnguyen (cultural dining), keishareid (soul food heritage), "
                    "lilamoreau (global street food). Return only the username."
                )},
                {"role": "user", "content": summary}
            ],
            max_tokens=20
        )
        author = (response.choices[0].message.content or "").strip()
        logger.info(f"Selected author: {author}")

        # Tolerate replies such as "@owenjohnson", "Owenjohnson." or quoted names.
        normalized = author.strip(" \t\r\n.\"'`*@").lower()
        return normalized if normalized in valid_authors else "owenjohnson"
    except Exception as e:
        logger.error(f"Author selection failed: {e}")
        return "owenjohnson"
|
|
|
|
def prepare_post_data(final_summary, original_title, context_info=""):
    """Assemble everything needed to publish a post from a finished summary.

    Generates a title, finds an image (Flickr first, then Pixabay), selects an
    author and a category.

    Args:
        final_summary: The article body to publish.
        original_title: Title of the source item; used only in log messages.
        context_info: Optional text appended to log messages.

    Returns:
        A 7-tuple ``(post_data, author, category, image_url, image_source,
        uploader, page_url)``. ``post_data`` is a dict with "title" and
        "content". The four image fields are None when no image was found.
        All seven elements are None when title generation or image-query
        generation fails, or when no author record can be resolved.
    """
    failure = (None, None, None, None, None, None, None)

    innovative_title = generate_title_from_summary(final_summary)
    if not innovative_title:
        logger.info(f"Title generation failed for '{original_title}' {context_info}")
        return failure

    search_query, relevance_keywords, _ = generate_image_query(innovative_title, final_summary)
    if not search_query:
        logger.info(f"Image query generation failed for '{innovative_title}' {context_info}")
        return failure

    logger.info(f"Fetching Flickr image for query: '{search_query}' {context_info}")
    image_url, image_source, uploader, page_url = get_flickr_image(search_query, relevance_keywords)

    if not image_url:
        logger.info(f"Flickr fetch failed for '{search_query}' - falling back to Pixabay {context_info}")
        # Reuse the query already generated from this title and summary;
        # asking the model again with identical inputs only costs an API call.
        image_url, image_source, uploader, page_url = get_image(search_query)

    if not image_url:
        logger.info(f"Pixabay fetch failed for title '{innovative_title}' - falling back to summary {context_info}")
        # Last attempt: derive the query from the summary alone.
        image_query, _, _ = generate_image_query(final_summary, final_summary)
        image_url, image_source, uploader, page_url = get_image(image_query)
        if not image_url:
            logger.info(f"Image fetch failed again for '{original_title}' - proceeding without image {context_info}")

    post_data = {"title": innovative_title, "content": final_summary}

    selected_username = select_best_author(final_summary)
    author = next((a for a in AUTHORS if a["username"] == selected_username), None)
    if not author:
        logger.error(f"Author '{selected_username}' not found in AUTHORS, defaulting to owenjohnson")
        # Resolve the default from AUTHORS rather than embedding a credential
        # in source code.
        author = next((a for a in AUTHORS if a["username"] == "owenjohnson"), None)
    if not author:
        logger.error(f"Default author 'owenjohnson' is missing from AUTHORS; cannot prepare post {context_info}")
        return failure

    category = generate_category_from_summary(final_summary)

    return post_data, author, category, image_url, image_source, uploader, page_url
|
|
|
|
def save_post_to_recent(post_title, post_url, author_username, timestamp):
    """Append one published post to the recent-posts file.

    Loads the current entries through load_json_file, adds a record for this
    post and rewrites the file as one JSON object per line. Any failure is
    logged and swallowed.

    Args:
        post_title: Title of the published post.
        post_url: URL of the published post.
        author_username: Username of the post's author.
        timestamp: Timestamp value stored with the record as given.
    """
    try:
        target_path = FILE_PATHS["recent_posts"]
        posts = load_json_file(target_path, 24)
        posts.append({
            "title": post_title,
            "url": post_url,
            "author_username": author_username,
            "timestamp": timestamp
        })
        with open(target_path, 'w') as handle:
            for record in posts:
                json.dump(record, handle)
                handle.write('\n')
        logger.info(f"Saved post '{post_title}' to recent_posts.json")
    except Exception as e:
        logger.error(f"Failed to save post to recent_posts.json: {e}")
|
|
|
|
def _parse_recent_post_timestamp(value):
    """Return *value* as a timezone-aware datetime, or None if unparseable."""
    if not isinstance(value, str):
        return None
    text = value.strip()
    # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11 on.
    if text[-1:] in ("Z", "z"):
        text = text[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # Naive timestamps are assumed to be UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def prune_recent_posts():
    """Drop entries older than 24 hours from the recent-posts file.

    Timestamps are parsed and compared as datetimes rather than as strings,
    so entries written with a "Z" suffix, a different UTC offset or no offset
    are ordered correctly. Entries whose timestamp is missing or unparseable
    are dropped and counted in a warning, instead of aborting the whole prune.
    Any other failure is logged and swallowed.
    """
    try:
        cutoff = datetime.now(timezone.utc) - timedelta(hours=24)
        recent_posts = load_json_file(FILE_PATHS["recent_posts"], 24)

        kept = []
        invalid = 0
        for entry in recent_posts:
            raw_stamp = entry.get("timestamp") if isinstance(entry, dict) else None
            stamp = _parse_recent_post_timestamp(raw_stamp)
            if stamp is None:
                invalid += 1
            elif stamp > cutoff:
                kept.append(entry)

        with open(FILE_PATHS["recent_posts"], 'w') as f:
            for item in kept:
                json.dump(item, f)
                f.write('\n')

        if invalid:
            logger.warning(f"Dropped {invalid} recent_posts.json entries with missing or invalid timestamps")
        logger.info(f"Pruned recent_posts.json to {len(kept)} entries")
    except Exception as e:
        logger.error(f"Failed to prune recent_posts.json: {e}")
|
|
|
|
def load_used_images():
    """Load the set of used image URLs from USED_IMAGES_FILE.

    The file is read as one JSON object per line, each carrying a 'url' key.
    Blank lines are ignored. A line that is not valid JSON or has no usable
    'url' is skipped and counted in a warning, so one corrupt record does not
    wipe the whole history. If the file does not exist, ``used_images`` is
    left untouched. If reading fails altogether, ``used_images`` is reset to
    an empty set.
    """
    global used_images
    try:
        if not os.path.exists(USED_IMAGES_FILE):
            return

        loaded = set()
        skipped = 0
        with open(USED_IMAGES_FILE, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    loaded.add(json.loads(line)['url'])
                except (ValueError, KeyError, TypeError):
                    # ValueError covers json.JSONDecodeError; TypeError covers
                    # non-dict records and unhashable 'url' values.
                    skipped += 1

        used_images = loaded
        if skipped:
            logger.warning(f"Skipped {skipped} malformed lines in {USED_IMAGES_FILE}")
        logger.info(f"Loaded {len(used_images)} used images from {USED_IMAGES_FILE}")
    except Exception as e:
        logger.error(f"Failed to load used images: {e}")
        used_images = set()
|
|
|
|
def save_used_images():
    """Rewrite USED_IMAGES_FILE from the in-memory ``used_images`` set.

    Each URL becomes one JSON line with 'url' and 'timestamp' keys. The
    timestamp is the UTC time at which the line is written, not the time the
    image was first used. Failures are logged and swallowed.
    """
    try:
        with open(USED_IMAGES_FILE, 'w') as out:
            for image_url in used_images:
                record = {
                    'url': image_url,
                    'timestamp': datetime.now(timezone.utc).isoformat(),
                }
                json.dump(record, out)
                out.write('\n')
        logger.info(f"Saved {len(used_images)} used images to {USED_IMAGES_FILE}")
    except Exception as e:
        logger.error(f"Failed to save used images: {e}")
|
|
|
|
# Load used images on startup
|
|
# Top-level call (presumably module scope; indentation is not visible here):
# seeds the used_images set from USED_IMAGES_FILE when this module is loaded.
load_used_images()
|
|
|
|
def _first_unused_pixabay_hit(query: str, headers: Dict[str, str]) -> Optional[Tuple[str, str, str]]:
    """Run one Pixabay search and claim the first hit not already in used_images.

    Args:
        query: Search text; trimmed to Pixabay's 100-character limit for ``q``.
        headers: HTTP headers sent with the request.

    Returns:
        ``(img_url, uploader, page_url)`` for the claimed hit, or None when
        every hit is missing a URL or already used.

    Raises:
        Whatever ``requests`` or JSON decoding raises; the caller handles it.
    """
    trimmed = query.strip()
    if len(trimmed) > 100:
        # Pixabay rejects q values over 100 characters; cut at a word boundary.
        trimmed = trimmed[:100].rsplit(" ", 1)[0]

    pixabay_rate_limiter.wait_if_needed()
    response = requests.get(
        "https://pixabay.com/api/",
        params={"key": PIXABAY_API_KEY, "q": trimmed, "image_type": "photo", "per_page": 10},
        headers=headers,
        timeout=10,
    )
    response.raise_for_status()

    for hit in response.json().get('hits', []):
        img_url = hit.get('webformatURL')
        if not img_url or img_url in used_images:
            continue
        # Record the URL immediately so later lookups never reuse it.
        used_images.add(img_url)
        save_used_images()
        return img_url, hit.get('user', 'Unknown'), hit.get('pageURL', img_url)
    return None


def get_image(search_query: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
    """Find an unused Pixabay photo for a query, with a generic fallback.

    Tries ``search_query`` first and then the fixed query "food dining". Each
    attempt goes through pixabay_rate_limiter. A returned image is added to
    ``used_images`` and persisted.

    Args:
        search_query: Search text for the first attempt. An empty or
            non-string value skips straight to the fallback query.

    Returns:
        ``(image_url, "Pixabay", uploader, page_url)``, or four Nones when
        both attempts fail.
    """
    headers = {'User-Agent': 'InsiderFoodieBot/1.0 (https://insiderfoodie.com; contact@insiderfoodie.com)'}
    fallback_query = "food dining"

    for query, is_fallback in ((search_query, False), (fallback_query, True)):
        source_label = "Pixabay fallback" if is_fallback else "Pixabay"
        try:
            if not isinstance(query, str) or not query.strip():
                raise ValueError("search query is empty")
            found = _first_unused_pixabay_hit(query, headers)
        except Exception as e:
            # requests error messages include the full request URL; keep the
            # API key out of the log file.
            error_text = str(e)
            if PIXABAY_API_KEY:
                error_text = error_text.replace(str(PIXABAY_API_KEY), "***")
            logger.warning(f"{source_label} image fetch failed for query '{query}': {error_text}")
            continue

        if found:
            img_url, uploader, page_url = found
            logger.info(f"Selected {source_label} image: {img_url} by {uploader} for query '{query}'")
            return img_url, "Pixabay", uploader, page_url

        if is_fallback:
            logger.warning(f"No valid Pixabay image found for fallback query '{query}'.")
        else:
            logger.info(f"No valid Pixabay image found for query '{query}'. Trying fallback query.")

    logger.error(f"All image fetch attempts failed for query '{search_query}'. Returning None.")
    return None, None, None, None
|
|
|
|
def generate_image_query(title: str, summary: str) -> Tuple[str, List[str], bool]:
    """Ask the model for an image search query and relevance keywords.

    The reply is expected to be a JSON object with 'search' and 'relevance'
    keys. It is accepted inside a Markdown code fence (with or without the
    "json" tag) or as bare JSON. 'relevance' may be a whitespace-separated
    string or a list of strings.

    Args:
        title: Article title.
        summary: Article summary.

    Returns:
        ``(search_query, relevance_keywords, used_fallback)``. When the reply
        cannot be parsed or the API call fails, the result is
        ``(title, [], True)``. When the reply parses but has no usable
        'search' value, ``title`` is used as the query and the flag stays
        False.
    """
    try:
        prompt = (
            "Given the following article title and summary, generate a concise image search query (max 5 words) to find a relevant image. "
            "Also provide a list of relevance keywords (max 5 words) that should be associated with the image. "
            "Return the result as a JSON object with 'search' and 'relevance' keys.\n\n"
            f"Title: {title}\n\n"
            f"Summary: {summary}\n\n"
            "Example output:\n"
            "```json\n"
            "{\"search\": \"Italian cuisine trends\", \"relevance\": \"pasta wine dining culture\"}\n"
            "```"
        )
        response = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": prompt},
                {"role": "user", "content": "Generate an image search query and relevance keywords."}
            ],
            max_tokens=100,
            temperature=0.5
        )
        raw_response = response.choices[0].message.content or ""

        # Prefer fenced content; otherwise try the whole reply as JSON.
        fenced = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', raw_response)
        payload = fenced.group(1) if fenced else raw_response.strip()
        try:
            query_data = json.loads(payload)
        except json.JSONDecodeError:
            query_data = None
        if not isinstance(query_data, dict):
            logger.warning(f"Failed to parse image query JSON: {raw_response}")
            return title, [], True

        search_value = query_data.get("search")
        if isinstance(search_value, str) and search_value.strip():
            search_query = search_value.strip()
        else:
            # A missing, empty or non-string query is useless to callers.
            search_query = title

        relevance_value = query_data.get("relevance") or ""
        if isinstance(relevance_value, str):
            relevance_keywords = relevance_value.split()
        elif isinstance(relevance_value, (list, tuple)):
            relevance_keywords = [str(word).strip() for word in relevance_value if str(word).strip()]
        else:
            relevance_keywords = []

        logger.debug(f"Image query from content: {json.dumps(query_data)}")

        return search_query, relevance_keywords, False
    except Exception as e:
        logger.warning(f"Image query generation failed: {e}. Using title as fallback.")
        return title, [], True
|
|
|
|
def smart_image_and_filter(title: str, content: str) -> Tuple[str, List[str], bool]:
    """
    Generate an image query and determine if the content should be filtered.

    The model is asked for a JSON object with "image_query", "relevance" and
    "action" keys. The reply is accepted as bare JSON or wrapped in a
    Markdown code fence.

    Args:
        title: The article title
        content: The article content

    Returns:
        Tuple of (image_query, relevance_keywords, should_skip).
        If the reply cannot be parsed as a JSON object, the result is
        (title, [], skip), where skip is True when the title contains
        "recipe" or "how to". If the API call itself fails, the result is
        (title, [], False).
    """
    try:
        prompt = "\n".join([
            "",
            "Analyze this food-related content and determine:",
            "1. A good image search query",
            "2. Relevant keywords",
            "3. Whether to skip this content",
            "",
            f"Title: {title}",
            f"Content: {content}",
            "",
            "Return a JSON object with:",
            "- image_query: A concise search query for finding relevant images",
            "- relevance: List of relevant keywords",
            "- action: Either \"KEEP\" or \"SKIP\"",
            "",
            "Keep content that is:",
            "- About food trends, innovations, or interesting culinary topics",
            "- Has broad appeal to food enthusiasts",
            "- Contains unique or noteworthy information",
            "",
            "Skip content that is:",
            "- Basic recipes or cooking instructions",
            "- Restaurant reviews or menu items",
            "- Generic food news without unique angles",
            "",
        ])

        # NOTE(review): the model name is hard-coded here while sibling
        # helpers use LIGHT_TASK_MODEL; left unchanged, confirm it is intended.
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a food content curator."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.7,
            max_tokens=150
        )
        raw_reply = response.choices[0].message.content or ""

        try:
            try:
                result = json.loads(raw_reply)
            except json.JSONDecodeError:
                # Chat models often wrap JSON in a code fence; retry on the
                # fenced content before giving up.
                fenced = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', raw_reply)
                if not fenced:
                    raise
                result = json.loads(fenced.group(1))
            if not isinstance(result, dict):
                raise json.JSONDecodeError("expected a JSON object", raw_reply, 0)
        except json.JSONDecodeError as e:
            logger.warning(f"JSON parsing failed: {e}, raw: '{raw_reply}'. Using fallback.")
            # Fallback to basic filtering
            return title, [], "recipe" in title.lower() or "how to" in title.lower()

        image_query = result.get("image_query", "")

        # Honour the declared List[str] return type whatever shape came back.
        relevance = result.get("relevance", [])
        if isinstance(relevance, str):
            relevance = relevance.split()
        elif isinstance(relevance, (list, tuple)):
            relevance = [str(word) for word in relevance]
        else:
            relevance = []

        should_skip = str(result.get("action", "KEEP")).strip().upper() == "SKIP"

        logger.info(f"Raw GPT smart image/filter response: '{raw_reply}'")
        logger.info(f"Smart image query: {image_query}, Relevance: {relevance}, Skip: {should_skip}")

        return image_query, relevance, should_skip

    except Exception as e:
        logger.error(f"Error in smart image/filter: {e}")
        return title, [], False
|
|
|
|
def classify_keywords(keywords):
    """Classify image-search keywords as 'specific' or 'generic' via the model.

    Args:
        keywords: Iterable of keyword strings.

    Returns:
        Dict mapping keyword to "specific" or "generic". Every input keyword
        is present in the result; one the model omitted or labelled with
        anything else is reported as "specific". When the reply cannot be
        parsed or the API call fails, all keywords are "specific". Empty
        input returns an empty dict without calling the API.
    """
    keywords = list(keywords or [])
    all_specific = {kw: "specific" for kw in keywords}
    if not keywords:
        return all_specific

    prompt = (
        "Given the following keywords from an image search query, classify each as 'specific' (e.g., brand names, unique entities like 'Taco Bell' or 'Paris') or 'generic' (e.g., common or abstract terms like 'dining' or 'trends'). "
        "Return a JSON object mapping each keyword to its classification.\n\n"
        "Keywords: " + ", ".join(str(kw) for kw in keywords) + "\n\n"
        "Example output format (do not use these exact keywords in your response):\n"
        "```json\n"
        "{\n"
        "  \"keyword1\": \"specific\",\n"
        "  \"keyword2\": \"generic\"\n"
        "}\n```"
    )
    try:
        response = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": "You are a helper that classifies keywords."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=100,
            temperature=0.5
        )
        raw_response = response.choices[0].message.content or ""

        # Accept a fenced block (with or without the "json" tag) or bare JSON.
        fenced = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', raw_response)
        payload = fenced.group(1) if fenced else raw_response.strip()
        try:
            parsed = json.loads(payload)
        except json.JSONDecodeError:
            parsed = None
        if not isinstance(parsed, dict):
            logger.warning(f"Failed to parse keyword classification JSON: {raw_response}")
            return all_specific

        # Start from the conservative default so callers can index any input
        # keyword, then overlay the labels the model returned validly.
        classifications = dict(all_specific)
        for keyword, label in parsed.items():
            normalized = str(label).strip().lower()
            if normalized in ("specific", "generic"):
                classifications[keyword] = normalized
        return classifications
    except Exception as e:
        logger.warning(f"Keyword classification failed: {e}. Defaulting to all specific.")
        return all_specific
|
|
|
|
def search_ddg_for_flickr(query):
    """Look up Flickr photo IDs for a query through a DuckDuckGo site search.

    Fetches the DuckDuckGo results page for ``"<query> site:flickr.com"`` and
    pulls numeric photo IDs out of any link that looks like
    ``flickr.com/photos/<user>/<id>``.

    Args:
        query: Search text.

    Returns:
        A list of at most two distinct photo-ID strings, in the order they
        first appear on the page. An empty list is returned when nothing
        matches or the request fails.
    """
    ddg_query = f"{query} site:flickr.com"
    ddg_url = f"https://duckduckgo.com/?q={quote(ddg_query)}"
    try:
        # NOTE(review): this endpoint may return a script-rendered page with
        # no result links in the raw HTML - verify that it ever yields IDs.
        response = requests.get(ddg_url, headers={'User-Agent': 'InsiderFoodieBot/1.0 (https://insiderfoodie.com; contact@insiderfoodie.com)'}, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        # A dict keeps insertion order, so the two IDs kept below are the
        # first two on the page rather than an arbitrary pair out of a set.
        ordered_ids = {}
        for link in soup.find_all('a', href=True):
            match = re.search(r'flickr\.com/photos/[^/]+/(\d+)', link['href'])
            if match:
                ordered_ids.setdefault(match.group(1), None)

        photo_ids = list(ordered_ids)[:2]  # Limit to 2 IDs
        logger.info(f"Found {len(photo_ids)} Flickr photo IDs via DDG: {photo_ids}")
        return photo_ids
    except Exception as e:
        logger.warning(f"DDG search failed for query '{ddg_query}': {e}")
        # Same type as the success path.
        return []
|
|
|
|
def get_flickr_image(search_query: str, relevance_keywords: Optional[List[str]] = None) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
    """
    Get an unused image from Flickr for the search query.

    Photo IDs come from search_ddg_for_flickr first and from a direct Flickr
    search when that yields nothing. The first photo that has a usable size,
    is not in ``used_images`` and has owner information is returned and
    recorded as used. Flickr API calls go through flickr_rate_limiter.

    Args:
        search_query: The search query to find images
        relevance_keywords: Accepted for interface compatibility; this
            function does not currently use it to filter results.

    Returns:
        Tuple of (image_url, "Flickr", uploader, page_url), or
        (None, None, None, None) if no image is found or an error occurs.
    """
    try:
        flickr_api.set_keys(api_key=FLICKR_API_KEY, api_secret=FLICKR_API_SECRET)

        # Try to find photo IDs via DuckDuckGo first
        photo_ids = search_ddg_for_flickr(search_query)
        if not photo_ids:
            # Fallback to direct Flickr search
            flickr_rate_limiter.wait_if_needed()
            photos = flickr_api.Photo.search(
                text=search_query,
                sort='relevance',
                per_page=10,
                safe_search=1
            )
            photo_ids = [photo.id for photo in photos]

        if not photo_ids:
            logger.warning(f"No Flickr photos found for query '{search_query}'")
            return None, None, None, None

        # Try each photo ID until we find a suitable image
        for photo_id in photo_ids:
            try:
                photo = flickr_api.Photo(id=photo_id)
                flickr_rate_limiter.wait_if_needed()
                sizes = photo.getSizes()

                # Prefer the largest of the three labels that is present.
                size = next(
                    (sizes[label] for label in ('Large', 'Medium', 'Small') if sizes.get(label)),
                    None,
                )
                if not size:
                    continue

                img_url = size.get('source')
                if not img_url or img_url in used_images:
                    continue

                # Get photo info for attribution
                flickr_rate_limiter.wait_if_needed()
                info = photo.getInfo()
                # Handle a dict result as well as an object with attributes;
                # the shape returned by the library is not visible from here.
                if isinstance(info, dict):
                    owner = info.get('owner')
                else:
                    owner = getattr(info, 'owner', None)
                uploader = getattr(owner, 'username', None)
                owner_id = getattr(owner, 'id', None)
                if not uploader or not owner_id:
                    continue

                page_url = f"https://www.flickr.com/photos/{owner_id}/{photo_id}"

                # Save to used images
                used_images.add(img_url)
                save_used_images()

                logger.info(f"Selected Flickr image: {img_url} by {uploader} for query '{search_query}'")
                return img_url, "Flickr", uploader, page_url

            except Exception as e:
                logger.warning(f"Failed to process Flickr photo {photo_id}: {e}")
                continue

        logger.warning(f"No suitable Flickr images found for query '{search_query}'")
        return None, None, None, None

    except Exception as e:
        logger.error(f"Flickr image fetch failed for query '{search_query}': {e}")
        return None, None, None, None