You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

342 lines
15 KiB

# foodie_engagement_tweet.py
import json
import logging
import random
import signal
import sys
import fcntl
import os
import time
from datetime import datetime, timedelta, timezone
import tweepy
from openai import OpenAI
from foodie_utils import post_tweet, load_post_counts, save_post_counts
from foodie_config import (
AUTHORS, SUMMARY_MODEL, X_API_CREDENTIALS, AUTHOR_BACKGROUNDS_FILE,
PERSONA_CONFIGS, ENGAGEMENT_REFERENCE_DATE_FILE
)
from dotenv import load_dotenv
# Load environment settings via python-dotenv before they are read below
# (OPENAI_API_KEY is fetched with os.getenv when the OpenAI client is built).
load_dotenv()
# File holding the persisted reference date that anchors the 2-day posting cycle.
REFERENCE_DATE_FILE = ENGAGEMENT_REFERENCE_DATE_FILE
# Lock file used by acquire_lock() to prevent concurrent runs.
LOCK_FILE = "/home/shane/foodie_automator/locks/foodie_engagement_tweet.lock"
# Log file written by setup_logging().
LOG_FILE = "/home/shane/foodie_automator/logs/foodie_engagement_tweet.log"
# Log lines with a timestamp older than this many days are dropped on startup.
LOG_PRUNE_DAYS = 30
# Number of attempts for credential validation and tweet generation.
MAX_RETRIES = 3
# Base delay in seconds between attempts; each retry sleeps RETRY_BACKOFF * 2**attempt.
RETRY_BACKOFF = 2
# Site link appended to every engagement tweet.
URL = "https://insiderfoodie.com"
# Length charged for the link when computing tweet length, regardless of len(URL).
URL_SHORTENED_LENGTH = 23 # Twitter's shortened URL length
def setup_logging():
    """Prune the log file, then configure file and console logging at INFO level.

    Existing lines in LOG_FILE are kept only if they start with a
    ``YYYY-MM-DD HH:MM:SS`` timestamp newer than LOG_PRUNE_DAYS days; lines
    without a parseable timestamp (for example traceback continuation lines)
    are dropped and counted.

    Any handlers already attached to the root logger are removed first.
    ``logging.basicConfig`` silently does nothing when the root logger already
    has a handler, and the module-level ``logging.*`` calls made before this
    function runs install an implicit stderr handler, which would otherwise
    prevent the file handler and INFO level from ever being applied.

    Exits the process with status 1 if logging cannot be set up.
    """
    try:
        os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
        malformed_count = 0
        if os.path.exists(LOG_FILE):
            with open(LOG_FILE, 'r') as f:
                lines = f.readlines()
            cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS)
            pruned_lines = []
            for line in lines:
                stamp = line[:19]
                # Cheap pre-filter: a timestamp contains only digits, '-', ':' and ' '.
                if len(line) < 19 or not stamp.replace('-', '').replace(':', '').replace(' ', '').isdigit():
                    malformed_count += 1
                    continue
                try:
                    # NOTE(review): timestamps are interpreted as UTC here, while the
                    # logging module writes local time by default - confirm this is
                    # acceptable at day granularity.
                    timestamp = datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
                except ValueError:
                    malformed_count += 1
                    continue
                if timestamp > cutoff:
                    pruned_lines.append(line)
            with open(LOG_FILE, 'w') as f:
                f.writelines(pruned_lines)

        # Drop handlers left behind by earlier logging calls so that
        # basicConfig below actually takes effect.
        root_logger = logging.getLogger()
        for stale_handler in list(root_logger.handlers):
            root_logger.removeHandler(stale_handler)
            stale_handler.close()

        logging.basicConfig(
            filename=LOG_FILE,
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        root_logger.addHandler(console_handler)
        logging.getLogger("openai").setLevel(logging.WARNING)
        logging.getLogger("tweepy").setLevel(logging.WARNING)

        # Reported only now: logging before the configuration above would
        # trigger the implicit default configuration this function avoids.
        if malformed_count > 0:
            logging.info(f"Skipped {malformed_count} malformed log lines during pruning")
        logging.info("Logging initialized for foodie_engagement_tweet.py")
    except Exception as e:
        print(f"Failed to setup logging: {e}")
        sys.exit(1)
def acquire_lock():
    """Take an exclusive, non-blocking lock on LOCK_FILE and record this PID in it.

    Returns:
        The open file object holding the lock; the caller must keep it open
        for as long as the lock is needed and close it afterwards.

    Exits the process with status 0 when the lock cannot be taken, which is
    treated as "another instance is already running".
    """
    os.makedirs(os.path.dirname(LOCK_FILE), exist_ok=True)
    # Open without truncating: opening with 'w' would wipe the PID written by
    # the instance that currently holds the lock before we even try to lock.
    lock_fd = open(LOCK_FILE, 'a+')
    try:
        fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        lock_fd.close()
        logging.info("Another instance of foodie_engagement_tweet.py is running")
        sys.exit(0)
    # The lock is ours: only now is it safe to replace the file's contents.
    lock_fd.seek(0)
    lock_fd.truncate(0)
    lock_fd.write(str(os.getpid()))
    lock_fd.flush()
    return lock_fd
def signal_handler(sig, frame):
    """Log that a termination signal arrived and end the process with status 0.

    Args:
        sig: Signal number delivered to the process (unused).
        frame: Stack frame that was interrupted (unused).

    Raises:
        SystemExit: Always, with exit code 0.
    """
    logging.info("Received termination signal, exiting...")
    raise SystemExit(0)
# Route SIGTERM and SIGINT through signal_handler so the process logs the
# request and exits with status 0.
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# Initialize OpenAI client.
# The key is validated before the client is constructed so that a missing or
# empty OPENAI_API_KEY is reported with the explicit message below rather than
# with whatever error the client constructor raises.
try:
    if not os.getenv("OPENAI_API_KEY"):
        logging.error("OPENAI_API_KEY is not set in environment variables")
        raise ValueError("OPENAI_API_KEY is required")
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
except Exception as e:
    # Without a client no tweet can be generated, so abort start-up.
    logging.error(f"Failed to initialize OpenAI client: {e}", exc_info=True)
    sys.exit(1)
# Load author backgrounds (a JSON list of per-author dicts keyed by "username").
try:
    with open(AUTHOR_BACKGROUNDS_FILE, 'r') as f:
        AUTHOR_BACKGROUNDS = json.load(f)
    # Use a named logger here: the module-level logging.debug() would
    # implicitly configure the root logger with a default stderr handler, which
    # turns the later logging.basicConfig(filename=...) into a no-op.
    logging.getLogger(__name__).debug(
        f"Loaded author backgrounds: {[bg['username'] for bg in AUTHOR_BACKGROUNDS]}"
    )
except Exception as e:
    # The script cannot pick engagement themes without this file; abort start-up.
    logging.error(f"Failed to load author_backgrounds.json: {e}", exc_info=True)
    sys.exit(1)
def validate_twitter_credentials(author):
    """Check that the X API credentials configured for ``author`` are accepted.

    Looks up the author's entry in X_API_CREDENTIALS, builds a tweepy client
    from it and calls ``get_me()``. A failing call is retried up to
    MAX_RETRIES times, sleeping RETRY_BACKOFF * 2**attempt seconds in between.

    Args:
        author: Mapping with at least a "username" key.

    Returns:
        True when a ``get_me()`` call succeeds; False when no credentials are
        configured or every attempt raised ``tweepy.TweepyException``.
    """
    username = author["username"]
    creds = X_API_CREDENTIALS.get(username)
    if not creds:
        logging.error(f"No X credentials found for {username}")
        return False

    final_attempt = MAX_RETRIES - 1
    for attempt_index in range(MAX_RETRIES):
        try:
            api = tweepy.Client(
                consumer_key=creds["api_key"],
                consumer_secret=creds["api_secret"],
                access_token=creds["access_token"],
                access_token_secret=creds["access_token_secret"]
            )
            # Only success or failure of the call matters; the result is unused.
            api.get_me()
        except tweepy.TweepyException as err:
            logging.warning(f"Failed to validate credentials for {username} (attempt {attempt_index + 1}): {err}")
            if attempt_index >= final_attempt:
                logging.error(f"Credentials invalid for {username} after {MAX_RETRIES} attempts")
                return False
            time.sleep(RETRY_BACKOFF * (2 ** attempt_index))
        else:
            logging.info(f"Credentials valid for {username} (handle: {creds['x_username']})")
            return True
    # Reached only when MAX_RETRIES is 0 and the loop never runs.
    return False
def get_reference_date():
    """Return the UTC reference date that anchors the 2-day posting interval.

    The date is read from REFERENCE_DATE_FILE (JSON with a "reference_date"
    ISO-8601 string). When the file is missing, unparsable or has an
    unexpected shape, today's UTC midnight is used instead and written back
    to the file on a best-effort basis.

    Returns:
        A timezone-aware ``datetime`` in UTC.
    """
    # dirname is '' for a bare filename, and os.makedirs('') raises.
    reference_dir = os.path.dirname(REFERENCE_DATE_FILE)
    if reference_dir:
        os.makedirs(reference_dir, exist_ok=True)

    if os.path.exists(REFERENCE_DATE_FILE):
        try:
            with open(REFERENCE_DATE_FILE, 'r') as f:
                data = json.load(f)
            reference_date = datetime.fromisoformat(data["reference_date"]).replace(tzinfo=timezone.utc)
            logging.info(f"Loaded reference date: {reference_date.date()}")
            return reference_date
        except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
            # TypeError covers valid JSON of the wrong shape, e.g. a list
            # payload or a non-string "reference_date" value.
            logging.error(f"Failed to load reference date from {REFERENCE_DATE_FILE}: {e}. Initializing new date.")

    reference_date = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    try:
        with open(REFERENCE_DATE_FILE, 'w') as f:
            json.dump({"reference_date": reference_date.isoformat()}, f)
        logging.info(f"Initialized reference date: {reference_date.date()}")
    except Exception as e:
        # Persisting is best-effort; the freshly computed date is still returned.
        logging.error(f"Failed to save reference date to {REFERENCE_DATE_FILE}: {e}. Using current date.")
    return reference_date
def _finalize_tweet(text, url, url_length, label, limit=280):
    """Return ``text`` ending in exactly one ``url`` and fitting within ``limit``.

    Args:
        text: Raw tweet text, possibly containing the URL zero or more times.
        url: Link that must appear exactly once, at the very end.
        url_length: Number of characters the link is charged for.
        label: Name used in log messages (the author's username).
        limit: Maximum charged length of the whole tweet.

    Returns:
        A ``(tweet, charged_length)`` tuple.
    """
    import re  # local import: ``re`` is not part of this file's import block

    url_pattern = re.compile(re.escape(url), re.IGNORECASE)
    url_count = len(url_pattern.findall(text))
    if url_count > 1:
        logging.warning(f"Generated tweet for {label} contains duplicate URLs: {text}")
    if url_count != 1 or not text.endswith(url):
        # Remove every occurrence regardless of letter case, tidy the gaps the
        # removal leaves behind, then append the canonical link once.
        body = re.sub(r"[ \t]{2,}", " ", url_pattern.sub("", text)).strip()
        text = f"{body} {url}".lstrip()
        if url_count > 1:
            logging.debug(f"Revised tweet after removing duplicate URL: {text}")

    # Everything before the link, including the whitespace that separates it.
    body = text[:-len(url)]
    total_length = len(body) + url_length
    if total_length > limit:
        logging.warning(f"Tweet for {label} exceeds {limit} characters ({total_length}), truncating")
        # Reserve room for the ellipsis AND the space before the link; omitting
        # the space is what used to produce tweets one character too long.
        keep = limit - url_length - len("...") - 1
        body = body[:keep].rstrip() + "..."
        text = f"{body} {url}"
        total_length = len(body) + 1 + url_length
    return text, total_length


def generate_engagement_tweet(author):
    """Generate an engagement tweet from the author's persona and background themes.

    The tweet is requested from the OpenAI chat API (up to MAX_RETRIES
    attempts with exponential backoff). The result is normalised so the site
    URL appears exactly once at the end and the tweet fits in 280 characters
    with the link charged at URL_SHORTENED_LENGTH. If every attempt fails, a
    fixed fallback question is returned instead.

    Args:
        author: Mapping with "username" and "persona" keys.

    Returns:
        The tweet text, or None when the author's X credentials are invalid
        (or when MAX_RETRIES is 0 and no attempt is made).
    """
    username = author["username"]
    if not validate_twitter_credentials(author):
        logging.error(f"Skipping tweet generation for {username} due to invalid credentials")
        return None
    credentials = X_API_CREDENTIALS.get(username)
    author_handle = credentials["x_username"]
    persona = author["persona"]
    persona_config = PERSONA_CONFIGS.get(persona, PERSONA_CONFIGS["Visionary Editor"])
    # Case-insensitive lookup for background
    background = next(
        (bg for bg in AUTHOR_BACKGROUNDS if bg["username"].lower() == username.lower()),
        {}
    )
    # A missing key and an empty list both fall back to the default theme;
    # random.choice would raise IndexError on an empty list.
    themes = background.get("engagement_themes")
    if not themes:
        logging.warning(f"No background or engagement themes found for {username}, using default theme")
        theme = "food trends"
    else:
        theme = random.choice(themes)
        logging.debug(f"Selected engagement theme '{theme}' for {username}")
    base_prompt = persona_config["x_prompt"].format(
        description=persona_config["description"],
        tone=persona_config["tone"]
    )
    prompt = (
        f"{base_prompt}\n\n"
        f"Generate an engagement tweet for {author_handle} asking a question about {theme} to engage the public. "
        f"Keep it under 230 characters to ensure room for the URL. "
        f"Use {persona_config['tone']}. "
        f"Include a call to action to follow {author_handle} or like the tweet, followed by the URL {URL} (do not mention InsiderFoodie.com separately in the text). "
        f"Avoid using the word 'elevate'—use more humanized language like 'level up' or 'bring to life'. "
        f"Do not include emojis, hashtags, or reward-driven incentives (e.g., giveaways). "
        f"Return only the tweet text."
    )
    for attempt in range(MAX_RETRIES):
        try:
            response = client.chat.completions.create(
                model=SUMMARY_MODEL,
                messages=[
                    {"role": "system", "content": "You are a social media expert crafting engaging tweets."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=80,
                temperature=0.7
            )
            raw_tweet = response.choices[0].message.content.strip()
            tweet, total_length = _finalize_tweet(raw_tweet, URL, URL_SHORTENED_LENGTH, username)
            logging.debug(f"Final tweet for {username} (length {total_length}): {tweet}")
            return tweet
        except Exception as e:
            logging.warning(f"Failed to generate engagement tweet for {username} (attempt {attempt + 1}): {e}")
            if attempt < MAX_RETRIES - 1:
                time.sleep(RETRY_BACKOFF * (2 ** attempt))
            else:
                logging.error(f"Failed to generate engagement tweet after {MAX_RETRIES} attempts")
                # The fallback goes through the same normalisation so a long
                # theme or handle cannot push it past the limit.
                fallback, _ = _finalize_tweet(
                    f"What's the hottest {theme}? Share and follow {author_handle} for more! {URL}",
                    URL,
                    URL_SHORTENED_LENGTH,
                    username,
                )
                logging.info(f"Using fallback engagement tweet: {fallback}")
                return fallback
    # Reached only when MAX_RETRIES is 0 and the loop never runs.
    return None
def post_engagement_tweet():
    """Post one engagement tweet per author when today falls on the 2-day cycle.

    The cycle is anchored on the date returned by get_reference_date(): tweets
    go out only when the whole number of days elapsed since then is even. On
    such a day every entry in AUTHORS is processed independently. Authors at
    their monthly (500) or daily (20) post limit are skipped, and counters are
    persisted after each successful post. Progress is written both to the log
    and to stdout. Any unexpected error is logged and printed, not raised.
    """
    def announce(message):
        # Emit the same text to the log and to stdout.
        logging.info(message)
        print(message)

    try:
        announce("Starting foodie_engagement_tweet.py")
        anchor = get_reference_date()
        elapsed_days = (datetime.now(timezone.utc) - anchor).days
        announce(f"Days since reference date ({anchor.date()}): {elapsed_days}")

        if elapsed_days % 2 != 0:
            announce(f"Today is not an engagement tweet day (every 2 days). Days since reference: {elapsed_days}. Skipping...")
        else:
            announce("Today is an engagement tweet day (every 2 days). Posting...")
            counts = load_post_counts()
            for author in AUTHORS:
                username = author["username"]
                try:
                    # Find this author's counter record, if one exists.
                    entry = None
                    for candidate in counts:
                        if candidate["username"] == username:
                            entry = candidate
                            break
                    if not entry:
                        logging.warning(f"No post count entry for {username}, initializing new entry")
                        entry = {
                            "username": username,
                            "month": datetime.now(timezone.utc).strftime("%Y-%m"),
                            "monthly_count": 0,
                            "day": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
                            "daily_count": 0
                        }
                        counts.append(entry)
                        save_post_counts(counts)

                    if entry["monthly_count"] >= 500:
                        logging.warning(f"Monthly post limit (500) reached for {username}, skipping")
                        continue
                    if entry["daily_count"] >= 20:
                        logging.warning(f"Daily post limit (20) reached for {username}, skipping")
                        continue

                    tweet = generate_engagement_tweet(author)
                    if not tweet:
                        logging.error(f"Failed to generate engagement tweet for {username}, skipping")
                        continue

                    announce(f"Posting engagement tweet for {username}: {tweet}")
                    if not post_tweet(author, tweet):
                        logging.warning(f"Failed to post engagement tweet for {username}")
                    else:
                        logging.info(f"Successfully posted engagement tweet for {username}")
                        entry["monthly_count"] += 1
                        entry["daily_count"] += 1
                        save_post_counts(counts)
                except Exception as err:
                    # One author's failure must not stop the remaining authors.
                    logging.error(f"Error posting engagement tweet for {username}: {err}", exc_info=True)
                    continue

        announce("Completed foodie_engagement_tweet.py")
    except Exception as err:
        logging.error(f"Unexpected error in post_engagement_tweet: {err}", exc_info=True)
        print(f"Error in post_engagement_tweet: {err}")
def main():
    """Run the script once: take the lock, set up logging, post the tweets.

    Any exception raised along the way is logged and printed, and the process
    exits with status 1. Whenever the lock was obtained it is released, its
    file handle closed and LOCK_FILE deleted (if still present) before
    returning or exiting.
    """
    lock_handle = None
    try:
        lock_handle = acquire_lock()
        setup_logging()
        post_engagement_tweet()
    except Exception as err:
        logging.error(f"Fatal error in main: {err}", exc_info=True)
        print(f"Fatal error: {err}")
        sys.exit(1)
    finally:
        # lock_handle stays None when acquire_lock() exited before returning.
        if lock_handle:
            fcntl.flock(lock_handle, fcntl.LOCK_UN)
            lock_handle.close()
            if os.path.exists(LOCK_FILE):
                os.remove(LOCK_FILE)
if __name__ == "__main__":
main()