Source code for async_mail_service.api

"""
FastAPI application factory and HTTP schemas for the async mail service.

The module exposes a `create_app` function that builds the REST API used to
control the dispatcher and defines the pydantic payloads that document the
behaviour of each command.  Authentication is enforced through a configurable
API token carried in the ``X-API-Token`` header.
"""

import secrets
from contextlib import asynccontextmanager
from typing import Optional, Dict, Any, List, Literal, Union, Callable, AsyncContextManager

from fastapi import FastAPI, HTTPException, APIRouter, Depends, status
from fastapi.responses import Response
from fastapi.security import APIKeyHeader
from pydantic import BaseModel, Field, ConfigDict

from .core import AsyncMailCore

# Name of the request header that carries the API token, and the security
# scheme that reads it.  ``auto_error=False`` is passed so the scheme itself
# does not reject requests; ``require_token`` decides what to do.
API_TOKEN_HEADER_NAME = "X-API-Token"
api_key_scheme = APIKeyHeader(auto_error=False, name=API_TOKEN_HEADER_NAME)

# Default application instance.  No token is configured until ``create_app``
# assigns one.
app = FastAPI(title="Async Mail Service")
app.state.api_token = None

# Service instance used by the endpoints; assigned by ``create_app``.
service: AsyncMailCore | None = None

async def require_token(api_token: str | None = Depends(api_key_scheme)) -> None:
    """Validate the API token carried in the ``X-API-Token`` header.

    The expected value is read from the module-level ``app.state.api_token``.
    If a token has been configured there and a request provides either a
    missing or different value, a ``401`` error is raised.  When no token is
    configured the dependency is effectively bypassed.

    Parameters
    ----------
    api_token:
        Header value extracted by ``api_key_scheme``; ``None`` when the
        header is absent.

    Raises
    ------
    HTTPException
        With status ``401`` when a token is configured and the supplied value
        is missing, empty or different.
    """
    expected = getattr(app.state, "api_token", None)
    if expected is None:
        return

    # Compare in constant time so response latency does not reveal how much of
    # a guessed token is correct.  Both sides are encoded to bytes because
    # ``compare_digest`` rejects ``str`` arguments containing non-ASCII
    # characters, which a client-controlled header may carry.
    if not api_token or not secrets.compare_digest(
        api_token.encode("utf-8"), expected.encode("utf-8")
    ):
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or missing API token")
# Shared ``Depends`` wrapper around ``require_token``; routes and routers list
# it in their ``dependencies=[...]`` argument to enforce the token check.
auth_dependency = Depends(require_token)
class AccountPayload(BaseModel):
    """SMTP account definition used when adding or updating accounts."""

    # Required: account identifier and server endpoint.
    id: str
    host: str
    port: int

    # Optional credentials.
    user: str | None = None
    password: str | None = None

    # Defaults to 300 when omitted.
    ttl: int | None = 300

    # Optional per-window limits; ``limit_behavior`` defaults to "defer".
    limit_per_minute: int | None = None
    limit_per_hour: int | None = None
    limit_per_day: int | None = None
    limit_behavior: str | None = "defer"

    # Optional transport settings.
    use_tls: bool | None = None
    batch_size: int | None = None
class CommandStatus(BaseModel):
    """Base schema shared by most responses produced by the service."""

    # Outcome flag (required) and optional error text.
    ok: bool
    error: str | None = None
class BasicOkResponse(CommandStatus):
    # Adds no fields of its own: the response is exactly ``CommandStatus``.
    ...
class AttachmentPayload(BaseModel):
    """Description of an attachment supported by the dispatcher."""

    filename: str | None = None

    # Every source field is optional at the schema level.
    content: str | None = None
    url: str | None = None
    s3: dict[str, Any] | None = None
class MessagePayload(BaseModel):
    """Payload accepted by the ``addMessages`` command."""

    # Allows the sender to be supplied either as ``from`` (the alias) or as
    # ``from_`` (the field name).
    model_config = ConfigDict(populate_by_name=True)

    id: str
    account_id: str | None = None

    # ``from`` is a Python keyword, hence the trailing underscore plus alias.
    from_: str = Field(alias="from")

    # Recipients: each accepts a list of strings or a single string.
    to: list[str] | str
    cc: list[str] | str | None = None
    bcc: list[str] | str | None = None

    reply_to: str | None = None
    return_path: str | None = None

    subject: str
    body: str
    content_type: str | None = Field(default="plain")

    headers: dict[str, Any] | None = None
    message_id: str | None = None
    attachments: list[AttachmentPayload] | None = None

    # Either an integer or one of the named levels.
    priority: int | Literal["immediate", "high", "medium", "low"] | None = None
    deferred_ts: int | None = None
class AccountInfo(BaseModel):
    """Stored SMTP account as returned by ``listAccounts``."""

    # Required: account identifier and server endpoint.
    id: str
    host: str
    port: int

    user: str | None = None

    # Required here, unlike in ``AccountPayload`` where it is optional.
    ttl: int

    # Optional per-window limits.
    limit_per_minute: int | None = None
    limit_per_hour: int | None = None
    limit_per_day: int | None = None
    limit_behavior: str | None = None

    use_tls: bool | None = None
    batch_size: int | None = None
    created_at: str | None = None
class AccountsResponse(CommandStatus):
    # ``CommandStatus`` fields plus the required list of accounts.
    accounts: list[AccountInfo]
class EnqueueMessagesPayload(BaseModel):
    """Queue of messages used by ``addMessages``."""

    messages: list[MessagePayload]

    # Optional; accepts the same integer-or-named-level values as
    # ``MessagePayload.priority``.
    default_priority: int | Literal["immediate", "high", "medium", "low"] | None = None
class RejectedMessage(BaseModel):
    """Rejected message entry."""

    # The identifier is optional; the reason is always required.
    id: str | None = None
    reason: str
class AddMessagesResponse(CommandStatus):
    """Response returned by the ``addMessages`` command."""

    queued: int = 0

    # ``default_factory`` gives each instance its own empty list.
    rejected: list[RejectedMessage] = Field(default_factory=list)
class MessageRecord(BaseModel):
    """Full representation of a message tracked by the dispatcher."""

    id: str
    priority: int
    account_id: str | None = None

    # Optional integer timestamps and error text.
    deferred_ts: int | None = None
    sent_ts: int | None = None
    error_ts: int | None = None
    error: str | None = None
    reported_ts: int | None = None

    # Optional string timestamps.
    created_at: str | None = None
    updated_at: str | None = None

    # Required free-form mapping.
    message: dict[str, Any]
class MessagesResponse(CommandStatus):
    # ``CommandStatus`` fields plus the required list of message records.
    messages: list[MessageRecord]
class DeleteMessagesPayload(BaseModel):
    # Message identifiers; an omitted field yields a fresh empty list.
    ids: list[str] = Field(default_factory=list)
class DeleteMessagesResponse(CommandStatus):
    # ``CommandStatus`` fields plus a required count and an optional list of
    # identifiers.
    removed: int
    not_found: list[str] | None = None
class CleanupMessagesPayload(BaseModel):
    """Request payload for manual cleanup of reported messages."""

    # Optional; ``None`` when the caller does not supply a value.
    older_than_seconds: int | None = None
class CleanupMessagesResponse(CommandStatus):
    """Response from cleanup operation."""

    # Required count, in addition to the ``CommandStatus`` fields.
    removed: int
def create_app(
    svc: AsyncMailCore,
    api_token: str | None = None,
    lifespan: Callable[[FastAPI], AsyncContextManager] | None = None,
) -> FastAPI:
    """Create and configure the FastAPI application.

    Parameters
    ----------
    svc:
        Instance of :class:`async_mail_service.core.AsyncMailCore` that
        implements the business logic for each command.  It is stored in the
        module-level ``service`` variable, which the endpoints read on every
        request.
    api_token:
        Optional secret used to protect every endpoint. When provided, the
        ``X-API-Token`` header must match this value on every request.
    lifespan:
        Optional lifespan context manager for startup/shutdown events.  When
        given, a new application is created; otherwise the module-level
        ``app`` is configured and returned.

    Returns
    -------
    FastAPI
        A configured application ready to be served by Uvicorn or any ASGI
        server.
    """
    global service
    service = svc

    # Use custom lifespan if provided, otherwise use the global app
    if lifespan is not None:
        api = FastAPI(title="Async Mail Service", lifespan=lifespan)
    else:
        api = app

    api.state.api_token = api_token
    # ``auth_dependency`` checks the token stored on the module-level ``app``.
    # Without mirroring it there, an application built with a custom lifespan
    # (a different FastAPI instance) would leave that value at ``None`` and
    # every request would be accepted regardless of ``api_token``.
    app.state.api_token = api_token

    router = APIRouter(prefix="/commands", tags=["commands"], dependencies=[auth_dependency])

    def _require_service() -> AsyncMailCore:
        """Return the active service, or fail the request with a 500."""
        if not service:
            raise HTTPException(500, "Service not initialized")
        return service

    # NOTE: inside ``create_app`` this function name shadows the ``status``
    # module imported from fastapi.  The name is kept because FastAPI derives
    # the route name from it.
    @api.get("/status", response_model=BasicOkResponse, response_model_exclude_none=True, dependencies=[auth_dependency])
    async def status():
        """Return a simple health status payload."""
        return BasicOkResponse(ok=True)

    # Forwards the ``run now`` command to the service.
    @router.post("/run-now", response_model=BasicOkResponse, response_model_exclude_none=True)
    async def run_now():
        result = await _require_service().handle_command("run now", {})
        return BasicOkResponse.model_validate(result)

    @router.post("/suspend", response_model=BasicOkResponse, response_model_exclude_none=True)
    async def suspend():
        """Suspend the scheduler component of the mail service."""
        result = await _require_service().handle_command("suspend", {})
        return BasicOkResponse.model_validate(result)

    @router.post("/activate", response_model=BasicOkResponse, response_model_exclude_none=True)
    async def activate():
        """Activate the scheduler component of the mail service."""
        result = await _require_service().handle_command("activate", {})
        return BasicOkResponse.model_validate(result)

    @router.post("/add-messages", response_model=AddMessagesResponse, response_model_exclude_none=True)
    async def add_messages(payload: EnqueueMessagesPayload):
        """Push a batch of messages into the scheduler queue."""
        core = _require_service()

        serialized: List[Dict[str, Any]] = []
        for msg in payload.messages:
            entry = msg.model_dump(by_alias=True, exclude_none=True)
            if msg.attachments is not None:
                entry["attachments"] = [att.model_dump(exclude_none=True) for att in msg.attachments]
            serialized.append(entry)

        command: Dict[str, Any] = {"messages": serialized}
        if payload.default_priority is not None:
            command["default_priority"] = payload.default_priority

        result = await core.handle_command("addMessages", command)

        # A non-dict result has no ``.get``; report it explicitly instead of
        # crashing with AttributeError while building the error detail.
        if not isinstance(result, dict):
            raise HTTPException(status_code=500, detail="Unexpected response from service")
        if result.get("ok") is not True:
            detail = {"error": result.get("error"), "rejected": result.get("rejected")}
            raise HTTPException(status_code=400, detail=detail)
        return AddMessagesResponse.model_validate(result)

    @router.post("/delete-messages", response_model=DeleteMessagesResponse, response_model_exclude_none=True)
    async def delete_messages(payload: DeleteMessagesPayload):
        """Remove messages from the scheduler queue and related tracking tables."""
        result = await _require_service().handle_command("deleteMessages", payload.model_dump())
        return DeleteMessagesResponse.model_validate(result)

    @router.post("/cleanup-messages", response_model=CleanupMessagesResponse, response_model_exclude_none=True)
    async def cleanup_messages(payload: CleanupMessagesPayload = CleanupMessagesPayload()):
        """Manually trigger cleanup of reported messages older than retention period.

        By default uses the configured retention period. Optionally specify
        older_than_seconds to override the retention period for this cleanup.
        """
        result = await _require_service().handle_command("cleanupMessages", payload.model_dump())
        return CleanupMessagesResponse.model_validate(result)

    @api.post("/account", response_model=BasicOkResponse, response_model_exclude_none=True, dependencies=[auth_dependency])
    async def add_account(acc: AccountPayload):
        """Register or update an SMTP account definition."""
        result = await _require_service().handle_command("addAccount", acc.model_dump())
        return BasicOkResponse.model_validate(result)

    @api.get("/accounts", response_model=AccountsResponse, response_model_exclude_none=True, dependencies=[auth_dependency])
    async def list_accounts():
        """List the SMTP accounts known by the dispatcher."""
        result = await _require_service().handle_command("listAccounts", {})
        return AccountsResponse.model_validate(result)

    @api.delete("/account/{account_id}", response_model=BasicOkResponse, response_model_exclude_none=True, dependencies=[auth_dependency])
    async def delete_account(account_id: str):
        """Remove an SMTP account and any scheduler state bound to it."""
        result = await _require_service().handle_command("deleteAccount", {"id": account_id})
        return BasicOkResponse.model_validate(result)

    @api.get("/messages", response_model=MessagesResponse, response_model_exclude_none=True, dependencies=[auth_dependency])
    async def all_messages():
        """Expose the current message queue with detailed payload information."""
        result = await _require_service().handle_command("listMessages", {})
        return MessagesResponse.model_validate(result)

    @api.get("/metrics", dependencies=[auth_dependency])
    async def metrics():
        """Expose Prometheus metrics collected by the dispatcher."""
        core = _require_service()
        return Response(content=core.metrics.generate_latest(), media_type="text/plain; version=0.0.4")

    # Routes must be registered on the router before it is included.
    api.include_router(router)
    return api