Database Migrations and Schema¶
omni-box ships abstract SQLAlchemy bases that describe the outbox/inbox schema. The consuming service owns its own DeclarativeBase, registers the concrete tables, and generates migrations.
The DDL below mirrors
omni_box/infra/storage/postgres/orm.py. If you derive a custom schema, keep the column names and types identical —PostgresOutboxRepository/PostgresInboxRepositoryrely on them viamapped_columndefaults.
Status values¶
EventStatus is defined as a Python StrEnum with lowercase values:
All DDL, constraints, and partial indexes use the lowercase form.
Outbox table¶
CREATE TABLE outbox_events (
id UUID PRIMARY KEY,
event_type VARCHAR(100) NOT NULL,
aggregate_type VARCHAR(50) NOT NULL,
aggregate_id UUID NOT NULL,
topic VARCHAR(255) NOT NULL,
partition_key VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
headers JSONB,
trace_id VARCHAR(64),
idempotency_key VARCHAR(128),
correlation_id VARCHAR(64),
causation_id VARCHAR(64),
schema_version VARCHAR(50),
status VARCHAR(20) NOT NULL DEFAULT 'pending',
attempts_made INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 6,
last_error VARCHAR(2000),
scheduled_at TIMESTAMPTZ NOT NULL,
completed_at TIMESTAMPTZ,
locked_at TIMESTAMPTZ,
locked_by VARCHAR(255),
created_at TIMESTAMPTZ NOT NULL DEFAULT timezone('UTC', now()),
updated_at TIMESTAMPTZ DEFAULT timezone('UTC', now()),
CONSTRAINT ck_outbox_events_attempts_valid CHECK (attempts_made <= max_attempts),
CONSTRAINT ck_outbox_events_completed_status_consistency CHECK (
(status = 'completed' AND completed_at IS NOT NULL) OR
(status <> 'completed' AND completed_at IS NULL)
),
CONSTRAINT ck_outbox_events_lock_consistency CHECK (
(locked_at IS NULL AND locked_by IS NULL) OR
(locked_at IS NOT NULL AND locked_by IS NOT NULL)
)
);
-- Hot path for the publisher: PENDING, not locked, has budget.
CREATE INDEX idx_outbox_events_pending_fetch
ON outbox_events (scheduled_at)
WHERE status = 'pending' AND locked_at IS NULL AND attempts_made < max_attempts;
CREATE INDEX idx_outbox_events_locked_at
ON outbox_events (locked_at)
WHERE locked_at IS NOT NULL;
CREATE INDEX idx_outbox_events_completed_cleanup
ON outbox_events (completed_at)
WHERE status = 'completed' AND completed_at IS NOT NULL;
CREATE UNIQUE INDEX idx_outbox_events_idempotency_key
ON outbox_events (idempotency_key)
WHERE idempotency_key IS NOT NULL;
CREATE INDEX idx_outbox_events_created_at ON outbox_events (created_at);
CREATE INDEX idx_outbox_events_updated_at ON outbox_events (updated_at);
Inbox table¶
The inbox uses an extra unique index over (message_id, consumer_group) so that PostgresInboxRepository can use INSERT ... ON CONFLICT DO NOTHING for cheap deduplication.
CREATE TABLE inbox_events (
id UUID PRIMARY KEY,
event_type VARCHAR(100) NOT NULL,
message_id VARCHAR(255) NOT NULL,
consumer_group VARCHAR(255) NOT NULL,
source VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
headers JSONB,
trace_id VARCHAR(64),
idempotency_key VARCHAR(128),
correlation_id VARCHAR(64),
causation_id VARCHAR(64),
schema_version VARCHAR(50),
status VARCHAR(20) NOT NULL DEFAULT 'pending',
attempts_made INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 6,
last_error VARCHAR(2000),
scheduled_at TIMESTAMPTZ NOT NULL,
completed_at TIMESTAMPTZ,
locked_at TIMESTAMPTZ,
locked_by VARCHAR(255),
created_at TIMESTAMPTZ NOT NULL DEFAULT timezone('UTC', now()),
updated_at TIMESTAMPTZ DEFAULT timezone('UTC', now()),
CONSTRAINT ck_inbox_events_attempts_valid CHECK (attempts_made <= max_attempts),
CONSTRAINT ck_inbox_events_completed_status_consistency CHECK (
(status = 'completed' AND completed_at IS NOT NULL) OR
(status <> 'completed' AND completed_at IS NULL)
),
CONSTRAINT ck_inbox_events_lock_consistency CHECK (
(locked_at IS NULL AND locked_by IS NULL) OR
(locked_at IS NOT NULL AND locked_by IS NOT NULL)
)
);
-- Inbox deduplication key.
CREATE UNIQUE INDEX idx_inbox_deduplication
ON inbox_events (message_id, consumer_group);
CREATE INDEX idx_inbox_events_pending_fetch
ON inbox_events (scheduled_at)
WHERE status = 'pending' AND locked_at IS NULL AND attempts_made < max_attempts;
CREATE INDEX idx_inbox_events_locked_at
ON inbox_events (locked_at)
WHERE locked_at IS NOT NULL;
CREATE INDEX idx_inbox_events_completed_cleanup
ON inbox_events (completed_at)
WHERE status = 'completed' AND completed_at IS NOT NULL;
CREATE UNIQUE INDEX idx_inbox_events_idempotency_key
ON inbox_events (idempotency_key)
WHERE idempotency_key IS NOT NULL;
CREATE INDEX idx_inbox_events_created_at ON inbox_events (created_at);
CREATE INDEX idx_inbox_events_updated_at ON inbox_events (updated_at);
Partitioned variants¶
When you inherit from OutboxEventPartitionedDBBase / InboxEventPartitionedDBBase, created_at becomes part of the primary key (PostgreSQL requires unique indexes on partitioned tables to include the partition key). Every unique index — including the inbox deduplication index — must be widened accordingly.
For the partitioned inbox:
CREATE UNIQUE INDEX idx_inbox_events_p_deduplication
ON inbox_events_partitioned (message_id, consumer_group, created_at);
PostgresInboxRepository reads the column tuple from __inbox_dedup_index_columns__ on the model class, so the partitioned base sets ("message_id", "consumer_group", "created_at") automatically.
SQLAlchemy registration¶
omni_box.infra.storage.postgres.orm does not export a shared Base. Define your own DeclarativeBase and bind the abstract models to it:
from sqlalchemy.orm import DeclarativeBase
from omni_box.infra.storage.postgres import (
InboxEventDBBase,
OutboxEventDBBase,
)
class Base(DeclarativeBase):
"""Your service-owned declarative base."""
class OutboxEventDB(Base, OutboxEventDBBase):
"""Concrete outbox table; inherits __tablename__ and __table_args__."""
class InboxEventDB(Base, InboxEventDBBase):
"""Concrete inbox table; inherits __tablename__, dedup index, etc."""
You can override __tablename__, add service-specific columns, or change __inbox_dedup_index_columns__ if you partition.
Alembic example¶
An auto-generated migration for the outbox table will look like this (fragment):
def upgrade() -> None:
op.create_table(
"outbox_events",
sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
sa.Column("event_type", sa.String(100), nullable=False),
sa.Column("aggregate_type", sa.String(50), nullable=False),
sa.Column("aggregate_id", postgresql.UUID(as_uuid=True), nullable=False),
sa.Column("topic", sa.String(255), nullable=False),
sa.Column("partition_key", sa.String(255), nullable=False),
sa.Column("payload", postgresql.JSONB, nullable=False),
sa.Column("headers", postgresql.JSONB),
sa.Column("trace_id", sa.String(64)),
sa.Column("idempotency_key", sa.String(128)),
sa.Column("correlation_id", sa.String(64)),
sa.Column("causation_id", sa.String(64)),
sa.Column("schema_version", sa.String(50)),
sa.Column("status", sa.String(20), nullable=False, server_default="pending"),
sa.Column("attempts_made", sa.Integer, nullable=False, server_default="0"),
sa.Column("max_attempts", sa.Integer, nullable=False, server_default="6"),
sa.Column("last_error", sa.String(2000)),
sa.Column("scheduled_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("completed_at", sa.DateTime(timezone=True)),
sa.Column("locked_at", sa.DateTime(timezone=True)),
sa.Column("locked_by", sa.String(255)),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.text("timezone('UTC', now())"),
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.text("timezone('UTC', now())"),
),
sa.CheckConstraint("attempts_made <= max_attempts", name="ck_outbox_events_attempts_valid"),
# ...other CHECKs and indexes from get_event_constraints("outbox_events")
)
If you use SQLAlchemy autogenerate, Alembic picks up the Index / CheckConstraint declarations from __table_args__ automatically — you do not need to write them out manually.