User Guide
omni-box provides production-ready primitives for the Transactional Outbox and Transactional Inbox patterns in async Python services.
Installation
Optional extras (see README.md for the full list): postgres, kafka, metrics, opentelemetry, settings, dishka.
Basic concepts
- Outbox: guarantees that an event reaches the broker if and only if the originating DB transaction committed. The publisher is a separate background job that reads pending rows and publishes them.
- Inbox: guarantees that each incoming broker message is handled at most once per (message_id, consumer_group), regardless of broker redelivery, thanks to the inbox unique index.
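To make the inbox guarantee concrete, here is a minimal sketch of deduplication through a unique index. It uses the standard-library sqlite3 module rather than omni-box itself; the table layout and the record_once helper are assumptions made for illustration, not the library's schema or API.

```python
import sqlite3

# Hypothetical minimal schema; the real omni-box inbox table is not shown in this guide.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE inbox_events (
        message_id     TEXT NOT NULL,
        consumer_group TEXT NOT NULL,
        payload        TEXT NOT NULL,
        UNIQUE (message_id, consumer_group)
    )
    """
)


def record_once(message_id: str, consumer_group: str, payload: str) -> bool:
    """Return True if the message is new for this group, False on redelivery."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO inbox_events VALUES (?, ?, ?)",
                (message_id, consumer_group, payload),
            )
        return True
    except sqlite3.IntegrityError:
        # The unique index rejected a duplicate (message_id, consumer_group) pair.
        return False


first = record_once("msg-1", "identity-service", "{}")
redelivered = record_once("msg-1", "identity-service", "{}")
other_group = record_once("msg-1", "billing-service", "{}")
stored_rows = conn.execute("SELECT COUNT(*) FROM inbox_events").fetchone()[0]
```

The same message_id is accepted again for a different consumer group, because the uniqueness is scoped to the pair rather than to the message alone.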
Transactional Outbox
1. Define your payload schema (optional)
2. Persist events inside your business transaction
from omni_box import OmniBoxDomainService

domain = OmniBoxDomainService()


async def create_user(uow, email: str):
    # The user row and the outbox event are written in the same transaction,
    # so they commit or roll back together.
    async with uow.transaction() as tx:
        user = await tx.users.create(email=email)
        event = domain.create_outbox_event(
            event_type="user.created",
            topic="users",
            partition_key=str(user.id),
            payload={"user_id": str(user.id), "email": email},
            aggregate_type="user",
            aggregate_id=user.id,
        )
        await tx.outbox.create(event)
tx.outbox is your service-owned repository — typically PostgresOutboxRepository bound to the same AsyncSession as the rest of your UoW.
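The reason for sharing one session is atomicity: the business row and the outbox row must commit or roll back together. The sketch below illustrates that property with the standard-library sqlite3 module only. The tables, the create_user function, and the fail_before_commit flag are hypothetical and stand in for your own UoW and repositories.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL)")
conn.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY, event_type TEXT, payload TEXT)")


def create_user(email: str, fail_before_commit: bool = False) -> None:
    """Write the user and its outbox event in a single transaction."""
    with conn:  # one transaction: commit on success, rollback on any exception
        cur = conn.execute("INSERT INTO users (email) VALUES (?)", (email,))
        conn.execute(
            "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
            ("user.created", json.dumps({"user_id": cur.lastrowid, "email": email})),
        )
        if fail_before_commit:
            raise RuntimeError("simulated failure before commit")


create_user("a@example.com")
try:
    create_user("b@example.com", fail_before_commit=True)
except RuntimeError:
    pass  # both inserts for b@example.com were rolled back

user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
outbox_count = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
```

After the failed call neither table contains a row for the second user, so a publisher reading the outbox can never see an event for a transaction that did not commit.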
3. Run the publisher
import asyncio

from omni_box import OutboxPublisher
from omni_box.core.converters import EnvelopeEventConverter
from omni_box.infra.brokers.kafka import KafkaEventPublisher

broker = KafkaEventPublisher(
    producer=kafka_producer,  # caller-owned AIOKafkaProducer
    converter=EnvelopeEventConverter(),
)
publisher = OutboxPublisher(repo=outbox_repo, broker=broker)

# shutdown is a flag owned by your application.
while not shutdown:
    result = await publisher.publish_batch(worker_id="publisher-1", batch_size=100)
    if not result.processed_event_ids:
        # Nothing was processed: back off before polling again.
        await asyncio.sleep(1.0)
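The loop above leaves the shutdown flag to the application. One possible way to wire it, sketched here under assumptions, is an asyncio.Event that also interrupts the idle sleep. FakePublisher and FakeResult are hypothetical stand-ins that only mimic publish_batch and processed_event_ids as used above; they are not omni-box classes.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeResult:
    processed_event_ids: list[int] = field(default_factory=list)


class FakePublisher:
    """Hypothetical stand-in exposing only publish_batch()."""

    def __init__(self, pending: list[int]) -> None:
        self.pending = pending

    async def publish_batch(self, worker_id: str, batch_size: int) -> FakeResult:
        batch, self.pending = self.pending[:batch_size], self.pending[batch_size:]
        return FakeResult(processed_event_ids=batch)


async def run_publisher(publisher, shutdown: asyncio.Event, idle_sleep: float = 1.0) -> int:
    """Poll until shutdown is set; return how many events were processed."""
    published = 0
    while not shutdown.is_set():
        result = await publisher.publish_batch(worker_id="publisher-1", batch_size=100)
        published += len(result.processed_event_ids)
        if not result.processed_event_ids:
            # Idle: wait for the interval to pass or for shutdown, whichever comes first.
            try:
                await asyncio.wait_for(shutdown.wait(), timeout=idle_sleep)
            except asyncio.TimeoutError:
                pass
    return published


async def main() -> int:
    shutdown = asyncio.Event()
    publisher = FakePublisher(pending=list(range(250)))
    task = asyncio.create_task(run_publisher(publisher, shutdown, idle_sleep=0.01))
    await asyncio.sleep(0.05)  # let the worker drain the backlog
    shutdown.set()             # in a service, a signal handler would do this
    return await task


total_published = asyncio.run(main())
```

Waiting on the event instead of calling asyncio.sleep directly means a shutdown request does not have to wait out the full idle interval.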
Transactional Inbox
Option A: drive consumption with InboxConsumerRunner
This is the typical "one Kafka message per transaction" loop with configurable commit semantics.
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from omni_box import AckStrategy, InboxConsumerRunner
from omni_box.core.protocols import InboxEventRepository
from omni_box.core.protocols.transaction import InboxTransactionProviderProtocol


class InboxTxProvider(InboxTransactionProviderProtocol):
    def __init__(self, session_factory, repo_factory) -> None:
        self._session_factory = session_factory
        self._repo_factory = repo_factory

    @asynccontextmanager
    async def transaction(self) -> AsyncIterator[InboxEventRepository]:
        # Open a session and a DB transaction; both are closed when the block exits.
        async with self._session_factory() as session, session.begin():
            yield self._repo_factory(session)


# Imports for PostgresInboxRepository and InboxEventDB are omitted here.
runner = InboxConsumerRunner(
    consumer=kafka_consumer_adapter,
    transaction_provider=InboxTxProvider(
        session_factory,
        lambda s: PostgresInboxRepository(s, model_class=InboxEventDB),
    ),
    handler=handle_inbox_event,  # optional in-tx handler
    worker_id="worker-1",
    consumer_group="identity-service",
    ack_strategy=AckStrategy.EXACTLY_ONCE_INBOX,
)

await runner.start()
try:
    await runner.run_forever()
finally:
    await runner.stop()
Option B: batch processing already-stored inbox rows
Use create_inbox_processor when you want to ingest messages quickly (commit on persist) and process them in a separate workload.
from omni_box import InboxEvent, create_inbox_processor
from omni_box.core.protocols import InboxEventRepository


async def my_handler(event: InboxEvent, repo: InboxEventRepository):
    print(f"Processing {event.event_type} ({event.message_id})")


processor = create_inbox_processor(
    repo=inbox_repo,
    handler=my_handler,
    job_name="my_inbox_job",
)
await processor.process_batch(worker_id="worker-1", batch_size=50)
Option C: automated event routing
from omni_box import EventRouter, create_dispatching_processor, event_handler, InboxEvent
from omni_box.core.protocols import InboxEventRepository

router = EventRouter()


@event_handler(event_type="user.created", source="users")
async def handle_user_created(event: InboxEvent, repo: InboxEventRepository, uow):
    ...


processor = create_dispatching_processor(
    repo=inbox_repo,
    router=router,
    dependencies={"uow": uow},
)
Customising the pipeline
Need full control? Build the processor yourself.
from omni_box import EventProcessorBuilder
from omni_box.core.pipeline.steps import (
    CircuitBreakerStep,
    DLQStep,
    HandlerExecutionStep,
    OpenTelemetryStep,
    SiblingDeduplicationStep,
)
from omni_box.core.pipeline.strategies import (
    BulkCommitStrategy,
    DistributedLockingFetchStrategy,
)

builder = (
    EventProcessorBuilder(inbox_repo)
    .add_step(OpenTelemetryStep(service_name="my-service"))
    .add_step(CircuitBreakerStep(failure_threshold=5, recovery_timeout_seconds=60))
    .add_step(DLQStep(dlq_storage))
    .add_step(SiblingDeduplicationStep())
    .add_step(HandlerExecutionStep(my_handler, timeout=30))
    .with_fetch_strategy(DistributedLockingFetchStrategy())
    .with_commit_strategy(BulkCommitStrategy())
)
processor = builder.build()
The builder auto-picks DistributedLockingFetchStrategy + BulkCommitStrategy when the repository advertises matching capabilities, so the with_* calls above are usually optional.
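How the builder detects capabilities is not described here. As a rough illustration of the general idea only, the sketch below selects strategies by checking a repository against runtime-checkable protocols. Every name in it (SupportsDistributedLocking, SupportsBulkCommit, lock_batch, commit_many, pick_strategies) is hypothetical and should not be read as omni-box's actual mechanism.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class SupportsDistributedLocking(Protocol):  # hypothetical capability
    def lock_batch(self, worker_id: str, batch_size: int) -> list: ...


@runtime_checkable
class SupportsBulkCommit(Protocol):  # hypothetical capability
    def commit_many(self, event_ids: list) -> None: ...


def pick_strategies(repo: object) -> tuple[str, str]:
    """Choose (fetch, commit) strategy names from what the repository offers."""
    fetch = "distributed-locking" if isinstance(repo, SupportsDistributedLocking) else "simple"
    commit = "bulk" if isinstance(repo, SupportsBulkCommit) else "per-event"
    return fetch, commit


class PlainRepo:
    """Offers neither capability."""


class LockingBulkRepo:
    """Offers both capabilities."""

    def lock_batch(self, worker_id: str, batch_size: int) -> list:
        return []

    def commit_many(self, event_ids: list) -> None:
        return None


plain_choice = pick_strategies(PlainRepo())
full_choice = pick_strategies(LockingBulkRepo())
```

An explicit with_* call would still take precedence in a design like this, which matches the guide's description of the calls as usually optional rather than forbidden.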
Outbox payload envelopes
OutboxPublisher delegates the body shape to a converter:
| Converter | Body |
|---|---|
| RawEventConverter | Raw event.payload. |
| SchemaVersionedConverter | {"schema_version": …, "payload": …}. |
| EnvelopeEventConverter | Full envelope: payload + tracing identifiers. |
from omni_box import EnvelopeEventConverter
from omni_box.infra.brokers.kafka import KafkaEventPublisher
broker = KafkaEventPublisher(producer=p, converter=EnvelopeEventConverter())
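To compare the three body shapes side by side, the following sketch builds each one from the same event using plain functions. SketchEvent and the three functions are hypothetical, and the envelope field names (event_type, trace_id) are assumptions; only the schema_version/payload shape is taken from the table above.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SketchEvent:
    """Hypothetical stand-in for an outbox event; not the omni-box model."""

    event_type: str
    payload: dict[str, Any]
    schema_version: int = 1
    trace_id: Optional[str] = None  # assumed tracing identifier


def raw_body(event: SketchEvent) -> dict[str, Any]:
    # Raw shape: the payload and nothing else.
    return event.payload


def schema_versioned_body(event: SketchEvent) -> dict[str, Any]:
    # Versioned shape: payload wrapped together with its schema version.
    return {"schema_version": event.schema_version, "payload": event.payload}


def envelope_body(event: SketchEvent) -> dict[str, Any]:
    # Envelope shape: payload plus metadata; these field names are assumptions.
    return {
        "event_type": event.event_type,
        "schema_version": event.schema_version,
        "trace_id": event.trace_id,
        "payload": event.payload,
    }


event = SketchEvent("user.created", {"user_id": "42"}, trace_id="trace-abc")
raw = raw_body(event)
versioned = schema_versioned_body(event)
envelope = envelope_body(event)
```

Consumers that only need the payload can read the raw shape directly, while the other two shapes require unwrapping the payload key first.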
Observability
Structured logging
omni-box emits structured events through structlog. Configure renderers in your application bootstrap — for example:
import structlog

structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]
)
omni-box does not ship a PII sanitiser; redact sensitive fields in your own structlog processor if needed.
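A redaction processor can be quite small. The sketch below follows the structlog processor calling convention (logger, method name, event dict) and masks a fixed set of keys. The key list and the redact_sensitive name are assumptions; which fields omni-box actually logs is not listed here, so adjust the set to your own data.

```python
# Hypothetical set of keys to mask; extend it to match your own log fields.
SENSITIVE_KEYS = frozenset({"email", "password", "token"})


def redact_sensitive(logger, method_name, event_dict):
    """structlog-style processor: replace sensitive values before rendering."""
    for key in event_dict:
        if key in SENSITIVE_KEYS:
            event_dict[key] = "[REDACTED]"
    return event_dict


sample = redact_sensitive(
    None,
    "info",
    {"event": "user.created", "email": "a@example.com", "user_id": "42"},
)
```

In a structlog configuration this function would be placed in the processors list before the renderer, so that the masked values are what gets serialised.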
Metrics
Pass metrics= to any factory or to MetricsStep. Either implement InboxMetrics / OutboxMetrics yourself or use the Prometheus adapters from omni_box.infra.metrics (extra: metrics).
OpenTelemetry
Add OpenTelemetryStep(service_name="my-service") to your pipeline (extra: opentelemetry).
Dead Letter Queue
from omni_box.core.pipeline.steps import DLQStep, DLQStorage


class MyDLQStorage(DLQStorage):
    async def move_to_dlq(self, event, error: str) -> None:
        # db is your own persistence layer.
        await db.save_to_dlq(event, error)


builder.add_step(DLQStep(MyDLQStorage()))
DLQStep is best-effort and non-transactional. Transient failures (count_as_attempt=False) are never routed to DLQ.
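For tests it can be handy to capture dead-lettered events in memory. The class below is a hypothetical test double that mirrors the move_to_dlq signature shown above. It deliberately does not import omni-box so that it runs on its own; in real code it would presumably subclass DLQStorage as in the example above.

```python
import asyncio


class InMemoryDLQStorage:
    """Hypothetical test double; keeps dead-lettered events in a list."""

    def __init__(self) -> None:
        self.entries: list[tuple[object, str]] = []

    async def move_to_dlq(self, event, error: str) -> None:
        # Record the event together with the error message that caused it.
        self.entries.append((event, error))


storage = InMemoryDLQStorage()
asyncio.run(storage.move_to_dlq({"message_id": "msg-1"}, "handler raised ValueError"))
```

A test can then assert on storage.entries to check which events a pipeline routed to the DLQ and with which error text.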
Circuit breaker
from omni_box.core.pipeline.steps import CircuitBreakerStep
builder.add_step(CircuitBreakerStep(failure_threshold=5, recovery_timeout_seconds=60))
The breaker state is held in-process — it does not survive restarts and is not shared between replicas. For multi-worker deployments add an external coordination layer (e.g. Redis-backed counters) on top of this step.
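To show why in-process state is neither durable nor shared, here is a minimal sketch of a generic circuit breaker whose state lives in instance attributes. It is not the CircuitBreakerStep implementation; the class, its methods, and the injectable clock are assumptions, and only the two parameter names are borrowed from the call above.

```python
import time


class SketchCircuitBreaker:
    """Generic in-process breaker; state is lost on restart and is per instance."""

    def __init__(self, failure_threshold=5, recovery_timeout_seconds=60.0, clock=time.monotonic):
        self._threshold = failure_threshold
        self._recovery_timeout = recovery_timeout_seconds
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the breaker is closed

    def allow(self) -> bool:
        """Return True if a call may proceed."""
        if self._opened_at is None:
            return True
        # Open: allow a trial call only once the recovery timeout has elapsed.
        return self._clock() - self._opened_at >= self._recovery_timeout

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self._threshold:
            self._opened_at = self._clock()


now = [0.0]  # fake clock so the example does not need to sleep
breaker = SketchCircuitBreaker(failure_threshold=2, recovery_timeout_seconds=60, clock=lambda: now[0])
breaker.record_failure()
allowed_after_one = breaker.allow()
breaker.record_failure()
allowed_after_two = breaker.allow()
now[0] = 61.0
allowed_after_timeout = breaker.allow()
```

Two replicas would each hold their own counters in a design like this, which is why cross-replica coordination needs an external store.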
Maintenance
Run periodically against the same repository:
from omni_box import OmniBoxMaintenanceService
m = OmniBoxMaintenanceService(repo=outbox_repo)
await m.release_stale_locks(stale_timeout_seconds=300)
await m.cleanup_old_events(retention_days=14)
Both methods require the repository to implement SupportsRetentionPolicies. PostgresOutboxRepository and PostgresInboxRepository do.