You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

1351 lines
60 KiB

import base64
import json
import logging
import os
import random
import re
from PIL import Image
import pytesseract
import io
import tempfile
import requests
import time
import openai
from dotenv import load_dotenv
from datetime import datetime, timezone, timedelta
from openai import OpenAI
from urllib.parse import quote
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import tweepy
import flickr_api
from foodie_config import (
RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS, PERSONA_CONFIGS,
get_clean_source_name, AUTHORS, LIGHT_TASK_MODEL, SUMMARY_MODEL, X_API_CREDENTIALS,
FLICKR_API_KEY, FLICKR_API_SECRET, PIXABAY_API_KEY
)
# Load settings from a .env file (python-dotenv) before any environment
# variables are read below.
load_dotenv()
# Shared OpenAI client used by the helper functions in this module; the API key
# is read from the OPENAI_API_KEY environment variable.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
def load_json_file(filename, expiration_days=None):
    """Load a JSON-lines file of ``{"title": ..., "timestamp": ...}`` records.

    Blank lines, lines that are not valid JSON, and JSON values that are not
    dicts carrying both ``title`` and ``timestamp`` are skipped with a warning.

    Args:
        filename: Path of the JSON-lines file to read.
        expiration_days: When truthy, only entries whose ``timestamp`` sorts
            after "now (UTC) minus this many days" are kept.

    Returns:
        A list of entry dicts. Empty if the file does not exist or cannot be
        read.
    """
    if not os.path.exists(filename):
        return []

    data = []
    try:
        with open(filename, 'r') as f:
            for i, line in enumerate(f, 1):
                stripped = line.strip()
                if not stripped:
                    continue
                try:
                    entry = json.loads(stripped)
                except json.JSONDecodeError as e:
                    logging.warning(f"Skipping invalid JSON line in {filename} at line {i}: {e}")
                    continue
                if not isinstance(entry, dict) or "title" not in entry or "timestamp" not in entry:
                    logging.warning(f"Skipping malformed entry in {filename} at line {i}: {entry}")
                    continue
                data.append(entry)

        # Remember the pre-filter count so the log line reports both numbers.
        total_loaded = len(data)
        if expiration_days:
            # Timestamps are compared as ISO-8601 strings, which orders
            # correctly as long as they share the same format and UTC offset.
            cutoff = (datetime.now(timezone.utc) - timedelta(days=expiration_days)).isoformat()
            fresh = []
            for entry in data:
                timestamp = entry["timestamp"]
                if not isinstance(timestamp, str):
                    # A single bad record must not discard the whole file.
                    logging.warning(f"Skipping entry with non-string timestamp in {filename}: {entry}")
                    continue
                if timestamp > cutoff:
                    fresh.append(entry)
            data = fresh
        logging.info(f"Loaded {total_loaded} entries from {filename}, {len(data)} valid after expiration check")
    except Exception as e:
        logging.error(f"Failed to load {filename}: {e}")
        data = []  # Reset to empty on failure
    return data
def save_json_file(filename, key, value):
    """Record ``key`` (with timestamp ``value``) in ``filename``.

    Regular files are stored as JSON lines of ``{"title", "timestamp"}``
    records; entries older than 180 days are pruned and any previous record
    with the same title is replaced.

    Files whose name ends with ``used_images.json`` are stored as one flat JSON
    list of titles. That list is read back directly here, because the
    JSON-lines loader rejects it and would otherwise cause every save to
    overwrite the file with a single entry.

    Errors are logged, never raised.

    Args:
        filename: Destination file path.
        key: Title to store.
        value: Timestamp to store with the title (ignored for the flat list).
    """
    entry = {"title": key, "timestamp": value}
    PRUNE_INTERVAL_DAYS = 180
    try:
        # Special handling for used_images.json to save as a flat list
        if filename.endswith('used_images.json'):
            titles = []
            if os.path.exists(filename):
                try:
                    with open(filename, 'r') as f:
                        stored = json.load(f)
                except json.JSONDecodeError:
                    # Not a single JSON document: treat it as a JSON-lines file.
                    stored = load_json_file(filename, expiration_days=PRUNE_INTERVAL_DAYS)
                if isinstance(stored, dict):
                    stored = [stored]
                if isinstance(stored, list):
                    for item in stored:
                        if isinstance(item, str):
                            titles.append(item)
                        elif isinstance(item, dict) and "title" in item:
                            titles.append(item["title"])
            # Remove duplicates of the new title, then append it last.
            titles = [title for title in titles if title != key]
            titles.append(key)
            with open(filename, 'w') as f:
                json.dump(titles, f)
            logging.info(f"Saved '{key}' to {filename}")
            print(f"DEBUG: Saved '{key}' to {filename}")
            logging.info(f"{filename} now holds {len(titles)} entries")
            return

        data = load_json_file(filename, expiration_days=PRUNE_INTERVAL_DAYS)
        # Remove duplicates by title
        data = [item for item in data if item["title"] != key]
        data.append(entry)
        with open(filename, 'w') as f:
            for item in data:
                json.dump(item, f)
                f.write('\n')
        logging.info(f"Saved '{key}' to {filename}")
        print(f"DEBUG: Saved '{key}' to {filename}")
        loaded_data = load_json_file(filename, expiration_days=PRUNE_INTERVAL_DAYS)
        logging.info(f"Pruned {filename} to {len(loaded_data)} entries (older than {PRUNE_INTERVAL_DAYS} days removed)")
    except Exception as e:
        logging.error(f"Failed to save or prune {filename}: {e}")
def load_post_counts():
    """Load per-author X post counters, resetting any that are out of date.

    Reads the JSON-lines counter file, skipping malformed lines. Every author
    in ``AUTHORS`` is guaranteed an entry in the result (missing ones start at
    zero), so callers can look an author up without a ``None`` check.
    Counters whose stored month or day differs from the current UTC month or
    day are reset to zero.

    Returns:
        A list of dicts with keys ``username``, ``month``, ``monthly_count``,
        ``day`` and ``daily_count``.
    """
    counts = []
    filename = '/home/shane/foodie_automator/x_post_counts.json'
    required_fields = ("username", "month", "monthly_count", "day", "daily_count")
    if os.path.exists(filename):
        try:
            with open(filename, 'r') as f:
                for i, line in enumerate(f, 1):
                    stripped = line.strip()
                    if not stripped:
                        continue
                    try:
                        entry = json.loads(stripped)
                    except json.JSONDecodeError as e:
                        logging.warning(f"Skipping invalid JSON line in {filename} at line {i}: {e}")
                        continue
                    # Check for expected fields in x_post_counts.json
                    if not isinstance(entry, dict) or any(field not in entry for field in required_fields):
                        logging.warning(f"Skipping malformed entry in {filename} at line {i}: {entry}")
                        continue
                    counts.append(entry)
            logging.info(f"Loaded {len(counts)} entries from {filename}")
        except Exception as e:
            logging.error(f"Failed to load {filename}: {e}")
            counts = []  # Reset to empty on failure

    # Sample the clock once so month and day cannot disagree across midnight.
    now = datetime.now(timezone.utc)
    current_month = now.strftime("%Y-%m")
    current_day = now.strftime("%Y-%m-%d")

    # Add a zeroed entry for every configured author that has none yet
    # (covers both "no file" and "author added after the file was created").
    known_usernames = {entry["username"] for entry in counts}
    for author in AUTHORS:
        username = author["username"]
        if username in known_usernames:
            continue
        counts.append({
            "username": username,
            "month": current_month,
            "monthly_count": 0,
            "day": current_day,
            "daily_count": 0
        })
        known_usernames.add(username)

    for entry in counts:
        if entry["month"] != current_month:
            entry["month"] = current_month
            entry["monthly_count"] = 0
        if entry["day"] != current_day:
            entry["day"] = current_day
            entry["daily_count"] = 0
    return counts
def save_post_counts(counts):
    """Write the post counters to x_post_counts.json, one JSON object per line.

    Args:
        counts: Iterable of JSON-serialisable counter records.
    """
    counts_path = '/home/shane/foodie_automator/x_post_counts.json'
    with open(counts_path, 'w') as out:
        for record in counts:
            out.write(json.dumps(record))
            out.write('\n')
    logging.info("Saved post counts to x_post_counts.json")
import re
def generate_article_tweet(author, post, persona):
    """Generate the text of a tweet promoting an article.

    Asks the summary model for a tweet in the given persona's voice, then
    strips emojis and "Read more" phrases and enforces the 280-character limit.
    When the tweet is too long and ends with the article URL, the text in front
    of the URL is shortened so the link itself survives truncation.

    Args:
        author: Dict with a ``username`` key.
        post: Dict with ``title`` and ``url`` keys.
        persona: Persona name whose voice the tweet should use.

    Returns:
        The tweet text, at most 280 characters long.

    Raises:
        Whatever the OpenAI client raises; API errors are not caught here.
    """
    title = post["title"]
    url = post["url"]
    author_handle = f"@{author['username']}"
    prompt = (
        f"Craft a sharp tweet (under 280 characters) for {author_handle} with the voice of '{persona}'. "
        f"Distill the essence of the article '{title}' and include the raw URL '{url}' at the end. "
        f"Make it bold, spark curiosity, and invite engagement with a human touch. "
        f"Swap 'elevate' for dynamic terms like 'ignite' or 'unleash'. "
        f"Absolutely do not include hashtags, emojis, or phrases like '[Read more]' or 'Read more'. "
        f"Skip any extra fluff or formatting around the URL—just append the raw URL after a space."
    )
    # Use the shared module-level client, like the other helpers in this file.
    response = client.chat.completions.create(
        model=SUMMARY_MODEL,
        messages=[
            {"role": "system", "content": "You are a social media viral expert crafting engaging tweets."},
            {"role": "user", "content": prompt}
        ],
        max_tokens=100,
        temperature=0.7
    )
    tweet = response.choices[0].message.content.strip()
    # Post-generation check: Strip any emojis using regex.
    # NOTE(review): the final range (U+24C2-U+1F251) also covers CJK and other
    # non-emoji characters; harmless for English text but worth narrowing.
    tweet = re.sub(r'[\U0001F600-\U0001F64F\U0001F300-\U0001F5FF\U0001F680-\U0001F6FF\U0001F700-\U0001F77F\U0001F780-\U0001F7FF\U0001F800-\U0001F8FF\U0001F900-\U0001F9FF\U0001FA00-\U0001FA6F\U0001FA70-\U0001FAFF\U00002702-\U000027B0\U000024C2-\U0001F251]', '', tweet).strip()
    # Strip "[Read more]" or similar phrases as an additional failsafe
    tweet = re.sub(r'\[Read more\]\(.*?\)|\bRead more\b', '', tweet).strip()
    if len(tweet) > 280:
        # Room left for text once " <url>" and "..." are accounted for.
        text_budget = 280 - len(url) - 1 - 3
        if url and tweet.endswith(url) and text_budget > 0:
            # Shorten the text, not the link: a cut-off URL is useless.
            text = tweet[:-len(url)].rstrip()
            tweet = f"{text[:text_budget].rstrip()}... {url}"
        else:
            tweet = tweet[:277] + "..."
    return tweet
def post_tweet(author, tweet):
    """Post ``tweet`` to X on behalf of ``author``, enforcing posting limits.

    Refuses to post when the author has no credentials in
    ``X_API_CREDENTIALS`` or has reached 500 posts this month or 20 posts
    today. On success the counters are incremented and saved.

    Args:
        author: Dict with a ``username`` key.
        tweet: Text to post.

    Returns:
        True if the tweet was posted, False otherwise.
    """
    username = author["username"]
    credentials = next((cred for cred in X_API_CREDENTIALS if cred["username"] == username), None)
    if not credentials:
        logging.error(f"No X credentials found for {author['username']}")
        return False

    post_counts = load_post_counts()
    author_count = next((entry for entry in post_counts if entry["username"] == username), None)
    if author_count is None:
        # The counter file has no row for this author yet; start one at zero
        # instead of failing on a None lookup.
        now = datetime.now(timezone.utc)
        author_count = {
            "username": username,
            "month": now.strftime("%Y-%m"),
            "monthly_count": 0,
            "day": now.strftime("%Y-%m-%d"),
            "daily_count": 0
        }
        post_counts.append(author_count)

    if author_count["monthly_count"] >= 500:
        logging.warning(f"Monthly post limit (500) reached for {author['username']}")
        return False
    if author_count["daily_count"] >= 20:
        logging.warning(f"Daily post limit (20) reached for {author['username']}")
        return False

    try:
        # Named x_client so it does not shadow the module-level OpenAI client.
        x_client = tweepy.Client(
            consumer_key=credentials["api_key"],
            consumer_secret=credentials["api_secret"],
            access_token=credentials["access_token"],
            access_token_secret=credentials["access_token_secret"]
        )
        x_client.create_tweet(text=tweet)
        author_count["monthly_count"] += 1
        author_count["daily_count"] += 1
        save_post_counts(post_counts)
        logging.info(f"Posted tweet for {author['username']}: {tweet}")
        return True
    except Exception as e:
        logging.error(f"Failed to post tweet for {author['username']}: {e}")
        return False
def select_best_persona(interest_score, content=""):
    """Pick a writer persona from content keywords, else from the score.

    Keyword groups are checked in order (tech/innovation, review/taste,
    culture/history). Keywords must start at a word boundary, and keywords of
    one or two letters must match a whole word, so "ai" no longer fires on
    "said" or "daily" and "culture" no longer fires on "agriculture".

    Args:
        interest_score: Numeric interest score, used when no keyword matches.
        content: Text to scan for keywords; ``None`` is treated as empty.

    Returns:
        One of "Visionary Editor", "Foodie Critic", "Trend Scout" or
        "Culture Connoisseur".
    """
    logging.info("Using select_best_persona with interest_score and content")
    personas = ["Visionary Editor", "Foodie Critic", "Trend Scout", "Culture Connoisseur"]
    content_lower = (content or "").lower()

    def mentions(keywords):
        """Return True if any keyword occurs in the content at a word start."""
        for kw in keywords:
            # Very short keywords are only meaningful as standalone words.
            trailing_boundary = r"\b" if len(kw) <= 2 else ""
            if re.search(r"\b" + re.escape(kw) + trailing_boundary, content_lower):
                return True
        return False

    if mentions(["tech", "ai", "innovation", "sustainability"]):
        return random.choice(["Trend Scout", "Visionary Editor"])
    if mentions(["review", "critic", "taste", "flavor"]):
        return "Foodie Critic"
    if mentions(["culture", "tradition", "history"]):
        return "Culture Connoisseur"

    if interest_score >= 8:
        return random.choice(personas[:2])
    if interest_score >= 6:
        return random.choice(personas[2:])
    return random.choice(personas)
def get_image(search_query):
    """Find a not-yet-used image for ``search_query``.

    Sources are tried in this order, returning the first acceptable image:
      1. DuckDuckGo results restricted to flickr.com, resolved via the Flickr API.
      2. Flickr API search on each keyword classified as "specific"
         (only for queries of more than one word).
      3. Flickr API search for the generic query "food dining".
      4. Pixabay API search for the original query.

    Flickr candidates are rejected when their tags or title contain an excluded
    keyword, when the URL was already used, or when OCR finds more than 200
    characters of text in the image.

    Relies on module-level names that are not defined in this function:
    ``last_flickr_request_time``, ``flickr_request_count``,
    ``reset_flickr_request_count``, ``exclude_keywords``, ``used_images`` and
    ``save_used_images``.

    Args:
        search_query: Free-text image search query.

    Returns:
        ``(image_url, source, uploader, page_url)`` on success, otherwise
        ``(None, None, None, None)``.
    """
    global last_flickr_request_time, flickr_request_count

    reset_flickr_request_count()
    flickr_request_count += 1
    logging.info(f"Flickr request count: {flickr_request_count}/3600")

    # Keep at least 5 seconds between consecutive get_image calls.
    current_time = time.time()
    time_since_last_request = current_time - last_flickr_request_time
    if time_since_last_request < 5:
        time.sleep(5 - time_since_last_request)
    last_flickr_request_time = time.time()

    headers = {'User-Agent': 'InsiderFoodieBot/1.0 (https://insiderfoodie.com; contact@insiderfoodie.com)'}

    def search_flickr(query, per_page=20):
        """Search Flickr for photos matching ``query``; return [] on error."""
        try:
            photos = flickr_api.Photo.search(
                text=query,
                per_page=per_page,
                sort='relevance',
                safe_search=1,
                media='photos',
                license='4,5,9,10'
            )
            return photos
        except Exception as e:
            logging.warning(f"Flickr API error for query '{query}': {e}")
            return []

    def fetch_photo_by_id(photo_id):
        """Build a Flickr photo object for ``photo_id``; return None on error."""
        try:
            photo = flickr_api.Photo(id=photo_id)
            return photo
        except Exception as e:
            logging.warning(f"Failed to fetch Flickr photo ID {photo_id}: {e}")
            return None

    def process_photo(photo):
        """Validate one Flickr photo; return the result tuple or None."""
        # These accessors presumably hit the Flickr API, so a failure should
        # only skip this photo rather than abort the whole search.
        try:
            tags = [tag.text.lower() for tag in photo.getTags()]
            title = photo.title.lower() if photo.title else ""
        except Exception as e:
            logging.warning(f"Failed to read metadata for Flickr photo {getattr(photo, 'id', 'unknown')}: {e}")
            return None

        matched_keywords = [kw for kw in exclude_keywords if kw in tags or kw in title]
        if matched_keywords:
            logging.info(f"Skipping image with unwanted keywords: {photo.id} (tags: {tags}, title: {title}, matched: {matched_keywords})")
            return None

        try:
            img_url = photo.getPhotoFile(size_label='Medium')
        except Exception as e:
            logging.warning(f"Failed to get image URL for Flickr photo {getattr(photo, 'id', 'unknown')}: {e}")
            return None
        if not img_url or img_url in used_images:
            return None

        # Initialised up front so the cleanup in ``finally`` never sees an
        # unbound name if the download or the temp-file write fails.
        temp_path = None
        try:
            for attempt in range(3):
                img_response = requests.get(img_url, headers=headers, timeout=10)
                if img_response.status_code == 429:
                    wait_time = 5 * (2 ** attempt)
                    logging.warning(f"Rate limit hit for {img_url}. Retrying after {wait_time}s (attempt {attempt+1}/3).")
                    time.sleep(wait_time)
                    continue
                img_response.raise_for_status()
                break
            else:
                # All three attempts were rate limited.
                logging.warning(f"Rate limit hit for {img_url} after retries. Falling back to Pixabay.")
                return None

            with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file:
                temp_path = temp_file.name
                temp_file.write(img_response.content)

            # Close the image handle before the temp file is deleted.
            with Image.open(temp_path) as img:
                text = pytesseract.image_to_string(img)
            char_count = len(text.strip())
            logging.info(f"OCR processed {img_url}: {char_count} characters detected")
            if char_count > 200:
                logging.info(f"Skipping text-heavy image (OCR): {img_url} (char_count: {char_count})")
                return None

            uploader = photo.owner.username
            page_url = f"https://www.flickr.com/photos/{photo.owner.nsid}/{photo.id}"
            used_images.add(img_url)
            save_used_images()

            flickr_data = {
                "title": search_query,
                "image_url": img_url,
                "source": "Flickr",
                "uploader": uploader,
                "page_url": page_url,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "ocr_chars": char_count
            }
            flickr_file = "/home/shane/foodie_automator/flickr_images.json"
            with open(flickr_file, 'a') as f:
                json.dump(flickr_data, f)
                f.write('\n')
            logging.info(f"Saved Flickr image to {flickr_file}: {img_url}")
            logging.info(f"Fallback Flickr image: {img_url} by {uploader} for query '{search_query}' (tags: {tags})")
            return img_url, "Flickr", uploader, page_url
        except requests.exceptions.HTTPError as e:
            if e.response is not None and e.response.status_code == 429:
                logging.warning(f"Rate limit hit for {img_url} after retries. Falling back to Pixabay.")
            else:
                logging.warning(f"Download failed for {img_url}: {e}")
            return None
        except Exception as e:
            logging.warning(f"OCR processing failed for {img_url}: {e}")
            return None
        finally:
            if temp_path and os.path.exists(temp_path):
                os.unlink(temp_path)

    def search_ddg_for_flickr(query):
        """Scrape DuckDuckGo for up to 5 Flickr photo IDs; return [] on error."""
        ddg_query = f"{query} site:flickr.com"
        ddg_url = f"https://duckduckgo.com/?q={quote(ddg_query)}"
        try:
            response = requests.get(ddg_url, headers=headers, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            photo_ids = set()
            for link in soup.find_all('a', href=True):
                href = link['href']
                match = re.search(r'flickr\.com/photos/[^/]+/(\d+)', href)
                if match:
                    photo_ids.add(match.group(1))
            photo_ids = list(photo_ids)[:5]  # Limit to 5 IDs
            logging.info(f"Found {len(photo_ids)} Flickr photo IDs via DDG: {photo_ids}")
            return photo_ids
        except Exception as e:
            logging.warning(f"DDG search failed for query '{ddg_query}': {e}")
            return []  # same type as the success path

    def classify_keywords(keywords):
        """Ask the light model to label each keyword 'specific' or 'generic'.

        Falls back to labelling every keyword 'specific' on any failure.
        """
        prompt = (
            "Given the following keywords from an image search query, classify each as 'specific' (e.g., brand names, unique entities like 'Taco Bell' or 'Paris') or 'generic' (e.g., common or abstract terms like 'dining' or 'trends'). "
            "Return a JSON object mapping each keyword to its classification.\n\n"
            "Keywords: " + ", ".join(keywords) + "\n\n"
            "Example output format (do not use these exact keywords in your response):\n"
            "```json\n"
            "{\n"
            "  \"keyword1\": \"specific\",\n"
            "  \"keyword2\": \"generic\"\n"
            "}\n```"
        )
        try:
            response = client.chat.completions.create(
                model=LIGHT_TASK_MODEL,
                messages=[
                    {"role": "system", "content": "You are a helper that classifies keywords."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=100,
                temperature=0.5
            )
            raw_response = response.choices[0].message.content
            json_match = re.search(r'```json\n([\s\S]*?)\n```', raw_response)
            if not json_match:
                logging.warning(f"Failed to parse keyword classification JSON: {raw_response}")
                return {kw: "specific" for kw in keywords}
            classifications = json.loads(json_match.group(1))
            return classifications
        except Exception as e:
            logging.warning(f"Keyword classification failed: {e}. Defaulting to all specific.")
            return {kw: "specific" for kw in keywords}

    # Step 1: Search DDG to find Flickr photo IDs
    logging.info(f"Searching DDG with query: '{search_query} site:flickr.com'")
    for photo_id in search_ddg_for_flickr(search_query):
        photo = fetch_photo_by_id(photo_id)
        if photo:
            result = process_photo(photo)
            if result:
                return result

    # Step 2: Break down the query into keywords and classify them for direct Flickr API search
    keywords = search_query.lower().split()
    if len(keywords) > 1:
        classifications = classify_keywords(keywords)
        logging.info(f"Keyword classifications: {classifications}")
        specific_keywords = [kw for kw, classification in classifications.items() if classification == "specific"]
        for keyword in specific_keywords:
            logging.info(f"Searching Flickr with specific keyword: '{keyword}'")
            for photo in search_flickr(keyword):
                result = process_photo(photo)
                if result:
                    return result

    # Step 3: Final fallback to a generic food-related query
    logging.info("No results found. Falling back to generic query: 'food dining'")
    for photo in search_flickr("food dining"):
        result = process_photo(photo)
        if result:
            return result

    logging.warning(f"No valid Flickr image found in fallback for query '{search_query}'. Trying Pixabay.")

    # Fallback to Pixabay
    try:
        pixabay_url = f"https://pixabay.com/api/?key={PIXABAY_API_KEY}&q={quote(search_query)}&image_type=photo&per_page=10"
        response = requests.get(pixabay_url, timeout=10)
        response.raise_for_status()
        data = response.json()
        for hit in data.get('hits', []):
            img_url = hit.get('webformatURL')
            if not img_url or img_url in used_images:
                continue
            uploader = hit.get('user', 'Unknown')
            page_url = hit.get('pageURL', img_url)
            used_images.add(img_url)
            save_used_images()
            logging.debug(f"Image selected for query '{search_query}': {img_url}")
            return img_url, "Pixabay", uploader, page_url
        logging.warning(f"No valid Pixabay image found for query '{search_query}'.")
        return None, None, None, None
    except Exception as e:
        logging.error(f"Pixabay image fetch failed for query '{search_query}': {e}")
        return None, None, None, None
def generate_image_query(content):
    """Ask the light model for an image search query and relevance keywords.

    The reply is accepted either inside a ```json fence or as bare JSON.
    The relevance keywords are always returned as a list, matching the type of
    the fallback value, even when the model returns them as one string.

    Args:
        content: Article text to derive the query from.

    Returns:
        ``(search_query, relevance_keywords)`` where ``search_query`` is a
        string and ``relevance_keywords`` a list of strings. Falls back to
        ``("food dining", ["dining", "trends"])`` on any failure.
    """
    fallback_query = "food dining"
    fallback_relevance = ["dining", "trends"]
    prompt = (
        "Given the following content, generate a concise image search query (max 5 words) that would likely yield relevant, visually appealing images on platforms like Flickr or Pixabay. "
        "Identify and prioritize specific entities like brand names or unique terms over abstract or generic concepts. "
        "Focus on concrete, visual concepts related to food, dining, or restaurants. "
        "Also provide relevance keywords (max 5 words) to filter results, using general themes related to the content. "
        "Return the result as a JSON object with 'search' and 'relevance' keys.\n\n"
        "Content:\n"
        f"{content}\n\n"
        "Example output:\n"
        "```json\n"
        "{\n"
        "  \"search\": \"Wingstop dining\",\n"
        "  \"relevance\": \"fast food dining\"\n"
        "}\n```"
    )
    try:
        response = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": "You are a helpful assistant that generates concise image search queries."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=100,
            temperature=0.5
        )
        raw_response = response.choices[0].message.content
        logging.debug(f"Raw GPT image query response: '{raw_response}'")

        # Extract JSON from the response: prefer a fenced block, else try the
        # whole reply so an unfenced but valid JSON answer is not thrown away.
        json_match = re.search(r'```json\n([\s\S]*?)\n```', raw_response)
        payload = json_match.group(1) if json_match else raw_response.strip()
        try:
            query_data = json.loads(payload)
        except json.JSONDecodeError:
            query_data = None
        if not isinstance(query_data, dict):
            logging.warning(f"Failed to parse image query JSON from GPT response: {raw_response}")
            return fallback_query, list(fallback_relevance)

        search_query = query_data.get("search") or fallback_query
        relevance_keywords = query_data.get("relevance") or list(fallback_relevance)
        if isinstance(relevance_keywords, str):
            # The prompt's example shows a space-separated string; split it so
            # callers always receive a list.
            relevance_keywords = [word for word in re.split(r'[,\s]+', relevance_keywords) if word]
        logging.debug(f"Image query from content: {query_data}")
        return search_query, relevance_keywords
    except Exception as e:
        logging.warning(f"Failed to generate image query: {e}. Using fallback.")
        return fallback_query, list(fallback_relevance)
def smart_image_and_filter(title, summary):
    """Derive an image query for an article and decide whether to skip it.

    The light model is asked for JSON with ``image_query``, ``relevance`` and
    ``action`` keys. The reply is parsed as-is first; the single-quote repair
    regex is applied only when that fails, so apostrophes inside valid JSON
    strings are left alone.

    Args:
        title: Article title.
        summary: Article summary.

    Returns:
        ``(image_query, relevance_keywords, skip_flag)``. ``skip_flag`` is True
        when the model answers "SKIP" or the title or summary contains
        "homemade". Falls back to
        ``("food trends", ["cuisine", "dining"], False)`` when the reply cannot
        be used.
    """
    try:
        content = f"{title}\n\n{summary}"
        prompt = (
            "Analyze this article title and summary. Extract key entities (brands, locations, cuisines, or topics) "
            "for an image search about food industry trends or viral content. Prioritize specific terms if present, "
            "otherwise focus on the main theme. "
            "Return 'SKIP' if the article is about home appliances, recipes, promotions, or contains 'homemade', else 'KEEP'. "
            "Return as JSON with double quotes for all property names and string values (e.g., {\"image_query\": \"specific term\", \"relevance\": [\"keyword1\", \"keyword2\"], \"action\": \"KEEP\" or \"SKIP\"})."
        )
        response = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": prompt},
                {"role": "user", "content": content}
            ],
            max_tokens=100
        )
        raw_result = response.choices[0].message.content.strip()
        logging.info(f"Raw GPT smart image/filter response: '{raw_result}'")

        # Remove ```json markers
        cleaned_result = re.sub(r'```json\s*|\s*```', '', raw_result).strip()
        try:
            # Well-formed JSON must not go through the quote repair below: it
            # can rewrite apostrophes inside otherwise valid string values.
            result = json.loads(cleaned_result)
        except json.JSONDecodeError:
            # Replace single quotes with double quotes, but preserve single quotes within string values
            fixed_result = re.sub(r"(?<!\\)'(?=\s*[\w\s]*\])|(?<=\[|\{|\s)'|'(?=\s*[\]\},:])|(?<=\w)'(?=\s*:)", '"', cleaned_result)
            try:
                result = json.loads(fixed_result)
            except json.JSONDecodeError as e:
                logging.warning(f"JSON parsing failed: {e}, raw: '{fixed_result}'. Using fallback.")
                return "food trends", ["cuisine", "dining"], False

        if not isinstance(result, dict) or "image_query" not in result or "relevance" not in result or "action" not in result:
            logging.warning(f"Invalid GPT response format: {result}, using fallback")
            return "food trends", ["cuisine", "dining"], False

        image_query = result["image_query"]
        relevance_keywords = result["relevance"]
        skip_flag = result["action"] == "SKIP" or "homemade" in title.lower() or "homemade" in summary.lower()
        logging.info(f"Smart image query: {image_query}, Relevance: {relevance_keywords}, Skip: {skip_flag}")

        # A non-string query is treated like a vague one so the skip decision
        # is kept instead of being lost in the outer exception handler.
        if not isinstance(image_query, str) or len(image_query.split()) < 2:
            logging.warning(f"Image query '{image_query}' too vague, using fallback")
            return "food trends", ["cuisine", "dining"], skip_flag
        return image_query, relevance_keywords, skip_flag
    except Exception as e:
        logging.error(f"Smart image/filter failed: {e}, using fallback")
        return "food trends", ["cuisine", "dining"], False
def upload_image_to_wp(image_url, post_title, wp_base_url, wp_username, wp_password, image_source="Pixabay", uploader=None, pixabay_url=None):
    """Download an image and upload it to the WordPress media library.

    The file is named after the post title, reduced to characters that are safe
    in a ``Content-Disposition`` header. After the upload, the caption is set to
    an attribution link when both ``pixabay_url`` and ``uploader`` are given,
    otherwise to ``image_source``. A failure to set the caption is logged but
    does not discard the uploaded image.

    Args:
        image_url: URL of the image to download.
        post_title: Title of the post the image belongs to.
        wp_base_url: Base URL of the WordPress REST API.
        wp_username: WordPress user name for basic auth.
        wp_password: WordPress password for basic auth.
        image_source: Name of the image source shown in the caption.
        uploader: Name of the image's uploader, if known.
        pixabay_url: Page URL to link from the caption, if known.

    Returns:
        The WordPress media ID, or None if the download or upload failed.
    """
    import html  # local import: only needed to escape the caption

    try:
        ascii_title = post_title.encode('ascii', 'ignore').decode('ascii').replace(' ', '_')
        # Drop quotes, semicolons, slashes, etc. that could break the header
        # or the resulting file name.
        safe_title = re.sub(r'[^A-Za-z0-9_.-]', '', ascii_title)[:50] or "image"
        headers = {
            "Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}",
            "Content-Disposition": f"attachment; filename={safe_title}.jpg",
            "Content-Type": "image/jpeg"
        }
        image_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        logging.info(f"Fetching image from {image_url} for '{post_title}'")
        image_response = requests.get(image_url, headers=image_headers, timeout=10)
        image_response.raise_for_status()

        # Timeouts keep a stalled WordPress server from hanging the pipeline.
        response = requests.post(
            f"{wp_base_url}/media",
            headers=headers,
            data=image_response.content,
            timeout=30
        )
        response.raise_for_status()
        image_id = response.json()["id"]

        if pixabay_url and uploader:
            # Uploader names and URLs come from third-party sites: escape them
            # before embedding them in HTML.
            caption = f'<a href="{html.escape(str(pixabay_url), quote=True)}">{html.escape(str(image_source))}</a> by {html.escape(str(uploader))}'
        else:
            caption = image_source

        try:
            caption_response = requests.post(
                f"{wp_base_url}/media/{image_id}",
                headers={"Authorization": headers["Authorization"], "Content-Type": "application/json"},
                json={"caption": caption},
                timeout=10
            )
            caption_response.raise_for_status()
        except Exception as e:
            # The image itself is uploaded; keep its ID even without a caption.
            logging.warning(f"Uploaded image (ID: {image_id}) but failed to set caption: {e}")

        logging.info(f"Uploaded image '{safe_title}.jpg' to WP (ID: {image_id}) with caption '{caption}'")
        return image_id
    except Exception as e:
        logging.error(f"Image upload to WP failed for '{post_title}': {e}")
        return None
def determine_paragraph_count(interest_score):
    """Map an interest score to the number of paragraphs to write.

    Scores of 9 and above give 5 paragraphs, 7 and above give 4, and anything
    lower gives 3.
    """
    tiers = ((9, 5), (7, 4))
    for minimum_score, paragraphs in tiers:
        if interest_score >= minimum_score:
            return paragraphs
    return 3
def is_interesting(summary):
    """Score how interesting ``summary`` is for food lovers, from 0 to 10.

    The light model is asked to return only a number. The first integer found
    in the reply is used, so answers such as "8/10" or "Score: 8" still count,
    and the result is capped at 10.

    Args:
        summary: Text to rate.

    Returns:
        An int between 0 and 10; 0 when no number is found or the request fails.
    """
    try:
        response = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": (
                    "Rate this content from 0-10 based on its rarity, buzzworthiness, and engagement potential for food lovers, covering a wide range of food topics (skip recipes). "
                    "Score 8-10 for rare, highly shareable ideas that grab attention. "
                    "Score 5-7 for fresh, engaging updates with broad appeal. Score below 5 for common or unremarkable content. "
                    "Return only a number."
                )},
                {"role": "user", "content": f"Content: {summary}"}
            ],
            max_tokens=5
        )
        raw_score = response.choices[0].message.content.strip()
        # Take the first run of digits instead of requiring the whole reply to
        # be numeric, then keep it inside the documented 0-10 range.
        number = re.search(r'\d+', raw_score)
        score = min(int(number.group()), 10) if number else 0
        print(f"Interest Score for '{summary[:50]}...': {score} (raw: {raw_score})")
        logging.info(f"Interest Score: {score} (raw: {raw_score})")
        return score
    except Exception as e:
        logging.error(f"Interestingness scoring failed: {e}")
        print(f"Interest Error: {e}")
        return 0
def generate_title_from_summary(summary):
    """Generate a headline for ``summary``, retrying up to three times.

    Quotes and apostrophes are removed from the model's reply, and anything up
    to and including the first colon is dropped. A title is rejected (and the
    request retried) when it is empty, longer than 100 characters, or contains
    one of the banned words.

    Args:
        summary: Article summary to base the title on.

    Returns:
        The title string, or None if no valid title was produced in 3 attempts.
    """
    banned_words = ["elevate", "elevating", "elevated"]
    for attempt in range(3):
        try:
            response = client.chat.completions.create(
                model=LIGHT_TASK_MODEL,
                messages=[
                    {"role": "system", "content": (
                        "Generate a concise, engaging title (under 100 characters) based on this summary, covering food topics. "
                        "Craft it with Upworthy/Buzzfeed flair—think ‘you won’t believe this’ or ‘this is nuts’—for food insiders. "
                        "Avoid quotes, emojis, special characters, or the words 'elevate', 'elevating', 'elevated'. "
                        "End with a question to spark shares."
                    )},
                    {"role": "user", "content": f"Summary: {summary}"}
                ],
                max_tokens=30
            )
            title = response.choices[0].message.content.strip().replace('"', '').replace("'", "")
            if ':' in title:
                title = title.split(':', 1)[1].strip()

            if not title:
                # e.g. the reply was only a label such as "Title:".
                reason = "empty"
            elif len(title) > 100:
                reason = "length"
            elif any(word in title.lower() for word in banned_words):
                reason = "banned word"
            else:
                reason = None

            if reason:
                print(f"Rejected title (attempt {attempt + 1}/3): '{title}' due to {reason}")
                logging.info(f"Rejected title (attempt {attempt + 1}/3): '{title}' due to {reason}")
                continue
            logging.info(f"Generated title: {title}")
            return title
        except Exception as e:
            logging.error(f"Title generation failed (attempt {attempt + 1}/3): {e}")
            print(f"Title Error: {e}")
    print("Failed to generate valid title after 3 attempts")
    logging.info("Failed to generate valid title after 3 attempts")
    return None
def summarize_with_gpt4o(content, source_name, link, interest_score=0, extra_prompt=""):
    """Summarise ``content`` in the voice of a persona chosen for it.

    The persona's prompt template is filled in with its description, tone and
    a paragraph count derived from ``interest_score``, then sent together with
    the content to the summary model.

    Args:
        content: Text to summarise.
        source_name: Name of the content's source, included in the prompt.
        link: URL of the content, included in the prompt.
        interest_score: Score used to pick the persona and paragraph count.
        extra_prompt: Additional instructions inserted into the prompt.

    Returns:
        The summary text, or None if anything fails.
    """
    generic_config = {
        "article_prompt": "Write a concise, engaging summary that captures the essence of the content for food lovers.",
        "description": "a generic food writer",
        "tone": "an engaging tone"
    }
    try:
        persona = select_best_persona(interest_score, content)
        config = PERSONA_CONFIGS.get(persona, generic_config)
        prompt = config["article_prompt"].format(
            description=config["description"],
            tone=config["tone"],
            num_paragraphs=determine_paragraph_count(interest_score)
        )
        logging.info(f"Using {persona} with interest_score and content")

        prompt_sections = [
            f"{prompt}\n\n",
            f"{extra_prompt}\n\n",
            f"Avoid using the word 'elevate'—use more humanized language like 'level up' or 'bring to life'.\n",
            f"Content to summarize:\n{content}\n\n",
            f"Source: {source_name}\n",
            f"Link: {link}",
        ]
        full_prompt = "".join(prompt_sections)

        chat_messages = [
            {"role": "system", "content": full_prompt},
            {"role": "user", "content": content}
        ]
        completion = client.chat.completions.create(
            model=SUMMARY_MODEL,
            messages=chat_messages,
            max_tokens=1000,
            temperature=0.7
        )
        summary = completion.choices[0].message.content.strip()
        logging.info(f"Processed summary (Persona: {persona}): {summary}")
    except Exception as e:
        logging.error(f"Summary generation failed with model {SUMMARY_MODEL}: {e}")
        return None
    return summary
def insert_link_naturally(summary, source_name, source_url):
    """Insert one HTML link to the source into ``summary``.

    The light model is asked to weave the link into the text. Its answer is
    accepted only if it contains the exact link exactly once. Otherwise, or
    when the request fails, a stock phrase containing the link is inserted
    after the first sentence of a randomly chosen paragraph, avoiding sentences
    that contain a clock time such as "6.30am".

    Args:
        summary: Summary text with paragraphs separated by newlines.
        source_name: Link text.
        source_url: Link target.

    Returns:
        The summary with the link inserted, or the unchanged ``summary`` if it
        has no non-empty paragraph.
    """
    # Built before the API call so the fallback path can always use it, even
    # when the request raises.
    link_pattern = f'<a href="{source_url}">{source_name}</a>'
    try:
        # Log the input summary to debug its structure
        logging.info(f"Input summary to insert_link_naturally: {summary!r}")
        prompt = (
            "Take this summary and insert a single HTML link naturally into one paragraph (randomly chosen). "
            "Use the format '<a href=\"{source_url}\">{source_name}</a>' and weave it into the text seamlessly, "
            "e.g., 'The latest scoop from {source_name} reveals...' or '{source_name} uncovers this wild shift.' "
            "Vary the phrasing creatively to avoid repetition (don’t always use 'dives into'). "
            "Place the link at a sentence boundary (after a period, not within numbers like '6.30am' or '1.5'). "
            "Maintain the original tone, flow, and paragraph structure, preserving all existing newlines exactly as they are. "
            "Each paragraph in the input summary is separated by a single \\n; ensure the output maintains this exact separation. "
            "Do not add or remove newlines beyond the original summary structure. "
            "Return the modified summary with exactly one link.\n\n"
            "Summary:\n{summary}\n\n"
            "Source Name: {source_name}\nSource URL: {source_url}"
        ).format(summary=summary, source_name=source_name, source_url=source_url)
        response = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": prompt},
                {"role": "user", "content": "Insert the link naturally into the summary."}
            ],
            max_tokens=1000,
            temperature=0.7
        )
        new_summary = response.choices[0].message.content.strip()
        if new_summary and new_summary.count(link_pattern) == 1:
            # Strip each line but keep every line (even empty ones) so the
            # paragraph count is preserved.
            new_summary = '\n'.join(p.strip() for p in new_summary.split('\n'))
            logging.info(f"Summary with naturally embedded link (normalized): {new_summary!r}")
            return new_summary
        logging.warning(f"GPT failed to insert link correctly: {new_summary}. Using fallback.")
    except Exception as e:
        logging.error(f"Link insertion failed: {e}")

    # Fallback path
    if not isinstance(summary, str):
        logging.error("No valid paragraphs to insert link.")
        return summary

    # Hide the dot of clock times behind a character that cannot occur in
    # normal text, so restoring it later cannot touch real content such as
    # e-mail addresses or @handles.
    dot_placeholder = "\x00"
    time_pattern = r'\b\d{1,2}\.\d{2}(?:am|pm)\b'
    protected_summary = re.sub(time_pattern, lambda m: m.group(0).replace('.', dot_placeholder), summary)
    paragraphs = protected_summary.split('\n')
    candidates = [p for p in paragraphs if p.strip()]
    if not candidates:
        logging.error("No valid paragraphs to insert link.")
        return summary

    target_para = random.choice(candidates)
    phrases = [
        f"The scoop from {link_pattern} spills the details",
        f"{link_pattern} uncovers this wild shift",
        f"This gem via {link_pattern} drops some truth",
        f"{link_pattern} breaks down the buzz"
    ]
    insertion_phrase = random.choice(phrases)

    # Insert after the first sentence that has no protected clock time; if
    # there is none, append to the end of the paragraph.
    sentences = re.split(r'(?<=[.!?])\s+', target_para)
    insert_at = len(sentences)
    for i, sent in enumerate(sentences):
        if sent.strip() and dot_placeholder not in sent:
            insert_at = i + 1
            break
    sentences.insert(insert_at, f"{insertion_phrase}.")
    new_para = " ".join(s.strip() for s in sentences if s.strip())

    paragraphs[paragraphs.index(target_para)] = new_para
    new_summary = '\n'.join(paragraphs).replace(dot_placeholder, '.')
    logging.info(f"Fallback summary with link: {new_summary!r}")
    return new_summary
def generate_category_from_summary(summary):
    """Pick the best-fitting site category for ``summary``.

    The model's reply is matched against the allowed categories ignoring case
    and surrounding punctuation, so answers like "trends" or "Food." are
    accepted.

    Args:
        summary: Article summary.

    Returns:
        One of "Food", "Culture", "Trends", "Health", "Lifestyle", "Drink" or
        "Eats". "Trends" is returned for an empty or non-string summary, an
        unrecognised reply, or a failed request.
    """
    valid_categories = ["Food", "Culture", "Trends", "Health", "Lifestyle", "Drink", "Eats"]
    try:
        if not isinstance(summary, str) or not summary.strip():
            logging.warning(f"Invalid summary for category generation: {summary}. Defaulting to 'Trends'.")
            return "Trends"
        response = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": (
                    "Based on this summary, select the most relevant category from: Food, Culture, Trends, Health, Lifestyle, Drink, Eats. "
                    "Return only the category name."
                )},
                {"role": "user", "content": summary}
            ],
            max_tokens=10
        )
        category = response.choices[0].message.content.strip()
        logging.info(f"Generated category: {category}")
        # Tolerate harmless decoration around the name (quotes, period, case).
        normalized = category.strip(" .\"'`*").lower()
        for candidate in valid_categories:
            if candidate.lower() == normalized:
                return candidate
        return "Trends"
    except Exception as e:
        logging.error(f"Category generation failed: {e}")
        return "Trends"
def get_wp_category_id(category_name, wp_base_url, wp_username, wp_password):
    """Look up the ID of the WordPress category named ``category_name``.

    Searches the REST API for the name and returns the first result whose name
    matches exactly, ignoring case.

    Args:
        category_name: Category name to look for.
        wp_base_url: Base URL of the WordPress REST API.
        wp_username: WordPress user name for basic auth.
        wp_password: WordPress password for basic auth.

    Returns:
        The category ID, or None if there is no match or the request fails.
    """
    try:
        headers = {"Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}"}
        response = requests.get(
            f"{wp_base_url}/categories",
            headers=headers,
            # Ask for the largest page so an exact match is not missed behind
            # many partial matches.
            params={"search": category_name, "per_page": 100},
            timeout=10  # never hang indefinitely on a stalled server
        )
        response.raise_for_status()
        wanted = category_name.lower()
        for cat in response.json():
            if cat["name"].lower() == wanted:
                return cat["id"]
        return None
    except Exception as e:
        logging.error(f"Failed to get WP category ID for '{category_name}': {e}")
        return None
def create_wp_category(category_name, wp_base_url, wp_username, wp_password):
    """Create a WordPress category and return its ID.

    Args:
        category_name: Name of the category to create.
        wp_base_url: Base URL of the WordPress REST API.
        wp_username: WordPress user name for basic auth.
        wp_password: WordPress password for basic auth.

    Returns:
        The new category's ID, or None if the request fails.
    """
    try:
        headers = {
            "Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}",
            "Content-Type": "application/json"
        }
        payload = {"name": category_name}
        response = requests.post(
            f"{wp_base_url}/categories",
            headers=headers,
            json=payload,
            timeout=10  # never hang indefinitely on a stalled server
        )
        response.raise_for_status()
        return response.json()["id"]
    except Exception as e:
        logging.error(f"Failed to create WP category '{category_name}': {e}")
        return None
def get_wp_tag_id(tag_name, wp_base_url, wp_username, wp_password):
    """Look up the ID of the WordPress tag named ``tag_name``.

    Searches the REST API for the name and returns the first result whose name
    matches exactly, ignoring case.

    Args:
        tag_name: Tag name to look for.
        wp_base_url: Base URL of the WordPress REST API.
        wp_username: WordPress user name for basic auth.
        wp_password: WordPress password for basic auth.

    Returns:
        The tag ID, or None if there is no match or the request fails.
    """
    try:
        headers = {"Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}"}
        response = requests.get(
            f"{wp_base_url}/tags",
            headers=headers,
            # Ask for the largest page so an exact match is not missed behind
            # many partial matches.
            params={"search": tag_name, "per_page": 100},
            timeout=10  # never hang indefinitely on a stalled server
        )
        response.raise_for_status()
        wanted = tag_name.lower()
        for tag in response.json():
            if tag["name"].lower() == wanted:
                return tag["id"]
        return None
    except Exception as e:
        logging.error(f"Failed to get WP tag ID for '{tag_name}': {e}")
        return None
def post_to_wp(post_data, category, link, author, image_url, original_source, image_source="Pixabay", uploader=None, pixabay_url=None, interest_score=4, post_id=None, should_post_tweet=True):
    """Create or update a WordPress post and optionally tweet the article.

    Args:
        post_data: Dict with "title" and "content" keys. A ``None`` content is
            replaced by a short fallback sentence.
        category: Category name; looked up on WordPress and created when it
            does not exist yet.
        link: Stored in the post meta as "original_link".
        author: Dict with at least "username" and "password" (used for HTTP
            Basic auth). "persona" is additionally read when tweeting.
        image_url: URL of the featured image, or a falsy value for none.
        original_source: Stored in the post meta as "original_source".
        image_source: Passed through to ``upload_image_to_wp``.
        uploader: Passed through to ``upload_image_to_wp``.
        pixabay_url: Passed through to ``upload_image_to_wp``.
        interest_score: Integer in 0-10 stored in the post meta. Non-integers
            are replaced by 4 and out-of-range values are clamped. A score of
            9 or more adds the "Picks" tag when that tag exists.
        post_id: When given, that post is updated instead of a new one being
            created, and no image is uploaded.
        should_post_tweet: When true, an article tweet is generated and posted
            after the WordPress request succeeds; tweet failures are logged
            and do not affect the return value.

    Returns:
        ``(post_id, post_url)`` on success, ``(None, None)`` when anything in
        the posting flow fails.

    Raises:
        ValueError: If ``author`` is not a dict containing "username" and
            "password".
    """
    wp_base_url = "https://insiderfoodie.com/wp-json/wp/v2"
    logging.info(f"Starting post_to_wp for '{post_data['title']}', image_source: {image_source}")
    if not isinstance(author, dict) or "username" not in author or "password" not in author:
        raise ValueError(f"Invalid author data: {author}. Expected a dictionary with 'username' and 'password' keys.")
    wp_username = author["username"]
    wp_password = author["password"]
    if not isinstance(interest_score, int):
        logging.error(f"Invalid interest_score type: {type(interest_score)}, value: '{interest_score}'. Defaulting to 4.")
        interest_score = 4
    elif interest_score < 0 or interest_score > 10:
        # Report the value actually used; the score is clamped into 0-10,
        # not reset to the default of 4.
        clamped_score = min(max(interest_score, 0), 10)
        logging.warning(f"interest_score out of valid range (0-10): {interest_score}. Clamping to {clamped_score}.")
        interest_score = clamped_score
    try:
        headers = {
            "Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}",
            "Content-Type": "application/json"
        }
        # Fail fast on bad credentials before doing any other work.
        auth_test = requests.get(f"{wp_base_url}/users/me", headers=headers, timeout=10)
        auth_test.raise_for_status()
        logging.info(f"Auth test passed for {wp_username}: {auth_test.json()['id']}")

        category_id = get_wp_category_id(category, wp_base_url, wp_username, wp_password)
        if category_id:
            logging.info(f"Found existing category '{category}' with ID {category_id}")
        else:
            category_id = create_wp_category(category, wp_base_url, wp_username, wp_password)
            if category_id:
                logging.info(f"Created new category '{category}' with ID {category_id}")
            else:
                logging.error(f"Could not find or create category '{category}' - posting without an explicit category")

        # Tag ID 1 is always attached (presumably a site-wide default tag;
        # verify against the WordPress instance).
        tags = [1]
        if interest_score >= 9:
            picks_tag_id = get_wp_tag_id("Picks", wp_base_url, wp_username, wp_password)
            if picks_tag_id and picks_tag_id not in tags:
                tags.append(picks_tag_id)
                logging.info(f"Added 'Picks' tag (ID: {picks_tag_id}) to post due to high interest score: {interest_score}")

        content = post_data["content"]
        if content is None:
            logging.error(f"Post content is None for title '{post_data['title']}' - using fallback")
            content = "Content unavailable. Check the original source for details."
        # Each non-blank line of the summary becomes its own paragraph.
        formatted_content = "\n".join(f"<p>{para}</p>" for para in content.split('\n') if para.strip())

        author_id_map = {
            "owenjohnson": 10,
            "javiermorales": 2,
            "aishapatel": 3,
            "trangnguyen": 12,
            "keishareid": 13,
            "lilamoreau": 7
        }
        author_id = author_id_map.get(author["username"], 5)

        payload = {
            "title": post_data["title"],
            "content": formatted_content,
            "status": "publish",
            "tags": tags,
            "author": author_id,
            "meta": {
                "original_link": link,
                "original_source": original_source,
                "interest_score": interest_score
            }
        }
        # Only send a category when one was resolved; a [None] entry is not a
        # valid category list.
        if category_id:
            payload["categories"] = [category_id]

        if image_url and not post_id:
            logging.info(f"Attempting image upload for '{post_data['title']}', URL: {image_url}, source: {image_source}")
            image_id = upload_image_to_wp(image_url, post_data["title"], wp_base_url, wp_username, wp_password, image_source, uploader, pixabay_url)
            if not image_id:
                logging.info(f"{image_source} image upload failed for '{post_data['title']}', falling back to Pixabay")
                pixabay_query = post_data["title"][:50]
                image_url, image_source, uploader, pixabay_url = get_image(pixabay_query)
                if image_url:
                    image_id = upload_image_to_wp(image_url, post_data["title"], wp_base_url, wp_username, wp_password, image_source, uploader, pixabay_url)
            if image_id:
                payload["featured_media"] = image_id
            else:
                logging.warning(f"All image uploads failed for '{post_data['title']}' - posting without image")

        # POST is used for both creating a post and updating an existing one.
        endpoint = f"{wp_base_url}/posts/{post_id}" if post_id else f"{wp_base_url}/posts"
        logging.debug(f"Sending WP request to {endpoint} with payload: {json.dumps(payload, indent=2)}")
        response = requests.post(endpoint, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
        post_info = response.json()
        logging.debug(f"WP response: {json.dumps(post_info, indent=2)}")
        if not isinstance(post_info, dict) or "id" not in post_info:
            raise ValueError(f"Invalid WP response: {post_info}")
        post_id = post_info["id"]
        post_url = post_info["link"]

        # Save to recent_posts.json
        timestamp = datetime.now(timezone.utc).isoformat()
        save_post_to_recent(post_data["title"], post_url, author["username"], timestamp)

        # Post article tweet to X only if should_post_tweet is True
        if should_post_tweet:
            try:
                post = {"title": post_data["title"], "url": post_url}
                tweet = generate_article_tweet(author, post, author["persona"])
                if post_tweet(author, tweet):
                    logging.info(f"Successfully posted article tweet for {author['username']} on X")
                else:
                    logging.warning(f"Failed to post article tweet for {author['username']} on X")
            except Exception as e:
                logging.error(f"Error posting article tweet for {author['username']}: {e}")

        logging.info(f"Posted/Updated by {author['username']}: {post_data['title']} (ID: {post_id})")
        return post_id, post_url
    except requests.exceptions.RequestException as e:
        # A Response object is falsy for 4xx/5xx statuses, so test against
        # None explicitly; otherwise the error body is never logged.
        response_text = e.response.text if e.response is not None else 'No response'
        logging.error(f"WP API request failed: {e} - Response: {response_text}")
        print(f"WP Error: {e}")
        return None, None
    except KeyError as e:
        # Log only the username: the author dict also carries the password.
        logging.error(f"WP payload error - Missing key: {e} - Author: {author.get('username')}")
        print(f"WP Error: {e}")
        return None, None
    except Exception as e:
        logging.error(f"WP posting failed: {e}")
        print(f"WP Error: {e}")
        return None, None
# Configure Flickr API with credentials
flickr_api.set_keys(api_key=FLICKR_API_KEY, api_secret=FLICKR_API_SECRET)
# Report only whether the credentials are present: echoing even a prefix of
# the key/secret puts secret material into the logs, and slicing would raise
# TypeError at import time if either value were None.
logging.info(
    f"Flickr API configured (key set: {bool(FLICKR_API_KEY)}, secret set: {bool(FLICKR_API_SECRET)})"
)

# Global variable to track the last Flickr request time
last_flickr_request_time = 0

# Flickr request counter
flickr_request_count = 0
flickr_request_start_time = time.time()

# Define exclude keywords for filtering unwanted image types
exclude_keywords = [
    "poster", "infographic", "chart", "graph", "data", "stats", "text", "typography",
    "design", "advertisement", "illustration", "diagram", "layout", "print"
]

# Initialize used_images as a set to track used image URLs
used_images_file = "/home/shane/foodie_automator/used_images.json"
used_images = set()

# Load used images from file if it exists
if os.path.exists(used_images_file):
    try:
        with open(used_images_file, 'r') as f:
            data = json.load(f)
        if isinstance(data, list):
            # Handle malformed format (list of lists). Every element is
            # inspected, not just the first, so a nested list anywhere in the
            # file is flattened instead of aborting the load.
            if any(isinstance(item, list) for item in data):
                logging.warning(f"Fixing malformed used_images.json format: {data[:2]}...")
            flat_data = []
            for item in data:
                if isinstance(item, list):
                    flat_data.extend(item)
                else:
                    flat_data.append(item)
            # Only URL strings are meaningful here; this also keeps
            # unhashable leftovers out of the set.
            used_images.update(url for url in flat_data if isinstance(url, str))
        else:
            logging.warning(f"Ignoring unexpected used_images.json content of type {type(data).__name__}")
        logging.info(f"Loaded {len(used_images)} used image URLs from {used_images_file}")
    except Exception as e:
        logging.warning(f"Failed to load used images from {used_images_file}: {e}")
# Function to save used_images to file
def save_used_images():
    """Write the in-memory ``used_images`` set to ``used_images_file`` as a JSON list.

    Best-effort: a failure is logged as a warning and otherwise ignored.
    """
    try:
        with open(used_images_file, 'w') as out:
            json.dump(list(used_images), out)
    except Exception as err:
        logging.warning(f"Failed to save used images to {used_images_file}: {err}")
    else:
        logging.info(f"Saved {len(used_images)} used image URLs to {used_images_file}")
def reset_flickr_request_count():
    """Zero the Flickr request counter once an hour has passed since the window began."""
    global flickr_request_count, flickr_request_start_time
    window_seconds = 3600  # Reset every hour
    elapsed = time.time() - flickr_request_start_time
    if elapsed < window_seconds:
        return
    flickr_request_count = 0
    flickr_request_start_time = time.time()
def get_flickr_image(search_query, relevance_keywords):
    """Search Flickr for a photo that suits ``search_query``.

    Candidates are tried in three stages and the first photo accepted by
    ``process_photo`` is returned:

    1. Flickr photo IDs scraped from a DuckDuckGo ``site:flickr.com`` search.
    2. One Flickr API search per keyword that the light-task model classifies
       as "specific" (only when the query has more than one word).
    3. One Flickr API search on ``relevance_keywords``.

    A photo is rejected when its tags or title contain an entry of
    ``exclude_keywords``, when its URL is already in ``used_images``, when
    the download fails, or when OCR finds more than 200 characters in it.
    Accepted photos are added to ``used_images`` (and saved) and appended to
    flickr_images.json.

    Calls are spaced at least 5 seconds apart and counted in the module-level
    request counter.

    Args:
        search_query: Free-text image query.
        relevance_keywords: List of keywords (joined with spaces) or a single
            string, used for the final fallback search.

    Returns:
        ``(image_url, "Flickr", uploader, page_url)`` for the accepted photo,
        otherwise ``(None, None, None, None)``.
    """
    global last_flickr_request_time, flickr_request_count
    reset_flickr_request_count()
    flickr_request_count += 1
    logging.info(f"Flickr request count: {flickr_request_count}/3600")

    # Enforce a minimum delay of 5 seconds between Flickr requests
    current_time = time.time()
    time_since_last_request = current_time - last_flickr_request_time
    if time_since_last_request < 5:
        time.sleep(5 - time_since_last_request)
    last_flickr_request_time = time.time()

    headers = {'User-Agent': 'InsiderFoodieBot/1.0 (https://insiderfoodie.com; contact@insiderfoodie.com)'}

    # Helper function to search Flickr with a given query
    def search_flickr(query, per_page=20):
        try:
            photos = flickr_api.Photo.search(
                text=query,
                per_page=per_page,
                sort='relevance',
                safe_search=1,
                media='photos',
                license='4,5,9,10'
            )
            return photos
        except Exception as e:
            logging.warning(f"Flickr API error for query '{query}': {e}")
            return []

    # Helper function to fetch a Flickr photo by ID
    def fetch_photo_by_id(photo_id):
        try:
            photo = flickr_api.Photo(id=photo_id)
            return photo
        except Exception as e:
            logging.warning(f"Failed to fetch Flickr photo ID {photo_id}: {e}")
            return None

    # Helper function to process a photo: returns the 4-tuple result for an
    # accepted photo, or None when the photo should be skipped.
    def process_photo(photo):
        # Metadata lookups are made on flickr_api objects and may raise (e.g.
        # on API errors). Treat that as "skip this photo" so one bad
        # candidate does not abort the whole search.
        try:
            tags = [tag.text.lower() for tag in photo.getTags()]
            title = photo.title.lower() if photo.title else ""
        except Exception as e:
            logging.warning(f"Failed to read Flickr metadata for photo {getattr(photo, 'id', 'unknown')}: {e}")
            return None
        matched_keywords = [kw for kw in exclude_keywords if kw in tags or kw in title]
        if matched_keywords:
            logging.info(f"Skipping image with unwanted keywords: {photo.id} (tags: {tags}, title: {title}, matched: {matched_keywords})")
            return None

        # Prefer the Large rendition, then Medium. getPhotoFile may raise
        # instead of returning a falsy value when a size is unavailable, so
        # both outcomes fall through to the next size.
        img_url = None
        for size_label in ('Large', 'Medium'):
            try:
                img_url = photo.getPhotoFile(size_label=size_label)
            except Exception as e:
                logging.warning(f"Flickr size '{size_label}' unavailable for photo {photo.id}: {e}")
                img_url = None
            if img_url:
                break
        if not img_url or img_url in used_images:
            return None

        temp_path = None
        try:
            # Retry on HTTP 429 with exponential backoff (5s, 10s, 20s).
            for attempt in range(3):
                img_response = requests.get(img_url, headers=headers, timeout=10)
                if img_response.status_code == 429:
                    wait_time = 5 * (2 ** attempt)
                    logging.warning(f"Rate limit hit for {img_url}. Retrying after {wait_time}s (attempt {attempt+1}/3).")
                    time.sleep(wait_time)
                    continue
                img_response.raise_for_status()
                break
            else:
                logging.warning(f"Rate limit hit for {img_url} after retries. Falling back to Pixabay.")
                return None

            with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file:
                # Record the path before writing so the finally block can
                # always remove the file, even if the write fails.
                temp_path = temp_file.name
                temp_file.write(img_response.content)

            # The context manager closes the image's file handle after OCR.
            with Image.open(temp_path) as img:
                text = pytesseract.image_to_string(img)
            char_count = len(text.strip())
            logging.info(f"OCR processed {img_url}: {char_count} characters detected")
            if char_count > 200:
                logging.info(f"Skipping text-heavy image (OCR): {img_url} (char_count: {char_count})")
                return None

            uploader = photo.owner.username
            page_url = f"https://www.flickr.com/photos/{photo.owner.nsid}/{photo.id}"
            used_images.add(img_url)
            save_used_images()
            flickr_data = {
                "title": search_query,
                "image_url": img_url,
                "source": "Flickr",
                "uploader": uploader,
                "page_url": page_url,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "ocr_chars": char_count
            }
            flickr_file = "/home/shane/foodie_automator/flickr_images.json"
            with open(flickr_file, 'a') as f:
                json.dump(flickr_data, f)
                f.write('\n')
            logging.info(f"Saved Flickr image to {flickr_file}: {img_url}")
            logging.info(f"Fetched Flickr image: {img_url} by {uploader} for query '{search_query}' (tags: {tags})")
            return img_url, "Flickr", uploader, page_url
        except requests.exceptions.RequestException as e:
            # Covers HTTP errors as well as timeouts and connection errors,
            # so network failures are not reported as OCR failures.
            logging.warning(f"Download failed for {img_url}: {e}")
            return None
        except Exception as e:
            logging.warning(f"OCR processing failed for {img_url}: {e}")
            return None
        finally:
            if temp_path and os.path.exists(temp_path):
                os.unlink(temp_path)

    # Helper function to search DDG and extract Flickr photo IDs
    def search_ddg_for_flickr(query):
        ddg_query = f"{query} site:flickr.com"
        ddg_url = f"https://duckduckgo.com/?q={quote(ddg_query)}"
        try:
            response = requests.get(ddg_url, headers=headers, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            photo_ids = set()
            for link in soup.find_all('a', href=True):
                href = link['href']
                match = re.search(r'flickr\.com/photos/[^/]+/(\d+)', href)
                if match:
                    photo_id = match.group(1)
                    photo_ids.add(photo_id)
            photo_ids = list(photo_ids)[:5]  # Limit to 5 IDs
            logging.info(f"Found {len(photo_ids)} Flickr photo IDs via DDG: {photo_ids}")
            return photo_ids
        except Exception as e:
            logging.warning(f"DDG search failed for query '{ddg_query}': {e}")
            # Same type as the success path (a list).
            return []

    # Helper function to classify keywords as specific or generic
    def classify_keywords(keywords):
        prompt = (
            "Given the following keywords from an image search query, classify each as 'specific' (e.g., brand names, unique entities like 'Taco Bell' or 'Paris') or 'generic' (e.g., common or abstract terms like 'dining' or 'trends'). "
            "Return a JSON object mapping each keyword to its classification.\n\n"
            "Keywords: " + ", ".join(keywords) + "\n\n"
            "Example output format (do not use these exact keywords in your response):\n"
            "```json\n"
            "{\n"
            "  \"keyword1\": \"specific\",\n"
            "  \"keyword2\": \"generic\"\n"
            "}\n```"
        )
        try:
            response = client.chat.completions.create(
                model=LIGHT_TASK_MODEL,
                messages=[
                    {"role": "system", "content": "You are a helper that classifies keywords."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=100,
                temperature=0.5
            )
            raw_response = response.choices[0].message.content
            json_match = re.search(r'```json\n([\s\S]*?)\n```', raw_response)
            if not json_match:
                logging.warning(f"Failed to parse keyword classification JSON: {raw_response}")
                return {kw: "specific" for kw in keywords}
            classifications = json.loads(json_match.group(1))
            # The caller iterates .items(); anything but an object is unusable.
            if not isinstance(classifications, dict):
                logging.warning(f"Failed to parse keyword classification JSON: {raw_response}")
                return {kw: "specific" for kw in keywords}
            return classifications
        except Exception as e:
            logging.warning(f"Keyword classification failed: {e}. Defaulting to all specific.")
            return {kw: "specific" for kw in keywords}

    # Step 1: Search DDG to find Flickr photo IDs
    logging.info(f"Searching DDG with query: '{search_query} site:flickr.com'")
    photo_ids = search_ddg_for_flickr(search_query)
    if photo_ids:
        for photo_id in photo_ids:
            photo = fetch_photo_by_id(photo_id)
            if photo:
                result = process_photo(photo)
                if result:
                    return result

    # Step 2: Break down the query into keywords and classify them for direct Flickr API search
    keywords = search_query.lower().split()
    if len(keywords) > 1:
        classifications = classify_keywords(keywords)
        logging.info(f"Keyword classifications: {classifications}")
        # Prioritize specific keywords
        specific_keywords = [kw for kw, classification in classifications.items() if classification == "specific"]
        if specific_keywords:
            for keyword in specific_keywords:
                logging.info(f"Searching Flickr with specific keyword: '{keyword}'")
                photos = search_flickr(keyword)
                for photo in photos:
                    result = process_photo(photo)
                    if result:
                        return result

    # Step 3: Final fallback using relevance keywords
    fallback_query = " ".join(relevance_keywords) if isinstance(relevance_keywords, list) else relevance_keywords
    logging.info(f"No results found. Falling back to generic query: '{fallback_query}'")
    photos = search_flickr(fallback_query)
    for photo in photos:
        result = process_photo(photo)
        if result:
            return result

    logging.warning(f"No valid Flickr image found for query '{search_query}' after all attempts.")
    return None, None, None, None
def select_best_author(summary):
    """Ask the light-task model which author best fits ``summary``.

    Returns the model's reply when it is exactly one of the six known
    usernames; any other reply, or any error, yields "owenjohnson".
    """
    default_author = "owenjohnson"
    known_authors = (
        "owenjohnson",
        "javiermorales",
        "aishapatel",
        "trangnguyen",
        "keishareid",
        "lilamoreau",
    )
    system_prompt = (
        "Based on this restaurant/food industry trend summary, pick the most suitable author from: "
        "owenjohnson, javiermorales, aishapatel, trangnguyen, keishareid, lilamoreau. "
        "Consider their expertise: owenjohnson (global dining trends), javiermorales (food critique), "
        "aishapatel (emerging food trends), trangnguyen (cultural dining), keishareid (soul food heritage), "
        "lilamoreau (global street food). Return only the username."
    )
    try:
        completion = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": summary},
            ],
            max_tokens=20,
        )
        choice = completion.choices[0].message.content.strip()
        logging.info(f"Selected author: {choice}")
        if choice in known_authors:
            return choice
        return default_author
    except Exception as err:
        logging.error(f"Author selection failed: {err}")
        return default_author
def prepare_post_data(final_summary, original_title, context_info=""):
    """Build everything needed to publish a post from a finished summary.

    Generates a title and an image query, looks for an image (Flickr first,
    then two ``get_image`` attempts), picks an author and a category.

    Args:
        final_summary: The article summary; used as the post content.
        original_title: Title of the source article; only used in log output.
        context_info: Optional text appended to log messages.

    Returns:
        ``(post_data, author, category, image_url, image_source, uploader,
        page_url)``. ``post_data`` is ``{"title": ..., "content": ...}``. The
        image fields may be ``None`` when no image was found. A tuple of
        seven ``None`` values is returned when title generation, image-query
        generation, or author resolution fails.
    """
    failure = (None, None, None, None, None, None, None)

    innovative_title = generate_title_from_summary(final_summary)
    if not innovative_title:
        logging.info(f"Title generation failed for '{original_title}' {context_info}")
        return failure

    search_query, relevance_keywords = generate_image_query(f"{innovative_title}\n\n{final_summary}")
    if not search_query:
        logging.info(f"Image query generation failed for '{innovative_title}' {context_info}")
        return failure

    logging.info(f"Fetching Flickr image for query: '{search_query}' {context_info}")
    image_url, image_source, uploader, page_url = get_flickr_image(search_query, relevance_keywords)
    if not image_url:
        logging.info(f"Flickr fetch failed for '{search_query}' - falling back to Pixabay {context_info}")
        image_query, _ = generate_image_query(f"{innovative_title}\n\n{final_summary}")
        # Query generation can come back empty; reuse the query that is
        # already known to be valid rather than searching for None.
        image_url, image_source, uploader, page_url = get_image(image_query or search_query)
        if not image_url:
            logging.info(f"Pixabay fetch failed for title '{innovative_title}' - falling back to summary {context_info}")
            image_query, _ = generate_image_query(f"{final_summary}")
            image_url, image_source, uploader, page_url = get_image(image_query or search_query)
            if not image_url:
                logging.info(f"Image fetch failed again for '{original_title}' - proceeding without image {context_info}")

    post_data = {"title": innovative_title, "content": final_summary}

    selected_username = select_best_author(final_summary)
    author = next((a for a in AUTHORS if a["username"] == selected_username), None)
    if not author:
        logging.error(f"Author '{selected_username}' not found in AUTHORS, defaulting to owenjohnson")
        # Credentials must come from configuration, never from a literal in
        # source code, so the default author is also resolved via AUTHORS.
        author = next((a for a in AUTHORS if a["username"] == "owenjohnson"), None)
        if not author:
            logging.error(f"Default author 'owenjohnson' is missing from AUTHORS - cannot prepare post {context_info}")
            return failure

    category = generate_category_from_summary(final_summary)
    return post_data, author, category, image_url, image_source, uploader, page_url
def save_post_to_recent(post_title, post_url, author_username, timestamp):
    """Add one post record to recent_posts.json.

    The existing entries are loaded, the new record is appended, and the
    whole file is rewritten with one JSON object per line. Failures are
    logged and swallowed.
    """
    recent_path = '/home/shane/foodie_automator/recent_posts.json'
    try:
        records = load_json_file(recent_path)
        records.append({
            "title": post_title,
            "url": post_url,
            "author_username": author_username,
            "timestamp": timestamp
        })
        with open(recent_path, 'w') as out:
            for record in records:
                out.write(json.dumps(record) + '\n')
        logging.info(f"Saved post '{post_title}' to recent_posts.json")
    except Exception as err:
        logging.error(f"Failed to save post to recent_posts.json: {err}")
def prune_recent_posts():
    """Rewrite recent_posts.json keeping only entries from the last 24 hours.

    Entries are kept when their "timestamp" string compares greater than the
    ISO-format cutoff. Failures are logged and swallowed.
    """
    recent_path = '/home/shane/foodie_automator/recent_posts.json'
    try:
        threshold = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat()
        kept = []
        for entry in load_json_file(recent_path):
            if entry["timestamp"] > threshold:
                kept.append(entry)
        with open(recent_path, 'w') as out:
            for entry in kept:
                out.write(json.dumps(entry) + '\n')
        logging.info(f"Pruned recent_posts.json to {len(kept)} entries")
    except Exception as err:
        logging.error(f"Failed to prune recent_posts.json: {err}")