Python Modules
This document provides API reference for the genro-mail-proxy Python modules.
The package is organized into four main namespaces:
core.mail_proxy- Core functionality (Apache 2.0)enterprise.mail_proxy- Enterprise features (BSL 1.1)sql- Database abstraction layerstorage- Storage node managementtools- Shared utilities
Core Package (Apache 2.0)
Main Components
Email dispatcher microservice with multi-tenant support.
This package provides an asynchronous email dispatch service with queueing, rate limiting, retry logic, and multi-backend attachments.
- Components:
MailProxy: Main service class with SMTP sender and background loops. MailProxyBase: Foundation layer with database and endpoint discovery. ProxyConfig: Hierarchical configuration dataclasses.
- Features:
Multi-tenant isolation with per-tenant configuration
Priority-based message queuing (immediate/high/medium/low)
Per-account rate limiting (minute/hour/day)
Automatic retry with exponential backoff
Attachment fetching (HTTP endpoint, URL, base64, filesystem)
Delivery report callbacks to client applications
Prometheus metrics for monitoring
FastAPI REST API for control and message submission
SQLite/PostgreSQL persistence
Example
Create and run the mail service:
from core.mail_proxy.proxy import MailProxy
from core.mail_proxy.interface import create_app
proxy = MailProxy(db_path="/data/mail.db")
app = create_app(proxy, api_token="secret")
# Or via CLI
# mail-proxy serve --port 8000
Note
Enterprise Edition (EE) extends this package with bounce detection, PEC (certified email), and per-tenant API keys when the enterprise package is installed.
- core.mail_proxy.main()[source]
CLI entry point. Creates a MailProxy and runs the CLI.
Resolves the current instance from context (via mail-proxy use) and configures the MailProxy with the correct database path.
- Return type:
MailProxy: runtime layer with SMTP sending, background loops, and metrics.
MailProxy extends MailProxyBase with the full runtime for email dispatch:
- Components:
SmtpSender: Connection pool, rate limiting, dispatch loop
ClientReporter: Delivery report sync to upstream services
AttachmentManager: Fetch attachments with two-tier cache
Prometheus MailMetrics: Counters and gauges for monitoring
- Class Hierarchy:
- MailProxyBase (proxy_base.py): config, db, tables, endpoints, api/cli
- └── MailProxy (this class): +SmtpSender, +ClientReporter, +metrics
└── MailProxy with MailProxy_EE mixin: +bounce detection (EE)
- EE Hook Methods (CE stubs, overridden by MailProxy_EE):
__init_proxy_ee__(): Initialize EE state (bounce_receiver, _bounce_config)
_start_proxy_ee(): Start bounce poller background task
_stop_proxy_ee(): Stop bounce poller
- Command Handling:
handle_command() is the entry point for external control. Commands are routed to local handlers (runtime-dependent) or EndpointDispatcher (CRUD).
- Local commands (require runtime state):
run now, listTenantsSyncStatus, addMessages, deleteMessages, cleanupMessages, deleteAccount
- Delegated commands (via EndpointDispatcher):
addTenant, getTenant, listTenants, updateTenant, deleteTenant, addAccount, listAccounts, getAccount, …
- Background Tasks:
SmtpSender.dispatch_loop: Fetch pending messages, send via SMTP
ClientReporter.sync_loop: Report delivery events to upstream
- Usage (recommended factory):
proxy = await MailProxy.create(db_path=”/data/mail.db”, start_active=True) await proxy.stop()
- Usage (via API property):
proxy = MailProxy(config=ProxyConfig(db_path=”/data/mail.db”)) app = proxy.api # Lifespan calls start()/stop() automatically
- Module Constants:
PRIORITY_LABELS: {0: “immediate”, 1: “high”, 2: “medium”, 3: “low”} LABEL_TO_PRIORITY: Reverse mapping DEFAULT_PRIORITY: 2 (medium)
- class core.mail_proxy.proxy.MailProxy(config=None, *, logger=None, metrics=None)[source]
Bases:
MailProxyBaseRuntime layer: SMTP sending, background loops, metrics, command handling.
Extends MailProxyBase with: - SmtpSender: SMTP connection pool, rate limiter, dispatch loop - ClientReporter: Delivery report sync loop - AttachmentManager: Fetch attachments with caching - MailMetrics: Prometheus counters and gauges
- Class Attributes:
is_enterprise: True when EE modules installed (set dynamically)
- Instance Attributes (from base):
config: ProxyConfig instance db: SqlDb with autodiscovered tables endpoints: Dict of endpoint instances
- Instance Attributes (runtime):
smtp_sender: SmtpSender instance (pool, rate_limiter, dispatch) client_reporter: ClientReporter instance (sync loop) attachments: AttachmentManager instance metrics: MailMetrics instance logger: Logger for diagnostics
- Compatibility Properties (delegate to smtp_sender):
pool: SMTP connection pool rate_limiter: Per-account rate limiter
- Parameters:
config (
ProxyConfig|None)metrics (
MailMetrics|None)
- is_enterprise: bool = False
True when EE modules installed. Set dynamically in mail_proxy/__init__.py.
- property pool
SMTP connection pool (delegate to smtp_sender.pool).
- property rate_limiter
Per-account rate limiter (delegate to smtp_sender.rate_limiter).
- __init_proxy_ee__()[source]
Initialize EE state (bounce_receiver, _bounce_config). CE stub.
- Return type:
- __init__(config=None, *, logger=None, metrics=None)[source]
Initialize the mail dispatcher with ProxyConfig.
- Parameters:
config (
ProxyConfig|None) – ProxyConfig instance with all configuration. If None, creates default config.logger (
Logger|None) – Custom logger instance. If None, uses default logger.metrics (
MailMetrics|None) – Prometheus metrics collector. If None, creates new instance.
- async classmethod create(**kwargs)[source]
Create and initialize a MailProxy instance.
This is the recommended way to create instances. It ensures proper async initialization is completed before returning a ready-to-use proxy.
- Parameters:
**kwargs – All arguments accepted by MailProxy.__init__().
- Return type:
- Returns:
Fully initialized MailProxy instance with background tasks running.
Example
proxy = await MailProxy.create(db_path=”./mail.db”) # Ready to use immediately - no need to call start()
Note
For cases requiring delayed startup, use the traditional pattern:
proxy = MailProxy(db_path="./mail.db") # ... additional setup ... await proxy.start()
- async init()[source]
Initialize persistence layer and attachment manager.
Performs the following initialization steps: 1. Initialize database schema and run migrations (via MailProxyBase.init) 2. Load cache configuration (from env vars or config file) 3. Initialize attachment cache (memory and disk tiers) 4. Create the AttachmentManager
- Return type:
- async handle_command(cmd, payload=None)[source]
Execute an external control command.
Dispatches the command to the appropriate handler method. Supported commands: -
run now: Trigger immediate dispatch cycle -suspend: Pause the scheduler -activate: Resume the scheduler -addAccount,listAccounts,deleteAccount: SMTP account management -addMessages,deleteMessages,listMessages: Message queue management -cleanupMessages: Remove old reported messages -addTenant,getTenant,listTenants,updateTenant,deleteTenant: Tenant managementState-modifying commands are automatically logged to the command_log table for audit trail and replay capability.
- async start()[source]
Start the background scheduler and maintenance tasks.
Initializes the persistence layer and spawns background tasks for: - SMTP dispatch via smtp_sender - Client report loop: sends delivery reports to upstream services
- Return type:
Base class for MailProxy: database, tables, endpoints, and interface factories.
MailProxyBase is the foundation layer of genro-mail-proxy, providing:
Configuration: ProxyConfig instance at self.config
Database: SqlDb at self.db with autodiscovered Table classes
Endpoints: Registry at self.endpoints with autodiscovered Endpoint classes
Interfaces: Lazy api (FastAPI) and cli (Click) properties
- Class Hierarchy:
- MailProxyBase (this class)
- └── MailProxy (proxy.py): adds SMTP sender, background loops, metrics
└── MailProxy with MailProxy_EE mixin: adds bounce detection (EE)
- Discovery Mechanism:
Tables from core.mail_proxy.entities.*/table.py are composed with optional EE mixins from enterprise.mail_proxy.entities.*/table_ee.py. Endpoints follow the same pattern with endpoint.py and endpoint_ee.py.
- Discovered entities (CE):
tenants: Multi-tenant isolation
accounts: SMTP account configuration
messages: Message queue with priority
message_events: Delivery event history
command_log: API audit trail
instance: Service-level configuration
- EE extensions (when enterprise package installed):
accounts: +PEC (certified email) fields
tenants: +API key management
instance: +Bounce detection config
- Usage (testing without runtime):
proxy = MailProxyBase(db_path=”:memory:”) await proxy.init() await proxy.db.table(“tenants”).add({“id”: “t1”, “name”: “Test”})
- Usage (production via proxy.api):
proxy = MailProxy(config=ProxyConfig(db_path=”/data/mail.db”)) app = proxy.api # FastAPI app with auto-start/stop lifespan
- class core.mail_proxy.proxy_base.MailProxyBase(config=None)[source]
Bases:
objectFoundation layer: config, database, tables, endpoints, interface factories.
- config
ProxyConfig instance with all configuration
- db
SqlDb with autodiscovered Table classes (CE + EE mixins)
- endpoints
Dict of Endpoint instances keyed by name
- Properties:
api: FastAPI app (lazy, created on first access) cli: Click CLI group (lazy, created on first access)
Subclassed by MailProxy which adds SMTP sender, background loops, etc.
- Parameters:
config (
ProxyConfig|None)
- __init__(config=None)[source]
Initialize base proxy with config and database.
- Parameters:
config (
ProxyConfig|None) – ProxyConfig instance. If None, creates default.
- endpoints: dict[str, BaseEndpoint]
- property encryption_key: bytes | None
Encryption key for database field encryption. None if not configured.
- async init()[source]
Initialize database: connect, create tables, run migrations, detect edition.
- Return type:
- property api: FastAPI
FastAPI app with all endpoints, auth middleware, and lifespan.
Created on first access. Includes default lifespan that calls proxy.start() on startup and proxy.stop() on shutdown.
- Usage:
uvicorn core.mail_proxy.server:app
- property cli: Group
Click CLI group with endpoint commands and service commands.
Created on first access. Includes: - Endpoint commands: tenants, accounts, messages, instance, command_log - Service commands: serve, connect, stats, token, run-now
- Usage:
mail-proxy –help
Configuration dataclasses for MailProxy (Community Edition).
This module defines the configuration hierarchy for genro-mail-proxy. ProxyConfig is the single entry point for all configuration, organizing settings into logical nested groups (timing, queue, concurrency, etc.).
- Architecture:
ProxyConfig is used by MailProxyBase (and thus MailProxy) to configure the service. It is CE-only: Enterprise Edition does not extend these dataclasses but rather stores EE-specific config in the database (via InstanceTable_EE for bounce detection).
- Usage:
- config = ProxyConfig(
db_path=”/data/mail.db”, timing=TimingConfig(send_loop_interval=1.0), concurrency=ConcurrencyConfig(max_sends=20),
) proxy = MailProxy(config=config)
# Access nested config interval = proxy.config.timing.send_loop_interval
- Nested Config Classes:
TimingConfig: Intervals, timeouts, retention periods
QueueConfig: Queue sizes, batch limits
ConcurrencyConfig: Parallelism limits (sends, attachments)
ClientSyncConfig: Upstream reporting (URL, auth)
RetryConfig: Retry behavior (max attempts, delays)
CacheConfig: Attachment cache (memory/disk tiers)
- class core.mail_proxy.proxy_config.CacheConfig(memory_max_mb=50.0, memory_ttl_seconds=300, disk_dir=None, disk_max_mb=500.0, disk_ttl_seconds=3600, disk_threshold_kb=100.0)[source]
Bases:
objectTwo-tier attachment cache configuration (memory + disk).
- Parameters:
- __init__(memory_max_mb=50.0, memory_ttl_seconds=300, disk_dir=None, disk_max_mb=500.0, disk_ttl_seconds=3600, disk_threshold_kb=100.0)
- class core.mail_proxy.proxy_config.ClientSyncConfig(url=None, user=None, password=None, token=None)[source]
Bases:
objectUpstream delivery report synchronization settings.
- class core.mail_proxy.proxy_config.ConcurrencyConfig(max_sends=10, max_per_account=3, max_attachments=3)[source]
Bases:
objectParallelism limits for SMTP sends and attachment fetches.
- class core.mail_proxy.proxy_config.ProxyConfig(db_path='/data/mail_service.db', instance_name='mail-proxy', port=8000, api_token=None, timing=<factory>, queue=<factory>, concurrency=<factory>, client_sync=<factory>, retry=<factory>, cache=<factory>, default_priority=2, test_mode=False, log_delivery_activity=False, start_active=False, report_delivery_callable=None)[source]
Bases:
objectMain configuration container for MailProxy (CE).
Single entry point for all proxy configuration. Groups settings into logical nested structures for clean organization and access.
This class is CE-only. Enterprise Edition stores additional config (bounce detection, PEC) in the database via table extensions.
- Nested Groups:
timing: Intervals, timeouts, retention periods queue: Queue sizes, batch limits concurrency: Parallelism limits (sends, attachments) client_sync: Upstream reporting (URL, auth) retry: Retry behavior (max attempts, delays) cache: Attachment cache (memory/disk tiers)
- Top-Level Settings:
db_path: SQLite/PostgreSQL database path instance_name: Service identifier for display port: Default API server port api_token: Optional bearer token for API auth default_priority: Default message priority (0-3) test_mode: Disable auto-processing for tests log_delivery_activity: Verbose delivery logging start_active: Start processing immediately report_delivery_callable: Custom delivery report handler
Example
- config = ProxyConfig(
db_path=”/data/mail.db”, timing=TimingConfig(send_loop_interval=1.0), concurrency=ConcurrencyConfig(max_sends=20),
) proxy = MailProxy(config=config) interval = proxy.config.timing.send_loop_interval
- Parameters:
db_path (
str)instance_name (
str)port (
int)timing (
TimingConfig)queue (
QueueConfig)concurrency (
ConcurrencyConfig)client_sync (
ClientSyncConfig)retry (
RetryConfig)cache (
CacheConfig)default_priority (
int)test_mode (
bool)log_delivery_activity (
bool)start_active (
bool)report_delivery_callable (
Callable[[dict[str,Any]],Awaitable[None]] |None)
- __init__(db_path='/data/mail_service.db', instance_name='mail-proxy', port=8000, api_token=None, timing=<factory>, queue=<factory>, concurrency=<factory>, client_sync=<factory>, retry=<factory>, cache=<factory>, default_priority=2, test_mode=False, log_delivery_activity=False, start_active=False, report_delivery_callable=None)
- Parameters:
db_path (
str)instance_name (
str)port (
int)timing (
TimingConfig)queue (
QueueConfig)concurrency (
ConcurrencyConfig)client_sync (
ClientSyncConfig)retry (
RetryConfig)cache (
CacheConfig)default_priority (
int)test_mode (
bool)log_delivery_activity (
bool)start_active (
bool)report_delivery_callable (
Callable[[dict[str,Any]],Awaitable[None]] |None)
- timing: TimingConfig
Timing and interval settings.
- queue: QueueConfig
Queue size and batch settings.
- concurrency: ConcurrencyConfig
Concurrency limits.
- client_sync: ClientSyncConfig
Client synchronization settings.
- retry: RetryConfig
Retry behavior settings.
- cache: CacheConfig
Attachment cache settings.
- class core.mail_proxy.proxy_config.QueueConfig(result_size=1000, message_size=10000, put_timeout=5.0, max_enqueue_batch=1000)[source]
Bases:
objectQueue sizes and batch limits for message processing.
- class core.mail_proxy.proxy_config.RetryConfig(max_retries=3, delays=(60, 300, 900))[source]
Bases:
objectSMTP retry behavior with exponential backoff.
- class core.mail_proxy.proxy_config.TimingConfig(send_loop_interval=0.5, attachment_timeout=30, report_retention_seconds=604800)[source]
Bases:
objectTiming and interval settings for the dispatch service.
ASGI application entry point for uvicorn.
This module provides the FastAPI application instance for deployment with ASGI servers like uvicorn, hypercorn, or gunicorn+uvicorn.
- Configuration via environment variables:
GMP_DB_PATH: Database path (SQLite file or PostgreSQL URL) GMP_API_TOKEN: API authentication token
- Components:
app: FastAPI application with full MailProxy lifecycle management. _proxy: Internal MailProxy instance (use app instead).
Example
Run with uvicorn:
GMP_DB_PATH=/data/mail.db GMP_API_TOKEN=secret \
uvicorn core.mail_proxy.server:app --host 0.0.0.0 --port 8000
Run with Docker:
docker run -e GMP_DB_PATH=postgresql://... -e GMP_API_TOKEN=secret ...
Or via CLI (reads from ~/.mail-proxy/<name>/config.ini):
mail-proxy serve --port 8000
Note
The application includes a lifespan context manager that calls proxy.start() on startup and proxy.stop() on shutdown, ensuring proper initialization of background tasks and graceful cleanup.
- Design Decision (2025-02):
API tokens are stored in plaintext in config.ini for CLI instances. This is acceptable for development; production uses Docker with environment variables. Hashing like tenant tokens was considered but deferred since Docker passes tokens via env vars anyway.
Interface Layer
FastAPI route generation from endpoint classes via introspection.
This module generates REST API routes automatically from endpoint classes by introspecting method signatures and creating appropriate handlers.
- Components:
create_app: FastAPI application factory. register_endpoint: Register endpoint methods as FastAPI routes. verify_tenant_token: Token verification for tenant-scoped requests. require_admin_token: Admin-only endpoint protection. require_token: General authentication dependency.
Example
Create and run the API server:
from core.mail_proxy.interface import create_app
from core.mail_proxy.proxy import MailProxy
proxy = MailProxy(db_path="/data/mail.db")
app = create_app(proxy, api_token="secret")
# Run with uvicorn
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Register custom endpoints:
from fastapi import FastAPI
from core.mail_proxy.interface import register_endpoint
app = FastAPI()
endpoint = MyCustomEndpoint(table)
register_endpoint(app, endpoint)
Note
Authentication uses X-API-Token header. Global token grants admin access to all tenants. Tenant tokens restrict access to own resources.
- core.mail_proxy.interface.api_base.create_app(svc, api_token=None, lifespan=None, tenant_tokens_enabled=False)[source]
Create and configure the FastAPI application.
- Parameters:
svc (
MailProxy) – MailProxy instance implementing business logic.api_token (
str|None) – Optional global token for X-API-Token authentication.lifespan (
Callable[[FastAPI],AbstractAsyncContextManager[None]] |None) – Optional lifespan context manager. If None, creates default that starts/stops the proxy service.tenant_tokens_enabled (
bool) – When True, enables per-tenant API keys.
- Return type:
- Returns:
Configured FastAPI application with all routes registered.
Example
from core.mail_proxy.proxy import MailProxy from core.mail_proxy.interface import create_app proxy = MailProxy(db_path="/data/mail.db") app = create_app(proxy, api_token="admin-secret") # Run with uvicorn import uvicorn uvicorn.run(app)
- core.mail_proxy.interface.api_base.register_endpoint(app, endpoint, prefix='')[source]
Register all methods of an endpoint as FastAPI routes.
Introspects the endpoint to discover async methods and creates appropriate GET (query params) or POST (body) routes.
- Parameters:
- Return type:
Example
endpoint = AccountEndpoint(db.table("accounts")) register_endpoint(app, endpoint) # Creates routes: GET /accounts/list, POST /accounts/add, etc.
- async core.mail_proxy.interface.api_base.require_admin_token(request, api_token=Depends(dependency=<fastapi.security.api_key.APIKeyHeader object>, use_cache=True, scope=None))[source]
Require global admin token for admin-only endpoints.
Admin-only endpoints include tenant management, API key operations, and instance configuration.
- async core.mail_proxy.interface.api_base.require_token(request, api_token=Depends(dependency=<fastapi.security.api_key.APIKeyHeader object>, use_cache=True, scope=None))[source]
Validate API token from X-API-Token header.
Accepts global admin token (full access) or tenant token (own resources). Stores token info in request.state for downstream verification.
- async core.mail_proxy.interface.api_base.verify_tenant_token(tenant_id, api_token, global_token)[source]
Verify API token for a tenant-scoped request.
- Parameters:
- Raises:
HTTPException – 401 if token is invalid or tenant_id mismatch.
- Return type:
Note
Global token grants access to any tenant
Tenant token grants access ONLY to own resources
No token configured = open access
Click command generation from endpoint classes via introspection.
This module generates CLI commands automatically from endpoint classes by introspecting method signatures and creating Click commands.
- Components:
register_endpoint: Register endpoint methods as Click commands.
Example
Register endpoint commands:
import click
from core.mail_proxy.interface import register_cli_endpoint
from core.mail_proxy.entities.account import AccountEndpoint
@click.group()
def cli():
pass
endpoint = AccountEndpoint(table)
register_cli_endpoint(cli, endpoint)
# Creates: cli accounts add, cli accounts get, cli accounts list
Generated commands:
mail-proxy accounts list # uses context tenant
mail-proxy accounts list acme # explicit tenant
mail-proxy accounts add main --host smtp.example.com
Note
tenant_id is special: optional positional with context fallback
Other required params become positional arguments
Optional params become –options
Boolean params become –flag/–no-flag toggles
Method underscores become dashes (add_batch → add-batch)
- core.mail_proxy.interface.cli_base.register_endpoint(group, endpoint, run_async=None)[source]
Register all methods of an endpoint as Click commands.
Creates a subgroup named after the endpoint and adds commands for each public async method.
- Parameters:
- Return type:
Group- Returns:
The created Click subgroup with all endpoint commands.
Example
@click.group() def cli(): pass endpoint = AccountEndpoint(db.table("accounts")) register_endpoint(cli, endpoint) # Now available: # cli accounts list # cli accounts add <id> --host <host> --port <port> # cli accounts delete <id>
Special CLI commands not derived from endpoint introspection.
This module provides CLI commands that don’t map directly to REST API endpoints. These are administrative and utility commands requiring special handling (interactive sessions, file I/O, server communication).
- Components:
add_connect_command: Interactive Python REPL with pre-configured client. add_stats_command: Display aggregate queue statistics. add_send_command: Queue email from .eml file. add_token_command: API token management (show/regenerate). add_run_now_command: Trigger immediate dispatch cycle via HTTP. add_list_command: List all configured instances with status. add_stop_command: Stop running instances. add_restart_command: Restart running instances.
- Instance Management:
Instances are stored in ~/.mail-proxy/<name>/ with config.ini files. The list/stop/restart commands manage these instances by tracking PID files for process management.
Example
Add special commands to CLI group:
from core.mail_proxy.interface.cli_commands import (
add_connect_command,
add_stats_command,
add_send_command,
add_list_command,
add_stop_command,
)
@click.group()
def cli():
pass
add_connect_command(cli, get_url, get_token, "myinstance")
add_stats_command(cli, db)
add_send_command(cli, db, "tenant1")
add_list_command(cli)
add_stop_command(cli)
Run commands:
mail-proxy myinstance connect
mail-proxy myinstance stats --json
mail-proxy myinstance tenant1 send email.eml
mail-proxy list
mail-proxy stop myserver
Note
These commands are registered separately from endpoint-derived commands because they require special parameters (callbacks, file paths) or interactive behavior not suitable for introspection.
- core.mail_proxy.interface.cli_commands.add_connect_command(group, get_url, get_token, instance_name)[source]
Register ‘connect’ command for interactive Python REPL.
Creates a REPL session with a pre-configured MailProxyClient for interactive server administration and debugging.
- Parameters:
group (
Group) – Click group to register command on.get_url (
Callable[[],str]) – Callback returning server URL (from instance config).get_token (
Callable[[],str|None]) – Callback returning API token (from instance config).instance_name (
str) – Instance name for display and client configuration.
- Return type:
Example
mail-proxy myserver connect mail-proxy myserver connect --url http://remote:8000 --token secret # In REPL: >>> proxy.status() >>> proxy.messages.list(tenant_id="acme")
- core.mail_proxy.interface.cli_commands.add_stats_command(group, db)[source]
Register ‘stats’ command for aggregate queue statistics.
Displays tenant/account/message counts with breakdown by status.
- Parameters:
group (click.Group) – Click group to register command on.
db (MailProxyDb) – Database instance for querying statistics.
- Return type:
None
Example
mail-proxy myserver stats mail-proxy myserver stats --json
- core.mail_proxy.interface.cli_commands.add_send_command(group, db, tenant_id)[source]
Register ‘send’ command to queue email from .eml file.
Parses RFC 5322 email file and queues for delivery.
- Parameters:
group (click.Group) – Click group to register command on.
db (MailProxyDb) – Database instance for message operations.
tenant_id (str) – Tenant context for the send operation.
- Return type:
None
Example
mail-proxy myserver acme send email.eml mail-proxy myserver acme send email.eml --account smtp1 --priority 1
- core.mail_proxy.interface.cli_commands.add_token_command(group, db)[source]
Register ‘token’ command for API token management.
Shows current token or regenerates a new one.
- Parameters:
group (click.Group) – Click group to register command on.
db (MailProxyDb) – Database instance for token storage.
- Return type:
None
Example
mail-proxy myserver token mail-proxy myserver token --regenerate
- core.mail_proxy.interface.cli_commands.add_run_now_command(group, get_url, get_token, tenant_id=None)[source]
Register ‘run-now’ command to trigger immediate dispatch.
Sends HTTP POST to running server to force dispatch cycle.
- Parameters:
- Return type:
Example
mail-proxy myserver run-now mail-proxy myserver acme run-now
- core.mail_proxy.interface.cli_commands.add_serve_command(group)[source]
Register ‘serve’ command to start a mail-proxy server instance.
- Parameters:
group (
Group) – Click group to register command on.- Return type:
Example
mail-proxy serve # Start default-mailer mail-proxy serve myserver # Start/create myserver mail-proxy serve myserver -p 8080 # Start on specific port mail-proxy serve myserver -c # Start and open REPL
- core.mail_proxy.interface.cli_commands.add_list_command(group)[source]
Register ‘list’ command to show all configured instances.
- Parameters:
group (
Group) – Click group to register command on.- Return type:
Example
mail-proxy list
- core.mail_proxy.interface.cli_commands.add_stop_command(group)[source]
Register ‘stop’ command to stop running instances.
- Parameters:
group (
Group) – Click group to register command on.- Return type:
Example
mail-proxy stop # Stop all running instances mail-proxy stop myserver # Stop specific instance mail-proxy stop myserver -f # Force kill
- core.mail_proxy.interface.cli_commands.add_restart_command(group)[source]
Register ‘restart’ command to restart running instances.
- Parameters:
group (
Group) – Click group to register command on.- Return type:
Example
mail-proxy restart # Restart all running instances mail-proxy restart myserver # Restart specific instance
- core.mail_proxy.interface.cli_commands.add_use_command(group)[source]
Register ‘use’ command to select current context (instance/tenant).
Sets the default instance and optionally tenant for subsequent commands.
- Parameters:
group (
Group) – Click group to register command on.- Return type:
Example
mail-proxy use production # instance only mail-proxy use production/acme # instance + tenant mail-proxy use /beta # change tenant only
- core.mail_proxy.interface.cli_commands.add_current_command(group)[source]
Register ‘current’ command to show current context.
- Parameters:
group (
Group) – Click group to register command on.- Return type:
Example
mail-proxy current mail-proxy current --export
- core.mail_proxy.interface.cli_commands.resolve_context(explicit_instance=None, explicit_tenant=None)[source]
Resolve active instance and tenant using priority chain.
- Resolution order for instance:
Explicit argument
GMP_INSTANCE environment variable
~/.mail-proxy/.current file (instance part)
Auto-select if only one instance exists
- Resolution order for tenant:
Explicit argument
GMP_TENANT environment variable
~/.mail-proxy/.current file (tenant part)
- core.mail_proxy.interface.cli_commands.require_context(explicit_instance=None, explicit_tenant=None, require_tenant=False)[source]
Resolve context or exit with error if ambiguous.
- Parameters:
- Return type:
- Returns:
(instance, tenant) tuple.
- Raises:
SystemExit – If required context cannot be resolved.
- core.mail_proxy.interface.cli_commands.resolve_instance(explicit=None)[source]
Resolve the active instance (backwards compatible wrapper).
- core.mail_proxy.interface.cli_commands.require_instance(explicit=None)[source]
Resolve instance or exit (backwards compatible wrapper).
Base class for endpoint introspection and command dispatch.
This module provides the foundation for automatic API/CLI generation from endpoint classes via method introspection.
- Components:
POST: Decorator to mark methods as HTTP POST. BaseEndpoint: Base class with introspection capabilities. EndpointDispatcher: Routes commands to endpoint methods.
Example
Define an endpoint:
from core.mail_proxy.interface.endpoint_base import BaseEndpoint, POST
class MyEndpoint(BaseEndpoint):
name = "items"
async def list(self, active_only: bool = False) -> list[dict]:
"""List all items."""
return await self.table.list_all(active_only=active_only)
@POST
async def add(self, id: str, name: str) -> dict:
"""Add a new item."""
return await self.table.add({"id": id, "name": name})
Use with dispatcher:
dispatcher = EndpointDispatcher(db)
result = await dispatcher.dispatch("addMessages", {"messages": [...]})
Note
BaseEndpoint.discover() scans CE and EE packages for endpoint classes and composes them when both exist for an entity.
- class core.mail_proxy.interface.endpoint_base.BaseEndpoint(table)[source]
Bases:
objectBase class for all endpoints with introspection capabilities.
Provides method discovery, HTTP method inference, and Pydantic model generation from signatures for automatic API/CLI generation.
- name
Endpoint name used in URL paths and CLI groups.
- table
Database table instance for operations.
Example
Create a custom endpoint:
class ItemEndpoint(BaseEndpoint): name = "items" async def get(self, item_id: str) -> dict: item = await self.table.get(item_id) if not item: raise ValueError(f"Item '{item_id}' not found") return item @POST async def add(self, id: str, name: str) -> dict: return await self.table.add({"id": id, "name": name}) # Register with FastAPI endpoint = ItemEndpoint(db.table("items")) register_endpoint(app, endpoint)
- Parameters:
table (
Any)
- __init__(table)[source]
Initialize endpoint with table reference.
- Parameters:
table (
Any) – Database table instance for operations.
- create_request_model(method_name)[source]
Create Pydantic model from method signature.
Used by API layer to validate and parse request bodies.
- is_simple_params(method_name)[source]
Check if method has only simple params suitable for query string.
- classmethod discover()[source]
Autodiscover all endpoint classes from entities/ directories.
Scans CE and EE packages for endpoint.py and endpoint_ee.py modules. When both exist for an entity, composes them with EE mixin first.
- Return type:
- Returns:
List of endpoint classes ready for instantiation.
Example
for endpoint_class in BaseEndpoint.discover(): table = db.table(endpoint_class.name) endpoint = endpoint_class(table) register_endpoint(app, endpoint)
- class core.mail_proxy.interface.endpoint_base.EndpointDispatcher(db, proxy=None)[source]
Bases:
objectDispatches commands to appropriate endpoint methods.
Centralizes command routing, mapping legacy camelCase commands to endpoint.method pairs for backward compatibility.
- COMMAND_MAP
Maps command names to (endpoint_name, method_name).
- db
Database instance for table access.
- proxy
Optional MailProxy for operations needing runtime state.
Example
Use dispatcher for legacy API compatibility:
dispatcher = EndpointDispatcher(db, proxy=proxy) # Dispatch legacy command result = await dispatcher.dispatch( "addMessages", {"messages": [{"to": "user@example.com"}]} ) # Returns: {"ok": True, "count": 1} # Direct endpoint access messages_endpoint = dispatcher.get_endpoint("messages") await messages_endpoint.add_batch(messages=[...])
- COMMAND_MAP: dict[str, tuple[str, str]] = {'activate': ('tenants', 'activate_batch'), 'addAccount': ('accounts', 'add'), 'addMessages': ('messages', 'add_batch'), 'addTenant': ('tenants', 'add'), 'cleanupMessages': ('messages', 'cleanup'), 'deleteAccount': ('accounts', 'delete'), 'deleteMessages': ('messages', 'delete_batch'), 'deleteTenant': ('tenants', 'delete'), 'getInstance': ('instance', 'get'), 'getTenant': ('tenants', 'get'), 'listAccounts': ('accounts', 'list'), 'listMessages': ('messages', 'list'), 'listTenants': ('tenants', 'list'), 'listTenantsSyncStatus': ('instance', 'get_sync_status'), 'suspend': ('tenants', 'suspend_batch'), 'updateInstance': ('instance', 'update'), 'updateTenant': ('tenants', 'update')}
- async dispatch(cmd, payload)[source]
Dispatch a command to the appropriate endpoint method.
- Parameters:
- Returns:
True/False, …}.
- Return type:
Result dict in legacy format {“ok”
Example
result = await dispatcher.dispatch( "addTenant", {"id": "acme", "name": "Acme Corp"} ) if result["ok"]: print(f"Created tenant: {result['id']}")
- core.mail_proxy.interface.endpoint_base.POST(method)[source]
Decorator to mark an endpoint method as POST.
POST methods receive parameters via JSON request body instead of query parameters.
- Parameters:
method (
Callable) – The async method to decorate.- Return type:
- Returns:
The decorated method with _http_post attribute set.
Example
@POST async def add(self, id: str, data: dict) -> dict: """Add item with complex data.""" ...
Entities
Each entity follows the table + endpoint pattern for database persistence and API exposure.
Account Entity
SMTP account configuration table manager.
This module provides the AccountsTable class for managing SMTP server configurations in a multi-tenant environment. Each account belongs to a tenant and defines connection parameters for outgoing email delivery.
The table uses a UUID primary key (pk) with a unique constraint on (tenant_id, id) for multi-tenant isolation. This allows each tenant to use their own account identifiers without conflicts.
Example
Basic account management:
from core.mail_proxy.proxy_base import MailProxyBase
proxy = MailProxyBase(db_path=":memory:")
await proxy.init()
accounts = proxy.db.table("accounts")
# Add an SMTP account
pk = await accounts.add({
"id": "main",
"tenant_id": "acme",
"host": "smtp.gmail.com",
"port": 587,
"user": "sender@acme.com",
"password": "app-password",
"use_tls": True,
})
# Retrieve account
account = await accounts.get("acme", "main")
# List all accounts for a tenant
all_accounts = await accounts.list_all(tenant_id="acme")
- core.mail_proxy.entities.account.table.name
Table name in database (“accounts”).
- core.mail_proxy.entities.account.table.pkey
Primary key column name (“pk”).
Note
Enterprise Edition (EE) extends this class with PEC (Posta Elettronica Certificata) support via AccountsTable_EE mixin, adding IMAP polling for delivery receipts.
- class core.mail_proxy.entities.account.table.AccountsTable(db)[source]
Bases:
TableSMTP account configurations for outgoing email delivery.
Manages SMTP server connection parameters including host, port, credentials, TLS settings, and rate limits. Each account belongs to a tenant and is identified by a client-provided ID.
- The table schema includes:
pk: Internal UUID primary key
id: Client-provided account identifier (unique per tenant)
tenant_id: Foreign key to tenants table
host, port: SMTP server connection
user, password: Authentication credentials (password encrypted)
ttl: Connection cache TTL in seconds
limit_per_minute/hour/day: Rate limiting thresholds
limit_behavior: Action when rate exceeded (“defer” or “reject”)
use_tls: TLS/STARTTLS mode
batch_size: Max messages per connection
Example
Adding an account with rate limits:
pk = await accounts.add({ "id": "transactional", "tenant_id": "acme", "host": "smtp.sendgrid.net", "port": 587, "user": "apikey", "password": "SG.xxxxx", "use_tls": True, "limit_per_hour": 1000, "limit_per_day": 10000, "limit_behavior": "defer", })
- Parameters:
db (
SqlDb)
- create_table_sql()[source]
Generate CREATE TABLE statement with multi-tenant unique constraint.
Adds UNIQUE (tenant_id, id) constraint to ensure each tenant has unique account identifiers while allowing the same ID across different tenants.
- Return type:
- Returns:
SQL CREATE TABLE statement with UNIQUE constraint.
- configure()[source]
Define table columns.
- Return type:
- Columns:
pk: UUID primary key (auto-generated on insert). id: Client account identifier (required). tenant_id: Owning tenant (FK to tenants, required). host: SMTP server hostname (required). port: SMTP server port (required). user: SMTP username for authentication. password: SMTP password (encrypted at rest). ttl: Connection cache TTL in seconds (default: 300). limit_per_minute: Max emails per minute. limit_per_hour: Max emails per hour. limit_per_day: Max emails per day. limit_behavior: Rate limit action (“defer” or “reject”). use_tls: TLS mode (1=STARTTLS, 0=none, NULL=auto). batch_size: Messages per SMTP connection. created_at: Record creation timestamp. updated_at: Last modification timestamp.
Note
EE columns (is_pec_account, imap_*) are added by AccountsTable_EE.configure() when enterprise package is installed.
- async migrate_from_legacy_schema()[source]
Migrate from composite primary key to UUID primary key.
Legacy databases used PRIMARY KEY (tenant_id, id). This migration adds a UUID ‘pk’ column as the new primary key while preserving the UNIQUE constraint on (tenant_id, id).
- The migration process:
Create new table with UUID pk column
Copy existing rows, generating UUIDs
Drop old table and rename new table
- Return type:
- Returns:
True if migration was performed, False if not needed (pk column already exists or table doesn’t exist).
Note
Safe to call on every startup. Skips silently if migration is not needed.
- async add(acc)[source]
Insert or update an SMTP account configuration.
Performs an upsert based on (tenant_id, id). If the account exists, updates all fields. If new, generates a UUID for the pk column.
- Parameters:
acc (
dict[str,Any]) – Account configuration dict with keys: - id (required): Client account identifier. - tenant_id (required): Owning tenant ID. - host (required): SMTP server hostname. - port (required): SMTP server port. - user: SMTP username. - password: SMTP password (will be encrypted). - ttl: Connection cache TTL (default: 300). - limit_per_minute/hour/day: Rate limits. - limit_behavior: “defer” or “reject” (default: “defer”). - use_tls: True/False/None for TLS mode. - batch_size: Messages per connection. - is_pec_account: True for PEC accounts (EE). - imap_*: IMAP settings for PEC (EE).- Return type:
- Returns:
The account’s internal UUID (pk).
Example
pk = await accounts.add({ "id": "marketing", "tenant_id": "acme", "host": "smtp.mailgun.org", "port": 587, "user": "postmaster@acme.com", "password": "secret", "use_tls": True, "limit_per_hour": 500, })
- async get(tenant_id, account_id)[source]
Retrieve a single SMTP account by tenant and ID.
- Parameters:
- Return type:
- Returns:
Account dict with use_tls converted to bool/None.
- Raises:
ValueError – If account not found for this tenant.
Example
try: account = await accounts.get("acme", "main") print(f"SMTP host: {account['host']}") except ValueError: print("Account not found")
- async list_all(tenant_id=None)[source]
List SMTP accounts, optionally filtered by tenant.
- Parameters:
tenant_id (
str|None) – Filter by tenant ID. If None, returns all accounts.- Return type:
- Returns:
List of account dicts ordered by ID, with boolean fields decoded.
Example
# All accounts for a tenant acme_accounts = await accounts.list_all(tenant_id="acme") # All accounts across all tenants (admin view) all_accounts = await accounts.list_all()
SMTP account REST API endpoint.
This module provides the AccountEndpoint class exposing CRUD operations for SMTP account configurations via REST API and CLI commands.
The endpoint is designed for automatic introspection by api_base and cli_base modules, which generate FastAPI routes and Typer commands from method signatures using inspect and pydantic.create_model.
Example
Register endpoint with the API router:
from core.mail_proxy.entities.account import AccountEndpoint
endpoint = AccountEndpoint(proxy.db.table("accounts"))
# Routes auto-generated: POST /accounts, GET /accounts/{id}, etc.
CLI commands auto-generated:
mail-proxy accounts add --tenant-id acme --id main --host smtp.example.com --port 587
mail-proxy accounts list --tenant-id acme
mail-proxy accounts get --tenant-id acme --account-id main
mail-proxy accounts delete --tenant-id acme --account-id main
Note
Enterprise Edition (EE) extends this with AccountEndpoint_EE mixin adding PEC-specific fields (is_pec_account, imap_* settings).
- class core.mail_proxy.entities.account.endpoint.AccountEndpoint(table)[source]
Bases:
BaseEndpointREST API endpoint for SMTP account management.
Provides CRUD operations for SMTP accounts. Each method is introspected to auto-generate API routes and CLI commands.
- name
Endpoint name used in URL paths (“accounts”).
- table
AccountsTable instance for database operations.
Example
Using the endpoint programmatically:
endpoint = AccountEndpoint(db.table("accounts")) # Add account account = await endpoint.add( id="main", tenant_id="acme", host="smtp.gmail.com", port=587, user="sender@acme.com", password="app-password", use_tls=True, ) # List accounts accounts = await endpoint.list(tenant_id="acme")
- Parameters:
table (
AccountsTable)
- __init__(table)[source]
Initialize endpoint with table reference.
- Parameters:
table (
AccountsTable) – AccountsTable instance for database operations.
- async add(id, tenant_id, host, port, user=None, password=None, use_tls=True, batch_size=None, ttl=300, limit_per_minute=None, limit_per_hour=None, limit_per_day=None, limit_behavior='defer')[source]
Add or update an SMTP account configuration.
Performs upsert: creates new account or updates existing one based on (tenant_id, id) composite key.
- Parameters:
id (
str) – Account identifier (unique within tenant).tenant_id (
str) – Owning tenant ID.host (
str) – SMTP server hostname.port (
int) – SMTP server port (typically 25, 465, or 587).use_tls (
bool) – Enable STARTTLS (True) or plain connection (False).ttl (
int) – Connection cache TTL in seconds.limit_behavior (
Literal['defer','reject']) – Action when rate exceeded (“defer” or “reject”).
- Return type:
- Returns:
Complete account record after insert/update.
- async get(tenant_id, account_id)[source]
Retrieve a single SMTP account by tenant and ID.
- Parameters:
- Return type:
- Returns:
Account configuration dict.
- Raises:
ValueError – If account not found.
Tenant Entity
Tenant configuration table manager.
This module provides the TenantsTable class for managing tenant configurations in a multi-tenant mail proxy environment.
In Community Edition (CE), a single “default” tenant is used implicitly. Enterprise Edition (EE) extends with full multi-tenant management via TenantsTable_EE mixin, adding API key authentication and tenant CRUD.
- Each tenant can configure:
Client authentication (for callback URLs)
Rate limits (per minute/hour/day)
Large file handling (warn/reject/rewrite)
Batch suspension (pause specific campaigns)
Example
Basic tenant operations:
from core.mail_proxy.proxy_base import MailProxyBase
proxy = MailProxyBase(db_path=":memory:")
await proxy.init()
tenants = proxy.db.table("tenants")
# Ensure default tenant exists (CE mode)
await tenants.ensure_default()
# Get tenant config
tenant = await tenants.get("default")
# Suspend a batch
await tenants.suspend_batch("default", "newsletter-q1")
# Activate all batches
await tenants.activate_batch("default")
Note
Enterprise Edition (EE) extends this class with TenantsTable_EE mixin, adding: add(), list_all(), update_fields(), remove(), create_api_key(), revoke_api_key(), get_tenant_by_token().
- class core.mail_proxy.entities.tenant.table.TenantsTable(db)[source]
Bases:
TableTenant configuration storage table.
Manages tenant settings including client authentication, rate limits, and batch suspension for campaign control.
- name
Table name (“tenants”).
- pkey
Primary key column (“id”).
- Table Schema:
id: Tenant identifier (primary key)
name: Display name
client_auth: JSON dict with HTTP auth config for callbacks
client_base_url: Base URL for client callbacks
client_sync_path: Path for sync endpoint
client_attachment_path: Path for attachment endpoint
rate_limits: JSON dict with per-minute/hour/day limits
large_file_config: JSON dict for large attachment handling
active: 0/1 flag for tenant status
suspended_batches: Comma-separated batch codes or “*” for all
api_key_hash: Hashed API key (EE only)
api_key_expires_at: API key expiration (EE only)
created_at, updated_at: Timestamps
Example
Work with tenant configuration:
tenants = proxy.db.table("tenants") # Get tenant tenant = await tenants.get("acme") # Check batch suspension is_suspended = tenants.is_batch_suspended( tenant["suspended_batches"], "campaign-001", ) # Suspend a batch await tenants.suspend_batch("acme", "campaign-001")
- Parameters:
db (
SqlDb)
- configure()[source]
Define table columns.
- Return type:
- Columns:
id: Tenant identifier (primary key string). name: Human-readable tenant name. client_auth: JSON dict with auth method, credentials for callbacks. client_base_url: Base URL for client HTTP callbacks. client_sync_path: Path appended to base_url for sync endpoint. client_attachment_path: Path for attachment fetch endpoint. rate_limits: JSON dict with per_minute, per_hour, per_day limits. large_file_config: JSON dict with threshold, action settings. active: 1=active, 0=disabled (INTEGER for SQLite). suspended_batches: Comma-separated batch codes or “*” for all. api_key_hash: Bcrypt hash of API key (EE only). api_key_expires_at: API key expiration timestamp (EE only). created_at: Row creation timestamp. updated_at: Last modification timestamp.
- is_batch_suspended(suspended_batches, batch_code)[source]
Check if a batch is suspended.
- Parameters:
- Return type:
- Returns:
True if the message should be skipped.
Note
“*” suspends all messages regardless of batch_code
Messages without batch_code are only suspended by “*”
Specific batch codes must match exactly
Example
# All suspended is_batch_suspended("*", "any-batch") # True is_batch_suspended("*", None) # True # Specific batches is_batch_suspended("batch1,batch2", "batch1") # True is_batch_suspended("batch1,batch2", "batch3") # False is_batch_suspended("batch1,batch2", None) # False
- async ensure_default()[source]
Ensure the ‘default’ tenant exists for CE single-tenant mode.
Creates the default tenant without API key. In CE mode, all operations use the instance token. When upgrading to EE, admin can generate tenant token via create_api_key().
- Return type:
- async suspend_batch(tenant_id, batch_code=None)[source]
Suspend sending for a tenant.
- Suspended batches are skipped by the dispatcher. Use for:
Pausing a campaign (specific batch_code)
Emergency stop (batch_code=None suspends all)
- Parameters:
- Return type:
- Returns:
True if tenant found and updated, False if not found.
Example
# Suspend specific batch await tenants.suspend_batch("acme", "newsletter-q1") # Suspend all sending await tenants.suspend_batch("acme")
- async activate_batch(tenant_id, batch_code=None)[source]
Resume sending for a tenant.
Removes batch from suspension list. If batch_code is None, clears ALL suspensions.
- Parameters:
- Return type:
- Returns:
True if updated successfully. False if tenant not found or cannot remove single batch from “*”.
Note
Cannot remove a single batch when full suspension (“*”) is active. Must call activate_batch(tenant_id, None) first to clear all.
Example
# Activate specific batch await tenants.activate_batch("acme", "newsletter-q1") # Clear all suspensions await tenants.activate_batch("acme")
- async get_suspended_batches(tenant_id)[source]
Get suspended batch codes for a tenant.
- Parameters:
tenant_id (
str) – Tenant identifier.- Return type:
- Returns:
Set of batch codes, {“*”} if all suspended, or empty set.
Example
suspended = await tenants.get_suspended_batches("acme") if "*" in suspended: print("All sending suspended") elif "campaign-001" in suspended: print("Campaign 001 is suspended")
Tenant REST API endpoint.
This module provides the TenantEndpoint class exposing CRUD operations for tenant configurations via REST API and CLI commands.
The endpoint is designed for automatic introspection by api_base and cli_base modules, which generate FastAPI routes and Typer commands from method signatures.
Example
CLI commands auto-generated:
mail-proxy tenants add --id acme --name "Acme Corp"
mail-proxy tenants list
mail-proxy tenants get --tenant-id acme
mail-proxy tenants delete --tenant-id acme
mail-proxy tenants suspend-batch --tenant-id acme --batch-code newsletter
mail-proxy tenants activate-batch --tenant-id acme
Note
Enterprise Edition (EE) extends this with TenantEndpoint_EE mixin adding API key management (create_api_key, revoke_api_key).
- class core.mail_proxy.entities.tenant.endpoint.AuthMethod(value)[source]
-
Authentication methods for HTTP client callbacks.
- NONE
No authentication required.
- BEARER
Bearer token authentication.
- BASIC
HTTP Basic authentication.
- NONE = 'none'
- BEARER = 'bearer'
- BASIC = 'basic'
- class core.mail_proxy.entities.tenant.endpoint.LargeFileAction(value)[source]
-
Action when attachment exceeds size threshold.
- WARN
Log warning but proceed with sending.
- REJECT
Reject the message entirely.
- REWRITE
Store file externally and rewrite attachment URL.
- WARN = 'warn'
- REJECT = 'reject'
- REWRITE = 'rewrite'
- class core.mail_proxy.entities.tenant.endpoint.TenantEndpoint(table)[source]
Bases:
BaseEndpointREST API endpoint for tenant management.
Provides CRUD operations for tenant configurations including batch suspension for campaign control.
- name
Endpoint name used in URL paths (“tenants”).
- table
TenantsTable instance for database operations.
Example
Using the endpoint programmatically:
endpoint = TenantEndpoint(db.table("tenants")) # Add tenant tenant = await endpoint.add( id="acme", name="Acme Corp", client_base_url="https://acme.com", ) # Suspend a batch await endpoint.suspend_batch(tenant_id="acme", batch_code="newsletter") # List tenants tenants = await endpoint.list()
- Parameters:
table (
TenantsTable)
- __init__(table)[source]
Initialize endpoint with table reference.
- Parameters:
table (
TenantsTable) – TenantsTable instance for database operations.
- async add(id, name=None, client_auth=None, client_base_url=None, client_sync_path=None, client_attachment_path=None, rate_limits=None, large_file_config=None, active=True)[source]
Add or update a tenant configuration.
- Parameters:
id (
str) – Tenant identifier (unique).client_auth (
dict[str,Any] |None) – HTTP auth config for callbacks (method, credentials).client_base_url (
str|None) – Base URL for client HTTP callbacks.client_sync_path (
str|None) – Path for sync endpoint (default: /mail-proxy/sync).client_attachment_path (
str|None) – Path for attachments (default: /mail-proxy/attachments).rate_limits (
dict[str,Any] |None) – Rate limit config (per_minute, per_hour, per_day).large_file_config (
dict[str,Any] |None) – Large file handling (threshold, action).active (
bool) – Whether tenant is active.
- Return type:
- Returns:
Tenant dict, including api_key for new tenants (EE only).
- async get(tenant_id)[source]
Retrieve a single tenant configuration.
- Parameters:
tenant_id (
str) – Tenant identifier.- Return type:
- Returns:
Tenant configuration dict.
- Raises:
ValueError – If tenant not found.
- async update(tenant_id, name=None, client_auth=None, client_base_url=None, client_sync_path=None, client_attachment_path=None, rate_limits=None, large_file_config=None, active=None)[source]
Update tenant configuration fields.
Only provided fields are updated; None values are ignored.
- async suspend_batch(tenant_id, batch_code=None)[source]
Suspend sending for a tenant.
Suspended batches are skipped by the dispatcher.
- Parameters:
- Return type:
- Returns:
Dict with ok=True and suspended_batches list.
- Raises:
ValueError – If tenant not found.
- async activate_batch(tenant_id, batch_code=None)[source]
Resume sending for a tenant.
Removes batch from suspension list.
- Parameters:
- Return type:
- Returns:
Dict with ok=True and remaining suspended_batches.
- Raises:
ValueError – If tenant not found or cannot remove single batch from “*”.
- async get_suspended_batches(tenant_id)[source]
Get suspended batches for a tenant.
- Parameters:
tenant_id (
str) – Tenant identifier.- Return type:
- Returns:
Dict with ok=True and suspended_batches list.
- Raises:
ValueError – If tenant not found.
- core.mail_proxy.entities.tenant.endpoint.get_tenant_attachment_url(tenant)[source]
Build full attachment fetch URL from tenant config.
- Parameters:
- Return type:
- Returns:
Full URL (base_url + attachment_path) or None if no base_url.
Example
tenant = {"client_base_url": "https://acme.com"} url = get_tenant_attachment_url(tenant) # Returns: "https://acme.com/mail-proxy/attachments" (default path)
- core.mail_proxy.entities.tenant.endpoint.get_tenant_sync_url(tenant)[source]
Build full sync callback URL from tenant config.
- Parameters:
- Return type:
- Returns:
Full URL (base_url + sync_path) or None if no base_url.
Example
tenant = {"client_base_url": "https://acme.com", "client_sync_path": "/api/sync"} url = get_tenant_sync_url(tenant) # Returns: "https://acme.com/api/sync"
Message Entity
Email message queue table manager.
This module provides the MessagesTable class for managing email messages in the delivery queue. Each message contains a JSON payload with email content and is associated with a tenant and SMTP account.
- Messages progress through states:
Pending: smtp_ts IS NULL, deferred_ts IS NULL
Deferred: smtp_ts IS NULL, deferred_ts IS NOT NULL
Processed: smtp_ts IS NOT NULL
The table uses a UUID primary key (pk) with a unique constraint on (tenant_id, id) for multi-tenant isolation and idempotent inserts.
Example
Queue and send messages:
from core.mail_proxy.proxy_base import MailProxyBase
proxy = MailProxyBase(db_path=":memory:")
await proxy.init()
messages = proxy.db.table("messages")
# Queue a message batch
result = await messages.insert_batch([{
"id": "msg-001",
"tenant_id": "acme",
"account_id": "main",
"payload": {
"from": "sender@acme.com",
"to": ["user@example.com"],
"subject": "Hello",
"body": "Test message",
},
}])
# Fetch messages ready for delivery
import time
ready = await messages.fetch_ready(limit=10, now_ts=int(time.time()))
# Mark as sent
for msg in ready:
await messages.mark_sent(msg["pk"], int(time.time()))
Note
Enterprise Edition (EE) extends this class with MessagesTable_EE mixin, adding PEC-specific methods for Italian certified email.
- class core.mail_proxy.entities.message.table.MessagesTable(db)[source]
Bases:
TableEmail message queue with scheduling and deferred delivery.
Manages the email delivery queue with support for priority ordering, deferred delivery (retry scheduling), batch operations, and multi-tenant isolation.
- name
Table name (“messages”).
- pkey
Primary key column (“pk”, UUID string).
- Table Schema:
pk: UUID primary key (generated on insert)
id: Client-provided message identifier (unique per tenant)
tenant_id: Tenant identifier for isolation
account_id: Legacy account reference (business key)
account_pk: FK to accounts.pk (UUID)
priority: Delivery priority (0=immediate, 1=high, 2=medium, 3=low)
payload: JSON email content (from, to, subject, body, etc.)
batch_code: Optional campaign/batch identifier
deferred_ts: Retry timestamp (NULL = ready for delivery)
smtp_ts: SMTP attempt timestamp (NULL = pending)
is_pec: PEC flag for Italian certified email (EE)
created_at, updated_at: Timestamps
Example
Basic message queue operations:
messages = proxy.db.table("messages") # Insert messages await messages.insert_batch([ {"id": "m1", "tenant_id": "t1", "account_id": "a1", "payload": {"from": "a@b.com", "to": ["c@d.com"], ...}}, ]) # Fetch ready messages ready = await messages.fetch_ready(limit=100, now_ts=now) # Mark sent/error await messages.mark_sent(pk, now) await messages.mark_error(pk, now)
- Parameters:
db (
SqlDb)
- create_table_sql()[source]
Generate CREATE TABLE with UNIQUE constraint.
Adds UNIQUE (tenant_id, id) constraint to ensure idempotent inserts and multi-tenant isolation.
- Return type:
- Returns:
SQL CREATE TABLE statement with UNIQUE constraint.
- configure()[source]
Define table columns.
- Return type:
- Columns:
pk: UUID primary key (generated via get_uuid()). id: Client message identifier (unique per tenant). tenant_id: Owning tenant (denormalized for query efficiency). account_id: Legacy account reference (business key). account_pk: FK to accounts.pk UUID. priority: Delivery priority (0-3, default 2). payload: JSON email content. batch_code: Optional batch/campaign identifier. created_at, updated_at: Timestamps. deferred_ts: Unix timestamp for retry scheduling. smtp_ts: Unix timestamp when SMTP was attempted. is_pec: PEC flag (1=awaiting receipts, EE only).
- async migrate_from_legacy_schema()[source]
Migrate from INTEGER pk to UUID pk schema.
Legacy databases used INTEGER autoincrement primary key. This migration adds UUID ‘pk’ column as new primary key.
- Return type:
- Returns:
True if migration performed, False if not needed.
- async migrate_account_pk()[source]
Populate account_pk from account_id + tenant_id.
Links messages to accounts via UUID instead of business key.
- Return type:
- Returns:
True if migration performed, False if not needed.
- async insert_batch(entries, pec_account_ids=None, tenant_id=None, auto_pec=True)[source]
Persist a batch of messages for delivery.
- Performs upsert based on (tenant_id, id):
New message: insert with generated UUID pk
Existing pending: update fields
Existing processed: skip (already sent)
- Parameters:
entries (
Sequence[dict[str,Any]]) – List of message dicts. Each must have: - id: Client message identifier - tenant_id or use tenant_id param - account_id: SMTP account identifier - payload: Email content dict Optional: priority, deferred_ts, batch_code, account_pk.pec_account_ids (
set[str] |None) – Set of account IDs that are PEC accounts. If None and auto_pec=True, fetched from accounts table.tenant_id (
str|None) – Default tenant ID if not in each entry.auto_pec (
bool) – If True, auto-fetch PEC account IDs.
- Returns:
msg_id, “pk”: pk} for inserted/updated messages.
- Return type:
List of {“id”
Example
result = await messages.insert_batch([ { "id": "msg-001", "tenant_id": "acme", "account_id": "main", "payload": { "from": "sender@acme.com", "to": ["user@example.com"], "subject": "Test", "body": "Hello", }, }, ]) # Returns: [{"id": "msg-001", "pk": "uuid-..."}]
- async fetch_ready(*, limit, now_ts, priority=None, min_priority=None)[source]
Fetch messages ready for SMTP delivery.
Returns pending messages ordered by priority and creation time. Excludes messages from suspended tenants/batches.
- Parameters:
- Return type:
- Returns:
List of message dicts with decoded payload.
Note
- Suspension logic:
tenant.suspended_batches = “*”: all messages skipped
tenant.suspended_batches contains batch_code: skipped
Messages without batch_code: only skipped when “*”
- async set_deferred(pk, deferred_ts)[source]
Schedule message for retry at specified timestamp.
Resets smtp_ts to NULL so message becomes pending again.
- async get_ids_for_tenant(ids, tenant_id)[source]
Get message IDs that belong to a tenant.
Validates ownership by checking tenant_id in accounts table.
- async remove_fully_reported_before(threshold_ts)[source]
Delete messages whose events are all reported before threshold.
- A message can be removed when:
It has been processed (smtp_ts IS NOT NULL)
All its events have been reported
Most recent reported_ts is older than threshold
- async remove_fully_reported_before_for_tenant(threshold_ts, tenant_id)[source]
Delete fully reported messages for a tenant.
- async list_all(*, tenant_id=None, active_only=False, include_history=False)[source]
List messages with optional filters.
Email message REST API endpoint.
This module provides the MessageEndpoint class exposing CRUD operations for email messages via REST API and CLI commands.
The endpoint is designed for automatic introspection by api_base and cli_base modules, which generate FastAPI routes and Typer commands from method signatures.
Example
CLI commands auto-generated:
mail-proxy messages add --tenant-id acme --id msg-001 --account-id main ...
mail-proxy messages list --tenant-id acme
mail-proxy messages get --message-id msg-001 --tenant-id acme
mail-proxy messages delete --message-pk uuid-...
mail-proxy messages add-batch --messages '[{...}, {...}]'
mail-proxy messages cleanup --tenant-id acme
Note
Enterprise Edition (EE) extends this with MessageEndpoint_EE mixin adding PEC-specific status information.
- class core.mail_proxy.entities.message.endpoint.AttachmentPayload(**data)[source]
Bases:
BaseModelEmail attachment specification.
- filename
Attachment filename (may contain MD5 marker).
- storage_path
Content location. Format depends on fetch_mode: - endpoint: query params (e.g., “doc_id=123”) - http_url: full URL (e.g., “https://files.example.com/file.pdf”) - base64: base64-encoded content - filesystem: absolute path (e.g., “/var/attachments/file.pdf”)
- mime_type
Optional MIME type override.
- fetch_mode
Explicit fetch mode. If not provided, inferred from path.
- content_md5
MD5 hash for cache lookup.
- auth
Optional authentication override for HTTP requests.
Example
Attachment from HTTP URL:
AttachmentPayload( filename="report.pdf", storage_path="https://cdn.example.com/docs/report.pdf", fetch_mode=FetchMode.HTTP_URL, )
- Parameters:
data (
Any)
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- filename: Annotated[str, FieldInfo(annotation=NoneType, required=True, description='Attachment filename', metadata=[MinLen(min_length=1), MaxLen(max_length=255)])]
- storage_path: Annotated[str, FieldInfo(annotation=NoneType, required=True, description='Storage path', metadata=[MinLen(min_length=1)])]
- mime_type: Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='MIME type override')]
- fetch_mode: Annotated[FetchMode | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Fetch mode')]
- class core.mail_proxy.entities.message.endpoint.FetchMode(value)[source]
-
Source mode for fetching email attachments.
- ENDPOINT
Fetch from configured HTTP endpoint with path parameter.
- HTTP_URL
Fetch directly from a full HTTP/HTTPS URL.
- BASE64
Inline base64-encoded content.
- FILESYSTEM
Fetch from local filesystem path.
- ENDPOINT = 'endpoint'
- HTTP_URL = 'http_url'
- BASE64 = 'base64'
- FILESYSTEM = 'filesystem'
- class core.mail_proxy.entities.message.endpoint.MessageEndpoint(table)[source]
Bases:
BaseEndpointREST API endpoint for email message management.
Provides CRUD operations for email messages including batch operations for bulk insert/delete and cleanup of old messages.
- name
Endpoint name used in URL paths (“messages”).
- table
MessagesTable instance for database operations.
Example
Using the endpoint programmatically:
endpoint = MessageEndpoint(db.table("messages")) # Add a single message msg = await endpoint.add( id="msg-001", tenant_id="acme", account_id="main", from_addr="sender@acme.com", to=["user@example.com"], subject="Test", body="Hello", ) # List messages messages = await endpoint.list(tenant_id="acme")
- Parameters:
table (
MessagesTable)
- __init__(table)[source]
Initialize endpoint with table reference.
- Parameters:
table (
MessagesTable) – MessagesTable instance for database operations.
- async add(id, tenant_id, account_id, from_addr, to, subject, body, cc=None, bcc=None, reply_to=None, return_path=None, content_type='plain', message_id=None, priority=2, deferred_ts=None, batch_code=None, attachments=None, headers=None)[source]
Add a new message to the delivery queue.
- Parameters:
id (
str) – Unique message identifier (client-provided).tenant_id (
str) – Tenant identifier.account_id (
str) – SMTP account to use for sending.from_addr (
str) – Sender email address.subject (
str) – Email subject line.body (
str) – Email body content.return_path (
str|None) – Return-Path (envelope sender) address.content_type (
Literal['plain','html']) – Body content type (“plain” or “html”).priority (
int) – Delivery priority (0=immediate, 1=high, 2=medium, 3=low).deferred_ts (
int|None) – Unix timestamp to defer delivery until.batch_code (
str|None) – Batch/campaign identifier for grouping.attachments (
list[dict[str,Any]] |None) – List of attachment specifications.
- Return type:
- Returns:
Dict with message id and pk.
- Raises:
ValueError – If message could not be added.
- async get(message_id, tenant_id)[source]
Retrieve a single message by ID.
- Parameters:
- Return type:
- Returns:
Message dict with status and payload.
- Raises:
ValueError – If message not found.
- async list(tenant_id, active_only=False, include_history=False)[source]
List messages for a tenant.
- async count_active()[source]
Count messages awaiting delivery.
- Return type:
- Returns:
Number of active (pending) messages.
- async count_pending_for_tenant(tenant_id, batch_code=None)[source]
Count pending messages for a tenant.
- async add_batch(messages, default_priority=None)[source]
Add multiple messages in a single operation.
Validates each message and queues valid ones. Invalid messages are reported in the rejected list.
- Parameters:
messages (
list[dict[str,Any]]) – List of message dicts. Each must have: - id: Unique message identifier - tenant_id: Tenant identifier - account_id: SMTP account to use - from (or from_addr): Sender address - to: Recipient(s) - subject: Email subject Optional: body, cc, bcc, reply_to, return_path, content_type, message_id, priority, deferred_ts, batch_code, attachments, headers.default_priority (
int|None) – Default priority for messages without explicit priority.
- Returns:
ok: True
queued: Number of messages queued
rejected: List of {id, reason} for invalid messages
- Return type:
Dict with
Example
result = await endpoint.add_batch([ {"id": "m1", "tenant_id": "t1", "account_id": "a1", "from": "a@b.com", "to": ["c@d.com"], "subject": "Hi"}, {"id": "m2", ...}, ]) # Returns: {"ok": True, "queued": 2, "rejected": []}
- async delete_batch(tenant_id, ids)[source]
Delete multiple messages by their IDs.
Validates ownership before deletion.
- class core.mail_proxy.entities.message.endpoint.MessageStatus(value)[source]
-
Current delivery status of an email message.
- PENDING
Queued and waiting for delivery attempt.
- DEFERRED
Temporarily delayed (rate limit or soft error).
- SENT
Successfully delivered to SMTP server.
- ERROR
Delivery failed with permanent error.
- PENDING = 'pending'
- DEFERRED = 'deferred'
- SENT = 'sent'
- ERROR = 'error'
Message Event Entity
Message event table for delivery tracking.
This module implements event-based tracking for message lifecycle changes. Each significant state change (sent, error, deferred, bounce, PEC receipts) is recorded as a separate event, enabling complete delivery history.
- Components:
MessageEventTable: Table manager for message events.
- Event Types:
deferred: Message was deferred (rate limit, temporary failure)
sent: Message successfully delivered via SMTP
error: Permanent delivery failure
bounce: Bounce notification received (EE)
pec_acceptance: PEC acceptance receipt (EE)
pec_delivery: PEC delivery receipt (EE)
pec_error: PEC error notification (EE)
Example
Record and query events:
from core.mail_proxy.proxy_base import MailProxyBase
proxy = MailProxyBase(db_path=":memory:")
await proxy.init()
events = proxy.db.table("message_events")
# Record a sent event
await events.add_event(
message_pk="550e8400-e29b-41d4-a716-446655440000",
event_type="sent",
event_ts=1704067200,
description="250 OK id=1abc-2def",
)
# Fetch unreported events for sync
unreported = await events.fetch_unreported(limit=100)
# Mark as reported
event_ids = [e["event_id"] for e in unreported]
await events.mark_reported(event_ids, reported_ts=1704067260)
Note
Events are linked to messages via message_pk (UUID primary key). The trigger_on_inserted method automatically updates message status when events are recorded.
- class core.mail_proxy.entities.message_event.table.MessageEventTable(db)[source]
Bases:
TableMessage events storage table.
Records delivery events for complete message history and reporting. Each event links to a message via message_pk and includes timestamp, type, description, and optional metadata.
- name
Table name (“message_events”).
- pkey
Primary key column (“id”, auto-increment INTEGER).
- Table Schema:
id: Auto-increment primary key
message_pk: FK to messages.pk (UUID)
event_type: Event category (sent, error, deferred, etc.)
event_ts: Unix timestamp when event occurred
description: Error message, bounce reason, etc.
metadata: JSON for extra data (bounce_type, deferred_ts)
reported_ts: When event was synced to client
Example
Query message history:
events = proxy.db.table("message_events") # Get all events for a message history = await events.get_events_for_message( "550e8400-e29b-41d4-a716-446655440000" ) for event in history: print(f"{event['event_type']} at {event['event_ts']}") # Check pending reports count = await events.count_unreported_for_message(message_pk)
- Parameters:
db (
SqlDb)
- configure()[source]
Define table columns.
- Return type:
- Columns:
id: Auto-increment primary key (INTEGER). message_pk: Reference to messages.pk (UUID string). event_type: Event category for filtering. event_ts: Unix timestamp of event occurrence. description: Human-readable event details. metadata: JSON-encoded extra data. reported_ts: Unix timestamp when synced to client.
- async trigger_on_inserted(record)[source]
Update message status based on event type.
Called automatically after event insert. Updates the message’s state in the messages table based on event_type.
- Parameters:
record (
dict[str,Any]) – Inserted event record with event_type and message_pk.- Return type:
Note
“sent” → marks message as sent
“error” → marks message as error
“deferred” → sets deferred_ts from metadata or event_ts
- async add_event(message_pk, event_type, event_ts, description=None, metadata=None)[source]
Record a message event.
- Parameters:
message_pk (
str) – Message’s internal pk (UUID).event_type (
str) – Event category (sent, error, deferred, bounce, pec_*).event_ts (
int) – Unix timestamp when event occurred.description (
str|None) – Optional error message or reason.metadata (
dict[str,Any] |None) – Optional dict serialized as JSON.
- Return type:
- Returns:
Number of rows inserted (typically 1).
Note
Triggers are called automatically after insert, updating message status based on event_type.
- async fetch_unreported(limit)[source]
Fetch events not yet reported to clients.
- Parameters:
limit (
int) – Maximum number of events to return.- Return type:
- Returns:
List of event dicts with message_id for external reporting, ordered chronologically by event_ts.
Example
unreported = await events.fetch_unreported(limit=100) for event in unreported: # event contains: event_id, message_id, event_type, # event_ts, description, metadata, account_id, tenant_id await report_to_client(event)
- async get_events_for_message(message_pk)[source]
Get all events for a message, ordered chronologically.
Instance Entity
Instance configuration table manager.
This module provides the InstanceTable class for managing instance-level configuration in a singleton pattern (single row with id=1).
- The instance table stores:
Service identity: name, api_token
Edition: “ce” (Community) or “ee” (Enterprise)
Flexible config: JSON storage for additional settings
- Configuration access follows a dual pattern:
Typed columns: name, api_token, edition (direct column access)
JSON config: Additional key-value pairs in config column
Example
Basic instance configuration:
from core.mail_proxy.proxy_base import MailProxyBase
proxy = MailProxyBase(db_path=":memory:")
await proxy.init()
instance = proxy.db.table("instance")
# Set instance name
await instance.set_name("production-mailer")
# Set API token
await instance.set_api_token("secret-token")
# Store additional config
await instance.set_config("host", "0.0.0.0")
await instance.set_config("port", "8080")
# Check edition
is_ee = await instance.is_enterprise()
Note
Enterprise Edition (EE) extends this class with InstanceTable_EE mixin, adding bounce detection IMAP configuration columns.
- class core.mail_proxy.entities.instance.table.InstanceTable(db)[source]
Bases:
TableSingleton table for instance-level configuration.
Stores instance-wide settings in a single row (id=1). Provides typed access to common settings and flexible JSON storage for additional configuration.
- name
Table name (“instance”).
- pkey
Primary key column (“id”).
- Table Schema:
id: Always 1 (singleton pattern)
name: Instance display name
api_token: Master API token for authentication
edition: “ce” (Community) or “ee” (Enterprise)
config: JSON storage for additional settings
created_at: Creation timestamp
updated_at: Last modification timestamp
Example
Configure instance settings:
instance = proxy.db.table("instance") # Typed column access await instance.set_name("my-mailer") name = await instance.get_name() # JSON config access await instance.set_config("max_workers", "4") workers = await instance.get_config("max_workers") # Get all config merged all_config = await instance.get_all_config()
- Parameters:
db (
SqlDb)
- configure()[source]
Define table columns.
- Return type:
- Columns:
id: Singleton ID (always 1, INTEGER primary key). name: Instance display name (default: “mail-proxy”). api_token: Master API token for authentication. edition: “ce” or “ee” (default: “ce”). config: JSON storage for additional key-value settings. created_at: Row creation timestamp. updated_at: Last modification timestamp.
Note
EE columns (bounce_*) are added by InstanceTable_EE.configure().
- async ensure_instance()[source]
Get or create the singleton instance configuration.
Creates the singleton row if it doesn’t exist.
- async update_instance(updates)[source]
Update the singleton instance configuration.
Example
await instance.update_instance({ "name": "production", "api_token": "new-token", })
- async get_name()[source]
Get instance display name.
- Return type:
- Returns:
Instance name, defaults to “mail-proxy” if not set.
- async get_edition()[source]
Get current edition.
- Return type:
- Returns:
“ce” (Community Edition) or “ee” (Enterprise Edition).
- async is_enterprise()[source]
Check if running in Enterprise Edition mode.
- Return type:
- Returns:
True if edition is “ee”, False otherwise.
- async set_edition(edition)[source]
Set edition.
- Parameters:
edition (
str) – “ce” (Community) or “ee” (Enterprise).- Raises:
ValueError – If edition is not “ce” or “ee”.
- Return type:
- async get_config(key, default=None)[source]
Get a configuration value by key.
- Uses dual access pattern:
Keys in _TYPED_CONFIG_KEYS: read from typed columns
Other keys: read from JSON config column
- Parameters:
- Return type:
- Returns:
Configuration value as string, or default.
Example
# Typed column name = await instance.get_config("name") # JSON config port = await instance.get_config("port", "8080")
- async set_config(key, value)[source]
Set a configuration value.
- Uses dual access pattern:
Keys in _TYPED_CONFIG_KEYS: save to typed columns
Other keys: save to JSON config column
- Parameters:
- Return type:
Example
# Typed column await instance.set_config("name", "production") # JSON config await instance.set_config("host", "0.0.0.0")
- async get_all_config()[source]
Get all configuration values merged.
Returns typed columns and JSON config merged into a single dict. JSON config values override typed columns if same key exists.
Example
config = await instance.get_all_config() # Returns: {"name": "my-mailer", "edition": "ce", "host": "0.0.0.0", ...}
Instance REST API endpoint for service-level operations.
This module provides the InstanceEndpoint class exposing service-level operations for the mail proxy via REST API and CLI commands.
- Operations include:
health: Container orchestration health check (unauthenticated)
status: Authenticated service status with active state
run_now: Trigger immediate dispatch cycle
suspend/activate: Control message sending per tenant/batch
get/update: Instance configuration management
get_sync_status: Monitor tenant synchronization health
upgrade_to_ee: Transition from Community to Enterprise Edition
Example
CLI commands auto-generated:
mail-proxy instance health
mail-proxy instance status
mail-proxy instance run-now --tenant-id acme
mail-proxy instance suspend --tenant-id acme
mail-proxy instance activate --tenant-id acme
mail-proxy instance get
mail-proxy instance update --name production
mail-proxy instance get-sync-status
mail-proxy instance upgrade-to-ee
Note
Enterprise Edition (EE) extends this with InstanceEndpoint_EE mixin adding bounce detection configuration operations.
- class core.mail_proxy.entities.instance.endpoint.InstanceEndpoint(table, proxy=None)[source]
Bases:
BaseEndpointREST API endpoint for instance-level operations.
Provides service management operations including health checks, dispatch control, and configuration management.
- name
Endpoint name used in URL paths (“instance”).
- table
InstanceTable instance for configuration storage.
- proxy
Optional MailProxy instance for service operations.
Example
Using the endpoint programmatically:
endpoint = InstanceEndpoint(db.table("instance"), proxy) # Check service health health = await endpoint.health() # Trigger dispatch await endpoint.run_now(tenant_id="acme") # Update configuration await endpoint.update(name="production")
- Parameters:
table (
InstanceTable)
- __init__(table, proxy=None)[source]
Initialize endpoint with table and optional proxy reference.
- Parameters:
table (
InstanceTable) – InstanceTable for configuration storage.proxy (
object|None) – Optional MailProxy instance for service operations. When provided, enables run_now, suspend, activate, and get_sync_status to interact with the running service.
- async health()[source]
Health check for container orchestration.
Lightweight endpoint for liveness/readiness probes. Does not require authentication. Returns immediately without database access.
- Return type:
- Returns:
Dict with status “ok”.
Example
# Kubernetes liveness probe # GET /instance/health {"status": "ok"}
- async status()[source]
Authenticated service status.
Returns the current active state of the mail proxy service. Requires authentication.
- Return type:
- Returns:
Dict with ok=True and active boolean indicating if the dispatch loop is running.
- async run_now(tenant_id=None)[source]
Trigger immediate dispatch cycle.
Resets the tenant’s sync timer, causing the next dispatch loop iteration to process messages immediately.
- async suspend(tenant_id, batch_code=None)[source]
Suspend message sending for a tenant.
Prevents messages from being dispatched for the specified tenant or batch. Messages remain in queue and will be sent when activated.
- async activate(tenant_id, batch_code=None)[source]
Resume message sending for a tenant.
Removes suspension for the specified tenant or batch, allowing queued messages to be dispatched.
- async get()[source]
Get instance configuration.
- Return type:
- Returns:
Dict with ok=True and all instance configuration fields.
- async get_sync_status()[source]
Get sync status for all tenants.
Returns synchronization health information for each tenant, useful for monitoring and debugging delivery issues.
- Returns:
id: Tenant identifier
last_sync_ts: Unix timestamp of last sync
next_sync_due: True if sync interval has expired
in_dnd: True if tenant is in Do Not Disturb mode
- Return type:
Dict with ok=True and tenants list. Each tenant contains
- async upgrade_to_ee()[source]
Upgrade from Community Edition to Enterprise Edition.
- Performs explicit upgrade from CE to EE mode:
Verifies Enterprise modules are installed
Sets edition=”ee” in instance configuration
Optionally generates API key for “default” tenant
The upgrade is idempotent - calling when already EE is safe.
- Return type:
- Returns:
Dict with ok=True, edition, optional default_tenant_token, and descriptive message.
- Raises:
ValueError – If Enterprise modules are not installed.
Command Log Entity
Command log table for API audit trail.
This module implements an audit log for API commands, recording every state-modifying operation with its full payload. This enables:
Debugging: Trace what happened and when
Migration: Replay commands to reconstruct state
Recovery: Restore from command history after failures
Compliance: Audit trail for regulatory requirements
- Logged commands include:
POST /commands/add-messages: Queue messages for delivery
POST /commands/delete-messages: Remove messages from queue
POST /commands/cleanup-messages: Remove old reported messages
POST /account: Create/update SMTP account
DELETE /account/{id}: Remove SMTP account
POST /tenant: Create/update tenant
DELETE /tenant/{id}: Remove tenant
Example
Log an API command:
from core.mail_proxy.proxy_base import MailProxyBase
proxy = MailProxyBase(db_path=":memory:")
await proxy.init()
log = proxy.db.table("command_log")
# Record a command
cmd_id = await log.log_command(
endpoint="POST /commands/add-messages",
payload={"messages": [{"to": "user@example.com"}]},
tenant_id="acme",
response_status=200,
)
# List recent commands
commands = await log.list_commands(tenant_id="acme", limit=10)
# Export for replay
export = await log.export_commands(tenant_id="acme")
- class core.mail_proxy.entities.command_log.table.CommandLogTable(db)[source]
Bases:
TableAudit log table for API command tracking.
Records every state-modifying API command with timestamp, endpoint, tenant context, request payload, and response status. Commands are stored chronologically for replay and debugging.
- name
Table name (“command_log”).
- pkey
Primary key column (“id”, auto-increment INTEGER).
- Table Schema:
id: Auto-increment primary key
command_ts: Unix timestamp of command
endpoint: HTTP method + path (e.g., “POST /commands/add-messages”)
tenant_id: Tenant context (nullable for global commands)
payload: JSON request body
response_status: HTTP status code
response_body: JSON response summary
Example
Query command history:
log = proxy.db.table("command_log") # Get last 24 hours of commands for a tenant import time since = int(time.time()) - 86400 commands = await log.list_commands( tenant_id="acme", since_ts=since, ) # Export for migration export = await log.export_commands() # Returns: [{"endpoint": "...", "payload": {...}, ...}, ...]
- Parameters:
db (
SqlDb)
- configure()[source]
Define table columns.
- Return type:
- Columns:
id: Auto-increment primary key (INTEGER). command_ts: Unix timestamp of command execution. endpoint: HTTP method and path combined. tenant_id: Tenant context for multi-tenant filtering. payload: JSON-serialized request body. response_status: HTTP response status code. response_body: JSON-serialized response summary.
- async log_command(endpoint, payload, *, tenant_id=None, response_status=None, response_body=None, command_ts=None)[source]
Record an API command in the audit log.
- Parameters:
endpoint (
str) – HTTP method + path (e.g., “POST /commands/add-messages”).payload (
dict[str,Any]) – Request body as dict (will be JSON-serialized).tenant_id (
str|None) – Tenant context for multi-tenant commands.response_body (
dict[str,Any] |None) – Response body as dict (will be JSON-serialized).command_ts (
int|None) – Unix timestamp. Defaults to current time.
- Return type:
- Returns:
Auto-generated command log ID.
Example
cmd_id = await log.log_command( endpoint="POST /account", payload={"id": "main", "host": "smtp.example.com"}, tenant_id="acme", response_status=200, )
- async list_commands(*, tenant_id=None, since_ts=None, until_ts=None, endpoint_filter=None, limit=100, offset=0)[source]
List logged commands with optional filters.
- Parameters:
since_ts (
int|None) – Include commands with command_ts >= since_ts.until_ts (
int|None) – Include commands with command_ts <= until_ts.endpoint_filter (
str|None) – Filter by endpoint (SQL LIKE partial match).limit (
int) – Maximum number of results to return.offset (
int) – Skip first N results for pagination.
- Return type:
- Returns:
List of command records with parsed JSON fields, ordered by timestamp ascending.
Example
# Get add-messages commands for last hour import time since = int(time.time()) - 3600 commands = await log.list_commands( tenant_id="acme", since_ts=since, endpoint_filter="add-messages", )
- async export_commands(*, tenant_id=None, since_ts=None, until_ts=None)[source]
Export commands in replay-friendly format.
Returns minimal fields needed for replay: endpoint, tenant_id, payload, and command_ts (for ordering). Response data is excluded.
- Parameters:
- Return type:
- Returns:
List of command dicts suitable for replay.
Example
# Export all commands for migration export = await log.export_commands() # Replay on new instance for cmd in export: await api.call(cmd["endpoint"], cmd["payload"])
- async purge_before(threshold_ts)[source]
Delete command logs older than threshold.
Used for log rotation to prevent unbounded growth.
- Parameters:
threshold_ts (
int) – Delete commands with command_ts < threshold_ts.- Return type:
- Returns:
Number of deleted records.
Example
# Delete logs older than 30 days import time threshold = int(time.time()) - (30 * 86400) deleted = await log.purge_before(threshold) print(f"Purged {deleted} old log entries")
Command log REST API endpoint.
This module provides the CommandLogEndpoint class exposing operations for querying and managing the API command audit trail.
The endpoint is designed for automatic introspection by api_base and cli_base modules, which generate FastAPI routes and Typer commands from method signatures.
Example
CLI commands auto-generated:
mail-proxy command_log list --tenant-id acme --limit 50
mail-proxy command_log get --command-id 123
mail-proxy command_log export --tenant-id acme
mail-proxy command_log purge --threshold-ts 1700000000
Note
The command log is append-only during normal operation. Only the purge command removes entries, for log rotation purposes.
- class core.mail_proxy.entities.command_log.endpoint.CommandLogEndpoint(table)[source]
Bases:
BaseEndpointREST API endpoint for command log audit trail.
Provides read access to the command log with filtering and export capabilities, plus a purge operation for log rotation.
- name
Endpoint name used in URL paths (“command_log”).
- table
CommandLogTable instance for database operations.
Example
Using the endpoint programmatically:
endpoint = CommandLogEndpoint(db.table("command_log")) # List recent commands commands = await endpoint.list(tenant_id="acme", limit=10) # Export for backup export = await endpoint.export(tenant_id="acme")
- Parameters:
table (
CommandLogTable)
- __init__(table)[source]
Initialize endpoint with table reference.
- Parameters:
table (
CommandLogTable) – CommandLogTable instance for database operations.
- async list(tenant_id=None, since_ts=None, until_ts=None, endpoint_filter=None, limit=100, offset=0)[source]
List logged commands with optional filters.
- Parameters:
- Return type:
- Returns:
List of command records ordered by timestamp.
- async get(command_id)[source]
Retrieve a specific command by ID.
- Parameters:
command_id (
int) – Command log entry ID.- Return type:
- Returns:
Command record dict with parsed JSON fields.
- Raises:
ValueError – If command not found.
Storage Entity
Storages table: per-tenant storage backend configurations.
- class core.mail_proxy.entities.storage.table.StoragesTable(db)[source]
Bases:
TableStorages table: named storage backends per tenant.
Each tenant can have multiple named storage backends (e.g., HOME, SALES, ARCHIVE). CE supports only local filesystem; EE adds S3, GCS, Azure via fsspec.
Schema: pk (UUID), tenant_id, name (unique per tenant), protocol, config (JSON).
- Parameters:
db (
SqlDb)
Storage endpoint: CRUD operations for tenant storage backends.
Designed for introspection by api_base/cli_base to auto-generate routes/commands.
- class core.mail_proxy.entities.storage.endpoint.StorageEndpoint(table)[source]
Bases:
BaseEndpointStorage management endpoint. Methods are introspected for API/CLI generation.
- Parameters:
table (
StoragesTable)
- __init__(table)[source]
Initialize endpoint with table reference.
- Parameters:
table (
StoragesTable) – Database table instance for operations.
- async add(tenant_id, name, protocol, config=None)[source]
Add or update a storage backend for a tenant.
- Parameters:
- Return type:
- For local protocol:
config: {“base_path”: “/data/attachments”}
- For S3 protocol (EE only):
- config: {“bucket”: “my-bucket”, “prefix”: “attachments/”,
“aws_access_key_id”: “…”, “aws_secret_access_key”: “…”}
- For GCS protocol (EE only):
- config: {“bucket”: “my-bucket”, “prefix”: “attachments/”,
“project”: “…”, “token”: “…”}
- For Azure protocol (EE only):
- config: {“container”: “my-container”, “prefix”: “attachments/”,
“account_name”: “…”, “account_key”: “…”}
SMTP Components
SMTP Sender - main coordinator for email dispatch.
This module provides SmtpSender, the central component for SMTP email delivery. It coordinates: - Background dispatch loop for processing queued messages - Rate limiting per account - Email construction with attachments - SMTP connection management via pool - Retry logic with exponential backoff
SmtpSender is instantiated by MailProxy and accessed via proxy.smtp_sender.
Example
# SmtpSender is created by MailProxy proxy = MailProxy(db_path=”mail.db”) await proxy.start() # Starts smtp_sender automatically
# Direct access if needed proxy.smtp_sender.wake() # Trigger immediate dispatch cycle
- class core.mail_proxy.smtp.sender.SmtpSender(proxy)[source]
Bases:
objectCentral coordinator for SMTP email dispatch.
Manages the complete email sending pipeline: - Fetches ready messages from database queue - Builds EmailMessage objects with attachments - Applies rate limiting per account - Sends via SMTP connection pool - Handles retries for temporary failures - Records delivery events
- proxy
Parent MailProxy instance for accessing db, config, metrics.
- pool
SMTP connection pool for connection reuse.
- rate_limiter
Per-account rate limiting controller.
- Parameters:
proxy (
MailProxy)
- __init__(proxy)[source]
Initialize SmtpSender.
- Parameters:
proxy (
MailProxy) – Parent MailProxy instance.
- property db
Database access via proxy.
- property config
Configuration via proxy.
- property logger
Logger via proxy.
- property metrics: MailMetrics
Prometheus metrics via proxy.
- property attachments: AttachmentManager
Global attachment manager via proxy.
- exception core.mail_proxy.smtp.sender.AccountConfigurationError(message='Missing SMTP account configuration')[source]
Bases:
RuntimeErrorRaised when SMTP account configuration is missing or invalid.
- Parameters:
message (
str)
- exception core.mail_proxy.smtp.sender.AttachmentTooLargeError(filename, size_mb, max_size_mb)[source]
Bases:
ValueErrorRaised when an attachment exceeds the size limit and action is ‘reject’.
Asyncio-friendly SMTP connection pool with acquire/release pattern.
This module provides a connection pool for SMTP clients using a true resource pool pattern with acquire/release semantics. Connections are pooled by account key (host:port:user) and can be shared between concurrent coroutines.
Features: - Connection pooling per SMTP account - Configurable max connections per account - TTL-based connection expiration - Health checking via SMTP NOOP commands - Automatic reconnection when connections become stale - Context manager support for automatic release
Example
Using the SMTP pool with context manager:
pool = SMTPPool(ttl=300, max_per_account=5)
async with pool.connection(
host="smtp.example.com",
port=465,
user="sender@example.com",
password="secret",
use_tls=True
) as smtp:
await smtp.send_message(message)
# Connection automatically released back to pool
Manual acquire/release:
smtp = await pool.acquire(host, port, user, password, use_tls=True)
try:
await smtp.send_message(message)
finally:
await pool.release(smtp)
- class core.mail_proxy.smtp.pool.SMTPPool(ttl=300, max_per_account=5)[source]
Bases:
objectSMTP connection pool with acquire/release semantics.
Maintains pools of SMTP connections indexed by account key (host:port:user). Connections can be acquired by any coroutine and must be released after use. This enables efficient connection sharing across concurrent operations.
The pool uses TTL-based expiration and NOOP health checks to ensure connections remain valid. Invalid connections are discarded and replaced.
- ttl
Maximum age in seconds for pooled connections.
- max_per_account
Maximum connections per SMTP account.
- async acquire(host, port, user, password, *, use_tls, timeout=30.0)[source]
Acquire a connection from the pool.
Returns an idle connection if available, otherwise creates a new one. If max connections reached, waits until one becomes available.
- Parameters:
- Return type:
SMTP- Returns:
SMTP connection ready for use.
- Raises:
asyncio.TimeoutError – If timeout waiting for connection.
- async release(smtp)[source]
Release a connection back to the pool.
The connection is returned to the idle pool for reuse by other coroutines. If the connection is unhealthy, it is closed instead.
- Parameters:
smtp (
SMTP) – The SMTP connection to release.- Return type:
- connection(host, port, user, password, *, use_tls, timeout=30.0)[source]
Context manager for automatic acquire/release.
Example
- async with pool.connection(host, port, user, pwd, use_tls=True) as smtp:
await smtp.send_message(msg)
- async get_connection(host, port, user, password, *, use_tls)[source]
Legacy API: Get a connection (acquire without explicit release).
DEPRECATED: Use acquire()/release() or connection() context manager.
This method exists for backward compatibility. Connections obtained this way should be released manually, or they will be cleaned up when the pool is cleaned.
- async cleanup()[source]
Remove expired and unhealthy connections from idle pools.
Should be called periodically to prevent resource leaks.
- Return type:
- class core.mail_proxy.smtp.pool.PooledConnection(smtp, account_key, created_at=<factory>, last_used=<factory>)[source]
Bases:
objectWrapper for a pooled SMTP connection with metadata.
- smtp: SMTP
Attachment fetching from multiple storage backends.
Provides AttachmentManager for retrieving email attachments with: - Base64 inline content decoding - Filesystem fetching with path traversal protection - HTTP fetching with authentication support - Optional MD5-based caching
Supported fetch_mode values: - endpoint: HTTP POST to tenant’s attachment URL - http_url: Direct HTTP fetch from URL in storage_path - base64: Inline base64-encoded content in storage_path - filesystem: Local filesystem path (absolute or relative to base_dir)
If fetch_mode is not specified, it is inferred from storage_path format:
- base64:... prefix -> base64 (prefix is stripped)
- http:// or https:// prefix -> http_url
- / (absolute path) -> filesystem
- otherwise -> endpoint (default)
- class core.mail_proxy.smtp.attachments.Base64Fetcher[source]
Bases:
objectDecoder for base64-encoded inline attachment content.
- class core.mail_proxy.smtp.attachments.StorageFetcher(storage_manager=None)[source]
Bases:
objectFetcher for attachments via StorageManager (mount:path format).
In CE: supports only ‘local’ protocol (filesystem). In EE: supports S3, GCS, Azure via fsspec.
Path formats: - “mount:path/to/file” → uses configured mount point - “/absolute/path” → direct filesystem access (legacy compatibility)
- Parameters:
storage_manager (
Any)
- __init__(storage_manager=None)[source]
Initialize with optional StorageManager.
- Parameters:
storage_manager (
Any) – StorageManager instance. If None, only absolute paths work.
- async fetch(path)[source]
Fetch file content from storage.
- Parameters:
path (
str) – Either “mount:relative/path” or “/absolute/path”.- Return type:
- Returns:
File content as bytes.
- Raises:
ValueError – If path format invalid or mount not configured.
FileNotFoundError – If file doesn’t exist.
- class core.mail_proxy.smtp.attachments.HttpFetcher(default_endpoint=None, auth_config=None)[source]
Bases:
objectFetcher for HTTP-served attachments with authentication support.
- class core.mail_proxy.smtp.attachments.AttachmentManager(storage_manager=None, http_endpoint=None, http_auth_config=None, cache=None)[source]
Bases:
objectHigh-level interface for fetching email attachments from multiple sources.
- Parameters:
Two-tiered cache for attachment content.
Provides content-addressable caching using MD5 hash as key with automatic tier selection based on content size.
- Components:
MemoryCache: Fast LRU cache with short TTL for small files. DiskCache: Persistent cache with longer TTL for larger files. TieredCache: Combined memory + disk cache with automatic routing.
Example
Create and use a tiered cache:
cache = TieredCache(
memory_max_mb=50,
disk_dir="/var/cache/attachments",
disk_threshold_kb=100,
)
await cache.init()
content = await cache.get("a1b2c3d4...")
if content is None:
content = await fetch_from_storage()
await cache.set(TieredCache.compute_md5(content), content)
Note
Files smaller than disk_threshold_kb are cached in memory only. Larger files go to disk cache. Memory cache is checked first on reads for optimal performance.
- class core.mail_proxy.smtp.cache.MemoryCache(max_mb=50, ttl_seconds=300)[source]
Bases:
objectLRU in-memory cache with TTL and size limits.
- _max_bytes
Maximum cache size in bytes.
- _ttl_seconds
Time-to-live for entries.
- class core.mail_proxy.smtp.cache.DiskCache(cache_dir, max_mb=500, ttl_seconds=3600)[source]
Bases:
objectPersistent disk cache with TTL and size limits.
Uses subdirectory structure based on MD5 prefix for efficient filesystem operations with large numbers of files.
- _cache_dir
Root directory for cached files.
- _max_bytes
Maximum total cache size in bytes.
- _ttl_seconds
Time-to-live for entries.
- class core.mail_proxy.smtp.cache.TieredCache(memory_max_mb=50, memory_ttl_seconds=300, disk_dir=None, disk_max_mb=500, disk_ttl_seconds=3600, disk_threshold_kb=100)[source]
Bases:
objectTwo-tiered cache combining memory and disk storage.
Routes content to appropriate tier based on size threshold. Small files go to memory for fast access, large files to disk for persistence. Reads check memory first, then disk.
- _memory
MemoryCache instance for small files.
- _disk
Optional DiskCache for large files.
- _threshold_bytes
Size threshold for tier selection.
- Parameters:
Retry strategy for SMTP message delivery.
This module provides a configurable retry strategy that determines when and how to retry failed message deliveries.
Example
Using default retry strategy:
strategy = RetryStrategy()
# Check if error is temporary
is_temp, smtp_code = strategy.classify_error(exception)
# Check if should retry
if strategy.should_retry(retry_count, exception):
delay = strategy.calculate_delay(retry_count)
# schedule retry after delay seconds
- class core.mail_proxy.smtp.retry.RetryStrategy(max_retries=5, delays=(60, 300, 900, 3600, 7200))[source]
Bases:
objectConfiguration and logic for message retry behavior.
- max_retries
Maximum number of retry attempts before permanent failure.
- delays
Tuple of delay intervals in seconds for each retry attempt. If retry_count exceeds the length, the last delay is used.
In-memory sliding-window rate limiter.
This module implements per-account rate limiting with configurable limits at minute, hour, and day granularity. The limiter uses in-memory storage to track send history, enabling fast rate limiting decisions.
The sliding window approach ensures fair distribution of sends over time rather than allowing burst behavior at window boundaries.
To handle parallel dispatch correctly, the limiter tracks “in-flight” sends in memory. This ensures that concurrent sends are counted even before they complete and are logged.
Note: Send history is lost on service restart. This is acceptable for rate limiting purposes as it only allows a brief window of potentially higher sending rates after restart.
Example
Using the rate limiter:
rate_limiter = RateLimiter()
deferred_until, should_reject = await rate_limiter.check_and_plan(account)
if deferred_until:
if should_reject:
# Message should be rejected with rate limit error
return {"error": "rate_limit_exceeded"}
else:
# Message should be deferred until this timestamp
await persistence.set_deferred(msg_id, deferred_until)
else:
# Safe to send now
try:
await send_message(msg)
await rate_limiter.log_send(account_id)
except Exception:
await rate_limiter.release_slot(account_id)
raise
- class core.mail_proxy.smtp.rate_limiter.RateLimiter(smtp_sender=None)[source]
Bases:
objectPer-account sliding-window rate limiter with in-memory storage.
Enforces configurable send rate limits at three granularities: - Per minute - Per hour - Per day
When any limit is exceeded, the limiter calculates the earliest timestamp at which the message can be safely sent without violating the limit.
Tracks in-flight sends in memory to handle parallel dispatch correctly.
- Parameters:
smtp_sender (
SmtpSender|None)
- __init__(smtp_sender=None)[source]
Initialize the rate limiter.
- Parameters:
smtp_sender (
SmtpSender|None) – Parent SmtpSender instance for accessing proxy resources.
- async check_and_plan(account)[source]
Check rate limits and calculate deferral timestamp if exceeded.
Evaluates the account’s configured rate limits against recent send history plus in-flight sends. If any limit is exceeded, returns a tuple indicating the deferral time and whether to reject.
If the check passes, reserves a slot by incrementing the in-flight counter. The caller MUST call either log_send() on success or release_slot() on failure to release the reservation.
Limits are checked in order of granularity (minute, hour, day) and the first exceeded limit determines the deferral time.
- Parameters:
account (
dict[str,Any]) – Account configuration dictionary containing: - id: The account identifier (required). - limit_per_minute: Max sends per minute (optional). - limit_per_hour: Max sends per hour (optional). - limit_per_day: Max sends per day (optional). - limit_behavior: “defer” (default) or “reject”.- Returns:
deferred_until: Unix timestamp until which message should be deferred, or None if sending is permitted immediately.
should_reject: True if limit_behavior is “reject” and limit was exceeded, meaning message should be rejected with error.
- Return type:
Tuple of (deferred_until, should_reject)
- async log_send(account_id)[source]
Record a successful send for rate limiting purposes.
Must be called after each successful message delivery to maintain accurate rate limit tracking. Releases the in-flight slot reserved by check_and_plan().
- async release_slot(account_id)[source]
Release an in-flight slot without logging a send.
Call this when a send fails after check_and_plan() returned None. This ensures the in-flight counter stays accurate.
Reporting
Client Reporter - delivery report synchronization with upstream clients.
This module provides ClientReporter, the component responsible for: - Fetching unreported delivery events from message_events table - Sending delivery reports to tenant-specific or global sync URLs - Handling “do not disturb” schedules via next_sync_after - Managing report retention cleanup
ClientReporter is instantiated by MailProxy and accessed via proxy.client_reporter.
Example
# ClientReporter is created by MailProxy proxy = MailProxy(db_path=”mail.db”) await proxy.start() # Starts client_reporter automatically
# Direct access if needed proxy.client_reporter.wake() # Trigger immediate report cycle
- class core.mail_proxy.reporting.client_reporter.ClientReporter(proxy)[source]
Bases:
objectDelivery report synchronization with upstream clients.
Manages the background loop that: - Fetches unreported events from message_events table - Groups events by tenant and sends to appropriate sync URLs - Handles authentication (bearer token, basic auth) - Respects “do not disturb” schedules from client responses - Cleans up old reported messages based on retention policy
- proxy
Parent MailProxy instance for accessing db, config, etc.
- Parameters:
proxy (
MailProxy)
- __init__(proxy)[source]
Initialize ClientReporter.
- Parameters:
proxy (
MailProxy) – Parent MailProxy instance.
- property db
Database access via proxy.
- property logger
Logger via proxy.
- property metrics
Prometheus metrics via proxy.
Enterprise Package (BSL 1.1)
Enterprise features require a commercial license for production use.
Proxy Extension
Enterprise Edition features for genro-mail-proxy.
This package extends the core mail-proxy with enterprise capabilities: bounce detection, PEC receipt handling, and large file storage.
- Components:
MailProxy_EE: Mixin adding bounce detection to MailProxy. bounce/: IMAP polling for bounce notification detection. pec/: PEC (Italian certified email) receipt handling. attachments/: Large file storage via fsspec (S3/GCS/Azure). entities/: EE table mixins (AccountsTable_EE, TenantsTable_EE). imap/: Async IMAP client wrapper (aioimaplib).
Example
EE features are auto-enabled when this package is installed:
from core.mail_proxy import MailProxy
proxy = MailProxy(db_path="mail.db")
# MailProxy automatically includes MailProxy_EE mixin
# Configure bounce detection
from enterprise.mail_proxy.bounce import BounceConfig
proxy.configure_bounce_receiver(BounceConfig(
host="imap.example.com",
port=993,
user="bounces@example.com",
password="secret",
))
await proxy.start()
Note
EE detection happens at import time in core.mail_proxy.__init__. The HAS_ENTERPRISE flag controls mixin composition.
- class enterprise.mail_proxy.MailProxy_EE[source]
Bases:
objectEE mixin: adds BounceReceiver to MailProxy.
- Instance Attributes (added by this mixin):
bounce_receiver: BounceReceiver instance (or None if not started) _bounce_config: BounceConfig instance (or None if not configured)
- bounce_receiver_running()
Property to check if poller is active
- Overridden Methods (from MailProxy CE stubs):
__init_proxy_ee__(): Initialize bounce_receiver, _bounce_config _start_proxy_ee(): Start BounceReceiver if configured _stop_proxy_ee(): Stop BounceReceiver
- bounce_receiver: BounceReceiver | None
- configure_bounce_receiver(config)[source]
Configure bounce detection.
Call this before start() to enable bounce detection. The bounce receiver will poll the configured IMAP mailbox for bounce messages and correlate them with sent messages using the X-Genro-Mail-ID header.
- Parameters:
config (
BounceConfig) – BounceConfig with IMAP credentials and polling settings.- Return type:
- enterprise.mail_proxy.is_ee_enabled()[source]
Check if Enterprise Edition is available and enabled.
- Return type:
- Returns:
True if EE package is properly installed and importable.
MailProxy_EE: Enterprise Edition mixin for bounce detection.
This mixin adds bounce detection to MailProxy by overriding the CE stub methods (__init_proxy_ee__, _start_proxy_ee, _stop_proxy_ee).
- Class Hierarchy:
- MailProxyBase (CE): config, db, tables, endpoints, api/cli
- └── MailProxy (CE): +SmtpSender, +ClientReporter, +metrics
└── MailProxy with MailProxy_EE (EE): +BounceReceiver
- Bounce Detection:
Monitors a dedicated IMAP mailbox for bounce notifications. Correlates bounces with sent messages using the X-Genro-Mail-ID header.
- Configuration:
- proxy.configure_bounce_receiver(BounceConfig(
imap_host=”imap.example.com”, imap_user=”bounce@example.com”, imap_password=”secret”,
)) await proxy.start()
- Commands (via handle_bounce_command):
getBounceStatus: Returns configured and running status
- class enterprise.mail_proxy.proxy_ee.MailProxy_EE[source]
Bases:
objectEE mixin: adds BounceReceiver to MailProxy.
- Instance Attributes (added by this mixin):
bounce_receiver: BounceReceiver instance (or None if not started) _bounce_config: BounceConfig instance (or None if not configured)
- bounce_receiver_running()
Property to check if poller is active
- Overridden Methods (from MailProxy CE stubs):
__init_proxy_ee__(): Initialize bounce_receiver, _bounce_config _start_proxy_ee(): Start BounceReceiver if configured _stop_proxy_ee(): Stop BounceReceiver
- bounce_receiver: BounceReceiver | None
- configure_bounce_receiver(config)[source]
Configure bounce detection.
Call this before start() to enable bounce detection. The bounce receiver will poll the configured IMAP mailbox for bounce messages and correlate them with sent messages using the X-Genro-Mail-ID header.
- Parameters:
config (
BounceConfig) – BounceConfig with IMAP credentials and polling settings.- Return type:
Bounce Detection
Bounce message parser for DSN (RFC 3464) and heuristic detection.
- class enterprise.mail_proxy.bounce.parser.BounceInfo(original_message_id, bounce_type, bounce_code, bounce_reason, recipient)[source]
Bases:
objectParsed bounce information.
- Parameters:
- class enterprise.mail_proxy.bounce.parser.BounceParser[source]
Bases:
objectParse bounce messages (DSN RFC 3464 + heuristics).
- HARD_BOUNCE_CODES = frozenset({'500', '501', '502', '503', '504', '510', '511', '512', '513', '521', '522', '523', '530', '531', '532', '541', '542', '543', '550', '551', '552', '553', '554', '555', '556', '557'})
- SOFT_BOUNCE_CODES = frozenset({'400', '401', '402', '403', '404', '405', '407', '408', '409', '410', '411', '412', '413', '414', '421', '422', '431', '432', '441', '442', '450', '451', '452', '453', '454', '455', '456', '471', '472'})
- BOUNCE_SUBJECT_PATTERNS = [re.compile('(?:mail|message|delivery)\\s*(?:delivery|failure|failed|returned|undeliverable)', re.IGNORECASE), re.compile('undelivered\\s*mail\\s*returned', re.IGNORECASE), re.compile('(?:returned|bounced)\\s*mail', re.IGNORECASE), re.compile('failure\\s*notice', re.IGNORECASE)]
- SMTP_CODE_PATTERN = re.compile('\\b([45]\\d{2})\\b')
- ENHANCED_CODE_PATTERN = re.compile('\\b([45])\\.(\\d+)\\.(\\d+)\\b')
Bounce receiver - IMAP polling for bounce detection.
This module provides BounceReceiver, the component responsible for: - Polling an IMAP mailbox for bounce notifications - Parsing bounce messages to extract delivery failure information - Correlating bounces with sent messages via X-Genro-Mail-ID header - Recording bounce events in the message_events table
BounceReceiver is instantiated by MailProxy_EE and accessed via proxy.bounce_receiver.
Example
# BounceReceiver is created by MailProxy_EE when configured from enterprise.mail_proxy.bounce import BounceConfig
- proxy.configure_bounce_receiver(BounceConfig(
host=”imap.example.com”, port=993, user=”bounces@example.com”, password=”secret”,
)) await proxy.start() # Starts bounce_receiver automatically
- class enterprise.mail_proxy.bounce.receiver.BounceConfig(host, port, user, password, use_ssl=True, folder='INBOX', poll_interval=60)[source]
Bases:
objectConfiguration for bounce mailbox polling.
- Parameters:
- class enterprise.mail_proxy.bounce.receiver.BounceReceiver(proxy, config)[source]
Bases:
objectBackground task that polls IMAP for bounce messages.
Monitors a dedicated IMAP mailbox for bounce notifications and correlates them with sent messages using the X-Genro-Mail-ID header.
- proxy
Parent MailProxy instance for accessing db, logger, etc.
- config
BounceConfig with IMAP credentials and polling settings.
- Parameters:
proxy (
MailProxy_EE)config (
BounceConfig)
- __init__(proxy, config)[source]
Initialize BounceReceiver.
- Parameters:
proxy (
MailProxy_EE) – Parent MailProxy instance.config (
BounceConfig) – BounceConfig with IMAP credentials and polling settings.
- property db
Database access via proxy.
- property logger
Logger via proxy.
PEC Support
PEC receipt parser for Italian certified email receipts.
PEC receipts are standardized messages sent by PEC providers: - Ricevuta di accettazione: confirms the PEC system accepted the message - Ricevuta di avvenuta consegna: confirms delivery to recipient’s PEC mailbox - Ricevuta di mancata consegna: delivery failed (recipient doesn’t exist, etc.) - Avviso di non accettazione: message was rejected by the PEC system
Receipts contain: - X-Ricevuta header indicating receipt type - Original message ID in X-Riferimento-Message-ID or embedded headers - Timestamp of the event - Error details for failure receipts
- class enterprise.mail_proxy.pec.parser.PecReceiptInfo(original_message_id, receipt_type, timestamp, error_reason, recipient)[source]
Bases:
objectParsed PEC receipt information.
- Parameters:
- receipt_type: Literal['accettazione', 'consegna', 'mancata_consegna', 'non_accettazione', 'presa_in_carico'] | None
- __init__(original_message_id, receipt_type, timestamp, error_reason, recipient)
- class enterprise.mail_proxy.pec.parser.PecReceiptParser[source]
Bases:
objectParse PEC receipt messages.
- RECEIPT_TYPE_MAP: dict[str, Literal['accettazione', 'consegna', 'mancata_consegna', 'non_accettazione', 'presa_in_carico']] = {'accettazione': 'accettazione', 'avvenuta-consegna': 'consegna', 'consegna': 'consegna', 'errore-consegna': 'mancata_consegna', 'mancata-consegna': 'mancata_consegna', 'non-accettazione': 'non_accettazione', 'presa-in-carico': 'presa_in_carico'}
- RECEIPT_SUBJECT_PATTERNS = [(re.compile('ACCETTAZIONE:', re.IGNORECASE), 'accettazione'), (re.compile('AVVENUTA\\s+CONSEGNA:', re.IGNORECASE), 'consegna'), (re.compile('POSTA\\s+CERTIFICATA:\\s+AVVENUTA\\s+CONSEGNA', re.IGNORECASE), 'consegna'), (re.compile('MANCATA\\s+CONSEGNA:', re.IGNORECASE), 'mancata_consegna'), (re.compile('ERRORE\\s+CONSEGNA:', re.IGNORECASE), 'mancata_consegna'), (re.compile('NON\\s+ACCETTAZIONE:', re.IGNORECASE), 'non_accettazione'), (re.compile('PRESA\\s+IN\\s+CARICO:', re.IGNORECASE), 'presa_in_carico')]
PEC receipt receiver loop for polling IMAP and processing receipts.
This module monitors PEC accounts for incoming receipts (ricevute) and: 1. Parses receipt type (accettazione, consegna, mancata_consegna, etc.) 2. Creates corresponding events in the message_events table 3. Handles timeout for PEC messages that don’t receive acceptance within 30 minutes
The receiver runs as a background task and polls all PEC accounts periodically.
- class enterprise.mail_proxy.pec.receiver.PecReceiver(db, logger=None, poll_interval=60, acceptance_timeout=1800)[source]
Bases:
objectBackground task that polls PEC accounts for receipts.
- Parameters:
db (MailProxyDb)
logger (Logger | None)
poll_interval (int)
acceptance_timeout (int)
IMAP Client
Async IMAP client wrapper for bounce detection.
- class enterprise.mail_proxy.imap.client.IMAPClient(logger=None)[source]
Bases:
objectAsync IMAP client wrapper using aioimaplib.
- async connect(host, port, user, password, use_ssl=True)[source]
Connect and authenticate to IMAP server.
Large File Storage
Large file storage backend using fsspec for multi-cloud support.
This module provides storage capabilities for large email attachments, supporting multiple backends (S3, GCS, Azure, local filesystem) via fsspec.
When attachments exceed the configured size threshold, they are uploaded to external storage and replaced with download links in the email body.
Example
Using S3 storage:
storage = LargeFileStorage(
storage_url="s3://my-bucket/mail-attachments",
)
await storage.upload("file-123", content, "report.pdf")
url = storage.get_download_url("file-123", "report.pdf")
Using local filesystem with public URL:
storage = LargeFileStorage(
storage_url="file:///var/www/downloads",
public_base_url="https://files.example.com",
)
- exception enterprise.mail_proxy.attachments.large_file_storage.LargeFileStorageError[source]
Bases:
ExceptionBase exception for large file storage errors.
- exception enterprise.mail_proxy.attachments.large_file_storage.StorageNotConfiguredError[source]
Bases:
LargeFileStorageErrorRaised when storage is not properly configured.
- exception enterprise.mail_proxy.attachments.large_file_storage.UploadError[source]
Bases:
LargeFileStorageErrorRaised when file upload fails.
- class enterprise.mail_proxy.attachments.large_file_storage.LargeFileStorage(storage_url, public_base_url=None, secret_key=None, storage_options=None)[source]
Bases:
objectStorage backend for large email attachments using fsspec.
Supports multiple storage backends through fsspec’s unified interface: - S3/MinIO: s3://bucket/path - Google Cloud Storage: gs://bucket/path - Azure Blob: az://container/path - Local filesystem: file:///path/to/dir
For cloud storage (S3, GCS, Azure), download URLs are generated using the backend’s native signed URL mechanism. For local filesystem, a public_base_url must be provided and signed tokens are generated.
- storage_url
fsspec-compatible URL for the storage backend.
- public_base_url
Base URL for download links (required for local storage).
- secret_key
Secret key for signing download tokens.
- Parameters:
- __init__(storage_url, public_base_url=None, secret_key=None, storage_options=None)[source]
Initialize the large file storage backend.
- Parameters:
storage_url (
str) – fsspec URL (s3://bucket/path, file:///data, etc.).public_base_url (
str|None) – Public URL for download links (required for local storage).secret_key (
str|None) – Secret key for signing download tokens.storage_options (
dict[str,Any] |None) – Additional options passed to fsspec filesystem.
- Raises:
ImportError – If fsspec is not installed.
- property fs: AbstractFileSystem
The fsspec filesystem instance.
- async upload(file_id, content, filename)[source]
Upload a file to storage.
- Parameters:
- Return type:
- Returns:
The storage path where the file was uploaded.
- Raises:
UploadError – If upload fails.
- get_download_url(file_id, filename, expires_in=86400)[source]
Generate a download URL for a file.
For cloud storage backends (S3, GCS, Azure), uses the backend’s native signed URL mechanism. For local filesystem, generates a signed token URL using public_base_url.
- Parameters:
- Return type:
- Returns:
A download URL for the file.
- Raises:
StorageNotConfiguredError – If public_base_url is required but not set.
- verify_download_token(token, filename)[source]
Verify a download token and return the file_id if valid.
Enterprise Entities
Account EE (PEC Support)
Enterprise Edition extensions for AccountsTable.
This module adds PEC (Posta Elettronica Certificata) functionality to the base AccountsTable. PEC is the Italian certified email system that requires IMAP polling for delivery receipts.
PEC accounts have: - is_pec_account=1 flag - IMAP configuration for receipt polling - Sync state tracking (last_uid, uidvalidity)
- Usage:
- class AccountsTable(AccountsTable_EE, AccountsTableBase):
pass
- class enterprise.mail_proxy.entities.account.table_ee.AccountsTable_EE[source]
Bases:
objectEnterprise Edition: PEC account support with IMAP configuration.
Adds: - IMAP/PEC columns via configure() - Methods for PEC account management - IMAP sync state tracking
- async add_pec_account(acc)[source]
Insert or update a PEC account with IMAP configuration.
PEC accounts have is_pec_account=1 and require IMAP settings for reading delivery receipts (ricevute di accettazione/consegna).
- Required fields:
id: Account identifier host, port: SMTP server config imap_host: IMAP server for reading receipts
- Optional IMAP fields:
imap_port: IMAP port (default 993) imap_user: IMAP username (defaults to SMTP user) imap_password: IMAP password (defaults to SMTP password) imap_folder: Folder to monitor (default “INBOX”)
Enterprise Edition: PEC account support for AccountEndpoint.
PEC = Posta Elettronica Certificata (Italian certified email). Requires IMAP polling for delivery receipts.
- class enterprise.mail_proxy.entities.account.endpoint_ee.AccountEndpoint_EE[source]
Bases:
objectEE mixin: adds PEC account methods to AccountEndpoint.
- async add_pec(id, tenant_id, host, port, imap_host, user=None, password=None, use_tls=True, imap_port=993, imap_user=None, imap_password=None, imap_folder='INBOX', batch_size=None, ttl=300, limit_per_minute=None, limit_per_hour=None, limit_per_day=None, limit_behavior='defer')[source]
Add or update a PEC account with IMAP configuration.
Tenant EE (API Keys)
Enterprise Edition extensions for TenantsTable.
This module adds multi-tenant management functionality to the base TenantsTable. In EE mode, multiple tenants can be created with isolated configurations and API key authentication.
Multi-tenant features: - Tenant CRUD operations (add, list, update, remove) - Tenant API key management for scoped authentication
Note: Batch suspension is available in CE (core/tenant/table.py).
- Usage:
- class TenantsTable(TenantsTable_EE, TenantsTableBase):
pass
- class enterprise.mail_proxy.entities.tenant.table_ee.TenantsTable_EE[source]
Bases:
objectEnterprise Edition: Multi-tenant management.
Adds methods for: - Creating and managing multiple tenants - Tenant API key authentication
- async add(tenant)[source]
Insert or update a tenant configuration.
For new tenants, automatically generates an API key. For existing tenants, keeps the existing API key.
- Parameters:
tenant (
dict[str,Any]) – Tenant configuration dict with at least ‘id’ field. Optional fields: name, client_auth, client_base_url, client_sync_path, client_attachment_path, rate_limits, large_file_config, active.- Return type:
- Returns:
The API key (raw, show once) for new tenants. Empty string for existing tenants (key unchanged).
- async list_all(active_only=False)[source]
Return all tenants, optionally filtered by active status.
- async update_fields(tenant_id, updates)[source]
Update a tenant’s fields.
- Parameters:
- Return type:
- Returns:
True if row was updated, False if no valid updates or tenant not found.
- async remove(tenant_id)[source]
Delete a tenant and cascade to related accounts and messages.
Warning: This deletes ALL data associated with the tenant including: - All messages in the queue - All SMTP account configurations - The tenant configuration itself
- async create_api_key(tenant_id, expires_at=None)[source]
Create a new API key for a tenant.
Generates a new random API key, replacing any existing key. The raw key is returned once and cannot be retrieved later.
- async get_tenant_by_token(raw_key)[source]
Find tenant by API key token.
Looks up the tenant associated with the given API key. Validates that the key has not expired.
Enterprise Edition extensions for TenantEndpoint.
This module adds API key management functionality to the base TenantEndpoint. EE mode allows tenants to have their own API keys for scoped authentication.
API key features: - Create new API keys for tenants (with optional expiration) - Revoke API keys without deleting tenants
- Usage:
- class TenantEndpoint(TenantEndpoint_EE, TenantEndpointBase):
pass
- class enterprise.mail_proxy.entities.tenant.endpoint_ee.TenantEndpoint_EE[source]
Bases:
objectEnterprise Edition: API key management for tenants.
Adds methods for: - Creating tenant-specific API keys - Revoking API keys
- async create_api_key(tenant_id, expires_at=None)[source]
Create a new API key for a tenant.
Generates a new random API key, replacing any existing key. The raw key is returned once and cannot be retrieved later. Save it immediately!
- Parameters:
- Return type:
- Returns:
Dict with ok=True and api_key (show once).
- Raises:
ValueError – If tenant not found.
- async revoke_api_key(tenant_id)[source]
Revoke the API key for a tenant.
Removes the API key, preventing further authentication with it. The tenant can still be accessed via instance token.
- Parameters:
tenant_id (
str) – The tenant ID.- Return type:
- Returns:
Dict with ok=True.
- Raises:
ValueError – If tenant not found.
Instance EE (Bounce Config)
Enterprise Edition extensions for InstanceTable.
This module adds bounce detection configuration to the base InstanceTable. Bounce detection monitors a dedicated IMAP mailbox for bounce notifications and DSN (Delivery Status Notification) messages.
Bounce detection workflow: 1. Outgoing emails use a dedicated Return-Path address 2. Bounces are delivered to the bounce mailbox 3. IMAP poller reads and parses bounce messages 4. Bounce events are recorded in message_events
- Usage:
- class InstanceTable(InstanceTable_EE, InstanceTableBase):
pass
- class enterprise.mail_proxy.entities.instance.table_ee.InstanceTable_EE[source]
Bases:
objectEnterprise Edition: Bounce detection configuration.
Adds: - Bounce IMAP columns via configure() - Methods for bounce detection management - Sync state tracking
- async is_bounce_enabled()[source]
Check if bounce detection is enabled.
- Return type:
- Returns:
True if bounce_enabled=1 in instance config.
- async get_bounce_config()[source]
Get bounce detection configuration.
- Returns:
enabled: bool
imap_host, imap_port, imap_user, imap_password, imap_folder
imap_ssl: bool
poll_interval: seconds between polls
return_path: Return-Path header for outgoing emails
last_uid, last_sync, uidvalidity: sync state
- Return type:
Dict with bounce settings
- async set_bounce_config(*, enabled=None, imap_host=None, imap_port=None, imap_user=None, imap_password=None, imap_folder=None, imap_ssl=None, poll_interval=None, return_path=None)[source]
Set bounce detection configuration.
Only provided fields are updated. Pass None to skip a field.
Enterprise Edition extensions for InstanceEndpoint.
This module adds bounce detection configuration to the base InstanceEndpoint. Bounce detection monitors a dedicated IMAP mailbox for bounce notifications.
Bounce features: - Get/set bounce detection configuration - Runtime reload of bounce config
- Usage:
- class InstanceEndpoint(InstanceEndpoint_EE, InstanceEndpointBase):
pass
- class enterprise.mail_proxy.entities.instance.endpoint_ee.InstanceEndpoint_EE[source]
Bases:
objectEnterprise Edition: Bounce detection configuration.
Adds methods for: - Getting bounce detection configuration - Setting bounce detection configuration - Reloading bounce config at runtime
- async get_bounce_config()[source]
Get bounce detection configuration.
- Returns:
enabled: bool
imap_host, imap_port, imap_user, imap_folder
imap_ssl: bool
poll_interval: seconds between polls
return_path: Return-Path header for outgoing emails
- Return type:
Dict with bounce settings including
- async set_bounce_config(enabled=None, imap_host=None, imap_port=None, imap_user=None, imap_password=None, imap_folder=None, imap_ssl=None, poll_interval=None, return_path=None)[source]
Set bounce detection configuration.
Only provided fields are updated. Pass None to skip a field. Use reload_bounce() to apply changes to running poller.
Message EE (PEC Events)
Enterprise Edition extensions for MessagesTable.
This module adds PEC (Posta Elettronica Certificata) tracking functionality to the base MessagesTable. PEC messages require tracking of acceptance and delivery receipts.
PEC workflow: 1. Message sent via PEC account gets is_pec=1 2. IMAP poller checks for acceptance receipt 3. If no acceptance within timeout, message flagged for alert 4. Delivery receipt completes the PEC lifecycle
- Usage:
- class MessagesTable(MessagesTable_EE, MessagesTableBase):
pass
- class enterprise.mail_proxy.entities.message.table_ee.MessagesTable_EE[source]
Bases:
objectEnterprise Edition: PEC message tracking.
Adds methods for: - Clearing PEC flag for non-PEC recipients - Finding PEC messages missing acceptance receipts
- async clear_pec_flag(pk)[source]
Clear the is_pec flag when recipient is not a PEC address.
Called when sending to a non-PEC recipient via a PEC account. The message doesn’t need receipt tracking in this case.
- async get_pec_without_acceptance(cutoff_ts)[source]
Get PEC messages sent before cutoff_ts without acceptance receipt.
Used to detect PEC delivery failures. If a PEC message doesn’t receive an acceptance receipt within the expected timeframe, it may indicate a delivery problem.
Returns messages where: - is_pec = 1 (marked as PEC) - smtp_ts < cutoff_ts (sent before cutoff) - No pec_acceptance event exists in message_events
Storage EE (Cloud Backends)
Enterprise Edition: Cloud storage backend support via fsspec.
This mixin adds S3, Azure, and GCS support to StorageNode.
- class enterprise.mail_proxy.storage.node_ee.StorageNode_EE[source]
Bases:
objectEnterprise Edition mixin: Cloud storage backends via fsspec.
Supports: - S3 (protocol: ‘s3’) - Google Cloud Storage (protocol: ‘gcs’) - Azure Blob (protocol: ‘azure’)
- Usage (mixed into StorageNode):
- class StorageNode(StorageNode_EE, StorageNodeBase):
pass
SQL Package
Database abstraction layer supporting SQLite and PostgreSQL.
Async database manager with adapter pattern and table registration.
- class sql.sqldb.SqlDb(connection_string, parent=None)[source]
Bases:
objectAsync database manager with adapter pattern.
Supports multiple database types via adapters: - SQLite: “/path/to/db.sqlite” or “sqlite:/path/to/db” - PostgreSQL: “postgresql://user:pass@host/db”
Features: - Table class registration via add_table() - Table access via table(name) - Schema creation and verification - CRUD operations via adapter - Encryption key access via parent.encryption_key
- Usage:
db = SqlDb(“/data/mail.db”, parent=proxy) await db.connect()
db.add_table(TenantsTable) db.add_table(AccountsTable) await db.check_structure()
tenant = await db.table(‘tenants’).select_one(where={“id”: “acme”})
await db.close()
- property encryption_key: bytes | None
Get encryption key from parent. Returns None if not configured.
- table(name)[source]
Get table instance by name.
- Parameters:
name (
str) – Table name.- Return type:
- Returns:
Table instance.
- Raises:
ValueError – If table not registered.
Table base class with Columns-based schema (async version).
- class sql.table.Table(db)[source]
Bases:
objectBase class for async table managers.
Subclasses define columns via configure() hook and implement domain-specific operations.
- name
Table name in database.
- pkey
Primary key column name (e.g., “pk” or “id”).
- db
SqlDb instance reference.
- columns
Column definitions.
- Parameters:
db (
SqlDb)
- new_pkey_value()[source]
Generate a new primary key value. Override in subclasses for custom pk.
Default: returns UUID. Tables with autoincrement pk should return None.
- Return type:
- async trigger_on_inserting(record)[source]
Called before insert. Can modify record. Return the record to insert.
Auto-generates pk via new_pkey_value() if pk column is not in record.
- async trigger_on_updating(record, old_record)[source]
Called before update. Can modify record. Return the record to update.
- async add_column_if_missing(column_name)[source]
Add column if it doesn’t exist (migration helper).
- async sync_schema()[source]
Sync table schema by adding any missing columns.
Iterates over all columns defined in configure() and adds them if they don’t exist in the database. This enables automatic schema migration when new columns are added to the codebase.
Safe to call on every startup - existing columns are ignored. Works with both SQLite and PostgreSQL.
- Return type:
- async insert(data)[source]
Insert a row. Calls trigger_on_inserting before and trigger_on_inserted after.
The data dict is mutated: if the pk is auto-generated (UUID or autoincrement), it will be populated in data after insert.
- async select_for_update(where, columns=None)[source]
Select single row with FOR UPDATE lock (PostgreSQL) or regular select (SQLite).
- record(pkey_value, insert_missing=False, for_update=True)[source]
Return async context manager for record update.
- Parameters:
- Return type:
- Returns:
RecordUpdater context manager.
- Usage:
# Single key: async with table.record(‘uuid-123’) as rec:
rec[‘name’] = ‘New Name’
# Composite key (dict): async with table.record({‘tenant_id’: ‘t1’, ‘id’: ‘acc1’}) as rec:
rec[‘host’] = ‘smtp.example.com’
# Upsert (insert if missing): async with table.record({‘tenant_id’: ‘t1’, ‘id’: ‘new’}, insert_missing=True) as rec:
rec[‘host’] = ‘smtp.new.com’
- async update(values, where)[source]
Update rows. Calls trigger_on_updating before and trigger_on_updated after.
Uses SELECT FOR UPDATE to lock the row during update (PostgreSQL).
- async update_batch(pkeys, updater=None)[source]
Update multiple records by primary key, calling triggers for each.
Performs ONE read (SELECT all records) then N writes (UPDATE per record) with trigger_on_updating/trigger_on_updated called for each.
- async update_batch_raw(pkeys, updater)[source]
Update multiple records with a single UPDATE statement. No triggers.
Use when you know there are no triggers to call and want maximum efficiency. Performs a single UPDATE … WHERE pk IN (…) query.
- async delete(where)[source]
Delete rows. Calls trigger_on_deleting before and trigger_on_deleted after.
- class sql.table.RecordUpdater(table, pkey, pkey_value, insert_missing=False, for_update=True)[source]
Bases:
objectAsync context manager for record update with locking and triggers.
- Usage:
- async with table.record(pk) as record:
record[‘field’] = ‘value’
# → triggers update() with old_record
- async with table.record(pk, insert_missing=True) as record:
record[‘field’] = ‘value’
# → insert() if not exists, update() if exists
The context manager: - __aenter__: SELECT FOR UPDATE (PostgreSQL) or SELECT (SQLite), saves old_record - __aexit__: calls insert() or update() with proper trigger chain Supports both single-column keys and composite keys (dict).
Single key: record(“uuid-123”) or record(“uuid-123”, pkey=”pk”) Composite: record({“tenant_id”: “t1”, “id”: “acc1”})
- Parameters:
Column and Columns definitions for table schema.
- class sql.column.Column(name, type_, *, unique=False, nullable=True, default=None, json_encoded=False, encrypted=False)[source]
Bases:
objectColumn definition with SQL type, constraints, and optional relation.
- Parameters:
- __init__(name, type_, *, unique=False, nullable=True, default=None, json_encoded=False, encrypted=False)[source]
- class sql.column.Columns[source]
Bases:
objectContainer for table columns.
Database Adapters
Base adapter class for async database backends with CRUD helpers.
- class sql.adapters.base.DbAdapter[source]
Bases:
ABCAbstract base class for async database adapters with CRUD helpers.
Provides a unified interface for SQLite and PostgreSQL with: - Connection management (connect, close) - Raw query execution (execute, fetch_one, fetch_all) - CRUD helpers (insert, select, update, delete)
Subclasses must implement the abstract methods and set the placeholder attribute for parameter binding (:name for SQLite, %(name)s for PostgreSQL).
- for_update_clause()[source]
Return FOR UPDATE clause if supported, empty string otherwise.
- Return type:
- abstractmethod async execute_many(query, params_list)[source]
Execute query multiple times with different params (batch insert).
- abstractmethod async fetch_one(query, params=None)[source]
Execute query, return single row as dict or None.
- abstractmethod async fetch_all(query, params=None)[source]
Execute query, return all rows as list of dicts.
- abstractmethod async execute_script(script)[source]
Execute multiple statements (for schema creation).
- async insert_returning_id(table, values, pk_col='id')[source]
Insert a row and return the generated primary key.
Override in subclasses for database-specific implementation (e.g., RETURNING for PostgreSQL, lastrowid for SQLite).
- async select(table, columns=None, where=None, order_by=None, limit=None)[source]
Select rows, return list of dicts.
SQLite async adapter using aiosqlite.
- class sql.adapters.sqlite.SqliteAdapter(db_path)[source]
Bases:
DbAdapterSQLite async adapter. Uses :name placeholders natively.
- Parameters:
db_path (
str)
- async insert_returning_id(table, values, pk_col='id')[source]
Insert a row and return the generated primary key (autoincrement).
- async execute_many(query, params_list)[source]
Execute query multiple times with different params (batch insert).
PostgreSQL async adapter using psycopg3 with connection pooling.
- class sql.adapters.postgresql.PostgresAdapter(dsn, pool_size=10)[source]
Bases:
DbAdapterPostgreSQL async adapter using psycopg3 with connection pooling.
Converts :name placeholders to %(name)s for psycopg compatibility.
- async connect()[source]
Establish connection pool.
Sets search_path to ‘public’ schema to ensure tables are created in a valid schema even when the connection default is unset.
- Return type:
- async insert_returning_id(table, values, pk_col='id')[source]
Insert a row and return the generated primary key (RETURNING).
- async execute_many(query, params_list)[source]
Execute query multiple times with different params (batch insert).
Storage Package
Storage node management for multi-backend file storage.
StorageManager: mount point management compatible with genro-storage API.
The StorageManager handles configuration of storage backends and creation of StorageNode instances for file operations.
- class storage.manager.StorageManager[source]
Bases:
objectManages storage mount points and creates StorageNode instances.
API is compatible with genro-storage for future migration.
- Usage:
storage = StorageManager() storage.configure([
{‘name’: ‘data’, ‘protocol’: ‘local’, ‘base_path’: ‘/data’}, {‘name’: ‘cache’, ‘protocol’: ‘local’, ‘base_path’: ‘/tmp/cache’},
])
node = storage.node(’data:files/report.pdf’) await node.write_bytes(content)
- configure(source)[source]
Configure mount points from file or list.
- Parameters:
source (
str|list[dict[str,Any]]) – Either a path to YAML/JSON file, or a list of mount configs. Each config must have ‘name’ and ‘protocol’ keys.- Return type:
- Example configs:
{‘name’: ‘data’, ‘protocol’: ‘local’, ‘base_path’: ‘/data’} {‘name’: ‘s3’, ‘protocol’: ‘s3’, ‘bucket’: ‘my-bucket’}
- register(name, config)[source]
Register a single mount point.
- Parameters:
- Return type:
Examples
storage.register(‘data’, {‘protocol’: ‘local’, ‘base_path’: ‘/data’}) storage.register(‘data’, ‘/data’) # Shorthand for local storage.register(‘s3’, ‘s3://bucket/path’)
- node(mount_or_path, *parts)[source]
Create a StorageNode for a file or directory.
- Parameters:
- Return type:
- Returns:
StorageNode for the specified path (composed with EE mixin if available).
Examples
storage.node(’data:files/report.pdf’) storage.node(‘data’, ‘files’, ‘report.pdf’)
StorageNode: file/directory abstraction compatible with genro-storage API.
A StorageNode represents a file or directory in a storage backend. It provides a unified interface for file operations regardless of backend.
- class storage.node.StorageNode(manager, mount_name, path, config)[source]
Bases:
objectA file or directory in a storage backend.
Provides methods for file I/O that work with any backend (local, S3, etc.). API is compatible with genro-storage for future migration.
- property parent: StorageNode
Parent directory node.
- child(*parts)[source]
Get a child node by path components.
- Parameters:
*parts (
str) – Path components to append.- Return type:
- Returns:
New StorageNode for the child path.
- async delete()[source]
Delete file or directory.
- Return type:
- Returns:
True if deleted, False if not found.
Tools Package
Shared utilities used across packages.
Field-level encryption for sensitive database values.
Provides AES-256-GCM encryption for sensitive fields like passwords and API keys. The encryption key is loaded from environment variable or secrets file.
Key sources (in priority order): 1. MAIL_PROXY_ENCRYPTION_KEY environment variable (base64-encoded) 2. /run/secrets/encryption_key file (Docker/Kubernetes secrets) 3. {instance_dir}/.encryption_key file (development only)
- Usage:
from tools.encryption import encrypt_value, decrypt_value
# Encrypt before storing encrypted = encrypt_value(“my-secret-password”) # Returns: “ENC:base64-encoded-ciphertext”
# Decrypt after reading plaintext = decrypt_value(encrypted) # Returns: “my-secret-password”
- exception tools.encryption.EncryptionError[source]
Bases:
ExceptionRaised when encryption/decryption fails.
- exception tools.encryption.EncryptionKeyNotConfigured[source]
Bases:
EncryptionErrorRaised when encryption key is not available.
- tools.encryption.decrypt_value(encrypted)[source]
Decrypt a value encrypted with encrypt_value().
- Parameters:
encrypted (
str) – The encrypted value (with “ENC:” prefix).- Return type:
- Returns:
Decrypted plaintext.
- Raises:
EncryptionKeyNotConfigured – If no key is available.
EncryptionError – If decryption fails.
- tools.encryption.decrypt_value_with_key(encrypted, key)[source]
Decrypt a value using provided key.
Same as decrypt_value() but uses an explicit key instead of global. Used by Table for field decryption with proxy-provided key.
- tools.encryption.encrypt_value(plaintext)[source]
Encrypt a string value using AES-256-GCM.
- Parameters:
plaintext (
str) – The value to encrypt.- Returns:
“ prefix.
- Return type:
Encrypted value with “ENC
- Raises:
EncryptionKeyNotConfigured – If no key is available.
EncryptionError – If encryption fails.
- tools.encryption.encrypt_value_with_key(plaintext, key)[source]
Encrypt a string value using provided key.
Same as encrypt_value() but uses an explicit key instead of global. Used by Table for field encryption with proxy-provided key.
- tools.encryption.generate_key()[source]
Generate a new random encryption key.
- Return type:
- Returns:
Base64-encoded 32-byte key suitable for MAIL_PROXY_ENCRYPTION_KEY.
Prometheus metrics for monitoring the mail dispatcher.
This module defines the Prometheus counters and gauges used to track email
dispatch operations. All metrics use the gmp_ prefix (genro-mail-proxy).
- Metrics exposed:
gmp_sent_total: Counter of successfully sent emails per account.gmp_errors_total: Counter of send errors per account.gmp_deferred_total: Counter of deferred emails per account.gmp_rate_limited_total: Counter of rate limit hits per account.gmp_pending_messages: Gauge of messages currently in queue.
- All counters are labeled by:
tenant_id: Tenant identifiertenant_name: Human-readable tenant nameaccount_id: SMTP account identifieraccount_name: Human-readable account name (defaults to account_id)
Example
Accessing metrics via the REST API:
GET /metrics
Returns Prometheus text format suitable for scraping.
- class tools.prometheus.metrics.MailMetrics(registry=None)[source]
Bases:
objectPrometheus metrics collector for the mail dispatcher.
Encapsulates all Prometheus counters and gauges used to monitor email dispatch operations. Each metric is labeled by tenant and account info to enable per-tenant and per-account monitoring and alerting.
- registry
The Prometheus CollectorRegistry holding all metrics.
- sent
Counter tracking successfully sent emails.
- errors
Counter tracking permanent send failures.
- deferred
Counter tracking temporarily deferred messages.
- rate_limited
Counter tracking rate limit enforcement events.
- pending
Gauge showing current queue depth.
- Parameters:
registry (
CollectorRegistry|None)
- __init__(registry=None)[source]
Initialize metrics with an optional custom registry.
- Parameters:
registry (
CollectorRegistry|None) – Optional Prometheus CollectorRegistry. If not provided, a new registry is created. Use a custom registry for testing or when multiple metric sets are needed.
- inc_sent(tenant_id=None, tenant_name=None, account_id=None, account_name=None)[source]
Increment the sent counter.
- inc_error(tenant_id=None, tenant_name=None, account_id=None, account_name=None)[source]
Increment the error counter.
- inc_deferred(tenant_id=None, tenant_name=None, account_id=None, account_name=None)[source]
Increment the deferred counter.
- inc_rate_limited(tenant_id=None, tenant_name=None, account_id=None, account_name=None)[source]
Increment the rate-limited counter.
- init_account(tenant_id=None, tenant_name=None, account_id=None, account_name=None)[source]
Initialize all counters for an account with zero values.
This ensures metrics appear in Prometheus output even before any actual email activity occurs. Prometheus counters with labels only appear in output after being incremented, so this method explicitly creates the label combinations with initial value 0.
REPL protection utilities.
Provides decorators and wrappers to protect sensitive methods/attributes from being accessed in interactive REPL sessions.
- Usage:
from tools.repl import reserved, repl_wrap
- class MyService:
@reserved def get_secret_key(self):
return self._secret
- def public_method(self):
return “hello”
# In REPL setup: service = MyService() namespace = {“service”: repl_wrap(service)}
# Now in REPL: >>> service.public_method() # Works ‘hello’ >>> service.get_secret_key() # Blocked AttributeError: ‘get_secret_key’ is reserved and not accessible in REPL
- class tools.repl.REPLWrapper(wrapped)[source]
Bases:
objectWrapper that blocks access to @reserved methods/attributes.
This wrapper intercepts attribute access and raises AttributeError for any method marked with @reserved decorator.
- Parameters:
wrapped (
Any)
- tools.repl.repl_wrap(obj)[source]
Wrap an object for safe REPL access.
Returns a wrapper that blocks access to @reserved methods. The wrapper is transparent for all other operations.
- Parameters:
obj (
TypeVar(T)) – The object to wrap.- Return type:
TypeVar(T)- Returns:
A REPLWrapper that behaves like the original object but blocks @reserved methods.
- Usage:
# In REPL setup code: namespace = {
“proxy”: repl_wrap(proxy), “db”: repl_wrap(db),
} code.interact(local=namespace)