# Copyright 2025 Softwell S.r.l. - SPDX-License-Identifier: Apache-2.0
"""FastAPI route generation from endpoint classes via introspection.
This module generates REST API routes automatically from endpoint classes
by introspecting method signatures and creating appropriate handlers.
Components:
create_app: FastAPI application factory.
register_endpoint: Register endpoint methods as FastAPI routes.
verify_tenant_token: Token verification for tenant-scoped requests.
require_admin_token: Admin-only endpoint protection.
require_token: General authentication dependency.
Example:
Create and run the API server::
from core.mail_proxy.interface import create_app
from core.mail_proxy.proxy import MailProxy
proxy = MailProxy(db_path="/data/mail.db")
app = create_app(proxy, api_token="secret")
# Run with uvicorn
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Register custom endpoints::
from fastapi import FastAPI
from core.mail_proxy.interface import register_endpoint
app = FastAPI()
endpoint = MyCustomEndpoint(table)
register_endpoint(app, endpoint)
Note:
Authentication uses X-API-Token header. Global token grants admin
access to all tenants. Tenant tokens restrict access to own resources.
"""
from __future__ import annotations
import inspect
import logging
import secrets
from collections.abc import Callable
from collections.abc import Callable as CallableType
from contextlib import AbstractAsyncContextManager
from typing import TYPE_CHECKING, Any
from fastapi import APIRouter, Depends, FastAPI, HTTPException, Query, Request, status
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse, Response
from fastapi.security import APIKeyHeader
from .endpoint_base import BaseEndpoint
if TYPE_CHECKING:
from ..proxy import MailProxy
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Authentication constants
# Name of the HTTP header that carries the API token.
API_TOKEN_HEADER_NAME = "X-API-Token"
# Header-based security scheme used with Depends() by the auth dependencies
# below. auto_error=False: those dependencies check for a missing token
# themselves instead of relying on the scheme to reject the request.
api_key_scheme = APIKeyHeader(name=API_TOKEN_HEADER_NAME, auto_error=False)
# Global service reference (set by create_app); read by the auth functions
# to look up tenant tokens. Stays None until create_app() has been called.
_service: MailProxy | None = None
def _get_http_method_fallback(method_name: str) -> str:
"""Infer HTTP method from method name prefix.
Args:
method_name: Name of the endpoint method.
Returns:
HTTP method string (GET, POST, DELETE, PATCH, PUT).
"""
if method_name.startswith(("add", "create", "post", "run", "suspend", "activate")):
return "POST"
elif method_name.startswith(("delete", "remove")):
return "DELETE"
elif method_name.startswith(("update", "patch")):
return "PATCH"
elif method_name.startswith(("set", "put")):
return "PUT"
return "GET"
def _count_params_fallback(method: Callable) -> int:
"""Count non-self parameters for a method.
Args:
method: The method to introspect.
Returns:
Number of parameters excluding 'self'.
"""
sig = inspect.signature(method)
return sum(1 for p in sig.parameters if p != "self")
def _create_model_fallback(method: Callable, method_name: str) -> type:
    """Build a Pydantic request model mirroring a method's signature.

    Every parameter except ``self`` becomes a model field. The field type is
    the resolved type hint when available, otherwise the raw annotation, and
    ``Any`` when the parameter is not annotated. Parameters without a default
    become required fields.

    Args:
        method: The method to introspect.
        method_name: Method name; converted to ``<TitleCase>Request`` to name
            the generated model class.

    Returns:
        Dynamically created Pydantic model class.
    """
    from typing import get_type_hints

    from pydantic import create_model

    signature = inspect.signature(method)
    try:
        resolved_hints = get_type_hints(method)
    except Exception:
        # Best effort: if hints cannot be resolved, use the raw annotations.
        resolved_hints = {}

    empty = inspect.Parameter.empty
    field_defs = {}
    for name, spec in signature.parameters.items():
        if name == "self":
            continue
        field_type = resolved_hints.get(name, spec.annotation)
        if field_type is empty:
            field_type = Any
        # ``...`` marks the field as required in create_model().
        field_default = ... if spec.default is empty else spec.default
        field_defs[name] = (field_type, field_default)

    class_name = f"{method_name.title().replace('_', '')}Request"
    return create_model(class_name, **field_defs)
[docs]
def register_endpoint(app: FastAPI | APIRouter, endpoint: Any, prefix: str = "") -> None:
    """Expose every method of an endpoint object as a FastAPI route.

    For ``BaseEndpoint`` instances the method list, HTTP verb and parameter
    count come from the endpoint itself. For any other object, public
    coroutine methods are discovered via ``dir()`` and the verb/parameter
    count are derived from the method name and signature.

    GET methods, and DELETE methods with at most three parameters, are
    registered with query parameters; everything else receives a request body.

    Args:
        app: FastAPI app or APIRouter to register routes on.
        endpoint: Endpoint instance (BaseEndpoint or duck-typed).
        prefix: Optional URL prefix. When empty, ``/{endpoint.name}`` is used
            (falling back to the lowercased class name if there is no
            ``name`` attribute).

    Example:
        ::

            endpoint = AccountEndpoint(db.table("accounts"))
            register_endpoint(app, endpoint)
            # Creates routes: GET /accounts/list, POST /accounts/add, etc.
    """
    endpoint_name = getattr(endpoint, "name", endpoint.__class__.__name__.lower())
    base_path = prefix if prefix else f"/{endpoint_name}"
    is_base_endpoint = isinstance(endpoint, BaseEndpoint)

    if is_base_endpoint:
        methods = endpoint.get_methods()
    else:
        # Duck-typed endpoint: every public coroutine attribute becomes a route.
        public_attrs = (
            (attr_name, getattr(endpoint, attr_name))
            for attr_name in dir(endpoint)
            if not attr_name.startswith("_")
        )
        methods = [
            (attr_name, attr)
            for attr_name, attr in public_attrs
            if callable(attr) and inspect.iscoroutinefunction(attr)
        ]

    for method_name, method in methods:
        if is_base_endpoint:
            verb = endpoint.get_http_method(method_name)
            n_params = endpoint.count_params(method_name)
        else:
            verb = _get_http_method_fallback(method_name)
            n_params = _count_params_fallback(method)

        route_path = f"{base_path}/{method_name}"
        description = method.__doc__ or f"{method_name} operation"

        uses_query_params = verb == "GET" or (verb == "DELETE" and n_params <= 3)
        if uses_query_params:
            _register_query_route(app, route_path, method, verb, description)
        else:
            _register_body_route(
                app, route_path, method, verb, description, method_name, endpoint
            )
def _register_query_route(
    app: FastAPI | APIRouter, path: str, method: Callable, http_method: str, doc: str
) -> None:
    """Register a GET or DELETE route whose arguments are query parameters.

    A generic ``**kwargs`` handler is given a synthetic signature built from
    ``method``'s parameters (minus ``self``) so that FastAPI sees one query
    parameter per method argument. Unannotated parameters are typed ``str``;
    parameters without a default are required.

    Args:
        app: FastAPI app or APIRouter to register the route on.
        path: URL path of the route.
        method: Coroutine invoked with the parsed query parameters.
        http_method: ``"GET"`` or ``"DELETE"``; any other value registers nothing.
        doc: Handler docstring; its first line is used as the route summary.
    """
    empty = inspect.Parameter.empty

    async def handler(**kwargs: Any) -> Any:
        return await method(**kwargs)

    query_params = []
    for name, spec in inspect.signature(method).parameters.items():
        if name == "self":
            continue
        annotation = str if spec.annotation is empty else spec.annotation
        # ``...`` as the Query default marks the parameter as required.
        default = ... if spec.default is empty else spec.default
        query_params.append(
            inspect.Parameter(
                name=name,
                kind=inspect.Parameter.KEYWORD_ONLY,
                default=Query(default),
                annotation=annotation,
            )
        )

    handler.__signature__ = inspect.Signature(parameters=query_params)  # type: ignore
    handler.__doc__ = doc

    if http_method not in ("GET", "DELETE"):
        return
    route_decorator = app.get if http_method == "GET" else app.delete
    route_decorator(path, summary=doc.split("\n")[0])(handler)
def _make_body_handler(method: Callable, RequestModel: type) -> Callable:
"""Create handler that accepts body and calls method."""
async def handler(data: RequestModel) -> Any: # type: ignore
return await method(**data.model_dump())
handler.__signature__ = inspect.Signature( # type: ignore
parameters=[
inspect.Parameter(
"data",
inspect.Parameter.POSITIONAL_OR_KEYWORD,
annotation=RequestModel,
)
]
)
return handler
def _register_body_route(
    app: FastAPI | APIRouter,
    path: str,
    method: Callable,
    http_method: str,
    doc: str,
    method_name: str,
    endpoint: Any = None,
) -> None:
    """Register a route whose arguments arrive in the request body.

    The request model comes from ``endpoint.create_request_model()`` for
    ``BaseEndpoint`` instances and is derived from the method signature
    otherwise.

    Args:
        app: FastAPI app or APIRouter to register the route on.
        path: URL path of the route.
        method: Coroutine invoked with the unpacked body fields.
        http_method: ``"POST"``, ``"PUT"``, ``"PATCH"`` or ``"DELETE"``; any
            other value registers nothing.
        doc: Handler docstring; its first line is used as the route summary.
        method_name: Name of the endpoint method (used to build the model).
        endpoint: Endpoint instance owning ``method``, if any.
    """
    if isinstance(endpoint, BaseEndpoint):
        request_model = endpoint.create_request_model(method_name)
    else:
        request_model = _create_model_fallback(method, method_name)

    handler = _make_body_handler(method, request_model)
    handler.__doc__ = doc

    # Map the HTTP verb to the registrar attribute on the app/router.
    registrar_names = {
        "POST": "post",
        "PUT": "put",
        "PATCH": "patch",
        "DELETE": "delete",
    }
    registrar_name = registrar_names.get(http_method)
    if registrar_name is None:
        return
    route_decorator = getattr(app, registrar_name)
    route_decorator(path, summary=doc.split("\n")[0])(handler)
# =============================================================================
# Authentication functions
# =============================================================================
[docs]
async def verify_tenant_token(
    tenant_id: str | None,
    api_token: str | None,
    global_token: str | None,
) -> None:
    """Verify API token for a tenant-scoped request.

    Args:
        tenant_id: The tenant ID from the request.
        api_token: The token from X-API-Token header.
        global_token: The configured global API token (admin).

    Raises:
        HTTPException: 401 if the token is missing while a global token is
            configured, if it matches neither the global token nor a tenant
            token, or if it belongs to a tenant other than ``tenant_id``.

    Note:
        - Global token grants access to any tenant
        - Tenant token grants access ONLY to own resources
        - No token sent and no global token configured = open access
        - A token that is sent is always validated, even when no global
          token is configured
    """
    if not api_token:
        if global_token is not None:
            raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or missing API token")
        return

    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, which would turn a malformed header
    # value into a 500 instead of a 401.
    if global_token is not None and secrets.compare_digest(
        api_token.encode("utf-8"), global_token.encode("utf-8")
    ):
        return

    if _service and getattr(_service, "db", None):
        token_tenant = await _service.db.table("tenants").get_tenant_by_token(api_token)
        if token_tenant:
            # A tenant token is only valid for its own tenant's resources.
            if tenant_id and token_tenant["id"] != tenant_id:
                raise HTTPException(
                    status.HTTP_401_UNAUTHORIZED, "Token not authorized for this tenant"
                )
            return

    raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or missing API token")
[docs]
async def require_admin_token(
    request: Request,
    api_token: str | None = Depends(api_key_scheme),
) -> None:
    """Require global admin token for admin-only endpoints.

    Admin-only endpoints include tenant management, API key operations,
    and instance configuration.

    Args:
        request: FastAPI request object; the expected admin token is read
            from ``request.app.state.api_token``.
        api_token: Token from X-API-Token header (via Depends).

    Raises:
        HTTPException: 401 if the token is missing while an admin token is
            configured, or if it matches nothing; 403 if it is a valid
            tenant token.

    Note:
        When no token is sent and no admin token is configured, the request
        is allowed through.
    """
    expected = getattr(request.app.state, "api_token", None)

    if not api_token:
        if expected is not None:
            raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Admin token required")
        return

    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, which would turn a malformed header
    # value into a 500 instead of a 401.
    if expected is not None and secrets.compare_digest(
        api_token.encode("utf-8"), expected.encode("utf-8")
    ):
        return

    # Distinguish "valid tenant token, insufficient rights" (403) from
    # "unknown token" (401).
    if _service and getattr(_service, "db", None):
        token_tenant = await _service.db.table("tenants").get_tenant_by_token(api_token)
        if token_tenant:
            raise HTTPException(
                status.HTTP_403_FORBIDDEN,
                "Admin token required, tenant tokens not allowed for this operation",
            )

    raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or missing API token")
[docs]
async def require_token(
    request: Request,
    api_token: str | None = Depends(api_key_scheme),
) -> None:
    """Validate API token from X-API-Token header.

    Accepts global admin token (full access) or tenant token (own resources).
    Stores token info in request.state for downstream verification:

    - ``request.state.api_token`` is always set to the received token.
    - ``request.state.is_admin`` is ``True`` for the global token and
      ``False`` for a tenant token.
    - ``request.state.token_tenant_id`` is set for a tenant token.

    Args:
        request: FastAPI request object; the expected global token is read
            from ``request.app.state.api_token``.
        api_token: Token from X-API-Token header (via Depends).

    Raises:
        HTTPException: 401 if the token is missing while a global token is
            configured, or if it matches neither the global token nor a
            tenant token.

    Note:
        When no token is sent and no global token is configured, the request
        is allowed through and ``is_admin`` is left unset.
    """
    request.state.api_token = api_token
    expected = getattr(request.app.state, "api_token", None)

    if not api_token:
        if expected is not None:
            raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or missing API token")
        return

    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, which would turn a malformed header
    # value into a 500 instead of a 401.
    if expected is not None and secrets.compare_digest(
        api_token.encode("utf-8"), expected.encode("utf-8")
    ):
        request.state.is_admin = True
        return

    if _service and getattr(_service, "db", None):
        token_tenant = await _service.db.table("tenants").get_tenant_by_token(api_token)
        if token_tenant:
            request.state.token_tenant_id = token_tenant["id"]
            request.state.is_admin = False
            return

    raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or missing API token")
# Dependency shortcuts
# Pre-built Depends() objects so routers can list them directly in
# ``dependencies=[...]``.
# admin_dependency: wraps require_admin_token (global admin token only).
admin_dependency = Depends(require_admin_token)
# auth_dependency: wraps require_token (global admin token or tenant token).
auth_dependency = Depends(require_token)
# =============================================================================
# Application factory
# =============================================================================
[docs]
def create_app(
    svc: MailProxy,
    api_token: str | None = None,
    lifespan: CallableType[[FastAPI], AbstractAsyncContextManager[None]] | None = None,
    tenant_tokens_enabled: bool = False,
) -> FastAPI:
    """Create and configure the FastAPI application.

    Besides building the app, this stores ``svc`` in the module-level
    ``_service`` reference (used by the authentication functions) and
    records ``api_token`` and ``tenant_tokens_enabled`` on ``app.state``.

    Args:
        svc: MailProxy instance implementing business logic.
        api_token: Optional global token for X-API-Token authentication.
        lifespan: Optional lifespan context manager. If None, creates
            default that starts/stops the proxy service.
        tenant_tokens_enabled: When True, enables per-tenant API keys.

    Returns:
        Configured FastAPI application with all routes registered.

    Example:
        ::

            from core.mail_proxy.proxy import MailProxy
            from core.mail_proxy.interface import create_app

            proxy = MailProxy(db_path="/data/mail.db")
            app = create_app(proxy, api_token="admin-secret")

            # Run with uvicorn
            import uvicorn
            uvicorn.run(app)
    """
    global _service

    # Local import, matching the other function-scope imports in this factory.
    from fastapi.encoders import jsonable_encoder

    _service = svc

    if lifespan is None:
        from collections.abc import AsyncGenerator
        from contextlib import asynccontextmanager

        @asynccontextmanager
        async def default_lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
            """Default lifespan: start and stop the MailProxy service."""
            logger.info("Starting mail-proxy service...")
            await svc.start()
            logger.info("Mail-proxy service started")
            try:
                yield
            finally:
                # Always stop the service, even if the app exits with an error.
                logger.info("Stopping mail-proxy service...")
                await svc.stop()
                logger.info("Mail-proxy service stopped")

        lifespan = default_lifespan

    app = FastAPI(title="Async Mail Service", lifespan=lifespan)
    app.state.api_token = api_token
    app.state.tenant_tokens_enabled = tenant_tokens_enabled

    @app.exception_handler(RequestValidationError)
    async def validation_exception_handler(
        request: Request, exc: RequestValidationError
    ) -> JSONResponse:
        """Handle FastAPI request validation errors with detailed logging."""
        body = await request.body()
        errors = exc.errors()
        logger.error("Validation error on %s %s", request.method, request.url.path)
        logger.error("Request body: %s", body.decode("utf-8", errors="replace"))
        logger.error("Validation errors: %s", errors)
        # Error entries may contain objects that are not JSON-serializable
        # (e.g. exception instances in ``ctx``); encode them first so the
        # 422 response does not itself fail with a 500.
        return JSONResponse(status_code=422, content={"detail": jsonable_encoder(errors)})

    _register_entity_endpoints(app, svc)
    _register_instance_endpoints(app, svc)

    return app
def _register_entity_endpoints(app: FastAPI, svc: MailProxy) -> None:
    """Register every discovered entity endpoint behind token authentication.

    Each class returned by ``BaseEndpoint.discover()`` except the one named
    ``"instance"`` is instantiated with the database table of the same name
    and its routes are added to a router guarded by ``auth_dependency``.

    Args:
        app: Application the router is attached to.
        svc: Service whose ``db`` provides the tables.
    """
    entity_router = APIRouter(dependencies=[auth_dependency])
    entity_classes = (
        cls for cls in BaseEndpoint.discover() if cls.name != "instance"
    )
    for cls in entity_classes:
        entity_endpoint = cls(svc.db.table(cls.name))
        register_endpoint(entity_router, entity_endpoint)
    app.include_router(entity_router)
def _register_instance_endpoints(app: FastAPI, svc: MailProxy) -> None:
    """Register instance-level endpoints (health, metrics, operations).

    Looks up the discovered endpoint class named ``"instance"``. If none is
    found, a warning is logged and nothing is registered. Otherwise
    ``/health`` and ``/metrics`` are added directly on the app (no auth
    dependency), and the remaining instance methods are registered on a
    router guarded by ``auth_dependency``.

    Args:
        app: Application to register routes on.
        svc: Service providing the ``instance`` table and the metrics.
    """
    instance_class = next(
        (cls for cls in BaseEndpoint.discover() if cls.name == "instance"),
        None,
    )
    if not instance_class:
        logger.warning("InstanceEndpoint not found in discovery")
        return

    instance_endpoint = instance_class(svc.db.table("instance"), proxy=svc)

    @app.get("/health")
    async def health() -> dict:
        """Health check endpoint for container orchestration."""
        return await instance_endpoint.health()

    @app.get("/metrics")
    async def metrics() -> Response:
        """Export Prometheus metrics in text exposition format."""
        payload = svc.metrics.generate_latest()
        return Response(content=payload, media_type="text/plain; version=0.0.4")

    secured_router = APIRouter(dependencies=[auth_dependency])
    register_endpoint(secured_router, instance_endpoint)
    app.include_router(secured_router)
# Public API of this module: the names exported by ``from ... import *``.
# Underscore-prefixed helpers defined above are intentionally not listed.
__all__ = [
"API_TOKEN_HEADER_NAME",
"admin_dependency",
"api_key_scheme",
"auth_dependency",
"create_app",
"register_endpoint",
"require_admin_token",
"require_token",
"verify_tenant_token",
]