Notifications

Notifications are producer-driven with minimal core fields. The notification system supports multiple channels (in-app, webhook, Slack) with throttling, retries, and user subscription preferences.

Architecture

flowchart LR
    Kafka[(Kafka)] --> NS[NotificationService]
    NS --> DB[(MongoDB)]
    NS --> SSE[SSE Bus]
    NS --> Webhook[Webhook]
    NS --> Slack[Slack]
    SSE --> Browser

The NotificationService implements LifecycleEnabled and starts its Kafka consumer during application startup. It subscribes to execution result events (EXECUTION_COMPLETED, EXECUTION_FAILED, EXECUTION_TIMEOUT) and automatically creates notifications for users when their executions complete.

Core fields

| Field | Description |
|-------|-------------|
| subject | Short title |
| body | Text content |
| channel | in_app, webhook, or slack |
| severity | low, medium, high, or urgent |
| tags | List of strings for filtering, e.g. ["execution", "failed"] |
| status | pending, sending, delivered, failed, skipped, read, clicked |
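
As a rough illustration of the fields above, the following sketch models them with enums and a dataclass. The class names (`NotificationSketch`, `Channel`, `Severity`, `Status`) and the default values are assumptions made for this example and are not taken from the project's MongoDB document models.

```python
from dataclasses import dataclass, field
from enum import Enum


class Channel(str, Enum):
    IN_APP = "in_app"
    WEBHOOK = "webhook"
    SLACK = "slack"


class Severity(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"


class Status(str, Enum):
    PENDING = "pending"
    SENDING = "sending"
    DELIVERED = "delivered"
    FAILED = "failed"
    SKIPPED = "skipped"
    READ = "read"
    CLICKED = "clicked"


@dataclass
class NotificationSketch:
    """Hypothetical container for the core fields; defaults are assumed."""

    subject: str
    body: str
    channel: Channel = Channel.IN_APP
    severity: Severity = Severity.MEDIUM
    tags: list[str] = field(default_factory=list)
    status: Status = Status.PENDING


example = NotificationSketch(
    subject="Execution failed",
    body="Your execution exited with an error.",
    severity=Severity.HIGH,
    tags=["execution", "failed"],
)
```

Using string-valued enums keeps the stored values identical to the literals listed in the table while still rejecting unknown values such as `Channel("email")`.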

Tag conventions

Producers include structured tags for filtering, UI actions, and correlation.

| Tag type | Purpose | Examples |
|----------|---------|----------|
| Category | What the notification is about | execution |
| Entity | Entity type | entity:execution |
| Reference | Link to specific resource | exec:<execution_id> |
| Outcome | What happened | completed, failed, timeout |

Examples

Execution completed:

["execution", "completed", "entity:execution", "exec:2c1b...e8"]

Execution failed:

["execution", "failed", "entity:execution", "exec:2c1b...e8"]
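
The tag lists above follow a fixed pattern, so a producer could build them with a small helper. This is a minimal sketch: the function names `build_execution_tags` and `execution_id_from_tags` are hypothetical and only illustrate the convention.

```python
from typing import Optional

# Outcome tags listed in the conventions table.
ALLOWED_OUTCOMES = {"completed", "failed", "timeout"}


def build_execution_tags(execution_id: str, outcome: str) -> list[str]:
    """Build category, outcome, entity, and reference tags for an execution."""
    if outcome not in ALLOWED_OUTCOMES:
        raise ValueError(f"unknown outcome: {outcome!r}")
    return ["execution", outcome, "entity:execution", f"exec:{execution_id}"]


def execution_id_from_tags(tags: list[str]) -> Optional[str]:
    """Return the execution ID from the first exec:<id> tag, if any."""
    prefix = "exec:"
    for tag in tags:
        if tag.startswith(prefix):
            return tag[len(prefix):]
    return None
```

A consumer such as a UI can use the reference tag to link back to the resource, for example `execution_id_from_tags(build_execution_tags("abc123", "failed"))` yields `"abc123"`.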

Throttling

The service throttles notifications per user and per severity within a configurable time window:

            # ... (excerpt begins inside the throttle check; earlier lines not shown)
                return True

            # Add new entry
            self._entries[key].append(now)
            return False

    async def clear(self) -> None:
        """Clear all throttle entries."""
        async with self._lock:
            self._entries.clear()


@dataclass(frozen=True)
class SystemConfig:
    severity: NotificationSeverity
    throttle_exempt: bool


class NotificationService:
    def __init__(
        self,
        notification_repository: NotificationRepository,
        event_service: KafkaEventService,
        sse_bus: SSERedisBus,
        settings: Settings,
        logger: logging.Logger,
        # ... (excerpt ends here; rest of the constructor not shown)
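
Because the excerpt above shows only part of the throttle logic, here is a self-contained sketch of how a per-user, per-severity sliding window might work. The class name `ThrottleSketch`, the method name `check_throttle`, the key format, and the optional `now` parameter are assumptions for illustration. The sketch also assumes that `max_per_hour` caps the number of entries kept within the window, which may differ from the real service.

```python
import asyncio
import time
from collections import defaultdict
from typing import Optional


class ThrottleSketch:
    """Hypothetical sliding-window throttle keyed by user and severity."""

    def __init__(self) -> None:
        self._entries: dict[str, list[float]] = defaultdict(list)
        self._lock = asyncio.Lock()

    async def check_throttle(
        self,
        user_id: str,
        severity: str,
        window_hours: int,
        max_per_hour: int,
        now: Optional[float] = None,
    ) -> bool:
        """Return True if the notification should be throttled."""
        now = time.monotonic() if now is None else now
        key = f"{user_id}:{severity}"  # assumed key format
        window_seconds = window_hours * 3600
        async with self._lock:
            # Keep only timestamps that still fall inside the window.
            self._entries[key] = [
                t for t in self._entries[key] if now - t < window_seconds
            ]
            if len(self._entries[key]) >= max_per_hour:
                return True

            # Add new entry
            self._entries[key].append(now)
            return False

    async def clear(self) -> None:
        """Clear all throttle entries."""
        async with self._lock:
            self._entries.clear()
```

Passing `now` explicitly makes the window easy to test without sleeping. A `throttle_exempt` flag like the one on `SystemConfig` could be honored by skipping the check entirely for exempt notifications.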

Channel handlers

Notifications route to handlers based on channel:

        # ... (excerpt begins mid-call; preceding lines not shown)
            user_id,
            severity,
            window_hours=self.settings.NOTIF_THROTTLE_WINDOW_HOURS,
            max_per_hour=self.settings.NOTIF_THROTTLE_MAX_PER_HOUR,
        ):

In-app notifications are published to the SSE bus for real-time delivery. Webhook and Slack channels deliver via HTTP POST with retry logic.
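
The routing and retry behavior described here can be sketched as a dispatch table plus a generic retry wrapper. The names `make_dispatcher` and `with_retries`, the dictionary-shaped notification, and the exponential backoff policy are all assumptions for illustration; the actual handlers and retry settings are not shown in this document.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[None]]


async def with_retries(
    send: Callable[[], Awaitable[None]],
    attempts: int = 3,
    base_delay: float = 0.5,
) -> None:
    """Call send() until it succeeds, backing off exponentially between tries."""
    for attempt in range(attempts):
        try:
            await send()
            return
        except Exception:
            # Broad catch for brevity; a real handler would likely narrow this.
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * 2**attempt)


def make_dispatcher(handlers: dict[str, Handler]) -> Handler:
    """Return a coroutine function that routes a notification by its channel."""

    async def dispatch(notification: dict) -> None:
        channel = notification["channel"]
        handler = handlers.get(channel)
        if handler is None:
            raise ValueError(f"no handler for channel {channel!r}")
        await handler(notification)

    return dispatch
```

In this shape, a webhook or Slack handler would wrap its HTTP POST in `with_retries`, while an in-app handler would publish to the SSE bus directly without retrying.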

Subscription filtering

Users configure subscriptions per channel with severity and tag filters. The _should_skip_notification method checks these before delivery.
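
The body of `_should_skip_notification` is not reproduced here, so the following is only a sketch of what a severity and tag filter might look like. The `SubscriptionSketch` fields (`enabled`, `severities`, `include_tags`, `exclude_tags`), the standalone function `should_skip`, and the rule that an empty filter matches everything are all assumptions.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SubscriptionSketch:
    """Hypothetical per-channel subscription; empty filters match everything."""

    enabled: bool = True
    severities: set[str] = field(default_factory=set)
    include_tags: set[str] = field(default_factory=set)
    exclude_tags: set[str] = field(default_factory=set)


def should_skip(
    subscription: Optional[SubscriptionSketch],
    severity: str,
    tags: list[str],
) -> Optional[str]:
    """Return a skip reason, or None if the notification should be delivered."""
    if subscription is None or not subscription.enabled:
        return "channel not enabled"
    if subscription.severities and severity not in subscription.severities:
        return "severity filtered"
    tag_set = set(tags)
    if subscription.include_tags and not (subscription.include_tags & tag_set):
        return "no matching tags"
    if subscription.exclude_tags & tag_set:
        return "excluded tag"
    return None
```

Returning a reason string rather than a bare boolean would let the caller record why a notification ended up in the `skipped` status.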


Key files

| File | Purpose |
|------|---------|
| services/notification_service.py | Notification delivery and logic |
| db/docs/notification.py | MongoDB document models |
| db/repositories/notification_repository.py | Database operations |