# Event storage architecture

## Unified events collection
The system stores all events in a single events MongoDB collection using EventDocument. This provides a unified
approach where all event data—whether from Kafka consumers, API operations, or pod monitors—flows into one collection
with consistent structure.
## EventDocument structure
EventDocument uses a flexible payload pattern. Base fields are stored at document level for efficient indexing, while
event-specific fields go into the payload dict:
```python
from beanie import Document
from pydantic import ConfigDict
from pymongo import ASCENDING, DESCENDING, IndexModel


class EventDocument(Document):  # base class inferred from the Beanie-style Settings below
    # Excerpt: base fields such as event_id, event_type, timestamp, aggregate_id,
    # metadata, and the payload dict are omitted here.
    execution_id: str | None = None

    model_config = ConfigDict(from_attributes=True, extra="allow")

    class Settings:
        name = "events"
        use_state_management = True
        indexes = [
            # Compound indexes for common query patterns
            IndexModel([("event_type", ASCENDING), ("timestamp", DESCENDING)], name="idx_event_type_ts"),
            IndexModel([("aggregate_id", ASCENDING), ("timestamp", DESCENDING)], name="idx_aggregate_ts"),
            IndexModel([("metadata.correlation_id", ASCENDING)], name="idx_meta_correlation"),
            IndexModel([("metadata.user_id", ASCENDING), ("timestamp", DESCENDING)], name="idx_meta_user_ts"),
            IndexModel([("metadata.service_name", ASCENDING), ("timestamp", DESCENDING)], name="idx_meta_service_ts"),
            # Event-specific field indexes (sparse - only exist on relevant event types)
            IndexModel([("execution_id", ASCENDING)], name="idx_execution_id", sparse=True),
        ]
```
## Storage pattern

When storing events, base fields stay at the top level of the document while everything else goes into the payload dict; the store_event method applies this split on write, and the _flatten_doc helper reverses it for deserialization.
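A minimal sketch of the round trip for dict-shaped events (the real store_event and _flatten_doc live in events/event_store.py; _split_for_storage and the exact base-field set are illustrative here, derived from the index definitions above):

```python
from typing import Any

# Illustrative base-field set, derived from the indexed fields above.
BASE_FIELDS = {"event_id", "event_type", "timestamp", "aggregate_id", "metadata", "execution_id"}


def _split_for_storage(event: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: base fields stay top-level, the rest nests under payload."""
    doc = {k: v for k, v in event.items() if k in BASE_FIELDS}
    doc["payload"] = {k: v for k, v in event.items() if k not in BASE_FIELDS}
    return doc


def _flatten_doc(doc: dict[str, Any]) -> dict[str, Any]:
    """Reverse the split: merge payload back to the top level (implementation assumed)."""
    flat = {k: v for k, v in doc.items() if k != "payload"}
    flat.update(doc.get("payload") or {})
    return flat
```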
## Query pattern

In MongoDB queries, base fields are matched by their top-level names, while payload and metadata fields are reached with dot notation.
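For example, given a Motor database handle db (field names and values here are illustrative, not part of the documented schema):

```python
query = {
    "event_type": "pod.created",           # base field: top-level name
    "payload.pod_name": "runner-42",       # event-specific field: dot notation into payload
    "metadata.correlation_id": "abc-123",  # nested metadata field: dot notation
}
docs = await db.events.find(query).to_list(length=100)
```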
## Write flow

```mermaid
graph TD
    App[Application Code] --> KES[KafkaEventService.publish_event]
    KES --> ES[EventStore.store_event]
    ES --> Events[(events collection)]
    KES --> Producer[UnifiedProducer]
    Producer --> Kafka[(Kafka)]
    Kafka --> ESC[EventStoreConsumer]
    ESC --> ES
```
KafkaEventService.publish_event() stores the event in events and publishes it to Kafka; EventStoreConsumer then consumes from Kafka and writes to the same events collection. An event can therefore be written twice, and deduplication via a unique event_id index handles the double-write gracefully.
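A sketch of how the second write becomes a no-op, assuming a Motor-style async collection (store_idempotent and the index name are illustrative):

```python
from pymongo import ASCENDING, IndexModel
from pymongo.errors import DuplicateKeyError

# Unique index enforcing one document per event_id.
unique_event_id = IndexModel([("event_id", ASCENDING)], name="idx_event_id_unique", unique=True)


async def store_idempotent(collection, doc: dict) -> None:
    """Insert an event; if the other write path stored it first, drop the duplicate silently."""
    try:
        await collection.insert_one(doc)
    except DuplicateKeyError:
        pass  # already written by the direct store or the consumer
```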
## Read patterns

All repositories query the same events collection:

| Repository | Use Case |
|---|---|
| `EventStore` | Core event operations, replay, typed deserialization |
| `AdminEventsRepository` | Admin dashboard, analytics, browsing |
| `ReplayRepository` | Replay session management, event streaming |
## TTL and retention
Events have a configurable TTL (default 90 days). The ttl_expires_at field triggers MongoDB's TTL index for automatic
cleanup. For permanent audit requirements, events can be archived to EventArchiveDocument before deletion.
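A sketch of how such per-document expiry is typically declared in MongoDB (the index name is illustrative):

```python
from pymongo import ASCENDING, IndexModel

# expireAfterSeconds=0 removes each document as soon as the wall clock
# passes the datetime stored in its ttl_expires_at field.
ttl_index = IndexModel([("ttl_expires_at", ASCENDING)], name="idx_ttl", expireAfterSeconds=0)
```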
## ReplayFilter

ReplayFilter provides a unified way to query events across all use cases. It lives in domain/replay/models.py next to a per-error record for replay failures; in the excerpt below, the error model's class name (ReplayError) and docstring summary line are assumptions, while its fields and attribute docs come from the source:

```python
from datetime import datetime
from typing import Any

from pydantic import BaseModel, ConfigDict


class ReplayError(BaseModel):  # class name assumed; fields as documented
    """A single error recorded during replay.

    Attributes:
        timestamp: When the error occurred.
        error: Human-readable error message.
        error_type: Python exception class name (e.g., "ValueError", "KafkaException").
            This is the result of `type(exception).__name__`, NOT the ErrorType enum.
            Present for session-level errors.
        event_id: ID of the event that failed to replay. Present for event-level errors.
    """

    model_config = ConfigDict(from_attributes=True)

    timestamp: datetime
    error: str
    error_type: str | None = None
    event_id: str | None = None


class ReplayFilter(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    # Filter fields (event_ids, execution_id, correlation_id, aggregate_id, event_types,
    # exclude_event_types, start_time, end_time, user_id, service_name) elided here.
```
Two methods drive querying: is_empty() reports whether any filter field is set, and to_mongo_query() builds the MongoDB query from the populated fields:
```python
    def is_empty(self) -> bool:
        return not any(
            [
                self.event_ids,
                self.execution_id,
                self.correlation_id,
                self.aggregate_id,
                self.event_types,
                self.start_time,
                self.end_time,
                self.user_id,
                self.service_name,
            ]
        )

    def to_mongo_query(self) -> dict[str, Any]:
        query: dict[str, Any] = {}
        if self.event_ids:
            query["event_id"] = {"$in": self.event_ids}
        if self.execution_id:
            query["execution_id"] = str(self.execution_id)
        if self.correlation_id:
            query["metadata.correlation_id"] = self.correlation_id
        if self.aggregate_id:
            query["aggregate_id"] = self.aggregate_id
        if self.event_types:
            query["event_type"] = {"$in": [str(et) for et in self.event_types]}
        if self.exclude_event_types:
            if "event_type" in query:
                query["event_type"]["$nin"] = [str(et) for et in self.exclude_event_types]
            else:
                query["event_type"] = {"$nin": [str(et) for et in self.exclude_event_types]}
        if self.start_time or self.end_time:
            # Time window on the indexed timestamp field. The excerpt is truncated
            # from here on; the remaining mappings are reconstructed from the
            # metadata.* indexes above.
            time_query: dict[str, Any] = {}
            if self.start_time:
                time_query["$gte"] = self.start_time
            if self.end_time:
                time_query["$lte"] = self.end_time
            query["timestamp"] = time_query
        if self.user_id:
            query["metadata.user_id"] = self.user_id
        if self.service_name:
            query["metadata.service_name"] = self.service_name
        return query
```
All event querying—admin browse, replay preview, event export—uses ReplayFilter.to_mongo_query() for consistency.
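For example, a filter on a single execution within a time window compiles to a query that lands on the indexes above (values are illustrative, and the elided filter fields are assumed to default to None):

```python
from datetime import datetime, timezone

f = ReplayFilter(
    execution_id="exec-123",
    start_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
)
print(f.to_mongo_query())
# {'execution_id': 'exec-123',
#  'timestamp': {'$gte': datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)}}
```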
## Key files

| File | Purpose |
|---|---|
| `db/docs/event.py` | `EventDocument` and `EventArchiveDocument` definitions |
| `domain/replay/models.py` | `ReplayFilter`, `ReplayConfig`, `ReplaySessionState` |
| `events/event_store.py` | Event storage and retrieval operations |
| `db/repositories/replay_repository.py` | Replay-specific queries |
| `db/repositories/admin/admin_events_repository.py` | Admin dashboard queries |
| `services/kafka_event_service.py` | Unified publish (store + Kafka) |
## Related docs
- User Settings Events — event sourcing pattern for user settings with TypeAdapter merging
- Pydantic Dataclasses — why domain models use pydantic dataclasses for nested conversion