Python Modules

This document provides API reference for the genro-mail-proxy Python modules.

The package is organized into four main namespaces:

  • core.mail_proxy - Core functionality (Apache 2.0)

  • enterprise.mail_proxy - Enterprise features (BSL 1.1)

  • sql - Database abstraction layer

  • storage - Storage node management

  • tools - Shared utilities

Core Package (Apache 2.0)

Main Components

Email dispatcher microservice with multi-tenant support.

This package provides an asynchronous email dispatch service with queueing, rate limiting, retry logic, and multi-backend attachments.

Components:

MailProxy: Main service class with SMTP sender and background loops. MailProxyBase: Foundation layer with database and endpoint discovery. ProxyConfig: Hierarchical configuration dataclasses.

Features:
  • Multi-tenant isolation with per-tenant configuration

  • Priority-based message queuing (immediate/high/medium/low)

  • Per-account rate limiting (minute/hour/day)

  • Automatic retry with exponential backoff

  • Attachment fetching (HTTP endpoint, URL, base64, filesystem)

  • Delivery report callbacks to client applications

  • Prometheus metrics for monitoring

  • FastAPI REST API for control and message submission

  • SQLite/PostgreSQL persistence

Example

Create and run the mail service:

from core.mail_proxy.proxy import MailProxy
from core.mail_proxy.interface import create_app

proxy = MailProxy(db_path="/data/mail.db")
app = create_app(proxy, api_token="secret")

# Or via CLI
# mail-proxy serve --port 8000

Note

Enterprise Edition (EE) extends this package with bounce detection, PEC (certified email), and per-tenant API keys when the enterprise package is installed.

core.mail_proxy.main()[source]

CLI entry point. Creates a MailProxy and runs the CLI.

Resolves the current instance from context (via mail-proxy use) and configures the MailProxy with the correct database path.

Return type:

None

MailProxy: runtime layer with SMTP sending, background loops, and metrics.

MailProxy extends MailProxyBase with the full runtime for email dispatch:

Components:
  • SmtpSender: Connection pool, rate limiting, dispatch loop

  • ClientReporter: Delivery report sync to upstream services

  • AttachmentManager: Fetch attachments with two-tier cache

  • Prometheus MailMetrics: Counters and gauges for monitoring

Class Hierarchy:
MailProxyBase (proxy_base.py): config, db, tables, endpoints, api/cli
└── MailProxy (this class): +SmtpSender, +ClientReporter, +metrics

└── MailProxy with MailProxy_EE mixin: +bounce detection (EE)

EE Hook Methods (CE stubs, overridden by MailProxy_EE):
  • __init_proxy_ee__(): Initialize EE state (bounce_receiver, _bounce_config)

  • _start_proxy_ee(): Start bounce poller background task

  • _stop_proxy_ee(): Stop bounce poller

Command Handling:

handle_command() is the entry point for external control. Commands are routed to local handlers (runtime-dependent) or EndpointDispatcher (CRUD).

Local commands (require runtime state):

run now, listTenantsSyncStatus, addMessages, deleteMessages, cleanupMessages, deleteAccount

Delegated commands (via EndpointDispatcher):

addTenant, getTenant, listTenants, updateTenant, deleteTenant, addAccount, listAccounts, getAccount, …

Background Tasks:
  • SmtpSender.dispatch_loop: Fetch pending messages, send via SMTP

  • ClientReporter.sync_loop: Report delivery events to upstream

Usage (recommended factory):

proxy = await MailProxy.create(db_path=”/data/mail.db”, start_active=True) await proxy.stop()

Usage (via API property):

proxy = MailProxy(config=ProxyConfig(db_path=”/data/mail.db”)) app = proxy.api # Lifespan calls start()/stop() automatically

Module Constants:

PRIORITY_LABELS: {0: “immediate”, 1: “high”, 2: “medium”, 3: “low”} LABEL_TO_PRIORITY: Reverse mapping DEFAULT_PRIORITY: 2 (medium)

class core.mail_proxy.proxy.MailProxy(config=None, *, logger=None, metrics=None)[source]

Bases: MailProxyBase

Runtime layer: SMTP sending, background loops, metrics, command handling.

Extends MailProxyBase with: - SmtpSender: SMTP connection pool, rate limiter, dispatch loop - ClientReporter: Delivery report sync loop - AttachmentManager: Fetch attachments with caching - MailMetrics: Prometheus counters and gauges

Class Attributes:

is_enterprise: True when EE modules installed (set dynamically)

Instance Attributes (from base):

config: ProxyConfig instance db: SqlDb with autodiscovered tables endpoints: Dict of endpoint instances

Instance Attributes (runtime):

smtp_sender: SmtpSender instance (pool, rate_limiter, dispatch) client_reporter: ClientReporter instance (sync loop) attachments: AttachmentManager instance metrics: MailMetrics instance logger: Logger for diagnostics

Compatibility Properties (delegate to smtp_sender):

pool: SMTP connection pool rate_limiter: Per-account rate limiter

Parameters:
is_enterprise: bool = False

True when EE modules installed. Set dynamically in mail_proxy/__init__.py.

property pool

SMTP connection pool (delegate to smtp_sender.pool).

property rate_limiter

Per-account rate limiter (delegate to smtp_sender.rate_limiter).

__init_proxy_ee__()[source]

Initialize EE state (bounce_receiver, _bounce_config). CE stub.

Return type:

None

__init__(config=None, *, logger=None, metrics=None)[source]

Initialize the mail dispatcher with ProxyConfig.

Parameters:
  • config (ProxyConfig | None) – ProxyConfig instance with all configuration. If None, creates default config.

  • logger (Logger | None) – Custom logger instance. If None, uses default logger.

  • metrics (MailMetrics | None) – Prometheus metrics collector. If None, creates new instance.

async classmethod create(**kwargs)[source]

Create and initialize a MailProxy instance.

This is the recommended way to create instances. It ensures proper async initialization is completed before returning a ready-to-use proxy.

Parameters:

**kwargs – All arguments accepted by MailProxy.__init__().

Return type:

MailProxy

Returns:

Fully initialized MailProxy instance with background tasks running.

Example

proxy = await MailProxy.create(db_path=”./mail.db”) # Ready to use immediately - no need to call start()

Note

For cases requiring delayed startup, use the traditional pattern:

proxy = MailProxy(db_path="./mail.db")
# ... additional setup ...
await proxy.start()
async init()[source]

Initialize persistence layer and attachment manager.

Performs the following initialization steps: 1. Initialize database schema and run migrations (via MailProxyBase.init) 2. Load cache configuration (from env vars or config file) 3. Initialize attachment cache (memory and disk tiers) 4. Create the AttachmentManager

Return type:

None

async handle_command(cmd, payload=None)[source]

Execute an external control command.

Dispatches the command to the appropriate handler method. Supported commands: - run now: Trigger immediate dispatch cycle - suspend: Pause the scheduler - activate: Resume the scheduler - addAccount, listAccounts, deleteAccount: SMTP account management - addMessages, deleteMessages, listMessages: Message queue management - cleanupMessages: Remove old reported messages - addTenant, getTenant, listTenants, updateTenant, deleteTenant: Tenant management

State-modifying commands are automatically logged to the command_log table for audit trail and replay capability.

Parameters:
  • cmd (str) – Command name to execute.

  • payload (dict[str, Any] | None) – Command-specific parameters.

Returns:

Command result with ok status and command-specific data.

Return type:

dict

async start()[source]

Start the background scheduler and maintenance tasks.

Initializes the persistence layer and spawns background tasks for: - SMTP dispatch via smtp_sender - Client report loop: sends delivery reports to upstream services

Return type:

None

async stop()[source]

Stop all background tasks gracefully.

Signals all running loops to terminate and waits for them to complete. Outstanding operations are allowed to finish before returning.

Return type:

None

async results()[source]

Async generator that yields delivery result events.

Yields:

dict – Delivery event with message ID, status, timestamp, and error info.

Base class for MailProxy: database, tables, endpoints, and interface factories.

MailProxyBase is the foundation layer of genro-mail-proxy, providing:

  1. Configuration: ProxyConfig instance at self.config

  2. Database: SqlDb at self.db with autodiscovered Table classes

  3. Endpoints: Registry at self.endpoints with autodiscovered Endpoint classes

  4. Interfaces: Lazy api (FastAPI) and cli (Click) properties

Class Hierarchy:
MailProxyBase (this class)
└── MailProxy (proxy.py): adds SMTP sender, background loops, metrics

└── MailProxy with MailProxy_EE mixin: adds bounce detection (EE)

Discovery Mechanism:

Tables from core.mail_proxy.entities.*/table.py are composed with optional EE mixins from enterprise.mail_proxy.entities.*/table_ee.py. Endpoints follow the same pattern with endpoint.py and endpoint_ee.py.

Discovered entities (CE):
  • tenants: Multi-tenant isolation

  • accounts: SMTP account configuration

  • messages: Message queue with priority

  • message_events: Delivery event history

  • command_log: API audit trail

  • instance: Service-level configuration

EE extensions (when enterprise package installed):
  • accounts: +PEC (certified email) fields

  • tenants: +API key management

  • instance: +Bounce detection config

Usage (testing without runtime):

proxy = MailProxyBase(db_path=”:memory:”) await proxy.init() await proxy.db.table(“tenants”).add({“id”: “t1”, “name”: “Test”})

Usage (production via proxy.api):

proxy = MailProxy(config=ProxyConfig(db_path=”/data/mail.db”)) app = proxy.api # FastAPI app with auto-start/stop lifespan

class core.mail_proxy.proxy_base.MailProxyBase(config=None)[source]

Bases: object

Foundation layer: config, database, tables, endpoints, interface factories.

config

ProxyConfig instance with all configuration

db

SqlDb with autodiscovered Table classes (CE + EE mixins)

endpoints

Dict of Endpoint instances keyed by name

Properties:

api: FastAPI app (lazy, created on first access) cli: Click CLI group (lazy, created on first access)

Subclassed by MailProxy which adds SMTP sender, background loops, etc.

Parameters:

config (ProxyConfig | None)

__init__(config=None)[source]

Initialize base proxy with config and database.

Parameters:

config (ProxyConfig | None) – ProxyConfig instance. If None, creates default.

endpoints: dict[str, BaseEndpoint]
property encryption_key: bytes | None

Encryption key for database field encryption. None if not configured.

set_encryption_key(key)[source]

Set encryption key programmatically (for testing).

Parameters:

key (bytes)

Return type:

None

endpoint(name)[source]

Get endpoint by name.

Parameters:

name (str)

Return type:

BaseEndpoint

async init()[source]

Initialize database: connect, create tables, run migrations, detect edition.

Return type:

None

async close()[source]

Close database connection.

Return type:

None

property api: FastAPI

FastAPI app with all endpoints, auth middleware, and lifespan.

Created on first access. Includes default lifespan that calls proxy.start() on startup and proxy.stop() on shutdown.

Usage:

uvicorn core.mail_proxy.server:app

property cli: Group

Click CLI group with endpoint commands and service commands.

Created on first access. Includes: - Endpoint commands: tenants, accounts, messages, instance, command_log - Service commands: serve, connect, stats, token, run-now

Usage:

mail-proxy –help

Configuration dataclasses for MailProxy (Community Edition).

This module defines the configuration hierarchy for genro-mail-proxy. ProxyConfig is the single entry point for all configuration, organizing settings into logical nested groups (timing, queue, concurrency, etc.).

Architecture:

ProxyConfig is used by MailProxyBase (and thus MailProxy) to configure the service. It is CE-only: Enterprise Edition does not extend these dataclasses but rather stores EE-specific config in the database (via InstanceTable_EE for bounce detection).

Usage:
config = ProxyConfig(

db_path=”/data/mail.db”, timing=TimingConfig(send_loop_interval=1.0), concurrency=ConcurrencyConfig(max_sends=20),

) proxy = MailProxy(config=config)

# Access nested config interval = proxy.config.timing.send_loop_interval

Nested Config Classes:
  • TimingConfig: Intervals, timeouts, retention periods

  • QueueConfig: Queue sizes, batch limits

  • ConcurrencyConfig: Parallelism limits (sends, attachments)

  • ClientSyncConfig: Upstream reporting (URL, auth)

  • RetryConfig: Retry behavior (max attempts, delays)

  • CacheConfig: Attachment cache (memory/disk tiers)

class core.mail_proxy.proxy_config.CacheConfig(memory_max_mb=50.0, memory_ttl_seconds=300, disk_dir=None, disk_max_mb=500.0, disk_ttl_seconds=3600, disk_threshold_kb=100.0)[source]

Bases: object

Two-tier attachment cache configuration (memory + disk).

Parameters:
memory_max_mb: float = 50.0

Max memory cache size in MB.

memory_ttl_seconds: int = 300

Memory cache TTL in seconds.

disk_dir: str | None = None

Directory for disk cache. None disables disk caching.

disk_max_mb: float = 500.0

Max disk cache size in MB.

__init__(memory_max_mb=50.0, memory_ttl_seconds=300, disk_dir=None, disk_max_mb=500.0, disk_ttl_seconds=3600, disk_threshold_kb=100.0)
Parameters:
disk_ttl_seconds: int = 3600

Disk cache TTL in seconds.

disk_threshold_kb: float = 100.0

Size threshold for disk vs memory (items larger go to disk).

property enabled: bool

Check if caching is enabled (disk dir configured).

class core.mail_proxy.proxy_config.ClientSyncConfig(url=None, user=None, password=None, token=None)[source]

Bases: object

Upstream delivery report synchronization settings.

Parameters:
url: str | None = None

URL for posting delivery reports to upstream service.

user: str | None = None

Username for client sync authentication.

password: str | None = None

Password for client sync authentication.

token: str | None = None

Bearer token for client sync authentication.

__init__(url=None, user=None, password=None, token=None)
Parameters:
class core.mail_proxy.proxy_config.ConcurrencyConfig(max_sends=10, max_per_account=3, max_attachments=3)[source]

Bases: object

Parallelism limits for SMTP sends and attachment fetches.

Parameters:
  • max_sends (int)

  • max_per_account (int)

  • max_attachments (int)

max_sends: int = 10

Maximum concurrent SMTP sends globally.

max_per_account: int = 3

Maximum concurrent sends per SMTP account.

max_attachments: int = 3

Maximum concurrent attachment fetches.

__init__(max_sends=10, max_per_account=3, max_attachments=3)
Parameters:
  • max_sends (int)

  • max_per_account (int)

  • max_attachments (int)

class core.mail_proxy.proxy_config.ProxyConfig(db_path='/data/mail_service.db', instance_name='mail-proxy', port=8000, api_token=None, timing=<factory>, queue=<factory>, concurrency=<factory>, client_sync=<factory>, retry=<factory>, cache=<factory>, default_priority=2, test_mode=False, log_delivery_activity=False, start_active=False, report_delivery_callable=None)[source]

Bases: object

Main configuration container for MailProxy (CE).

Single entry point for all proxy configuration. Groups settings into logical nested structures for clean organization and access.

This class is CE-only. Enterprise Edition stores additional config (bounce detection, PEC) in the database via table extensions.

Nested Groups:

timing: Intervals, timeouts, retention periods queue: Queue sizes, batch limits concurrency: Parallelism limits (sends, attachments) client_sync: Upstream reporting (URL, auth) retry: Retry behavior (max attempts, delays) cache: Attachment cache (memory/disk tiers)

Top-Level Settings:

db_path: SQLite/PostgreSQL database path instance_name: Service identifier for display port: Default API server port api_token: Optional bearer token for API auth default_priority: Default message priority (0-3) test_mode: Disable auto-processing for tests log_delivery_activity: Verbose delivery logging start_active: Start processing immediately report_delivery_callable: Custom delivery report handler

Example

config = ProxyConfig(

db_path=”/data/mail.db”, timing=TimingConfig(send_loop_interval=1.0), concurrency=ConcurrencyConfig(max_sends=20),

) proxy = MailProxy(config=config) interval = proxy.config.timing.send_loop_interval

Parameters:
__init__(db_path='/data/mail_service.db', instance_name='mail-proxy', port=8000, api_token=None, timing=<factory>, queue=<factory>, concurrency=<factory>, client_sync=<factory>, retry=<factory>, cache=<factory>, default_priority=2, test_mode=False, log_delivery_activity=False, start_active=False, report_delivery_callable=None)
Parameters:
db_path: str = '/data/mail_service.db'

SQLite database path for persistence.

instance_name: str = 'mail-proxy'

Instance name for display and identification.

port: int = 8000

Default port for API server.

api_token: str | None = None

API authentication token. If None, no auth required.

timing: TimingConfig

Timing and interval settings.

queue: QueueConfig

Queue size and batch settings.

concurrency: ConcurrencyConfig

Concurrency limits.

client_sync: ClientSyncConfig

Client synchronization settings.

retry: RetryConfig

Retry behavior settings.

cache: CacheConfig

Attachment cache settings.

default_priority: int = 2

Default message priority (0=immediate, 1=high, 2=medium, 3=low).

test_mode: bool = False

Enable test mode (disables automatic loop processing).

log_delivery_activity: bool = False

Enable verbose delivery activity logging.

start_active: bool = False

Whether to start processing messages immediately.

report_delivery_callable: Callable[[dict[str, Any]], Awaitable[None]] | None = None

Optional async callable for custom report delivery.

class core.mail_proxy.proxy_config.QueueConfig(result_size=1000, message_size=10000, put_timeout=5.0, max_enqueue_batch=1000)[source]

Bases: object

Queue sizes and batch limits for message processing.

Parameters:
  • result_size (int)

  • message_size (int)

  • put_timeout (float)

  • max_enqueue_batch (int)

result_size: int = 1000

Maximum size of the delivery result queue.

message_size: int = 10000

Maximum messages to fetch per SMTP cycle.

put_timeout: float = 5.0

Timeout in seconds for queue operations.

max_enqueue_batch: int = 1000

Maximum messages allowed in single addMessages call.

__init__(result_size=1000, message_size=10000, put_timeout=5.0, max_enqueue_batch=1000)
Parameters:
  • result_size (int)

  • message_size (int)

  • put_timeout (float)

  • max_enqueue_batch (int)

class core.mail_proxy.proxy_config.RetryConfig(max_retries=3, delays=(60, 300, 900))[source]

Bases: object

SMTP retry behavior with exponential backoff.

Parameters:
max_retries: int = 3

Maximum retry attempts.

delays: tuple[int, ...] = (60, 300, 900)

Delay in seconds between retries (exponential backoff).

__init__(max_retries=3, delays=(60, 300, 900))
Parameters:
class core.mail_proxy.proxy_config.TimingConfig(send_loop_interval=0.5, attachment_timeout=30, report_retention_seconds=604800)[source]

Bases: object

Timing and interval settings for the dispatch service.

Parameters:
  • send_loop_interval (float)

  • attachment_timeout (int)

  • report_retention_seconds (int)

send_loop_interval: float = 0.5

Seconds between SMTP dispatch loop iterations.

attachment_timeout: int = 30

Timeout in seconds for fetching attachments.

report_retention_seconds: int = 604800

How long to retain reported messages (default 7 days).

__init__(send_loop_interval=0.5, attachment_timeout=30, report_retention_seconds=604800)
Parameters:
  • send_loop_interval (float)

  • attachment_timeout (int)

  • report_retention_seconds (int)

ASGI application entry point for uvicorn.

This module provides the FastAPI application instance for deployment with ASGI servers like uvicorn, hypercorn, or gunicorn+uvicorn.

Configuration via environment variables:

GMP_DB_PATH: Database path (SQLite file or PostgreSQL URL) GMP_API_TOKEN: API authentication token

Components:

app: FastAPI application with full MailProxy lifecycle management. _proxy: Internal MailProxy instance (use app instead).

Example

Run with uvicorn:

GMP_DB_PATH=/data/mail.db GMP_API_TOKEN=secret \
    uvicorn core.mail_proxy.server:app --host 0.0.0.0 --port 8000

Run with Docker:

docker run -e GMP_DB_PATH=postgresql://... -e GMP_API_TOKEN=secret ...

Or via CLI (reads from ~/.mail-proxy/<name>/config.ini):

mail-proxy serve --port 8000

Note

The application includes a lifespan context manager that calls proxy.start() on startup and proxy.stop() on shutdown, ensuring proper initialization of background tasks and graceful cleanup.

Design Decision (2025-02):

API tokens are stored in plaintext in config.ini for CLI instances. This is acceptable for development; production uses Docker with environment variables. Hashing like tenant tokens was considered but deferred since Docker passes tokens via env vars anyway.

Interface Layer

FastAPI route generation from endpoint classes via introspection.

This module generates REST API routes automatically from endpoint classes by introspecting method signatures and creating appropriate handlers.

Components:

create_app: FastAPI application factory. register_endpoint: Register endpoint methods as FastAPI routes. verify_tenant_token: Token verification for tenant-scoped requests. require_admin_token: Admin-only endpoint protection. require_token: General authentication dependency.

Example

Create and run the API server:

from core.mail_proxy.interface import create_app
from core.mail_proxy.proxy import MailProxy

proxy = MailProxy(db_path="/data/mail.db")
app = create_app(proxy, api_token="secret")

# Run with uvicorn
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

Register custom endpoints:

from fastapi import FastAPI
from core.mail_proxy.interface import register_endpoint

app = FastAPI()
endpoint = MyCustomEndpoint(table)
register_endpoint(app, endpoint)

Note

Authentication uses X-API-Token header. Global token grants admin access to all tenants. Tenant tokens restrict access to own resources.

core.mail_proxy.interface.api_base.create_app(svc, api_token=None, lifespan=None, tenant_tokens_enabled=False)[source]

Create and configure the FastAPI application.

Parameters:
  • svc (MailProxy) – MailProxy instance implementing business logic.

  • api_token (str | None) – Optional global token for X-API-Token authentication.

  • lifespan (Callable[[FastAPI], AbstractAsyncContextManager[None]] | None) – Optional lifespan context manager. If None, creates default that starts/stops the proxy service.

  • tenant_tokens_enabled (bool) – When True, enables per-tenant API keys.

Return type:

FastAPI

Returns:

Configured FastAPI application with all routes registered.

Example

from core.mail_proxy.proxy import MailProxy
from core.mail_proxy.interface import create_app

proxy = MailProxy(db_path="/data/mail.db")
app = create_app(proxy, api_token="admin-secret")

# Run with uvicorn
import uvicorn
uvicorn.run(app)
core.mail_proxy.interface.api_base.register_endpoint(app, endpoint, prefix='')[source]

Register all methods of an endpoint as FastAPI routes.

Introspects the endpoint to discover async methods and creates appropriate GET (query params) or POST (body) routes.

Parameters:
  • app (FastAPI | APIRouter) – FastAPI app or APIRouter to register routes on.

  • endpoint (Any) – Endpoint instance (BaseEndpoint or duck-typed).

  • prefix (str) – Optional URL prefix. Defaults to /{endpoint.name}.

Return type:

None

Example

endpoint = AccountEndpoint(db.table("accounts"))
register_endpoint(app, endpoint)
# Creates routes: GET /accounts/list, POST /accounts/add, etc.
async core.mail_proxy.interface.api_base.require_admin_token(request, api_token=Depends(dependency=<fastapi.security.api_key.APIKeyHeader object>, use_cache=True, scope=None))[source]

Require global admin token for admin-only endpoints.

Admin-only endpoints include tenant management, API key operations, and instance configuration.

Parameters:
  • request (Request) – FastAPI request object.

  • api_token (str | None) – Token from X-API-Token header (via Depends).

Raises:

HTTPException – 401 if not global admin token, 403 if tenant token.

Return type:

None

async core.mail_proxy.interface.api_base.require_token(request, api_token=Depends(dependency=<fastapi.security.api_key.APIKeyHeader object>, use_cache=True, scope=None))[source]

Validate API token from X-API-Token header.

Accepts global admin token (full access) or tenant token (own resources). Stores token info in request.state for downstream verification.

Parameters:
  • request (Request) – FastAPI request object.

  • api_token (str | None) – Token from X-API-Token header (via Depends).

Raises:

HTTPException – 401 if token is invalid.

Return type:

None

async core.mail_proxy.interface.api_base.verify_tenant_token(tenant_id, api_token, global_token)[source]

Verify API token for a tenant-scoped request.

Parameters:
  • tenant_id (str | None) – The tenant ID from the request.

  • api_token (str | None) – The token from X-API-Token header.

  • global_token (str | None) – The configured global API token (admin).

Raises:

HTTPException – 401 if token is invalid or tenant_id mismatch.

Return type:

None

Note

  • Global token grants access to any tenant

  • Tenant token grants access ONLY to own resources

  • No token configured = open access

Click command generation from endpoint classes via introspection.

This module generates CLI commands automatically from endpoint classes by introspecting method signatures and creating Click commands.

Components:

register_endpoint: Register endpoint methods as Click commands.

Example

Register endpoint commands:

import click
from core.mail_proxy.interface import register_cli_endpoint
from core.mail_proxy.entities.account import AccountEndpoint

@click.group()
def cli():
    pass

endpoint = AccountEndpoint(table)
register_cli_endpoint(cli, endpoint)
# Creates: cli accounts add, cli accounts get, cli accounts list

Generated commands:

mail-proxy accounts list                    # uses context tenant
mail-proxy accounts list acme               # explicit tenant
mail-proxy accounts add main --host smtp.example.com

Note

  • tenant_id is special: optional positional with context fallback

  • Other required params become positional arguments

  • Optional params become –options

  • Boolean params become –flag/–no-flag toggles

  • Method underscores become dashes (add_batch → add-batch)

core.mail_proxy.interface.cli_base.register_endpoint(group, endpoint, run_async=None)[source]

Register all methods of an endpoint as Click commands.

Creates a subgroup named after the endpoint and adds commands for each public async method.

Parameters:
  • group (Group) – Click group to add commands to.

  • endpoint (Any) – Endpoint instance with async methods.

  • run_async (Callable | None) – Function to run async code. Defaults to asyncio.run.

Return type:

Group

Returns:

The created Click subgroup with all endpoint commands.

Example

@click.group()
def cli():
    pass

endpoint = AccountEndpoint(db.table("accounts"))
register_endpoint(cli, endpoint)

# Now available:
# cli accounts list
# cli accounts add <id> --host <host> --port <port>
# cli accounts delete <id>

Special CLI commands not derived from endpoint introspection.

This module provides CLI commands that don’t map directly to REST API endpoints. These are administrative and utility commands requiring special handling (interactive sessions, file I/O, server communication).

Components:

add_connect_command: Interactive Python REPL with pre-configured client. add_stats_command: Display aggregate queue statistics. add_send_command: Queue email from .eml file. add_token_command: API token management (show/regenerate). add_run_now_command: Trigger immediate dispatch cycle via HTTP. add_list_command: List all configured instances with status. add_stop_command: Stop running instances. add_restart_command: Restart running instances.

Instance Management:

Instances are stored in ~/.mail-proxy/<name>/ with config.ini files. The list/stop/restart commands manage these instances by tracking PID files for process management.

Example

Add special commands to CLI group:

from core.mail_proxy.interface.cli_commands import (
    add_connect_command,
    add_stats_command,
    add_send_command,
    add_list_command,
    add_stop_command,
)

@click.group()
def cli():
    pass

add_connect_command(cli, get_url, get_token, "myinstance")
add_stats_command(cli, db)
add_send_command(cli, db, "tenant1")
add_list_command(cli)
add_stop_command(cli)

Run commands:

mail-proxy myinstance connect
mail-proxy myinstance stats --json
mail-proxy myinstance tenant1 send email.eml
mail-proxy list
mail-proxy stop myserver

Note

These commands are registered separately from endpoint-derived commands because they require special parameters (callbacks, file paths) or interactive behavior not suitable for introspection.

core.mail_proxy.interface.cli_commands.add_connect_command(group, get_url, get_token, instance_name)[source]

Register ‘connect’ command for interactive Python REPL.

Creates a REPL session with a pre-configured MailProxyClient for interactive server administration and debugging.

Parameters:
  • group (Group) – Click group to register command on.

  • get_url (Callable[[], str]) – Callback returning server URL (from instance config).

  • get_token (Callable[[], str | None]) – Callback returning API token (from instance config).

  • instance_name (str) – Instance name for display and client configuration.

Return type:

None

Example

mail-proxy myserver connect
mail-proxy myserver connect --url http://remote:8000 --token secret

# In REPL:
>>> proxy.status()
>>> proxy.messages.list(tenant_id="acme")
core.mail_proxy.interface.cli_commands.add_stats_command(group, db)[source]

Register ‘stats’ command for aggregate queue statistics.

Displays tenant/account/message counts with breakdown by status.

Parameters:
  • group (click.Group) – Click group to register command on.

  • db (MailProxyDb) – Database instance for querying statistics.

Return type:

None

Example

mail-proxy myserver stats
mail-proxy myserver stats --json
core.mail_proxy.interface.cli_commands.add_send_command(group, db, tenant_id)[source]

Register ‘send’ command to queue email from .eml file.

Parses RFC 5322 email file and queues for delivery.

Parameters:
  • group (click.Group) – Click group to register command on.

  • db (MailProxyDb) – Database instance for message operations.

  • tenant_id (str) – Tenant context for the send operation.

Return type:

None

Example

mail-proxy myserver acme send email.eml
mail-proxy myserver acme send email.eml --account smtp1 --priority 1
core.mail_proxy.interface.cli_commands.add_token_command(group, db)[source]

Register ‘token’ command for API token management.

Shows current token or regenerates a new one.

Parameters:
  • group (click.Group) – Click group to register command on.

  • db (MailProxyDb) – Database instance for token storage.

Return type:

None

Example

mail-proxy myserver token
mail-proxy myserver token --regenerate
core.mail_proxy.interface.cli_commands.add_run_now_command(group, get_url, get_token, tenant_id=None)[source]

Register ‘run-now’ command to trigger immediate dispatch.

Sends HTTP POST to running server to force dispatch cycle.

Parameters:
  • group (Group) – Click group to register command on.

  • get_url (Callable[[], str]) – Callback returning server URL.

  • get_token (Callable[[], str | None]) – Callback returning API token.

  • tenant_id (str | None) – Optional tenant scope (None = all tenants).

Return type:

None

Example

mail-proxy myserver run-now
mail-proxy myserver acme run-now
core.mail_proxy.interface.cli_commands.add_serve_command(group)[source]

Register ‘serve’ command to start a mail-proxy server instance.

Parameters:

group (Group) – Click group to register command on.

Return type:

None

Example

mail-proxy serve                    # Start default-mailer
mail-proxy serve myserver           # Start/create myserver
mail-proxy serve myserver -p 8080   # Start on specific port
mail-proxy serve myserver -c        # Start and open REPL
core.mail_proxy.interface.cli_commands.add_list_command(group)[source]

Register ‘list’ command to show all configured instances.

Parameters:

group (Group) – Click group to register command on.

Return type:

None

Example

mail-proxy list
core.mail_proxy.interface.cli_commands.add_stop_command(group)[source]

Register ‘stop’ command to stop running instances.

Parameters:

group (Group) – Click group to register command on.

Return type:

None

Example

mail-proxy stop              # Stop all running instances
mail-proxy stop myserver     # Stop specific instance
mail-proxy stop myserver -f  # Force kill
core.mail_proxy.interface.cli_commands.add_restart_command(group)[source]

Register ‘restart’ command to restart running instances.

Parameters:

group (Group) – Click group to register command on.

Return type:

None

Example

mail-proxy restart              # Restart all running instances
mail-proxy restart myserver     # Restart specific instance
core.mail_proxy.interface.cli_commands.add_use_command(group)[source]

Register ‘use’ command to select current context (instance/tenant).

Sets the default instance and optionally tenant for subsequent commands.

Parameters:

group (Group) – Click group to register command on.

Return type:

None

Example

mail-proxy use production           # instance only
mail-proxy use production/acme      # instance + tenant
mail-proxy use /beta                # change tenant only
core.mail_proxy.interface.cli_commands.add_current_command(group)[source]

Register ‘current’ command to show current context.

Parameters:

group (Group) – Click group to register command on.

Return type:

None

Example

mail-proxy current
mail-proxy current --export
core.mail_proxy.interface.cli_commands.resolve_context(explicit_instance=None, explicit_tenant=None)[source]

Resolve active instance and tenant using priority chain.

Resolution order for instance:
  1. Explicit argument

  2. GMP_INSTANCE environment variable

  3. ~/.mail-proxy/.current file (instance part)

  4. Auto-select if only one instance exists

Resolution order for tenant:
  1. Explicit argument

  2. GMP_TENANT environment variable

  3. ~/.mail-proxy/.current file (tenant part)

Parameters:
  • explicit_instance (str | None) – Explicitly specified instance name.

  • explicit_tenant (str | None) – Explicitly specified tenant name.

Return type:

tuple[str | None, str | None]

Returns:

(instance, tenant) tuple. Either can be None.

core.mail_proxy.interface.cli_commands.require_context(explicit_instance=None, explicit_tenant=None, require_tenant=False)[source]

Resolve context or exit with error if ambiguous.

Parameters:
  • explicit_instance (str | None) – Explicitly specified instance name.

  • explicit_tenant (str | None) – Explicitly specified tenant name.

  • require_tenant (bool) – If True, tenant must be resolved.

Return type:

tuple[str, str | None]

Returns:

(instance, tenant) tuple.

Raises:

SystemExit – If required context cannot be resolved.

core.mail_proxy.interface.cli_commands.resolve_instance(explicit=None)[source]

Resolve the active instance (backwards compatible wrapper).

Parameters:

explicit (str | None)

Return type:

str | None

core.mail_proxy.interface.cli_commands.require_instance(explicit=None)[source]

Resolve instance or exit (backwards compatible wrapper).

Parameters:

explicit (str | None)

Return type:

str

Base class for endpoint introspection and command dispatch.

This module provides the foundation for automatic API/CLI generation from endpoint classes via method introspection.

Components:

POST: Decorator to mark methods as HTTP POST. BaseEndpoint: Base class with introspection capabilities. EndpointDispatcher: Routes commands to endpoint methods.

Example

Define an endpoint:

from core.mail_proxy.interface.endpoint_base import BaseEndpoint, POST

class MyEndpoint(BaseEndpoint):
    name = "items"

    async def list(self, active_only: bool = False) -> list[dict]:
        """List all items."""
        return await self.table.list_all(active_only=active_only)

    @POST
    async def add(self, id: str, name: str) -> dict:
        """Add a new item."""
        return await self.table.add({"id": id, "name": name})

Use with dispatcher:

dispatcher = EndpointDispatcher(db)
result = await dispatcher.dispatch("addMessages", {"messages": [...]})

Note

BaseEndpoint.discover() scans CE and EE packages for endpoint classes and composes them when both exist for an entity.

class core.mail_proxy.interface.endpoint_base.BaseEndpoint(table)[source]

Bases: object

Base class for all endpoints with introspection capabilities.

Provides method discovery, HTTP method inference, and Pydantic model generation from signatures for automatic API/CLI generation.

name

Endpoint name used in URL paths and CLI groups.

table

Database table instance for operations.

Example

Create a custom endpoint:

class ItemEndpoint(BaseEndpoint):
    name = "items"

    async def get(self, item_id: str) -> dict:
        item = await self.table.get(item_id)
        if not item:
            raise ValueError(f"Item '{item_id}' not found")
        return item

    @POST
    async def add(self, id: str, name: str) -> dict:
        return await self.table.add({"id": id, "name": name})

# Register with FastAPI
endpoint = ItemEndpoint(db.table("items"))
register_endpoint(app, endpoint)
Parameters:

table (Any)

name: str = ''
__init__(table)[source]

Initialize endpoint with table reference.

Parameters:

table (Any) – Database table instance for operations.

get_methods()[source]

Return all public async methods for API/CLI generation.

Return type:

list[tuple[str, Callable]]

Returns:

List of (method_name, method) tuples for all public async methods (excluding those starting with underscore).

get_http_method(method_name)[source]

Determine HTTP method for an endpoint method.

Parameters:

method_name (str) – Name of the endpoint method.

Return type:

str

Returns:

“POST” if decorated with @POST, otherwise “GET”.

create_request_model(method_name)[source]

Create Pydantic model from method signature.

Used by API layer to validate and parse request bodies.

Parameters:

method_name (str) – Name of the method to introspect.

Return type:

type

Returns:

Dynamically created Pydantic model class.

is_simple_params(method_name)[source]

Check if method has only simple params suitable for query string.

Parameters:

method_name (str) – Name of the method to check.

Return type:

bool

Returns:

False if any parameter is list or dict (including Optional[list]).

count_params(method_name)[source]

Count non-self parameters for a method.

Parameters:

method_name (str) – Name of the method.

Return type:

int

Returns:

Number of parameters excluding ‘self’.

classmethod discover()[source]

Autodiscover all endpoint classes from entities/ directories.

Scans CE and EE packages for endpoint.py and endpoint_ee.py modules. When both exist for an entity, composes them with EE mixin first.

Return type:

list[type[BaseEndpoint]]

Returns:

List of endpoint classes ready for instantiation.

Example

for endpoint_class in BaseEndpoint.discover():
    table = db.table(endpoint_class.name)
    endpoint = endpoint_class(table)
    register_endpoint(app, endpoint)
class core.mail_proxy.interface.endpoint_base.EndpointDispatcher(db, proxy=None)[source]

Bases: object

Dispatches commands to appropriate endpoint methods.

Centralizes command routing, mapping legacy camelCase commands to endpoint.method pairs for backward compatibility.

COMMAND_MAP

Maps command names to (endpoint_name, method_name).

db

Database instance for table access.

proxy

Optional MailProxy for operations needing runtime state.

Example

Use dispatcher for legacy API compatibility:

dispatcher = EndpointDispatcher(db, proxy=proxy)

# Dispatch legacy command
result = await dispatcher.dispatch(
    "addMessages",
    {"messages": [{"to": "user@example.com"}]}
)
# Returns: {"ok": True, "count": 1}

# Direct endpoint access
messages_endpoint = dispatcher.get_endpoint("messages")
await messages_endpoint.add_batch(messages=[...])
Parameters:
COMMAND_MAP: dict[str, tuple[str, str]] = {'activate': ('tenants', 'activate_batch'), 'addAccount': ('accounts', 'add'), 'addMessages': ('messages', 'add_batch'), 'addTenant': ('tenants', 'add'), 'cleanupMessages': ('messages', 'cleanup'), 'deleteAccount': ('accounts', 'delete'), 'deleteMessages': ('messages', 'delete_batch'), 'deleteTenant': ('tenants', 'delete'), 'getInstance': ('instance', 'get'), 'getTenant': ('tenants', 'get'), 'listAccounts': ('accounts', 'list'), 'listMessages': ('messages', 'list'), 'listTenants': ('tenants', 'list'), 'listTenantsSyncStatus': ('instance', 'get_sync_status'), 'suspend': ('tenants', 'suspend_batch'), 'updateInstance': ('instance', 'update'), 'updateTenant': ('tenants', 'update')}
__init__(db, proxy=None)[source]

Initialize dispatcher with database and optional proxy.

Parameters:
  • db (SqlDb) – MailProxyDb instance for table access.

  • proxy (Any) – Optional MailProxy for operations needing runtime state.

async dispatch(cmd, payload)[source]

Dispatch a command to the appropriate endpoint method.

Parameters:
  • cmd (str) – Command name (e.g., “addMessages”, “listTenants”).

  • payload (dict[str, Any]) – Command parameters as dict.

Returns:

True/False, …}.

Return type:

Result dict in legacy format {“ok”

Example

result = await dispatcher.dispatch(
    "addTenant",
    {"id": "acme", "name": "Acme Corp"}
)
if result["ok"]:
    print(f"Created tenant: {result['id']}")
get_endpoint(name)[source]

Get endpoint by name for direct access.

Parameters:

name (str) – Endpoint name (e.g., “messages”, “accounts”).

Return type:

BaseEndpoint

Returns:

BaseEndpoint instance for direct method calls.

core.mail_proxy.interface.endpoint_base.POST(method)[source]

Decorator to mark an endpoint method as POST.

POST methods receive parameters via JSON request body instead of query parameters.

Parameters:

method (Callable) – The async method to decorate.

Return type:

Callable

Returns:

The decorated method with _http_post attribute set.

Example

@POST
async def add(self, id: str, data: dict) -> dict:
    """Add item with complex data."""
    ...

Entities

Each entity follows the table + endpoint pattern for database persistence and API exposure.

Account Entity

SMTP account configuration table manager.

This module provides the AccountsTable class for managing SMTP server configurations in a multi-tenant environment. Each account belongs to a tenant and defines connection parameters for outgoing email delivery.

The table uses a UUID primary key (pk) with a unique constraint on (tenant_id, id) for multi-tenant isolation. This allows each tenant to use their own account identifiers without conflicts.

Example

Basic account management:

from core.mail_proxy.proxy_base import MailProxyBase

proxy = MailProxyBase(db_path=":memory:")
await proxy.init()

accounts = proxy.db.table("accounts")

# Add an SMTP account
pk = await accounts.add({
    "id": "main",
    "tenant_id": "acme",
    "host": "smtp.gmail.com",
    "port": 587,
    "user": "sender@acme.com",
    "password": "app-password",
    "use_tls": True,
})

# Retrieve account
account = await accounts.get("acme", "main")

# List all accounts for a tenant
all_accounts = await accounts.list_all(tenant_id="acme")
core.mail_proxy.entities.account.table.name

Table name in database (“accounts”).

core.mail_proxy.entities.account.table.pkey

Primary key column name (“pk”).

Note

Enterprise Edition (EE) extends this class with PEC (Posta Elettronica Certificata) support via AccountsTable_EE mixin, adding IMAP polling for delivery receipts.

class core.mail_proxy.entities.account.table.AccountsTable(db)[source]

Bases: Table

SMTP account configurations for outgoing email delivery.

Manages SMTP server connection parameters including host, port, credentials, TLS settings, and rate limits. Each account belongs to a tenant and is identified by a client-provided ID.

The table schema includes:
  • pk: Internal UUID primary key

  • id: Client-provided account identifier (unique per tenant)

  • tenant_id: Foreign key to tenants table

  • host, port: SMTP server connection

  • user, password: Authentication credentials (password encrypted)

  • ttl: Connection cache TTL in seconds

  • limit_per_minute/hour/day: Rate limiting thresholds

  • limit_behavior: Action when rate exceeded (“defer” or “reject”)

  • use_tls: TLS/STARTTLS mode

  • batch_size: Max messages per connection

Example

Adding an account with rate limits:

pk = await accounts.add({
    "id": "transactional",
    "tenant_id": "acme",
    "host": "smtp.sendgrid.net",
    "port": 587,
    "user": "apikey",
    "password": "SG.xxxxx",
    "use_tls": True,
    "limit_per_hour": 1000,
    "limit_per_day": 10000,
    "limit_behavior": "defer",
})
Parameters:

db (SqlDb)

name: str = 'accounts'
pkey: str | None = 'pk'
create_table_sql()[source]

Generate CREATE TABLE statement with multi-tenant unique constraint.

Adds UNIQUE (tenant_id, id) constraint to ensure each tenant has unique account identifiers while allowing the same ID across different tenants.

Return type:

str

Returns:

SQL CREATE TABLE statement with UNIQUE constraint.

configure()[source]

Define table columns.

Return type:

None

Columns:

pk: UUID primary key (auto-generated on insert). id: Client account identifier (required). tenant_id: Owning tenant (FK to tenants, required). host: SMTP server hostname (required). port: SMTP server port (required). user: SMTP username for authentication. password: SMTP password (encrypted at rest). ttl: Connection cache TTL in seconds (default: 300). limit_per_minute: Max emails per minute. limit_per_hour: Max emails per hour. limit_per_day: Max emails per day. limit_behavior: Rate limit action (“defer” or “reject”). use_tls: TLS mode (1=STARTTLS, 0=none, NULL=auto). batch_size: Messages per SMTP connection. created_at: Record creation timestamp. updated_at: Last modification timestamp.

Note

EE columns (is_pec_account, imap_*) are added by AccountsTable_EE.configure() when enterprise package is installed.

async migrate_from_legacy_schema()[source]

Migrate from composite primary key to UUID primary key.

Legacy databases used PRIMARY KEY (tenant_id, id). This migration adds a UUID ‘pk’ column as the new primary key while preserving the UNIQUE constraint on (tenant_id, id).

The migration process:
  1. Create new table with UUID pk column

  2. Copy existing rows, generating UUIDs

  3. Drop old table and rename new table

Return type:

bool

Returns:

True if migration was performed, False if not needed (pk column already exists or table doesn’t exist).

Note

Safe to call on every startup. Skips silently if migration is not needed.

async add(acc)[source]

Insert or update an SMTP account configuration.

Performs an upsert based on (tenant_id, id). If the account exists, updates all fields. If new, generates a UUID for the pk column.

Parameters:

acc (dict[str, Any]) – Account configuration dict with keys: - id (required): Client account identifier. - tenant_id (required): Owning tenant ID. - host (required): SMTP server hostname. - port (required): SMTP server port. - user: SMTP username. - password: SMTP password (will be encrypted). - ttl: Connection cache TTL (default: 300). - limit_per_minute/hour/day: Rate limits. - limit_behavior: “defer” or “reject” (default: “defer”). - use_tls: True/False/None for TLS mode. - batch_size: Messages per connection. - is_pec_account: True for PEC accounts (EE). - imap_*: IMAP settings for PEC (EE).

Return type:

str

Returns:

The account’s internal UUID (pk).

Example

pk = await accounts.add({
    "id": "marketing",
    "tenant_id": "acme",
    "host": "smtp.mailgun.org",
    "port": 587,
    "user": "postmaster@acme.com",
    "password": "secret",
    "use_tls": True,
    "limit_per_hour": 500,
})
async get(tenant_id, account_id)[source]

Retrieve a single SMTP account by tenant and ID.

Parameters:
  • tenant_id (str) – The tenant that owns this account.

  • account_id (str) – The client-provided account identifier.

Return type:

dict[str, Any]

Returns:

Account dict with use_tls converted to bool/None.

Raises:

ValueError – If account not found for this tenant.

Example

try:
    account = await accounts.get("acme", "main")
    print(f"SMTP host: {account['host']}")
except ValueError:
    print("Account not found")
async list_all(tenant_id=None)[source]

List SMTP accounts, optionally filtered by tenant.

Parameters:

tenant_id (str | None) – Filter by tenant ID. If None, returns all accounts.

Return type:

list[dict[str, Any]]

Returns:

List of account dicts ordered by ID, with boolean fields decoded.

Example

# All accounts for a tenant
acme_accounts = await accounts.list_all(tenant_id="acme")

# All accounts across all tenants (admin view)
all_accounts = await accounts.list_all()
async remove(tenant_id, account_id)[source]

Delete an SMTP account.

Parameters:
  • tenant_id (str) – The tenant that owns this account.

  • account_id (str) – The account identifier to delete.

Return type:

None

Note

Messages referencing this account should be cleaned up separately or via foreign key CASCADE constraints.

async sync_schema()[source]

Synchronize table schema with column definitions.

Adds missing columns and ensures the UNIQUE index on (tenant_id, id) exists for multi-tenant isolation.

Safe to call on every startup.

Return type:

None

SMTP account REST API endpoint.

This module provides the AccountEndpoint class exposing CRUD operations for SMTP account configurations via REST API and CLI commands.

The endpoint is designed for automatic introspection by api_base and cli_base modules, which generate FastAPI routes and Typer commands from method signatures using inspect and pydantic.create_model.

Example

Register endpoint with the API router:

from core.mail_proxy.entities.account import AccountEndpoint

endpoint = AccountEndpoint(proxy.db.table("accounts"))
# Routes auto-generated: POST /accounts, GET /accounts/{id}, etc.

CLI commands auto-generated:

mail-proxy accounts add --tenant-id acme --id main --host smtp.example.com --port 587
mail-proxy accounts list --tenant-id acme
mail-proxy accounts get --tenant-id acme --account-id main
mail-proxy accounts delete --tenant-id acme --account-id main

Note

Enterprise Edition (EE) extends this with AccountEndpoint_EE mixin adding PEC-specific fields (is_pec_account, imap_* settings).

class core.mail_proxy.entities.account.endpoint.AccountEndpoint(table)[source]

Bases: BaseEndpoint

REST API endpoint for SMTP account management.

Provides CRUD operations for SMTP accounts. Each method is introspected to auto-generate API routes and CLI commands.

name

Endpoint name used in URL paths (“accounts”).

table

AccountsTable instance for database operations.

Example

Using the endpoint programmatically:

endpoint = AccountEndpoint(db.table("accounts"))

# Add account
account = await endpoint.add(
    id="main",
    tenant_id="acme",
    host="smtp.gmail.com",
    port=587,
    user="sender@acme.com",
    password="app-password",
    use_tls=True,
)

# List accounts
accounts = await endpoint.list(tenant_id="acme")
Parameters:

table (AccountsTable)

name: str = 'accounts'
__init__(table)[source]

Initialize endpoint with table reference.

Parameters:

table (AccountsTable) – AccountsTable instance for database operations.

async add(id, tenant_id, host, port, user=None, password=None, use_tls=True, batch_size=None, ttl=300, limit_per_minute=None, limit_per_hour=None, limit_per_day=None, limit_behavior='defer')[source]

Add or update an SMTP account configuration.

Performs upsert: creates new account or updates existing one based on (tenant_id, id) composite key.

Parameters:
  • id (str) – Account identifier (unique within tenant).

  • tenant_id (str) – Owning tenant ID.

  • host (str) – SMTP server hostname.

  • port (int) – SMTP server port (typically 25, 465, or 587).

  • user (str | None) – SMTP username for authentication.

  • password (str | None) – SMTP password (encrypted at rest).

  • use_tls (bool) – Enable STARTTLS (True) or plain connection (False).

  • batch_size (int | None) – Max messages per SMTP connection.

  • ttl (int) – Connection cache TTL in seconds.

  • limit_per_minute (int | None) – Rate limit per minute.

  • limit_per_hour (int | None) – Rate limit per hour.

  • limit_per_day (int | None) – Rate limit per day.

  • limit_behavior (Literal['defer', 'reject']) – Action when rate exceeded (“defer” or “reject”).

Return type:

dict

Returns:

Complete account record after insert/update.

async get(tenant_id, account_id)[source]

Retrieve a single SMTP account by tenant and ID.

Parameters:
  • tenant_id (str) – Tenant that owns the account.

  • account_id (str) – Account identifier.

Return type:

dict

Returns:

Account configuration dict.

Raises:

ValueError – If account not found.

async list(tenant_id)[source]

List all SMTP accounts for a tenant.

Parameters:

tenant_id (str) – Tenant to list accounts for.

Return type:

list[dict]

Returns:

List of account dicts ordered by ID.

async delete(tenant_id, account_id)[source]

Delete an SMTP account.

Parameters:
  • tenant_id (str) – Tenant that owns the account.

  • account_id (str) – Account identifier to delete.

Return type:

None

Tenant Entity

Tenant configuration table manager.

This module provides the TenantsTable class for managing tenant configurations in a multi-tenant mail proxy environment.

In Community Edition (CE), a single “default” tenant is used implicitly. Enterprise Edition (EE) extends with full multi-tenant management via TenantsTable_EE mixin, adding API key authentication and tenant CRUD.

Each tenant can configure:
  • Client authentication (for callback URLs)

  • Rate limits (per minute/hour/day)

  • Large file handling (warn/reject/rewrite)

  • Batch suspension (pause specific campaigns)

Example

Basic tenant operations:

from core.mail_proxy.proxy_base import MailProxyBase

proxy = MailProxyBase(db_path=":memory:")
await proxy.init()

tenants = proxy.db.table("tenants")

# Ensure default tenant exists (CE mode)
await tenants.ensure_default()

# Get tenant config
tenant = await tenants.get("default")

# Suspend a batch
await tenants.suspend_batch("default", "newsletter-q1")

# Activate all batches
await tenants.activate_batch("default")

Note

Enterprise Edition (EE) extends this class with TenantsTable_EE mixin, adding: add(), list_all(), update_fields(), remove(), create_api_key(), revoke_api_key(), get_tenant_by_token().

class core.mail_proxy.entities.tenant.table.TenantsTable(db)[source]

Bases: Table

Tenant configuration storage table.

Manages tenant settings including client authentication, rate limits, and batch suspension for campaign control.

name

Table name (“tenants”).

pkey

Primary key column (“id”).

Table Schema:
  • id: Tenant identifier (primary key)

  • name: Display name

  • client_auth: JSON dict with HTTP auth config for callbacks

  • client_base_url: Base URL for client callbacks

  • client_sync_path: Path for sync endpoint

  • client_attachment_path: Path for attachment endpoint

  • rate_limits: JSON dict with per-minute/hour/day limits

  • large_file_config: JSON dict for large attachment handling

  • active: 0/1 flag for tenant status

  • suspended_batches: Comma-separated batch codes or “*” for all

  • api_key_hash: Hashed API key (EE only)

  • api_key_expires_at: API key expiration (EE only)

  • created_at, updated_at: Timestamps

Example

Work with tenant configuration:

tenants = proxy.db.table("tenants")

# Get tenant
tenant = await tenants.get("acme")

# Check batch suspension
is_suspended = tenants.is_batch_suspended(
    tenant["suspended_batches"],
    "campaign-001",
)

# Suspend a batch
await tenants.suspend_batch("acme", "campaign-001")
Parameters:

db (SqlDb)

name: str = 'tenants'
pkey: str | None = 'id'
configure()[source]

Define table columns.

Return type:

None

Columns:

id: Tenant identifier (primary key string). name: Human-readable tenant name. client_auth: JSON dict with auth method, credentials for callbacks. client_base_url: Base URL for client HTTP callbacks. client_sync_path: Path appended to base_url for sync endpoint. client_attachment_path: Path for attachment fetch endpoint. rate_limits: JSON dict with per_minute, per_hour, per_day limits. large_file_config: JSON dict with threshold, action settings. active: 1=active, 0=disabled (INTEGER for SQLite). suspended_batches: Comma-separated batch codes or “*” for all. api_key_hash: Bcrypt hash of API key (EE only). api_key_expires_at: API key expiration timestamp (EE only). created_at: Row creation timestamp. updated_at: Last modification timestamp.

async get(tenant_id)[source]

Fetch a tenant configuration by ID.

Parameters:

tenant_id (str) – Tenant identifier.

Return type:

dict[str, Any] | None

Returns:

Tenant dict with ‘active’ converted to bool, or None if not found.

is_batch_suspended(suspended_batches, batch_code)[source]

Check if a batch is suspended.

Parameters:
  • suspended_batches (str | None) – Tenant’s suspended_batches value (comma-separated or “*”).

  • batch_code (str | None) – Message’s batch_code (None if no batch).

Return type:

bool

Returns:

True if the message should be skipped.

Note

  • “*” suspends all messages regardless of batch_code

  • Messages without batch_code are only suspended by “*”

  • Specific batch codes must match exactly

Example

# All suspended
is_batch_suspended("*", "any-batch")  # True
is_batch_suspended("*", None)  # True

# Specific batches
is_batch_suspended("batch1,batch2", "batch1")  # True
is_batch_suspended("batch1,batch2", "batch3")  # False
is_batch_suspended("batch1,batch2", None)  # False
async ensure_default()[source]

Ensure the ‘default’ tenant exists for CE single-tenant mode.

Creates the default tenant without API key. In CE mode, all operations use the instance token. When upgrading to EE, admin can generate tenant token via create_api_key().

Return type:

None

async suspend_batch(tenant_id, batch_code=None)[source]

Suspend sending for a tenant.

Suspended batches are skipped by the dispatcher. Use for:
  • Pausing a campaign (specific batch_code)

  • Emergency stop (batch_code=None suspends all)

Parameters:
  • tenant_id (str) – Tenant identifier.

  • batch_code (str | None) – Batch to suspend. If None, suspends all (“*”).

Return type:

bool

Returns:

True if tenant found and updated, False if not found.

Example

# Suspend specific batch
await tenants.suspend_batch("acme", "newsletter-q1")

# Suspend all sending
await tenants.suspend_batch("acme")
async activate_batch(tenant_id, batch_code=None)[source]

Resume sending for a tenant.

Removes batch from suspension list. If batch_code is None, clears ALL suspensions.

Parameters:
  • tenant_id (str) – Tenant identifier.

  • batch_code (str | None) – Batch to activate. If None, clears all suspensions.

Return type:

bool

Returns:

True if updated successfully. False if tenant not found or cannot remove single batch from “*”.

Note

Cannot remove a single batch when full suspension (“*”) is active. Must call activate_batch(tenant_id, None) first to clear all.

Example

# Activate specific batch
await tenants.activate_batch("acme", "newsletter-q1")

# Clear all suspensions
await tenants.activate_batch("acme")
async get_suspended_batches(tenant_id)[source]

Get suspended batch codes for a tenant.

Parameters:

tenant_id (str) – Tenant identifier.

Return type:

set[str]

Returns:

Set of batch codes, {“*”} if all suspended, or empty set.

Example

suspended = await tenants.get_suspended_batches("acme")
if "*" in suspended:
    print("All sending suspended")
elif "campaign-001" in suspended:
    print("Campaign 001 is suspended")

Tenant REST API endpoint.

This module provides the TenantEndpoint class exposing CRUD operations for tenant configurations via REST API and CLI commands.

The endpoint is designed for automatic introspection by api_base and cli_base modules, which generate FastAPI routes and Typer commands from method signatures.

Example

CLI commands auto-generated:

mail-proxy tenants add --id acme --name "Acme Corp"
mail-proxy tenants list
mail-proxy tenants get --tenant-id acme
mail-proxy tenants delete --tenant-id acme
mail-proxy tenants suspend-batch --tenant-id acme --batch-code newsletter
mail-proxy tenants activate-batch --tenant-id acme

Note

Enterprise Edition (EE) extends this with TenantEndpoint_EE mixin adding API key management (create_api_key, revoke_api_key).

class core.mail_proxy.entities.tenant.endpoint.AuthMethod(value)[source]

Bases: str, Enum

Authentication methods for HTTP client callbacks.

NONE

No authentication required.

BEARER

Bearer token authentication.

BASIC

HTTP Basic authentication.

NONE = 'none'
BEARER = 'bearer'
BASIC = 'basic'
class core.mail_proxy.entities.tenant.endpoint.LargeFileAction(value)[source]

Bases: str, Enum

Action when attachment exceeds size threshold.

WARN

Log warning but proceed with sending.

REJECT

Reject the message entirely.

REWRITE

Store file externally and rewrite attachment URL.

WARN = 'warn'
REJECT = 'reject'
REWRITE = 'rewrite'
class core.mail_proxy.entities.tenant.endpoint.TenantEndpoint(table)[source]

Bases: BaseEndpoint

REST API endpoint for tenant management.

Provides CRUD operations for tenant configurations including batch suspension for campaign control.

name

Endpoint name used in URL paths (“tenants”).

table

TenantsTable instance for database operations.

Example

Using the endpoint programmatically:

endpoint = TenantEndpoint(db.table("tenants"))

# Add tenant
tenant = await endpoint.add(
    id="acme",
    name="Acme Corp",
    client_base_url="https://acme.com",
)

# Suspend a batch
await endpoint.suspend_batch(tenant_id="acme", batch_code="newsletter")

# List tenants
tenants = await endpoint.list()
Parameters:

table (TenantsTable)

name: str = 'tenants'
__init__(table)[source]

Initialize endpoint with table reference.

Parameters:

table (TenantsTable) – TenantsTable instance for database operations.

async add(id, name=None, client_auth=None, client_base_url=None, client_sync_path=None, client_attachment_path=None, rate_limits=None, large_file_config=None, active=True)[source]

Add or update a tenant configuration.

Parameters:
  • id (str) – Tenant identifier (unique).

  • name (str | None) – Human-readable tenant name.

  • client_auth (dict[str, Any] | None) – HTTP auth config for callbacks (method, credentials).

  • client_base_url (str | None) – Base URL for client HTTP callbacks.

  • client_sync_path (str | None) – Path for sync endpoint (default: /mail-proxy/sync).

  • client_attachment_path (str | None) – Path for attachments (default: /mail-proxy/attachments).

  • rate_limits (dict[str, Any] | None) – Rate limit config (per_minute, per_hour, per_day).

  • large_file_config (dict[str, Any] | None) – Large file handling (threshold, action).

  • active (bool) – Whether tenant is active.

Return type:

dict

Returns:

Tenant dict, including api_key for new tenants (EE only).

async get(tenant_id)[source]

Retrieve a single tenant configuration.

Parameters:

tenant_id (str) – Tenant identifier.

Return type:

dict

Returns:

Tenant configuration dict.

Raises:

ValueError – If tenant not found.

async list(active_only=False)[source]

List all tenants.

Parameters:

active_only (bool) – If True, only return active tenants.

Return type:

list[dict]

Returns:

List of tenant configuration dicts.

async delete(tenant_id)[source]

Delete a tenant and all associated data.

Parameters:

tenant_id (str) – Tenant identifier to delete.

Return type:

bool

Returns:

True if deleted.

async update(tenant_id, name=None, client_auth=None, client_base_url=None, client_sync_path=None, client_attachment_path=None, rate_limits=None, large_file_config=None, active=None)[source]

Update tenant configuration fields.

Only provided fields are updated; None values are ignored.

Parameters:
  • tenant_id (str) – Tenant identifier.

  • name (str | None) – New tenant name.

  • client_auth (dict[str, Any] | None) – New auth config.

  • client_base_url (str | None) – New base URL.

  • client_sync_path (str | None) – New sync path.

  • client_attachment_path (str | None) – New attachment path.

  • rate_limits (dict[str, Any] | None) – New rate limits.

  • large_file_config (dict[str, Any] | None) – New large file config.

  • active (bool | None) – New active status.

Return type:

dict

Returns:

Updated tenant configuration dict.

async suspend_batch(tenant_id, batch_code=None)[source]

Suspend sending for a tenant.

Suspended batches are skipped by the dispatcher.

Parameters:
  • tenant_id (str) – Tenant identifier.

  • batch_code (str | None) – Batch to suspend. If None, suspends all sending.

Return type:

dict

Returns:

Dict with ok=True and suspended_batches list.

Raises:

ValueError – If tenant not found.

async activate_batch(tenant_id, batch_code=None)[source]

Resume sending for a tenant.

Removes batch from suspension list.

Parameters:
  • tenant_id (str) – Tenant identifier.

  • batch_code (str | None) – Batch to activate. If None, clears all suspensions.

Return type:

dict

Returns:

Dict with ok=True and remaining suspended_batches.

Raises:

ValueError – If tenant not found or cannot remove single batch from “*”.

async get_suspended_batches(tenant_id)[source]

Get suspended batches for a tenant.

Parameters:

tenant_id (str) – Tenant identifier.

Return type:

dict

Returns:

Dict with ok=True and suspended_batches list.

Raises:

ValueError – If tenant not found.

core.mail_proxy.entities.tenant.endpoint.get_tenant_attachment_url(tenant)[source]

Build full attachment fetch URL from tenant config.

Parameters:

tenant (dict[str, Any]) – Tenant configuration dict.

Return type:

str | None

Returns:

Full URL (base_url + attachment_path) or None if no base_url.

Example

tenant = {"client_base_url": "https://acme.com"}
url = get_tenant_attachment_url(tenant)
# Returns: "https://acme.com/mail-proxy/attachments" (default path)
core.mail_proxy.entities.tenant.endpoint.get_tenant_sync_url(tenant)[source]

Build full sync callback URL from tenant config.

Parameters:

tenant (dict[str, Any]) – Tenant configuration dict.

Return type:

str | None

Returns:

Full URL (base_url + sync_path) or None if no base_url.

Example

tenant = {"client_base_url": "https://acme.com", "client_sync_path": "/api/sync"}
url = get_tenant_sync_url(tenant)
# Returns: "https://acme.com/api/sync"

Message Entity

Email message queue table manager.

This module provides the MessagesTable class for managing email messages in the delivery queue. Each message contains a JSON payload with email content and is associated with a tenant and SMTP account.

Messages progress through states:
  • Pending: smtp_ts IS NULL, deferred_ts IS NULL

  • Deferred: smtp_ts IS NULL, deferred_ts IS NOT NULL

  • Processed: smtp_ts IS NOT NULL

The table uses a UUID primary key (pk) with a unique constraint on (tenant_id, id) for multi-tenant isolation and idempotent inserts.

Example

Queue and send messages:

from core.mail_proxy.proxy_base import MailProxyBase

proxy = MailProxyBase(db_path=":memory:")
await proxy.init()

messages = proxy.db.table("messages")

# Queue a message batch
result = await messages.insert_batch([{
    "id": "msg-001",
    "tenant_id": "acme",
    "account_id": "main",
    "payload": {
        "from": "sender@acme.com",
        "to": ["user@example.com"],
        "subject": "Hello",
        "body": "Test message",
    },
}])

# Fetch messages ready for delivery
import time
ready = await messages.fetch_ready(limit=10, now_ts=int(time.time()))

# Mark as sent
for msg in ready:
    await messages.mark_sent(msg["pk"], int(time.time()))

Note

Enterprise Edition (EE) extends this class with MessagesTable_EE mixin, adding PEC-specific methods for Italian certified email.

class core.mail_proxy.entities.message.table.MessagesTable(db)[source]

Bases: Table

Email message queue with scheduling and deferred delivery.

Manages the email delivery queue with support for priority ordering, deferred delivery (retry scheduling), batch operations, and multi-tenant isolation.

name

Table name (“messages”).

pkey

Primary key column (“pk”, UUID string).

Table Schema:
  • pk: UUID primary key (generated on insert)

  • id: Client-provided message identifier (unique per tenant)

  • tenant_id: Tenant identifier for isolation

  • account_id: Legacy account reference (business key)

  • account_pk: FK to accounts.pk (UUID)

  • priority: Delivery priority (0=immediate, 1=high, 2=medium, 3=low)

  • payload: JSON email content (from, to, subject, body, etc.)

  • batch_code: Optional campaign/batch identifier

  • deferred_ts: Retry timestamp (NULL = ready for delivery)

  • smtp_ts: SMTP attempt timestamp (NULL = pending)

  • is_pec: PEC flag for Italian certified email (EE)

  • created_at, updated_at: Timestamps

Example

Basic message queue operations:

messages = proxy.db.table("messages")

# Insert messages
await messages.insert_batch([
    {"id": "m1", "tenant_id": "t1", "account_id": "a1",
     "payload": {"from": "a@b.com", "to": ["c@d.com"], ...}},
])

# Fetch ready messages
ready = await messages.fetch_ready(limit=100, now_ts=now)

# Mark sent/error
await messages.mark_sent(pk, now)
await messages.mark_error(pk, now)
Parameters:

db (SqlDb)

name: str = 'messages'
pkey: str | None = 'pk'
create_table_sql()[source]

Generate CREATE TABLE with UNIQUE constraint.

Adds UNIQUE (tenant_id, id) constraint to ensure idempotent inserts and multi-tenant isolation.

Return type:

str

Returns:

SQL CREATE TABLE statement with UNIQUE constraint.

configure()[source]

Define table columns.

Return type:

None

Columns:

pk: UUID primary key (generated via get_uuid()). id: Client message identifier (unique per tenant). tenant_id: Owning tenant (denormalized for query efficiency). account_id: Legacy account reference (business key). account_pk: FK to accounts.pk UUID. priority: Delivery priority (0-3, default 2). payload: JSON email content. batch_code: Optional batch/campaign identifier. created_at, updated_at: Timestamps. deferred_ts: Unix timestamp for retry scheduling. smtp_ts: Unix timestamp when SMTP was attempted. is_pec: PEC flag (1=awaiting receipts, EE only).

async migrate_from_legacy_schema()[source]

Migrate from INTEGER pk to UUID pk schema.

Legacy databases used INTEGER autoincrement primary key. This migration adds UUID ‘pk’ column as new primary key.

Return type:

bool

Returns:

True if migration performed, False if not needed.

async migrate_account_pk()[source]

Populate account_pk from account_id + tenant_id.

Links messages to accounts via UUID instead of business key.

Return type:

bool

Returns:

True if migration performed, False if not needed.

async insert_batch(entries, pec_account_ids=None, tenant_id=None, auto_pec=True)[source]

Persist a batch of messages for delivery.

Performs upsert based on (tenant_id, id):
  • New message: insert with generated UUID pk

  • Existing pending: update fields

  • Existing processed: skip (already sent)

Parameters:
  • entries (Sequence[dict[str, Any]]) – List of message dicts. Each must have: - id: Client message identifier - tenant_id or use tenant_id param - account_id: SMTP account identifier - payload: Email content dict Optional: priority, deferred_ts, batch_code, account_pk.

  • pec_account_ids (set[str] | None) – Set of account IDs that are PEC accounts. If None and auto_pec=True, fetched from accounts table.

  • tenant_id (str | None) – Default tenant ID if not in each entry.

  • auto_pec (bool) – If True, auto-fetch PEC account IDs.

Returns:

msg_id, “pk”: pk} for inserted/updated messages.

Return type:

List of {“id”

Example

result = await messages.insert_batch([
    {
        "id": "msg-001",
        "tenant_id": "acme",
        "account_id": "main",
        "payload": {
            "from": "sender@acme.com",
            "to": ["user@example.com"],
            "subject": "Test",
            "body": "Hello",
        },
    },
])
# Returns: [{"id": "msg-001", "pk": "uuid-..."}]
async fetch_ready(*, limit, now_ts, priority=None, min_priority=None)[source]

Fetch messages ready for SMTP delivery.

Returns pending messages ordered by priority and creation time. Excludes messages from suspended tenants/batches.

Parameters:
  • limit (int) – Maximum messages to fetch.

  • now_ts (int) – Current Unix timestamp for deferred check.

  • priority (int | None) – Exact priority to filter (0-3).

  • min_priority (int | None) – Minimum priority to filter.

Return type:

list[dict[str, Any]]

Returns:

List of message dicts with decoded payload.

Note

Suspension logic:
  • tenant.suspended_batches = “*”: all messages skipped

  • tenant.suspended_batches contains batch_code: skipped

  • Messages without batch_code: only skipped when “*”

async set_deferred(pk, deferred_ts)[source]

Schedule message for retry at specified timestamp.

Resets smtp_ts to NULL so message becomes pending again.

Parameters:
  • pk (str) – Message UUID primary key.

  • deferred_ts (int) – Unix timestamp for retry.

Return type:

None

async clear_deferred(pk)[source]

Clear deferred timestamp, making message immediately ready.

Parameters:

pk (str) – Message UUID primary key.

Return type:

None

async mark_sent(pk, smtp_ts)[source]

Mark message as successfully sent.

Parameters:
  • pk (str) – Message UUID primary key.

  • smtp_ts (int) – Unix timestamp of successful SMTP send.

Return type:

None

async mark_error(pk, smtp_ts)[source]

Mark message as sent with error.

Parameters:
  • pk (str) – Message UUID primary key.

  • smtp_ts (int) – Unix timestamp of failed SMTP attempt.

Return type:

None

async update_payload(pk, payload)[source]

Update message payload.

Parameters:
  • pk (str) – Message UUID primary key.

  • payload (dict[str, Any]) – New email content dict.

Return type:

None

async get(msg_id, tenant_id)[source]

Get message by client ID and tenant.

Parameters:
  • msg_id (str) – Client-provided message ID.

  • tenant_id (str) – Tenant identifier.

Return type:

dict[str, Any] | None

Returns:

Message dict with decoded payload, or None if not found.

async get_by_pk(pk)[source]

Get message by internal primary key.

Parameters:

pk (str) – Message UUID primary key.

Return type:

dict[str, Any] | None

Returns:

Message dict with decoded payload, or None if not found.

async remove_by_pk(pk)[source]

Delete message by primary key.

Parameters:

pk (str) – Message UUID primary key.

Return type:

bool

Returns:

True if deleted, False if not found.

async purge_for_account(account_id)[source]

Delete all messages for an account.

Parameters:

account_id (str) – Account identifier.

Return type:

None

async existing_ids(ids)[source]

Check which message IDs already exist.

Parameters:

ids (Iterable[str]) – Iterable of message IDs to check.

Return type:

set[str]

Returns:

Set of IDs that exist in the messages table.

async get_ids_for_tenant(ids, tenant_id)[source]

Get message IDs that belong to a tenant.

Validates ownership by checking tenant_id in accounts table.

Parameters:
  • ids (list[str]) – List of message IDs to check.

  • tenant_id (str) – Tenant identifier.

Return type:

set[str]

Returns:

Set of message IDs owned by the tenant.

async remove_fully_reported_before(threshold_ts)[source]

Delete messages whose events are all reported before threshold.

A message can be removed when:
  • It has been processed (smtp_ts IS NOT NULL)

  • All its events have been reported

  • Most recent reported_ts is older than threshold

Parameters:

threshold_ts (int) – Unix timestamp threshold.

Return type:

int

Returns:

Number of deleted messages.

async remove_fully_reported_before_for_tenant(threshold_ts, tenant_id)[source]

Delete fully reported messages for a tenant.

Parameters:
  • threshold_ts (int) – Unix timestamp threshold.

  • tenant_id (str) – Tenant identifier.

Return type:

int

Returns:

Number of deleted messages.

async list_all(*, tenant_id=None, active_only=False, include_history=False)[source]

List messages with optional filters.

Parameters:
  • tenant_id (str | None) – Filter by tenant.

  • active_only (bool) – Only return pending messages (smtp_ts IS NULL).

  • include_history (bool) – Include event history for each message.

Return type:

list[dict[str, Any]]

Returns:

List of message dicts with decoded payload and optional history.

async count_active()[source]

Count messages awaiting delivery.

Return type:

int

Returns:

Number of messages with smtp_ts IS NULL.

async count_pending_for_tenant(tenant_id, batch_code=None)[source]

Count pending messages for a tenant.

Parameters:
  • tenant_id (str) – Tenant identifier.

  • batch_code (str | None) – Optional batch code filter.

Return type:

int

Returns:

Number of pending messages.

Email message REST API endpoint.

This module provides the MessageEndpoint class exposing CRUD operations for email messages via REST API and CLI commands.

The endpoint is designed for automatic introspection by api_base and cli_base modules, which generate FastAPI routes and Typer commands from method signatures.

Example

CLI commands auto-generated:

mail-proxy messages add --tenant-id acme --id msg-001 --account-id main ...
mail-proxy messages list --tenant-id acme
mail-proxy messages get --message-id msg-001 --tenant-id acme
mail-proxy messages delete --message-pk uuid-...
mail-proxy messages add-batch --messages '[{...}, {...}]'
mail-proxy messages cleanup --tenant-id acme

Note

Enterprise Edition (EE) extends this with MessageEndpoint_EE mixin adding PEC-specific status information.

class core.mail_proxy.entities.message.endpoint.AttachmentPayload(**data)[source]

Bases: BaseModel

Email attachment specification.

filename

Attachment filename (may contain MD5 marker).

storage_path

Content location. Format depends on fetch_mode: - endpoint: query params (e.g., “doc_id=123”) - http_url: full URL (e.g., “https://files.example.com/file.pdf”) - base64: base64-encoded content - filesystem: absolute path (e.g., “/var/attachments/file.pdf”)

mime_type

Optional MIME type override.

fetch_mode

Explicit fetch mode. If not provided, inferred from path.

content_md5

MD5 hash for cache lookup.

auth

Optional authentication override for HTTP requests.

Example

Attachment from HTTP URL:

AttachmentPayload(
    filename="report.pdf",
    storage_path="https://cdn.example.com/docs/report.pdf",
    fetch_mode=FetchMode.HTTP_URL,
)
Parameters:

data (Any)

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

filename: Annotated[str, FieldInfo(annotation=NoneType, required=True, description='Attachment filename', metadata=[MinLen(min_length=1), MaxLen(max_length=255)])]
storage_path: Annotated[str, FieldInfo(annotation=NoneType, required=True, description='Storage path', metadata=[MinLen(min_length=1)])]
mime_type: Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='MIME type override')]
fetch_mode: Annotated[FetchMode | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Fetch mode')]
content_md5: Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='MD5 hash', metadata=[_PydanticGeneralMetadata(pattern='^[a-fA-F0-9]{32}$')])]
auth: Annotated[dict[str, Any] | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Auth override')]
class core.mail_proxy.entities.message.endpoint.FetchMode(value)[source]

Bases: str, Enum

Source mode for fetching email attachments.

ENDPOINT

Fetch from configured HTTP endpoint with path parameter.

HTTP_URL

Fetch directly from a full HTTP/HTTPS URL.

BASE64

Inline base64-encoded content.

FILESYSTEM

Fetch from local filesystem path.

ENDPOINT = 'endpoint'
HTTP_URL = 'http_url'
BASE64 = 'base64'
FILESYSTEM = 'filesystem'
class core.mail_proxy.entities.message.endpoint.MessageEndpoint(table)[source]

Bases: BaseEndpoint

REST API endpoint for email message management.

Provides CRUD operations for email messages including batch operations for bulk insert/delete and cleanup of old messages.

name

Endpoint name used in URL paths (“messages”).

table

MessagesTable instance for database operations.

Example

Using the endpoint programmatically:

endpoint = MessageEndpoint(db.table("messages"))

# Add a single message
msg = await endpoint.add(
    id="msg-001",
    tenant_id="acme",
    account_id="main",
    from_addr="sender@acme.com",
    to=["user@example.com"],
    subject="Test",
    body="Hello",
)

# List messages
messages = await endpoint.list(tenant_id="acme")
Parameters:

table (MessagesTable)

name: str = 'messages'
__init__(table)[source]

Initialize endpoint with table reference.

Parameters:

table (MessagesTable) – MessagesTable instance for database operations.

async add(id, tenant_id, account_id, from_addr, to, subject, body, cc=None, bcc=None, reply_to=None, return_path=None, content_type='plain', message_id=None, priority=2, deferred_ts=None, batch_code=None, attachments=None, headers=None)[source]

Add a new message to the delivery queue.

Parameters:
  • id (str) – Unique message identifier (client-provided).

  • tenant_id (str) – Tenant identifier.

  • account_id (str) – SMTP account to use for sending.

  • from_addr (str) – Sender email address.

  • to (list[str]) – List of recipient addresses.

  • subject (str) – Email subject line.

  • body (str) – Email body content.

  • cc (list[str] | None) – List of CC addresses.

  • bcc (list[str] | None) – List of BCC addresses.

  • reply_to (str | None) – Reply-To address.

  • return_path (str | None) – Return-Path (envelope sender) address.

  • content_type (Literal['plain', 'html']) – Body content type (“plain” or “html”).

  • message_id (str | None) – Custom Message-ID header.

  • priority (int) – Delivery priority (0=immediate, 1=high, 2=medium, 3=low).

  • deferred_ts (int | None) – Unix timestamp to defer delivery until.

  • batch_code (str | None) – Batch/campaign identifier for grouping.

  • attachments (list[dict[str, Any]] | None) – List of attachment specifications.

  • headers (dict[str, str] | None) – Additional email headers.

Return type:

dict

Returns:

Dict with message id and pk.

Raises:

ValueError – If message could not be added.

async get(message_id, tenant_id)[source]

Retrieve a single message by ID.

Parameters:
  • message_id (str) – Client-provided message identifier.

  • tenant_id (str) – Tenant identifier.

Return type:

dict

Returns:

Message dict with status and payload.

Raises:

ValueError – If message not found.

async list(tenant_id, active_only=False, include_history=False)[source]

List messages for a tenant.

Parameters:
  • tenant_id (str) – Tenant to list messages for.

  • active_only (bool) – Only return pending messages.

  • include_history (bool) – Include event history for each message.

Return type:

list[dict]

Returns:

List of message dicts with status info.

async delete(message_pk)[source]

Delete a message by internal primary key.

Parameters:

message_pk (str) – Internal message UUID.

Return type:

bool

Returns:

True if deleted, False if not found.

async count_active()[source]

Count messages awaiting delivery.

Return type:

int

Returns:

Number of active (pending) messages.

async count_pending_for_tenant(tenant_id, batch_code=None)[source]

Count pending messages for a tenant.

Parameters:
  • tenant_id (str) – Tenant identifier.

  • batch_code (str | None) – Optional batch code filter.

Return type:

int

Returns:

Number of pending messages.

async add_batch(messages, default_priority=None)[source]

Add multiple messages in a single operation.

Validates each message and queues valid ones. Invalid messages are reported in the rejected list.

Parameters:
  • messages (list[dict[str, Any]]) – List of message dicts. Each must have: - id: Unique message identifier - tenant_id: Tenant identifier - account_id: SMTP account to use - from (or from_addr): Sender address - to: Recipient(s) - subject: Email subject Optional: body, cc, bcc, reply_to, return_path, content_type, message_id, priority, deferred_ts, batch_code, attachments, headers.

  • default_priority (int | None) – Default priority for messages without explicit priority.

Returns:

  • ok: True

  • queued: Number of messages queued

  • rejected: List of {id, reason} for invalid messages

Return type:

Dict with

Example

result = await endpoint.add_batch([
    {"id": "m1", "tenant_id": "t1", "account_id": "a1",
     "from": "a@b.com", "to": ["c@d.com"], "subject": "Hi"},
    {"id": "m2", ...},
])
# Returns: {"ok": True, "queued": 2, "rejected": []}
async delete_batch(tenant_id, ids)[source]

Delete multiple messages by their IDs.

Validates ownership before deletion.

Parameters:
  • tenant_id (str) – Tenant identifier for authorization check.

  • ids (list[str]) – List of message IDs to delete.

Returns:

  • ok: True

  • removed: Number of messages deleted

  • not_found: List of IDs that don’t exist

  • unauthorized: List of IDs belonging to other tenants

Return type:

Dict with

async cleanup(tenant_id, older_than_seconds=None)[source]

Clean up fully reported messages older than retention period.

Removes messages that have been delivered and reported to the client, freeing up database space.

Parameters:
  • tenant_id (str) – Tenant identifier.

  • older_than_seconds (int | None) – Messages reported before (now - older_than_seconds) will be deleted. Defaults to 86400 (24 hours).

Return type:

dict

Returns:

Dict with ok=True and removed count.

class core.mail_proxy.entities.message.endpoint.MessageStatus(value)[source]

Bases: str, Enum

Current delivery status of an email message.

PENDING

Queued and waiting for delivery attempt.

DEFERRED

Temporarily delayed (rate limit or soft error).

SENT

Successfully delivered to SMTP server.

ERROR

Delivery failed with permanent error.

PENDING = 'pending'
DEFERRED = 'deferred'
SENT = 'sent'
ERROR = 'error'

Message Event Entity

Message event table for delivery tracking.

This module implements event-based tracking for message lifecycle changes. Each significant state change (sent, error, deferred, bounce, PEC receipts) is recorded as a separate event, enabling complete delivery history.

Components:

MessageEventTable: Table manager for message events.

Event Types:
  • deferred: Message was deferred (rate limit, temporary failure)

  • sent: Message successfully delivered via SMTP

  • error: Permanent delivery failure

  • bounce: Bounce notification received (EE)

  • pec_acceptance: PEC acceptance receipt (EE)

  • pec_delivery: PEC delivery receipt (EE)

  • pec_error: PEC error notification (EE)

Example

Record and query events:

from core.mail_proxy.proxy_base import MailProxyBase

proxy = MailProxyBase(db_path=":memory:")
await proxy.init()

events = proxy.db.table("message_events")

# Record a sent event
await events.add_event(
    message_pk="550e8400-e29b-41d4-a716-446655440000",
    event_type="sent",
    event_ts=1704067200,
    description="250 OK id=1abc-2def",
)

# Fetch unreported events for sync
unreported = await events.fetch_unreported(limit=100)

# Mark as reported
event_ids = [e["event_id"] for e in unreported]
await events.mark_reported(event_ids, reported_ts=1704067260)

Note

Events are linked to messages via message_pk (UUID primary key). The trigger_on_inserted method automatically updates message status when events are recorded.

class core.mail_proxy.entities.message_event.table.MessageEventTable(db)[source]

Bases: Table

Message events storage table.

Records delivery events for complete message history and reporting. Each event links to a message via message_pk and includes timestamp, type, description, and optional metadata.

name

Table name (“message_events”).

pkey

Primary key column (“id”, auto-increment INTEGER).

Table Schema:
  • id: Auto-increment primary key

  • message_pk: FK to messages.pk (UUID)

  • event_type: Event category (sent, error, deferred, etc.)

  • event_ts: Unix timestamp when event occurred

  • description: Error message, bounce reason, etc.

  • metadata: JSON for extra data (bounce_type, deferred_ts)

  • reported_ts: When event was synced to client

Example

Query message history:

events = proxy.db.table("message_events")

# Get all events for a message
history = await events.get_events_for_message(
    "550e8400-e29b-41d4-a716-446655440000"
)
for event in history:
    print(f"{event['event_type']} at {event['event_ts']}")

# Check pending reports
count = await events.count_unreported_for_message(message_pk)
Parameters:

db (SqlDb)

name: str = 'message_events'
pkey: str | None = 'id'
new_pkey_value()[source]

Return None for INTEGER PRIMARY KEY autoincrement.

Return type:

None

configure()[source]

Define table columns.

Return type:

None

Columns:

id: Auto-increment primary key (INTEGER). message_pk: Reference to messages.pk (UUID string). event_type: Event category for filtering. event_ts: Unix timestamp of event occurrence. description: Human-readable event details. metadata: JSON-encoded extra data. reported_ts: Unix timestamp when synced to client.

async trigger_on_inserted(record)[source]

Update message status based on event type.

Called automatically after event insert. Updates the message’s state in the messages table based on event_type.

Parameters:

record (dict[str, Any]) – Inserted event record with event_type and message_pk.

Return type:

None

Note

  • “sent” → marks message as sent

  • “error” → marks message as error

  • “deferred” → sets deferred_ts from metadata or event_ts

async add_event(message_pk, event_type, event_ts, description=None, metadata=None)[source]

Record a message event.

Parameters:
  • message_pk (str) – Message’s internal pk (UUID).

  • event_type (str) – Event category (sent, error, deferred, bounce, pec_*).

  • event_ts (int) – Unix timestamp when event occurred.

  • description (str | None) – Optional error message or reason.

  • metadata (dict[str, Any] | None) – Optional dict serialized as JSON.

Return type:

int

Returns:

Number of rows inserted (typically 1).

Note

Triggers are called automatically after insert, updating message status based on event_type.

async fetch_unreported(limit)[source]

Fetch events not yet reported to clients.

Parameters:

limit (int) – Maximum number of events to return.

Return type:

list[dict[str, Any]]

Returns:

List of event dicts with message_id for external reporting, ordered chronologically by event_ts.

Example

unreported = await events.fetch_unreported(limit=100)
for event in unreported:
    # event contains: event_id, message_id, event_type,
    # event_ts, description, metadata, account_id, tenant_id
    await report_to_client(event)
async mark_reported(event_ids, reported_ts)[source]

Mark events as reported to client.

Parameters:
  • event_ids (list[int]) – List of event IDs to mark.

  • reported_ts (int) – Unix timestamp of report.

Return type:

None

async get_events_for_message(message_pk)[source]

Get all events for a message, ordered chronologically.

Parameters:

message_pk (str) – Internal message pk (UUID).

Return type:

list[dict[str, Any]]

Returns:

List of event dicts with parsed metadata.

async delete_for_message(message_pk)[source]

Delete all events for a message.

Parameters:

message_pk (str) – Internal message pk (UUID).

Return type:

int

Returns:

Number of deleted records.

async count_unreported_for_message(message_pk)[source]

Count unreported events for a message.

Parameters:

message_pk (str) – Internal message pk (UUID).

Return type:

int

Returns:

Number of events with reported_ts IS NULL.

Instance Entity

Instance configuration table manager.

This module provides the InstanceTable class for managing instance-level configuration in a singleton pattern (single row with id=1).

The instance table stores:
  • Service identity: name, api_token

  • Edition: “ce” (Community) or “ee” (Enterprise)

  • Flexible config: JSON storage for additional settings

Configuration access follows a dual pattern:
  • Typed columns: name, api_token, edition (direct column access)

  • JSON config: Additional key-value pairs in config column

Example

Basic instance configuration:

from core.mail_proxy.proxy_base import MailProxyBase

proxy = MailProxyBase(db_path=":memory:")
await proxy.init()

instance = proxy.db.table("instance")

# Set instance name
await instance.set_name("production-mailer")

# Set API token
await instance.set_api_token("secret-token")

# Store additional config
await instance.set_config("host", "0.0.0.0")
await instance.set_config("port", "8080")

# Check edition
is_ee = await instance.is_enterprise()

Note

Enterprise Edition (EE) extends this class with InstanceTable_EE mixin, adding bounce detection IMAP configuration columns.

class core.mail_proxy.entities.instance.table.InstanceTable(db)[source]

Bases: Table

Singleton table for instance-level configuration.

Stores instance-wide settings in a single row (id=1). Provides typed access to common settings and flexible JSON storage for additional configuration.

name

Table name (“instance”).

pkey

Primary key column (“id”).

Table Schema:
  • id: Always 1 (singleton pattern)

  • name: Instance display name

  • api_token: Master API token for authentication

  • edition: “ce” (Community) or “ee” (Enterprise)

  • config: JSON storage for additional settings

  • created_at: Creation timestamp

  • updated_at: Last modification timestamp

Example

Configure instance settings:

instance = proxy.db.table("instance")

# Typed column access
await instance.set_name("my-mailer")
name = await instance.get_name()

# JSON config access
await instance.set_config("max_workers", "4")
workers = await instance.get_config("max_workers")

# Get all config merged
all_config = await instance.get_all_config()
Parameters:

db (SqlDb)

name: str = 'instance'
pkey: str | None = 'id'
configure()[source]

Define table columns.

Return type:

None

Columns:

id: Singleton ID (always 1, INTEGER primary key). name: Instance display name (default: “mail-proxy”). api_token: Master API token for authentication. edition: “ce” or “ee” (default: “ce”). config: JSON storage for additional key-value settings. created_at: Row creation timestamp. updated_at: Last modification timestamp.

Note

EE columns (bounce_*) are added by InstanceTable_EE.configure().

async get_instance()[source]

Get the singleton instance configuration.

Return type:

dict[str, Any] | None

Returns:

Instance record dict, or None if not yet created.

async ensure_instance()[source]

Get or create the singleton instance configuration.

Creates the singleton row if it doesn’t exist.

Return type:

dict[str, Any]

Returns:

Instance record dict (never None).

async update_instance(updates)[source]

Update the singleton instance configuration.

Parameters:

updates (dict[str, Any]) – Dict of column names to new values.

Return type:

None

Example

await instance.update_instance({
    "name": "production",
    "api_token": "new-token",
})
async get_name()[source]

Get instance display name.

Return type:

str

Returns:

Instance name, defaults to “mail-proxy” if not set.

async set_name(name)[source]

Set instance display name.

Parameters:

name (str) – New instance name.

Return type:

None

async get_api_token()[source]

Get master API token.

Return type:

str | None

Returns:

API token string, or None if not set.

async set_api_token(token)[source]

Set master API token.

Parameters:

token (str) – New API token.

Return type:

None

async get_edition()[source]

Get current edition.

Return type:

str

Returns:

“ce” (Community Edition) or “ee” (Enterprise Edition).

async is_enterprise()[source]

Check if running in Enterprise Edition mode.

Return type:

bool

Returns:

True if edition is “ee”, False otherwise.

async set_edition(edition)[source]

Set edition.

Parameters:

edition (str) – “ce” (Community) or “ee” (Enterprise).

Raises:

ValueError – If edition is not “ce” or “ee”.

Return type:

None

async get_config(key, default=None)[source]

Get a configuration value by key.

Uses dual access pattern:
  • Keys in _TYPED_CONFIG_KEYS: read from typed columns

  • Other keys: read from JSON config column

Parameters:
  • key (str) – Configuration key name.

  • default (str | None) – Default value if key not found.

Return type:

str | None

Returns:

Configuration value as string, or default.

Example

# Typed column
name = await instance.get_config("name")

# JSON config
port = await instance.get_config("port", "8080")
async set_config(key, value)[source]

Set a configuration value.

Uses dual access pattern:
  • Keys in _TYPED_CONFIG_KEYS: save to typed columns

  • Other keys: save to JSON config column

Parameters:
  • key (str) – Configuration key name.

  • value (str) – Configuration value (string).

Return type:

None

Example

# Typed column
await instance.set_config("name", "production")

# JSON config
await instance.set_config("host", "0.0.0.0")
async get_all_config()[source]

Get all configuration values merged.

Returns typed columns and JSON config merged into a single dict. JSON config values override typed columns if same key exists.

Return type:

dict[str, Any]

Returns:

Dict with all configuration key-value pairs.

Example

config = await instance.get_all_config()
# Returns: {"name": "my-mailer", "edition": "ce", "host": "0.0.0.0", ...}

Instance REST API endpoint for service-level operations.

This module provides the InstanceEndpoint class exposing service-level operations for the mail proxy via REST API and CLI commands.

Operations include:
  • health: Container orchestration health check (unauthenticated)

  • status: Authenticated service status with active state

  • run_now: Trigger immediate dispatch cycle

  • suspend/activate: Control message sending per tenant/batch

  • get/update: Instance configuration management

  • get_sync_status: Monitor tenant synchronization health

  • upgrade_to_ee: Transition from Community to Enterprise Edition

Example

CLI commands auto-generated:

mail-proxy instance health
mail-proxy instance status
mail-proxy instance run-now --tenant-id acme
mail-proxy instance suspend --tenant-id acme
mail-proxy instance activate --tenant-id acme
mail-proxy instance get
mail-proxy instance update --name production
mail-proxy instance get-sync-status
mail-proxy instance upgrade-to-ee

Note

Enterprise Edition (EE) extends this with InstanceEndpoint_EE mixin adding bounce detection configuration operations.

class core.mail_proxy.entities.instance.endpoint.InstanceEndpoint(table, proxy=None)[source]

Bases: BaseEndpoint

REST API endpoint for instance-level operations.

Provides service management operations including health checks, dispatch control, and configuration management.

name

Endpoint name used in URL paths (“instance”).

table

InstanceTable instance for configuration storage.

proxy

Optional MailProxy instance for service operations.

Example

Using the endpoint programmatically:

endpoint = InstanceEndpoint(db.table("instance"), proxy)

# Check service health
health = await endpoint.health()

# Trigger dispatch
await endpoint.run_now(tenant_id="acme")

# Update configuration
await endpoint.update(name="production")
Parameters:
name: str = 'instance'
__init__(table, proxy=None)[source]

Initialize endpoint with table and optional proxy reference.

Parameters:
  • table (InstanceTable) – InstanceTable for configuration storage.

  • proxy (object | None) – Optional MailProxy instance for service operations. When provided, enables run_now, suspend, activate, and get_sync_status to interact with the running service.

async health()[source]

Health check for container orchestration.

Lightweight endpoint for liveness/readiness probes. Does not require authentication. Returns immediately without database access.

Return type:

dict

Returns:

Dict with status “ok”.

Example

# Kubernetes liveness probe
# GET /instance/health
{"status": "ok"}
async status()[source]

Authenticated service status.

Returns the current active state of the mail proxy service. Requires authentication.

Return type:

dict

Returns:

Dict with ok=True and active boolean indicating if the dispatch loop is running.

async run_now(tenant_id=None)[source]

Trigger immediate dispatch cycle.

Resets the tenant’s sync timer, causing the next dispatch loop iteration to process messages immediately.

Parameters:

tenant_id (str | None) – If provided, only reset this tenant’s sync timer. If None, triggers dispatch for all tenants.

Return type:

dict

Returns:

Dict with ok=True.

async suspend(tenant_id, batch_code=None)[source]

Suspend message sending for a tenant.

Prevents messages from being dispatched for the specified tenant or batch. Messages remain in queue and will be sent when activated.

Parameters:
  • tenant_id (str) – Tenant to suspend.

  • batch_code (str | None) – Optional batch code. If None, suspends all batches.

Return type:

dict

Returns:

Dict with suspended batches list and pending message count.

async activate(tenant_id, batch_code=None)[source]

Resume message sending for a tenant.

Removes suspension for the specified tenant or batch, allowing queued messages to be dispatched.

Parameters:
  • tenant_id (str) – Tenant to activate.

  • batch_code (str | None) – Optional batch code. If None, clears all suspensions.

Return type:

dict

Returns:

Dict with remaining suspended batches list.

async get()[source]

Get instance configuration.

Return type:

dict

Returns:

Dict with ok=True and all instance configuration fields.

async update(name=None, api_token=None, edition=None)[source]

Update instance configuration.

Parameters:
  • name (str | None) – New instance display name.

  • api_token (str | None) – New master API token.

  • edition (str | None) – New edition (“ce” or “ee”).

Return type:

dict

Returns:

Dict with ok=True.

async get_sync_status()[source]

Get sync status for all tenants.

Returns synchronization health information for each tenant, useful for monitoring and debugging delivery issues.

Returns:

  • id: Tenant identifier

  • last_sync_ts: Unix timestamp of last sync

  • next_sync_due: True if sync interval has expired

  • in_dnd: True if tenant is in Do Not Disturb mode

Return type:

Dict with ok=True and tenants list. Each tenant contains

async upgrade_to_ee()[source]

Upgrade from Community Edition to Enterprise Edition.

Performs explicit upgrade from CE to EE mode:
  1. Verifies Enterprise modules are installed

  2. Sets edition=”ee” in instance configuration

  3. Optionally generates API key for “default” tenant

The upgrade is idempotent - calling when already EE is safe.

Return type:

dict

Returns:

Dict with ok=True, edition, optional default_tenant_token, and descriptive message.

Raises:

ValueError – If Enterprise modules are not installed.

Command Log Entity

Command log table for API audit trail.

This module implements an audit log for API commands, recording every state-modifying operation with its full payload. This enables:

  • Debugging: Trace what happened and when

  • Migration: Replay commands to reconstruct state

  • Recovery: Restore from command history after failures

  • Compliance: Audit trail for regulatory requirements

Logged commands include:
  • POST /commands/add-messages: Queue messages for delivery

  • POST /commands/delete-messages: Remove messages from queue

  • POST /commands/cleanup-messages: Remove old reported messages

  • POST /account: Create/update SMTP account

  • DELETE /account/{id}: Remove SMTP account

  • POST /tenant: Create/update tenant

  • DELETE /tenant/{id}: Remove tenant

Example

Log an API command:

from core.mail_proxy.proxy_base import MailProxyBase

proxy = MailProxyBase(db_path=":memory:")
await proxy.init()

log = proxy.db.table("command_log")

# Record a command
cmd_id = await log.log_command(
    endpoint="POST /commands/add-messages",
    payload={"messages": [{"to": "user@example.com"}]},
    tenant_id="acme",
    response_status=200,
)

# List recent commands
commands = await log.list_commands(tenant_id="acme", limit=10)

# Export for replay
export = await log.export_commands(tenant_id="acme")
class core.mail_proxy.entities.command_log.table.CommandLogTable(db)[source]

Bases: Table

Audit log table for API command tracking.

Records every state-modifying API command with timestamp, endpoint, tenant context, request payload, and response status. Commands are stored chronologically for replay and debugging.

name

Table name (“command_log”).

pkey

Primary key column (“id”, auto-increment INTEGER).

Table Schema:
  • id: Auto-increment primary key

  • command_ts: Unix timestamp of command

  • endpoint: HTTP method + path (e.g., “POST /commands/add-messages”)

  • tenant_id: Tenant context (nullable for global commands)

  • payload: JSON request body

  • response_status: HTTP status code

  • response_body: JSON response summary

Example

Query command history:

log = proxy.db.table("command_log")

# Get last 24 hours of commands for a tenant
import time
since = int(time.time()) - 86400
commands = await log.list_commands(
    tenant_id="acme",
    since_ts=since,
)

# Export for migration
export = await log.export_commands()
# Returns: [{"endpoint": "...", "payload": {...}, ...}, ...]
Parameters:

db (SqlDb)

name: str = 'command_log'
pkey: str | None = 'id'
new_pkey_value()[source]

Return None for INTEGER PRIMARY KEY autoincrement.

Return type:

None

configure()[source]

Define table columns.

Return type:

None

Columns:

id: Auto-increment primary key (INTEGER). command_ts: Unix timestamp of command execution. endpoint: HTTP method and path combined. tenant_id: Tenant context for multi-tenant filtering. payload: JSON-serialized request body. response_status: HTTP response status code. response_body: JSON-serialized response summary.

async log_command(endpoint, payload, *, tenant_id=None, response_status=None, response_body=None, command_ts=None)[source]

Record an API command in the audit log.

Parameters:
  • endpoint (str) – HTTP method + path (e.g., “POST /commands/add-messages”).

  • payload (dict[str, Any]) – Request body as dict (will be JSON-serialized).

  • tenant_id (str | None) – Tenant context for multi-tenant commands.

  • response_status (int | None) – HTTP response status code.

  • response_body (dict[str, Any] | None) – Response body as dict (will be JSON-serialized).

  • command_ts (int | None) – Unix timestamp. Defaults to current time.

Return type:

int

Returns:

Auto-generated command log ID.

Example

cmd_id = await log.log_command(
    endpoint="POST /account",
    payload={"id": "main", "host": "smtp.example.com"},
    tenant_id="acme",
    response_status=200,
)
async list_commands(*, tenant_id=None, since_ts=None, until_ts=None, endpoint_filter=None, limit=100, offset=0)[source]

List logged commands with optional filters.

Parameters:
  • tenant_id (str | None) – Filter by tenant.

  • since_ts (int | None) – Include commands with command_ts >= since_ts.

  • until_ts (int | None) – Include commands with command_ts <= until_ts.

  • endpoint_filter (str | None) – Filter by endpoint (SQL LIKE partial match).

  • limit (int) – Maximum number of results to return.

  • offset (int) – Skip first N results for pagination.

Return type:

list[dict[str, Any]]

Returns:

List of command records with parsed JSON fields, ordered by timestamp ascending.

Example

# Get add-messages commands for last hour
import time
since = int(time.time()) - 3600
commands = await log.list_commands(
    tenant_id="acme",
    since_ts=since,
    endpoint_filter="add-messages",
)
async get_command(command_id)[source]

Retrieve a single command log entry by ID.

Parameters:

command_id (int) – Command log entry ID.

Return type:

dict[str, Any] | None

Returns:

Command record dict with parsed JSON fields, or None if not found.

async export_commands(*, tenant_id=None, since_ts=None, until_ts=None)[source]

Export commands in replay-friendly format.

Returns minimal fields needed for replay: endpoint, tenant_id, payload, and command_ts (for ordering). Response data is excluded.

Parameters:
  • tenant_id (str | None) – Filter by tenant.

  • since_ts (int | None) – Include commands with command_ts >= since_ts.

  • until_ts (int | None) – Include commands with command_ts <= until_ts.

Return type:

list[dict[str, Any]]

Returns:

List of command dicts suitable for replay.

Example

# Export all commands for migration
export = await log.export_commands()

# Replay on new instance
for cmd in export:
    await api.call(cmd["endpoint"], cmd["payload"])
async purge_before(threshold_ts)[source]

Delete command logs older than threshold.

Used for log rotation to prevent unbounded growth.

Parameters:

threshold_ts (int) – Delete commands with command_ts < threshold_ts.

Return type:

int

Returns:

Number of deleted records.

Example

# Delete logs older than 30 days
import time
threshold = int(time.time()) - (30 * 86400)
deleted = await log.purge_before(threshold)
print(f"Purged {deleted} old log entries")

Command log REST API endpoint.

This module provides the CommandLogEndpoint class exposing operations for querying and managing the API command audit trail.

The endpoint is designed for automatic introspection by api_base and cli_base modules, which generate FastAPI routes and Typer commands from method signatures.

Example

CLI commands auto-generated:

mail-proxy command_log list --tenant-id acme --limit 50
mail-proxy command_log get --command-id 123
mail-proxy command_log export --tenant-id acme
mail-proxy command_log purge --threshold-ts 1700000000

Note

The command log is append-only during normal operation. Only the purge command removes entries, for log rotation purposes.

class core.mail_proxy.entities.command_log.endpoint.CommandLogEndpoint(table)[source]

Bases: BaseEndpoint

REST API endpoint for command log audit trail.

Provides read access to the command log with filtering and export capabilities, plus a purge operation for log rotation.

name

Endpoint name used in URL paths (“command_log”).

table

CommandLogTable instance for database operations.

Example

Using the endpoint programmatically:

endpoint = CommandLogEndpoint(db.table("command_log"))

# List recent commands
commands = await endpoint.list(tenant_id="acme", limit=10)

# Export for backup
export = await endpoint.export(tenant_id="acme")
Parameters:

table (CommandLogTable)

name: str = 'command_log'
__init__(table)[source]

Initialize endpoint with table reference.

Parameters:

table (CommandLogTable) – CommandLogTable instance for database operations.

async list(tenant_id=None, since_ts=None, until_ts=None, endpoint_filter=None, limit=100, offset=0)[source]

List logged commands with optional filters.

Parameters:
  • tenant_id (str | None) – Filter by tenant.

  • since_ts (int | None) – Include commands with command_ts >= since_ts.

  • until_ts (int | None) – Include commands with command_ts <= until_ts.

  • endpoint_filter (str | None) – Filter by endpoint (partial match).

  • limit (int) – Maximum results to return.

  • offset (int) – Skip first N results for pagination.

Return type:

list[dict[str, Any]]

Returns:

List of command records ordered by timestamp.

async get(command_id)[source]

Retrieve a specific command by ID.

Parameters:

command_id (int) – Command log entry ID.

Return type:

dict[str, Any]

Returns:

Command record dict with parsed JSON fields.

Raises:

ValueError – If command not found.

async export(tenant_id=None, since_ts=None, until_ts=None)[source]

Export commands in replay-friendly format.

Returns minimal fields needed for replay, excluding response data.

Parameters:
  • tenant_id (str | None) – Filter by tenant.

  • since_ts (int | None) – Include commands with command_ts >= since_ts.

  • until_ts (int | None) – Include commands with command_ts <= until_ts.

Returns:

endpoint, tenant_id, payload, command_ts.

Return type:

List of command dicts with

async purge(threshold_ts)[source]

Delete command logs older than threshold.

Used for log rotation to prevent unbounded growth.

Parameters:

threshold_ts (int) – Delete commands with command_ts < threshold_ts.

Return type:

dict[str, Any]

Returns:

Dict with “ok” status and “deleted” count.

Storage Entity

Storages table: per-tenant storage backend configurations.

class core.mail_proxy.entities.storage.table.StoragesTable(db)[source]

Bases: Table

Storages table: named storage backends per tenant.

Each tenant can have multiple named storage backends (e.g., HOME, SALES, ARCHIVE). CE supports only local filesystem; EE adds S3, GCS, Azure via fsspec.

Schema: pk (UUID), tenant_id, name (unique per tenant), protocol, config (JSON).

Parameters:

db (SqlDb)

name: str = 'storages'
pkey: str | None = 'pk'
create_table_sql()[source]

Generate CREATE TABLE with UNIQUE (tenant_id, name).

Return type:

str

configure()[source]

Override to define columns. Called during __init__.

Return type:

None

async add(storage)[source]

Insert or update a storage configuration.

Parameters:

storage (dict[str, Any]) – Dict with tenant_id, name, protocol, and protocol-specific config.

Return type:

str

Returns:

The storage’s internal pk (UUID).

async get(tenant_id, name)[source]

Fetch a single storage configuration.

Parameters:
  • tenant_id (str) – The tenant that owns this storage.

  • name (str) – The storage name (e.g., “HOME”).

Raises:

ValueError – If storage not found for this tenant.

Return type:

dict[str, Any]

async list_all(tenant_id=None)[source]

Return storages, optionally filtered by tenant.

Parameters:

tenant_id (str | None)

Return type:

list[dict[str, Any]]

async remove(tenant_id, name)[source]

Remove a storage configuration.

Parameters:
  • tenant_id (str) – The tenant that owns this storage.

  • name (str) – The storage name.

Return type:

bool

Returns:

True if deleted, False if not found.

async get_storage_manager(tenant_id)[source]

Get a configured StorageManager for a tenant.

Returns a StorageManager with all tenant’s storages registered.

Parameters:

tenant_id (str)

Storage endpoint: CRUD operations for tenant storage backends.

Designed for introspection by api_base/cli_base to auto-generate routes/commands.

class core.mail_proxy.entities.storage.endpoint.StorageEndpoint(table)[source]

Bases: BaseEndpoint

Storage management endpoint. Methods are introspected for API/CLI generation.

Parameters:

table (StoragesTable)

name: str = 'storages'
__init__(table)[source]

Initialize endpoint with table reference.

Parameters:

table (StoragesTable) – Database table instance for operations.

async add(tenant_id, name, protocol, config=None)[source]

Add or update a storage backend for a tenant.

Parameters:
  • tenant_id (str) – The tenant ID.

  • name (str) – Storage name (e.g., “HOME”, “SALES”).

  • protocol (str) – Storage protocol (local, s3, gcs, azure).

  • config (dict[str, Any] | None) – Protocol-specific configuration.

Return type:

dict

For local protocol:

config: {“base_path”: “/data/attachments”}

For S3 protocol (EE only):
config: {“bucket”: “my-bucket”, “prefix”: “attachments/”,

“aws_access_key_id”: “…”, “aws_secret_access_key”: “…”}

For GCS protocol (EE only):
config: {“bucket”: “my-bucket”, “prefix”: “attachments/”,

“project”: “…”, “token”: “…”}

For Azure protocol (EE only):
config: {“container”: “my-container”, “prefix”: “attachments/”,

“account_name”: “…”, “account_key”: “…”}

async get(tenant_id, name)[source]

Get a single storage configuration.

Parameters:
Return type:

dict

async list(tenant_id)[source]

List all storage backends for a tenant.

Parameters:

tenant_id (str)

Return type:

list[dict]

async delete(tenant_id, name)[source]

Delete a storage backend.

Parameters:
Return type:

dict

SMTP Components

SMTP Sender - main coordinator for email dispatch.

This module provides SmtpSender, the central component for SMTP email delivery. It coordinates: - Background dispatch loop for processing queued messages - Rate limiting per account - Email construction with attachments - SMTP connection management via pool - Retry logic with exponential backoff

SmtpSender is instantiated by MailProxy and accessed via proxy.smtp_sender.

Example

# SmtpSender is created by MailProxy proxy = MailProxy(db_path=”mail.db”) await proxy.start() # Starts smtp_sender automatically

# Direct access if needed proxy.smtp_sender.wake() # Trigger immediate dispatch cycle

class core.mail_proxy.smtp.sender.SmtpSender(proxy)[source]

Bases: object

Central coordinator for SMTP email dispatch.

Manages the complete email sending pipeline: - Fetches ready messages from database queue - Builds EmailMessage objects with attachments - Applies rate limiting per account - Sends via SMTP connection pool - Handles retries for temporary failures - Records delivery events

proxy

Parent MailProxy instance for accessing db, config, metrics.

pool

SMTP connection pool for connection reuse.

rate_limiter

Per-account rate limiting controller.

Parameters:

proxy (MailProxy)

__init__(proxy)[source]

Initialize SmtpSender.

Parameters:

proxy (MailProxy) – Parent MailProxy instance.

property db

Database access via proxy.

property config

Configuration via proxy.

property logger

Logger via proxy.

property metrics: MailMetrics

Prometheus metrics via proxy.

property attachments: AttachmentManager

Global attachment manager via proxy.

property default_host: str | None

Default SMTP host via proxy.

property default_port: int | None

Default SMTP port via proxy.

property default_user: str | None

Default SMTP user via proxy.

property default_password: str | None

Default SMTP password via proxy.

property default_use_tls: bool | None

Default use TLS via proxy.

async start()[source]

Start background dispatch and cleanup loops.

Return type:

None

async stop()[source]

Stop all background tasks gracefully.

Return type:

None

wake()[source]

Wake the dispatch loop for immediate processing.

Return type:

None

exception core.mail_proxy.smtp.sender.AccountConfigurationError(message='Missing SMTP account configuration')[source]

Bases: RuntimeError

Raised when SMTP account configuration is missing or invalid.

Parameters:

message (str)

__init__(message='Missing SMTP account configuration')[source]
Parameters:

message (str)

exception core.mail_proxy.smtp.sender.AttachmentTooLargeError(filename, size_mb, max_size_mb)[source]

Bases: ValueError

Raised when an attachment exceeds the size limit and action is ‘reject’.

Parameters:
__init__(filename, size_mb, max_size_mb)[source]
Parameters:

Asyncio-friendly SMTP connection pool with acquire/release pattern.

This module provides a connection pool for SMTP clients using a true resource pool pattern with acquire/release semantics. Connections are pooled by account key (host:port:user) and can be shared between concurrent coroutines.

Features: - Connection pooling per SMTP account - Configurable max connections per account - TTL-based connection expiration - Health checking via SMTP NOOP commands - Automatic reconnection when connections become stale - Context manager support for automatic release

Example

Using the SMTP pool with context manager:

pool = SMTPPool(ttl=300, max_per_account=5)

async with pool.connection(
    host="smtp.example.com",
    port=465,
    user="sender@example.com",
    password="secret",
    use_tls=True
) as smtp:
    await smtp.send_message(message)
# Connection automatically released back to pool

Manual acquire/release:

smtp = await pool.acquire(host, port, user, password, use_tls=True)
try:
    await smtp.send_message(message)
finally:
    await pool.release(smtp)
class core.mail_proxy.smtp.pool.SMTPPool(ttl=300, max_per_account=5)[source]

Bases: object

SMTP connection pool with acquire/release semantics.

Maintains pools of SMTP connections indexed by account key (host:port:user). Connections can be acquired by any coroutine and must be released after use. This enables efficient connection sharing across concurrent operations.

The pool uses TTL-based expiration and NOOP health checks to ensure connections remain valid. Invalid connections are discarded and replaced.

ttl

Maximum age in seconds for pooled connections.

max_per_account

Maximum connections per SMTP account.

Parameters:
  • ttl (int)

  • max_per_account (int)

__init__(ttl=300, max_per_account=5)[source]

Initialize the SMTP connection pool.

Parameters:
  • ttl (int) – Time-to-live in seconds for connections. Defaults to 300.

  • max_per_account (int) – Max concurrent connections per account. Defaults to 5.

async acquire(host, port, user, password, *, use_tls, timeout=30.0)[source]

Acquire a connection from the pool.

Returns an idle connection if available, otherwise creates a new one. If max connections reached, waits until one becomes available.

Parameters:
  • host (str) – SMTP server hostname.

  • port (int) – SMTP server port.

  • user (str | None) – Username for auth, or None.

  • password (str | None) – Password for auth, or None.

  • use_tls (bool) – Whether to use TLS.

  • timeout (float | None) – Max seconds to wait for connection. None = wait forever.

Return type:

SMTP

Returns:

SMTP connection ready for use.

Raises:

asyncio.TimeoutError – If timeout waiting for connection.

async release(smtp)[source]

Release a connection back to the pool.

The connection is returned to the idle pool for reuse by other coroutines. If the connection is unhealthy, it is closed instead.

Parameters:

smtp (SMTP) – The SMTP connection to release.

Return type:

None

connection(host, port, user, password, *, use_tls, timeout=30.0)[source]

Context manager for automatic acquire/release.

Example

async with pool.connection(host, port, user, pwd, use_tls=True) as smtp:

await smtp.send_message(msg)

Parameters:
async get_connection(host, port, user, password, *, use_tls)[source]

Legacy API: Get a connection (acquire without explicit release).

DEPRECATED: Use acquire()/release() or connection() context manager.

This method exists for backward compatibility. Connections obtained this way should be released manually, or they will be cleaned up when the pool is cleaned.

Parameters:
  • host (str) – SMTP server hostname.

  • port (int) – SMTP server port.

  • user (str | None) – Username for auth, or None.

  • password (str | None) – Password for auth, or None.

  • use_tls (bool) – Whether to use TLS.

Return type:

SMTP

Returns:

Connected SMTP instance.

async cleanup()[source]

Remove expired and unhealthy connections from idle pools.

Should be called periodically to prevent resource leaks.

Return type:

None

async close_all()[source]

Close all connections in the pool.

Use when shutting down the application.

Return type:

None

stats()[source]

Return pool statistics.

Return type:

dict

Returns:

Dict with idle counts, active counts per account.

class core.mail_proxy.smtp.pool.PooledConnection(smtp, account_key, created_at=<factory>, last_used=<factory>)[source]

Bases: object

Wrapper for a pooled SMTP connection with metadata.

Parameters:
  • smtp (SMTP)

  • account_key (str)

  • created_at (float)

  • last_used (float)

smtp: SMTP
account_key: str
created_at: float
last_used: float
touch()[source]

Update last_used timestamp.

Return type:

None

age()[source]

Return connection age in seconds.

Return type:

float

idle_time()[source]

Return time since last use in seconds.

Return type:

float

__init__(smtp, account_key, created_at=<factory>, last_used=<factory>)
Parameters:
  • smtp (SMTP)

  • account_key (str)

  • created_at (float)

  • last_used (float)

Attachment fetching from multiple storage backends.

Provides AttachmentManager for retrieving email attachments with: - Base64 inline content decoding - Filesystem fetching with path traversal protection - HTTP fetching with authentication support - Optional MD5-based caching

Supported fetch_mode values: - endpoint: HTTP POST to tenant’s attachment URL - http_url: Direct HTTP fetch from URL in storage_path - base64: Inline base64-encoded content in storage_path - filesystem: Local filesystem path (absolute or relative to base_dir)

If fetch_mode is not specified, it is inferred from storage_path format: - base64:... prefix -> base64 (prefix is stripped) - http:// or https:// prefix -> http_url - / (absolute path) -> filesystem - otherwise -> endpoint (default)

class core.mail_proxy.smtp.attachments.Base64Fetcher[source]

Bases: object

Decoder for base64-encoded inline attachment content.

async fetch(base64_content)[source]
Parameters:

base64_content (str)

Return type:

bytes | None

class core.mail_proxy.smtp.attachments.StorageFetcher(storage_manager=None)[source]

Bases: object

Fetcher for attachments via StorageManager (mount:path format).

In CE: supports only ‘local’ protocol (filesystem). In EE: supports S3, GCS, Azure via fsspec.

Path formats: - “mount:path/to/file” → uses configured mount point - “/absolute/path” → direct filesystem access (legacy compatibility)

Parameters:

storage_manager (Any)

__init__(storage_manager=None)[source]

Initialize with optional StorageManager.

Parameters:

storage_manager (Any) – StorageManager instance. If None, only absolute paths work.

async fetch(path)[source]

Fetch file content from storage.

Parameters:

path (str) – Either “mount:relative/path” or “/absolute/path”.

Return type:

bytes | None

Returns:

File content as bytes.

Raises:
property storage_manager: Any

Get the StorageManager instance.

class core.mail_proxy.smtp.attachments.HttpFetcher(default_endpoint=None, auth_config=None)[source]

Bases: object

Fetcher for HTTP-served attachments with authentication support.

Parameters:
__init__(default_endpoint=None, auth_config=None)[source]
Parameters:
async fetch(path, auth_override=None)[source]
Parameters:
Return type:

bytes

property default_endpoint: str | None
class core.mail_proxy.smtp.attachments.AttachmentManager(storage_manager=None, http_endpoint=None, http_auth_config=None, cache=None)[source]

Bases: object

High-level interface for fetching email attachments from multiple sources.

Parameters:
__init__(storage_manager=None, http_endpoint=None, http_auth_config=None, cache=None)[source]
Parameters:
static parse_filename(filename)[source]

Extract MD5 marker from filename if present.

Parameters:

filename (str)

Return type:

tuple[str, str | None]

async fetch(att)[source]

Retrieve attachment content with caching and filename cleanup.

Parameters:

att (dict[str, Any])

Return type:

tuple[bytes, str] | None

static guess_mime(filename)[source]

Determine the MIME type for a filename based on its extension.

Parameters:

filename (str)

Return type:

tuple[str, str]

Two-tiered cache for attachment content.

Provides content-addressable caching using MD5 hash as key with automatic tier selection based on content size.

Components:

MemoryCache: Fast LRU cache with short TTL for small files. DiskCache: Persistent cache with longer TTL for larger files. TieredCache: Combined memory + disk cache with automatic routing.

Example

Create and use a tiered cache:

cache = TieredCache(
    memory_max_mb=50,
    disk_dir="/var/cache/attachments",
    disk_threshold_kb=100,
)
await cache.init()

content = await cache.get("a1b2c3d4...")
if content is None:
    content = await fetch_from_storage()
    await cache.set(TieredCache.compute_md5(content), content)

Note

Files smaller than disk_threshold_kb are cached in memory only. Larger files go to disk cache. Memory cache is checked first on reads for optimal performance.

class core.mail_proxy.smtp.cache.MemoryCache(max_mb=50, ttl_seconds=300)[source]

Bases: object

LRU in-memory cache with TTL and size limits.

_max_bytes

Maximum cache size in bytes.

_ttl_seconds

Time-to-live for entries.

Parameters:
__init__(max_mb=50, ttl_seconds=300)[source]
Parameters:
get(md5_hash)[source]
Parameters:

md5_hash (str)

Return type:

bytes | None

set(md5_hash, content)[source]
Parameters:
Return type:

None

clear()[source]
Return type:

None

cleanup_expired()[source]
Return type:

int

property size_bytes: int
property entry_count: int
class core.mail_proxy.smtp.cache.DiskCache(cache_dir, max_mb=500, ttl_seconds=3600)[source]

Bases: object

Persistent disk cache with TTL and size limits.

Uses subdirectory structure based on MD5 prefix for efficient filesystem operations with large numbers of files.

_cache_dir

Root directory for cached files.

_max_bytes

Maximum total cache size in bytes.

_ttl_seconds

Time-to-live for entries.

Parameters:
__init__(cache_dir, max_mb=500, ttl_seconds=3600)[source]
Parameters:
async init()[source]
Return type:

None

async get(md5_hash)[source]
Parameters:

md5_hash (str)

Return type:

bytes | None

async set(md5_hash, content)[source]
Parameters:
Return type:

None

async cleanup_expired()[source]
Return type:

int

async clear()[source]
Return type:

None

class core.mail_proxy.smtp.cache.TieredCache(memory_max_mb=50, memory_ttl_seconds=300, disk_dir=None, disk_max_mb=500, disk_ttl_seconds=3600, disk_threshold_kb=100)[source]

Bases: object

Two-tiered cache combining memory and disk storage.

Routes content to appropriate tier based on size threshold. Small files go to memory for fast access, large files to disk for persistence. Reads check memory first, then disk.

_memory

MemoryCache instance for small files.

_disk

Optional DiskCache for large files.

_threshold_bytes

Size threshold for tier selection.

Parameters:
__init__(memory_max_mb=50, memory_ttl_seconds=300, disk_dir=None, disk_max_mb=500, disk_ttl_seconds=3600, disk_threshold_kb=100)[source]
Parameters:
async init()[source]
Return type:

None

async get(md5_hash)[source]
Parameters:

md5_hash (str)

Return type:

bytes | None

async set(md5_hash, content)[source]
Parameters:
Return type:

None

async cleanup_expired()[source]
Return type:

tuple[int, int]

async clear()[source]
Return type:

None

static compute_md5(content)[source]
Parameters:

content (bytes)

Return type:

str

Retry strategy for SMTP message delivery.

This module provides a configurable retry strategy that determines when and how to retry failed message deliveries.

Example

Using default retry strategy:

strategy = RetryStrategy()

# Check if error is temporary
is_temp, smtp_code = strategy.classify_error(exception)

# Check if should retry
if strategy.should_retry(retry_count, exception):
    delay = strategy.calculate_delay(retry_count)
    # schedule retry after delay seconds
class core.mail_proxy.smtp.retry.RetryStrategy(max_retries=5, delays=(60, 300, 900, 3600, 7200))[source]

Bases: object

Configuration and logic for message retry behavior.

max_retries

Maximum number of retry attempts before permanent failure.

delays

Tuple of delay intervals in seconds for each retry attempt. If retry_count exceeds the length, the last delay is used.

Parameters:
max_retries: int = 5
delays: tuple[int, ...] = (60, 300, 900, 3600, 7200)
calculate_delay(retry_count)[source]

Calculate delay in seconds before next retry attempt.

Parameters:

retry_count (int) – Number of previous retry attempts (0-indexed).

Return type:

int

Returns:

Delay in seconds before next retry.

should_retry(retry_count, error)[source]

Determine if message should be retried.

Parameters:
  • retry_count (int) – Current retry count (0-indexed).

  • error (Exception) – The exception that caused the failure.

Return type:

bool

Returns:

True if the message should be retried.

classify_error(exc)[source]

Classify an SMTP error as temporary or permanent.

Parameters:

exc (Exception) – The exception to classify.

Return type:

tuple[bool, int | None]

Returns:

Tuple of (is_temporary, smtp_code). - is_temporary: True if the error should trigger a retry. - smtp_code: The SMTP error code if available, None otherwise.

__init__(max_retries=5, delays=(60, 300, 900, 3600, 7200))
Parameters:

In-memory sliding-window rate limiter.

This module implements per-account rate limiting with configurable limits at minute, hour, and day granularity. The limiter uses in-memory storage to track send history, enabling fast rate limiting decisions.

The sliding window approach ensures fair distribution of sends over time rather than allowing burst behavior at window boundaries.

To handle parallel dispatch correctly, the limiter tracks “in-flight” sends in memory. This ensures that concurrent sends are counted even before they complete and are logged.

Note: Send history is lost on service restart. This is acceptable for rate limiting purposes as it only allows a brief window of potentially higher sending rates after restart.

Example

Using the rate limiter:

rate_limiter = RateLimiter()
deferred_until, should_reject = await rate_limiter.check_and_plan(account)
if deferred_until:
    if should_reject:
        # Message should be rejected with rate limit error
        return {"error": "rate_limit_exceeded"}
    else:
        # Message should be deferred until this timestamp
        await persistence.set_deferred(msg_id, deferred_until)
else:
    # Safe to send now
    try:
        await send_message(msg)
        await rate_limiter.log_send(account_id)
    except Exception:
        await rate_limiter.release_slot(account_id)
        raise
class core.mail_proxy.smtp.rate_limiter.RateLimiter(smtp_sender=None)[source]

Bases: object

Per-account sliding-window rate limiter with in-memory storage.

Enforces configurable send rate limits at three granularities: - Per minute - Per hour - Per day

When any limit is exceeded, the limiter calculates the earliest timestamp at which the message can be safely sent without violating the limit.

Tracks in-flight sends in memory to handle parallel dispatch correctly.

Parameters:

smtp_sender (SmtpSender | None)

__init__(smtp_sender=None)[source]

Initialize the rate limiter.

Parameters:

smtp_sender (SmtpSender | None) – Parent SmtpSender instance for accessing proxy resources.

async check_and_plan(account)[source]

Check rate limits and calculate deferral timestamp if exceeded.

Evaluates the account’s configured rate limits against recent send history plus in-flight sends. If any limit is exceeded, returns a tuple indicating the deferral time and whether to reject.

If the check passes, reserves a slot by incrementing the in-flight counter. The caller MUST call either log_send() on success or release_slot() on failure to release the reservation.

Limits are checked in order of granularity (minute, hour, day) and the first exceeded limit determines the deferral time.

Parameters:

account (dict[str, Any]) – Account configuration dictionary containing: - id: The account identifier (required). - limit_per_minute: Max sends per minute (optional). - limit_per_hour: Max sends per hour (optional). - limit_per_day: Max sends per day (optional). - limit_behavior: “defer” (default) or “reject”.

Returns:

  • deferred_until: Unix timestamp until which message should be deferred, or None if sending is permitted immediately.

  • should_reject: True if limit_behavior is “reject” and limit was exceeded, meaning message should be rejected with error.

Return type:

Tuple of (deferred_until, should_reject)

async log_send(account_id)[source]

Record a successful send for rate limiting purposes.

Must be called after each successful message delivery to maintain accurate rate limit tracking. Releases the in-flight slot reserved by check_and_plan().

Parameters:

account_id (str) – The SMTP account identifier that sent the message.

Return type:

None

async release_slot(account_id)[source]

Release an in-flight slot without logging a send.

Call this when a send fails after check_and_plan() returned None. This ensures the in-flight counter stays accurate.

Parameters:

account_id (str) – The SMTP account identifier.

Return type:

None

async purge_for_account(account_id)[source]

Clear all rate limit data for an account.

Call this when an account is deleted.

Parameters:

account_id (str) – The SMTP account identifier.

Return type:

int

Returns:

Number of entries cleared.

clear()[source]

Clear all rate limit data. Used for testing.

Return type:

None

Reporting

Client Reporter - delivery report synchronization with upstream clients.

This module provides ClientReporter, the component responsible for: - Fetching unreported delivery events from message_events table - Sending delivery reports to tenant-specific or global sync URLs - Handling “do not disturb” schedules via next_sync_after - Managing report retention cleanup

ClientReporter is instantiated by MailProxy and accessed via proxy.client_reporter.

Example

# ClientReporter is created by MailProxy proxy = MailProxy(db_path=”mail.db”) await proxy.start() # Starts client_reporter automatically

# Direct access if needed proxy.client_reporter.wake() # Trigger immediate report cycle

class core.mail_proxy.reporting.client_reporter.ClientReporter(proxy)[source]

Bases: object

Delivery report synchronization with upstream clients.

Manages the background loop that: - Fetches unreported events from message_events table - Groups events by tenant and sends to appropriate sync URLs - Handles authentication (bearer token, basic auth) - Respects “do not disturb” schedules from client responses - Cleans up old reported messages based on retention policy

proxy

Parent MailProxy instance for accessing db, config, etc.

Parameters:

proxy (MailProxy)

__init__(proxy)[source]

Initialize ClientReporter.

Parameters:

proxy (MailProxy) – Parent MailProxy instance.

property db

Database access via proxy.

property logger

Logger via proxy.

property metrics

Prometheus metrics via proxy.

async start()[source]

Start the background report loop.

Return type:

None

async stop()[source]

Stop the background report loop gracefully.

Return type:

None

wake(tenant_id=None)[source]

Wake the report loop for immediate processing.

Parameters:

tenant_id (str | None) – If provided, only sync this tenant. Otherwise sync all.

Return type:

None

Enterprise Package (BSL 1.1)

Enterprise features require a commercial license for production use.

Proxy Extension

Enterprise Edition features for genro-mail-proxy.

This package extends the core mail-proxy with enterprise capabilities: bounce detection, PEC receipt handling, and large file storage.

Components:

MailProxy_EE: Mixin adding bounce detection to MailProxy. bounce/: IMAP polling for bounce notification detection. pec/: PEC (Italian certified email) receipt handling. attachments/: Large file storage via fsspec (S3/GCS/Azure). entities/: EE table mixins (AccountsTable_EE, TenantsTable_EE). imap/: Async IMAP client wrapper (aioimaplib).

Example

EE features are auto-enabled when this package is installed:

from core.mail_proxy import MailProxy

proxy = MailProxy(db_path="mail.db")
# MailProxy automatically includes MailProxy_EE mixin

# Configure bounce detection
from enterprise.mail_proxy.bounce import BounceConfig
proxy.configure_bounce_receiver(BounceConfig(
    host="imap.example.com",
    port=993,
    user="bounces@example.com",
    password="secret",
))
await proxy.start()

Note

EE detection happens at import time in core.mail_proxy.__init__. The HAS_ENTERPRISE flag controls mixin composition.

class enterprise.mail_proxy.MailProxy_EE[source]

Bases: object

EE mixin: adds BounceReceiver to MailProxy.

Instance Attributes (added by this mixin):

bounce_receiver: BounceReceiver instance (or None if not started) _bounce_config: BounceConfig instance (or None if not configured)

configure_bounce_receiver()[source]

Set BounceConfig before start()

bounce_receiver_running()

Property to check if poller is active

handle_bounce_command()[source]

Handle getBounceStatus command

Overridden Methods (from MailProxy CE stubs):

__init_proxy_ee__(): Initialize bounce_receiver, _bounce_config _start_proxy_ee(): Start BounceReceiver if configured _stop_proxy_ee(): Stop BounceReceiver

bounce_receiver: BounceReceiver | None
__init_proxy_ee__()[source]

Initialize EE state. Called from MailProxy.__init__.

Return type:

None

configure_bounce_receiver(config)[source]

Configure bounce detection.

Call this before start() to enable bounce detection. The bounce receiver will poll the configured IMAP mailbox for bounce messages and correlate them with sent messages using the X-Genro-Mail-ID header.

Parameters:

config (BounceConfig) – BounceConfig with IMAP credentials and polling settings.

Return type:

None

property bounce_receiver_running: bool

True if BounceReceiver is active and polling.

async handle_bounce_command(cmd, payload=None)[source]

Handle bounce-related commands.

Supported commands: - getBounceStatus: Get bounce receiver status - configureBounce: Configure bounce receiver (requires restart)

Parameters:
  • cmd (str) – Command name.

  • payload (dict[str, Any] | None) – Command parameters.

Return type:

dict[str, Any]

Returns:

Command result dict.

enterprise.mail_proxy.is_ee_enabled()[source]

Check if Enterprise Edition is available and enabled.

Return type:

bool

Returns:

True if EE package is properly installed and importable.

MailProxy_EE: Enterprise Edition mixin for bounce detection.

This mixin adds bounce detection to MailProxy by overriding the CE stub methods (__init_proxy_ee__, _start_proxy_ee, _stop_proxy_ee).

Class Hierarchy:
MailProxyBase (CE): config, db, tables, endpoints, api/cli
└── MailProxy (CE): +SmtpSender, +ClientReporter, +metrics

└── MailProxy with MailProxy_EE (EE): +BounceReceiver

Bounce Detection:

Monitors a dedicated IMAP mailbox for bounce notifications. Correlates bounces with sent messages using the X-Genro-Mail-ID header.

Configuration:
proxy.configure_bounce_receiver(BounceConfig(

imap_host=”imap.example.com”, imap_user=”bounce@example.com”, imap_password=”secret”,

)) await proxy.start()

Commands (via handle_bounce_command):
  • getBounceStatus: Returns configured and running status

class enterprise.mail_proxy.proxy_ee.MailProxy_EE[source]

Bases: object

EE mixin: adds BounceReceiver to MailProxy.

Instance Attributes (added by this mixin):

bounce_receiver: BounceReceiver instance (or None if not started) _bounce_config: BounceConfig instance (or None if not configured)

configure_bounce_receiver()[source]

Set BounceConfig before start()

bounce_receiver_running()

Property to check if poller is active

handle_bounce_command()[source]

Handle getBounceStatus command

Overridden Methods (from MailProxy CE stubs):

__init_proxy_ee__(): Initialize bounce_receiver, _bounce_config _start_proxy_ee(): Start BounceReceiver if configured _stop_proxy_ee(): Stop BounceReceiver

bounce_receiver: BounceReceiver | None
__init_proxy_ee__()[source]

Initialize EE state. Called from MailProxy.__init__.

Return type:

None

configure_bounce_receiver(config)[source]

Configure bounce detection.

Call this before start() to enable bounce detection. The bounce receiver will poll the configured IMAP mailbox for bounce messages and correlate them with sent messages using the X-Genro-Mail-ID header.

Parameters:

config (BounceConfig) – BounceConfig with IMAP credentials and polling settings.

Return type:

None

property bounce_receiver_running: bool

True if BounceReceiver is active and polling.

async handle_bounce_command(cmd, payload=None)[source]

Handle bounce-related commands.

Supported commands: - getBounceStatus: Get bounce receiver status - configureBounce: Configure bounce receiver (requires restart)

Parameters:
  • cmd (str) – Command name.

  • payload (dict[str, Any] | None) – Command parameters.

Return type:

dict[str, Any]

Returns:

Command result dict.

Bounce Detection

Bounce message parser for DSN (RFC 3464) and heuristic detection.

class enterprise.mail_proxy.bounce.parser.BounceInfo(original_message_id, bounce_type, bounce_code, bounce_reason, recipient)[source]

Bases: object

Parsed bounce information.

Parameters:
original_message_id: str | None
bounce_type: Literal['hard', 'soft'] | None
bounce_code: str | None
bounce_reason: str | None
recipient: str | None
__init__(original_message_id, bounce_type, bounce_code, bounce_reason, recipient)
Parameters:
class enterprise.mail_proxy.bounce.parser.BounceParser[source]

Bases: object

Parse bounce messages (DSN RFC 3464 + heuristics).

HARD_BOUNCE_CODES = frozenset({'500', '501', '502', '503', '504', '510', '511', '512', '513', '521', '522', '523', '530', '531', '532', '541', '542', '543', '550', '551', '552', '553', '554', '555', '556', '557'})
SOFT_BOUNCE_CODES = frozenset({'400', '401', '402', '403', '404', '405', '407', '408', '409', '410', '411', '412', '413', '414', '421', '422', '431', '432', '441', '442', '450', '451', '452', '453', '454', '455', '456', '471', '472'})
BOUNCE_SUBJECT_PATTERNS = [re.compile('(?:mail|message|delivery)\\s*(?:delivery|failure|failed|returned|undeliverable)', re.IGNORECASE), re.compile('undelivered\\s*mail\\s*returned', re.IGNORECASE), re.compile('(?:returned|bounced)\\s*mail', re.IGNORECASE), re.compile('failure\\s*notice', re.IGNORECASE)]
SMTP_CODE_PATTERN = re.compile('\\b([45]\\d{2})\\b')
ENHANCED_CODE_PATTERN = re.compile('\\b([45])\\.(\\d+)\\.(\\d+)\\b')
parse(raw_email)[source]

Parse a bounce message and extract bounce information.

Parameters:

raw_email (bytes)

Return type:

BounceInfo

Bounce receiver - IMAP polling for bounce detection.

This module provides BounceReceiver, the component responsible for: - Polling an IMAP mailbox for bounce notifications - Parsing bounce messages to extract delivery failure information - Correlating bounces with sent messages via X-Genro-Mail-ID header - Recording bounce events in the message_events table

BounceReceiver is instantiated by MailProxy_EE and accessed via proxy.bounce_receiver.

Example

# BounceReceiver is created by MailProxy_EE when configured from enterprise.mail_proxy.bounce import BounceConfig

proxy.configure_bounce_receiver(BounceConfig(

host=”imap.example.com”, port=993, user=”bounces@example.com”, password=”secret”,

)) await proxy.start() # Starts bounce_receiver automatically

class enterprise.mail_proxy.bounce.receiver.BounceConfig(host, port, user, password, use_ssl=True, folder='INBOX', poll_interval=60)[source]

Bases: object

Configuration for bounce mailbox polling.

Parameters:
host: str
port: int
user: str
password: str
use_ssl: bool = True
folder: str = 'INBOX'
poll_interval: int = 60
__init__(host, port, user, password, use_ssl=True, folder='INBOX', poll_interval=60)
Parameters:
class enterprise.mail_proxy.bounce.receiver.BounceReceiver(proxy, config)[source]

Bases: object

Background task that polls IMAP for bounce messages.

Monitors a dedicated IMAP mailbox for bounce notifications and correlates them with sent messages using the X-Genro-Mail-ID header.

proxy

Parent MailProxy instance for accessing db, logger, etc.

config

BounceConfig with IMAP credentials and polling settings.

Parameters:
__init__(proxy, config)[source]

Initialize BounceReceiver.

Parameters:
  • proxy (MailProxy_EE) – Parent MailProxy instance.

  • config (BounceConfig) – BounceConfig with IMAP credentials and polling settings.

property db

Database access via proxy.

property logger

Logger via proxy.

async start()[source]

Start the bounce receiver background task.

Return type:

None

async stop()[source]

Stop the bounce receiver.

Return type:

None

PEC Support

PEC receipt parser for Italian certified email receipts.

PEC receipts are standardized messages sent by PEC providers: - Ricevuta di accettazione: confirms the PEC system accepted the message - Ricevuta di avvenuta consegna: confirms delivery to recipient’s PEC mailbox - Ricevuta di mancata consegna: delivery failed (recipient doesn’t exist, etc.) - Avviso di non accettazione: message was rejected by the PEC system

Receipts contain: - X-Ricevuta header indicating receipt type - Original message ID in X-Riferimento-Message-ID or embedded headers - Timestamp of the event - Error details for failure receipts

class enterprise.mail_proxy.pec.parser.PecReceiptInfo(original_message_id, receipt_type, timestamp, error_reason, recipient)[source]

Bases: object

Parsed PEC receipt information.

Parameters:
  • original_message_id (str | None)

  • receipt_type (Optional[Literal['accettazione', 'consegna', 'mancata_consegna', 'non_accettazione', 'presa_in_carico']])

  • timestamp (str | None)

  • error_reason (str | None)

  • recipient (str | None)

original_message_id: str | None
receipt_type: Literal['accettazione', 'consegna', 'mancata_consegna', 'non_accettazione', 'presa_in_carico'] | None
timestamp: str | None
error_reason: str | None
recipient: str | None
__init__(original_message_id, receipt_type, timestamp, error_reason, recipient)
Parameters:
  • original_message_id (str | None)

  • receipt_type (Optional[Literal['accettazione', 'consegna', 'mancata_consegna', 'non_accettazione', 'presa_in_carico']])

  • timestamp (str | None)

  • error_reason (str | None)

  • recipient (str | None)

class enterprise.mail_proxy.pec.parser.PecReceiptParser[source]

Bases: object

Parse PEC receipt messages.

RECEIPT_TYPE_MAP: dict[str, Literal['accettazione', 'consegna', 'mancata_consegna', 'non_accettazione', 'presa_in_carico']] = {'accettazione': 'accettazione', 'avvenuta-consegna': 'consegna', 'consegna': 'consegna', 'errore-consegna': 'mancata_consegna', 'mancata-consegna': 'mancata_consegna', 'non-accettazione': 'non_accettazione', 'presa-in-carico': 'presa_in_carico'}
RECEIPT_SUBJECT_PATTERNS = [(re.compile('ACCETTAZIONE:', re.IGNORECASE), 'accettazione'), (re.compile('AVVENUTA\\s+CONSEGNA:', re.IGNORECASE), 'consegna'), (re.compile('POSTA\\s+CERTIFICATA:\\s+AVVENUTA\\s+CONSEGNA', re.IGNORECASE), 'consegna'), (re.compile('MANCATA\\s+CONSEGNA:', re.IGNORECASE), 'mancata_consegna'), (re.compile('ERRORE\\s+CONSEGNA:', re.IGNORECASE), 'mancata_consegna'), (re.compile('NON\\s+ACCETTAZIONE:', re.IGNORECASE), 'non_accettazione'), (re.compile('PRESA\\s+IN\\s+CARICO:', re.IGNORECASE), 'presa_in_carico')]
parse(raw_email)[source]

Parse a PEC receipt message.

Parameters:

raw_email (bytes)

Return type:

PecReceiptInfo

PEC receipt receiver loop for polling IMAP and processing receipts.

This module monitors PEC accounts for incoming receipts (ricevute) and: 1. Parses receipt type (accettazione, consegna, mancata_consegna, etc.) 2. Creates corresponding events in the message_events table 3. Handles timeout for PEC messages that don’t receive acceptance within 30 minutes

The receiver runs as a background task and polls all PEC accounts periodically.

class enterprise.mail_proxy.pec.receiver.PecReceiver(db, logger=None, poll_interval=60, acceptance_timeout=1800)[source]

Bases: object

Background task that polls PEC accounts for receipts.

Parameters:
  • db (MailProxyDb)

  • logger (Logger | None)

  • poll_interval (int)

  • acceptance_timeout (int)

__init__(db, logger=None, poll_interval=60, acceptance_timeout=1800)[source]
Parameters:
  • db (MailProxyDb)

  • logger (Logger | None)

  • poll_interval (int)

  • acceptance_timeout (int)

async start()[source]

Start the PEC receiver background task.

Return type:

None

async stop()[source]

Stop the PEC receiver.

Return type:

None

IMAP Client

Async IMAP client wrapper for bounce detection.

class enterprise.mail_proxy.imap.client.IMAPClient(logger=None)[source]

Bases: object

Async IMAP client wrapper using aioimaplib.

Parameters:

logger (Logger | None)

__init__(logger=None)[source]
Parameters:

logger (Logger | None)

async connect(host, port, user, password, use_ssl=True)[source]

Connect and authenticate to IMAP server.

Parameters:
Return type:

None

async select_folder(folder='INBOX')[source]

Select mailbox folder. Returns UIDVALIDITY.

Parameters:

folder (str)

Return type:

int

property uidvalidity: int | None

Return current UIDVALIDITY value.

async fetch_since_uid(last_uid)[source]

Fetch messages with UID greater than last_uid.

Parameters:

last_uid (int)

Return type:

list[IMAPMessage]

async close()[source]

Close IMAP connection.

Return type:

None

class enterprise.mail_proxy.imap.client.IMAPMessage(uid, raw)[source]

Bases: object

Represents a fetched IMAP message.

Parameters:
uid: int
raw: bytes
__init__(uid, raw)
Parameters:

Large File Storage

Large file storage backend using fsspec for multi-cloud support.

This module provides storage capabilities for large email attachments, supporting multiple backends (S3, GCS, Azure, local filesystem) via fsspec.

When attachments exceed the configured size threshold, they are uploaded to external storage and replaced with download links in the email body.

Example

Using S3 storage:

storage = LargeFileStorage(
    storage_url="s3://my-bucket/mail-attachments",
)
await storage.upload("file-123", content, "report.pdf")
url = storage.get_download_url("file-123", "report.pdf")

Using local filesystem with public URL:

storage = LargeFileStorage(
    storage_url="file:///var/www/downloads",
    public_base_url="https://files.example.com",
)
exception enterprise.mail_proxy.attachments.large_file_storage.LargeFileStorageError[source]

Bases: Exception

Base exception for large file storage errors.

exception enterprise.mail_proxy.attachments.large_file_storage.StorageNotConfiguredError[source]

Bases: LargeFileStorageError

Raised when storage is not properly configured.

exception enterprise.mail_proxy.attachments.large_file_storage.UploadError[source]

Bases: LargeFileStorageError

Raised when file upload fails.

class enterprise.mail_proxy.attachments.large_file_storage.LargeFileStorage(storage_url, public_base_url=None, secret_key=None, storage_options=None)[source]

Bases: object

Storage backend for large email attachments using fsspec.

Supports multiple storage backends through fsspec’s unified interface: - S3/MinIO: s3://bucket/path - Google Cloud Storage: gs://bucket/path - Azure Blob: az://container/path - Local filesystem: file:///path/to/dir

For cloud storage (S3, GCS, Azure), download URLs are generated using the backend’s native signed URL mechanism. For local filesystem, a public_base_url must be provided and signed tokens are generated.

storage_url

fsspec-compatible URL for the storage backend.

public_base_url

Base URL for download links (required for local storage).

secret_key

Secret key for signing download tokens.

Parameters:
__init__(storage_url, public_base_url=None, secret_key=None, storage_options=None)[source]

Initialize the large file storage backend.

Parameters:
  • storage_url (str) – fsspec URL (s3://bucket/path, file:///data, etc.).

  • public_base_url (str | None) – Public URL for download links (required for local storage).

  • secret_key (str | None) – Secret key for signing download tokens.

  • storage_options (dict[str, Any] | None) – Additional options passed to fsspec filesystem.

Raises:

ImportError – If fsspec is not installed.

property fs: AbstractFileSystem

The fsspec filesystem instance.

property base_path: str

The base path within the storage backend.

async upload(file_id, content, filename)[source]

Upload a file to storage.

Parameters:
  • file_id (str) – Unique identifier for this file (e.g., UUID).

  • content (bytes) – File content as bytes.

  • filename (str) – Original filename.

Return type:

str

Returns:

The storage path where the file was uploaded.

Raises:

UploadError – If upload fails.

get_download_url(file_id, filename, expires_in=86400)[source]

Generate a download URL for a file.

For cloud storage backends (S3, GCS, Azure), uses the backend’s native signed URL mechanism. For local filesystem, generates a signed token URL using public_base_url.

Parameters:
  • file_id (str) – The file’s unique identifier.

  • filename (str) – The original filename.

  • expires_in (int) – URL expiration time in seconds (default: 24 hours).

Return type:

str

Returns:

A download URL for the file.

Raises:

StorageNotConfiguredError – If public_base_url is required but not set.

verify_download_token(token, filename)[source]

Verify a download token and return the file_id if valid.

Parameters:
  • token (str) – The token from the download URL.

  • filename (str) – The filename from the URL (for verification).

Return type:

str | None

Returns:

The file_id if token is valid and not expired, None otherwise.

get_file_content(file_id, filename)[source]

Retrieve file content from storage.

Parameters:
  • file_id (str) – The file’s unique identifier.

  • filename (str) – The original filename.

Return type:

bytes | None

Returns:

File content as bytes, or None if not found.

async cleanup_expired(ttl_days)[source]

Remove files older than the specified TTL.

Parameters:

ttl_days (int) – Delete files older than this many days.

Return type:

int

Returns:

Number of files deleted.

exists(file_id, filename)[source]

Check if a file exists in storage.

Parameters:
  • file_id (str) – The file’s unique identifier.

  • filename (str) – The original filename.

Return type:

bool

Returns:

True if file exists, False otherwise.

Enterprise Entities

Account EE (PEC Support)

Enterprise Edition extensions for AccountsTable.

This module adds PEC (Posta Elettronica Certificata) functionality to the base AccountsTable. PEC is the Italian certified email system that requires IMAP polling for delivery receipts.

PEC accounts have: - is_pec_account=1 flag - IMAP configuration for receipt polling - Sync state tracking (last_uid, uidvalidity)

Usage:
class AccountsTable(AccountsTable_EE, AccountsTableBase):

pass

class enterprise.mail_proxy.entities.account.table_ee.AccountsTable_EE[source]

Bases: object

Enterprise Edition: PEC account support with IMAP configuration.

Adds: - IMAP/PEC columns via configure() - Methods for PEC account management - IMAP sync state tracking

configure()[source]

Add EE columns for IMAP/PEC support after CE columns.

Return type:

None

async add_pec_account(acc)[source]

Insert or update a PEC account with IMAP configuration.

PEC accounts have is_pec_account=1 and require IMAP settings for reading delivery receipts (ricevute di accettazione/consegna).

Required fields:

id: Account identifier host, port: SMTP server config imap_host: IMAP server for reading receipts

Optional IMAP fields:

imap_port: IMAP port (default 993) imap_user: IMAP username (defaults to SMTP user) imap_password: IMAP password (defaults to SMTP password) imap_folder: Folder to monitor (default “INBOX”)

Return type:

str

Returns:

The account’s internal pk (UUID).

Parameters:

acc (dict[str, Any])

async list_pec_accounts()[source]

Return all PEC accounts (is_pec_account=1).

Return type:

list[dict[str, Any]]

Returns:

List of PEC account dicts with IMAP configuration.

async get_pec_account_ids()[source]

Get the set of account IDs that are PEC accounts.

Return type:

set[str]

Returns:

Set of account IDs with is_pec_account=1.

async update_imap_sync_state(tenant_id, account_id, last_uid, uidvalidity=None)[source]

Update IMAP sync state after processing receipts.

Called after polling IMAP for PEC receipts to track progress. Next poll will start from last_uid + 1.

Parameters:
  • tenant_id (str) – The tenant that owns this account.

  • account_id (str) – The account identifier.

  • last_uid (int) – The last processed UID.

  • uidvalidity (int | None) – The UIDVALIDITY value (detects mailbox reset).

Return type:

None

Enterprise Edition: PEC account support for AccountEndpoint.

PEC = Posta Elettronica Certificata (Italian certified email). Requires IMAP polling for delivery receipts.

class enterprise.mail_proxy.entities.account.endpoint_ee.AccountEndpoint_EE[source]

Bases: object

EE mixin: adds PEC account methods to AccountEndpoint.

async add_pec(id, tenant_id, host, port, imap_host, user=None, password=None, use_tls=True, imap_port=993, imap_user=None, imap_password=None, imap_folder='INBOX', batch_size=None, ttl=300, limit_per_minute=None, limit_per_hour=None, limit_per_day=None, limit_behavior='defer')[source]

Add or update a PEC account with IMAP configuration.

Parameters:
Return type:

dict

async list_pec()[source]

List all PEC accounts across all tenants.

Return type:

list[dict]

async get_pec_ids()[source]

Get set of account IDs that are PEC accounts.

Return type:

set[str]

Tenant EE (API Keys)

Enterprise Edition extensions for TenantsTable.

This module adds multi-tenant management functionality to the base TenantsTable. In EE mode, multiple tenants can be created with isolated configurations and API key authentication.

Multi-tenant features: - Tenant CRUD operations (add, list, update, remove) - Tenant API key management for scoped authentication

Note: Batch suspension is available in CE (core/tenant/table.py).

Usage:
class TenantsTable(TenantsTable_EE, TenantsTableBase):

pass

class enterprise.mail_proxy.entities.tenant.table_ee.TenantsTable_EE[source]

Bases: object

Enterprise Edition: Multi-tenant management.

Adds methods for: - Creating and managing multiple tenants - Tenant API key authentication

async add(tenant)[source]

Insert or update a tenant configuration.

For new tenants, automatically generates an API key. For existing tenants, keeps the existing API key.

Parameters:

tenant (dict[str, Any]) – Tenant configuration dict with at least ‘id’ field. Optional fields: name, client_auth, client_base_url, client_sync_path, client_attachment_path, rate_limits, large_file_config, active.

Return type:

str

Returns:

The API key (raw, show once) for new tenants. Empty string for existing tenants (key unchanged).

async list_all(active_only=False)[source]

Return all tenants, optionally filtered by active status.

Parameters:

active_only (bool) – If True, return only active tenants.

Return type:

list[dict[str, Any]]

Returns:

List of tenant dicts with decoded fields.

async update_fields(tenant_id, updates)[source]

Update a tenant’s fields.

Parameters:
  • tenant_id (str) – The tenant to update.

  • updates (dict[str, Any]) – Dict of fields to update. Supported fields: name, client_auth, client_base_url, client_sync_path, client_attachment_path, rate_limits, large_file_config, active.

Return type:

bool

Returns:

True if row was updated, False if no valid updates or tenant not found.

async remove(tenant_id)[source]

Delete a tenant and cascade to related accounts and messages.

Warning: This deletes ALL data associated with the tenant including: - All messages in the queue - All SMTP account configurations - The tenant configuration itself

Parameters:

tenant_id (str) – The tenant to delete.

Return type:

bool

Returns:

True if tenant was deleted.

async create_api_key(tenant_id, expires_at=None)[source]

Create a new API key for a tenant.

Generates a new random API key, replacing any existing key. The raw key is returned once and cannot be retrieved later.

Parameters:
  • tenant_id (str) – The tenant ID.

  • expires_at (int | None) – Optional Unix timestamp for key expiration.

Return type:

str | None

Returns:

The raw API key (show once), or None if tenant not found.

async get_tenant_by_token(raw_key)[source]

Find tenant by API key token.

Looks up the tenant associated with the given API key. Validates that the key has not expired.

Parameters:

raw_key (str) – The raw API key to look up.

Return type:

dict[str, Any] | None

Returns:

Tenant dict if found and not expired, None otherwise.

async revoke_api_key(tenant_id)[source]

Revoke the API key for a tenant.

Removes the API key, preventing further authentication. The tenant can still be accessed via instance token.

Parameters:

tenant_id (str) – The tenant ID.

Return type:

bool

Returns:

True if key was revoked, False if tenant not found.

Enterprise Edition extensions for TenantEndpoint.

This module adds API key management functionality to the base TenantEndpoint. EE mode allows tenants to have their own API keys for scoped authentication.

API key features: - Create new API keys for tenants (with optional expiration) - Revoke API keys without deleting tenants

Usage:
class TenantEndpoint(TenantEndpoint_EE, TenantEndpointBase):

pass

class enterprise.mail_proxy.entities.tenant.endpoint_ee.TenantEndpoint_EE[source]

Bases: object

Enterprise Edition: API key management for tenants.

Adds methods for: - Creating tenant-specific API keys - Revoking API keys

async create_api_key(tenant_id, expires_at=None)[source]

Create a new API key for a tenant.

Generates a new random API key, replacing any existing key. The raw key is returned once and cannot be retrieved later. Save it immediately!

Parameters:
  • tenant_id (str) – The tenant ID.

  • expires_at (int | None) – Optional Unix timestamp for key expiration.

Return type:

dict

Returns:

Dict with ok=True and api_key (show once).

Raises:

ValueError – If tenant not found.

async revoke_api_key(tenant_id)[source]

Revoke the API key for a tenant.

Removes the API key, preventing further authentication with it. The tenant can still be accessed via instance token.

Parameters:

tenant_id (str) – The tenant ID.

Return type:

dict

Returns:

Dict with ok=True.

Raises:

ValueError – If tenant not found.

Instance EE (Bounce Config)

Enterprise Edition extensions for InstanceTable.

This module adds bounce detection configuration to the base InstanceTable. Bounce detection monitors a dedicated IMAP mailbox for bounce notifications and DSN (Delivery Status Notification) messages.

Bounce detection workflow: 1. Outgoing emails use a dedicated Return-Path address 2. Bounces are delivered to the bounce mailbox 3. IMAP poller reads and parses bounce messages 4. Bounce events are recorded in message_events

Usage:
class InstanceTable(InstanceTable_EE, InstanceTableBase):

pass

class enterprise.mail_proxy.entities.instance.table_ee.InstanceTable_EE[source]

Bases: object

Enterprise Edition: Bounce detection configuration.

Adds: - Bounce IMAP columns via configure() - Methods for bounce detection management - Sync state tracking

configure()[source]

Add EE columns for bounce detection after CE columns.

Return type:

None

async is_bounce_enabled()[source]

Check if bounce detection is enabled.

Return type:

bool

Returns:

True if bounce_enabled=1 in instance config.

async get_bounce_config()[source]

Get bounce detection configuration.

Returns:

  • enabled: bool

  • imap_host, imap_port, imap_user, imap_password, imap_folder

  • imap_ssl: bool

  • poll_interval: seconds between polls

  • return_path: Return-Path header for outgoing emails

  • last_uid, last_sync, uidvalidity: sync state

Return type:

Dict with bounce settings

async set_bounce_config(*, enabled=None, imap_host=None, imap_port=None, imap_user=None, imap_password=None, imap_folder=None, imap_ssl=None, poll_interval=None, return_path=None)[source]

Set bounce detection configuration.

Only provided fields are updated. Pass None to skip a field.

Parameters:
  • enabled (bool | None) – Enable/disable bounce detection.

  • imap_host (str | None) – IMAP server hostname.

  • imap_port (int | None) – IMAP port (default 993).

  • imap_user (str | None) – IMAP username.

  • imap_password (str | None) – IMAP password.

  • imap_folder (str | None) – Folder to monitor (default “INBOX”).

  • imap_ssl (bool | None) – Use SSL/TLS (default True).

  • poll_interval (int | None) – Seconds between polls (default 60).

  • return_path (str | None) – Return-Path header for outgoing emails.

Return type:

None

async update_bounce_sync_state(*, last_uid, last_sync, uidvalidity=None)[source]

Update bounce IMAP sync state after processing.

Called after polling the bounce mailbox to track progress. Next poll will start from last_uid + 1.

Parameters:
  • last_uid (int) – Last processed UID.

  • last_sync (int) – Unix timestamp of this sync.

  • uidvalidity (int | None) – IMAP UIDVALIDITY (detects mailbox reset).

Return type:

None

Enterprise Edition extensions for InstanceEndpoint.

This module adds bounce detection configuration to the base InstanceEndpoint. Bounce detection monitors a dedicated IMAP mailbox for bounce notifications.

Bounce features: - Get/set bounce detection configuration - Runtime reload of bounce config

Usage:
class InstanceEndpoint(InstanceEndpoint_EE, InstanceEndpointBase):

pass

class enterprise.mail_proxy.entities.instance.endpoint_ee.InstanceEndpoint_EE[source]

Bases: object

Enterprise Edition: Bounce detection configuration.

Adds methods for: - Getting bounce detection configuration - Setting bounce detection configuration - Reloading bounce config at runtime

async get_bounce_config()[source]

Get bounce detection configuration.

Returns:

  • enabled: bool

  • imap_host, imap_port, imap_user, imap_folder

  • imap_ssl: bool

  • poll_interval: seconds between polls

  • return_path: Return-Path header for outgoing emails

Return type:

Dict with bounce settings including

async set_bounce_config(enabled=None, imap_host=None, imap_port=None, imap_user=None, imap_password=None, imap_folder=None, imap_ssl=None, poll_interval=None, return_path=None)[source]

Set bounce detection configuration.

Only provided fields are updated. Pass None to skip a field. Use reload_bounce() to apply changes to running poller.

Parameters:
  • enabled (bool | None) – Enable/disable bounce detection.

  • imap_host (str | None) – IMAP server hostname.

  • imap_port (int | None) – IMAP port (default 993).

  • imap_user (str | None) – IMAP username.

  • imap_password (str | None) – IMAP password.

  • imap_folder (str | None) – Folder to monitor (default “INBOX”).

  • imap_ssl (bool | None) – Use SSL/TLS (default True).

  • poll_interval (int | None) – Seconds between polls (default 60).

  • return_path (str | None) – Return-Path header for outgoing emails.

Return type:

dict

Returns:

Dict with ok=True.

async reload_bounce()[source]

Reload bounce detection configuration at runtime.

Stops the current bounce poller (if running) and restarts it with the latest configuration from the database.

Return type:

dict

Returns:

Dict with ok=True and enabled status.

Message EE (PEC Events)

Enterprise Edition extensions for MessagesTable.

This module adds PEC (Posta Elettronica Certificata) tracking functionality to the base MessagesTable. PEC messages require tracking of acceptance and delivery receipts.

PEC workflow: 1. Message sent via PEC account gets is_pec=1 2. IMAP poller checks for acceptance receipt 3. If no acceptance within timeout, message flagged for alert 4. Delivery receipt completes the PEC lifecycle

Usage:
class MessagesTable(MessagesTable_EE, MessagesTableBase):

pass

class enterprise.mail_proxy.entities.message.table_ee.MessagesTable_EE[source]

Bases: object

Enterprise Edition: PEC message tracking.

Adds methods for: - Clearing PEC flag for non-PEC recipients - Finding PEC messages missing acceptance receipts

async clear_pec_flag(pk)[source]

Clear the is_pec flag when recipient is not a PEC address.

Called when sending to a non-PEC recipient via a PEC account. The message doesn’t need receipt tracking in this case.

Parameters:

pk (str) – Internal primary key of the message (UUID string).

Return type:

None

async get_pec_without_acceptance(cutoff_ts)[source]

Get PEC messages sent before cutoff_ts without acceptance receipt.

Used to detect PEC delivery failures. If a PEC message doesn’t receive an acceptance receipt within the expected timeframe, it may indicate a delivery problem.

Returns messages where: - is_pec = 1 (marked as PEC) - smtp_ts < cutoff_ts (sent before cutoff) - No pec_acceptance event exists in message_events

Parameters:

cutoff_ts (int) – Unix timestamp. Messages sent before this time without acceptance receipt are returned.

Return type:

list[dict[str, Any]]

Returns:

List of message dicts with pk, id, account_id, smtp_ts.

Storage EE (Cloud Backends)

Enterprise Edition: Cloud storage backend support via fsspec.

This mixin adds S3, Azure, and GCS support to StorageNode.

class enterprise.mail_proxy.storage.node_ee.StorageNode_EE[source]

Bases: object

Enterprise Edition mixin: Cloud storage backends via fsspec.

Supports: - S3 (protocol: ‘s3’) - Google Cloud Storage (protocol: ‘gcs’) - Azure Blob (protocol: ‘azure’)

Usage (mixed into StorageNode):
class StorageNode(StorageNode_EE, StorageNodeBase):

pass

SQL Package

Database abstraction layer supporting SQLite and PostgreSQL.

Async database manager with adapter pattern and table registration.

class sql.sqldb.SqlDb(connection_string, parent=None)[source]

Bases: object

Async database manager with adapter pattern.

Supports multiple database types via adapters: - SQLite: “/path/to/db.sqlite” or “sqlite:/path/to/db” - PostgreSQL: “postgresql://user:pass@host/db”

Features: - Table class registration via add_table() - Table access via table(name) - Schema creation and verification - CRUD operations via adapter - Encryption key access via parent.encryption_key

Usage:

db = SqlDb(“/data/mail.db”, parent=proxy) await db.connect()

db.add_table(TenantsTable) db.add_table(AccountsTable) await db.check_structure()

tenant = await db.table(‘tenants’).select_one(where={“id”: “acme”})

await db.close()

Parameters:
  • connection_string (str)

  • parent (Any)

__init__(connection_string, parent=None)[source]

Initialize database manager.

Parameters:
  • connection_string (str) – Database connection string.

  • parent (Any) – Parent object (e.g., proxy) that provides encryption_key.

property encryption_key: bytes | None

Get encryption key from parent. Returns None if not configured.

async connect()[source]

Connect to database.

Return type:

None

async close()[source]

Close database connection.

Return type:

None

add_table(table_class)[source]

Register and instantiate a table class.

Parameters:

table_class (type[Table]) – Table manager class (must have name attribute).

Return type:

Table

Returns:

The instantiated table.

table(name)[source]

Get table instance by name.

Parameters:

name (str) – Table name.

Return type:

Table

Returns:

Table instance.

Raises:

ValueError – If table not registered.

async check_structure()[source]

Create all registered tables if they don’t exist.

Return type:

None

async execute(query, params=None)[source]

Execute raw query, return affected row count.

Parameters:
Return type:

int

async fetch_one(query, params=None)[source]

Execute raw query, return single row.

Parameters:
Return type:

dict[str, Any] | None

async fetch_all(query, params=None)[source]

Execute raw query, return all rows.

Parameters:
Return type:

list[dict[str, Any]]

async commit()[source]

Commit transaction.

Return type:

None

async rollback()[source]

Rollback transaction.

Return type:

None

Table base class with Columns-based schema (async version).

class sql.table.Table(db)[source]

Bases: object

Base class for async table managers.

Subclasses define columns via configure() hook and implement domain-specific operations.

name

Table name in database.

pkey

Primary key column name (e.g., “pk” or “id”).

db

SqlDb instance reference.

columns

Column definitions.

Parameters:

db (SqlDb)

name: str
pkey: str | None = None
__init__(db)[source]
Parameters:

db (SqlDb)

configure()[source]

Override to define columns. Called during __init__.

Return type:

None

pkey_value(record)[source]

Get primary key value from a record.

Parameters:

record (dict[str, Any])

Return type:

Any

new_pkey_value()[source]

Generate a new primary key value. Override in subclasses for custom pk.

Default: returns UUID. Tables with autoincrement pk should return None.

Return type:

Any

async trigger_on_inserting(record)[source]

Called before insert. Can modify record. Return the record to insert.

Auto-generates pk via new_pkey_value() if pk column is not in record.

Parameters:

record (dict[str, Any])

Return type:

dict[str, Any]

async trigger_on_inserted(record)[source]

Called after successful insert.

Parameters:

record (dict[str, Any])

Return type:

None

async trigger_on_updating(record, old_record)[source]

Called before update. Can modify record. Return the record to update.

Parameters:
Return type:

dict[str, Any]

async trigger_on_updated(record, old_record)[source]

Called after successful update.

Parameters:
Return type:

None

async trigger_on_deleting(record)[source]

Called before delete.

Parameters:

record (dict[str, Any])

Return type:

None

async trigger_on_deleted(record)[source]

Called after successful delete.

Parameters:

record (dict[str, Any])

Return type:

None

create_table_sql()[source]

Generate CREATE TABLE IF NOT EXISTS statement.

Return type:

str

async create_schema()[source]

Create table if not exists.

Return type:

None

async add_column_if_missing(column_name)[source]

Add column if it doesn’t exist (migration helper).

Parameters:

column_name (str)

Return type:

None

async sync_schema()[source]

Sync table schema by adding any missing columns.

Iterates over all columns defined in configure() and adds them if they don’t exist in the database. This enables automatic schema migration when new columns are added to the codebase.

Safe to call on every startup - existing columns are ignored. Works with both SQLite and PostgreSQL.

Return type:

None

async insert(data)[source]

Insert a row. Calls trigger_on_inserting before and trigger_on_inserted after.

The data dict is mutated: if the pk is auto-generated (UUID or autoincrement), it will be populated in data after insert.

Parameters:

data (dict[str, Any])

Return type:

int

async select(columns=None, where=None, order_by=None, limit=None)[source]

Select rows.

Parameters:
Return type:

list[dict[str, Any]]

async select_one(columns=None, where=None)[source]

Select single row.

Parameters:
Return type:

dict[str, Any] | None

async select_for_update(where, columns=None)[source]

Select single row with FOR UPDATE lock (PostgreSQL) or regular select (SQLite).

Parameters:
  • where (dict[str, Any]) – WHERE conditions to identify the row.

  • columns (list[str] | None) – Columns to select (None = all).

Return type:

dict[str, Any] | None

Returns:

Row dict or None if not found.

record(pkey_value, insert_missing=False, for_update=True)[source]

Return async context manager for record update.

Parameters:
  • pkey_value (Any) – Primary key value, or dict for composite keys.

  • insert_missing (bool) – If True, insert new record if not found (upsert).

  • for_update (bool) – If True, use SELECT FOR UPDATE (PostgreSQL).

Return type:

RecordUpdater

Returns:

RecordUpdater context manager.

Usage:

# Single key: async with table.record(‘uuid-123’) as rec:

rec[‘name’] = ‘New Name’

# Composite key (dict): async with table.record({‘tenant_id’: ‘t1’, ‘id’: ‘acc1’}) as rec:

rec[‘host’] = ‘smtp.example.com’

# Upsert (insert if missing): async with table.record({‘tenant_id’: ‘t1’, ‘id’: ‘new’}, insert_missing=True) as rec:

rec[‘host’] = ‘smtp.new.com’

async update(values, where)[source]

Update rows. Calls trigger_on_updating before and trigger_on_updated after.

Uses SELECT FOR UPDATE to lock the row during update (PostgreSQL).

Parameters:
Return type:

int

async update_batch(pkeys, updater=None)[source]

Update multiple records by primary key, calling triggers for each.

Performs ONE read (SELECT all records) then N writes (UPDATE per record) with trigger_on_updating/trigger_on_updated called for each.

Parameters:
  • pkeys (list[Any]) – List of primary key values to update.

  • updater (dict[str, Any] | None) – Dict of field:value to set on each record.

Return type:

int

Returns:

Number of records updated.

async update_batch_raw(pkeys, updater)[source]

Update multiple records with a single UPDATE statement. No triggers.

Use when you know there are no triggers to call and want maximum efficiency. Performs a single UPDATE … WHERE pk IN (…) query.

Parameters:
  • pkeys (list[Any]) – List of primary key values to update.

  • updater (dict[str, Any]) – Dict of field:value to set on all records.

Return type:

int

Returns:

Number of records updated.

async delete(where)[source]

Delete rows. Calls trigger_on_deleting before and trigger_on_deleted after.

Parameters:

where (dict[str, Any])

Return type:

int

async exists(where)[source]

Check if row exists.

Parameters:

where (dict[str, Any])

Return type:

bool

async count(where=None)[source]

Count rows.

Parameters:

where (dict[str, Any] | None)

Return type:

int

async fetch_one(query, params=None)[source]

Execute raw query, return single row.

Parameters:
Return type:

dict[str, Any] | None

async fetch_all(query, params=None)[source]

Execute raw query, return all rows.

Parameters:
Return type:

list[dict[str, Any]]

async execute(query, params=None)[source]

Execute raw query, return affected row count.

Parameters:
Return type:

int

class sql.table.RecordUpdater(table, pkey, pkey_value, insert_missing=False, for_update=True)[source]

Bases: object

Async context manager for record update with locking and triggers.

Usage:
async with table.record(pk) as record:

record[‘field’] = ‘value’

# → triggers update() with old_record

async with table.record(pk, insert_missing=True) as record:

record[‘field’] = ‘value’

# → insert() if not exists, update() if exists

The context manager: - __aenter__: SELECT FOR UPDATE (PostgreSQL) or SELECT (SQLite), saves old_record - __aexit__: calls insert() or update() with proper trigger chain Supports both single-column keys and composite keys (dict).

Single key: record(“uuid-123”) or record(“uuid-123”, pkey=”pk”) Composite: record({“tenant_id”: “t1”, “id”: “acc1”})

Parameters:
__init__(table, pkey, pkey_value, insert_missing=False, for_update=True)[source]
Parameters:
record: dict[str, Any] | None
old_record: dict[str, Any] | None
where: dict[str, Any]

Column and Columns definitions for table schema.

class sql.column.Column(name, type_, *, unique=False, nullable=True, default=None, json_encoded=False, encrypted=False)[source]

Bases: object

Column definition with SQL type, constraints, and optional relation.

Parameters:
__init__(name, type_, *, unique=False, nullable=True, default=None, json_encoded=False, encrypted=False)[source]
Parameters:
relation(table, pk='id', sql=False)[source]

Define a foreign key relation to another table.

Parameters:
  • table (str) – Target table name

  • pk (str) – Primary key column in target table (default: “id”)

  • sql (bool) – If True, generate SQL FOREIGN KEY constraint

Return type:

Column

to_sql(*, primary_key=False)[source]

Generate SQL column definition.

Parameters:

primary_key (bool) – If True, add PRIMARY KEY constraint.

Return type:

str

Column names are quoted with double quotes to handle reserved words like ‘user’ which is reserved in PostgreSQL.

class sql.column.Columns[source]

Bases: object

Container for table columns.

__init__()[source]
column(name, type_, *, unique=False, nullable=True, default=None, json_encoded=False, encrypted=False)[source]

Add a column definition. Returns the Column for fluent relation().

Parameters:
Return type:

Column

items()[source]

Return column name-definition pairs.

keys()[source]

Return column names.

values()[source]

Return column definitions.

get(name)[source]

Get column by name.

Parameters:

name (str)

Return type:

Column | None

json_columns()[source]

Return names of JSON-encoded columns.

Return type:

list[str]

encrypted_columns()[source]

Return names of encrypted columns.

Return type:

list[str]

Database Adapters

Base adapter class for async database backends with CRUD helpers.

class sql.adapters.base.DbAdapter[source]

Bases: ABC

Abstract base class for async database adapters with CRUD helpers.

Provides a unified interface for SQLite and PostgreSQL with: - Connection management (connect, close) - Raw query execution (execute, fetch_one, fetch_all) - CRUD helpers (insert, select, update, delete)

Subclasses must implement the abstract methods and set the placeholder attribute for parameter binding (:name for SQLite, %(name)s for PostgreSQL).

placeholder: str = ':name'
pk_column(name)[source]

Return SQL definition for autoincrement primary key column.

Parameters:

name (str)

Return type:

str

for_update_clause()[source]

Return FOR UPDATE clause if supported, empty string otherwise.

Return type:

str

abstractmethod async connect()[source]

Establish database connection.

Return type:

None

abstractmethod async close()[source]

Close database connection.

Return type:

None

abstractmethod async execute(query, params=None)[source]

Execute query, return affected row count.

Parameters:
Return type:

int

abstractmethod async execute_many(query, params_list)[source]

Execute query multiple times with different params (batch insert).

Parameters:
Return type:

int

abstractmethod async fetch_one(query, params=None)[source]

Execute query, return single row as dict or None.

Parameters:
Return type:

dict[str, Any] | None

abstractmethod async fetch_all(query, params=None)[source]

Execute query, return all rows as list of dicts.

Parameters:
Return type:

list[dict[str, Any]]

abstractmethod async execute_script(script)[source]

Execute multiple statements (for schema creation).

Parameters:

script (str)

Return type:

None

abstractmethod async commit()[source]

Commit current transaction.

Return type:

None

abstractmethod async rollback()[source]

Rollback current transaction.

Return type:

None

async insert(table, values)[source]

Insert a row, return rowcount.

Parameters:
  • table (str) – Table name.

  • values (dict[str, Any]) – Column-value pairs.

Return type:

int

Returns:

Number of affected rows (typically 1).

async insert_returning_id(table, values, pk_col='id')[source]

Insert a row and return the generated primary key.

Override in subclasses for database-specific implementation (e.g., RETURNING for PostgreSQL, lastrowid for SQLite).

Parameters:
  • table (str) – Table name.

  • values (dict[str, Any]) – Column-value pairs.

  • pk_col (str) – Primary key column name.

Return type:

Any

Returns:

The generated primary key value, or None if not supported.

async select(table, columns=None, where=None, order_by=None, limit=None)[source]

Select rows, return list of dicts.

Parameters:
  • table (str) – Table name.

  • columns (list[str] | None) – Columns to select (None = all).

  • where (dict[str, Any] | None) – WHERE conditions (AND).

  • order_by (str | None) – ORDER BY clause.

  • limit (int | None) – LIMIT clause.

Return type:

list[dict[str, Any]]

Returns:

List of row dicts.

async select_one(table, columns=None, where=None)[source]

Select single row, return dict or None.

Parameters:
Return type:

dict[str, Any] | None

async update(table, values, where)[source]

Update rows, return rowcount.

Parameters:
  • table (str) – Table name.

  • values (dict[str, Any]) – Column-value pairs to update.

  • where (dict[str, Any]) – WHERE conditions.

Return type:

int

Returns:

Number of affected rows.

async delete(table, where)[source]

Delete rows, return rowcount.

Parameters:
  • table (str) – Table name.

  • where (dict[str, Any]) – WHERE conditions.

Return type:

int

Returns:

Number of deleted rows.

async exists(table, where)[source]

Check if row exists.

Parameters:
Return type:

bool

async count(table, where=None)[source]

Count rows in table.

Parameters:
  • table (str) – Table name.

  • where (dict[str, Any] | None) – Optional WHERE conditions.

Return type:

int

Returns:

Row count.

SQLite async adapter using aiosqlite.

class sql.adapters.sqlite.SqliteAdapter(db_path)[source]

Bases: DbAdapter

SQLite async adapter. Uses :name placeholders natively.

Parameters:

db_path (str)

placeholder: str = ':name'
__init__(db_path)[source]
Parameters:

db_path (str)

async connect()[source]

SQLite connections are opened per-operation, this is a no-op.

Return type:

None

async close()[source]

SQLite connections are closed per-operation, this is a no-op.

Return type:

None

async execute(query, params=None)[source]

Execute query, return affected row count.

Parameters:
Return type:

int

async insert_returning_id(table, values, pk_col='id')[source]

Insert a row and return the generated primary key (autoincrement).

Parameters:
  • table (str) – Table name.

  • values (dict[str, Any]) – Column-value pairs.

  • pk_col (str) – Primary key column name (used for RETURNING in PostgreSQL).

Return type:

Any

Returns:

The generated primary key value (lastrowid for SQLite).

async execute_many(query, params_list)[source]

Execute query multiple times with different params (batch insert).

Parameters:
Return type:

int

async fetch_one(query, params=None)[source]

Execute query, return single row as dict or None.

Parameters:
Return type:

dict[str, Any] | None

async fetch_all(query, params=None)[source]

Execute query, return all rows as list of dicts.

Parameters:
Return type:

list[dict[str, Any]]

async execute_script(script)[source]

Execute multiple statements (for schema creation).

Parameters:

script (str)

Return type:

None

async commit()[source]

Commit is handled per-operation in this implementation.

Return type:

None

async rollback()[source]

Rollback is handled per-operation in this implementation.

Return type:

None

PostgreSQL async adapter using psycopg3 with connection pooling.

class sql.adapters.postgresql.PostgresAdapter(dsn, pool_size=10)[source]

Bases: DbAdapter

PostgreSQL async adapter using psycopg3 with connection pooling.

Converts :name placeholders to %(name)s for psycopg compatibility.

Parameters:
placeholder: str = '%(name)s'
pk_column(name)[source]

Return SQL definition for autoincrement primary key column (PostgreSQL).

Parameters:

name (str)

Return type:

str

__init__(dsn, pool_size=10)[source]
Parameters:
async connect()[source]

Establish connection pool.

Sets search_path to ‘public’ schema to ensure tables are created in a valid schema even when the connection default is unset.

Return type:

None

async close()[source]

Close connection pool.

Return type:

None

async execute(query, params=None)[source]

Execute query, return affected row count.

Parameters:
Return type:

int

async insert_returning_id(table, values, pk_col='id')[source]

Insert a row and return the generated primary key (RETURNING).

Parameters:
  • table (str) – Table name.

  • values (dict[str, Any]) – Column-value pairs.

  • pk_col (str) – Primary key column name for RETURNING clause.

Return type:

Any

Returns:

The generated primary key value.

async execute_many(query, params_list)[source]

Execute query multiple times with different params (batch insert).

Parameters:
Return type:

int

async fetch_one(query, params=None)[source]

Execute query, return single row as dict or None.

Parameters:
Return type:

dict[str, Any] | None

async fetch_all(query, params=None)[source]

Execute query, return all rows as list of dicts.

Parameters:
Return type:

list[dict[str, Any]]

async execute_script(script)[source]

Execute multiple statements (for schema creation).

Parameters:

script (str)

Return type:

None

for_update_clause()[source]

Return FOR UPDATE clause for row locking.

Return type:

str

async commit()[source]

Commit is handled per-operation with connection pooling.

Return type:

None

async rollback()[source]

Rollback is handled per-operation with connection pooling.

Return type:

None

Storage Package

Storage node management for multi-backend file storage.

StorageManager: mount point management compatible with genro-storage API.

The StorageManager handles configuration of storage backends and creation of StorageNode instances for file operations.

class storage.manager.StorageManager[source]

Bases: object

Manages storage mount points and creates StorageNode instances.

API is compatible with genro-storage for future migration.

Usage:

storage = StorageManager() storage.configure([

{‘name’: ‘data’, ‘protocol’: ‘local’, ‘base_path’: ‘/data’}, {‘name’: ‘cache’, ‘protocol’: ‘local’, ‘base_path’: ‘/tmp/cache’},

])

node = storage.node(’data:files/report.pdf’) await node.write_bytes(content)

__init__()[source]

Initialize the storage manager.

configure(source)[source]

Configure mount points from file or list.

Parameters:

source (str | list[dict[str, Any]]) – Either a path to YAML/JSON file, or a list of mount configs. Each config must have ‘name’ and ‘protocol’ keys.

Return type:

None

Example configs:

{‘name’: ‘data’, ‘protocol’: ‘local’, ‘base_path’: ‘/data’} {‘name’: ‘s3’, ‘protocol’: ‘s3’, ‘bucket’: ‘my-bucket’}

register(name, config)[source]

Register a single mount point.

Parameters:
  • name (str) – Mount point name.

  • config (dict[str, Any] | str) – Either a config dict or a storage URL string.

Return type:

None

Examples

storage.register(‘data’, {‘protocol’: ‘local’, ‘base_path’: ‘/data’}) storage.register(‘data’, ‘/data’) # Shorthand for local storage.register(‘s3’, ‘s3://bucket/path’)

get_mount_names()[source]

Get list of configured mount point names.

Return type:

list[str]

has_mount(name)[source]

Check if a mount point is configured.

Parameters:

name (str)

Return type:

bool

get_mount_config(name)[source]

Get configuration for a mount point.

Parameters:

name (str)

Return type:

dict[str, Any] | None

node(mount_or_path, *parts)[source]

Create a StorageNode for a file or directory.

Parameters:
  • mount_or_path (str) – Either ‘mount:path’ or just mount name.

  • *parts (str) – Additional path components.

Return type:

StorageNode

Returns:

StorageNode for the specified path (composed with EE mixin if available).

Examples

storage.node(’data:files/report.pdf’) storage.node(‘data’, ‘files’, ‘report.pdf’)

StorageNode: file/directory abstraction compatible with genro-storage API.

A StorageNode represents a file or directory in a storage backend. It provides a unified interface for file operations regardless of backend.

class storage.node.StorageNode(manager, mount_name, path, config)[source]

Bases: object

A file or directory in a storage backend.

Provides methods for file I/O that work with any backend (local, S3, etc.). API is compatible with genro-storage for future migration.

Parameters:
__init__(manager, mount_name, path, config)[source]

Initialize a storage node.

Parameters:
  • manager (StorageManager) – The StorageManager that created this node.

  • mount_name (str) – Name of the mount point.

  • path (str) – Path within the mount (without mount prefix).

  • config (dict[str, Any]) – Mount configuration dict.

property basename: str

Filename with extension.

property stem: str

Filename without extension.

property suffix: str

File extension (including dot).

property fullpath: str

path).

Type:

Full path including mount (mount

property path: str

Path within the mount (without mount prefix).

property mount_name: str

Name of the mount point.

property parent: StorageNode

Parent directory node.

property mimetype: str

MIME type based on file extension.

child(*parts)[source]

Get a child node by path components.

Parameters:

*parts (str) – Path components to append.

Return type:

StorageNode

Returns:

New StorageNode for the child path.

async exists()[source]

Check if file/directory exists.

Return type:

bool

async is_file()[source]

Check if node is a file.

Return type:

bool

async is_dir()[source]

Check if node is a directory.

Return type:

bool

async size()[source]

Get file size in bytes.

Return type:

int

async mtime()[source]

Get last modification time (Unix timestamp).

Return type:

float

async read_bytes()[source]

Read entire file as bytes.

Return type:

bytes

async read_text(encoding='utf-8')[source]

Read entire file as string.

Parameters:

encoding (str)

Return type:

str

async write_bytes(data)[source]

Write bytes to file.

Parameters:

data (bytes)

Return type:

None

async write_text(text, encoding='utf-8')[source]

Write string to file.

Parameters:
Return type:

None

async delete()[source]

Delete file or directory.

Return type:

bool

Returns:

True if deleted, False if not found.

async mkdir(parents=False, exist_ok=False)[source]

Create directory.

Parameters:
Return type:

None

async children()[source]

List child nodes (if directory).

Return type:

list[StorageNode]

async md5hash()[source]

Calculate MD5 hash of file content.

Return type:

str

url(expires_in=3600)[source]

Generate download URL.

For local filesystem, generates a signed token URL. For cloud backends, uses native presigned URLs.

Parameters:

expires_in (int) – URL expiration in seconds.

Return type:

str

Returns:

Download URL.

verify_url_token(token)[source]

Verify a URL token is valid and not expired.

Parameters:

token (str) – The token from the URL query string.

Return type:

bool

Returns:

True if valid, False otherwise.

exception storage.node.StorageError[source]

Bases: Exception

Base exception for storage operations.

Tools Package

Shared utilities used across packages.

Field-level encryption for sensitive database values.

Provides AES-256-GCM encryption for sensitive fields like passwords and API keys. The encryption key is loaded from environment variable or secrets file.

Key sources (in priority order): 1. MAIL_PROXY_ENCRYPTION_KEY environment variable (base64-encoded) 2. /run/secrets/encryption_key file (Docker/Kubernetes secrets) 3. {instance_dir}/.encryption_key file (development only)

Usage:

from tools.encryption import encrypt_value, decrypt_value

# Encrypt before storing encrypted = encrypt_value(“my-secret-password”) # Returns: “ENC:base64-encoded-ciphertext”

# Decrypt after reading plaintext = decrypt_value(encrypted) # Returns: “my-secret-password”

exception tools.encryption.EncryptionError[source]

Bases: Exception

Raised when encryption/decryption fails.

exception tools.encryption.EncryptionKeyNotConfigured[source]

Bases: EncryptionError

Raised when encryption key is not available.

tools.encryption.decrypt_value(encrypted)[source]

Decrypt a value encrypted with encrypt_value().

Parameters:

encrypted (str) – The encrypted value (with “ENC:” prefix).

Return type:

str

Returns:

Decrypted plaintext.

Raises:
tools.encryption.decrypt_value_with_key(encrypted, key)[source]

Decrypt a value using provided key.

Same as decrypt_value() but uses an explicit key instead of global. Used by Table for field decryption with proxy-provided key.

Parameters:
  • encrypted (str) – The encrypted value (with “ENC:” prefix).

  • key (bytes) – 32-byte AES-256 key.

Return type:

str

Returns:

Decrypted plaintext.

tools.encryption.encrypt_value(plaintext)[source]

Encrypt a string value using AES-256-GCM.

Parameters:

plaintext (str) – The value to encrypt.

Returns:

“ prefix.

Return type:

Encrypted value with “ENC

Raises:
tools.encryption.encrypt_value_with_key(plaintext, key)[source]

Encrypt a string value using provided key.

Same as encrypt_value() but uses an explicit key instead of global. Used by Table for field encryption with proxy-provided key.

Parameters:
  • plaintext (str) – The value to encrypt.

  • key (bytes) – 32-byte AES-256 key.

Returns:

“ prefix.

Return type:

Encrypted value with “ENC

tools.encryption.generate_key()[source]

Generate a new random encryption key.

Return type:

str

Returns:

Base64-encoded 32-byte key suitable for MAIL_PROXY_ENCRYPTION_KEY.

tools.encryption.is_encrypted(value)[source]

Check if a value is encrypted (has ENC: prefix).

Parameters:

value (str)

Return type:

bool

tools.encryption.set_key_for_testing(key)[source]

Set encryption key directly (for testing only).

Parameters:

key (bytes | None) – 32-byte key or None to clear.

Return type:

None

Prometheus metrics for monitoring the mail dispatcher.

This module defines the Prometheus counters and gauges used to track email dispatch operations. All metrics use the gmp_ prefix (genro-mail-proxy).

Metrics exposed:
  • gmp_sent_total: Counter of successfully sent emails per account.

  • gmp_errors_total: Counter of send errors per account.

  • gmp_deferred_total: Counter of deferred emails per account.

  • gmp_rate_limited_total: Counter of rate limit hits per account.

  • gmp_pending_messages: Gauge of messages currently in queue.

All counters are labeled by:
  • tenant_id: Tenant identifier

  • tenant_name: Human-readable tenant name

  • account_id: SMTP account identifier

  • account_name: Human-readable account name (defaults to account_id)

Example

Accessing metrics via the REST API:

GET /metrics

Returns Prometheus text format suitable for scraping.

class tools.prometheus.metrics.MailMetrics(registry=None)[source]

Bases: object

Prometheus metrics collector for the mail dispatcher.

Encapsulates all Prometheus counters and gauges used to monitor email dispatch operations. Each metric is labeled by tenant and account info to enable per-tenant and per-account monitoring and alerting.

registry

The Prometheus CollectorRegistry holding all metrics.

sent

Counter tracking successfully sent emails.

errors

Counter tracking permanent send failures.

deferred

Counter tracking temporarily deferred messages.

rate_limited

Counter tracking rate limit enforcement events.

pending

Gauge showing current queue depth.

Parameters:

registry (CollectorRegistry | None)

__init__(registry=None)[source]

Initialize metrics with an optional custom registry.

Parameters:

registry (CollectorRegistry | None) – Optional Prometheus CollectorRegistry. If not provided, a new registry is created. Use a custom registry for testing or when multiple metric sets are needed.

inc_sent(tenant_id=None, tenant_name=None, account_id=None, account_name=None)[source]

Increment the sent counter.

Parameters:
Return type:

None

inc_error(tenant_id=None, tenant_name=None, account_id=None, account_name=None)[source]

Increment the error counter.

Parameters:
Return type:

None

inc_deferred(tenant_id=None, tenant_name=None, account_id=None, account_name=None)[source]

Increment the deferred counter.

Parameters:
Return type:

None

inc_rate_limited(tenant_id=None, tenant_name=None, account_id=None, account_name=None)[source]

Increment the rate-limited counter.

Parameters:
Return type:

None

set_pending(value)[source]

Set the pending messages gauge to a specific value.

Parameters:

value (int) – The current number of messages awaiting delivery.

Return type:

None

init_account(tenant_id=None, tenant_name=None, account_id=None, account_name=None)[source]

Initialize all counters for an account with zero values.

This ensures metrics appear in Prometheus output even before any actual email activity occurs. Prometheus counters with labels only appear in output after being incremented, so this method explicitly creates the label combinations with initial value 0.

Parameters:
Return type:

None

generate_latest()[source]

Export all metrics in Prometheus text exposition format.

Return type:

bytes

Returns:

Byte string containing all metrics in Prometheus format, suitable for HTTP response to a Prometheus scraper.

REPL protection utilities.

Provides decorators and wrappers to protect sensitive methods/attributes from being accessed in interactive REPL sessions.

Usage:

from tools.repl import reserved, repl_wrap

class MyService:

@reserved def get_secret_key(self):

return self._secret

def public_method(self):

return “hello”

# In REPL setup: service = MyService() namespace = {“service”: repl_wrap(service)}

# Now in REPL: >>> service.public_method() # Works ‘hello’ >>> service.get_secret_key() # Blocked AttributeError: ‘get_secret_key’ is reserved and not accessible in REPL

class tools.repl.REPLWrapper(wrapped)[source]

Bases: object

Wrapper that blocks access to @reserved methods/attributes.

This wrapper intercepts attribute access and raises AttributeError for any method marked with @reserved decorator.

Parameters:

wrapped (Any)

__init__(wrapped)[source]
Parameters:

wrapped (Any)

__dir__()[source]

Return directory listing, excluding reserved methods.

Return type:

list[str]

tools.repl.is_reserved(obj)[source]

Check if an object (method/function) is marked as reserved.

Parameters:

obj (Any)

Return type:

bool

tools.repl.repl_wrap(obj)[source]

Wrap an object for safe REPL access.

Returns a wrapper that blocks access to @reserved methods. The wrapper is transparent for all other operations.

Parameters:

obj (TypeVar(T)) – The object to wrap.

Return type:

TypeVar(T)

Returns:

A REPLWrapper that behaves like the original object but blocks @reserved methods.

Usage:

# In REPL setup code: namespace = {

“proxy”: repl_wrap(proxy), “db”: repl_wrap(db),

} code.interact(local=namespace)

tools.repl.reserved(func)[source]

Mark a method as reserved (not accessible from REPL).

Usage:
class MyClass:

@reserved def sensitive_method(self):

Parameters:

func (Callable[..., TypeVar(T)])

Return type:

Callable[..., TypeVar(T)]