|
|
#!/usr/bin/env python3 |
|
|
import logging |
|
|
import pytz |
|
|
|
|
|
logging.basicConfig( |
|
|
filename='/home/shane/foodie_automator/logs/check_x_capacity.log', |
|
|
level=logging.DEBUG, |
|
|
format='%(asctime)s - %(levelname)s - %(message)s' |
|
|
) |
|
|
logging.info("TEST: Logging is configured and working.") |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
from datetime import datetime, timezone |
|
|
from foodie_utils import ( |
|
|
AUTHORS, check_author_rate_limit, load_json_file, |
|
|
get_x_rate_limit_status, update_system_activity, is_any_script_running, |
|
|
save_json_file |
|
|
) |
|
|
import time |
|
|
import sys |
|
|
import os |
|
|
import smtplib |
|
|
from email.mime.text import MIMEText |
|
|
from email.mime.multipart import MIMEMultipart |
|
|
from foodie_config import EMAIL_CONFIG |
|
|
|
|
|
# File to track sent notifications |
|
|
NOTIFICATION_FILE = '/home/shane/foodie_automator/notification_tracking.json' |
|
|
|
|
|
def load_notification_tracking(): |
|
|
"""Load notification tracking data as a dict. If not a dict, reset to {}.""" |
|
|
data = load_json_file(NOTIFICATION_FILE, default={}) |
|
|
if not isinstance(data, dict): |
|
|
logging.warning(f"notification_tracking.json was not a dict, resetting to empty dict.") |
|
|
data = {} |
|
|
save_json_file(NOTIFICATION_FILE, data) |
|
|
return data |
|
|
|
|
|
def save_notification_tracking(tracking_data): |
|
|
"""Save notification tracking data as a dict.""" |
|
|
if not isinstance(tracking_data, dict): |
|
|
logging.warning(f"Attempted to save non-dict to notification_tracking.json, resetting to empty dict.") |
|
|
tracking_data = {} |
|
|
save_json_file(NOTIFICATION_FILE, tracking_data) |
|
|
|
|
|
def should_send_notification(username, reset_time): |
|
|
"""Check if we should send a notification for this author.""" |
|
|
tracking = load_notification_tracking() |
|
|
author_data = tracking.get(username, {}) |
|
|
reset_time_str = str(reset_time) |
|
|
logging.debug(f"[DEBUG] should_send_notification: username={username}, reset_time_str={reset_time_str}, author_data={author_data}") |
|
|
if not author_data or str(author_data.get('reset_time')) != reset_time_str: |
|
|
logging.info(f"[DEBUG] Sending notification for {username}. Previous reset_time: {author_data.get('reset_time')}, New reset_time: {reset_time_str}") |
|
|
tracking[username] = { |
|
|
'last_notification': datetime.now(timezone.utc).isoformat(), |
|
|
'reset_time': reset_time_str |
|
|
} |
|
|
save_notification_tracking(tracking) |
|
|
return True |
|
|
logging.info(f"[DEBUG] Skipping notification for {username}. Already notified for reset_time: {reset_time_str}") |
|
|
return False |
|
|
|
|
|
def send_capacity_alert(username, remaining, reset_time): |
|
|
"""Send email alert when an author's tweet capacity is full.""" |
|
|
# Always use string for reset_time |
|
|
reset_time_str = str(reset_time) |
|
|
logging.debug(f"[DEBUG] send_capacity_alert: username={username}, remaining={remaining}, reset_time_str={reset_time_str}") |
|
|
if not should_send_notification(username, reset_time_str): |
|
|
logger.info(f"Skipping duplicate notification for {username}") |
|
|
return |
|
|
try: |
|
|
msg = MIMEMultipart() |
|
|
msg['From'] = EMAIL_CONFIG['from_email'] |
|
|
msg['To'] = EMAIL_CONFIG['to_email'] |
|
|
msg['Subject'] = f"⚠️ X Capacity Alert: {username}" |
|
|
body = f""" |
|
|
X Tweet Capacity Alert! |
|
|
|
|
|
Username: {username} |
|
|
Time: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')} |
|
|
Remaining Tweets: {remaining}/17 |
|
|
Reset Time: {reset_time_str} |
|
|
|
|
|
This author has reached their daily tweet limit. |
|
|
The quota will reset at the time shown above. |
|
|
|
|
|
This is an automated alert from your foodie_automator system. |
|
|
""" |
|
|
msg.attach(MIMEText(body, 'plain')) |
|
|
with smtplib.SMTP(EMAIL_CONFIG['smtp_server'], EMAIL_CONFIG['smtp_port']) as server: |
|
|
server.starttls() |
|
|
server.login(EMAIL_CONFIG['smtp_username'], EMAIL_CONFIG['smtp_password']) |
|
|
server.send_message(msg) |
|
|
logger.info(f"Sent capacity alert email for {username}") |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to send capacity alert email: {e}") |
|
|
|
|
|
def display_author_status(author): |
|
|
"""Display detailed status for a single author.""" |
|
|
username = author['username'] |
|
|
can_post, remaining, reset = check_author_rate_limit(author) |
|
|
reset_time_utc = datetime.fromtimestamp(reset, tz=timezone.utc) |
|
|
reset_time_str = reset_time_utc.strftime('%Y-%m-%d %H:%M:%S UTC') |
|
|
# Convert to Sydney time |
|
|
try: |
|
|
sydney_tz = pytz.timezone('Australia/Sydney') |
|
|
reset_time_sydney = reset_time_utc.astimezone(sydney_tz) |
|
|
reset_time_sydney_str = reset_time_sydney.strftime('%Y-%m-%d %H:%M:%S %Z') |
|
|
except Exception as e: |
|
|
reset_time_sydney_str = 'N/A' |
|
|
|
|
|
status = "✅" if can_post else "❌" |
|
|
print(f"\n{status} {username}:") |
|
|
print(f" • Remaining tweets: {remaining}/17") |
|
|
print(f" • Reset time (UTC): {reset_time_str}") |
|
|
print(f" • Reset time (Sydney): {reset_time_sydney_str}") |
|
|
print(f" • Can post: {'Yes' if can_post else 'No'}") |
|
|
|
|
|
# Send alert if capacity is full |
|
|
if remaining == 0: |
|
|
send_capacity_alert(username, remaining, reset_time_str) |
|
|
|
|
|
# Show API status for verification |
|
|
if not is_any_script_running(): |
|
|
api_remaining, api_reset = get_x_rate_limit_status(author) |
|
|
if api_remaining is not None: |
|
|
api_reset_time = datetime.fromtimestamp(api_reset, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S') |
|
|
print(f" • API Status: {api_remaining} remaining, resets at {api_reset_time}") |
|
|
|
|
|
def display_total_capacity(): |
|
|
"""Display total capacity across all authors.""" |
|
|
total_capacity = len(AUTHORS) * 17 |
|
|
total_used = 0 |
|
|
available_authors = 0 |
|
|
|
|
|
print("\n=== X Posting Capacity Status ===") |
|
|
print(f"Total daily capacity: {total_capacity} tweets ({len(AUTHORS)} authors × 17 tweets)") |
|
|
print("\nAuthor Status:") |
|
|
|
|
|
for author in AUTHORS: |
|
|
can_post, remaining, _ = check_author_rate_limit(author) |
|
|
# Only check API if no scripts are running |
|
|
if not is_any_script_running(): |
|
|
api_remaining, _ = get_x_rate_limit_status(author) |
|
|
if api_remaining is not None: |
|
|
remaining = api_remaining |
|
|
can_post = remaining > 0 |
|
|
|
|
|
used = 17 - remaining |
|
|
total_used += used |
|
|
if can_post: |
|
|
available_authors += 1 |
|
|
display_author_status(author) |
|
|
|
|
|
print("\n=== Summary ===") |
|
|
print(f"Total tweets used today: {total_used}") |
|
|
print(f"Total tweets remaining: {total_capacity - total_used}") |
|
|
print(f"Authors available to post: {available_authors}/{len(AUTHORS)}") |
|
|
|
|
|
# Calculate percentage used |
|
|
percent_used = (total_used / total_capacity) * 100 |
|
|
print(f"Capacity used: {percent_used:.1f}%") |
|
|
|
|
|
if percent_used > 80: |
|
|
print("\n⚠️ Warning: High capacity usage! Consider adding more authors.") |
|
|
elif percent_used > 60: |
|
|
print("\nℹ️ Note: Moderate capacity usage. Monitor usage patterns.") |
|
|
|
|
|
def main(): |
|
|
try: |
|
|
# Update system activity |
|
|
update_system_activity("check_x_capacity", "running", os.getpid()) |
|
|
|
|
|
# Display capacity status |
|
|
display_total_capacity() |
|
|
|
|
|
# Update system activity |
|
|
update_system_activity("check_x_capacity", "stopped") |
|
|
|
|
|
except KeyboardInterrupt: |
|
|
print("\nScript interrupted by user") |
|
|
update_system_activity("check_x_capacity", "stopped") |
|
|
sys.exit(0) |
|
|
except Exception as e: |
|
|
logger.error(f"Error: {e}") |
|
|
update_system_activity("check_x_capacity", "stopped") |
|
|
sys.exit(1) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |