# Source code for enterprise.mail_proxy.pec.receiver

# Copyright 2025 Softwell S.r.l. - SPDX-License-Identifier: BSL-1.1
"""PEC receipt receiver loop for polling IMAP and processing receipts.

This module monitors PEC accounts for incoming receipts (ricevute) and:
1. Parses receipt type (accettazione, consegna, mancata_consegna, etc.)
2. Creates corresponding events in the message_events table
3. Handles timeout for PEC messages that don't receive acceptance within 30 minutes

The receiver runs as a background task and polls all PEC accounts periodically.
"""

from __future__ import annotations

import asyncio
import time
from typing import TYPE_CHECKING

from .parser import PecReceiptParser

if TYPE_CHECKING:
    from logging import Logger

    from core.mail_proxy.mailproxy_db import MailProxyDb


# Default time, in seconds, to wait for a PEC acceptance receipt (30 minutes).
PEC_ACCEPTANCE_TIMEOUT = 30 * 60


class PecReceiver:
    """Background task that polls PEC accounts for receipts.

    Each polling cycle does two things:

    1. For every PEC account returned by the ``accounts`` table, connect to
       its IMAP mailbox, fetch the messages newer than the stored UID, parse
       them and record an event for each valid receipt.
    2. Declassify PEC messages that did not receive an acceptance receipt
       within ``acceptance_timeout`` seconds.

    Errors are logged and never propagate out of the polling loop, so one
    failing cycle (or account) does not stop the receiver.
    """

    # Receipt type reported by the parser -> event type stored in message_events.
    _EVENT_TYPE_MAP = {
        "accettazione": "pec_acceptance",
        "consegna": "pec_delivery",
        "mancata_consegna": "pec_error",
        "non_accettazione": "pec_error",
        "presa_in_carico": "pec_acceptance",  # Treat as acceptance
    }

    # Event type recorded for receipt types missing from _EVENT_TYPE_MAP.
    _DEFAULT_EVENT_TYPE = "pec_acceptance"

    # Receipt types after which the message will not be delivered via PEC.
    _ERROR_RECEIPT_TYPES = ("mancata_consegna", "non_accettazione")

    def __init__(
        self,
        db: MailProxyDb,
        logger: Logger | None = None,
        poll_interval: int = 60,
        acceptance_timeout: int = PEC_ACCEPTANCE_TIMEOUT,
    ):
        """Create a receiver; no polling happens until :meth:`start` is awaited.

        Args:
            db: Database facade; tables are accessed through ``db.table(name)``.
            logger: Optional logger. When ``None`` nothing is logged.
            poll_interval: Seconds to sleep between two polling cycles.
            acceptance_timeout: Seconds after which a PEC message without an
                acceptance receipt is treated as timed out.
        """
        self._db = db
        self._logger = logger
        self._parser = PecReceiptParser()
        self._poll_interval = poll_interval
        self._acceptance_timeout = acceptance_timeout
        self._running = False
        self._task: asyncio.Task[None] | None = None

    async def start(self) -> None:
        """Start the PEC receiver background task.

        Calling this while the receiver is already running is a no-op.
        """
        if self._running:
            return

        self._running = True
        self._task = asyncio.create_task(self._poll_loop())

        if self._logger:
            self._logger.info("PecReceiver started")

    async def stop(self) -> None:
        """Stop the PEC receiver and wait for the background task to finish."""
        self._running = False

        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                # Expected outcome of cancelling the polling task.
                pass
            self._task = None

        if self._logger:
            self._logger.info("PecReceiver stopped")

    async def _poll_loop(self) -> None:
        """Run polling cycles until the receiver is stopped."""
        while self._running:
            try:
                await self._process_all_pec_accounts()
                await self._check_pec_timeouts()
            except Exception as e:
                # Top-level boundary: keep the loop alive, but keep the
                # traceback so unexpected failures can be diagnosed.
                if self._logger:
                    self._logger.exception("PEC processing error: %s", e)

            await asyncio.sleep(self._poll_interval)

    async def _process_all_pec_accounts(self) -> None:
        """Process receipts for all PEC accounts.

        A failure on one account is logged and does not prevent the
        remaining accounts from being processed.
        """
        pec_accounts = await self._db.table("accounts").list_pec_accounts()
        if not pec_accounts:
            return

        for account in pec_accounts:
            try:
                await self._process_account(account)
            except Exception as e:
                if self._logger:
                    self._logger.error(
                        "Error processing PEC account %s: %s",
                        account.get("id"),
                        e,
                    )

    async def _process_account(self, account: dict) -> None:
        """Process receipts for a single PEC account.

        Connects to the account's IMAP mailbox over SSL, fetches the messages
        with a UID greater than the stored one, records an event for every
        valid receipt and finally stores the new sync state (highest UID seen
        and current UIDVALIDITY).

        Args:
            account: Account record. ``tenant_id`` and ``id`` are required;
                the ``imap_*`` keys are optional and fall back to port 993
                and folder ``INBOX`` when missing or empty.
        """
        # Imported lazily, as in the original module layout.
        from ..imap import IMAPClient

        tenant_id = account["tenant_id"]
        account_id = account["id"]

        imap_host = account.get("imap_host")
        # "or" rather than a .get() default: a key that is present but None
        # (or empty) must fall back too, like imap_last_uid below.
        imap_port = account.get("imap_port") or 993
        imap_user = account.get("imap_user")
        imap_password = account.get("imap_password")
        imap_folder = account.get("imap_folder") or "INBOX"
        last_uid = account.get("imap_last_uid") or 0
        stored_uidvalidity = account.get("imap_uidvalidity")

        if not all([imap_host, imap_user, imap_password]):
            if self._logger:
                self._logger.warning(
                    "PEC account %s missing IMAP configuration",
                    account_id,
                )
            return

        client = IMAPClient(logger=self._logger)

        try:
            await client.connect(
                host=imap_host,
                port=imap_port,
                user=imap_user,
                password=imap_password,
                use_ssl=True,
            )

            uidvalidity = await client.select_folder(imap_folder)

            # Reset last_uid if UIDVALIDITY changed (mailbox was recreated)
            if stored_uidvalidity is not None and uidvalidity != stored_uidvalidity:
                if self._logger:
                    # %s rather than %d: the values are logged as-is whatever
                    # type the store / IMAP client returns.
                    self._logger.warning(
                        "PEC account %s: UIDVALIDITY changed from %s to %s, resetting sync state",
                        account_id,
                        stored_uidvalidity,
                        uidvalidity,
                    )
                last_uid = 0

            # Fetch new messages
            messages = await client.fetch_since_uid(last_uid)

            if not messages:
                # Update uidvalidity even if no messages
                if uidvalidity != stored_uidvalidity:
                    await self._db.table("accounts").update_imap_sync_state(
                        tenant_id,
                        account_id,
                        last_uid=last_uid,
                        uidvalidity=uidvalidity,
                    )
                return

            if self._logger:
                self._logger.debug(
                    "PEC account %s: processing %d messages",
                    account_id,
                    len(messages),
                )

            processed = 0
            max_uid = last_uid

            for msg in messages:
                receipt_info = self._parser.parse(msg.raw)

                if receipt_info.receipt_type and receipt_info.original_message_id:
                    # Found a valid PEC receipt
                    await self._handle_receipt(receipt_info)
                    processed += 1

                    if self._logger:
                        self._logger.info(
                            "PEC receipt: msg_id=%s type=%s",
                            receipt_info.original_message_id,
                            receipt_info.receipt_type,
                        )

                # Track max UID (also for messages that are not receipts, so
                # they are not fetched again on the next cycle)
                if msg.uid > max_uid:
                    max_uid = msg.uid

            # Update sync state
            await self._db.table("accounts").update_imap_sync_state(
                tenant_id,
                account_id,
                last_uid=max_uid,
                uidvalidity=uidvalidity,
            )

            if self._logger and processed > 0:
                self._logger.info(
                    "PEC account %s: processed %d receipts",
                    account_id,
                    processed,
                )

        finally:
            await client.close()

    async def _handle_receipt(self, receipt_info) -> None:
        """Handle a parsed PEC receipt by creating the appropriate event.

        Args:
            receipt_info: Parsed receipt. The attributes read here are
                ``original_message_id``, ``receipt_type``, ``timestamp``,
                ``recipient`` and ``error_reason``.
        """
        # original_message_id contains the pk (UUID) from the Message-ID header
        message_pk = receipt_info.original_message_id
        receipt_type = receipt_info.receipt_type

        # Map receipt type to event type
        event_type = self._EVENT_TYPE_MAP.get(receipt_type)
        if event_type is None:
            # Unmapped types keep the historical fallback, but are reported
            # so that a receipt is never silently recorded as an acceptance.
            event_type = self._DEFAULT_EVENT_TYPE
            if self._logger:
                self._logger.warning(
                    "Unknown PEC receipt type %s for message pk=%s, recording as %s",
                    receipt_type,
                    message_pk,
                    event_type,
                )

        # Build event metadata from the optional receipt fields that are set
        metadata = {}
        if receipt_info.timestamp:
            metadata["pec_timestamp"] = receipt_info.timestamp
        if receipt_info.recipient:
            metadata["recipient"] = receipt_info.recipient
        if receipt_info.error_reason:
            metadata["error_reason"] = receipt_info.error_reason

        # Create event, timestamped with the processing time
        now_ts = int(time.time())
        await self._db.table("message_events").add_event(
            message_pk=message_pk,
            event_type=event_type,
            event_ts=now_ts,
            description=f"PEC {receipt_type}",
            metadata=metadata if metadata else None,
        )

        # For delivery confirmations, the message can be considered complete
        # For errors, mark the message accordingly
        if receipt_type in self._ERROR_RECEIPT_TYPES:
            # Clear PEC flag on error - it won't get delivered via PEC
            await self._db.table("messages").clear_pec_flag(message_pk)

    async def _check_pec_timeouts(self) -> None:
        """Check for PEC messages that timed out waiting for acceptance.

        Every message returned by ``get_pec_without_acceptance`` for the
        cutoff ``now - acceptance_timeout`` has its PEC flag cleared and gets
        a ``pec_timeout`` event.
        """
        now_ts = int(time.time())
        cutoff_ts = now_ts - self._acceptance_timeout

        timed_out_messages = await self._db.table("messages").get_pec_without_acceptance(cutoff_ts)

        for msg in timed_out_messages:
            message_pk = msg["pk"]

            if self._logger:
                # Report the configured timeout instead of a hard-coded value.
                self._logger.info(
                    "PEC timeout: message pk=%s sent over %d seconds ago without acceptance, declassifying",
                    message_pk,
                    self._acceptance_timeout,
                )

            # Clear the PEC flag
            await self._db.table("messages").clear_pec_flag(message_pk)

            # Create timeout event
            await self._db.table("message_events").add_event(
                message_pk=message_pk,
                event_type="pec_timeout",
                event_ts=now_ts,
                description="PEC acceptance timeout - message treated as normal email",
            )


__all__ = ["PecReceiver", "PEC_ACCEPTANCE_TIMEOUT"]