Source code for core.mail_proxy.proxy

# Copyright 2025 Softwell S.r.l. - SPDX-License-Identifier: Apache-2.0
"""MailProxy: runtime layer with SMTP sending, background loops, and metrics.

MailProxy extends MailProxyBase with the full runtime for email dispatch:

Components:
    - SmtpSender: Connection pool, rate limiting, dispatch loop
    - ClientReporter: Delivery report sync to upstream services
    - AttachmentManager: Fetch attachments with two-tier cache
    - Prometheus MailMetrics: Counters and gauges for monitoring

Class Hierarchy:
    MailProxyBase (proxy_base.py): config, db, tables, endpoints, api/cli
        └── MailProxy (this class): +SmtpSender, +ClientReporter, +metrics
            └── MailProxy with MailProxy_EE mixin: +bounce detection (EE)

EE Hook Methods (CE stubs, overridden by MailProxy_EE):
    - __init_proxy_ee__(): Initialize EE state (bounce_receiver, _bounce_config)
    - _start_proxy_ee(): Start bounce poller background task
    - _stop_proxy_ee(): Stop bounce poller

Command Handling:
    handle_command() is the entry point for external control. Commands are
    routed to local handlers (runtime-dependent) or EndpointDispatcher (CRUD).

    Local commands (require runtime state):
        run now, listTenantsSyncStatus, addMessages, deleteMessages,
        cleanupMessages, deleteAccount

    Delegated commands (via EndpointDispatcher):
        addTenant, getTenant, listTenants, updateTenant, deleteTenant,
        addAccount, listAccounts, getAccount, ...

Background Tasks:
    - SmtpSender.dispatch_loop: Fetch pending messages, send via SMTP
    - ClientReporter.sync_loop: Report delivery events to upstream

Usage (recommended factory):
    proxy = await MailProxy.create(db_path="/data/mail.db", start_active=True)
    await proxy.stop()

Usage (via API property):
    proxy = MailProxy(config=ProxyConfig(db_path="/data/mail.db"))
    app = proxy.api  # Lifespan calls start()/stop() automatically

Module Constants:
    PRIORITY_LABELS: {0: "immediate", 1: "high", 2: "medium", 3: "low"}
    LABEL_TO_PRIORITY: Reverse mapping
    DEFAULT_PRIORITY: 2 (medium)
"""

from __future__ import annotations

import asyncio
import logging
import time
from collections.abc import Iterable
from datetime import datetime, timezone
from typing import Any

from tools.prometheus import MailMetrics

from .interface import EndpointDispatcher
from .proxy_base import MailProxyBase
from .proxy_config import ProxyConfig
from .reporting import DEFAULT_SYNC_INTERVAL, ClientReporter
from .smtp import (
    AttachmentManager,
    SmtpSender,
    TieredCache,
)
from .smtp.retry import RetryStrategy

PRIORITY_LABELS = {
    0: "immediate",
    1: "high",
    2: "medium",
    3: "low",
}
LABEL_TO_PRIORITY = {label: value for value, label in PRIORITY_LABELS.items()}
DEFAULT_PRIORITY = 2


[docs] class MailProxy(MailProxyBase): """Runtime layer: SMTP sending, background loops, metrics, command handling. Extends MailProxyBase with: - SmtpSender: SMTP connection pool, rate limiter, dispatch loop - ClientReporter: Delivery report sync loop - AttachmentManager: Fetch attachments with caching - MailMetrics: Prometheus counters and gauges Class Attributes: is_enterprise: True when EE modules installed (set dynamically) Instance Attributes (from base): config: ProxyConfig instance db: SqlDb with autodiscovered tables endpoints: Dict of endpoint instances Instance Attributes (runtime): smtp_sender: SmtpSender instance (pool, rate_limiter, dispatch) client_reporter: ClientReporter instance (sync loop) attachments: AttachmentManager instance metrics: MailMetrics instance logger: Logger for diagnostics Compatibility Properties (delegate to smtp_sender): pool: SMTP connection pool rate_limiter: Per-account rate limiter """ # ------------------------------------------------------------------------- # Class attributes # ------------------------------------------------------------------------- is_enterprise: bool = False """True when EE modules installed. Set dynamically in mail_proxy/__init__.py.""" # ------------------------------------------------------------------------- # Compatibility properties (delegate to smtp_sender) # ------------------------------------------------------------------------- @property def pool(self): """SMTP connection pool (delegate to smtp_sender.pool).""" return self.smtp_sender.pool @property def rate_limiter(self): """Per-account rate limiter (delegate to smtp_sender.rate_limiter).""" return self.smtp_sender.rate_limiter # ------------------------------------------------------------------------- # EE hook methods (CE stubs, overridden by MailProxy_EE mixin) # -------------------------------------------------------------------------
[docs] def __init_proxy_ee__(self) -> None: """Initialize EE state (bounce_receiver, _bounce_config). CE stub.""" pass
async def _start_proxy_ee(self) -> None: """Start EE background tasks (bounce poller). CE stub.""" pass async def _stop_proxy_ee(self) -> None: """Stop EE background tasks. CE stub.""" pass
[docs] def __init__( self, config: ProxyConfig | None = None, *, logger: logging.Logger | None = None, metrics: MailMetrics | None = None, ): """Initialize the mail dispatcher with ProxyConfig. Args: config: ProxyConfig instance with all configuration. If None, creates default config. logger: Custom logger instance. If None, uses default logger. metrics: Prometheus metrics collector. If None, creates new instance. """ import math cfg = config or ProxyConfig() # Initialize base class (config, db with autodiscovered tables, endpoints) MailProxyBase.__init__(self, config=cfg) self.default_host: str | None = None self.default_port: int | None = None self.default_user: str | None = None self.default_password: str | None = None self.default_use_tls: bool | None = False self.logger = logger or logging.getLogger("AsyncMailService") self.metrics = metrics or MailMetrics() # SmtpSender manages pool, rate_limiter, dispatch loop, email building self.smtp_sender = SmtpSender(self) self._queue_put_timeout = cfg.queue.put_timeout self._max_enqueue_batch = cfg.queue.max_enqueue_batch self._attachment_timeout = cfg.timing.attachment_timeout base_send_interval = max(0.05, float(cfg.timing.send_loop_interval)) self._smtp_batch_size = max(1, int(cfg.queue.message_size)) self._report_retention_seconds = cfg.timing.report_retention_seconds self._test_mode = bool(cfg.test_mode) self._stop = asyncio.Event() self._active = cfg.start_active self._send_loop_interval = math.inf if self._test_mode else base_send_interval self._result_queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue( maxsize=cfg.queue.result_size ) # ClientReporter manages delivery report sync loop self.client_reporter = ClientReporter(self) self._client_sync_url = cfg.client_sync.url self._client_sync_user = cfg.client_sync.user self._client_sync_password = cfg.client_sync.password self._client_sync_token = cfg.client_sync.token self._report_delivery_callable = cfg.report_delivery_callable # Attachments and cache will be initialized in init() self._attachment_cache: TieredCache | None = None self.attachments: AttachmentManager | None = None priority_value, _ = self._normalise_priority(cfg.default_priority, DEFAULT_PRIORITY) self._default_priority = priority_value self._log_delivery_activity = bool(cfg.log_delivery_activity) self._retry_strategy = RetryStrategy( max_retries=cfg.retry.max_retries, delays=cfg.retry.delays, ) self._batch_size_per_account = 50 self._max_concurrent_sends = max(1, int(cfg.concurrency.max_sends)) self._max_concurrent_per_account = max(1, int(cfg.concurrency.max_per_account)) self._max_concurrent_attachments = max(1, int(cfg.concurrency.max_attachments)) self._attachment_semaphore: asyncio.Semaphore | None = None # Initialize endpoint dispatcher for command routing self._dispatcher = EndpointDispatcher(self.db, proxy=self) # Initialize EE components (overridden in MailProxy_EE mixin) self.__init_proxy_ee__()
[docs] @classmethod async def create(cls, **kwargs) -> MailProxy: """Create and initialize a MailProxy instance. This is the recommended way to create instances. It ensures proper async initialization is completed before returning a ready-to-use proxy. Args: **kwargs: All arguments accepted by MailProxy.__init__(). Returns: Fully initialized MailProxy instance with background tasks running. Example: proxy = await MailProxy.create(db_path="./mail.db") # Ready to use immediately - no need to call start() Note: For cases requiring delayed startup, use the traditional pattern:: proxy = MailProxy(db_path="./mail.db") # ... additional setup ... await proxy.start() """ instance = cls(**kwargs) await instance.start() return instance
# ------------------------------------------------------------------------- # Utility methods (static) # ------------------------------------------------------------------------- @staticmethod def _utc_now_iso() -> str: """Return the current UTC timestamp as ISO-8601 string. Returns: str: ISO-8601 formatted timestamp with 'Z' suffix. """ return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") @staticmethod def _utc_now_epoch() -> int: """Return the current UTC timestamp as seconds since Unix epoch. Returns: int: Unix timestamp in seconds. """ return int(datetime.now(timezone.utc).timestamp())
[docs] async def init(self) -> None: """Initialize persistence layer and attachment manager. Performs the following initialization steps: 1. Initialize database schema and run migrations (via MailProxyBase.init) 2. Load cache configuration (from env vars or config file) 3. Initialize attachment cache (memory and disk tiers) 4. Create the AttachmentManager """ await MailProxyBase.init(self) await self._refresh_queue_gauge() # Initialize metrics for all existing accounts so they appear in /metrics # even before any email activity occurs await self._init_account_metrics() # Initialize attachment cache if configured cache_cfg = self.config.cache if cache_cfg.enabled: self._attachment_cache = TieredCache( memory_max_mb=cache_cfg.memory_max_mb, memory_ttl_seconds=cache_cfg.memory_ttl_seconds, disk_dir=cache_cfg.disk_dir, disk_max_mb=cache_cfg.disk_max_mb, disk_ttl_seconds=cache_cfg.disk_ttl_seconds, disk_threshold_kb=cache_cfg.disk_threshold_kb, ) await self._attachment_cache.init() self.logger.info( f"Attachment cache initialized (memory={cache_cfg.memory_max_mb}MB, " f"disk={cache_cfg.disk_dir})" ) # Initialize attachment manager (tenant-specific config applied per-message) # storage_manager=None means only absolute paths work; tenant-specific managers are # created in _get_attachment_manager_for_message() with tenant's storage config self.attachments = AttachmentManager(storage_manager=None, cache=self._attachment_cache) # Initialize attachment fetch semaphore to limit memory pressure self._attachment_semaphore = asyncio.Semaphore(self._max_concurrent_attachments)
def _normalise_priority(self, value: Any, default: Any = DEFAULT_PRIORITY) -> tuple[int, str]: """Convert a priority value to internal numeric representation. Accepts integers (0-3), strings ("immediate", "high", "medium", "low"), or numeric strings and normalizes to (int, label) tuple. Args: value: Priority value to normalize. default: Fallback if value is invalid. Returns: Tuple of (priority_int, priority_label). """ if isinstance(default, str): fallback = LABEL_TO_PRIORITY.get(default.lower(), DEFAULT_PRIORITY) elif isinstance(default, (int, float)): try: fallback = int(default) except (TypeError, ValueError): fallback = DEFAULT_PRIORITY else: fallback = DEFAULT_PRIORITY fallback = max(0, min(fallback, max(PRIORITY_LABELS))) if value is None: priority = fallback elif isinstance(value, str): key = value.lower() if key in LABEL_TO_PRIORITY: priority = LABEL_TO_PRIORITY[key] else: try: priority = int(value) except ValueError: priority = fallback else: try: priority = int(value) except (TypeError, ValueError): priority = fallback priority = max(0, min(priority, max(PRIORITY_LABELS))) label = PRIORITY_LABELS.get(priority, PRIORITY_LABELS[fallback]) return priority, label @staticmethod def _summarise_addresses(value: Any) -> str: """Create a compact string summary of email addresses for logging. Args: value: String, list, or other iterable of email addresses. Returns: Comma-separated addresses, truncated to 200 chars if needed. """ if not value: return "-" if isinstance(value, str): items = [part.strip() for part in value.split(",") if part.strip()] elif isinstance(value, (list, tuple, set)): items = [str(item).strip() for item in value if item] else: items = [str(value).strip()] preview = ", ".join(item for item in items if item) if len(preview) > 200: return f"{preview[:197]}..." return preview or "-" # Commands that modify state and should be logged for audit trail _LOGGED_COMMANDS = frozenset( { "addMessages", "deleteMessages", "cleanupMessages", "addAccount", "deleteAccount", "addTenant", "updateTenant", "deleteTenant", "suspend", "activate", } ) # ------------------------------------------------------------------------- # Command handling (public API) # -------------------------------------------------------------------------
[docs] async def handle_command( self, cmd: str, payload: dict[str, Any] | None = None ) -> dict[str, Any]: """Execute an external control command. Dispatches the command to the appropriate handler method. Supported commands: - ``run now``: Trigger immediate dispatch cycle - ``suspend``: Pause the scheduler - ``activate``: Resume the scheduler - ``addAccount``, ``listAccounts``, ``deleteAccount``: SMTP account management - ``addMessages``, ``deleteMessages``, ``listMessages``: Message queue management - ``cleanupMessages``: Remove old reported messages - ``addTenant``, ``getTenant``, ``listTenants``, ``updateTenant``, ``deleteTenant``: Tenant management State-modifying commands are automatically logged to the command_log table for audit trail and replay capability. Args: cmd: Command name to execute. payload: Command-specific parameters. Returns: dict: Command result with ``ok`` status and command-specific data. """ payload = payload or {} # Log state-modifying commands for audit trail should_log = cmd in self._LOGGED_COMMANDS tenant_id = payload.get("tenant_id") if isinstance(payload, dict) else None result = await self._execute_command(cmd, payload) # Log after execution to capture result status if should_log: try: ok = result.get("ok", False) if isinstance(result, dict) else False await self.db.table("command_log").log_command( endpoint=cmd, payload=payload, tenant_id=tenant_id, response_status=200 if ok else 400, response_body=result, ) except Exception as e: self.logger.warning(f"Failed to log command {cmd}: {e}") return result
async def _execute_command(self, cmd: str, payload: dict[str, Any]) -> dict[str, Any]: """Internal command dispatcher. Commands requiring proxy runtime state are handled here directly. Other commands are delegated to the EndpointDispatcher. """ # Commands requiring proxy runtime state match cmd: case "run now": tenant_id = payload.get("tenant_id") if isinstance(payload, dict) else None self.smtp_sender.wake() self.client_reporter.wake(tenant_id) return {"ok": True} case "listTenantsSyncStatus": # Requires client_reporter._last_sync runtime state tenants = await self.db.table("tenants").list_all() now = time.time() result_tenants = [] for tenant in tenants: tenant_id = tenant.get("id") last_sync_ts = self.client_reporter._last_sync.get(tenant_id) next_sync_due = False in_dnd = False if last_sync_ts is not None: if last_sync_ts > now: in_dnd = True elif (now - last_sync_ts) >= DEFAULT_SYNC_INTERVAL: next_sync_due = True else: next_sync_due = True result_tenants.append( { "id": tenant_id, "name": tenant.get("name"), "active": tenant.get("active", True), "client_base_url": tenant.get("client_base_url"), "last_sync_ts": last_sync_ts, "next_sync_due": next_sync_due, "in_dnd": in_dnd, } ) return { "ok": True, "tenants": result_tenants, "sync_interval_seconds": DEFAULT_SYNC_INTERVAL, } # Commands with proxy-specific side effects (metrics refresh, result publishing) case "addMessages": return await self._handle_add_messages(payload) case "deleteMessages": tenant_id = payload.get("tenant_id") if isinstance(payload, dict) else None if not tenant_id: return {"ok": False, "error": "tenant_id is required"} ids = payload.get("ids") if isinstance(payload, dict) else [] removed, not_found, unauthorized = await self._delete_messages(ids or [], tenant_id) await self._refresh_queue_gauge() return { "ok": True, "removed": removed, "not_found": not_found, "unauthorized": unauthorized, } case "cleanupMessages": tenant_id = payload.get("tenant_id") if isinstance(payload, dict) else None if not tenant_id: return {"ok": False, "error": "tenant_id is required"} older_than = ( payload.get("older_than_seconds") if isinstance(payload, dict) else None ) removed = await self._cleanup_reported_messages(older_than, tenant_id) return {"ok": True, "removed": removed} case "deleteAccount": tenant_id = payload.get("tenant_id") if isinstance(payload, dict) else None if not tenant_id: return {"ok": False, "error": "tenant_id is required"} account_id = payload.get("id") try: await self.db.table("accounts").get(tenant_id, account_id) except ValueError: return {"ok": False, "error": "account not found or not owned by tenant"} await self.db.table("accounts").remove(tenant_id, account_id) await self._refresh_queue_gauge() return {"ok": True} case _: # Delegate to EndpointDispatcher for standard CRUD commands return await self._dispatcher.dispatch(cmd, payload) async def _handle_add_messages(self, payload: dict[str, Any]) -> dict[str, Any]: """Process the addMessages command to enqueue emails for delivery. Validates each message in the batch, checking required fields and account configuration. Invalid messages are rejected with detailed reasons and optionally persisted for error reporting. Args: payload: Dict with ``messages`` list and optional ``default_priority``. Returns: dict: Result with ``ok``, ``queued`` count, and ``rejected`` list. """ messages = payload.get("messages") if isinstance(payload, dict) else None if not isinstance(messages, list): return {"ok": False, "error": "messages must be a list"} if len(messages) > self._max_enqueue_batch: return { "ok": False, "error": f"Cannot enqueue more than {self._max_enqueue_batch} messages at once", } default_priority_value = 2 if "default_priority" in payload: default_priority_value, _ = self._normalise_priority(payload.get("default_priority"), 2) validated: list[dict[str, Any]] = [] rejected: list[dict[str, Any]] = [] rejected_for_sync: list[dict[str, Any]] = [] # Messages to report via proxy_sync now_ts = self._utc_now_epoch() for item in messages: if not isinstance(item, dict): rejected.append({"id": None, "reason": "invalid payload"}) continue is_valid, reason = await self._validate_enqueue_payload(item) if not is_valid: msg_id = item.get("id") rejected.append({"id": msg_id, "reason": reason}) if msg_id: # Insert rejected message into DB with error for proxy_sync notification priority, _ = self._normalise_priority( item.get("priority"), default_priority_value ) entry = { "id": msg_id, "tenant_id": item.get("tenant_id"), "account_id": item.get("account_id"), "priority": priority, "payload": item, "deferred_ts": None, "batch_code": item.get("batch_code"), } inserted_items = await self.db.table("messages").insert_batch([entry]) if inserted_items: pk = inserted_items[0]["pk"] await self.db.table("message_events").add_event( pk, "error", now_ts, description=reason or "validation error" ) rejected_for_sync.append( { "id": msg_id, "status": "error", "error": reason, "timestamp": self._utc_now_iso(), "account": item.get("account_id"), } ) continue priority, _ = self._normalise_priority(item.get("priority"), default_priority_value) item["priority"] = priority if "deferred_ts" in item and item["deferred_ts"] is None: item.pop("deferred_ts") validated.append(item) entries = [] inserted: list[dict[str, str]] = [] if validated: entries = [ { "id": msg["id"], "tenant_id": msg["tenant_id"], # Required for multi-tenant isolation "account_id": msg.get("account_id"), "priority": int(msg["priority"]), "payload": msg, "deferred_ts": msg.get("deferred_ts"), "batch_code": msg.get("batch_code"), } for msg in validated ] inserted = await self.db.table("messages").insert_batch(entries) # Messages not inserted were already sent (sent_ts IS NOT NULL) inserted_ids = {item["id"] for item in inserted} for msg in validated: if msg["id"] not in inserted_ids: rejected.append({"id": msg["id"], "reason": "already sent"}) await self._refresh_queue_gauge() # Notify client via proxy_sync for rejected messages if rejected_for_sync: for event in rejected_for_sync: await self._publish_result(event) queued_count = len(inserted) # ok is False only if ALL messages were rejected due to validation errors # (not for "already sent" which is a normal case) validation_failures = [r for r in rejected if r.get("reason") != "already sent"] ok = queued_count > 0 or len(validation_failures) == 0 result: dict[str, Any] = { "ok": ok, "queued": queued_count, "rejected": rejected, } return result async def _delete_messages( self, message_ids: Iterable[str], tenant_id: str ) -> tuple[int, list[str], list[str]]: """Remove messages from the queue by their IDs, with tenant validation. Args: message_ids: Iterable of message IDs to delete. tenant_id: Tenant ID - only messages belonging to this tenant will be deleted. Returns: Tuple of (count of removed messages, list of IDs not found, list of unauthorized IDs). """ ids = {mid for mid in message_ids if mid} if not ids: return 0, [], [] # Get messages that belong to this tenant (via account relationship) authorized_ids = await self.db.table("messages").get_ids_for_tenant(list(ids), tenant_id) removed = 0 missing: list[str] = [] unauthorized: list[str] = [] for mid in sorted(ids): if mid not in authorized_ids: unauthorized.append(mid) continue if await self.db.table("messages").delete(mid, tenant_id): removed += 1 else: missing.append(mid) return removed, missing, unauthorized async def _cleanup_reported_messages( self, older_than_seconds: int | None = None, tenant_id: str | None = None ) -> int: """Remove reported messages older than the specified threshold. Args: older_than_seconds: Remove messages reported more than this many seconds ago. If None, uses the configured retention period. tenant_id: If provided, only cleanup messages belonging to this tenant. Returns: Number of messages removed. """ if older_than_seconds is None: retention = self._report_retention_seconds else: retention = max(0, int(older_than_seconds)) threshold = self._utc_now_epoch() - retention if tenant_id: removed = await self.db.table("messages").remove_fully_reported_before_for_tenant( threshold, tenant_id ) else: removed = await self.db.table("messages").remove_fully_reported_before(threshold) if removed: await self._refresh_queue_gauge() return removed async def _validate_enqueue_payload(self, payload: dict[str, Any]) -> tuple[bool, str | None]: """Validate a message payload before enqueueing. Checks for required fields (id, tenant_id, account_id, from, to, subject) and verifies that the specified SMTP account exists for the tenant. Args: payload: Message payload dict to validate. Returns: Tuple of (is_valid, error_reason). error_reason is None if valid. """ msg_id = payload.get("id") if not msg_id: return False, "missing id" tenant_id = payload.get("tenant_id") if not tenant_id: return False, "missing tenant_id" account_id = payload.get("account_id") if not account_id: return False, "missing account_id" payload.setdefault("priority", 2) sender = payload.get("from") if not sender: return False, "missing from" recipients = payload.get("to") if not recipients: return False, "missing to" if isinstance(recipients, (list, tuple, set)): if not any(recipients): return False, "missing to" subject = payload.get("subject") if not subject: return False, "missing subject" # Verify account exists and belongs to tenant try: await self.db.table("accounts").get(tenant_id, account_id) except Exception: return False, "account not found for tenant" return True, None # ------------------------------------------------------------------------- # Lifecycle (start/stop) # -------------------------------------------------------------------------
[docs] async def start(self) -> None: """Start the background scheduler and maintenance tasks. Initializes the persistence layer and spawns background tasks for: - SMTP dispatch via smtp_sender - Client report loop: sends delivery reports to upstream services """ self.logger.debug("Starting MailProxy...") await self.init() self._stop.clear() self.logger.debug("Starting SMTP sender...") await self.smtp_sender.start() self.logger.debug("Starting client reporter...") await self.client_reporter.start() # Start EE components (overridden in MailProxy_EE mixin) await self._start_proxy_ee() self.logger.debug("All background tasks created")
[docs] async def stop(self) -> None: """Stop all background tasks gracefully. Signals all running loops to terminate and waits for them to complete. Outstanding operations are allowed to finish before returning. """ self._stop.set() await self.smtp_sender.stop() await self.client_reporter.stop() # Stop EE components (overridden in MailProxy_EE mixin) await self._stop_proxy_ee() await self.db.adapter.close()
# ------------------------------------------------------------------------- # Messaging and metrics (internal) # -------------------------------------------------------------------------
[docs] async def results(self): """Async generator that yields delivery result events. Yields: dict: Delivery event with message ID, status, timestamp, and error info. """ while True: event = await self._result_queue.get() yield event
async def _put_with_backpressure( self, queue: asyncio.Queue[Any], item: Any, queue_name: str ) -> None: """Push an item to a queue with timeout-based backpressure. Args: queue: Target asyncio.Queue. item: Item to enqueue. queue_name: Name for logging purposes. """ try: await asyncio.wait_for(queue.put(item), timeout=self._queue_put_timeout) except asyncio.TimeoutError: # pragma: no cover - defensive self.logger.error( "Timed out while enqueuing item into %s queue; dropping item", queue_name ) def _log_delivery_event(self, event: dict[str, Any]) -> None: """Log a delivery outcome when verbose logging is enabled. Args: event: Delivery event dict with status, id, account, and error info. """ if not self._log_delivery_activity: return status = (event.get("status") or "unknown").lower() msg_id = event.get("id") or "-" account = event.get("account") or event.get("account_id") or "default" match status: case "sent": self.logger.info("Delivery succeeded for message %s (account=%s)", msg_id, account) case "deferred": deferred_until = event.get("deferred_until") if isinstance(deferred_until, (int, float)): deferred_repr = ( datetime.fromtimestamp(float(deferred_until), timezone.utc) .isoformat() .replace("+00:00", "Z") ) else: deferred_repr = deferred_until or "-" self.logger.info( "Delivery deferred for message %s (account=%s) until %s", msg_id, account, deferred_repr, ) case "error": reason = event.get("error") or event.get("error_code") or "unknown error" self.logger.warning( "Delivery failed for message %s (account=%s): %s", msg_id, account, reason, ) case _: self.logger.info( "Delivery event for message %s (account=%s): %s", msg_id, account, status ) async def _publish_result(self, event: dict[str, Any]) -> None: """Publish a delivery event to the result queue. Args: event: Delivery event dict to publish. """ self._log_delivery_event(event) await self._put_with_backpressure(self._result_queue, event, "result") async def _refresh_queue_gauge(self) -> None: """Update the Prometheus gauge for pending message count. Queries the database for active (unsent, unreported) messages and updates the metrics collector. """ try: count = await self.db.table("messages").count_active() except Exception: # pragma: no cover - defensive self.logger.exception("Failed to refresh queue gauge") return self.metrics.set_pending(count) async def _init_account_metrics(self) -> None: """Initialize Prometheus counters for all existing accounts. Prometheus counters with labels only appear in output after they have been incremented at least once. This method ensures metrics appear in /metrics output even before any email activity by initializing all counters for each configured SMTP account. Always initializes at least the "default" account to ensure basic metrics are visible even when no accounts are configured. """ try: # Always initialize "default" account for basic metrics visibility self.metrics.init_account() # Uses defaults for all labels # Also initialize pending gauge to 0 self.metrics.set_pending(0) # Get all tenants to map tenant_id -> tenant_name tenants = await self.db.table("tenants").list_all() tenant_names = {t["id"]: t.get("name", t["id"]) for t in tenants} accounts = await self.db.table("accounts").list_all() for account in accounts: tenant_id = account.get("tenant_id", "default") account_id = account.get("id", "default") self.metrics.init_account( tenant_id=tenant_id, tenant_name=tenant_names.get(tenant_id, tenant_id), account_id=account_id, account_name=account_id, # No separate name field for accounts ) self.logger.debug("Initialized metrics for %d accounts", len(accounts) + 1) except Exception: # pragma: no cover - defensive self.logger.exception("Failed to initialize account metrics")