Skip to content

API Reference

Auto-generated from source using mkdocstrings.

omni_box

Unified Transactional Outbox & Inbox primitives for async Python services.

AckHandle

Bases: ABC

Acknowledgement handle for broker commit/ack operations.

Source code in omni_box/core/protocols/consumers.py
class AckHandle(ABC):
    """Abstract handle used to commit/acknowledge a consumed broker message.

    Concrete subclasses must implement :meth:`commit`.
    """

    @abstractmethod
    async def commit(self) -> None:
        """Commit/acknowledge the consumed message this handle refers to."""

commit() abstractmethod async

Commit/acknowledge the consumed message.

Source code in omni_box/core/protocols/consumers.py
@abstractmethod
async def commit(self) -> None:
    """Abstract hook: commit/acknowledge the consumed message.

    The base declaration does nothing and returns ``None``.
    """

AckStrategy

Bases: StrEnum

Commit semantics for consumed broker messages.

Source code in omni_box/application/services/consume.py
class AckStrategy(StrEnum):
    """Commit semantics for consumed broker messages."""

    AT_MOST_ONCE = "at_most_once"
    AT_LEAST_ONCE = "at_least_once"
    EXACTLY_ONCE_INBOX = "exactly_once_inbox"

BaseEvent

Bases: BaseModel

Base generic event domain entity for Transactional Outbox and Inbox.

Provides core fields for event identification, status tracking, retry orchestration, and metadata. All events are frozen (immutable) to ensure data integrity during processing.

Source code in omni_box/core/models/entities.py
class BaseEvent(BaseModel):
    """Base generic event domain entity for Transactional Outbox and Inbox.

    Provides core fields for event identification, status tracking, retry
    orchestration, and metadata. All events are frozen (immutable) to
    ensure data integrity during processing.

    Validation happens in two layers: per-field validators (timezone
    normalization, ``scheduled_at`` range, payload/headers content) and a
    model-level validator that enforces cross-field invariants via
    :meth:`validate_invariants`.
    """

    # frozen=True makes instances immutable after construction.
    model_config = ConfigDict(frozen=True)

    # Identifiers
    id: UUID = Field(default_factory=uuid4)
    event_type: StrippedNonEmptyStr = Field(max_length=EVENT_TYPE_MAX_LENGTH)

    # Payload & Metadata
    payload: dict[str, JsonValue] = Field(..., description="Event payload as a JSON-compatible dictionary.")
    headers: dict[str, str] | None = Field(default=None, description="Optional message headers.")
    trace_id: StrippedNonEmptyStr | None = Field(default=None, max_length=TRACE_ID_MAX_LENGTH)
    idempotency_key: StrippedNonEmptyStr | None = Field(default=None, max_length=IDEMPOTENCY_KEY_MAX_LENGTH)
    correlation_id: StrippedNonEmptyStr | None = Field(default=None, max_length=CORRELATION_ID_MAX_LENGTH)
    causation_id: StrippedNonEmptyStr | None = Field(default=None, max_length=CAUSATION_ID_MAX_LENGTH)
    schema_version: StrippedNonEmptyStr | None = Field(default=None, max_length=SCHEMA_VERSION_MAX_LENGTH)

    # Status & Retries
    status: EventStatus = EventStatus.PENDING
    attempts_made: int = Field(
        default=0,
        ge=0,
        description="Number of failed processing attempts recorded (does not include a successful attempt).",
    )
    max_attempts: int = Field(
        default=DEFAULT_MAX_ATTEMPTS,
        ge=1,
        description=(
            "Maximum number of processing failures allowed. "
            "Event transitions to FAILED when attempts_made == max_attempts."
        ),
    )
    last_error: str | None = Field(default=None, max_length=LAST_ERROR_MAX_LENGTH)

    # Timing
    created_at: datetime = Field(default_factory=utc_now)
    scheduled_at: datetime = Field(default_factory=utc_now)
    completed_at: datetime | None = Field(default=None)
    # Lock bookkeeping: locked_at and locked_by must be set (or unset) together,
    # see _validate_lock.
    locked_at: datetime | None = Field(default=None)
    locked_by: StrippedNonEmptyStr | None = Field(default=None, max_length=WORKER_ID_MAX_LENGTH)

    @field_validator("created_at", "scheduled_at", "completed_at", "locked_at", mode="after")
    @classmethod
    def validate_timezone(cls, v: datetime | None) -> datetime | None:
        """Ensure all datetimes are timezone-aware and normalize to UTC.

        Returns ``None`` unchanged; raises ``ValueError`` for naive datetimes.
        """
        if v is None:
            return v

        if v.tzinfo is None:
            raise ValueError("Datetime must be timezone-aware")

        # Normalize any timezone to UTC
        return v.astimezone(UTC)

    @field_validator("scheduled_at")
    @classmethod
    def validate_scheduled_at(cls, v: datetime, info: ValidationInfo) -> datetime:
        """Ensure scheduled_at is within reasonable range.

        The range is measured relative to ``created_at``. Limits default to
        ``DEFAULT_SCHEDULE_AT_SKEW_SECONDS`` / ``DEFAULT_SCHEDULE_AT_MAX_FUTURE_SECONDS``
        and can be overridden through the validation context keys
        ``scheduled_at_skew_seconds`` and ``scheduled_at_max_future_seconds``.

        Raises:
            ValueError: if a limit is out of range, or ``scheduled_at`` is
                further before / after ``created_at`` than the limits allow.
        """
        # created_at is read from the already-validated data; when it is not
        # available there, the range check is skipped entirely.
        created_at = info.data.get("created_at")
        if created_at is not None:
            skew_limit = DEFAULT_SCHEDULE_AT_SKEW_SECONDS
            max_future_seconds = DEFAULT_SCHEDULE_AT_MAX_FUTURE_SECONDS
            if info.context:
                skew_limit = info.context.get("scheduled_at_skew_seconds", DEFAULT_SCHEDULE_AT_SKEW_SECONDS)
                max_future_seconds = info.context.get(
                    "scheduled_at_max_future_seconds", DEFAULT_SCHEDULE_AT_MAX_FUTURE_SECONDS
                )

            # Reject nonsensical limits before using them.
            if skew_limit < 0:
                raise ValueError(f"scheduled_at_skew_seconds must be >= 0, got {skew_limit}")
            if max_future_seconds < 1:
                raise ValueError(f"scheduled_at_max_future_seconds must be >= 1, got {max_future_seconds}")

            # scheduled_at may precede created_at only by up to skew_limit seconds.
            if v < created_at and (created_at - v).total_seconds() > skew_limit:
                raise ValueError(f"scheduled_at {v} cannot be significantly before created_at {created_at}")

            if (v - created_at).total_seconds() > max_future_seconds:
                raise ValueError(
                    f"scheduled_at {v} is too far in the future (max {max_future_seconds} seconds from creation)"
                )

        return v

    @field_validator("payload")
    @classmethod
    def validate_payload_content(cls, v: dict[str, JsonValue], info: ValidationInfo) -> dict[str, JsonValue]:
        """Delegate payload validation to ``validate_payload`` and return its result."""
        return validate_payload(v, info)

    @field_validator("headers")
    @classmethod
    def validate_headers_content(cls, v: dict[str, str] | None, info: ValidationInfo) -> dict[str, str] | None:
        """Delegate headers validation to ``validate_headers`` and return its result."""
        return validate_headers(v, info)

    @model_validator(mode="after")
    def _pydantic_validate_invariants(self) -> Self:
        """Internal Pydantic model validator hook."""
        return self.validate_invariants()

    def validate_invariants(self) -> Self:
        """Enforce business invariants across all fields.

        Runs the status/timing, attempts and lock checks in that order and
        returns ``self``; each check raises ``ValueError`` on violation.
        """
        self._validate_status_timing()
        self._validate_attempts()
        self._validate_lock()
        return self

    def _validate_status_timing(self) -> None:
        """Validate status vs completed timing.

        COMPLETED events must carry ``completed_at`` (not earlier than
        ``created_at`` / ``scheduled_at`` beyond a 1 second skew); every other
        status must have ``completed_at`` unset.
        """
        if self.status == EventStatus.COMPLETED:
            if self.completed_at is None:
                raise ValueError("completed_at must be set when status is COMPLETED")

            # Allow small clock skew (e.g. 1 second)
            skew_limit = 1.0
            if (
                self.completed_at < self.created_at
                and (self.created_at - self.completed_at).total_seconds() > skew_limit
            ):
                raise ValueError(f"completed_at {self.completed_at} cannot be before created_at {self.created_at}")
            if (
                self.completed_at < self.scheduled_at
                and (self.scheduled_at - self.completed_at).total_seconds() > skew_limit
            ):
                raise ValueError(f"completed_at {self.completed_at} cannot be before scheduled_at {self.scheduled_at}")
        elif self.completed_at is not None:
            raise ValueError(f"completed_at must be None when status is {self.status}")

    def _validate_attempts(self) -> None:
        """Validate attempts consistency.

        ``attempts_made`` may never exceed ``max_attempts``; FAILED requires
        them to be equal, and PENDING requires ``attempts_made`` to be lower.
        """
        if self.attempts_made > self.max_attempts:
            raise ValueError(f"attempts_made ({self.attempts_made}) cannot exceed max_attempts ({self.max_attempts})")

        if self.status == EventStatus.FAILED and self.attempts_made != self.max_attempts:
            raise ValueError(
                f"status is FAILED, but attempts_made ({self.attempts_made}) "
                f"must equal max_attempts ({self.max_attempts})"
            )
        if self.status == EventStatus.PENDING and self.attempts_made >= self.max_attempts:
            raise ValueError(
                f"status is PENDING, but attempts_made ({self.attempts_made}) "
                f"has reached max_attempts ({self.max_attempts})"
            )

    def _validate_lock(self) -> None:
        """Validate lock consistency."""
        # Exactly one of the two lock fields being set is inconsistent.
        if (self.locked_at is not None) != (self.locked_by is not None):
            raise ValueError("locked_at and locked_by must be both set or both None")

        # Locked events must always be PENDING.
        if self.is_locked and self.status != EventStatus.PENDING:
            raise ValueError(f"locked event must be in PENDING status, got {self.status}")

    @property
    def is_locked(self) -> bool:
        """Check if the event is currently locked."""
        return self.locked_at is not None

    @property
    def can_retry(self) -> bool:
        """Check if the event can be retried."""
        return self.status == EventStatus.PENDING and self.attempts_made < self.max_attempts

    @property
    def attempts_left(self) -> int:
        """Number of attempts remaining before FAILED status.

        Always 0 for events that are not PENDING.
        """
        if self.status != EventStatus.PENDING:
            return 0
        return self.max_attempts - self.attempts_made

    @property
    def failure_count(self) -> int:
        """Number of attempts made when in FAILED status.

        Returns 0 for any other status.
        """
        return self.attempts_made if self.status == EventStatus.FAILED else 0

    @staticmethod
    def truncate_error(error: str, max_bytes: int, suffix: str) -> str:
        """Truncate error message to database limit (in bytes).

        The message is stripped first. If its UTF-8 encoding fits within
        ``max_bytes`` it is returned as is; otherwise it is cut so that the
        result plus ``suffix`` fits. When ``suffix`` alone would fill the
        budget, the message is hard-cut to ``max_bytes`` without a suffix.

        Raises:
            ValueError: if ``max_bytes`` < 1 or the message is empty/whitespace.
        """
        if max_bytes < 1:
            raise ValueError(f"max_bytes must be >= 1, got {max_bytes}")

        stripped = error.strip()
        if not stripped:
            raise ValueError("Error message cannot be empty or whitespace")

        encoded = stripped.encode("utf-8")
        if len(encoded) <= max_bytes:
            return stripped

        suffix_encoded = suffix.encode("utf-8")
        if len(suffix_encoded) >= max_bytes:
            # errors="ignore" drops a multi-byte character split by the cut.
            return encoded[:max_bytes].decode("utf-8", errors="ignore")

        keep_bytes = max_bytes - len(suffix_encoded)
        return encoded[:keep_bytes].decode("utf-8", errors="ignore") + suffix

attempts_left property

Number of attempts remaining before FAILED status.

can_retry property

Check if the event can be retried.

failure_count property

Number of attempts made when in FAILED status.

is_locked property

Check if the event is currently locked.

truncate_error(error, max_bytes, suffix) staticmethod

Truncate error message to database limit (in bytes).

Source code in omni_box/core/models/entities.py
@staticmethod
def truncate_error(error: str, max_bytes: int, suffix: str) -> str:
    """Truncate error message to database limit (in bytes)."""
    if max_bytes < 1:
        raise ValueError(f"max_bytes must be >= 1, got {max_bytes}")

    stripped = error.strip()
    if not stripped:
        raise ValueError("Error message cannot be empty or whitespace")

    encoded = stripped.encode("utf-8")
    if len(encoded) <= max_bytes:
        return stripped

    suffix_encoded = suffix.encode("utf-8")
    if len(suffix_encoded) >= max_bytes:
        return encoded[:max_bytes].decode("utf-8", errors="ignore")

    keep_bytes = max_bytes - len(suffix_encoded)
    return encoded[:keep_bytes].decode("utf-8", errors="ignore") + suffix

validate_invariants()

Enforce business invariants across all fields.

Source code in omni_box/core/models/entities.py
def validate_invariants(self) -> Self:
    """Enforce business invariants across all fields."""
    self._validate_status_timing()
    self._validate_attempts()
    self._validate_lock()
    return self

validate_scheduled_at(v, info) classmethod

Ensure scheduled_at is within reasonable range.

Source code in omni_box/core/models/entities.py
@field_validator("scheduled_at")
@classmethod
def validate_scheduled_at(cls, v: datetime, info: ValidationInfo) -> datetime:
    """Check that ``scheduled_at`` lies in an acceptable window around ``created_at``.

    Limits default to ``DEFAULT_SCHEDULE_AT_SKEW_SECONDS`` and
    ``DEFAULT_SCHEDULE_AT_MAX_FUTURE_SECONDS``; the validation context may
    override them via ``scheduled_at_skew_seconds`` and
    ``scheduled_at_max_future_seconds``. When ``created_at`` is not present
    in the validated data, no range check is made.

    Raises:
        ValueError: for out-of-range limits or an out-of-window value.
    """
    created_at = info.data.get("created_at")
    if created_at is None:
        return v

    # An empty/missing context simply yields the defaults.
    overrides = info.context or {}
    skew_limit = overrides.get("scheduled_at_skew_seconds", DEFAULT_SCHEDULE_AT_SKEW_SECONDS)
    max_future_seconds = overrides.get("scheduled_at_max_future_seconds", DEFAULT_SCHEDULE_AT_MAX_FUTURE_SECONDS)

    if skew_limit < 0:
        raise ValueError(f"scheduled_at_skew_seconds must be >= 0, got {skew_limit}")
    if max_future_seconds < 1:
        raise ValueError(f"scheduled_at_max_future_seconds must be >= 1, got {max_future_seconds}")

    seconds_behind = (created_at - v).total_seconds()
    seconds_ahead = (v - created_at).total_seconds()

    # Too far in the past relative to creation.
    if v < created_at and seconds_behind > skew_limit:
        raise ValueError(f"scheduled_at {v} cannot be significantly before created_at {created_at}")

    # Too far in the future relative to creation.
    if seconds_ahead > max_future_seconds:
        raise ValueError(
            f"scheduled_at {v} is too far in the future (max {max_future_seconds} seconds from creation)"
        )

    return v

validate_timezone(v) classmethod

Ensure all datetimes are timezone-aware and normalize to UTC.

Source code in omni_box/core/models/entities.py
@field_validator("created_at", "scheduled_at", "completed_at", "locked_at", mode="after")
@classmethod
def validate_timezone(cls, v: datetime | None) -> datetime | None:
    """Require timezone-aware datetimes and convert them to UTC.

    ``None`` passes through untouched; a naive datetime raises ``ValueError``.
    """
    if v is not None:
        if v.tzinfo is None:
            raise ValueError("Datetime must be timezone-aware")
        # Whatever zone was supplied, store the UTC equivalent.
        v = v.astimezone(UTC)
    return v

BaseEventHandler

Bases: ABC

Base class for event handlers.

Subclasses should define topic at class level and use @event_handler decorator on methods to register them.

Example

class UserEventHandlers(BaseEventHandler):
    topic = "users"

    @event_handler(UserEventType.CREATED)
    async def on_user_created(self, event: InboxEvent, uow: AsyncUnitOfWork):
        # Handle event
        pass
Source code in omni_box/core/dispatch/base.py
class BaseEventHandler(ABC):
    """Base class for event handlers.

    Subclasses should define topic at class level and use @event_handler
    decorator on methods to register them.

    Example:
        class UserEventHandlers(BaseEventHandler):
            topic = "users"

            @event_handler(UserEventType.CREATED)
            async def on_user_created(self, event: InboxEvent, uow: AsyncUnitOfWork):
                # Handle event
                pass
    """

    topic: ClassVar[str | StrEnum]  # Must be defined in subclass

BaseEventSchema

Bases: BaseModel, ABC

Base class for all event payload schemas with self-registration and versioning support.

Source code in omni_box/core/models/schemas.py
class BaseEventSchema(BaseModel, ABC):
    """Base class for all event payload schemas with self-registration and versioning support.

    Two class-level tables are kept on this base class:

    * ``_registry``: ``event_type -> {version key -> schema class}``, filled
      in by ``__init_subclass__`` when a subclass passes ``event_type``.
    * ``_migrations``: ``event_type -> {(from_version, to_version) -> func}``,
      filled in by :meth:`register_migration`.
    """

    _registry: ClassVar[dict[str, dict[str, type["BaseEventSchema"]]]] = {}
    _migrations: ClassVar[dict[str, dict[tuple[str, str], Callable[[dict[str, Any]], dict[str, Any]]]]] = {}

    @classmethod
    def __init_subclass__(cls, event_type: str | None = None, version_prefix: str | None = None, **kwargs: Any) -> None:
        """Register the subclass under ``event_type`` when one is given.

        Args:
            event_type: Event type to register for; falsy means "do not register".
            version_prefix: Optional registration key (e.g. "1" or "1.2").
                When omitted, the subclass's ``schema_version()`` is used.
            **kwargs: Forwarded to the parent ``__init_subclass__``.
        """
        super().__init_subclass__(**kwargs)
        if event_type:
            if event_type not in cls._registry:
                cls._registry[event_type] = {}

            # If a version_prefix is provided (e.g. "1" or "1.2"), use it as the registration key.
            # Otherwise, use the full version from schema_version().
            key = version_prefix or cls.schema_version()
            cls._registry[event_type][key] = cls

    @classmethod
    @abstractmethod
    def schema_version(cls) -> str:
        """Return full Semantic Version (e.g., '1.0.0')."""

    @classmethod
    def register_migration(
        cls,
        event_type: str,
        from_version: str,
        to_version: str,
        func: Callable[[dict[str, Any]], dict[str, Any]],
    ) -> None:
        """Register a migration function between two versions of an event type.

        A later registration for the same ``(from_version, to_version)`` pair
        replaces the earlier one.
        """
        if event_type not in cls._migrations:
            cls._migrations[event_type] = {}
        cls._migrations[event_type][(from_version, to_version)] = func

    @classmethod
    def migrate_payload(
        cls,
        event_type: str,
        payload: dict[str, Any],
        from_version: str | None,
        to_version: str,
    ) -> dict[str, Any]:
        """Migrate payload to the target version if migrations are available.

        The payload is returned unchanged when the versions are equal, when
        ``from_version`` is missing/empty, when no migrations are registered
        for ``event_type``, or when no chain of registered migrations leads
        from ``from_version`` to ``to_version``.

        A directly registered ``(from_version, to_version)`` migration is
        preferred. Otherwise a breadth-first search over the registered
        version pairs finds a chain with the fewest steps. The chain is
        planned first and only the migration functions on it are executed, so
        migrations on unrelated or dead-end branches are never called.

        Args:
            event_type: Event type whose migrations should be used.
            payload: Payload to migrate.
            from_version: Version the payload is currently in, if known.
            to_version: Version to migrate to.

        Returns:
            The migrated payload, or the original payload when nothing applies.
        """
        if from_version == to_version:
            return payload

        if not from_version:
            return payload

        type_migrations = cls._migrations.get(event_type)
        if not type_migrations:
            return payload

        # Simple path: try to find a direct migration
        direct = type_migrations.get((from_version, to_version))
        if direct is not None:
            return direct(payload)

        # Plan a multi-step path with BFS over versions only; for each newly
        # reached version remember the edge (previous version, function) used.
        reached_via: dict[str, tuple[str, Callable[[dict[str, Any]], dict[str, Any]]]] = {}
        queue: deque[str] = deque([from_version])
        visited: set[str] = {from_version}
        found = False

        while queue and not found:
            current_v = queue.popleft()

            for (v_start, v_end), migrate_func in type_migrations.items():
                if v_start != current_v or v_end in visited:
                    continue

                visited.add(v_end)
                reached_via[v_end] = (current_v, migrate_func)
                if v_end == to_version:
                    found = True
                    break
                queue.append(v_end)

        if not found:
            return payload

        # Walk the recorded edges back from the target, then apply in order.
        steps: list[Callable[[dict[str, Any]], dict[str, Any]]] = []
        version = to_version
        while version != from_version:
            version, step = reached_via[version]
            steps.append(step)

        migrated = payload
        for step in reversed(steps):
            migrated = step(migrated)
        return migrated

    @classmethod
    def from_payload(cls, payload: dict[str, Any]) -> Self:
        """Create schema instance from raw payload dictionary."""
        return cls.model_validate(payload)

    @classmethod
    def resolve(cls, event_type: str, version: str | None) -> type["BaseEventSchema"]:
        """Resolve the best matching schema class based on event type and version prefix.

        Raises:
            SchemaResolutionError: if no schemas are registered for the event
                type, the version is unknown, or no prefix match exists.
        """
        type_map = cls._registry.get(event_type)
        if not type_map:
            raise SchemaResolutionError(f"No schemas registered for event type: {event_type}")

        if version is None:
            raise SchemaResolutionError(f"No schema_version provided for event type: {event_type}")

        # Longest Prefix Match for SemVer (e.g., for "1.2.3" try "1.2.3", then "1.2", then "1")
        parts = version.split(".")
        prefixes = [".".join(parts[:i]) for i in range(len(parts), 0, -1)]

        for prefix in prefixes:
            if prefix in type_map:
                return type_map[prefix]

        raise SchemaResolutionError(f"Unsupported version {version} for event type {event_type}")

    def to_payload(self) -> dict[str, object]:
        """Convert to dict suitable for OutboxEvent.payload."""
        return self.model_dump(mode="json")

from_payload(payload) classmethod

Create schema instance from raw payload dictionary.

Source code in omni_box/core/models/schemas.py
@classmethod
def from_payload(cls, payload: dict[str, Any]) -> Self:
    """Create schema instance from raw payload dictionary."""
    return cls.model_validate(payload)

migrate_payload(event_type, payload, from_version, to_version) classmethod

Migrate payload to the target version if migrations are available.

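If from_version is None or empty, or no migrations are registered for the event type, the payload is returned unchanged. A directly registered from_version → to_version migration is used when available; otherwise registered migrations are chained using a breadth-first search. If no chain reaches to_version, the original payload is returned.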

Source code in omni_box/core/models/schemas.py
@classmethod
def migrate_payload(
    cls,
    event_type: str,
    payload: dict[str, Any],
    from_version: str | None,
    to_version: str,
) -> dict[str, Any]:
    """Migrate payload to the target version if migrations are available.

    If from_version is None, it assumes the earliest known version or skips.
    Currently supports single-step migrations.
    """
    if from_version == to_version:
        return payload

    if not from_version:
        return payload

    type_migrations = cls._migrations.get(event_type)
    if not type_migrations:
        return payload

    # Simple path: try to find a direct migration
    func = type_migrations.get((from_version, to_version))
    if func:
        return func(payload)

    # Multi-step migration pathfinding using BFS
    queue: deque[tuple[str, dict[str, Any]]] = deque([(from_version, payload)])
    visited: set[str] = {from_version}

    while queue:
        current_v, current_p = queue.popleft()

        for (v_start, v_end), migrate_func in type_migrations.items():
            if v_start == current_v and v_end not in visited:
                new_p = migrate_func(current_p)
                if v_end == to_version:
                    return new_p

                visited.add(v_end)
                queue.append((v_end, new_p))

    return payload

register_migration(event_type, from_version, to_version, func) classmethod

Register a migration function between two versions of an event type.

Source code in omni_box/core/models/schemas.py
@classmethod
def register_migration(
    cls,
    event_type: str,
    from_version: str,
    to_version: str,
    func: Callable[[dict[str, Any]], dict[str, Any]],
) -> None:
    """Store ``func`` as the migration from ``from_version`` to ``to_version``.

    Migrations are grouped per ``event_type``; registering the same version
    pair again replaces the previous function.
    """
    per_type = cls._migrations.setdefault(event_type, {})
    per_type[(from_version, to_version)] = func

resolve(event_type, version) classmethod

Resolve the best matching schema class based on event type and version prefix.

Raises:

Type Description
SchemaResolutionError

if no schemas are registered for the event type, the version is unknown, or no prefix match exists.

Source code in omni_box/core/models/schemas.py
@classmethod
def resolve(cls, event_type: str, version: str | None) -> type["BaseEventSchema"]:
    """Resolve the best matching schema class based on event type and version prefix.

    Raises:
        SchemaResolutionError: if no schemas are registered for the event
            type, the version is unknown, or no prefix match exists.
    """
    type_map = cls._registry.get(event_type)
    if not type_map:
        raise SchemaResolutionError(f"No schemas registered for event type: {event_type}")

    if version is None:
        raise SchemaResolutionError(f"No schema_version provided for event type: {event_type}")

    # Longest Prefix Match for SemVer (e.g., for "1.2.3" try "1.2.3", then "1.2", then "1")
    parts = version.split(".")
    prefixes = [".".join(parts[:i]) for i in range(len(parts), 0, -1)]

    for prefix in prefixes:
        if prefix in type_map:
            return type_map[prefix]

    raise SchemaResolutionError(f"Unsupported version {version} for event type {event_type}")

schema_version() abstractmethod classmethod

Return full Semantic Version (e.g., '1.0.0').

Source code in omni_box/core/models/schemas.py
@classmethod
@abstractmethod
def schema_version(cls) -> str:
    """Return the schema's full Semantic Version string (e.g., '1.0.0').

    Abstract: concrete schema classes must override it.
    """

to_payload()

Convert to dict suitable for OutboxEvent.payload.

Source code in omni_box/core/models/schemas.py
def to_payload(self) -> dict[str, object]:
    """Dump the schema in JSON mode as a dict suitable for OutboxEvent.payload."""
    dumped = self.model_dump(mode="json")
    return dumped

BatchProcessingResult dataclass

Results of batch event processing.

Source code in omni_box/core/services/results.py
@dataclass(frozen=True, slots=True)
class BatchProcessingResult[T: BaseEvent]:
    """Results of batch event processing."""

    processed_event_ids: list[UUID] = field(default_factory=list)
    failed_counted: list[EventFailureUpdate] = field(default_factory=list)
    failed_noncounted: list[EventFailureUpdate] = field(default_factory=list)
    remaining_event_ids: set[UUID] = field(default_factory=set)
    commit_failed: bool = False

CommitOffsetPolicy

Bases: StrEnum

When to commit (ACK) the broker message in AT_LEAST_ONCE mode.

Source code in omni_box/application/services/consume.py
class CommitOffsetPolicy(StrEnum):
    """When to commit (ACK) the broker message in AT_LEAST_ONCE mode."""

    ON_PERSIST = "on_persist"
    ON_SUCCESS = "on_success"

ConsumedMessage dataclass

Normalized consumed message passed to Inbox runner.

Source code in omni_box/core/protocols/consumers.py
@dataclass(frozen=True, slots=True)
class ConsumedMessage:
    """Normalized consumed message passed to Inbox runner.

    Immutable; the first four fields are required, the rest default to ``None``.
    """

    # Required identification and content.
    message_id: str
    source: str
    event_type: str
    payload: dict[str, JsonValue]

    # Optional metadata.
    headers: dict[str, str] | None = None
    trace_id: str | None = None
    correlation_id: str | None = None
    causation_id: str | None = None
    schema_version: str | None = None

    # Optional acknowledgement handle and the untyped original message object.
    ack_handle: AckHandle | None = None
    raw_message: object = None

EnvelopeData dataclass

Unwrapped message payload and extracted metadata from an envelope.

Source code in omni_box/core/protocols/consumers.py
@dataclass(frozen=True, slots=True)
class EnvelopeData:
    """Unwrapped message payload and extracted metadata from an envelope.

    Immutable; only ``payload`` is required.
    """

    # The unwrapped payload.
    payload: dict[str, JsonValue]

    # Metadata extracted from the envelope; each item is optional.
    trace_id: str | None = None
    correlation_id: str | None = None
    causation_id: str | None = None
    schema_version: str | None = None

EnvelopeEventConverter

Full envelope: schema_version, event_type, aggregate info, payload, tracing.

Source code in omni_box/core/converters/event.py
class EnvelopeEventConverter:
    """Full envelope: schema_version, event_type, aggregate info, payload, tracing."""

    def __init__(self, default_schema_version: str = "1.0.0") -> None:
        """Remember the schema version used for events that carry none."""
        self._default_schema_version = default_schema_version

    def convert(self, event: OutboxEvent) -> dict[str, JsonValue]:
        """Wrap ``event`` in an envelope dictionary.

        The mandatory keys are always present; ``trace_id``,
        ``correlation_id`` and ``causation_id`` are added only when the event
        has a non-``None`` value for them.
        """
        envelope: dict[str, JsonValue] = {
            "schema_version": event.schema_version or self._default_schema_version,
            "event_type": event.event_type,
            "aggregate_type": event.aggregate_type,
            "aggregate_id": str(event.aggregate_id),
            "payload": event.payload,
            "timestamp": event.created_at.isoformat(),
        }

        # Optional tracing identifiers, copied over in a fixed order.
        for key in ("trace_id", "correlation_id", "causation_id"):
            value = getattr(event, key)
            if value is not None:
                envelope[key] = value

        return envelope

EnvelopeParser

Bases: Protocol

Protocol for unwrapping nested payloads and extracting metadata.

Source code in omni_box/core/protocols/consumers.py
class EnvelopeParser(Protocol):
    """Structural interface for objects that unwrap nested payloads and extract metadata."""

    def parse(self, raw_payload: dict[str, JsonValue], headers: dict[str, str] | None) -> EnvelopeData:
        """Unwrap ``raw_payload`` and return it with the extracted metadata."""

parse(raw_payload, headers)

Unwrap payload and extract metadata.

Source code in omni_box/core/protocols/consumers.py
def parse(self, raw_payload: dict[str, JsonValue], headers: dict[str, str] | None) -> EnvelopeData:
    """Unwrap ``raw_payload`` and return it with the extracted metadata.

    Protocol member: this declaration has no behavior of its own.
    """

EventAlreadyLockedError

Bases: OmniBoxError

Raised when an event is already locked by another worker.

Source code in omni_box/core/exceptions.py
class EventAlreadyLockedError(OmniBoxError):
    """Raised when an event is already locked by another worker.

    The event ID and the lock holder are kept on the exception as
    ``event_id`` and ``locked_by``.
    """

    def __init__(self, event_id: UUID, locked_by: str | None) -> None:
        self.event_id = event_id
        self.locked_by = locked_by

        description = f"Event {event_id} is already locked by {locked_by}"
        super().__init__(description)

EventBatchProcessor

Unified batch processor powered by pipeline architecture.

Source code in omni_box/core/services/processor.py
class EventBatchProcessor[T: BaseEvent]:
    """Unified batch processor powered by pipeline architecture."""

    def __init__(
        self,
        repo: EventRepository[T],
        pipeline: ProcessingPipeline[T],
        fetch_strategy: FetchStrategy[T],
        commit_strategy: CommitStrategy[T],
        *,
        job_name: str = "event_processor",
        metrics: ProcessingMetrics | None = None,
    ) -> None:
        self._repo = repo
        self._pipeline = pipeline
        self._fetch_strategy = fetch_strategy
        self._commit_strategy = commit_strategy
        self._job_name = job_name
        self._metrics = metrics

    async def process_batch(
        self,
        worker_id: str,
        batch_size: int,
        shutdown_requested_func: Callable[[], bool] | None = None,
        **fetch_filters: Unpack[FetchFilters],
    ) -> BatchProcessingResult:
        """Fetch and process a batch of events using the configured pipeline."""
        try:
            # 1. Fetch
            events = await self._fetch_strategy.fetch(
                self._repo,
                batch_size=batch_size,
                worker_id=worker_id,
                **fetch_filters,
            )

            if not events:
                return BatchProcessingResult()

            # 2. Process
            context = ProcessingContext[T](
                repo=self._repo,
                worker_id=worker_id,
                metrics=self._metrics,
            )

            await self._pipeline.process_batch(events, context)

            # 3. Commit
            success = await self._commit_strategy.commit(context)
            if not success:
                logger.error(
                    "Failed to commit batch processing results",
                    worker_id=worker_id,
                    job_name=self._job_name,
                )

            # 4. Result
            return BatchProcessingResult(
                processed_event_ids=context.completed_ids,
                failed_counted=context.failed_counted,
                failed_noncounted=context.failed_noncounted,
                remaining_event_ids=context.skipped_ids,
                commit_failed=not success,
            )
        except (Exception, asyncio.CancelledError) as e:
            if not isinstance(e, asyncio.CancelledError):
                logger.exception(
                    f"{self._job_name} batch failed due to unexpected error",
                    error=str(e),
                    worker_id=worker_id,
                    error_type=type(e).__name__,
                )
            raise

process_batch(worker_id, batch_size, shutdown_requested_func=None, **fetch_filters) async

Fetch and process a batch of events using the configured pipeline.

Source code in omni_box/core/services/processor.py
async def process_batch(
    self,
    worker_id: str,
    batch_size: int,
    shutdown_requested_func: Callable[[], bool] | None = None,
    **fetch_filters: Unpack[FetchFilters],
) -> BatchProcessingResult:
    """Fetch and process a batch of events using the configured pipeline.

    Returns an empty result when nothing was fetched. A failed commit is
    logged and reported through ``commit_failed`` rather than raised.
    Unexpected exceptions are logged and re-raised; cancellation is
    re-raised without logging. ``shutdown_requested_func`` is accepted but
    not consulted in this method body.
    """
    try:
        # 1. Fetch
        fetched = await self._fetch_strategy.fetch(
            self._repo,
            batch_size=batch_size,
            worker_id=worker_id,
            **fetch_filters,
        )
        if not fetched:
            return BatchProcessingResult()

        # 2. Process: the context accumulates per-event outcomes.
        ctx = ProcessingContext[T](
            repo=self._repo,
            worker_id=worker_id,
            metrics=self._metrics,
        )
        await self._pipeline.process_batch(fetched, ctx)

        # 3. Commit
        committed = await self._commit_strategy.commit(ctx)
        if not committed:
            logger.error(
                "Failed to commit batch processing results",
                worker_id=worker_id,
                job_name=self._job_name,
            )

        # 4. Result
        return BatchProcessingResult(
            processed_event_ids=ctx.completed_ids,
            failed_counted=ctx.failed_counted,
            failed_noncounted=ctx.failed_noncounted,
            remaining_event_ids=ctx.skipped_ids,
            commit_failed=not committed,
        )
    except asyncio.CancelledError:
        # Cancellation is propagated silently.
        raise
    except Exception as e:
        logger.exception(
            f"{self._job_name} batch failed due to unexpected error",
            error=str(e),
            worker_id=worker_id,
            error_type=type(e).__name__,
        )
        raise

EventConcurrentUpdateError

Bases: OmniBoxError

Raised when a repository update affects fewer rows than expected.

Indicates that some events might have been modified by another worker, unlocked by an admin, or already reached the target state.

Source code in omni_box/core/exceptions.py
class EventConcurrentUpdateError(OmniBoxError):
    """Signal that a repository update touched fewer rows than it should have.

    A shortfall means some events may have been changed by another worker,
    unlocked by an admin, or were already in the target state.

    Attributes:
        expected: Row count the update was expected to affect.
        actual: Row count the update actually affected.
        missing_ids: IDs reported as not updated; empty when none were given.
    """

    def __init__(
        self,
        expected: int,
        actual: int,
        message: str | None = None,
        missing_ids: list[UUID] | None = None,
    ) -> None:
        self.expected = expected
        self.actual = actual
        self.missing_ids = missing_ids or []

        parts = [message or f"Concurrent update detected: expected {expected} rows, but updated {actual}"]
        if self.missing_ids:
            # Keep the diagnostic short: list at most the first 10 IDs.
            shown = [str(i) for i in self.missing_ids[:10]]
            if len(self.missing_ids) > 10:
                shown.append("...")
            parts.append(f". Missing IDs: [{', '.join(shown)}]")

        super().__init__("".join(parts))

EventConsumer

Bases: ABC

Abstract event consumer.

Source code in omni_box/core/protocols/consumers.py
class EventConsumer(ABC):
    """Interface that every broker event consumer must implement."""

    @abstractmethod
    async def start(self) -> None:
        """Start the resources tied to the consumer's lifecycle."""

    @abstractmethod
    async def stop(self) -> None:
        """Stop the resources tied to the consumer's lifecycle."""

    @abstractmethod
    async def getone(self) -> ConsumedMessage:
        """Fetch a single message from the broker."""

getone() abstractmethod async

Fetch one message from broker.

Source code in omni_box/core/protocols/consumers.py
@abstractmethod
async def getone(self) -> ConsumedMessage:
    """Fetch a single message from the broker."""

start() abstractmethod async

Start consumer lifecycle resources.

Source code in omni_box/core/protocols/consumers.py
@abstractmethod
async def start(self) -> None:
    """Start the resources tied to the consumer's lifecycle."""

stop() abstractmethod async

Stop consumer lifecycle resources.

Source code in omni_box/core/protocols/consumers.py
@abstractmethod
async def stop(self) -> None:
    """Stop the resources tied to the consumer's lifecycle."""

EventHandlerResult dataclass

Explicit outcome of a single event handler.

Source code in omni_box/core/services/results.py
@dataclass(frozen=True, slots=True)
class EventHandlerResult:
    """Explicit outcome of a single event handler.

    Instances are immutable (``frozen=True``) and slot-based (``slots=True``).
    Only ``success`` is required; every other field has a default.
    """

    # Whether the handler succeeded.
    success: bool
    # NOTE(review): presumably False when the handler did not act on the event - confirm.
    processed: bool = True
    # Optional status: an EventHandlerStatus member or a free-form string.
    status: EventHandlerStatus | str | None = None
    # Optional description of what went wrong.
    error_message: str | None = None
    # NOTE(review): presumably decides whether a failure counts toward the retry limit - confirm.
    count_as_attempt: bool = True
    # NOTE(review): presumably the earliest moment a retry may run - confirm.
    next_retry_at: datetime | None = None

EventHandlerStatus

Bases: StrEnum

Semantic status of event processing result.

Source code in omni_box/core/services/results.py
class EventHandlerStatus(StrEnum):
    """Semantic status of an event processing result.

    Every member is a lowercase string constant. The string literal placed
    directly below a member is that member's documentation.
    """

    COMPLETED = "completed"
    """Handler successfully processed the event."""

    STALE = "stale"
    """Handler ignored the event because it is outdated (revision check)."""

    SKIPPED = "skipped"
    """Handler explicitly chose to skip this event (e.g. business logic)."""

    FAILED = "failed"
    """Handler failed with an exception or explicit error."""

    RETRY = "retry"
    """Handler failed but requested a retry."""

COMPLETED = 'completed' class-attribute instance-attribute

Handler successfully processed the event.

FAILED = 'failed' class-attribute instance-attribute

Handler failed with an exception or explicit error.

RETRY = 'retry' class-attribute instance-attribute

Handler failed but requested a retry.

SKIPPED = 'skipped' class-attribute instance-attribute

Handler explicitly chose to skip this event (e.g. business logic).

STALE = 'stale' class-attribute instance-attribute

Handler ignored the event because it is outdated (revision check).

EventLockedByAnotherWorkerError

Bases: OmniBoxError

Raised when an operation is performed by a worker that doesn't own the lock.

Source code in omni_box/core/exceptions.py
class EventLockedByAnotherWorkerError(OmniBoxError):
    """Signal that a worker acted on an event whose lock it does not own.

    Attributes:
        event_id: ID of the event the operation targeted.
        locked_by: Reported lock holder, or ``None``.
        worker_id: Worker that attempted the operation.
    """

    def __init__(self, event_id: UUID, locked_by: str | None, worker_id: str) -> None:
        self.event_id = event_id
        self.locked_by = locked_by
        self.worker_id = worker_id

        detail = f"Event {event_id} is locked by {locked_by}, but operation was attempted by {worker_id}"
        super().__init__(detail)

EventNotLockedError

Bases: OmniBoxError

Raised when an operation requires a lock but the event is not locked.

Source code in omni_box/core/exceptions.py
class EventNotLockedError(OmniBoxError):
    """Signal that an operation needed a lock, but the event holds none.

    Attributes:
        event_id: ID of the event that was expected to be locked.
    """

    def __init__(self, event_id: UUID) -> None:
        self.event_id = event_id

        detail = f"Event {event_id} is not locked"
        super().__init__(detail)

EventProcessorBuilder

Fluent API for constructing highly-customizable event processors.

Source code in omni_box/core/pipeline/builder.py
class EventProcessorBuilder[T: BaseEvent]:
    """Fluent API for constructing highly-customizable event processors."""

    def __init__(self, repo: EventRepository[T]) -> None:
        """Initialize builder with a specific event repository.

        Args:
            repo: The repository to fetch and commit events from/to.
        """
        self._repo = repo
        self._pipeline = ProcessingPipeline[T]()
        self._fetch_strategy: FetchStrategy[T] | None = None
        self._commit_strategy: CommitStrategy[T] | None = None
        self._metrics: ProcessingMetrics | None = None
        self._lease_ttl: int = DEFAULT_LEASE_TIMEOUT_SECONDS
        self._job_name: str = "event_processor"

    def with_fetch_strategy(self, strategy: FetchStrategy[T]) -> Self:
        """Explicitly set the fetching strategy."""
        self._fetch_strategy = strategy
        return self

    def with_commit_strategy(self, strategy: CommitStrategy[T]) -> Self:
        """Explicitly set the commit strategy."""
        self._commit_strategy = strategy
        return self

    def add_step(self, step: ProcessingStep[T]) -> Self:
        """Add a processing step to the pipeline."""
        self._pipeline.add_step(step)
        return self

    def with_metrics(self, metrics: ProcessingMetrics | None) -> Self:
        """Set the metrics collector."""
        self._metrics = metrics
        return self

    def with_lease_ttl(self, ttl_seconds: int) -> Self:
        """Set the distributed lock lease TTL (in seconds)."""
        if ttl_seconds <= 0:
            raise ValueError("lease_ttl must be positive")
        self._lease_ttl = ttl_seconds
        return self

    def with_job_name(self, name: str) -> Self:
        """Set the job name for logging/metrics context."""
        self._job_name = name
        return self

    def build(self) -> EventBatchProcessor[T]:
        """Finalize the processor configuration."""
        capabilities = getattr(self._repo, "capabilities", None)

        if self._fetch_strategy is None:
            if (
                isinstance(capabilities, RepositoryCapabilities) and capabilities.supports_distributed_locking
            ) or isinstance(self._repo, SupportsDistributedLocking):
                self._fetch_strategy = DistributedLockingFetchStrategy[T](ttl=self._lease_ttl)
            else:
                self._fetch_strategy = OptimisticLockingFetchStrategy[T]()

        if self._commit_strategy is None:
            if (isinstance(capabilities, RepositoryCapabilities) and capabilities.supports_bulk) or isinstance(
                self._repo, SupportsBulkOperations
            ):
                self._commit_strategy = BulkCommitStrategy[T]()
            else:
                self._commit_strategy = SingleCommitStrategy[T]()

        return EventBatchProcessor(
            repo=self._repo,
            pipeline=self._pipeline,
            fetch_strategy=self._fetch_strategy,
            commit_strategy=self._commit_strategy,
            job_name=self._job_name,
            metrics=self._metrics,
        )

__init__(repo)

Initialize builder with a specific event repository.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `repo` | `EventRepository[T]` | The repository to fetch and commit events from/to. | required |

Source code in omni_box/core/pipeline/builder.py
def __init__(self, repo: EventRepository[T]) -> None:
    """Create a builder bound to one event repository.

    Args:
        repo: Repository that events are fetched from and committed to.
    """
    # Tunable settings, starting from their defaults.
    self._job_name: str = "event_processor"
    self._lease_ttl: int = DEFAULT_LEASE_TIMEOUT_SECONDS
    self._metrics: ProcessingMetrics | None = None

    # Strategies stay unset until one is chosen explicitly.
    self._commit_strategy: CommitStrategy[T] | None = None
    self._fetch_strategy: FetchStrategy[T] | None = None

    self._pipeline = ProcessingPipeline[T]()
    self._repo = repo

add_step(step)

Add a processing step to the pipeline.

Source code in omni_box/core/pipeline/builder.py
def add_step(self, step: ProcessingStep[T]) -> Self:
    """Register ``step`` with the pipeline and return the builder for chaining."""
    pipeline = self._pipeline
    pipeline.add_step(step)
    return self

build()

Finalize the processor configuration.

Source code in omni_box/core/pipeline/builder.py
def build(self) -> EventBatchProcessor[T]:
    """Finalize the processor configuration.

    Strategies that were not set explicitly are chosen from what the
    repository supports: distributed locking is preferred over optimistic
    locking for fetching, and bulk commit over single commit.

    The chosen defaults are kept in local variables rather than written back
    to the builder, so a later ``with_lease_ttl`` followed by another
    ``build`` is honoured and default strategy instances are not shared
    between processors.

    Returns:
        A processor wired with the repository, pipeline, strategies, job name
        and metrics collected by this builder.
    """
    capabilities = getattr(self._repo, "capabilities", None)
    # Only trust the attribute when it really is a capabilities object.
    declared = capabilities if isinstance(capabilities, RepositoryCapabilities) else None

    fetch_strategy = self._fetch_strategy
    if fetch_strategy is None:
        if (declared is not None and declared.supports_distributed_locking) or isinstance(
            self._repo, SupportsDistributedLocking
        ):
            fetch_strategy = DistributedLockingFetchStrategy[T](ttl=self._lease_ttl)
        else:
            fetch_strategy = OptimisticLockingFetchStrategy[T]()

    commit_strategy = self._commit_strategy
    if commit_strategy is None:
        if (declared is not None and declared.supports_bulk) or isinstance(self._repo, SupportsBulkOperations):
            commit_strategy = BulkCommitStrategy[T]()
        else:
            commit_strategy = SingleCommitStrategy[T]()

    return EventBatchProcessor(
        repo=self._repo,
        pipeline=self._pipeline,
        fetch_strategy=fetch_strategy,
        commit_strategy=commit_strategy,
        job_name=self._job_name,
        metrics=self._metrics,
    )

with_commit_strategy(strategy)

Explicitly set the commit strategy.

Source code in omni_box/core/pipeline/builder.py
def with_commit_strategy(self, strategy: CommitStrategy[T]) -> Self:
    """Use ``strategy`` for committing and return the builder for chaining."""
    # An explicit choice replaces whatever was set before.
    self._commit_strategy = strategy
    return self

with_fetch_strategy(strategy)

Explicitly set the fetching strategy.

Source code in omni_box/core/pipeline/builder.py
def with_fetch_strategy(self, strategy: FetchStrategy[T]) -> Self:
    """Use ``strategy`` for fetching and return the builder for chaining."""
    # An explicit choice replaces whatever was set before.
    self._fetch_strategy = strategy
    return self

with_job_name(name)

Set the job name for logging/metrics context.

Source code in omni_box/core/pipeline/builder.py
def with_job_name(self, name: str) -> Self:
    """Set the job name for logging/metrics context."""
    self._job_name = name
    return self

with_lease_ttl(ttl_seconds)

Set the distributed lock lease TTL (in seconds).

Source code in omni_box/core/pipeline/builder.py
def with_lease_ttl(self, ttl_seconds: int) -> Self:
    """Set the distributed lock lease TTL (in seconds)."""
    if ttl_seconds <= 0:
        raise ValueError("lease_ttl must be positive")
    self._lease_ttl = ttl_seconds
    return self

with_metrics(metrics)

Set the metrics collector.

Source code in omni_box/core/pipeline/builder.py
def with_metrics(self, metrics: ProcessingMetrics | None) -> Self:
    """Store the metrics collector (``None`` is accepted) and return the builder."""
    # Stored as given, including ``None``.
    self._metrics = metrics
    return self

EventPublisher

Bases: ABC

Abstract publisher that accepts OutboxEvent.

The `publish` signature mirrors the handler signature used by `omni_box.core.pipeline.steps.handler.HandlerExecutionStep`, so publishers and inbox handlers are interchangeable from the pipeline's perspective. Most broker implementations do not need the repository; it is provided so that a publisher can perform book-keeping in the same transaction (e.g. write an audit row) when desired.

Source code in omni_box/core/protocols/publishers.py
class EventPublisher(ABC):
    """Interface for publishers that deliver an ``OutboxEvent``.

    ``publish`` deliberately has the same shape as the handler signature used
    by :class:`~omni_box.core.pipeline.steps.handler.HandlerExecutionStep`, so
    the pipeline can treat publishers and inbox handlers interchangeably.
    The repository is rarely needed by a broker implementation; it is passed
    so that a publisher *can* do book-keeping in the same transaction (e.g.
    write an audit row) when desired.
    """

    @abstractmethod
    async def publish(self, event: OutboxEvent, repo: EventRepository[OutboxEvent]) -> None:
        """Deliver ``event`` to the broker.

        Args:
            event: The outbox event to deliver.
            repo: Repository bound to the in-flight transaction. It is a
                pass-through value required by the pipeline contract, and
                broker implementations may ignore it.
        """

publish(event, repo) abstractmethod async

Publish event to the broker.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `event` | `OutboxEvent` | The outbox event to deliver. | required |
| `repo` | `EventRepository[OutboxEvent]` | Repository bound to the in-flight transaction. Pass-through value for the pipeline contract; broker implementations may ignore it. | required |

Source code in omni_box/core/protocols/publishers.py
@abstractmethod
async def publish(self, event: OutboxEvent, repo: EventRepository[OutboxEvent]) -> None:
    """Deliver ``event`` to the broker.

    Args:
        event: The outbox event to deliver.
        repo: Repository bound to the in-flight transaction. It is a
            pass-through value required by the pipeline contract, and
            broker implementations may ignore it.
    """

EventRouter

Registry and dispatcher for event handlers.

Source code in omni_box/core/dispatch/registry.py
class EventRouter:
    """Registry and dispatcher for event handlers.

    Handlers are keyed by ``(topic, event_type, schema_version)``. When a
    ``normalize_topic`` callable is supplied, it is applied to topics on
    registration, dispatch and lookup so that all three agree on the key.
    """

    def __init__(self, normalize_topic: Callable[[str], str] | None = None) -> None:
        """Create an empty router.

        Args:
            normalize_topic: Optional callable applied to every topic string
                before it is used as a registry key.
        """
        self._handlers: dict[tuple[str, str, str | None], InboxHandler] = {}
        self._topic_handlers: dict[str, set[str]] = {}
        self._normalize_topic = normalize_topic
        self._logger = structlog.get_logger(__name__)

    def register_handler(
        self,
        event_type: DispatchName,
        topic: DispatchName,
        handler: InboxHandler,
        schema_version: str | None = None,
        handler_name: str | None = None,
    ) -> None:
        """Register single event handler.

        Args:
            event_type: Event type the handler accepts.
            topic: Topic the handler listens on (normalized when configured).
            handler: Callable invoked by ``dispatch``.
            schema_version: Version the handler is bound to; ``None`` registers
                it under the version-less key.
            handler_name: Name used only in the debug log entry.

        Raises:
            HandlerAlreadyRegisteredError: If the same topic, event type and
                schema version are already registered.
        """
        et = as_dispatch_str(event_type)
        top = as_dispatch_str(topic)
        if self._normalize_topic:
            top = self._normalize_topic(top)

        key = (top, et, schema_version)
        if key in self._handlers:
            raise HandlerAlreadyRegisteredError(f"Handler for {top}.{et} (v{schema_version}) already registered")

        self._handlers[key] = handler
        self._topic_handlers.setdefault(top, set()).add(et)
        self._logger.debug(
            "Handler registered",
            topic=top,
            event_type=et,
            schema_version=schema_version,
            handler_name=handler_name,
        )

    def register_class(self, handler_class: type[BaseEventHandler], topic: DispatchName | None = None) -> None:
        """Auto-register all @event_handler decorated methods from class.

        The class is instantiated without arguments and handed to
        ``register_instance``.
        """
        self.register_instance(handler_class(), topic=topic)

    def register_instance(self, instance: BaseEventHandler, topic: DispatchName | None = None) -> None:
        """Auto-register all @event_handler decorated methods from instance.

        Args:
            instance: Handler object whose decorated bound methods are registered.
            topic: Fallback topic, used when the instance has no usable
                ``topic`` attribute.

        Raises:
            ValueError: If neither the instance nor ``topic`` provides a topic.
        """
        handler_class = type(instance)
        # The instance's own ``topic`` wins. ``getattr`` with a default only
        # covers a *missing* attribute, so an attribute that exists but is
        # ``None``/empty has to fall back to the argument explicitly.
        class_topic = getattr(instance, "topic", None) or topic
        if not class_topic:
            raise ValueError(f"Topic not specified for {handler_class.__name__}")
        class_topic_str = as_dispatch_str(class_topic)

        registered = 0

        for name, method in inspect.getmembers(instance, predicate=inspect.ismethod):
            if not getattr(method, "_is_event_handler", False):
                continue

            event_type = cast(str, method._event_type)  # type: ignore[attr-defined]
            schema_version = cast(str | None, getattr(method, "_schema_version", None))
            raw_method_topic = cast(str | None, getattr(method, "_event_topic", None))
            # A per-method topic overrides the class-level one.
            method_topic = raw_method_topic if raw_method_topic is not None else class_topic_str

            self.register_handler(
                event_type=event_type,
                topic=method_topic,
                schema_version=schema_version,
                handler=cast(InboxHandler, method),
                handler_name=f"{handler_class.__name__}.{name}",
            )
            registered += 1

        self._logger.info(
            "Handler instance registered",
            class_name=handler_class.__name__,
            handlers_count=registered,
        )

    async def dispatch(
        self,
        event: InboxEvent,
        topic: DispatchName,
        repo: InboxEventRepository,
        **dependencies: object,
    ) -> EventHandlerResult:
        """Dispatch event to registered handler.

        Resolution order: exact ``(topic, event_type, schema_version)`` match,
        then a handler of another version whose payload migration changes the
        payload, then the version-less handler.

        Args:
            event: Event to hand to the handler.
            topic: Topic the event arrived on (normalized when configured).
            repo: Repository passed through to the handler.
            **dependencies: Extra keyword arguments forwarded to the handler.

        Returns:
            The handler outcome coerced by ``coerce_handler_outcome``, or a
            failed result counted as an attempt when no handler is found.
        """
        topic_str = as_dispatch_str(topic)
        if self._normalize_topic:
            topic_str = self._normalize_topic(topic_str)

        event_type_str = as_dispatch_str(event.event_type)
        schema_version = event.schema_version

        # 1. Try exact match
        key_v = (topic_str, event_type_str, schema_version)
        handler = self._handlers.get(key_v)

        # 2. Try migration
        if handler is None:
            for (t, et, v), h in self._handlers.items():
                if t == topic_str and et == event_type_str and v != schema_version:
                    try:
                        migrated_payload = BaseEventSchema.migrate_payload(
                            event_type=event_type_str,
                            payload=event.payload,
                            from_version=schema_version,
                            to_version=v or "",
                        )
                        # An unchanged payload is not treated as a migration.
                        if migrated_payload != event.payload:
                            event = event.model_copy(update={"payload": migrated_payload, "schema_version": v})
                            handler = h
                            self._logger.info(
                                "Migrated event", event_id=str(event.id), from_version=schema_version, to_version=v
                            )
                            break
                    except Exception:
                        # Best effort: a failed migration only rules out this candidate.
                        self._logger.exception("Failed migration", event_id=str(event.id))

        # 3. Try generic match
        if handler is None:
            key_any = (topic_str, event_type_str, None)
            handler = self._handlers.get(key_any)

        if handler is None:
            msg = f"No handler for topic={topic_str!r} event_type={event_type_str!r} v={schema_version!r}"
            self._logger.warning("No handler", event_id=str(event.id), topic=topic_str, event_type=event_type_str)
            return EventHandlerResult(success=False, error_message=msg, count_as_attempt=True)

        raw = await handler(event, repo, **dependencies)
        return coerce_handler_outcome(raw)

    def get_topics(self) -> set[str]:
        """Return a new set with every topic that has a registered handler."""
        return set(self._topic_handlers)

    def get_event_types_for_topic(self, topic: DispatchName) -> set[str]:
        """Return the event types registered for ``topic``.

        The topic is normalized the same way as on registration, and a copy is
        returned so callers cannot mutate the router's internal index.
        """
        top = as_dispatch_str(topic)
        if self._normalize_topic:
            top = self._normalize_topic(top)
        return set(self._topic_handlers.get(top, ()))

dispatch(event, topic, repo, **dependencies) async

Dispatch event to registered handler.

Source code in omni_box/core/dispatch/registry.py
async def dispatch(
    self,
    event: InboxEvent,
    topic: DispatchName,
    repo: InboxEventRepository,
    **dependencies: object,
) -> EventHandlerResult:
    """Route ``event`` to a registered handler and return its outcome.

    Lookup order: exact ``(topic, event_type, schema_version)`` key, then a
    handler of another version whose payload migration changes the payload,
    then the version-less key. When nothing matches, a failed result that
    counts as an attempt is returned.
    """
    resolved_topic = as_dispatch_str(topic)
    if self._normalize_topic:
        resolved_topic = self._normalize_topic(resolved_topic)

    resolved_type = as_dispatch_str(event.event_type)
    incoming_version = event.schema_version

    # Step 1: exact key.
    handler = self._handlers.get((resolved_topic, resolved_type, incoming_version))

    # Step 2: migrate the payload towards a handler of another version.
    if not handler:
        for (reg_topic, reg_type, reg_version), candidate in self._handlers.items():
            if (reg_topic, reg_type) != (resolved_topic, resolved_type) or reg_version == incoming_version:
                continue
            try:
                converted = BaseEventSchema.migrate_payload(
                    event_type=resolved_type,
                    payload=event.payload,
                    from_version=incoming_version,
                    to_version=reg_version or "",
                )
                if converted != event.payload:
                    event = event.model_copy(update={"payload": converted, "schema_version": reg_version})
                    handler = candidate
                    self._logger.info(
                        "Migrated event",
                        event_id=str(event.id),
                        from_version=incoming_version,
                        to_version=reg_version,
                    )
                    break
            except Exception:
                self._logger.exception("Failed migration", event_id=str(event.id))

    # Step 3: version-less key.
    if not handler:
        handler = self._handlers.get((resolved_topic, resolved_type, None))

    if handler:
        outcome = await handler(event, repo, **dependencies)
        return coerce_handler_outcome(outcome)

    msg = f"No handler for topic={resolved_topic!r} event_type={resolved_type!r} v={incoming_version!r}"
    self._logger.warning("No handler", event_id=str(event.id), topic=resolved_topic, event_type=resolved_type)
    return EventHandlerResult(success=False, error_message=msg, count_as_attempt=True)

register_class(handler_class, topic=None)

Auto-register all @event_handler decorated methods from class.

Source code in omni_box/core/dispatch/registry.py
def register_class(self, handler_class: type[BaseEventHandler], topic: DispatchName | None = None) -> None:
    """Instantiate ``handler_class`` and register its @event_handler decorated methods."""
    # The class is constructed without arguments.
    instance = handler_class()
    self.register_instance(instance, topic=topic)

register_handler(event_type, topic, handler, schema_version=None, handler_name=None)

Register single event handler.

Source code in omni_box/core/dispatch/registry.py
def register_handler(
    self,
    event_type: DispatchName,
    topic: DispatchName,
    handler: InboxHandler,
    schema_version: str | None = None,
    handler_name: str | None = None,
) -> None:
    """Add one handler to the registry.

    Raises:
        HandlerAlreadyRegisteredError: If the same topic, event type and
            schema version are already registered.
    """
    resolved_type = as_dispatch_str(event_type)
    resolved_topic = as_dispatch_str(topic)
    normalize = self._normalize_topic
    if normalize:
        resolved_topic = normalize(resolved_topic)

    registry_key = (resolved_topic, resolved_type, schema_version)
    if registry_key in self._handlers:
        raise HandlerAlreadyRegisteredError(
            f"Handler for {resolved_topic}.{resolved_type} (v{schema_version}) already registered"
        )

    self._handlers[registry_key] = handler
    # Index the event type under its topic as well.
    self._topic_handlers.setdefault(resolved_topic, set()).add(resolved_type)

    self._logger.debug(
        "Handler registered",
        topic=resolved_topic,
        event_type=resolved_type,
        schema_version=schema_version,
        handler_name=handler_name,
    )

register_instance(instance, topic=None)

Auto-register all @event_handler decorated methods from instance.

Source code in omni_box/core/dispatch/registry.py
def register_instance(self, instance: BaseEventHandler, topic: DispatchName | None = None) -> None:
    """Auto-register all @event_handler decorated methods from instance.

    Args:
        instance: Handler object whose decorated bound methods are registered.
        topic: Fallback topic, used when the instance has no usable ``topic``
            attribute.

    Raises:
        ValueError: If neither the instance nor ``topic`` provides a topic.
    """
    handler_class = type(instance)
    # The instance's own ``topic`` wins. ``getattr`` with a default only
    # covers a *missing* attribute, so an attribute that exists but is
    # ``None``/empty has to fall back to the argument explicitly.
    class_topic = getattr(instance, "topic", None) or topic
    if not class_topic:
        raise ValueError(f"Topic not specified for {handler_class.__name__}")
    class_topic_str = as_dispatch_str(class_topic)

    registered = 0

    for name, method in inspect.getmembers(instance, predicate=inspect.ismethod):
        if not getattr(method, "_is_event_handler", False):
            continue

        event_type = cast(str, method._event_type)  # type: ignore[attr-defined]
        schema_version = cast(str | None, getattr(method, "_schema_version", None))
        raw_method_topic = cast(str | None, getattr(method, "_event_topic", None))
        # A per-method topic overrides the class-level one.
        method_topic = raw_method_topic if raw_method_topic is not None else class_topic_str

        self.register_handler(
            event_type=event_type,
            topic=method_topic,
            schema_version=schema_version,
            handler=cast(InboxHandler, method),
            handler_name=f"{handler_class.__name__}.{name}",
        )
        registered += 1

    self._logger.info(
        "Handler instance registered",
        class_name=handler_class.__name__,
        handlers_count=registered,
    )

EventStatus

Bases: StrEnum

General event status enumeration.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `PENDING` | | Event is created and waiting to be processed/published. |
| `COMPLETED` | | Event has been successfully processed/published. |
| `FAILED` | | Event processing/publication failed after maximum retries. |

Source code in omni_box/core/models/enums.py
class EventStatus(StrEnum):
    """General event status enumeration.

    Attributes:
        PENDING: Event is created and waiting to be processed/published.
        COMPLETED: Event has been successfully processed/published.
        FAILED: Event processing/publication failed after maximum retries.
    """

    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"

FetchFilters

Bases: TypedDict

Common filters for event fetching.

Source code in omni_box/core/protocols/repository.py
class FetchFilters(TypedDict, total=False):
    """Common filters for event fetching."""

    source: str | list[str] | None
    topic: str | list[str] | None
    aggregate_type: str | list[str] | None
    aggregate_id: UUID | list[UUID] | None

FilteredFetchStrategy

Strategy that applies fixed filters to all fetch operations.

Source code in omni_box/core/pipeline/strategies/fetch.py
class FilteredFetchStrategy[T: BaseEvent]:
    """Strategy that applies fixed filters to all fetch operations."""

    def __init__(self, sources: list[str] | None = None, ttl: int = DEFAULT_LEASE_TIMEOUT_SECONDS) -> None:
        self._sources = sources
        self._ttl = ttl

    async def fetch(
        self,
        repo: EventRepository[T],
        batch_size: int,
        worker_id: str,
        **filters: Unpack[FetchFilters],
    ) -> list[T]:
        """Fetch events with additional sources filtering."""
        combined_filters = filters.copy()
        if self._sources:
            combined_filters["source"] = self._sources

        if isinstance(repo, SupportsDistributedLocking):
            return await repo.fetch_and_lock_pending(
                limit=batch_size,
                worker_id=worker_id,
                ttl=self._ttl,
                **combined_filters,
            )

        pending = await repo.fetch_pending(limit=batch_size, **combined_filters)
        locked = []
        for event in pending:
            if await repo.mark_processing(event.id, worker_id):
                locked.append(event)
        return locked

fetch(repo, batch_size, worker_id, **filters) async

Fetch events with additional sources filtering.

Source code in omni_box/core/pipeline/strategies/fetch.py
async def fetch(
    self,
    repo: EventRepository[T],
    batch_size: int,
    worker_id: str,
    **filters: Unpack[FetchFilters],
) -> list[T]:
    """Fetch up to ``batch_size`` events claimed for ``worker_id``.

    When sources were configured they replace any ``source`` filter given by
    the caller. Repositories with distributed locking fetch and lock in one
    call; others fetch pending events and keep only those that
    ``mark_processing`` accepts.
    """
    effective = filters.copy()
    if self._sources:
        effective["source"] = self._sources

    if not isinstance(repo, SupportsDistributedLocking):
        candidates = await repo.fetch_pending(limit=batch_size, **effective)
        # Claimed one by one, in order; events that cannot be marked are dropped.
        return [ev for ev in candidates if await repo.mark_processing(ev.id, worker_id)]

    return await repo.fetch_and_lock_pending(
        limit=batch_size,
        worker_id=worker_id,
        ttl=self._ttl,
        **effective,
    )

InboxConsumeResult dataclass

Result of one inbox consume cycle.

Source code in omni_box/application/services/consume.py
@dataclass(frozen=True, slots=True)
class InboxConsumeResult:
    """Result of one inbox consume cycle."""

    message_id: str
    event_id: UUID | None
    committed: bool
    processed: bool
    duplicate: bool

InboxConsumerRunner

High-level service for consuming broker messages into the Transactional Inbox.

Source code in omni_box/application/services/consume.py
class InboxConsumerRunner:
    """High-level service for consuming broker messages into the Transactional Inbox.

    Each cycle pulls one message from the consumer, hands it to an
    ``InboxMessageProcessor`` and commits the broker acknowledgement
    according to the configured ``AckStrategy`` / ``CommitOffsetPolicy``.
    """

    def __init__(
        self,
        consumer: EventConsumer,
        transaction_provider: InboxTransactionProviderProtocol,
        handler: InboxHandler | None = None,
        *,
        worker_id: str,
        consumer_group: str,
        domain_service: OmniBoxDomainService | None = None,
        ack_strategy: AckStrategy = AckStrategy.EXACTLY_ONCE_INBOX,
        commit_offset_policy: CommitOffsetPolicy = CommitOffsetPolicy.ON_PERSIST,
        exactly_once_commit_on_failed: bool = False,
        process_timeout: float = DEFAULT_PROCESS_TIMEOUT_SECONDS,
        concurrency_limit: int | None = None,
        metrics: InboxMetrics | None = None,
    ) -> None:
        """Configure the runner.

        Args:
            consumer: Broker consumer that is started/stopped by the runner
                and polled with ``getone()``.
            transaction_provider: Forwarded to the message processor.
            handler: Optional inbox handler, forwarded to the message processor.
            worker_id: Forwarded to the message processor.
            consumer_group: Forwarded to the message processor.
            domain_service: Optional domain service, forwarded to the processor.
            ack_strategy: When the broker acknowledgement is committed.
            commit_offset_policy: Refines commit timing for ``AT_LEAST_ONCE``.
            exactly_once_commit_on_failed: With ``EXACTLY_ONCE_INBOX``, also
                commit when the handler raised.
            process_timeout: Forwarded to the message processor.
            concurrency_limit: Maximum number of concurrent ``process_one``
                calls; ``None`` (or ``0``) disables the limit.
            metrics: Metrics sink; a no-op implementation is used when omitted.
        """
        self._consumer = consumer
        self._processor = InboxMessageProcessor(
            transaction_provider=transaction_provider,
            handler=handler,
            worker_id=worker_id,
            consumer_group=consumer_group,
            domain_service=domain_service,
            process_timeout=process_timeout,
        )
        self._ack_strategy = ack_strategy
        self._commit_offset_policy = commit_offset_policy
        self._exactly_once_commit_on_failed = exactly_once_commit_on_failed
        self._metrics = metrics or NoOpInboxMetrics()
        self._semaphore = asyncio.Semaphore(concurrency_limit) if concurrency_limit else None
        self._running = False
        self._lifecycle_lock = asyncio.Lock()
        # Backoff used when ``process_one`` raises while ``run_forever`` is
        # active. Caps the loop's retry rate so an external outage can't burn
        # CPU.
        self._run_forever_min_backoff = 0.1
        self._run_forever_max_backoff = 5.0

    async def start(self) -> None:
        """Start the underlying consumer; a no-op when already running."""
        async with self._lifecycle_lock:
            if self._running:
                return
            await self._consumer.start()
            self._running = True

    async def stop(self) -> None:
        """Stop the underlying consumer; a no-op when not running."""
        async with self._lifecycle_lock:
            if not self._running:
                return
            # Clear the flag first so ``run_forever`` exits after its current cycle.
            self._running = False
            await self._consumer.stop()

    async def run_forever(self) -> None:
        """Process messages until the runner is stopped.

        Failures of ``process_one`` are logged and retried after an
        exponentially growing delay (doubling up to the configured maximum);
        the delay is reset after a successful cycle. Cancellation propagates.

        Raises:
            RuntimeError: If the runner has not been started.
        """
        if not self._running:
            raise RuntimeError("Runner is not started.")
        backoff = self._run_forever_min_backoff
        while self._running:
            try:
                await self.process_one()
            except asyncio.CancelledError:
                raise
            except Exception:
                logger.exception("InboxConsumerRunner.process_one failed; backing off")
                await asyncio.sleep(backoff)
                backoff = min(backoff * 2, self._run_forever_max_backoff)
            else:
                backoff = self._run_forever_min_backoff

    async def process_one(self) -> InboxConsumeResult:
        """Run one consume cycle, honouring the concurrency limit if one is set.

        Returns:
            The outcome of the cycle.

        Raises:
            InboxPersistError: If the processor returned no stored event.
        """
        if self._semaphore:
            async with self._semaphore:
                return await self._process_one_internal()
        return await self._process_one_internal()

    async def _process_one_internal(self) -> InboxConsumeResult:
        """Consume one message, process it and commit per the ack strategy."""
        message = await self._consumer.getone()
        self._metrics.inc_consumed()
        ack_handle = message.ack_handle or NullAckHandle()
        committed = False

        # At-most-once: acknowledge before any processing happens.
        if self._ack_strategy == AckStrategy.AT_MOST_ONCE:
            await self._commit_with_metrics(ack_handle)
            committed = True

        event = self._processor.create_event(message)
        stored, handler_result, handler_error, started_at = await self._processor.process_in_transaction(event)

        if stored is None:
            # Chain the underlying failure so tracebacks (e.g. the one logged
            # by ``run_forever``) show why persisting failed.
            raise InboxPersistError(message_id=message.message_id, cause=handler_error) from handler_error

        if stored.status == EventStatus.COMPLETED:
            return await self._handle_duplicate(message, stored, ack_handle, committed)

        if handler_error is not None:
            return await self._handle_handler_error(message, stored, handler_error, started_at, ack_handle, committed)

        if handler_result is not None:
            self._update_metrics_after_handler(message.event_type, handler_result, started_at)

        if self._should_commit_offset(handler_result, committed):
            await self._commit_with_metrics(ack_handle)
            committed = True

        return InboxConsumeResult(
            message_id=message.message_id,
            event_id=stored.id,
            committed=committed,
            processed=handler_result.processed if handler_result else False,
            duplicate=False,
        )

    async def _handle_duplicate(
        self, message: ConsumedMessage, stored: InboxEvent, ack_handle: AckHandle, committed: bool
    ) -> InboxConsumeResult:
        """Report a message whose stored event is already completed.

        With ``EXACTLY_ONCE_INBOX`` the acknowledgement is committed here
        unless it already was.
        """
        logger.info(
            "Inbox event already processed (duplicate)",
            event_id=str(stored.id),
            message_id=message.message_id,
            event_type=stored.event_type,
        )
        if self._ack_strategy == AckStrategy.EXACTLY_ONCE_INBOX and not committed:
            await self._commit_with_metrics(ack_handle)
            committed = True
        self._metrics.inc_duplicate(event_type=stored.event_type)
        return InboxConsumeResult(
            message_id=message.message_id,
            event_id=stored.id,
            committed=committed,
            processed=False,
            duplicate=True,
        )

    async def _handle_handler_error(
        self,
        message: ConsumedMessage,
        stored: InboxEvent,
        error: Exception,
        started_at: float,
        ack_handle: AckHandle,
        committed: bool,
    ) -> InboxConsumeResult:
        """Record a handler failure and commit where the configuration asks for it.

        The acknowledgement is committed for ``AT_LEAST_ONCE`` with
        ``ON_PERSIST`` (if not yet committed) and for ``EXACTLY_ONCE_INBOX``
        when ``exactly_once_commit_on_failed`` is enabled.
        """
        # A non-positive ``started_at`` is treated as "no duration to report".
        if started_at > 0:
            self._metrics.observe_handler_duration(perf_counter() - started_at, event_type=stored.event_type)
        self._metrics.inc_failed(event_type=stored.event_type)

        logger.error(
            "Inbox message handler failed", event_id=str(stored.id), message_id=message.message_id, error=str(error)
        )

        if (
            self._ack_strategy == AckStrategy.AT_LEAST_ONCE
            and self._commit_offset_policy == CommitOffsetPolicy.ON_PERSIST
            and not committed
        ):
            await self._commit_with_metrics(ack_handle)
            committed = True
        if self._ack_strategy == AckStrategy.EXACTLY_ONCE_INBOX and self._exactly_once_commit_on_failed:
            await self._commit_with_metrics(ack_handle)
            committed = True

        return InboxConsumeResult(
            message_id=message.message_id,
            event_id=stored.id,
            committed=committed,
            processed=False,
            duplicate=False,
        )

    def _update_metrics_after_handler(
        self, event_type: str, handler_result: EventHandlerResult, started_at: float
    ) -> None:
        """Record handler duration and, for processed events, the success/failure counter."""
        self._metrics.observe_handler_duration(perf_counter() - started_at, event_type=event_type)
        if handler_result.processed:
            if handler_result.success:
                self._metrics.inc_processed(event_type=event_type)
            else:
                self._metrics.inc_failed(event_type=event_type)

    def _should_commit_offset(self, handler_result: EventHandlerResult | None, committed: bool) -> bool:
        """Decide whether the acknowledgement must be committed after processing.

        Returns ``False`` when already committed. Otherwise:

        * ``AT_LEAST_ONCE`` + ``ON_PERSIST``: always commit.
        * ``AT_LEAST_ONCE`` + ``ON_SUCCESS``: commit when there is no handler
          result or the result is marked processed.
        * ``EXACTLY_ONCE_INBOX``: commit when no handler is configured, or
          when the result is either not processed or successful; never
          commit when a handler is configured but produced no result.
        * Any other combination: do not commit.
        """
        if committed:
            return False

        if self._ack_strategy == AckStrategy.AT_LEAST_ONCE:
            if self._commit_offset_policy == CommitOffsetPolicy.ON_PERSIST:
                return True
            if self._commit_offset_policy == CommitOffsetPolicy.ON_SUCCESS:
                return handler_result is None or handler_result.processed

        elif self._ack_strategy == AckStrategy.EXACTLY_ONCE_INBOX:
            if not self._processor.has_handler:
                return True
            elif handler_result is None:
                return False
            elif not handler_result.processed or handler_result.success:
                return True

        return False

    async def _commit_with_metrics(self, ack_handle: AckHandle) -> None:
        """Commit via ``ack_handle`` and count the outcome; commit errors are re-raised."""
        try:
            await ack_handle.commit()
        except Exception:
            self._metrics.inc_commit_failed()
            raise
        self._metrics.inc_committed()

InboxEvent

Bases: BaseEvent

Domain entity for events received from external brokers (Inbox).

Ensures exactly-once processing by tracking external message identifiers and consumer groups. Facilitates payload parsing into typed schemas using the global schema registry.

Source code in omni_box/core/models/entities.py
class InboxEvent(BaseEvent):
    """Domain entity for events received from external brokers (Inbox).

    Ensures exactly-once processing by tracking external message identifiers
    and consumer groups. Facilitates payload parsing into typed schemas
    using the global schema registry.
    """

    # Inbox-specific Identifiers
    message_id: StrippedNonEmptyStr = Field(max_length=MESSAGE_ID_MAX_LENGTH)
    consumer_group: StrippedNonEmptyStr = Field(max_length=CONSUMER_GROUP_MAX_LENGTH)

    # Source info
    source: StrippedNonEmptyStr = Field(max_length=SOURCE_MAX_LENGTH)

    @property
    def processed_at(self) -> datetime | None:
        """Alias for completed_at."""
        return self.completed_at

    def get_context_value(self, key: str) -> str | None:
        """Get a value from the event headers (context).

        Args:
            key: The header key to retrieve.

        Returns:
            The header value if present, None otherwise.
        """
        return self.headers.get(key) if self.headers else None

    def get_payload_as[S: BaseEventSchema](self, schema_cls: type[S] | None = None) -> S:
        """Resolve and parse the payload into a typed schema.

        If schema_cls is provided, it parses with it. Otherwise, it uses the global
        discovery mechanism via BaseEventSchema.resolve() based on event_type and schema_version.
        """
        final_schema_cls: type[S]
        if schema_cls is None:
            resolved_cls = BaseEventSchema.resolve(
                event_type=self.event_type,
                version=self.schema_version,
            )
            final_schema_cls = cast(type[S], resolved_cls)
        else:
            final_schema_cls = schema_cls

        return final_schema_cls.from_payload(self.payload)

attempts_left property

Number of attempts remaining before FAILED status.

can_retry property

Check if the event can be retried.

failure_count property

Number of attempts made when in FAILED status.

is_locked property

Check if the event is currently locked.

processed_at property

Alias for completed_at.

get_context_value(key)

Get a value from the event headers (context).

Parameters:

Name Type Description Default
key str

The header key to retrieve.

required

Returns:

Type Description
str | None

The header value if present, None otherwise.

Source code in omni_box/core/models/entities.py
def get_context_value(self, key: str) -> str | None:
    """Get a value from the event headers (context).

    Args:
        key: The header key to retrieve.

    Returns:
        The header value if present, None otherwise.
    """
    return self.headers.get(key) if self.headers else None

get_payload_as(schema_cls=None)

Resolve and parse the payload into a typed schema.

If schema_cls is provided, it parses with it. Otherwise, it uses the global discovery mechanism via BaseEventSchema.resolve() based on event_type and schema_version.

Source code in omni_box/core/models/entities.py
def get_payload_as[S: BaseEventSchema](self, schema_cls: type[S] | None = None) -> S:
    """Resolve and parse the payload into a typed schema.

    If schema_cls is provided, it parses with it. Otherwise, it uses the global
    discovery mechanism via BaseEventSchema.resolve() based on event_type and schema_version.
    """
    final_schema_cls: type[S]
    if schema_cls is None:
        resolved_cls = BaseEventSchema.resolve(
            event_type=self.event_type,
            version=self.schema_version,
        )
        final_schema_cls = cast(type[S], resolved_cls)
    else:
        final_schema_cls = schema_cls

    return final_schema_cls.from_payload(self.payload)

truncate_error(error, max_bytes, suffix) staticmethod

Truncate error message to database limit (in bytes).

Source code in omni_box/core/models/entities.py
@staticmethod
def truncate_error(error: str, max_bytes: int, suffix: str) -> str:
    """Truncate error message to database limit (in bytes)."""
    if max_bytes < 1:
        raise ValueError(f"max_bytes must be >= 1, got {max_bytes}")

    stripped = error.strip()
    if not stripped:
        raise ValueError("Error message cannot be empty or whitespace")

    encoded = stripped.encode("utf-8")
    if len(encoded) <= max_bytes:
        return stripped

    suffix_encoded = suffix.encode("utf-8")
    if len(suffix_encoded) >= max_bytes:
        return encoded[:max_bytes].decode("utf-8", errors="ignore")

    keep_bytes = max_bytes - len(suffix_encoded)
    return encoded[:keep_bytes].decode("utf-8", errors="ignore") + suffix

validate_invariants()

Enforce business invariants across all fields.

Source code in omni_box/core/models/entities.py
def validate_invariants(self) -> Self:
    """Enforce business invariants across all fields."""
    self._validate_status_timing()
    self._validate_attempts()
    self._validate_lock()
    return self

validate_scheduled_at(v, info) classmethod

Ensure scheduled_at is within reasonable range.

Source code in omni_box/core/models/entities.py
@field_validator("scheduled_at")
@classmethod
def validate_scheduled_at(cls, v: datetime, info: ValidationInfo) -> datetime:
    """Check that scheduled_at lies in an acceptable window around created_at.

    Limits come from the validation context when one is supplied, otherwise
    from the module defaults. Nothing is checked when ``created_at`` is not
    available in the already-validated data.

    Raises:
        ValueError: If a limit is invalid, or ``v`` is too far before or
            too far after ``created_at``.
    """
    created_at = info.data.get("created_at")
    if created_at is None:
        return v

    # An absent or empty context falls back to the defaults for both limits.
    context = info.context or {}
    skew_limit = context.get("scheduled_at_skew_seconds", DEFAULT_SCHEDULE_AT_SKEW_SECONDS)
    max_future_seconds = context.get("scheduled_at_max_future_seconds", DEFAULT_SCHEDULE_AT_MAX_FUTURE_SECONDS)

    if skew_limit < 0:
        raise ValueError(f"scheduled_at_skew_seconds must be >= 0, got {skew_limit}")
    if max_future_seconds < 1:
        raise ValueError(f"scheduled_at_max_future_seconds must be >= 1, got {max_future_seconds}")

    if v < created_at:
        seconds_before = (created_at - v).total_seconds()
        if seconds_before > skew_limit:
            raise ValueError(f"scheduled_at {v} cannot be significantly before created_at {created_at}")

    seconds_ahead = (v - created_at).total_seconds()
    if seconds_ahead > max_future_seconds:
        raise ValueError(
            f"scheduled_at {v} is too far in the future (max {max_future_seconds} seconds from creation)"
        )

    return v

validate_timezone(v) classmethod

Ensure all datetimes are timezone-aware and normalize to UTC.

Source code in omni_box/core/models/entities.py
@field_validator("created_at", "scheduled_at", "completed_at", "locked_at", mode="after")
@classmethod
def validate_timezone(cls, v: datetime | None) -> datetime | None:
    """Reject naive datetimes and convert aware ones to UTC; ``None`` passes through.

    Raises:
        ValueError: If ``v`` carries no timezone information.
    """
    if v is not None:
        if v.tzinfo is None:
            raise ValueError("Datetime must be timezone-aware")
        # Whatever the original zone was, store the UTC equivalent.
        v = v.astimezone(UTC)
    return v

InboxEventRepository

Bases: EventRepository[InboxEvent], Protocol

Specific protocol for Inbox events.

Source code in omni_box/core/protocols/repository.py
@runtime_checkable
class InboxEventRepository(EventRepository[InboxEvent], Protocol):
    """Specific protocol for Inbox events.

    Extends the generic event repository with lookups keyed by the external
    ``(message_id, consumer_group)`` pair.
    """

    async def get_by_message_id(self, message_id: str, consumer_group: str) -> InboxEvent | None:
        """Get inbox event by external message identifier and consumer group.

        Args:
            message_id: External message identifier.
            consumer_group: Consumer group the message was received for.

        Returns:
            The matching inbox event, or ``None``.
        """
        ...

    async def exists(self, message_id: str, consumer_group: str) -> bool:
        """Check if message already exists in inbox.

        Args:
            message_id: External message identifier.
            consumer_group: Consumer group the message was received for.
        """
        ...

    async def has_completed_sibling_for_inbox_key(
        self, message_id: str, consumer_group: str, exclude_event_id: UUID
    ) -> bool:
        """True if another row with the same (message_id, consumer_group) is completed.

        Args:
            message_id: External message identifier.
            consumer_group: Consumer group the message was received for.
            exclude_event_id: Event that must not count as a sibling
                (presumably the caller's own event — confirm).
        """
        ...

capabilities property

Report optional features supported by this repository instance.

create(event) async

Persist a new event.

Source code in omni_box/core/protocols/repository.py
async def create(self, event: T) -> T:
    """Persist a new event.

    Args:
        event: The event to persist.

    Returns:
        An event of the same type; presumably the persisted representation
        (confirm against concrete repositories).
    """
    ...

exists(message_id, consumer_group) async

Check if message already exists in inbox.

Source code in omni_box/core/protocols/repository.py
async def exists(self, message_id: str, consumer_group: str) -> bool:
    """Check if message already exists in inbox.

    Args:
        message_id: External message identifier.
        consumer_group: Consumer group the message was received for.
    """
    ...

fetch_pending(limit, **filters) async

Fetch pending events that are ready for processing.

Source code in omni_box/core/protocols/repository.py
async def fetch_pending(self, limit: PositiveInt, **filters: Unpack[FetchFilters]) -> list[T]:
    """Fetch pending events that are ready for processing.

    Args:
        limit: Positive upper bound on the number of events requested.
        **filters: Optional ``FetchFilters`` keyword filters.
    """
    ...

get_by_id(event_id) async

Fetch an event by its primary identifier.

Source code in omni_box/core/protocols/repository.py
async def get_by_id(self, event_id: UUID) -> T | None:
    """Fetch an event by its primary identifier.

    Args:
        event_id: Primary identifier of the event.

    Returns:
        The event, or ``None``.
    """
    ...

get_by_message_id(message_id, consumer_group) async

Get inbox event by external message identifier and consumer group.

Source code in omni_box/core/protocols/repository.py
async def get_by_message_id(self, message_id: str, consumer_group: str) -> InboxEvent | None:
    """Get inbox event by external message identifier and consumer group.

    Args:
        message_id: External message identifier.
        consumer_group: Consumer group the message was received for.

    Returns:
        The matching inbox event, or ``None``.
    """
    ...

has_completed_sibling_for_inbox_key(message_id, consumer_group, exclude_event_id) async

True if another row with the same (message_id, consumer_group) is completed.

Source code in omni_box/core/protocols/repository.py
async def has_completed_sibling_for_inbox_key(
    self, message_id: str, consumer_group: str, exclude_event_id: UUID
) -> bool:
    """True if another row with the same (message_id, consumer_group) is completed.

    Args:
        message_id: External message identifier.
        consumer_group: Consumer group the message was received for.
        exclude_event_id: Event that must not count as a sibling
            (presumably the caller's own event — confirm).
    """
    ...

mark_completed(event_id, worker_id) async

Mark an event as COMPLETED.

Source code in omni_box/core/protocols/repository.py
async def mark_completed(self, event_id: UUID, worker_id: str) -> None:
    """Mark an event as COMPLETED.

    Args:
        event_id: Identifier of the event to update.
        worker_id: Identifier of the worker performing the update.
    """
    ...

mark_failed(event_id, error, worker_id, next_retry_at, count_as_attempt=True) async

Mark an event as FAILED or schedule a retry.

Source code in omni_box/core/protocols/repository.py
async def mark_failed(
    self, event_id: UUID, error: str, worker_id: str, next_retry_at: datetime | None, count_as_attempt: bool = True
) -> None:
    """Mark an event as FAILED or schedule a retry.

    Args:
        event_id: Identifier of the event to update.
        error: Error description to record.
        worker_id: Identifier of the worker performing the update.
        next_retry_at: Presumably the time of the next retry, with ``None``
            meaning no retry is scheduled — confirm against implementations.
        count_as_attempt: Presumably controls whether this failure counts
            against the event's attempts — confirm against implementations.
    """
    ...

mark_processing(event_id, worker_id) async

Manually mark an event as being processed (locking).

Source code in omni_box/core/protocols/repository.py
async def mark_processing(self, event_id: UUID, worker_id: str) -> bool:
    """Manually mark an event as being processed (locking).

    Args:
        event_id: Identifier of the event to lock.
        worker_id: Identifier of the worker taking the lock.

    Returns:
        A truthy value when the event was locked for ``worker_id``;
        callers keep only the events for which this returned true.
    """
    ...

InboxHandler

Bases: Protocol

Unified protocol for inbox event handlers.

`**dependencies` carries DI-resolved values injected by the runner (e.g. service instances, settings). It is typed as `Any` because each handler implementation chooses its own keyword set; static checking is enforced at the handler signature, not on the protocol.

Source code in omni_box/core/protocols/handlers.py
class InboxHandler[T: InboxEvent](Protocol):
    """Unified protocol for inbox event handlers.

    ``**dependencies`` carries DI-resolved values injected by the runner
    (e.g. service instances, settings).  It is typed as ``Any`` because each
    handler implementation chooses its own keyword set; static checking is
    enforced at the handler signature, not on the protocol.
    """

    async def __call__(
        self,
        event: T,
        repo: InboxEventRepository,
        **dependencies: Any,
    ) -> EventHandlerResult | None:
        """Process an inbox event.

        Args:
            event: The inbox event to handle.
            repo: Inbox repository made available to the handler.
            **dependencies: Handler-specific keyword dependencies.

        Returns:
            A handler result, or ``None`` when the handler reports none.
        """
        ...

__call__(event, repo, **dependencies) async

Process an inbox event.

Source code in omni_box/core/protocols/handlers.py
async def __call__(
    self,
    event: T,
    repo: InboxEventRepository,
    **dependencies: Any,
) -> EventHandlerResult | None:
    """Process an inbox event.

    Args:
        event: The inbox event to handle.
        repo: Inbox repository made available to the handler.
        **dependencies: Handler-specific keyword dependencies.

    Returns:
        A handler result, or ``None`` when the handler reports none.
    """
    ...

InboxMetrics

Bases: ProcessingMetrics, Protocol

Interface for inbox consumer metrics. All methods are sync.

Source code in omni_box/core/protocols/metrics.py
@runtime_checkable
class InboxMetrics(ProcessingMetrics, Protocol):
    """Interface for inbox consumer metrics. All methods are sync."""

    def inc_consumed(self, count: int = 1) -> None:
        """Increment the count of consumed broker messages.

        Args:
            count: Amount to add (defaults to 1).
        """
        ...

    def inc_committed(self, count: int = 1) -> None:
        """Increment the count of successful broker commits.

        Args:
            count: Amount to add (defaults to 1).
        """
        ...

    def inc_commit_failed(self, count: int = 1) -> None:
        """Increment the count of failed broker commits.

        Args:
            count: Amount to add (defaults to 1).
        """
        ...

inc_commit_failed(count=1)

Increment the count of failed broker commits.

Source code in omni_box/core/protocols/metrics.py
def inc_commit_failed(self, count: int = 1) -> None:
    """Increment the count of failed broker commits.

    Args:
        count: Amount to add (defaults to 1).
    """
    ...

inc_committed(count=1)

Increment the count of successful broker commits.

Source code in omni_box/core/protocols/metrics.py
def inc_committed(self, count: int = 1) -> None:
    """Increment the count of successful broker commits.

    Args:
        count: Amount to add (defaults to 1).
    """
    ...

inc_consumed(count=1)

Increment the count of consumed broker messages.

Source code in omni_box/core/protocols/metrics.py
def inc_consumed(self, count: int = 1) -> None:
    """Increment the count of consumed broker messages.

    Args:
        count: Amount to add (defaults to 1).
    """
    ...

inc_duplicate(count=1, event_type=None, status=None)

Increment count of duplicate/skipped events.

Source code in omni_box/core/protocols/metrics.py
def inc_duplicate(self, count: int = 1, event_type: str | None = None, status: str | None = None) -> None:
    """Increment count of duplicate/skipped events.

    Args:
        count: Amount to add (defaults to 1).
        event_type: Optional event type the increment is attributed to.
        status: Optional status string; presumably a metric label — confirm.
    """
    ...

inc_failed(count=1, event_type=None, status=None)

Increment count of failed processing attempts.

Source code in omni_box/core/protocols/metrics.py
def inc_failed(self, count: int = 1, event_type: str | None = None, status: str | None = None) -> None:
    """Increment count of failed processing attempts.

    Args:
        count: Amount to add (defaults to 1).
        event_type: Optional event type the increment is attributed to.
        status: Optional status string; presumably a metric label — confirm.
    """
    ...

inc_processed(count=1, event_type=None, status=None)

Increment count of successfully processed events.

Source code in omni_box/core/protocols/metrics.py
def inc_processed(self, count: int = 1, event_type: str | None = None, status: str | None = None) -> None:
    """Increment count of successfully processed events.

    Args:
        count: Amount to add (defaults to 1).
        event_type: Optional event type the increment is attributed to.
        status: Optional status string; presumably a metric label — confirm.
    """
    ...

observe_handler_duration(seconds, event_type=None)

Observe processing duration.

Source code in omni_box/core/protocols/metrics.py
def observe_handler_duration(self, seconds: float, event_type: str | None = None) -> None:
    """Observe processing duration.

    Args:
        seconds: Measured duration in seconds.
        event_type: Optional event type the observation is attributed to.
    """
    ...

InboxPersistError

Bases: OmniBoxError

Raised when the inbox consumer fails to persist a consumed message.

Typically means the underlying transaction was rolled back (DB outage, integrity violation, lock conflict). The broker offset is intentionally not committed so the message can be redelivered.

Source code in omni_box/core/exceptions.py
class InboxPersistError(OmniBoxError):
    """Raised when the inbox consumer fails to persist a consumed message.

    This usually indicates that the surrounding transaction was rolled back
    (DB outage, integrity violation, lock conflict). The broker offset is
    deliberately left uncommitted so the message can be redelivered.

    Attributes:
        message_id: Identifier of the message that could not be persisted.
        cause: The underlying exception, if one is known.
    """

    def __init__(self, message_id: str, cause: BaseException | None = None) -> None:
        description = f"Failed to persist inbox event for message {message_id!r}"
        self.message_id, self.cause = message_id, cause
        super().__init__(description)

InvalidEventStateError

Bases: OmniBoxError

Raised when an operation is performed on an event in an invalid state.

Source code in omni_box/core/exceptions.py
class InvalidEventStateError(OmniBoxError):
    """Raised when an operation is performed on an event in an invalid state.

    Attributes:
        event_id: Identifier of the offending event.
        current_status: Status the event was found in.
        expected_statuses: Statuses the operation would have accepted.
    """

    def __init__(
        self, event_id: UUID, current_status: str, expected_statuses: list[str], message: str | None = None
    ) -> None:
        self.event_id = event_id
        self.current_status = current_status
        self.expected_statuses = expected_statuses

        # A caller-supplied message replaces the generated state description.
        if not message:
            allowed = ", ".join(expected_statuses)
            description = f"Event {event_id} is in state '{current_status}', expected one of: {allowed}"
        else:
            description = f"Event {event_id}: {message}"
        super().__init__(description)

NullAckHandle

Bases: AckHandle

No-op ack handle.

Source code in omni_box/core/protocols/consumers.py
class NullAckHandle(AckHandle):
    """No-op ack handle."""

    async def commit(self) -> None:
        """Do nothing: this handle has nothing to acknowledge."""
        return None

OmniBoxDomainService

Standardized way to create and manage events with sensible defaults.

Source code in omni_box/core/services/domain.py
class OmniBoxDomainService:
    """Standardized way to create and manage events with sensible defaults."""

    def __init__(
        self,
        *,
        max_attempts: int = DEFAULT_MAX_ATTEMPTS,
        scheduled_at_skew_seconds: int = DEFAULT_SCHEDULE_AT_SKEW_SECONDS,
        payload_max_bytes: int = DEFAULT_PAYLOAD_MAX_BYTES,
        headers_max_count: int = DEFAULT_HEADERS_MAX_COUNT,
        header_key_max_length: int = DEFAULT_HEADER_KEY_MAX_LENGTH,
        header_value_max_length: int = DEFAULT_HEADER_VALUE_MAX_LENGTH,
        scheduled_at_max_future_seconds: int = DEFAULT_SCHEDULE_AT_MAX_FUTURE_SECONDS,
        last_error_max_length: int = LAST_ERROR_MAX_LENGTH,
        truncation_suffix: str = DEFAULT_TRUNCATION_SUFFIX,
    ) -> None:
        self.max_attempts = max_attempts
        self.scheduled_at_skew_seconds = scheduled_at_skew_seconds
        self.payload_max_bytes = payload_max_bytes
        self.headers_max_count = headers_max_count
        self.header_key_max_length = header_key_max_length
        self.header_value_max_length = header_value_max_length
        self.scheduled_at_max_future_seconds = scheduled_at_max_future_seconds
        self.last_error_max_length = last_error_max_length
        self.truncation_suffix = truncation_suffix

    @property
    def validation_context(self) -> dict[str, int]:
        return {
            "scheduled_at_skew_seconds": self.scheduled_at_skew_seconds,
            "scheduled_at_max_future_seconds": self.scheduled_at_max_future_seconds,
            "payload_max_bytes": self.payload_max_bytes,
            "headers_max_count": self.headers_max_count,
            "header_key_max_length": self.header_key_max_length,
            "header_value_max_length": self.header_value_max_length,
        }

    def create_outbox_event(
        self,
        aggregate_type: str,
        aggregate_id: UUID,
        event_type: str,
        topic: str,
        partition_key: str,
        payload: dict[str, Any],
        *,
        headers: dict[str, str] | None = None,
        max_attempts: int | None = None,
        trace_id: str | None = None,
        idempotency_key: str | None = None,
        correlation_id: str | None = None,
        causation_id: str | None = None,
        schema_version: str | None = None,
        scheduled_at: datetime | None = None,
    ) -> OutboxEvent:
        event_data = {
            "id": uuid4(),
            "aggregate_type": aggregate_type.strip(),
            "aggregate_id": aggregate_id,
            "event_type": event_type.strip(),
            "topic": topic.strip(),
            "partition_key": partition_key.strip(),
            "payload": to_jsonable_python(payload),
            "headers": headers,
            "attempts_made": 0,
            "max_attempts": max_attempts or self.max_attempts,
            "trace_id": trace_id,
            "idempotency_key": idempotency_key,
            "correlation_id": correlation_id,
            "causation_id": causation_id,
            "schema_version": schema_version,
            "status": EventStatus.PENDING,
        }
        if scheduled_at:
            event_data["scheduled_at"] = scheduled_at

        return OutboxEvent.model_validate(event_data, context=self.validation_context)

    def create_inbox_event(
        self,
        message_id: str,
        consumer_group: str,
        source: str,
        event_type: str,
        payload: dict[str, Any],
        *,
        headers: dict[str, str] | None = None,
        trace_id: str | None = None,
        correlation_id: str | None = None,
        causation_id: str | None = None,
        schema_version: str | None = None,
    ) -> InboxEvent:
        event_data = {
            "id": uuid4(),
            "message_id": message_id.strip(),
            "consumer_group": consumer_group.strip(),
            "source": source.strip(),
            "event_type": event_type.strip(),
            "payload": to_jsonable_python(payload),
            "headers": headers,
            "attempts_made": 0,
            "max_attempts": self.max_attempts,
            "trace_id": trace_id,
            "correlation_id": correlation_id,
            "causation_id": causation_id,
            "schema_version": schema_version,
            "status": EventStatus.PENDING,
        }
        return InboxEvent.model_validate(event_data, context=self.validation_context)

    def is_lock_stale(self, event: BaseEvent, now: datetime, stale_timeout_seconds: float | None = None) -> bool:
        """Check if the current lock is stale."""
        if not event.locked_at:
            return False

        timeout = stale_timeout_seconds if stale_timeout_seconds is not None else float(DEFAULT_LEASE_TIMEOUT_SECONDS)
        if timeout <= 0:
            raise ValueError(f"stale_timeout_seconds must be positive, got {timeout}")

        if now.tzinfo is None:
            raise ValueError("now must be timezone-aware")

        delta = now.astimezone(UTC) - event.locked_at
        return delta.total_seconds() > timeout

    def assert_locked_by(self, event: BaseEvent, worker_id: str) -> None:
        """Assert that the event is locked by the specified worker."""
        normalized_worker_id = worker_id.strip()
        if not normalized_worker_id:
            raise ValueError("worker_id cannot be empty or whitespace")

        if not event.locked_at:
            raise EventNotLockedError(event.id)
        if event.locked_by != normalized_worker_id:
            raise EventLockedByAnotherWorkerError(event.id, event.locked_by, normalized_worker_id)

    def lock_event[TE: BaseEvent](self, event: TE, worker_id: str, locked_at: datetime) -> TE:
        """Lock the event for processing."""
        if event.status != EventStatus.PENDING:
            raise InvalidEventStateError(
                event.id,
                event.status,
                [EventStatus.PENDING],
                "Only PENDING events can be locked",
            )

        normalized_worker_id = worker_id.strip()
        if not normalized_worker_id:
            raise ValueError("worker_id cannot be empty or whitespace")

        if event.locked_at:
            raise EventAlreadyLockedError(event.id, event.locked_by)

        if locked_at.tzinfo is None:
            raise ValueError("locked_at must be timezone-aware")

        return self._copy_event_with_update(
            event,
            {
                "locked_at": locked_at.astimezone(UTC),
                "locked_by": normalized_worker_id,
            },
        )

    def refresh_event_lock[TE: BaseEvent](self, event: TE, worker_id: str, now: datetime) -> TE:
        """Extend the current lock time."""
        if event.status != EventStatus.PENDING:
            raise InvalidEventStateError(
                event.id,
                event.status,
                [EventStatus.PENDING],
                "Only PENDING events can have their lock refreshed",
            )

        self.assert_locked_by(event, worker_id)
        if now.tzinfo is None:
            raise ValueError("now must be timezone-aware")

        return self._copy_event_with_update(event, {"locked_at": now.astimezone(UTC)})

    def unlock_event[TE: BaseEvent](self, event: TE, worker_id: str) -> TE:
        """Release the lock on the event."""
        if event.status != EventStatus.PENDING:
            raise InvalidEventStateError(
                event.id,
                event.status,
                [EventStatus.PENDING],
                "Only PENDING events can be unlocked",
            )

        self.assert_locked_by(event, worker_id)

        return self._copy_event_with_update(event, {"locked_at": None, "locked_by": None})

    def force_unlock_event[TE: BaseEvent](self, event: TE, reason: str) -> TE:
        """Forcefully release the lock without owner verification."""
        if not event.locked_at:
            raise EventNotLockedError(event.id)

        normalized_reason = reason.strip()
        if not normalized_reason:
            raise ValueError("Reason for force unlock cannot be empty or whitespace")

        if len(normalized_reason) > FORCE_UNLOCK_REASON_MAX_LENGTH:
            raise ValueError(
                f"Reason for force unlock is too long: {len(normalized_reason)} "
                f"(max {FORCE_UNLOCK_REASON_MAX_LENGTH} chars)"
            )

        return self._copy_event_with_update(
            event,
            {
                "locked_at": None,
                "locked_by": None,
                "last_error": f"Administrative force unlock: {normalized_reason}",
            },
        )

    def mark_event_completed[TE: BaseEvent](self, event: TE, completed_at: datetime, worker_id: str) -> TE:
        """Mark event as successfully completed."""
        self.assert_locked_by(event, worker_id)

        if event.status == EventStatus.COMPLETED:  # pragma: no cover
            raise InvalidEventStateError(event.id, event.status, [EventStatus.PENDING], "Event is already completed")
        if event.status == EventStatus.FAILED:  # pragma: no cover
            raise InvalidEventStateError(event.id, event.status, [EventStatus.PENDING])

        if completed_at.tzinfo is None:
            raise ValueError("completed_at must be timezone-aware")

        comp_at_utc = completed_at.astimezone(UTC)
        if comp_at_utc < event.created_at:
            raise ValueError(f"completed_at {comp_at_utc} cannot be before created_at {event.created_at}")

        if comp_at_utc < event.scheduled_at:
            raise ValueError(f"completed_at {comp_at_utc} cannot be before scheduled_at {event.scheduled_at}")

        return self._copy_event_with_update(
            event,
            {
                "status": EventStatus.COMPLETED,
                "completed_at": comp_at_utc,
                "locked_at": None,
                "locked_by": None,
            },
        )

    def mark_event_failed[TE: BaseEvent](
        self,
        event: TE,
        error: str,
        worker_id: str,
        count_as_attempt: bool = True,
        next_retry_at: datetime | None = None,
    ) -> TE:
        """Mark event as failed with error details."""
        if not error.strip():
            raise ValueError("Error message cannot be empty or whitespace")

        if not count_as_attempt and next_retry_at is None:
            raise ValueError("next_retry_at must be provided if not counting as attempt")

        self.assert_locked_by(event, worker_id)

        if event.status == EventStatus.COMPLETED:  # pragma: no cover
            raise InvalidEventStateError(event.id, event.status, [EventStatus.PENDING])
        if event.status == EventStatus.FAILED:  # pragma: no cover
            raise InvalidEventStateError(
                event.id, event.status, [EventStatus.PENDING], "Event is already in FAILED state"
            )

        new_attempts_made = event.attempts_made + (1 if count_as_attempt else 0)
        if new_attempts_made > event.max_attempts:  # pragma: no cover
            raise ValueError(f"Cannot increment attempts_made beyond max_attempts ({event.max_attempts})")

        truncated_error = BaseEvent.truncate_error(
            error, max_bytes=self.last_error_max_length, suffix=self.truncation_suffix
        )
        new_status = EventStatus.FAILED if new_attempts_made == event.max_attempts else EventStatus.PENDING

        update_data: dict[str, object] = {
            "status": new_status,
            "attempts_made": new_attempts_made,
            "last_error": truncated_error,
            "locked_at": None,
            "locked_by": None,
        }
        if next_retry_at is not None:
            if next_retry_at.tzinfo is None:
                raise ValueError("next_retry_at must be timezone-aware")
            next_retry_at_utc = next_retry_at.astimezone(UTC)

            if next_retry_at_utc < event.created_at:
                raise ValueError(f"next_retry_at {next_retry_at_utc} cannot be before created_at {event.created_at}")

            if (next_retry_at_utc - event.created_at).total_seconds() > self.scheduled_at_max_future_seconds:
                raise ValueError(f"next_retry_at {next_retry_at_utc} is too far in the future")
            update_data["scheduled_at"] = next_retry_at_utc

        return self._copy_event_with_update(event, update_data)

    def _copy_event_with_update[TE: BaseEvent](self, event: TE, update: dict[str, object]) -> TE:
        """Create a copy of the event with updated fields."""
        new_data = event.model_dump(mode="python")
        new_data.update(update)
        return type(event).model_validate(new_data, context=self.validation_context)

assert_locked_by(event, worker_id)

Assert that the event is locked by the specified worker.

Source code in omni_box/core/services/domain.py
def assert_locked_by(self, event: BaseEvent, worker_id: str) -> None:
    """Verify that ``event`` holds a lock owned by ``worker_id``.

    The worker id is compared after stripping surrounding whitespace.

    Raises:
        ValueError: If ``worker_id`` is empty or whitespace-only.
        EventNotLockedError: If the event has no ``locked_at`` value.
        EventLockedByAnotherWorkerError: If ``locked_by`` differs from the
            stripped worker id.
    """
    owner = worker_id.strip()
    if owner == "":
        raise ValueError("worker_id cannot be empty or whitespace")

    if not event.locked_at:
        raise EventNotLockedError(event.id)

    current_holder = event.locked_by
    if current_holder != owner:
        raise EventLockedByAnotherWorkerError(event.id, current_holder, owner)

force_unlock_event(event, reason)

Forcefully release the lock without owner verification.

Source code in omni_box/core/services/domain.py
def force_unlock_event[TE: BaseEvent](self, event: TE, reason: str) -> TE:
    """Forcefully release the lock without owner verification."""
    if not event.locked_at:
        raise EventNotLockedError(event.id)

    normalized_reason = reason.strip()
    if not normalized_reason:
        raise ValueError("Reason for force unlock cannot be empty or whitespace")

    if len(normalized_reason) > FORCE_UNLOCK_REASON_MAX_LENGTH:
        raise ValueError(
            f"Reason for force unlock is too long: {len(normalized_reason)} "
            f"(max {FORCE_UNLOCK_REASON_MAX_LENGTH} chars)"
        )

    return self._copy_event_with_update(
        event,
        {
            "locked_at": None,
            "locked_by": None,
            "last_error": f"Administrative force unlock: {normalized_reason}",
        },
    )

is_lock_stale(event, now, stale_timeout_seconds=None)

Check if the current lock is stale.

Source code in omni_box/core/services/domain.py
def is_lock_stale(self, event: BaseEvent, now: datetime, stale_timeout_seconds: float | None = None) -> bool:
    """Report whether the event's lock is older than the stale timeout.

    An unlocked event is never stale; in that case the other arguments are
    not validated. When ``stale_timeout_seconds`` is ``None`` the timeout
    falls back to ``DEFAULT_LEASE_TIMEOUT_SECONDS``.

    Raises:
        ValueError: If the effective timeout is not positive or ``now`` is naive.
    """
    locked_since = event.locked_at
    if not locked_since:
        return False

    if stale_timeout_seconds is None:
        timeout = float(DEFAULT_LEASE_TIMEOUT_SECONDS)
    else:
        timeout = stale_timeout_seconds
    if timeout <= 0:
        raise ValueError(f"stale_timeout_seconds must be positive, got {timeout}")

    if now.tzinfo is None:
        raise ValueError("now must be timezone-aware")

    # Strictly greater: a lock exactly ``timeout`` seconds old is still live.
    lock_age_seconds = (now.astimezone(UTC) - locked_since).total_seconds()
    return lock_age_seconds > timeout

lock_event(event, worker_id, locked_at)

Lock the event for processing.

Source code in omni_box/core/services/domain.py
def lock_event[TE: BaseEvent](self, event: TE, worker_id: str, locked_at: datetime) -> TE:
    """Lock the event for processing."""
    if event.status != EventStatus.PENDING:
        raise InvalidEventStateError(
            event.id,
            event.status,
            [EventStatus.PENDING],
            "Only PENDING events can be locked",
        )

    normalized_worker_id = worker_id.strip()
    if not normalized_worker_id:
        raise ValueError("worker_id cannot be empty or whitespace")

    if event.locked_at:
        raise EventAlreadyLockedError(event.id, event.locked_by)

    if locked_at.tzinfo is None:
        raise ValueError("locked_at must be timezone-aware")

    return self._copy_event_with_update(
        event,
        {
            "locked_at": locked_at.astimezone(UTC),
            "locked_by": normalized_worker_id,
        },
    )

mark_event_completed(event, completed_at, worker_id)

Mark event as successfully completed.

Source code in omni_box/core/services/domain.py
def mark_event_completed[TE: BaseEvent](self, event: TE, completed_at: datetime, worker_id: str) -> TE:
    """Mark event as successfully completed."""
    self.assert_locked_by(event, worker_id)

    if event.status == EventStatus.COMPLETED:  # pragma: no cover
        raise InvalidEventStateError(event.id, event.status, [EventStatus.PENDING], "Event is already completed")
    if event.status == EventStatus.FAILED:  # pragma: no cover
        raise InvalidEventStateError(event.id, event.status, [EventStatus.PENDING])

    if completed_at.tzinfo is None:
        raise ValueError("completed_at must be timezone-aware")

    comp_at_utc = completed_at.astimezone(UTC)
    if comp_at_utc < event.created_at:
        raise ValueError(f"completed_at {comp_at_utc} cannot be before created_at {event.created_at}")

    if comp_at_utc < event.scheduled_at:
        raise ValueError(f"completed_at {comp_at_utc} cannot be before scheduled_at {event.scheduled_at}")

    return self._copy_event_with_update(
        event,
        {
            "status": EventStatus.COMPLETED,
            "completed_at": comp_at_utc,
            "locked_at": None,
            "locked_by": None,
        },
    )

mark_event_failed(event, error, worker_id, count_as_attempt=True, next_retry_at=None)

Mark event as failed with error details.

Source code in omni_box/core/services/domain.py
def mark_event_failed[TE: BaseEvent](
    self,
    event: TE,
    error: str,
    worker_id: str,
    count_as_attempt: bool = True,
    next_retry_at: datetime | None = None,
) -> TE:
    """Mark event as failed with error details."""
    if not error.strip():
        raise ValueError("Error message cannot be empty or whitespace")

    if not count_as_attempt and next_retry_at is None:
        raise ValueError("next_retry_at must be provided if not counting as attempt")

    self.assert_locked_by(event, worker_id)

    if event.status == EventStatus.COMPLETED:  # pragma: no cover
        raise InvalidEventStateError(event.id, event.status, [EventStatus.PENDING])
    if event.status == EventStatus.FAILED:  # pragma: no cover
        raise InvalidEventStateError(
            event.id, event.status, [EventStatus.PENDING], "Event is already in FAILED state"
        )

    new_attempts_made = event.attempts_made + (1 if count_as_attempt else 0)
    if new_attempts_made > event.max_attempts:  # pragma: no cover
        raise ValueError(f"Cannot increment attempts_made beyond max_attempts ({event.max_attempts})")

    truncated_error = BaseEvent.truncate_error(
        error, max_bytes=self.last_error_max_length, suffix=self.truncation_suffix
    )
    new_status = EventStatus.FAILED if new_attempts_made == event.max_attempts else EventStatus.PENDING

    update_data: dict[str, object] = {
        "status": new_status,
        "attempts_made": new_attempts_made,
        "last_error": truncated_error,
        "locked_at": None,
        "locked_by": None,
    }
    if next_retry_at is not None:
        if next_retry_at.tzinfo is None:
            raise ValueError("next_retry_at must be timezone-aware")
        next_retry_at_utc = next_retry_at.astimezone(UTC)

        if next_retry_at_utc < event.created_at:
            raise ValueError(f"next_retry_at {next_retry_at_utc} cannot be before created_at {event.created_at}")

        if (next_retry_at_utc - event.created_at).total_seconds() > self.scheduled_at_max_future_seconds:
            raise ValueError(f"next_retry_at {next_retry_at_utc} is too far in the future")
        update_data["scheduled_at"] = next_retry_at_utc

    return self._copy_event_with_update(event, update_data)

refresh_event_lock(event, worker_id, now)

Extend the current lock time.

Source code in omni_box/core/services/domain.py
def refresh_event_lock[TE: BaseEvent](self, event: TE, worker_id: str, now: datetime) -> TE:
    """Extend the current lock time."""
    if event.status != EventStatus.PENDING:
        raise InvalidEventStateError(
            event.id,
            event.status,
            [EventStatus.PENDING],
            "Only PENDING events can have their lock refreshed",
        )

    self.assert_locked_by(event, worker_id)
    if now.tzinfo is None:
        raise ValueError("now must be timezone-aware")

    return self._copy_event_with_update(event, {"locked_at": now.astimezone(UTC)})

unlock_event(event, worker_id)

Release the lock on the event.

Source code in omni_box/core/services/domain.py
def unlock_event[TE: BaseEvent](self, event: TE, worker_id: str) -> TE:
    """Release the lock on the event."""
    if event.status != EventStatus.PENDING:
        raise InvalidEventStateError(
            event.id,
            event.status,
            [EventStatus.PENDING],
            "Only PENDING events can be unlocked",
        )

    self.assert_locked_by(event, worker_id)

    return self._copy_event_with_update(event, {"locked_at": None, "locked_by": None})

OmniBoxError

Bases: Exception

Base exception for omni-box-related errors.

Source code in omni_box/core/exceptions.py
class OmniBoxError(Exception):
    """Base exception for omni-box-related errors.

    Direct subclass of ``Exception`` that adds no attributes or behavior.
    """

OmniBoxMaintenanceService

Core service for omni-box maintenance operations.

Source code in omni_box/core/services/maintenance.py
class OmniBoxMaintenanceService:
    """Core service for omni-box maintenance operations.

    Wraps a repository and exposes stale-lock release and old-event cleanup.
    Both operations need retention support, which is detected once at
    construction time: either the repository's ``capabilities`` attribute is a
    ``RepositoryCapabilities`` with ``supports_retention`` set, or the
    repository is an instance of ``SupportsRetentionPolicies``.
    """

    def __init__(self, repo: EventRepository) -> None:
        """Store the repository and cache whether it supports retention."""
        self._repo = repo
        capabilities = getattr(repo, "capabilities", None)
        self._has_retention = (
            isinstance(capabilities, RepositoryCapabilities) and capabilities.supports_retention
        ) or isinstance(repo, SupportsRetentionPolicies)

    def _require_retention(self) -> None:
        """Raise ``UnsupportedCapabilityError`` when retention is unsupported."""
        if not self._has_retention:
            raise UnsupportedCapabilityError(
                capability="SupportsRetentionPolicies",
                repo_type=type(self._repo).__name__,
            )

    async def release_stale_locks(
        self,
        stale_timeout_seconds: PositiveNumber,
    ) -> int:
        """Release locks older than the timeout and return how many were released.

        The repository receives the timeout as a whole number of seconds.

        Raises:
            UnsupportedCapabilityError: If the repository lacks retention support.
            Exception: Any repository error is logged and re-raised unchanged.
        """
        self._require_retention()

        # int() truncates toward zero, so a positive sub-second timeout would
        # reach the repository as 0 and make every lock count as stale.
        # Clamp to one second; values >= 1 behave exactly as before.
        timeout_for_repo = max(1, int(stale_timeout_seconds))

        released_count = 0

        try:
            released_count = int(await self._repo.release_stale_locks(timeout_for_repo))  # type: ignore[attr-defined]

            if released_count > 0:
                logger.info("Released stale locks", count=released_count, timeout_seconds=stale_timeout_seconds)
            else:
                logger.debug("No stale locks to release")
        except Exception:
            logger.exception("Failed to release stale locks", timeout_seconds=stale_timeout_seconds)
            raise
        return released_count

    async def cleanup_old_events(
        self,
        retention_days: PositiveInt,
        batch_size: int = DEFAULT_CLEANUP_BATCH_SIZE,
        max_iterations: int = DEFAULT_MAINTENANCE_MAX_ITERATIONS,
    ) -> int:
        """Delete old completed events in batches and return the total deleted.

        Batches are requested until one deletes nothing or ``max_iterations``
        batches have been processed; reaching the cap logs a warning.

        Raises:
            UnsupportedCapabilityError: If the repository lacks retention support.
            Exception: Any repository error is logged and re-raised unchanged.
        """
        self._require_retention()

        total_deleted = 0

        try:
            iteration = 0
            while iteration < max_iterations:
                deleted_count = int(
                    await self._repo.delete_old_completed(retention_days, batch_size=batch_size)  # type: ignore[attr-defined]
                )

                # An empty batch means there is nothing left to delete.
                if deleted_count == 0:
                    break

                total_deleted += deleted_count
                iteration += 1

            if iteration >= max_iterations:
                logger.warning(
                    "Cleanup reached maximum iterations",
                    max_iterations=max_iterations,
                    total_deleted=total_deleted,
                )

            if total_deleted > 0:
                logger.info("Deleted old events", count=total_deleted, retention_days=retention_days)
            else:
                logger.debug("No old events to delete")
        except Exception:
            logger.exception("Failed to delete old events", retention_days=retention_days)
            raise
        return total_deleted

OutboxEvent

Bases: BaseEvent

Domain entity for events being sent from the application (Outbox).

Adds aggregate context and routing information required for publishing events to external brokers.

Source code in omni_box/core/models/entities.py
class OutboxEvent(BaseEvent):
    """Domain entity for events being sent from the application (Outbox).

    Adds aggregate context and routing information required for publishing
    events to external brokers.

    None of the four fields declared here supplies a default value.
    """

    # Outbox-specific Identifiers
    # String field whose length is capped at AGGREGATE_TYPE_MAX_LENGTH.
    aggregate_type: StrippedNonEmptyStr = Field(max_length=AGGREGATE_TYPE_MAX_LENGTH)
    # UUID of the aggregate; declared with Field(...), i.e. no default.
    aggregate_id: UUID = Field(...)

    # Routing
    # String field whose length is capped at TOPIC_MAX_LENGTH.
    topic: StrippedNonEmptyStr = Field(max_length=TOPIC_MAX_LENGTH)
    # String field whose length is capped at PARTITION_KEY_MAX_LENGTH.
    partition_key: StrippedNonEmptyStr = Field(max_length=PARTITION_KEY_MAX_LENGTH)

attempts_left property

Number of attempts remaining before FAILED status.

can_retry property

Check if the event can be retried.

failure_count property

Number of attempts made when in FAILED status.

is_locked property

Check if the event is currently locked.

truncate_error(error, max_bytes, suffix) staticmethod

Truncate error message to database limit (in bytes).

Source code in omni_box/core/models/entities.py
@staticmethod
def truncate_error(error: str, max_bytes: int, suffix: str) -> str:
    """Truncate error message to database limit (in bytes)."""
    if max_bytes < 1:
        raise ValueError(f"max_bytes must be >= 1, got {max_bytes}")

    stripped = error.strip()
    if not stripped:
        raise ValueError("Error message cannot be empty or whitespace")

    encoded = stripped.encode("utf-8")
    if len(encoded) <= max_bytes:
        return stripped

    suffix_encoded = suffix.encode("utf-8")
    if len(suffix_encoded) >= max_bytes:
        return encoded[:max_bytes].decode("utf-8", errors="ignore")

    keep_bytes = max_bytes - len(suffix_encoded)
    return encoded[:keep_bytes].decode("utf-8", errors="ignore") + suffix

validate_invariants()

Enforce business invariants across all fields.

Source code in omni_box/core/models/entities.py
def validate_invariants(self) -> Self:
    """Enforce business invariants across all fields."""
    self._validate_status_timing()
    self._validate_attempts()
    self._validate_lock()
    return self

validate_scheduled_at(v, info) classmethod

Ensure scheduled_at is within reasonable range.

Source code in omni_box/core/models/entities.py
@field_validator("scheduled_at")
@classmethod
def validate_scheduled_at(cls, v: datetime, info: ValidationInfo) -> datetime:
    """Check ``scheduled_at`` against ``created_at`` and return it unchanged.

    Nothing is checked when ``created_at`` is absent from the already
    validated data. The allowed backwards skew and the maximum distance into
    the future come from the validation context keys
    ``scheduled_at_skew_seconds`` and ``scheduled_at_max_future_seconds``,
    falling back to the module defaults.

    Raises:
        ValueError: If a limit is out of range, or ``scheduled_at`` lies too far
            before or after ``created_at``.
    """
    created_at = info.data.get("created_at")
    if created_at is None:
        return v

    # An empty or missing context yields the defaults through ``.get``.
    context = info.context or {}
    skew_limit = context.get("scheduled_at_skew_seconds", DEFAULT_SCHEDULE_AT_SKEW_SECONDS)
    max_future_seconds = context.get("scheduled_at_max_future_seconds", DEFAULT_SCHEDULE_AT_MAX_FUTURE_SECONDS)

    if skew_limit < 0:
        raise ValueError(f"scheduled_at_skew_seconds must be >= 0, got {skew_limit}")
    if max_future_seconds < 1:
        raise ValueError(f"scheduled_at_max_future_seconds must be >= 1, got {max_future_seconds}")

    scheduled_too_early = v < created_at and (created_at - v).total_seconds() > skew_limit
    if scheduled_too_early:
        raise ValueError(f"scheduled_at {v} cannot be significantly before created_at {created_at}")

    seconds_ahead = (v - created_at).total_seconds()
    if seconds_ahead > max_future_seconds:
        raise ValueError(
            f"scheduled_at {v} is too far in the future (max {max_future_seconds} seconds from creation)"
        )

    return v

validate_timezone(v) classmethod

Ensure all datetimes are timezone-aware and normalize to UTC.

Source code in omni_box/core/models/entities.py
@field_validator("created_at", "scheduled_at", "completed_at", "locked_at", mode="after")
@classmethod
def validate_timezone(cls, v: datetime | None) -> datetime | None:
    """Reject naive datetimes and convert aware ones to UTC.

    ``None`` passes through untouched.

    Raises:
        ValueError: If the datetime carries no ``tzinfo``.
    """
    if v is None:
        return None

    if v.tzinfo is None:
        raise ValueError("Datetime must be timezone-aware")

    # Whatever zone the value arrived in, store it as UTC.
    as_utc = v.astimezone(UTC)
    return as_utc

OutboxEventRepository

Bases: EventRepository[OutboxEvent], Protocol

Specific protocol for Outbox events.

Source code in omni_box/core/protocols/repository.py
@runtime_checkable
class OutboxEventRepository(EventRepository[OutboxEvent], Protocol):
    """Specific protocol for Outbox events.

    Parameterizes ``EventRepository`` with ``OutboxEvent`` and declares no
    members of its own. Being ``runtime_checkable``, it can be used with
    ``isinstance``.
    """

capabilities property

Report optional features supported by this repository instance.

create(event) async

Persist a new event.

Source code in omni_box/core/protocols/repository.py
async def create(self, event: T) -> T:
    """Persist a new event.

    Args:
        event: The event to store.

    Returns:
        An event of the same type ``T``. Whether this is the same instance
        or a stored copy is up to the implementation.
    """
    ...

fetch_pending(limit, **filters) async

Fetch pending events that are ready for processing.

Source code in omni_box/core/protocols/repository.py
async def fetch_pending(self, limit: PositiveInt, **filters: Unpack[FetchFilters]) -> list[T]:
    """Fetch pending events that are ready for processing.

    Args:
        limit: Positive integer; presumably the maximum number of events to
            return (confirm against implementations).
        **filters: Optional keyword filters typed by ``FetchFilters``.

    Returns:
        A list of events of type ``T``.
    """
    ...

get_by_id(event_id) async

Fetch an event by its primary identifier.

Source code in omni_box/core/protocols/repository.py
async def get_by_id(self, event_id: UUID) -> T | None:
    """Fetch an event by its primary identifier.

    Args:
        event_id: UUID of the event to look up.

    Returns:
        The event, or ``None``; presumably ``None`` means no event has that id
        (confirm against implementations).
    """
    ...

mark_completed(event_id, worker_id) async

Mark an event as COMPLETED.

Source code in omni_box/core/protocols/repository.py
async def mark_completed(self, event_id: UUID, worker_id: str) -> None:
    """Mark an event as COMPLETED.

    Args:
        event_id: UUID of the event to complete.
        worker_id: Id of the worker performing the transition.
    """
    ...

mark_failed(event_id, error, worker_id, next_retry_at, count_as_attempt=True) async

Mark an event as FAILED or schedule a retry.

Source code in omni_box/core/protocols/repository.py
async def mark_failed(
    self, event_id: UUID, error: str, worker_id: str, next_retry_at: datetime | None, count_as_attempt: bool = True
) -> None:
    """Mark an event as FAILED or schedule a retry.

    Args:
        event_id: UUID of the event that failed.
        error: Description of the failure.
        worker_id: Id of the worker reporting the failure.
        next_retry_at: Time of the next retry, or ``None``. This parameter has
            no default and must always be passed.
        count_as_attempt: Whether the failure counts as an attempt; defaults
            to ``True``.
    """
    ...

mark_processing(event_id, worker_id) async

Manually mark an event as being processed (locking).

Source code in omni_box/core/protocols/repository.py
async def mark_processing(self, event_id: UUID, worker_id: str) -> bool:
    """Manually mark an event as being processed (locking).

    Args:
        event_id: UUID of the event to lock.
        worker_id: Id of the worker taking the lock.

    Returns:
        A boolean; presumably ``True`` when the lock was acquired (confirm
        against implementations).
    """
    ...

OutboxMetrics

Bases: ProcessingMetrics, Protocol

Interface for outbox processing metrics. All methods are sync.

Source code in omni_box/core/protocols/metrics.py
@runtime_checkable
class OutboxMetrics(ProcessingMetrics, Protocol):
    """Interface for outbox processing metrics. All methods are sync.

    Extends ``ProcessingMetrics`` with the two outbox-specific hooks declared
    below. Being ``runtime_checkable``, it can be used with ``isinstance``.
    """

    def set_locked_batch_size(self, value: int) -> None:
        """Set the number of events locked in the current batch."""
        ...

    def inc_published(self, count: int = 1, event_type: str | None = None, status: str | None = None) -> None:
        """Alias for inc_processed in outbox context.

        Takes the same parameters as the other ``inc_*`` counters: an
        increment (default 1) plus optional ``event_type`` and ``status``.
        """
        ...

inc_duplicate(count=1, event_type=None, status=None)

Increment count of duplicate/skipped events.

Source code in omni_box/core/protocols/metrics.py
def inc_duplicate(self, count: int = 1, event_type: str | None = None, status: str | None = None) -> None:
    """Increment count of duplicate/skipped events.

    Args:
        count: Amount to add; defaults to 1.
        event_type: Optional event type, presumably used as a metric label.
        status: Optional status, presumably used as a metric label.
    """
    ...

inc_failed(count=1, event_type=None, status=None)

Increment count of failed processing attempts.

Source code in omni_box/core/protocols/metrics.py
def inc_failed(self, count: int = 1, event_type: str | None = None, status: str | None = None) -> None:
    """Increment count of failed processing attempts.

    Args:
        count: Amount to add; defaults to 1.
        event_type: Optional event type, presumably used as a metric label.
        status: Optional status, presumably used as a metric label.
    """
    ...

inc_processed(count=1, event_type=None, status=None)

Increment count of successfully processed events.

Source code in omni_box/core/protocols/metrics.py
def inc_processed(self, count: int = 1, event_type: str | None = None, status: str | None = None) -> None:
    """Increment count of successfully processed events.

    Args:
        count: Amount to add; defaults to 1.
        event_type: Optional event type, presumably used as a metric label.
        status: Optional status, presumably used as a metric label.
    """
    ...

inc_published(count=1, event_type=None, status=None)

Alias for inc_processed in outbox context.

Source code in omni_box/core/protocols/metrics.py
def inc_published(self, count: int = 1, event_type: str | None = None, status: str | None = None) -> None:
    """Alias for inc_processed in outbox context.

    Args:
        count: Amount to add; defaults to 1.
        event_type: Optional event type, presumably used as a metric label.
        status: Optional status, presumably used as a metric label.
    """
    ...

observe_handler_duration(seconds, event_type=None)

Observe processing duration.

Source code in omni_box/core/protocols/metrics.py
def observe_handler_duration(self, seconds: float, event_type: str | None = None) -> None:
    """Observe processing duration.

    Args:
        seconds: Duration to record, in seconds.
        event_type: Optional event type, presumably used as a metric label.
    """
    ...

set_locked_batch_size(value)

Set the number of events locked in the current batch.

Source code in omni_box/core/protocols/metrics.py
def set_locked_batch_size(self, value: int) -> None:
    """Set the number of events locked in the current batch.

    Args:
        value: The batch size to record.
    """
    ...

OutboxPublisher

High-level service for publishing outbox events to a message broker.

Source code in omni_box/application/services/publish.py
class OutboxPublisher:
    """High-level service for publishing outbox events to a message broker.

    Builds an outbox processor from the repository and broker, and optionally
    caps the number of batches running at once with a semaphore.
    """

    def __init__(
        self,
        repo: OutboxEventRepository,
        broker: EventPublisher,
        metrics: OutboxMetrics | None = None,
        publish_timeout: float = DEFAULT_PUBLISH_TIMEOUT_SECONDS,
        concurrency_limit: int | None = None,
    ) -> None:
        """Wire up the processor; a falsy ``concurrency_limit`` means no cap."""
        self._repo = repo
        self._broker = broker
        self._metrics = metrics or NoOpOutboxMetrics()
        self._processor = create_outbox_processor(
            repo=repo,
            publisher=broker,
            metrics=self._metrics,
            job_name="outbox_publisher",
            publish_timeout=publish_timeout,
        )
        if concurrency_limit:
            self._semaphore = asyncio.Semaphore(concurrency_limit)
        else:
            self._semaphore = None

    async def publish_batch(
        self,
        worker_id: str,
        batch_size: PositiveInt,
        shutdown_requested_func: Callable[[], bool] | None = None,
        **fetch_filters: Unpack[FetchFilters],
    ) -> BatchProcessingResult[OutboxEvent]:
        """Process one batch, holding the semaphore for its duration if one is set."""
        limiter = self._semaphore
        if limiter is None:
            return await self._run_batch(worker_id, batch_size, shutdown_requested_func, fetch_filters)

        async with limiter:
            return await self._run_batch(worker_id, batch_size, shutdown_requested_func, fetch_filters)

    async def _run_batch(
        self,
        worker_id: str,
        batch_size: PositiveInt,
        shutdown_requested_func: Callable[[], bool] | None,
        fetch_filters: FetchFilters,
    ) -> BatchProcessingResult[OutboxEvent]:
        """Forward the batch request to the underlying processor."""
        return await self._processor.process_batch(
            worker_id=worker_id,
            batch_size=batch_size,
            shutdown_requested_func=shutdown_requested_func,
            **fetch_filters,
        )

RepositoryCapabilities dataclass

Flags for optional repository capabilities.

Source code in omni_box/core/protocols/repository.py
@dataclass(frozen=True, slots=True)
class RepositoryCapabilities:
    """Flags for optional repository capabilities.

    Frozen, slotted dataclass; every flag defaults to ``False``.
    """

    # Bulk-operation support.
    supports_bulk: bool = False
    # Distributed-locking support.
    supports_distributed_locking: bool = False
    # Retention support (stale-lock release, old-event cleanup).
    supports_retention: bool = False

StorageConnectionError

Bases: StorageError

Connection to storage failed.

Source code in omni_box/core/exceptions.py
class StorageConnectionError(StorageError):
    """Connection to storage failed.

    Subclass of ``StorageError`` that adds no attributes or behavior.
    """

StorageError

Bases: OmniBoxError

Base exception for storage operations.

Source code in omni_box/core/exceptions.py
class StorageError(OmniBoxError):
    """Base exception for storage operations.

    Subclass of ``OmniBoxError`` that adds no attributes or behavior.
    """

StorageIntegrityError

Bases: StorageError

Data integrity constraint violation.

Source code in omni_box/core/exceptions.py
class StorageIntegrityError(StorageError):
    """Data integrity constraint violation.

    Subclass of ``StorageError`` that adds no attributes or behavior.
    """

StorageTimeoutError

Bases: StorageError

Storage operation timed out.

Source code in omni_box/core/exceptions.py
class StorageTimeoutError(StorageError):
    """Storage operation timed out.

    Subclass of ``StorageError`` that adds no attributes or behavior.
    """

StorageTransactionError

Bases: StorageError

Transaction operation failed.

Source code in omni_box/core/exceptions.py
class StorageTransactionError(StorageError):
    """Transaction operation failed.

    Subclass of ``StorageError`` that adds no attributes or behavior.
    """

UnsupportedCapabilityError

Bases: OmniBoxError

Raised when a required repository capability is not available.

Source code in omni_box/core/exceptions.py
class UnsupportedCapabilityError(OmniBoxError):
    """Signal that a repository lacks a capability an operation requires.

    The capability name and repository type name are kept as the
    ``capability`` and ``repo_type`` attributes and are also embedded in the
    message.
    """

    def __init__(self, capability: str, repo_type: str) -> None:
        message = (
            f"Repository {repo_type} does not support {capability}. "
            "Use a repository that implements the required protocol or disable this feature."
        )
        super().__init__(message)
        self.capability = capability
        self.repo_type = repo_type

create_dispatching_processor(repo, router, *, filter_sources=None, skip_duplicate_siblings=True, process_timeout=DEFAULT_PROCESS_TIMEOUT_SECONDS, dependencies=None, metrics=None, dlq_storage=None, enable_otel=False, enable_circuit_breaker=False, circuit_breaker_failure_threshold=5, circuit_breaker_recovery_timeout=60, job_name='dispatching_processor', additional_steps_before=None, additional_steps_after=None)

Preset factory for inbox processing with automated event routing.

Source code in omni_box/application/factories.py
def create_dispatching_processor(
    repo: InboxEventRepository,
    router: EventRouter,
    *,
    filter_sources: list[str] | None = None,
    skip_duplicate_siblings: bool = True,
    process_timeout: float = DEFAULT_PROCESS_TIMEOUT_SECONDS,
    dependencies: dict[str, object] | None = None,
    metrics: InboxMetrics | None = None,
    dlq_storage: DLQStorage[InboxEvent] | None = None,
    enable_otel: bool = False,
    enable_circuit_breaker: bool = False,
    circuit_breaker_failure_threshold: int = 5,
    circuit_breaker_recovery_timeout: int = 60,
    job_name: str = "dispatching_processor",
    additional_steps_before: list[ProcessingStep[InboxEvent]] | None = None,
    additional_steps_after: list[ProcessingStep[InboxEvent]] | None = None,
) -> EventBatchProcessor[InboxEvent]:
    """Preset factory for inbox processing with automated event routing.

    Steps are added in this order, each only when enabled or supplied:
    OpenTelemetry, circuit breaker, DLQ, ``additional_steps_before``, sibling
    deduplication, handler execution, ``additional_steps_after``, metrics.
    The handler forwards each event to ``router.dispatch`` together with the
    event's ``source``, the repository and the ``dependencies`` as keyword
    arguments.

    Args:
        repo: Repository the processor reads from.
        router: Router that dispatches each event to its handler.
        filter_sources: When non-empty, restricts fetching to these sources.
        skip_duplicate_siblings: Whether to add the sibling deduplication step.
        process_timeout: Timeout passed to the handler execution step.
        dependencies: Extra keyword arguments passed to ``router.dispatch``.
        metrics: Metrics sink; when given, a metrics step is added and the
            builder is configured with it.
        dlq_storage: Dead-letter storage; when given, a DLQ step is added.
        enable_otel: Whether to add the OpenTelemetry step.
        enable_circuit_breaker: Whether to add the circuit breaker step.
        circuit_breaker_failure_threshold: Failure threshold for the breaker.
        circuit_breaker_recovery_timeout: Breaker recovery timeout in seconds.
        job_name: Job name for the processor and the OpenTelemetry service name.
        additional_steps_before: Steps inserted ahead of deduplication.
        additional_steps_after: Steps inserted after handler execution.

    Returns:
        The processor produced by the builder.
    """

    async def dispatch_handler(event: InboxEvent, repo: InboxEventRepository) -> EventHandlerResult:
        return await router.dispatch(event, event.source, repo, **(dependencies or {}))

    builder = EventProcessorBuilder[InboxEvent](repo)

    if filter_sources:
        builder.with_fetch_strategy(FilteredFetchStrategy(sources=filter_sources))

    if enable_otel:
        builder.add_step(OpenTelemetryStep(service_name=job_name))

    if enable_circuit_breaker:
        builder.add_step(
            CircuitBreakerStep(
                failure_threshold=circuit_breaker_failure_threshold,
                recovery_timeout_seconds=circuit_breaker_recovery_timeout,
            )
        )

    # Compare against None: a supplied storage object that happens to be
    # falsy (e.g. an empty one defining __len__) must still get its step.
    if dlq_storage is not None:
        builder.add_step(DLQStep(dlq_storage))

    for step in additional_steps_before or ():
        builder.add_step(step)

    if skip_duplicate_siblings:
        builder.add_step(SiblingDeduplicationStep(enabled=True))

    builder.add_step(HandlerExecutionStep(dispatch_handler, timeout=process_timeout))  # type: ignore[arg-type]

    for step in additional_steps_after or ():
        builder.add_step(step)

    # Same reasoning as for dlq_storage: presence, not truthiness, decides.
    if metrics is not None:
        builder.add_step(MetricsStep(metrics))
        builder.with_metrics(metrics)

    return builder.with_job_name(job_name).build()

create_inbox_processor(repo, handler, *, skip_duplicate_siblings=True, filter_sources=None, process_timeout=DEFAULT_PROCESS_TIMEOUT_SECONDS, metrics=None, dlq_storage=None, enable_otel=False, enable_circuit_breaker=False, circuit_breaker_failure_threshold=5, circuit_breaker_recovery_timeout=60, job_name='inbox_processor', additional_steps_before=None, additional_steps_after=None)

Preset factory for common inbox processing scenarios.

Source code in omni_box/application/factories.py
def create_inbox_processor(
    repo: InboxEventRepository,
    handler: Callable[[InboxEvent, InboxEventRepository], Awaitable[EventHandlerResult | None]],
    *,
    skip_duplicate_siblings: bool = True,
    filter_sources: list[str] | None = None,
    process_timeout: float = DEFAULT_PROCESS_TIMEOUT_SECONDS,
    metrics: InboxMetrics | None = None,
    dlq_storage: DLQStorage[InboxEvent] | None = None,
    enable_otel: bool = False,
    enable_circuit_breaker: bool = False,
    circuit_breaker_failure_threshold: int = 5,
    circuit_breaker_recovery_timeout: int = 60,
    job_name: str = "inbox_processor",
    additional_steps_before: list[ProcessingStep[InboxEvent]] | None = None,
    additional_steps_after: list[ProcessingStep[InboxEvent]] | None = None,
) -> EventBatchProcessor[InboxEvent]:
    """Preset factory for common inbox processing scenarios.

    Steps are added to the builder in a fixed order: OpenTelemetry, circuit
    breaker, DLQ, ``additional_steps_before``, sibling deduplication, handler
    execution, ``additional_steps_after`` and finally metrics. Optional steps
    are only added when they are enabled or configured.

    Args:
        repo: Inbox repository handed to ``EventProcessorBuilder``.
        handler: Async callable wrapped in the ``HandlerExecutionStep``.
        skip_duplicate_siblings: Add a ``SiblingDeduplicationStep`` ahead of
            the handler step.
        filter_sources: When non-empty, fetch through a
            ``FilteredFetchStrategy`` limited to these sources.
        process_timeout: Passed as ``timeout`` to ``HandlerExecutionStep``.
        metrics: When provided, adds a ``MetricsStep`` and registers the
            object on the builder via ``with_metrics``.
        dlq_storage: When provided, adds a ``DLQStep`` backed by this storage.
        enable_otel: Add an ``OpenTelemetryStep`` using ``job_name`` as the
            service name.
        enable_circuit_breaker: Add a ``CircuitBreakerStep``.
        circuit_breaker_failure_threshold: ``failure_threshold`` of the
            circuit breaker step.
        circuit_breaker_recovery_timeout: ``recovery_timeout_seconds`` of the
            circuit breaker step.
        job_name: Job name set on the builder.
        additional_steps_before: Extra steps added after the DLQ step and
            before sibling deduplication.
        additional_steps_after: Extra steps added directly after the handler
            step.

    Returns:
        The processor produced by the configured builder.
    """
    builder = EventProcessorBuilder[InboxEvent](repo)

    # An empty source list means "no filtering", so truthiness is intended here.
    if filter_sources:
        builder.with_fetch_strategy(FilteredFetchStrategy(sources=filter_sources))

    if enable_otel:
        builder.add_step(OpenTelemetryStep(service_name=job_name))

    if enable_circuit_breaker:
        builder.add_step(
            CircuitBreakerStep(
                failure_threshold=circuit_breaker_failure_threshold,
                recovery_timeout_seconds=circuit_breaker_recovery_timeout,
            )
        )

    # Compare against None explicitly: a storage object that defines
    # __len__/__bool__ and is currently empty must still be wired in.
    if dlq_storage is not None:
        builder.add_step(DLQStep(dlq_storage))

    for step in additional_steps_before or ():
        builder.add_step(step)

    if skip_duplicate_siblings:
        builder.add_step(SiblingDeduplicationStep(enabled=True))

    builder.add_step(HandlerExecutionStep(handler, timeout=process_timeout))  # type: ignore[arg-type]

    for step in additional_steps_after or ():
        builder.add_step(step)

    # Same reasoning as for dlq_storage: only None means "not configured".
    if metrics is not None:
        builder.add_step(MetricsStep(metrics))
        builder.with_metrics(metrics)

    return builder.with_job_name(job_name).build()

create_outbox_processor(repo, publisher, *, publish_timeout=DEFAULT_PUBLISH_TIMEOUT_SECONDS, metrics=None, dlq_storage=None, enable_otel=False, enable_circuit_breaker=False, circuit_breaker_failure_threshold=5, circuit_breaker_recovery_timeout=60, job_name='outbox_processor', additional_steps_before=None, additional_steps_after=None)

Preset factory for common outbox publishing scenarios.

Source code in omni_box/application/factories.py
def create_outbox_processor(
    repo: OutboxEventRepository,
    publisher: EventPublisher,
    *,
    publish_timeout: float = DEFAULT_PUBLISH_TIMEOUT_SECONDS,
    metrics: OutboxMetrics | None = None,
    dlq_storage: DLQStorage[OutboxEvent] | None = None,
    enable_otel: bool = False,
    enable_circuit_breaker: bool = False,
    circuit_breaker_failure_threshold: int = 5,
    circuit_breaker_recovery_timeout: int = 60,
    job_name: str = "outbox_processor",
    additional_steps_before: list[ProcessingStep[OutboxEvent]] | None = None,
    additional_steps_after: list[ProcessingStep[OutboxEvent]] | None = None,
) -> EventBatchProcessor[OutboxEvent]:
    """Preset factory for common outbox publishing scenarios.

    Steps are added to the builder in a fixed order: OpenTelemetry, circuit
    breaker, DLQ, ``additional_steps_before``, handler execution (wrapping
    ``publisher.publish``), ``additional_steps_after`` and finally metrics.
    Optional steps are only added when they are enabled or configured.

    Args:
        repo: Outbox repository handed to ``EventProcessorBuilder``.
        publisher: Publisher whose ``publish`` method is wrapped in the
            ``HandlerExecutionStep``.
        publish_timeout: Passed as ``timeout`` to ``HandlerExecutionStep``.
        metrics: When provided, adds a ``MetricsStep`` and registers the
            object on the builder via ``with_metrics``.
        dlq_storage: When provided, adds a ``DLQStep`` backed by this storage.
        enable_otel: Add an ``OpenTelemetryStep`` using ``job_name`` as the
            service name.
        enable_circuit_breaker: Add a ``CircuitBreakerStep``.
        circuit_breaker_failure_threshold: ``failure_threshold`` of the
            circuit breaker step.
        circuit_breaker_recovery_timeout: ``recovery_timeout_seconds`` of the
            circuit breaker step.
        job_name: Job name set on the builder.
        additional_steps_before: Extra steps added after the DLQ step and
            before the handler step.
        additional_steps_after: Extra steps added directly after the handler
            step.

    Returns:
        The processor produced by the configured builder.
    """
    builder = EventProcessorBuilder[OutboxEvent](repo)

    if enable_otel:
        builder.add_step(OpenTelemetryStep(service_name=job_name))

    if enable_circuit_breaker:
        builder.add_step(
            CircuitBreakerStep(
                failure_threshold=circuit_breaker_failure_threshold,
                recovery_timeout_seconds=circuit_breaker_recovery_timeout,
            )
        )

    # Compare against None explicitly: a storage object that defines
    # __len__/__bool__ and is currently empty must still be wired in.
    if dlq_storage is not None:
        builder.add_step(DLQStep(dlq_storage))

    for step in additional_steps_before or ():
        builder.add_step(step)

    builder.add_step(HandlerExecutionStep(publisher.publish, timeout=publish_timeout))

    for step in additional_steps_after or ():
        builder.add_step(step)

    # Same reasoning as for dlq_storage: only None means "not configured".
    if metrics is not None:
        builder.add_step(MetricsStep(metrics))
        builder.with_metrics(metrics)

    return builder.with_job_name(job_name).build()

event_handler(event_type, topic=None, schema_version=None)

Mark method as event handler.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event_type | DispatchName | Event type to handle (str or StrEnum) | required |
| topic | DispatchName \| None | Optional topic override (str or StrEnum; class topic if omitted) | None |
| schema_version | str \| None | Optional schema version to match (e.g. "1.0.0") | None |

Returns:

| Type | Description |
| --- | --- |
| Callable[[F], F] | Decorated method with metadata attached |

Source code in omni_box/core/dispatch/decorators.py
def event_handler[F: Callable[..., Any]](
    event_type: DispatchName,
    topic: DispatchName | None = None,
    schema_version: str | None = None,
) -> Callable[[F], F]:
    """Mark method as event handler.

    Args:
        event_type: Event type to handle (``str`` or ``StrEnum``)
        topic: Optional topic override (``str`` or ``StrEnum``; class topic if omitted)
        schema_version: Optional schema version to match (e.g. "1.0.0")

    Returns:
        Decorated method with metadata attached
    """

    def decorator(method: F) -> F:
        method._is_event_handler = True  # type: ignore[attr-defined]
        method._event_type = as_dispatch_str(event_type)  # type: ignore[attr-defined]
        method._event_topic = as_dispatch_str(topic) if topic is not None else None  # type: ignore[attr-defined]
        method._schema_version = schema_version  # type: ignore[attr-defined]
        return method

    return decorator

handler_completed(status=EventHandlerStatus.COMPLETED)

Explicit successful completion.

Source code in omni_box/core/services/results.py
def handler_completed(status: EventHandlerStatus | str = EventHandlerStatus.COMPLETED) -> EventHandlerResult:
    """Build a result marking the event as successfully handled.

    Args:
        status: Status stored on the result; defaults to ``EventHandlerStatus.COMPLETED``.

    Returns:
        An ``EventHandlerResult`` with ``success=True`` and the given status.
    """
    outcome = EventHandlerResult(status=status, success=True)
    return outcome

handler_retry(message, *, count_as_attempt=True, next_retry_at=None, status=EventHandlerStatus.RETRY)

Schedule retry with an error message.

Source code in omni_box/core/services/results.py
def handler_retry(
    message: str,
    *,
    count_as_attempt: bool = True,
    next_retry_at: datetime | None = None,
    status: EventHandlerStatus | str = EventHandlerStatus.RETRY,
) -> EventHandlerResult:
    """Build a failed result that carries an error message for a retry.

    Args:
        message: Stored on the result as ``error_message``.
        count_as_attempt: Forwarded to the result unchanged.
        next_retry_at: Forwarded to the result unchanged; ``None`` by default.
        status: Status stored on the result; defaults to ``EventHandlerStatus.RETRY``.

    Returns:
        An ``EventHandlerResult`` with ``success=False`` and the given fields.
    """
    outcome = EventHandlerResult(
        status=status,
        success=False,
        error_message=message,
        next_retry_at=next_retry_at,
        count_as_attempt=count_as_attempt,
    )
    return outcome

handler_skipped(status=EventHandlerStatus.SKIPPED)

Signal that the handler chose not to process this event.

Source code in omni_box/core/services/results.py
def handler_skipped(status: EventHandlerStatus | str = EventHandlerStatus.SKIPPED) -> EventHandlerResult:
    """Build a result signalling that the handler chose not to process the event.

    Args:
        status: Status stored on the result; defaults to ``EventHandlerStatus.SKIPPED``.

    Returns:
        An ``EventHandlerResult`` with ``success=False``, ``processed=False``
        and ``count_as_attempt=False``.
    """
    outcome = EventHandlerResult(
        status=status,
        success=False,
        processed=False,
        count_as_attempt=False,
    )
    return outcome