Source code for core.mail_proxy.entities.instance.endpoint

# Copyright 2025 Softwell S.r.l. - SPDX-License-Identifier: Apache-2.0
"""Instance REST API endpoint for service-level operations.

This module provides the InstanceEndpoint class exposing service-level
operations for the mail proxy via REST API and CLI commands.

Operations include:
    - health: Container orchestration health check (unauthenticated)
    - status: Authenticated service status with active state
    - run_now: Trigger immediate dispatch cycle
    - suspend/activate: Control message sending per tenant/batch
    - get/update: Instance configuration management
    - get_sync_status: Monitor tenant synchronization health
    - upgrade_to_ee: Transition from Community to Enterprise Edition

Example:
    CLI commands auto-generated::

        mail-proxy instance health
        mail-proxy instance status
        mail-proxy instance run-now --tenant-id acme
        mail-proxy instance suspend --tenant-id acme
        mail-proxy instance activate --tenant-id acme
        mail-proxy instance get
        mail-proxy instance update --name production
        mail-proxy instance get-sync-status
        mail-proxy instance upgrade-to-ee

Note:
    Enterprise Edition (EE) extends this with InstanceEndpoint_EE mixin
    adding bounce detection configuration operations.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from ...interface.endpoint_base import POST, BaseEndpoint

if TYPE_CHECKING:
    from .table import InstanceTable


[docs] class InstanceEndpoint(BaseEndpoint): """REST API endpoint for instance-level operations. Provides service management operations including health checks, dispatch control, and configuration management. Attributes: name: Endpoint name used in URL paths ("instance"). table: InstanceTable instance for configuration storage. proxy: Optional MailProxy instance for service operations. Example: Using the endpoint programmatically:: endpoint = InstanceEndpoint(db.table("instance"), proxy) # Check service health health = await endpoint.health() # Trigger dispatch await endpoint.run_now(tenant_id="acme") # Update configuration await endpoint.update(name="production") """ name = "instance"
[docs] def __init__(self, table: InstanceTable, proxy: object | None = None): """Initialize endpoint with table and optional proxy reference. Args: table: InstanceTable for configuration storage. proxy: Optional MailProxy instance for service operations. When provided, enables run_now, suspend, activate, and get_sync_status to interact with the running service. """ super().__init__(table) self.proxy = proxy
[docs] async def health(self) -> dict: """Health check for container orchestration. Lightweight endpoint for liveness/readiness probes. Does not require authentication. Returns immediately without database access. Returns: Dict with status "ok". Example: :: # Kubernetes liveness probe # GET /instance/health {"status": "ok"} """ return {"status": "ok"}
[docs] async def status(self) -> dict: """Authenticated service status. Returns the current active state of the mail proxy service. Requires authentication. Returns: Dict with ok=True and active boolean indicating if the dispatch loop is running. """ active = True if self.proxy is not None: active = getattr(self.proxy, "_active", True) return {"ok": True, "active": active}
[docs] @POST async def run_now(self, tenant_id: str | None = None) -> dict: """Trigger immediate dispatch cycle. Resets the tenant's sync timer, causing the next dispatch loop iteration to process messages immediately. Args: tenant_id: If provided, only reset this tenant's sync timer. If None, triggers dispatch for all tenants. Returns: Dict with ok=True. """ if self.proxy is not None: result = await self.proxy.handle_command("run now", {"tenant_id": tenant_id}) return result return {"ok": True}
[docs] @POST async def suspend( self, tenant_id: str, batch_code: str | None = None, ) -> dict: """Suspend message sending for a tenant. Prevents messages from being dispatched for the specified tenant or batch. Messages remain in queue and will be sent when activated. Args: tenant_id: Tenant to suspend. batch_code: Optional batch code. If None, suspends all batches. Returns: Dict with suspended batches list and pending message count. """ if self.proxy is not None: result = await self.proxy.handle_command( "suspend", { "tenant_id": tenant_id, "batch_code": batch_code, }, ) return result return {"ok": True, "tenant_id": tenant_id, "batch_code": batch_code}
[docs] @POST async def activate( self, tenant_id: str, batch_code: str | None = None, ) -> dict: """Resume message sending for a tenant. Removes suspension for the specified tenant or batch, allowing queued messages to be dispatched. Args: tenant_id: Tenant to activate. batch_code: Optional batch code. If None, clears all suspensions. Returns: Dict with remaining suspended batches list. """ if self.proxy is not None: result = await self.proxy.handle_command( "activate", { "tenant_id": tenant_id, "batch_code": batch_code, }, ) return result return {"ok": True, "tenant_id": tenant_id, "batch_code": batch_code}
[docs] async def get(self) -> dict: """Get instance configuration. Returns: Dict with ok=True and all instance configuration fields. """ instance = await self.table.get_instance() if instance is None: return {"ok": True, "id": 1, "name": "mail-proxy", "edition": "ce"} return {"ok": True, **instance}
[docs] @POST async def update( self, name: str | None = None, api_token: str | None = None, edition: str | None = None, ) -> dict: """Update instance configuration. Args: name: New instance display name. api_token: New master API token. edition: New edition ("ce" or "ee"). Returns: Dict with ok=True. """ updates = {} if name is not None: updates["name"] = name if api_token is not None: updates["api_token"] = api_token if edition is not None: updates["edition"] = edition if updates: await self.table.update_instance(updates) return {"ok": True}
[docs] async def get_sync_status(self) -> dict: """Get sync status for all tenants. Returns synchronization health information for each tenant, useful for monitoring and debugging delivery issues. Returns: Dict with ok=True and tenants list. Each tenant contains: - id: Tenant identifier - last_sync_ts: Unix timestamp of last sync - next_sync_due: True if sync interval has expired - in_dnd: True if tenant is in Do Not Disturb mode """ if self.proxy is not None: result = await self.proxy.handle_command("listTenantsSyncStatus", {}) return result return {"ok": True, "tenants": []}
[docs] @POST async def upgrade_to_ee(self) -> dict: """Upgrade from Community Edition to Enterprise Edition. Performs explicit upgrade from CE to EE mode: 1. Verifies Enterprise modules are installed 2. Sets edition="ee" in instance configuration 3. Optionally generates API key for "default" tenant The upgrade is idempotent - calling when already EE is safe. Returns: Dict with ok=True, edition, optional default_tenant_token, and descriptive message. Raises: ValueError: If Enterprise modules are not installed. """ import core.mail_proxy if not core.mail_proxy.HAS_ENTERPRISE: raise ValueError( "Enterprise modules not installed. Install with: pip install genro-mail-proxy[ee]" ) # Check if already EE if await self.table.is_enterprise(): return {"ok": True, "edition": "ee", "message": "Already in Enterprise Edition"} # Upgrade to EE await self.table.set_edition("ee") # If "default" tenant exists without token, generate one if self.proxy is not None: tenants_table = self.proxy.db.table("tenants") default_tenant = await tenants_table.get("default") if default_tenant and not default_tenant.get("api_key_hash"): token = await tenants_table.create_api_key("default") return { "ok": True, "edition": "ee", "default_tenant_token": token, "message": "Upgraded to Enterprise Edition. Save the default tenant token - it will not be shown again.", } return {"ok": True, "edition": "ee", "message": "Upgraded to Enterprise Edition"}
__all__ = ["InstanceEndpoint"]