#!/usr/bin/env python3
"""Report X posting capacity for every configured author.

For each author in ``AUTHORS`` the script prints the remaining daily tweet
quota and its reset time, emails an alert (once per reset window) when an
author has no tweets left, and finishes with an overall usage summary.
"""
import logging
import os
import smtplib
import sys
import time
from datetime import datetime, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

# Logging is configured before the project modules are imported, as in the
# original file. NOTE(review): presumably so this configuration wins if
# foodie_utils also touches the root logger - confirm before reordering.
logging.basicConfig(
    filename='/home/shane/foodie_automator/logs/check_x_capacity.log',
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger. The original referenced ``logger`` without ever
# defining it, which raised NameError on every alert and in main()'s handler.
logger = logging.getLogger(__name__)
logger.info("TEST: Logging is configured and working.")

from foodie_utils import (  # noqa: E402
    AUTHORS,
    check_author_rate_limit,
    load_json_file,
    get_x_rate_limit_status,
    update_system_activity,
    is_any_script_running,
    save_json_file
)
from foodie_config import EMAIL_CONFIG  # noqa: E402

# File to track sent notifications
NOTIFICATION_FILE = '/home/shane/foodie_automator/notification_tracking.json'

# Daily tweet quota per author used for all capacity arithmetic and display.
DAILY_TWEET_LIMIT = 17

# Upper bound on SMTP network operations so a stalled mail server cannot
# leave this script hanging while it is marked as "running".
SMTP_TIMEOUT_SECONDS = 30


def load_notification_tracking():
    """Load notification tracking data as a dict.

    If the stored value is not a dict, it is reset to ``{}`` on disk and the
    empty dict is returned.
    """
    data = load_json_file(NOTIFICATION_FILE, default={})
    if not isinstance(data, dict):
        logger.warning("notification_tracking.json was not a dict, resetting to empty dict.")
        data = {}
        save_json_file(NOTIFICATION_FILE, data)
    return data


def save_notification_tracking(tracking_data):
    """Save notification tracking data; a non-dict is replaced by ``{}``."""
    if not isinstance(tracking_data, dict):
        logger.warning("Attempted to save non-dict to notification_tracking.json, resetting to empty dict.")
        tracking_data = {}
    save_json_file(NOTIFICATION_FILE, tracking_data)


def _forget_notification(username):
    """Remove the tracking entry for *username* so a later run can re-alert."""
    tracking = load_notification_tracking()
    if tracking.pop(username, None) is not None:
        save_notification_tracking(tracking)


def should_send_notification(username, reset_time):
    """Decide whether an alert is due for *username* and record it if so.

    An alert is due when nothing is recorded for the author or the recorded
    reset time differs from *reset_time* (compared as strings). When due, the
    tracking file is updated immediately.

    Returns:
        True if the caller should send a notification, False otherwise.
    """
    tracking = load_notification_tracking()
    author_data = tracking.get(username, {})
    reset_time_str = str(reset_time)
    logger.debug(f"[DEBUG] should_send_notification: username={username}, reset_time_str={reset_time_str}, author_data={author_data}")

    if author_data and str(author_data.get('reset_time')) == reset_time_str:
        logger.info(f"[DEBUG] Skipping notification for {username}. Already notified for reset_time: {reset_time_str}")
        return False

    logger.info(f"[DEBUG] Sending notification for {username}. Previous reset_time: {author_data.get('reset_time')}, New reset_time: {reset_time_str}")
    tracking[username] = {
        'last_notification': datetime.now(timezone.utc).isoformat(),
        'reset_time': reset_time_str
    }
    save_notification_tracking(tracking)
    return True


def send_capacity_alert(username, remaining, reset_time):
    """Send an email alert when an author's tweet capacity is exhausted.

    At most one alert is sent per author and reset time. Failures while
    building or sending the email are logged rather than raised, and the
    tracking entry is rolled back so the next run retries the alert.

    Args:
        username: Author whose quota is exhausted.
        remaining: Remaining tweet count shown in the email.
        reset_time: Quota reset time; always handled as a string.
    """
    # Always use string for reset_time
    reset_time_str = str(reset_time)
    logger.debug(f"[DEBUG] send_capacity_alert: username={username}, remaining={remaining}, reset_time_str={reset_time_str}")

    if not should_send_notification(username, reset_time_str):
        logger.info(f"Skipping duplicate notification for {username}")
        return

    try:
        msg = MIMEMultipart()
        msg['From'] = EMAIL_CONFIG['from_email']
        msg['To'] = EMAIL_CONFIG['to_email']
        msg['Subject'] = f"⚠️ X Capacity Alert: {username}"

        sent_at = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
        body = "\n".join([
            "X Tweet Capacity Alert!",
            "",
            f"Username: {username}",
            f"Time: {sent_at}",
            f"Remaining Tweets: {remaining}/{DAILY_TWEET_LIMIT}",
            f"Reset Time: {reset_time_str}",
            "",
            "This author has reached their daily tweet limit.",
            "The quota will reset at the time shown above.",
            "",
            "This is an automated alert from your foodie_automator system.",
        ])
        msg.attach(MIMEText(body, 'plain'))

        with smtplib.SMTP(
            EMAIL_CONFIG['smtp_server'],
            EMAIL_CONFIG['smtp_port'],
            timeout=SMTP_TIMEOUT_SECONDS,
        ) as server:
            server.starttls()
            server.login(EMAIL_CONFIG['smtp_username'], EMAIL_CONFIG['smtp_password'])
            server.send_message(msg)
        logger.info(f"Sent capacity alert email for {username}")
    except Exception as e:
        logger.exception(f"Failed to send capacity alert email: {e}")
        # should_send_notification() already recorded this alert as sent;
        # undo that so a failed delivery does not silence the alert for the
        # rest of the reset window.
        _forget_notification(username)


def _format_utc(timestamp):
    """Format a POSIX timestamp as ``YYYY-MM-DD HH:MM:SS`` in UTC."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')


def _fetch_author_status(author):
    """Collect the current quota status for one author.

    Starts from ``check_author_rate_limit`` and, only when no other script is
    running, overrides it with the values reported by
    ``get_x_rate_limit_status``.

    Returns:
        ``(can_post, remaining, reset_time, api_status)`` where *reset_time*
        is a formatted UTC string and *api_status* is either ``None`` or a
        ``(api_remaining, api_reset_time)`` tuple from the API query.
    """
    can_post, remaining, reset = check_author_rate_limit(author)
    reset_time = _format_utc(reset)
    api_status = None

    # Only check API if no scripts are running
    if not is_any_script_running():
        api_remaining, api_reset = get_x_rate_limit_status(author)
        if api_remaining is not None:
            api_reset_time = _format_utc(api_reset)
            # Use API values as primary display
            remaining = api_remaining
            reset_time = api_reset_time
            can_post = remaining > 0
            api_status = (api_remaining, api_reset_time)

    return can_post, remaining, reset_time, api_status


def _report_author_status(username, can_post, remaining, reset_time, api_status):
    """Print one author's status and send an alert if the quota is used up."""
    status = "✅" if can_post else "❌"

    print(f"\n{status} {username}:")
    print(f" • Remaining tweets: {remaining}/{DAILY_TWEET_LIMIT}")
    print(f" • Reset time: {reset_time}")
    print(f" • Can post: {'Yes' if can_post else 'No'}")

    # Send alert if capacity is full
    if remaining == 0:
        send_capacity_alert(username, remaining, reset_time)

    # Show API status for verification (reuses the values already fetched
    # instead of issuing a second rate-limit query).
    if api_status is not None:
        api_remaining, api_reset_time = api_status
        print(f" • API Status: {api_remaining} remaining, resets at {api_reset_time}")


def display_author_status(author):
    """Display detailed status for a single author.

    Args:
        author: Author record; its ``'username'`` key is read here and the
            record is passed on to the rate-limit helpers.
    """
    _report_author_status(author['username'], *_fetch_author_status(author))


def display_total_capacity():
    """Display per-author status followed by totals across all authors."""
    author_count = len(AUTHORS)
    total_capacity = author_count * DAILY_TWEET_LIMIT
    total_used = 0
    available_authors = 0

    print("\n=== X Posting Capacity Status ===")
    print(f"Total daily capacity: {total_capacity} tweets ({author_count} authors × {DAILY_TWEET_LIMIT} tweets)")
    print("\nAuthor Status:")

    for author in AUTHORS:
        # One status fetch per author feeds both the totals and the display.
        can_post, remaining, reset_time, api_status = _fetch_author_status(author)

        total_used += DAILY_TWEET_LIMIT - remaining
        if can_post:
            available_authors += 1

        _report_author_status(author['username'], can_post, remaining, reset_time, api_status)

    print("\n=== Summary ===")
    print(f"Total tweets used today: {total_used}")
    print(f"Total tweets remaining: {total_capacity - total_used}")
    print(f"Authors available to post: {available_authors}/{author_count}")

    # Calculate percentage used; with no authors there is no capacity, so
    # report 0% instead of dividing by zero.
    percent_used = (total_used / total_capacity) * 100 if total_capacity else 0.0
    print(f"Capacity used: {percent_used:.1f}%")

    if percent_used > 80:
        print("\n⚠️ Warning: High capacity usage! Consider adding more authors.")
    elif percent_used > 60:
        print("\nℹ️ Note: Moderate capacity usage. Monitor usage patterns.")


def main():
    """Run the capacity report, marking the script running and then stopped.

    Exits with status 0 on Ctrl-C and status 1 on any other error.
    """
    try:
        # Update system activity
        update_system_activity("check_x_capacity", "running", os.getpid())

        # Display capacity status
        display_total_capacity()
    except KeyboardInterrupt:
        print("\nScript interrupted by user")
        sys.exit(0)
    except Exception as e:
        logger.exception(f"Error: {e}")
        sys.exit(1)
    finally:
        # Runs on success, interruption and failure alike (including while
        # SystemExit from the handlers above propagates).
        update_system_activity("check_x_capacity", "stopped")


if __name__ == "__main__":
    main()