Python Modules

Core orchestration logic for the asynchronous mail dispatcher.

exception async_mail_service.core.AccountConfigurationError(message: str = 'Missing SMTP account configuration')

Bases: RuntimeError

Raised when a message is missing the information required to resolve an SMTP account.

class async_mail_service.core.AsyncMailCore(*, db_path: str | None = '/data/mail_service.db', logger=None, metrics: MailMetrics | None = None, start_active: bool = False, result_queue_size: int = 1000, message_queue_size: int = 10000, queue_put_timeout: float = 5.0, max_enqueue_batch: int = 1000, attachment_timeout: int = 30, client_sync_url: str | None = None, client_sync_user: str | None = None, client_sync_password: str | None = None, client_sync_token: str | None = None, default_priority: int | str = 2, report_delivery_callable: Callable[[Dict[str, Any]], Awaitable[None]] | None = None, send_loop_interval: float = 0.5, report_retention_seconds: int | None = None, batch_size_per_account: int = 50, test_mode: bool = False, log_delivery_activity: bool = False, max_retries: int = 5, retry_delays: List[int] | None = None, imap_enabled: bool = False, imap_poll_interval: float = 60.0, imap_message_retention_seconds: int = 300, imap_attachment_retention_seconds: int = 600, s3_bucket: str | None = None, s3_region: str = 'eu-west-1', s3_access_key: str | None = None, s3_secret_key: str | None = None, s3_endpoint_url: str | None = None, s3_path_prefix: str = 'received/', attachment_inline_max_size: int = 524288)

Bases: object

Coordinate scheduling, rate limiting, persistence and delivery.

async handle_command(cmd: str, payload: Dict[str, Any] | None = None) → Dict[str, Any]

Execute one of the external control commands.

async init() → None

Initialise persistence.

async results()

Yield delivery events to API consumers.

async start() → None

Start the background scheduler and maintenance tasks.

async stop() → None

Stop the background tasks gracefully.
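
The results() method above is described as yielding delivery events to API consumers. The following is a minimal sketch of how such an async generator could drain a bounded queue. ResultStream, publish(), close() and the None sentinel are hypothetical names introduced for illustration; they are not taken from AsyncMailCore.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


class ResultStream:
    """Hypothetical stand-in for the result side of the dispatcher."""

    def __init__(self, maxsize: int = 1000) -> None:
        # Bounded queue, mirroring the idea behind result_queue_size.
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)

    async def publish(self, event: Dict[str, Any]) -> None:
        await self._queue.put(event)

    async def close(self) -> None:
        # Assumption: a None sentinel tells consumers to stop iterating.
        await self._queue.put(None)

    async def results(self) -> AsyncIterator[Dict[str, Any]]:
        while True:
            event = await self._queue.get()
            if event is None:
                return
            yield event


async def collect_demo() -> List[Dict[str, Any]]:
    stream = ResultStream(maxsize=10)
    await stream.publish({"id": "m1", "status": "sent"})
    await stream.publish({"id": "m2", "status": "error"})
    await stream.close()
    return [event async for event in stream.results()]
```

A consumer would typically iterate with `async for event in stream.results()` inside its own task, so that slow consumers apply back-pressure through the bounded queue.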

FastAPI application factory and HTTP schemas for the async mail service.

The module exposes a create_app function that builds the REST API used to control the dispatcher and defines the pydantic payloads that document the behaviour of each command. Authentication is enforced through a configurable API token carried in the X-API-Token header.

class async_mail_service.api.AccountInfo(*, id: str, host: str, port: int, user: str | None = None, ttl: int, limit_per_minute: int | None = None, limit_per_hour: int | None = None, limit_per_day: int | None = None, limit_behavior: str | None = None, use_tls: bool | None = None, batch_size: int | None = None, created_at: str | None = None)

Bases: BaseModel

Stored SMTP account as returned by listAccounts.

batch_size: int | None
created_at: str | None
host: str
id: str
limit_behavior: str | None
limit_per_day: int | None
limit_per_hour: int | None
limit_per_minute: int | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

port: int
ttl: int
use_tls: bool | None
user: str | None

class async_mail_service.api.AccountPayload(*, id: str, host: str, port: int, user: str | None = None, password: str | None = None, ttl: int | None = 300, limit_per_minute: int | None = None, limit_per_hour: int | None = None, limit_per_day: int | None = None, limit_behavior: str | None = 'defer', use_tls: bool | None = None, batch_size: int | None = None)

Bases: BaseModel

SMTP account definition used when adding or updating accounts.

batch_size: int | None
host: str
id: str
limit_behavior: str | None
limit_per_day: int | None
limit_per_hour: int | None
limit_per_minute: int | None
model_config: ClassVar[ConfigDict] = {}
password: str | None
port: int
ttl: int | None
use_tls: bool | None
user: str | None

class async_mail_service.api.AccountsResponse(*, ok: bool, error: str | None = None, accounts: List[AccountInfo])

Bases: CommandStatus

accounts: List[AccountInfo]
model_config: ClassVar[ConfigDict] = {}

class async_mail_service.api.AddMessagesResponse(*, ok: bool, error: str | None = None, queued: int = 0, rejected: List[RejectedMessage] = <factory>)

Bases: CommandStatus

Response returned by the addMessages command.

model_config: ClassVar[ConfigDict] = {}
queued: int
rejected: List[RejectedMessage]

class async_mail_service.api.AttachmentPayload(*, filename: str | None = None, content: str | None = None, url: str | None = None, s3: Dict[str, Any] | None = None)

Bases: BaseModel

Description of an attachment supported by the dispatcher.

content: str | None
filename: str | None
model_config: ClassVar[ConfigDict] = {}
s3: Dict[str, Any] | None
url: str | None

class async_mail_service.api.BasicOkResponse(*, ok: bool, error: str | None = None)

Bases: CommandStatus

model_config: ClassVar[ConfigDict] = {}

class async_mail_service.api.CleanupMessagesPayload(*, older_than_seconds: int | None = None)

Bases: BaseModel

Request payload for manual cleanup of reported messages.

model_config: ClassVar[ConfigDict] = {}
older_than_seconds: int | None

class async_mail_service.api.CleanupMessagesResponse(*, ok: bool, error: str | None = None, removed: int)

Bases: CommandStatus

Response from cleanup operation.

model_config: ClassVar[ConfigDict] = {}
removed: int

class async_mail_service.api.CommandStatus(*, ok: bool, error: str | None = None)

Bases: BaseModel

Base schema shared by most responses produced by the service.

error: str | None
model_config: ClassVar[ConfigDict] = {}
ok: bool

class async_mail_service.api.DeleteMessagesPayload(*, ids: List[str] = <factory>)

Bases: BaseModel

ids: List[str]
model_config: ClassVar[ConfigDict] = {}

class async_mail_service.api.DeleteMessagesResponse(*, ok: bool, error: str | None = None, removed: int, not_found: List[str] | None = None)

Bases: CommandStatus

model_config: ClassVar[ConfigDict] = {}
not_found: List[str] | None
removed: int

class async_mail_service.api.EnqueueMessagesPayload(*, messages: List[MessagePayload], default_priority: int | Literal['immediate', 'high', 'medium', 'low'] | None = None)

Bases: BaseModel

Queue of messages used by addMessages.

default_priority: int | Literal['immediate', 'high', 'medium', 'low'] | None
messages: List[MessagePayload]
model_config: ClassVar[ConfigDict] = {}

class async_mail_service.api.MessagePayload(*, id: str, account_id: str | None = None, from_: str, to: List[str] | str, cc: List[str] | str | None = None, bcc: List[str] | str | None = None, reply_to: str | None = None, return_path: str | None = None, subject: str, body: str, content_type: str | None = 'plain', headers: Dict[str, Any] | None = None, message_id: str | None = None, attachments: List[AttachmentPayload] | None = None, priority: int | Literal['immediate', 'high', 'medium', 'low'] | None = None, deferred_ts: int | None = None)

Bases: BaseModel

Payload accepted by the addMessages command.

account_id: str | None
attachments: List[AttachmentPayload] | None
bcc: List[str] | str | None
body: str
cc: List[str] | str | None
content_type: str | None
deferred_ts: int | None
from_: str
headers: Dict[str, Any] | None
id: str
message_id: str | None
model_config: ClassVar[ConfigDict] = {'populate_by_name': True, 'validate_by_alias': True, 'validate_by_name': True}
priority: int | Literal['immediate', 'high', 'medium', 'low'] | None
reply_to: str | None
return_path: str | None
subject: str
to: List[str] | str
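
The priority field above accepts either an integer or one of the labels 'immediate', 'high', 'medium' and 'low'. The sketch below shows one way such a value could be normalised to an integer. The label-to-integer mapping is an assumption made for illustration (the reference only shows the labels and an integer default of 2), and normalise_priority is a hypothetical helper, not part of the package.

```python
from typing import Optional, Union

# ASSUMPTION: this mapping is illustrative; the actual values used by the
# service are not stated in the reference.
PRIORITY_LABELS = {"immediate": 0, "high": 1, "medium": 2, "low": 3}


def normalise_priority(
    value: Optional[Union[int, str]],
    default: Union[int, str] = 2,
) -> int:
    """Turn an int, a label, or None into an integer priority."""
    if value is None:
        value = default
    if isinstance(value, str):
        key = value.strip().lower()
        if key not in PRIORITY_LABELS:
            raise ValueError(f"unknown priority label: {value!r}")
        return PRIORITY_LABELS[key]
    return int(value)
```

Under this assumed mapping, a message without a priority falls back to the batch-level default, which can itself be given as a label.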

class async_mail_service.api.MessageRecord(*, id: str, priority: int, account_id: str | None = None, deferred_ts: int | None = None, sent_ts: int | None = None, error_ts: int | None = None, error: str | None = None, reported_ts: int | None = None, created_at: str | None = None, updated_at: str | None = None, message: Dict[str, Any])

Bases: BaseModel

Full representation of a message tracked by the dispatcher.

account_id: str | None
created_at: str | None
deferred_ts: int | None
error: str | None
error_ts: int | None
id: str
message: Dict[str, Any]
model_config: ClassVar[ConfigDict] = {}
priority: int
reported_ts: int | None
sent_ts: int | None
updated_at: str | None

class async_mail_service.api.MessagesResponse(*, ok: bool, error: str | None = None, messages: List[MessageRecord])

Bases: CommandStatus

messages: List[MessageRecord]
model_config: ClassVar[ConfigDict] = {}

class async_mail_service.api.RejectedMessage(*, id: str | None = None, reason: str)

Bases: BaseModel

Rejected message entry.

id: str | None
model_config: ClassVar[ConfigDict] = {}
reason: str

async_mail_service.api.create_app(svc: AsyncMailCore, api_token: str | None = None, lifespan: Callable[[FastAPI], AsyncContextManager] | None = None) → FastAPI

Create and configure the FastAPI application.

Parameters:
  • svc – Instance of async_mail_service.core.AsyncMailCore that implements the business logic for each command.

  • api_token – Optional secret used to protect every endpoint. When provided, the X-API-Token header must match this value on every request.

  • lifespan – Optional lifespan context manager for startup/shutdown events.

Returns:

A configured application ready to be served by Uvicorn or any ASGI server.

Return type:

FastAPI

async async_mail_service.api.require_token(api_token: str | None = Depends(APIKeyHeader)) → None

Validate the API token carried in the X-API-Token header.

If a token has been configured through create_app() and a request omits the header or supplies a different value, a 401 error is raised. When no token is configured, the dependency is effectively bypassed.
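
The rule described above can be sketched without any web framework. Unauthorized and check_token are hypothetical names used only for this illustration, and the constant-time comparison via hmac.compare_digest is a design choice of the sketch, not something the reference confirms.

```python
import hmac
from typing import Optional


class Unauthorized(Exception):
    """Hypothetical error type standing in for an HTTP 401 response."""

    status_code = 401


def check_token(configured: Optional[str], provided: Optional[str]) -> None:
    """Raise Unauthorized unless the provided token satisfies the rule."""
    if configured is None:
        # No token configured: the check is bypassed.
        return
    if provided is None or not hmac.compare_digest(
        configured.encode("utf-8"), provided.encode("utf-8")
    ):
        raise Unauthorized("Invalid or missing API token")
```

In a FastAPI dependency the same branch structure would raise an HTTP error instead of the custom exception used here.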

SQLite backed persistence used by the mail dispatcher.

class async_mail_service.persistence.Persistence(db_path: str = '/data/mail_service.db')

Bases: object

Helper class responsible for reading and writing service state.

async add_account(acc: Dict[str, Any]) → None

Insert or overwrite an SMTP account definition.

async cleanup_synced_received_messages(older_than_seconds: int) → int

Delete received messages that were synced more than older_than_seconds seconds ago.

async clear_deferred(msg_id: str) → None

Clear the deferred timestamp for a message.

async count_active_messages() → int

Return the number of messages still awaiting delivery.

async count_sends_since(account_id: str, since_ts: int) → int

Count messages sent after since_ts for the given account.

async delete_account(account_id: str) → None

Remove a previously stored SMTP account and related state.

async delete_message(msg_id: str) → bool

Remove a message regardless of its state.

async existing_message_ids(ids: Iterable[str]) → set[str]

Return the subset of ids that already exist in storage.

async fetch_ready_messages(*, limit: int, now_ts: int) → List[Dict[str, Any]]

Return messages eligible for SMTP dispatch.

async fetch_received_messages(limit: int) → List[Dict[str, Any]]

Return received messages that need to be synced to the client.

async fetch_reports(limit: int) → List[Dict[str, Any]]

Return messages that need to be reported back to the client.

Only messages in a final state (sent or error) are returned. Messages that have only deferred_ts set are not reported, because they are still subject to the internal retry logic.
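
The selection rule for reports can be illustrated with a plain sqlite3 query. The table layout below is an assumption made for this sketch, as is the idea that "needs to be reported" means reported_ts IS NULL; the real schema is created by init_db() and is not shown in this reference.

```python
import sqlite3
from typing import Any, Dict, List


def fetch_reports(conn: sqlite3.Connection, limit: int) -> List[Dict[str, Any]]:
    """Return unreported messages that reached a final state."""
    rows = conn.execute(
        "SELECT id, sent_ts, error_ts, error FROM messages "
        "WHERE reported_ts IS NULL "
        "AND (sent_ts IS NOT NULL OR error_ts IS NOT NULL) "
        "ORDER BY id LIMIT ?",
        (limit,),
    ).fetchall()
    return [dict(row) for row in rows]


def reports_demo() -> List[str]:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    # ASSUMED schema, reduced to the columns the rule depends on.
    conn.execute(
        "CREATE TABLE messages (id TEXT PRIMARY KEY, deferred_ts INTEGER, "
        "sent_ts INTEGER, error_ts INTEGER, error TEXT, reported_ts INTEGER)"
    )
    conn.executemany(
        "INSERT INTO messages VALUES (?, ?, ?, ?, ?, ?)",
        [
            ("a", None, 100, None, None, None),        # sent, not yet reported
            ("b", 200, None, None, None, None),        # deferred only
            ("c", None, None, 150, "SMTP 550", None),  # failed, not yet reported
            ("d", None, 90, None, None, 95),           # sent and already reported
        ],
    )
    return [row["id"] for row in fetch_reports(conn, limit=10)]
```

In this toy data set only "a" and "c" qualify: "b" is still waiting for a retry and "d" has already been reported.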

async get_account(account_id: str) → Dict[str, Any]

Fetch a single SMTP account or raise if it does not exist.

async get_imap_last_uid(account_id: str) → int | None

Get the last processed UID for an IMAP account.

async init_db() → None

Create (or migrate) the database schema.

async insert_messages(entries: Sequence[Dict[str, Any]]) → List[str]

Persist a batch of messages, returning the ids that were stored.

If a message with the same id already exists but has NOT been sent (sent_ts IS NULL), it will be replaced with the new data. This allows clients to correct errors or retry with different parameters. Messages that have been sent are never replaced.
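
The replace-unless-sent rule can be sketched with the standard sqlite3 module. The three-column table and the JSON-encoded payload are assumptions made for this illustration; the synchronous function below only mirrors the described behaviour and is not the package's implementation.

```python
import json
import sqlite3
from typing import Any, Dict, List, Sequence, Tuple

# ASSUMED minimal schema for the sketch.
SCHEMA = "CREATE TABLE messages (id TEXT PRIMARY KEY, payload TEXT NOT NULL, sent_ts INTEGER)"


def insert_messages(conn: sqlite3.Connection, entries: Sequence[Dict[str, Any]]) -> List[str]:
    """Store entries, replacing unsent duplicates and skipping sent ones."""
    stored: List[str] = []
    with conn:  # commit on success, roll back on error
        for entry in entries:
            row = conn.execute(
                "SELECT sent_ts FROM messages WHERE id = ?", (entry["id"],)
            ).fetchone()
            if row is not None and row[0] is not None:
                continue  # already sent: never replaced
            conn.execute(
                "INSERT OR REPLACE INTO messages (id, payload, sent_ts) VALUES (?, ?, NULL)",
                (entry["id"], json.dumps(entry)),
            )
            stored.append(entry["id"])
    return stored


def upsert_demo() -> Tuple[List[str], Dict[str, str]]:
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    insert_messages(conn, [{"id": "m1", "subject": "v1"}, {"id": "m2", "subject": "v1"}])
    conn.execute("UPDATE messages SET sent_ts = 1700000000 WHERE id = 'm2'")
    stored = insert_messages(conn, [{"id": "m1", "subject": "v2"}, {"id": "m2", "subject": "v2"}])
    subjects = {
        mid: json.loads(payload)["subject"]
        for mid, payload in conn.execute("SELECT id, payload FROM messages")
    }
    return stored, subjects
```

Resubmitting both ids stores only "m1", whose payload is updated, while the already sent "m2" keeps its original data.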

async insert_received_message(message: Dict[str, Any]) → None

Store a received message in the buffer.

async list_accounts() → List[Dict[str, Any]]

Return all known SMTP accounts.

async list_messages(*, active_only: bool = False) → List[Dict[str, Any]]

Return messages for inspection purposes.

async list_receive_accounts() → List[Dict[str, Any]]

Return all accounts configured for IMAP receiving.

async log_send(account_id: str, timestamp: int) → None

Record a delivery event for rate limiting purposes.

async mark_error(msg_id: str, error_ts: int, error: str) → None

Mark a message as failed.

Resets reported_ts and deferred_ts so the message will be reported with its final error state.

async mark_received_synced(message_ids: Iterable[str], synced_ts: int) → None

Mark received messages as synced to the client.

async mark_reported(message_ids: Iterable[str], reported_ts: int) → None

Set the reported timestamp for the provided messages.

async mark_sent(msg_id: str, sent_ts: int) → None

Mark a message as sent.

Resets reported_ts so the message will be reported with its final state.

async purge_messages_for_account(account_id: str) → None

Delete every message linked to the given account.

async remove_reported_before(threshold_ts: int) → int

Delete reported messages older than threshold_ts.

Only messages in a final state (sent or error) are deleted. Messages that have only deferred_ts set are kept in the queue until they reach a final state.
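
A retention cleanup following this rule might look like the sketch below. The schema is again an assumption, and the relationship threshold_ts = now - retention_seconds is a guess at how a caller would derive the argument, not something stated in the reference.

```python
import sqlite3
from typing import List, Tuple


def remove_reported_before(conn: sqlite3.Connection, threshold_ts: int) -> int:
    """Delete reported messages in a final state older than threshold_ts."""
    with conn:
        cur = conn.execute(
            "DELETE FROM messages "
            "WHERE reported_ts IS NOT NULL AND reported_ts < ? "
            "AND (sent_ts IS NOT NULL OR error_ts IS NOT NULL)",
            (threshold_ts,),
        )
    return cur.rowcount


def cleanup_demo() -> Tuple[int, List[str]]:
    conn = sqlite3.connect(":memory:")
    # ASSUMED schema, reduced to the columns the rule depends on.
    conn.execute(
        "CREATE TABLE messages (id TEXT PRIMARY KEY, deferred_ts INTEGER, "
        "sent_ts INTEGER, error_ts INTEGER, reported_ts INTEGER)"
    )
    conn.executemany(
        "INSERT INTO messages VALUES (?, ?, ?, ?, ?)",
        [
            ("old-sent", None, 50, None, 100),        # final and reported long ago
            ("recent-sent", None, 400, None, 500),    # reported after the threshold
            ("deferred-only", 900, None, None, 100),  # not in a final state
            ("unreported", None, 60, None, None),     # never reported
        ],
    )
    removed = remove_reported_before(conn, threshold_ts=300)
    remaining = [row[0] for row in conn.execute("SELECT id FROM messages ORDER BY id")]
    return removed, remaining
```

Only "old-sent" is removed in this example; the deferred-only row survives even though its reported_ts is old.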

async set_deferred(msg_id: str, deferred_ts: int) → None

Update the deferred timestamp for a message.

async update_imap_last_uid(account_id: str, last_uid: int) → None

Update the last processed UID for an IMAP account.

async update_message_payload(msg_id: str, payload: Dict[str, Any]) → None

Update the payload field of a message (used for retry count tracking).

Lightweight asyncio-friendly SMTP connection pool.

class async_mail_service.smtp_pool.SMTPPool(ttl: int = 300)

Bases: object

Reuse SMTP connections per task to reduce connection overhead.

async cleanup() → None

Close idle or broken connections still registered in the pool.

async get_connection(host: str, port: int, user: str | None, password: str | None, *, use_tls: bool) → SMTP

Return a pooled connection bound to the calling task.
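
One way to bind pooled connections to the calling task is to key them on asyncio.current_task(). The sketch below uses a FakeConnection placeholder instead of a real SMTP client, and TaskBoundPool is a hypothetical class written for this illustration; the actual SMTPPool internals are not documented here.

```python
import asyncio
import time
from typing import Callable, Dict, Optional, Tuple


class FakeConnection:
    """Placeholder for a real SMTP client object (hypothetical)."""

    def __init__(self, host: str, port: int) -> None:
        self.host, self.port, self.closed = host, port, False

    def close(self) -> None:
        self.closed = True


class TaskBoundPool:
    def __init__(self, ttl: int = 300, clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl
        self._clock = clock
        # (task, host, port, user) -> (connection, last_used)
        self._entries: Dict[Tuple, Tuple[FakeConnection, float]] = {}

    async def get_connection(self, host: str, port: int, user: Optional[str] = None) -> FakeConnection:
        key = (asyncio.current_task(), host, port, user)
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None:
            conn, last_used = entry
            if now - last_used < self._ttl:
                self._entries[key] = (conn, now)  # refresh the idle timer
                return conn
            conn.close()  # expired: replace with a fresh connection
        conn = FakeConnection(host, port)
        self._entries[key] = (conn, now)
        return conn

    async def cleanup(self) -> None:
        now = self._clock()
        for key, (conn, last_used) in list(self._entries.items()):
            task = key[0]
            if now - last_used >= self._ttl or (task is not None and task.done()):
                conn.close()
                del self._entries[key]


async def pool_demo() -> Tuple[bool, bool, bool]:
    pool = TaskBoundPool(ttl=300)
    first = await pool.get_connection("smtp.example.com", 587)
    second = await pool.get_connection("smtp.example.com", 587)
    other = await asyncio.create_task(pool.get_connection("smtp.example.com", 587))
    await pool.cleanup()  # the helper task has finished, so its entry is dropped
    return first is second, first is other, other.closed
```

The same task gets the same connection back, a different task gets its own, and cleanup() closes entries whose owning task has finished or whose idle time exceeds the TTL.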

Rate limiter that relies on persisted send logs.

class async_mail_service.rate_limit.RateLimiter(persistence: Persistence)

Bases: object

Simple sliding-window limiter built on top of Persistence.

async check_and_plan(account: Dict[str, Any]) → int | None

Return a timestamp until which the message must be deferred.

async log_send(account_id: str) → None

Persist the fact that a message has been sent right now.
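
A sliding-window check of this kind can be sketched with an in-memory send log. InMemoryRateLimiter is a hypothetical, synchronous stand-in that takes an explicit now argument for determinism; the real RateLimiter is asynchronous and reads its counts from Persistence. Treating a None result as "send immediately" is an assumption based on the int | None return type.

```python
from typing import Any, Dict, List, Optional

# Account fields from AccountPayload paired with their window length in seconds.
WINDOWS = (
    ("limit_per_minute", 60),
    ("limit_per_hour", 3600),
    ("limit_per_day", 86400),
)


class InMemoryRateLimiter:
    def __init__(self) -> None:
        self._sends: Dict[str, List[int]] = {}

    def log_send(self, account_id: str, now: int) -> None:
        self._sends.setdefault(account_id, []).append(now)

    def check_and_plan(self, account: Dict[str, Any], now: int) -> Optional[int]:
        """Return a defer-until timestamp, or None if sending is allowed."""
        sends = self._sends.get(account["id"], [])
        defer_until: Optional[int] = None
        for field, seconds in WINDOWS:
            limit = account.get(field)
            if not limit:
                continue  # no limit configured for this window
            recent = sorted(ts for ts in sends if ts > now - seconds)
            if len(recent) >= limit:
                # A slot frees up once the oldest of the last `limit`
                # sends slides out of the window.
                candidate = recent[-limit] + seconds
                defer_until = candidate if defer_until is None else max(defer_until, candidate)
        return defer_until
```

With a limit of two sends per minute and sends logged at t=100 and t=110, a check at t=120 defers until t=160, the moment the send from t=100 leaves the 60-second window.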

Transport helpers to fetch pending messages and push delivery reports.

class async_mail_service.fetcher.Fetcher(fetch_url: str | None = None, fetch_callable: Callable[[], Awaitable[List[Dict[str, Any]]]] | None = None, report_callable: Callable[[Dict[str, Any]], Awaitable[None]] | None = None)

Bases: object

Retrieve messages and propagate delivery results.

async fetch_messages() → List[Dict[str, Any]]

Return pending messages from the upstream Genropy service.

async report_delivery(payload: Dict[str, Any]) → None

Send a delivery report back to the upstream Genropy service.

Prometheus metrics exposed by the mail dispatcher.

class async_mail_service.prometheus.MailMetrics(registry: CollectorRegistry | None = None)

Bases: object

Wrapper around the Prometheus registry used by the service.

generate_latest() → bytes

Return the latest metrics snapshot in Prometheus text format.

inc_deferred(account_id: str)

Increase the deferred counter for the given account.

inc_error(account_id: str)

Increase the errors counter for the given account.

inc_rate_limited(account_id: str)

Increase the rate_limited counter for the given account.

inc_sent(account_id: str)

Increase the sent counter for the given account.

set_pending(value: int)

Update the gauge tracking pending messages.

Attachment helpers that normalise multiple input sources.

class async_mail_service.attachments.AttachmentManager

Bases: object

Collect attachment fetchers and expose a unified interface.

async fetch(att: Dict[str, Any]) → bytes | None

Return the attachment payload or None if not available.
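
A unified fetch over several attachment sources could dispatch on whichever key is present in the attachment dictionary. In the sketch below, AttachmentDispatcher, fetch_inline and fake_url_fetcher are hypothetical, and decoding the content field as base64 is an assumption; the reference does not state how inline content is encoded.

```python
import asyncio
import base64
import binascii
from typing import Any, Awaitable, Callable, Dict, Optional

FetcherFn = Callable[[Dict[str, Any]], Awaitable[Optional[bytes]]]


async def fetch_inline(att: Dict[str, Any]) -> Optional[bytes]:
    """ASSUMPTION: inline content is base64-encoded text."""
    try:
        return base64.b64decode(att["content"], validate=True)
    except (binascii.Error, ValueError):
        return None


async def fake_url_fetcher(att: Dict[str, Any]) -> Optional[bytes]:
    """Placeholder for an HTTP download; returns canned bytes."""
    return b"remote"


class AttachmentDispatcher:
    def __init__(self, fetchers: Dict[str, FetcherFn]) -> None:
        # Key order defines which source wins when several are present.
        self._fetchers = dict(fetchers)

    async def fetch(self, att: Dict[str, Any]) -> Optional[bytes]:
        for key, fetcher in self._fetchers.items():
            if att.get(key) is not None:
                return await fetcher(att)
        return None  # no supported source in this attachment


def make_dispatcher() -> AttachmentDispatcher:
    return AttachmentDispatcher({"content": fetch_inline, "url": fake_url_fetcher})
```

A caller would await `dispatcher.fetch(att)` and treat a None result as "attachment not available", matching the return type documented above.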

static guess_mime(filename: str) → Tuple[str, str]

Guess the MIME type for the given filename.
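
A function with this signature can be built on the standard mimetypes module, as in the minimal sketch below. The fallback to application/octet-stream for unknown names is an assumption of the sketch, not documented behaviour of AttachmentManager.guess_mime.

```python
import mimetypes
from typing import Tuple


def guess_mime(filename: str) -> Tuple[str, str]:
    """Return (maintype, subtype) guessed from the file extension."""
    mime, _encoding = mimetypes.guess_type(filename)
    if mime is None:
        # ASSUMED fallback for names without a recognised extension.
        return ("application", "octet-stream")
    maintype, _, subtype = mime.partition("/")
    return (maintype, subtype)
```

Returning the type as a pair is convenient when building messages with the standard email package, whose EmailMessage.add_attachment() takes maintype and subtype as separate arguments.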