Source code for async_mail_service.prometheus

"""Prometheus metrics exposed by the mail dispatcher."""

from prometheus_client import Counter, Gauge, CollectorRegistry, generate_latest

[docs] class MailMetrics: """Wrapper around the Prometheus registry used by the service.""" def __init__(self, registry: CollectorRegistry | None = None): """Create counters and gauges inside the provided registry.""" self.registry = registry or CollectorRegistry() self.sent = Counter("asyncmail_sent_total", "Total sent emails", ["account_id"], registry=self.registry) self.errors = Counter("asyncmail_errors_total", "Total send errors", ["account_id"], registry=self.registry) self.deferred = Counter("asyncmail_deferred_total", "Total deferred emails", ["account_id"], registry=self.registry) self.rate_limited = Counter("asyncmail_rate_limited_total", "Total rate limited occurrences", ["account_id"], registry=self.registry) self.pending = Gauge("asyncmail_pending_messages", "Current pending messages", registry=self.registry)
[docs] def inc_sent(self, account_id: str): """Increase the ``sent`` counter for the given account.""" self.sent.labels(account_id=account_id or "default").inc()
[docs] def inc_error(self, account_id: str): """Increase the ``errors`` counter for the given account.""" self.errors.labels(account_id=account_id or "default").inc()
[docs] def inc_deferred(self, account_id: str): """Increase the ``deferred`` counter for the given account.""" self.deferred.labels(account_id=account_id or "default").inc()
[docs] def inc_rate_limited(self, account_id: str): """Increase the ``rate_limited`` counter for the given account.""" self.rate_limited.labels(account_id=account_id or "default").inc()
[docs] def set_pending(self, value: int): """Update the gauge tracking pending messages.""" self.pending.set(value)
[docs] def generate_latest(self) -> bytes: """Return the latest metrics snapshot in Prometheus text format.""" return generate_latest(self.registry)