# Copyright 2025 Softwell S.r.l. - SPDX-License-Identifier: Apache-2.0
"""Asyncio-friendly SMTP connection pool with acquire/release pattern.
This module provides a connection pool for SMTP clients using a true resource
pool pattern with acquire/release semantics. Connections are pooled by account
key (host:port:user) and can be shared between concurrent coroutines.
Features:
- Connection pooling per SMTP account
- Configurable max connections per account
- TTL-based connection expiration
- Health checking via SMTP NOOP commands
- Automatic reconnection when connections become stale
- Context manager support for automatic release
Example:
Using the SMTP pool with context manager::
pool = SMTPPool(ttl=300, max_per_account=5)
async with pool.connection(
host="smtp.example.com",
port=465,
user="sender@example.com",
password="secret",
use_tls=True
) as smtp:
await smtp.send_message(message)
# Connection automatically released back to pool
Manual acquire/release::
smtp = await pool.acquire(host, port, user, password, use_tls=True)
try:
await smtp.send_message(message)
finally:
await pool.release(smtp)
"""
import asyncio
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
import aiosmtplib
[docs]
@dataclass
class PooledConnection:
"""Wrapper for a pooled SMTP connection with metadata."""
smtp: aiosmtplib.SMTP
account_key: str
created_at: float = field(default_factory=time.time)
last_used: float = field(default_factory=time.time)
[docs]
def touch(self) -> None:
"""Update last_used timestamp."""
self.last_used = time.time()
[docs]
def age(self) -> float:
"""Return connection age in seconds."""
return time.time() - self.created_at
[docs]
def idle_time(self) -> float:
"""Return time since last use in seconds."""
return time.time() - self.last_used
[docs]
class SMTPPool:
"""SMTP connection pool with acquire/release semantics.
Maintains pools of SMTP connections indexed by account key (host:port:user).
Connections can be acquired by any coroutine and must be released after use.
This enables efficient connection sharing across concurrent operations.
The pool uses TTL-based expiration and NOOP health checks to ensure
connections remain valid. Invalid connections are discarded and replaced.
Attributes:
ttl: Maximum age in seconds for pooled connections.
max_per_account: Maximum connections per SMTP account.
"""
[docs]
def __init__(self, ttl: int = 300, max_per_account: int = 5):
"""Initialize the SMTP connection pool.
Args:
ttl: Time-to-live in seconds for connections. Defaults to 300.
max_per_account: Max concurrent connections per account. Defaults to 5.
"""
self.ttl = ttl
self.max_per_account = max_per_account
# Pool of idle connections: account_key -> list of PooledConnection
self._idle: dict[str, list[PooledConnection]] = {}
# Count of active (acquired) connections per account
self._active_count: dict[str, int] = {}
# Condition for waiting when pool is full
self._conditions: dict[str, asyncio.Condition] = {}
# Global lock for pool operations
self._lock = asyncio.Lock()
# Track connection -> account_key for release
self._connection_keys: dict[int, str] = {}
def _make_key(self, host: str, port: int, user: str | None) -> str:
"""Create account key from connection parameters."""
return f"{host}:{port}:{user or ''}"
async def _get_condition(self, key: str) -> asyncio.Condition:
"""Get or create condition variable for an account."""
if key not in self._conditions:
self._conditions[key] = asyncio.Condition()
return self._conditions[key]
async def _connect(
self,
host: str,
port: int,
user: str | None,
password: str | None,
use_tls: bool,
) -> aiosmtplib.SMTP:
"""Establish a new SMTP connection.
TLS behavior:
- Port 465 with use_tls=True: Direct TLS (implicit)
- Port 587 with use_tls=True: STARTTLS
- use_tls=False: Plain SMTP
Args:
host: SMTP server hostname.
port: SMTP server port.
user: Username for auth, or None.
password: Password for auth, or None.
use_tls: Whether to use TLS.
Returns:
Connected aiosmtplib.SMTP instance.
Raises:
asyncio.TimeoutError: If connection times out.
aiosmtplib.SMTPException: If connection fails.
"""
if use_tls and port == 465:
smtp = aiosmtplib.SMTP(
hostname=host, port=port, start_tls=False, use_tls=True, timeout=10.0
)
elif use_tls:
smtp = aiosmtplib.SMTP(
hostname=host, port=port, start_tls=True, use_tls=False, timeout=10.0
)
else:
smtp = aiosmtplib.SMTP(
hostname=host, port=port, start_tls=False, use_tls=False, timeout=10.0
)
async def _do_connect():
await smtp.connect()
if user and password:
await smtp.login(user, password)
await asyncio.wait_for(_do_connect(), timeout=15.0)
return smtp
async def _is_alive(self, smtp: aiosmtplib.SMTP) -> bool:
"""Check if connection is alive via NOOP command."""
try:
code, _ = await asyncio.wait_for(smtp.noop(), timeout=5.0)
return code == 250
except Exception:
return False
async def _close_connection(self, conn: PooledConnection) -> None:
"""Close a pooled connection gracefully."""
try:
await asyncio.wait_for(conn.smtp.quit(), timeout=5.0)
except Exception:
pass
[docs]
async def acquire(
self,
host: str,
port: int,
user: str | None,
password: str | None,
*,
use_tls: bool,
timeout: float | None = 30.0,
) -> aiosmtplib.SMTP:
"""Acquire a connection from the pool.
Returns an idle connection if available, otherwise creates a new one.
If max connections reached, waits until one becomes available.
Args:
host: SMTP server hostname.
port: SMTP server port.
user: Username for auth, or None.
password: Password for auth, or None.
use_tls: Whether to use TLS.
timeout: Max seconds to wait for connection. None = wait forever.
Returns:
SMTP connection ready for use.
Raises:
asyncio.TimeoutError: If timeout waiting for connection.
"""
key = self._make_key(host, port, user)
deadline = time.time() + timeout if timeout else None
while True:
async with self._lock:
# Try to get an idle connection
if key in self._idle and self._idle[key]:
conn = self._idle[key].pop()
# Check TTL
if conn.age() > self.ttl:
await self._close_connection(conn)
continue
# Check health
if not await self._is_alive(conn.smtp):
await self._close_connection(conn)
continue
# Valid connection found
conn.touch()
self._active_count[key] = self._active_count.get(key, 0) + 1
self._connection_keys[id(conn.smtp)] = key
return conn.smtp
# No idle connection - can we create new?
active = self._active_count.get(key, 0)
if active < self.max_per_account:
# Create new connection
self._active_count[key] = active + 1
# Create connection outside lock
if active < self.max_per_account:
try:
smtp = await self._connect(host, port, user, password, use_tls)
async with self._lock:
self._connection_keys[id(smtp)] = key
return smtp
except Exception:
async with self._lock:
self._active_count[key] = self._active_count.get(key, 1) - 1
raise
# Pool is full - wait for release
condition = await self._get_condition(key)
async with condition:
remaining = deadline - time.time() if deadline else None
if remaining is not None and remaining <= 0:
raise asyncio.TimeoutError("Timeout waiting for SMTP connection")
try:
await asyncio.wait_for(condition.wait(), timeout=remaining)
except asyncio.TimeoutError as err:
raise asyncio.TimeoutError("Timeout waiting for SMTP connection") from err
[docs]
async def release(self, smtp: aiosmtplib.SMTP) -> None:
"""Release a connection back to the pool.
The connection is returned to the idle pool for reuse by other
coroutines. If the connection is unhealthy, it is closed instead.
Args:
smtp: The SMTP connection to release.
"""
smtp_id = id(smtp)
async with self._lock:
key = self._connection_keys.pop(smtp_id, None)
if key is None:
# Connection not tracked - just close it
try:
await smtp.quit()
except Exception:
pass
return
self._active_count[key] = max(0, self._active_count.get(key, 1) - 1)
# Check if connection is still healthy before returning to pool
if await self._is_alive(smtp):
conn = PooledConnection(smtp=smtp, account_key=key)
if key not in self._idle:
self._idle[key] = []
self._idle[key].append(conn)
else:
try:
await smtp.quit()
except Exception:
pass
# Notify waiters that a connection is available
condition = await self._get_condition(key)
async with condition:
condition.notify()
[docs]
@asynccontextmanager
async def connection(
self,
host: str,
port: int,
user: str | None,
password: str | None,
*,
use_tls: bool,
timeout: float | None = 30.0,
):
"""Context manager for automatic acquire/release.
Example:
async with pool.connection(host, port, user, pwd, use_tls=True) as smtp:
await smtp.send_message(msg)
"""
smtp = await self.acquire(host, port, user, password, use_tls=use_tls, timeout=timeout)
try:
yield smtp
finally:
await self.release(smtp)
[docs]
async def get_connection(
self,
host: str,
port: int,
user: str | None,
password: str | None,
*,
use_tls: bool,
) -> aiosmtplib.SMTP:
"""Legacy API: Get a connection (acquire without explicit release).
DEPRECATED: Use acquire()/release() or connection() context manager.
This method exists for backward compatibility. Connections obtained
this way should be released manually, or they will be cleaned up
when the pool is cleaned.
Args:
host: SMTP server hostname.
port: SMTP server port.
user: Username for auth, or None.
password: Password for auth, or None.
use_tls: Whether to use TLS.
Returns:
Connected SMTP instance.
"""
return await self.acquire(host, port, user, password, use_tls=use_tls)
[docs]
async def cleanup(self) -> None:
"""Remove expired and unhealthy connections from idle pools.
Should be called periodically to prevent resource leaks.
"""
to_close: list[PooledConnection] = []
async with self._lock:
for key in list(self._idle.keys()):
valid: list[PooledConnection] = []
for conn in self._idle[key]:
if conn.age() > self.ttl or not await self._is_alive(conn.smtp):
to_close.append(conn)
else:
valid.append(conn)
self._idle[key] = valid
# Clean up empty entries
if not self._idle[key]:
del self._idle[key]
# Close connections outside lock
for conn in to_close:
await self._close_connection(conn)
[docs]
async def close_all(self) -> None:
"""Close all connections in the pool.
Use when shutting down the application.
"""
to_close: list[PooledConnection] = []
async with self._lock:
for key in list(self._idle.keys()):
to_close.extend(self._idle[key])
self._idle.clear()
self._active_count.clear()
self._connection_keys.clear()
for conn in to_close:
await self._close_connection(conn)
[docs]
def stats(self) -> dict:
"""Return pool statistics.
Returns:
Dict with idle counts, active counts per account.
"""
return {
"idle": {k: len(v) for k, v in self._idle.items()},
"active": dict(self._active_count),
"max_per_account": self.max_per_account,
"ttl": self.ttl,
}
__all__ = ["SMTPPool", "PooledConnection"]