# Event storage architecture
## Unified events collection
The system stores all events in a single `events` MongoDB collection using EventDocument. All event data, whether produced by Kafka consumers, API operations, or pod monitors, flows into one collection with a consistent structure.
## EventDocument structure
EventDocument relies on Pydantic's `extra="allow"` for flexible storage: declared base fields sit on the model for efficient indexing, and event-specific fields are stored directly at document level alongside them, with no payload wrapper:
```python
import dataclasses
from datetime import datetime, timedelta, timezone
from uuid import uuid4

import pymongo
from beanie import Document, Indexed
from pydantic import ConfigDict, Field
from pymongo import ASCENDING, DESCENDING, IndexModel

# EventType and EventMetadata are project-local (see Key files below).


class EventDocument(Document):
    """Event document for event browsing/admin system.

    Uses extra='allow' for flexible event data storage - event-specific fields
    are stored directly at document level (no payload wrapper needed).
    """

    event_id: Indexed(str, unique=True) = Field(default_factory=lambda: str(uuid4()))  # type: ignore[valid-type]
    event_type: EventType  # Indexed via Settings.indexes
    event_version: str = "1.0"
    timestamp: Indexed(datetime) = Field(default_factory=lambda: datetime.now(timezone.utc))  # type: ignore[valid-type]
    aggregate_id: Indexed(str) | None = None  # type: ignore[valid-type]
    metadata: EventMetadata
    stored_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    ttl_expires_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc) + timedelta(days=30))

    # Most event types have execution_id (sparse-indexed)
    execution_id: str | None = None

    model_config = ConfigDict(from_attributes=True, extra="allow")

    class Settings:
        name = "events"
        use_state_management = True
        bson_encoders = {EventMetadata: dataclasses.asdict}
        indexes = [
            # Compound indexes for common query patterns
            IndexModel([("event_type", ASCENDING), ("timestamp", DESCENDING)], name="idx_event_type_ts"),
            IndexModel([("aggregate_id", ASCENDING), ("timestamp", DESCENDING)], name="idx_aggregate_ts"),
            IndexModel([("metadata.user_id", ASCENDING), ("timestamp", DESCENDING)], name="idx_meta_user_ts"),
            IndexModel([("metadata.service_name", ASCENDING), ("timestamp", DESCENDING)], name="idx_meta_service_ts"),
            # Event-specific field indexes (sparse - only exist on relevant event types)
            IndexModel([("execution_id", ASCENDING)], name="idx_execution_id", sparse=True),
            IndexModel([("pod_name", ASCENDING)], name="idx_pod_name", sparse=True),
            # TTL index (expireAfterSeconds=0 means use ttl_expires_at value directly)
            IndexModel([("ttl_expires_at", ASCENDING)], name="idx_ttl", expireAfterSeconds=0),
            # Additional compound indexes for query optimization
            IndexModel([("event_type", ASCENDING), ("aggregate_id", ASCENDING)], name="idx_events_type_agg"),
            IndexModel([("aggregate_id", ASCENDING), ("timestamp", ASCENDING)], name="idx_events_agg_ts"),
            IndexModel([("event_type", ASCENDING), ("timestamp", ASCENDING)], name="idx_events_type_ts_asc"),
            IndexModel([("metadata.user_id", ASCENDING), ("timestamp", ASCENDING)], name="idx_events_user_ts"),
            IndexModel([("metadata.user_id", ASCENDING), ("event_type", ASCENDING)], name="idx_events_user_type"),
            IndexModel(
                [("event_type", ASCENDING), ("metadata.user_id", ASCENDING), ("timestamp", DESCENDING)],
                name="idx_events_type_user_ts",
            ),
            # Text search index
            IndexModel(
                [
                    ("event_type", pymongo.TEXT),
                    ("metadata.service_name", pymongo.TEXT),
                    ("metadata.user_id", pymongo.TEXT),
                    ("execution_id", pymongo.TEXT),
                ],
                name="idx_text_search",
                language_override="none",
                default_language="english",
            ),
        ]
```
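The `idx_text_search` index enables `$text` queries across the four indexed string fields. A minimal sketch (the search term is illustrative):

```python
# $text matches against event_type, metadata.service_name,
# metadata.user_id, and execution_id via the idx_text_search index.
matches = await EventDocument.find(
    {"$text": {"$search": "executor"}}
).to_list()
```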
## Storage pattern
When storing events, declared base fields and event-specific extra fields all live at the top level of the document (`extra="allow"`), so no payload wrapper is needed. Repositories handle serialization and deserialization at the boundary between domain models and Beanie documents.
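A minimal sketch of a write, assuming a hypothetical execution-completed event; the EventType member, the EventMetadata constructor arguments, and the extra fields (exit_code, pod_name) are illustrative, not taken from the source:

```python
doc = EventDocument(
    event_type=EventType.EXECUTION_COMPLETED,  # assumed enum member
    aggregate_id="exec-123",
    execution_id="exec-123",
    metadata=EventMetadata(service_name="executor", user_id="u-42"),  # assumed fields
    # Event-specific fields: persisted at document level via extra="allow"
    exit_code=0,
    pod_name="executor-pod-7f9c",
)
await doc.insert()
```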
## Query pattern
For MongoDB queries, event-specific fields are addressed by name at the top level, while nested metadata fields use dot notation:
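A hedged example, assuming an "execution.failed" event-type value and an exit_code extra field (both illustrative):

```python
# Recent failed executions for one user: an event-specific field queried
# by name, plus dot notation into the embedded metadata document.
docs = await EventDocument.find(
    {
        "event_type": "execution.failed",  # illustrative value
        "metadata.user_id": "u-42",
        "exit_code": {"$ne": 0},           # event-specific top-level field
    }
).sort("-timestamp").limit(50).to_list()
```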
## Write flow
```mermaid
graph TD
    App[Application Code] --> KES[KafkaEventService.publish_event]
    KES --> Repo[EventRepository]
    Repo --> Events[(events collection)]
    KES --> Producer[UnifiedProducer]
    Producer --> Kafka[(Kafka)]
```
KafkaEventService.publish_event() both stores the event in the events collection and publishes it to Kafka. The unique index on event_id handles double-writes gracefully: a second write of the same event_id is rejected by the index instead of creating a duplicate.
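A minimal sketch of that dual write, assuming hypothetical save() and send() methods on the repository and producer (the real interfaces live in the files listed under Key files):

```python
from pymongo.errors import DuplicateKeyError


class KafkaEventService:
    def __init__(self, repository: "EventRepository", producer: "UnifiedProducer") -> None:
        self._repository = repository
        self._producer = producer

    async def publish_event(self, event: EventDocument) -> None:
        try:
            await self._repository.save(event)  # hypothetical method name
        except DuplicateKeyError:
            # Unique event_id index: the event was already stored, so the
            # double-write is a no-op rather than a duplicate.
            pass
        await self._producer.send(event)  # hypothetical method name
```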
## Read patterns
All repositories query the same events collection:
| Repository | Use Case |
|---|---|
| `EventRepository` | Core event operations, replay, typed deserialization |
| `AdminEventsRepository` | Admin dashboard, analytics, browsing |
| `ReplayRepository` | Replay session management, event streaming |
## TTL and retention
Events have a configurable TTL; the ttl_expires_at default shown above is 30 days. That field drives MongoDB's TTL index, which uses expireAfterSeconds=0 and therefore deletes each document at its stored expiry time. For permanent audit requirements, events can be archived to EventArchiveDocument before deletion.
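A hedged sketch of the archive-then-expire step, assuming EventArchiveDocument mirrors EventDocument's fields (its actual schema is in `db/docs/event.py`):

```python
async def archive_before_expiry(doc: EventDocument) -> None:
    # Copy all fields except the Mongo _id into the archive collection
    # (schema assumed to mirror EventDocument); the TTL index then
    # deletes the original at ttl_expires_at.
    await EventArchiveDocument(**doc.model_dump(exclude={"id"})).insert()
```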
## ReplayFilter
ReplayFilter provides a unified way to query events across all use cases. Failures during replay are reported with the ReplayError dataclass:
```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class ReplayError:
    """Error details for replay operations.

    Attributes:
        timestamp: When the error occurred.
        error: Human-readable error message.
        error_type: Python exception class name (e.g., "ValueError", "KafkaException").
            This is the result of `type(exception).__name__`, NOT the ErrorType enum.
            Present for session-level errors.
        event_id: ID of the event that failed to replay. Present for event-level errors.
    """

    timestamp: datetime
    error: str
    error_type: str | None = None
    event_id: str | None = None
```
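For instance, an event-level failure might be recorded like this (a sketch; the producer and the surrounding replay loop are assumed):

```python
from datetime import datetime, timezone


async def replay_one(producer, event: EventDocument) -> ReplayError | None:
    """Sketch: publish one event, returning a ReplayError on failure."""
    try:
        await producer.send(event)  # hypothetical producer method
        return None
    except Exception as exc:
        return ReplayError(
            timestamp=datetime.now(timezone.utc),
            error=str(exc),
            error_type=type(exc).__name__,  # exception class name, not ErrorType
            event_id=event.event_id,
        )
```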
ReplayFilter itself is a dataclass of optional filter fields; its to_mongo_query() method builds a MongoDB query from whichever fields are set:
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any

# EventType is project-local (see Key files below).


@dataclass
class ReplayFilter:
    # Event selection filters
    event_ids: list[str] | None = None
    execution_id: str | None = None
    aggregate_id: str | None = None
    event_types: list[EventType] | None = None
    exclude_event_types: list[EventType] | None = None

    # Time range
    start_time: datetime | None = None
    end_time: datetime | None = None

    # Metadata filters
    user_id: str | None = None
    service_name: str | None = None

    def is_empty(self) -> bool:
        return not any(
            [
                self.event_ids,
                self.execution_id,
                self.aggregate_id,
                self.event_types,
                self.exclude_event_types,  # counted so an exclude-only filter is not "empty"
                self.start_time,
                self.end_time,
                self.user_id,
                self.service_name,
            ]
        )

    def to_mongo_query(self) -> dict[str, Any]:
        query: dict[str, Any] = {}
        if self.event_ids:
            query["event_id"] = {"$in": self.event_ids}
        if self.execution_id:
            query["execution_id"] = self.execution_id
        if self.aggregate_id:
            query["aggregate_id"] = self.aggregate_id
        if self.event_types:
            query["event_type"] = {"$in": list(self.event_types)}
        if self.exclude_event_types:
            if "event_type" in query:
                query["event_type"]["$nin"] = list(self.exclude_event_types)
            else:
                query["event_type"] = {"$nin": list(self.exclude_event_types)}
        if self.start_time or self.end_time:
            time_query: dict[str, Any] = {}
            if self.start_time:
                time_query["$gte"] = self.start_time
            if self.end_time:
                time_query["$lte"] = self.end_time
            query["timestamp"] = time_query
        if self.user_id:
            query["metadata.user_id"] = self.user_id
        if self.service_name:
            query["metadata.service_name"] = self.service_name
        return query
```
All event querying (admin browse, replay preview, event export) uses ReplayFilter.to_mongo_query(), so every caller builds queries the same way.
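For example, a sketch of filtering one user's events from the last week (the EventType member is assumed):

```python
from datetime import datetime, timedelta, timezone

flt = ReplayFilter(
    user_id="u-42",
    event_types=[EventType.EXECUTION_COMPLETED],  # assumed enum member
    start_time=datetime.now(timezone.utc) - timedelta(days=7),
)
docs = await EventDocument.find(flt.to_mongo_query()).sort("-timestamp").to_list()
```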
## Key files
| File | Purpose |
|---|---|
| `db/docs/event.py` | `EventDocument` and `EventArchiveDocument` definitions |
| `domain/replay/models.py` | `ReplayFilter`, `ReplayConfig`, `ReplaySessionState` |
| `db/repositories/event_repository.py` | Event storage and retrieval operations |
| `db/repositories/replay_repository.py` | Replay-specific queries |
| `db/repositories/admin/admin_events_repository.py` | Admin dashboard queries |
| `services/kafka_event_service.py` | Unified publish (store + Kafka) |
## Related docs
- User Settings Events — event sourcing pattern for user settings with TypeAdapter merging
- Domain Dataclasses — why domain models use stdlib dataclasses