Event storage architecture

Unified events collection

The system stores all events in a single events MongoDB collection using EventDocument. This provides a unified approach where all event data—whether from Kafka consumers, API operations, or pod monitors—flows into one collection with consistent structure.

EventDocument structure

EventDocument uses a flexible flat-document pattern. Base fields are declared explicitly for efficient indexing, while event-specific fields are stored directly at document level via extra='allow' (no payload wrapper):

class EventDocument(Document):
    """Event document for event browsing/admin system.

    Uses extra='allow' for flexible event data storage - event-specific fields
    are stored directly at document level (no payload wrapper needed).
    """

    event_id: Indexed(str, unique=True) = Field(default_factory=lambda: str(uuid4()))  # type: ignore[valid-type]
    event_type: EventType  # Indexed via Settings.indexes
    event_version: str = "1.0"
    timestamp: Indexed(datetime) = Field(default_factory=lambda: datetime.now(timezone.utc))  # type: ignore[valid-type]
    aggregate_id: Indexed(str) | None = None  # type: ignore[valid-type]
    metadata: EventMetadata
    stored_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    ttl_expires_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc) + timedelta(days=30))

    # Most event types have execution_id (sparse-indexed)
    execution_id: str | None = None

    model_config = ConfigDict(from_attributes=True, extra="allow")

    class Settings:
        name = "events"
        use_state_management = True
        bson_encoders = {EventMetadata: dataclasses.asdict}
        indexes = [
            # Compound indexes for common query patterns
            IndexModel([("event_type", ASCENDING), ("timestamp", DESCENDING)], name="idx_event_type_ts"),
            IndexModel([("aggregate_id", ASCENDING), ("timestamp", DESCENDING)], name="idx_aggregate_ts"),
            IndexModel([("metadata.user_id", ASCENDING), ("timestamp", DESCENDING)], name="idx_meta_user_ts"),
            IndexModel([("metadata.service_name", ASCENDING), ("timestamp", DESCENDING)], name="idx_meta_service_ts"),
            # Event-specific field indexes (sparse - only exist on relevant event types)
            IndexModel([("execution_id", ASCENDING)], name="idx_execution_id", sparse=True),
            IndexModel([("pod_name", ASCENDING)], name="idx_pod_name", sparse=True),
            # TTL index (expireAfterSeconds=0 means use ttl_expires_at value directly)
            IndexModel([("ttl_expires_at", ASCENDING)], name="idx_ttl", expireAfterSeconds=0),
            # Additional compound indexes for query optimization
            IndexModel([("event_type", ASCENDING), ("aggregate_id", ASCENDING)], name="idx_events_type_agg"),
            IndexModel([("aggregate_id", ASCENDING), ("timestamp", ASCENDING)], name="idx_events_agg_ts"),
            IndexModel([("event_type", ASCENDING), ("timestamp", ASCENDING)], name="idx_events_type_ts_asc"),
            IndexModel([("metadata.user_id", ASCENDING), ("timestamp", ASCENDING)], name="idx_events_user_ts"),
            IndexModel([("metadata.user_id", ASCENDING), ("event_type", ASCENDING)], name="idx_events_user_type"),
            IndexModel(
                [("event_type", ASCENDING), ("metadata.user_id", ASCENDING), ("timestamp", DESCENDING)],
                name="idx_events_type_user_ts",
            ),
            # Text search index
            IndexModel(
                [
                    ("event_type", pymongo.TEXT),
                    ("metadata.service_name", pymongo.TEXT),
                    ("metadata.user_id", pymongo.TEXT),
                    ("execution_id", pymongo.TEXT),
                ],
                name="idx_text_search",
                language_override="none",
                default_language="english",
            ),
        ]

Storage pattern

When storing events, base fields are populated explicitly while event-specific fields are set directly on the document, which extra='allow' permits. Repositories handle serialization and deserialization at the boundary between domain models and Beanie documents.
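As a sketch of that boundary (the function name and field set here are illustrative, not the actual repository API), a domain event flattens into the stored document shape like this:

```python
from datetime import datetime, timedelta, timezone
from typing import Any


def to_document_dict(event: dict[str, Any]) -> dict[str, Any]:
    """Illustrative repository boundary: base fields and event-specific
    fields end up side by side in one flat document (no payload wrapper)."""
    doc = dict(event)  # event-specific keys (e.g. pod_name) stay at top level
    doc.setdefault("event_version", "1.0")
    doc.setdefault("stored_at", datetime.now(timezone.utc))
    doc.setdefault("ttl_expires_at", doc["stored_at"] + timedelta(days=30))
    return doc


doc = to_document_dict({
    "event_id": "e-1",
    "event_type": "pod.created",
    "timestamp": datetime.now(timezone.utc),
    "metadata": {"user_id": "u-1", "service_name": "pod-monitor"},
    "pod_name": "runner-abc",  # event-specific field, stored flat
})
```

The sparse indexes in Settings (idx_execution_id, idx_pod_name) exist precisely because such fields only appear on the event types that carry them.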

Query pattern

For MongoDB queries, event-specific fields are addressed at top level just like base fields; only nested metadata fields need dot notation:

query["execution_id"] = execution_id
query["aggregate_id"] = aggregate_id
query["metadata.user_id"] = user_id

Write flow

graph TD
    App[Application Code] --> KES[KafkaEventService.publish_event]
    KES --> Repo[EventRepository]
    Repo --> Events[(events collection)]
    KES --> Producer[UnifiedProducer]
    Producer --> Kafka[(Kafka)]

KafkaEventService.publish_event() stores the event in the events collection AND publishes it to Kafka. The unique index on event_id makes the store idempotent, so retried double-writes are absorbed rather than duplicated.
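A minimal sketch of that dual write, with in-memory stand-ins for the collection (and its unique event_id index) and the Kafka producer; the real service uses Beanie/pymongo and UnifiedProducer:

```python
class DuplicateKeyError(Exception):
    """Stand-in for pymongo.errors.DuplicateKeyError in this sketch."""


class InMemoryEvents:
    """Stand-in for the events collection with a unique index on event_id."""

    def __init__(self) -> None:
        self.docs: dict[str, dict] = {}

    def insert_one(self, doc: dict) -> None:
        if doc["event_id"] in self.docs:
            raise DuplicateKeyError(doc["event_id"])
        self.docs[doc["event_id"]] = doc


def publish_event(events: InMemoryEvents, kafka: list, doc: dict) -> None:
    """Store first, then publish. A duplicate event_id means the event was
    already stored (e.g. a retried call), so the store step is a no-op."""
    try:
        events.insert_one(doc)
    except DuplicateKeyError:
        pass  # unique index absorbs the double-write
    kafka.append(doc)  # stand-in for the UnifiedProducer publish


events, kafka = InMemoryEvents(), []
publish_event(events, kafka, {"event_id": "e-1", "event_type": "pod.created"})
publish_event(events, kafka, {"event_id": "e-1", "event_type": "pod.created"})  # retry
```

Note the retry still reaches Kafka in this sketch; only the store side is deduplicated by the index, and consumers are expected to tolerate redelivery.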

Read patterns

All repositories query the same events collection:

EventRepository: Core event operations, replay, typed deserialization
AdminEventsRepository: Admin dashboard, analytics, browsing
ReplayRepository: Replay session management, event streaming

TTL and retention

Events have a configurable TTL; the ttl_expires_at field defaults to 30 days from write and triggers MongoDB's TTL index for automatic cleanup. For permanent audit requirements, events can be archived to EventArchiveDocument before deletion.
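Because the TTL index is declared with expireAfterSeconds=0, each document carries its own expiry, making retention a write-time decision. A sketch (retention_days is an assumed configuration knob, not a confirmed setting name):

```python
from datetime import datetime, timedelta, timezone


def ttl_expires_at(stored_at: datetime, retention_days: int = 30) -> datetime:
    """With expireAfterSeconds=0, MongoDB deletes a document once the wall
    clock passes its ttl_expires_at value, so retention per document is just
    stored_at plus the configured window."""
    return stored_at + timedelta(days=retention_days)


stored = datetime(2024, 6, 1, tzinfo=timezone.utc)
expiry = ttl_expires_at(stored)
```

Setting a longer retention_days at write time (or updating ttl_expires_at later) extends a document's life without touching the index definition.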

ReplayFilter

ReplayFilter provides a unified way to query events across all use cases. Its companion ReplayError dataclass captures failures raised during replay:

@dataclass
class ReplayError:
    """Error details for replay operations.

    Attributes:
        timestamp: When the error occurred.
        error: Human-readable error message.
        error_type: Python exception class name (e.g., "ValueError", "KafkaException").
            This is the result of `type(exception).__name__`, NOT the ErrorType enum.
            Present for session-level errors.
        event_id: ID of the event that failed to replay. Present for event-level errors.
    """

    timestamp: datetime
    error: str
    error_type: str | None = None
    event_id: str | None = None

The to_mongo_query() method builds MongoDB queries from filter fields:

@dataclass
class ReplayFilter:
    # Event selection filters
    event_ids: list[str] | None = None
    execution_id: str | None = None
    aggregate_id: str | None = None
    event_types: list[EventType] | None = None
    exclude_event_types: list[EventType] | None = None

    # Time range
    start_time: datetime | None = None
    end_time: datetime | None = None

    # Metadata filters
    user_id: str | None = None
    service_name: str | None = None

    def is_empty(self) -> bool:
        return not any(
            [
                self.event_ids,
                self.execution_id,
                self.aggregate_id,
                self.event_types,
                self.exclude_event_types,
                self.start_time,
                self.end_time,
                self.user_id,
                self.service_name,
            ]
        )

    def to_mongo_query(self) -> dict[str, Any]:
        query: dict[str, Any] = {}

        if self.event_ids:
            query["event_id"] = {"$in": self.event_ids}

        if self.execution_id:
            query["execution_id"] = self.execution_id

        if self.aggregate_id:
            query["aggregate_id"] = self.aggregate_id

        if self.event_types:
            query["event_type"] = {"$in": list(self.event_types)}

        if self.exclude_event_types:
            if "event_type" in query:
                query["event_type"]["$nin"] = list(self.exclude_event_types)
            else:
                query["event_type"] = {"$nin": list(self.exclude_event_types)}

        if self.start_time or self.end_time:
            time_query: dict[str, Any] = {}
            if self.start_time:
                time_query["$gte"] = self.start_time
            if self.end_time:
                time_query["$lte"] = self.end_time
            query["timestamp"] = time_query

        if self.user_id:
            query["metadata.user_id"] = self.user_id

        if self.service_name:
            query["metadata.service_name"] = self.service_name

        return query

All event querying—admin browse, replay preview, event export—uses ReplayFilter.to_mongo_query() for consistency.

Key files

db/docs/event.py: EventDocument and EventArchiveDocument definitions
domain/replay/models.py: ReplayFilter, ReplayConfig, ReplaySessionState
db/repositories/event_repository.py: Event storage and retrieval operations
db/repositories/replay_repository.py: Replay-specific queries
db/repositories/admin/admin_events_repository.py: Admin dashboard queries
services/kafka_event_service.py: Unified publish (store + Kafka)