128 lines
5.0 KiB
Python
128 lines
5.0 KiB
Python
# foodie_x_poster.py
|
||
import json
|
||
import logging
|
||
import random
|
||
import time
|
||
import sys
|
||
import signal
|
||
import os
|
||
from datetime import datetime, timezone, timedelta
|
||
from openai import OpenAI
|
||
from foodie_config import OPENAI_API_KEY, AUTHORS, LIGHT_TASK_MODEL, PERSONA_CONFIGS, AUTHOR_BACKGROUNDS_FILE
|
||
from foodie_utils import load_json_file, post_tweet, check_author_rate_limit
|
||
from dotenv import load_dotenv
|
||
|
||
# Load settings from a .env file via python-dotenv before anything below runs.
load_dotenv()

# Number of days of history kept when the log file is pruned at startup.
LOG_PRUNE_DAYS = 30
# Absolute path of the log file this script writes to (and prunes).
LOG_FILE = "/home/shane/foodie_automator/foodie_x_poster.log"
|
||
|
||
def _prune_log_lines(lines, cutoff):
    """Return the entries of *lines* that belong to records newer than *cutoff*.

    A line starting with a ``YYYY-MM-DD HH:MM:SS`` timestamp begins a new
    record. Any other line (traceback text, blank lines, wrapped messages) is
    treated as a continuation and shares the fate of the record before it.
    Continuation lines that precede the first timestamped record are kept.

    Args:
        lines: Log file lines, in file order.
        cutoff: Naive ``datetime``; records stamped at or before it are dropped.
    """
    kept = []
    keep_record = True
    for line in lines:
        try:
            stamp = datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S')
        except ValueError:
            # Not the start of a record: inherit the previous decision.
            pass
        else:
            keep_record = stamp > cutoff
        if keep_record:
            kept.append(line)
    return kept


def setup_logging(log_file=None, prune_days=None):
    """Prune old records from the log file, then configure root logging.

    Records older than the retention window are removed from an existing log
    file. Lines without a leading timestamp (for example the traceback lines
    produced by ``exc_info=True``) no longer abort start-up with ValueError;
    they are kept or dropped together with the record they follow.

    The root logger is then configured to write INFO-and-above records to the
    log file, and a console ``StreamHandler`` is attached.

    Args:
        log_file: Path of the log file. Defaults to the module's ``LOG_FILE``.
        prune_days: Retention window in days. Defaults to ``LOG_PRUNE_DAYS``.
    """
    if log_file is None:
        log_file = LOG_FILE
    if prune_days is None:
        prune_days = LOG_PRUNE_DAYS

    if os.path.exists(log_file):
        # logging renders %(asctime)s in local time by default, so the cutoff
        # is a naive local datetime rather than a UTC one.
        cutoff = datetime.now() - timedelta(days=prune_days)
        with open(log_file, 'r') as f:
            lines = f.readlines()
        pruned_lines = _prune_log_lines(lines, cutoff)
        # Only rewrite the file when something was actually removed.
        if len(pruned_lines) != len(lines):
            with open(log_file, 'w') as f:
                f.writelines(pruned_lines)

    logging.basicConfig(
        filename=log_file,
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    logging.getLogger().addHandler(console_handler)
    logging.info("Logging initialized for foodie_x_poster.py")
|
||
|
||
# Logging must be configured before the first logging call below.
setup_logging()

# OpenAI client used for tweet generation.
client = OpenAI(api_key=OPENAI_API_KEY)

# Author background data is required; the script exits with status 1 if it
# cannot be read or parsed.
try:
    with open(AUTHOR_BACKGROUNDS_FILE, 'r') as backgrounds_file:
        AUTHOR_BACKGROUNDS = json.loads(backgrounds_file.read())
except Exception as load_error:
    logging.error(f"Failed to load author_backgrounds.json: {load_error}")
    sys.exit(1)

# True while main() is generating/posting a tweet; read by signal_handler.
is_posting = False
|
||
|
||
def signal_handler(sig, frame):
    """Exit on a termination signal without cutting off a post in progress.

    If no post is in progress (``is_posting`` is false) the process exits at
    once with status 0. If a post is in progress the exit is deferred:
    previously the handler only logged "will exit after completion" and the
    request was then forgotten. Now a one-second SIGALRM is armed and this
    same handler re-checks ``is_posting`` on every tick, exiting as soon as
    the flag has been cleared.

    On platforms without SIGALRM the deferred exit cannot be scheduled and
    the handler only logs, as before.

    Args:
        sig: Number of the signal being handled.
        frame: Current stack frame (unused).
    """
    alarm_signal = getattr(signal, "SIGALRM", None)

    if alarm_signal is not None and sig == alarm_signal:
        # Timer tick from an earlier, deferred termination request.
        if is_posting:
            signal.alarm(1)
            return
        logging.info("Posting finished, exiting now.")
        sys.exit(0)

    logging.info("Received termination signal, checking if safe to exit...")
    if not is_posting:
        logging.info("Safe to exit immediately.")
        sys.exit(0)

    logging.info("Currently posting, will exit after completion.")
    if alarm_signal is not None:
        # Route the timer to this handler and start polling once per second.
        signal.signal(alarm_signal, signal_handler)
        signal.alarm(1)
|
||
|
||
# Route both termination requests (kill / Ctrl-C) through signal_handler.
for _signum in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_signum, signal_handler)
|
||
|
||
def _fit_tweet(text, limit=280):
    """Return *text* cut to at most *limit* characters, ending a cut with '...'."""
    if len(text) <= limit:
        return text
    return text[:limit - 3] + "..."


def generate_engagement_tweet(author, persona):
    """Build an engagement tweet for *author* in the voice of *persona*.

    A theme is picked at random from the author's ``engagement_themes`` entry
    in ``AUTHOR_BACKGROUNDS`` and the persona's ``x_prompt`` is sent to the
    chat-completions API. The result is capped at 280 characters.

    Args:
        author: Mapping with at least a ``"username"`` key.
        persona: Key into ``PERSONA_CONFIGS``.

    Returns:
        The generated tweet. A fixed generic question is returned when the
        author has no background entry or no themes (including an empty
        list, which previously raised IndexError). A theme-based fallback
        question is returned when the API call fails or yields no text.

    Raises:
        KeyError: If *persona* is not in ``PERSONA_CONFIGS`` or the persona
            config lacks ``x_prompt``/``description``/``tone``.
    """
    username = author["username"]
    # .get() tolerates background entries that lack a "username" field.
    background = next(
        (bg for bg in AUTHOR_BACKGROUNDS if bg.get("username") == username),
        {},
    )
    themes = background.get("engagement_themes")
    if not themes:
        logging.warning(f"No background or engagement themes found for {username}")
        return "What food trends are you loving right now? Share your thoughts! #FoodieTrends"

    theme = random.choice(themes)
    persona_config = PERSONA_CONFIGS[persona]
    base_prompt = persona_config["x_prompt"].format(
        description=persona_config["description"],
        tone=persona_config["tone"]
    )
    # Swap the generic engagement instruction for one naming the chosen theme.
    prompt = base_prompt.replace(
        "For engagement tweets, ask a question about food trends, foods, or articles to engage the public.",
        f"Generate an engagement tweet asking a question about {theme} to engage the public."
    )
    try:
        response = client.chat.completions.create(
            model=LIGHT_TASK_MODEL,
            messages=[
                {"role": "system", "content": prompt},
                {"role": "user", "content": f"Generate engagement tweet for {username} about {theme}."}
            ],
            max_tokens=100,
            temperature=0.9
        )
        content = response.choices[0].message.content
        tweet = _fit_tweet((content or "").strip())
        if not tweet:
            # Never hand an empty string to the poster; use the fallback.
            raise ValueError("model returned an empty tweet")
        logging.info(f"Generated engagement tweet for {username}: {tweet}")
        return tweet
    except Exception as e:
        logging.error(f"Failed to generate engagement tweet for {username}: {e}")
        return _fit_tweet(f"What’s your take on {theme}? Let’s talk!")
|
||
|
||
def main():
    """Post one engagement tweet per author and return an idle duration.

    Authors in ``AUTHORS`` are processed in order. An author for whom
    ``check_author_rate_limit`` reports no capacity is logged and skipped.
    For every other author a tweet is generated and passed to ``post_tweet``
    while the module-level ``is_posting`` flag is raised; any exception from
    that attempt is logged with its traceback and the loop continues. Each
    attempt is followed by a random 3600-7200 second sleep.

    Returns:
        A random integer between 600 and 1800, used by the ``__main__``
        guard as a number of seconds to sleep.
    """
    global is_posting
    logging.info("***** X Poster Launched *****")

    for author in AUTHORS:
        # Consult the rate limit first so no tweet is generated needlessly.
        can_post, remaining, reset = check_author_rate_limit(author)
        if not can_post:
            if reset:
                reset_time = datetime.fromtimestamp(reset, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
            else:
                reset_time = "Unknown"
            logging.info(f"Skipping engagement tweet for {author['username']} due to rate limit. Remaining: {remaining}, Reset at: {reset_time}")
            continue

        is_posting = True
        try:
            tweet = generate_engagement_tweet(author, author["persona"])
            posted = post_tweet(author, tweet)
            if posted:
                logging.info(f"Successfully posted engagement tweet for {author['username']}")
            else:
                logging.warning(f"Failed to post engagement tweet for {author['username']}")
        except Exception as e:
            logging.error(f"Error posting engagement tweet for {author['username']}: {e}", exc_info=True)
        finally:
            # Always lower the flag, whether the attempt succeeded or not.
            is_posting = False

        # Space attempts one to two hours apart.
        time.sleep(random.uniform(3600, 7200))

    logging.info("X posting completed")
    return random.randint(600, 1800)
|
||
|
||
if __name__ == "__main__":
    # Run one posting pass, then idle for the duration main() returns.
    sleep_time = main()
    logging.info(f"Sleeping for {sleep_time} seconds")
    time.sleep(sleep_time)