Source code for async_mail_service.smtp_pool

"""Lightweight asyncio-friendly SMTP connection pool."""

import asyncio
import time
import aiosmtplib
from typing import Optional, Tuple, Dict

[docs] class SMTPPool: """Reuse SMTP connections per task to reduce connection overhead.""" def __init__(self, ttl: int = 300): """Create a pool with the given time-to-live, in seconds.""" self.ttl = ttl self.pool: Dict[int, Tuple[aiosmtplib.SMTP, float, Tuple[str, int, Optional[str], Optional[str], bool]]] = {} self.lock = asyncio.Lock() async def _connect(self, host: str, port: int, user: Optional[str], password: Optional[str], use_tls: bool) -> aiosmtplib.SMTP: """Open a new SMTP connection and authenticate if needed.""" # Use explicit 10 second timeout to prevent hanging connections # For plain SMTP (use_tls=False): both use_tls and start_tls should be False # For TLS/SSL (use_tls=True): use_tls=True, start_tls=False (direct TLS on port 465) smtp = aiosmtplib.SMTP(hostname=host, port=port, start_tls=False, use_tls=use_tls, timeout=10.0) # Wrap in asyncio.timeout to ensure we don't hang even if aiosmtplib timeout fails async with asyncio.timeout(15.0): await smtp.connect() if user and password: await smtp.login(user, password) return smtp async def _is_alive(self, smtp: aiosmtplib.SMTP) -> bool: """Return ``True`` when the connection responds correctly to NOOP.""" try: # Use timeout to prevent hanging on dead connections async with asyncio.timeout(5.0): code, _ = await smtp.noop() return code == 250 except Exception: return False
[docs] async def get_connection(self, host: str, port: int, user: Optional[str], password: Optional[str], *, use_tls: bool) -> aiosmtplib.SMTP: """Return a pooled connection bound to the calling task.""" task_id = id(asyncio.current_task()) async with self.lock: entry = self.pool.get(task_id) if entry: smtp, last_used, params = entry params_match = params == (host, port, user, password, use_tls) fresh_enough = (time.time() - last_used) < self.ttl if params_match and fresh_enough: is_alive = await self._is_alive(smtp) if is_alive: async with self.lock: self.pool[task_id] = (smtp, time.time(), params) return smtp async with self.lock: self.pool.pop(task_id, None) try: await smtp.quit() except Exception: pass smtp = await self._connect(host, port, user, password, use_tls) async with self.lock: self.pool[task_id] = (smtp, time.time(), (host, port, user, password, use_tls)) return smtp
[docs] async def cleanup(self) -> None: """Close idle or broken connections still registered in the pool.""" now = time.time() async with self.lock: items = list(self.pool.items()) expired: list[Tuple[int, aiosmtplib.SMTP]] = [] candidates: list[Tuple[int, aiosmtplib.SMTP]] = [] for task_id, (smtp, last_used, params) in items: if (now - last_used) > self.ttl: expired.append((task_id, smtp)) else: candidates.append((task_id, smtp)) for task_id, smtp in candidates: try: alive = await self._is_alive(smtp) except Exception: alive = False if not alive: expired.append((task_id, smtp)) for task_id, smtp in expired: async with self.lock: entry = self.pool.pop(task_id, None) if entry: try: await entry[0].quit() except Exception: pass