# Source code for core.mail_proxy.entities.message.endpoint

# Copyright 2025 Softwell S.r.l. - SPDX-License-Identifier: Apache-2.0
"""Email message REST API endpoint.

This module provides the MessageEndpoint class exposing CRUD operations
for email messages via REST API and CLI commands.

The endpoint is designed for automatic introspection by api_base and
cli_base modules, which generate FastAPI routes and Typer commands
from method signatures.

Example:
    CLI commands auto-generated::

        mail-proxy messages add --tenant-id acme --id msg-001 --account-id main ...
        mail-proxy messages list --tenant-id acme
        mail-proxy messages get --message-id msg-001 --tenant-id acme
        mail-proxy messages delete --message-pk uuid-...
        mail-proxy messages add-batch --messages '[{...}, {...}]'
        mail-proxy messages cleanup --tenant-id acme

Note:
    Enterprise Edition (EE) extends this with MessageEndpoint_EE mixin
    adding PEC-specific status information.
"""

from __future__ import annotations

from enum import Enum
from typing import TYPE_CHECKING, Annotated, Any, Literal

from pydantic import BaseModel, ConfigDict, Field

from ...interface.endpoint_base import POST, BaseEndpoint

if TYPE_CHECKING:
    from .table import MessagesTable


class FetchMode(str, Enum):
    """Where the content of an email attachment is read from.

    Members are ``str`` subclasses, so each one compares equal to its
    plain string value.

    Attributes:
        ENDPOINT: Fetch from configured HTTP endpoint with path parameter.
        HTTP_URL: Fetch directly from a full HTTP/HTTPS URL.
        BASE64: Inline base64-encoded content.
        FILESYSTEM: Fetch from local filesystem path.
    """

    # Configured HTTP endpoint plus a path/query parameter.
    ENDPOINT = "endpoint"
    # Complete HTTP/HTTPS URL.
    HTTP_URL = "http_url"
    # Content embedded inline as base64.
    BASE64 = "base64"
    # Path on the local filesystem.
    FILESYSTEM = "filesystem"
class MessageStatus(str, Enum):
    """Delivery state reported for an email message.

    Members are ``str`` subclasses, so each one compares equal to its
    plain string value.

    Attributes:
        PENDING: Queued and waiting for delivery attempt.
        DEFERRED: Temporarily delayed (rate limit or soft error).
        SENT: Successfully delivered to SMTP server.
        ERROR: Delivery failed with permanent error.
    """

    # Waiting in the queue for a delivery attempt.
    PENDING = "pending"
    # Delivery postponed to a later time.
    DEFERRED = "deferred"
    # Handed over to the SMTP server successfully.
    SENT = "sent"
    # Delivery failed permanently.
    ERROR = "error"
class AttachmentPayload(BaseModel):
    """Specification of a single email attachment.

    Unknown fields are rejected (``extra="forbid"``).

    Attributes:
        filename: Attachment filename (may contain MD5 marker).
            Between 1 and 255 characters.
        storage_path: Content location (non-empty). Format depends on
            fetch_mode:

            - endpoint: query params (e.g., "doc_id=123")
            - http_url: full URL (e.g., "https://files.example.com/file.pdf")
            - base64: base64-encoded content
            - filesystem: absolute path (e.g., "/var/attachments/file.pdf")
        mime_type: Optional MIME type override.
        fetch_mode: Explicit fetch mode. If not provided, inferred from path.
        content_md5: MD5 hash for cache lookup; must be exactly 32
            hexadecimal characters when given.
        auth: Optional authentication override for HTTP requests.

    Example:
        Attachment from HTTP URL::

            AttachmentPayload(
                filename="report.pdf",
                storage_path="https://cdn.example.com/docs/report.pdf",
                fetch_mode=FetchMode.HTTP_URL,
            )
    """

    model_config = ConfigDict(extra="forbid")

    # Required fields.
    filename: Annotated[
        str,
        Field(min_length=1, max_length=255, description="Attachment filename"),
    ]
    storage_path: Annotated[
        str,
        Field(min_length=1, description="Storage path"),
    ]

    # Optional fields, all defaulting to None.
    mime_type: Annotated[
        str | None,
        Field(default=None, description="MIME type override"),
    ]
    fetch_mode: Annotated[
        FetchMode | None,
        Field(default=None, description="Fetch mode"),
    ]
    content_md5: Annotated[
        str | None,
        Field(default=None, pattern=r"^[a-fA-F0-9]{32}$", description="MD5 hash"),
    ]
    auth: Annotated[
        dict[str, Any] | None,
        Field(default=None, description="Auth override"),
    ]
class MessageEndpoint(BaseEndpoint):
    """REST API endpoint for email message management.

    Provides CRUD operations for email messages including batch
    operations for bulk insert/delete and cleanup of old messages.

    Attributes:
        name: Endpoint name used in URL paths ("messages").
        table: MessagesTable instance for database operations.

    Example:
        Using the endpoint programmatically::

            endpoint = MessageEndpoint(db.table("messages"))

            # Add a single message
            msg = await endpoint.add(
                id="msg-001",
                tenant_id="acme",
                account_id="main",
                from_addr="sender@acme.com",
                to=["user@example.com"],
                subject="Test",
                body="Hello",
            )

            # List messages
            messages = await endpoint.list(tenant_id="acme")
    """

    name = "messages"

    def __init__(self, table: MessagesTable):
        """Initialize endpoint with table reference.

        Args:
            table: MessagesTable instance for database operations.
        """
        super().__init__(table)

    @POST
    async def add(
        self,
        id: str,
        tenant_id: str,
        account_id: str,
        from_addr: str,
        to: list[str],
        subject: str,
        body: str,
        cc: list[str] | None = None,
        bcc: list[str] | None = None,
        reply_to: str | None = None,
        return_path: str | None = None,
        content_type: Literal["plain", "html"] = "plain",
        message_id: str | None = None,
        priority: int = 2,
        deferred_ts: int | None = None,
        batch_code: str | None = None,
        attachments: list[dict[str, Any]] | None = None,
        headers: dict[str, str] | None = None,
    ) -> dict:
        """Add a new message to the delivery queue.

        Args:
            id: Unique message identifier (client-provided).
            tenant_id: Tenant identifier.
            account_id: SMTP account to use for sending.
            from_addr: Sender email address.
            to: List of recipient addresses.
            subject: Email subject line.
            body: Email body content.
            cc: List of CC addresses.
            bcc: List of BCC addresses.
            reply_to: Reply-To address.
            return_path: Return-Path (envelope sender) address.
            content_type: Body content type ("plain" or "html").
            message_id: Custom Message-ID header.
            priority: Delivery priority (0=immediate, 1=high, 2=medium, 3=low).
            deferred_ts: Unix timestamp to defer delivery until.
            batch_code: Batch/campaign identifier for grouping.
            attachments: List of attachment specifications.
            headers: Additional email headers.

        Returns:
            The first element returned by the table's ``insert_batch``
            (dict with message id and pk).

        Raises:
            ValueError: If message could not be added (the insert
                returned an empty result).
        """
        payload: dict[str, Any] = {
            "from": from_addr,
            "to": to,
            "subject": subject,
            "body": body,
            "content_type": content_type,
        }
        # Optional fields are stored only when they carry a truthy value,
        # so empty lists/strings/dicts are omitted from the payload.
        optional_fields = {
            "cc": cc,
            "bcc": bcc,
            "reply_to": reply_to,
            "return_path": return_path,
            "message_id": message_id,
            "attachments": attachments,
            "headers": headers,
        }
        payload.update(
            {key: value for key, value in optional_fields.items() if value}
        )

        entry = {
            "id": id,
            "tenant_id": tenant_id,
            "account_id": account_id,
            "priority": priority,
            "deferred_ts": deferred_ts,
            "batch_code": batch_code,
            "payload": payload,
        }
        result = await self.table.insert_batch([entry], tenant_id=tenant_id)
        if not result:
            raise ValueError(f"Failed to add message '{id}'")
        return result[0]

    async def get(self, message_id: str, tenant_id: str) -> dict:
        """Retrieve a single message by ID.

        Args:
            message_id: Client-provided message identifier.
            tenant_id: Tenant identifier.

        Returns:
            Message dict with a computed ``status`` field added.

        Raises:
            ValueError: If message not found.
        """
        message = await self.table.get(message_id, tenant_id)
        if not message:
            raise ValueError(f"Message '{message_id}' not found")
        return self._add_status(message)

    async def list(
        self,
        tenant_id: str,
        active_only: bool = False,
        include_history: bool = False,
    ) -> list[dict]:
        """List messages for a tenant.

        Args:
            tenant_id: Tenant to list messages for.
            active_only: Only return pending messages.
            include_history: Include event history for each message.

        Returns:
            List of message dicts, each with a computed ``status`` field.
        """
        messages = await self.table.list_all(
            tenant_id=tenant_id,
            active_only=active_only,
            include_history=include_history,
        )
        return [self._add_status(message) for message in messages]

    @POST
    async def delete(self, message_pk: str) -> bool:
        """Delete a message by internal primary key.

        Args:
            message_pk: Internal message UUID.

        Returns:
            True if deleted, False if not found.
        """
        return await self.table.remove_by_pk(message_pk)

    async def count_active(self) -> int:
        """Count messages awaiting delivery.

        Returns:
            Number of active (pending) messages.
        """
        return await self.table.count_active()

    async def count_pending_for_tenant(
        self,
        tenant_id: str,
        batch_code: str | None = None,
    ) -> int:
        """Count pending messages for a tenant.

        Args:
            tenant_id: Tenant identifier.
            batch_code: Optional batch code filter.

        Returns:
            Number of pending messages.
        """
        return await self.table.count_pending_for_tenant(tenant_id, batch_code)

    @POST
    async def add_batch(
        self,
        messages: list[dict[str, Any]],
        default_priority: int | None = None,
    ) -> dict:
        """Add multiple messages in a single operation.

        Validates each message and queues valid ones. Invalid messages
        (including entries that are not dicts) are reported in the
        rejected list instead of aborting the whole batch.

        Args:
            messages: List of message dicts. Each must have:
                - id: Unique message identifier
                - tenant_id: Tenant identifier
                - account_id: SMTP account to use
                - from (or from_addr): Sender address
                - to: Recipient(s); a single value is wrapped in a list
                - subject: Email subject
                Optional: body, cc, bcc, reply_to, return_path,
                content_type, message_id, priority, deferred_ts,
                batch_code, attachments, headers.
            default_priority: Default priority for messages without
                explicit priority. When this is also None, 2 is used.

        Returns:
            Dict with:
                - ok: True
                - queued: Number of entries returned by the table insert
                - rejected: List of {id, reason} for invalid messages

        Example:
            ::

                result = await endpoint.add_batch([
                    {"id": "m1", "tenant_id": "t1", "account_id": "a1",
                     "from": "a@b.com", "to": ["c@d.com"], "subject": "Hi"},
                    {"id": "m2", ...},
                ])
                # Returns: {"ok": True, "queued": 2, "rejected": []}
        """
        rejected: list[dict[str, str | None]] = []
        entries: list[dict[str, Any]] = []
        fallback_priority = 2 if default_priority is None else default_priority

        for msg in messages:
            # A malformed entry must not abort the batch: report it and
            # keep processing the remaining messages.
            if not isinstance(msg, dict):
                rejected.append({"id": None, "reason": "Message must be an object"})
                continue

            msg_id = msg.get("id")
            if not msg_id:
                rejected.append({"id": None, "reason": "Missing 'id' field"})
                continue

            # Insertion order is the validation order; the first missing
            # field determines the rejection reason.
            required = {
                "tenant_id": msg.get("tenant_id"),
                "account_id": msg.get("account_id"),
                # "from" wins over the "from_addr" alias when both are set.
                "from": msg.get("from") or msg.get("from_addr"),
                "to": msg.get("to"),
                "subject": msg.get("subject"),
            }
            missing = next(
                (field for field, value in required.items() if not value),
                None,
            )
            if missing is not None:
                rejected.append(
                    {"id": msg_id, "reason": f"Missing '{missing}' field"}
                )
                continue

            recipients = required["to"]
            payload: dict[str, Any] = {
                "from": required["from"],
                "to": recipients if isinstance(recipients, list) else [recipients],
                "subject": required["subject"],
                "body": msg.get("body", ""),
                "content_type": msg.get("content_type", "plain"),
            }
            # Optional fields are copied only when truthy.
            for field in (
                "cc",
                "bcc",
                "reply_to",
                "return_path",
                "message_id",
                "attachments",
                "headers",
            ):
                if msg.get(field):
                    payload[field] = msg[field]

            priority = msg.get("priority")
            if priority is None:
                priority = fallback_priority

            entries.append(
                {
                    "id": msg_id,
                    "tenant_id": required["tenant_id"],
                    "account_id": required["account_id"],
                    "priority": priority,
                    "deferred_ts": msg.get("deferred_ts"),
                    "batch_code": msg.get("batch_code"),
                    "payload": payload,
                }
            )

        queued = 0
        if entries:
            result = await self.table.insert_batch(entries)
            queued = len(result)

        return {"ok": True, "queued": queued, "rejected": rejected}

    @POST
    async def delete_batch(
        self,
        tenant_id: str,
        ids: list[str],
    ) -> dict:
        """Delete multiple messages by their IDs.

        Validates ownership before deletion. Duplicate IDs in ``ids`` are
        processed once, so an ID deleted by this call is not also
        reported as not found. When ``ids`` is empty no table query is
        issued.

        Args:
            tenant_id: Tenant identifier for authorization check.
            ids: List of message IDs to delete.

        Returns:
            Dict with:
                - ok: True
                - removed: Number of messages deleted
                - not_found: List of IDs that don't exist, or None when
                  there are none
                - unauthorized: List of IDs belonging to other tenants,
                  or None when there are none
        """
        removed = 0
        not_found: list[str] = []
        unauthorized: list[str] = []

        # De-duplicate while preserving the caller's order.
        unique_ids = list(dict.fromkeys(ids))

        if unique_ids:
            owned = set(await self.table.get_ids_for_tenant(unique_ids, tenant_id))

            # One existence lookup for every ID the tenant does not own,
            # rather than one query per ID.
            foreign = [msg_id for msg_id in unique_ids if msg_id not in owned]
            existing = (
                set(await self.table.existing_ids(foreign)) if foreign else set()
            )

            for msg_id in unique_ids:
                if msg_id not in owned:
                    # Exists but is not owned -> unauthorized; otherwise unknown.
                    if msg_id in existing:
                        unauthorized.append(msg_id)
                    else:
                        not_found.append(msg_id)
                    continue

                msg = await self.table.get(msg_id, tenant_id)
                if msg and await self.table.remove_by_pk(msg["pk"]):
                    removed += 1
                else:
                    # Vanished between the ownership check and the delete.
                    not_found.append(msg_id)

        return {
            "ok": True,
            "removed": removed,
            "not_found": not_found if not_found else None,
            "unauthorized": unauthorized if unauthorized else None,
        }

    async def cleanup(
        self,
        tenant_id: str,
        older_than_seconds: int | None = None,
    ) -> dict:
        """Clean up fully reported messages older than retention period.

        Removes messages that have been delivered and reported to the
        client, freeing up database space.

        NOTE(review): unlike the other mutating methods in this class,
        this one is not decorated with ``@POST`` - confirm that is
        intentional.

        Args:
            tenant_id: Tenant identifier.
            older_than_seconds: Messages reported before
                (now - older_than_seconds) will be deleted.
                Defaults to 86400 (24 hours).

        Returns:
            Dict with ok=True and removed count.
        """
        import time

        retention = 86400 if older_than_seconds is None else older_than_seconds
        threshold_ts = int(time.time()) - retention
        removed = await self.table.remove_fully_reported_before_for_tenant(
            threshold_ts, tenant_id
        )
        return {"ok": True, "removed": removed}

    def _add_status(self, message: dict) -> dict:
        """Add computed status field to message dict.

        The dict is modified in place and also returned. The status is
        derived as follows:

        - ``smtp_ts`` set and ``error`` truthy -> "error"
        - ``smtp_ts`` set and no error -> "sent"
        - otherwise ``deferred_ts`` set -> "deferred"
        - otherwise -> "pending"

        Args:
            message: Message dict from database.

        Returns:
            The same message dict with 'status' field added.
        """
        if message.get("smtp_ts") is not None:
            status = MessageStatus.ERROR if message.get("error") else MessageStatus.SENT
        elif message.get("deferred_ts") is not None:
            status = MessageStatus.DEFERRED
        else:
            status = MessageStatus.PENDING
        message["status"] = status.value
        return message
# Names exported by ``from <module> import *``.
__all__ = [
    "AttachmentPayload",
    "FetchMode",
    "MessageEndpoint",
    "MessageStatus",
]