Idempotency

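The platform combines at-least-once event delivery with an idempotency layer that prevents duplicate processing. When a Kafka message is delivered more than once (because of retries, consumer rebalances, or failures), the idempotency layer lets the event handler run for only one of those deliveries. The one exception is a reservation that outlives the processing timeout, which is treated as a crashed processor and retried. Handler results can optionally be cached so that duplicates receive a fast, consistent response.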

Architecture

flowchart TB
    subgraph Kafka Consumer
        MSG[Incoming Event] --> CHECK[Check & Reserve Key]
    end

    subgraph Idempotency Manager
        CHECK --> REDIS[(Redis)]
        REDIS --> FOUND{Key Exists?}
        FOUND -->|Yes| STATUS{Status?}
        STATUS -->|Processing| TIMEOUT{Timed Out?}
        STATUS -->|Completed/Failed| DUP[Return Duplicate]
        TIMEOUT -->|Yes| RETRY[Allow Retry]
        TIMEOUT -->|No| WAIT[Block Duplicate]
        FOUND -->|No| RESERVE[Reserve Key]
    end

    subgraph Handler Execution
        RESERVE --> HANDLER[Execute Handler]
        RETRY --> HANDLER
        HANDLER -->|Success| COMPLETE[Mark Completed]
        HANDLER -->|Error| FAIL[Mark Failed]
        COMPLETE --> CACHE[Cache Result]
    end

Key Strategies

The idempotency manager supports three strategies for generating keys from events:

Event-based uses the event's unique ID and type. This is the default and works for events where the ID is guaranteed unique (like UUIDs generated at publish time).

Content hash generates a SHA-256 hash of the event's payload, excluding metadata like timestamps and event IDs. Use this when the same logical operation might produce different event IDs but identical content.

Custom allows the caller to provide an arbitrary key. Useful when idempotency depends on business logic (e.g., "one execution per user per minute").

    def _generate_key(
        self, event: BaseEvent, key_strategy: KeyStrategy, custom_key: str | None = None, fields: set[str] | None = None
    ) -> str:
        # Relies on `hashlib` and `json` being imported at module level.
        if key_strategy == KeyStrategy.EVENT_BASED:
            key = f"{event.event_type}:{event.event_id}"
        elif key_strategy == KeyStrategy.CONTENT_HASH:
            event_dict = event.model_dump(mode="json")
            # Drop fields that differ between redeliveries of the same logical event.
            event_dict.pop("event_id", None)
            event_dict.pop("timestamp", None)
            event_dict.pop("metadata", None)
            if fields:
                # Optionally restrict the hash to a chosen subset of fields.
                event_dict = {k: v for k, v in event_dict.items() if k in fields}
            # sort_keys keeps the serialization stable regardless of key order.
            content = json.dumps(event_dict, sort_keys=True)
            key = hashlib.sha256(content.encode()).hexdigest()
        elif key_strategy == KeyStrategy.CUSTOM:
            if not custom_key:
                # Report the real problem instead of "Invalid key strategy".
                raise ValueError("custom_key is required for KeyStrategy.CUSTOM")
            key = f"{event.event_type}:{custom_key}"
        else:
            raise ValueError(f"Invalid key strategy: {key_strategy}")
        return f"{self.config.key_prefix}:{key}"
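
As a rough illustration of the content-hash strategy, the sketch below hashes plain dicts standing in for the output of event.model_dump(mode="json"). The helper name content_hash_key and the sample payloads are hypothetical and not part of the codebase; the point is only that two deliveries with different event IDs but the same payload map to the same key.

```python
import hashlib
import json
from typing import Optional

# Fields that vary between redeliveries of the same logical operation.
VOLATILE_FIELDS = ("event_id", "timestamp", "metadata")


def content_hash_key(event_dict: dict, fields: Optional[set] = None, prefix: str = "idempotency") -> str:
    """Hash the payload after dropping volatile fields (hypothetical helper)."""
    payload = {k: v for k, v in event_dict.items() if k not in VOLATILE_FIELDS}
    if fields:
        payload = {k: v for k, v in payload.items() if k in fields}
    # sort_keys makes the serialization independent of dict insertion order.
    content = json.dumps(payload, sort_keys=True)
    return f"{prefix}:{hashlib.sha256(content.encode()).hexdigest()}"


# Sample payloads (made up for this example).
first = {"event_id": "a1", "timestamp": "2024-01-01T00:00:00Z", "user_id": "u1", "amount": 10}
second = {"event_id": "b2", "timestamp": "2024-01-01T00:00:05Z", "amount": 10, "user_id": "u1"}
changed = {**second, "amount": 11}

same_key = content_hash_key(first) == content_hash_key(second)
different_key = content_hash_key(first) != content_hash_key(changed)
```

Here same_key holds because only event_id, timestamp, and key order differ, while different_key holds because the amount changed.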

Status Lifecycle

Each idempotency record transitions through defined states:

class IdempotencyStatus(StringEnum):
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

When an event arrives, the manager checks for an existing key. If none exists, it creates a record in PROCESSING state and returns control to the handler. On success, the record moves to COMPLETED; on error, to FAILED. Both terminal states block duplicate processing until the record's TTL expires, so a redelivered event whose handler failed is also treated as a duplicate during that window.

If a key is found in PROCESSING state but has exceeded the processing timeout (default 5 minutes), the manager assumes the previous processor crashed and allows a retry.
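
The decision described above can be sketched as a small pure function. This is a simplified model, not the manager's actual code: the Record dataclass, its created_at field, and the should_block name are assumptions made for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Optional


class Status(str, Enum):
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class Record:  # hypothetical stand-in for the stored idempotency record
    status: Status
    created_at: datetime


def should_block(record: Optional[Record], now: datetime, processing_timeout_seconds: int = 300) -> bool:
    """Return True if the incoming event must be treated as a duplicate."""
    if record is None:
        return False  # no key yet: the caller reserves it and runs the handler
    if record.status is Status.PROCESSING:
        age = now - record.created_at
        # A reservation older than the timeout is assumed to belong to a crashed processor.
        return age <= timedelta(seconds=processing_timeout_seconds)
    return True  # COMPLETED and FAILED both block until the TTL expires


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
in_flight = Record(Status.PROCESSING, now - timedelta(seconds=60))
stale = Record(Status.PROCESSING, now - timedelta(seconds=301))
finished = Record(Status.COMPLETED, now - timedelta(seconds=1000))
```

With the default 300-second timeout, in_flight is blocked, stale is allowed to retry, and finished is blocked.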

Handler Integration

Event handlers use the with_idempotency helper in events/handlers.py to wrap handler execution with automatic duplicate detection. The helper checks and reserves the key, runs the handler, then marks the result as completed or failed:

async def with_idempotency(
        event: DomainEvent,
        handler: Callable[..., Awaitable[None]],
        idem: IdempotencyManager,
        key_strategy: KeyStrategy,
        ttl_seconds: int,
        logger: structlog.stdlib.BoundLogger,
) -> None:
    """Run *handler* inside an idempotency guard (check -> execute -> mark)."""
    result = await idem.check_and_reserve(
        event=event, key_strategy=key_strategy, ttl_seconds=ttl_seconds,
    )
    if result.is_duplicate:
        logger.info(f"Duplicate event: {event.event_type} ({event.event_id})")
        return
    try:
        await handler(event)
        await idem.mark_completed(event=event, key_strategy=key_strategy)
    except Exception as e:
        await idem.mark_failed(
            event=event, error=str(e), key_strategy=key_strategy,
        )
        raise

IdempotencyManager is injected via Dishka into each worker's handler registrations. The saga orchestrator skips idempotency since saga steps have their own deduplication logic.
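
To show the effect of the guard on a redelivered event, here is a minimal self-contained sketch. InMemoryIdempotency, Event, CheckResult, and guarded are hypothetical stand-ins written for this example; they mirror the check -> execute -> mark flow of with_idempotency but omit TTLs, timeouts, key strategies, and logging.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Event:  # hypothetical stand-in for DomainEvent
    event_type: str
    event_id: str


@dataclass
class CheckResult:  # only the is_duplicate flag is modelled here
    is_duplicate: bool


class InMemoryIdempotency:
    """Hypothetical in-memory stand-in for IdempotencyManager."""

    def __init__(self) -> None:
        self.status: dict = {}

    def _key(self, event: Event) -> str:
        return f"{event.event_type}:{event.event_id}"

    async def check_and_reserve(self, event: Event) -> CheckResult:
        key = self._key(event)
        if key in self.status:
            return CheckResult(is_duplicate=True)
        self.status[key] = "processing"
        return CheckResult(is_duplicate=False)

    async def mark_completed(self, event: Event) -> None:
        self.status[self._key(event)] = "completed"

    async def mark_failed(self, event: Event, error: str) -> None:
        self.status[self._key(event)] = "failed"


async def guarded(event: Event, handler, idem: InMemoryIdempotency) -> None:
    """Same check -> execute -> mark flow as with_idempotency, minus logging."""
    result = await idem.check_and_reserve(event)
    if result.is_duplicate:
        return
    try:
        await handler(event)
        await idem.mark_completed(event)
    except Exception as exc:
        await idem.mark_failed(event, str(exc))
        raise


async def demo() -> tuple:
    calls = []

    async def handler(event: Event) -> None:
        calls.append(event.event_id)

    idem = InMemoryIdempotency()
    event = Event("execution_requested", "evt-1")  # sample values
    await guarded(event, handler, idem)
    await guarded(event, handler, idem)  # simulated redelivery
    return len(calls), idem.status


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The second call returns early at the duplicate check, so the handler body runs once even though the event was delivered twice.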

Redis Storage

Idempotency records are stored in Redis with automatic TTL expiration. A single SET command with the NX and EX options provides atomic reservation: if two processes race to claim the same key, only one succeeds, and the key expires on its own after the TTL:

    async def insert_processing(self, record: IdempotencyRecord) -> None:
        k = self._full_key(record.key)
        doc = self._record_to_doc(record)
        # SET NX with EX for atomic reservation
        ok = await self._r.set(k, json.dumps(doc, default=_json_default), ex=record.ttl_seconds, nx=True)
        if not ok:
            # Mirror Mongo behavior so manager's DuplicateKeyError path is reused
            raise DuplicateKeyError("Key already exists")
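
The sketch below shows how a caller might interpret the return value of that SET call. It assumes a client whose async set method accepts ex and nx keyword arguments and returns a falsy value when NX prevents the write, as redis-py does. FakeRedis and try_reserve are hypothetical names used only so the example runs without a server; the fake does not simulate expiry, and real atomicity comes from Redis executing SET as one command.

```python
import asyncio
import json
from typing import Any, Optional


class FakeRedis:
    """Hypothetical in-memory stand-in that mimics SET ... NX return values."""

    def __init__(self) -> None:
        self.store: dict = {}

    async def set(self, name: str, value: str, ex: Optional[int] = None, nx: bool = False) -> Optional[bool]:
        if nx and name in self.store:
            return None  # NX: key already exists, nothing is written
        self.store[name] = value  # expiry (ex) is not simulated in this fake
        return True


async def try_reserve(client: Any, key: str, doc: dict, ttl_seconds: int) -> bool:
    """Return True if this caller won the reservation, False if the key was taken."""
    ok = await client.set(key, json.dumps(doc), ex=ttl_seconds, nx=True)
    return bool(ok)


async def race(contenders: int = 3) -> list:
    client = FakeRedis()
    doc = {"status": "processing"}
    attempts = (try_reserve(client, "idempotency:evt-1", doc, 3600) for _ in range(contenders))
    return list(await asyncio.gather(*attempts))


if __name__ == "__main__":
    print(asyncio.run(race()))
```

Exactly one contender gets True; the others see the existing key and would take the duplicate path.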

Configuration

| Parameter | Default | Description |
|---|---|---|
| key_prefix | idempotency | Redis key namespace |
| default_ttl_seconds | 3600 | How long completed keys are retained, in seconds |
| processing_timeout_seconds | 300 | Seconds after which a processor is assumed to have crashed |
| enable_result_caching | true | Store handler results for duplicates |
| max_result_size_bytes | 1048576 | Maximum cached result size in bytes (1 MiB) |

class IdempotencyConfig(BaseModel):
    key_prefix: str = "idempotency"
    default_ttl_seconds: int = 3600
    processing_timeout_seconds: int = 300
    enable_result_caching: bool = True
    max_result_size_bytes: int = 1048576

Result Caching

When enable_result_caching is true, the manager stores the handler's result JSON alongside the completion status. Subsequent duplicates can return the cached result without re-executing the handler. This is useful for idempotent queries where the response should be consistent.

Results larger than max_result_size_bytes are silently dropped from the cache, but idempotency protection still applies.
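
A minimal sketch of such a size guard follows. It assumes the limit is checked against the UTF-8 encoded length of the serialized JSON, which the text above does not specify; result_to_cache is a hypothetical helper, not the manager's API.

```python
import json
from typing import Any, Optional

MAX_RESULT_SIZE_BYTES = 1048576  # mirrors the max_result_size_bytes default


def result_to_cache(result: Any, enabled: bool = True, max_bytes: int = MAX_RESULT_SIZE_BYTES) -> Optional[str]:
    """Return the JSON to store with the record, or None to skip caching."""
    if not enabled or result is None:
        return None
    result_json = json.dumps(result)
    # Assumption: the limit applies to the encoded size in bytes.
    if len(result_json.encode("utf-8")) > max_bytes:
        return None  # too large: drop the result, keep the idempotency record
    return result_json


small = result_to_cache({"ok": True})
too_big = result_to_cache("x" * 2_000_000)
disabled = result_to_cache({"ok": True}, enabled=False)
```

In this sketch small is a JSON string, while too_big and disabled are both None; the caller would still mark the record as completed in every case.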

Metrics

The idempotency system exposes several metrics for monitoring:

  • idempotency_cache_hits - Key lookups that found an existing record
  • idempotency_cache_misses - Key lookups that created new records
  • idempotency_duplicates_blocked - Events rejected as duplicates
  • idempotency_keys_active - Current number of active keys (updated periodically)

Key Files

| File | Purpose |
|---|---|
| services/idempotency/idempotency_manager.py | Core idempotency logic |
| services/idempotency/redis_repository.py | Redis storage adapter |
| events/handlers.py | with_idempotency helper for event handlers |
| domain/idempotency/ | Domain models |