Notifications
Notifications are producer-driven with minimal core fields. The notification system supports multiple channels (in-app, webhook, Slack) with throttling, retries, and user subscription preferences.
Architecture
```mermaid
flowchart LR
    Kafka[(Kafka)] --> NS[NotificationService]
    NS --> DB[(MongoDB)]
    NS --> SSE[SSE Bus]
    NS --> Webhook[Webhook]
    NS --> Slack[Slack]
    SSE --> Browser
```
The `NotificationService` implements `LifecycleEnabled` and starts its Kafka consumer during application startup.
It subscribes to execution result events (`EXECUTION_COMPLETED`, `EXECUTION_FAILED`, `EXECUTION_TIMEOUT`) and
automatically creates a notification for the user when one of their executions completes, fails, or times out.
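The event-to-notification step can be pictured with a small sketch. Everything here is an assumption made for illustration: the event payload shape (a dict with `event_type`, `execution_id`, and `user_id`), the `NotificationDraft` type, and in particular the severity assigned to each event type. The real service may map these differently.

```python
from __future__ import annotations

from dataclasses import dataclass

# Hypothetical rules: event type -> (outcome word, severity).
# The severities are illustrative guesses, not the service's actual policy.
EVENT_RULES = {
    "EXECUTION_COMPLETED": ("completed", "low"),
    "EXECUTION_FAILED": ("failed", "high"),
    "EXECUTION_TIMEOUT": ("timeout", "high"),
}


@dataclass
class NotificationDraft:
    """Hypothetical stand-in for the notification the service would create."""

    user_id: str
    subject: str
    body: str
    severity: str


def draft_from_event(event: dict) -> NotificationDraft | None:
    """Build a draft for a known execution result event, else return None."""
    rule = EVENT_RULES.get(event["event_type"])
    if rule is None:
        return None  # not an execution result event; nothing to notify
    outcome, severity = rule
    execution_id = event["execution_id"]
    return NotificationDraft(
        user_id=event["user_id"],
        subject=f"Execution {outcome}",
        body=f"Execution {execution_id} finished with outcome: {outcome}.",
        severity=severity,
    )


draft = draft_from_event(
    {"event_type": "EXECUTION_FAILED", "execution_id": "abc123", "user_id": "user-1"}
)
```

Keeping the mapping in a table makes it easy to see which events produce notifications and to add a new event type without touching the consumer loop.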
Core fields
| Field | Description |
|---|---|
| `subject` | Short title |
| `body` | Text content |
| `channel` | `in_app`, `webhook`, or `slack` |
| `severity` | `low`, `medium`, `high`, or `urgent` |
| `tags` | List of strings for filtering, e.g. `["execution", "failed"]` |
| `status` | `pending`, `sending`, `delivered`, `failed`, `skipped`, `read`, `clicked` |
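As a rough illustration, the core fields could be modelled as below. The field names and allowed values come from the table; the class and enum names, the enum member names for severity and status, and all default values are assumptions, and the project's actual models may differ.

```python
from dataclasses import dataclass, field
from enum import Enum


class Channel(str, Enum):
    IN_APP = "in_app"
    WEBHOOK = "webhook"
    SLACK = "slack"


class Severity(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"


class Status(str, Enum):
    PENDING = "pending"
    SENDING = "sending"
    DELIVERED = "delivered"
    FAILED = "failed"
    SKIPPED = "skipped"
    READ = "read"
    CLICKED = "clicked"


@dataclass
class Notification:
    """Illustrative model of the core fields; defaults are assumed."""

    subject: str
    body: str
    channel: Channel = Channel.IN_APP
    severity: Severity = Severity.MEDIUM
    tags: list[str] = field(default_factory=list)
    status: Status = Status.PENDING


example = Notification(
    subject="Execution failed",
    body="Your execution did not finish successfully.",
    severity=Severity.HIGH,
    tags=["execution", "failed"],
)
```

Using string-valued enums keeps the stored values identical to the ones listed in the table while still rejecting unknown channels, severities, or statuses at construction time.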
Tag conventions
Producers include structured tags for filtering, UI actions, and correlation.
| Tag type | Purpose | Examples |
|---|---|---|
| Category | What the notification is about | `execution` |
| Entity | Entity type | `entity:execution` |
| Reference | Link to specific resource | `exec:<execution_id>` |
| Outcome | What happened | `completed`, `failed`, `timeout` |
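On the consuming side, a UI or filter might want to separate prefixed tags from plain ones, for example to pull the execution ID out of a reference tag. The helper below is a minimal sketch of that idea; `parse_tags` is a hypothetical name, and it only recognises the two prefixes shown in the table (`entity:` and `exec:`).

```python
def parse_tags(tags: list[str]) -> dict[str, list[str]]:
    """Group tags into plain labels and the prefixed kinds from the table."""
    parsed: dict[str, list[str]] = {"labels": [], "entity": [], "exec": []}
    for tag in tags:
        prefix, sep, value = tag.partition(":")
        if sep and prefix in ("entity", "exec"):
            parsed[prefix].append(value)  # e.g. "exec:abc123" -> "abc123"
        else:
            parsed["labels"].append(tag)  # category and outcome tags
    return parsed


parsed = parse_tags(["execution", "failed", "entity:execution", "exec:abc123"])
```

With this grouping, `parsed["exec"]` holds the execution IDs a UI could link to, while `parsed["labels"]` holds the category and outcome tags used for filtering.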
Examples
Execution completed:
Execution failed:
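Following the conventions in the table, the tag lists for these two cases would plausibly look like the output of the sketch below. The helper name `execution_tags` and the tag ordering are assumptions; `<execution_id>` is left as a placeholder for the real ID.

```python
ALLOWED_OUTCOMES = {"completed", "failed", "timeout"}


def execution_tags(execution_id: str, outcome: str) -> list[str]:
    """Build category, outcome, entity, and reference tags for an execution."""
    if outcome not in ALLOWED_OUTCOMES:
        raise ValueError(f"unknown outcome: {outcome!r}")
    return ["execution", outcome, "entity:execution", f"exec:{execution_id}"]


completed_tags = execution_tags("<execution_id>", "completed")
failed_tags = execution_tags("<execution_id>", "failed")
```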
Throttling
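The service throttles notifications per user and per severity using a sliding time window. `check_throttle` returns `True` when the notification should be suppressed. Note that, despite its name, `max_per_hour` is compared against the number of notifications recorded in the whole `window_hours` window: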
```python
import asyncio
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta

# NotificationSeverity is the project's severity enum (import omitted here).


@dataclass
class ThrottleCache:
    """Manages notification throttling with time windows."""

    _entries: dict[str, list[datetime]] = field(default_factory=dict)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    async def check_throttle(
        self,
        user_id: str,
        severity: NotificationSeverity,
        window_hours: int,
        max_per_hour: int,
    ) -> bool:
        """Check if notification should be throttled."""
        key = f"{user_id}:{severity}"
        now = datetime.now(UTC)
        window_start = now - timedelta(hours=window_hours)

        async with self._lock:
            if key not in self._entries:
                self._entries[key] = []

            # Clean old entries
            self._entries[key] = [ts for ts in self._entries[key] if ts > window_start]

            # Check limit
            if len(self._entries[key]) >= max_per_hour:
                return True

            # Add new entry
            self._entries[key].append(now)
            return False

    async def clear(self) -> None:
        """Clear all throttle entries."""
        async with self._lock:
            self._entries.clear()
```
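To make the sliding-window behaviour concrete, the sketch below re-implements the same check in condensed, standalone form and drives it with fixed timestamps. `SlidingWindowThrottle` and its `is_throttled` method are hypothetical names used only for this demonstration; the explicit `now` argument is an addition that makes the example deterministic.

```python
import asyncio
from datetime import datetime, timedelta, timezone


class SlidingWindowThrottle:
    """Condensed illustration of a per-key sliding-window limit."""

    def __init__(self, window: timedelta, limit: int) -> None:
        self._window = window
        self._limit = limit
        self._entries: dict[str, list[datetime]] = {}
        self._lock = asyncio.Lock()

    async def is_throttled(self, key: str, now: datetime) -> bool:
        cutoff = now - self._window
        async with self._lock:
            # Keep only timestamps that are still inside the window.
            recent = [ts for ts in self._entries.get(key, []) if ts > cutoff]
            throttled = len(recent) >= self._limit
            if not throttled:
                recent.append(now)  # only delivered notifications count
            self._entries[key] = recent
            return throttled


async def demo() -> list[bool]:
    throttle = SlidingWindowThrottle(window=timedelta(hours=1), limit=2)
    start = datetime(2024, 1, 1, tzinfo=timezone.utc)
    results = []
    for minutes in (0, 10, 20, 61):
        now = start + timedelta(minutes=minutes)
        results.append(await throttle.is_throttled("user-1:high", now))
    return results


results = asyncio.run(demo())
print(results)  # [False, False, True, False]
```

The third call is throttled because two notifications already fall inside the hour. By minute 61 the first entry has aged out, so the fourth call is allowed again. Throttled attempts are not recorded, so a burst of suppressed notifications does not extend the throttling period.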
Channel handlers
Notifications route to handlers based on channel:
```python
# Channel handlers mapping
self._channel_handlers: dict[NotificationChannel, ChannelHandler] = {
    NotificationChannel.IN_APP: self._send_in_app,
    NotificationChannel.WEBHOOK: self._send_webhook,
    NotificationChannel.SLACK: self._send_slack,
}
```
In-app notifications are published to the SSE bus for real-time delivery. The webhook and Slack channels deliver via HTTP POST with retry logic.
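The retry policy itself is not described here, so the following is only one plausible shape for it: exponential backoff that retries on connection errors and 5xx responses but gives up immediately on other non-2xx statuses. The function name `send_with_retry`, its parameters, and the injected `post` callable (which stands in for a real HTTP client and returns a status code) are all assumptions for illustration.

```python
import asyncio
from collections.abc import Awaitable, Callable

PostFn = Callable[[str, dict], Awaitable[int]]  # returns an HTTP status code


async def send_with_retry(
    post: PostFn,
    url: str,
    payload: dict,
    max_attempts: int = 3,
    base_delay: float = 0.5,
) -> bool:
    """POST the payload; retry on connection errors and 5xx responses."""
    for attempt in range(1, max_attempts + 1):
        try:
            status = await post(url, payload)
            if status < 500:
                # 2xx is success; other non-5xx statuses are not retried.
                return 200 <= status < 300
        except OSError:
            pass  # connection-level failure: fall through and retry
        if attempt < max_attempts:
            # Exponential backoff: base_delay, 2x, 4x, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    return False


async def demo() -> tuple[bool, int]:
    calls = 0

    async def flaky_post(url: str, payload: dict) -> int:
        nonlocal calls
        calls += 1
        return 503 if calls < 3 else 200  # fail twice, then succeed

    ok = await send_with_retry(
        flaky_post,
        "https://example.invalid/hook",
        {"subject": "Execution failed"},
        base_delay=0.0,  # no waiting in the demo
    )
    return ok, calls


delivered, attempts = asyncio.run(demo())
```

Passing the HTTP call in as a parameter keeps the retry logic testable without a network; in the demo the fake endpoint fails twice and the third attempt succeeds.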
Subscription filtering
Users configure subscriptions per channel with severity and tag filters. The `_should_skip_notification` method checks
these before delivery and returns the reason for skipping, or `None` if the notification should be delivered:
```python
async def _should_skip_notification(
    self, notification: DomainNotification, subscription: DomainNotificationSubscription
) -> str | None:
    """Check if notification should be skipped based on subscription filters.

    Returns skip reason if should skip, None otherwise.
    """
    if not subscription.enabled:
        return f"User {notification.user_id} has {notification.channel} disabled; skipping delivery."

    if subscription.severities and notification.severity not in subscription.severities:
        return (
            f"Notification severity '{notification.severity}' filtered by user preferences "
            f"for {notification.channel}"
        )

    if subscription.include_tags and not any(tag in subscription.include_tags for tag in (notification.tags or [])):
        return f"Notification tags {notification.tags} not in include list for {notification.channel}"

    if subscription.exclude_tags and any(tag in subscription.exclude_tags for tag in (notification.tags or [])):
        return f"Notification tags {notification.tags} excluded by preferences for {notification.channel}"

    return None
```
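The order of these checks matters, and a few concrete inputs make it visible. The sketch below mirrors the same four checks in a simplified, synchronous form; the `Subscription` dataclass, the `skip_reason` function, and the shortened reason strings are stand-ins invented for this example, not the project's types.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Subscription:
    """Simplified stand-in for a per-channel subscription."""

    enabled: bool = True
    severities: list[str] = field(default_factory=list)
    include_tags: list[str] = field(default_factory=list)
    exclude_tags: list[str] = field(default_factory=list)


def skip_reason(severity: str, tags: list[str], sub: Subscription) -> str | None:
    """Return why a notification is skipped, or None to deliver it."""
    if not sub.enabled:
        return "channel disabled"
    if sub.severities and severity not in sub.severities:
        return "severity filtered"
    if sub.include_tags and not any(t in sub.include_tags for t in tags):
        return "no include tag matched"
    if sub.exclude_tags and any(t in sub.exclude_tags for t in tags):
        return "excluded tag present"
    return None


sub = Subscription(
    severities=["high", "urgent"],
    include_tags=["execution"],
    exclude_tags=["timeout"],
)

delivered = skip_reason("high", ["execution", "failed"], sub)
too_quiet = skip_reason("low", ["execution", "failed"], sub)
excluded = skip_reason("high", ["execution", "timeout"], sub)
no_filters = skip_reason("low", [], Subscription())
```

Two properties follow from the structure: an empty filter list means "no restriction" rather than "match nothing", and an excluded tag blocks delivery even when an include tag also matches.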
Key files
| File | Purpose |
|---|---|
| `services/notification_service.py` | Notification delivery and logic |
| `db/docs/notification.py` | MongoDB document models |
| `db/repositories/notification_repository.py` | Database operations |