# rate_limiter.py
import time
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
import logging
|
|
|
|
# Module-level logger named after this module; RateLimiter uses it to report
# when a call has to wait for the rate limit.
logger = logging.getLogger(__name__)
|
|
|
|
class RateLimiter:
    """Sliding-window rate limiter.

    Allows at most ``max_requests`` recorded requests within any span of
    ``time_window`` seconds; callers that would exceed the limit are blocked
    (via ``time.sleep``) until the oldest recorded request leaves the window.

    Attributes:
        max_requests: Maximum number of requests allowed in the time window.
        time_window: Length of the window in seconds.
        requests: ``datetime`` timestamps of the recorded requests, in the
            order they were recorded (oldest first).
        last_cleanup: ``datetime`` of the most recent pruning of ``requests``.
    """

    def __init__(self, max_requests: int, time_window: int):
        """
        Initialize the rate limiter.

        Args:
            max_requests: Maximum number of requests allowed in the time window
            time_window: Time window in seconds

        Raises:
            ValueError: If ``max_requests`` is less than 1 (such a limiter
                could never admit a request).
        """
        if max_requests < 1:
            raise ValueError("max_requests must be at least 1")
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = []
        self.last_cleanup = datetime.now()

    def _cleanup_old_requests(self, now: Optional[datetime] = None) -> None:
        """Remove requests older than the time window.

        Pruning happens on every call: stale timestamps must never count
        towards the limit, and the history must not grow without bound.

        Args:
            now: Reference time for the window; defaults to ``datetime.now()``.
        """
        if now is None:
            now = datetime.now()
        cutoff = now - timedelta(seconds=self.time_window)
        self.requests = [req for req in self.requests if req > cutoff]
        self.last_cleanup = now

    def wait_if_needed(self) -> Optional[float]:
        """
        Wait if necessary to respect the rate limit, then record the request.

        The request is always recorded before returning, including when the
        call had to sleep first, so every admitted request counts towards
        the limit.

        Returns:
            Optional[float]: The number of seconds waited, or None if no wait was needed
        """
        total_waited = 0.0
        while True:
            now = datetime.now()
            self._cleanup_old_requests(now)
            if not self.requests or len(self.requests) < self.max_requests:
                break

            # Every surviving timestamp is inside the window, so the time
            # until the oldest one expires is normally positive.
            elapsed = (now - self.requests[0]).total_seconds()
            sleep_time = self.time_window - elapsed
            if sleep_time <= 0:
                # Guards against float rounding producing a zero-length wait
                # that would otherwise spin this loop.
                break

            logger.info("Rate limit reached. Waiting %.2f seconds...", sleep_time)
            time.sleep(sleep_time)
            total_waited += sleep_time
            # Loop again: re-prune with a fresh clock reading and confirm a
            # slot is actually free before recording the request.

        self.requests.append(datetime.now())
        return total_waited if total_waited > 0 else None

    def reset(self) -> None:
        """Reset the rate limiter's request history."""
        self.requests = []
        self.last_cleanup = datetime.now()