
Idempotency

The platform implements at-least-once event delivery with idempotency protection to prevent duplicate processing. When a Kafka message is delivered more than once (because of retries, consumer rebalances, or failures), the idempotency layer ensures the event handler runs only once for a given idempotency key while that key is retained. Handler results can optionally be cached so duplicates are answered without re-executing the handler.

Architecture

flowchart TB
    subgraph Kafka Consumer
        MSG[Incoming Event] --> CHECK[Check & Reserve Key]
    end

    subgraph Idempotency Manager
        CHECK --> REDIS[(Redis)]
        REDIS --> FOUND{Key Exists?}
        FOUND -->|Yes| STATUS{Status?}
        STATUS -->|Processing| TIMEOUT{Timed Out?}
        STATUS -->|Completed/Failed| DUP[Return Duplicate]
        TIMEOUT -->|Yes| RETRY[Allow Retry]
        TIMEOUT -->|No| WAIT[Block Duplicate]
        FOUND -->|No| RESERVE[Reserve Key]
    end

    subgraph Handler Execution
        RESERVE --> HANDLER[Execute Handler]
        RETRY --> HANDLER
        HANDLER -->|Success| COMPLETE[Mark Completed]
        HANDLER -->|Error| FAIL[Mark Failed]
        COMPLETE --> CACHE[Cache Result]
    end

Key Strategies

The idempotency manager supports three strategies for generating keys from events:

Event-based uses the event's unique ID and type. This is the default and works for events where the ID is guaranteed unique (like UUIDs generated at publish time).

Content hash generates a SHA-256 hash of the serialized event, excluding the event_id, timestamp, and metadata fields. An optional fields argument restricts the hash to the listed fields. Use this when the same logical operation might produce different event IDs but identical content.

Custom uses a caller-supplied key, namespaced by event type. This is useful when idempotency depends on business logic (e.g., "one execution per user per minute"). Selecting this strategy without providing a custom key raises a ValueError.

    def _generate_key(
        self, event: BaseEvent, key_strategy: KeyStrategy, custom_key: str | None = None, fields: set[str] | None = None
    ) -> str:
        if key_strategy == KeyStrategy.EVENT_BASED:
            # Unique per published event
            key = f"{event.event_type}:{event.event_id}"
        elif key_strategy == KeyStrategy.CONTENT_HASH:
            event_dict = event.model_dump(mode="json")
            # Drop fields that differ between otherwise identical events
            event_dict.pop("event_id", None)
            event_dict.pop("timestamp", None)
            event_dict.pop("metadata", None)
            if fields:
                # Optionally hash only the selected fields
                event_dict = {k: v for k, v in event_dict.items() if k in fields}
            # sort_keys gives a canonical serialization, so key order cannot change the hash
            content = json.dumps(event_dict, sort_keys=True)
            key = hashlib.sha256(content.encode()).hexdigest()
        elif key_strategy == KeyStrategy.CUSTOM and custom_key:
            key = f"{event.event_type}:{custom_key}"
        else:
            # Unknown strategy, or CUSTOM without a custom_key
            raise ValueError(f"Invalid key strategy: {key_strategy}")
        return f"{self.config.key_prefix}:{key}"

Status Lifecycle

Each idempotency record transitions through defined states:

class IdempotencyStatus(StringEnum):
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

When an event arrives, the manager checks for an existing key. If none exists, it creates a record in the PROCESSING state and hands control to the handler. On success the record moves to COMPLETED; on error, to FAILED. Both terminal states block duplicate processing until the key's TTL expires. As a result, a redelivered event whose handler previously failed is not re-executed during that window.

If a key is found in PROCESSING state but has exceeded the processing timeout (default 5 minutes), the manager assumes the previous processor crashed and allows a retry.
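
The lifecycle can be modeled with a single-process, in-memory store, including TTL expiry and the processing-timeout takeover. This is a hypothetical sketch for reasoning about the states, not the Redis-backed implementation. InMemoryIdempotencyStore, try_reserve, mark, and the injectable clock are assumed names. The sketch does not model whether the real manager refreshes the TTL when a record is completed.

```python
import time
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class _Entry:
    status: str         # "processing" | "completed" | "failed"
    reserved_at: float  # when the key was (re)claimed
    expires_at: float   # absolute TTL deadline


@dataclass
class InMemoryIdempotencyStore:
    """Hypothetical model of the PROCESSING -> COMPLETED/FAILED lifecycle."""

    ttl_seconds: float = 3600
    processing_timeout_seconds: float = 300
    clock: Callable[[], float] = time.monotonic
    _entries: dict[str, _Entry] = field(default_factory=dict)

    def try_reserve(self, key: str) -> bool:
        """Return True if the caller may run the handler for `key`."""
        now = self.clock()
        entry = self._entries.get(key)
        if entry is not None and entry.expires_at <= now:
            entry = None  # TTL elapsed: behave as if the key never existed
        if entry is None or (
            entry.status == "processing"
            and now - entry.reserved_at > self.processing_timeout_seconds
        ):
            # New key, or a stale PROCESSING record whose owner is presumed crashed.
            self._entries[key] = _Entry("processing", now, now + self.ttl_seconds)
            return True
        return False  # fresh PROCESSING, or terminal COMPLETED/FAILED

    def mark(self, key: str, status: str) -> None:
        """Move a previously reserved key to "completed" or "failed"."""
        self._entries[key].status = status
```

Injecting the clock lets the timeout and TTL paths be exercised without sleeping.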

Middleware Integration

The IdempotentEventHandler wraps a single Kafka event handler with automatic duplicate detection.

IdempotentEventDispatcher is an EventDispatcher subclass that automatically wraps every registered handler with idempotency protection. DI providers create this subclass for services that need idempotent event handling (coordinator, k8s worker, result processor). Services that don't (saga orchestrator) use a plain EventDispatcher.
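
The middleware code is not reproduced here, so the following sketch shows only the general wrapping pattern. Several parts are hypothetical: the dispatcher API (register, dispatch), the dict-shaped events, and the per-handler in-memory dict that stands in for Redis. Only the event-based key strategy is shown.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[None]]


class EventDispatcher:
    """Hypothetical minimal dispatcher mapping event types to async handlers."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = {}

    def register(self, event_type: str, handler: Handler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    async def dispatch(self, event: dict) -> None:
        for handler in self._handlers.get(event["event_type"], []):
            await handler(event)


def idempotent(handler: Handler) -> Handler:
    """Wrap one handler so each event key runs it at most once."""
    seen: dict[str, str] = {}  # per-handler stand-in for the Redis records

    async def wrapper(event: dict) -> None:
        key = f"{event['event_type']}:{event['event_id']}"  # event-based strategy
        # No await between the check and the write, so two tasks cannot both pass.
        if key in seen:
            return  # processing, completed, or failed: skip the handler
        seen[key] = "processing"
        try:
            await handler(event)
        except Exception:
            seen[key] = "failed"
            raise
        else:
            seen[key] = "completed"

    return wrapper


class IdempotentEventDispatcher(EventDispatcher):
    """Same API as EventDispatcher, but every registered handler is wrapped."""

    def register(self, event_type: str, handler: Handler) -> None:
        super().register(event_type, idempotent(handler))


if __name__ == "__main__":
    calls: list[str] = []

    async def handle(event: dict) -> None:
        calls.append(event["event_id"])

    async def main() -> None:
        dispatcher = IdempotentEventDispatcher()
        dispatcher.register("example", handle)
        for event_id in ("e1", "e1", "e2"):  # "e1" is redelivered
            await dispatcher.dispatch({"event_type": "example", "event_id": event_id})

    asyncio.run(main())
    print(calls)  # ['e1', 'e2']
```

The subclass overrides register, so services keep registering handlers the same way. The choice between idempotent and plain dispatch then lives entirely in which class the DI provider constructs.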


Redis Storage

Idempotency records are stored in Redis with automatic TTL expiration. A single SET command with the NX and EX options makes reservation atomic: if two processes race to claim the same key, only one succeeds.

    async def insert_processing(self, record: IdempotencyRecord) -> None:
        k = self._full_key(record.key)
        doc = self._record_to_doc(record)
        # nx=True: write only if the key is absent; ex: expire after the record's TTL
        ok = await self._r.set(k, json.dumps(doc, default=_json_default), ex=record.ttl_seconds, nx=True)
        if not ok:
            # Raise the same error a Mongo unique-index violation would, so the
            # manager's existing DuplicateKeyError handling is reused
            raise DuplicateKeyError("Key already exists")

Configuration

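| Parameter | Default | Description |
|---|---|---|
| key_prefix | idempotency | Redis key namespace |
| default_ttl_seconds | 3600 | Default lifetime of an idempotency record in Redis (1 hour) |
| processing_timeout_seconds | 300 | Age after which a PROCESSING record is assumed to belong to a crashed processor (5 minutes) |
| enable_result_caching | true | Store handler results so duplicates can be answered from the cache |
| max_result_size_bytes | 1048576 | Maximum cached result size (1 MiB) |
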
class IdempotencyConfig(BaseModel):
    key_prefix: str = "idempotency"
    default_ttl_seconds: int = 3600
    processing_timeout_seconds: int = 300
    enable_result_caching: bool = True
    max_result_size_bytes: int = 1048576

Result Caching

When enable_result_caching is true, the manager stores the handler's result, serialized as JSON, alongside the COMPLETED status. Subsequent duplicates can return the cached result without re-executing the handler. This is useful for idempotent queries where the response should be consistent.

Results larger than max_result_size_bytes are silently left out of the cache, but idempotency protection still applies.

Metrics

The idempotency system exposes several metrics for monitoring:

  • idempotency_cache_hits - Key lookups that found an existing record
  • idempotency_cache_misses - Key lookups that created new records
  • idempotency_duplicates_blocked - Events rejected as duplicates
  • idempotency_keys_active - Current number of active keys (updated periodically)

Key Files

| File | Purpose |
|---|---|
| services/idempotency/idempotency_manager.py | Core idempotency logic |
| services/idempotency/redis_repository.py | Redis storage adapter |
| services/idempotency/middleware.py | Handler wrapper and IdempotentEventDispatcher |
| domain/idempotency/ | Domain models |