You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
998 lines
47 KiB
998 lines
47 KiB
import base64 |
|
import json |
|
import logging |
|
import os |
|
import random |
|
import re |
|
from PIL import Image |
|
import pytesseract |
|
import io |
|
import tempfile |
|
import requests |
|
import time |
|
from dotenv import load_dotenv |
|
import os |
|
from datetime import datetime, timezone, timedelta |
|
from openai import OpenAI |
|
from urllib.parse import quote |
|
from duckduckgo_search import DDGS |
|
from bs4 import BeautifulSoup |
|
from requests.adapters import HTTPAdapter |
|
from requests.packages.urllib3.util.retry import Retry |
|
from foodie_config import ( |
|
RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS, SUMMARY_PERSONA_PROMPTS, |
|
get_clean_source_name, AUTHORS, LIGHT_TASK_MODEL, SUMMARY_MODEL |
|
) |
|
load_dotenv() |
|
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) |
|
|
|
def load_json_file(filename, expiration_days=None): |
|
data = [] |
|
if os.path.exists(filename): |
|
try: |
|
with open(filename, 'r') as f: |
|
lines = f.readlines() |
|
for i, line in enumerate(lines, 1): |
|
if line.strip(): |
|
try: |
|
entry = json.loads(line.strip()) |
|
if not isinstance(entry, dict) or "title" not in entry or "timestamp" not in entry: |
|
logging.warning(f"Skipping malformed entry in {filename} at line {i}: {entry}") |
|
continue |
|
data.append(entry) |
|
except json.JSONDecodeError as e: |
|
logging.warning(f"Skipping invalid JSON line in {filename} at line {i}: {e}") |
|
if expiration_days: |
|
cutoff = (datetime.now() - timedelta(days=expiration_days)).isoformat() |
|
data = [entry for entry in data if entry["timestamp"] > cutoff] |
|
logging.info(f"Loaded {len(data)} entries from {filename}, {len(data)} valid after expiration check") |
|
except Exception as e: |
|
logging.error(f"Failed to load {filename}: {e}") |
|
data = [] # Reset to empty on failure |
|
return data |
|
|
|
def save_json_file(filename, key, value): |
|
entry = {"title": key, "timestamp": value} |
|
PRUNE_INTERVAL_DAYS = 180 |
|
try: |
|
data = load_json_file(filename, expiration_days=PRUNE_INTERVAL_DAYS) |
|
# Remove duplicates by title |
|
data = [item for item in data if item["title"] != key] |
|
data.append(entry) |
|
with open(filename, 'w') as f: |
|
for item in data: |
|
json.dump(item, f) |
|
f.write('\n') |
|
logging.info(f"Saved '{key}' to {filename}") |
|
print(f"DEBUG: Saved '{key}' to {filename}") |
|
loaded_data = load_json_file(filename, expiration_days=PRUNE_INTERVAL_DAYS) |
|
logging.info(f"Pruned {filename} to {len(loaded_data)} entries (older than {PRUNE_INTERVAL_DAYS} days removed)") |
|
except Exception as e: |
|
logging.error(f"Failed to save or prune {filename}: {e}") |
|
|
|
def select_best_persona(interest_score, content=""): |
|
logging.info("Using select_best_persona with interest_score and content") |
|
personas = ["Visionary Editor", "Foodie Critic", "Trend Scout", "Culture Connoisseur"] |
|
content_lower = content.lower() |
|
|
|
if any(kw in content_lower for kw in ["tech", "ai", "innovation", "sustainability"]): |
|
return random.choice(["Trend Scout", "Visionary Editor"]) |
|
elif any(kw in content_lower for kw in ["review", "critic", "taste", "flavor"]): |
|
return "Foodie Critic" |
|
elif any(kw in content_lower for kw in ["culture", "tradition", "history"]): |
|
return "Culture Connoisseur" |
|
|
|
if interest_score >= 8: |
|
return random.choice(personas[:2]) |
|
elif interest_score >= 6: |
|
return random.choice(personas[2:]) |
|
return random.choice(personas) |
|
|
|
def get_image(search_query): |
|
api_key = "14836528-999c19a033d77d463113b1fb8" |
|
base_url = "https://pixabay.com/api/" |
|
queries = [search_query.split()[:2], search_query.split()] |
|
|
|
for query in queries: |
|
short_query = " ".join(query) |
|
params = { |
|
"key": api_key, |
|
"q": short_query, |
|
"image_type": "photo", |
|
"safesearch": True, |
|
"per_page": 20 |
|
} |
|
try: |
|
logging.info(f"Fetching Pixabay image for query '{short_query}'") |
|
response = requests.get(base_url, params=params, timeout=10) |
|
response.raise_for_status() |
|
data = response.json() |
|
|
|
if not data.get("hits"): |
|
logging.warning(f"No image hits for query '{short_query}'") |
|
continue |
|
|
|
valid_images = [ |
|
hit for hit in data["hits"] |
|
if all(tag not in hit.get("tags", "").lower() for tag in ["dog", "cat", "family", "child", "baby"]) |
|
] |
|
|
|
if not valid_images: |
|
logging.warning(f"No valid images for query '{short_query}' after filtering") |
|
continue |
|
|
|
image = random.choice(valid_images) |
|
image_url = image["webformatURL"] |
|
image_source = "Pixabay" |
|
uploader = image.get("user", "Unknown") |
|
pixabay_url = image["pageURL"] |
|
|
|
logging.info(f"Fetched image URL: {image_url} by {uploader} for query '{short_query}'") |
|
print(f"DEBUG: Image selected for query '{short_query}': {image_url}") |
|
return image_url, image_source, uploader, pixabay_url |
|
except requests.exceptions.RequestException as e: |
|
logging.error(f"Image fetch failed for query '{short_query}': {e}") |
|
continue |
|
|
|
logging.error(f"All Pixabay image queries failed: {queries}") |
|
return None, None, None, None |
|
|
|
def generate_image_query(content): |
|
try: |
|
response = client.chat.completions.create( |
|
model=LIGHT_TASK_MODEL, |
|
messages=[ |
|
{"role": "system", "content": ( |
|
"From this content (title and summary), generate two sets of 2-3 concise keywords for an image search about restaurant/food industry trends:\n" |
|
"1. Search keywords: For finding images (e.g., 'AI restaurant technology'). Focus on key themes like technology, sustainability, dining, or specific food concepts.\n" |
|
"2. Relevance keywords: For filtering relevant images (e.g., 'ai tech dining'). Focus on core concepts to ensure match.\n" |
|
"Avoid vague terms like 'trends', 'future', or unrelated words like 'dog', 'family'. " |
|
"Return as JSON: {'search': 'keyword1 keyword2', 'relevance': 'keyword3 keyword4'}" |
|
)}, |
|
{"role": "user", "content": content} |
|
], |
|
max_tokens=100 |
|
) |
|
raw_result = response.choices[0].message.content.strip() |
|
logging.info(f"Raw GPT image query response: '{raw_result}'") |
|
print(f"DEBUG: Raw GPT image query response: '{raw_result}'") |
|
|
|
cleaned_result = re.sub(r'```json\s*|\s*```', '', raw_result).strip() |
|
result = json.loads(cleaned_result) |
|
if not isinstance(result, dict) or "search" not in result or "relevance" not in result or len(result["search"].split()) < 2: |
|
logging.warning(f"Invalid image query format: {result}, using fallback") |
|
words = re.findall(r'\w+', content.lower()) |
|
filtered_words = [w for w in words if w not in RECIPE_KEYWORDS + PROMO_KEYWORDS + ['trends', 'future', 'dog', 'family']] |
|
search = " ".join(filtered_words[:3]) or "restaurant innovation" |
|
relevance = filtered_words[3:6] or ["dining", "tech"] |
|
result = {"search": search, "relevance": " ".join(relevance)} |
|
|
|
logging.info(f"Generated image query: {result}") |
|
print(f"DEBUG: Image query from content: {result}") |
|
return result["search"], result["relevance"].split() |
|
except json.JSONDecodeError as e: |
|
logging.error(f"JSON parsing failed for image query: {e}, raw response: '{raw_result}'") |
|
words = re.findall(r'\w+', content.lower()) |
|
filtered_words = [w for w in words if w not in RECIPE_KEYWORDS + PROMO_KEYWORDS + ['trends', 'future', 'dog', 'family']] |
|
search = " ".join(filtered_words[:3]) or "restaurant innovation" |
|
relevance = filtered_words[3:6] or ["dining", "tech"] |
|
logging.info(f"Fallback image query: {{'search': '{search}', 'relevance': '{' '.join(relevance)}'}}") |
|
return search, relevance |
|
except Exception as e: |
|
logging.error(f"Image query generation failed: {e}") |
|
print(f"Image Query Error: {e}") |
|
return None, None |
|
|
|
def smart_image_and_filter(title, summary): |
|
try: |
|
content = f"{title}\n\n{summary}" |
|
|
|
prompt = ( |
|
"Analyze this article title and summary. Extract key entities (brands, locations, cuisines, or topics) " |
|
"for an image search about food industry trends or viral content. Prioritize specific terms if present, " |
|
"otherwise focus on the main theme. " |
|
"Return 'SKIP' if the article is about home appliances, recipes, promotions, or contains 'homemade', else 'KEEP'. " |
|
"Return as JSON: {'image_query': 'specific term', 'relevance': ['keyword1', 'keyword2'], 'action': 'KEEP' or 'SKIP'}" |
|
) |
|
|
|
response = client.chat.completions.create( |
|
model=LIGHT_TASK_MODEL, |
|
messages=[ |
|
{"role": "system", "content": prompt}, |
|
{"role": "user", "content": content} |
|
], |
|
max_tokens=100 |
|
) |
|
raw_result = response.choices[0].message.content.strip() |
|
logging.info(f"Raw GPT smart image/filter response: '{raw_result}'") |
|
|
|
# Clean and parse JSON |
|
cleaned_result = re.sub(r'```json\s*|\s*```', '', raw_result).strip() |
|
try: |
|
result = json.loads(cleaned_result) |
|
except json.JSONDecodeError as e: |
|
logging.warning(f"JSON parsing failed: {e}, raw: '{cleaned_result}'. Using fallback.") |
|
return "food trends", ["cuisine", "dining"], False |
|
|
|
if not isinstance(result, dict) or "image_query" not in result or "relevance" not in result or "action" not in result: |
|
logging.warning(f"Invalid GPT response format: {result}, using fallback") |
|
return "food trends", ["cuisine", "dining"], False |
|
|
|
image_query = result["image_query"] |
|
relevance_keywords = result["relevance"] |
|
skip_flag = result["action"] == "SKIP" or "homemade" in title.lower() or "homemade" in summary.lower() |
|
|
|
logging.info(f"Smart image query: {image_query}, Relevance: {relevance_keywords}, Skip: {skip_flag}") |
|
|
|
if not image_query or len(image_query.split()) < 2: |
|
logging.warning(f"Image query '{image_query}' too vague, using fallback") |
|
return "food trends", ["cuisine", "dining"], skip_flag |
|
|
|
return image_query, relevance_keywords, skip_flag |
|
|
|
except Exception as e: |
|
logging.error(f"Smart image/filter failed: {e}, using fallback") |
|
return "food trends", ["cuisine", "dining"], False |
|
|
|
def upload_image_to_wp(image_url, post_title, wp_base_url, wp_username, wp_password, image_source="Pixabay", uploader=None, pixabay_url=None): |
|
try: |
|
safe_title = post_title.encode('ascii', 'ignore').decode('ascii').replace(' ', '_')[:50] |
|
headers = { |
|
"Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}", |
|
"Content-Disposition": f"attachment; filename={safe_title}.jpg", |
|
"Content-Type": "image/jpeg" |
|
} |
|
image_headers = { |
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' |
|
} |
|
logging.info(f"Fetching image from {image_url} for '{post_title}'") |
|
image_response = requests.get(image_url, headers=image_headers, timeout=10) |
|
image_response.raise_for_status() |
|
|
|
response = requests.post( |
|
f"{wp_base_url}/media", |
|
headers=headers, |
|
data=image_response.content |
|
) |
|
response.raise_for_status() |
|
|
|
image_id = response.json()["id"] |
|
caption = f'<a href="{pixabay_url}">{image_source}</a> by {uploader}' if pixabay_url and uploader else image_source |
|
requests.post( |
|
f"{wp_base_url}/media/{image_id}", |
|
headers={"Authorization": headers["Authorization"], "Content-Type": "application/json"}, |
|
json={"caption": caption} |
|
) |
|
|
|
logging.info(f"Uploaded image '{safe_title}.jpg' to WP (ID: {image_id}) with caption '{caption}'") |
|
return image_id |
|
except Exception as e: |
|
logging.error(f"Image upload to WP failed for '{post_title}': {e}") |
|
return None |
|
|
|
def determine_paragraph_count(interest_score): |
|
if interest_score >= 9: |
|
return 5 |
|
elif interest_score >= 7: |
|
return 4 |
|
return 3 |
|
|
|
def is_interesting(summary): |
|
try: |
|
response = client.chat.completions.create( |
|
model=LIGHT_TASK_MODEL, |
|
messages=[ |
|
{"role": "system", "content": ( |
|
"Rate this content from 0-10 based on its rarity, buzzworthiness, and engagement potential for food lovers, covering a wide range of food topics (skip recipes). " |
|
"Score 8-10 for rare, highly shareable ideas that grab attention. " |
|
"Score 5-7 for fresh, engaging updates with broad appeal. Score below 5 for common or unremarkable content. " |
|
"Return only a number." |
|
)}, |
|
{"role": "user", "content": f"Content: {summary}"} |
|
], |
|
max_tokens=5 |
|
) |
|
raw_score = response.choices[0].message.content.strip() |
|
score = int(raw_score) if raw_score.isdigit() else 0 |
|
print(f"Interest Score for '{summary[:50]}...': {score} (raw: {raw_score})") |
|
logging.info(f"Interest Score: {score} (raw: {raw_score})") |
|
return score |
|
except Exception as e: |
|
logging.error(f"Interestingness scoring failed: {e}") |
|
print(f"Interest Error: {e}") |
|
return 0 |
|
|
|
def generate_title_from_summary(summary): |
|
banned_words = ["elevate", "elevating", "elevated"] |
|
for attempt in range(3): |
|
try: |
|
response = client.chat.completions.create( |
|
model=LIGHT_TASK_MODEL, |
|
messages=[ |
|
{"role": "system", "content": ( |
|
"Generate a concise, engaging title (under 100 characters) based on this summary, covering food topics. " |
|
"Craft it with Upworthy/Buzzfeed flair—think ‘you won’t believe this’ or ‘this is nuts’—for food insiders. " |
|
"Avoid quotes, emojis, special characters, or the words 'elevate', 'elevating', 'elevated'. " |
|
"End with a question to spark shares." |
|
)}, |
|
{"role": "user", "content": f"Summary: {summary}"} |
|
], |
|
max_tokens=30 |
|
) |
|
title = response.choices[0].message.content.strip().replace('"', '').replace("'", "") |
|
if ':' in title: |
|
title = title.split(':', 1)[1].strip() |
|
if len(title) > 100 or any(word in title.lower() for word in banned_words): |
|
reason = "length" if len(title) > 100 else "banned word" |
|
print(f"Rejected title (attempt {attempt + 1}/3): '{title}' due to {reason}") |
|
logging.info(f"Rejected title (attempt {attempt + 1}/3): '{title}' due to {reason}") |
|
continue |
|
logging.info(f"Generated title: {title}") |
|
return title |
|
except Exception as e: |
|
logging.error(f"Title generation failed (attempt {attempt + 1}/3): {e}") |
|
print(f"Title Error: {e}") |
|
print("Failed to generate valid title after 3 attempts") |
|
logging.info("Failed to generate valid title after 3 attempts") |
|
return None |
|
|
|
def summarize_with_gpt4o(content, source_name, link, interest_score=0, extra_prompt=""): |
|
try: |
|
persona = select_best_persona(interest_score, content) |
|
prompt = SUMMARY_PERSONA_PROMPTS.get(persona, "Write a concise, engaging summary that captures the essence of the content for food lovers.") |
|
logging.info(f"Using {persona} with interest_score and content") |
|
|
|
full_prompt = ( |
|
f"{prompt}\n\n" |
|
f"{extra_prompt}\n\n" |
|
f"Content to summarize:\n{content}\n\n" |
|
f"Source: {source_name}\n" |
|
f"Link: {link}" |
|
) |
|
|
|
response = client.chat.completions.create( |
|
model=SUMMARY_MODEL, |
|
messages=[ |
|
{"role": "system", "content": full_prompt}, |
|
{"role": "user", "content": content} |
|
], |
|
max_tokens=1000, |
|
temperature=0.7 |
|
) |
|
|
|
summary = response.choices[0].message.content.strip() |
|
logging.info(f"Processed summary (Persona: {persona}): {summary}") |
|
return summary |
|
|
|
except Exception as e: |
|
logging.error(f"Summary generation failed with model {SUMMARY_MODEL}: {e}") |
|
return None |
|
|
|
def smart_image_and_filter(title, summary): |
|
try: |
|
content = f"{title}\n\n{summary}" |
|
|
|
prompt = ( |
|
'Analyze this article title and summary. Extract key entities (brands, locations, cuisines, or topics) ' |
|
'for an image search about food industry trends or viral content. Prioritize specific terms if present, ' |
|
'otherwise focus on the main theme. ' |
|
'Return "SKIP" if the article is about home appliances, recipes, promotions, or contains "homemade", else "KEEP". ' |
|
'Return as JSON with double quotes: {"image_query": "specific term", "relevance": ["keyword1", "keyword2"], "action": "KEEP" or "SKIP"}' |
|
) |
|
|
|
response = client.chat.completions.create( |
|
model=LIGHT_TASK_MODEL, |
|
messages=[ |
|
{"role": "system", "content": prompt}, |
|
{"role": "user", "content": content} |
|
], |
|
max_tokens=100 |
|
) |
|
raw_result = response.choices[0].message.content.strip() |
|
logging.info(f"Raw GPT smart image/filter response: '{raw_result}'") |
|
|
|
cleaned_result = re.sub(r'```json\s*|\s*```', '', raw_result).strip() |
|
try: |
|
result = json.loads(cleaned_result) |
|
except json.JSONDecodeError as e: |
|
logging.warning(f"JSON parsing failed: {e}, raw: '{cleaned_result}'. Using fallback.") |
|
return "food trends", ["cuisine", "dining"], False |
|
|
|
if not isinstance(result, dict) or "image_query" not in result or "relevance" not in result or "action" not in result: |
|
logging.warning(f"Invalid GPT response format: {result}, using fallback") |
|
return "food trends", ["cuisine", "dining"], False |
|
|
|
image_query = result["image_query"] |
|
relevance_keywords = result["relevance"] |
|
skip_flag = result["action"] == "SKIP" or "homemade" in title.lower() or "homemade" in summary.lower() |
|
|
|
logging.info(f"Smart image query: {image_query}, Relevance: {relevance_keywords}, Skip: {skip_flag}") |
|
|
|
if not image_query or len(image_query.split()) < 2: |
|
logging.warning(f"Image query '{image_query}' too vague, using fallback") |
|
return "food trends", ["cuisine", "dining"], skip_flag |
|
|
|
return image_query, relevance_keywords, skip_flag |
|
|
|
except Exception as e: |
|
logging.error(f"Smart image/filter failed: {e}, using fallback") |
|
return "food trends", ["cuisine", "dining"], False |
|
|
|
def is_interesting(summary): |
|
try: |
|
response = client.chat.completions.create( |
|
model=LIGHT_TASK_MODEL, |
|
messages=[ |
|
{"role": "system", "content": ( |
|
"Rate this content from 0-10 based on its rarity, buzzworthiness, and engagement potential for food lovers, covering a wide range of food topics (skip recipes). " |
|
"Score 8-10 for rare, highly shareable ideas that grab attention. " |
|
"Score 5-7 for fresh, engaging updates with broad appeal. Score below 5 for common or unremarkable content. " |
|
"Return only a number." |
|
)}, |
|
{"role": "user", "content": f"Content: {summary}"} |
|
], |
|
max_tokens=5 |
|
) |
|
raw_score = response.choices[0].message.content.strip() |
|
score = int(raw_score) if raw_score.isdigit() else 0 |
|
print(f"Interest Score for '{summary[:50]}...': {score} (raw: {raw_score})") |
|
logging.info(f"Interest Score: {score} (raw: {raw_score})") |
|
return score |
|
except Exception as e: |
|
logging.error(f"Interestingness scoring failed with model {LIGHT_TASK_MODEL}: {e}") |
|
print(f"Interest Error: {e}") |
|
return 0 |
|
|
|
def select_paragraphs(paragraphs, target_count, persona, original_content): |
|
"""Select or generate paragraphs to match target_count, preserving key content.""" |
|
if len(paragraphs) == target_count and all(60 <= len(p.split()) <= 80 for p in paragraphs): |
|
return paragraphs |
|
|
|
# Score paragraphs by food-related keywords |
|
keywords = ["food", "dish", "trend", "menu", "cuisine", "flavor", "taste", "eat", "dining", "restaurant"] |
|
scores = [] |
|
for para in paragraphs: |
|
score = sum(para.lower().count(kw) for kw in keywords) |
|
word_count = len(para.split()) |
|
# Penalize paragraphs outside word range |
|
score -= abs(word_count - 70) # Favor ~70 words |
|
scores.append(score) |
|
|
|
# Handle too many paragraphs |
|
if len(paragraphs) > target_count: |
|
# Keep last paragraph unless it's low-scoring |
|
if scores[-1] >= min(scores[:-1]) or len(paragraphs) == target_count + 1: |
|
selected_indices = sorted(range(len(paragraphs)-1), key=lambda i: scores[i], reverse=True)[:target_count-1] + [len(paragraphs)-1] |
|
else: |
|
selected_indices = sorted(range(len(paragraphs)), key=lambda i: scores[i], reverse=True)[:target_count] |
|
selected = [paragraphs[i] for i in sorted(selected_indices)] |
|
else: |
|
selected = paragraphs[:] |
|
|
|
# Handle word count adjustments or too few paragraphs |
|
adjusted = [] |
|
for para in selected: |
|
word_count = len(para.split()) |
|
if word_count < 60 or word_count > 80: |
|
# Rephrase to fit 60-80 words |
|
rephrase_prompt = ( |
|
f"Rephrase this paragraph to exactly 60-80 words, keeping the same tone as a {persona} and all key ideas: '{para}'" |
|
) |
|
try: |
|
response = client.chat.completions.create( |
|
model=SUMMARY_MODEL, |
|
messages=[ |
|
{"role": "system", "content": rephrase_prompt}, |
|
{"role": "user", "content": para} |
|
], |
|
max_tokens=150, |
|
temperature=0.7 |
|
) |
|
new_para = response.choices[0].message.content.strip() |
|
if 60 <= len(new_para.split()) <= 80: |
|
adjusted.append(new_para) |
|
else: |
|
adjusted.append(para) # Fallback to original if rephrase fails |
|
except Exception as e: |
|
logging.warning(f"Rephrasing failed for paragraph: {e}") |
|
adjusted.append(para) |
|
else: |
|
adjusted.append(para) |
|
|
|
# Generate additional paragraphs if needed |
|
while len(adjusted) < target_count: |
|
extra_prompt = ( |
|
f"Generate one additional paragraph (60-80 words) in the style of a {persona}, " |
|
f"based on this content: '{original_content[:200]}...'. Match the tone of: '{adjusted[-1] if adjusted else 'This trend is fire!'}'" |
|
) |
|
try: |
|
response = client.chat.completions.create( |
|
model=SUMMARY_MODEL, |
|
messages=[ |
|
{"role": "system", "content": extra_prompt}, |
|
{"role": "user", "content": original_content} |
|
], |
|
max_tokens=150, |
|
temperature=0.7 |
|
) |
|
new_para = response.choices[0].message.content.strip() |
|
if 60 <= len(new_para.split()) <= 80: |
|
adjusted.append(new_para) |
|
else: |
|
adjusted.append("This trend is sparking buzz across menus!") # Fallback |
|
except Exception as e: |
|
logging.warning(f"Extra paragraph generation failed: {e}") |
|
adjusted.append("This vibe is shaking up the food scene!") |
|
|
|
return adjusted[:target_count] |
|
|
|
def insert_link_naturally(summary, source_name, source_url): |
|
import re |
|
try: |
|
prompt = ( |
|
"Take this summary and insert a single HTML link naturally into one paragraph (randomly chosen). " |
|
"Use the format '<a href=\"{source_url}\">{source_name}</a>' and weave it into the text seamlessly, " |
|
"e.g., 'The latest scoop from {source_name} reveals...' or '{source_name} uncovers this wild shift.' " |
|
"Vary the phrasing creatively to avoid repetition (don’t always use 'dives into'). " |
|
"Place the link at a sentence boundary (after a period, not within numbers like '6.30am' or '1.5'). " |
|
"Maintain the original tone and flow, ensuring the link reads as part of the sentence, not standalone. " |
|
"Return the modified summary with exactly one link, no extra formatting or newlines beyond the original.\n\n" |
|
"Summary:\n{summary}\n\n" |
|
"Source Name: {source_name}\nSource URL: {source_url}" |
|
).format(summary=summary, source_name=source_name, source_url=source_url) |
|
|
|
response = client.chat.completions.create( |
|
model=LIGHT_TASK_MODEL, |
|
messages=[ |
|
{"role": "system", "content": prompt}, |
|
{"role": "user", "content": "Insert the link naturally into the summary."} |
|
], |
|
max_tokens=1000, |
|
temperature=0.7 |
|
) |
|
new_summary = response.choices[0].message.content.strip() |
|
link_pattern = f'<a href="{source_url}">{source_name}</a>' |
|
if new_summary and new_summary.count(link_pattern) == 1: |
|
logging.info(f"Summary with naturally embedded link: {new_summary}") |
|
return new_summary |
|
|
|
logging.warning(f"GPT failed to insert link correctly: {new_summary}. Using fallback.") |
|
except Exception as e: |
|
logging.error(f"Link insertion failed: {e}") |
|
|
|
# Fallback: Protect times and insert at sentence end |
|
time_pattern = r'\b\d{1,2}\.\d{2}(?:am|pm)\b' # Matches 6.30am, 12.15pm |
|
protected_summary = re.sub(time_pattern, lambda m: m.group(0).replace('.', '@'), summary) |
|
paragraphs = protected_summary.split('\n') |
|
if not paragraphs or all(not p.strip() for p in paragraphs): |
|
logging.error("No valid paragraphs to insert link.") |
|
return summary |
|
|
|
target_para = random.choice([p for p in paragraphs if p.strip()]) |
|
phrases = [ |
|
f"The scoop from {link_pattern} spills the details", |
|
f"{link_pattern} uncovers this wild shift", |
|
f"This gem via {link_pattern} drops some truth", |
|
f"{link_pattern} breaks down the buzz" |
|
] |
|
insertion_phrase = random.choice(phrases) |
|
|
|
# Find sentence boundary, avoiding protected times |
|
sentences = re.split(r'(?<=[.!?])\s+', target_para) |
|
insertion_point = -1 |
|
for i, sent in enumerate(sentences): |
|
if sent.strip() and '@' not in sent: # Avoid sentences with protected times |
|
insertion_point = sum(len(s) + 1 for s in sentences[:i+1]) |
|
break |
|
if insertion_point == -1: |
|
insertion_point = len(target_para) # Append if no good boundary |
|
|
|
# Add space after insertion phrase |
|
new_para = f"{target_para[:insertion_point]} {insertion_phrase}. {target_para[insertion_point:]}".strip() |
|
paragraphs[paragraphs.index(target_para)] = new_para |
|
new_summary = '\n'.join(paragraphs) |
|
|
|
# Restore periods in times |
|
new_summary = new_summary.replace('@', '.') |
|
logging.info(f"Fallback summary with link: {new_summary}") |
|
return new_summary |
|
|
|
def generate_category_from_summary(summary): |
|
try: |
|
if not isinstance(summary, str) or not summary.strip(): |
|
logging.warning(f"Invalid summary for category generation: {summary}. Defaulting to 'Trends'.") |
|
return "Trends" |
|
|
|
response = client.chat.completions.create( |
|
model=LIGHT_TASK_MODEL, |
|
messages=[ |
|
{"role": "system", "content": ( |
|
"Based on this summary, select the most relevant category from: Food, Culture, Trends, Health, Lifestyle, Drink, Eats. " |
|
"Return only the category name." |
|
)}, |
|
{"role": "user", "content": summary} |
|
], |
|
max_tokens=10 |
|
) |
|
category = response.choices[0].message.content.strip() |
|
logging.info(f"Generated category: {category}") |
|
return category if category in ["Food", "Culture", "Trends", "Health", "Lifestyle", "Drink", "Eats"] else "Trends" |
|
except Exception as e: |
|
logging.error(f"Category generation failed: {e}") |
|
return "Trends" |
|
|
|
def get_wp_category_id(category_name, wp_base_url, wp_username, wp_password): |
|
try: |
|
headers = {"Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}"} |
|
response = requests.get(f"{wp_base_url}/categories", headers=headers, params={"search": category_name}) |
|
response.raise_for_status() |
|
categories = response.json() |
|
for cat in categories: |
|
if cat["name"].lower() == category_name.lower(): |
|
return cat["id"] |
|
return None |
|
except Exception as e: |
|
logging.error(f"Failed to get WP category ID for '{category_name}': {e}") |
|
return None |
|
|
|
def create_wp_category(category_name, wp_base_url, wp_username, wp_password): |
|
try: |
|
headers = { |
|
"Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}", |
|
"Content-Type": "application/json" |
|
} |
|
payload = {"name": category_name} |
|
response = requests.post(f"{wp_base_url}/categories", headers=headers, json=payload) |
|
response.raise_for_status() |
|
return response.json()["id"] |
|
except Exception as e: |
|
logging.error(f"Failed to create WP category '{category_name}': {e}") |
|
return None |
|
|
|
def get_wp_tag_id(tag_name, wp_base_url, wp_username, wp_password): |
|
try: |
|
headers = {"Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}"} |
|
response = requests.get(f"{wp_base_url}/tags", headers=headers, params={"search": tag_name}) |
|
response.raise_for_status() |
|
tags = response.json() |
|
for tag in tags: |
|
if tag["name"].lower() == tag_name.lower(): |
|
return tag["id"] |
|
return None |
|
except Exception as e: |
|
logging.error(f"Failed to get WP tag ID for '{tag_name}': {e}") |
|
return None |
|
|
|
def post_to_wp(post_data, category, link, author, image_url, original_source, image_source="Pixabay", uploader=None, pixabay_url=None, interest_score=4, post_id=None): |
|
wp_base_url = "https://insiderfoodie.com/wp-json/wp/v2" |
|
logging.info(f"Starting post_to_wp for '{post_data['title']}', image_source: {image_source}") |
|
|
|
if not isinstance(author, dict) or "username" not in author or "password" not in author: |
|
raise ValueError(f"Invalid author data: {author}. Expected a dictionary with 'username' and 'password' keys.") |
|
|
|
wp_username = author["username"] |
|
wp_password = author["password"] |
|
|
|
if not isinstance(interest_score, int): |
|
logging.error(f"Invalid interest_score type: {type(interest_score)}, value: '{interest_score}'. Defaulting to 4.") |
|
interest_score = 4 |
|
elif interest_score < 0 or interest_score > 10: |
|
logging.warning(f"interest_score out of valid range (0-10): {interest_score}. Clamping to 4.") |
|
interest_score = min(max(interest_score, 0), 10) |
|
|
|
try: |
|
headers = { |
|
"Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}", |
|
"Content-Type": "application/json" |
|
} |
|
|
|
auth_test = requests.get(f"{wp_base_url}/users/me", headers=headers) |
|
auth_test.raise_for_status() |
|
logging.info(f"Auth test passed for {wp_username}: {auth_test.json()['id']}") |
|
|
|
category_id = get_wp_category_id(category, wp_base_url, wp_username, wp_password) |
|
if not category_id: |
|
category_id = create_wp_category(category, wp_base_url, wp_username, wp_password) |
|
logging.info(f"Created new category '{category}' with ID {category_id}") |
|
else: |
|
logging.info(f"Found existing category '{category}' with ID {category_id}") |
|
|
|
tags = [1] |
|
if interest_score >= 9: |
|
picks_tag_id = get_wp_tag_id("Picks", wp_base_url, wp_username, wp_password) |
|
if picks_tag_id and picks_tag_id not in tags: |
|
tags.append(picks_tag_id) |
|
logging.info(f"Added 'Picks' tag (ID: {picks_tag_id}) to post due to high interest score: {interest_score}") |
|
|
|
content = post_data["content"] |
|
if content is None: |
|
logging.error(f"Post content is None for title '{post_data['title']}' - using fallback") |
|
content = "Content unavailable. Check the original source for details." |
|
formatted_content = "\n".join(f"<p>{para}</p>" for para in content.split('\n') if para.strip()) |
|
author_id_map = { |
|
"owenjohnson": 10, |
|
"javiermorales": 2, |
|
"aishapatel": 3, |
|
"trangnguyen": 12, |
|
"keishareid": 13, |
|
"lilamoreau": 7 |
|
} |
|
author_id = author_id_map.get(author["username"], 5) |
|
|
|
payload = { |
|
"title": post_data["title"], |
|
"content": formatted_content, |
|
"status": "publish", |
|
"categories": [category_id], |
|
"tags": tags, |
|
"author": author_id, |
|
"meta": { |
|
"original_link": link, |
|
"original_source": original_source, |
|
"interest_score": interest_score |
|
} |
|
} |
|
|
|
if image_url and not post_id: |
|
logging.info(f"Attempting image upload for '{post_data['title']}', URL: {image_url}, source: {image_source}") |
|
image_id = upload_image_to_wp(image_url, post_data["title"], wp_base_url, wp_username, wp_password, image_source, uploader, pixabay_url) |
|
if not image_id: |
|
logging.info(f"Flickr upload failed for '{post_data['title']}', falling back to Pixabay") |
|
pixabay_query = post_data["title"][:50] |
|
image_url, image_source, uploader, pixabay_url = get_image(pixabay_query) |
|
if image_url: |
|
image_id = upload_image_to_wp(image_url, post_data["title"], wp_base_url, wp_username, wp_password, image_source, uploader, pixabay_url) |
|
if image_id: |
|
payload["featured_media"] = image_id |
|
else: |
|
logging.warning(f"All image uploads failed for '{post_data['title']}' - posting without image") |
|
|
|
endpoint = f"{wp_base_url}/posts/{post_id}" if post_id else f"{wp_base_url}/posts" |
|
method = requests.post # Use POST for both create and update (WP API handles it) |
|
|
|
logging.debug(f"Sending WP request to {endpoint} with payload: {json.dumps(payload, indent=2)}") |
|
|
|
response = method(endpoint, headers=headers, json=payload) |
|
response.raise_for_status() |
|
|
|
post_info = response.json() |
|
logging.debug(f"WP response: {json.dumps(post_info, indent=2)}") |
|
|
|
if not isinstance(post_info, dict) or "id" not in post_info: |
|
raise ValueError(f"Invalid WP response: {post_info}") |
|
|
|
post_id = post_info["id"] |
|
post_url = post_info["link"] |
|
|
|
# Save to recent_posts.json |
|
timestamp = datetime.now(timezone.utc).isoformat() |
|
save_post_to_recent(post_data["title"], post_url, author["username"], timestamp) |
|
|
|
logging.info(f"Posted/Updated by {author['username']}: {post_data['title']} (ID: {post_id})") |
|
return post_id, post_url |
|
|
|
logging.info(f"Posted/Updated by {author['username']}: {post_data['title']} (ID: {post_id})") |
|
return post_id, post_url |
|
|
|
except requests.exceptions.RequestException as e: |
|
logging.error(f"WP API request failed: {e} - Response: {e.response.text if e.response else 'No response'}") |
|
print(f"WP Error: {e}") |
|
return None, None |
|
except KeyError as e: |
|
logging.error(f"WP payload error - Missing key: {e} - Author data: {author}") |
|
print(f"WP Error: {e}") |
|
return None, None |
|
except Exception as e: |
|
logging.error(f"WP posting failed: {e}") |
|
print(f"WP Error: {e}") |
|
return None, None |
|
|
|
def get_flickr_image_via_ddg(search_query, relevance_keywords): |
|
try: |
|
with DDGS() as ddgs: |
|
results = ddgs.images( |
|
f"{search_query} flickr site:flickr.com -poster -infographic -chart -graph -data -stats -text -typography", |
|
license_image="sharecommercially", |
|
max_results=30 |
|
) |
|
if not results: |
|
logging.warning(f"No Flickr images found via DDG for query '{search_query}'") |
|
return None, None, None, None |
|
|
|
headers = {'User-Agent': 'InsiderFoodieBot/1.0 (https://insiderfoodie.com; contact@insiderfoodie.com)'} |
|
candidates = [] |
|
|
|
for r in results: |
|
image_url = r.get("image", "") |
|
page_url = r.get("url", "") |
|
if not image_url or "live.staticflickr.com" not in image_url: |
|
continue |
|
|
|
try: |
|
response = requests.get(page_url, headers=headers, timeout=10) |
|
response.raise_for_status() |
|
soup = BeautifulSoup(response.content, 'html.parser') |
|
time.sleep(1) |
|
|
|
tags_elem = soup.find_all('a', class_='tag') |
|
tags = [tag.text.strip().lower() for tag in tags_elem] if tags_elem else [] |
|
title_elem = soup.find('h1', class_='photo-title') |
|
title = title_elem.text.strip().lower() if title_elem else r.get("title", "").lower() |
|
|
|
exclude_keywords = [ |
|
"poster", "infographic", "chart", "graph", "data", "stats", "text", "typography", |
|
"design", "advertisement", "illustration", "diagram", "layout", "print" |
|
] |
|
matched_keywords = [kw for kw in exclude_keywords if kw in tags or kw in title] |
|
if matched_keywords: |
|
logging.info(f"Skipping text-heavy image: {image_url} (tags: {tags}, title: {title}, matched: {matched_keywords})") |
|
continue |
|
|
|
uploader = soup.find('a', class_='owner-name') |
|
uploader = uploader.text.strip() if uploader else "Flickr User" |
|
candidates.append({ |
|
"image_url": image_url, |
|
"page_url": page_url, |
|
"uploader": uploader, |
|
"tags": tags, |
|
"title": title |
|
}) |
|
|
|
except requests.exceptions.RequestException as e: |
|
logging.info(f"Skipping unavailable image: {image_url} (page: {page_url}, error: {e})") |
|
continue |
|
|
|
if not candidates: |
|
logging.warning(f"No valid candidate images after filtering for '{search_query}'") |
|
return None, None, None, None |
|
|
|
result = random.choice(candidates) |
|
image_url = result["image_url"] |
|
|
|
# OCR check on the selected image |
|
temp_file = None |
|
try: |
|
img_response = requests.get(image_url, headers=headers, timeout=10) |
|
img_response.raise_for_status() |
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file: |
|
temp_file.write(img_response.content) |
|
temp_path = temp_file.name |
|
|
|
img = Image.open(temp_path) |
|
text = pytesseract.image_to_string(img) |
|
char_count = len(text.strip()) |
|
logging.info(f"OCR processed {image_url}: {char_count} characters detected") |
|
|
|
if char_count > 200: |
|
logging.info(f"Skipping text-heavy image (OCR): {image_url} (char_count: {char_count})") |
|
return None, None, None, None # Fall back to Pixabay |
|
|
|
# Success: Save and return |
|
flickr_data = { |
|
"title": search_query, |
|
"image_url": image_url, |
|
"source": "Flickr", |
|
"uploader": result["uploader"], |
|
"page_url": result["page_url"], |
|
"timestamp": datetime.now().isoformat(), |
|
"ocr_chars": char_count |
|
} |
|
flickr_file = "/home/shane/foodie_automator/flickr_images.json" |
|
with open(flickr_file, 'a') as f: |
|
json.dump(flickr_data, f) |
|
f.write('\n') |
|
logging.info(f"Saved Flickr image to {flickr_file}: {image_url}") |
|
logging.info(f"Fetched Flickr image URL: {image_url} by {result['uploader']} for query '{search_query}' (tags: {result['tags']})") |
|
print(f"DEBUG: Flickr image selected: {image_url}") |
|
return image_url, "Flickr", result["uploader"], result["page_url"] |
|
|
|
except requests.exceptions.HTTPError as e: |
|
if e.response.status_code == 429: |
|
logging.warning(f"Rate limit hit for {image_url}. Falling back to Pixabay.") |
|
return None, None, None, None |
|
else: |
|
logging.warning(f"Download failed for {image_url}: {e}") |
|
return None, None, None, None |
|
except Exception as e: |
|
logging.warning(f"OCR processing failed for {image_url}: {e}") |
|
return None, None, None, None |
|
finally: |
|
if temp_file and os.path.exists(temp_path): |
|
os.unlink(temp_path) |
|
|
|
except Exception as e: |
|
logging.error(f"Flickr/DDG image fetch failed for '{search_query}': {e}") |
|
return None, None, None, None |
|
|
|
def select_best_author(summary): |
|
try: |
|
response = client.chat.completions.create( |
|
model=LIGHT_TASK_MODEL, |
|
messages=[ |
|
{"role": "system", "content": ( |
|
"Based on this restaurant/food industry trend summary, pick the most suitable author from: " |
|
"owenjohnson, javiermorales, aishapatel, trangnguyen, keishareid, lilamoreau. " |
|
"Consider their expertise: owenjohnson (global dining trends), javiermorales (food critique), " |
|
"aishapatel (emerging food trends), trangnguyen (cultural dining), keishareid (soul food heritage), " |
|
"lilamoreau (global street food). Return only the username." |
|
)}, |
|
{"role": "user", "content": summary} |
|
], |
|
max_tokens=20 |
|
) |
|
author = response.choices[0].message.content.strip() |
|
valid_authors = ["owenjohnson", "javiermorales", "aishapatel", "trangnguyen", "keishareid", "lilamoreau"] |
|
logging.info(f"Selected author: {author}") |
|
return author if author in valid_authors else "owenjohnson" |
|
except Exception as e: |
|
logging.error(f"Author selection failed: {e}") |
|
return "owenjohnson" |
|
|
|
def prepare_post_data(final_summary, original_title, context_info=""): |
|
innovative_title = generate_title_from_summary(final_summary) |
|
if not innovative_title: |
|
logging.info(f"Title generation failed for '{original_title}' {context_info}") |
|
return None, None, None, None, None, None, None |
|
|
|
# Note: This function still uses generate_image_query, but curate_from_rss overrides it with smart_image_and_filter |
|
search_query, relevance_keywords = generate_image_query(f"{innovative_title}\n\n{final_summary}") |
|
if not search_query: |
|
logging.info(f"Image query generation failed for '{innovative_title}' {context_info}") |
|
return None, None, None, None, None, None, None |
|
|
|
logging.info(f"Fetching Flickr image for query: '{search_query}' {context_info}") |
|
image_url, image_source, uploader, page_url = get_flickr_image_via_ddg(search_query, relevance_keywords) |
|
|
|
if not image_url: |
|
logging.info(f"Flickr fetch failed for '{search_query}' - falling back to Pixabay {context_info}") |
|
image_query, _ = generate_image_query(f"{innovative_title}\n\n{final_summary}") |
|
image_url, image_source, uploader, page_url = get_image(image_query) |
|
if not image_url: |
|
logging.info(f"Pixabay fetch failed for title '{innovative_title}' - falling back to summary {context_info}") |
|
image_query, _ = generate_image_query(f"{final_summary}") |
|
image_url, image_source, uploader, page_url = get_image(image_query) |
|
if not image_url: |
|
logging.info(f"Image fetch failed again for '{original_title}' - proceeding without image {context_info}") |
|
|
|
post_data = {"title": innovative_title, "content": final_summary} |
|
selected_username = select_best_author(final_summary) |
|
author = next((a for a in AUTHORS if a["username"] == selected_username), None) |
|
if not author: |
|
logging.error(f"Author '{selected_username}' not found in AUTHORS, defaulting to owenjohnson") |
|
author = {"username": "owenjohnson", "password": "rfjk xhn6 2RPy FuQ9 cGlU K8mC"} |
|
category = generate_category_from_summary(final_summary) |
|
|
|
return post_data, author, category, image_url, image_source, uploader, page_url |
|
|
|
def save_post_to_recent(post_title, post_url, author_username, timestamp): |
|
"""Save post details to recent_posts.json.""" |
|
try: |
|
recent_posts = load_json_file('/home/shane/foodie_automator/recent_posts.json') |
|
entry = { |
|
"title": post_title, |
|
"url": post_url, |
|
"author_username": author_username, |
|
"timestamp": timestamp |
|
} |
|
recent_posts.append(entry) |
|
with open('/home/shane/foodie_automator/recent_posts.json', 'w') as f: |
|
for item in recent_posts: |
|
json.dump(item, f) |
|
f.write('\n') |
|
logging.info(f"Saved post '{post_title}' to recent_posts.json") |
|
except Exception as e: |
|
logging.error(f"Failed to save post to recent_posts.json: {e}") |
|
|
|
def prune_recent_posts(): |
|
"""Prune recent_posts.json to keep only entries from the last 24 hours.""" |
|
try: |
|
cutoff = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat() |
|
recent_posts = load_json_file('/home/shane/foodie_automator/recent_posts.json') |
|
recent_posts = [entry for entry in recent_posts if entry["timestamp"] > cutoff] |
|
with open('/home/shane/foodie_automator/recent_posts.json', 'w') as f: |
|
for item in recent_posts: |
|
json.dump(item, f) |
|
f.write('\n') |
|
logging.info(f"Pruned recent_posts.json to {len(recent_posts)} entries") |
|
except Exception as e: |
|
logging.error(f"Failed to prune recent_posts.json: {e}") |