You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

236 lines
8.9 KiB

# foodie_x_poster.py
import json
import logging
import random
import time
import sys
import signal
import os
from datetime import datetime, timedelta, timezone
from openai import OpenAI
import tweepy
from foodie_config import OPENAI_API_KEY, AUTHORS, LIGHT_TASK_MODEL
from foodie_utils import load_json_file
from foodie_x_config import X_API_CREDENTIALS, X_PERSONA_PROMPTS, AUTHOR_BACKGROUNDS_FILE, X_POST_COUNTS_FILE, RECENT_POSTS_FILE
from dotenv import load_dotenv
load_dotenv()
LOG_FILE = "/home/shane/foodie_automator/foodie_x_poster.log"
LOG_PRUNE_DAYS = 30
def setup_logging():
if os.path.exists(LOG_FILE):
with open(LOG_FILE, 'r') as f:
lines = f.readlines()
cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS)
pruned_lines = [line for line in lines if datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc) > cutoff]
with open(LOG_FILE, 'w') as f:
f.writelines(pruned_lines)
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logging.getLogger().addHandler(console_handler)
logging.info("Logging initialized for foodie_x_poster.py")
setup_logging()
client = OpenAI(api_key=OPENAI_API_KEY)
try:
with open(AUTHOR_BACKGROUNDS_FILE, 'r') as f:
AUTHOR_BACKGROUNDS = json.load(f)
except Exception as e:
logging.error(f"Failed to load author_backgrounds.json: {e}")
sys.exit(1)
def load_post_counts():
counts = load_json_file(X_POST_COUNTS_FILE)
if not counts:
counts = [{"username": author["username"], "count": 0, "month": datetime.now(timezone.utc).strftime("%Y-%m")} for author in AUTHORS]
current_month = datetime.now(timezone.utc).strftime("%Y-%m")
for entry in counts:
if entry["month"] != current_month:
entry["count"] = 0
entry["month"] = current_month
return counts
def save_post_counts(counts):
with open(X_POST_COUNTS_FILE, 'w') as f:
for item in counts:
json.dump(item, f)
f.write('\n')
logging.info(f"Saved post counts to {X_POST_COUNTS_FILE}")
is_posting = False
def signal_handler(sig, frame):
logging.info("Received termination signal, checking if safe to exit...")
if is_posting:
logging.info("Currently posting, will exit after completion.")
else:
logging.info("Safe to exit immediately.")
sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
def get_recent_posts_for_author(username):
posts = load_json_file(RECENT_POSTS_FILE)
return [post for post in posts if post["author_username"] == username]
def delete_used_post(post_title):
posts = load_json_file(RECENT_POSTS_FILE)
posts = [post for post in posts if post["title"] != post_title]
with open(RECENT_POSTS_FILE, 'w') as f:
for item in posts:
json.dump(item, f)
f.write('\n')
logging.info(f"Deleted post '{post_title}' from recent_posts.json")
def generate_article_tweet(author, post, persona):
# Format the prompt using description and tone
persona_config = X_PERSONA_PROMPTS[persona]
base_prompt = persona_config["prompt"].format(
description=persona_config["description"],
tone=persona_config["tone"]
)
prompt = base_prompt.replace(
"For article tweets, include the article title, a quirky hook, and the URL.",
f"Generate an article tweet including the title '{post['title']}', a quirky hook, and the URL '{post['url']}'."
)
try:
response = client.chat.completions.create(
model=LIGHT_TASK_MODEL,
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": f"Generate tweet for {post['title']}."}
],
max_tokens=100,
temperature=0.9
)
tweet = response.choices[0].message.content.strip()
if len(tweet) > 280:
tweet = tweet[:277] + "..."
logging.info(f"Generated article tweet for {author['username']}: {tweet}")
return tweet
except Exception as e:
logging.error(f"Failed to generate article tweet for {author['username']}: {e}")
return f"This trend is fire! Check out {post['title']} at {post['url']} #Foodie"
def generate_personal_tweet(author, persona):
background = next((bg for bg in AUTHOR_BACKGROUNDS if bg["username"] == author["username"]), {})
if not background:
logging.warning(f"No background found for {author['username']}")
return f"Loving my gig at InsiderFoodie, dishing out food trends! #FoodieLife"
# Get DOB and calculate age
dob = author.get('dob', '1980-01-01')
current_year = datetime.now().year
birth_year = int(dob.split('-')[0])
age = current_year - birth_year
is_role_reflection = random.choice([True, False])
if is_role_reflection:
content = f"Reflect on your role at InsiderFoodie as {author['persona']}. Mention you're {age} years old."
else:
content = (
f"Share a personal story about your background, considering you were born on {dob} and are {age} years old. "
f"Hometown: {background['hometown']}, Cultural influences: {background['cultural_influences']}, "
f"Early memory: {background['early_memory']}, Career path: {background['career_path']}."
)
# Format the prompt using description and tone
persona_config = X_PERSONA_PROMPTS[persona]
base_prompt = persona_config["prompt"].format(
description=persona_config["description"],
tone=persona_config["tone"]
)
prompt = base_prompt.replace(
"For personal tweets, reflect on your role at InsiderFoodie or background.",
content
)
try:
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": f"Generate personal tweet for {author['username']}."}
],
max_tokens=100,
temperature=0.9
)
tweet = response.choices[0].message.content.strip()
if len(tweet) > 280:
tweet = tweet[:277] + "..."
logging.info(f"Generated personal tweet for {author['username']}: {tweet}")
return tweet
except Exception as e:
logging.error(f"Failed to generate personal tweet for {author['username']}: {e}")
return f"Loving my gig at InsiderFoodie, dishing out food trends! #FoodieLife"
def post_tweet(author, tweet):
global is_posting
credentials = next((cred for cred in X_API_CREDENTIALS if cred["username"] == author["username"]), None)
if not credentials:
logging.error(f"No X credentials found for {author['username']}")
return False
post_counts = load_post_counts()
author_count = next((entry for entry in post_counts if entry["username"] == author["username"]), None)
if author_count["count"] >= 450:
logging.warning(f"Post limit reached for {author['username']} this month")
return False
try:
client = tweepy.Client(
consumer_key=credentials["api_key"],
consumer_secret=credentials["api_secret"],
access_token=credentials["access_token"],
access_token_secret=credentials["access_token_secret"]
)
is_posting = True
response = client.create_tweet(text=tweet)
is_posting = False
author_count["count"] += 1
save_post_counts(post_counts)
logging.info(f"Posted tweet for {author['username']}: {tweet}")
return True
except Exception as e:
is_posting = False
logging.error(f"Failed to post tweet for {author['username']}: {e}")
return False
def main():
logging.info("***** X Poster Launched *****")
for author in AUTHORS:
posts = get_recent_posts_for_author(author["username"])
if not posts:
logging.info(f"No recent posts for {author['username']}, skipping")
continue
article_tweets = 0
for post in posts[:2]:
tweet = generate_article_tweet(author, post, author["persona"])
if post_tweet(author, tweet):
delete_used_post(post["title"])
article_tweets += 1
time.sleep(random.uniform(3600, 7200))
if article_tweets >= 2:
break
tweet = generate_personal_tweet(author, author["persona"])
post_tweet(author, tweet)
logging.info("X posting completed")
return random.randint(600, 1800)
if __name__ == "__main__":
sleep_time = main()
logging.info(f"Sleeping for {sleep_time} seconds")
time.sleep(sleep_time)