This commit is contained in:
2025-05-03 17:12:10 +10:00
parent b265b5aa30
commit bfddb14950
+167 -121
View File
@@ -30,6 +30,8 @@ from pathlib import Path
from functools import lru_cache
import hashlib
import re
import xmlrpc.client
from rate_limiter import RateLimiter
from wordpress_xmlrpc import WordPressPost
from wordpress_xmlrpc.client import Client
from wordpress_xmlrpc.methods.media import UploadFile
from wordpress_xmlrpc.methods.posts import NewPost
# Configure logging
logging.basicConfig(
@@ -51,6 +53,17 @@ used_images = set()
# Module-level RateLimiter instances, one per image provider.
# NOTE(review): time_window is presumably in seconds (3600 = one hour, per the
# trailing comments) — confirm against rate_limiter.RateLimiter.
pixabay_rate_limiter = RateLimiter(max_requests=100, time_window=3600) # 100 requests per hour
flickr_rate_limiter = RateLimiter(max_requests=3600, time_window=3600) # 3600 requests per hour
# Registry of the JSON state files used by this module, keyed by logical name.
# Look paths up here instead of repeating the literal path at each call site.
FILE_PATHS = dict(
    posted_rss_titles="/home/shane/foodie_automator/posted_rss_titles.json",
    posted_reddit_titles="/home/shane/foodie_automator/posted_reddit_titles.json",
    used_images="/home/shane/foodie_automator/used_images.json",
    recent_posts="/home/shane/foodie_automator/recent_posts.json",
    x_post_counts="/home/shane/foodie_automator/x_post_counts.json",
)

# Convenience alias for the used-images state file.
USED_IMAGES_FILE = FILE_PATHS["used_images"]
def validate_json_entry(entry: Dict[str, Any]) -> bool:
"""Validate the structure of a JSON entry."""
required_fields = {"title", "timestamp"}
@@ -133,7 +146,7 @@ def save_json_file(file_path, title, timestamp):
def load_post_counts():
counts = []
filename = '/home/shane/foodie_automator/x_post_counts.json'
filename = FILE_PATHS["x_post_counts"]
if os.path.exists(filename):
try:
with open(filename, 'r') as f:
@@ -175,7 +188,7 @@ def load_post_counts():
return counts
def save_post_counts(counts, file_path=None):
    """
    Persist post-count entries, one JSON object per line.

    Args:
        counts: Iterable of JSON-serialisable entries to write.
        file_path: Optional destination path. Defaults to
            FILE_PATHS["x_post_counts"] so existing callers are unaffected.
    """
    if file_path is None:
        file_path = FILE_PATHS["x_post_counts"]

    # Open the target exactly once; the file is rewritten in full on each call.
    with open(file_path, 'w', encoding='utf-8') as f:
        f.writelines(json.dumps(item) + '\n' for item in counts)
@@ -471,92 +484,107 @@ def upload_image_to_wp(image_url: str, post_title: str, wp_base_url: str, wp_use
logger.error(f"Image upload to WP failed for '{post_title}': {e}")
return None
def _upload_featured_image(wp, image_url: str, title: str) -> Optional[str]:
    """Download image_url and upload it through wp.uploadFile; return the attachment ID or None."""
    # Build the filename before anything that can raise, so the error handler
    # below can always reference it.
    safe_title = re.sub(r'[^\w\-]+', '_', title).strip('_') or 'image'
    image_filename = f"{safe_title}.jpg"
    try:
        response = requests.get(image_url, timeout=30)
        response.raise_for_status()

        # wp.uploadFile expects a struct with 'name', 'type' and base64 'bits'.
        media = wp.call(UploadFile({
            'name': image_filename,
            'type': 'image/jpeg',
            'bits': xmlrpc.client.Binary(response.content),
        }))
        return media['id']
    except Exception as e:
        # Best effort: a failed image must not block the post itself.
        logger.error(f"Failed to upload image '{image_filename}' to WordPress: {e}")
        return None


def post_to_wp(
    post_data: Dict[str, Any],
    category: str,
    link: str,
    author: Dict[str, str],
    image_url: Optional[str] = None,
    original_source: Optional[str] = None,
    image_source: Optional[str] = None,
    uploader: Optional[str] = None,
    pixabay_url: Optional[str] = None,
    interest_score: Optional[int] = None
) -> Tuple[Optional[int], Optional[str]]:
    """
    Publish a post to WordPress over XML-RPC, with source and image attribution.

    Credentials are read from the WORDPRESS_URL, WORDPRESS_USERNAME and
    WORDPRESS_PASSWORD environment variables. WORDPRESS_URL may be either the
    site base URL or the full xmlrpc.php endpoint.

    Args:
        post_data: Dict with at least non-empty "title" and "content"
        category: Category name to file the post under
        link: The original article link (stored as a custom field)
        author: Author information; "id" is used as the post author when present
        image_url: Optional URL of an image to upload as the featured image
        original_source: Optional original source name (stored as a custom field)
        image_source: Optional image source (stored as a custom field)
        uploader: Optional image uploader (stored as a custom field)
        pixabay_url: Optional image page URL (stored as a custom field)
        interest_score: Optional interest score (stored as a custom field)

    Returns:
        Tuple of (post_id, post_url) or (None, None) if failed
    """
    try:
        # Validate input before touching the network
        if not isinstance(post_data, dict) or not post_data.get("title") or not post_data.get("content"):
            logger.error(f"Invalid post_data format: {post_data}")
            return None, None

        # Load WordPress credentials from environment
        wp_url = os.getenv('WORDPRESS_URL')
        wp_username = os.getenv('WORDPRESS_USERNAME')
        wp_password = os.getenv('WORDPRESS_PASSWORD')
        if not all([wp_url, wp_username, wp_password]):
            logger.error("Missing WordPress credentials in environment variables")
            return None, None

        # The XML-RPC client needs the xmlrpc.php endpoint, while the permalink
        # is built from the site base URL; derive both from the one setting.
        site_url = wp_url.rstrip('/')
        if site_url.endswith('/xmlrpc.php'):
            site_url = site_url[:-len('/xmlrpc.php')]
        wp = Client(f"{site_url}/xmlrpc.php", wp_username, wp_password)

        # Upload featured image if provided (continues without it on failure)
        featured_image_id = None
        if image_url:
            featured_image_id = _upload_featured_image(wp, image_url, post_data['title'])

        # Prepare post data
        post = WordPressPost()
        post.title = post_data['title']
        post.content = post_data['content']
        post.post_status = 'publish'
        post.terms_names = {'category': [category]}

        author_id = author.get('id') if isinstance(author, dict) else None
        if author_id is not None:
            post.user = author_id
        if featured_image_id is not None:
            post.thumbnail = featured_image_id

        # wp.uploadFile has no caption field, so image attribution is kept as
        # post custom fields next to the article attribution.
        meta = {
            'original_source': original_source,
            'original_link': link,
            'interest_score': interest_score,
            'image_source': image_source,
            'image_uploader': uploader,
            'image_source_url': pixabay_url,
        }
        post.custom_fields = [
            {'key': key, 'value': value}
            for key, value in meta.items()
            if value is not None
        ]

        # Create post; wp.newPost returns the new post ID as a string
        new_id = wp.call(NewPost(post))
        if not new_id:
            logger.error("Failed to create WordPress post")
            return None, None

        post_id = int(new_id)
        post_url = f"{site_url}/?p={post_id}"
        logger.info(f"Successfully posted to WordPress (ID: {post_id})")
        return post_id, post_url

    except Exception as e:
        logger.error(f"WordPress API request failed: {e}")
        return None, None
def determine_paragraph_count(interest_score):
@@ -838,7 +866,7 @@ def prepare_post_data(final_summary, original_title, context_info=""):
def save_post_to_recent(post_title, post_url, author_username, timestamp):
try:
recent_posts = load_json_file('/home/shane/foodie_automator/recent_posts.json')
recent_posts = load_json_file(FILE_PATHS["recent_posts"], 24)
entry = {
"title": post_title,
"url": post_url,
@@ -846,7 +874,7 @@ def save_post_to_recent(post_title, post_url, author_username, timestamp):
"timestamp": timestamp
}
recent_posts.append(entry)
with open('/home/shane/foodie_automator/recent_posts.json', 'w') as f:
with open(FILE_PATHS["recent_posts"], 'w') as f:
for item in recent_posts:
json.dump(item, f)
f.write('\n')
@@ -857,9 +885,9 @@ def save_post_to_recent(post_title, post_url, author_username, timestamp):
def prune_recent_posts():
try:
cutoff = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat()
recent_posts = load_json_file('/home/shane/foodie_automator/recent_posts.json')
recent_posts = load_json_file(FILE_PATHS["recent_posts"], 24)
recent_posts = [entry for entry in recent_posts if entry["timestamp"] > cutoff]
with open('/home/shane/foodie_automator/recent_posts.json', 'w') as f:
with open(FILE_PATHS["recent_posts"], 'w') as f:
for item in recent_posts:
json.dump(item, f)
f.write('\n')
@@ -995,60 +1023,75 @@ def generate_image_query(title: str, summary: str) -> Tuple[str, List[str], bool
logger.warning(f"Image query generation failed: {e}. Using title as fallback.")
return title, [], True
def smart_image_and_filter(title: str, content: str) -> Tuple[str, List[str], bool]:
    """
    Generate an image query and determine if the content should be filtered.

    Asks the chat model for a JSON verdict and parses it defensively: Markdown
    code fences are stripped, and malformed replies fall back to the title.

    Args:
        title: The article title
        content: The article content

    Returns:
        Tuple of (image_query, relevance_keywords, should_skip)
    """
    try:
        # Prepare prompt for GPT
        prompt = f"""
        Analyze this food-related content and determine:
        1. A good image search query
        2. Relevant keywords
        3. Whether to skip this content

        Title: {title}
        Content: {content}

        Return a JSON object with:
        - image_query: A concise search query for finding relevant images
        - relevance: List of relevant keywords
        - action: Either "KEEP" or "SKIP"

        Keep content that is:
        - About food trends, innovations, or interesting culinary topics
        - Has broad appeal to food enthusiasts
        - Contains unique or noteworthy information

        Skip content that is:
        - Basic recipes or cooking instructions
        - Restaurant reviews or menu items
        - Generic food news without unique angles
        """

        # Get response from GPT
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a food content curator."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.7,
            max_tokens=150
        )

        # The message content can be None; treat that as an empty reply.
        raw_result = (response.choices[0].message.content or "").strip()
        logger.info(f"Raw GPT smart image/filter response: '{raw_result}'")

        # Models often wrap JSON in ```json ... ``` fences; strip them first.
        cleaned_result = re.sub(r'^```(?:json)?\s*|\s*```$', '', raw_result).strip()

        # Parse response
        try:
            result = json.loads(cleaned_result)
        except json.JSONDecodeError as e:
            logger.warning(f"JSON parsing failed: {e}, raw: '{raw_result}'. Using fallback.")
            # Fallback to basic filtering
            lowered_title = title.lower()
            return title, [], "recipe" in lowered_title or "how to" in lowered_title

        if not isinstance(result, dict):
            logger.warning(f"Invalid GPT response format: {result}, using fallback")
            return title, [], False

        # An empty query is useless to the image search; fall back to the title.
        image_query = str(result.get("image_query") or "").strip() or title

        relevance = result.get("relevance") or []
        if isinstance(relevance, str):
            relevance = [relevance]
        elif not isinstance(relevance, list):
            relevance = []

        should_skip = str(result.get("action", "KEEP")).strip().upper() == "SKIP"

        logger.info(f"Smart image query: {image_query}, Relevance: {relevance}, Skip: {should_skip}")
        return image_query, relevance, should_skip

    except Exception as e:
        logger.error(f"Error in smart image/filter: {e}")
        return title, [], False
def classify_keywords(keywords):
prompt = (
@@ -1159,6 +1202,9 @@ def get_flickr_image(search_query: str, relevance_keywords: List[str] = None) ->
# Get photo info for attribution
info = photo.getInfo()
if not hasattr(info, 'owner') or not hasattr(info.owner, 'username'):
continue
uploader = info.owner.username
page_url = f"https://www.flickr.com/photos/{info.owner.id}/{photo_id}"