Skip to content

Quick start

Installation

# Minimal — AIOKafka factories and lifecycle helpers
pip install aiokafka-foundation-kit

# With Pydantic settings models and orjson acceleration
pip install aiokafka-foundation-kit[models,orjson]

# Full stack (adds Dishka, dependency-injector, OpenTelemetry)
pip install aiokafka-foundation-kit[all]

Requires Python 3.11+.


Producer

producer_lifecycle starts the producer, yields it, and guarantees a clean stop even when an exception is raised:

from aiokafka_foundation_kit import producer_lifecycle
from aiokafka_foundation_kit.contrib.models import BaseKafkaProducerSettings

settings = BaseKafkaProducerSettings(bootstrap_servers="localhost:9092")

async with producer_lifecycle(settings) as producer:
    await producer.send_and_wait("my-topic", b'{"event": "order_placed"}')

With topic auto-creation

Pass topics and auto_create_topics=True to ensure all topics exist before the producer starts:

from aiokafka_foundation_kit import producer_lifecycle, TopicConfig
from aiokafka_foundation_kit.contrib.models import BaseKafkaProducerSettings

settings = BaseKafkaProducerSettings(bootstrap_servers="localhost:9092")

topics = [
    TopicConfig(name="orders", num_partitions=6, replication_factor=3),
    TopicConfig(name="payments", num_partitions=3, replication_factor=3),
]

async with producer_lifecycle(settings, topics=topics, auto_create_topics=True) as producer:
    await producer.send_and_wait("orders", b'{"id": "42"}')

Lifecycle hooks

on_started / on_stopped accept async callables that receive the live producer instance:

from aiokafka import AIOKafkaProducer

async def on_ready(producer: AIOKafkaProducer) -> None:
    print("Producer ready")

async with producer_lifecycle(settings, on_started=on_ready) as producer:
    ...

Low-level factory

When you manage the lifecycle yourself (e.g. inside a custom DI framework):

from aiokafka_foundation_kit import create_async_kafka_producer

producer = create_async_kafka_producer(settings)
await producer.start()
try:
    await producer.send_and_wait("topic", b"data")
finally:
    await producer.stop()

Consumer

from aiokafka_foundation_kit import consumer_lifecycle
from aiokafka_foundation_kit.contrib.models import BaseKafkaConsumerSettings

settings = BaseKafkaConsumerSettings(
    bootstrap_servers="localhost:9092",
    group_id="order-processor",
)

async with consumer_lifecycle(settings, topics=("orders",)) as consumer:
    async for msg in consumer:
        print(f"partition={msg.partition} offset={msg.offset}: {msg.value!r}")
        await consumer.commit()

At-least-once delivery

enable_auto_commit defaults to False. Call await consumer.commit() after processing each message (or batch) to guarantee at-least-once delivery.

Multiple topics

async with consumer_lifecycle(
    settings,
    topics=("orders", "payments", "refunds"),
) as consumer:
    async for msg in consumer:
        ...

Low-level factory

from aiokafka_foundation_kit import create_async_kafka_consumer

consumer = create_async_kafka_consumer(settings, topics=["orders"])
await consumer.start()
try:
    async for msg in consumer:
        ...
finally:
    await consumer.stop()

Health check

Probe whether the Kafka cluster is reachable. Returns True when the broker responds within the timeout:

from aiokafka_foundation_kit import check_kafka_health_async
from aiokafka_foundation_kit.contrib.models import BaseKafkaProducerSettings

settings = BaseKafkaProducerSettings(bootstrap_servers="localhost:9092")

is_healthy = await check_kafka_health_async(settings, timeout_seconds=5.0)
if not is_healthy:
    raise RuntimeError("Kafka cluster is unreachable")

Pair this with a /healthz HTTP endpoint or a Kubernetes startup probe.


Topic management

Create topics idempotently — topics that already exist are logged and skipped silently:

from aiokafka_foundation_kit import ensure_topics_async, TopicConfig
from aiokafka_foundation_kit.contrib.models import BaseKafkaProducerSettings

settings = BaseKafkaProducerSettings(bootstrap_servers="localhost:9092")

topics = [
    TopicConfig(
        name="orders",
        num_partitions=6,
        replication_factor=3,
        topic_configs={"retention.ms": "604800000"},  # 7 days
    ),
    TopicConfig(name="dead-letter-orders", num_partitions=1, replication_factor=3),
]

await ensure_topics_async(topics, settings)

JSON serialisation

dumps_bytes / loads_bytes are the default serialisers used internally and can be used standalone. When orjson is installed (pip install aiokafka-foundation-kit[orjson]) it is used automatically; otherwise the standard json module is used as a fallback.

from aiokafka_foundation_kit.utils.json import dumps_bytes, loads_bytes

payload = {"event": "order_placed", "order_id": 42}

raw = dumps_bytes(payload)   # bytes
data = loads_bytes(raw)      # dict

# Raw bytes pass through unchanged (e.g. already-serialised Avro / Protobuf)
assert dumps_bytes(b"binary") == b"binary"

# None (tombstone) is preserved
assert loads_bytes(None) is None