Python Modules
Core orchestration logic for the asynchronous mail dispatcher.
- exception async_mail_service.core.AccountConfigurationError(message: str = 'Missing SMTP account configuration')[source]
Bases:
RuntimeErrorRaised when a message is missing the information required to resolve an SMTP account.
- class async_mail_service.core.AsyncMailCore(*, db_path: str | None = '/data/mail_service.db', logger=None, metrics: MailMetrics | None = None, start_active: bool = False, result_queue_size: int = 1000, message_queue_size: int = 10000, queue_put_timeout: float = 5.0, max_enqueue_batch: int = 1000, attachment_timeout: int = 30, client_sync_url: str | None = None, client_sync_user: str | None = None, client_sync_password: str | None = None, client_sync_token: str | None = None, default_priority: int | str = 2, report_delivery_callable: Callable[[Dict[str, Any]], Awaitable[None]] | None = None, send_loop_interval: float = 0.5, report_retention_seconds: int | None = None, batch_size_per_account: int = 50, test_mode: bool = False, log_delivery_activity: bool = False, max_retries: int = 5, retry_delays: List[int] | None = None, imap_enabled: bool = False, imap_poll_interval: float = 60.0, imap_message_retention_seconds: int = 300, imap_attachment_retention_seconds: int = 600, s3_bucket: str | None = None, s3_region: str = 'eu-west-1', s3_access_key: str | None = None, s3_secret_key: str | None = None, s3_endpoint_url: str | None = None, s3_path_prefix: str = 'received/', attachment_inline_max_size: int = 524288)[source]
Bases:
objectCoordinate scheduling, rate limiting, persistence and delivery.
FastAPI application factory and HTTP schemas for the async mail service.
The module exposes a create_app function that builds the REST API used to
control the dispatcher and defines the pydantic payloads that document the
behaviour of each command. Authentication is enforced through a configurable
API token carried in the X-API-Token header.
- class async_mail_service.api.AccountInfo(*, id: str, host: str, port: int, user: str | None = None, ttl: int, limit_per_minute: int | None = None, limit_per_hour: int | None = None, limit_per_day: int | None = None, limit_behavior: str | None = None, use_tls: bool | None = None, batch_size: int | None = None, created_at: str | None = None)[source]
Bases:
BaseModelStored SMTP account as returned by
listAccounts.- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class async_mail_service.api.AccountPayload(*, id: str, host: str, port: int, user: str | None = None, password: str | None = None, ttl: int | None = 300, limit_per_minute: int | None = None, limit_per_hour: int | None = None, limit_per_day: int | None = None, limit_behavior: str | None = 'defer', use_tls: bool | None = None, batch_size: int | None = None)[source]
Bases:
BaseModelSMTP account definition used when adding or updating accounts.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class async_mail_service.api.AccountsResponse(*, ok: bool, error: str | None = None, accounts: List[AccountInfo])[source]
Bases:
CommandStatus- accounts: List[AccountInfo]
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class async_mail_service.api.AddMessagesResponse(*, ok: bool, error: str | None = None, queued: int = 0, rejected: ~typing.List[~async_mail_service.api.RejectedMessage] = <factory>)[source]
Bases:
CommandStatusResponse returned by the
addMessagescommand.- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- rejected: List[RejectedMessage]
- class async_mail_service.api.AttachmentPayload(*, filename: str | None = None, content: str | None = None, url: str | None = None, s3: Dict[str, Any] | None = None)[source]
Bases:
BaseModelDescription of an attachment supported by the dispatcher.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class async_mail_service.api.BasicOkResponse(*, ok: bool, error: str | None = None)[source]
Bases:
CommandStatus- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class async_mail_service.api.CleanupMessagesPayload(*, older_than_seconds: int | None = None)[source]
Bases:
BaseModelRequest payload for manual cleanup of reported messages.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class async_mail_service.api.CleanupMessagesResponse(*, ok: bool, error: str | None = None, removed: int)[source]
Bases:
CommandStatusResponse from cleanup operation.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class async_mail_service.api.CommandStatus(*, ok: bool, error: str | None = None)[source]
Bases:
BaseModelBase schema shared by most responses produced by the service.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class async_mail_service.api.DeleteMessagesPayload(*, ids: ~typing.List[str] = <factory>)[source]
Bases:
BaseModel- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class async_mail_service.api.DeleteMessagesResponse(*, ok: bool, error: str | None = None, removed: int, not_found: List[str] | None = None)[source]
Bases:
CommandStatus- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class async_mail_service.api.EnqueueMessagesPayload(*, messages: List[MessagePayload], default_priority: int | Literal['immediate', 'high', 'medium', 'low'] | None = None)[source]
Bases:
BaseModelQueue of messages used by
addMessages.- messages: List[MessagePayload]
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class async_mail_service.api.MessagePayload(*, id: str, account_id: str | None = None, from_: str, to: List[str] | str, cc: List[str] | str | None = None, bcc: List[str] | str | None = None, reply_to: str | None = None, return_path: str | None = None, subject: str, body: str, content_type: str | None = 'plain', headers: Dict[str, Any] | None = None, message_id: str | None = None, attachments: List[AttachmentPayload] | None = None, priority: int | Literal['immediate', 'high', 'medium', 'low'] | None = None, deferred_ts: int | None = None)[source]
Bases:
BaseModelPayload accepted by the
addMessagescommand.- attachments: List[AttachmentPayload] | None
- model_config: ClassVar[ConfigDict] = {'populate_by_name': True, 'validate_by_alias': True, 'validate_by_name': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class async_mail_service.api.MessageRecord(*, id: str, priority: int, account_id: str | None = None, deferred_ts: int | None = None, sent_ts: int | None = None, error_ts: int | None = None, error: str | None = None, reported_ts: int | None = None, created_at: str | None = None, updated_at: str | None = None, message: Dict[str, Any])[source]
Bases:
BaseModelFull representation of a message tracked by the dispatcher.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class async_mail_service.api.MessagesResponse(*, ok: bool, error: str | None = None, messages: List[MessageRecord])[source]
Bases:
CommandStatus- messages: List[MessageRecord]
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class async_mail_service.api.RejectedMessage(*, id: str | None = None, reason: str)[source]
Bases:
BaseModelRejected message entry.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- async_mail_service.api.create_app(svc: AsyncMailCore, api_token: str | None = None, lifespan: Callable[[FastAPI], AsyncContextManager] | None = None) FastAPI[source]
Create and configure the FastAPI application.
- Parameters:
svc – Instance of
async_mail_service.core.AsyncMailCorethat implements the business logic for each command.api_token – Optional secret used to protect every endpoint. When provided, the
X-API-Tokenheader must match this value on every request.lifespan – Optional lifespan context manager for startup/shutdown events.
- Returns:
A configured application ready to be served by Uvicorn or any ASGI server.
- Return type:
FastAPI
- async async_mail_service.api.require_token(api_token: str | None = Depends(APIKeyHeader)) None[source]
Validate the API token carried in the
X-API-Tokenheader.If a token has been configured through
create_app()and a request provides either a missing or different value, a401error is raised. When no token is configured the dependency is effectively bypassed.
SQLite backed persistence used by the mail dispatcher.
- class async_mail_service.persistence.Persistence(db_path: str = '/data/mail_service.db')[source]
Bases:
objectHelper class responsible for reading and writing service state.
- async add_account(acc: Dict[str, Any]) None[source]
Insert or overwrite an SMTP account definition.
- async cleanup_synced_received_messages(older_than_seconds: int) int[source]
Delete received messages synced more than X seconds ago.
- async count_sends_since(account_id: str, since_ts: int) int[source]
Count messages sent after
since_tsfor the given account.
- async delete_account(account_id: str) None[source]
Remove a previously stored SMTP account and related state.
- async existing_message_ids(ids: Iterable[str]) set[str][source]
Return the subset of ids that already exist in storage.
- async fetch_ready_messages(*, limit: int, now_ts: int) List[Dict[str, Any]][source]
Return messages eligible for SMTP dispatch.
- async fetch_received_messages(limit: int) List[Dict[str, Any]][source]
Return received messages that need to be synced to client.
- async fetch_reports(limit: int) List[Dict[str, Any]][source]
Return messages that need to be reported back to the client.
Only returns messages in final states (sent or error). Messages with only deferred_ts are not reported (internal retry logic).
- async get_account(account_id: str) Dict[str, Any][source]
Fetch a single SMTP account or raise if it does not exist.
- async get_imap_last_uid(account_id: str) int | None[source]
Get the last processed UID for an IMAP account.
- async insert_messages(entries: Sequence[Dict[str, Any]]) List[str][source]
Persist a batch of messages, returning the ids that were stored.
If a message with the same id already exists but has NOT been sent (sent_ts IS NULL), it will be replaced with the new data. This allows clients to correct errors or retry with different parameters. Messages that have been sent are never replaced.
- async insert_received_message(message: Dict[str, Any]) None[source]
Store a received message in the buffer.
- async list_messages(*, active_only: bool = False) List[Dict[str, Any]][source]
Return messages for inspection purposes.
- async list_receive_accounts() List[Dict[str, Any]][source]
Return all accounts configured for IMAP receiving.
- async log_send(account_id: str, timestamp: int) None[source]
Record a delivery event for rate limiting purposes.
- async mark_error(msg_id: str, error_ts: int, error: str) None[source]
Mark a message as failed.
Resets reported_ts and deferred_ts so the message will be reported with final error state.
- async mark_received_synced(message_ids: Iterable[str], synced_ts: int) None[source]
Mark received messages as synced to client.
- async mark_reported(message_ids: Iterable[str], reported_ts: int) None[source]
Set the reported timestamp for the provided messages.
- async mark_sent(msg_id: str, sent_ts: int) None[source]
Mark a message as sent.
Resets reported_ts so the message will be reported with final state.
- async purge_messages_for_account(account_id: str) None[source]
Delete every message linked to the given account.
- async remove_reported_before(threshold_ts: int) int[source]
Delete reported messages older than
threshold_ts.Only deletes messages in final states (sent or error). Messages with only deferred_ts are kept in queue until they reach a final state.
- async set_deferred(msg_id: str, deferred_ts: int) None[source]
Update the deferred timestamp for a message.
Lightweight asyncio-friendly SMTP connection pool.
- class async_mail_service.smtp_pool.SMTPPool(ttl: int = 300)[source]
Bases:
objectReuse SMTP connections per task to reduce connection overhead.
Rate limiter that relies on persisted send logs.
- class async_mail_service.rate_limit.RateLimiter(persistence: Persistence)[source]
Bases:
objectSimple sliding-window limiter built on top of
Persistence.
Transport helpers to fetch pending messages and push delivery reports.
- class async_mail_service.fetcher.Fetcher(fetch_url: str | None = None, fetch_callable: Callable[[], Awaitable[List[Dict[str, Any]]]] | None = None, report_callable: Callable[[Dict[str, Any]], Awaitable[None]] | None = None)[source]
Bases:
objectRetrieve messages and propagate delivery results.
Prometheus metrics exposed by the mail dispatcher.
- class async_mail_service.prometheus.MailMetrics(registry: CollectorRegistry | None = None)[source]
Bases:
objectWrapper around the Prometheus registry used by the service.
Attachment helpers that normalise multiple input sources.