Event storage architecture

Unified events collection

The system stores all events in a single events MongoDB collection using EventDocument. This provides a unified approach where all event data—whether from Kafka consumers, API operations, or pod monitors—flows into one collection with consistent structure.

EventDocument structure

EventDocument uses a flexible payload pattern. Base fields are stored at document level for efficient indexing, while event-specific fields go into the payload dict:

    execution_id: str | None = None

    model_config = ConfigDict(from_attributes=True, extra="allow")

    class Settings:
        name = "events"
        use_state_management = True
        indexes = [
            # Compound indexes for common query patterns
            IndexModel([("event_type", ASCENDING), ("timestamp", DESCENDING)], name="idx_event_type_ts"),
            IndexModel([("aggregate_id", ASCENDING), ("timestamp", DESCENDING)], name="idx_aggregate_ts"),
            IndexModel([("metadata.correlation_id", ASCENDING)], name="idx_meta_correlation"),
            IndexModel([("metadata.user_id", ASCENDING), ("timestamp", DESCENDING)], name="idx_meta_user_ts"),
            IndexModel([("metadata.service_name", ASCENDING), ("timestamp", DESCENDING)], name="idx_meta_service_ts"),
            # Event-specific field indexes (sparse - only exist on relevant event types)
            IndexModel([("execution_id", ASCENDING)], name="idx_execution_id", sparse=True),

Storage pattern

When storing events, base fields stay at top level while everything else goes into payload. The _flatten_doc helper reverses this for deserialization:
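
The exact helpers aren't shown here; below is a minimal sketch of the split and its inverse, assuming a BASE_FIELDS set drawn from the fields and indexes above (the forward helper name is hypothetical):

# Fields kept at the top level of the document; everything else nests under "payload".
# The exact set is an assumption based on the fields and indexes shown above.
BASE_FIELDS = {"event_id", "event_type", "timestamp", "aggregate_id", "metadata", "execution_id"}


def _split_fields(event: dict) -> dict:
    # Hypothetical forward helper: keep base fields at top level, nest the rest.
    doc = {k: v for k, v in event.items() if k in BASE_FIELDS}
    doc["payload"] = {k: v for k, v in event.items() if k not in BASE_FIELDS}
    return doc


def _flatten_doc(doc: dict) -> dict:
    # Reverse the split for deserialization: merge payload fields back to the top level.
    flat = {k: v for k, v in doc.items() if k != "payload"}
    flat.update(doc.get("payload") or {})
    return flat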


The store_event method applies this pattern:
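
A hedged sketch of how store_event might apply the pattern, reusing the BASE_FIELDS set from the sketch above (BaseEvent and the exact signature are assumptions, not the actual implementation):

async def store_event(self, event: BaseEvent) -> EventDocument:
    # Sketch only: serialize the event, promote base fields, nest the rest in payload.
    data = event.model_dump()
    base = {k: data.pop(k) for k in list(data) if k in BASE_FIELDS}
    doc = EventDocument(**base, payload=data)
    await doc.insert()  # the unique event_id index deduplicates double-writes (see Write flow)
    return doc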


Query pattern

For MongoDB queries, nested metadata and payload fields are accessed with dot notation, while promoted base fields such as execution_id are queried at the top level (matching the sparse index on EventDocument):

query["execution_id"] = execution_id
query["metadata.correlation_id"] = correlation_id

Write flow

graph TD
    App[Application Code] --> KES[KafkaEventService.publish_event]
    KES --> ES[EventStore.store_event]
    ES --> Events[(events collection)]
    KES --> Producer[UnifiedProducer]
    Producer --> Kafka[(Kafka)]
    Kafka --> ESC[EventStoreConsumer]
    ESC --> ES

KafkaEventService.publish_event() stores to events AND publishes to Kafka. EventStoreConsumer consumes from Kafka and stores to the same events collection. Deduplication via unique event_id index handles double-writes gracefully.
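
A sketch of how that deduplication might be handled on insert, assuming the standard pymongo duplicate-key error surfaces through the driver (the wrapper name is hypothetical):

from pymongo.errors import DuplicateKeyError


async def insert_once(doc: EventDocument) -> bool:
    # Hypothetical wrapper: returns False when the other write path already stored the event.
    try:
        await doc.insert()
        return True
    except DuplicateKeyError:
        # The unique event_id index rejected the second write; nothing left to do.
        return False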

Read patterns

All repositories query the same events collection:

Repository              Use Case
EventStore              Core event operations, replay, typed deserialization
AdminEventsRepository   Admin dashboard, analytics, browsing
ReplayRepository        Replay session management, event streaming

TTL and retention

Events have a configurable TTL (default 90 days). The ttl_expires_at field triggers MongoDB's TTL index for automatic cleanup. For permanent audit requirements, events can be archived to EventArchiveDocument before deletion.
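
A sketch of the mechanism, assuming a per-document expiry field named ttl_expires_at as described above (the index name and retention constant are illustrative):

from datetime import datetime, timedelta, timezone

from pymongo import ASCENDING, IndexModel

DEFAULT_RETENTION_DAYS = 90  # configurable; 90 days is the stated default

# TTL index: MongoDB removes a document once ttl_expires_at is in the past.
# expireAfterSeconds=0 means "expire at exactly the stored timestamp".
ttl_index = IndexModel([("ttl_expires_at", ASCENDING)], name="idx_ttl_expires", expireAfterSeconds=0)

# Set at write time; events needing permanent retention are archived to EventArchiveDocument first.
ttl_expires_at = datetime.now(timezone.utc) + timedelta(days=DEFAULT_RETENTION_DAYS)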

ReplayFilter

ReplayFilter provides a unified way to query events across all use cases:

    Attributes:
        timestamp: When the error occurred.
        error: Human-readable error message.
        error_type: Python exception class name (e.g., "ValueError", "KafkaException").
            This is the result of `type(exception).__name__`, NOT the ErrorType enum.
            Present for session-level errors.
        event_id: ID of the event that failed to replay. Present for event-level errors.
    """

    model_config = ConfigDict(from_attributes=True)

    timestamp: datetime
    error: str
    error_type: str | None = None
    event_id: str | None = None


class ReplayFilter(BaseModel):
    model_config = ConfigDict(from_attributes=True)
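
    # The filter fields themselves are omitted from this excerpt. Judging from is_empty()
    # and to_mongo_query() below, they look roughly like this (the exact types are assumptions):
    event_ids: list[str] | None = None
    execution_id: str | None = None
    correlation_id: str | None = None
    aggregate_id: str | None = None
    event_types: list[str] | None = None
    exclude_event_types: list[str] | None = None
    start_time: datetime | None = None
    end_time: datetime | None = None
    user_id: str | None = None
    service_name: str | None = None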

The to_mongo_query() method builds MongoDB queries from filter fields:

    def is_empty(self) -> bool:
        return not any(
            [
                self.event_ids,
                self.execution_id,
                self.correlation_id,
                self.aggregate_id,
                self.event_types,
                self.start_time,
                self.end_time,
                self.user_id,
                self.service_name,
            ]
        )

    def to_mongo_query(self) -> dict[str, Any]:
        query: dict[str, Any] = {}

        if self.event_ids:
            query["event_id"] = {"$in": self.event_ids}

        if self.execution_id:
            query["execution_id"] = str(self.execution_id)

        if self.correlation_id:
            query["metadata.correlation_id"] = self.correlation_id

        if self.aggregate_id:
            query["aggregate_id"] = self.aggregate_id

        if self.event_types:
            query["event_type"] = {"$in": [str(et) for et in self.event_types]}

        if self.exclude_event_types:
            if "event_type" in query:
                query["event_type"]["$nin"] = [str(et) for et in self.exclude_event_types]
            else:
                query["event_type"] = {"$nin": [str(et) for et in self.exclude_event_types]}

        if self.start_time or self.end_time:
            time_query: dict[str, Any] = {}
            if self.start_time:
                time_query["$gte"] = self.start_time
            if self.end_time:
                time_query["$lte"] = self.end_time
            query["timestamp"] = time_query

        # Remaining filters (metadata.user_id, metadata.service_name) and the final
        # `return query` are elided here.
All event querying—admin browse, replay preview, event export—uses ReplayFilter.to_mongo_query() for consistency.
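
A hedged usage example (the field values and the events_collection handle are illustrative, not taken from the codebase):

from datetime import datetime, timezone

# Build one filter and reuse the resulting query for browsing, replay preview, or export.
flt = ReplayFilter(
    correlation_id="corr-123",
    event_types=["execution.completed"],
    start_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
)
query = flt.to_mongo_query()

# e.g. with a Motor collection handle for the events collection:
cursor = events_collection.find(query).sort("timestamp", 1)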

Key files

File                                                 Purpose
db/docs/event.py                                     EventDocument and EventArchiveDocument definitions
domain/replay/models.py                              ReplayFilter, ReplayConfig, ReplaySessionState
events/event_store.py                                Event storage and retrieval operations
db/repositories/replay_repository.py                 Replay-specific queries
db/repositories/admin/admin_events_repository.py     Admin dashboard queries
services/kafka_event_service.py                      Unified publish (store + Kafka)