Source code for core.mail_proxy.smtp.attachments

# Copyright 2025 Softwell S.r.l. - SPDX-License-Identifier: Apache-2.0
"""Attachment fetching from multiple storage backends.

Provides AttachmentManager for retrieving email attachments with:
- Base64 inline content decoding
- Filesystem fetching with path traversal protection
- HTTP fetching with authentication support
- Optional MD5-based caching

Supported fetch_mode values:
- endpoint: HTTP POST to tenant's attachment URL
- http_url: Direct HTTP fetch from URL in storage_path
- base64: Inline base64-encoded content in storage_path
- filesystem: Local filesystem path (absolute or relative to base_dir)

If fetch_mode is not specified, it is inferred from storage_path format:
- ``base64:...`` prefix -> base64 (prefix is stripped)
- ``http://`` or ``https://`` prefix -> http_url
- ``/`` (absolute path) -> filesystem
- otherwise -> endpoint (default)
"""

from __future__ import annotations

import asyncio
import base64
import mimetypes
import re
from pathlib import Path
from typing import Any

import aiohttp

from .cache import TieredCache

MD5_MARKER_PATTERN = re.compile(r"\{MD5:([a-fA-F0-9]+)\}")


[docs] class Base64Fetcher: """Decoder for base64-encoded inline attachment content."""
[docs] async def fetch(self, base64_content: str) -> bytes | None: if not base64_content: return None try: content = base64_content.strip() padding_needed = 4 - (len(content) % 4) if padding_needed != 4: content += "=" * padding_needed return base64.b64decode(content, validate=True) except Exception as e: raise ValueError(f"Invalid base64 content: {e}") from e
[docs] class StorageFetcher: """Fetcher for attachments via StorageManager (mount:path format). In CE: supports only 'local' protocol (filesystem). In EE: supports S3, GCS, Azure via fsspec. Path formats: - "mount:path/to/file" → uses configured mount point - "/absolute/path" → direct filesystem access (legacy compatibility) """
[docs] def __init__(self, storage_manager: Any = None): """Initialize with optional StorageManager. Args: storage_manager: StorageManager instance. If None, only absolute paths work. """ self._storage_manager = storage_manager
[docs] async def fetch(self, path: str) -> bytes | None: """Fetch file content from storage. Args: path: Either "mount:relative/path" or "/absolute/path". Returns: File content as bytes. Raises: ValueError: If path format invalid or mount not configured. FileNotFoundError: If file doesn't exist. """ if not path: raise ValueError("Empty path provided") # Mount-based path (mount:path) if ":" in path and not path.startswith("/"): return await self._fetch_from_mount(path) # Absolute path (legacy filesystem) if path.startswith("/"): return await self._fetch_absolute(path) raise ValueError(f"Invalid path format: '{path}'. Use 'mount:path' or absolute path.")
async def _fetch_from_mount(self, path: str) -> bytes: """Fetch from a configured mount point.""" if not self._storage_manager: raise ValueError("StorageManager not configured. Cannot resolve mount paths.") node = self._storage_manager.node(path) if not await node.exists(): raise FileNotFoundError(f"File not found: {path}") if not await node.is_file(): raise ValueError(f"Not a regular file: {path}") return await node.read_bytes() async def _fetch_absolute(self, path: str) -> bytes: """Fetch from absolute filesystem path (legacy compatibility).""" file_path = Path(path).resolve() if not file_path.exists(): raise FileNotFoundError(f"File not found: {file_path}") if not file_path.is_file(): raise ValueError(f"Not a regular file: {file_path}") return await asyncio.to_thread(file_path.read_bytes) @property def storage_manager(self) -> Any: """Get the StorageManager instance.""" return self._storage_manager
[docs] class HttpFetcher: """Fetcher for HTTP-served attachments with authentication support."""
[docs] def __init__( self, default_endpoint: str | None = None, auth_config: dict[str, str] | None = None, ): self._default_endpoint = default_endpoint self._auth_config = auth_config or {}
def _parse_path(self, path: str) -> tuple[str, str]: if path.startswith("["): match = re.match(r"\[([^\]]+)\](.*)", path) if not match: raise ValueError(f"Invalid HTTP path format: {path}") return match.group(1), match.group(2) if path.startswith(("http://", "https://")): return path, "" if not self._default_endpoint: raise ValueError("No default endpoint configured and path doesn't specify one") return self._default_endpoint, path def _get_auth_headers(self, auth_override: dict[str, str] | None = None) -> dict[str, str]: auth_config = auth_override if auth_override is not None else self._auth_config method = auth_config.get("method", "none") if method == "bearer": token = auth_config.get("token", "") return {"Authorization": f"Bearer {token}"} if method == "basic": user = auth_config.get("user", "") password = auth_config.get("password", "") credentials = base64.b64encode(f"{user}:{password}".encode()).decode() return {"Authorization": f"Basic {credentials}"} return {}
[docs] async def fetch(self, path: str, auth_override: dict[str, str] | None = None) -> bytes: server_url, params = self._parse_path(path) headers = self._get_auth_headers(auth_override) async with aiohttp.ClientSession() as session: if not params: async with session.get(server_url, headers=headers) as response: response.raise_for_status() return await response.read() else: async with session.post( server_url, json={"storage_path": params}, headers=headers, ) as response: response.raise_for_status() return await response.read()
@property def default_endpoint(self) -> str | None: return self._default_endpoint
[docs] class AttachmentManager: """High-level interface for fetching email attachments from multiple sources."""
[docs] def __init__( self, storage_manager: Any = None, http_endpoint: str | None = None, http_auth_config: dict[str, str] | None = None, cache: TieredCache | None = None, ): self._base64_fetcher = Base64Fetcher() self._storage_fetcher = StorageFetcher(storage_manager=storage_manager) self._http_fetcher = HttpFetcher( default_endpoint=http_endpoint, auth_config=http_auth_config, ) self._cache = cache
[docs] @staticmethod def parse_filename(filename: str) -> tuple[str, str | None]: """Extract MD5 marker from filename if present.""" match = MD5_MARKER_PATTERN.search(filename) if not match: return filename, None md5_hash = match.group(1).lower() clean_filename = MD5_MARKER_PATTERN.sub("", filename) clean_filename = re.sub(r"_+", "_", clean_filename) clean_filename = clean_filename.strip("_") clean_filename = re.sub(r"_\.", ".", clean_filename) return clean_filename, md5_hash
def _parse_storage_path(self, path: str, fetch_mode: str | None = None) -> tuple[str, str]: if not path: raise ValueError("Empty storage_path") if not fetch_mode: if path.startswith("base64:"): fetch_mode = "base64" path = path[7:] elif path.startswith(("http://", "https://")): fetch_mode = "http_url" elif path.startswith("/"): fetch_mode = "storage" # absolute path → storage fetcher elif ":" in path: fetch_mode = "storage" # mount:path → storage fetcher else: fetch_mode = "endpoint" if fetch_mode == "endpoint": return ("http", path) if fetch_mode == "http_url": return ("http", f"[{path}]") if fetch_mode == "base64": if path.startswith("base64:"): path = path[7:] return ("base64", path) if fetch_mode in ("storage", "filesystem"): return ("storage", path) raise ValueError(f"Unknown fetch_mode: {fetch_mode}")
[docs] async def fetch(self, att: dict[str, Any]) -> tuple[bytes, str] | None: """Retrieve attachment content with caching and filename cleanup.""" storage_path = att.get("storage_path") if not storage_path: return None raw_filename = att.get("filename", "file.bin") clean_filename, md5_from_marker = self.parse_filename(raw_filename) content_md5 = att.get("content_md5") fetch_mode = att.get("fetch_mode") auth = att.get("auth") cache_key = content_md5 or md5_from_marker if cache_key and self._cache: cached = await self._cache.get(cache_key) if cached is not None: return cached, clean_filename content = await self._fetch_from_backend( storage_path, fetch_mode=fetch_mode, auth_override=auth ) if content is None: return None if self._cache: actual_md5 = TieredCache.compute_md5(content) await self._cache.set(actual_md5, content) return content, clean_filename
async def _fetch_from_backend( self, storage_path: str, fetch_mode: str | None = None, auth_override: dict[str, Any] | None = None, ) -> bytes | None: path_type, parsed_path = self._parse_storage_path(storage_path, fetch_mode) if path_type == "base64": return await self._base64_fetcher.fetch(parsed_path) if path_type == "storage": return await self._storage_fetcher.fetch(parsed_path) if path_type == "http": return await self._http_fetcher.fetch(parsed_path, auth_override) raise ValueError(f"Unknown path type: {path_type}")
[docs] @staticmethod def guess_mime(filename: str) -> tuple[str, str]: """Determine the MIME type for a filename based on its extension.""" mt, _ = mimetypes.guess_type(filename) if not mt: return ("application", "octet-stream") return tuple(mt.split("/", 1)) # type: ignore[return-value]