Idempotency¶
The platform implements at-least-once event delivery with idempotency protection to prevent duplicate processing. When a Kafka message is delivered multiple times (due to retries, rebalances, or failures), the idempotency layer ensures the event handler executes only once. Results can be cached for fast duplicate responses.
Architecture¶
flowchart TB
subgraph Kafka Consumer
MSG[Incoming Event] --> CHECK[Check & Reserve Key]
end
subgraph Idempotency Manager
CHECK --> REDIS[(Redis)]
REDIS --> FOUND{Key Exists?}
FOUND -->|Yes| STATUS{Status?}
STATUS -->|Processing| TIMEOUT{Timed Out?}
STATUS -->|Completed/Failed| DUP[Return Duplicate]
TIMEOUT -->|Yes| RETRY[Allow Retry]
TIMEOUT -->|No| WAIT[Block Duplicate]
FOUND -->|No| RESERVE[Reserve Key]
end
subgraph Handler Execution
RESERVE --> HANDLER[Execute Handler]
RETRY --> HANDLER
HANDLER -->|Success| COMPLETE[Mark Completed]
HANDLER -->|Error| FAIL[Mark Failed]
COMPLETE --> CACHE[Cache Result]
end
Key Strategies¶
The idempotency manager supports three strategies for generating keys from events:
Event-based uses the event's unique ID and type. This is the default and works for events where the ID is guaranteed unique (like UUIDs generated at publish time).
Content hash generates a SHA-256 hash of the event's payload, excluding metadata like timestamps and event IDs. Use this when the same logical operation might produce different event IDs but identical content.
Custom allows the caller to provide an arbitrary key. Useful when idempotency depends on business logic (e.g., "one execution per user per minute").
def _generate_key(
    self, event: BaseEvent, key_strategy: KeyStrategy, custom_key: str | None = None, fields: set[str] | None = None
) -> str:
    if key_strategy == KeyStrategy.EVENT_BASED:
        # Default strategy: the event's type plus its unique ID
        key = f"{event.event_type}:{event.event_id}"
    elif key_strategy == KeyStrategy.CONTENT_HASH:
        # Hash the payload, excluding per-delivery metadata, so identical
        # content maps to the same key regardless of event_id
        event_dict = event.model_dump(mode="json")
        event_dict.pop("event_id", None)
        event_dict.pop("timestamp", None)
        event_dict.pop("metadata", None)
        if fields:
            # Optionally restrict the hash to an explicit subset of fields
            event_dict = {k: v for k, v in event_dict.items() if k in fields}
        content = json.dumps(event_dict, sort_keys=True)
        key = hashlib.sha256(content.encode()).hexdigest()
    elif key_strategy == KeyStrategy.CUSTOM and custom_key:
        # Caller-supplied business key
        key = f"{event.event_type}:{custom_key}"
    else:
        raise ValueError(f"Invalid key strategy: {key_strategy}")
    return f"{self.config.key_prefix}:{key}"
Status Lifecycle¶
Each idempotency record transitions through defined states:
class IdempotencyStatus(StringEnum):
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
When an event arrives, the manager checks for an existing key. If none exists, it creates a record in PROCESSING state
and lets the handler execute. On success, the record moves to COMPLETED; on error, to FAILED. Both terminal
states block duplicate processing for the TTL duration.
If a key is found in PROCESSING state but has exceeded the processing timeout (default 5 minutes), the manager assumes
the previous processor crashed and allows a retry.
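A hedged sketch of that decision is shown below. The repository's get() method, the record's status and started_at fields, and the function name are assumptions based on the lifecycle described here, not the exact platform API; DuplicateKeyError is the same exception raised by the repository's insert_processing (see Redis Storage below).

from datetime import datetime, timedelta, timezone

async def may_execute(repo, config, record) -> bool:
    """Return True if the handler should run, False if the event is a duplicate."""
    try:
        await repo.insert_processing(record)  # atomic reservation (see Redis Storage)
        return True
    except DuplicateKeyError:
        existing = await repo.get(record.key)
        if existing is None:  # the old record expired between the two calls
            return await may_execute(repo, config, record)
        if existing.status != IdempotencyStatus.PROCESSING:
            return False  # COMPLETED or FAILED: block the duplicate
        # Stuck in PROCESSING: retry only if the previous attempt has timed out
        deadline = existing.started_at + timedelta(seconds=config.processing_timeout_seconds)
        return datetime.now(timezone.utc) > deadline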
Middleware Integration¶
The IdempotentEventHandler wraps a single Kafka event handler with automatic duplicate detection.
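In outline, the wrapper resolves the idempotency key, reserves it, runs the wrapped handler, and records the outcome. A hedged sketch of the pattern follows; the manager method names (check_and_reserve, get_cached_result, mark_completed, mark_failed) are assumptions, not the exact API:

class IdempotentEventHandler:
    """Wraps one Kafka event handler; duplicates are detected before it runs."""

    def __init__(self, handler, manager, key_strategy=KeyStrategy.EVENT_BASED):
        self._handler = handler      # the original async event handler
        self._manager = manager      # IdempotencyManager
        self._key_strategy = key_strategy

    async def __call__(self, event):
        if not await self._manager.check_and_reserve(event, self._key_strategy):
            # Duplicate: return the cached result (if caching is enabled) instead
            return await self._manager.get_cached_result(event, self._key_strategy)
        try:
            result = await self._handler(event)
        except Exception:
            await self._manager.mark_failed(event, self._key_strategy)
            raise
        await self._manager.mark_completed(event, self._key_strategy, result)
        return result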
IdempotentEventDispatcher is an EventDispatcher subclass that automatically wraps every registered
handler with idempotency. DI providers create this subclass for services that need idempotent event handling
(coordinator, k8s worker, result processor); services that don't (saga orchestrator) use a plain EventDispatcher.
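A hedged sketch of the dispatcher subclass; EventDispatcher's registration API is assumed here:

class IdempotentEventDispatcher(EventDispatcher):
    """EventDispatcher that transparently wraps every handler with idempotency."""

    def __init__(self, idempotency_manager, **kwargs):
        super().__init__(**kwargs)
        self._idempotency_manager = idempotency_manager

    def register(self, event_type, handler):
        # Wrap before delegating to the normal registration path
        wrapped = IdempotentEventHandler(handler, self._idempotency_manager)
        super().register(event_type, wrapped)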
Redis Storage¶
Idempotency records are stored in Redis with automatic TTL expiration. The SET NX EX command provides atomic
reservation—if two processes race to claim the same key, only one succeeds:
async def insert_processing(self, record: IdempotencyRecord) -> None:
k = self._full_key(record.key)
doc = self._record_to_doc(record)
# SET NX with EX for atomic reservation
ok = await self._r.set(k, json.dumps(doc, default=_json_default), ex=record.ttl_seconds, nx=True)
if not ok:
# Mirror Mongo behavior so manager's DuplicateKeyError path is reused
raise DuplicateKeyError("Key already exists")
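The completion write-back is not shown above. A hedged sketch of what it might look like in the same module follows; the mark_completed signature and the stored document's field names are assumptions. It uses SET with XX and KEEPTTL so only an existing key is overwritten and its original expiry is preserved:

async def mark_completed(self, key: str, result: dict | None = None) -> None:
    k = self._full_key(key)
    raw = await self._r.get(k)
    if raw is None:
        return  # record already expired; nothing to update
    doc = json.loads(raw)
    doc["status"] = IdempotencyStatus.COMPLETED.value
    doc["completed_at"] = datetime.now(timezone.utc).isoformat()
    if result is not None:
        doc["result"] = result
    # XX only overwrites an existing key; KEEPTTL preserves the original expiry
    await self._r.set(k, json.dumps(doc, default=_json_default), xx=True, keepttl=True)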
Configuration¶
| Parameter | Default | Description |
|---|---|---|
| key_prefix | idempotency | Redis key namespace |
| default_ttl_seconds | 3600 | How long completed keys are retained |
| processing_timeout_seconds | 300 | When to assume a processor crashed |
| enable_result_caching | true | Store handler results for duplicates |
| max_result_size_bytes | 1048576 | Maximum cached result size (1MB) |
class IdempotencyConfig(BaseModel):
key_prefix: str = "idempotency"
default_ttl_seconds: int = 3600
processing_timeout_seconds: int = 300
enable_result_caching: bool = True
max_result_size_bytes: int = 1048576
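Since the config is a plain Pydantic model, defaults can be overridden at construction time; the values below are illustrative:

config = IdempotencyConfig(
    default_ttl_seconds=86_400,       # block duplicates for 24 hours
    processing_timeout_seconds=600,   # allow slow handlers 10 minutes before a retry
)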
Result Caching¶
When enable_result_caching is true, the manager stores the handler's result JSON alongside the completion status.
Subsequent duplicates can return the cached result without re-executing the handler. This is useful for idempotent
queries where the response should be consistent.
Results exceeding max_result_size_bytes are silently skipped by the cache, but the idempotency protection still
applies.
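A hedged sketch of that size guard; the helper name and its exact placement inside the manager are assumptions:

def _maybe_cache_result(self, result: object) -> str | None:
    if not self.config.enable_result_caching:
        return None
    payload = json.dumps(result, default=str)
    if len(payload.encode()) > self.config.max_result_size_bytes:
        return None  # too large: skip the cache, keep the idempotency record
    return payload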
Metrics¶
The idempotency system exposes several metrics for monitoring:
- idempotency_cache_hits - Key lookups that found an existing record
- idempotency_cache_misses - Key lookups that created new records
- idempotency_duplicates_blocked - Events rejected as duplicates
- idempotency_keys_active - Current number of active keys (updated periodically)
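If these are exported as Prometheus-style instruments (an assumption; the section does not name the metrics backend), the declarations would look roughly like:

from prometheus_client import Counter, Gauge

IDEMPOTENCY_CACHE_HITS = Counter(
    "idempotency_cache_hits", "Key lookups that found an existing record")
IDEMPOTENCY_CACHE_MISSES = Counter(
    "idempotency_cache_misses", "Key lookups that created new records")
IDEMPOTENCY_DUPLICATES_BLOCKED = Counter(
    "idempotency_duplicates_blocked", "Events rejected as duplicates")
IDEMPOTENCY_KEYS_ACTIVE = Gauge(
    "idempotency_keys_active", "Current number of active idempotency keys")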
Key Files¶
| File | Purpose |
|---|---|
| services/idempotency/idempotency_manager.py | Core idempotency logic |
| services/idempotency/redis_repository.py | Redis storage adapter |
| services/idempotency/middleware.py | Handler wrapper and IdempotentEventDispatcher |
| domain/idempotency/ | Domain models |