API Reference¶
Auto-generated from source using mkdocstrings.
omni_box¶
Unified Transactional Outbox & Inbox primitives for async Python services.
AckHandle
¶
Bases: ABC
Acknowledgement handle for broker commit/ack operations.
Source code in omni_box/core/protocols/consumers.py
AckStrategy
¶
Bases: StrEnum
Commit semantics for consumed broker messages.
Source code in omni_box/application/services/consume.py
BaseEvent
¶
Bases: BaseModel
Base generic event domain entity for Transactional Outbox and Inbox.
Provides core fields for event identification, status tracking, retry orchestration, and metadata. All events are frozen (immutable) to ensure data integrity during processing.
Source code in omni_box/core/models/entities.py
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 | |
attempts_left
property
¶
Number of attempts remaining before FAILED status.
can_retry
property
¶
Check if the event can be retried.
failure_count
property
¶
Number of attempts made when in FAILED status.
is_locked
property
¶
Check if the event is currently locked.
truncate_error(error, max_bytes, suffix)
staticmethod
¶
Truncate error message to database limit (in bytes).
Source code in omni_box/core/models/entities.py
validate_invariants()
¶
Enforce business invariants across all fields.
validate_scheduled_at(v, info)
classmethod
¶
Ensure scheduled_at is within reasonable range.
Source code in omni_box/core/models/entities.py
validate_timezone(v)
classmethod
¶
Ensure all datetimes are timezone-aware and normalize to UTC.
Source code in omni_box/core/models/entities.py
BaseEventHandler
¶
Bases: ABC
Base class for event handlers.
Subclasses should define topic at class level and use @event_handler decorator on methods to register them.
Example
class UserEventHandlers(BaseEventHandler): topic = "users"
@event_handler(UserEventType.CREATED)
async def on_user_created(self, event: InboxEvent, uow: AsyncUnitOfWork):
# Handle event
pass
Source code in omni_box/core/dispatch/base.py
BaseEventSchema
¶
Bases: BaseModel, ABC
Base class for all event payload schemas with self-registration and versioning support.
Source code in omni_box/core/models/schemas.py
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 | |
from_payload(payload)
classmethod
¶
migrate_payload(event_type, payload, from_version, to_version)
classmethod
¶
Migrate payload to the target version if migrations are available.
If from_version is None, it assumes the earliest known version or skips. Currently supports single-step migrations.
Source code in omni_box/core/models/schemas.py
register_migration(event_type, from_version, to_version, func)
classmethod
¶
Register a migration function between two versions of an event type.
Source code in omni_box/core/models/schemas.py
resolve(event_type, version)
classmethod
¶
Resolve the best matching schema class based on event type and version prefix.
Raises:
| Type | Description |
|---|---|
SchemaResolutionError
|
if no schemas are registered for the event type, the version is unknown, or no prefix match exists. |
Source code in omni_box/core/models/schemas.py
schema_version()
abstractmethod
classmethod
¶
BatchProcessingResult
dataclass
¶
Results of batch event processing.
Source code in omni_box/core/services/results.py
CommitOffsetPolicy
¶
Bases: StrEnum
When to commit (ACK) the broker message in AT_LEAST_ONCE mode.
Source code in omni_box/application/services/consume.py
ConsumedMessage
dataclass
¶
Normalized consumed message passed to Inbox runner.
Source code in omni_box/core/protocols/consumers.py
EnvelopeData
dataclass
¶
Unwrapped message payload and extracted metadata from an envelope.
Source code in omni_box/core/protocols/consumers.py
EnvelopeEventConverter
¶
Full envelope: schema_version, event_type, aggregate info, payload, tracing.
Source code in omni_box/core/converters/event.py
EnvelopeParser
¶
Bases: Protocol
Protocol for unwrapping nested payloads and extracting metadata.
Source code in omni_box/core/protocols/consumers.py
EventAlreadyLockedError
¶
Bases: OmniBoxError
Raised when an event is already locked by another worker.
Source code in omni_box/core/exceptions.py
EventBatchProcessor
¶
Unified batch processor powered by pipeline architecture.
Source code in omni_box/core/services/processor.py
process_batch(worker_id, batch_size, shutdown_requested_func=None, **fetch_filters)
async
¶
Fetch and process a batch of events using the configured pipeline.
Source code in omni_box/core/services/processor.py
EventConcurrentUpdateError
¶
Bases: OmniBoxError
Raised when a repository update affects fewer rows than expected.
Indicates that some events might have been modified by another worker, unlocked by an admin, or already reached the target state.
Source code in omni_box/core/exceptions.py
EventConsumer
¶
Bases: ABC
Abstract event consumer.
Source code in omni_box/core/protocols/consumers.py
EventHandlerResult
dataclass
¶
Explicit outcome of a single event handler.
Source code in omni_box/core/services/results.py
EventHandlerStatus
¶
Bases: StrEnum
Semantic status of event processing result.
Source code in omni_box/core/services/results.py
COMPLETED = 'completed'
class-attribute
instance-attribute
¶
Handler successfully processed the event.
FAILED = 'failed'
class-attribute
instance-attribute
¶
Handler failed with an exception or explicit error.
RETRY = 'retry'
class-attribute
instance-attribute
¶
Handler failed but requested a retry.
SKIPPED = 'skipped'
class-attribute
instance-attribute
¶
Handler explicitly chose to skip this event (e.g. business logic).
STALE = 'stale'
class-attribute
instance-attribute
¶
Handler ignored the event because it is outdated (revision check).
EventLockedByAnotherWorkerError
¶
Bases: OmniBoxError
Raised when an operation is performed by a worker that doesn't own the lock.
Source code in omni_box/core/exceptions.py
EventNotLockedError
¶
Bases: OmniBoxError
Raised when an operation requires a lock but the event is not locked.
Source code in omni_box/core/exceptions.py
EventProcessorBuilder
¶
Fluent API for constructing highly-customizable event processors.
Source code in omni_box/core/pipeline/builder.py
__init__(repo)
¶
Initialize builder with a specific event repository.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
repo
|
EventRepository[T]
|
The repository to fetch and commit events from/to. |
required |
Source code in omni_box/core/pipeline/builder.py
add_step(step)
¶
build()
¶
Finalize the processor configuration.
Source code in omni_box/core/pipeline/builder.py
with_commit_strategy(strategy)
¶
with_fetch_strategy(strategy)
¶
with_job_name(name)
¶
with_lease_ttl(ttl_seconds)
¶
Set the distributed lock lease TTL (in seconds).
EventPublisher
¶
Bases: ABC
Abstract publisher that accepts OutboxEvent.
The publish signature mirrors the handler signature used by
:class:~omni_box.core.pipeline.steps.handler.HandlerExecutionStep so that
publishers and inbox handlers are interchangeable from the pipeline's
perspective. Most broker implementations do not need the repository; it is
provided so a publisher can perform book-keeping in the same
transaction (e.g. write an audit row) when desired.
Source code in omni_box/core/protocols/publishers.py
publish(event, repo)
abstractmethod
async
¶
Publish event to the broker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event
|
OutboxEvent
|
The outbox event to deliver. |
required |
repo
|
EventRepository[OutboxEvent]
|
Repository bound to the in-flight transaction. Pass-through value for the pipeline contract; broker implementations may ignore it. |
required |
Source code in omni_box/core/protocols/publishers.py
EventRouter
¶
Registry and dispatcher for event handlers.
Source code in omni_box/core/dispatch/registry.py
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 | |
dispatch(event, topic, repo, **dependencies)
async
¶
Dispatch event to registered handler.
Source code in omni_box/core/dispatch/registry.py
register_class(handler_class, topic=None)
¶
Auto-register all @event_handler decorated methods from class.
register_handler(event_type, topic, handler, schema_version=None, handler_name=None)
¶
Register single event handler.
Source code in omni_box/core/dispatch/registry.py
register_instance(instance, topic=None)
¶
Auto-register all @event_handler decorated methods from instance.
Source code in omni_box/core/dispatch/registry.py
EventStatus
¶
Bases: StrEnum
General event status enumeration.
Attributes:
| Name | Type | Description |
|---|---|---|
PENDING |
Event is created and waiting to be processed/published. |
|
COMPLETED |
Event has been successfully processed/published. |
|
FAILED |
Event processing/publication failed after maximum retries. |
Source code in omni_box/core/models/enums.py
FetchFilters
¶
Bases: TypedDict
Common filters for event fetching.
Source code in omni_box/core/protocols/repository.py
FilteredFetchStrategy
¶
Strategy that applies fixed filters to all fetch operations.
Source code in omni_box/core/pipeline/strategies/fetch.py
fetch(repo, batch_size, worker_id, **filters)
async
¶
Fetch events with additional sources filtering.
Source code in omni_box/core/pipeline/strategies/fetch.py
InboxConsumeResult
dataclass
¶
Result of one inbox consume cycle.
Source code in omni_box/application/services/consume.py
InboxConsumerRunner
¶
High-level service for consuming broker messages into the Transactional Inbox.
Source code in omni_box/application/services/consume.py
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 | |
InboxEvent
¶
Bases: BaseEvent
Domain entity for events received from external brokers (Inbox).
Ensures exactly-once processing by tracking external message identifiers and consumer groups. Facilitates payload parsing into typed schemas using the global schema registry.
Source code in omni_box/core/models/entities.py
attempts_left
property
¶
Number of attempts remaining before FAILED status.
can_retry
property
¶
Check if the event can be retried.
failure_count
property
¶
Number of attempts made when in FAILED status.
is_locked
property
¶
Check if the event is currently locked.
processed_at
property
¶
Alias for completed_at.
get_context_value(key)
¶
get_payload_as(schema_cls=None)
¶
Resolve and parse the payload into a typed schema.
If schema_cls is provided, it parses with it. Otherwise, it uses the global discovery mechanism via BaseEventSchema.resolve() based on event_type and schema_version.
Source code in omni_box/core/models/entities.py
truncate_error(error, max_bytes, suffix)
staticmethod
¶
Truncate error message to database limit (in bytes).
Source code in omni_box/core/models/entities.py
validate_invariants()
¶
Enforce business invariants across all fields.
validate_scheduled_at(v, info)
classmethod
¶
Ensure scheduled_at is within reasonable range.
Source code in omni_box/core/models/entities.py
validate_timezone(v)
classmethod
¶
Ensure all datetimes are timezone-aware and normalize to UTC.
Source code in omni_box/core/models/entities.py
InboxEventRepository
¶
Bases: EventRepository[InboxEvent], Protocol
Specific protocol for Inbox events.
Source code in omni_box/core/protocols/repository.py
capabilities
property
¶
Report optional features supported by this repository instance.
create(event)
async
¶
exists(message_id, consumer_group)
async
¶
fetch_pending(limit, **filters)
async
¶
get_by_id(event_id)
async
¶
get_by_message_id(message_id, consumer_group)
async
¶
has_completed_sibling_for_inbox_key(message_id, consumer_group, exclude_event_id)
async
¶
True if another row with the same (message_id, consumer_group) is completed.
mark_completed(event_id, worker_id)
async
¶
mark_failed(event_id, error, worker_id, next_retry_at, count_as_attempt=True)
async
¶
Mark an event as FAILED or schedule a retry.
InboxHandler
¶
Bases: Protocol
Unified protocol for inbox event handlers.
**dependencies carries DI-resolved values injected by the runner
(e.g. service instances, settings). It is typed as Any because each
handler implementation chooses its own keyword set; static checking is
enforced at the handler signature, not on the protocol.
Source code in omni_box/core/protocols/handlers.py
InboxMetrics
¶
Bases: ProcessingMetrics, Protocol
Interface for inbox consumer metrics. All methods are sync.
Source code in omni_box/core/protocols/metrics.py
InboxPersistError
¶
Bases: OmniBoxError
Raised when the inbox consumer fails to persist a consumed message.
Typically means the underlying transaction was rolled back (DB outage, integrity violation, lock conflict). The broker offset is intentionally not committed so the message can be redelivered.
Source code in omni_box/core/exceptions.py
InvalidEventStateError
¶
Bases: OmniBoxError
Raised when an operation is performed on an event in an invalid state.
Source code in omni_box/core/exceptions.py
NullAckHandle
¶
OmniBoxDomainService
¶
Standardized way to create and manage events with sensible defaults.
Source code in omni_box/core/services/domain.py
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 | |
assert_locked_by(event, worker_id)
¶
Assert that the event is locked by the specified worker.
Source code in omni_box/core/services/domain.py
force_unlock_event(event, reason)
¶
Forcefully release the lock without owner verification.
Source code in omni_box/core/services/domain.py
is_lock_stale(event, now, stale_timeout_seconds=None)
¶
Check if the current lock is stale.
Source code in omni_box/core/services/domain.py
lock_event(event, worker_id, locked_at)
¶
Lock the event for processing.
Source code in omni_box/core/services/domain.py
mark_event_completed(event, completed_at, worker_id)
¶
Mark event as successfully completed.
Source code in omni_box/core/services/domain.py
mark_event_failed(event, error, worker_id, count_as_attempt=True, next_retry_at=None)
¶
Mark event as failed with error details.
Source code in omni_box/core/services/domain.py
refresh_event_lock(event, worker_id, now)
¶
Extend the current lock time.
Source code in omni_box/core/services/domain.py
unlock_event(event, worker_id)
¶
Release the lock on the event.
Source code in omni_box/core/services/domain.py
OmniBoxError
¶
OmniBoxMaintenanceService
¶
Core service for omni-box maintenance operations.
Source code in omni_box/core/services/maintenance.py
OutboxEvent
¶
Bases: BaseEvent
Domain entity for events being sent from the application (Outbox).
Adds aggregate context and routing information required for publishing events to external brokers.
Source code in omni_box/core/models/entities.py
attempts_left
property
¶
Number of attempts remaining before FAILED status.
can_retry
property
¶
Check if the event can be retried.
failure_count
property
¶
Number of attempts made when in FAILED status.
is_locked
property
¶
Check if the event is currently locked.
truncate_error(error, max_bytes, suffix)
staticmethod
¶
Truncate error message to database limit (in bytes).
Source code in omni_box/core/models/entities.py
validate_invariants()
¶
Enforce business invariants across all fields.
validate_scheduled_at(v, info)
classmethod
¶
Ensure scheduled_at is within reasonable range.
Source code in omni_box/core/models/entities.py
validate_timezone(v)
classmethod
¶
Ensure all datetimes are timezone-aware and normalize to UTC.
Source code in omni_box/core/models/entities.py
OutboxEventRepository
¶
Bases: EventRepository[OutboxEvent], Protocol
Specific protocol for Outbox events.
Source code in omni_box/core/protocols/repository.py
capabilities
property
¶
Report optional features supported by this repository instance.
create(event)
async
¶
fetch_pending(limit, **filters)
async
¶
get_by_id(event_id)
async
¶
mark_completed(event_id, worker_id)
async
¶
mark_failed(event_id, error, worker_id, next_retry_at, count_as_attempt=True)
async
¶
Mark an event as FAILED or schedule a retry.
OutboxMetrics
¶
Bases: ProcessingMetrics, Protocol
Interface for outbox processing metrics. All methods are sync.
Source code in omni_box/core/protocols/metrics.py
OutboxPublisher
¶
High-level service for publishing outbox events to a message broker.
Source code in omni_box/application/services/publish.py
RepositoryCapabilities
dataclass
¶
Flags for optional repository capabilities.
Source code in omni_box/core/protocols/repository.py
StorageConnectionError
¶
Bases: StorageError
Connection to storage failed.
StorageError
¶
Bases: OmniBoxError
Base exception for storage operations.
StorageIntegrityError
¶
Bases: StorageError
Data integrity constraint violation.
StorageTimeoutError
¶
Bases: StorageError
Storage operation timed out.
StorageTransactionError
¶
Bases: StorageError
Transaction operation failed.
UnsupportedCapabilityError
¶
Bases: OmniBoxError
Raised when a required repository capability is not available.
Source code in omni_box/core/exceptions.py
create_dispatching_processor(repo, router, *, filter_sources=None, skip_duplicate_siblings=True, process_timeout=DEFAULT_PROCESS_TIMEOUT_SECONDS, dependencies=None, metrics=None, dlq_storage=None, enable_otel=False, enable_circuit_breaker=False, circuit_breaker_failure_threshold=5, circuit_breaker_recovery_timeout=60, job_name='dispatching_processor', additional_steps_before=None, additional_steps_after=None)
¶
Preset factory for inbox processing with automated event routing.
Source code in omni_box/application/factories.py
create_inbox_processor(repo, handler, *, skip_duplicate_siblings=True, filter_sources=None, process_timeout=DEFAULT_PROCESS_TIMEOUT_SECONDS, metrics=None, dlq_storage=None, enable_otel=False, enable_circuit_breaker=False, circuit_breaker_failure_threshold=5, circuit_breaker_recovery_timeout=60, job_name='inbox_processor', additional_steps_before=None, additional_steps_after=None)
¶
Preset factory for common inbox processing scenarios.
Source code in omni_box/application/factories.py
create_outbox_processor(repo, publisher, *, publish_timeout=DEFAULT_PUBLISH_TIMEOUT_SECONDS, metrics=None, dlq_storage=None, enable_otel=False, enable_circuit_breaker=False, circuit_breaker_failure_threshold=5, circuit_breaker_recovery_timeout=60, job_name='outbox_processor', additional_steps_before=None, additional_steps_after=None)
¶
Preset factory for common outbox publishing scenarios.
Source code in omni_box/application/factories.py
event_handler(event_type, topic=None, schema_version=None)
¶
Mark method as event handler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event_type
|
DispatchName
|
Event type to handle ( |
required |
topic
|
DispatchName | None
|
Optional topic override ( |
None
|
schema_version
|
str | None
|
Optional schema version to match (e.g. "1.0.0") |
None
|
Returns:
| Type | Description |
|---|---|
Callable[[F], F]
|
Decorated method with metadata attached |
Source code in omni_box/core/dispatch/decorators.py
handler_completed(status=EventHandlerStatus.COMPLETED)
¶
handler_retry(message, *, count_as_attempt=True, next_retry_at=None, status=EventHandlerStatus.RETRY)
¶
Schedule retry with an error message.
Source code in omni_box/core/services/results.py
handler_skipped(status=EventHandlerStatus.SKIPPED)
¶
Signal that the handler chose not to process this event.