Notifications

Notifications are producer-driven: each producer supplies a small set of core fields. The notification system delivers over multiple channels (in-app, webhook, Slack) and supports throttling, retries, and per-user subscription preferences.

Architecture

flowchart LR
    Kafka[(Kafka)] --> NS[NotificationService]
    NS --> DB[(MongoDB)]
    NS --> SSE[SSE Bus]
    NS --> Webhook[Webhook]
    NS --> Slack[Slack]
    SSE --> Browser

The NotificationService implements LifecycleEnabled and starts its Kafka consumer during application startup. It subscribes to execution result events (EXECUTION_COMPLETED, EXECUTION_FAILED, EXECUTION_TIMEOUT) and automatically creates notifications for users when their executions complete.

Core fields

| Field | Description |
| --- | --- |
| subject | Short title |
| body | Text content |
| channel | in_app, webhook, or slack |
| severity | low, medium, high, or urgent |
| tags | List of strings for filtering, e.g. ["execution", "failed"] |
| status | pending, sending, delivered, failed, skipped, read, clicked |
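To make the field list concrete, here is a minimal sketch of how these fields could be modelled. The class names (Channel, Severity, Status, NotificationSketch) and the default values are assumptions made for illustration; they are not the project's actual document models. Only the field names and allowed values come from the table above.

```python
from dataclasses import dataclass, field
from enum import Enum


class Channel(str, Enum):
    IN_APP = "in_app"
    WEBHOOK = "webhook"
    SLACK = "slack"


class Severity(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"


class Status(str, Enum):
    PENDING = "pending"
    SENDING = "sending"
    DELIVERED = "delivered"
    FAILED = "failed"
    SKIPPED = "skipped"
    READ = "read"
    CLICKED = "clicked"


@dataclass
class NotificationSketch:
    """Hypothetical container for the core fields listed above."""

    subject: str
    body: str
    # The defaults below are assumptions, not documented behavior.
    channel: Channel = Channel.IN_APP
    severity: Severity = Severity.MEDIUM
    tags: list[str] = field(default_factory=list)
    status: Status = Status.PENDING


notification = NotificationSketch(
    subject="Execution failed",
    body="Your execution exited with an error.",
    severity=Severity.HIGH,
    tags=["execution", "failed"],
)
```

Using string-valued enums keeps the stored values identical to the strings in the table while still rejecting anything outside the allowed set.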

Tag conventions

Producers include structured tags for filtering, UI actions, and correlation.

| Tag type | Purpose | Examples |
| --- | --- | --- |
| Category | What the notification is about | execution |
| Entity | Entity type | entity:execution |
| Reference | Link to specific resource | exec:<execution_id> |
| Outcome | What happened | completed, failed, timeout |

Examples

Execution completed:

["execution", "completed", "entity:execution", "exec:2c1b...e8"]

Execution failed:

["execution", "failed", "entity:execution", "exec:2c1b...e8"]
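The convention above can be sketched as a pair of small helpers: one that builds the tag list for an execution outcome, and one that recovers the execution ID from the reference tag. Both function names are hypothetical and the ID "abc123" is a placeholder; only the tag shapes come from the table and examples above.

```python
from typing import Optional

# Outcome tags listed in the conventions table.
ALLOWED_OUTCOMES = {"completed", "failed", "timeout"}


def build_execution_tags(execution_id: str, outcome: str) -> list[str]:
    """Build category, outcome, entity, and reference tags for an execution."""
    if outcome not in ALLOWED_OUTCOMES:
        raise ValueError(f"unknown outcome: {outcome!r}")
    return ["execution", outcome, "entity:execution", f"exec:{execution_id}"]


def execution_id_from_tags(tags: list[str]) -> Optional[str]:
    """Return the execution ID from the first 'exec:' tag, or None if absent."""
    for tag in tags:
        if tag.startswith("exec:"):
            return tag[len("exec:"):]
    return None


tags = build_execution_tags("abc123", "failed")
print(tags)
print(execution_id_from_tags(tags))
```

A consumer such as a UI action handler could use the second helper to link a notification back to its execution without parsing the body text.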

Throttling

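The service throttles notifications per user and per severity over a sliding time window. Timestamps older than window_hours are discarded, and a new notification is throttled once the number of notifications remaining in the window reaches max_per_hour: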

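import asyncio
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta

# NotificationSeverity is the project's severity enum; its import is omitted here.


@dataclass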
class ThrottleCache:
    """Manages notification throttling with time windows."""

    _entries: dict[str, list[datetime]] = field(default_factory=dict)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    async def check_throttle(
        self,
        user_id: str,
        severity: NotificationSeverity,
        window_hours: int,
        max_per_hour: int,
    ) -> bool:
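        """Return True if the notification should be throttled, False if it may be sent.

        Note: max_per_hour is compared against the count for the whole
        window_hours window, not against a per-hour rate.
        """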
        key = f"{user_id}:{severity}"
        now = datetime.now(UTC)
        window_start = now - timedelta(hours=window_hours)

        async with self._lock:
            if key not in self._entries:
                self._entries[key] = []

            # Clean old entries
            self._entries[key] = [ts for ts in self._entries[key] if ts > window_start]

            # Check limit
            if len(self._entries[key]) >= max_per_hour:
                return True

            # Add new entry
            self._entries[key].append(now)
            return False

    async def clear(self) -> None:
        """Clear all throttle entries."""
        async with self._lock:
            self._entries.clear()
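To show the behavior end to end, the following is a condensed, self-contained restatement of the class above, not the project's code. It assumes a plain string in place of NotificationSeverity and uses timezone.utc so that it also runs on Python versions without datetime.UTC. The name ThrottleSketch and the user ID are made up for the example.

```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone


@dataclass
class ThrottleSketch:
    """Condensed stand-in for ThrottleCache (severity is a plain string here)."""

    _entries: dict[str, list[datetime]] = field(default_factory=dict)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    async def check_throttle(
        self, user_id: str, severity: str, window_hours: int, max_per_hour: int
    ) -> bool:
        key = f"{user_id}:{severity}"
        now = datetime.now(timezone.utc)
        window_start = now - timedelta(hours=window_hours)
        async with self._lock:
            # Keep only timestamps that still fall inside the window.
            recent = [ts for ts in self._entries.get(key, []) if ts > window_start]
            throttled = len(recent) >= max_per_hour
            if not throttled:
                recent.append(now)
            self._entries[key] = recent
            return throttled


async def demo() -> list[bool]:
    cache = ThrottleSketch()
    results = []
    # Three "high" notifications for the same user with a limit of two.
    for _ in range(3):
        results.append(
            await cache.check_throttle("user-1", "high", window_hours=1, max_per_hour=2)
        )
    # A different severity uses a separate key, so it has its own counter.
    results.append(
        await cache.check_throttle("user-1", "low", window_hours=1, max_per_hour=2)
    )
    return results


print(asyncio.run(demo()))  # [False, False, True, False]
```

The third call is throttled because two timestamps are already recorded for the key user-1:high, while the call with severity low is counted separately.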

Channel handlers

Notifications route to handlers based on channel:

        # Channel handlers mapping
        self._channel_handlers: dict[NotificationChannel, ChannelHandler] = {
            NotificationChannel.IN_APP: self._send_in_app,
            NotificationChannel.WEBHOOK: self._send_webhook,
            NotificationChannel.SLACK: self._send_slack,
        }

In-app notifications are published to the SSE bus for real-time delivery. The webhook and Slack channels deliver via HTTP POST with retry logic.
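The retry behavior is not spelled out here, so the following is only a sketch of one common approach: look up a handler by channel and retry it with exponential backoff. The helper names, the attempt limit, the backoff delays, and the simulated sender are all assumptions for illustration; no real HTTP call is made.

```python
import asyncio
from collections.abc import Awaitable, Callable

# Hypothetical handler type: takes a payload and sends it somewhere.
Sender = Callable[[dict], Awaitable[None]]


async def send_with_retry(
    send: Sender, payload: dict, max_attempts: int = 3, base_delay: float = 0.01
) -> int:
    """Call send(payload), retrying on failure with exponential backoff.

    Returns the number of attempts used and re-raises the last error
    once max_attempts is exhausted.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            await send(payload)
            return attempt
        except Exception:
            if attempt == max_attempts:
                raise
            # Delay doubles after each failed attempt.
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("max_attempts must be at least 1")


def make_flaky_sender(failures: int) -> Sender:
    """Build a fake sender that fails a fixed number of times, then succeeds."""
    remaining = failures

    async def send(payload: dict) -> None:
        nonlocal remaining
        if remaining > 0:
            remaining -= 1
            raise ConnectionError("simulated transient failure")

    return send


async def demo() -> int:
    # Channel-to-handler mapping, analogous to the one shown above.
    handlers: dict[str, Sender] = {"webhook": make_flaky_sender(2)}
    return await send_with_retry(handlers["webhook"], {"subject": "Execution failed"})


print(asyncio.run(demo()))  # 3
```

The simulated sender fails twice and succeeds on the third attempt, so the helper reports three attempts. A production handler would typically retry only on transient errors, such as timeouts or 5xx responses, rather than on every exception.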

Subscription filtering

Users configure subscriptions per channel with severity and tag filters. The _should_skip_notification method checks these before delivery:

    async def _should_skip_notification(
        self, notification: DomainNotification, subscription: DomainNotificationSubscription
    ) -> str | None:
        """Check if notification should be skipped based on subscription filters.

        Returns skip reason if should skip, None otherwise.
        """
        if not subscription.enabled:
            return f"User {notification.user_id} has {notification.channel} disabled; skipping delivery."

        if subscription.severities and notification.severity not in subscription.severities:
            return (
                f"Notification severity '{notification.severity}' filtered by user preferences "
                f"for {notification.channel}"
            )

        if subscription.include_tags and not any(tag in subscription.include_tags for tag in (notification.tags or [])):
            return f"Notification tags {notification.tags} not in include list for {notification.channel}"

        if subscription.exclude_tags and any(tag in subscription.exclude_tags for tag in (notification.tags or [])):
            return f"Notification tags {notification.tags} excluded by preferences for {notification.channel}"

        return None
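To illustrate the order in which the filters apply, here is a simplified, synchronous restatement of the same checks. SubscriptionSketch and skip_reason are hypothetical stand-ins for the domain types and the method above, severities are plain strings, and the skip reasons are shortened.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SubscriptionSketch:
    """Hypothetical stand-in for a per-channel subscription."""

    enabled: bool = True
    severities: list[str] = field(default_factory=list)
    include_tags: list[str] = field(default_factory=list)
    exclude_tags: list[str] = field(default_factory=list)


def skip_reason(
    severity: str, tags: list[str], sub: SubscriptionSketch
) -> Optional[str]:
    """Apply the checks in the same order as the method above."""
    if not sub.enabled:
        return "channel disabled"
    if sub.severities and severity not in sub.severities:
        return "severity filtered"
    if sub.include_tags and not any(tag in sub.include_tags for tag in tags):
        return "no included tag"
    if sub.exclude_tags and any(tag in sub.exclude_tags for tag in tags):
        return "excluded tag"
    return None


sub = SubscriptionSketch(severities=["high", "urgent"], exclude_tags=["timeout"])

print(skip_reason("low", ["execution", "completed"], sub))   # severity filtered
print(skip_reason("high", ["execution", "failed"], sub))     # None
print(skip_reason("high", ["execution", "timeout"], sub))    # excluded tag
```

Empty filter lists act as "no restriction", and because the exclude check runs after the include check, a notification that matches both lists is skipped.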

Key files

| File | Purpose |
| --- | --- |
| services/notification_service.py | Notification delivery and logic |
| db/docs/notification.py | MongoDB document models |
| db/repositories/notification_repository.py | Database operations |