You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

223 lines
8.4 KiB

import json
import logging
import random
import time
import sys
import signal
import os
from datetime import datetime, timedelta, timezone
from openai import OpenAI
import tweepy
from foodie_config import OPENAI_API_KEY, AUTHORS, LIGHT_TASK_MODEL
from foodie_utils import load_json_file
from foodie_x_config import X_API_CREDENTIALS, X_PERSONA_PROMPTS, AUTHOR_BACKGROUNDS_FILE, X_POST_COUNTS_FILE, RECENT_POSTS_FILE
from dotenv import load_dotenv
load_dotenv()
LOG_FILE = "/home/shane/foodie_automator/foodie_x_poster.log"
LOG_PRUNE_DAYS = 30
def setup_logging():
if os.path.exists(LOG_FILE):
with open(LOG_FILE, 'r') as f:
lines = f.readlines()
cutoff = datetime.now(timezone.utc) - timedelta(days=LOG_PRUNE_DAYS)
pruned_lines = [line for line in lines if datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc) > cutoff]
with open(LOG_FILE, 'w') as f:
f.writelines(pruned_lines)
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logging.getLogger().addHandler(console_handler)
logging.info("Logging initialized for foodie_x_poster.py")
setup_logging()
client = OpenAI(api_key=OPENAI_API_KEY)
try:
with open(AUTHOR_BACKGROUNDS_FILE, 'r') as f:
AUTHOR_BACKGROUNDS = json.load(f)
except Exception as e:
logging.error(f"Failed to load author_backgrounds.json: {e}")
sys.exit(1)
def load_post_counts():
counts = load_json_file(X_POST_COUNTS_FILE)
if not counts:
counts = [{"username": author["username"], "count": 0, "month": datetime.now(timezone.utc).strftime("%Y-%m")} for author in AUTHORS]
current_month = datetime.now(timezone.utc).strftime("%Y-%m")
for entry in counts:
if entry["month"] != current_month:
entry["count"] = 0
entry["month"] = current_month
return counts
def save_post_counts(counts):
with open(X_POST_COUNTS_FILE, 'w') as f:
for item in counts:
json.dump(item, f)
f.write('\n')
logging.info(f"Saved post counts to {X_POST_COUNTS_FILE}")
is_posting = False
def signal_handler(sig, frame):
logging.info("Received termination signal, checking if safe to exit...")
if is_posting:
logging.info("Currently posting, will exit after completion.")
else:
logging.info("Safe to exit immediately.")
sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
def get_recent_posts_for_author(username):
posts = load_json_file(RECENT_POSTS_FILE)
return [post for post in posts if post["author_username"] == username]
def delete_used_post(post_title):
posts = load_json_file(RECENT_POSTS_FILE)
posts = [post for post in posts if post["title"] != post_title]
with open(RECENT_POSTS_FILE, 'w') as f:
for item in posts:
json.dump(item, f)
f.write('\n')
logging.info(f"Deleted post '{post_title}' from recent_posts.json")
def generate_article_tweet(author, post, persona):
prompt = X_PERSONA_PROMPTS[persona].replace(
"For article tweets, include the article title, a quirky hook, and the URL.",
f"Generate an article tweet including the title '{post['title']}', a quirky hook, and the URL '{post['url']}'."
)
try:
response = client.chat.completions.create(
model=LIGHT_TASK_MODEL,
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": f"Generate tweet for {post['title']}."}
],
max_tokens=100,
temperature=0.9
)
tweet = response.choices[0].message.content.strip()
if len(tweet) > 280:
tweet = tweet[:277] + "..."
logging.info(f"Generated article tweet for {author['username']}: {tweet}")
return tweet
except Exception as e:
logging.error(f"Failed to generate article tweet for {author['username']}: {e}")
return f"This trend is fire! Check out {post['title']} at {post['url']} #Foodie"
def generate_personal_tweet(author, persona):
background = next((bg for bg in AUTHOR_BACKGROUNDS if bg["username"] == author["username"]), {})
if not background:
logging.warning(f"No background found for {author['username']}")
return f"Loving my gig at InsiderFoodie, dishing out food trends! #FoodieLife"
# Get DOB and calculate age
dob = author.get('dob', '1980-01-01')
current_year = datetime.now().year
birth_year = int(dob.split('-')[0])
age = current_year - birth_year
is_role_reflection = random.choice([True, False])
if is_role_reflection:
content = f"Reflect on your role at InsiderFoodie as {author['persona']}. Mention you're {age} years old."
else:
content = (
f"Share a personal story about your background, considering you were born on {dob} and are {age} years old. "
f"Hometown: {background['hometown']}, Cultural influences: {background['cultural_influences']}, "
f"Early memory: {background['early_memory']}, Career path: {background['career_path']}."
)
prompt = X_PERSONA_PROMPTS[persona].replace(
"For personal tweets, reflect on your role at InsiderFoodie or background.",
content
)
try:
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": f"Generate personal tweet for {author['username']}."}
],
max_tokens=100,
temperature=0.9
)
tweet = response.choices[0].message.content.strip()
if len(tweet) > 280:
tweet = tweet[:277] + "..."
logging.info(f"Generated personal tweet for {author['username']}: {tweet}")
return tweet
except Exception as e:
logging.error(f"Failed to generate personal tweet for {author['username']}: {e}")
return f"Loving my gig at InsiderFoodie, dishing out food trends! #FoodieLife"
def post_tweet(author, tweet):
global is_posting
credentials = next((cred for cred in X_API_CREDENTIALS if cred["username"] == author["username"]), None)
if not credentials:
logging.error(f"No X credentials found for {author['username']}")
return False
post_counts = load_post_counts()
author_count = next((entry for entry in post_counts if entry["username"] == author["username"]), None)
if author_count["count"] >= 450:
logging.warning(f"Post limit reached for {author['username']} this month")
return False
try:
client = tweepy.Client(
consumer_key=credentials["api_key"],
consumer_secret=credentials["api_secret"],
access_token=credentials["access_token"],
access_token_secret=credentials["access_token_secret"]
)
is_posting = True
response = client.create_tweet(text=tweet)
is_posting = False
author_count["count"] += 1
save_post_counts(post_counts)
logging.info(f"Posted tweet for {author['username']}: {tweet}")
return True
except Exception as e:
is_posting = False
logging.error(f"Failed to post tweet for {author['username']}: {e}")
return False
def main():
logging.info("***** X Poster Launched *****")
for author in AUTHORS:
posts = get_recent_posts_for_author(author["username"])
if not posts:
logging.info(f"No recent posts for {author['username']}, skipping")
continue
article_tweets = 0
for post in posts[:2]:
tweet = generate_article_tweet(author, post, author["persona"])
if post_tweet(author, tweet):
delete_used_post(post["title"])
article_tweets += 1
time.sleep(random.uniform(3600, 7200))
if article_tweets >= 2:
break
tweet = generate_personal_tweet(author, author["persona"])
post_tweet(author, tweet)
logging.info("X posting completed")
return random.randint(600, 1800)
if __name__ == "__main__":
sleep_time = main()
logging.info(f"Sleeping for {sleep_time} seconds")
time.sleep(sleep_time)