You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

433 lines
20 KiB

# foodie_engagement_tweet.py
import json
import logging
import random
import signal
import sys
import fcntl
import os
import time
import re
import unicodedata
from datetime import datetime, timedelta, timezone
import tweepy
from openai import OpenAI
from foodie_utils import post_tweet, load_post_counts, save_post_counts
from foodie_config import (
AUTHORS, SUMMARY_MODEL, X_API_CREDENTIALS, AUTHOR_BACKGROUNDS_FILE,
PERSONA_CONFIGS, ENGAGEMENT_REFERENCE_DATE_FILE
)
from dotenv import load_dotenv
load_dotenv()
REFERENCE_DATE_FILE = ENGAGEMENT_REFERENCE_DATE_FILE
LOCK_FILE = "/home/shane/foodie_automator/locks/foodie_engagement_tweet.lock"
LOG_FILE = "/home/shane/foodie_automator/logs/foodie_engagement_tweet.log"
LOG_PRUNE_DAYS = 30
MAX_RETRIES = 3
RETRY_BACKOFF = 2
URL = "https://insiderfoodie.com"
URL_SHORTENED_LENGTH = 23 # Twitter's shortened URL length
CURRENT_YEAR = "2025" # Explicitly set the current year for the prompt
def setup_logging():
    """Initialize file + console logging, pruning log entries older than LOG_PRUNE_DAYS.

    Reads the existing log file (if any), keeps only lines whose leading
    'YYYY-MM-DD HH:MM:SS' timestamp falls within the retention window,
    rewrites the file, then configures the root logger.

    Exits the process with status 1 if logging cannot be set up.
    """
    try:
        os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
        malformed_count = 0
        if os.path.exists(LOG_FILE):
            with open(LOG_FILE, 'r') as f:
                lines = f.readlines()
            cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS)
            pruned_lines = []
            for line in lines:
                # Cheap pre-check: the first 19 chars must look like a
                # timestamp before we pay for strptime.
                if len(line) < 19 or not line[:19].replace('-', '').replace(':', '').replace(' ', '').isdigit():
                    malformed_count += 1
                    continue
                try:
                    timestamp = datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
                    if timestamp > cutoff:
                        pruned_lines.append(line)
                except ValueError:
                    malformed_count += 1
                    continue
            with open(LOG_FILE, 'w') as f:
                f.writelines(pruned_lines)
        logging.basicConfig(
            filename=LOG_FILE,
            level=logging.DEBUG,
            format='%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        logging.getLogger().addHandler(console_handler)
        # Quiet chatty third-party loggers.
        logging.getLogger("openai").setLevel(logging.WARNING)
        logging.getLogger("tweepy").setLevel(logging.WARNING)
        # BUG FIX: this message was previously emitted before basicConfig(),
        # so no handler existed and it was silently dropped; report it now.
        if malformed_count > 0:
            logging.info(f"Skipped {malformed_count} malformed log lines during pruning")
        logging.info("Logging initialized for foodie_engagement_tweet.py")
    except Exception as e:
        print(f"Failed to setup logging: {e}")
        sys.exit(1)
def acquire_lock():
    """Acquire an exclusive lock file to prevent concurrent runs.

    Returns:
        The open lock file object; it must stay open for the lifetime of
        the process to hold the flock.

    Exits with status 0 if another instance already holds the lock.
    """
    os.makedirs(os.path.dirname(LOCK_FILE), exist_ok=True)
    lock_fd = open(LOCK_FILE, 'w')
    try:
        fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        lock_fd.write(str(os.getpid()))
        lock_fd.flush()
        return lock_fd
    except IOError:
        # BUG FIX: close the descriptor we failed to lock so it is not leaked.
        lock_fd.close()
        logging.info("Another instance of foodie_engagement_tweet.py is running")
        sys.exit(0)
def signal_handler(sig, frame):
    """Log receipt of a termination signal and exit with status 0."""
    logging.info("Received termination signal, exiting...")
    sys.exit(0)

# Register the same graceful-exit handler for both SIGTERM and SIGINT.
for _termination_signal in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_termination_signal, signal_handler)
# Initialize OpenAI client; any failure here is fatal for the script.
try:
    # BUG FIX: validate the key BEFORE constructing the client (the original
    # built the client first, then checked whether the key was set).
    if not os.getenv("OPENAI_API_KEY"):
        logging.error("OPENAI_API_KEY is not set in environment variables")
        raise ValueError("OPENAI_API_KEY is required")
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
except Exception as e:
    logging.error(f"Failed to initialize OpenAI client: {e}", exc_info=True)
    sys.exit(1)
# Load author backgrounds into a dictionary for faster lookup.
# AUTHOR_BACKGROUNDS maps NFC-normalized, lowercased username -> entry dict;
# AUTHOR_BACKGROUNDS_LIST keeps the raw list for a linear fallback lookup.
try:
    if not os.path.exists(AUTHOR_BACKGROUNDS_FILE):
        logging.error(f"Author backgrounds file not found at {AUTHOR_BACKGROUNDS_FILE}")
        raise FileNotFoundError(f"Author backgrounds file not found at {AUTHOR_BACKGROUNDS_FILE}")
    with open(AUTHOR_BACKGROUNDS_FILE, 'r') as f:
        background_list = json.load(f)
    # The file must contain a JSON array of entry objects.
    if not isinstance(background_list, list):
        logging.error(f"Invalid format in {AUTHOR_BACKGROUNDS_FILE}: Expected a list, got {type(background_list)}")
        raise ValueError("Author backgrounds must be a list")
    AUTHOR_BACKGROUNDS = {}
    AUTHOR_BACKGROUNDS_LIST = background_list  # Keep the original list for fallback lookup
    for bg in background_list:
        # Every entry must carry a string "username" key.
        if "username" not in bg:
            logging.error(f"Invalid entry in {AUTHOR_BACKGROUNDS_FILE}: Missing 'username' key in {bg}")
            raise ValueError("Each author background must have a 'username' key")
        username = bg["username"]
        if not isinstance(username, str):
            logging.error(f"Invalid username type in {AUTHOR_BACKGROUNDS_FILE}: {username} (type: {type(username)})")
            raise ValueError("Username must be a string")
        # Normalize the username (NFC + strip + lower) to handle encoding
        # differences; lookups elsewhere apply the same normalization.
        cleaned_username = unicodedata.normalize('NFC', username.strip().lower())
        AUTHOR_BACKGROUNDS[cleaned_username] = bg
        logging.debug(f"Added to AUTHOR_BACKGROUNDS: key='{cleaned_username}', value={bg}")
    loaded_usernames = list(AUTHOR_BACKGROUNDS.keys())
    logging.debug(f"Loaded author backgrounds: {loaded_usernames}")
except Exception as e:
    logging.error(f"Failed to load author_backgrounds.json: {e}", exc_info=True)
    # NOTE(review): these fallback assignments are never observable because
    # sys.exit(1) follows immediately.
    AUTHOR_BACKGROUNDS = {}
    AUTHOR_BACKGROUNDS_LIST = []
    sys.exit(1)
def validate_twitter_credentials(author):
    """Validate Twitter API credentials for a specific author.

    Args:
        author: Author dict containing at least a "username" key.

    Returns:
        True if a tweepy client built from the stored credentials can fetch
        its own account; False if no credentials exist for the author or all
        MAX_RETRIES validation attempts fail.
    """
    username = author["username"]
    credentials = X_API_CREDENTIALS.get(username)
    if not credentials:
        logging.error(f"No X credentials found for {username}")
        return False
    for attempt in range(MAX_RETRIES):
        try:
            twitter_client = tweepy.Client(
                consumer_key=credentials["api_key"],
                consumer_secret=credentials["api_secret"],
                access_token=credentials["access_token"],
                access_token_secret=credentials["access_token_secret"]
            )
            # get_me() raises TweepyException on bad credentials; the
            # returned user object itself is not needed (the original bound
            # it to an unused local).
            twitter_client.get_me()
            logging.info(f"Credentials valid for {username} (handle: {credentials['x_username']})")
            return True
        except tweepy.TweepyException as e:
            logging.warning(f"Failed to validate credentials for {username} (attempt {attempt + 1}): {e}")
            if attempt < MAX_RETRIES - 1:
                # Exponential backoff between attempts: 2s, 4s, 8s, ...
                time.sleep(RETRY_BACKOFF * (2 ** attempt))
            else:
                logging.error(f"Credentials invalid for {username} after {MAX_RETRIES} attempts")
                return False
    return False
def remove_emojis(text):
    """Return *text* with emoji and related invisible characters stripped.

    Removes the major emoji Unicode blocks plus variation selectors
    (U+FE00-FE0F) and the zero-width joiner/non-joiner used in emoji
    sequences.
    """
    emoji_ranges = (
        "\U0001F600-\U0001F64F",  # Emoticons
        "\U0001F300-\U0001F5FF",  # Symbols & Pictographs
        "\U0001F680-\U0001F6FF",  # Transport & Map Symbols
        "\U0001F700-\U0001F77F",  # Alchemical Symbols
        "\U0001F780-\U0001F7FF",  # Geometric Shapes Extended
        "\U0001F800-\U0001F8FF",  # Supplemental Arrows-C
        "\U0001F900-\U0001F9FF",  # Supplemental Symbols and Pictographs
        "\U0001FA00-\U0001FA6F",  # Chess Symbols
        "\U0001FA70-\U0001FAFF",  # Symbols and Pictographs Extended-A
        "\U00002700-\U000027BF",  # Dingbats
        "\U00002600-\U000026FF",  # Miscellaneous Symbols
        "\U0000FE00-\U0000FE0F",  # Variation Selectors
        "\U0000200D",             # Zero Width Joiner
        "\U0000200C",             # Zero Width Non-Joiner
    )
    pattern = re.compile("[" + "".join(emoji_ranges) + "]+", flags=re.UNICODE)
    return pattern.sub("", text)
def get_reference_date():
    """Return the stored reference date for the 2-day cadence, creating it if needed.

    Tries to load the date from REFERENCE_DATE_FILE first; on any parse
    failure (or if the file is missing) it initializes today's UTC midnight,
    attempts to persist it, and returns it.
    """
    os.makedirs(os.path.dirname(REFERENCE_DATE_FILE), exist_ok=True)
    if os.path.exists(REFERENCE_DATE_FILE):
        try:
            with open(REFERENCE_DATE_FILE, 'r') as fh:
                stored = json.load(fh)
            loaded = datetime.fromisoformat(stored["reference_date"]).replace(tzinfo=timezone.utc)
            logging.info(f"Loaded reference date: {loaded.date()}")
            return loaded
        except (json.JSONDecodeError, KeyError, ValueError) as e:
            # Fall through to re-initialization below.
            logging.error(f"Failed to load reference date from {REFERENCE_DATE_FILE}: {e}. Initializing new date.")
    fresh = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    try:
        with open(REFERENCE_DATE_FILE, 'w') as fh:
            json.dump({"reference_date": fresh.isoformat()}, fh)
        logging.info(f"Initialized reference date: {fresh.date()}")
    except Exception as e:
        logging.error(f"Failed to save reference date to {REFERENCE_DATE_FILE}: {e}. Using current date.")
    return fresh
def generate_engagement_tweet(author):
    """Generate an engagement tweet using author background themes and persona.

    Args:
        author: Author dict; must contain "username" and "persona" keys.

    Returns:
        The tweet text (question + call to action + URL) on success, a
        static fallback tweet if all OpenAI attempts fail, or None when the
        author's Twitter credentials do not validate.
    """
    username = author["username"]
    if not validate_twitter_credentials(author):
        logging.error(f"Skipping tweet generation for {username} due to invalid credentials")
        return None
    credentials = X_API_CREDENTIALS.get(username)
    author_handle = credentials["x_username"]
    persona = author["persona"]
    # Unknown personas fall back to the "Visionary Editor" config.
    persona_config = PERSONA_CONFIGS.get(persona, PERSONA_CONFIGS["Visionary Editor"])
    # Normalize and lookup background (same NFC + strip + lower normalization
    # applied when AUTHOR_BACKGROUNDS was built at import time).
    username_cleaned = unicodedata.normalize('NFC', username.strip().lower())
    logging.debug(f"Looking up background for username: raw='{username}', cleaned='{username_cleaned}'")
    background = AUTHOR_BACKGROUNDS.get(username_cleaned, {})
    # Debug comparison
    available_usernames = list(AUTHOR_BACKGROUNDS.keys())
    if username_cleaned in available_usernames:
        logging.debug(f"Direct key check: '{username_cleaned}' found in AUTHOR_BACKGROUNDS keys")
    else:
        logging.debug(f"Direct key check: '{username_cleaned}' NOT found in AUTHOR_BACKGROUNDS keys")
    # Byte-level comparison for the first available username; helps diagnose
    # invisible encoding mismatches between the config files.
    if available_usernames:
        sample_key = available_usernames[0]
        logging.debug(
            f"Byte-level comparison sample: "
            f"username_cleaned bytes = {list(username_cleaned.encode('utf-8'))}, "
            f"sample background key bytes = {list(sample_key.encode('utf-8'))}"
        )
    # Fallback lookup if dictionary fails: linear scan of the raw list,
    # normalizing each entry's username the same way.
    if not background:
        logging.debug(f"Dictionary lookup failed for '{username_cleaned}', attempting fallback lookup")
        for bg in AUTHOR_BACKGROUNDS_LIST:
            bg_username = bg.get("username", "")
            if not isinstance(bg_username, str):
                logging.warning(f"Skipping background entry with non-string username: {bg_username} (type: {type(bg_username)})")
                continue
            bg_username_cleaned = unicodedata.normalize('NFC', bg_username.strip().lower())
            logging.debug(
                f"Fallback comparison: "
                f"author username (cleaned) = '{username_cleaned}', "
                f"background username (cleaned) = '{bg_username_cleaned}'"
            )
            if bg_username_cleaned == username_cleaned:
                background = bg
                logging.debug(f"Fallback lookup succeeded for '{username_cleaned}'")
                break
    # Theme selection: random pick from the author's engagement_themes, or a
    # generic default when no usable background was found.
    if not background or "engagement_themes" not in background:
        logging.warning(
            f"No background or engagement themes found for {username}. "
            f"Attempted username (cleaned): {username_cleaned}. "
            f"Available usernames: {available_usernames}. Using default theme."
        )
        theme = "food trends"
    else:
        theme = random.choice(background["engagement_themes"])
    logging.debug(f"Selected engagement theme '{theme}' for {username}")
    base_prompt = persona_config["x_prompt"].format(
        description=persona_config["description"],
        tone=persona_config["tone"]
    )
    prompt = (
        f"{base_prompt}\n\n"
        f"Generate an engagement tweet for {author_handle} asking a question about {theme} to engage the public. "
        f"The current year is {CURRENT_YEAR}, and all references to the year should use {CURRENT_YEAR}. "
        f"Keep it under 230 characters to ensure room for the URL. "
        f"Use {persona_config['tone']}. "
        f"Include a call to action to follow {author_handle} or like the tweet, followed by the URL {URL} (do not mention InsiderFoodie.com separately in the text). "
        f"Strictly avoid using any emojis, hashtags, or reward-driven incentives (e.g., giveaways)—do not include them under any circumstances. "
        f"Return only the tweet text."
    )
    # Up to MAX_RETRIES model calls with exponential backoff; post-process
    # each candidate (emoji strip, URL dedup/placement, length clamp).
    for attempt in range(MAX_RETRIES):
        try:
            response = client.chat.completions.create(
                model=SUMMARY_MODEL,
                messages=[
                    {"role": "system", "content": "You are a social media expert crafting engaging tweets."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=80,
                temperature=0.7
            )
            tweet = response.choices[0].message.content.strip()
            # Remove emojis as a safeguard (the prompt forbids them, but the
            # model may emit them anyway).
            tweet = remove_emojis(tweet)
            # Check for duplicate URLs and remove if present
            url_count = tweet.lower().count(URL.lower())
            if url_count > 1:
                logging.warning(f"Generated tweet for {username} contains duplicate URLs: {tweet}")
                # Keep only the last occurrence of the URL
                last_url_pos = tweet.rfind(URL)
                tweet = tweet[:last_url_pos].replace(URL, "").strip() + " " + URL
                logging.debug(f"Revised tweet after removing duplicate URL: {tweet}")
            # Ensure the URL is at the end of the tweet
            if not tweet.endswith(URL):
                tweet = tweet.replace(URL, "").strip() + " " + URL
            # Calculate tweet length considering Twitter's URL shortening
            # (every URL counts as URL_SHORTENED_LENGTH chars when posted).
            tweet_without_url = tweet.replace(URL, "")
            total_length = len(tweet_without_url) + URL_SHORTENED_LENGTH
            if total_length > 280:
                logging.warning(f"Tweet for {username} exceeds 280 characters ({total_length}), truncating")
                # Reserve 3 chars for the "..." ellipsis.
                tweet_without_url = tweet_without_url[:(280 - URL_SHORTENED_LENGTH - 3)]
                tweet = tweet_without_url + "..." + " " + URL
                total_length = len(tweet_without_url) + 3 + URL_SHORTENED_LENGTH
            logging.debug(f"Final tweet for {username} (length {total_length}): {tweet}")
            return tweet
        except Exception as e:
            logging.warning(f"Failed to generate engagement tweet for {username} (attempt {attempt + 1}): {e}")
            if attempt < MAX_RETRIES - 1:
                time.sleep(RETRY_BACKOFF * (2 ** attempt))
            else:
                # All attempts failed: fall back to a templated tweet using
                # the already-selected theme.
                logging.error(f"Failed to generate engagement tweet after {MAX_RETRIES} attempts")
                fallback = (
                    f"What's the hottest {theme}? Share and follow {author_handle} for more! {URL}"
                )
                # Ensure fallback tweet is within length limits
                tweet_without_url = fallback.replace(URL, "")
                total_length = len(tweet_without_url) + URL_SHORTENED_LENGTH
                if total_length > 280:
                    tweet_without_url = tweet_without_url[:(280 - URL_SHORTENED_LENGTH - 3)]
                    fallback = tweet_without_url + "..." + " " + URL
                # Remove emojis from fallback as well
                fallback = remove_emojis(fallback)
                logging.info(f"Using fallback engagement tweet: {fallback}")
                return fallback
    return None
def post_engagement_tweet():
    """Post engagement tweets for authors every 2 days.

    Posts only when the number of whole days since the stored reference date
    is even. Enforces per-author monthly (500) and daily (20) post limits
    tracked via load_post_counts()/save_post_counts(). Per-author failures
    are logged and skipped so one author cannot abort the whole run.
    """
    try:
        logging.info("Starting foodie_engagement_tweet.py")
        print("Starting foodie_engagement_tweet.py")
        reference_date = get_reference_date()
        current_date = datetime.now(timezone.utc)
        days_since_reference = (current_date - reference_date).days
        logging.info(f"Days since reference date ({reference_date.date()}): {days_since_reference}")
        print(f"Days since reference date ({reference_date.date()}): {days_since_reference}")
        # Even day offset -> posting day; odd -> skip.
        if days_since_reference % 2 == 0:
            logging.info("Today is an engagement tweet day (every 2 days). Posting...")
            print("Today is an engagement tweet day (every 2 days). Posting...")
            post_counts = load_post_counts()
            for author in AUTHORS:
                username = author["username"]
                try:
                    # Find this author's count entry, creating one if absent.
                    author_count = next((entry for entry in post_counts if entry["username"] == username), None)
                    if not author_count:
                        logging.warning(f"No post count entry for {username}, initializing new entry")
                        author_count = {
                            "username": username,
                            "month": datetime.now(timezone.utc).strftime("%Y-%m"),
                            "monthly_count": 0,
                            "day": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
                            "daily_count": 0
                        }
                        post_counts.append(author_count)
                        save_post_counts(post_counts)
                    # Rate limits: skip authors at their monthly/daily caps.
                    if author_count["monthly_count"] >= 500:
                        logging.warning(f"Monthly post limit (500) reached for {username}, skipping")
                        continue
                    if author_count["daily_count"] >= 20:
                        logging.warning(f"Daily post limit (20) reached for {username}, skipping")
                        continue
                    tweet = generate_engagement_tweet(author)
                    if not tweet:
                        logging.error(f"Failed to generate engagement tweet for {username}, skipping")
                        continue
                    logging.info(f"Posting engagement tweet for {username}: {tweet}")
                    print(f"Posting engagement tweet for {username}: {tweet}")
                    if post_tweet(author, tweet):
                        logging.info(f"Successfully posted engagement tweet for {username}")
                        # Persist counters only after a confirmed post.
                        author_count["monthly_count"] += 1
                        author_count["daily_count"] += 1
                        save_post_counts(post_counts)
                    else:
                        logging.warning(f"Failed to post engagement tweet for {username}")
                except Exception as e:
                    logging.error(f"Error posting engagement tweet for {username}: {e}", exc_info=True)
                    continue
        else:
            logging.info(f"Today is not an engagement tweet day (every 2 days). Days since reference: {days_since_reference}. Skipping...")
            print(f"Today is not an engagement tweet day (every 2 days). Days since reference: {days_since_reference}. Skipping...")
        logging.info("Completed foodie_engagement_tweet.py")
        print("Completed foodie_engagement_tweet.py")
    except Exception as e:
        logging.error(f"Unexpected error in post_engagement_tweet: {e}", exc_info=True)
        print(f"Error in post_engagement_tweet: {e}")
def main():
    """Entry point: configure logging, take the run lock, post tweets, clean up."""
    lock_fd = None
    try:
        # BUG FIX: configure logging BEFORE acquiring the lock, so the
        # "Another instance ... is running" message in acquire_lock() is
        # actually written somewhere instead of being dropped.
        setup_logging()
        lock_fd = acquire_lock()
        post_engagement_tweet()
    except Exception as e:
        logging.error(f"Fatal error in main: {e}", exc_info=True)
        print(f"Fatal error: {e}")
        sys.exit(1)
    finally:
        # Release and remove the lock file if we ever held it.
        if lock_fd:
            fcntl.flock(lock_fd, fcntl.LOCK_UN)
            lock_fd.close()
            # Plain statement instead of a conditional expression used only
            # for its side effect.
            if os.path.exists(LOCK_FILE):
                os.remove(LOCK_FILE)

if __name__ == "__main__":
    main()