import base64
import io
import json
import logging
import os
import random
import re
import shutil
import tempfile
import time
from datetime import datetime, timezone, timedelta
from urllib.parse import quote

import flickr_api
import openai
import psutil
import pytesseract
import requests
import tweepy
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from duckduckgo_search import DDGS
from filelock import FileLock
from openai import OpenAI
from PIL import Image, ImageEnhance, ImageFilter
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from requests_oauthlib import OAuth1

from foodie_config import (
    RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS, PERSONA_CONFIGS,
    get_clean_source_name, AUTHORS, LIGHT_TASK_MODEL, SUMMARY_MODEL, X_API_CREDENTIALS,
    FLICKR_API_KEY, FLICKR_API_SECRET, PIXABAY_API_KEY, RECENT_POSTS_FILE, USED_IMAGES_FILE, IMAGE_EXPIRATION_DAYS
)

# Globals to track author rotation (round-robin selection)
last_author_index = -1
round_robin_index = 0

# Define logger at module level
logger = logging.getLogger(__name__)

load_dotenv()
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Timeout (seconds) for fetching remote images before uploading to WordPress; defined here to avoid a NameError
IMAGE_UPLOAD_TIMEOUT = 30

# Overrides the value imported from foodie_config: 7 days, consistent with foodie_automator_rss.py
IMAGE_EXPIRATION_DAYS = 7
|
|
|
def load_json_file(file_path, expiration_hours=None, default=None): |
|
""" |
|
Load JSON file, handling specific cases for author_state.json and other files. |
|
Args: |
|
file_path (str): Path to the JSON file. |
|
expiration_hours (float): Hours after which entries expire (for list-based files). |
|
default: Default value to return if file is missing or invalid. |
|
Returns: |
|
Loaded data or default value. |
|
""" |
|
logger = logging.getLogger(__name__) |
|
|
|
# Set default based on file type |
|
if default is None: |
|
if "author_state" in file_path: |
|
default = {"last_author_index": -1} |
|
elif "rate_limit_info" in file_path or "notification_tracking" in file_path: |
|
default = {} |
|
else: |
|
default = [] |
|
|
|
# Return default if file doesn't exist |
|
if not os.path.exists(file_path): |
|
logger.info(f"File {file_path} does not exist. Returning default: {default}") |
|
return default |
|
|
|
try: |
|
with open(file_path, 'r') as f: |
|
data = json.load(f) |
|
|
|
# Handle author_state.json (expects dict with last_author_index) |
|
if "author_state" in file_path: |
|
if not isinstance(data, dict): |
|
logger.warning(f"Data in {file_path} is not a dictionary, resetting to default") |
|
return default |
|
if "last_author_index" not in data: |
|
logger.warning(f"Missing last_author_index in {file_path}, resetting to default") |
|
return default |
|
return data |
|
|
|
# Handle rate_limit_info.json and notification_tracking.json (expect dict) |
|
if "rate_limit_info" in file_path or "notification_tracking" in file_path: |
|
if not isinstance(data, dict): |
|
logger.warning(f"Data in {file_path} is not a dictionary, resetting to default") |
|
return default |
|
return data |
|
|
|
# Handle list-based files |
|
if not isinstance(data, list): |
|
logger.warning(f"Data in {file_path} is not a list, resetting to default") |
|
return default |
|
|
|
# Apply expiration filtering for list-based files |
|
if expiration_hours is not None: |
|
# Use days for used_images.json, hours for others |
|
if "used_images" in file_path: |
|
expiration_delta = timedelta(days=expiration_hours) |
|
else: |
|
expiration_delta = timedelta(hours=expiration_hours) |
|
|
|
cutoff = datetime.now(timezone.utc) - expiration_delta |
|
filtered_data = [] |
|
for entry in data: |
|
if not isinstance(entry, dict) or "title" not in entry or "timestamp" not in entry: |
|
logger.warning(f"Skipping malformed entry in {file_path}: {entry}") |
|
continue |
|
try: |
|
timestamp = datetime.fromisoformat(entry["timestamp"]) |
|
if timestamp > cutoff: |
|
filtered_data.append(entry) |
|
except ValueError as e: |
|
logger.warning(f"Invalid timestamp in {file_path} entry {entry}: {e}") |
|
continue |
|
|
|
if len(filtered_data) < len(data): |
|
logger.info(f"Filtered {len(data) - len(filtered_data)} expired entries from {file_path}") |
|
save_json_file(file_path, filtered_data) |
|
data = filtered_data |
|
|
|
logger.info(f"Loaded {len(data)} valid entries from {file_path}") |
|
return data |
|
except json.JSONDecodeError as e: |
|
logger.error(f"Invalid JSON in {file_path}: {str(e)}. Resetting to default.") |
|
save_json_file(file_path, default) |
|
return default |
|
except Exception as e: |
|
logger.error(f"Failed to load {file_path}: {str(e)}. Returning default.") |
|
return default |
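# Illustrative usage of load_json_file (a sketch; values such as expiration_hours=48 are
# hypothetical). The expiration filter only applies to list-based files:
#
#   recent_posts = load_json_file(RECENT_POSTS_FILE, expiration_hours=48, default=[])
#   rate_info = load_json_file("/home/shane/foodie_automator/rate_limit_info.json", default={})
#   state = load_json_file("author_state.json")  # -> {"last_author_index": -1} if missing/invalid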
|
|
|
def save_json_file(file_path, data, timestamp=None): |
|
""" |
|
Save data to JSON file atomically. If timestamp is provided, append as an entry. |
|
""" |
|
logger = logging.getLogger(__name__) |
|
try: |
|
# If timestamp is provided, append as a new entry |
|
if timestamp: |
|
current_data = load_json_file(file_path, default=[]) |
|
new_entry = {'title': data, 'timestamp': timestamp} |
|
if new_entry not in current_data: # Avoid duplicates |
|
current_data.append(new_entry) |
|
data = current_data |
|
else: |
|
logger.info(f"Entry {data} already exists in {file_path}") |
|
return True |
|
|
|
# Validate JSON |
|
json.dumps(data) |
|
|
|
        # Write to a temp file first, then atomically move it into place
        with tempfile.NamedTemporaryFile('w', delete=False, encoding='utf-8') as temp_file:
            json.dump(data, temp_file, indent=2)
        shutil.move(temp_file.name, file_path)
|
logger.info(f"Saved data to {file_path}") |
|
return True |
|
except (json.JSONDecodeError, IOError) as e: |
|
logger.error(f"Failed to save {file_path}: {str(e)}") |
|
return False |
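# Sketch of the two save modes (hypothetical title; commented so nothing runs at import time):
#
#   # Overwrite mode: persist a whole structure atomically
#   save_json_file("/home/shane/foodie_automator/rate_limit_info.json", rate_limit_info)
#
#   # Append mode: with a timestamp, `data` is treated as a title and
#   # {"title": ..., "timestamp": ...} is appended to the existing list, skipping duplicates
#   save_json_file(RECENT_POSTS_FILE, "Sample post title",
#                  timestamp=datetime.now(timezone.utc).isoformat())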
|
|
|
def generate_article_tweet(author, post, persona, summary=""): |
|
title = post["title"] |
|
url = post["url"] |
|
author_handle = f"@{author['username']}" |
|
|
|
prompt = ( |
|
f"Craft a sharp tweet (under 230 characters) for {author_handle} with the voice of '{persona}'. " |
|
f"Distill the essence of the article '{title}' and its summary into a concise, engaging message. " |
|
f"Summary: {summary}\n" |
|
f"Include one specific detail from the summary (e.g., a unique dish, location, or trend). " |
|
f"Include the raw URL '{url}' at the end. " |
|
f"Make it bold, spark curiosity, and invite engagement with a human touch. " |
|
f"Swap 'elevate' for dynamic terms like 'ignite' or 'unleash'. " |
|
f"Skip hashtags, emojis, or phrases like '[Read more]' or 'Read more'. " |
|
f"Skip any extra fluff or formatting around the URL—just append the raw URL after a space. " |
|
f"Example: 'Craving sushi? This Tokyo spot is unreal! {url}'" |
|
) |
|
|
|
response = client.chat.completions.create( |
|
model=SUMMARY_MODEL, |
|
messages=[ |
|
{"role": "system", "content": "You are a social media viral expert crafting engaging tweets."}, |
|
{"role": "user", "content": prompt} |
|
], |
|
max_tokens=100, |
|
temperature=0.7 |
|
) |
|
|
|
tweet = response.choices[0].message.content.strip() |
|
|
|
# Post-generation check: Strip any emojis using regex |
|
tweet = re.sub(r'[\U0001F600-\U0001F64F\U0001F300-\U0001F5FF\U0001F680-\U0001F6FF\U0001F700-\U0001F77F\U0001F780-\U0001F7FF\U0001F800-\U0001F8FF\U0001F900-\U0001F9FF\U0001FA00-\U0001FA6F\U0001FA70-\U0001FAFF\U00002702-\U000027B0\U000024C2-\U0001F251]', '', tweet).strip() |
|
|
|
# Strip "[Read more]" or similar phrases as an additional failsafe |
|
tweet = re.sub(r'\[Read more\]\(.*?\)|\bRead more\b', '', tweet).strip() |
|
|
|
# Strip leading or trailing quotation marks |
|
tweet = tweet.strip('"\'') |
|
|
|
# Remove the URL if it already exists in the tweet to avoid duplication |
|
tweet = re.sub(rf'\s*{re.escape(url)}$', '', tweet).strip() |
|
|
|
# Ensure tweet fits within 280 characters, accounting for URL (Twitter shortens to 23 chars) |
|
url_length = 23 |
|
max_tweet_length = 280 - url_length - 1 # Subtract 1 for the space before URL |
|
if len(tweet) > max_tweet_length: |
|
tweet = tweet[:max_tweet_length-3] + "..." |
|
|
|
# Append the URL exactly once |
|
tweet = tweet + " " + url |
|
|
|
logging.info(f"Generated tweet: {tweet}") |
|
return tweet |
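# Length budget, worked through: X shortens links to 23 characters via t.co, so the text
# budget is 280 - 23 - 1 (for the separating space) = 256 characters. Longer drafts are cut
# to 253 characters plus "..." before the raw URL is appended exactly once at the end.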
|
|
|
def post_tweet(author, content, media_ids=None, reply_to_id=None, tweet_type="rss"): |
|
""" |
|
Post a tweet for the given author using X API v2. |
|
Returns (tweet_id, tweet_data) on success, (None, None) on failure. |
|
""" |
|
logger = logging.getLogger(__name__) |
|
username = author['username'] |
|
credentials = X_API_CREDENTIALS.get(username) |
|
if not credentials: |
|
logger.error(f"No X API credentials found for {username}") |
|
return None, None |
|
|
|
# Check rate limit |
|
can_post, remaining, reset = check_author_rate_limit(author) |
|
if not can_post: |
|
reset_time = datetime.fromtimestamp(reset, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S') |
|
logger.info(f"Cannot post {tweet_type} tweet for {username}: rate-limited. Remaining: {remaining}, Reset at: {reset_time}") |
|
return None, None |
|
|
|
oauth = OAuth1( |
|
client_key=credentials['api_key'], |
|
client_secret=credentials['api_secret'], |
|
resource_owner_key=credentials['access_token'], |
|
resource_owner_secret=credentials['access_token_secret'] |
|
) |
|
url = 'https://api.x.com/2/tweets' |
|
payload = {'text': content} |
|
if media_ids: |
|
payload['media'] = {'media_ids': media_ids} |
|
if reply_to_id: |
|
payload['reply'] = {'in_reply_to_tweet_id': reply_to_id} |
|
|
|
try: |
|
response = requests.post(url, json=payload, auth=oauth) |
|
headers = response.headers |
|
|
|
# Update rate limit info |
|
rate_limit_file = '/home/shane/foodie_automator/rate_limit_info.json' |
|
rate_limit_info = load_json_file(rate_limit_file, default={}) |
|
if username in rate_limit_info: |
|
author_info = rate_limit_info[username] |
|
|
|
if response.status_code == 201: |
|
# Successful post - update remaining tweets and increment posted count |
|
author_info['tweets_posted_in_run'] = author_info.get('tweets_posted_in_run', 0) + 1 |
|
author_info['tweet_remaining'] = remaining - 1 # Decrement remaining tweets |
|
rate_limit_info[username] = author_info |
|
save_json_file(rate_limit_file, rate_limit_info) |
|
logger.info(f"Updated rate limit info for {username} ({tweet_type}): {remaining-1}/17 tweets remaining") |
|
elif response.status_code == 429: |
|
# Rate limit exceeded - update with API values |
|
remaining_str = headers.get('x-user-limit-24hour-remaining') |
|
reset_str = headers.get('x-user-limit-24hour-reset') |
|
if remaining_str is not None and reset_str is not None: |
|
try: |
|
remaining = int(remaining_str) |
|
reset = int(reset_str) |
|
author_info['tweet_remaining'] = remaining |
|
author_info['tweet_reset'] = reset |
|
author_info['tweets_posted_in_run'] = 0 # Reset the counter when rate limit is hit |
|
rate_limit_info[username] = author_info |
|
save_json_file(rate_limit_file, rate_limit_info) |
|
logger.info(f"Updated rate limit info from API for {username}: {remaining}/17 tweets remaining") |
|
except ValueError: |
|
logger.error(f"Failed to parse rate limit headers for {username}") |
|
else: |
|
logger.error(f"Missing rate limit headers for {username}") |
|
|
|
if response.status_code == 201: |
|
tweet_data = response.json() |
|
tweet_id = tweet_data.get('data', {}).get('id') |
|
logger.info(f"Successfully tweeted {tweet_type} for {username}: {content[:50]}... (ID: {tweet_id})") |
|
return tweet_id, tweet_data |
|
elif response.status_code == 429: |
|
logger.info(f"Rate limit exceeded for {username} ({tweet_type}): {remaining} remaining, reset at {datetime.fromtimestamp(reset, tz=timezone.utc)}") |
|
return None, None |
|
elif response.status_code == 403: |
|
error_data = response.json() |
|
error_message = error_data.get('detail', '') |
|
if "account is temporarily locked" in error_message.lower(): |
|
logger.error(f"Account lock detected for {username}: {error_message}") |
|
send_account_lock_alert(username, error_message) |
|
else: |
|
logger.error(f"Unexpected 403 response for {username}: {error_message}") |
|
return None, None |
|
else: |
|
logger.error(f"Failed to post {tweet_type} tweet for {username}: {response.status_code} - {response.text}") |
|
return None, None |
|
|
|
except Exception as e: |
|
logger.error(f"Unexpected error posting {tweet_type} tweet for {username}: {e}", exc_info=True) |
|
return None, None |
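# Illustrative call (a sketch; the author dict shape is assumed from how it is read above,
# and credentials are looked up in X_API_CREDENTIALS by username):
#
#   author = {"username": "owenjohnson"}
#   tweet_id, tweet_data = post_tweet(author, tweet_text, tweet_type="rss")
#   if tweet_id is None:
#       pass  # rate-limited, locked account, or API error; details are logged above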
|
|
|
def select_best_persona(interest_score, content=""): |
|
logging.info("Using select_best_persona with interest_score and content") |
|
personas = ["Visionary Editor", "Foodie Critic", "Trend Scout", "Culture Connoisseur"] |
|
content_lower = content.lower() |
|
|
|
if any(kw in content_lower for kw in ["tech", "ai", "innovation", "sustainability"]): |
|
return random.choice(["Trend Scout", "Visionary Editor"]) |
|
elif any(kw in content_lower for kw in ["review", "critic", "taste", "flavor"]): |
|
return "Foodie Critic" |
|
elif any(kw in content_lower for kw in ["culture", "tradition", "history"]): |
|
return "Culture Connoisseur" |
|
|
|
if interest_score >= 8: |
|
return random.choice(personas[:2]) |
|
elif interest_score >= 6: |
|
return random.choice(personas[2:]) |
|
return random.choice(personas) |
|
|
|
def generate_image_query(title, summary): |
|
try: |
|
prompt = ( |
|
"Given the following article title and summary, generate a concise image search query (max 5 words) to find a relevant image. " |
|
"Also provide a list of relevance keywords (max 5 words) that should be associated with the image. " |
|
"Return the result as a JSON object with 'search' and 'relevance' keys.\n\n" |
|
f"Title: {title}\n\n" |
|
f"Summary: {summary}\n\n" |
|
"Example output:\n" |
|
"```json\n" |
|
"{\"search\": \"Italian cuisine trends\", \"relevance\": \"pasta wine dining culture\"}\n" |
|
"```" |
|
) |
|
response = client.chat.completions.create( |
|
model=LIGHT_TASK_MODEL, |
|
messages=[ |
|
{"role": "system", "content": prompt}, |
|
{"role": "user", "content": "Generate an image search query and relevance keywords."} |
|
], |
|
max_tokens=100, |
|
temperature=0.5 |
|
) |
|
raw_response = response.choices[0].message.content |
|
json_match = re.search(r'```json\n([\s\S]*?)\n```', raw_response) |
|
if not json_match: |
|
logging.warning(f"Failed to parse image query JSON: {raw_response}") |
|
return title, [], True |
|
|
|
query_data = json.loads(json_match.group(1)) |
|
search_query = query_data.get("search", title) |
|
relevance_keywords = query_data.get("relevance", "").split() |
|
|
|
# Log the JSON object in a single line |
|
log_json = json.dumps(query_data).replace('\n', ' ').replace('\r', ' ') |
|
logging.debug(f"Image query from content: {log_json}") |
|
|
|
return search_query, relevance_keywords, False |
|
except Exception as e: |
|
logging.warning(f"Image query generation failed: {e}. Using title as fallback.") |
|
return title, [], True |
|
|
|
def smart_image_and_filter(title, summary): |
|
try: |
|
logging.info(f"Processing title: raw_title='{title}', summary='{summary[:100]}...'") |
|
content = f"{title}\n\n{summary}" |
|
|
|
prompt = ( |
|
"Analyze this article title and summary. Perform the following tasks:\n" |
|
"1. Extract the most specific and defining term (e.g., a proper noun like 'Ozempic', a unique concept like 'GLP-1', or a niche topic like 'Sushi') that makes the article distinct.\n" |
|
"2. Generate a concise image search query (3-7 words) that MUST include the most specific term from step 1, combined with relevant contextual keywords (e.g., 'dining', 'trends').\n" |
|
"3. Identify the main topic of the article (e.g., a specific food item or cuisine).\n" |
|
"4. List relevance keywords (up to 5) for the image search, including the specific term and related concepts.\n" |
|
"5. Determine if the article should be skipped based on these rules:\n" |
|
" - SKIP if about home appliances, recipes, promotions, or contains '[homemade]' or 'homemade'.\n" |
|
" - SKIP if it includes recipe-related terms like 'cook', 'bake', or 'ingredient'.\n" |
|
" - KEEP otherwise.\n" |
|
"Return as JSON with double quotes for all property names and string values (e.g., " |
|
"{\"image_query\": \"Ozempic dining trends\", \"specific_term\": \"Ozempic\", \"relevance\": [\"Ozempic\", \"dining\", \"trends\"], \"main_topic\": \"dining trends\", \"action\": \"KEEP\"})." |
|
) |
|
|
|
response = client.chat.completions.create( |
|
model=LIGHT_TASK_MODEL, |
|
messages=[ |
|
{"role": "system", "content": prompt}, |
|
{"role": "user", "content": content} |
|
], |
|
max_tokens=150 |
|
) |
|
raw_result = response.choices[0].message.content.strip() |
|
logging.debug(f"Raw GPT response: '{raw_result}'") |
|
|
|
cleaned_result = re.sub(r'```json\s*|\s*```', '', raw_result).strip() |
|
fixed_result = re.sub(r"(?<!\\)'(?=\s*[\w\s]*\])|(?<=\[|\{|\s)'|'(?=\s*[\]\},:])|(?<=\w)'(?=\s*:)", '"', cleaned_result) |
|
|
|
try: |
|
result = json.loads(fixed_result) |
|
if not isinstance(result, dict) or "image_query" not in result or "specific_term" not in result or "relevance" not in result or "action" not in result: |
|
logging.warning(f"Invalid GPT response format: {result}, checking action before fallback") |
|
if isinstance(result, dict) and result.get("action") == "SKIP": |
|
logging.info(f"Respecting AI SKIP action for '{title}'") |
|
return "food trends", ["food"], "food", True |
|
main_topic = extract_main_topic(title.lower() + " " + summary.lower()) |
|
skip_flag = ( |
|
"[homemade]" in title.lower() or |
|
"homemade" in title.lower() or |
|
"homemade" in summary.lower() or |
|
any(kw in title.lower() or kw in summary.lower() for kw in RECIPE_KEYWORDS) |
|
) |
|
logging.info(f"Fallback for '{title}': Skip={skip_flag}, Reasons: " |
|
f"homemade_in_title={'[homemade]' in title.lower() or 'homemade' in title.lower()}, " |
|
f"homemade_in_summary={'homemade' in summary.lower()}, " |
|
f"recipe_keywords={any(kw in title.lower() or kw in summary.lower() for kw in RECIPE_KEYWORDS)}") |
|
                return main_topic, [main_topic, "food"], main_topic, skip_flag, main_topic  # five values, matching the success path
|
except json.JSONDecodeError as e: |
|
logging.warning(f"JSON parsing failed: {e}, raw: '{fixed_result}'. Using fallback.") |
|
main_topic = extract_main_topic(title.lower() + " " + summary.lower()) |
|
skip_flag = ( |
|
"[homemade]" in title.lower() or |
|
"homemade" in title.lower() or |
|
"homemade" in summary.lower() or |
|
any(kw in title.lower() or kw in summary.lower() for kw in RECIPE_KEYWORDS) |
|
) |
|
logging.info(f"Fallback for '{title}': Skip={skip_flag}, Reasons: " |
|
f"homemade_in_title={'[homemade]' in title.lower() or 'homemade' in title.lower()}, " |
|
f"homemade_in_summary={'homemade' in summary.lower()}, " |
|
f"recipe_keywords={any(kw in title.lower() or kw in summary.lower() for kw in RECIPE_KEYWORDS)}") |
|
            return main_topic, [main_topic, "food"], main_topic, skip_flag, main_topic  # five values, matching the success path
|
|
|
image_query = result["image_query"] |
|
specific_term = result["specific_term"] |
|
relevance_keywords = result["relevance"] |
|
main_topic = result.get("main_topic", extract_main_topic(title.lower() + " " + summary.lower())) |
|
skip_flag = ( |
|
result["action"] == "SKIP" or # Fixed typo: "aison" → "action" |
|
"[homemade]" in title.lower() or |
|
"homemade" in title.lower() or |
|
"homemade" in summary.lower() or |
|
any(kw in title.lower() or kw in summary.lower() for kw in RECIPE_KEYWORDS) |
|
) |
|
|
|
logging.info(f"Smart image query: {image_query}, Specific Term: {specific_term}, Relevance: {relevance_keywords}, Main Topic: {main_topic}, Skip: {skip_flag}, " |
|
f"Reasons: action={result['action']}, " |
|
f"homemade_in_title={'[homemade]' in title.lower() or 'homemade' in title.lower()}, " |
|
f"homemade_in_summary={'homemade' in summary.lower()}, " |
|
f"recipe_keywords={any(kw in title.lower() or kw in summary.lower() for kw in RECIPE_KEYWORDS)}") |
|
|
|
specific_single_words = ["kimchi", "sushi", "pizza", "taco", "burger", "chipotle", "starbucks", "mcdonalds"] |
|
if not image_query: |
|
logging.warning(f"Image query is empty, using fallback") |
|
            return main_topic, [main_topic, "food"], main_topic, skip_flag, specific_term
|
if len(image_query.split()) < 2 and image_query.lower() not in specific_single_words: |
|
logging.warning(f"Image query '{image_query}' too vague, using fallback") |
|
            return main_topic, [main_topic, "food"], main_topic, skip_flag, specific_term
|
|
|
return image_query, relevance_keywords, main_topic, skip_flag, specific_term |
|
|
|
except Exception as e: |
|
logging.error(f"Smart image/filter failed: {e}, using fallback") |
|
main_topic = extract_main_topic(title.lower() + " " + summary.lower()) |
|
skip_flag = ( |
|
"[homemade]" in title.lower() or |
|
"homemade" in title.lower() or |
|
"homemade" in summary.lower() or |
|
any(kw in title.lower() or kw in summary.lower() for kw in RECIPE_KEYWORDS) |
|
) |
|
logging.info(f"Fallback for '{title}': Skip={skip_flag}, Reasons: " |
|
f"homemade_in_title={'[homemade]' in title.lower() or 'homemade' in title.lower()}, " |
|
f"homemade_in_summary={'homemade' in summary.lower()}, " |
|
f"recipe_keywords={any(kw in title.lower() or kw in summary.lower() for kw in RECIPE_KEYWORDS)}") |
|
return main_topic, [main_topic, "food"], main_topic, skip_flag, "food" |
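# Unpacking sketch (hypothetical title/summary): all return paths yield five values,
# so callers can rely on a fixed shape and bail out early when skip_flag is set:
#
#   image_query, relevance, main_topic, skip_flag, specific_term = smart_image_and_filter(
#       "Ozempic is reshaping restaurant menus", "GLP-1 drugs are changing portion sizes...")
#   if skip_flag:
#       pass  # homemade/recipe/promo content is filtered out rather than posted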
|
|
|
def extract_main_topic(text): |
|
# Common food-related keywords (expand as needed) |
|
food_keywords = ["kimchi", "sushi", "pizza", "taco", "burger", "ramen", "curry", "pasta", "salad", "soup"] |
|
for keyword in food_keywords: |
|
if keyword in text: |
|
return keyword |
|
# Fallback to a generic term if no specific food item is found |
|
return "food trends" |
|
|
|
def upload_image_to_wp(image_url, post_title, wp_base_url, wp_username, wp_password, image_source="Pixabay", uploader=None, page_url=None): |
|
try: |
|
safe_title = post_title.encode('ascii', 'ignore').decode('ascii').replace(' ', '_')[:50] |
|
headers = { |
|
"Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}", |
|
"Content-Disposition": f"attachment; filename={safe_title}.jpg", |
|
"Content-Type": "image/jpeg" |
|
} |
|
image_headers = { |
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' |
|
} |
|
logging.info(f"Fetching image from {image_url} for '{post_title}'") |
|
|
|
image_response = None |
|
for attempt in range(3): |
|
try: |
|
image_response = requests.get(image_url, headers=image_headers, timeout=IMAGE_UPLOAD_TIMEOUT) |
|
if image_response.status_code == 429: |
|
wait_time = 10 * (2 ** attempt) |
|
logging.warning(f"Rate limit hit for {image_url}. Retrying after {wait_time}s (attempt {attempt+1}/3).") |
|
time.sleep(wait_time) |
|
continue |
|
image_response.raise_for_status() |
|
break |
|
except requests.exceptions.RequestException as e: |
|
logging.warning(f"Image fetch failed for {image_url} (attempt {attempt+1}/3): {e}") |
|
if attempt == 2: |
|
logging.error(f"Failed to fetch image {image_url} after 3 attempts") |
|
return None |
|
time.sleep(10 * (2 ** attempt)) |
|
else: |
|
logging.error(f"Failed to fetch image {image_url} after retries") |
|
return None |
|
|
|
if image_response is None: |
|
logging.error(f"Image response is None for {image_url}, cannot proceed with upload") |
|
return None |
|
|
|
response = requests.post( |
|
f"{wp_base_url}/media", |
|
headers=headers, |
|
data=image_response.content |
|
) |
|
response.raise_for_status() |
|
|
|
image_id = response.json()["id"] |
|
if page_url: |
|
# Updated caption: "Image via" in grey, source name in default link color |
|
caption = f'<span style="color: grey;">Image via </span><a href="{page_url}">{image_source}</a>' |
|
else: |
|
caption = image_source |
|
requests.post( |
|
f"{wp_base_url}/media/{image_id}", |
|
headers={"Authorization": headers["Authorization"], "Content-Type": "application/json"}, |
|
json={"caption": caption} |
|
) |
|
|
|
logging.info(f"Uploaded image '{safe_title}.jpg' to WP (ID: {image_id}) with caption '{caption}'") |
|
return image_id |
|
except Exception as e: |
|
logging.error(f"Image upload to WP failed for '{post_title}': {e}") |
|
print(f"Image upload to WP failed for '{post_title}': {e}") |
|
return None |
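# Illustrative call (a sketch with hypothetical credentials): wp_base_url is the REST root
# (ending in /wp-json/wp/v2); on success the returned media ID can be set as the post's
# featured_media, as post_to_wp does below.
#
#   media_id = upload_image_to_wp(image_url, post_title, wp_base_url, wp_username, wp_password,
#                                 image_source="Flickr", page_url=page_url)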
|
|
|
def determine_paragraph_count(interest_score): |
|
if interest_score >= 9: |
|
return 5 |
|
elif interest_score >= 7: |
|
return 4 |
|
return 3 |
|
|
|
def is_interesting(summary): |
|
try: |
|
response = client.chat.completions.create( |
|
model=LIGHT_TASK_MODEL, |
|
messages=[ |
|
{"role": "system", "content": ( |
|
"Rate this content from 0-10 based on its rarity, buzzworthiness, and engagement potential for food lovers, covering a wide range of food topics (skip recipes). " |
|
"Score 8-10 for rare, highly shareable ideas that grab attention. " |
|
"Score 5-7 for fresh, engaging updates with broad appeal. Score below 5 for common or unremarkable content. " |
|
"Return only a number." |
|
)}, |
|
{"role": "user", "content": f"Content: {summary}"} |
|
], |
|
max_tokens=5 |
|
) |
|
raw_score = response.choices[0].message.content.strip() |
|
score = int(raw_score) if raw_score.isdigit() else 0 |
|
print(f"Interest Score for '{summary[:50]}...': {score} (raw: {raw_score})") |
|
logging.info(f"Interest Score: {score} (raw: {raw_score})") |
|
return score |
|
except Exception as e: |
|
logging.error(f"Interestingness scoring failed: {e}") |
|
print(f"Interest Error: {e}") |
|
return 0 |
|
|
|
def generate_title_from_summary(summary): |
|
banned_words = ["elevate", "elevating", "elevated"] |
|
for attempt in range(3): |
|
try: |
|
response = client.chat.completions.create( |
|
model=LIGHT_TASK_MODEL, |
|
messages=[ |
|
{"role": "system", "content": ( |
|
"Generate a concise, engaging title (under 100 characters) based on this summary, covering food topics. " |
|
"Craft it with Upworthy/Buzzfeed flair—think 'you won't believe this' or 'this is nuts'—for food insiders. " |
|
"Avoid quotes, emojis, special characters, or the words 'elevate', 'elevating', 'elevated'. " |
|
"End with a question to spark shares." |
|
)}, |
|
{"role": "user", "content": f"Summary: {summary}"} |
|
], |
|
max_tokens=30 |
|
) |
|
title = response.choices[0].message.content.strip().replace('"', '').replace("'", "") |
|
if ':' in title: |
|
title = title.split(':', 1)[1].strip() |
|
if len(title) > 100 or any(word in title.lower() for word in banned_words): |
|
reason = "length" if len(title) > 100 else "banned word" |
|
print(f"Rejected title (attempt {attempt + 1}/3): '{title}' due to {reason}") |
|
logging.info(f"Rejected title (attempt {attempt + 1}/3): '{title}' due to {reason}") |
|
continue |
|
logging.info(f"Generated title: {title}") |
|
return title |
|
except Exception as e: |
|
logging.error(f"Title generation failed (attempt {attempt + 1}/3): {e}") |
|
print(f"Title Error: {e}") |
|
print("Failed to generate valid title after 3 attempts") |
|
logging.info("Failed to generate valid title after 3 attempts") |
|
return None |
|
|
|
def summarize_with_gpt4o(content, source_name, link, interest_score=0, extra_prompt=""): |
|
try: |
|
persona = select_best_persona(interest_score, content) |
|
persona_config = PERSONA_CONFIGS.get(persona, { |
|
"article_prompt": "Write a concise, engaging summary that captures the essence of the content for food lovers.", |
|
"description": "a generic food writer", |
|
"tone": "an engaging tone" |
|
}) |
|
prompt = persona_config["article_prompt"].format( |
|
description=persona_config["description"], |
|
tone=persona_config["tone"], |
|
num_paragraphs=determine_paragraph_count(interest_score) |
|
) |
|
logging.info(f"Using {persona} with interest_score and content") |
|
|
|
full_prompt = ( |
|
f"{prompt}\n\n" |
|
f"Do not include the article title in the summary.\n\n" |
|
f"{extra_prompt}\n\n" |
|
f"Avoid using the word 'elevate'—use more humanized language like 'level up' or 'bring to life'.\n" |
|
f"Content to summarize:\n{content}\n\n" |
|
f"Source: {source_name}\n" |
|
f"Link: {link}" |
|
) |
|
|
|
response = client.chat.completions.create( |
|
model=SUMMARY_MODEL, |
|
messages=[ |
|
{"role": "system", "content": full_prompt}, |
|
{"role": "user", "content": content} |
|
], |
|
max_tokens=1000, |
|
temperature=0.7 |
|
) |
|
|
|
summary = response.choices[0].message.content.strip() |
|
|
|
        # Title removal is handled by the caller (e.g., foodie_automator_rss.py, where
        # entry.title is available), so the summary is returned as-is here.
|
logging.info(f"Processed summary (Persona: {persona}): {summary}") |
|
return summary |
|
|
|
except Exception as e: |
|
logging.error(f"Summary generation failed with model {SUMMARY_MODEL}: {e}") |
|
return None |
|
|
|
def insert_link_naturally(summary, source_name, source_url): |
|
try: |
|
logging.info(f"Input summary to insert_link_naturally: {summary!r}") |
|
|
|
# Split summary into paragraphs using \n\n (correct separator) |
|
paragraphs = summary.split('\n\n') |
|
if not paragraphs or all(not p.strip() for p in paragraphs): |
|
logging.error("No valid paragraphs to insert link.") |
|
return summary |
|
|
|
# Find paragraphs with at least two sentences |
|
eligible_paragraph_indices = [i for i, p in enumerate(paragraphs) if p.strip() and len(re.split(r'(?<=[.!?])\s+', p.strip())) >= 2] |
|
if not eligible_paragraph_indices: |
|
logging.warning("No paragraph with multiple sentences found, using fallback.") |
|
return append_link_as_fallback(summary, source_name, source_url) |
|
|
|
# Alternative phrases for variety (removed 'notes that' for natural flow) |
|
link_phrases = [ |
|
"according to {source}", |
|
"as reported by {source}" |
|
] |
|
|
|
best_candidate = None |
|
best_score = -1 |
|
best_paragraph_idx = None |
|
best_paragraph = None |
|
|
|
# Score each eligible paragraph and sentence for suitability |
|
for idx in eligible_paragraph_indices: |
|
para = paragraphs[idx] |
|
sentences = re.split(r'(?<=[.!?])\s+', para.strip()) |
|
eligible_sentences = [ |
|
(i, s) for i, s in enumerate(sentences) |
|
if s.strip() and not s.endswith('?') and not s.endswith('!') |
|
] |
|
if not eligible_sentences: |
|
continue |
|
for s_idx, sentence in eligible_sentences: |
|
score = 0 |
|
if any(word in sentence.lower() for word in ["is", "are", "has", "shows", "reveals"]): |
|
score += 2 |
|
score += len(sentence.split()) // 5 |
|
score += abs(s_idx - len(sentences) / 2) * -1 |
|
if score > best_score: |
|
best_score = score |
|
best_candidate = (s_idx, sentence) |
|
best_paragraph_idx = idx |
|
best_paragraph = para |
|
|
|
if best_candidate is None: |
|
logging.warning("No suitable sentence found, using fallback.") |
|
return append_link_as_fallback(summary, source_name, source_url) |
|
|
|
# Select a link phrase based on sentence structure |
|
sentence_idx, sentence = best_candidate |
|
link_phrase = random.choice(link_phrases) |
|
link_pattern = f'<a href="{source_url}">{source_name}</a>' |
|
formatted_link = link_phrase.format(source=link_pattern) |
|
|
|
# Insert the link at the end of the selected sentence (no capitalization needed) |
|
sentences = re.split(r'(?<=[.!?])\s+', best_paragraph.strip()) |
|
new_sentence = f"{sentence.rstrip('.')} {formatted_link}." |
|
sentences[sentence_idx] = new_sentence |
|
new_para = ' '.join(sentences) |
|
paragraphs[best_paragraph_idx] = new_para |
|
|
|
# Rejoin paragraphs with \n\n |
|
new_summary = '\n\n'.join(paragraphs) |
|
logging.info(f"Summary with naturally embedded link: {new_summary!r}") |
|
return new_summary |
|
|
|
except Exception as e: |
|
logging.error(f"Link insertion failed: {e}") |
|
return append_link_as_fallback(summary, source_name, source_url) |
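# Illustrative call (hypothetical source name and URL):
#
#   linked_summary = insert_link_naturally(summary, "Example Source", "https://example.com/article")
#
# Declarative sentences (containing "is", "shows", "reveals", ...) near the middle of a
# paragraph score highest; if no paragraph has at least two sentences, the link is appended
# via append_link_as_fallback instead.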
|
|
|
def append_link_as_fallback(summary, source_name, source_url): |
|
"""Fallback method to append the link to the last paragraph.""" |
|
link_pattern = f'<a href="{source_url}">{source_name}</a>' |
|
# Split summary into paragraphs using the correct separator (\n\n) |
|
paragraphs = summary.split('\n\n') |
|
if not paragraphs: # Edge case: empty summary |
|
paragraphs = [""] |
|
# Append the credit to the last paragraph |
|
credit = f' We learned about this from {link_pattern}.' |
|
paragraphs[-1] += credit |
|
new_summary = '\n\n'.join(paragraphs) |
|
logging.info(f"Fallback summary with link appended to last paragraph: {new_summary!r}") |
|
return new_summary |
|
|
|
def generate_category_from_summary(summary): |
|
try: |
|
if not isinstance(summary, str) or not summary.strip(): |
|
logging.warning(f"Invalid summary for category generation: {summary}. Defaulting to 'Trends'.") |
|
return "Trends" |
|
|
|
response = client.chat.completions.create( |
|
model=LIGHT_TASK_MODEL, |
|
messages=[ |
|
{"role": "system", "content": ( |
|
"Based on this summary, select the most relevant category from: Buzz, Trends, Lifestyle, Culture, Health, Drink, Food, Eats. " |
|
"Return only the category name.\n" |
|
"\n" |
|
"Definitions and guidance for each category:\n" |
|
"- Buzz: Viral, trending, surprising, or highly shareable news, controversies, or social media moments.\n" |
|
"- Trends: New or emerging patterns, movements, or shifts in food, drink, or dining.\n" |
|
"- Lifestyle: Stories about people's habits, routines, wellness, or ways of living related to food and drink.\n" |
|
"- Culture: Food traditions, history, heritage, or cultural context.\n" |
|
"- Health: Nutrition, wellness, diets, or health impacts.\n" |
|
"- Drink: Beverages, coffee, tea, cocktails, or drink trends.\n" |
|
"- Food: General food topics, dishes, or ingredients.\n" |
|
"- Eats: Places to eat, restaurants, or food destinations.\n" |
|
"\n" |
|
"Try to balance category usage over time. Do NOT always default to 'Trends' or 'Food'—use 'Buzz' or 'Lifestyle' when appropriate.\n" |
|
"Return only the category name from the list above." |
|
)}, |
|
{"role": "user", "content": summary} |
|
], |
|
max_tokens=10 |
|
) |
|
category = response.choices[0].message.content.strip() |
|
logging.info(f"Generated category: {category}") |
|
return category if category in ["Buzz", "Trends", "Lifestyle", "Culture", "Health", "Drink", "Food", "Eats"] else "Trends" |
|
except Exception as e: |
|
logging.error(f"Category generation failed: {e}") |
|
return "Trends" |
|
|
|
def get_wp_category_id(category_name, wp_base_url, wp_username, wp_password): |
|
try: |
|
headers = {"Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}"} |
|
response = requests.get(f"{wp_base_url}/categories", headers=headers, params={"search": category_name}) |
|
response.raise_for_status() |
|
categories = response.json() |
|
for cat in categories: |
|
if cat["name"].lower() == category_name.lower(): |
|
return cat["id"] |
|
return None |
|
except Exception as e: |
|
logging.error(f"Failed to get WP category ID for '{category_name}': {e}") |
|
return None |
|
|
|
def create_wp_category(category_name, wp_base_url, wp_username, wp_password): |
|
try: |
|
headers = { |
|
"Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}", |
|
"Content-Type": "application/json" |
|
} |
|
payload = {"name": category_name} |
|
response = requests.post(f"{wp_base_url}/categories", headers=headers, json=payload) |
|
response.raise_for_status() |
|
return response.json()["id"] |
|
except Exception as e: |
|
logging.error(f"Failed to create WP category '{category_name}': {e}") |
|
return None |
|
|
|
def get_wp_tag_id(tag_name, wp_base_url, wp_username, wp_password): |
|
try: |
|
headers = {"Authorization": f"Basic {base64.b64encode(f'{wp_username}:{wp_password}'.encode()).decode()}"} |
|
response = requests.get(f"{wp_base_url}/tags", headers=headers, params={"search": tag_name}) |
|
response.raise_for_status() |
|
tags = response.json() |
|
for tag in tags: |
|
if tag["name"].lower() == tag_name.lower(): |
|
return tag["id"] |
|
return None |
|
except Exception as e: |
|
logging.error(f"Failed to get WP tag ID for '{tag_name}': {e}") |
|
return None |
|
|
|
def post_to_wp(post_data, category, link, author, image_url, original_source, image_source="Pixabay", uploader=None, page_url=None, interest_score=4, post_id=None, should_post_tweet=True, summary=None): |
|
""" |
|
Post or update content to WordPress, optionally tweeting the post. |
|
""" |
|
import logging |
|
import requests |
|
import base64 |
|
from foodie_config import X_API_CREDENTIALS |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
# Extract WordPress credentials from author dictionary |
|
wp_url = author.get("url") |
|
wp_username = author.get("username") |
|
wp_password = author.get("password") |
|
|
|
if not all([wp_url, wp_username, wp_password]): |
|
logger.error(f"Missing WordPress credentials for author: {wp_username or 'unknown'}") |
|
return None, None |
|
|
|
# Ensure wp_url ends with '/wp-json/wp/v2' |
|
if not wp_url.endswith('/wp-json/wp/v2'): |
|
wp_base_url = f"{wp_url.rstrip('/')}/wp-json/wp/v2" |
|
else: |
|
wp_base_url = wp_url |
|
|
|
# Hardcoded author ID map from old working version |
|
author_id_map = { |
|
"owenjohnson": 10, |
|
"javiermorales": 2, |
|
"aishapatel": 3, |
|
"trangnguyen": 12, |
|
"keishareid": 13, |
|
"lilamoreau": 7 |
|
} |
|
author_id = author_id_map.get(wp_username, 5) # Default to ID 5 if username not found |
|
|
|
try: |
|
headers = { |
|
"Authorization": "Basic " + base64.b64encode(f"{wp_username}:{wp_password}".encode()).decode(), |
|
"Content-Type": "application/json" |
|
} |
|
|
|
# Test authentication |
|
auth_test = requests.get(f"{wp_base_url}/users/me", headers=headers) |
|
auth_test.raise_for_status() |
|
logger.info(f"Auth test passed for {wp_username}: {auth_test.json()['id']}") |
|
|
|
# Get or create category ID |
|
category_id = get_wp_category_id(category, wp_base_url, wp_username, wp_password) |
|
if not category_id: |
|
category_id = create_wp_category(category, wp_base_url, wp_username, wp_password) |
|
if not category_id: |
|
logger.warning(f"Failed to get or create category '{category}', using default") |
|
category_id = 1 # Fallback to 'Uncategorized' |
|
else: |
|
logger.info(f"Created new category '{category}' with ID {category_id}") |
|
else: |
|
logger.info(f"Found existing category '{category}' with ID {category_id}") |
|
|
|
# Handle tags |
|
tags = [1] # Default tag ID (e.g., 'uncategorized') |
|
if interest_score >= 9: |
|
picks_tag_id = get_wp_tag_id("Picks", wp_base_url, wp_username, wp_password) |
|
if picks_tag_id and picks_tag_id not in tags: |
|
tags.append(picks_tag_id) |
|
logger.info(f"Added 'Picks' tag (ID: {picks_tag_id}) due to high interest score: {interest_score}") |
|
|
|
# Format content with <p> tags |
|
content = post_data["content"] |
|
if content is None: |
|
logger.error(f"Post content is None for title '{post_data['title']}' - using fallback") |
|
content = "Content unavailable. Check the original source for details." |
|
formatted_content = "\n".join(f"<p>{para}</p>" for para in content.split('\n') if para.strip()) |
|
|
|
# Upload image before posting |
|
image_id = None |
|
if image_url: |
|
logger.info(f"Attempting image upload for '{post_data['title']}', URL: {image_url}, source: {image_source}") |
|
image_id = upload_image_to_wp(image_url, post_data["title"], wp_base_url, wp_username, wp_password, image_source, uploader, page_url) |
|
if not image_id: |
|
logger.info(f"Flickr upload failed for '{post_data['title']}', falling back to Pixabay") |
|
pixabay_query = post_data["title"][:50] |
|
image_url, image_source, uploader, page_url = get_image(pixabay_query) |
|
if image_url: |
|
image_id = upload_image_to_wp(image_url, post_data["title"], wp_base_url, wp_username, wp_password, image_source, uploader, page_url) |
|
if not image_id: |
|
logger.warning(f"All image uploads failed for '{post_data['title']}' - posting without image") |
|
|
|
# Build payload |
|
payload = { |
|
"title": post_data["title"], |
|
"content": formatted_content, |
|
"status": post_data["status"], |
|
"categories": [category_id], |
|
"tags": tags, |
|
"author": author_id, |
|
"meta": { |
|
"original_link": link, |
|
"original_source": original_source, |
|
"interest_score": interest_score |
|
} |
|
} |
|
if image_id: |
|
payload["featured_media"] = image_id |
|
logger.info(f"Set featured image for post '{post_data['title']}': Media ID={image_id}") |
|
|
|
        # Set endpoint for creating or updating the post
        is_update = post_id is not None
        endpoint = f"{wp_base_url}/posts/{post_id}" if is_update else f"{wp_base_url}/posts"
|
|
|
logger.debug(f"Sending POST to {endpoint} with payload: {json.dumps(payload, indent=2)}") |
|
response = requests.post(endpoint, headers=headers, json=payload) |
|
if response.status_code != 201 and response.status_code != 200: |
|
logger.error(f"WordPress API error: {response.status_code} - {response.text}") |
|
response.raise_for_status() |
|
|
|
post_info = response.json() |
|
if not isinstance(post_info, dict) or "id" not in post_info: |
|
raise ValueError(f"Invalid WP response: {post_info}") |
|
|
|
post_id = post_info["id"] |
|
post_url = post_info["link"] |
|
logger.info(f"{'Updated' if post_id else 'Posted'} WordPress post: {post_data['title']} (ID: {post_id})") |
|
|
|
# Save to recent posts |
|
timestamp = datetime.now(timezone.utc).isoformat() |
|
save_post_to_recent(post_data["title"], post_url, wp_username, timestamp) |
|
|
|
# Post tweet if enabled |
|
if should_post_tweet: |
|
credentials = X_API_CREDENTIALS.get(post_data["author"]) |
|
if credentials: |
|
# Select persona for the tweet (same logic as used in summarize_with_gpt4o) |
|
persona = select_best_persona(interest_score, post_data["content"]) |
|
logger.info(f"Selected persona for tweet: {persona}") |
|
# Generate GPT-based tweet |
|
tweet_post = { |
|
"title": post_data["title"], |
|
"url": post_url |
|
} |
|
# Use the provided summary if available, otherwise fall back to post_data["content"] |
|
tweet_summary = summary if summary is not None else post_data["content"] |
|
tweet_text = generate_article_tweet(author, tweet_post, persona, summary=tweet_summary) |
|
tweet_id, tweet_data = post_tweet(author, tweet_text, tweet_type="rss") |
|
if tweet_id: |
|
logger.info(f"Successfully tweeted for post: {post_data['title']} (Tweet ID: {tweet_id})") |
|
else: |
|
logger.warning(f"Failed to tweet for post: {post_data['title']}") |
|
|
|
return post_id, post_url |
|
|
|
except requests.exceptions.HTTPError as e: |
|
logger.error(f"Failed to {'update' if post_id else 'post'} WordPress post: {post_data['title']}: {e} - Response: {e.response.text}", exc_info=True) |
|
return None, None |
|
except requests.exceptions.RequestException as e: |
|
logger.error(f"Failed to {'update' if post_id else 'post'} WordPress post: {post_data['title']}: {e}", exc_info=True) |
|
return None, None |
|
except Exception as e: |
|
logger.error(f"Failed to {'update' if post_id else 'post'} WordPress post: {post_data['title']}: {e}", exc_info=True) |
|
return None, None |
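# Illustrative call sketch (hypothetical values; the author dict must carry WordPress
# credentials under "url" / "username" / "password", matching how they are read above):
#
#   post_data = {"title": title, "content": body, "status": "publish", "author": author["username"]}
#   post_id, post_url = post_to_wp(post_data, category, link, author, image_url,
#                                  original_source, image_source="Flickr",
#                                  interest_score=interest_score, summary=summary)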
|
|
|
# Configure Flickr API with credentials |
|
flickr_api.set_keys(api_key=FLICKR_API_KEY, api_secret=FLICKR_API_SECRET) |
|
logging.info(f"Flickr API configured with key: {FLICKR_API_KEY[:4]}... and secret: {FLICKR_API_SECRET[:4]}...") |
|
|
|
# Global variable to track the last Flickr request time |
|
last_flickr_request_time = 0 |
|
|
|
# Flickr request counter |
|
flickr_request_count = 0 |
|
flickr_request_start_time = time.time() |
|
|
|
# Define exclude keywords for filtering unwanted image types |
|
exclude_keywords = [ |
|
"poster", "infographic", "chart", "graph", "data", "stats", "text", "typography", |
|
"design", "advertisement", "illustration", "diagram", "layout", "print" |
|
] |
|
|
|
# Initialize used_images as a set to track used image URLs |
|
used_images_file = "/home/shane/foodie_automator/used_images.json" |
|
used_images = set() |
|
|
|
# Load used images from file if it exists |
|
if os.path.exists(used_images_file): |
|
try: |
|
        entries = load_json_file(used_images_file, IMAGE_EXPIRATION_DAYS)  # load_json_file treats this value as days for used_images.json
|
for entry in entries: |
|
if isinstance(entry, dict) and "title" in entry and entry["title"].startswith('https://'): |
|
used_images.add(entry["title"]) |
|
else: |
|
logging.warning(f"Skipping invalid entry in {used_images_file}: {entry}") |
|
logging.info(f"Loaded {len(used_images)} used image URLs from {used_images_file}") |
|
except Exception as e: |
|
logging.warning(f"Failed to load used images from {used_images_file}: {e}. Resetting to empty set.") |
|
used_images = set() |
|
        with open(used_images_file, 'w') as f:
            f.write("[]")  # reset to an empty JSON array rather than an empty (invalid JSON) file
|
|
|
# Function to save used_images to file |
|
def save_used_images():
    """
    Save used_images to used_images.json as a JSON array, preserving existing timestamps.
    """
    try:
        timestamp = datetime.now(timezone.utc).isoformat()
        # Load existing entries once so previously recorded timestamps are preserved
        existing = {
            e["title"]: e for e in load_json_file(used_images_file, IMAGE_EXPIRATION_DAYS)
            if isinstance(e, dict) and "title" in e
        }
        entries = [
            {"title": url, "timestamp": existing.get(url, {}).get("timestamp", timestamp)}
            for url in used_images
        ]
        # Use save_json_file for atomic write
        save_json_file(used_images_file, entries)
        logging.info(f"Saved {len(entries)} used image URLs to {used_images_file}")
    except Exception as e:
        logging.warning(f"Failed to save used images to {used_images_file}: {e}")
|
|
|
def reset_flickr_request_count(): |
|
global flickr_request_count, flickr_request_start_time |
|
if time.time() - flickr_request_start_time >= 3600: # Reset every hour |
|
flickr_request_count = 0 |
|
flickr_request_start_time = time.time() |
|
|
|
def process_photo(photo, search_query): |
|
tags = [tag.text.lower() for tag in photo.getTags()] |
|
title = photo.title.lower() if photo.title else "" |
|
|
|
matched_keywords = [kw for kw in exclude_keywords if kw in tags or kw in title] |
|
if matched_keywords: |
|
logging.info(f"Skipping image with unwanted keywords: {photo.id} (tags: {tags}, title: {title}, matched: {matched_keywords})") |
|
return None |
|
|
|
# Try 'Large' size first, fall back to 'Medium' if unavailable |
|
img_url = None |
|
try: |
|
img_url = photo.getPhotoFile(size_label='Large') |
|
except flickr_api.flickrerrors.FlickrError as e: |
|
logging.info(f"Large size not available for photo {photo.id}: {e}, trying Medium") |
|
try: |
|
img_url = photo.getPhotoFile(size_label='Medium') |
|
except flickr_api.flickrerrors.FlickrError as e: |
|
logging.warning(f"Medium size not available for photo {photo.id}: {e}") |
|
return None |
|
|
|
if not img_url: |
|
logging.info(f"Image URL invalid for photo {photo.id}") |
|
return None |
|
|
|
# Check if the image is highly relevant to the query |
|
query_keywords = set(search_query.lower().split()) |
|
photo_keywords = set(tags + title.split()) |
|
is_relevant = bool(query_keywords & photo_keywords) # Check if any query keyword is in tags or title |
|
|
|
# Allow reuse of highly relevant images |
|
if img_url in used_images and not is_relevant: |
|
logging.info(f"Image already used and not highly relevant for photo {photo.id}: {img_url}") |
|
return None |
|
|
|
uploader = photo.owner.username |
|
page_url = f"https://www.flickr.com/photos/{photo.owner.nsid}/{photo.id}" |
|
|
|
used_images.add(img_url) |
|
save_used_images() |
|
|
|
flickr_data = { |
|
"title": search_query, |
|
"image_url": img_url, |
|
"source": "Flickr", |
|
"uploader": uploader, |
|
"page_url": page_url, |
|
"timestamp": datetime.now(timezone.utc).isoformat() |
|
} |
|
flickr_file = "/home/shane/foodie_automator/flickr_images.json" |
|
with open(flickr_file, 'a') as f: |
|
json.dump(flickr_data, f) |
|
f.write('\n') |
|
logging.info(f"Saved Flickr image metadata to {flickr_file}: {img_url}") |
|
|
|
logging.info(f"Selected Flickr image: {img_url} by {uploader} for query '{search_query}' (tags: {tags})") |
|
return img_url, "Flickr", uploader, page_url |
|
|
|
def search_flickr(query, per_page=5): |
|
try: |
|
photos = flickr_api.Photo.search( |
|
text=query, |
|
per_page=per_page, |
|
sort='relevance', |
|
safe_search=1, |
|
media='photos', |
|
license='4,5,9,10' |
|
) |
|
return photos |
|
except Exception as e: |
|
logging.warning(f"Flickr API error for query '{query}': {e}") |
|
return [] |
|
|
|
def fetch_photo_by_id(photo_id): |
|
try: |
|
photo = flickr_api.Photo(id=photo_id) |
|
return photo |
|
except Exception as e: |
|
logging.warning(f"Failed to fetch Flickr photo ID {photo_id}: {e}") |
|
return None |
|
|
|
def search_ddg_for_flickr(query): |
|
ddg_query = f"{query} site:flickr.com" |
|
ddg_url = f"https://duckduckgo.com/?q={quote(ddg_query)}" |
|
try: |
|
response = requests.get(ddg_url, headers={'User-Agent': 'InsiderFoodieBot/1.0 (https://insiderfoodie.com; contact@insiderfoodie.com)'}, timeout=10) |
|
response.raise_for_status() |
|
soup = BeautifulSoup(response.text, 'html.parser') |
|
|
|
photo_ids = set() |
|
for link in soup.find_all('a', href=True): |
|
href = link['href'] |
|
match = re.search(r'flickr\.com/photos/[^/]+/(\d+)', href) |
|
if match: |
|
photo_id = match.group(1) |
|
photo_ids.add(photo_id) |
|
|
|
photo_ids = list(photo_ids)[:2] # Limit to 2 IDs |
|
logging.info(f"Found {len(photo_ids)} Flickr photo IDs via DDG: {photo_ids}") |
|
return photo_ids |
|
except Exception as e: |
|
logging.warning(f"DDG search failed for query '{ddg_query}': {e}") |
|
        return []  # match the list returned by the success path
|
|
|
def classify_keywords(keywords): |
|
prompt = ( |
|
"Given the following keywords from an image search query, classify each as 'specific' (e.g., brand names, unique entities like 'Taco Bell' or 'Paris') or 'generic' (e.g., common or abstract terms like 'dining' or 'trends'). " |
|
"Return a JSON object mapping each keyword to its classification.\n\n" |
|
"Keywords: " + ", ".join(keywords) + "\n\n" |
|
"Example output format (do not use these exact keywords in your response):\n" |
|
"```json\n" |
|
"{\n" |
|
" \"keyword1\": \"specific\",\n" |
|
" \"keyword2\": \"generic\"\n" |
|
"}\n```" |
|
) |
|
try: |
|
response = client.chat.completions.create( |
|
model=LIGHT_TASK_MODEL, |
|
messages=[ |
|
{"role": "system", "content": "You are a helper that classifies keywords."}, |
|
{"role": "user", "content": prompt} |
|
], |
|
max_tokens=100, |
|
temperature=0.5 |
|
) |
|
raw_response = response.choices[0].message.content |
|
json_match = re.search(r'```json\n([\s\S]*?)\n```', raw_response) |
|
if not json_match: |
|
logging.warning(f"Failed to parse keyword classification JSON: {raw_response}") |
|
return {kw: "specific" for kw in keywords} |
|
|
|
classifications = json.loads(json_match.group(1)) |
|
return classifications |
|
except Exception as e: |
|
logging.warning(f"Keyword classification failed: {e}. Defaulting to all specific.") |
|
return {kw: "specific" for kw in keywords} |
|
|
|
def get_flickr_image(search_query, relevance_keywords, main_topic, specific_term=None): |
|
global used_images |
|
logger = logging.getLogger(__name__) |
|
|
|
def process_image(image_url, source_name, page_url): |
|
try: |
|
youtube_domains = ['youtube.com', 'ytimg.com'] |
|
if any(domain in image_url.lower() or domain in page_url.lower() for domain in youtube_domains): |
|
logger.info(f"Skipping YouTube image: {image_url}") |
|
return None |
|
|
|
headers = {'User-Agent': 'InsiderFoodieBot/1.0 (https://insiderfoodie.com; contact@insiderfoodie.com)'} |
|
response = requests.get(image_url, headers=headers, timeout=10) |
|
response.raise_for_status() |
|
img = Image.open(io.BytesIO(response.content)) |
|
|
|
width, height = img.size |
|
min_dimension = 1280 |
|
if width < min_dimension and height < min_dimension: |
|
logger.info(f"Skipping low-resolution image: {image_url} ({width}x{height})") |
|
return None |
|
|
|
aspect_ratio = width / height |
|
if (0.9 <= aspect_ratio <= 1.1) or "screenshot" in image_url.lower(): |
|
logger.info(f"Skipping potential screenshot: {image_url} (aspect ratio: {aspect_ratio})") |
|
return None |
|
|
|
watermark_domains = [ |
|
'shutterstock.com', 'gettyimages.com', 'istockphoto.com', 'adobestock.com', |
|
'123rf.com', 'dreamstime.com', 'alamy.com', 'stock.adobe.com', 'bigstockphoto.com', |
|
'depositphotos.com', 'fotolia.com', 'canstockphoto.com', 'stockfresh.com', |
|
'featurepics.com', 'stockvault.net', 'stockfreeimages.com', 'freeimages.com', |
|
'freepik.com', 'vecteezy.com', 'pikwizard.com', 'stockunlimited.com', |
|
'stockphoto.com', 'stockphotosecrets.com', 'stockphotopro.com', 'stockphotosite.com', |
|
'stockphotographysite.com', 'stockphotographysites.com', 'stockphotographysites.net', |
|
'stockphotographysites.org', 'stockphotographysites.info', 'stockphotographysites.biz', 'focusedcollection.com' |
|
] |
|
if any(domain in image_url.lower() or domain in page_url.lower() for domain in watermark_domains): |
|
logger.info(f"Skipping image from stock photo site (potential watermark): {image_url}") |
|
return None |
|
|
|
# Convert to grayscale |
|
img = img.convert("L") |
|
# Increase contrast |
|
enhancer = ImageEnhance.Contrast(img) |
|
img = enhancer.enhance(2) |
|
# Optional: sharpen |
|
img = img.filter(ImageFilter.SHARPEN) |
|
|
|
text = pytesseract.image_to_string(img).strip().lower() |
|
logger.info(f"OCR text for {image_url}: '{text}' (word count: {len(text.split())})") |
|
word_count = len(text.split()) |
|
if word_count > 5: |
|
logger.info(f"Skipping image with too much text: {image_url} ({word_count} words)") |
|
return None |
|
|
|
if image_url in used_images: |
|
logger.info(f"Image already used: {image_url}") |
|
return None |
|
|
|
used_images.add(image_url) |
|
save_used_images() |
|
uploader = "Unknown" |
|
logger.info(f"Selected image: {image_url} from {source_name} ({width}x{height})") |
|
return image_url, source_name, uploader, page_url |
|
except Exception as e: |
|
logger.warning(f"Failed to process image {image_url}: {e}") |
|
return None |
|
|
|
ddg_query = f"{search_query} license:public domain" |
|
logger.info(f"Searching DDG with query: '{ddg_query}'") |
|
try: |
|
with DDGS() as ddgs: |
|
results = ddgs.images(ddg_query, safesearch="on", max_results=20) |
|
prioritized_results = [] |
|
other_results = [] |
|
for result in results: |
|
image_url = result.get("image") |
|
page_url = result.get("url") |
|
source_match = re.search(r'https?://(?:www\.)?([^/]+)', page_url) |
|
if source_match: |
|
domain = source_match.group(1) |
|
source_name = domain.rsplit('.', 1)[0].capitalize() |
|
else: |
|
source_name = "Public Domain" |
|
|
|
if not image_url or not image_url.endswith(('.jpg', '.jpeg', '.png')): |
|
continue |
|
|
|
image_metadata = f"{result.get('title', '').lower()} {page_url.lower()}" |
|
if specific_term and specific_term.lower() in image_metadata: |
|
prioritized_results.append((image_url, source_name, page_url)) |
|
else: |
|
other_results.append((image_url, source_name, page_url)) |
|
|
|
for image_url, source_name, page_url in prioritized_results + other_results: |
|
result = process_image(image_url, source_name, page_url) |
|
if result: |
|
return result |
|
except Exception as e: |
|
logger.warning(f"DDG search failed for '{ddg_query}': {e}") |
|
|
|
logger.info(f"No valid DDG images, falling back to Pixabay for '{search_query}'") |
|
image_url, source_name, uploader, page_url = get_image(search_query, specific_term) |
|
if image_url: |
|
used_images.add(image_url) |
|
save_used_images() |
|
logger.info(f"Selected Pixabay image: {image_url}") |
|
return image_url, source_name, uploader, page_url |
|
|
|
logger.warning(f"No valid images found for query '{search_query}'") |
|
return None, None, None, None |
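# Pipeline sketch (hypothetical query): DDG public-domain results that mention the specific
# term are tried first; each candidate is filtered by resolution, aspect ratio, stock-photo
# domains, and OCR text volume, and Pixabay is the fallback when nothing survives:
#
#   image_url, source, uploader, page_url = get_flickr_image(
#       "Ozempic dining trends", ["Ozempic", "dining"], "dining trends", specific_term="Ozempic")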
|
|
|
def get_image(search_query, specific_term=None): |
|
headers = {'User-Agent': 'InsiderFoodieBot/1.0 (https://insiderfoodie.com; contact@insiderfoodie.com)'} |
|
|
|
def process_image(image_url, source_name, page_url): |
|
"""Helper to process Pixabay images for watermarks and resolution.""" |
|
try: |
|
response = requests.get(image_url, headers=headers, timeout=10) |
|
response.raise_for_status() |
|
img = Image.open(io.BytesIO(response.content)) |
|
|
|
# Check resolution |
|
width, height = img.size |
|
min_dimension = 1280 |
|
if width < min_dimension and height < min_dimension: |
|
logger.info(f"Skipping low-resolution Pixabay image: {image_url} ({width}x{height})") |
|
return None |
|
|
|
# Check for watermarks via OCR |
|
text = pytesseract.image_to_string(img).strip().lower() |
|
watermark_phrases = [ |
|
'shutterstock', 'getty images', 'istock', 'adobe stock', 'watermark', |
|
'123rf', 'dreamstime', 'alamy', 'preview', 'stock photo' |
|
] |
|
if any(phrase in text for phrase in watermark_phrases): |
|
logger.info(f"Skipping watermarked Pixabay image: {image_url} (detected: {text})") |
|
return None |
|
|
|
word_count = len(text.split()) |
|
if word_count > 5: |
|
logger.info(f"Skipping Pixabay image with too much text: {image_url} ({word_count} words)") |
|
return None |
|
|
|
return image_url, source_name, page_url, width, height |
|
except Exception as e: |
|
logger.warning(f"Failed to process Pixabay image {image_url}: {e}") |
|
return None |
|
|
|
def fetch_pixabay_image(query): |
|
try: |
|
pixabay_url = f"https://pixabay.com/api/?key={PIXABAY_API_KEY}&q={quote(query)}&image_type=photo&per_page=20" |
|
response = requests.get(pixabay_url, headers=headers, timeout=10) |
|
response.raise_for_status() |
|
data = response.json() |
|
|
|
for hit in data.get('hits', []): |
|
img_url = hit.get('largeImageURL') |
|
if not img_url or img_url in used_images: |
|
continue |
|
|
|
uploader = hit.get('user', 'Unknown') |
|
page_url = hit.get('pageURL', img_url) |
|
|
|
# Process the image for watermarks and resolution |
|
result = process_image(img_url, "Pixabay", page_url) |
|
if result: |
|
image_url, source_name, page_url, width, height = result |
|
used_images.add(img_url) |
|
save_used_images() |
|
logger.info(f"Selected Pixabay image: {img_url} by {uploader} for query '{query}' ({width}x{height})") |
|
return image_url, source_name, uploader, page_url |
|
|
|
logger.info(f"No valid Pixabay image found for query '{query}'. Trying fallback query.") |
|
return None, None, None, None |
|
|
|
except Exception as e: |
|
logger.warning(f"Pixabay image fetch failed for query '{query}': {e}") |
|
return None, None, None, None |
|
|
|
# Try with the original query |
|
image_url, source_name, uploader, page_url = fetch_pixabay_image(search_query) |
|
if image_url: |
|
return image_url, source_name, uploader, page_url |
|
|
|
# Fallback to a dynamic query using the specific term if provided |
|
if specific_term: |
|
fallback_query = f"{specific_term} dining trends" |
|
image_url, source_name, uploader, page_url = fetch_pixabay_image(fallback_query) |
|
if image_url: |
|
return image_url, source_name, uploader, page_url |
|
|
|
# Final fallback to a generic query |
|
fallback_query = "food dining trends" |
|
image_url, source_name, uploader, page_url = fetch_pixabay_image(fallback_query) |
|
if image_url: |
|
return image_url, source_name, uploader, page_url |
|
|
|
logger.error(f"All image fetch attempts failed for query '{search_query}'. Returning None.") |
|
return None, None, None, None |
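
# Illustrative usage of get_image() (a sketch only; the query string is a made-up example):
#
#     image_url, source_name, uploader, page_url = get_image("korean street food")
#     if image_url:
#         logger.info(f"Using {image_url} from {source_name} (uploader: {uploader}): {page_url}")
#     else:
#         logger.info("No usable image found; skip the post or retry with a different query.")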
|
|
|
|
|
|
def select_best_author(content, interest_score): |
|
try: |
|
best_score = -1 |
|
best_author = None |
|
for author in AUTHORS: |
|
persona = PERSONA_CONFIGS.get(author["username"], {}) |
|
prompt = persona.get("prompt", "") |
|
current_score = interest_score |
|
if "trend" in prompt.lower(): |
|
current_score += 2 |
|
elif "recipe" in prompt.lower(): |
|
current_score += 1 |
|
|
|
if current_score > best_score: |
|
best_score = current_score |
|
best_author = author["username"] |
|
|
|
if not best_author: |
|
best_author = random.choice([author["username"] for author in AUTHORS]) |
|
|
|
logging.info(f"Selected author: {best_author} with adjusted score: {best_score}") |
|
return best_author |
|
except Exception as e: |
|
logging.error(f"Error in select_best_author: {e}") |
|
return random.choice([author["username"] for author in AUTHORS]) |
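
# Illustrative call (sketch; the summary text and interest score are made-up values):
#
#     username = select_best_author("Pop-up ramen bars are trending in Austin...", interest_score=6)
#     # Returns a username from AUTHORS, biased toward trend- and recipe-focused personas.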
|
|
|
def get_next_author_round_robin(): |
|
""" |
|
Select the next author using round-robin, respecting real-time X API rate limits. |
|
Persists the last selected author index to ensure fair rotation across runs. |
|
Returns an author dict or None if no authors are available. |
|
""" |
|
logger = logging.getLogger(__name__) |
|
state_file = '/home/shane/foodie_automator/author_state.json' |
|
|
|
# Load or initialize state |
|
state = load_json_file(state_file, default={'last_author_index': -1}) |
|
last_index = state.get('last_author_index', -1) |
|
|
|
# Try each author, starting from the next one after last_index |
|
for i in range(len(AUTHORS)): |
|
index = (last_index + 1 + i) % len(AUTHORS) |
|
author = AUTHORS[index] |
|
username = author['username'] |
|
can_post, remaining, reset = check_author_rate_limit(author) |
|
if can_post: |
|
# Update state with the selected author index |
|
state['last_author_index'] = index |
|
save_json_file(state_file, state) |
|
logger.info(f"Selected author {username} with {remaining}/17 tweets remaining") |
|
return author |
|
else: |
|
reset_time = datetime.fromtimestamp(reset, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S') |
|
logger.info(f"Author {username} is rate-limited. Remaining: {remaining}, Reset at: {reset_time}") |
|
|
|
logger.warning("No authors available due to tweet rate limits.") |
|
return None |
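
# Typical pattern (sketch): pick the next author that still has quota before posting.
#
#     author = get_next_author_round_robin()
#     if author is None:
#         logger.info("All authors are rate-limited; skipping this posting cycle.")
#     else:
#         credentials = X_API_CREDENTIALS[author["username"]]
#         # ...build the tweet and post it with these credentials...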
|
|
|
def send_account_lock_alert(username, error_message): |
|
"""Send email alert for account lockout.""" |
|
try: |
|
import smtplib |
|
from email.mime.text import MIMEText |
|
from email.mime.multipart import MIMEMultipart |
|
from foodie_config import EMAIL_CONFIG # Add this to your config file |
|
|
|
msg = MIMEMultipart() |
|
msg['From'] = EMAIL_CONFIG['from_email'] |
|
msg['To'] = EMAIL_CONFIG['to_email'] |
|
msg['Subject'] = f"🚨 X Account Lock Alert: {username}" |
|
|
|
body = f""" |
|
X Account Lock Alert! |
|
|
|
Username: {username} |
|
Time: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')} |
|
Error: {error_message} |
|
|
|
Action Required: |
|
1. Visit https://twitter.com |
|
2. Log in to {username} |
|
3. Complete verification if prompted |
|
4. Unlock the account |
|
|
|
This is an automated alert from your foodie_automator system. |
|
""" |
|
|
|
msg.attach(MIMEText(body, 'plain')) |
|
|
|
with smtplib.SMTP(EMAIL_CONFIG['smtp_server'], EMAIL_CONFIG['smtp_port']) as server: |
|
server.starttls() |
|
server.login(EMAIL_CONFIG['smtp_username'], EMAIL_CONFIG['smtp_password']) |
|
server.send_message(msg) |
|
|
|
logger.info(f"Sent account lock alert email for {username}") |
|
except Exception as e: |
|
logger.error(f"Failed to send account lock alert email: {e}") |
|
|
|
def get_x_rate_limit_status(author): |
|
""" |
|
Check the X API Free tier rate limit by posting a test tweet. |
|
Returns (remaining, reset) based on app-level or user-level 24-hour headers. |
|
Returns (None, None) if the check fails. |
|
""" |
|
username = author['username'] |
|
credentials = X_API_CREDENTIALS.get(username) |
|
if not credentials: |
|
logger.error(f"No X API credentials found for {username}") |
|
return None, None |
|
|
|
oauth = OAuth1( |
|
client_key=credentials['api_key'], |
|
client_secret=credentials['api_secret'], |
|
resource_owner_key=credentials['access_token'], |
|
resource_owner_secret=credentials['access_token_secret'] |
|
) |
|
url = 'https://api.x.com/2/tweets' |
|
payload = {'text': f'Test tweet to check rate limits for {username} - please ignore {int(time.time())}'} |
|
|
|
# Add delay to avoid IP-based rate limiting |
|
logger.info(f"Waiting 5 seconds before attempting to post for {username}") |
|
time.sleep(5) |
|
|
|
try: |
|
response = requests.post(url, json=payload, auth=oauth) |
|
headers = response.headers |
|
logger.debug(f"Rate limit headers for {username}: {headers}") |
|
|
|
# Initialize defaults |
|
remaining = None |
|
reset = None |
|
current_time = int(time.time()) |
|
|
|
if response.status_code == 201: |
|
# Extract app-level 24-hour limits |
|
remaining_str = headers.get('x-app-limit-24hour-remaining') |
|
reset_str = headers.get('x-app-limit-24hour-reset') |
|
if remaining_str is None or reset_str is None: |
|
logger.error(f"App 24-hour limit headers missing for {username}: {headers}") |
|
return None, None |
|
elif response.status_code == 429: |
|
# Extract user-level 24-hour limits for rate limit exceeded |
|
remaining_str = headers.get('x-user-limit-24hour-remaining') |
|
reset_str = headers.get('x-user-limit-24hour-reset') |
|
if remaining_str is None or reset_str is None: |
|
logger.error(f"User 24-hour limit headers missing for {username}: {headers}") |
|
return None, None |
|
logger.info(f"Rate limit exceeded for {username}") |
|
elif response.status_code == 403: |
|
error_data = response.json() |
|
error_message = error_data.get('detail', '') |
|
if "account is temporarily locked" in error_message.lower(): |
|
logger.error(f"Account lock detected for {username}: {error_message}") |
|
send_account_lock_alert(username, error_message) |
|
else: |
|
logger.error(f"Unexpected 403 response for {username}: {error_message}") |
|
return None, None |
|
else: |
|
logger.error(f"Unexpected response for {username}: {response.status_code} - {response.text}") |
|
return None, None |
|
|
|
# Parse headers |
|
try: |
|
remaining = int(remaining_str) |
|
reset = int(reset_str) |
|
except ValueError as e: |
|
logger.error(f"Failed to parse rate limit headers for {username}: remaining={remaining_str}, reset={reset_str}, error={e}") |
|
return None, None |
|
|
|
# Validate remaining tweets |
|
        if remaining < 0 or remaining > 17:  # Free tier allows at most 17 tweets per 24 hours
            logger.warning(f"Invalid remaining tweets for {username}: {remaining}. Clamping to the 0-17 range.")
            remaining = max(0, min(remaining, 17))
|
|
|
# Ensure reset is in the future |
|
if reset <= current_time or reset > current_time + 2 * 86400: # Allow up to 48 hours |
|
logger.warning(f"Invalid reset time {reset} ({datetime.fromtimestamp(reset, tz=timezone.utc)}) for {username}. Setting to 24 hours from now.") |
|
reset = current_time + 86400 # 24 hours |
|
|
|
if response.status_code == 201: |
|
# Delete the test tweet |
|
tweet_id = response.json().get('data', {}).get('id') |
|
if tweet_id: |
|
delete_url = f'https://api.x.com/2/tweets/{tweet_id}' |
|
delete_response = requests.delete(delete_url, auth=oauth) |
|
if delete_response.status_code == 200: |
|
logger.info(f"Successfully deleted test tweet {tweet_id} for {username}") |
|
else: |
|
logger.warning(f"Failed to delete test tweet {tweet_id} for {username}: {delete_response.status_code} - {delete_response.text}") |
|
|
|
logger.info(f"Rate limit for {username}: {remaining} remaining, reset at {datetime.fromtimestamp(reset, tz=timezone.utc)}") |
|
return remaining, reset |
|
|
|
except Exception as e: |
|
logger.error(f"Unexpected error fetching X rate limit for {username}: {e}", exc_info=True) |
|
return None, None |
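
# Example of consuming the return value (sketch; any entry from AUTHORS works here):
#
#     remaining, reset = get_x_rate_limit_status(AUTHORS[0])
#     if remaining is None:
#         logger.warning("Live rate limit check failed; fall back to the stored quota.")
#     elif remaining > 0:
#         logger.info(f"{remaining} tweets left until {datetime.fromtimestamp(reset, tz=timezone.utc)}")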
|
|
|
def update_system_activity(script_name, status, pid=None): |
|
""" |
|
Record or update a script's activity in system_activity.json. |
|
Args: |
|
script_name (str): Name of the script (e.g., 'foodie_engagement_tweet'). |
|
status (str): 'running' or 'stopped'. |
|
pid (int): Process ID (required for 'running', optional for 'stopped'). |
|
""" |
|
activity_file = "/home/shane/foodie_automator/system_activity.json" |
|
try: |
|
# Load existing activities |
|
activities = load_json_file(activity_file, default=[]) |
|
|
|
# Update or add entry |
|
timestamp = datetime.now(timezone.utc).isoformat() |
|
entry = { |
|
"script_name": script_name, |
|
"pid": pid if status == "running" else None, |
|
"start_time": timestamp if status == "running" else None, |
|
"stop_time": timestamp if status == "stopped" else None, |
|
"status": status |
|
} |
|
|
|
# Find existing entry for this script |
|
for i, act in enumerate(activities): |
|
if act["script_name"] == script_name and act["status"] == "running": |
|
if status == "stopped": |
|
activities[i]["status"] = "stopped" |
|
activities[i]["stop_time"] = timestamp |
|
activities[i]["pid"] = None |
|
break |
|
else: |
|
# No running entry found, append new entry |
|
if status == "running": |
|
activities.append(entry) |
|
|
|
# Save updated activities |
|
save_json_file(activity_file, activities) |
|
logger.info(f"Updated system activity: {script_name} is {status}") |
|
except Exception as e: |
|
logger.error(f"Failed to update system_activity.json for {script_name}: {e}") |
|
|
|
def prune_system_activity(tweet_reset_time): |
|
""" |
|
    Prune system_activity.json entries older than 24 hours.
    Args:
        tweet_reset_time (float): Unix timestamp of the tweet quota reset
            (accepted for call-site compatibility; the cutoff below currently uses
            a fixed 24-hour window rather than this value).
|
""" |
|
activity_file = "/home/shane/foodie_automator/system_activity.json" |
|
try: |
|
activities = load_json_file(activity_file, default=[]) |
|
cutoff = datetime.now(timezone.utc) - timedelta(hours=24) |
|
pruned_activities = [] |
|
|
|
for entry in activities: |
|
# Use start_time or stop_time for pruning |
|
time_str = entry.get("stop_time") or entry.get("start_time") |
|
if not time_str: |
|
continue |
|
try: |
|
entry_time = datetime.fromisoformat(time_str) |
|
if entry_time > cutoff: |
|
pruned_activities.append(entry) |
|
except ValueError: |
|
logger.warning(f"Invalid timestamp in system_activity.json: {time_str}") |
|
continue |
|
|
|
save_json_file(activity_file, pruned_activities) |
|
logger.info(f"Pruned system_activity.json to {len(pruned_activities)} entries") |
|
except Exception as e: |
|
logger.error(f"Failed to prune system_activity.json: {e}") |
|
|
|
def is_any_script_running(): |
|
""" |
|
Check if any script is running by inspecting system_activity.json and verifying PIDs. |
|
Returns True if at least one script (other than the current process) is running, False otherwise. |
|
""" |
|
activity_file = "/home/shane/foodie_automator/system_activity.json" |
|
current_pid = os.getpid() |
|
try: |
|
activities = load_json_file(activity_file, default=[]) |
|
logging.debug(f"[DEBUG] system_activity.json contents: {activities}") |
|
for entry in activities: |
|
logging.debug(f"[DEBUG] Checking entry: {entry}") |
|
if entry.get("status") == "running" and entry.get("pid") and entry.get("pid") != current_pid: |
|
try: |
|
import psutil |
|
process = psutil.Process(entry["pid"]) |
|
if process.is_running(): |
|
logging.debug(f"[DEBUG] Found running process: {entry['pid']}") |
|
return True |
|
except Exception as e: |
|
logging.debug(f"[DEBUG] Exception checking process: {e}") |
|
continue |
|
logging.debug("[DEBUG] No running scripts found.") |
|
return False |
|
except Exception as e: |
|
logging.error(f"Failed to check system_activity.json: {e}") |
|
return False |
|
|
|
def initialize_rate_limit_info(): |
|
""" |
|
Initialize rate_limit_info.json with proper structure for all authors. |
|
""" |
|
rate_limit_file = '/home/shane/foodie_automator/rate_limit_info.json' |
|
current_time = time.time() |
|
tweet_window_seconds = 86400 # 24 hours |
|
|
|
# Initialize with all authors |
|
rate_limit_info = {} |
|
for author in AUTHORS: |
|
username = author['username'] |
|
rate_limit_info[username] = { |
|
'tweet_remaining': 17, # Free tier max |
|
'tweet_reset': current_time + tweet_window_seconds, |
|
'tweets_posted_in_run': 0 |
|
} |
|
|
|
# Save the initialized data |
|
save_json_file(rate_limit_file, rate_limit_info) |
|
logger.info(f"Initialized rate_limit_info.json with {len(rate_limit_info)} authors") |
|
return rate_limit_info |
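
# Resulting on-disk structure of rate_limit_info.json (illustrative; the usernames come
# from AUTHORS and the timestamps are Unix epoch seconds):
#
#     {
#         "some_username": {
#             "tweet_remaining": 17,
#             "tweet_reset": 1715000000.0,
#             "tweets_posted_in_run": 0
#         }
#     }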
|
|
|
def check_author_rate_limit(author, max_tweets=17, tweet_window_seconds=86400): |
|
""" |
|
Check if an author can post based on their X API Free tier quota (17 tweets per 24 hours per user). |
|
Uses system_activity.json to determine if test tweets are needed. |
|
Returns (can_post, remaining, reset_timestamp) where can_post is True if tweets are available. |
|
""" |
|
rate_limit_file = '/home/shane/foodie_automator/rate_limit_info.json' |
|
current_time = time.time() |
|
|
|
# Load rate limit info |
|
rate_limit_info = load_json_file(rate_limit_file, default={}) |
|
username = author['username'] |
|
|
|
# Initialize author entry if missing or if file is empty |
|
if not rate_limit_info or username not in rate_limit_info: |
|
rate_limit_info = initialize_rate_limit_info() |
|
|
|
author_info = rate_limit_info[username] |
|
|
|
# Prune system_activity.json using the tweet reset time |
|
reset_time = author_info.get('tweet_reset', current_time + tweet_window_seconds) |
|
prune_system_activity(reset_time) |
|
|
|
# Debug: log is_any_script_running() |
|
logger.debug(f"[DEBUG] is_any_script_running() = {is_any_script_running()}") |
|
|
|
# Check if any script is running |
|
if is_any_script_running(): |
|
# At least one script is running, trust rate_limit_info.json |
|
logger.info(f"At least one script is running, using stored rate limit info for {username}") |
|
remaining = author_info.get('tweet_remaining', max_tweets) |
|
reset = author_info.get('tweet_reset', current_time + tweet_window_seconds) |
|
|
|
# Check if reset time has passed |
|
if current_time >= reset: |
|
logger.info(f"Reset time passed for {username}, resetting quota") |
|
remaining = max_tweets |
|
reset = current_time + tweet_window_seconds |
|
author_info['tweet_remaining'] = remaining |
|
author_info['tweet_reset'] = reset |
|
author_info['tweets_posted_in_run'] = 0 |
|
rate_limit_info[username] = author_info |
|
save_json_file(rate_limit_file, rate_limit_info) |
|
|
|
# Adjust for tweets posted in this run |
|
remaining = remaining - author_info.get('tweets_posted_in_run', 0) |
|
else: |
|
# No scripts are running, post test tweet to sync quota |
|
logger.debug(f"[DEBUG] NO scripts running, will call get_x_rate_limit_status for {username}") |
|
remaining, api_reset = get_x_rate_limit_status(author) |
|
logger.debug(f"[DEBUG] API returned: remaining={remaining}, api_reset={api_reset} for {username}") |
|
|
|
if remaining is None or api_reset is None: |
|
# If API call fails, use the stored rate limit info |
|
remaining = author_info.get('tweet_remaining', 0) |
|
reset = author_info.get('tweet_reset', current_time + tweet_window_seconds) |
|
|
|
# If reset time has passed, assume quota is exhausted |
|
if current_time >= reset: |
|
remaining = 0 |
|
reset = current_time + tweet_window_seconds |
|
logger.warning(f"Reset time passed and API check failed for {username}, assuming quota exhausted") |
|
else: |
|
logger.warning(f"API check failed for {username}, using stored quota: {remaining} remaining") |
|
else: |
|
# API call succeeded, update with actual values |
|
remaining = min(remaining, max_tweets) # Ensure within Free tier limit |
|
reset = api_reset |
|
logger.info(f"Updated rate limit info from API for {username}: {remaining} remaining") |
|
|
|
# Update author info |
|
author_info['tweet_remaining'] = remaining |
|
author_info['tweet_reset'] = reset |
|
# Don't reset tweets_posted_in_run here |
|
rate_limit_info[username] = author_info |
|
save_json_file(rate_limit_file, rate_limit_info) |
|
|
|
# Validate remaining tweets |
|
    if remaining < 0:
        logger.warning(f"Negative remaining tweets for {username}: {remaining}. Flooring at 0.")
    # Never report a negative quota
    remaining = max(0, remaining)
|
|
|
can_post = remaining > 0 |
|
if not can_post: |
|
reset_time_dt = datetime.fromtimestamp(reset, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S') |
|
logger.info(f"Author {username} quota exhausted. Remaining: {remaining}, Reset at: {reset_time_dt}") |
|
else: |
|
logger.info(f"Quota for {username}: {remaining}/{max_tweets} tweets remaining") |
|
|
|
return can_post, remaining, reset |
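
# Example gate before posting (sketch):
#
#     can_post, remaining, reset = check_author_rate_limit(author)
#     if not can_post:
#         wait_seconds = max(0, reset - time.time())
#         logger.info(f"{author['username']} has no quota; resets in {wait_seconds:.0f}s")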
|
|
|
def prepare_post_data(summary, title, main_topic=None): |
|
try: |
|
logging.info(f"Preparing post data for summary: {summary[:100]}...") |
|
|
|
# Use the original generate_title_from_summary function to generate the title |
|
new_title = generate_title_from_summary(summary) |
|
if not new_title: |
|
logging.warning("Title generation failed, using fallback title") |
|
new_title = "A Tasty Food Discovery Awaits You" |
|
logging.info(f"Generated new title: '{new_title}'") |
|
|
|
# Update to unpack four values |
|
search_query, relevance_keywords, generated_main_topic, skip_flag = smart_image_and_filter(new_title, summary) |
|
if skip_flag: |
|
logging.info("Summary filtered out during post preparation") |
|
return None, None, None, None, None, None, None |
|
|
|
# Use the provided main_topic if available, otherwise use the generated one |
|
effective_main_topic = main_topic if main_topic else generated_main_topic |
|
|
|
image_url, image_source, uploader, page_url = get_flickr_image(search_query, relevance_keywords, effective_main_topic) |
|
if not image_url: |
|
image_url, image_source, uploader, page_url = get_image(search_query) |
|
|
|
if not image_url: |
|
logging.warning("No image found for post, skipping") |
|
return None, None, None, None, None, None, None |
|
|
|
# Select a full author dictionary from AUTHORS (already imported from foodie_config) |
|
author = random.choice(AUTHORS) |
|
|
|
categories = ["Buzz", "Trends", "Lifestyle", "Culture", "Health", "Drink", "Food", "Eats"] |
|
category = random.choice(categories) |
|
|
|
post_data = { |
|
"title": new_title, |
|
"content": summary, |
|
"status": "publish", |
|
"author": author["username"], # Use the username in post_data |
|
"categories": [category] |
|
} |
|
|
|
logging.info(f"Post data prepared: Title: '{new_title}', Category: {category}, Author: {author['username']}") |
|
return post_data, author, category, image_url, image_source, uploader, page_url |
|
|
|
except Exception as e: |
|
logging.error(f"Failed to prepare post data: {e}") |
|
return None, None, None, None, None, None, None |
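
# Example of unpacking the seven-value result (sketch; summary_text and original_title are placeholders):
#
#     result = prepare_post_data(summary_text, original_title)
#     post_data, author, category, image_url, image_source, uploader, page_url = result
#     if post_data is None:
#         logger.info("Summary was filtered out or no image was found; nothing to publish.")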
|
|
|
|
|
def save_post_to_recent(post_title, post_url, author_username, timestamp): |
|
"""Save a post to recent_posts.json, maintaining a JSON array.""" |
|
try: |
|
recent_posts = load_json_file(RECENT_POSTS_FILE, expiration_hours=24) |
|
# Check for duplicates before appending |
|
entry = { |
|
"title": post_title, |
|
"url": post_url, |
|
"author_username": author_username, |
|
"timestamp": timestamp |
|
} |
|
key = (post_title, post_url, author_username) |
|
if any((p["title"], p["url"], p["author_username"]) == key for p in recent_posts): |
|
logging.debug(f"Skipping duplicate post: {post_title}") |
|
return |
|
recent_posts.append(entry) |
|
with open(RECENT_POSTS_FILE, 'w') as f: |
|
json.dump(recent_posts, f, indent=2) |
|
logging.info(f"Saved post '{post_title}' to {RECENT_POSTS_FILE}") |
|
except Exception as e: |
|
logging.error(f"Failed to save post to {RECENT_POSTS_FILE}: {e}") |
|
|
|
def prune_recent_posts(): |
|
"""Prune recent_posts.json to keep entries within the last 24 hours.""" |
|
try: |
|
recent_posts = load_json_file(RECENT_POSTS_FILE, expiration_hours=24) |
|
with open(RECENT_POSTS_FILE, 'w') as f: |
|
json.dump(recent_posts, f, indent=2) |
|
logging.info(f"Pruned {RECENT_POSTS_FILE} to {len(recent_posts)} entries") |
|
except Exception as e: |
|
logging.error(f"Failed to prune {RECENT_POSTS_FILE}: {e}") |