API Reference
Auto-generated from source using mkdocstrings.
Core
Factories and lifecycle
Create an AIOKafkaProducer from settings.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | ProducerSettingsProtocol | Kafka producer settings. | required |
| value_serializer | Callable[[Any], bytes] \| None | Optional custom value serializer. Defaults to … | None |
Returns:
| Type | Description |
|---|---|
| AIOKafkaProducer | Configured … |
Source code in aiokafka_foundation_kit/producer/client.py
Manage a Kafka producer's lifecycle as an async context.
Auto-creates topics when auto_create_topics is true and topics
is non-empty, then starts the producer, yields it, and ensures it is stopped
even if the caller raises.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | ProducerSettingsProtocol | Kafka producer settings. | required |
| topics | Sequence[TopicConfigProtocol] \| None | Optional topic configs to ensure exist before the producer starts. | None |
| auto_create_topics | bool | When true and … | False |
| name | str | Client name used in log messages. Useful when running multiple producers. | 'producer' |
| on_started | Callable[[AIOKafkaProducer], Awaitable[None]] \| None | Optional async hook invoked after a successful start. | None |
| on_stopped | Callable[[AIOKafkaProducer], Awaitable[None]] \| None | Optional async hook invoked after a successful stop. | None |
Source code in aiokafka_foundation_kit/producer/lifecycle.py
Create an AIOKafkaConsumer from settings.
settings.enable_auto_commit is forwarded as-is. Keep it False for
at-least-once delivery (the caller commits manually) and set it True
only for fire-and-forget or idempotent consumers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | ConsumerSettingsProtocol | Kafka consumer settings. | required |
| topics | list[str] \| None | Optional list of topics to subscribe to. | None |
| value_deserializer | Callable[[bytes \| None], Any] \| None | Optional custom value deserializer. Defaults to … | None |
Returns:
| Type | Description |
|---|---|
| AIOKafkaConsumer | Configured … |
Source code in aiokafka_foundation_kit/consumer/client.py
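The choice between manual and automatic commits comes down to when the offset is committed relative to processing. The following is a minimal sketch of the manual-commit ordering, not the kit's implementation. `FakeConsumer` is a hypothetical in-memory stand-in; only its `getone()` and `commit()` method names mirror the real AIOKafkaConsumer.

```python
import asyncio


class FakeConsumer:
    """Hypothetical in-memory stand-in for a consumer with auto-commit disabled."""

    def __init__(self, messages: list[str]) -> None:
        self._messages = messages
        self.position = 0   # index of the next message to hand out
        self.committed = 0  # offset a restarted consumer would resume from

    async def getone(self) -> str:
        message = self._messages[self.position]
        self.position += 1
        return message

    async def commit(self) -> None:
        self.committed = self.position


async def consume_at_least_once(consumer, handler, count: int) -> None:
    """Process each message first and commit only after the handler succeeds."""
    for _ in range(count):
        message = await consumer.getone()
        await handler(message)   # if this raises, the offset stays uncommitted
        await consumer.commit()  # manual commit after successful processing
```

If the handler fails, the committed offset still points at the failed message, so it is delivered again after a restart. That redelivery is what makes the pattern at-least-once.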
Manage a Kafka consumer's lifecycle as an async context.
Starts the consumer, yields it, and ensures it is stopped even if the caller raises.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | ConsumerSettingsProtocol | Kafka consumer settings. | required |
| topics | tuple[str, ...] \| None | Optional tuple of topic names to subscribe to. | None |
| name | str | Client name used in log messages. Useful when running multiple consumers. | 'consumer' |
| on_started | Callable[[AIOKafkaConsumer], Awaitable[None]] \| None | Optional async hook invoked after a successful start. | None |
| on_stopped | Callable[[AIOKafkaConsumer], Awaitable[None]] \| None | Optional async hook invoked after a successful stop. | None |
Source code in aiokafka_foundation_kit/consumer/lifecycle.py
Topics
Ensure Kafka topics exist, creating them if necessary.
Topics are created one-by-one so that existing topics are reported individually (instead of treating the whole batch as "already exists" when any topic in the batch is new).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topics | Sequence[TopicConfigProtocol] | Sequence of topic configurations to ensure. | required |
| settings | KafkaSettingsProtocol | Kafka settings for connection. | required |
Raises:
| Type | Description |
|---|---|
| Exception | If topic creation fails for reasons other than topic already existing. |
Source code in aiokafka_foundation_kit/topics/management.py
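The one-by-one strategy can be illustrated with a small sketch. It assumes a hypothetical `FakeAdmin` client and a hypothetical `TopicExistsError`; neither is part of this package or of aiokafka, and the real function may report outcomes differently.

```python
import asyncio


class TopicExistsError(Exception):
    """Hypothetical error raised by the fake admin when a topic already exists."""


class FakeAdmin:
    """Hypothetical in-memory stand-in for a Kafka admin client."""

    def __init__(self, existing: set[str]) -> None:
        self.topics = set(existing)

    async def create_topic(self, name: str) -> None:
        if name in self.topics:
            raise TopicExistsError(name)
        self.topics.add(name)


async def ensure_topics(admin: FakeAdmin, names: list[str]) -> dict[str, str]:
    """Create topics one at a time and report the outcome per topic."""
    outcome: dict[str, str] = {}
    for name in names:
        try:
            await admin.create_topic(name)
            outcome[name] = "created"
        except TopicExistsError:
            outcome[name] = "exists"  # only this topic is skipped
        # Any other exception propagates to the caller.
    return outcome
```

Because each topic gets its own request, one pre-existing topic does not hide the fact that another topic in the same call was newly created.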
Health
Check Kafka cluster health by attempting to connect.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | KafkaSettingsProtocol | Kafka settings for connection. | required |
| timeout_seconds | float | Connection timeout in seconds. | 5.0 |
Returns:
| Type | Description |
|---|---|
| bool | True if Kafka is healthy, False otherwise. |
Source code in aiokafka_foundation_kit/health/checks.py
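A connect-with-timeout check that reports a boolean can be sketched as follows. This is an illustration under assumptions, not the kit's code: `check_health` and its `connect` parameter are hypothetical, and the real check builds its own client from `settings`.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def check_health(
    connect: Callable[[], Awaitable[None]],
    timeout_seconds: float = 5.0,
) -> bool:
    """Return True if `connect` finishes within the timeout, False otherwise."""
    try:
        await asyncio.wait_for(connect(), timeout=timeout_seconds)
    except Exception:  # covers timeouts as well as connection errors
        return False
    return True
```

Returning a boolean instead of raising keeps the check easy to wire into a readiness probe, at the cost of hiding the failure reason from the caller.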
Utilities
Build common Kafka client configuration from settings.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | KafkaSettingsProtocol | Kafka settings object conforming to KafkaSettingsProtocol. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary of Kafka client configuration parameters. |
Source code in aiokafka_foundation_kit/utils/config.py
Serialize value to bytes.
bytes are returned unchanged; everything else is JSON-encoded.
Source code in aiokafka_foundation_kit/utils/json.py
Deserialize JSON bytes to a Python object.
Returns None for tombstone messages (value is None).
Source code in aiokafka_foundation_kit/utils/json.py
Start client, run on_started, yield it, then stop it.
Stop errors are caught and logged so the caller's original exception (if any) is preserved.
Source code in aiokafka_foundation_kit/utils/lifecycle.py
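The start, yield, stop sequence with a guarded stop can be sketched with `contextlib.asynccontextmanager`. `FakeClient` and `managed` are hypothetical names for this example, and running the hook inside the `try` block is a choice made here, not something this page confirms.

```python
import asyncio
import logging
from contextlib import asynccontextmanager

logger = logging.getLogger(__name__)


class FakeClient:
    """Hypothetical stand-in exposing the start/stop pair of a Kafka client."""

    def __init__(self, fail_on_stop: bool = False) -> None:
        self.events: list[str] = []
        self.fail_on_stop = fail_on_stop

    async def start(self) -> None:
        self.events.append("start")

    async def stop(self) -> None:
        self.events.append("stop")
        if self.fail_on_stop:
            raise RuntimeError("stop failed")


@asynccontextmanager
async def managed(client, on_started=None):
    """Start the client, run the hook, yield, and always attempt to stop."""
    await client.start()
    try:
        if on_started is not None:
            await on_started(client)
        yield client
    finally:
        try:
            await client.stop()
        except Exception:
            # Log instead of raising so the caller's exception is not masked.
            logger.exception("failed to stop client")
```

If `stop()` were allowed to raise inside `finally`, its error would replace whatever the caller raised inside the `async with` body, which makes the original failure harder to diagnose.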
Configuration protocols
Bases: KafkaSaslSettingsProtocol, KafkaSslSettingsProtocol, Protocol
Protocol for base Kafka settings (connection + SASL + SSL).
Source code in aiokafka_foundation_kit/config/kafka.py
Bases: KafkaSettingsProtocol
Protocol for Kafka consumer settings.
Source code in aiokafka_foundation_kit/config/consumer.py
Bases: KafkaSettingsProtocol
Protocol for Kafka producer settings.
Source code in aiokafka_foundation_kit/config/producer.py
Bases: Protocol
Protocol for topic configuration.
Source code in aiokafka_foundation_kit/config/topic.py
Bases: Protocol
Protocol for core Kafka connection fields only.
Source code in aiokafka_foundation_kit/config/kafka.py
Bases: KafkaConnectionSettingsProtocol, Protocol
Protocol for SASL credentials. Relevant when security_protocol is SASL_*.
Source code in aiokafka_foundation_kit/config/kafka.py
Bases: KafkaConnectionSettingsProtocol, Protocol
Protocol for TLS material. Relevant when security_protocol is SSL or SASL_SSL.
Source code in aiokafka_foundation_kit/config/kafka.py
Bases: ProducerSettingsProtocol
Producer settings that also carry topic auto-creation policy.
Used by lifecycle/DI helpers that need to know whether to call
ensure_topics_async before starting the producer.
Source code in aiokafka_foundation_kit/config/producer.py
contrib.models
Requires pip install aiokafka-foundation-kit[models].
Bases: KafkaConnectionMixin, KafkaSaslMixin, KafkaSslMixin
Base Kafka configuration shared between producer and consumer.
Source code in aiokafka_foundation_kit/contrib/models/kafka.py
get_sasl_password()
Plaintext SASL password for Kafka client config. Do NOT log.
Bases: BaseModel
Kafka transport connection parameters.
Source code in aiokafka_foundation_kit/contrib/models/kafka.py
Bases: BaseModel
SASL credentials. Required when security_protocol selects SASL_*.
Source code in aiokafka_foundation_kit/contrib/models/kafka.py
get_sasl_password()
Plaintext SASL password for Kafka client config. Do NOT log.
Bases: BaseModel
TLS material. Required when security_protocol selects SSL/SASL_SSL.
Source code in aiokafka_foundation_kit/contrib/models/kafka.py
Bases: BaseKafkaSettings
Kafka consumer configuration.
enable_auto_commit defaults to False because at-least-once consumers
must commit offsets manually; flip it to True only for fire-and-forget
or idempotent consumers.
Source code in aiokafka_foundation_kit/contrib/models/consumer.py
get_sasl_password()
Plaintext SASL password for Kafka client config. Do NOT log.
Bases: BaseKafkaSettings, KafkaAutoCreateMixin
Kafka producer configuration.
Source code in aiokafka_foundation_kit/contrib/models/producer.py
get_sasl_password()
Plaintext SASL password for Kafka client config. Do NOT log.
Bases: BaseModel
Mixin with topic auto-creation settings (kept separate from transport config).
Source code in aiokafka_foundation_kit/contrib/models/producer.py
Bases: BaseModel
Shared Kafka infrastructure settings.
Source code in aiokafka_foundation_kit/contrib/models/infra.py
Bases: BaseModel
Physical parameters for a specific Kafka topic.
Source code in aiokafka_foundation_kit/contrib/models/infra.py
Return a real prefix or None when unset.
Vault/env sometimes stringify JSON null as the literal strings "None" or
"null", which would otherwise produce physical topic names like None.auth.*.
Source code in aiokafka_foundation_kit/contrib/models/infra.py
contrib.di (Dishka)
Requires pip install aiokafka-foundation-kit[dishka].
Bases: Provider
Dishka provider exposing an AIOKafkaProducer for APP scope.
Topics passed via the container are auto-created when
settings.auto_create_topics is enabled.
Source code in aiokafka_foundation_kit/contrib/di/producer.py
get_kafka_producer(kafka_settings, topics=None) async
Provide Kafka producer as a singleton for APP scope.
Source code in aiokafka_foundation_kit/contrib/di/producer.py
Bases: Provider
Dishka provider exposing an AIOKafkaConsumer for APP scope.
Source code in aiokafka_foundation_kit/contrib/di/consumer.py
get_kafka_consumer(kafka_settings, topics=None) async
Provide Kafka consumer as a singleton for APP scope.
Source code in aiokafka_foundation_kit/contrib/di/consumer.py
Bases: Provider
Provide resolved topic configs for producer and subscriptions for consumers.
Source code in aiokafka_foundation_kit/contrib/di/infra.py
get_consumer_subscription_topics(settings)
Provide fully resolved physical topic names for consumer subscriptions.
Source code in aiokafka_foundation_kit/contrib/di/infra.py
get_topic_configs_for_catalog(settings)
Provide physical topic configs for topic creation based on topic catalog.
Source code in aiokafka_foundation_kit/contrib/di/infra.py
Bases: KafkaInfraBaseSettingsProtocol, Protocol
Protocol for producer-side Kafka infrastructure settings.
Defines what producer providers need: a topic prefix and a catalog of physical topic configs to ensure exist on startup.
Source code in aiokafka_foundation_kit/contrib/di/infra.py
Bases: KafkaInfraBaseSettingsProtocol, Protocol
Protocol for consumer-side Kafka infrastructure settings.
Defines what consumer providers need: a topic prefix and a list of logical topic names to subscribe to.
Source code in aiokafka_foundation_kit/contrib/di/infra.py
Bases: Protocol
Physical parameters for one topic in a catalog.
Unlike TopicConfigProtocol (aiokafka_foundation_kit.config.topic),
this one omits name because the topic name comes from the catalog map
key in KafkaProducerInfraSettingsProtocol.topic_catalog.
Source code in aiokafka_foundation_kit/contrib/di/infra.py
contrib.dependency_injector
Requires pip install aiokafka-foundation-kit[dependency-injector].
Bases: DeclarativeContainer
Container for Kafka producer dependencies.
Provides
- producer: AIOKafkaProducer with automatic lifecycle.

Configuration
- kafka_settings: Kafka producer settings (ProducerSettingsProtocol).
- topics: Optional list of topics to auto-create (Sequence[TopicConfigProtocol]).
- auto_create_topics: Whether to run ensure_topics_async on startup.
Source code in aiokafka_foundation_kit/contrib/dependency_injector/producer.py
Bases: DeclarativeContainer
Container for Kafka consumer dependencies.
Provides
- consumer: AIOKafkaConsumer with automatic lifecycle.

Configuration
- kafka_settings: Kafka consumer settings (ConsumerSettingsProtocol).
- topics: Optional tuple of topic names to subscribe to.
Source code in aiokafka_foundation_kit/contrib/dependency_injector/consumer.py
contrib.telemetry
Requires pip install aiokafka-foundation-kit[telemetry].
Instrument aiokafka clients for OpenTelemetry tracing.
Automatically traces all aiokafka producer and consumer operations including message publishing, consumption, and offset commits.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tracer_provider | TracerProvider \| None | Optional tracer provider. Defaults to the global provider. | None |
| async_produce_hook | AsyncProduceHook \| None | Async callback executed before sending a message. Signature: … | None |
| async_consume_hook | AsyncConsumeHook \| None | Async callback executed after consuming a message. Signature: … | None |
| \*\*kwargs | Any | Additional keyword arguments passed to … | {} |
Raises:
| Type | Description |
|---|---|
| ImportError | If … |
Examples:

Basic usage::

    from aiokafka_foundation_kit.contrib.telemetry import instrument_aiokafka

    instrument_aiokafka()

With custom tracer provider::

    from opentelemetry.sdk.trace import TracerProvider

    from aiokafka_foundation_kit.contrib.telemetry import instrument_aiokafka

    provider = TracerProvider()
    instrument_aiokafka(tracer_provider=provider)

With hooks for span enrichment::

    from aiokafka_foundation_kit.contrib.telemetry import instrument_aiokafka

    async def produce_hook(span, args, kwargs):
        if span and span.is_recording():
            span.set_attribute("app.user_id", "42")

    async def consume_hook(span, record, args, kwargs):
        if span and span.is_recording():
            # aiokafka exposes headers as (key, bytes) pairs, so build a dict first.
            message_id = dict(record.headers).get("id")
            if message_id is not None:
                span.set_attribute("app.message_id", message_id.decode())

    instrument_aiokafka(
        async_produce_hook=produce_hook,
        async_consume_hook=consume_hook,
    )

Source code in aiokafka_foundation_kit/contrib/telemetry/instrumentations.py
Type aliases
| Alias | Values |
|---|---|
| KafkaSecurityProtocol | "PLAINTEXT", "SASL_PLAINTEXT", "SSL", "SASL_SSL" |
| KafkaSaslMechanism | "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512" |
| KafkaAcks | "0", "1", "all" |
| KafkaCompressionType | "gzip", "snappy", "lz4", "zstd" |
| KafkaOffsetReset | "earliest", "latest" |
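Aliases like these are commonly expressed with `typing.Literal`. The sketch below assumes that form (this page does not show the actual definitions) and adds a hypothetical `is_valid_acks` helper to check a runtime string against the allowed values.

```python
from typing import Literal, get_args

# Assumed definitions; the values come from the table above.
KafkaAcks = Literal["0", "1", "all"]
KafkaOffsetReset = Literal["earliest", "latest"]


def is_valid_acks(value: str) -> bool:
    """Check a runtime string against the allowed KafkaAcks values."""
    return value in get_args(KafkaAcks)
```

Static type checkers reject values outside the alias at analysis time, while a helper like this covers values that arrive at runtime, for example from environment variables.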