Source code for core.mail_proxy.proxy_base

# Copyright 2025 Softwell S.r.l. - SPDX-License-Identifier: Apache-2.0
"""Base class for MailProxy: database, tables, endpoints, and interface factories.

MailProxyBase is the foundation layer of genro-mail-proxy, providing:

1. Configuration: ProxyConfig instance at self.config
2. Database: SqlDb at self.db with autodiscovered Table classes
3. Endpoints: Registry at self.endpoints with autodiscovered Endpoint classes
4. Interfaces: Lazy `api` (FastAPI) and `cli` (Click) properties

Class Hierarchy:
    MailProxyBase (this class)
        └── MailProxy (proxy.py): adds SMTP sender, background loops, metrics
            └── MailProxy with MailProxy_EE mixin: adds bounce detection (EE)

Discovery Mechanism:
    Tables from `core.mail_proxy.entities.*/table.py` are composed with
    optional EE mixins from `enterprise.mail_proxy.entities.*/table_ee.py`.
    Endpoints follow the same pattern with `endpoint.py` and `endpoint_ee.py`.

    Discovered entities (CE):
        - tenants: Multi-tenant isolation
        - accounts: SMTP account configuration
        - messages: Message queue with priority
        - message_events: Delivery event history
        - command_log: API audit trail
        - instance: Service-level configuration

    EE extensions (when enterprise package installed):
        - accounts: +PEC (certified email) fields
        - tenants: +API key management
        - instance: +Bounce detection config

Usage (testing without runtime):
    proxy = MailProxyBase(db_path=":memory:")
    await proxy.init()
    await proxy.db.table("tenants").add({"id": "t1", "name": "Test"})

Usage (production via proxy.api):
    proxy = MailProxy(config=ProxyConfig(db_path="/data/mail.db"))
    app = proxy.api  # FastAPI app with auto-start/stop lifespan
"""

from __future__ import annotations

import importlib
import logging
import pkgutil
from typing import TYPE_CHECKING, Any

from sql import SqlDb

from .interface import BaseEndpoint
from .proxy_config import ProxyConfig

if TYPE_CHECKING:
    import click
    from fastapi import FastAPI

# Packages to scan for entities
_CE_ENTITIES_PACKAGE = "core.mail_proxy.entities"
_EE_ENTITIES_PACKAGE = "enterprise.mail_proxy.entities"


[docs] class MailProxyBase: """Foundation layer: config, database, tables, endpoints, interface factories. Attributes: config: ProxyConfig instance with all configuration db: SqlDb with autodiscovered Table classes (CE + EE mixins) endpoints: Dict of Endpoint instances keyed by name Properties: api: FastAPI app (lazy, created on first access) cli: Click CLI group (lazy, created on first access) Subclassed by MailProxy which adds SMTP sender, background loops, etc. """
[docs] def __init__(self, config: ProxyConfig | None = None): """Initialize base proxy with config and database. Args: config: ProxyConfig instance. If None, creates default. """ self.config = config or ProxyConfig() self._encryption_key: bytes | None = None self._load_encryption_key() self.db = SqlDb(self.config.db_path or ":memory:", parent=self) self._discover_tables() self.endpoints: dict[str, BaseEndpoint] = {} self._discover_endpoints()
def _load_encryption_key(self) -> None: """Load encryption key from environment or secrets file. Sources (in priority order): 1. MAIL_PROXY_ENCRYPTION_KEY env var (base64-encoded 32 bytes) 2. /run/secrets/encryption_key file (Docker/K8s secrets) If no key is configured, encryption is disabled (fields stored as plaintext). """ import base64 import os from pathlib import Path # 1. Environment variable key_b64 = os.environ.get("MAIL_PROXY_ENCRYPTION_KEY") if key_b64: try: key = base64.b64decode(key_b64) if len(key) == 32: self._encryption_key = key return except Exception: pass # 2. Secrets file secrets_path = Path("/run/secrets/encryption_key") if secrets_path.exists(): try: key = secrets_path.read_bytes().strip() if len(key) == 32: self._encryption_key = key return except Exception: pass @property def encryption_key(self) -> bytes | None: """Encryption key for database field encryption. None if not configured.""" return self._encryption_key
[docs] def set_encryption_key(self, key: bytes) -> None: """Set encryption key programmatically (for testing).""" if len(key) != 32: raise ValueError("Encryption key must be 32 bytes") self._encryption_key = key
def _discover_tables(self) -> None: """Autodiscover Table classes from entities/ and compose with EE mixins.""" ce_modules = self._find_entity_modules(_CE_ENTITIES_PACKAGE, "table") ee_modules = self._find_entity_modules(_EE_ENTITIES_PACKAGE, "table_ee") for entity_name, ce_module in ce_modules.items(): ce_class = self._get_class_from_module(ce_module, "Table") if not ce_class: continue ee_module = ee_modules.get(entity_name) if ee_module: ee_mixin = self._get_ee_mixin_from_module(ee_module, "_EE") if ee_mixin: composed_class = type( ce_class.__name__, (ee_mixin, ce_class), {"__module__": ce_class.__module__} ) self.db.add_table(composed_class) continue self.db.add_table(ce_class) def _discover_endpoints(self) -> None: """Autodiscover Endpoint classes and compose with EE mixins.""" for endpoint_class in BaseEndpoint.discover(): table = self.db.table(endpoint_class.name) # InstanceEndpoint needs proxy reference, others just need table if endpoint_class.name == "instance": self.endpoints[endpoint_class.name] = endpoint_class(table, proxy=self) else: self.endpoints[endpoint_class.name] = endpoint_class(table)
[docs] def endpoint(self, name: str) -> BaseEndpoint: """Get endpoint by name.""" if name not in self.endpoints: raise ValueError(f"Endpoint '{name}' not found") return self.endpoints[name]
[docs] async def init(self) -> None: """Initialize database: connect, create tables, run migrations, detect edition.""" await self.db.connect() await self.db.check_structure() logger = logging.getLogger("mail_proxy") # Run legacy schema migrations accounts = self.db.table("accounts") if await accounts.migrate_from_legacy_schema(): logger.info("Migrated accounts table from legacy schema") messages = self.db.table("messages") if await messages.migrate_from_legacy_schema(): logger.info("Migrated messages table from legacy schema") # Sync schema for all tables await self.db.table("tenants").sync_schema() await self.db.table("accounts").sync_schema() await self.db.table("messages").sync_schema() await self.db.table("message_events").sync_schema() await self.db.table("command_log").sync_schema() await self.db.table("instance").sync_schema() # Populate account_pk for existing messages if await messages.migrate_account_pk(): logger.info("Migrated messages table: populated account_pk") # Edition detection and default tenant creation await self._init_edition()
async def _init_edition(self) -> None: """Detect CE/EE mode based on existing data and installed modules.""" import core.mail_proxy tenants_table = self.db.table("tenants") instance_table = self.db.table("instance") tenants = await tenants_table.list_all() count = len(tenants) if count == 0: if core.mail_proxy.HAS_ENTERPRISE: await instance_table.set_edition("ee") else: await tenants_table.ensure_default() await instance_table.set_edition("ce") elif count > 1 or (count == 1 and tenants[0]["id"] != "default"): await instance_table.set_edition("ee")
[docs] async def close(self) -> None: """Close database connection.""" await self.db.close()
# ------------------------------------------------------------------------- # Discovery helpers (private) # ------------------------------------------------------------------------- def _find_entity_modules(self, base_package: str, module_name: str) -> dict[str, Any]: """Scan package for entity subpackages containing module_name.""" result: dict[str, Any] = {} try: package = importlib.import_module(base_package) except ImportError: return result package_path = getattr(package, "__path__", None) if not package_path: return result for _, name, is_pkg in pkgutil.iter_modules(package_path): if not is_pkg: continue full_module_name = f"{base_package}.{name}.{module_name}" try: module = importlib.import_module(full_module_name) result[name] = module except ImportError: pass return result def _get_class_from_module(self, module: Any, class_suffix: str) -> type | None: """Extract CE Table/Endpoint class by suffix (excludes _EE mixins).""" for attr_name in dir(module): if attr_name.startswith("_"): continue obj = getattr(module, attr_name) if isinstance(obj, type) and attr_name.endswith(class_suffix): if "_EE" in attr_name or "Mixin" in attr_name: continue if attr_name == "Table": continue if not hasattr(obj, "name"): continue return obj return None def _get_ee_mixin_from_module(self, module: Any, class_suffix: str) -> type | None: """Extract EE mixin class (suffix _EE) for composition with CE class.""" for name in dir(module): if name.startswith("_"): continue obj = getattr(module, name) if isinstance(obj, type) and name.endswith(class_suffix): return obj return None # ------------------------------------------------------------------------- # Interface factories (lazy properties) # ------------------------------------------------------------------------- @property def api(self) -> FastAPI: """FastAPI app with all endpoints, auth middleware, and lifespan. Created on first access. Includes default lifespan that calls proxy.start() on startup and proxy.stop() on shutdown. Usage: uvicorn core.mail_proxy.server:app """ if not hasattr(self, "_api") or self._api is None: from .interface import create_app self._api = create_app(self, api_token=self.config.api_token) return self._api @property def cli(self) -> click.Group: """Click CLI group with endpoint commands and service commands. Created on first access. Includes: - Endpoint commands: tenants, accounts, messages, instance, command_log - Service commands: serve, connect, stats, token, run-now Usage: mail-proxy --help """ if not hasattr(self, "_cli") or self._cli is None: self._cli = self._create_cli() return self._cli def _create_cli(self) -> click.Group: """Build Click CLI: endpoint commands + service commands (serve, etc.).""" import click from .interface import ( add_connect_command, add_current_command, add_list_command, add_restart_command, add_run_now_command, add_serve_command, add_stats_command, add_stop_command, add_token_command, add_use_command, register_cli_endpoint, ) @click.group() @click.version_option() def cli() -> None: """Mail-Proxy: Asynchronous email dispatch service.""" pass # Register endpoint-based commands (tenants, accounts, messages, instance) for endpoint in self.endpoints.values(): register_cli_endpoint(cli, endpoint) # Add special commands def get_url() -> str: return f"http://localhost:{self.config.port}" def get_token() -> str | None: return self.config.api_token add_connect_command(cli, get_url, get_token, self.config.instance_name) add_stats_command(cli, self.db) add_token_command(cli, self.db) add_run_now_command(cli, get_url, get_token) # Instance management commands add_serve_command(cli) add_list_command(cli) add_stop_command(cli) add_restart_command(cli) # Context management commands add_use_command(cli) add_current_command(cli) return cli
__all__ = ["MailProxyBase"]