Source code for enterprise.mail_proxy.bounce.parser

# Copyright 2025 Softwell S.r.l. - SPDX-License-Identifier: BSL-1.1
"""Bounce message parser for DSN (RFC 3464) and heuristic detection."""

from __future__ import annotations

import email
import re
from dataclasses import dataclass
from email.message import Message
from typing import Literal


[docs] @dataclass class BounceInfo: """Parsed bounce information.""" original_message_id: str | None # X-Genro-Mail-ID from original message bounce_type: Literal["hard", "soft"] | None bounce_code: str | None # SMTP code e.g. "550", "421" bounce_reason: str | None recipient: str | None
[docs] class BounceParser: """Parse bounce messages (DSN RFC 3464 + heuristics).""" # Hard bounce codes (5xx permanent failures) HARD_BOUNCE_CODES = frozenset( [ "500", "501", "502", "503", "504", "510", "511", "512", "513", "521", "522", "523", "530", "531", "532", "541", "542", "543", "550", "551", "552", "553", "554", "555", "556", "557", ] ) # Soft bounce codes (4xx temporary failures) SOFT_BOUNCE_CODES = frozenset( [ "400", "401", "402", "403", "404", "405", "407", "408", "409", "410", "411", "412", "413", "414", "421", "422", "431", "432", "441", "442", "450", "451", "452", "453", "454", "455", "456", "471", "472", ] ) # Patterns for extracting bounce info from non-standard bounces BOUNCE_SUBJECT_PATTERNS = [ re.compile( r"(?:mail|message|delivery)\s*(?:delivery|failure|failed|returned|undeliverable)", re.I ), re.compile(r"undelivered\s*mail\s*returned", re.I), re.compile(r"(?:returned|bounced)\s*mail", re.I), re.compile(r"failure\s*notice", re.I), ] SMTP_CODE_PATTERN = re.compile(r"\b([45]\d{2})\b") ENHANCED_CODE_PATTERN = re.compile(r"\b([45])\.(\d+)\.(\d+)\b")
[docs] def parse(self, raw_email: bytes) -> BounceInfo: """Parse a bounce message and extract bounce information.""" msg = email.message_from_bytes(raw_email) content_type = msg.get_content_type() # Check if this is a DSN (RFC 3464) if content_type == "multipart/report": report_type = msg.get_param("report-type") or "" if isinstance(report_type, str) and report_type.lower() == "delivery-status": return self._parse_dsn(msg) # Fallback: heuristic parsing return self._parse_heuristic(msg)
def _parse_dsn(self, msg: Message) -> BounceInfo: """Parse a standard DSN (RFC 3464) bounce message.""" original_id: str | None = None bounce_type: Literal["hard", "soft"] | None = None bounce_code: str | None = None bounce_reason: str | None = None recipient: str | None = None for part in msg.walk(): part_type = part.get_content_type() # Parse delivery-status part (RFC 3464 uses message/delivery-status, # but some MTAs use text/delivery-status - accept both) if part_type in ("message/delivery-status", "text/delivery-status"): payload = part.get_payload() if isinstance(payload, list): for status_part in payload: status_text = str(status_part) r, c, t, m = self._extract_dsn_info(status_text) recipient = r or recipient bounce_code = c or bounce_code bounce_type = t or bounce_type bounce_reason = m or bounce_reason elif isinstance(payload, str): recipient, bounce_code, bounce_type, bounce_reason = self._extract_dsn_info( payload ) # Extract original message ID from attached original message # RFC 3464 uses message/rfc822 for full message or text/rfc822-headers for headers only # MIMEMessage with "rfc822-headers" produces message/rfc822-headers - accept all variants elif part_type in ("message/rfc822", "message/rfc822-headers", "text/rfc822-headers"): original_id = self._extract_original_id(part) return BounceInfo( original_message_id=original_id, bounce_type=bounce_type, bounce_code=bounce_code, bounce_reason=bounce_reason, recipient=recipient, ) def _extract_dsn_info( self, status_text: str ) -> tuple[str | None, str | None, Literal["hard", "soft"] | None, str | None]: """Extract bounce info from DSN status text.""" recipient: str | None = None bounce_code: str | None = None bounce_type: Literal["hard", "soft"] | None = None bounce_reason: str | None = None for line in status_text.split("\n"): line = line.strip() lower_line = line.lower() if lower_line.startswith("final-recipient:"): # Format: Final-Recipient: rfc822; user@example.com parts = line.split(";", 1) if len(parts) > 1: recipient = parts[1].strip() elif lower_line.startswith("status:"): # Format: Status: 5.1.1 or Status: 550 status = line.split(":", 1)[1].strip() # Try enhanced status code first match = self.ENHANCED_CODE_PATTERN.search(status) if match: class_digit = match.group(1) bounce_code = f"{class_digit}{match.group(2)}{match.group(3)}" bounce_type = "hard" if class_digit == "5" else "soft" else: # Try simple code match = self.SMTP_CODE_PATTERN.search(status) if match: bounce_code = match.group(1) bounce_type = "hard" if bounce_code.startswith("5") else "soft" elif lower_line.startswith("diagnostic-code:"): # Format: Diagnostic-Code: smtp; 550 User unknown diag = line.split(":", 1)[1].strip() if ";" in diag: diag = diag.split(";", 1)[1].strip() bounce_reason = diag[:500] # Limit length # Extract code from diagnostic if not found yet if not bounce_code: match = self.SMTP_CODE_PATTERN.search(diag) if match: bounce_code = match.group(1) bounce_type = "hard" if bounce_code.startswith("5") else "soft" elif lower_line.startswith("action:"): action = line.split(":", 1)[1].strip().lower() if action == "failed" and bounce_type is None: bounce_type = "hard" elif action in ("delayed", "relayed", "expanded"): bounce_type = "soft" return recipient, bounce_code, bounce_type, bounce_reason def _parse_heuristic(self, msg: Message) -> BounceInfo: """Parse bounce using heuristics for non-standard bounces.""" original_id: str | None = None bounce_type: Literal["hard", "soft"] | None = None bounce_code: str | None = None bounce_reason: str | None = None recipient: str | None = None # Check subject for bounce indicators subject = msg.get("Subject", "") is_bounce = any(p.search(subject) for p in self.BOUNCE_SUBJECT_PATTERNS) if not is_bounce: # Check From header for common bounce senders from_addr = msg.get("From", "").lower() is_bounce = any( pattern in from_addr for pattern in ["mailer-daemon", "postmaster", "mail-daemon", "maildelivery"] ) if not is_bounce: return BounceInfo(None, None, None, None, None) # Search body for SMTP codes and reasons body = self._get_text_body(msg) if body: # Find SMTP code match = self.SMTP_CODE_PATTERN.search(body) if match: bounce_code = match.group(1) bounce_type = "hard" if bounce_code.startswith("5") else "soft" # Extract first few lines as reason (limit length) lines = [line.strip() for line in body.split("\n") if line.strip()][:5] bounce_reason = " ".join(lines)[:500] # Try to find recipient email email_pattern = re.compile(r"[\w.+-]+@[\w.-]+\.\w+") emails = email_pattern.findall(body[:1000]) if emails: recipient = emails[0] # Try to find original message ID in body or attachments original_id = self._find_original_id_in_message(msg) return BounceInfo( original_message_id=original_id, bounce_type=bounce_type or "hard", # Default to hard if we detected a bounce bounce_code=bounce_code, bounce_reason=bounce_reason, recipient=recipient, ) def _extract_original_id(self, part: Message) -> str | None: """Extract X-Genro-Mail-ID from original message part.""" payload = part.get_payload() if isinstance(payload, list) and payload: inner_msg = payload[0] if hasattr(inner_msg, "get"): return inner_msg.get("X-Genro-Mail-ID") elif isinstance(payload, str): # Search for header in text match = re.search(r"X-Genro-Mail-ID:\s*(\S+)", payload, re.I) if match: return match.group(1).strip() return None def _find_original_id_in_message(self, msg: Message) -> str | None: """Search for X-Genro-Mail-ID anywhere in the message.""" for part in msg.walk(): # Check headers original_id = part.get("X-Genro-Mail-ID") if original_id: return original_id # Check body text if part.get_content_type() in ("text/plain", "text/html", "message/rfc822"): payload = part.get_payload(decode=True) if isinstance(payload, bytes): text = payload.decode("utf-8", errors="replace") match = re.search(r"X-Genro-Mail-ID:\s*(\S+)", text, re.I) if match: return match.group(1).strip() return None def _get_text_body(self, msg: Message) -> str: """Extract text body from message.""" if msg.is_multipart(): for part in msg.walk(): if part.get_content_type() == "text/plain": payload = part.get_payload(decode=True) if isinstance(payload, bytes): return payload.decode("utf-8", errors="replace") return "" else: payload = msg.get_payload(decode=True) if isinstance(payload, bytes): return payload.decode("utf-8", errors="replace") return str(payload) if payload else ""
__all__ = ["BounceInfo", "BounceParser"]