Source code for enterprise.mail_proxy.imap.client

# Copyright 2025 Softwell S.r.l. - SPDX-License-Identifier: BSL-1.1
"""Async IMAP client wrapper for bounce detection."""

from __future__ import annotations

import ssl
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from logging import Logger


[docs] @dataclass class IMAPMessage: """Represents a fetched IMAP message.""" uid: int raw: bytes
[docs] class IMAPClient: """Async IMAP client wrapper using aioimaplib."""
[docs] def __init__(self, logger: Logger | None = None): self._client: Any = None self._logger = logger self._uidvalidity: int | None = None
[docs] async def connect( self, host: str, port: int, user: str, password: str, use_ssl: bool = True, ) -> None: """Connect and authenticate to IMAP server.""" import aioimaplib if use_ssl: ssl_context = ssl.create_default_context() self._client = aioimaplib.IMAP4_SSL(host=host, port=port, ssl_context=ssl_context) else: self._client = aioimaplib.IMAP4(host=host, port=port) await self._client.wait_hello_from_server() response = await self._client.login(user, password) if response.result != "OK": raise ConnectionError(f"IMAP login failed: {response.lines}") if self._logger: self._logger.debug("IMAP connected to %s:%d as %s", host, port, user)
[docs] async def select_folder(self, folder: str = "INBOX") -> int: """Select mailbox folder. Returns UIDVALIDITY.""" if not self._client: raise RuntimeError("Not connected") response = await self._client.select(folder) if response.result != "OK": raise RuntimeError(f"Failed to select folder {folder}: {response.lines}") # Parse UIDVALIDITY from response for line in response.lines: if isinstance(line, bytes): line = line.decode("utf-8", errors="replace") if "UIDVALIDITY" in line: # Format: [UIDVALIDITY 123456] import re match = re.search(r"UIDVALIDITY\s+(\d+)", line) if match: self._uidvalidity = int(match.group(1)) break if self._logger: self._logger.debug("Selected folder %s, UIDVALIDITY=%s", folder, self._uidvalidity) return self._uidvalidity or 0
@property def uidvalidity(self) -> int | None: """Return current UIDVALIDITY value.""" return self._uidvalidity
[docs] async def fetch_since_uid(self, last_uid: int) -> list[IMAPMessage]: """Fetch messages with UID greater than last_uid.""" if not self._client: raise RuntimeError("Not connected") messages: list[IMAPMessage] = [] # Search for UIDs greater than last_uid search_criteria = f"UID {last_uid + 1}:*" response = await self._client.uid_search(search_criteria) if response.result != "OK": if self._logger: self._logger.warning("IMAP search failed: %s", response.lines) return messages # Parse UIDs from response uids: list[int] = [] for line in response.lines: if isinstance(line, bytes): line = line.decode("utf-8", errors="replace") if line and line.strip(): for uid_str in line.split(): if uid_str.isdigit(): uid = int(uid_str) if uid > last_uid: uids.append(uid) if not uids: return messages if self._logger: self._logger.debug("Found %d new messages (UIDs: %s)", len(uids), uids[:10]) # Fetch each message for uid in uids: response = await self._client.uid("FETCH", str(uid), "(RFC822)") if response.result == "OK": # Parse raw message from response # aioimaplib returns RFC822 content as bytearray in response.lines # Format: [b'N FETCH (UID X RFC822 {size}', bytearray(content), b')', b'status'] for item in response.lines: if isinstance(item, bytearray) and item: # The actual email content is always a bytearray messages.append(IMAPMessage(uid=uid, raw=bytes(item))) break return messages
[docs] async def close(self) -> None: """Close IMAP connection.""" if self._client: try: await self._client.logout() except Exception: pass self._client = None if self._logger: self._logger.debug("IMAP connection closed")
__all__ = ["IMAPClient", "IMAPMessage"]