# Source code for core.mail_proxy.smtp.rate_limiter

# Copyright 2025 Softwell S.r.l. - SPDX-License-Identifier: Apache-2.0
"""In-memory sliding-window rate limiter.

This module implements per-account rate limiting with configurable limits
at minute, hour, and day granularity. The limiter uses in-memory storage
to track send history, enabling fast rate limiting decisions.

The sliding window approach ensures fair distribution of sends over time
rather than allowing burst behavior at window boundaries.

To handle parallel dispatch correctly, the limiter tracks "in-flight" sends
in memory. This ensures that concurrent sends are counted even before they
complete and are logged.

Note: Send history is lost on service restart. This is acceptable for rate
limiting purposes as it only allows a brief window of potentially higher
sending rates after restart.

Example:
    Using the rate limiter::

        rate_limiter = RateLimiter()
        deferred_until, should_reject = await rate_limiter.check_and_plan(account)
        if deferred_until:
            if should_reject:
                # Message should be rejected with rate limit error
                return {"error": "rate_limit_exceeded"}
            else:
                # Message should be deferred until this timestamp
                await persistence.set_deferred(msg_id, deferred_until)
        else:
            # Safe to send now
            try:
                await send_message(msg)
                await rate_limiter.log_send(account_id)
            except Exception:
                await rate_limiter.release_slot(account_id)
                raise
"""

from __future__ import annotations

import asyncio
import logging
import time
from collections import deque
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from .sender import SmtpSender

logger = logging.getLogger(__name__)

# Sliding-window sizes, in seconds.
WINDOW_MINUTE = 60
WINDOW_HOUR = 3600
WINDOW_DAY = 86400


class RateLimiter:
    """Per-account sliding-window rate limiter with in-memory storage.

    Enforces configurable send rate limits at three granularities:

    - Per minute
    - Per hour
    - Per day

    Sends are counted over a sliding window (``now - window`` .. ``now``),
    plus the sends currently in flight, so that parallel dispatch cannot
    overshoot a limit before the sends are logged.

    When a limit is hit, the limiter returns a deferral timestamp equal to
    the start of the next clock-aligned window (minute, hour or day) of the
    limit that was hit.
    """

    def __init__(self, smtp_sender: SmtpSender | None = None) -> None:
        """Initialize the rate limiter.

        Args:
            smtp_sender: Parent SmtpSender instance for accessing proxy
                resources. It is only stored on the instance here.
        """
        self.smtp_sender = smtp_sender
        # Per-account send history: account_id -> deque of unix timestamps,
        # appended in chronological order (oldest on the left).
        self._send_history: dict[str, deque[int]] = {}
        # In-flight sends: account_id -> number of reserved, unfinished sends.
        self._in_flight: dict[str, int] = {}
        # Serializes every read-modify-write of the two dicts above.
        self._lock = asyncio.Lock()

    @staticmethod
    def _positive_limit(account: dict[str, Any], key: str) -> int | None:
        """Return ``account[key]`` as a positive int, or None if unset/<= 0.

        Args:
            account: Account configuration dictionary.
            key: Name of the limit entry to read.

        Returns:
            The limit as an int when it is present and greater than zero,
            otherwise None (meaning "no limit at this granularity").

        Raises:
            ValueError: If the stored value cannot be converted with int().
            TypeError: If the stored value is of a type int() rejects.
        """
        raw = account.get(key)
        if raw is None:
            return None
        value = int(raw)
        return value if value > 0 else None

    def _count_since(self, account_id: str, since_ts: int) -> int:
        """Count sends since the given timestamp for an account.

        Args:
            account_id: The account identifier.
            since_ts: Unix timestamp to count from (exclusive).

        Returns:
            Number of recorded sends with a timestamp strictly greater
            than since_ts.
        """
        history = self._send_history.get(account_id)
        if not history:
            return 0
        return sum(1 for ts in history if ts > since_ts)

    def _cleanup_old_entries(self, account_id: str, now: int) -> None:
        """Remove entries older than the largest window (1 day).

        Args:
            account_id: The account identifier.
            now: Current unix timestamp.
        """
        history = self._send_history.get(account_id)
        if not history:
            return
        cutoff = now - WINDOW_DAY
        # Oldest entries sit on the left, so stop at the first fresh one.
        while history and history[0] <= cutoff:
            history.popleft()

    def _release_in_flight(self, account_id: str) -> None:
        """Decrement the in-flight counter of an account, never below zero.

        The caller must already hold ``self._lock``.

        Args:
            account_id: The account identifier.
        """
        if self._in_flight.get(account_id, 0) > 0:
            self._in_flight[account_id] -= 1

    async def check_and_plan(self, account: dict[str, Any]) -> tuple[int | None, bool]:
        """Check rate limits and calculate deferral timestamp if exceeded.

        Evaluates the account's configured rate limits against recent send
        history plus in-flight sends. If any limit is exceeded, returns a
        tuple indicating the deferral time and whether to reject.

        If the check passes and at least one limit is configured, reserves
        a slot by incrementing the in-flight counter. The caller MUST then
        call either log_send() on success or release_slot() on failure to
        release the reservation. When no limit is configured, nothing is
        reserved and the call returns immediately.

        Limits are checked in order of granularity (minute, hour, day) and
        the first exceeded limit determines the deferral time.

        Args:
            account: Account configuration dictionary containing:
                - id: The account identifier (required).
                - limit_per_minute: Max sends per minute (optional).
                - limit_per_hour: Max sends per hour (optional).
                - limit_per_day: Max sends per day (optional).
                - limit_behavior: "defer" (default) or "reject".
                Limits that are missing, None, or not greater than zero
                are treated as "no limit".

        Returns:
            Tuple of (deferred_until, should_reject):
                - deferred_until: Unix timestamp until which message should
                  be deferred, or None if sending is permitted immediately.
                  It is the start of the next clock-aligned minute, hour or
                  day, i.e. ``(now // window + 1) * window``.
                - should_reject: True if limit_behavior is "reject" and a
                  limit was exceeded, meaning message should be rejected
                  with error.

        Raises:
            KeyError: If ``account`` has no "id" entry.
        """
        account_id = account["id"]
        now = int(time.time())
        behavior = account.get("limit_behavior", "defer")

        per_min = self._positive_limit(account, "limit_per_minute")
        per_hour = self._positive_limit(account, "limit_per_hour")
        per_day = self._positive_limit(account, "limit_per_day")

        # No limits configured - allow immediately, without a reservation.
        if per_min is None and per_hour is None and per_day is None:
            return (None, False)

        # NOTE(review): counting below uses a sliding window, while the
        # deferral target is the next clock-aligned boundary. A message
        # retried at that boundary can therefore still be over the limit
        # and get deferred again.
        async with self._lock:
            # Drop entries that no window can see any more.
            self._cleanup_old_entries(account_id, now)

            in_flight = self._in_flight.get(account_id, 0)
            logger.debug(
                "Rate check for %s: in_flight=%d, per_min=%s, per_hour=%s, per_day=%s",
                account_id,
                in_flight,
                per_min,
                per_hour,
                per_day,
            )

            if per_min is not None:
                c = self._count_since(account_id, now - WINDOW_MINUTE)
                logger.debug(
                    "Rate check %s: count=%d + in_flight=%d vs limit=%d",
                    account_id,
                    c,
                    in_flight,
                    per_min,
                )
                if c + in_flight >= per_min:
                    logger.info(
                        "Rate limit (minute) hit for %s: %d+%d >= %d, behavior=%s",
                        account_id,
                        c,
                        in_flight,
                        per_min,
                        behavior,
                    )
                    return ((now // WINDOW_MINUTE + 1) * WINDOW_MINUTE, behavior == "reject")

            if per_hour is not None:
                c = self._count_since(account_id, now - WINDOW_HOUR)
                if c + in_flight >= per_hour:
                    logger.info(
                        "Rate limit (hour) hit for %s: %d+%d >= %d",
                        account_id,
                        c,
                        in_flight,
                        per_hour,
                    )
                    return ((now // WINDOW_HOUR + 1) * WINDOW_HOUR, behavior == "reject")

            if per_day is not None:
                c = self._count_since(account_id, now - WINDOW_DAY)
                if c + in_flight >= per_day:
                    logger.info(
                        "Rate limit (day) hit for %s: %d+%d >= %d",
                        account_id,
                        c,
                        in_flight,
                        per_day,
                    )
                    return ((now // WINDOW_DAY + 1) * WINDOW_DAY, behavior == "reject")

            # Reserve a slot for this send.
            self._in_flight[account_id] = in_flight + 1
            logger.debug("Rate check %s: ALLOWED, in_flight now %d", account_id, in_flight + 1)
            return (None, False)

    async def log_send(self, account_id: str) -> None:
        """Record a successful send for rate limiting purposes.

        Must be called after each successful message delivery to maintain
        accurate rate limit tracking. Releases the in-flight slot reserved
        by check_and_plan(), if one is held.

        Args:
            account_id: The SMTP account identifier that sent the message.
        """
        now = int(time.time())
        async with self._lock:
            self._release_in_flight(account_id)
            self._send_history.setdefault(account_id, deque()).append(now)
            # Prune here as well: check_and_plan() returns before its own
            # cleanup for accounts without limits, so without this their
            # history would grow without bound.
            self._cleanup_old_entries(account_id, now)

    async def release_slot(self, account_id: str) -> None:
        """Release an in-flight slot without logging a send.

        Call this when a send fails after check_and_plan() returned None.
        This ensures the in-flight counter stays accurate.

        Args:
            account_id: The SMTP account identifier.
        """
        async with self._lock:
            self._release_in_flight(account_id)

    async def purge_for_account(self, account_id: str) -> int:
        """Clear all rate limit data for an account.

        Call this when an account is deleted.

        Args:
            account_id: The SMTP account identifier.

        Returns:
            Number of send-history entries cleared (0 if none were stored).
        """
        async with self._lock:
            history = self._send_history.pop(account_id, None)
            self._in_flight.pop(account_id, None)
            return len(history) if history is not None else 0

    def clear(self) -> None:
        """Clear all rate limit data. Used for testing."""
        self._send_history.clear()
        self._in_flight.clear()


__all__ = ["RateLimiter"]