Source code for core.mail_proxy.smtp.cache

# Copyright 2025 Softwell S.r.l. - SPDX-License-Identifier: Apache-2.0
"""Two-tiered cache for attachment content.

Provides content-addressable caching using MD5 hash as key with
automatic tier selection based on content size.

Components:
    MemoryCache: Fast LRU cache with short TTL for small files.
    DiskCache: Persistent cache with longer TTL for larger files.
    TieredCache: Combined memory + disk cache with automatic routing.

Example:
    Create and use a tiered cache::

        cache = TieredCache(
            memory_max_mb=50,
            disk_dir="/var/cache/attachments",
            disk_threshold_kb=100,
        )
        await cache.init()

        content = await cache.get("a1b2c3d4...")
        if content is None:
            content = await fetch_from_storage()
            await cache.set(TieredCache.compute_md5(content), content)

Note:
    Files smaller than disk_threshold_kb are cached in memory only.
    Files at or above the threshold go to the disk cache when disk_dir
    is configured; without a disk cache they are not cached at all.
    Reads check memory first, then disk.
"""

from __future__ import annotations

import asyncio
import hashlib
import time
from collections import OrderedDict
from pathlib import Path


class MemoryCache:
    """In-memory LRU cache bounded by total byte size, with per-entry TTL.

    Entries live in an ``OrderedDict`` ordered from least to most recently
    used; each value is a ``(content, stored_at)`` pair where ``stored_at``
    is the ``time.time()`` reading taken when the entry was written.

    Attributes:
        _max_bytes: Maximum cache size in bytes.
        _ttl_seconds: Time-to-live for entries.
    """

    def __init__(self, max_mb: float = 50, ttl_seconds: int = 300):
        """Create an empty cache.

        Args:
            max_mb: Capacity in mebibytes (truncated to whole bytes).
            ttl_seconds: Age in seconds after which an entry is expired.
        """
        self._cache: OrderedDict[str, tuple[bytes, float]] = OrderedDict()
        self._current_bytes = 0
        self._ttl_seconds = ttl_seconds
        self._max_bytes = int(max_mb * 1024 * 1024)

    def get(self, md5_hash: str) -> bytes | None:
        """Return the cached content for ``md5_hash``, or None.

        An expired entry is evicted and reported as a miss. A hit marks the
        entry as most recently used.
        """
        try:
            payload, stored_at = self._cache[md5_hash]
        except KeyError:
            return None

        age = time.time() - stored_at
        if age > self._ttl_seconds:
            self._remove(md5_hash)
            return None

        self._cache.move_to_end(md5_hash)
        return payload

    def set(self, md5_hash: str, content: bytes) -> None:
        """Store ``content`` under ``md5_hash``.

        Content larger than the whole cache is ignored. Otherwise any
        previous entry for the key is dropped and least recently used
        entries are evicted until the new content fits.
        """
        incoming = len(content)
        if incoming > self._max_bytes:
            return

        # Dropping a previous entry first keeps the byte count accurate;
        # _remove is a no-op when the key is absent.
        self._remove(md5_hash)

        while self._cache and self._current_bytes + incoming > self._max_bytes:
            least_recent = next(iter(self._cache))
            self._remove(least_recent)

        self._current_bytes += incoming
        self._cache[md5_hash] = (content, time.time())

    def _remove(self, md5_hash: str) -> None:
        """Drop one entry, if present, and subtract its size from the total."""
        entry = self._cache.pop(md5_hash, None)
        if entry is not None:
            self._current_bytes -= len(entry[0])

    def clear(self) -> None:
        """Remove every entry and reset the byte counter."""
        self._current_bytes = 0
        self._cache.clear()

    def cleanup_expired(self) -> int:
        """Evict all entries older than the TTL.

        Returns:
            Number of entries removed.
        """
        now = time.time()
        removed = 0
        # Iterate over a snapshot because _remove mutates the mapping.
        for key, (_, stored_at) in list(self._cache.items()):
            if now - stored_at > self._ttl_seconds:
                self._remove(key)
                removed += 1
        return removed

    @property
    def size_bytes(self) -> int:
        """Total size in bytes of the content currently held."""
        return self._current_bytes

    @property
    def entry_count(self) -> int:
        """Number of entries currently held."""
        return len(self._cache)
class DiskCache:
    """Persistent disk cache with TTL and size limits.

    Each entry is one file named after its key, stored in a subdirectory
    named after the first two characters of the key. A file's modification
    time is used as its timestamp for both TTL expiry and oldest-first
    eviction.

    Filesystem work is done in worker threads via ``asyncio.to_thread`` so
    the event loop is not blocked, and directory scans tolerate files that
    disappear while the scan is running.

    Attributes:
        _cache_dir: Root directory for cached files.
        _max_bytes: Maximum total cache size in bytes.
        _ttl_seconds: Time-to-live for entries.
    """

    def __init__(
        self,
        cache_dir: str,
        max_mb: float = 500,
        ttl_seconds: int = 3600,
    ):
        """Configure the cache; no filesystem access happens here.

        Args:
            cache_dir: Root directory for cached files.
            max_mb: Capacity in mebibytes (truncated to whole bytes).
            ttl_seconds: Age in seconds after which a file is expired.
        """
        self._cache_dir = Path(cache_dir)
        self._max_bytes = int(max_mb * 1024 * 1024)
        self._ttl_seconds = ttl_seconds
        # Serializes set() so eviction and write happen as one step.
        self._lock = asyncio.Lock()

    async def init(self) -> None:
        """Create the cache root directory if it does not exist."""
        self._cache_dir.mkdir(parents=True, exist_ok=True)

    def _file_path(self, md5_hash: str) -> Path:
        """Return the path of the file that stores ``md5_hash``."""
        subdir = md5_hash[:2]
        return self._cache_dir / subdir / md5_hash

    async def get(self, md5_hash: str) -> bytes | None:
        """Return the cached content for ``md5_hash``, or None.

        A missing or unreadable file is a miss. An expired file is deleted
        and reported as a miss.
        """
        file_path = self._file_path(md5_hash)
        content, expired = await asyncio.to_thread(self._load, file_path)
        if expired:
            await self._remove(md5_hash)
        return content

    async def set(self, md5_hash: str, content: bytes) -> None:
        """Store ``content`` under ``md5_hash``.

        Content larger than the whole cache is ignored. Otherwise the
        oldest files are evicted until the content fits, then the content
        is written atomically.

        Raises:
            OSError: If the file cannot be written.
        """
        content_size = len(content)
        if content_size > self._max_bytes:
            return

        async with self._lock:
            await self._ensure_space(content_size)
            await asyncio.to_thread(
                self._write_atomic, self._file_path(md5_hash), content
            )

    async def _remove(self, md5_hash: str) -> None:
        """Delete the file for ``md5_hash`` and its subdirectory if empty."""
        await asyncio.to_thread(self._unlink_and_prune, self._file_path(md5_hash))

    async def _ensure_space(self, needed_bytes: int) -> None:
        """Evict oldest files until ``needed_bytes`` more fit in the cache."""
        entries = await asyncio.to_thread(self._scan_files)
        current_size = sum(size for _, size, _ in entries)
        if current_size + needed_bytes <= self._max_bytes:
            return

        entries.sort(key=lambda entry: entry[2])
        for file_path, file_size, _ in entries:
            if current_size + needed_bytes <= self._max_bytes:
                break
            try:
                await asyncio.to_thread(file_path.unlink)
            except FileNotFoundError:
                # Already deleted by someone else: its space is free anyway.
                pass
            except OSError:
                # Could not delete; the file still occupies space.
                continue
            current_size -= file_size

    async def _get_total_size(self) -> int:
        """Return the summed size in bytes of all cached files."""
        entries = await asyncio.to_thread(self._scan_files)
        return sum(size for _, size, _ in entries)

    async def _get_cache_files_by_age(self) -> list[tuple[Path, int]]:
        """Return ``(path, size)`` for every cached file, oldest first."""
        entries = await asyncio.to_thread(self._scan_files)
        entries.sort(key=lambda entry: entry[2])
        return [(path, size) for path, size, _ in entries]

    async def cleanup_expired(self) -> int:
        """Delete every file older than the TTL and prune empty subdirectories.

        Returns:
            Number of files removed.
        """
        return await asyncio.to_thread(self._cleanup_expired_sync)

    async def clear(self) -> None:
        """Delete every cached file and every subdirectory that ends up empty."""
        await asyncio.to_thread(self._clear_sync)

    # -- blocking helpers, meant to be run through asyncio.to_thread --------

    def _load(self, file_path: Path) -> tuple[bytes | None, bool]:
        """Read ``file_path`` unless it is expired.

        Returns:
            ``(content, expired)``. ``content`` is None on a miss;
            ``expired`` is True only when the file exists but is too old.
        """
        try:
            mtime = file_path.stat().st_mtime
            if time.time() - mtime > self._ttl_seconds:
                return None, True
            return file_path.read_bytes(), False
        except OSError:
            return None, False

    @staticmethod
    def _write_atomic(file_path: Path, content: bytes) -> None:
        """Write ``content`` to ``file_path`` without exposing partial data.

        The content goes to a sibling ``.tmp`` file first and is then
        renamed over the final path, so a concurrent reader sees either
        the previous file or the complete new one.
        """
        file_path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = file_path.with_name(file_path.name + ".tmp")
        try:
            tmp_path.write_bytes(content)
            tmp_path.replace(file_path)
        except OSError:
            # Best-effort removal of the partial temp file, then re-raise.
            try:
                tmp_path.unlink()
            except OSError:
                pass
            raise

    @staticmethod
    def _unlink_and_prune(file_path: Path) -> None:
        """Delete ``file_path`` and its parent directory if that is now empty."""
        try:
            if file_path.exists():
                file_path.unlink()
            parent = file_path.parent
            if parent.exists() and not any(parent.iterdir()):
                parent.rmdir()
        except OSError:
            pass

    def _list_subdirs(self) -> list[Path]:
        """Return the subdirectories of the cache root (empty on any error)."""
        try:
            return [entry for entry in self._cache_dir.iterdir() if entry.is_dir()]
        except OSError:
            return []

    def _scan_files(self) -> list[tuple[Path, int, float]]:
        """Return ``(path, size, mtime)`` for every regular cached file.

        Files or directories that vanish or cannot be read during the scan
        are skipped instead of aborting the scan.
        """
        entries: list[tuple[Path, int, float]] = []
        for subdir in self._list_subdirs():
            try:
                children = list(subdir.iterdir())
            except OSError:
                continue
            for file_path in children:
                try:
                    if not file_path.is_file():
                        continue
                    info = file_path.stat()
                except OSError:
                    continue
                entries.append((file_path, info.st_size, info.st_mtime))
        return entries

    def _cleanup_expired_sync(self) -> int:
        """Blocking implementation of :meth:`cleanup_expired`."""
        now = time.time()
        removed = 0
        for subdir in self._list_subdirs():
            try:
                children = list(subdir.iterdir())
            except OSError:
                continue
            for file_path in children:
                try:
                    if not file_path.is_file():
                        continue
                    if now - file_path.stat().st_mtime > self._ttl_seconds:
                        file_path.unlink()
                        removed += 1
                except OSError:
                    pass
            try:
                if not any(subdir.iterdir()):
                    subdir.rmdir()
            except OSError:
                pass
        return removed

    def _clear_sync(self) -> None:
        """Blocking implementation of :meth:`clear`."""
        for subdir in self._list_subdirs():
            try:
                children = list(subdir.iterdir())
            except OSError:
                continue
            for file_path in children:
                try:
                    file_path.unlink()
                except OSError:
                    pass
            try:
                subdir.rmdir()
            except OSError:
                pass
class TieredCache:
    """Two-tiered cache combining memory and disk storage.

    Content smaller than the threshold is stored in the memory tier.
    Content at or above the threshold is stored in the disk tier when one
    is configured, and is not cached otherwise. Reads check memory first,
    then disk.

    Attributes:
        _memory: MemoryCache instance for small files.
        _disk: Optional DiskCache for large files.
        _threshold_bytes: Size threshold for tier selection.
    """

    def __init__(
        self,
        memory_max_mb: float = 50,
        memory_ttl_seconds: int = 300,
        disk_dir: str | None = None,
        disk_max_mb: float = 500,
        disk_ttl_seconds: int = 3600,
        disk_threshold_kb: float = 100,
    ):
        """Build the memory tier and, if ``disk_dir`` is given, the disk tier.

        Args:
            memory_max_mb: Capacity of the memory tier in mebibytes.
            memory_ttl_seconds: TTL of memory entries in seconds.
            disk_dir: Root directory of the disk tier; a falsy value
                disables the disk tier.
            disk_max_mb: Capacity of the disk tier in mebibytes.
            disk_ttl_seconds: TTL of disk entries in seconds.
            disk_threshold_kb: Content of this size (in KiB) or larger is
                routed to the disk tier.
        """
        self._memory = MemoryCache(max_mb=memory_max_mb, ttl_seconds=memory_ttl_seconds)
        self._disk: DiskCache | None = None
        if disk_dir:
            self._disk = DiskCache(
                cache_dir=disk_dir,
                max_mb=disk_max_mb,
                ttl_seconds=disk_ttl_seconds,
            )
        self._threshold_bytes = int(disk_threshold_kb * 1024)

    async def init(self) -> None:
        """Initialize the disk tier, if there is one."""
        if self._disk:
            await self._disk.init()

    async def get(self, md5_hash: str) -> bytes | None:
        """Return the cached content for ``md5_hash``, or None on a miss.

        Content found on disk that is smaller than the threshold is also
        copied into the memory tier before being returned.
        """
        content = self._memory.get(md5_hash)
        if content is not None:
            return content

        if self._disk:
            content = await self._disk.get(md5_hash)
            if content is not None:
                if len(content) < self._threshold_bytes:
                    self._memory.set(md5_hash, content)
                return content

        return None

    async def set(self, md5_hash: str, content: bytes) -> None:
        """Store ``content`` in the tier selected by its size.

        Content at or above the threshold is dropped when no disk tier is
        configured.
        """
        if len(content) < self._threshold_bytes:
            self._memory.set(md5_hash, content)
        elif self._disk:
            await self._disk.set(md5_hash, content)

    async def cleanup_expired(self) -> tuple[int, int]:
        """Remove expired entries from both tiers.

        Returns:
            ``(memory_removed, disk_removed)`` counts; the disk count is 0
            when no disk tier is configured.
        """
        memory_removed = self._memory.cleanup_expired()
        disk_removed = 0
        if self._disk:
            disk_removed = await self._disk.cleanup_expired()
        return memory_removed, disk_removed

    async def clear(self) -> None:
        """Empty the memory tier and, if present, the disk tier."""
        self._memory.clear()
        if self._disk:
            await self._disk.clear()

    @staticmethod
    def compute_md5(content: bytes) -> str:
        """Return the hexadecimal MD5 digest of ``content``.

        The digest is only used as a cache key, so it is requested with
        ``usedforsecurity=False``; this keeps the call working on Python
        builds where MD5 is blocked for security purposes.
        """
        return hashlib.md5(content, usedforsecurity=False).hexdigest()