You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

1237 lines
54 KiB

import base64
import json
import logging
import os
import random
import re
from PIL import Image
import pytesseract
import io
import tempfile
import shutil
import requests
import time
import openai
from dotenv import load_dotenv
from datetime import datetime, timezone, timedelta
from openai import OpenAI
from urllib.parse import quote
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import tweepy
import flickr_api
from filelock import FileLock
from foodie_config import (
RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS, PERSONA_CONFIGS,
get_clean_source_name, AUTHORS, LIGHT_TASK_MODEL, SUMMARY_MODEL, X_API_CREDENTIALS,
FLICKR_API_KEY, FLICKR_API_SECRET, PIXABAY_API_KEY, RECENT_POSTS_FILE, USED_IMAGES_FILE, IMAGE_EXPIRATION_DAYS
)
# Module-level state and shared clients used by the helpers in this file.
# NOTE(review): last_author_index is not read in the visible part of this file —
# presumably author-rotation state used elsewhere; confirm before removing.
last_author_index = -1
# Global to track round-robin index
round_robin_index = 0
# Load variables from a .env file into the environment before OPENAI_API_KEY is read below.
load_dotenv()
# Shared OpenAI client used by the GPT-backed helpers in this module.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Timeout passed to requests.get() when upload_image_to_wp downloads an image.
IMAGE_UPLOAD_TIMEOUT = 30 # Added to fix NameError
# NOTE: this rebinds the IMAGE_EXPIRATION_DAYS name imported from foodie_config above,
# so the value from the config module is not the one used in this file.
IMAGE_EXPIRATION_DAYS = 7 # 7 days, consistent with foodie_automator_rss.py
def load_json_file(file_path, expiration_hours=None, default=None):
    """
    Load a JSON file, optionally pruning expired entries.

    Args:
        file_path: Path of the JSON file to read.
        expiration_hours: When not None, the file is expected to hold a list of
            dicts with an ISO-8601 'timestamp'. Entries older than this many
            hours (or without a usable timestamp) are dropped, and the pruned
            list is written back with save_json_file.
        default: Value returned when the file is missing, unreadable or
            invalid. Defaults to a new empty list.

    Returns:
        The parsed (and possibly pruned) data, or ``default``.
    """
    logger = logging.getLogger(__name__)
    if default is None:
        default = []  # Default to list for posted_rss_titles.json and used_images.json
    if not os.path.exists(file_path):
        logger.info(f"File {file_path} does not exist. Returning default: {default}")
        return default
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        if expiration_hours is not None:
            if not isinstance(data, list):
                # Pruning only makes sense for a list of entries; never rewrite other shapes.
                logger.error(f"Failed to load {file_path}: expected a list for expiration filtering. Returning default.")
                return default
            cutoff = datetime.now(timezone.utc) - timedelta(hours=expiration_hours)
            filtered_data = []
            for entry in data:
                try:
                    stamp = datetime.fromisoformat(entry['timestamp'])
                except (KeyError, TypeError, ValueError):
                    # One malformed entry must not discard every other entry.
                    logger.warning(f"Dropping entry without a valid timestamp in {file_path}: {entry}")
                    continue
                if stamp.tzinfo is None:
                    # Naive timestamps cannot be compared with the aware cutoff; treat them as UTC.
                    stamp = stamp.replace(tzinfo=timezone.utc)
                if stamp > cutoff:
                    filtered_data.append(entry)
            if len(filtered_data) < len(data):
                logger.info(f"Filtered {len(data) - len(filtered_data)} expired entries from {file_path}")
                save_json_file(file_path, filtered_data)  # Save filtered data
            data = filtered_data
        logger.info(f"Loaded {len(data)} valid entries from {file_path}")
        return data
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON in {file_path}: {str(e)}. Resetting to default.")
        save_json_file(file_path, default)
        return default
    except Exception as e:
        logger.error(f"Failed to load {file_path}: {str(e)}. Returning default.")
        return default
def save_json_file(file_path, data, timestamp=None):
    """
    Save data to a JSON file atomically.

    Args:
        file_path: Destination path.
        data: JSON-serialisable object to write. When ``timestamp`` is given,
            ``data`` is instead treated as a title and appended to the list
            already stored in the file as ``{'title': data, 'timestamp': timestamp}``.
        timestamp: Optional timestamp that switches to append mode.

    Returns:
        True when the file was written (or the entry already existed),
        False when serialisation or the write failed.
    """
    logger = logging.getLogger(__name__)
    temp_path = None
    try:
        # If timestamp is provided, append as a new entry
        if timestamp:
            current_data = load_json_file(file_path, default=[])
            new_entry = {'title': data, 'timestamp': timestamp}
            if new_entry in current_data:  # Avoid duplicates
                logger.info(f"Entry {data} already exists in {file_path}")
                return True
            current_data.append(new_entry)
            data = current_data
        # Serialise first so that a failure never touches the target file.
        serialized = json.dumps(data, indent=2)
        # The temp file lives next to the target so the final rename stays on one
        # filesystem and is therefore atomic.
        target_dir = os.path.dirname(os.path.abspath(file_path))
        fd, temp_path = tempfile.mkstemp(dir=target_dir, suffix='.tmp')
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            f.write(serialized)
        os.replace(temp_path, file_path)
        temp_path = None  # Ownership moved to file_path; nothing left to clean up.
        logger.info(f"Saved data to {file_path}")
        return True
    except (TypeError, ValueError, OSError) as e:
        # json.dumps raises TypeError/ValueError for unserialisable data; I/O raises OSError.
        logger.error(f"Failed to save {file_path}: {str(e)}")
        return False
    finally:
        if temp_path and os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError:
                logger.warning(f"Could not remove temporary file {temp_path}")
def generate_article_tweet(author, post, persona):
    """
    Ask the summary model for a promotional tweet and normalise its output.

    Args:
        author: Mapping with a 'username' key, used to build the @handle in the prompt.
        post: Mapping with 'title' and 'url' keys.
        persona: Persona name whose voice the model is asked to use.

    Returns:
        The cleaned tweet text followed by a single space and the raw URL.
    """
    article_title = post["title"]
    article_url = post["url"]
    handle = f"@{author['username']}"
    prompt_parts = [
        f"Craft a sharp tweet (under 230 characters) for {handle} with the voice of '{persona}'. ",
        f"Distill the essence of the article '{article_title}' into a concise, engaging message. ",
        f"Include the raw URL '{article_url}' at the end. ",
        "Do not wrap the tweet in quotation marks. ",
        "Make it bold, spark curiosity, and invite engagement with a human touch. ",
        "Swap 'elevate' for dynamic terms like 'ignite' or 'unleash'. ",
        "Absolutely do not include hashtags, emojis, or phrases like '[Read more]' or 'Read more'. ",
        "Skip any extra fluff or formatting around the URL—just append the raw URL after a space. ",
        f"Example: 'Love food trends? Check this out! {article_url}'",
    ]
    completion = client.chat.completions.create(
        model=SUMMARY_MODEL,
        messages=[
            {"role": "system", "content": "You are a social media viral expert crafting engaging tweets."},
            {"role": "user", "content": "".join(prompt_parts)},
        ],
        max_tokens=80,
        temperature=0.7,
    )
    text = completion.choices[0].message.content.strip()

    # Failsafes applied in order: emojis, "Read more" phrases, wrapping quotes, trailing URL.
    emoji_pattern = r'[\U0001F600-\U0001F64F\U0001F300-\U0001F5FF\U0001F680-\U0001F6FF\U0001F700-\U0001F77F\U0001F780-\U0001F7FF\U0001F800-\U0001F8FF\U0001F900-\U0001F9FF\U0001FA00-\U0001FA6F\U0001FA70-\U0001FAFF\U00002702-\U000027B0\U000024C2-\U0001F251]'
    read_more_pattern = r'\[Read more\]\(.*?\)|\bRead more\b'
    text = re.sub(emoji_pattern, '', text).strip()
    text = re.sub(read_more_pattern, '', text).strip()
    text = text.strip('"\'')
    # Drop a URL the model already appended so it is not duplicated below.
    text = re.sub(rf'\s*{re.escape(article_url)}$', '', text).strip()

    # 280 characters minus the 23-character shortened URL and the separating space.
    shortened_url_length = 23
    text_budget = 280 - shortened_url_length - 1
    if len(text) > text_budget:
        text = text[:text_budget - 3] + "..."

    final_tweet = text + " " + article_url
    logging.info(f"Generated tweet: {final_tweet}")
    return final_tweet
def post_tweet(author, tweet, reply_to_id=None):
    """
    Post a tweet through the X API and keep rate_limit_info.json up to date.

    The stored per-user counter is decremented after each successful post. When
    the API answers 429, the counter and reset time are replaced by the values
    from the response headers. No pre-flight rate-limit check is performed here.

    Args:
        author: Mapping with a 'username' key used to look up X credentials.
        tweet: Text to post.
        reply_to_id: Optional ID of the tweet being replied to.

    Returns:
        ``{"id": tweet_id}`` on success, ``False`` on any failure.
    """
    from foodie_config import X_API_CREDENTIALS
    import logging
    import tweepy
    credentials = X_API_CREDENTIALS.get(author["username"])
    if not credentials:
        logging.error(f"No X credentials found for {author['username']}")
        return False
    logging.debug(f"Attempting to post tweet for {author['username']} (handle: {credentials['x_username']})")
    logging.debug(f"Credentials: api_key={credentials['api_key'][:4]}..., access_token={credentials['access_token'][:4]}...")
    logging.debug(f"Tweet content: {tweet}")
    if reply_to_id:
        logging.debug(f"Replying to tweet ID: {reply_to_id}")
    rate_limit_file = '/home/shane/foodie_automator/rate_limit_info.json'
    rate_limit_info = load_json_file(rate_limit_file, default={})
    username = author["username"]
    if username not in rate_limit_info:
        rate_limit_info[username] = {
            'tweet_remaining': 17,
            'tweet_reset': time.time()
        }
    try:
        # Local name chosen so the module-level OpenAI `client` is not shadowed.
        x_client = tweepy.Client(
            consumer_key=credentials["api_key"],
            consumer_secret=credentials["api_secret"],
            access_token=credentials["access_token"],
            access_token_secret=credentials["access_token_secret"]
        )
        response = x_client.create_tweet(
            text=tweet,
            in_reply_to_tweet_id=reply_to_id
        )
        tweet_id = response.data['id']
        logging.info(f"Successfully posted tweet {tweet_id} for {author['username']} (handle: {credentials['x_username']}): {tweet}")
        # Update tweet rate limits (local decrement, headers on 429)
        rate_limit_info[username]['tweet_remaining'] = max(0, rate_limit_info[username]['tweet_remaining'] - 1)
        save_json_file(rate_limit_file, rate_limit_info)
        logging.info(f"Updated tweet rate limit for {username}: {rate_limit_info[username]['tweet_remaining']} remaining, reset at {datetime.fromtimestamp(rate_limit_info[username]['tweet_reset'], tz=timezone.utc)}")
        return {"id": tweet_id}
    except tweepy.TweepyException as e:
        logging.error(f"Failed to post tweet for {author['username']} (handle: {credentials['x_username']}): {e}")
        # A requests.Response is falsy for 4xx/5xx statuses, so it must be compared
        # against None explicitly; a plain truthiness test would skip every 429.
        error_response = getattr(e, 'response', None)
        if error_response is not None and error_response.status_code == 429:
            headers = error_response.headers
            user_remaining = headers.get('x-user-limit-24hour-remaining', 0)
            user_reset = headers.get('x-user-limit-24hour-reset', time.time() + 86400)
            try:
                user_remaining = int(user_remaining)
                user_reset = int(user_reset)
            except (ValueError, TypeError):
                user_remaining = 0
                user_reset = time.time() + 86400
            rate_limit_info[username]['tweet_remaining'] = user_remaining
            rate_limit_info[username]['tweet_reset'] = user_reset
            save_json_file(rate_limit_file, rate_limit_info)
            logging.info(f"Rate limit exceeded for {username}: {user_remaining} remaining, reset at {datetime.fromtimestamp(user_reset, tz=timezone.utc)}")
        return False
    except Exception as e:
        logging.error(f"Unexpected error posting tweet for {author['username']} (handle: {credentials['x_username']}): {e}", exc_info=True)
        return False
def select_best_persona(interest_score, content=""):
    """
    Pick a writing persona from content keywords, falling back to the score.

    Keyword groups are checked in order (tech, review, culture). When none
    matches, the interest score narrows the random choice.

    Args:
        interest_score: Numeric interest rating of the content.
        content: Text whose keywords drive the persona choice.

    Returns:
        One of "Visionary Editor", "Foodie Critic", "Trend Scout",
        "Culture Connoisseur".
    """
    logging.info("Using select_best_persona with interest_score and content")
    personas = ["Visionary Editor", "Foodie Critic", "Trend Scout", "Culture Connoisseur"]
    content_lower = content.lower()

    def mentions(keywords):
        for kw in keywords:
            if len(kw) <= 2:
                # Very short keywords such as "ai" occur inside countless ordinary
                # words ("said", "daily"), so they must match as whole words.
                if re.search(rf"\b{re.escape(kw)}\b", content_lower):
                    return True
            elif kw in content_lower:
                return True
        return False

    if mentions(["tech", "ai", "innovation", "sustainability"]):
        return random.choice(["Trend Scout", "Visionary Editor"])
    if mentions(["review", "critic", "taste", "flavor"]):
        return "Foodie Critic"
    if mentions(["culture", "tradition", "history"]):
        return "Culture Connoisseur"
    if interest_score >= 8:
        return random.choice(personas[:2])
    if interest_score >= 6:
        return random.choice(personas[2:])
    return random.choice(personas)
def get_image(search_query):
    """
    Find an unused Pixabay photo for ``search_query``, then for a generic query.

    Each selected URL is added to ``used_images`` and persisted through
    ``save_used_images`` before being returned.

    Returns:
        ``(image_url, "Pixabay", uploader, page_url)`` or four ``None`` values
        when neither attempt yields an unused image.
    """
    headers = {'User-Agent': 'InsiderFoodieBot/1.0 (https://insiderfoodie.com; contact@insiderfoodie.com)'}
    fallback_query = "food dining"
    # First the caller's query, then the generic fallback.
    for query, is_fallback in ((search_query, False), (fallback_query, True)):
        try:
            pixabay_url = f"https://pixabay.com/api/?key={PIXABAY_API_KEY}&q={quote(query)}&image_type=photo&per_page=10"
            response = requests.get(pixabay_url, headers=headers, timeout=10)
            response.raise_for_status()
            data = response.json()
            for hit in data.get('hits', []):
                img_url = hit.get('webformatURL')
                if img_url and img_url not in used_images:
                    uploader = hit.get('user', 'Unknown')
                    page_url = hit.get('pageURL', img_url)
                    used_images.add(img_url)
                    save_used_images()
                    if is_fallback:
                        logging.info(f"Selected Pixabay fallback image: {img_url} by {uploader} for query '{query}'")
                    else:
                        logging.info(f"Selected Pixabay image: {img_url} by {uploader} for query '{query}'")
                    return img_url, "Pixabay", uploader, page_url
            if is_fallback:
                logging.warning(f"No valid Pixabay image found for fallback query '{query}'.")
            else:
                logging.info(f"No valid Pixabay image found for query '{query}'. Trying fallback query.")
        except Exception as e:
            if is_fallback:
                logging.warning(f"Pixabay fallback image fetch failed for query '{query}': {e}")
            else:
                logging.warning(f"Pixabay image fetch failed for query '{query}': {e}")
    # Ultimate fallback: return None but log clearly
    logging.error(f"All image fetch attempts failed for query '{search_query}'. Returning None.")
    return None, None, None, None
def generate_image_query(title, summary):
    """
    Ask the light model for an image search query and relevance keywords.

    The reply is accepted either inside a fenced code block or as bare JSON.
    The 'relevance' value may be a space-separated string or a list.

    Args:
        title: Article title; also the fallback search query.
        summary: Article summary.

    Returns:
        ``(search_query, relevance_keywords, used_fallback)`` where
        ``used_fallback`` is True when the title had to be used because the
        model call or parsing failed.
    """
    try:
        prompt = (
            "Given the following article title and summary, generate a concise image search query (max 5 words) to find a relevant image. "
            "Also provide a list of relevance keywords (max 5 words) that should be associated with the image. "
            "Return the result as a JSON object with 'search' and 'relevance' keys.\n\n"
            f"Title: {title}\n\n"
            f"Summary: {summary}\n\n"
            "Example output:\n"
            "```json\n"
            "{\"search\": \"Italian cuisine trends\", \"relevance\": \"pasta wine dining culture\"}\n"
            "```"
        )
        response = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": prompt},
                {"role": "user", "content": "Generate an image search query and relevance keywords."}
            ],
            max_tokens=100,
            temperature=0.5
        )
        raw_response = response.choices[0].message.content or ""
        # Models do not always wrap the object in a ```json fence; accept a plain
        # fence or bare JSON as well.
        json_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', raw_response)
        candidate = json_match.group(1) if json_match else raw_response.strip()
        try:
            query_data = json.loads(candidate)
        except json.JSONDecodeError:
            query_data = None
        if not isinstance(query_data, dict):
            logging.warning(f"Failed to parse image query JSON: {raw_response}")
            return title, [], True
        search_query = query_data.get("search") or title
        relevance = query_data.get("relevance", "")
        if isinstance(relevance, str):
            relevance_keywords = relevance.split()
        elif isinstance(relevance, (list, tuple)):
            # The prompt asks for "a list", so a JSON array is a legitimate reply.
            relevance_keywords = [str(keyword) for keyword in relevance]
        else:
            relevance_keywords = []
        # Log the JSON object in a single line
        log_json = json.dumps(query_data).replace('\n', ' ').replace('\r', ' ')
        logging.debug(f"Image query from content: {log_json}")
        return search_query, relevance_keywords, False
    except Exception as e:
        logging.warning(f"Image query generation failed: {e}. Using title as fallback.")
        return title, [], True
def smart_image_and_filter(title, summary):
    """
    Use the light model to derive an image query and a KEEP/SKIP decision.

    Returns:
        ``(image_query, relevance_keywords, main_topic, skip_flag)``. When the
        model reply cannot be used, the values are built from
        ``extract_main_topic`` and ``skip_flag`` is False.
    """
    def keyword_fallback():
        # Result used whenever the model reply is missing or unusable.
        topic = extract_main_topic(title.lower() + " " + summary.lower())
        return topic, [topic, "food"], topic, False

    try:
        content = f"{title}\n\n{summary}"
        prompt = (
            "Analyze this article title and summary. Extract key entities (brands, locations, cuisines, or topics) "
            "for an image search about food industry trends or viral content. Prioritize specific multi-word terms if present, "
            "otherwise focus on the main theme. Also identify the main topic of the article (e.g., a specific food item or cuisine). "
            "Return 'SKIP' if the article is about home appliances, recipes, promotions, or contains 'homemade', else 'KEEP'. "
            "Return as JSON with double quotes for all property names and string values (e.g., "
            "{\"image_query\": \"fast food trends\", \"relevance\": [\"fast food\", \"dining\", \"culture\"], \"main_topic\": \"fast food\", \"action\": \"KEEP\"})."
        )
        response = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": prompt},
                {"role": "user", "content": content}
            ],
            max_tokens=100
        )
        raw_result = response.choices[0].message.content.strip()
        logging.debug(f"Raw GPT smart image/filter response: '{raw_result}'")
        # Remove code fences, then turn single-quoted JSON into double-quoted JSON.
        cleaned_result = re.sub(r'```json\s*|\s*```', '', raw_result).strip()
        fixed_result = re.sub(r"(?<!\\)'(?=\s*[\w\s]*\])|(?<=\[|\{|\s)'|'(?=\s*[\]\},:])|(?<=\w)'(?=\s*:)", '"', cleaned_result)
        try:
            result = json.loads(fixed_result)
        except json.JSONDecodeError as e:
            logging.warning(f"JSON parsing failed: {e}, raw: '{fixed_result}'. Using fallback.")
            return keyword_fallback()

        required_keys = ("image_query", "relevance", "action")
        if not isinstance(result, dict) or any(key not in result for key in required_keys):
            logging.warning(f"Invalid GPT response format: {result}, using fallback")
            return keyword_fallback()

        image_query = result["image_query"]
        relevance_keywords = result["relevance"]
        main_topic = result.get("main_topic", extract_main_topic(title.lower() + " " + summary.lower()))
        mentions_homemade = "homemade" in title.lower() or "homemade" in summary.lower()
        skip_flag = result["action"] == "SKIP" or mentions_homemade
        logging.info(f"Smart image query: {image_query}, Relevance: {relevance_keywords}, Main Topic: {main_topic}, Skip: {skip_flag}")

        topic_result = (main_topic, [main_topic, "food"], main_topic, skip_flag)
        if not image_query:
            logging.warning("Image query is empty, using fallback")
            return topic_result
        # Single-word queries are only acceptable for a few well-known names.
        specific_single_words = ["kimchi", "sushi", "pizza", "taco", "burger", "chipotle", "starbucks", "mcdonalds"]
        is_single_word = len(image_query.split()) < 2
        if is_single_word and image_query.lower() not in specific_single_words:
            logging.warning(f"Image query '{image_query}' too vague, using fallback")
            return topic_result
        return image_query, relevance_keywords, main_topic, skip_flag
    except Exception as e:
        logging.error(f"Smart image/filter failed: {e}, using fallback")
        return keyword_fallback()
def extract_main_topic(text):
    """
    Return the first known food keyword contained in ``text``.

    Keywords are tried in a fixed priority order; "food trends" is returned
    when none of them occurs in the text.
    """
    # Common food-related keywords (expand as needed)
    known_foods = ("kimchi", "sushi", "pizza", "taco", "burger", "ramen", "curry", "pasta", "salad", "soup")
    return next((food for food in known_foods if food in text), "food trends")
def upload_image_to_wp(image_url, post_title, wp_base_url, wp_username, wp_password, image_source="Pixabay", uploader=None, page_url=None):
    """
    Download an image and upload it to the WordPress media library.

    The download is tried up to three times with exponential back-off. After
    the upload, an attribution caption is set on the media item.

    Args:
        image_url: URL of the image to download.
        post_title: Post title, used to derive the uploaded file name.
        wp_base_url: Base URL of the WordPress REST API.
        wp_username: WordPress user name for Basic auth.
        wp_password: WordPress password for Basic auth.
        image_source: Name of the image provider shown in the caption.
        uploader: Optional uploader name shown in the caption.
        page_url: Optional page URL the caption links to.

    Returns:
        The WordPress media ID, or None when download or upload failed.
    """
    from html import escape
    try:
        # Fall back to a fixed name when the title has no ASCII characters left.
        safe_title = post_title.encode('ascii', 'ignore').decode('ascii').replace(' ', '_')[:50] or "image"
        headers = {
            "Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}",
            "Content-Disposition": f"attachment; filename={safe_title}.jpg",
            "Content-Type": "image/jpeg"
        }
        image_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        logging.info(f"Fetching image from {image_url} for '{post_title}'")
        max_attempts = 3
        for attempt in range(max_attempts):
            is_last_attempt = attempt == max_attempts - 1
            try:
                image_response = requests.get(image_url, headers=image_headers, timeout=IMAGE_UPLOAD_TIMEOUT)
                if image_response.status_code == 429:
                    if is_last_attempt:
                        # Nothing follows the last attempt, so sleeping would only delay the failure.
                        logging.warning(f"Rate limit hit for {image_url} (attempt {attempt+1}/3).")
                    else:
                        wait_time = 10 * (2 ** attempt)
                        logging.warning(f"Rate limit hit for {image_url}. Retrying after {wait_time}s (attempt {attempt+1}/3).")
                        time.sleep(wait_time)
                    continue
                image_response.raise_for_status()
                break
            except requests.exceptions.RequestException as e:
                logging.warning(f"Image fetch failed for {image_url} (attempt {attempt+1}/3): {e}")
                if is_last_attempt:
                    logging.error(f"Failed to fetch image {image_url} after 3 attempts")
                    return None
                time.sleep(10 * (2 ** attempt))
        else:
            # Reached only when every attempt ended in a 429.
            logging.error(f"Failed to fetch image {image_url} after retries")
            return None
        response = requests.post(
            f"{wp_base_url}/media",
            headers=headers,
            data=image_response.content,
            timeout=IMAGE_UPLOAD_TIMEOUT
        )
        response.raise_for_status()
        image_id = response.json()["id"]
        # Uploader names and page URLs come from third-party APIs; escape them
        # before embedding them in the caption HTML.
        source_html = escape(str(image_source))
        if page_url and uploader:
            caption = f'<a href="{escape(str(page_url), quote=True)}">{source_html}</a> by {escape(str(uploader))}'
        elif page_url:
            caption = f'<a href="{escape(str(page_url), quote=True)}">{source_html}</a>'
        else:
            caption = image_source
        caption_response = requests.post(
            f"{wp_base_url}/media/{image_id}",
            headers={"Authorization": headers["Authorization"], "Content-Type": "application/json"},
            json={"caption": caption},
            timeout=IMAGE_UPLOAD_TIMEOUT
        )
        if not caption_response.ok:
            # The image itself is uploaded; a missing caption should not fail the call.
            logging.warning(f"Failed to set caption for media {image_id}: HTTP {caption_response.status_code}")
        logging.info(f"Uploaded image '{safe_title}.jpg' to WP (ID: {image_id}) with caption '{caption}'")
        return image_id
    except Exception as e:
        logging.error(f"Image upload to WP failed for '{post_title}': {e}")
        print(f"Image upload to WP failed for '{post_title}': {e}")
        return None
def determine_paragraph_count(interest_score):
    """Map an interest score to a paragraph count: 5 from 9 up, 4 from 7 up, otherwise 3."""
    for minimum_score, paragraphs in ((9, 5), (7, 4)):
        if interest_score >= minimum_score:
            return paragraphs
    return 3
def is_interesting(summary):
    """
    Ask the light model to rate ``summary`` from 0 to 10.

    The first integer found in the reply is used, so replies such as "8/10",
    "8.5" or "Score: 8" are understood. The result is clamped to 0-10.

    Returns:
        The integer score, or 0 when the reply has no number or the call fails.
    """
    try:
        response = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": (
                    "Rate this content from 0-10 based on its rarity, buzzworthiness, and engagement potential for food lovers, covering a wide range of food topics (skip recipes). "
                    "Score 8-10 for rare, highly shareable ideas that grab attention. "
                    "Score 5-7 for fresh, engaging updates with broad appeal. Score below 5 for common or unremarkable content. "
                    "Return only a number."
                )},
                {"role": "user", "content": f"Content: {summary}"}
            ],
            max_tokens=5
        )
        raw_score = (response.choices[0].message.content or "").strip()
        # str.isdigit() rejects anything but a bare integer and would score such replies 0.
        number = re.search(r'-?\d+', raw_score)
        score = max(0, min(10, int(number.group()))) if number else 0
        print(f"Interest Score for '{summary[:50]}...': {score} (raw: {raw_score})")
        logging.info(f"Interest Score: {score} (raw: {raw_score})")
        return score
    except Exception as e:
        logging.error(f"Interestingness scoring failed: {e}")
        print(f"Interest Error: {e}")
        return 0
def generate_title_from_summary(summary):
    """
    Generate a title for ``summary`` with the light model, retrying up to three times.

    A candidate is rejected when it is longer than 100 characters or contains
    a banned word. Quotes are removed and anything up to the first colon is
    dropped before validation.

    Returns:
        The accepted title, or None when all three attempts fail.
    """
    banned_words = ["elevate", "elevating", "elevated"]
    system_prompt = (
        "Generate a concise, engaging title (under 100 characters) based on this summary, covering food topics. "
        "Craft it with Upworthy/Buzzfeed flair—think ‘you won’t believe this’ or ‘this is nuts’—for food insiders. "
        "Avoid quotes, emojis, special characters, or the words 'elevate', 'elevating', 'elevated'. "
        "End with a question to spark shares."
    )
    attempt = 0
    while attempt < 3:
        attempt_no = attempt + 1
        attempt += 1
        try:
            response = client.chat.completions.create(
                model=LIGHT_TASK_MODEL,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": f"Summary: {summary}"}
                ],
                max_tokens=30
            )
            title = response.choices[0].message.content.strip().replace('"', '').replace("'", "")
            _, colon, after_colon = title.partition(':')
            if colon:
                title = after_colon.strip()
            too_long = len(title) > 100
            has_banned_word = any(word in title.lower() for word in banned_words)
            if not (too_long or has_banned_word):
                logging.info(f"Generated title: {title}")
                return title
            reason = "length" if too_long else "banned word"
            print(f"Rejected title (attempt {attempt_no}/3): '{title}' due to {reason}")
            logging.info(f"Rejected title (attempt {attempt_no}/3): '{title}' due to {reason}")
        except Exception as e:
            logging.error(f"Title generation failed (attempt {attempt_no}/3): {e}")
            print(f"Title Error: {e}")
    print("Failed to generate valid title after 3 attempts")
    logging.info("Failed to generate valid title after 3 attempts")
    return None
def summarize_with_gpt4o(content, source_name, link, interest_score=0, extra_prompt=""):
    """
    Summarise ``content`` with the summary model in the voice of a chosen persona.

    The persona comes from ``select_best_persona`` and the paragraph count from
    ``determine_paragraph_count``. Removing the original title from the
    summary is left to the caller.

    Returns:
        The stripped summary text, or None when anything fails.
    """
    try:
        persona = select_best_persona(interest_score, content)
        generic_config = {
            "article_prompt": "Write a concise, engaging summary that captures the essence of the content for food lovers.",
            "description": "a generic food writer",
            "tone": "an engaging tone"
        }
        persona_config = PERSONA_CONFIGS.get(persona, generic_config)
        prompt = persona_config["article_prompt"].format(
            description=persona_config["description"],
            tone=persona_config["tone"],
            num_paragraphs=determine_paragraph_count(interest_score)
        )
        logging.info(f"Using {persona} with interest_score and content")
        prompt_sections = [
            f"{prompt}\n\n",
            "Do not include the article title in the summary.\n\n",
            f"{extra_prompt}\n\n",
            "Avoid using the word 'elevate'—use more humanized language like 'level up' or 'bring to life'.\n",
            f"Content to summarize:\n{content}\n\n",
            f"Source: {source_name}\n",
            f"Link: {link}",
        ]
        full_prompt = "".join(prompt_sections)
        completion = client.chat.completions.create(
            model=SUMMARY_MODEL,
            messages=[
                {"role": "system", "content": full_prompt},
                {"role": "user", "content": content}
            ],
            max_tokens=1000,
            temperature=0.7
        )
        summary = completion.choices[0].message.content.strip()
        # Title removal is handled by the calling script, which has the title available.
        logging.info(f"Processed summary (Persona: {persona}): {summary}")
        return summary
    except Exception as e:
        logging.error(f"Summary generation failed with model {SUMMARY_MODEL}: {e}")
        return None
def insert_link_naturally(summary, source_name, source_url):
    """
    Embed an HTML link to the source inside ``summary``.

    A random sentence of a random multi-sentence paragraph gets
    " according to <link>." appended. Without such a paragraph, "Source: <link>."
    is appended to the last line. On any error the link is appended after a
    blank line.

    Returns:
        The summary with the link, or the unchanged summary when it has no text.
    """
    anchor = f'<a href="{source_url}">{source_name}</a>'
    sentence_boundary = r'(?<=[.!?])\s+'
    try:
        logging.info(f"Input summary to insert_link_naturally: {summary!r}")
        paragraphs = summary.split('\n')
        has_text = any(p.strip() for p in paragraphs)
        if not paragraphs or not has_text:
            logging.error("No valid paragraphs to insert link.")
            return summary

        multi_sentence = []
        for para in paragraphs:
            if para.strip() and len(re.split(sentence_boundary, para.strip())) >= 2:
                multi_sentence.append(para)

        if not multi_sentence:
            logging.warning("No paragraph with multiple sentences found, appending to last paragraph.")
            last_line = paragraphs[-1].strip()
            paragraphs[-1] = f"{last_line} Source: {anchor}."
            appended = '\n'.join(paragraphs)
            logging.info(f"Appended link to summary: {appended!r}")
            return appended

        chosen_para = random.choice(multi_sentence)
        sentences = re.split(sentence_boundary, chosen_para.strip())
        candidates = [(pos, text) for pos, text in enumerate(sentences) if text.strip()]
        if not candidates:
            logging.error("No eligible sentences found for link insertion.")
            return summary
        position, chosen_sentence = random.choice(candidates)
        # Insert the link at the end of the sentence
        sentences[position] = f"{chosen_sentence.rstrip('.')} according to {anchor}."
        paragraphs[paragraphs.index(chosen_para)] = ' '.join(sentences)
        embedded = '\n'.join(paragraphs)
        logging.info(f"Summary with naturally embedded link: {embedded!r}")
        return embedded
    except Exception as e:
        logging.error(f"Link insertion failed: {e}")
        fallback_summary = f"{summary}\n\nSource: {anchor}."
        logging.info(f"Fallback summary with link: {fallback_summary!r}")
        return fallback_summary
def generate_category_from_summary(summary):
    """
    Ask the light model to choose a category for ``summary``.

    Returns:
        One of Food, Culture, Trends, Health, Lifestyle, Drink, Eats. "Trends"
        is returned for a non-string or blank summary, for a reply outside
        that list, and on any error.
    """
    allowed_categories = ["Food", "Culture", "Trends", "Health", "Lifestyle", "Drink", "Eats"]
    try:
        summary_is_usable = isinstance(summary, str) and bool(summary.strip())
        if not summary_is_usable:
            logging.warning(f"Invalid summary for category generation: {summary}. Defaulting to 'Trends'.")
            return "Trends"
        system_prompt = (
            "Based on this summary, select the most relevant category from: Food, Culture, Trends, Health, Lifestyle, Drink, Eats. "
            "Return only the category name."
        )
        completion = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": summary}
            ],
            max_tokens=10
        )
        category = completion.choices[0].message.content.strip()
        logging.info(f"Generated category: {category}")
        if category in allowed_categories:
            return category
        return "Trends"
    except Exception as e:
        logging.error(f"Category generation failed: {e}")
        return "Trends"
def get_wp_category_id(category_name, wp_base_url, wp_username, wp_password):
    """
    Look up a WordPress category by name (case-insensitive).

    Returns:
        The ID of the first category whose name equals ``category_name``, or
        None when there is no match or the request fails.
    """
    try:
        basic_token = base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()
        response = requests.get(
            f"{wp_base_url}/categories",
            headers={"Authorization": f"Basic {basic_token}"},
            params={"search": category_name},
        )
        response.raise_for_status()
        # The search is fuzzy on the server side, so require an exact name match here.
        return next(
            (cat["id"] for cat in response.json() if cat["name"].lower() == category_name.lower()),
            None,
        )
    except Exception as e:
        logging.error(f"Failed to get WP category ID for '{category_name}': {e}")
        return None
def create_wp_category(category_name, wp_base_url, wp_username, wp_password):
    """
    Create a WordPress category named ``category_name``.

    Returns:
        The ID of the new category, or None when the request fails.
    """
    try:
        basic_token = base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()
        request_headers = {
            "Authorization": f"Basic {basic_token}",
            "Content-Type": "application/json",
        }
        response = requests.post(
            f"{wp_base_url}/categories",
            headers=request_headers,
            json={"name": category_name},
        )
        response.raise_for_status()
        created = response.json()
        return created["id"]
    except Exception as e:
        logging.error(f"Failed to create WP category '{category_name}': {e}")
        return None
def get_wp_tag_id(tag_name, wp_base_url, wp_username, wp_password):
    """
    Look up a WordPress tag by name (case-insensitive).

    Returns:
        The ID of the first tag whose name equals ``tag_name``, or None when
        there is no match or the request fails.
    """
    try:
        basic_token = base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()
        response = requests.get(
            f"{wp_base_url}/tags",
            headers={"Authorization": f"Basic {basic_token}"},
            params={"search": tag_name},
        )
        response.raise_for_status()
        # The search is fuzzy on the server side, so require an exact name match here.
        return next(
            (tag["id"] for tag in response.json() if tag["name"].lower() == tag_name.lower()),
            None,
        )
    except Exception as e:
        logging.error(f"Failed to get WP tag ID for '{tag_name}': {e}")
        return None
def post_to_wp(post_data, category, link, author, image_url, original_source, image_source, uploader, page_url, interest_score, post_id=None, should_post_tweet=True):
    """
    Post or update content to WordPress, optionally tweeting the post.

    Args:
        post_data: Mapping with 'title', 'content' and 'status'; an optional
            'author' key selects the X credentials used for the tweet.
        category: Category name; looked up and created when missing.
        link, original_source, interest_score: Accepted for interface
            compatibility; not used by this function.
        author: Mapping with the WordPress 'url', 'username' and 'password'.
        image_url: Image to upload as featured media for NEW posts only.
        image_source, uploader, page_url: Attribution passed to the image upload.
        post_id: When given, that post is updated instead of a new one created.
        should_post_tweet: Whether to tweet the post URL afterwards.

    Returns:
        ``(post_id, post_url)`` on success, ``(None, None)`` on failure.
    """
    import logging
    import requests
    from foodie_config import X_API_CREDENTIALS  # Removed WP_CREDENTIALS
    logger = logging.getLogger(__name__)
    # Extract WordPress credentials from author dictionary
    wp_url = author.get("url")
    wp_username = author.get("username")
    wp_password = author.get("password")
    if not all([wp_url, wp_username, wp_password]):
        logger.error(f"Missing WordPress credentials for author: {author.get('username', 'unknown')}")
        return None, None
    # Ensure wp_url ends with '/wp-json/wp/v2'
    if not wp_url.endswith('/wp-json/wp/v2'):
        wp_base_url = f"{wp_url.rstrip('/')}/wp-json/wp/v2"
    else:
        wp_base_url = wp_url
    # Remember the mode now: post_id is overwritten with the ID from the response
    # below, after which it can no longer tell a new post from an update.
    is_update = bool(post_id)
    endpoint = f"{wp_base_url}/posts"
    if is_update:
        endpoint += f"/{post_id}"
    headers = {
        "Authorization": "Basic " + base64.b64encode(f"{wp_username}:{wp_password}".encode()).decode(),
        "Content-Type": "application/json"
    }
    # Get or create category ID
    category_id = get_wp_category_id(category, wp_base_url, wp_username, wp_password)
    if not category_id:
        category_id = create_wp_category(category, wp_base_url, wp_username, wp_password)
    if not category_id:
        logger.warning(f"Failed to get or create category '{category}', using default")
        category_id = 1  # Fallback to default category
    payload = {
        "title": post_data["title"],
        "content": post_data["content"],
        "status": post_data["status"],
        # NOTE(review): the WordPress REST API documents `author` as a user ID;
        # confirm that a username is accepted by the target site.
        "author": wp_username,  # Use username directly
        "categories": [category_id]
    }
    try:
        response = requests.post(endpoint, headers=headers, json=payload)
        response.raise_for_status()
        response_body = response.json()
        post_id = response_body.get("id")
        post_url = response_body.get("link")
        logger.info(f"{'Updated' if is_update else 'Posted'} WordPress post: {post_data['title']} (ID: {post_id})")
        if image_url and not is_update:  # Only upload image for new posts
            media_id = upload_image_to_wp(image_url, post_data["title"], wp_base_url, wp_username, wp_password, image_source, uploader, page_url)
            if media_id:
                media_response = requests.post(
                    f"{wp_base_url}/posts/{post_id}",
                    headers=headers,
                    json={"featured_media": media_id}
                )
                if media_response.ok:
                    logger.info(f"Set featured image (Media ID: {media_id}) for post {post_id}")
                else:
                    logger.warning(f"Failed to set featured image (Media ID: {media_id}) for post {post_id}: HTTP {media_response.status_code}")
        if should_post_tweet and post_url:
            # post_data may carry no 'author'; fall back to the author's username,
            # which is also the key post_tweet uses for its credential lookup.
            credentials = X_API_CREDENTIALS.get(post_data.get("author") or wp_username)
            if credentials:
                tweet_text = f"{post_data['title']}\n{post_url}"
                if post_tweet(author, tweet_text):  # Updated signature
                    logger.info(f"Successfully tweeted for post: {post_data['title']}")
                else:
                    logger.warning(f"Failed to tweet for post: {post_data['title']}")
        return post_id, post_url
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to {'update' if is_update else 'post'} WordPress post: {post_data['title']}: {e}", exc_info=True)
        return None, None
# Configure Flickr API with credentials
flickr_api.set_keys(api_key=FLICKR_API_KEY, api_secret=FLICKR_API_SECRET)
# Only the first four characters of the key and of the secret are written to the log.
logging.info(f"Flickr API configured with key: {FLICKR_API_KEY[:4]}... and secret: {FLICKR_API_SECRET[:4]}...")
# Global variable to track the last Flickr request time
last_flickr_request_time = 0
# Flickr request counter; reset_flickr_request_count() zeroes it once an hour
# has passed since flickr_request_start_time.
flickr_request_count = 0
flickr_request_start_time = time.time()
# Define exclude keywords for filtering unwanted image types
# (process_photo compares these against a photo's tags and title).
exclude_keywords = [
"poster", "infographic", "chart", "graph", "data", "stats", "text", "typography",
"design", "advertisement", "illustration", "diagram", "layout", "print"
]
# Initialize used_images as a set to track used image URLs
# NOTE(review): the path is hard-coded here although USED_IMAGES_FILE is imported
# from foodie_config — confirm which one is meant to be authoritative.
used_images_file = "/home/shane/foodie_automator/used_images.json"
used_images = set()
# Seed used_images from the persisted file, keeping only unexpired https:// entries.
if os.path.exists(used_images_file):
    try:
        entries = load_json_file(used_images_file, IMAGE_EXPIRATION_DAYS * 24)  # Use load_json_file for consistency
        for entry in entries:
            entry_is_valid = (
                isinstance(entry, dict)
                and "title" in entry
                and entry["title"].startswith('https://')
            )
            if not entry_is_valid:
                logging.warning(f"Skipping invalid entry in {used_images_file}: {entry}")
                continue
            used_images.add(entry["title"])
        logging.info(f"Loaded {len(used_images)} used image URLs from {used_images_file}")
    except Exception as e:
        # Start over with an empty set and truncate the file.
        logging.warning(f"Failed to load used images from {used_images_file}: {e}. Resetting to empty set.")
        used_images = set()
        with open(used_images_file, 'w') as f:
            f.write("")
# Function to save used_images to file
def save_used_images():
    """
    Save used_images to used_images.json as a JSON array, preserving timestamps.

    URLs already present in the file keep their stored timestamp; new URLs get
    the current UTC time. The file is read once per call. Failures are logged
    as warnings and never raised.
    """
    try:
        timestamp = datetime.now(timezone.utc).isoformat()
        # Read the stored entries a single time and index them by URL, instead of
        # re-reading the file for every URL in used_images.
        stored_by_url = {}
        for stored in load_json_file(used_images_file, IMAGE_EXPIRATION_DAYS * 24):
            if isinstance(stored, dict) and isinstance(stored.get("title"), str):
                # setdefault keeps the first occurrence when a URL is duplicated.
                stored_by_url.setdefault(stored["title"], stored)
        entries = [
            {"title": url, "timestamp": stored_by_url.get(url, {}).get("timestamp", timestamp)}
            for url in used_images
        ]
        # Use save_json_file for atomic write
        save_json_file(used_images_file, entries)
        logging.info(f"Saved {len(entries)} used image URLs to {used_images_file}")
    except Exception as e:
        logging.warning(f"Failed to save used images to {used_images_file}: {e}")
def reset_flickr_request_count():
    """Start a new Flickr request-counting window once an hour has elapsed."""
    global flickr_request_count, flickr_request_start_time
    window_age = time.time() - flickr_request_start_time
    if window_age < 3600:
        # Still inside the current one-hour window; keep counting.
        return
    flickr_request_count = 0
    flickr_request_start_time = time.time()
def process_photo(photo, search_query):
    """
    Vet a Flickr photo for use and record it when accepted.

    The photo is rejected (returns None) when its tags or title contain an
    excluded keyword, when neither a 'Large' nor a 'Medium' file URL can be
    obtained, or when the URL was already used and the photo shares no word
    with ``search_query``. Otherwise the URL is added to ``used_images``,
    persisted, appended as one JSON line to flickr_images.json, and
    ``(img_url, "Flickr", uploader, page_url)`` is returned.
    """
    photo_tags = [t.text.lower() for t in photo.getTags()]
    photo_title = photo.title.lower() if photo.title else ""

    unwanted = [word for word in exclude_keywords if word in photo_tags or word in photo_title]
    if unwanted:
        logging.info(f"Skipping image with unwanted keywords: {photo.id} (tags: {photo_tags}, title: {photo_title}, matched: {unwanted})")
        return None

    # Prefer the 'Large' rendition; fall back to 'Medium' when it is missing.
    img_url = None
    try:
        img_url = photo.getPhotoFile(size_label='Large')
    except flickr_api.flickrerrors.FlickrError as large_error:
        logging.info(f"Large size not available for photo {photo.id}: {large_error}, trying Medium")
        try:
            img_url = photo.getPhotoFile(size_label='Medium')
        except flickr_api.flickrerrors.FlickrError as medium_error:
            logging.warning(f"Medium size not available for photo {photo.id}: {medium_error}")
            return None
    if not img_url:
        logging.info(f"Image URL invalid for photo {photo.id}")
        return None

    # A photo counts as relevant when any query word appears among its tags
    # or title words; relevant photos may be reused.
    query_words = set(search_query.lower().split())
    photo_words = set(photo_tags + photo_title.split())
    is_relevant = len(query_words & photo_words) > 0
    already_used = img_url in used_images
    if already_used and not is_relevant:
        logging.info(f"Image already used and not highly relevant for photo {photo.id}: {img_url}")
        return None

    uploader = photo.owner.username
    page_url = f"https://www.flickr.com/photos/{photo.owner.nsid}/{photo.id}"

    used_images.add(img_url)
    save_used_images()

    # Append the selection as one JSON object per line.
    flickr_file = "/home/shane/foodie_automator/flickr_images.json"
    record = {
        "title": search_query,
        "image_url": img_url,
        "source": "Flickr",
        "uploader": uploader,
        "page_url": page_url,
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    with open(flickr_file, 'a') as out:
        out.write(json.dumps(record))
        out.write('\n')
    logging.info(f"Saved Flickr image metadata to {flickr_file}: {img_url}")
    logging.info(f"Selected Flickr image: {img_url} by {uploader} for query '{search_query}' (tags: {photo_tags})")
    return img_url, "Flickr", uploader, page_url
def search_flickr(query, per_page=5):
    """
    Run a relevance-sorted Flickr photo search for ``query``.

    The search is limited to photos with safe_search=1 and Flickr license
    IDs 4, 5, 9 and 10. Returns whatever ``flickr_api.Photo.search`` yields,
    or an empty list if the call raises.
    """
    search_params = {
        "text": query,
        "per_page": per_page,
        "sort": 'relevance',
        "safe_search": 1,
        "media": 'photos',
        "license": '4,5,9,10',
    }
    try:
        return flickr_api.Photo.search(**search_params)
    except Exception as err:
        logging.warning(f"Flickr API error for query '{query}': {err}")
        return []
def fetch_photo_by_id(photo_id):
    """Build a ``flickr_api.Photo`` for ``photo_id``; return None if that raises."""
    try:
        return flickr_api.Photo(id=photo_id)
    except Exception as err:
        logging.warning(f"Failed to fetch Flickr photo ID {photo_id}: {err}")
    return None
def search_ddg_for_flickr(query):
    """
    Look up Flickr photo IDs for ``query`` through a DuckDuckGo site search.

    Fetches the DuckDuckGo results page for ``"<query> site:flickr.com"`` and
    scans every link for a ``flickr.com/photos/<user>/<id>`` URL.

    Returns:
        A list of at most two distinct photo ID strings, in the order they
        were found on the page. An empty list is returned when the request or
        parsing fails (the error is logged, not raised).
    """
    ddg_query = f"{query} site:flickr.com"
    ddg_url = f"https://duckduckgo.com/?q={quote(ddg_query)}"
    try:
        response = requests.get(ddg_url, headers={'User-Agent': 'InsiderFoodieBot/1.0 (https://insiderfoodie.com; contact@insiderfoodie.com)'}, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        # Dict keys de-duplicate while keeping first-seen order, so the two
        # IDs kept are the first two on the page rather than an arbitrary
        # pair picked by set iteration order.
        found_ids = {}
        for link in soup.find_all('a', href=True):
            match = re.search(r'flickr\.com/photos/[^/]+/(\d+)', link['href'])
            if match:
                found_ids.setdefault(match.group(1), None)
        photo_ids = list(found_ids)[:2]  # Limit to 2 IDs
        logging.info(f"Found {len(photo_ids)} Flickr photo IDs via DDG: {photo_ids}")
        return photo_ids
    except Exception as e:
        logging.warning(f"DDG search failed for query '{ddg_query}': {e}")
        # Same type as the success path (previously a set was returned here).
        return []
def classify_keywords(keywords):
    """
    Ask the light-task model to label each keyword 'specific' or 'generic'.

    Args:
        keywords: Sequence of keyword strings from an image search query.

    Returns:
        A dict mapping keywords to their classification as parsed from the
        model's fenced JSON reply. An empty ``keywords`` yields ``{}`` without
        calling the model. If the call fails, the reply has no fenced JSON
        block, or the JSON is not an object, every keyword is mapped to
        "specific".
    """
    if not keywords:
        # Nothing to classify; avoid a pointless API request.
        return {}
    all_specific = {kw: "specific" for kw in keywords}
    prompt = (
        "Given the following keywords from an image search query, classify each as 'specific' (e.g., brand names, unique entities like 'Taco Bell' or 'Paris') or 'generic' (e.g., common or abstract terms like 'dining' or 'trends'). "
        "Return a JSON object mapping each keyword to its classification.\n\n"
        "Keywords: " + ", ".join(keywords) + "\n\n"
        "Example output format (do not use these exact keywords in your response):\n"
        "```json\n"
        "{\n"
        " \"keyword1\": \"specific\",\n"
        " \"keyword2\": \"generic\"\n"
        "}\n```"
    )
    try:
        response = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": "You are a helper that classifies keywords."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=100,
            temperature=0.5
        )
        raw_response = response.choices[0].message.content
        json_match = re.search(r'```json\n([\s\S]*?)\n```', raw_response)
        if not json_match:
            logging.warning(f"Failed to parse keyword classification JSON: {raw_response}")
            return all_specific
        classifications = json.loads(json_match.group(1))
        if not isinstance(classifications, dict):
            # Callers iterate .items(); a JSON list or scalar would crash them.
            logging.warning(f"Failed to parse keyword classification JSON: {raw_response}")
            return all_specific
        return classifications
    except Exception as e:
        logging.warning(f"Keyword classification failed: {e}. Defaulting to all specific.")
        return all_specific
def _first_usable_flickr_photo(photos, query):
    """Return the first truthy process_photo(photo, query) result, or None."""
    for photo in photos:
        result = process_photo(photo, query)
        if result:
            return result
    return None


def get_flickr_image(search_query, relevance_keywords, main_topic):
    """
    Find a usable Flickr image for a post, trying progressively looser searches.

    Requests are throttled to at most one every 10 seconds and counted per
    hourly window. The search order is:
      1. Flickr search with ``search_query``.
      2. Flickr photo IDs discovered through a DuckDuckGo site search.
      3. Flickr search with each word of ``search_query`` that the keyword
         classifier labels "specific" (only for multi-word queries).
      4. Flickr search with ``main_topic``.
      5. Flickr search with ``relevance_keywords`` (a list is joined with
         spaces; any other value is used as the query as-is).
    Steps 4 and 5 are skipped when their query is empty or None.

    Returns:
        ``(image_url, "Flickr", uploader, page_url)`` for the first photo
        accepted by process_photo, or ``(None, None, None, None)``.
    """
    global last_flickr_request_time, flickr_request_count
    reset_flickr_request_count()
    flickr_request_count += 1
    logging.info(f"Flickr request count: {flickr_request_count}/3600")
    # Space calls at least 10 seconds apart.
    current_time = time.time()
    time_since_last_request = current_time - last_flickr_request_time
    if time_since_last_request < 10:
        time.sleep(10 - time_since_last_request)
    last_flickr_request_time = time.time()

    # Step 1: Search Flickr directly with the original query
    logging.info(f"Searching Flickr directly with query: '{search_query}'")
    result = _first_usable_flickr_photo(search_flickr(search_query), search_query)
    if result:
        return result

    # Step 2: Search DDG to find Flickr photo IDs
    logging.info(f"Searching DDG with query: '{search_query} site:flickr.com'")
    for photo_id in search_ddg_for_flickr(search_query) or ():
        photo = fetch_photo_by_id(photo_id)
        if photo:
            result = process_photo(photo, search_query)
            if result:
                return result

    # Step 3: Break down the query into keywords and classify them
    keywords = search_query.lower().split()
    if len(keywords) > 1:
        classifications = classify_keywords(keywords)
        logging.info(f"Keyword classifications: {classifications}")
        if not isinstance(classifications, dict):
            # Guard against a malformed classifier reply instead of crashing
            # on .items().
            classifications = {}
        specific_keywords = [kw for kw, label in classifications.items() if label == "specific"]
        for keyword in specific_keywords:
            logging.info(f"Searching Flickr with specific keyword: '{keyword}'")
            result = _first_usable_flickr_photo(search_flickr(keyword), search_query)
            if result:
                return result

    # Step 4: Fallback using main topic
    logging.info(f"No results found. Falling back to main topic: '{main_topic}'")
    if main_topic:
        result = _first_usable_flickr_photo(search_flickr(main_topic), main_topic)
        if result:
            return result

    # Step 5: Final fallback using relevance keywords
    fallback_query = " ".join(relevance_keywords) if isinstance(relevance_keywords, list) else relevance_keywords
    logging.info(f"No results with main topic. Falling back to relevance keywords: '{fallback_query}'")
    if fallback_query:
        # Candidates are still judged against the original search query.
        result = _first_usable_flickr_photo(search_flickr(fallback_query), search_query)
        if result:
            return result

    logging.warning(f"No valid Flickr image found for query '{search_query}' after all attempts.")
    return None, None, None, None
def select_best_author(content, interest_score):
    """
    Pick the author username whose persona prompt scores highest.

    Every author starts from ``interest_score``; a persona prompt containing
    "trend" adds 2, otherwise one containing "recipe" adds 1. The first author
    with the strictly highest score (above -1) wins. If nobody qualifies, or
    any error occurs, a random author username is returned. ``content`` is
    not used in the scoring.
    """
    try:
        top_score = -1
        top_author = None
        for candidate in AUTHORS:
            persona_prompt = PERSONA_CONFIGS.get(candidate["username"], {}).get("prompt", "")
            candidate_score = interest_score
            lowered = persona_prompt.lower()
            if "trend" in lowered:
                candidate_score += 2
            elif "recipe" in lowered:
                candidate_score += 1
            if candidate_score > top_score:
                top_score, top_author = candidate_score, candidate["username"]
        if not top_author:
            usernames = [entry["username"] for entry in AUTHORS]
            top_author = random.choice(usernames)
        logging.info(f"Selected author: {top_author} with adjusted score: {top_score}")
        return top_author
    except Exception as err:
        logging.error(f"Error in select_best_author: {err}")
        return random.choice([entry["username"] for entry in AUTHORS])
def check_rate_limit(response):
    """Extract rate limit information from Twitter API response headers.

    Args:
        response: Either a mapping of header names to values, or an object
            that exposes such a mapping as ``.headers``.

    Returns:
        ``(remaining, reset)`` as ints, read from ``x-rate-limit-remaining``
        and ``x-rate-limit-reset``; a missing header counts as 0.
        ``(None, None)`` when the headers are unavailable (e.g. ``response``
        is None) or a value is not an integer.
    """
    # Accept a response object as well as a bare header mapping.
    headers = getattr(response, 'headers', response)
    try:
        remaining = int(headers.get('x-rate-limit-remaining', 0))
        reset = int(headers.get('x-rate-limit-reset', 0))
        return remaining, reset
    except (ValueError, TypeError, AttributeError) as e:
        # AttributeError covers None and objects that lack .get().
        logging.warning(f"Failed to parse rate limit headers: {e}")
        return None, None
def check_author_rate_limit(author, max_tweets=17, tweet_window_seconds=86400):
    """
    Report whether ``author`` has used up their tweet allowance.

    State is kept per username in rate_limit_info.json. A missing record, or
    one without a numeric ``tweet_reset``, is initialised. When the stored
    reset time has passed (or is not a plausible epoch timestamp) the
    allowance is restored to ``max_tweets``, the next reset is scheduled
    ``tweet_window_seconds`` ahead, and the file is saved.

    Returns:
        True if the author has no tweets remaining, False otherwise.
    """
    logger = logging.getLogger(__name__)
    rate_limit_file = '/home/shane/foodie_automator/rate_limit_info.json'
    all_limits = load_json_file(rate_limit_file, default={})
    username = author['username']

    has_valid_record = username in all_limits and isinstance(
        all_limits[username].get('tweet_reset'), (int, float)
    )
    if not has_valid_record:
        all_limits[username] = {
            'tweet_remaining': max_tweets,
            'tweet_reset': time.time()
        }
        logger.info(f"Initialized tweet rate limit for {username}: {max_tweets} tweets available")

    record = all_limits[username]
    now = time.time()
    stored_reset = record.get('tweet_reset', 0)
    # Values below 1e9 cannot be real epoch timestamps, so treat them as stale.
    window_expired = now >= stored_reset or stored_reset < 1000000000
    if window_expired:
        record['tweet_remaining'] = max_tweets
        record['tweet_reset'] = now + tweet_window_seconds
        logger.info(f"Reset tweet rate limit for {username}: {max_tweets} tweets available")
        save_json_file(rate_limit_file, all_limits)

    if record.get('tweet_remaining', 0) > 0:
        logger.info(f"Tweet rate limit for {username}: {record['tweet_remaining']} tweets remaining")
        return False

    reset_time = datetime.fromtimestamp(record['tweet_reset'], tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
    logger.info(f"Author {username} is tweet rate-limited. Remaining: {record['tweet_remaining']}, Reset at: {reset_time}")
    return True
def get_next_author_round_robin():
    """
    Return the next author in rotation who is not tweet rate-limited.

    Advances the module-level ``round_robin_index`` once per author examined
    and gives each author at most one chance per call. Returns the author
    dict, or None when every author is rate-limited.
    """
    from foodie_config import AUTHORS
    global round_robin_index
    logger = logging.getLogger(__name__)

    attempts_left = len(AUTHORS)
    while attempts_left > 0:
        attempts_left -= 1
        candidate = AUTHORS[round_robin_index % len(AUTHORS)]
        round_robin_index = (round_robin_index + 1) % len(AUTHORS)
        if check_author_rate_limit(candidate):
            # Rate-limited; move on to the next author in the rotation.
            continue
        logger.info(f"Selected author via round-robin: {candidate['username']}")
        return candidate

    logger.warning("No authors available due to tweet rate limits.")
    return None
def prepare_post_data(summary, title, main_topic=None):
    """
    Assemble the post payload, author, category and image for a summary.

    A title is generated from ``summary`` (with a fixed fallback), the summary
    is run through smart_image_and_filter, and an image is looked up on Flickr
    first and via get_image second. ``main_topic`` overrides the generated
    topic when given. The ``title`` argument is not used.

    Returns:
        ``(post_data, author, category, image_url, image_source, uploader,
        page_url)``, or a tuple of seven ``None`` values when the summary is
        filtered out, no image is found, or an error occurs.
    """
    nothing = (None,) * 7
    try:
        logging.info(f"Preparing post data for summary: {summary[:100]}...")

        new_title = generate_title_from_summary(summary)
        if not new_title:
            logging.warning("Title generation failed, using fallback title")
            new_title = "A Tasty Food Discovery Awaits You"
        logging.info(f"Generated new title: '{new_title}'")

        filter_result = smart_image_and_filter(new_title, summary)
        search_query, relevance_keywords, generated_main_topic, skip_flag = filter_result
        if skip_flag:
            logging.info("Summary filtered out during post preparation")
            return nothing

        # A caller-supplied topic takes precedence over the generated one.
        effective_main_topic = main_topic or generated_main_topic
        image_url, image_source, uploader, page_url = get_flickr_image(
            search_query, relevance_keywords, effective_main_topic
        )
        if not image_url:
            image_url, image_source, uploader, page_url = get_image(search_query)
        if not image_url:
            logging.warning("No image found for post, skipping")
            return nothing

        # Author (full dict from AUTHORS) and category are chosen at random.
        author = random.choice(AUTHORS)
        category = random.choice(["Food", "Trends", "Eats", "Culture"])
        post_data = {
            "title": new_title,
            "content": summary,
            "status": "publish",
            "author": author["username"],
            "categories": [category]
        }
        logging.info(f"Post data prepared: Title: '{new_title}', Category: {category}, Author: {author['username']}")
        return post_data, author, category, image_url, image_source, uploader, page_url
    except Exception as err:
        logging.error(f"Failed to prepare post data: {err}")
        return nothing
def save_post_to_recent(post_title, post_url, author_username, timestamp):
    """Save a post to recent_posts.json, maintaining a JSON array.

    Entries from the last 24 hours are loaded, and the new post is appended
    unless an entry with the same title, URL and author already exists.
    Errors are logged, not raised.

    Args:
        post_title: Title of the published post.
        post_url: URL of the published post.
        author_username: Username of the post's author.
        timestamp: Timestamp stored with the entry (load_json_file parses it
            with ``datetime.fromisoformat`` when filtering).
    """
    try:
        recent_posts = load_json_file(RECENT_POSTS_FILE, expiration_hours=24)
        entry = {
            "title": post_title,
            "url": post_url,
            "author_username": author_username,
            "timestamp": timestamp
        }
        key = (post_title, post_url, author_username)
        # Use .get() so a stored entry missing a field is simply "not a
        # duplicate" instead of raising KeyError and blocking the save.
        is_duplicate = any(
            isinstance(p, dict)
            and (p.get("title"), p.get("url"), p.get("author_username")) == key
            for p in recent_posts
        )
        if is_duplicate:
            logging.debug(f"Skipping duplicate post: {post_title}")
            return
        recent_posts.append(entry)
        with open(RECENT_POSTS_FILE, 'w') as f:
            json.dump(recent_posts, f, indent=2)
        logging.info(f"Saved post '{post_title}' to {RECENT_POSTS_FILE}")
    except Exception as e:
        logging.error(f"Failed to save post to {RECENT_POSTS_FILE}: {e}")
def prune_recent_posts():
    """Rewrite recent_posts.json with only the entries from the last 24 hours."""
    try:
        # load_json_file applies the 24-hour expiration filter.
        surviving_posts = load_json_file(RECENT_POSTS_FILE, expiration_hours=24)
        with open(RECENT_POSTS_FILE, 'w') as out:
            json.dump(surviving_posts, out, indent=2)
        remaining = len(surviving_posts)
        logging.info(f"Pruned {RECENT_POSTS_FILE} to {remaining} entries")
    except Exception as err:
        logging.error(f"Failed to prune {RECENT_POSTS_FILE}: {err}")