merge posting x into main files

This commit is contained in:
2025-04-28 21:23:12 +10:00
parent a1d2ce4215
commit ea7d36a22b
7 changed files with 394 additions and 446 deletions
+114 -187
View File
@@ -11,7 +11,6 @@ import tempfile
import requests
import time
from dotenv import load_dotenv
import os
from datetime import datetime, timezone, timedelta
from openai import OpenAI
from urllib.parse import quote
@@ -19,10 +18,12 @@ from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import tweepy
from foodie_config import (
RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS, SUMMARY_PERSONA_PROMPTS,
get_clean_source_name, AUTHORS, LIGHT_TASK_MODEL, SUMMARY_MODEL
RECIPE_KEYWORDS, PROMO_KEYWORDS, HOME_KEYWORDS, PRODUCT_KEYWORDS, PERSONA_CONFIGS,
get_clean_source_name, AUTHORS, LIGHT_TASK_MODEL, SUMMARY_MODEL, X_API_CREDENTIALS
)
load_dotenv()
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
@@ -43,7 +44,7 @@ def load_json_file(filename, expiration_days=None):
except json.JSONDecodeError as e:
logging.warning(f"Skipping invalid JSON line in {filename} at line {i}: {e}")
if expiration_days:
cutoff = (datetime.now() - timedelta(days=expiration_days)).isoformat()
cutoff = (datetime.now(timezone.utc) - timedelta(days=expiration_days)).isoformat()
data = [entry for entry in data if entry["timestamp"] > cutoff]
logging.info(f"Loaded {len(data)} entries from {filename}, {len(data)} valid after expiration check")
except Exception as e:
@@ -70,6 +71,95 @@ def save_json_file(filename, key, value):
except Exception as e:
logging.error(f"Failed to save or prune {filename}: {e}")
def load_post_counts():
    """Load the per-author X post counters, rolling them over to the current period.

    Reads the counter entries from ``x_post_counts.json`` via ``load_json_file``.
    Each entry is a dict with the keys ``username``, ``month`` (``YYYY-MM``),
    ``monthly_count``, ``day`` (``YYYY-MM-DD``) and ``daily_count``.

    Every author in ``AUTHORS`` is guaranteed an entry in the result: authors
    missing from the file (including the case where the file is empty) get a
    fresh zeroed entry, so callers can look an author up without a ``None`` check.
    Counters whose stored month/day differ from the current UTC month/day are
    reset to zero.

    Returns:
        list[dict]: the counter entries. Nothing is written back to disk here.
    """
    counts = load_json_file('/home/shane/foodie_automator/x_post_counts.json')
    if not counts:
        counts = []

    # Read the clock once so month and day cannot disagree across midnight.
    now = datetime.now(timezone.utc)
    current_month = now.strftime("%Y-%m")
    current_day = now.strftime("%Y-%m-%d")

    # Add a zeroed entry for any author the file does not know about yet.
    known_usernames = {entry.get("username") for entry in counts}
    for author in AUTHORS:
        if author["username"] not in known_usernames:
            counts.append({
                "username": author["username"],
                "month": current_month,
                "monthly_count": 0,
                "day": current_day,
                "daily_count": 0
            })
            known_usernames.add(author["username"])

    # Roll stale counters over; .get() treats a missing key as stale.
    for entry in counts:
        if entry.get("month") != current_month:
            entry["month"] = current_month
            entry["monthly_count"] = 0
        if entry.get("day") != current_day:
            entry["day"] = current_day
            entry["daily_count"] = 0
    return counts
def save_post_counts(counts):
    """Overwrite ``x_post_counts.json`` with ``counts``, one JSON object per line.

    Args:
        counts: iterable of JSON-serialisable counter entries.
    """
    target_path = '/home/shane/foodie_automator/x_post_counts.json'
    with open(target_path, 'w') as handle:
        for record in counts:
            # One serialised record per line.
            handle.write(f"{json.dumps(record)}\n")
    logging.info("Saved post counts to x_post_counts.json")
def generate_article_tweet(author, post, persona):
    """Generate the text of a tweet announcing an article, in a persona's voice.

    Args:
        author: dict with a ``username`` key (used for log messages only).
        post: dict with ``title`` and ``url`` keys describing the article.
        persona: key into ``PERSONA_CONFIGS``; a ``KeyError`` is raised if absent.

    Returns:
        str: the model's tweet, cut to 280 characters (277 plus ``...``) when
        longer. If the model call fails for any reason, a canned sentence
        containing the title and URL is returned instead (not length-limited).
    """
    config = PERSONA_CONFIGS[persona]
    persona_prompt = config["x_prompt"].format(
        description=config["description"],
        tone=config["tone"],
    )

    # Swap the generic article instruction in the persona prompt for one that
    # names this specific post.
    generic_instruction = "For article tweets, include the article title, a quirky hook, and the URL."
    specific_instruction = (
        f"Generate an article tweet including the title '{post['title']}', "
        f"a quirky hook, and the URL '{post['url']}'."
    )
    system_prompt = persona_prompt.replace(generic_instruction, specific_instruction)

    try:
        completion = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"Generate tweet for {post['title']}."},
            ],
            max_tokens=100,
            temperature=0.9,
        )
        text = completion.choices[0].message.content.strip()
        if len(text) > 280:
            text = f"{text[:277]}..."
        logging.info(f"Generated article tweet for {author['username']}: {text}")
        return text
    except Exception as err:
        logging.error(f"Failed to generate article tweet for {author['username']}: {err}")
        return f"This trend is fire! Check out {post['title']} at {post['url']} #Foodie"
def post_tweet(author, tweet, *, monthly_limit=500, daily_limit=20):
    """Post ``tweet`` to X on behalf of ``author``, enforcing per-author limits.

    Args:
        author: dict with a ``username`` key; matched against
            ``X_API_CREDENTIALS`` and the stored post counters.
        tweet: text to post.
        monthly_limit: maximum posts per author per month (default 500).
        daily_limit: maximum posts per author per day (default 20).

    Returns:
        bool: ``True`` if the tweet was posted and the counters were saved;
        ``False`` if credentials are missing, a limit has been reached, or
        posting/saving raised an exception (which is logged, not propagated).
    """
    username = author["username"]
    credentials = next(
        (cred for cred in X_API_CREDENTIALS if cred["username"] == username),
        None,
    )
    if not credentials:
        logging.error(f"No X credentials found for {username}")
        return False

    post_counts = load_post_counts()
    author_count = next(
        (entry for entry in post_counts if entry.get("username") == username),
        None,
    )
    if author_count is None:
        # The counter file has no entry for this author yet. Start one at zero
        # instead of failing on a None lookup.
        now = datetime.now(timezone.utc)
        author_count = {
            "username": username,
            "month": now.strftime("%Y-%m"),
            "monthly_count": 0,
            "day": now.strftime("%Y-%m-%d"),
            "daily_count": 0
        }
        post_counts.append(author_count)

    if author_count["monthly_count"] >= monthly_limit:
        logging.warning(f"Monthly post limit ({monthly_limit}) reached for {username}")
        return False
    if author_count["daily_count"] >= daily_limit:
        logging.warning(f"Daily post limit ({daily_limit}) reached for {username}")
        return False

    try:
        # Named x_client so it is not confused with the module-level OpenAI `client`.
        x_client = tweepy.Client(
            consumer_key=credentials["api_key"],
            consumer_secret=credentials["api_secret"],
            access_token=credentials["access_token"],
            access_token_secret=credentials["access_token_secret"]
        )
        x_client.create_tweet(text=tweet)
        # Counters are only bumped and persisted after a successful post.
        author_count["monthly_count"] += 1
        author_count["daily_count"] += 1
        save_post_counts(post_counts)
        logging.info(f"Posted tweet for {username}: {tweet}")
        return True
    except Exception as e:
        logging.error(f"Failed to post tweet for {username}: {e}")
        return False
def select_best_persona(interest_score, content=""):
logging.info("Using select_best_persona with interest_score and content")
personas = ["Visionary Editor", "Foodie Critic", "Trend Scout", "Culture Connoisseur"]
@@ -206,7 +296,6 @@ def smart_image_and_filter(title, summary):
raw_result = response.choices[0].message.content.strip()
logging.info(f"Raw GPT smart image/filter response: '{raw_result}'")
# Clean and parse JSON
cleaned_result = re.sub(r'```json\s*|\s*```', '', raw_result).strip()
try:
result = json.loads(cleaned_result)
@@ -339,14 +428,12 @@ def generate_title_from_summary(summary):
def summarize_with_gpt4o(content, source_name, link, interest_score=0, extra_prompt=""):
try:
persona = select_best_persona(interest_score, content)
# Access the persona configuration
persona_config = SUMMARY_PERSONA_PROMPTS.get(persona, {
"prompt": "Write a concise, engaging summary that captures the essence of the content for food lovers.",
persona_config = PERSONA_CONFIGS.get(persona, {
"article_prompt": "Write a concise, engaging summary that captures the essence of the content for food lovers.",
"description": "a generic food writer",
"tone": "an engaging tone"
})
# Format the prompt using description and tone
prompt = persona_config["prompt"].format(
prompt = persona_config["article_prompt"].format(
description=persona_config["description"],
tone=persona_config["tone"],
num_paragraphs=determine_paragraph_count(interest_score)
@@ -379,166 +466,7 @@ def summarize_with_gpt4o(content, source_name, link, interest_score=0, extra_pro
logging.error(f"Summary generation failed with model {SUMMARY_MODEL}: {e}")
return None
def smart_image_and_filter(title, summary):
    """Ask the light model for an image-search query and a keep/skip verdict.

    Args:
        title: article title.
        summary: article summary.

    Returns:
        tuple: ``(image_query, relevance_keywords, skip_flag)``.
        ``skip_flag`` is true when the model answers ``"SKIP"`` or when
        "homemade" appears in the title or summary. On any failure (API error,
        unparsable or malformed JSON) the fallback
        ``("food trends", ["cuisine", "dining"], False)`` is returned. A query
        that is empty or a single word is also replaced by the fallback query,
        but keeps the computed ``skip_flag``.
    """
    def fallback(skip):
        # Build a fresh list each time so callers never share a mutable default.
        return "food trends", ["cuisine", "dining"], skip

    try:
        article_text = f"{title}\n\n{summary}"
        instructions = (
            'Analyze this article title and summary. Extract key entities (brands, locations, cuisines, or topics) '
            'for an image search about food industry trends or viral content. Prioritize specific terms if present, '
            'otherwise focus on the main theme. '
            'Return "SKIP" if the article is about home appliances, recipes, promotions, or contains "homemade", else "KEEP". '
            'Return as JSON with double quotes: {"image_query": "specific term", "relevance": ["keyword1", "keyword2"], "action": "KEEP" or "SKIP"}'
        )
        completion = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": instructions},
                {"role": "user", "content": article_text}
            ],
            max_tokens=100
        )
        raw_reply = completion.choices[0].message.content.strip()
        logging.info(f"Raw GPT smart image/filter response: '{raw_reply}'")

        # Strip Markdown code fences the model may wrap around the JSON.
        json_text = re.sub(r'```json\s*|\s*```', '', raw_reply).strip()
        try:
            parsed = json.loads(json_text)
        except json.JSONDecodeError as err:
            logging.warning(f"JSON parsing failed: {err}, raw: '{json_text}'. Using fallback.")
            return fallback(False)

        required_keys = ("image_query", "relevance", "action")
        if not (isinstance(parsed, dict) and all(key in parsed for key in required_keys)):
            logging.warning(f"Invalid GPT response format: {parsed}, using fallback")
            return fallback(False)

        query = parsed["image_query"]
        keywords = parsed["relevance"]
        skip = parsed["action"] == "SKIP" or "homemade" in title.lower() or "homemade" in summary.lower()
        logging.info(f"Smart image query: {query}, Relevance: {keywords}, Skip: {skip}")

        if not query or len(query.split()) < 2:
            logging.warning(f"Image query '{query}' too vague, using fallback")
            return fallback(skip)
        return query, keywords, skip
    except Exception as err:
        logging.error(f"Smart image/filter failed: {err}, using fallback")
        return fallback(False)
def is_interesting(summary):
    """Score how interesting ``summary`` is, using the light model.

    Args:
        summary: text to rate.

    Returns:
        int: the number the model replied with, or 0 when the reply is not
        purely digits or when the request fails. The score is also printed
        and logged.
    """
    rating_instructions = (
        "Rate this content from 0-10 based on its rarity, buzzworthiness, and engagement potential for food lovers, covering a wide range of food topics (skip recipes). "
        "Score 8-10 for rare, highly shareable ideas that grab attention. "
        "Score 5-7 for fresh, engaging updates with broad appeal. Score below 5 for common or unremarkable content. "
        "Return only a number."
    )
    try:
        completion = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": rating_instructions},
                {"role": "user", "content": f"Content: {summary}"}
            ],
            max_tokens=5
        )
        reply = completion.choices[0].message.content.strip()
        # Anything other than a bare run of digits counts as "not interesting".
        if reply.isdigit():
            score = int(reply)
        else:
            score = 0
        print(f"Interest Score for '{summary[:50]}...': {score} (raw: {reply})")
        logging.info(f"Interest Score: {score} (raw: {reply})")
        return score
    except Exception as err:
        logging.error(f"Interestingness scoring failed with model {LIGHT_TASK_MODEL}: {err}")
        print(f"Interest Error: {err}")
        return 0
def select_paragraphs(paragraphs, target_count, persona, original_content):
    """Trim, rephrase or extend ``paragraphs`` so that ``target_count`` remain.

    Args:
        paragraphs: candidate paragraph strings.
        target_count: number of paragraphs wanted.
        persona: persona name interpolated into the rephrase/generation prompts.
        original_content: source text used when generating extra paragraphs.

    Returns:
        list[str]: at most ``target_count`` paragraphs. The input list itself is
        returned unchanged when it already has the right length and every
        paragraph is 60-80 words long.
    """
    def in_word_range(text):
        return 60 <= len(text.split()) <= 80

    if len(paragraphs) == target_count and all(in_word_range(p) for p in paragraphs):
        return paragraphs

    # Score = food-keyword hits minus distance from the ideal ~70-word length.
    keywords = ["food", "dish", "trend", "menu", "cuisine", "flavor", "taste", "eat", "dining", "restaurant"]
    scores = [
        sum(p.lower().count(kw) for kw in keywords) - abs(len(p.split()) - 70)
        for p in paragraphs
    ]

    total = len(paragraphs)
    if total > target_count:
        last_index = total - 1
        # The closing paragraph is kept unless it scores below every other
        # paragraph and more than one paragraph has to be dropped.
        keep_last = scores[-1] >= min(scores[:-1]) or total == target_count + 1
        if keep_last:
            ranked = sorted(range(last_index), key=lambda i: scores[i], reverse=True)
            chosen = ranked[:target_count - 1] + [last_index]
        else:
            ranked = sorted(range(total), key=lambda i: scores[i], reverse=True)
            chosen = ranked[:target_count]
        # Restore document order among the chosen paragraphs.
        selected = [paragraphs[i] for i in sorted(chosen)]
    else:
        selected = list(paragraphs)

    adjusted = []
    for para in selected:
        if in_word_range(para):
            adjusted.append(para)
            continue
        # Out-of-range paragraph: ask the model to rephrase it to 60-80 words.
        rephrase_prompt = (
            f"Rephrase this paragraph to exactly 60-80 words, keeping the same tone as a {persona} and all key ideas: '{para}'"
        )
        try:
            response = client.chat.completions.create(
                model=SUMMARY_MODEL,
                messages=[
                    {"role": "system", "content": rephrase_prompt},
                    {"role": "user", "content": para}
                ],
                max_tokens=150,
                temperature=0.7
            )
            candidate = response.choices[0].message.content.strip()
            # Keep the original when the rephrase still misses the range.
            adjusted.append(candidate if in_word_range(candidate) else para)
        except Exception as e:
            logging.warning(f"Rephrasing failed for paragraph: {e}")
            adjusted.append(para)

    # Too few paragraphs: generate more until the target is reached.
    while len(adjusted) < target_count:
        tone_sample = adjusted[-1] if adjusted else 'This trend is fire!'
        extra_prompt = (
            f"Generate one additional paragraph (60-80 words) in the style of a {persona}, "
            f"based on this content: '{original_content[:200]}...'. Match the tone of: '{tone_sample}'"
        )
        try:
            response = client.chat.completions.create(
                model=SUMMARY_MODEL,
                messages=[
                    {"role": "system", "content": extra_prompt},
                    {"role": "user", "content": original_content}
                ],
                max_tokens=150,
                temperature=0.7
            )
            candidate = response.choices[0].message.content.strip()
            if in_word_range(candidate):
                adjusted.append(candidate)
            else:
                # Canned filler when the generated paragraph misses the range.
                adjusted.append("This trend is sparking buzz across menus!")
        except Exception as e:
            logging.warning(f"Extra paragraph generation failed: {e}")
            adjusted.append("This vibe is shaking up the food scene!")

    return adjusted[:target_count]
def insert_link_naturally(summary, source_name, source_url):
import re
try:
prompt = (
"Take this summary and insert a single HTML link naturally into one paragraph (randomly chosen). "
@@ -571,8 +499,7 @@ def insert_link_naturally(summary, source_name, source_url):
except Exception as e:
logging.error(f"Link insertion failed: {e}")
# Fallback: Protect times and insert at sentence end
time_pattern = r'\b\d{1,2}\.\d{2}(?:am|pm)\b' # Matches 6.30am, 12.15pm
time_pattern = r'\b\d{1,2}\.\d{2}(?:am|pm)\b'
protected_summary = re.sub(time_pattern, lambda m: m.group(0).replace('.', '@'), summary)
paragraphs = protected_summary.split('\n')
if not paragraphs or all(not p.strip() for p in paragraphs):
@@ -588,22 +515,19 @@ def insert_link_naturally(summary, source_name, source_url):
]
insertion_phrase = random.choice(phrases)
# Find sentence boundary, avoiding protected times
sentences = re.split(r'(?<=[.!?])\s+', target_para)
insertion_point = -1
for i, sent in enumerate(sentences):
if sent.strip() and '@' not in sent: # Avoid sentences with protected times
if sent.strip() and '@' not in sent:
insertion_point = sum(len(s) + 1 for s in sentences[:i+1])
break
if insertion_point == -1:
insertion_point = len(target_para) # Append if no good boundary
insertion_point = len(target_para)
# Add space after insertion phrase
new_para = f"{target_para[:insertion_point]} {insertion_phrase}. {target_para[insertion_point:]}".strip()
paragraphs[paragraphs.index(target_para)] = new_para
new_summary = '\n'.join(paragraphs)
# Restore periods in times
new_summary = new_summary.replace('@', '.')
logging.info(f"Fallback summary with link: {new_summary}")
return new_summary
@@ -759,7 +683,7 @@ def post_to_wp(post_data, category, link, author, image_url, original_source, im
logging.warning(f"All image uploads failed for '{post_data['title']}' - posting without image")
endpoint = f"{wp_base_url}/posts/{post_id}" if post_id else f"{wp_base_url}/posts"
method = requests.post # Use POST for both create and update (WP API handles it)
method = requests.post
logging.debug(f"Sending WP request to {endpoint} with payload: {json.dumps(payload, indent=2)}")
@@ -775,13 +699,21 @@ def post_to_wp(post_data, category, link, author, image_url, original_source, im
post_id = post_info["id"]
post_url = post_info["link"]
# Save to recent_posts.json
# Save to recent_posts.json
timestamp = datetime.now(timezone.utc).isoformat()
save_post_to_recent(post_data["title"], post_url, author["username"], timestamp)
logging.info(f"Posted/Updated by {author['username']}: {post_data['title']} (ID: {post_id})")
return post_id, post_url
# Post article tweet to X
try:
post = {"title": post_data["title"], "url": post_url}
tweet = generate_article_tweet(author, post, author["persona"])
if post_tweet(author, tweet):
logging.info(f"Successfully posted article tweet for {author['username']} on X")
else:
logging.warning(f"Failed to post article tweet for {author['username']} on X")
except Exception as e:
logging.error(f"Error posting article tweet for {author['username']}: {e}")
logging.info(f"Posted/Updated by {author['username']}: {post_data['title']} (ID: {post_id})")
return post_id, post_url
@@ -860,7 +792,6 @@ def get_flickr_image_via_ddg(search_query, relevance_keywords):
result = random.choice(candidates)
image_url = result["image_url"]
# OCR check on the selected image
temp_file = None
try:
img_response = requests.get(image_url, headers=headers, timeout=10)
@@ -876,9 +807,8 @@ def get_flickr_image_via_ddg(search_query, relevance_keywords):
if char_count > 200:
logging.info(f"Skipping text-heavy image (OCR): {image_url} (char_count: {char_count})")
return None, None, None, None # Fall back to Pixabay
return None, None, None, None
# Success: Save and return
flickr_data = {
"title": search_query,
"image_url": image_url,
@@ -945,7 +875,6 @@ def prepare_post_data(final_summary, original_title, context_info=""):
logging.info(f"Title generation failed for '{original_title}' {context_info}")
return None, None, None, None, None, None, None
# Note: This function still uses generate_image_query, but curate_from_rss overrides it with smart_image_and_filter
search_query, relevance_keywords = generate_image_query(f"{innovative_title}\n\n{final_summary}")
if not search_query:
logging.info(f"Image query generation failed for '{innovative_title}' {context_info}")
@@ -976,7 +905,6 @@ def prepare_post_data(final_summary, original_title, context_info=""):
return post_data, author, category, image_url, image_source, uploader, page_url
def save_post_to_recent(post_title, post_url, author_username, timestamp):
"""Save post details to recent_posts.json."""
try:
recent_posts = load_json_file('/home/shane/foodie_automator/recent_posts.json')
entry = {
@@ -995,7 +923,6 @@ def save_post_to_recent(post_title, post_url, author_username, timestamp):
logging.error(f"Failed to save post to recent_posts.json: {e}")
def prune_recent_posts():
"""Prune recent_posts.json to keep only entries from the last 24 hours."""
try:
cutoff = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat()
recent_posts = load_json_file('/home/shane/foodie_automator/recent_posts.json')