Skip to content

API Reference

Auto-generated from source using mkdocstrings.


Core

Factories and lifecycle

Create an AIOKafkaProducer from settings.

Parameters:

Name Type Description Default
settings ProducerSettingsProtocol

Kafka producer settings.

required
value_serializer Callable[[Any], bytes] | None

Optional custom value serializer. Defaults to dumps_bytes (orjson when available, stdlib json otherwise, with bytes pass-through).

None

Returns:

Type Description
AIOKafkaProducer

Configured AIOKafkaProducer (not started).

Source code in aiokafka_foundation_kit/producer/client.py
def create_async_kafka_producer(
    settings: ProducerSettingsProtocol,
    *,
    value_serializer: Callable[[Any], bytes] | None = None,
) -> AIOKafkaProducer:
    """Create an ``AIOKafkaProducer`` from settings.

    Args:
        settings: Kafka producer settings.
        value_serializer: Optional custom value serializer.
            Defaults to ``dumps_bytes`` (orjson when available, stdlib ``json``
            otherwise, with bytes pass-through).

    Returns:
        Configured ``AIOKafkaProducer`` (not started).
    """
    producer_config: dict[str, Any] = {
        **build_kafka_common_config(settings),
        "acks": settings.acks,
        "compression_type": settings.compression_type,
        "enable_idempotence": settings.enable_idempotence,
        "max_batch_size": settings.max_batch_size,
        "linger_ms": settings.linger_ms,
        "request_timeout_ms": settings.request_timeout_ms,
        "value_serializer": value_serializer or dumps_bytes,
    }

    return AIOKafkaProducer(**producer_config)

Manage a Kafka producer's lifecycle as an async context.

Auto-creates topics when auto_create_topics is true and topics is non-empty, then starts the producer, yields it, and ensures it is stopped even if the caller raises.

Parameters:

Name Type Description Default
settings ProducerSettingsProtocol

Kafka producer settings.

required
topics Sequence[TopicConfigProtocol] | None

Optional topic configs to ensure exist before the producer starts.

None
auto_create_topics bool

When true and topics is provided, run ensure_topics_async before producer start.

False
name str

Client name used in log messages. Useful when running multiple producers.

'producer'
on_started Callable[[AIOKafkaProducer], Awaitable[None]] | None

Optional async hook invoked after a successful start.

None
on_stopped Callable[[AIOKafkaProducer], Awaitable[None]] | None

Optional async hook invoked after a successful stop.

None
Source code in aiokafka_foundation_kit/producer/lifecycle.py
@asynccontextmanager
async def producer_lifecycle(
    settings: ProducerSettingsProtocol,
    *,
    topics: Sequence[TopicConfigProtocol] | None = None,
    auto_create_topics: bool = False,
    name: str = "producer",
    on_started: Callable[[AIOKafkaProducer], Awaitable[None]] | None = None,
    on_stopped: Callable[[AIOKafkaProducer], Awaitable[None]] | None = None,
) -> AsyncIterator[AIOKafkaProducer]:
    """Manage a Kafka producer's lifecycle as an async context.

    Auto-creates ``topics`` when ``auto_create_topics`` is true and ``topics``
    is non-empty, then starts the producer, yields it, and ensures it is stopped
    even if the caller raises.

    Args:
        settings: Kafka producer settings.
        topics: Optional topic configs to ensure exist before the producer starts.
        auto_create_topics: When true and ``topics`` is provided, run
            ``ensure_topics_async`` before producer start.
        name: Client name used in log messages. Useful when running multiple producers.
        on_started: Optional async hook invoked after a successful start.
        on_stopped: Optional async hook invoked after a successful stop.
    """
    if auto_create_topics and topics:
        await ensure_topics_async(topics, settings)

    producer = create_async_kafka_producer(settings)
    async with managed_kafka_client(
        producer,
        name=name,
        on_started=on_started,
        on_stopped=on_stopped,
    ) as started:
        yield started

Create an AIOKafkaConsumer from settings.

settings.enable_auto_commit is forwarded as-is. Keep it False for at-least-once delivery (the caller commits manually) and set it True only for fire-and-forget or idempotent consumers.

Parameters:

Name Type Description Default
settings ConsumerSettingsProtocol

Kafka consumer settings.

required
topics list[str] | None

Optional list of topics to subscribe to.

None
value_deserializer Callable[[bytes | None], Any] | None

Optional custom value deserializer. Defaults to loads_bytes (orjson when available, stdlib json otherwise).

None

Returns:

Type Description
AIOKafkaConsumer

Configured AIOKafkaConsumer (not started).

Source code in aiokafka_foundation_kit/consumer/client.py
def create_async_kafka_consumer(
    settings: ConsumerSettingsProtocol,
    topics: list[str] | None = None,
    *,
    value_deserializer: Callable[[bytes | None], Any] | None = None,
) -> AIOKafkaConsumer:
    """Create an ``AIOKafkaConsumer`` from settings.

    ``settings.enable_auto_commit`` is forwarded as-is. Keep it ``False`` for
    at-least-once delivery (the caller commits manually) and set it ``True``
    only for fire-and-forget or idempotent consumers.

    Args:
        settings: Kafka consumer settings.
        topics: Optional list of topics to subscribe to.
        value_deserializer: Optional custom value deserializer.
            Defaults to ``loads_bytes`` (orjson when available, stdlib ``json``
            otherwise).

    Returns:
        Configured ``AIOKafkaConsumer`` (not started).
    """
    consumer_config: dict[str, Any] = {
        **build_kafka_common_config(settings),
        "group_id": settings.group_id,
        "auto_offset_reset": settings.auto_offset_reset,
        "enable_auto_commit": settings.enable_auto_commit,
        "session_timeout_ms": settings.session_timeout_ms,
        "heartbeat_interval_ms": settings.heartbeat_interval_ms,
        "max_poll_records": settings.max_poll_records,
        "max_poll_interval_ms": settings.max_poll_interval_ms,
        "fetch_max_wait_ms": settings.fetch_max_wait_ms,
        "fetch_min_bytes": settings.fetch_min_bytes,
        "fetch_max_bytes": settings.fetch_max_bytes,
        "value_deserializer": value_deserializer or loads_bytes,
    }

    topic_args = tuple(topics) if topics else ()
    return AIOKafkaConsumer(*topic_args, **consumer_config)

Manage a Kafka consumer's lifecycle as an async context.

Starts the consumer, yields it, and ensures it is stopped even if the caller raises.

Parameters:

Name Type Description Default
settings ConsumerSettingsProtocol

Kafka consumer settings.

required
topics tuple[str, ...] | None

Optional tuple of topic names to subscribe to.

None
name str

Client name used in log messages. Useful when running multiple consumers.

'consumer'
on_started Callable[[AIOKafkaConsumer], Awaitable[None]] | None

Optional async hook invoked after a successful start.

None
on_stopped Callable[[AIOKafkaConsumer], Awaitable[None]] | None

Optional async hook invoked after a successful stop.

None
Source code in aiokafka_foundation_kit/consumer/lifecycle.py
@asynccontextmanager
async def consumer_lifecycle(
    settings: ConsumerSettingsProtocol,
    *,
    topics: tuple[str, ...] | None = None,
    name: str = "consumer",
    on_started: Callable[[AIOKafkaConsumer], Awaitable[None]] | None = None,
    on_stopped: Callable[[AIOKafkaConsumer], Awaitable[None]] | None = None,
) -> AsyncIterator[AIOKafkaConsumer]:
    """Manage a Kafka consumer's lifecycle as an async context.

    Starts the consumer, yields it, and ensures it is stopped even if the caller raises.

    Args:
        settings: Kafka consumer settings.
        topics: Optional tuple of topic names to subscribe to.
        name: Client name used in log messages. Useful when running multiple consumers.
        on_started: Optional async hook invoked after a successful start.
        on_stopped: Optional async hook invoked after a successful stop.
    """
    consumer = create_async_kafka_consumer(settings, list(topics) if topics else None)
    async with managed_kafka_client(
        consumer,
        name=name,
        on_started=on_started,
        on_stopped=on_stopped,
    ) as started:
        yield started

Topics

Kafka topic configuration.

Source code in aiokafka_foundation_kit/topics/config.py
@dataclass(frozen=True)
class TopicConfig:
    """Kafka topic configuration."""

    name: str
    num_partitions: int
    replication_factor: int
    replica_assignment: dict[int, list[int]] | None = None
    topic_configs: dict[str, str] | None = None

Ensure Kafka topics exist, creating them if necessary.

Topics are created one-by-one so that existing topics are reported individually (instead of treating the whole batch as "already exists" when any topic in the batch is new).

Parameters:

Name Type Description Default
topics Sequence[TopicConfigProtocol]

Sequence of topic configurations to ensure.

required
settings KafkaSettingsProtocol

Kafka settings for connection.

required

Raises:

Type Description
Exception

If topic creation fails for reasons other than topic already existing.

Source code in aiokafka_foundation_kit/topics/management.py
async def ensure_topics_async(
    topics: Sequence[TopicConfigProtocol],
    settings: KafkaSettingsProtocol,
) -> None:
    """Ensure Kafka topics exist, creating them if necessary.

    Topics are created one-by-one so that existing topics are reported
    individually (instead of treating the whole batch as "already exists" when
    any topic in the batch is new).

    Args:
        topics: Sequence of topic configurations to ensure.
        settings: Kafka settings for connection.

    Raises:
        Exception: If topic creation fails for reasons other than topic already existing.
    """
    if not topics:
        return

    admin_client = AIOKafkaAdminClient(**build_kafka_common_config(settings))
    try:
        await admin_client.start()
        for topic in topics:
            try:
                await admin_client.create_topics([_to_new_topic(topic)], validate_only=False)
                logger.info("Created topic: %s", topic.name)
            except TopicAlreadyExistsError:
                logger.debug("Topic already exists: %s", topic.name)
    finally:
        await admin_client.close()

Health

Check Kafka cluster health by attempting to connect.

Parameters:

Name Type Description Default
settings KafkaSettingsProtocol

Kafka settings for connection.

required
timeout_seconds float

Connection timeout in seconds.

5.0

Returns:

Type Description
bool

True if Kafka is healthy, False otherwise.

Source code in aiokafka_foundation_kit/health/checks.py
async def check_kafka_health_async(settings: KafkaSettingsProtocol, timeout_seconds: float = 5.0) -> bool:
    """Check Kafka cluster health by attempting to connect.

    Args:
        settings: Kafka settings for connection.
        timeout_seconds: Connection timeout in seconds.

    Returns:
        True if Kafka is healthy, False otherwise.
    """
    config = build_kafka_common_config(settings)
    config["request_timeout_ms"] = int(timeout_seconds * 1000)

    producer = AIOKafkaProducer(**config)

    try:
        await producer.start()
        await producer.stop()
    except (TimeoutError, KafkaError, OSError):
        logger.exception("Kafka health check failed")
        return False

    logger.debug("Kafka health check passed")
    return True

Utilities

Build common Kafka client configuration from settings.

Parameters:

Name Type Description Default
settings KafkaSettingsProtocol

Kafka settings object conforming to KafkaSettingsProtocol.

required

Returns:

Type Description
dict[str, Any]

Dictionary of Kafka client configuration parameters.

Source code in aiokafka_foundation_kit/utils/config.py
def build_kafka_common_config(settings: KafkaSettingsProtocol) -> dict[str, Any]:
    """Build common Kafka client configuration from settings.

    Args:
        settings: Kafka settings object conforming to KafkaSettingsProtocol.

    Returns:
        Dictionary of Kafka client configuration parameters.
    """
    config: dict[str, Any] = {
        "bootstrap_servers": settings.bootstrap_servers,
        "security_protocol": settings.security_protocol,
        "metadata_max_age_ms": settings.metadata_max_age_ms,
    }

    if settings.client_id:
        config["client_id"] = settings.client_id

    if settings.security_protocol in ("SASL_PLAINTEXT", "SASL_SSL"):
        config["sasl_mechanism"] = settings.sasl_mechanism
        config["sasl_plain_username"] = settings.sasl_username
        config["sasl_plain_password"] = settings.get_sasl_password()

    if settings.security_protocol in ("SSL", "SASL_SSL"):
        if settings.ssl_cafile:
            config["ssl_cafile"] = settings.ssl_cafile
        if settings.ssl_certfile:
            config["ssl_certfile"] = settings.ssl_certfile
        if settings.ssl_keyfile:
            config["ssl_keyfile"] = settings.ssl_keyfile
        config["ssl_check_hostname"] = settings.ssl_check_hostname

    return config

Serialize value to bytes.

bytes are returned unchanged; everything else is JSON-encoded.

Source code in aiokafka_foundation_kit/utils/json.py
def dumps_bytes(value: Any) -> bytes:
    """Serialize ``value`` to ``bytes``.

    ``bytes`` are returned unchanged; everything else is JSON-encoded.
    """
    if isinstance(value, bytes):
        return value
    if HAS_ORJSON:
        return orjson.dumps(value)  # type: ignore[no-any-return]
    return json.dumps(value).encode("utf-8")

Deserialize JSON bytes to a Python object.

Returns None for tombstone messages (value is None).

Source code in aiokafka_foundation_kit/utils/json.py
def loads_bytes(value: bytes | None) -> Any:
    """Deserialize JSON ``bytes`` to a Python object.

    Returns ``None`` for tombstone messages (``value is None``).
    """
    if value is None:
        return None
    if HAS_ORJSON:
        return orjson.loads(value)
    return json.loads(value.decode("utf-8"))

Start client, run on_started, yield it, then stop it.

Stop errors are caught and logged so the caller's original exception (if any) is preserved.

Source code in aiokafka_foundation_kit/utils/lifecycle.py
@asynccontextmanager
async def managed_kafka_client(
    client: _ClientT,
    *,
    name: str,
    on_started: Callable[[_ClientT], Awaitable[None]] | None = None,
    on_stopped: Callable[[_ClientT], Awaitable[None]] | None = None,
) -> AsyncIterator[_ClientT]:
    """Start ``client``, run ``on_started``, yield it, then stop it.

    Stop errors are caught and logged so the caller's original exception (if
    any) is preserved.
    """
    await client.start()
    logger.info("Kafka %s started", name)
    if on_started is not None:
        await on_started(client)

    try:
        yield client
    finally:
        try:
            await client.stop()
            logger.info("Kafka %s stopped", name)
            if on_stopped is not None:
                await on_stopped(client)
        except KafkaError:
            logger.exception("Error stopping Kafka %s", name)

Configuration protocols

Bases: KafkaSaslSettingsProtocol, KafkaSslSettingsProtocol, Protocol

Protocol for base Kafka settings (connection + SASL + SSL).

Source code in aiokafka_foundation_kit/config/kafka.py
class KafkaSettingsProtocol(KafkaSaslSettingsProtocol, KafkaSslSettingsProtocol, Protocol):
    """Protocol for base Kafka settings (connection + SASL + SSL)."""

Bases: KafkaSettingsProtocol

Protocol for Kafka consumer settings.

Source code in aiokafka_foundation_kit/config/consumer.py
class ConsumerSettingsProtocol(KafkaSettingsProtocol):
    """Protocol for Kafka consumer settings."""

    group_id: str
    auto_offset_reset: KafkaOffsetReset
    enable_auto_commit: bool
    session_timeout_ms: int
    heartbeat_interval_ms: int
    max_poll_records: int
    max_poll_interval_ms: int
    fetch_max_wait_ms: int
    fetch_min_bytes: int
    fetch_max_bytes: int

Bases: KafkaSettingsProtocol

Protocol for Kafka producer settings.

Source code in aiokafka_foundation_kit/config/producer.py
class ProducerSettingsProtocol(KafkaSettingsProtocol):
    """Protocol for Kafka producer settings."""

    acks: KafkaAcks
    compression_type: KafkaCompressionType | None
    enable_idempotence: bool
    max_batch_size: int
    linger_ms: int
    request_timeout_ms: int

Bases: Protocol

Protocol for topic configuration.

Source code in aiokafka_foundation_kit/config/topic.py
class TopicConfigProtocol(Protocol):
    """Protocol for topic configuration."""

    name: str
    num_partitions: int
    replication_factor: int
    replica_assignment: dict[int, list[int]] | None
    topic_configs: dict[str, str] | None

Bases: Protocol

Protocol for core Kafka connection fields only.

Source code in aiokafka_foundation_kit/config/kafka.py
class KafkaConnectionSettingsProtocol(Protocol):
    """Protocol for core Kafka connection fields only."""

    bootstrap_servers: str
    client_id: str | None
    security_protocol: KafkaSecurityProtocol
    metadata_max_age_ms: int

    def get_sasl_password(self) -> str | None: ...

Bases: KafkaConnectionSettingsProtocol, Protocol

Protocol for SASL credentials. Relevant when security_protocol is SASL_*.

Source code in aiokafka_foundation_kit/config/kafka.py
class KafkaSaslSettingsProtocol(KafkaConnectionSettingsProtocol, Protocol):
    """Protocol for SASL credentials. Relevant when ``security_protocol`` is ``SASL_*``."""

    sasl_mechanism: KafkaSaslMechanism | None
    sasl_username: str | None

Bases: KafkaConnectionSettingsProtocol, Protocol

Protocol for TLS material. Relevant when security_protocol is SSL or SASL_SSL.

Source code in aiokafka_foundation_kit/config/kafka.py
class KafkaSslSettingsProtocol(KafkaConnectionSettingsProtocol, Protocol):
    """Protocol for TLS material. Relevant when ``security_protocol`` is ``SSL`` or ``SASL_SSL``."""

    ssl_cafile: str | None
    ssl_certfile: str | None
    ssl_keyfile: str | None
    ssl_check_hostname: bool

Bases: ProducerSettingsProtocol

Producer settings that also carry topic auto-creation policy.

Used by lifecycle/DI helpers that need to know whether to call :func:ensure_topics_async before starting the producer.

Source code in aiokafka_foundation_kit/config/producer.py
class ProducerLifecycleSettingsProtocol(ProducerSettingsProtocol):
    """Producer settings that also carry topic auto-creation policy.

    Used by lifecycle/DI helpers that need to know whether to call
    :func:`ensure_topics_async` before starting the producer.
    """

    auto_create_topics: bool

contrib.models

Requires pip install aiokafka-foundation-kit[models].

Bases: KafkaConnectionMixin, KafkaSaslMixin, KafkaSslMixin

Base Kafka configuration shared between producer and consumer.

Source code in aiokafka_foundation_kit/contrib/models/kafka.py
class BaseKafkaSettings(KafkaConnectionMixin, KafkaSaslMixin, KafkaSslMixin):
    """Base Kafka configuration shared between producer and consumer."""

    security_protocol: KafkaSecurityProtocol = Field(
        default="PLAINTEXT",
        description="Security protocol: PLAINTEXT, SASL_PLAINTEXT, SSL, SASL_SSL",
    )

    @model_validator(mode="after")
    def _validate_sasl(self) -> BaseKafkaSettings:
        if self.security_protocol not in {"SASL_PLAINTEXT", "SASL_SSL"}:
            return self

        if not self.sasl_mechanism:
            msg = f"Kafka sasl_mechanism is required for {self.security_protocol}"
            raise ValueError(msg)
        if not self.sasl_username:
            msg = f"Kafka sasl_username is required for {self.security_protocol}"
            raise ValueError(msg)
        if self.sasl_password is None or not self.sasl_password.get_secret_value():
            msg = f"Kafka sasl_password is required for {self.security_protocol}"
            raise ValueError(msg)

        return self

    @model_validator(mode="after")
    def _validate_ssl(self) -> BaseKafkaSettings:
        if self.security_protocol not in {"SSL", "SASL_SSL"}:
            return self

        if not self.ssl_cafile:
            msg = f"Kafka ssl_cafile is required for {self.security_protocol}"
            raise ValueError(msg)

        return self

get_sasl_password()

Plaintext SASL password for Kafka client config. Do NOT log.

Source code in aiokafka_foundation_kit/contrib/models/kafka.py
def get_sasl_password(self) -> str | None:
    """Plaintext SASL password for Kafka client config. Do NOT log."""
    if self.sasl_password is None:
        return None
    secret: str = self.sasl_password.get_secret_value()
    return secret

Bases: BaseModel

Kafka transport connection parameters.

Source code in aiokafka_foundation_kit/contrib/models/kafka.py
class KafkaConnectionMixin(BaseModel):
    """Kafka transport connection parameters."""

    bootstrap_servers: str = Field(description="Kafka bootstrap servers (comma-separated)")
    client_id: str | None = Field(default=None, description="Client ID for Kafka")
    metadata_max_age_ms: int = Field(default=300000, ge=0, description="Metadata max age in ms")

Bases: BaseModel

SASL credentials. Required when security_protocol selects SASL_*.

Source code in aiokafka_foundation_kit/contrib/models/kafka.py
class KafkaSaslMixin(BaseModel):
    """SASL credentials. Required when ``security_protocol`` selects SASL_*."""

    sasl_mechanism: KafkaSaslMechanism | None = Field(
        default=None,
        description="SASL mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512",
    )
    sasl_username: str | None = Field(default=None, description="SASL username")
    sasl_password: SecretStr | None = Field(default=None, description="SASL password")

    def get_sasl_password(self) -> str | None:
        """Plaintext SASL password for Kafka client config. Do NOT log."""
        if self.sasl_password is None:
            return None
        secret: str = self.sasl_password.get_secret_value()
        return secret

get_sasl_password()

Plaintext SASL password for Kafka client config. Do NOT log.

Source code in aiokafka_foundation_kit/contrib/models/kafka.py
def get_sasl_password(self) -> str | None:
    """Plaintext SASL password for Kafka client config. Do NOT log."""
    if self.sasl_password is None:
        return None
    secret: str = self.sasl_password.get_secret_value()
    return secret

Bases: BaseModel

TLS material. Required when security_protocol selects SSL/SASL_SSL.

Source code in aiokafka_foundation_kit/contrib/models/kafka.py
class KafkaSslMixin(BaseModel):
    """TLS material. Required when ``security_protocol`` selects SSL/SASL_SSL."""

    ssl_cafile: str | None = Field(default=None, description="Path to CA certificate")
    ssl_certfile: str | None = Field(default=None, description="Path to client certificate")
    ssl_keyfile: str | None = Field(default=None, description="Path to client key")
    ssl_check_hostname: bool = Field(default=True, description="Verify hostname")

Bases: BaseKafkaSettings

Kafka consumer configuration.

enable_auto_commit defaults to False because at-least-once consumers must commit offsets manually; flip it to True only for fire-and-forget or idempotent consumers.

Source code in aiokafka_foundation_kit/contrib/models/consumer.py
class BaseKafkaConsumerSettings(BaseKafkaSettings):
    """Kafka consumer configuration.

    ``enable_auto_commit`` defaults to ``False`` because at-least-once consumers
    must commit offsets manually; flip it to ``True`` only for fire-and-forget
    or idempotent consumers.
    """

    group_id: str = Field(description="Consumer group ID")
    auto_offset_reset: KafkaOffsetReset = Field(default="earliest", description="Auto offset reset")
    enable_auto_commit: bool = Field(
        default=False,
        description="Enable broker-side auto-commit. Keep False for at-least-once delivery.",
    )
    session_timeout_ms: int = Field(default=30000, ge=0, description="Session timeout in ms")
    heartbeat_interval_ms: int = Field(default=3000, ge=0, description="Heartbeat interval in ms")
    max_poll_records: int = Field(default=500, ge=1, description="Max records per poll")
    max_poll_interval_ms: int = Field(default=300000, ge=1, description="Maximum poll interval in ms")
    fetch_max_wait_ms: int = Field(default=500, ge=0, description="Maximum wait time for fetch response")
    fetch_min_bytes: int = Field(default=1, ge=0, description="Minimum bytes server should return for fetch")
    fetch_max_bytes: int = Field(default=52428800, ge=1, description="Maximum bytes fetched per request")

get_sasl_password()

Plaintext SASL password for Kafka client config. Do NOT log.

Source code in aiokafka_foundation_kit/contrib/models/kafka.py
def get_sasl_password(self) -> str | None:
    """Plaintext SASL password for Kafka client config. Do NOT log."""
    if self.sasl_password is None:
        return None
    secret: str = self.sasl_password.get_secret_value()
    return secret

Bases: BaseKafkaSettings, KafkaAutoCreateMixin

Kafka producer configuration.

Source code in aiokafka_foundation_kit/contrib/models/producer.py
class BaseKafkaProducerSettings(BaseKafkaSettings, KafkaAutoCreateMixin):
    """Kafka producer configuration."""

    acks: KafkaAcks = Field(default="all", description="Acks: 0, 1, all")
    compression_type: KafkaCompressionType | None = Field(
        default="gzip",
        description="Compression: gzip, snappy, lz4, zstd, None",
    )
    enable_idempotence: bool = Field(default=True, description="Enable idempotent producer")
    max_batch_size: int = Field(default=16384, ge=0, description="Max batch size in bytes")
    linger_ms: int = Field(default=5, ge=0, description="Linger time in ms for batching")
    request_timeout_ms: int = Field(default=30000, ge=0, description="Request timeout in ms")

    @model_validator(mode="after")
    def _validate_auto_create(self) -> BaseKafkaProducerSettings:
        if not self.auto_create_topics:
            return self

        if self.default_replication_factor is None:
            msg = "Kafka default_replication_factor is required when auto_create_topics is enabled"
            raise ValueError(msg)

        return self

get_sasl_password()

Plaintext SASL password for Kafka client config. Do NOT log.

Source code in aiokafka_foundation_kit/contrib/models/kafka.py
def get_sasl_password(self) -> str | None:
    """Plaintext SASL password for Kafka client config. Do NOT log."""
    if self.sasl_password is None:
        return None
    secret: str = self.sasl_password.get_secret_value()
    return secret

Bases: BaseModel

Mixin with topic auto-creation settings (kept separate from transport config).

Source code in aiokafka_foundation_kit/contrib/models/producer.py
class KafkaAutoCreateMixin(BaseModel):
    """Mixin with topic auto-creation settings (kept separate from transport config)."""

    auto_create_topics: bool = Field(
        default=False,
        description="Auto-create topics on startup",
    )
    default_partitions: int = Field(
        default=3,
        ge=1,
        description="Default partitions for new topics",
    )
    default_replication_factor: int | None = Field(
        default=None,
        ge=1,
        description="Default replication factor for new topics",
    )

Bases: BaseModel

Shared Kafka infrastructure settings.

Source code in aiokafka_foundation_kit/contrib/models/infra.py
class BaseKafkaInfraSettings(BaseModel):
    """Shared Kafka infrastructure settings."""

    topic_prefix: str | None = Field(default=None, description="Global prefix for all service topics")
    topic_catalog: dict[str, KafkaTopicSettings] | None = Field(
        default=None,
        description="Map of logical topic names to physical settings for topic creation",
    )
    consumer_subscriptions: list[str] | None = Field(
        default=None,
        description="Logical topic names to subscribe for inbox consumers",
    )

    @field_validator("topic_prefix", mode="before")
    @classmethod
    def _coerce_topic_prefix(cls, value: object) -> str | None:
        return normalize_kafka_topic_prefix_value(value)

Bases: BaseModel

Physical parameters for a specific Kafka topic.

Source code in aiokafka_foundation_kit/contrib/models/infra.py
class KafkaTopicSettings(BaseModel):
    """Physical parameters for a specific Kafka topic."""

    num_partitions: int = Field(default=3, ge=1, description="Number of partitions for the topic")
    replication_factor: int = Field(default=3, ge=1, description="Replication factor for the topic")
    topic_configs: dict[str, str] | None = Field(default=None, description="Topic-specific configuration overrides")

Return a real prefix or None when unset.

Vault/env sometimes stringify JSON null as the literal strings "None" or "null", which would otherwise produce physical topic names like None.auth.*.

Source code in aiokafka_foundation_kit/contrib/models/infra.py
def normalize_kafka_topic_prefix_value(value: object) -> str | None:
    """Return a real prefix or ``None`` when unset.

    Vault/env sometimes stringify JSON ``null`` as the literal strings ``"None"`` or
    ``"null"``, which would otherwise produce physical topic names like ``None.auth.*``.
    """
    if value is None:
        return None
    if isinstance(value, str):
        s = value.strip()
        if not s or s.lower() in ("none", "null"):
            return None
        return s
    return None

contrib.di (Dishka)

Requires pip install aiokafka-foundation-kit[dishka].

Bases: Provider

Dishka provider exposing an AIOKafkaProducer for APP scope.

Topics passed via the container are auto-created when settings.auto_create_topics is enabled.

Source code in aiokafka_foundation_kit/contrib/di/producer.py
class AsyncKafkaProducerProvider(Provider):
    """Dishka provider exposing an ``AIOKafkaProducer`` for ``APP`` scope.

    Topics passed via the container are auto-created when
    ``settings.auto_create_topics`` is enabled.
    """

    scope = Scope.APP

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        check_dishka()
        super().__init__(*args, **kwargs)

    @provide
    async def get_kafka_producer(
        self,
        kafka_settings: ProducerLifecycleSettingsProtocol,
        topics: Sequence[TopicConfigProtocol] | None = None,
    ) -> AsyncIterator[AIOKafkaProducer]:
        """Provide Kafka producer as a singleton for APP scope."""
        async with producer_lifecycle(
            kafka_settings,
            topics=topics,
            auto_create_topics=kafka_settings.auto_create_topics,
        ) as producer:
            yield producer

get_kafka_producer(kafka_settings, topics=None) async

Provide Kafka producer as a singleton for APP scope.

Source code in aiokafka_foundation_kit/contrib/di/producer.py
@provide
async def get_kafka_producer(
    self,
    kafka_settings: ProducerLifecycleSettingsProtocol,
    topics: Sequence[TopicConfigProtocol] | None = None,
) -> AsyncIterator[AIOKafkaProducer]:
    """Provide Kafka producer as a singleton for APP scope."""
    async with producer_lifecycle(
        kafka_settings,
        topics=topics,
        auto_create_topics=kafka_settings.auto_create_topics,
    ) as producer:
        yield producer

Bases: Provider

Dishka provider exposing an AIOKafkaConsumer for APP scope.

Source code in aiokafka_foundation_kit/contrib/di/consumer.py
class AsyncKafkaConsumerProvider(Provider):
    """Dishka provider exposing an ``AIOKafkaConsumer`` for ``APP`` scope."""

    scope = Scope.APP

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        check_dishka()
        super().__init__(*args, **kwargs)

    @provide
    async def get_kafka_consumer(
        self,
        kafka_settings: ConsumerSettingsProtocol,
        topics: tuple[str, ...] | None = None,
    ) -> AsyncIterator[AIOKafkaConsumer]:
        """Provide Kafka consumer as a singleton for APP scope."""
        async with consumer_lifecycle(
            kafka_settings,
            topics=topics,
        ) as consumer:
            yield consumer

get_kafka_consumer(kafka_settings, topics=None) async

Provide Kafka consumer as a singleton for APP scope.

Source code in aiokafka_foundation_kit/contrib/di/consumer.py
@provide
async def get_kafka_consumer(
    self,
    kafka_settings: ConsumerSettingsProtocol,
    topics: tuple[str, ...] | None = None,
) -> AsyncIterator[AIOKafkaConsumer]:
    """Provide Kafka consumer as a singleton for APP scope."""
    async with consumer_lifecycle(
        kafka_settings,
        topics=topics,
    ) as consumer:
        yield consumer

Bases: Provider

Provide resolved topic configs for producer and subscriptions for consumers.

Source code in aiokafka_foundation_kit/contrib/di/infra.py
class KafkaInfraProvider(Provider):
    """Provide resolved topic configs for producer and subscriptions for consumers."""

    scope = Scope.APP

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        check_dishka()
        super().__init__(*args, **kwargs)

    @provide
    def get_topic_configs_for_catalog(
        self,
        settings: KafkaProducerInfraSettingsProtocol,
    ) -> Sequence[TopicConfig]:
        """Provide physical topic configs for topic creation based on topic catalog."""
        prefix = settings.topic_prefix
        topic_catalog = settings.topic_catalog or {}

        return [
            TopicConfig(
                name=_apply_topic_prefix(prefix, name),
                num_partitions=cfg.num_partitions,
                replication_factor=cfg.replication_factor,
                topic_configs=cfg.topic_configs,
            )
            for name, cfg in topic_catalog.items()
        ]

    @provide
    def get_consumer_subscription_topics(
        self,
        settings: KafkaConsumerInfraSettingsProtocol,
    ) -> tuple[str, ...]:
        """Provide fully resolved physical topic names for consumer subscriptions."""
        subscriptions = settings.consumer_subscriptions or []
        if not subscriptions:
            return ()

        prefix = settings.topic_prefix
        return tuple(_apply_topic_prefix(prefix, topic) for topic in subscriptions)

get_consumer_subscription_topics(settings)

Provide fully resolved physical topic names for consumer subscriptions.

Source code in aiokafka_foundation_kit/contrib/di/infra.py
@provide
def get_consumer_subscription_topics(
    self,
    settings: KafkaConsumerInfraSettingsProtocol,
) -> tuple[str, ...]:
    """Provide fully resolved physical topic names for consumer subscriptions."""
    subscriptions = settings.consumer_subscriptions or []
    if not subscriptions:
        return ()

    prefix = settings.topic_prefix
    return tuple(_apply_topic_prefix(prefix, topic) for topic in subscriptions)

get_topic_configs_for_catalog(settings)

Provide physical topic configs for topic creation based on topic catalog.

Source code in aiokafka_foundation_kit/contrib/di/infra.py
@provide
def get_topic_configs_for_catalog(
    self,
    settings: KafkaProducerInfraSettingsProtocol,
) -> Sequence[TopicConfig]:
    """Provide physical topic configs for topic creation based on topic catalog."""
    prefix = settings.topic_prefix
    topic_catalog = settings.topic_catalog or {}

    return [
        TopicConfig(
            name=_apply_topic_prefix(prefix, name),
            num_partitions=cfg.num_partitions,
            replication_factor=cfg.replication_factor,
            topic_configs=cfg.topic_configs,
        )
        for name, cfg in topic_catalog.items()
    ]

Bases: KafkaInfraBaseSettingsProtocol, Protocol

Protocol for producer-side Kafka infrastructure settings.

Defines what producer providers need: a topic prefix and a catalog of physical topic configs to ensure exist on startup.

Source code in aiokafka_foundation_kit/contrib/di/infra.py
class KafkaProducerInfraSettingsProtocol(KafkaInfraBaseSettingsProtocol, Protocol):
    """Protocol for producer-side Kafka infrastructure settings.

    Defines what producer providers need: a topic prefix and a catalog of
    physical topic configs to ensure exist on startup.
    """

    topic_catalog: dict[str, KafkaTopicSettingsProtocol] | None

Bases: KafkaInfraBaseSettingsProtocol, Protocol

Protocol for consumer-side Kafka infrastructure settings.

Defines what consumer providers need: a topic prefix and a list of logical topic names to subscribe to.

Source code in aiokafka_foundation_kit/contrib/di/infra.py
class KafkaConsumerInfraSettingsProtocol(KafkaInfraBaseSettingsProtocol, Protocol):
    """Protocol for consumer-side Kafka infrastructure settings.

    Defines what consumer providers need: a topic prefix and a list of
    logical topic names to subscribe to.
    """

    consumer_subscriptions: list[str] | None

Bases: Protocol

Physical parameters for one topic in a catalog.

Unlike :class:~aiokafka_foundation_kit.config.topic.TopicConfigProtocol, this one omits name because the topic name comes from the catalog map key in :class:KafkaProducerInfraSettingsProtocol.topic_catalog.

Source code in aiokafka_foundation_kit/contrib/di/infra.py
class KafkaTopicSettingsProtocol(Protocol):
    """Physical parameters for one topic in a catalog.

    Unlike :class:`~aiokafka_foundation_kit.config.topic.TopicConfigProtocol`,
    this one omits ``name`` because the topic name comes from the catalog map
    key in :class:`KafkaProducerInfraSettingsProtocol.topic_catalog`.
    """

    num_partitions: int
    replication_factor: int
    topic_configs: dict[str, str] | None

contrib.dependency_injector

Requires pip install aiokafka-foundation-kit[dependency-injector].

Bases: DeclarativeContainer

Container for Kafka producer dependencies.

Provides
  • producer: :class:AIOKafkaProducer with automatic lifecycle.
Configuration
  • kafka_settings: Kafka producer settings (ProducerSettingsProtocol).
  • topics: Optional list of topics to auto-create (Sequence[TopicConfigProtocol]).
  • auto_create_topics: Whether to run ensure_topics_async on startup.
Source code in aiokafka_foundation_kit/contrib/dependency_injector/producer.py
class KafkaProducerContainer(containers.DeclarativeContainer):
    """Container for Kafka producer dependencies.

    Provides:
        - ``producer``: :class:`AIOKafkaProducer` with automatic lifecycle.

    Configuration:
        - ``kafka_settings``: Kafka producer settings (``ProducerSettingsProtocol``).
        - ``topics``: Optional list of topics to auto-create (``Sequence[TopicConfigProtocol]``).
        - ``auto_create_topics``: Whether to run ``ensure_topics_async`` on startup.
    """

    kafka_settings = providers.Dependency()  # type: ignore[var-annotated]
    topics = providers.Dependency(default=None)  # type: ignore[var-annotated]
    auto_create_topics = providers.Object(False)  # type: ignore[var-annotated]

    producer = providers.Resource(  # type: ignore[var-annotated]
        _producer_resource,
        kafka_settings=kafka_settings,
        topics=topics,
        auto_create_topics=auto_create_topics,
    )

    def __init__(self, *args: Any, **kwargs: Any) -> None:  # pragma: no cover
        check_dependency_injector()  # pragma: no cover
        super().__init__(*args, **kwargs)  # pragma: no cover

Bases: DeclarativeContainer

Container for Kafka consumer dependencies.

Provides
  • consumer: :class:AIOKafkaConsumer with automatic lifecycle.
Configuration
  • kafka_settings: Kafka consumer settings (ConsumerSettingsProtocol).
  • topics: Optional tuple of topic names to subscribe to.
Source code in aiokafka_foundation_kit/contrib/dependency_injector/consumer.py
class KafkaConsumerContainer(containers.DeclarativeContainer):
    """Container for Kafka consumer dependencies.

    Provides:
        - ``consumer``: :class:`AIOKafkaConsumer` with automatic lifecycle.

    Configuration:
        - ``kafka_settings``: Kafka consumer settings (``ConsumerSettingsProtocol``).
        - ``topics``: Optional tuple of topic names to subscribe to.
    """

    kafka_settings = providers.Dependency()  # type: ignore[var-annotated]
    topics = providers.Dependency(default=None)  # type: ignore[var-annotated]

    consumer = providers.Resource(  # type: ignore[var-annotated]
        _consumer_resource,
        kafka_settings=kafka_settings,
        topics=topics,
    )

    def __init__(self, *args: Any, **kwargs: Any) -> None:  # pragma: no cover
        check_dependency_injector()  # pragma: no cover
        super().__init__(*args, **kwargs)  # pragma: no cover

contrib.telemetry

Requires pip install aiokafka-foundation-kit[telemetry].

Instrument aiokafka clients for OpenTelemetry tracing.

Automatically traces all aiokafka producer and consumer operations including message publishing, consumption, and offset commits.

Parameters:

Name Type Description Default
tracer_provider TracerProvider | None

Optional tracer provider. Defaults to the global provider.

None
async_produce_hook AsyncProduceHook | None

Async callback executed before sending a message. Signature: async def hook(span: Span, args, kwargs) -> None. Use it to enrich producer spans with custom attributes.

None
async_consume_hook AsyncConsumeHook | None

Async callback executed after consuming a message. Signature: async def hook(span: Span, record, args, kwargs) -> None. Use it to enrich consumer spans with custom attributes.

None
**kwargs Any

Additional keyword arguments passed to AIOKafkaInstrumentor.instrument.

{}

Raises:

Type Description
ImportError

If opentelemetry-instrumentation-aiokafka is not installed.

Examples:

Basic usage::

from aiokafka_foundation_kit.contrib.telemetry import instrument_aiokafka

instrument_aiokafka()

With custom tracer provider::

from opentelemetry.sdk.trace import TracerProvider

provider = TracerProvider()
instrument_aiokafka(tracer_provider=provider)

With hooks for span enrichment::

async def produce_hook(span, args, kwargs):
    if span and span.is_recording():
        span.set_attribute("app.user_id", "42")

async def consume_hook(span, record, args, kwargs):
    if span and span.is_recording():
        span.set_attribute("app.message_id", record.headers.get("id"))

instrument_aiokafka(
    async_produce_hook=produce_hook,
    async_consume_hook=consume_hook,
)
Source code in aiokafka_foundation_kit/contrib/telemetry/instrumentations.py
def instrument_aiokafka(
    *,
    tracer_provider: TracerProvider | None = None,
    async_produce_hook: AsyncProduceHook | None = None,
    async_consume_hook: AsyncConsumeHook | None = None,
    **kwargs: Any,
) -> None:
    """Instrument aiokafka clients for OpenTelemetry tracing.

    Automatically traces all aiokafka producer and consumer operations including
    message publishing, consumption, and offset commits.

    Args:
        tracer_provider: Optional tracer provider. Defaults to the global provider.
        async_produce_hook: Async callback executed before sending a message.
            Signature: ``async def hook(span: Span, args, kwargs) -> None``.
            Use it to enrich producer spans with custom attributes.
        async_consume_hook: Async callback executed after consuming a message.
            Signature: ``async def hook(span: Span, record, args, kwargs) -> None``.
            Use it to enrich consumer spans with custom attributes.
        **kwargs: Additional keyword arguments passed to ``AIOKafkaInstrumentor.instrument``.

    Raises:
        ImportError: If ``opentelemetry-instrumentation-aiokafka`` is not installed.

    Examples:
        Basic usage::

            from aiokafka_foundation_kit.contrib.telemetry import instrument_aiokafka

            instrument_aiokafka()

        With custom tracer provider::

            from opentelemetry.sdk.trace import TracerProvider

            provider = TracerProvider()
            instrument_aiokafka(tracer_provider=provider)

        With hooks for span enrichment::

            async def produce_hook(span, args, kwargs):
                if span and span.is_recording():
                    span.set_attribute("app.user_id", "42")

            async def consume_hook(span, record, args, kwargs):
                if span and span.is_recording():
                    span.set_attribute("app.message_id", record.headers.get("id"))

            instrument_aiokafka(
                async_produce_hook=produce_hook,
                async_consume_hook=consume_hook,
            )
    """
    try:
        from opentelemetry.instrumentation.aiokafka import (  # noqa: PLC0415
            AIOKafkaInstrumentor,
        )
    except ImportError as e:
        raise ImportError(
            "opentelemetry-instrumentation-aiokafka not installed. "
            "Install with: pip install 'aiokafka-foundation-kit[telemetry]'"
        ) from e

    overrides: dict[str, Any] = {}
    if tracer_provider is not None:
        overrides["tracer_provider"] = tracer_provider
    if async_produce_hook is not None:
        overrides["async_produce_hook"] = async_produce_hook
    if async_consume_hook is not None:
        overrides["async_consume_hook"] = async_consume_hook

    AIOKafkaInstrumentor().instrument(**kwargs, **overrides)

Type aliases

Alias Values
KafkaSecurityProtocol "PLAINTEXT", "SASL_PLAINTEXT", "SSL", "SASL_SSL"
KafkaSaslMechanism "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"
KafkaAcks "0", "1", "all"
KafkaCompressionType "gzip", "snappy", "lz4", "zstd"
KafkaOffsetReset "earliest", "latest"