Skip to content

Kafka test stability

Producer lifecycle

The project uses aiokafka (aiokafka==0.12.0) via FastStream as the Kafka backend. UnifiedProducer is a thin wrapper that delegates to await self._broker.publish() — it has no internal Kafka client and no start()/stop() methods of its own. The broker lifecycle is managed externally (by FastStream in the API process, or by each worker's startup code), so the confluent-kafka librdkafka race conditions that plague many Python Kafka test suites do not apply here.

In test fixtures, construct the producer and yield it directly:

@pytest.fixture(scope="function")
async def producer():
    p = UnifiedProducer(broker, event_repository, logger, settings, event_metrics)
    yield p

No explicit cleanup is needed — the broker is started and stopped separately.

Consumer teardown delays

The problem

Test teardown taking 40+ seconds with errors like:

ERROR aiokafka.consumer.group_coordinator: Error sending LeaveGroupRequest to node 1 [[Error 7] RequestTimedOutError]

This happens when consumer.stop() sends a LeaveGroupRequest to the Kafka coordinator, but the request times out.

Root cause

The request_timeout_ms parameter in aiokafka defaults to 40000ms (40 seconds). When the Kafka coordinator is slow or unresponsive during test teardown, the consumer waits the full timeout before giving up.

See aiokafka#773 for details on consumer stop delays.

The fix

Configure shorter timeouts in .env.test:

# Reduce consumer pool and timeouts for faster test startup/teardown
# https://github.com/aio-libs/aiokafka/issues/773
SSE_CONSUMER_POOL_SIZE=1
KAFKA_SESSION_TIMEOUT_MS=6000
KAFKA_HEARTBEAT_INTERVAL_MS=2000
KAFKA_REQUEST_TIMEOUT_MS=5000

All consumers must pass these settings to AIOKafkaConsumer:

consumer = AIOKafkaConsumer(
    *topics,
    bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
    session_timeout_ms=settings.KAFKA_SESSION_TIMEOUT_MS,
    heartbeat_interval_ms=settings.KAFKA_HEARTBEAT_INTERVAL_MS,
    request_timeout_ms=settings.KAFKA_REQUEST_TIMEOUT_MS,
)

Results

Metric Before After
Teardown time 40s <1s
Total test time 70s 20-35s

Key timeouts explained

Setting Default Test Value Purpose
KAFKA_SESSION_TIMEOUT_MS 45000 6000 Time before broker considers consumer dead
KAFKA_HEARTBEAT_INTERVAL_MS 10000 2000 Frequency of heartbeats to coordinator
KAFKA_REQUEST_TIMEOUT_MS 40000 5000 Timeout for broker requests (including LeaveGroup)
SSE_CONSUMER_POOL_SIZE 10 1 Number of SSE consumers (fewer = faster startup)

Timeout constraints

request_timeout_ms must be less than session_timeout_ms. The test values (5000 < 6000) and production defaults (40000 < 45000) satisfy this constraint.