Skip to content

API Reference

Complete API documentation auto-generated from source code.

Core Module

The main sqlalchemy_foundation_kit module exports all core functionality.

sqlalchemy_foundation_kit

Foundation layer for SQLAlchemy-based services with UoW, session management, and observability.

UnConstrainedEnum = partial(Enum, native_enum=False, create_constraint=False, validate_strings=True, values_callable=_extract_enum_values) module-attribute

GenericJSONDict = dict[str, Any] module-attribute

DB_NAMING_CONVENTION = {'ix': '%(column_0_label)s_idx', 'uq': '%(table_name)s_%(column_0_name)s_key', 'ck': '%(table_name)s_%(constraint_name)s_check', 'fk': '%(table_name)s_%(column_0_name)s_fkey', 'pk': '%(table_name)s_pkey'} module-attribute

Base

Bases: DeclarativeBase

Base class for all ORM models.

Source code in sqlalchemy_foundation_kit/base/models.py
class Base(DeclarativeBase):
    """Declarative root class that every ORM model derives from."""

    # Shared MetaData so index/constraint names are generated from
    # DB_NAMING_CONVENTION.
    metadata = MetaData(naming_convention=DB_NAMING_CONVENTION)

    # Python annotation type -> SQL column type used for ``Mapped[...]`` columns.
    type_annotation_map: ClassVar[dict[type, TypeEngine[Any]]] = {
        datetime.datetime: TIMESTAMP(timezone=True),
        uuid.UUID: postgresql.UUID(),
    }

BaseTable

Bases: Base

Base table class with `__repr__`.

Source code in sqlalchemy_foundation_kit/base/models.py
class BaseTable(Base):
    """Abstract base for mapped tables that adds a readable ``__repr__``."""

    __abstract__ = True

    def __repr__(self) -> str:
        """Render the instance as ``<tablename: column=value, ...>``.

        Every column of ``__table__`` is included, in table order.
        """
        # NOTE(review): values are read with getattr() by *column* name; this
        # assumes each mapped attribute is named like its column - confirm.
        rendered = ", ".join(
            f"{col.name}={getattr(self, col.name)}" for col in self.__table__.columns
        )
        return f"<{self.__tablename__}: {rendered}>"

DatetimeColumnsMixin

Mixin for tables that need created_at and updated_at timestamps.

Control indexing via the `__created_at_index__` and `__updated_at_index__` class variables in the model. Both default to False.

Source code in sqlalchemy_foundation_kit/base/models.py
class DatetimeColumnsMixin:
    """Mixin adding ``created_at`` and ``updated_at`` timestamp columns.

    Whether each column is indexed is decided by the ``__created_at_index__``
    and ``__updated_at_index__`` class variables, which a model may override.
    Both default to False.
    """

    __created_at_index__: ClassVar[bool] = False
    __updated_at_index__: ClassVar[bool] = False

    # NOTE(review): on PostgreSQL ``timezone('UTC', now())`` yields a timestamp
    # *without* time zone. If the column is TIMESTAMP WITH TIME ZONE (which is
    # what Base.type_annotation_map maps ``datetime`` to), the stored instant
    # depends on the session TimeZone setting - confirm sessions run in UTC.

    @declared_attr
    def created_at(self) -> Mapped[datetime.datetime]:
        """Column filled by the server with the current UTC time on insert."""
        insert_default = func.timezone("UTC", func.now())
        return mapped_column(
            server_default=insert_default,
            index=self.__created_at_index__,
        )

    @declared_attr
    def updated_at(self) -> Mapped[datetime.datetime]:
        """Column set to the current UTC time on insert and on each update."""
        # Two separate expression objects, one per column argument.
        insert_default = func.timezone("UTC", func.now())
        update_value = func.timezone("UTC", func.now())
        return mapped_column(
            server_default=insert_default,
            onupdate=update_value,
            index=self.__updated_at_index__,
        )

PydanticJSONB

Bases: TypeDecorator

SQLAlchemy TypeDecorator for Pydantic models stored as JSONB.

Validates and serializes values against the supplied Pydantic-compatible type on both write and read paths. A `model_type` is required — if you need raw dict storage without validation, use SQLAlchemy's built-in `JSONB` directly (or the `GenericJSONDict` alias).

Source code in sqlalchemy_foundation_kit/base/types.py
class PydanticJSONB(TypeDecorator):
    """JSONB-backed column type whose values go through a Pydantic adapter.

    Values are validated against ``model_type`` both when they are bound to a
    statement and when they are loaded from a result row. ``model_type`` is
    **required**; for unvalidated dict storage use SQLAlchemy's plain ``JSONB``
    (or the :data:`GenericJSONDict` alias) instead.
    """

    impl = JSONB
    cache_ok = True

    def __init__(self, model_type: type[T], *args: Any, **kwargs: Any) -> None:
        """Create the type and its ``TypeAdapter``.

        Args:
            model_type: Pydantic model class (or any type accepted by
                ``pydantic.TypeAdapter``) used to validate and serialize values.
            *args: Forwarded unchanged to ``TypeDecorator.__init__``.
            **kwargs: Forwarded unchanged to ``TypeDecorator.__init__``.
        """
        self.model_type = model_type
        self.adapter: TypeAdapter[T] = TypeAdapter(model_type)
        super().__init__(*args, **kwargs)

    def process_bind_param(self, value: Any, dialect: Any) -> Any:
        """Validate ``value`` and dump it in JSON mode; ``None`` passes through."""
        if value is not None:
            # Validating first turns plain dicts (e.g. from model_dump()) into the
            # expected type, which avoids Pydantic serialization warnings on dump.
            model = self.adapter.validate_python(value)
            return self.adapter.dump_python(model, mode="json")
        return None

    def process_result_value(self, value: Any, dialect: Any) -> Any:
        """Validate a loaded value; fall back to the raw value if validation fails.

        ``None`` passes through. On ``ValidationError`` a warning is logged and
        the unvalidated value is returned instead of raising.
        """
        if value is None:
            return None

        try:
            loaded = self.adapter.validate_python(value)
        except ValidationError:
            logger.warning(
                "Validation error while loading %s from JSONB. Using raw data. "
                "This may indicate legacy data that doesn't match current schema.",
                self.model_type,
            )
            return value
        return loaded
__init__(model_type, *args, **kwargs)

Initialize the type decorator.

Parameters:

Name Type Description Default
model_type type[T]

Pydantic model class (or any type compatible with pydantic.TypeAdapter) used to validate and serialize values.

required
Source code in sqlalchemy_foundation_kit/base/types.py
def __init__(self, model_type: type[T], *args: Any, **kwargs: Any) -> None:
    """Create the type and its ``TypeAdapter``.

    Args:
        model_type: Pydantic model class (or any type accepted by
            ``pydantic.TypeAdapter``) used to validate and serialize values.
        *args: Forwarded unchanged to the parent initializer.
        **kwargs: Forwarded unchanged to the parent initializer.
    """
    self.model_type = model_type
    adapter: TypeAdapter[T] = TypeAdapter(model_type)
    self.adapter = adapter
    super().__init__(*args, **kwargs)

AsyncSessionManager

Bases: Generic[SessionT]

Manages async database sessions with configurable connection pooling.

Supports two initialization approaches:

  1. Direct constructor — all parameters in constructor with defaults.
  2. Builder pattern — use `AsyncSessionManagerBuilder` for more readable complex configurations.
Source code in sqlalchemy_foundation_kit/session/manager.py
class AsyncSessionManager(Generic[SessionT]):
    """Manages async database sessions with configurable connection pooling.

    Supports two initialization approaches:

    1. **Direct constructor** — all parameters in constructor with defaults.
    2. **Builder pattern** — use :class:`AsyncSessionManagerBuilder` for more
       readable complex configurations.

    Instances are async context managers: leaving the ``async with`` block
    calls :meth:`aclose`.
    """

    def __init__(
        self,
        url: str,
        echo: bool = False,
        poolclass: str | type = "null",
        session_class: type[SessionT] | None = None,
        expire_on_commit: bool = False,
        connect_args: dict[str, object] | None = None,
        isolation_level: str | None = None,
        pool_settings: PoolSettingsProtocol | None = None,
        use_orjson: bool = False,
        metrics: PostgresMetricsProtocol | None = None,
        on_engine_created: Callable[[AsyncEngine], None] | None = None,
        dispose_timeout: float = DEFAULT_DISPOSE_TIMEOUT_SECONDS,
        **kwargs: object,
    ) -> None:
        """Initialize session manager with direct configuration.

        Args:
            url: Database connection URL (required).
            echo: If True, SQLAlchemy will log all SQL statements (default: False).
            poolclass: SQLAlchemy pool class or name (default: "null").
                Use "queue" for production, "null" for testing.
            session_class: Custom ``AsyncSession`` subclass (default: ``AsyncSession``).
            expire_on_commit: If True, objects expire after commit (default: False).
            connect_args: Arguments passed to the database driver (default: None).
            isolation_level: Default transaction isolation level (default: None).
            pool_settings: Pool configuration settings (default: None).
            use_orjson: If True, use orjson for JSON serialization (default: False).
            metrics: Optional metrics collector (default: None).
            on_engine_created: Optional callback invoked with ``AsyncEngine`` after creation.
                Use for OpenTelemetry instrumentation, custom event listeners, etc.
            dispose_timeout: Maximum seconds to wait for engine disposal in :meth:`aclose`
                (default: ``DEFAULT_DISPOSE_TIMEOUT_SECONDS``).
            **kwargs: Additional keyword arguments for ``create_async_engine``.
        """
        self._closed = False
        self._close_lock = asyncio.Lock()
        self._dispose_timeout = dispose_timeout
        # Strong reference to the in-flight disposal task, set by aclose().
        self._dispose_task: asyncio.Future[None] | None = None
        resolved_poolclass = resolve_pool_class(poolclass)
        engine_kwargs = build_engine_kwargs(
            echo=echo,
            poolclass=resolved_poolclass,
            isolation_level=isolation_level,
            pool_settings=pool_settings,
            connect_args=connect_args,
            extra_kwargs=kwargs,
            use_orjson=use_orjson,
        )

        self._engine: AsyncEngine = create_async_engine(url, **engine_kwargs)
        self._session_maker = cast(
            async_sessionmaker[SessionT],
            async_sessionmaker(
                self._engine,
                class_=session_class or AsyncSession,
                expire_on_commit=expire_on_commit,
            ),
        )

        # Explicit None checks for both hooks: a collector object that happens
        # to be falsy must still be attached.
        if metrics is not None:
            attach_metrics(self._engine, metrics)

        if on_engine_created is not None:
            on_engine_created(self._engine)

    async def aclose(self) -> None:
        """Close the engine and all connections.

        Safe to call more than once: calls are serialized by a lock and every
        call after the first returns immediately. A disposal that exceeds the
        configured timeout is logged as a warning instead of raised. The
        manager is marked closed in every case.
        """
        async with self._close_lock:
            if self._closed:
                return

            try:
                # Keep a strong reference to the task: the event loop only holds
                # weak references, and once the wait below is abandoned nothing
                # else would keep the shielded disposal alive.
                self._dispose_task = asyncio.ensure_future(self._engine.dispose())
                # Shield so disposal keeps running if this task is cancelled;
                # the timeout avoids an indefinite hang.
                await asyncio.wait_for(
                    asyncio.shield(self._dispose_task),
                    timeout=self._dispose_timeout,
                )
            except asyncio.TimeoutError:
                # asyncio.TimeoutError is what wait_for raises on every Python
                # version; it is only an alias of builtin TimeoutError from 3.11.
                logger.warning(
                    "Engine disposal timed out after %.1f seconds. Some connections may not have closed cleanly.",
                    self._dispose_timeout,
                )
            finally:
                # Always mark as closed to prevent retry loops
                self._closed = True

    def _ensure_not_closed(self) -> None:
        """Raise ``RuntimeError`` if the manager has already been closed."""
        if self._closed:
            raise RuntimeError("AsyncSessionManager is closed")

    async def __aenter__(self) -> AsyncSessionManager[SessionT]:
        """Enter the async context and return the manager itself."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Close the engine on exit, whether or not an exception occurred."""
        await self.aclose()

    @property
    def engine(self) -> AsyncEngine:
        """Get the underlying engine."""
        return self._engine

    @property
    def session_maker(self) -> async_sessionmaker[SessionT]:
        """Get the session maker."""
        return self._session_maker

    @asynccontextmanager
    async def get_session(self) -> AsyncIterator[SessionT]:
        """Get a new database session.

        Yields:
            A session from the session maker; its context is exited when the
            ``async with`` block ends.

        Raises:
            RuntimeError: If the manager is closed.
        """
        self._ensure_not_closed()
        async with self._session_maker() as session:
            yield session

    @asynccontextmanager
    async def get_transaction(self, isolation_level: str | None = None) -> AsyncIterator[SessionT]:
        """Get a new database session with automatic transaction management.

        Args:
            isolation_level: Optional isolation level for the transaction.

        Yields:
            Managed async session with active transaction.

        Raises:
            RuntimeError: If the manager is closed.
        """
        self._ensure_not_closed()
        options = {"isolation_level": isolation_level} if isolation_level else {}
        async with (
            self._session_maker(execution_options=options) as session,
            session.begin(),
        ):
            yield session
engine property

Get the underlying engine.

session_maker property

Get the session maker.

__aenter__() async

Support for async context manager.

Source code in sqlalchemy_foundation_kit/session/manager.py
async def __aenter__(self) -> AsyncSessionManager[SessionT]:
    """Enter the async context and hand back the manager itself."""
    return self
__aexit__(exc_type, exc_val, exc_tb) async

Close the engine on exit.

Source code in sqlalchemy_foundation_kit/session/manager.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Shut the manager down when the ``async with`` block is left."""
    # The exception triple is not inspected: aclose() runs however the block ended.
    await self.aclose()
__init__(url, echo=False, poolclass='null', session_class=None, expire_on_commit=False, connect_args=None, isolation_level=None, pool_settings=None, use_orjson=False, metrics=None, on_engine_created=None, dispose_timeout=DEFAULT_DISPOSE_TIMEOUT_SECONDS, **kwargs)

Initialize session manager with direct configuration.

Parameters:

Name Type Description Default
url str

Database connection URL (required).

required
echo bool

If True, SQLAlchemy will log all SQL statements (default: False).

False
poolclass str | type

SQLAlchemy pool class or name (default: "null"). Use "queue" for production, "null" for testing.

'null'
session_class type[SessionT] | None

Custom AsyncSession subclass (default: AsyncSession).

None
expire_on_commit bool

If True, objects expire after commit (default: False).

False
connect_args dict[str, object] | None

Arguments passed to the database driver (default: None).

None
isolation_level str | None

Default transaction isolation level (default: None).

None
pool_settings PoolSettingsProtocol | None

Pool configuration settings (default: None).

None
use_orjson bool

If True, use orjson for JSON serialization (default: False).

False
metrics PostgresMetricsProtocol | None

Optional metrics collector (default: None).

None
on_engine_created Callable[[AsyncEngine], None] | None

Optional callback invoked with AsyncEngine after creation. Use for OpenTelemetry instrumentation, custom event listeners, etc.

None
dispose_timeout float

Maximum seconds to wait for engine disposal in `aclose()` (default: 30.0). Lower this in tests or short-lived environments; raise it if you have long-running transactions that need more time to settle.

DEFAULT_DISPOSE_TIMEOUT_SECONDS
**kwargs object

Additional keyword arguments for create_async_engine.

{}
Source code in sqlalchemy_foundation_kit/session/manager.py
def __init__(
    self,
    url: str,
    echo: bool = False,
    poolclass: str | type = "null",
    session_class: type[SessionT] | None = None,
    expire_on_commit: bool = False,
    connect_args: dict[str, object] | None = None,
    isolation_level: str | None = None,
    pool_settings: PoolSettingsProtocol | None = None,
    use_orjson: bool = False,
    metrics: PostgresMetricsProtocol | None = None,
    on_engine_created: Callable[[AsyncEngine], None] | None = None,
    dispose_timeout: float = DEFAULT_DISPOSE_TIMEOUT_SECONDS,
    **kwargs: object,
) -> None:
    """Initialize session manager with direct configuration.

    Args:
        url: Database connection URL (required).
        echo: If True, SQLAlchemy will log all SQL statements (default: False).
        poolclass: SQLAlchemy pool class or name (default: "null").
            Use "queue" for production, "null" for testing.
        session_class: Custom ``AsyncSession`` subclass (default: ``AsyncSession``).
        expire_on_commit: If True, objects expire after commit (default: False).
        connect_args: Arguments passed to the database driver (default: None).
        isolation_level: Default transaction isolation level (default: None).
        pool_settings: Pool configuration settings (default: None).
        use_orjson: If True, use orjson for JSON serialization (default: False).
        metrics: Optional metrics collector (default: None).
        on_engine_created: Optional callback invoked with ``AsyncEngine`` after creation.
            Use for OpenTelemetry instrumentation, custom event listeners, etc.
        dispose_timeout: Maximum seconds to wait for engine disposal in :meth:`aclose`
            (default: ``DEFAULT_DISPOSE_TIMEOUT_SECONDS``).
        **kwargs: Additional keyword arguments for ``create_async_engine``.
    """
    self._closed = False
    self._close_lock = asyncio.Lock()
    self._dispose_timeout = dispose_timeout
    resolved_poolclass = resolve_pool_class(poolclass)
    engine_kwargs = build_engine_kwargs(
        echo=echo,
        poolclass=resolved_poolclass,
        isolation_level=isolation_level,
        pool_settings=pool_settings,
        connect_args=connect_args,
        extra_kwargs=kwargs,
        use_orjson=use_orjson,
    )

    self._engine: AsyncEngine = create_async_engine(url, **engine_kwargs)
    self._session_maker = cast(
        async_sessionmaker[SessionT],
        async_sessionmaker(
            self._engine,
            class_=session_class or AsyncSession,
            expire_on_commit=expire_on_commit,
        ),
    )

    # Explicit None checks for both hooks: a collector object that happens to
    # be falsy must still be attached.
    if metrics is not None:
        attach_metrics(self._engine, metrics)

    if on_engine_created is not None:
        on_engine_created(self._engine)
aclose() async

Close the engine and all connections.

Source code in sqlalchemy_foundation_kit/session/manager.py
async def aclose(self) -> None:
    """Close the engine and all connections."""
    async with self._close_lock:
        if self._closed:
            return

        try:
            # Use shield so disposal runs even if the task is cancelled; timeout avoids indefinite hang.
            await asyncio.wait_for(
                asyncio.shield(self._engine.dispose()),
                timeout=self._dispose_timeout,
            )
        except TimeoutError:
            logger.warning(
                "Engine disposal timed out after %.1f seconds. Some connections may not have closed cleanly.",
                self._dispose_timeout,
            )
        finally:
            # Always mark as closed to prevent retry loops
            self._closed = True
get_session() async

Get a new database session.

Source code in sqlalchemy_foundation_kit/session/manager.py
@asynccontextmanager
async def get_session(self) -> AsyncIterator[SessionT]:
    """Get a new database session."""
    self._ensure_not_closed()
    async with self._session_maker() as session:
        yield session
get_transaction(isolation_level=None) async

Get a new database session with automatic transaction management.

Parameters:

Name Type Description Default
isolation_level str | None

Optional isolation level for the transaction.

None

Yields:

Type Description
AsyncIterator[SessionT]

Managed async session with active transaction.

Source code in sqlalchemy_foundation_kit/session/manager.py
@asynccontextmanager
async def get_transaction(self, isolation_level: str | None = None) -> AsyncIterator[SessionT]:
    """Get a new database session with automatic transaction management.

    Args:
        isolation_level: Optional isolation level for the transaction.

    Yields:
        Managed async session with active transaction.
    """
    self._ensure_not_closed()
    options = {"isolation_level": isolation_level} if isolation_level else {}
    async with (
        self._session_maker(execution_options=options) as session,
        session.begin(),
    ):
        yield session

AsyncSessionManagerBuilder

Bases: Generic[SessionT]

Builder for AsyncSessionManager construction.

Provides a fluent API for configuring AsyncSessionManager with many optional parameters. This pattern improves code readability and follows KISS principle by avoiding constructors with 10+ parameters.

Examples:

Basic usage: >>> manager = ( ... AsyncSessionManagerBuilder("postgresql+asyncpg://...") ... .with_pool("queue") ... .with_echo(True) ... .build() ... )

Advanced configuration: >>> manager = ( ... AsyncSessionManagerBuilder[CustomSession]("postgresql+asyncpg://...") ... .with_session_class(CustomSession) ... .with_pool("queue", pool_settings=settings.pool) ... .with_metrics(metrics) ... .with_callbacks(on_engine_created=instrument_engine) ... .with_json_serialization(orjson=True) ... .with_isolation_level("READ COMMITTED") ... .build() ... )

Reusable configuration: >>> builder = ( ... AsyncSessionManagerBuilder("postgresql+asyncpg://...") ... .with_pool("queue") ... .with_metrics(metrics) ... ) >>> manager1 = builder.with_echo(True).build() >>> manager2 = builder.with_echo(False).build()

Source code in sqlalchemy_foundation_kit/session/builder.py
class AsyncSessionManagerBuilder(Generic[SessionT]):
    """Builder for AsyncSessionManager construction.

    Provides a fluent API for configuring AsyncSessionManager with many optional
    parameters. This pattern improves code readability and follows the KISS
    principle by avoiding constructors with 10+ parameters.

    Every ``with_*`` method mutates this builder in place and returns it, so a
    builder can be reused for several :meth:`build` calls.

    Examples:
        Basic usage:
            >>> manager = (
            ...     AsyncSessionManagerBuilder("postgresql+asyncpg://...")
            ...     .with_pool("queue")
            ...     .with_echo(True)
            ...     .build()
            ... )

        Advanced configuration:
            >>> manager = (
            ...     AsyncSessionManagerBuilder[CustomSession]("postgresql+asyncpg://...")
            ...     .with_session_class(CustomSession)
            ...     .with_pool("queue", pool_settings=settings.pool)
            ...     .with_metrics(metrics)
            ...     .with_callbacks(on_engine_created=instrument_engine)
            ...     .with_json_serialization(orjson=True)
            ...     .with_isolation_level("READ COMMITTED")
            ...     .build()
            ... )

        Reusable configuration:
            >>> builder = (
            ...     AsyncSessionManagerBuilder("postgresql+asyncpg://...")
            ...     .with_pool("queue")
            ...     .with_metrics(metrics)
            ... )
            >>> manager1 = builder.with_echo(True).build()
            >>> manager2 = builder.with_echo(False).build()
    """

    def __init__(self, url: str) -> None:
        """Initialize builder with database URL (required).

        Args:
            url: Database connection URL (required parameter).
        """
        self._url = url
        self._echo: bool = False
        self._poolclass: str | type = "null"
        self._session_class: type[SessionT] | None = None
        self._expire_on_commit: bool = False
        self._connect_args: dict[str, object] | None = None
        self._isolation_level: str | None = None
        self._pool_settings: PoolSettingsProtocol | None = None
        self._use_orjson: bool = False
        self._metrics: PostgresMetricsProtocol | None = None
        self._on_engine_created: Callable[[AsyncEngine], None] | None = None
        # None means "not configured": build() then leaves the manager's own default.
        self._dispose_timeout: float | None = None
        self._extra_kwargs: dict[str, object] = {}

    def with_echo(self, echo: bool = True) -> AsyncSessionManagerBuilder[SessionT]:
        """Enable SQL statement logging.

        Args:
            echo: If True, SQLAlchemy logs all SQL statements.

        Returns:
            Self for method chaining.
        """
        self._echo = echo
        return self

    def with_pool(
        self,
        poolclass: str | type,
        pool_settings: PoolSettingsProtocol | None = None,
    ) -> AsyncSessionManagerBuilder[SessionT]:
        """Configure connection pool.

        Both values are overwritten on every call, so omitting
        ``pool_settings`` clears any previously configured settings.

        Args:
            poolclass: Pool class name or type (e.g., "queue", "null").
            pool_settings: Optional pool configuration settings.

        Returns:
            Self for method chaining.
        """
        self._poolclass = poolclass
        self._pool_settings = pool_settings
        return self

    def with_session_class(
        self,
        session_class: type[SessionT],
    ) -> AsyncSessionManagerBuilder[SessionT]:
        """Use custom session class.

        Args:
            session_class: Custom AsyncSession subclass.

        Returns:
            Self for method chaining.
        """
        self._session_class = session_class
        return self

    def with_expire_on_commit(
        self,
        expire: bool = True,
    ) -> AsyncSessionManagerBuilder[SessionT]:
        """Configure object expiration behavior after commit.

        Args:
            expire: If True, all objects expire after commit.

        Returns:
            Self for method chaining.
        """
        self._expire_on_commit = expire
        return self

    def with_connect_args(
        self,
        **connect_args: object,
    ) -> AsyncSessionManagerBuilder[SessionT]:
        """Add database driver connection arguments.

        Arguments accumulate across calls; a repeated key overwrites the
        earlier value.

        Args:
            **connect_args: Arguments passed to the database driver.

        Returns:
            Self for method chaining.

        Examples:
            >>> builder.with_connect_args(
            ...     server_settings={"application_name": "myapp"},
            ...     command_timeout=60,
            ... )
        """
        if self._connect_args is None:
            self._connect_args = {}
        self._connect_args.update(connect_args)
        return self

    def with_isolation_level(
        self,
        isolation_level: str,
    ) -> AsyncSessionManagerBuilder[SessionT]:
        """Set default transaction isolation level.

        Args:
            isolation_level: Default isolation level (e.g., "READ COMMITTED").

        Returns:
            Self for method chaining.
        """
        self._isolation_level = isolation_level
        return self

    def with_metrics(
        self,
        metrics: PostgresMetricsProtocol,
    ) -> AsyncSessionManagerBuilder[SessionT]:
        """Enable connection pool metrics collection.

        Args:
            metrics: Metrics collector implementing PostgresMetricsProtocol.

        Returns:
            Self for method chaining.
        """
        self._metrics = metrics
        return self

    def with_callbacks(
        self,
        on_engine_created: Callable[[AsyncEngine], None],
    ) -> AsyncSessionManagerBuilder[SessionT]:
        """Register engine creation callback.

        Useful for attaching instrumentation (OpenTelemetry), custom event
        listeners, or debug hooks right after engine creation.

        Args:
            on_engine_created: Callback invoked with AsyncEngine after creation.

        Returns:
            Self for method chaining.

        Examples:
            >>> from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
            >>> def instrument(engine):
            ...     SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)
            >>> builder.with_callbacks(on_engine_created=instrument)
        """
        self._on_engine_created = on_engine_created
        return self

    def with_json_serialization(
        self,
        orjson: bool = True,
    ) -> AsyncSessionManagerBuilder[SessionT]:
        """Configure JSON serialization backend.

        This method only records the choice and never raises; see
        :meth:`build` for errors related to a missing orjson installation.

        Args:
            orjson: If True, use orjson for faster JSON serialization.

        Returns:
            Self for method chaining.
        """
        self._use_orjson = orjson
        return self

    def with_extra_kwargs(self, **kwargs: object) -> AsyncSessionManagerBuilder[SessionT]:
        """Add additional keyword arguments for create_async_engine.

        Arguments accumulate across calls; a repeated key overwrites the
        earlier value.

        Args:
            **kwargs: Additional engine configuration.

        Returns:
            Self for method chaining.
        """
        self._extra_kwargs.update(kwargs)
        return self

    def with_dispose_timeout(self, timeout: float) -> AsyncSessionManagerBuilder[SessionT]:
        """Configure how long :meth:`AsyncSessionManager.aclose` waits for engine disposal.

        Args:
            timeout: Maximum seconds to wait for the engine to dispose.

        Returns:
            Self for method chaining.
        """
        self._dispose_timeout = timeout
        return self

    def build(self) -> AsyncSessionManager[SessionT]:
        """Build AsyncSessionManager instance with configured parameters.

        Returns:
            Configured AsyncSessionManager instance.

        Raises:
            ImportError: If use_orjson=True but orjson is not installed.
            TypeError: If a key given to :meth:`with_extra_kwargs` duplicates one
                of the named manager parameters (e.g. ``echo``).

        Examples:
            >>> manager = builder.build()
            >>> async with manager.get_session() as session:
            ...     await session.execute(...)
        """
        # Hand each manager its own copy: with_connect_args() mutates the
        # builder's dict in place, and a reused builder must not change the
        # arguments of a manager that was already built.
        connect_args = None if self._connect_args is None else dict(self._connect_args)
        kwargs: dict[str, object] = {
            "url": self._url,
            "echo": self._echo,
            "poolclass": self._poolclass,
            "session_class": self._session_class,
            "expire_on_commit": self._expire_on_commit,
            "connect_args": connect_args,
            "isolation_level": self._isolation_level,
            "pool_settings": self._pool_settings,
            "use_orjson": self._use_orjson,
            "metrics": self._metrics,
            "on_engine_created": self._on_engine_created,
        }
        # Only forwarded when configured, so the manager's default applies otherwise.
        if self._dispose_timeout is not None:
            kwargs["dispose_timeout"] = self._dispose_timeout
        return AsyncSessionManager(**kwargs, **self._extra_kwargs)  # type: ignore[arg-type]
__init__(url)

Initialize builder with database URL (required).

Parameters:

Name Type Description Default
url str

Database connection URL (required parameter).

required
Source code in sqlalchemy_foundation_kit/session/builder.py
def __init__(self, url: str) -> None:
    """Create a builder for the given database URL with all options at their defaults.

    Args:
        url: Database connection URL (required parameter).
    """
    self._url = url

    # Engine and pool options.
    self._echo: bool = False
    self._poolclass: str | type = "null"
    self._pool_settings: PoolSettingsProtocol | None = None
    self._isolation_level: str | None = None
    self._connect_args: dict[str, object] | None = None
    self._use_orjson: bool = False
    self._extra_kwargs: dict[str, object] = {}

    # Session options.
    self._session_class: type[SessionT] | None = None
    self._expire_on_commit: bool = False

    # Hooks and shutdown behaviour; a None timeout means "not configured".
    self._metrics: PostgresMetricsProtocol | None = None
    self._on_engine_created: Callable[[AsyncEngine], None] | None = None
    self._dispose_timeout: float | None = None
build()

Build AsyncSessionManager instance with configured parameters.

Returns:

Type Description
AsyncSessionManager[SessionT]

Configured AsyncSessionManager instance.

Raises:

Type Description
ImportError

If use_orjson=True but orjson is not installed.

Examples:

>>> manager = builder.build()
>>> async with manager.get_session() as session:
...     await session.execute(...)
Source code in sqlalchemy_foundation_kit/session/builder.py
def build(self) -> AsyncSessionManager[SessionT]:
    """Assemble the collected settings and construct an AsyncSessionManager.

    ``dispose_timeout`` is forwarded only when it was explicitly configured,
    so the manager's own default applies otherwise. Anything registered via
    ``with_extra_kwargs`` is passed along as additional keyword arguments.

    Returns:
        Configured AsyncSessionManager instance.

    Raises:
        ImportError: If use_orjson=True but orjson is not installed
            (presumably raised by AsyncSessionManager; not visible here).

    Examples:
        >>> manager = builder.build()
        >>> async with manager.get_session() as session:
        ...     await session.execute(...)
    """
    manager_options: dict[str, object] = dict(
        url=self._url,
        echo=self._echo,
        poolclass=self._poolclass,
        session_class=self._session_class,
        expire_on_commit=self._expire_on_commit,
        connect_args=self._connect_args,
        isolation_level=self._isolation_level,
        pool_settings=self._pool_settings,
        use_orjson=self._use_orjson,
        metrics=self._metrics,
        on_engine_created=self._on_engine_created,
    )
    timeout = self._dispose_timeout
    if timeout is not None:
        manager_options["dispose_timeout"] = timeout
    # A key present in both mappings makes the call raise TypeError
    # (duplicate keyword argument).
    return AsyncSessionManager(**manager_options, **self._extra_kwargs)  # type: ignore[arg-type]
with_callbacks(on_engine_created)

Register engine creation callback.

Useful for attaching instrumentation (OpenTelemetry), custom event listeners, or debug hooks right after engine creation.

Parameters:

Name Type Description Default
on_engine_created Callable[[AsyncEngine], None]

Callback invoked with AsyncEngine after creation.

required

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Examples:

>>> from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
>>> def instrument(engine):
...     SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)
>>> builder.with_callbacks(on_engine_created=instrument)
Source code in sqlalchemy_foundation_kit/session/builder.py
def with_callbacks(
    self,
    on_engine_created: Callable[[AsyncEngine], None],
) -> AsyncSessionManagerBuilder[SessionT]:
    """Store a hook to be handed to the session manager as ``on_engine_created``.

    Typical uses are attaching instrumentation (OpenTelemetry), custom event
    listeners, or debug hooks right after engine creation.

    Args:
        on_engine_created: Callback invoked with AsyncEngine after creation.

    Returns:
        Self for method chaining.

    Examples:
        >>> from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
        >>> def instrument(engine):
        ...     SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)
        >>> builder.with_callbacks(on_engine_created=instrument)
    """
    # A later call replaces the previously registered callback.
    self._on_engine_created = on_engine_created
    return self
with_connect_args(**connect_args)

Add database driver connection arguments.

Parameters:

Name Type Description Default
**connect_args object

Arguments passed to the database driver.

{}

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Examples:

>>> builder.with_connect_args(
...     server_settings={"application_name": "myapp"},
...     command_timeout=60,
... )
Source code in sqlalchemy_foundation_kit/session/builder.py
def with_connect_args(
    self,
    **connect_args: object,
) -> AsyncSessionManagerBuilder[SessionT]:
    """Merge driver-level connection arguments into the builder.

    Repeated calls accumulate; a key given again overwrites its earlier value.

    Args:
        **connect_args: Arguments passed to the database driver.

    Returns:
        Self for method chaining.

    Examples:
        >>> builder.with_connect_args(
        ...     server_settings={"application_name": "myapp"},
        ...     command_timeout=60,
        ... )
    """
    if self._connect_args is None:
        # First call: start from a fresh dict owned by the builder.
        self._connect_args = dict(connect_args)
    else:
        self._connect_args.update(connect_args)
    return self
with_dispose_timeout(timeout)

Configure how long AsyncSessionManager.aclose() waits for engine disposal.

Parameters:

Name Type Description Default
timeout float

Maximum seconds to wait for the engine to dispose.

required

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Source code in sqlalchemy_foundation_kit/session/builder.py
def with_dispose_timeout(self, timeout: float) -> AsyncSessionManagerBuilder[SessionT]:
    """Set the ``dispose_timeout`` forwarded to the session manager.

    Per the original documentation this bounds how long
    ``AsyncSessionManager.aclose`` waits for engine disposal.

    Args:
        timeout: Maximum seconds to wait for the engine to dispose.

    Returns:
        Self for method chaining.
    """
    # Stored as given; no range validation happens here.
    self._dispose_timeout = timeout
    return self
with_echo(echo=True)

Enable SQL statement logging.

Parameters:

Name Type Description Default
echo bool

If True, SQLAlchemy logs all SQL statements.

True

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Source code in sqlalchemy_foundation_kit/session/builder.py
def with_echo(self, echo: bool = True) -> AsyncSessionManagerBuilder[SessionT]:
    """Toggle SQL statement logging.

    Args:
        echo: If True, SQLAlchemy logs all SQL statements.

    Returns:
        Self for method chaining.
    """
    # Calling with no argument switches logging on.
    self._echo = echo
    return self
with_expire_on_commit(expire=True)

Configure object expiration behavior after commit.

Parameters:

Name Type Description Default
expire bool

If True, all objects expire after commit.

True

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Source code in sqlalchemy_foundation_kit/session/builder.py
def with_expire_on_commit(
    self,
    expire: bool = True,
) -> AsyncSessionManagerBuilder[SessionT]:
    """Choose whether loaded objects are expired once a commit completes.

    Args:
        expire: If True, all objects expire after commit.

    Returns:
        Self for method chaining.
    """
    # Calling with no argument turns expiration on.
    self._expire_on_commit = expire
    return self
with_extra_kwargs(**kwargs)

Add additional keyword arguments for create_async_engine.

Parameters:

Name Type Description Default
**kwargs object

Additional engine configuration.

{}

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Source code in sqlalchemy_foundation_kit/session/builder.py
def with_extra_kwargs(self, **kwargs: object) -> AsyncSessionManagerBuilder[SessionT]:
    """Accumulate additional keyword arguments for create_async_engine.

    Repeated calls merge into the same mapping; a key given again overwrites
    its earlier value.

    Args:
        **kwargs: Additional engine configuration.

    Returns:
        Self for method chaining.
    """
    # In-place dict union keeps the same mapping object on the builder.
    self._extra_kwargs |= kwargs
    return self
with_isolation_level(isolation_level)

Set default transaction isolation level.

Parameters:

Name Type Description Default
isolation_level str

Default isolation level (e.g., "READ COMMITTED").

required

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Source code in sqlalchemy_foundation_kit/session/builder.py
def with_isolation_level(
    self,
    isolation_level: str,
) -> AsyncSessionManagerBuilder[SessionT]:
    """Choose the default transaction isolation level.

    Args:
        isolation_level: Default isolation level (e.g., "READ COMMITTED").

    Returns:
        Self for method chaining.
    """
    # Stored verbatim; the value is not validated by the builder.
    self._isolation_level = isolation_level
    return self
with_json_serialization(orjson=True)

Configure JSON serialization backend.

Parameters:

Name Type Description Default
orjson bool

If True, use orjson for faster JSON serialization.

True

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Raises:

Type Description
ImportError

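If orjson=True but orjson is not installed. Note: this method only stores the flag and does not import orjson itself; build() documents the same ImportError, so the error is expected to surface when build() is called.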

Source code in sqlalchemy_foundation_kit/session/builder.py
def with_json_serialization(
    self,
    orjson: bool = True,
) -> AsyncSessionManagerBuilder[SessionT]:
    """Configure JSON serialization backend.

    This method only records the choice on the builder; it does not import
    orjson and therefore does not itself raise ``ImportError``.

    Args:
        orjson: If True, use orjson for faster JSON serialization.

    Returns:
        Self for method chaining.

    Note:
        ``build()`` documents an ``ImportError`` when ``use_orjson=True`` and
        orjson is not installed, so a missing dependency is expected to
        surface there rather than here.
    """
    self._use_orjson = orjson
    return self
with_metrics(metrics)

Enable connection pool metrics collection.

Parameters:

Name Type Description Default
metrics PostgresMetricsProtocol

Metrics collector implementing PostgresMetricsProtocol.

required

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Source code in sqlalchemy_foundation_kit/session/builder.py
def with_metrics(
    self,
    metrics: PostgresMetricsProtocol,
) -> AsyncSessionManagerBuilder[SessionT]:
    """Attach a collector for connection pool metrics.

    Args:
        metrics: Metrics collector implementing PostgresMetricsProtocol.

    Returns:
        Self for method chaining.
    """
    # A later call replaces the previously attached collector.
    self._metrics = metrics
    return self
with_pool(poolclass, pool_settings=None)

Configure connection pool.

Parameters:

Name Type Description Default
poolclass str | type

Pool class name or type (e.g., "queue", "null").

required
pool_settings PoolSettingsProtocol | None

Optional pool configuration settings.

None

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Source code in sqlalchemy_foundation_kit/session/builder.py
def with_pool(
    self,
    poolclass: str | type,
    pool_settings: PoolSettingsProtocol | None = None,
) -> AsyncSessionManagerBuilder[SessionT]:
    """Select the connection pool implementation and its settings.

    Both values are replaced on every call, so omitting ``pool_settings``
    clears any settings stored by an earlier call.

    Args:
        poolclass: Pool class name or type (e.g., "queue", "null").
        pool_settings: Optional pool configuration settings.

    Returns:
        Self for method chaining.
    """
    self._poolclass, self._pool_settings = poolclass, pool_settings
    return self
with_session_class(session_class)

Use custom session class.

Parameters:

Name Type Description Default
session_class type[SessionT]

Custom AsyncSession subclass.

required

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Source code in sqlalchemy_foundation_kit/session/builder.py
def with_session_class(
    self,
    session_class: type[SessionT],
) -> AsyncSessionManagerBuilder[SessionT]:
    """Select the session class handed to the session manager.

    Args:
        session_class: Custom AsyncSession subclass.

    Returns:
        Self for method chaining.
    """
    # Stored as given; the class is not checked by the builder.
    self._session_class = session_class
    return self

AsyncCConnection

Bases: Connection

Custom async connection class for pgbouncer magic.

This subclass overrides only the private method _get_unique_id so that prepared statement identifiers are unique per connection. That is required when using pgbouncer in transaction mode, where the same server connection may be reused for different logical connections.

See: https://github.com/sqlalchemy/sqlalchemy/issues/6467

Source code in sqlalchemy_foundation_kit/session/connection.py
class AsyncCConnection(asyncpg.Connection):
    """asyncpg connection whose generated identifiers embed a random UUID.

    Only the private hook ``_get_unique_id`` is overridden, so that prepared
    statement identifiers are unique per connection. That is required when
    using pgbouncer in transaction mode, where the same server connection may
    be reused for different logical connections.

    See: https://github.com/sqlalchemy/sqlalchemy/issues/6467
    """

    def _get_unique_id(self, prefix: str) -> str:
        """Build an identifier for a prepared statement.

        Args:
            prefix: Prefix for the unique ID.

        Returns:
            A string of the form ``__asyncpg_<prefix>_<uuid4>__``.
        """
        token = uuid.uuid4()
        return f"__asyncpg_{prefix}_{token}__"

AsyncUnitOfWork

Bases: Protocol, Generic[T_co]

Provides transactional context for repository operations.

Provides three modes of operation
  • transaction(): For write operations with automatic commit/rollback.
  • managed_session(): For write operations with manual commit/rollback control.
  • query(): For read-only operations without transaction management.
Source code in sqlalchemy_foundation_kit/uow/protocols.py
class AsyncUnitOfWork(Protocol, Generic[T_co]):
    """Provides transactional context for repository operations.

    This is a structural interface: every method body consists of a docstring
    only, and concrete implementations supply the behaviour.

    Provides three modes of operation:
        - ``transaction()``: For write operations with automatic commit/rollback.
        - ``managed_session()``: For write operations with **manual** commit/rollback control.
        - ``query()``: For read-only operations without transaction management.
    """

    def transaction(
        self,
        isolation_level: str | None = None,
        flush_before_commit: bool | None = None,
    ) -> AbstractAsyncContextManager[T_co]:
        """Create a new transaction context with automatic commit/rollback.

        Args:
            isolation_level: Optional transaction isolation level.
            flush_before_commit: If True, flush session before commit to surface
                constraint violations within transaction. If ``None``, the implementation
                applies its own default (typically configured at construction time).

        Returns:
            An async context manager yielding the transaction object.
        """

    def managed_session(
        self,
        isolation_level: str | None = None,
    ) -> AbstractAsyncContextManager[tuple[T_co, Any]]:
        """Create a session with manual transaction control.

        Unlike :meth:`transaction`, this does **NOT** auto-commit on success. The caller
        must explicitly call ``session.commit()`` or ``session.rollback()``. Useful for
        complex transactional logic where the commit decision depends on multiple
        conditions or external factors.

        The second element of the yielded tuple is the underlying session object
        (typed as ``Any`` in the protocol to avoid leaking SQLAlchemy types — concrete
        implementations like ``AsyncSQLAlchemyUnitOfWork`` yield ``AsyncSession``).

        On exception inside the context, the session is automatically rolled back.

        Args:
            isolation_level: Optional transaction isolation level.

        Returns:
            An async context manager yielding a ``(transaction object, session)`` tuple.
        """

    def query(self, isolation_level: str | None = None) -> AbstractAsyncContextManager[T_co]:
        """Create a read-only query context without transaction management.

        Args:
            isolation_level: Optional transaction isolation level.

        Returns:
            An async context manager yielding the transaction object.
        """
managed_session(isolation_level=None)

Create a session with manual transaction control.

Unlike transaction(), this does NOT auto-commit on success. The caller must explicitly call session.commit() or session.rollback(). Useful for complex transactional logic where the commit decision depends on multiple conditions or external factors.

The second element of the yielded tuple is the underlying session object (typed as Any in the protocol to avoid leaking SQLAlchemy types — concrete implementations like AsyncSQLAlchemyUnitOfWork yield AsyncSession).

On exception inside the context, the session is automatically rolled back.

Parameters:

Name Type Description Default
isolation_level str | None

Optional transaction isolation level.

None
Source code in sqlalchemy_foundation_kit/uow/protocols.py
def managed_session(
    self,
    isolation_level: str | None = None,
) -> AbstractAsyncContextManager[tuple[T_co, Any]]:
    """Create a session with manual transaction control.

    Unlike :meth:`transaction`, this does **NOT** auto-commit on success. The caller
    must explicitly call ``session.commit()`` or ``session.rollback()``. Useful for
    complex transactional logic where the commit decision depends on multiple
    conditions or external factors.

    The second element of the yielded tuple is the underlying session object
    (typed as ``Any`` in the protocol to avoid leaking SQLAlchemy types — concrete
    implementations like ``AsyncSQLAlchemyUnitOfWork`` yield ``AsyncSession``).

    On exception inside the context, the session is automatically rolled back.

    Args:
        isolation_level: Optional transaction isolation level.

    Returns:
        An async context manager yielding a ``(transaction object, session)`` tuple.
    """
query(isolation_level=None)

Create a read-only query context without transaction management.

Source code in sqlalchemy_foundation_kit/uow/protocols.py
def query(self, isolation_level: str | None = None) -> AbstractAsyncContextManager[T_co]:
    """Create a read-only query context without transaction management.

    Args:
        isolation_level: Optional transaction isolation level.

    Returns:
        An async context manager yielding the transaction object.
    """
transaction(isolation_level=None, flush_before_commit=None)

Create a new transaction context with automatic commit/rollback.

Parameters:

Name Type Description Default
isolation_level str | None

Optional transaction isolation level.

None
flush_before_commit bool | None

If True, flush session before commit to surface constraint violations within transaction. If None, the implementation applies its own default (typically configured at construction time).

None
Source code in sqlalchemy_foundation_kit/uow/protocols.py
def transaction(
    self,
    isolation_level: str | None = None,
    flush_before_commit: bool | None = None,
) -> AbstractAsyncContextManager[T_co]:
    """Create a new transaction context with automatic commit/rollback.

    Args:
        isolation_level: Optional transaction isolation level.
        flush_before_commit: If True, flush session before commit to surface
            constraint violations within transaction. If ``None``, the implementation
            applies its own default (typically configured at construction time).

    Returns:
        An async context manager yielding the transaction object.
    """

AsyncSQLAlchemyUnitOfWork

Bases: AsyncUnitOfWork[T], Generic[T]

Base async SQLAlchemy Unit of Work.

Provides transactional context for repository operations using SQLAlchemy AsyncSession.

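Methods:

Name Description
open_session

Extension point that opens a session with the optional isolation level applied.

transaction

For write operations with automatic commit/rollback.

managed_session

For write operations with manual commit/rollback control.

query

For read-only operations without transaction management.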

Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
class AsyncSQLAlchemyUnitOfWork(AsyncUnitOfWork[T], Generic[T]):
    """Base async SQLAlchemy Unit of Work.

    Provides transactional context for repository operations using SQLAlchemy AsyncSession.

    Methods:
        open_session(): Extension point that opens the session used by every mode below.
        transaction(): For write operations with automatic commit/rollback.
        managed_session(): For write operations with manual commit/rollback control.
        query(): For read-only operations without transaction management.
    """

    def __init__(
        self,
        session_maker: async_sessionmaker[AsyncSession],
        transaction_factory: Callable[[AsyncSession], T],
        *,
        flush_before_commit: bool = True,
    ) -> None:
        """Initialize the unit of work.

        Args:
            session_maker: Async session factory.
            transaction_factory: Callable producing the transaction object exposed to callers.
            flush_before_commit: Default ``flush_before_commit`` policy applied when
                :meth:`transaction` is called without an explicit override.
                Set to ``False`` here once if your service prefers SQLAlchemy's default
                "flush on commit" semantics instead of an early flush.
        """
        self._session_maker = session_maker
        self._transaction_factory = transaction_factory
        self._flush_before_commit = flush_before_commit

    @asynccontextmanager
    async def open_session(
        self,
        isolation_level: IsolationLevel | str | None = None,
    ) -> AsyncIterator[AsyncSession]:
        """Open a session with optional isolation level applied.

        This is the extension point for subclasses that need custom session setup
        (e.g., RLS context, session-level GUCs, custom statement timeouts).
        Override to wrap or augment session creation while preserving isolation handling.

        Used internally by :meth:`transaction`, :meth:`managed_session` and :meth:`query`.

        Args:
            isolation_level: Optional transaction isolation level.

        Yields:
            Configured AsyncSession instance.

        Raises:
            ValueError: If isolation_level is not supported.

        Examples:
            Subclass that sets a session-level GUC for every transaction:

                class TenantUnitOfWork(AsyncSQLAlchemyUnitOfWork):
                    def __init__(self, session_maker, tx_factory, tenant_id):
                        super().__init__(session_maker, tx_factory)
                        self._tenant_id = tenant_id

                    @asynccontextmanager
                    async def open_session(self, isolation_level=None):
                        async with super().open_session(isolation_level) as session:
                            # PostgreSQL's SET statement does not accept bind
                            # parameters, so use set_config() instead
                            # (tenant_id must be a string).
                            await session.execute(
                                text("SELECT set_config('app.tenant_id', :tid, false)"),
                                {"tid": self._tenant_id},
                            )
                            yield session
        """
        async with self._session_maker() as session:
            # Apply isolation level if specified (DRY: using utility function)
            await apply_isolation_level(session, isolation_level)
            yield session

    @asynccontextmanager
    async def transaction(
        self,
        isolation_level: IsolationLevel | str | None = None,
        flush_before_commit: bool | None = None,
    ) -> AsyncIterator[T]:
        """Create a new transaction context with automatic commit/rollback.

        The Unit of Work automatically commits the transaction on successful exit
        and rolls back on exception. This ensures atomic operations.

        Args:
            isolation_level: Optional transaction isolation level.
                Can be an IsolationLevel enum member or a string value.
                Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED".
            flush_before_commit: If True, flush the session before commit to surface
                constraint violations while still inside the transaction. If ``None`` (default),
                falls back to the value passed to the constructor (``True`` unless overridden).

        Yields:
            The object produced by ``transaction_factory`` for the opened session.

        Raises:
            ValueError: If isolation_level is not supported.
            SQLAlchemyError: Re-raised after a warning is logged when the
                pre-commit flush fails.

        Examples:
            Write operation with automatic commit:
                async with uow.transaction() as tx:
                    await tx.users.create(...)
                    # Auto-commit on exit, rollback on exception
        """
        # A per-call value wins; otherwise use the constructor default.
        if flush_before_commit is None:
            flush_before_commit = self._flush_before_commit

        async with self.open_session(isolation_level) as session, session.begin():
            uow = self._transaction_factory(session)
            yield uow
            # Reached only when the caller's block finished without raising.
            if flush_before_commit:
                # Flush changes before commit to catch constraint violations early
                # while still inside the transaction context.
                try:
                    await session.flush()
                except SQLAlchemyError as e:
                    logger.warning("Database flush failed", extra={"error": str(e)})
                    raise

    @asynccontextmanager
    async def managed_session(
        self,
        isolation_level: IsolationLevel | str | None = None,
    ) -> AsyncIterator[tuple[T, AsyncSession]]:
        """Create a session with manual transaction control.

        Unlike transaction(), this does NOT auto-commit. The caller must
        explicitly call session.commit() or session.rollback(). This is useful
        for complex transactional logic where commit decision depends on multiple
        conditions or external factors.

        A transaction is started automatically, but you have full control over
        when to commit or rollback. If you exit without calling either, SQLAlchemy
        will automatically rollback on session close.

        If an exception or task cancellation is raised inside the context, the
        session is rolled back and the error is re-raised.

        Args:
            isolation_level: Optional transaction isolation level.
                Can be an IsolationLevel enum member or a string value.
                Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED".

        Yields:
            Tuple of (transaction object, session) for manual control.

        Raises:
            ValueError: If isolation_level is not supported.

        Examples:
            Manual transaction control:
                async with uow.managed_session() as (tx, session):
                    await tx.users.create(...)

                    # Manually decide when to commit
                    if should_commit:
                        await session.commit()
                    else:
                        await session.rollback()

            Conditional commit based on external service:
                async with uow.managed_session() as (tx, session):
                    user = await tx.users.create(...)

                    # Call external service
                    result = await external_api.validate(user)

                    if result.success:
                        await session.commit()
                    else:
                        await session.rollback()

            Multiple operations with intermediate decision:
                async with uow.managed_session() as (tx, session):
                    user = await tx.users.create(...)

                    # First checkpoint
                    await session.flush()

                    # More operations
                    await tx.profiles.create(user_id=user.id)

                    # Final decision
                    await session.commit()

        Note:
            Prefer :meth:`transaction` for the vast majority of use cases — it commits
            automatically and enforces the UoW pattern. This method is an advanced escape
            hatch for scenarios where the commit/rollback decision depends on conditions
            that can only be evaluated after data is written (e.g., external service
            validation). ``session.commit()`` calls belong exclusively at the use-case
            boundary via this method, never inside repository implementations.

        Warning:
            You MUST explicitly call session.commit() or session.rollback().
            Exiting the context without calling either will result in automatic
            rollback when the session closes.
        """
        async with self.open_session(isolation_level) as session:
            # Start transaction WITHOUT context manager - no auto-commit
            await session.begin()
            try:
                uow = self._transaction_factory(session)
                yield uow, session
            except (Exception, asyncio.CancelledError):
                # Auto-rollback on exception OR cancellation
                await session.rollback()
                raise
            # User must call session.commit() or session.rollback() explicitly

    @asynccontextmanager
    async def query(
        self,
        isolation_level: IsolationLevel | str | None = None,
    ) -> AsyncIterator[T]:
        """Create a read-only query context without transaction management.

        This method is designed for read-only operations and does not start a transaction
        or perform any commit/rollback. It's semantically clearer than managed_session()
        for read operations.

        Args:
            isolation_level: Optional transaction isolation level.
                Can be an IsolationLevel enum member or a string value.
                Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED".

        Yields:
            The object produced by ``transaction_factory`` for the opened session.

        Raises:
            ValueError: If isolation_level is not supported.

        Examples:
            Read-only query:
                async with uow.query() as qx:
                    users = await qx.users.list_all()
                    user = await qx.users.get_by_id(user_id)
                # No commit/rollback - just closes session

        Note:
            While this method is intended for read-only operations, SQLAlchemy does not enforce
            this at the session level. It's up to the caller to ensure only read operations are performed.
        """
        async with self.open_session(isolation_level) as session:
            # No transaction begin/commit - just yield the session
            uow = self._transaction_factory(session)
            yield uow
__init__(session_maker, transaction_factory, *, flush_before_commit=True)

Initialize the unit of work.

Parameters:

Name Type Description Default
session_maker async_sessionmaker[AsyncSession]

Async session factory.

required
transaction_factory Callable[[AsyncSession], T]

Callable producing the transaction object exposed to callers.

required
flush_before_commit bool

Default flush_before_commit policy applied when transaction() is called without an explicit override. Set to False here once if your service prefers SQLAlchemy's default "flush on commit" semantics instead of an early flush.

True
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
def __init__(
    self,
    session_maker: async_sessionmaker[AsyncSession],
    transaction_factory: Callable[[AsyncSession], T],
    *,
    flush_before_commit: bool = True,
) -> None:
    """Store the collaborators and the default flush policy.

    Args:
        session_maker: Async session factory.
        transaction_factory: Callable producing the transaction object exposed to callers.
        flush_before_commit: Default ``flush_before_commit`` policy applied when
            ``transaction()`` is called without an explicit override.
            Set to ``False`` here once if your service prefers SQLAlchemy's default
            "flush on commit" semantics instead of an early flush.
    """
    # Collaborators used to open sessions and wrap them for callers.
    self._session_maker, self._transaction_factory = session_maker, transaction_factory
    # Default policy for the pre-commit flush.
    self._flush_before_commit = flush_before_commit
managed_session(isolation_level=None) async

Create a session with manual transaction control.

Unlike transaction(), this does NOT auto-commit. The caller must explicitly call session.commit() or session.rollback(). This is useful for complex transactional logic where commit decision depends on multiple conditions or external factors.

A transaction is started automatically, but you have full control over when to commit or rollback. If you exit without calling either, SQLAlchemy will automatically rollback on session close.

Parameters:

Name Type Description Default
isolation_level IsolationLevel | str | None

Optional transaction isolation level. Can be an IsolationLevel enum member or a string value. Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED".

None

Yields:

Type Description
AsyncIterator[tuple[T, AsyncSession]]

Tuple of (transaction object, session) for manual control.

Raises:

Type Description
ValueError

If isolation_level is not supported.

Examples:

Manual transaction control:

    async with uow.managed_session() as (tx, session):
        await tx.users.create(...)

        # Manually decide when to commit
        if should_commit:
            await session.commit()
        else:
            await session.rollback()

Conditional commit based on external service:

    async with uow.managed_session() as (tx, session):
        user = await tx.users.create(...)

        # Call external service
        result = await external_api.validate(user)

        if result.success:
            await session.commit()
        else:
            await session.rollback()

Multiple operations with intermediate decision:

    async with uow.managed_session() as (tx, session):
        user = await tx.users.create(...)

        # First checkpoint
        await session.flush()

        # More operations
        await tx.profiles.create(user_id=user.id)

        # Final decision
        await session.commit()
Note

Prefer transaction() for the vast majority of use cases — it commits automatically and enforces the UoW pattern. This method is an advanced escape hatch for scenarios where the commit/rollback decision depends on conditions that can only be evaluated after data is written (e.g., external service validation). session.commit() calls belong exclusively at the use-case boundary via this method, never inside repository implementations.

Warning

You MUST explicitly call session.commit() or session.rollback(). Exiting the context without calling either will result in automatic rollback when the session closes.

Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
@asynccontextmanager
async def managed_session(
    self,
    isolation_level: IsolationLevel | str | None = None,
) -> AsyncIterator[tuple[T, AsyncSession]]:
    """Create a session with manual transaction control.

    Unlike transaction(), this does NOT auto-commit. The caller must
    explicitly call session.commit() or session.rollback(). This is useful
    for complex transactional logic where commit decision depends on multiple
    conditions or external factors.

    A transaction is started automatically, but you have full control over
    when to commit or rollback. If you exit without calling either, SQLAlchemy
    will automatically rollback on session close.

    If the body of the ``async with`` block raises (any ``Exception`` or
    ``asyncio.CancelledError``), the session is rolled back and the original
    exception is re-raised unchanged.

    Args:
        isolation_level: Optional transaction isolation level.
            Can be an IsolationLevel enum member or a string value.
            Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED".

    Yields:
        Tuple of (transaction object, session) for manual control. The
        transaction object is built by the transaction factory from the
        same session that is yielded alongside it.

    Raises:
        ValueError: If isolation_level is not supported.

    Examples:
        Manual transaction control:
            async with uow.managed_session() as (tx, session):
                await tx.users.create(...)

                # Manually decide when to commit
                if should_commit:
                    await session.commit()
                else:
                    await session.rollback()

        Conditional commit based on external service:
            async with uow.managed_session() as (tx, session):
                user = await tx.users.create(...)

                # Call external service
                result = await external_api.validate(user)

                if result.success:
                    await session.commit()
                else:
                    await session.rollback()

        Multiple operations with intermediate decision:
            async with uow.managed_session() as (tx, session):
                user = await tx.users.create(...)

                # First checkpoint
                await session.flush()

                # More operations
                await tx.profiles.create(user_id=user.id)

                # Final decision
                await session.commit()

    Note:
        Prefer :meth:`transaction` for the vast majority of use cases — it commits
        automatically and enforces the UoW pattern. This method is an advanced escape
        hatch for scenarios where the commit/rollback decision depends on conditions
        that can only be evaluated after data is written (e.g., external service
        validation). ``session.commit()`` calls belong exclusively at the use-case
        boundary via this method, never inside repository implementations.

    Warning:
        You MUST explicitly call session.commit() or session.rollback().
        Exiting the context without calling either will result in automatic
        rollback when the session closes.
    """
    # Session creation and isolation handling are delegated to open_session(),
    # so subclass overrides of open_session() apply here as well.
    async with self.open_session(isolation_level) as session:
        # begin() is awaited directly rather than used as a context manager:
        # nothing here commits on exit, so the decision stays with the caller.
        await session.begin()
        try:
            uow = self._transaction_factory(session)
            yield uow, session
        except (Exception, asyncio.CancelledError):
            # Errors raised by the caller's block surface at the yield above.
            # CancelledError is listed explicitly so that task cancellation
            # also triggers the rollback before propagating.
            await session.rollback()
            raise
open_session(isolation_level=None) async

Open a session with optional isolation level applied.

This is the extension point for subclasses that need custom session setup (e.g., RLS context, session-level GUCs, custom statement timeouts). Override to wrap or augment session creation while preserving isolation handling.

Used internally by :meth:transaction, :meth:managed_session and :meth:query.

Parameters:

Name Type Description Default
isolation_level IsolationLevel | str | None

Optional transaction isolation level.

None

Yields:

Type Description
AsyncIterator[AsyncSession]

Configured AsyncSession instance.

Raises:

Type Description
ValueError

If isolation_level is not supported.

Examples:

Subclass that sets a session-level GUC for every transaction:

class TenantUnitOfWork(AsyncSQLAlchemyUnitOfWork):
    """Unit of Work that sets ``app.tenant_id`` on every session it opens."""

    def __init__(self, session_maker, tx_factory, tenant_id):
        super().__init__(session_maker, tx_factory)
        self._tenant_id = tenant_id

    @asynccontextmanager
    async def open_session(self, isolation_level=None):
        """Open a session via the base class, then set the tenant GUC on it."""
        async with super().open_session(isolation_level) as session:
            # PostgreSQL's SET statement does not accept bind parameters, so
            # the value is passed through set_config(), which does. The third
            # argument (is_local=false) keeps the setting session-level.
            await session.execute(
                text("SELECT set_config('app.tenant_id', :tid, false)"),
                {"tid": str(self._tenant_id)},
            )
            yield session
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
@asynccontextmanager
async def open_session(
    self,
    isolation_level: IsolationLevel | str | None = None,
) -> AsyncIterator[AsyncSession]:
    """Open a session with optional isolation level applied.

    This is the extension point for subclasses that need custom session setup
    (e.g., RLS context, session-level GUCs, custom statement timeouts).
    Override to wrap or augment session creation while preserving isolation handling.

    Used internally by :meth:`transaction`, :meth:`managed_session` and :meth:`query`.

    Args:
        isolation_level: Optional transaction isolation level.

    Yields:
        Configured AsyncSession instance.

    Raises:
        ValueError: If isolation_level is not supported.

    Examples:
        Subclass that sets a session-level GUC for every transaction.
        ``set_config()`` is used because PostgreSQL's ``SET`` statement does
        not accept bind parameters:

            class TenantUnitOfWork(AsyncSQLAlchemyUnitOfWork):
                def __init__(self, session_maker, tx_factory, tenant_id):
                    super().__init__(session_maker, tx_factory)
                    self._tenant_id = tenant_id

                @asynccontextmanager
                async def open_session(self, isolation_level=None):
                    async with super().open_session(isolation_level) as session:
                        await session.execute(
                            text("SELECT set_config('app.tenant_id', :tid, false)"),
                            {"tid": str(self._tenant_id)},
                        )
                        yield session
    """
    # The session maker's context manager owns the session lifetime; the
    # session is closed when the caller's block exits.
    async with self._session_maker() as session:
        # Isolation handling is delegated to the shared helper; it is called
        # unconditionally, including when isolation_level is None.
        await apply_isolation_level(session, isolation_level)
        yield session
query(isolation_level=None) async

Create a read-only query context without transaction management.

This method is designed for read-only operations and does not start a transaction or perform any commit/rollback. It's semantically clearer than managed_session() for read operations.

Parameters:

Name Type Description Default
isolation_level IsolationLevel | str | None

Optional transaction isolation level. Can be an IsolationLevel enum member or a string value. Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED".

None

Raises:

Type Description
ValueError

If isolation_level is not supported.

Examples:

Read-only query:

    async with uow.query() as qx:
        users = await qx.users.list_all()
        user = await qx.users.get_by_id(user_id)
    # No commit/rollback - just closes session

Note

While this method is intended for read-only operations, SQLAlchemy does not enforce this at the session level. It's up to the caller to ensure only read operations are performed.

Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
@asynccontextmanager
async def query(
    self,
    isolation_level: IsolationLevel | str | None = None,
) -> AsyncIterator[T]:
    """Create a read-only query context without transaction management.

    This method is designed for read-only operations and does not start a transaction
    or perform any commit/rollback. It's semantically clearer than managed_session()
    for read operations.

    Args:
        isolation_level: Optional transaction isolation level.
            Can be an IsolationLevel enum member or a string value.
            Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED".

    Yields:
        The transaction object built by the transaction factory for the
        opened session. Unlike managed_session(), the session itself is not
        yielded separately.

    Raises:
        ValueError: If isolation_level is not supported.

    Examples:
        Read-only query:
            async with uow.query() as qx:
                users = await qx.users.list_all()
                user = await qx.users.get_by_id(user_id)
            # No commit/rollback - just closes session

    Note:
        While this method is intended for read-only operations, SQLAlchemy does not enforce
        this at the session level. It's up to the caller to ensure only read operations are performed.
    """
    # Goes through open_session() so subclass session setup and isolation
    # handling apply to read paths too.
    async with self.open_session(isolation_level) as session:
        # No explicit begin/commit/rollback here: the session is only wrapped
        # in a transaction object and handed to the caller.
        uow = self._transaction_factory(session)
        yield uow
transaction(isolation_level=None, flush_before_commit=None) async

Create a new transaction context with automatic commit/rollback.

The Unit of Work automatically commits the transaction on successful exit and rolls back on exception. This ensures atomic operations.

Parameters:

Name Type Description Default
isolation_level IsolationLevel | str | None

Optional transaction isolation level. Can be an IsolationLevel enum member or a string value. Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED".

None
flush_before_commit bool | None

If True, flush the session before commit to surface constraint violations while still inside the transaction. If None (default), falls back to the value passed to the constructor (True unless overridden).

None

Raises:

Type Description
ValueError

If isolation_level is not supported.

Examples:

Write operation with automatic commit:

    async with uow.transaction() as tx:
        await tx.users.create(...)
        # Auto-commit on exit, rollback on exception

Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
@asynccontextmanager
async def transaction(
    self,
    isolation_level: IsolationLevel | str | None = None,
    flush_before_commit: bool | None = None,
) -> AsyncIterator[T]:
    """Create a new transaction context with automatic commit/rollback.

    The Unit of Work automatically commits the transaction on successful exit
    and rolls back on exception. This ensures atomic operations.

    Args:
        isolation_level: Optional transaction isolation level.
            Can be an IsolationLevel enum member or a string value.
            Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED".
        flush_before_commit: If True, flush the session before commit to surface
            constraint violations while still inside the transaction. If ``None`` (default),
            falls back to the value passed to the constructor (``True`` unless overridden).

    Yields:
        The transaction object built by the transaction factory for the
        opened session.

    Raises:
        ValueError: If isolation_level is not supported.
        SQLAlchemyError: If the pre-commit flush fails. The error is logged
            at warning level and re-raised unchanged.

    Examples:
        Write operation with automatic commit:
            async with uow.transaction() as tx:
                await tx.users.create(...)
                # Auto-commit on exit, rollback on exception
    """
    # Per-call argument wins; None means "use the instance-level setting".
    if flush_before_commit is None:
        flush_before_commit = self._flush_before_commit

    # session.begin() is used as a context manager here, which is what
    # provides the commit-on-success / rollback-on-exception behavior.
    async with self.open_session(isolation_level) as session, session.begin():
        uow = self._transaction_factory(session)
        yield uow
        # Everything below runs only after the caller's block finished
        # without raising, and still inside the session.begin() context.
        if flush_before_commit:
            # Flush changes before commit to catch constraint violations early
            # while still inside the transaction context.
            try:
                await session.flush()
            except SQLAlchemyError as e:
                # Log for observability, then let the error propagate so the
                # surrounding session.begin() context sees the failure.
                logger.warning("Database flush failed", extra={"error": str(e)})
                raise

AsyncUowTransaction

Bases: Protocol

Transaction-scoped repositories container.

Intended to be extended by concrete transaction types that expose repository attributes for a specific bounded context.

Source code in sqlalchemy_foundation_kit/uow/protocols.py
class AsyncUowTransaction(Protocol):
    """Transaction-scoped repositories container.

    This protocol declares no members of its own. It is intended to be
    extended by concrete transaction types that expose repository attributes
    for a specific bounded context.
    """

AsyncSQLAlchemyUowTransaction

Bases: AsyncUowTransaction

Base async SQLAlchemy transaction-scoped repositories.

This class provides access to the underlying SQLAlchemy session and is intended to be subclassed by services to expose specific repositories.

Example

class IdentityTransaction(AsyncSQLAlchemyUowTransaction): @property def users(self) -> UserRepository: return PostgresUserRepository(self.session)

Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
class AsyncSQLAlchemyUowTransaction(AsyncUowTransaction):
    """Base async SQLAlchemy transaction-scoped repositories.

    Holds the AsyncSession it was constructed with and exposes it through the
    read-only ``session`` property. It is intended to be subclassed by
    services to expose specific repositories built on that session.

    Example:
        class IdentityTransaction(AsyncSQLAlchemyUowTransaction):
            @property
            def users(self) -> UserRepository:
                return PostgresUserRepository(self.session)
    """

    def __init__(self, session: AsyncSession) -> None:
        """Store the session this transaction wraps.

        Args:
            session: SQLAlchemy async session backing this transaction object.
        """
        self._session = session

    @property
    def session(self) -> AsyncSession:
        """Get the underlying SQLAlchemy async session."""
        return self._session
session property

Get the underlying SQLAlchemy async session.

IsolationLevel

Bases: StrEnum

PostgreSQL transaction isolation levels.

Values match PostgreSQL's expected form (with spaces) for use with SQLAlchemy execution_options(isolation_level=...).

Source code in sqlalchemy_foundation_kit/uow/enums.py
class IsolationLevel(StrEnum):
    """PostgreSQL transaction isolation levels.

    Values match PostgreSQL's expected form (with spaces) for use with
    SQLAlchemy execution_options(isolation_level=...).

    Member *names* use underscores while member *values* use spaces, so
    ``IsolationLevel.READ_COMMITTED`` has the string value ``"READ COMMITTED"``.
    """

    READ_UNCOMMITTED = "READ UNCOMMITTED"
    READ_COMMITTED = "READ COMMITTED"
    REPEATABLE_READ = "REPEATABLE READ"
    SERIALIZABLE = "SERIALIZABLE"

PostgresAdvisoryLockMixin

Mixin providing PostgreSQL advisory lock support for UoW transactions.

Requires the class to have a session property returning AsyncSession.

Example

class IdentityTransaction(AsyncSQLAlchemyUowTransaction, PostgresAdvisoryLockMixin):
    @property
    def users(self) -> UserRepository:
        return PostgresUserRepository(self.session)

# Now has access to try_advisory_lock method
async with uow.transaction() as tx:
    if await tx.try_advisory_lock(12345):
        # Protected operation
        ...

Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
class PostgresAdvisoryLockMixin:
    """Mixin providing PostgreSQL advisory lock support for UoW transactions.

    Requires the class to have a `session` property returning AsyncSession.
    The mixin only annotates ``session``; it does not define it, so the
    attribute must come from another base class or from the subclass itself.

    Example:
        class IdentityTransaction(AsyncSQLAlchemyUowTransaction, PostgresAdvisoryLockMixin):
            @property
            def users(self) -> UserRepository:
                return PostgresUserRepository(self.session)

        # Now has access to try_advisory_lock method
        async with uow.transaction() as tx:
            if await tx.try_advisory_lock(12345):
                # Protected operation
                ...
    """

    session: AsyncSession  # Annotation only (no value assigned), for protocol compliance

    async def try_advisory_lock(self, key: int) -> bool:
        """Acquire a Postgres transaction-scoped advisory lock.

        Delegates to :func:`try_advisory_xact_lock` for actual locking logic,
        passing this object's ``session`` and ``key`` through unchanged.

        Args:
            key: Integer lock key.

        Returns:
            True if lock was acquired, False if already held by another session.
        """
        return await try_advisory_xact_lock(self.session, key)
try_advisory_lock(key) async

Acquire a Postgres transaction-scoped advisory lock.

Delegates to :func:try_advisory_xact_lock for actual locking logic.

Parameters:

Name Type Description Default
key int

Integer lock key.

required

Returns:

Type Description
bool

True if lock was acquired, False if already held by another session.

Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
async def try_advisory_lock(self, key: int) -> bool:
    """Acquire a Postgres transaction-scoped advisory lock.

    Delegates to :func:`try_advisory_xact_lock` for actual locking logic,
    passing this object's ``session`` and ``key`` through unchanged.

    Args:
        key: Integer lock key.

    Returns:
        True if lock was acquired, False if already held by another session.
    """
    return await try_advisory_xact_lock(self.session, key)

SupportsAdvisoryLock

Bases: Protocol

Capability protocol for transactions supporting advisory locks.

Use this when your transaction needs PostgreSQL advisory lock support. Not all transactions require this capability (e.g., in-memory, read-only).

Source code in sqlalchemy_foundation_kit/uow/protocols.py
class SupportsAdvisoryLock(Protocol):
    """Capability protocol for transactions supporting advisory locks.

    Use this when your transaction needs PostgreSQL advisory lock support.
    Not all transactions require this capability (e.g., in-memory, read-only).
    """

    async def try_advisory_lock(self, key: int) -> bool:
        """Try to acquire a transaction-scoped advisory lock identified by ``key``.

        Args:
            key: Integer identifying the lock.

        Returns:
            ``True`` if the lock was acquired (and is held for the rest of the
            transaction), ``False`` if another transaction already holds it.
        """
        ...
try_advisory_lock(key) async

Try to acquire a transaction-scoped advisory lock identified by key.

Returns True if the lock was acquired (and is held for the rest of the transaction), False if another transaction already holds it.

Source code in sqlalchemy_foundation_kit/uow/protocols.py
async def try_advisory_lock(self, key: int) -> bool:
    """Try to acquire a transaction-scoped advisory lock identified by ``key``.

    Args:
        key: Integer identifying the lock.

    Returns:
        ``True`` if the lock was acquired (and is held for the rest of the
        transaction), ``False`` if another transaction already holds it.
    """
    ...

RetryConfig dataclass

Configuration for retry behavior with exponential backoff.

Attributes:

Name Type Description
max_retries int

Maximum number of attempts before giving up.

retry_delay float

Base delay in seconds; actual delay is retry_delay * 2 ** attempt, capped at max_backoff_delay.

max_backoff_delay float

Maximum delay between retries in seconds. Prevents exponential backoff from growing indefinitely.

Source code in sqlalchemy_foundation_kit/session/retry.py
@dataclass(frozen=True)
class RetryConfig:
    """Immutable settings for retrying with exponential backoff.

    Attributes:
        max_retries: How many attempts are made before giving up.
        retry_delay: Base delay, in seconds. The delay actually used is
            ``retry_delay * 2 ** attempt``, never more than ``max_backoff_delay``.
        max_backoff_delay: Upper bound, in seconds, on the delay between
            retries, so the exponential growth cannot continue indefinitely.
    """

    # Attempt budget.
    max_retries: int = 3
    # Backoff timing, both expressed in seconds.
    retry_delay: float = 1.0
    max_backoff_delay: float = 60.0

PostgresSettingsProtocol

Bases: Protocol

Protocol for PostgreSQL configuration.

Organized protocol with grouped settings for connection, pool, and query configuration.

Attributes:

Name Type Description
connection ConnectionSettingsProtocol

Connection parameters (host, port, user, database).

pool PoolSettingsProtocol

Connection pool settings.

query QuerySettingsProtocol

Query execution and transaction settings.

application_name str

Application identifier for connections.

db_schema str | None

Optional PostgreSQL schema name.

use_orjson_serialization bool

Enable orjson for JSON operations.

jit str | None

JIT compilation setting (PgBouncer compatibility).

Examples:

Implementing the protocol: >>> class MyConfig: ... connection: ConnectionSettingsProtocol ... pool: PoolSettingsProtocol ... query: QuerySettingsProtocol ... application_name: str = "my-app" ... db_schema: str | None = None ... use_orjson_serialization: bool = True ... jit: str | None = "off" ... ... def to_dsn(self) -> str: ... return f"postgresql://{self.connection.user}@{self.connection.host}..."

Using the protocol:

    >>> def create_engine(config: PostgresSettingsProtocol):
    ...     dsn = config.to_dsn()
    ...     pool_size = config.pool.size
    ...     echo = config.query.echo

Source code in sqlalchemy_foundation_kit/config/postgres.py
class PostgresSettingsProtocol(Protocol):
    """Protocol for PostgreSQL configuration.

    Organized protocol with grouped settings for connection, pool, and query configuration.

    Attributes:
        connection: Connection parameters (host, port, user, database).
        pool: Connection pool settings.
        query: Query execution and transaction settings.
        application_name: Application identifier for connections.
        db_schema: Optional PostgreSQL schema name.
        use_orjson_serialization: Enable orjson for JSON operations.
        jit: JIT compilation setting (PgBouncer compatibility).

    Examples:
        Implementing the protocol:
            >>> class MyConfig:
            ...     connection: ConnectionSettingsProtocol
            ...     pool: PoolSettingsProtocol
            ...     query: QuerySettingsProtocol
            ...     application_name: str = "my-app"
            ...     db_schema: str | None = None
            ...     use_orjson_serialization: bool = True
            ...     jit: str | None = "off"
            ...
            ...     def to_dsn(self) -> str:
            ...         return f"postgresql://{self.connection.user}@{self.connection.host}..."

        Using the protocol (the pool attribute is ``size``, as declared on
        ``PoolSettingsProtocol``):
            >>> def create_engine(config: PostgresSettingsProtocol):
            ...     dsn = config.to_dsn()
            ...     pool_size = config.pool.size
            ...     echo = config.query.echo
    """

    connection: ConnectionSettingsProtocol
    pool: PoolSettingsProtocol
    query: QuerySettingsProtocol
    application_name: str
    db_schema: str | None
    use_orjson_serialization: bool
    jit: str | None

    def to_dsn(self) -> str:
        """Convert config to DSN string.

        Returns:
            PostgreSQL connection string in format:
            postgresql+asyncpg://user:password@host:port/database

        Examples:
            >>> config.to_dsn()
            'postgresql+asyncpg://user:***@localhost:5432/mydb'
        """
        # NOTE(review): the format line shows the raw password while the
        # example output shows it masked as ``***`` - confirm which form
        # implementations are expected to return.
        ...
to_dsn()

Convert config to DSN string.

Returns PostgreSQL connection string in format: postgresql+asyncpg://user:password@host:port/database

Examples:

>>> config.to_dsn()
'postgresql+asyncpg://user:***@localhost:5432/mydb'
Source code in sqlalchemy_foundation_kit/config/postgres.py
def to_dsn(self) -> str:
    """Convert config to DSN string.

    Returns:
        PostgreSQL connection string in format:
        postgresql+asyncpg://user:password@host:port/database

    Examples:
        >>> config.to_dsn()
        'postgresql+asyncpg://user:***@localhost:5432/mydb'
    """
    # NOTE(review): the format line shows the raw password while the example
    # output shows it masked as ``***`` - confirm which form implementations
    # are expected to return.
    ...

ConnectionSettingsProtocol

Bases: Protocol

Protocol for PostgreSQL connection settings.

Defines connection parameters required for establishing database connections.

Attributes:

Name Type Description
host str

PostgreSQL server hostname or IP address.

port int

PostgreSQL server port number.

user str

Database username for authentication.

password PasswordLike | str

Database password — either a plain str or a SecretStr-like object implementing get_secret_value().

database str

Target database name.

Source code in sqlalchemy_foundation_kit/config/postgres.py
class ConnectionSettingsProtocol(Protocol):
    """Protocol for PostgreSQL connection settings.

    Defines connection parameters required for establishing database connections.
    The five fields correspond to the components of the DSN format documented on
    ``PostgresSettingsProtocol.to_dsn`` (``user:password@host:port/database``).

    Attributes:
        host: PostgreSQL server hostname or IP address.
        port: PostgreSQL server port number.
        user: Database username for authentication.
        password: Database password — either a plain ``str`` or a SecretStr-like object
            implementing ``get_secret_value()``.
        database: Target database name.
    """

    host: str
    port: int
    user: str
    password: PasswordLike | str
    database: str

PoolSettingsProtocol

Bases: Protocol

Protocol for PostgreSQL connection pool settings.

Defines pool configuration for SQLAlchemy engine connection management.

Attributes:

Name Type Description
kind str | type

Connection pool implementation (queue, static, etc.).

size int | None

Number of connections to maintain in pool.

max_overflow int | None

Additional connections allowed when pool exhausted.

pre_ping bool

Test connection health before checkout.

recycle int | None

Recycle connections after N seconds.

timeout float | None

Timeout for acquiring connection from pool.

Examples:

>>> pool: PoolSettingsProtocol = ...
>>> if pool.size is not None and pool.size > 100:
...     logger.warning("Large pool size detected")
Source code in sqlalchemy_foundation_kit/config/postgres.py
class PoolSettingsProtocol(Protocol):
    """Protocol for PostgreSQL connection pool settings.

    Defines pool configuration for SQLAlchemy engine connection management.

    Attributes:
        kind: Connection pool implementation (queue, static, etc.).
        size: Number of connections to maintain in pool.
        max_overflow: Additional connections allowed when pool exhausted.
        pre_ping: Test connection health before checkout.
        recycle: Recycle connections after N seconds.
        timeout: Timeout for acquiring connection from pool.

    Examples:
        ``size`` is optional, so guard against ``None`` before comparing:

        >>> pool: PoolSettingsProtocol = ...
        >>> if pool.size is not None and pool.size > 100:
        ...     logger.warning("Large pool size detected")
    """

    kind: str | type
    # The numeric settings below, unlike pre_ping, may be None.
    size: int | None
    max_overflow: int | None
    pre_ping: bool
    recycle: int | None
    timeout: float | None

QuerySettingsProtocol

Bases: Protocol

Protocol for PostgreSQL query execution settings.

Defines query behavior, caching, and transaction isolation configuration.

Attributes:

Name Type Description
echo bool

Enable SQL statement logging.

statement_cache_size int | None

Prepared statement cache size.

prepared_statement_cache_size int | None

Server-side prepared statement cache size.

isolation_level str | None

Transaction isolation level.

Examples:

>>> query: QuerySettingsProtocol = ...
>>> if query.echo:
...     logger.info("SQL echo enabled")
Source code in sqlalchemy_foundation_kit/config/postgres.py
class QuerySettingsProtocol(Protocol):
    """Protocol for PostgreSQL query execution settings.

    Defines query behavior, caching, and transaction isolation configuration.

    Attributes:
        echo: Enable SQL statement logging.
        statement_cache_size: Prepared statement cache size.
        prepared_statement_cache_size: Server-side prepared statement cache size.
        isolation_level: Transaction isolation level.

    Examples:
        >>> query: QuerySettingsProtocol = ...
        >>> if query.echo:
        ...     logger.info("SQL echo enabled")
    """

    echo: bool
    # NOTE(review): the two cache sizes presumably map to the driver-level and
    # dialect-level prepared statement caches - confirm against the engine factory.
    statement_cache_size: int | None
    prepared_statement_cache_size: int | None
    # NOTE(review): typed as a plain str here; presumably holds an
    # IsolationLevel value such as "READ COMMITTED" - confirm.
    isolation_level: str | None

PostgresMetricsProtocol

Bases: PoolStatsRecorder, CheckoutRecorder, ErrorRecorder, Protocol

Composite protocol covering all PostgreSQL metrics capabilities.

Aggregates the narrow capability protocols for convenience. Implementations that only need a subset can implement the individual protocols directly.

Examples:

>>> class MyMetrics:
...     def record_pool_stats(self, pool_size: int, pool_checked_out: int, pool_overflow: int) -> None: ...
...     def record_checkout(self, duration: float) -> None: ...
...     def record_error(self, error_type: str, is_timeout: bool = False) -> None: ...
>>> metrics: PostgresMetricsProtocol = MyMetrics()
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
class PostgresMetricsProtocol(PoolStatsRecorder, CheckoutRecorder, ErrorRecorder, Protocol):
    """Composite protocol covering all PostgreSQL metrics capabilities.

    Aggregates the narrow capability protocols for convenience. Implementations
    that only need a subset can implement the individual protocols directly.
    The class body declares nothing of its own; every method comes from the
    base protocols listed in the class statement.

    Examples:
        >>> class MyMetrics:
        ...     def record_pool_stats(self, pool_size: int, pool_checked_out: int, pool_overflow: int) -> None: ...
        ...     def record_checkout(self, duration: float) -> None: ...
        ...     def record_error(self, error_type: str, is_timeout: bool = False) -> None: ...
        >>> metrics: PostgresMetricsProtocol = MyMetrics()
    """
record_checkout(duration)

Record a database connection checkout from the pool.

Parameters:

Name Type Description Default
duration float

Time taken to acquire the connection from the pool, in seconds.

required
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
def record_checkout(self, duration: float) -> None:
    """Record a database connection checkout from the pool.

    Protocol stub: the body is ``...``; implementations supply the behavior.

    Args:
        duration: Time taken to acquire the connection from the pool, in seconds.
    """
    ...
record_error(error_type, is_timeout=False)

Record a database connection or execution error.

Parameters:

Name Type Description Default
error_type str

The type of error that occurred (e.g., "OperationalError").

required
is_timeout bool

True if this error was specifically a connection checkout timeout.

False
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
def record_error(self, error_type: str, is_timeout: bool = False) -> None:
    """Record a database connection or execution error.

    Protocol stub: the body is ``...``; implementations supply the behavior.

    Args:
        error_type: The type of error that occurred (e.g., "OperationalError").
        is_timeout: True if this error was specifically a connection checkout timeout.
            Defaults to False.
    """
    ...
record_pool_stats(pool_size, pool_checked_out, pool_overflow)

Record database connection pool statistics.

Parameters:

Name Type Description Default
pool_size int

Current total number of connections in the pool.

required
pool_checked_out int

Number of connections currently in use.

required
pool_overflow int

Number of connections over the configured pool_size.

required
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
def record_pool_stats(
    self,
    pool_size: int,
    pool_checked_out: int,
    pool_overflow: int,
) -> None:
    """Record database connection pool statistics.

    Protocol stub: the body is ``...``; implementations supply the behavior.

    Args:
        pool_size: Current total number of connections in the pool.
        pool_checked_out: Number of connections currently in use.
        pool_overflow: Number of connections over the configured pool_size.
    """
    ...

create_async_session_manager(postgres_config, application_name=None, metrics=None, on_engine_created=None, connection_class=None, extra_server_settings=None, extra_connect_args=None, **kwargs)

Create async session manager with PostgreSQL-specific configuration.

Parameters:

Name Type Description Default
postgres_config PostgresSettingsProtocol

PostgreSQL configuration implementing PostgresSettingsProtocol.

required
application_name str | None

Optional custom application name. If None, uses postgres_config.application_name.

None
metrics PostgresMetricsProtocol | None

Optional metrics collector for connection pool monitoring.

None
on_engine_created Callable[[AsyncEngine], None] | None

Optional callback invoked with the AsyncEngine right after creation. Use it to attach OpenTelemetry instrumentation, custom listeners, etc.

None
connection_class type[Connection] | None

Custom asyncpg Connection subclass. Defaults to AsyncCConnection which provides pgbouncer transaction-mode compatibility.

None
extra_server_settings dict[str, str] | None

Additional PostgreSQL server_settings to merge with defaults (e.g., {"statement_timeout": "30000", "timezone": "UTC"}). User-provided keys override library defaults.

None
extra_connect_args dict[str, object] | None

Additional asyncpg connect_args to merge with defaults (e.g., {"command_timeout": 60}). User-provided keys override library defaults.

None
**kwargs Any

Additional keyword arguments passed to AsyncSessionManager.

{}

Returns:

Type Description
AsyncSessionManager[AsyncSession]

Configured AsyncSessionManager instance.

Examples:

Basic usage: >>> manager = create_async_session_manager(postgres_config)

With metrics: >>> from sqlalchemy_foundation_kit.contrib.metrics import PostgresMetrics >>> manager = create_async_session_manager(postgres_config, metrics=PostgresMetrics())

With custom server settings and command timeout: >>> manager = create_async_session_manager( ... postgres_config, ... extra_server_settings={"statement_timeout": "30000", "timezone": "UTC"}, ... extra_connect_args={"command_timeout": 60}, ... )

With OpenTelemetry tracing bound to this engine: >>> from sqlalchemy_foundation_kit.contrib.telemetry import instrument_engine >>> manager = create_async_session_manager( ... postgres_config, ... on_engine_created=instrument_engine, ... )

Source code in sqlalchemy_foundation_kit/session/factories.py
def create_async_session_manager(
    postgres_config: PostgresSettingsProtocol,
    application_name: str | None = None,
    metrics: PostgresMetricsProtocol | None = None,
    on_engine_created: Callable[[AsyncEngine], None] | None = None,
    connection_class: type[asyncpg.Connection] | None = None,
    extra_server_settings: dict[str, str] | None = None,
    extra_connect_args: dict[str, object] | None = None,
    **kwargs: Any,
) -> AsyncSessionManager[AsyncSession]:
    """Build an ``AsyncSessionManager`` from a PostgreSQL settings object.

    The function assembles the asyncpg ``connect_args`` (including the nested
    ``server_settings`` mapping) from ``postgres_config`` and then forwards
    everything to the ``AsyncSessionManager`` constructor.

    Args:
        postgres_config: PostgreSQL configuration implementing PostgresSettingsProtocol.
        application_name: Application name to report to the server. When falsy
            (``None`` or empty), ``postgres_config.application_name`` is used.
        metrics: Optional metrics collector, forwarded to the manager.
        on_engine_created: Optional callback, forwarded to the manager, that
            receives the ``AsyncEngine`` (e.g. to attach OpenTelemetry
            instrumentation or custom listeners).
        connection_class: asyncpg Connection subclass to use. Falls back to
            ``AsyncCConnection`` when not supplied.
        extra_server_settings: Extra ``server_settings`` entries
            (e.g., ``{"statement_timeout": "30000", "timezone": "UTC"}``).
            Applied last, so these keys win over the library-built ones.
        extra_connect_args: Extra asyncpg ``connect_args`` entries
            (e.g., ``{"command_timeout": 60}``). Applied last, so these keys
            win over the library-built ones — including ``server_settings``
            itself if that key is supplied here.
        **kwargs: Additional keyword arguments passed to AsyncSessionManager.

    Returns:
        Configured AsyncSessionManager instance.

    Examples:
        Basic usage:
            >>> manager = create_async_session_manager(postgres_config)

        With metrics:
            >>> from sqlalchemy_foundation_kit.contrib.metrics import PostgresMetrics
            >>> manager = create_async_session_manager(postgres_config, metrics=PostgresMetrics())

        With custom server settings and command timeout:
            >>> manager = create_async_session_manager(
            ...     postgres_config,
            ...     extra_server_settings={"statement_timeout": "30000", "timezone": "UTC"},
            ...     extra_connect_args={"command_timeout": 60},
            ... )

        With OpenTelemetry tracing bound to this engine:
            >>> from sqlalchemy_foundation_kit.contrib.telemetry import instrument_engine
            >>> manager = create_async_session_manager(
            ...     postgres_config,
            ...     on_engine_created=instrument_engine,
            ... )
    """
    # Server-side session settings: start from the library defaults, add the
    # optional ones only when configured, then let the caller override.
    server_settings: dict[str, str] = {
        "application_name": application_name or postgres_config.application_name,
    }
    if postgres_config.jit is not None:
        server_settings["jit"] = postgres_config.jit
    if postgres_config.db_schema is not None:
        server_settings["search_path"] = postgres_config.db_schema
    if extra_server_settings:
        server_settings.update(extra_server_settings)

    # Driver-level connect arguments, again with caller overrides applied last.
    connect_args: dict[str, object] = {
        "server_settings": server_settings,
        "statement_cache_size": postgres_config.query.statement_cache_size,
        "prepared_statement_cache_size": postgres_config.query.prepared_statement_cache_size,
        "connection_class": connection_class or AsyncCConnection,
    }
    if extra_connect_args:
        connect_args.update(extra_connect_args)

    manager = AsyncSessionManager(
        url=postgres_config.to_dsn(),
        echo=postgres_config.query.echo,
        poolclass=postgres_config.pool.kind,
        connect_args=connect_args,
        isolation_level=postgres_config.query.isolation_level,
        pool_settings=postgres_config.pool,
        use_orjson=postgres_config.use_orjson_serialization,
        metrics=metrics,
        on_engine_created=on_engine_created,
        **kwargs,
    )
    return manager

try_advisory_xact_lock(session, key) async

Acquire a Postgres transaction-scoped advisory lock.

Uses pg_try_advisory_xact_lock: non-blocking, released automatically at transaction end. String keys are hashed to integers. The key is then truncated to signed 64-bit as Postgres expects.

Parameters:

Name Type Description Default
session AsyncSession

SQLAlchemy AsyncSession within an active transaction.

required
key str | int

Lock identifier (string or integer). Strings are hashed to integers.

required

Returns:

Type Description
bool

True if lock was acquired, False if already held by another session.

Examples:

>>> async with session_maker() as session:
...     async with session.begin():
...         if await try_advisory_xact_lock(session, "my_operation"):
...             # Perform protected operation
...             await session.execute(...)
...             await session.commit()
Source code in sqlalchemy_foundation_kit/session/locks.py
async def try_advisory_xact_lock(session: AsyncSession, key: str | int) -> bool:
    """Acquire a Postgres transaction-scoped advisory lock.

    Uses ``pg_try_advisory_xact_lock``: non-blocking, released automatically
    at transaction end. String keys are hashed to integers with a stable
    digest so that every process derives the same lock id for the same
    string. The key is then truncated to signed 64-bit as Postgres expects.

    Args:
        session: SQLAlchemy AsyncSession within an active transaction.
        key: Lock identifier (string or integer). Strings are hashed to integers.

    Returns:
        True if lock was acquired, False if already held by another session.

    Examples:
        >>> async with session_maker() as session:
        ...     async with session.begin():
        ...         if await try_advisory_xact_lock(session, "my_operation"):
        ...             # Perform protected operation; the lock is released
        ...             # when the surrounding transaction ends.
        ...             await session.execute(...)
    """
    if isinstance(key, str):
        # The built-in hash() is salted per interpreter process for str
        # (PYTHONHASHSEED), so it would map the same string to different lock
        # ids in different workers and the lock would never be contended.
        # An 8-byte BLAKE2b digest is deterministic across processes and hosts.
        import hashlib  # noqa: PLC0415

        digest = hashlib.blake2b(key.encode("utf-8"), digest_size=8).digest()
        # signed=True keeps the value inside the signed 64-bit range already.
        int_key = int.from_bytes(digest, "big", signed=True)
    else:
        int_key = key

    result = await session.execute(
        text("SELECT pg_try_advisory_xact_lock(:k)"),
        {"k": _to_signed64(int_key)},
    )
    return bool(result.scalar())

retry_async_connection(connect_func, service_name, config=DEFAULT_RETRY_CONFIG) async

Retry an async connection callable with exponential backoff.

Parameters:

Name Type Description Default
connect_func Callable[[], Awaitable[None]]

Callable that attempts to establish/test the connection.

required
service_name str

Human-readable service name used in log messages.

required
config RetryConfig

Retry behavior configuration.

DEFAULT_RETRY_CONFIG

Raises:

Type Description
ValueError

If config.max_retries is less than 1.

Exception

Re-raises the last exception when all attempts fail.

Source code in sqlalchemy_foundation_kit/session/retry.py
async def retry_async_connection(
    connect_func: Callable[[], Awaitable[None]],
    service_name: str,
    config: RetryConfig = DEFAULT_RETRY_CONFIG,
) -> None:
    """Call ``connect_func`` until it succeeds, backing off exponentially.

    Each failed attempt (except the last) is followed by a sleep of
    ``config.retry_delay * 2**attempt`` seconds, capped at
    ``config.max_backoff_delay``. The final failure is logged with its
    traceback and re-raised unchanged.

    Args:
        connect_func: Callable that attempts to establish/test the connection.
        service_name: Human-readable service name used in log messages.
        config: Retry behavior configuration.

    Raises:
        ValueError: If config.max_retries is less than 1.
        Exception: Re-raises the last exception when all attempts fail.
    """
    total_attempts = config.max_retries
    if total_attempts < 1:
        raise ValueError(f"max_retries must be >= 1, got {config.max_retries}")

    final_index = total_attempts - 1
    for attempt_index in range(total_attempts):
        try:
            await connect_func()
        except Exception:
            if attempt_index == final_index:
                # Out of attempts: record the traceback and surface the error.
                logger.exception(
                    "%s connection failed after %d attempts",
                    service_name,
                    total_attempts,
                )
                raise
            logger.warning(
                "%s connection attempt %d failed, retrying...",
                service_name,
                attempt_index + 1,
            )
            # Exponential backoff, bounded by the configured ceiling.
            backoff = config.retry_delay * (2**attempt_index)
            await asyncio.sleep(min(backoff, config.max_backoff_delay))
        else:
            logger.info("%s connection successful", service_name)
            return

build_engine_kwargs(echo, poolclass, isolation_level, pool_settings, connect_args, extra_kwargs, use_orjson=False)

Build SQLAlchemy engine keyword arguments.

Parameters:

Name Type Description Default
echo bool

If True, SQLAlchemy will log all SQL statements.

required
poolclass type

SQLAlchemy pool class.

required
isolation_level str | None

Default transaction isolation level.

required
pool_settings PoolSettingsProtocol | None

Pool configuration settings (validated by caller, e.g., Pydantic).

required
connect_args dict[str, object] | None

Arguments passed to the database driver.

required
extra_kwargs dict[str, object]

Additional keyword arguments for create_async_engine.

required
use_orjson bool

If True, use orjson for JSON serialization.

False

Returns:

Type Description
dict[str, object]

Dictionary of engine keyword arguments ready for create_async_engine().

Raises:

Type Description
ImportError

If use_orjson is True but orjson is not installed.

Examples:

>>> kwargs = build_engine_kwargs(
...     echo=False,
...     poolclass=NullPool,
...     isolation_level=None,
...     pool_settings=None,
...     connect_args=None,
...     extra_kwargs={},
...     use_orjson=False,
... )
>>> kwargs["echo"]
False
Source code in sqlalchemy_foundation_kit/base/engine.py
def build_engine_kwargs(
    echo: bool,
    poolclass: type,
    isolation_level: str | None,
    pool_settings: PoolSettingsProtocol | None,
    connect_args: dict[str, object] | None,
    extra_kwargs: dict[str, object],
    use_orjson: bool = False,
) -> dict[str, object]:
    """Assemble the keyword arguments for ``create_async_engine``.

    The result is layered in a fixed order, later layers overriding earlier
    ones: base options, orjson serializers (optional), pool settings
    (optional), ``connect_args`` (optional), and finally ``extra_kwargs``.

    Args:
        echo: If True, SQLAlchemy will log all SQL statements.
        poolclass: SQLAlchemy pool class.
        isolation_level: Default transaction isolation level.
        pool_settings: Pool configuration settings (validated by caller, e.g., Pydantic).
            When falsy, ``pool_pre_ping`` defaults to True and no further pool
            options are applied.
        connect_args: Arguments passed to the database driver. Entries whose
            value is None are dropped; an empty or None mapping is omitted entirely.
        extra_kwargs: Additional keyword arguments for create_async_engine.
        use_orjson: If True, use orjson for JSON serialization.

    Returns:
        Dictionary of engine keyword arguments ready for create_async_engine().

    Raises:
        ImportError: If use_orjson is True but orjson is not installed.

    Examples:
        >>> kwargs = build_engine_kwargs(
        ...     echo=False,
        ...     poolclass=NullPool,
        ...     isolation_level=None,
        ...     pool_settings=None,
        ...     connect_args=None,
        ...     extra_kwargs={},
        ...     use_orjson=False,
        ... )
        >>> kwargs["echo"]
        False
    """
    pre_ping = pool_settings.pre_ping if pool_settings else True
    result: dict[str, object] = dict(
        echo=echo,
        poolclass=poolclass,
        isolation_level=isolation_level,
        pool_pre_ping=pre_ping,
    )

    if use_orjson:
        # Imported lazily so orjson stays an optional dependency.
        from .serialization import configure_orjson_serialization  # noqa: PLC0415

        orjson_options = configure_orjson_serialization()
        result.update(orjson_options)

    if pool_settings:
        _apply_pool_settings(result, poolclass, pool_settings)

    if connect_args:
        # Skip unset (None) driver arguments rather than passing them through.
        driver_args = {}
        for arg_name, arg_value in connect_args.items():
            if arg_value is not None:
                driver_args[arg_name] = arg_value
        result["connect_args"] = driver_args

    if extra_kwargs:
        result.update(extra_kwargs)

    return result

resolve_pool_class(poolclass)

Resolve pool class from string name or return the class directly.

Parameters:

Name Type Description Default
poolclass PoolClassStr | str | type

Pool class name (e.g., "null", "queue") or actual class type.

required

Returns:

Type Description
type

Pool class type.

Raises:

Type Description
ValueError

If pool class name is not recognized.

Source code in sqlalchemy_foundation_kit/base/engine.py
def resolve_pool_class(poolclass: PoolClassStr | str | type) -> type:
    """Turn a pool identifier into a pool class.

    Non-string values are assumed to already be a pool class and are returned
    unchanged; strings are looked up through ``PoolRegistry.resolve``.

    Args:
        poolclass: Pool class name (e.g., "null", "queue") or actual class type.

    Returns:
        Pool class type.

    Raises:
        ValueError: If pool class name is not recognized.
    """
    if not isinstance(poolclass, str):
        return poolclass

    return PoolRegistry.resolve(poolclass)

register_pool_class(name, pool_class, *, override=False)

Register a custom pool class.

Convenience wrapper around PoolRegistry.register for users who prefer a functional API over the class-based one.

Parameters:

Name Type Description Default
name str

Pool class identifier (lowercase recommended).

required
pool_class type

Pool class type to register.

required
override bool

If True, allows overriding built-in pools (use with caution).

False

Raises:

Type Description
ValueError

If name already exists and override=False.

Source code in sqlalchemy_foundation_kit/base/engine.py
def register_pool_class(name: str, pool_class: type, *, override: bool = False) -> None:
    """Add a pool class to the pool registry under ``name``.

    Function-style shortcut that forwards its arguments unchanged to
    :meth:`PoolRegistry.register`.

    Args:
        name: Pool class identifier (lowercase recommended).
        pool_class: Pool class type to register.
        override: If True, allows overriding built-in pools (use with caution).

    Raises:
        ValueError: If name already exists and override=False.
    """
    PoolRegistry.register(name, pool_class, override=override)

load_orm_metadata(models_modules, metadata=None)

Load all ORM models metadata synchronously.

Imports specified modules to ensure that all SQLAlchemy models are registered in the metadata. This is useful for migrations and schema introspection with tools like Alembic.

Parameters:

Name Type Description Default
models_modules Iterable[str]

Iterable of module paths to import (e.g., ["myapp.models", "myapp.core.models"]).

required
metadata MetaData | None

Optional specific MetaData object to use. If None, uses Base.metadata.

None

Returns:

Type Description
MetaData

MetaData object containing all registered models from the imported modules.

Examples:

Load models from multiple modules: >>> from sqlalchemy_foundation_kit.base import load_orm_metadata >>> metadata = load_orm_metadata([ ... "myapp.users.models", ... "myapp.orders.models", ... "myapp.products.models", ... ]) >>> len(metadata.tables) 15

Use with custom metadata: >>> from sqlalchemy import MetaData >>> custom_meta = MetaData(schema="public") >>> metadata = load_orm_metadata(["myapp.models"], metadata=custom_meta)

Typical usage in Alembic env.py: >>> from sqlalchemy_foundation_kit.base import Base, load_orm_metadata >>> target_metadata = Base.metadata >>> load_orm_metadata(["myapp.models"]) # Register all models >>> # Now target_metadata.tables contains all tables

Source code in sqlalchemy_foundation_kit/base/metadata.py
def load_orm_metadata(models_modules: Iterable[str], metadata: MetaData | None = None) -> MetaData:
    """Load all ORM models metadata synchronously.

    Imports specified modules to ensure that all SQLAlchemy models are
    registered in the metadata. This is useful for migrations and schema
    introspection with tools like Alembic.

    Args:
        models_modules: Iterable of module paths to import (e.g., ["myapp.models", "myapp.core.models"]).
            A single module path given as a plain string is also accepted and
            treated as a one-element list.
        metadata: Optional specific MetaData object to return. If None, ``Base.metadata``
            is returned. This function only imports the modules and returns the
            object; it does not itself attach any table to ``metadata``, so the
            models must already be declared against it.

    Returns:
        The supplied ``metadata`` if given, otherwise ``Base.metadata``.

    Raises:
        ImportError: If one of the module paths cannot be imported.

    Examples:
        Load models from multiple modules:
            >>> from sqlalchemy_foundation_kit.base import load_orm_metadata
            >>> metadata = load_orm_metadata([
            ...     "myapp.users.models",
            ...     "myapp.orders.models",
            ...     "myapp.products.models",
            ... ])
            >>> len(metadata.tables)
            15

        Use with custom metadata:
            >>> from sqlalchemy import MetaData
            >>> custom_meta = MetaData(schema="public")
            >>> metadata = load_orm_metadata(["myapp.models"], metadata=custom_meta)

        Typical usage in Alembic env.py:
            >>> from sqlalchemy_foundation_kit.base import Base, load_orm_metadata
            >>> target_metadata = Base.metadata
            >>> load_orm_metadata(["myapp.models"])  # Register all models
            >>> # Now target_metadata.tables contains all tables
    """
    # A bare str is itself an iterable of one-character strings; iterating it
    # would try to import "m", "y", "a", ... instead of the intended module.
    module_paths = [models_modules] if isinstance(models_modules, str) else models_modules

    for module_path in module_paths:
        import_module(module_path)

    return metadata if metadata is not None else Base.metadata

configure_orjson_serialization()

Configure orjson serialization for SQLAlchemy engine.

Returns:

Type Description
dict[str, object]

Dictionary with json_serializer and json_deserializer configured.

Raises:

Type Description
ImportError

If orjson is not installed.

Examples:

>>> config = configure_orjson_serialization()
>>> "json_serializer" in config
True
>>> "json_deserializer" in config
True
Source code in sqlalchemy_foundation_kit/base/serialization.py
def configure_orjson_serialization() -> dict[str, object]:
    """Return engine options that switch JSON (de)serialization to orjson.

    The ``orjson`` module is obtained through ``require_optional``; its
    ``loads`` function becomes the deserializer, while the module-level
    ``_json_serializer`` is used as the serializer.

    Returns:
        Dictionary with json_serializer and json_deserializer configured.

    Raises:
        ImportError: If orjson is not installed.

    Examples:
        >>> config = configure_orjson_serialization()
        >>> "json_serializer" in config
        True
        >>> "json_deserializer" in config
        True
    """
    orjson_module = require_optional("orjson", "json")

    serialization_options: dict[str, object] = dict(
        json_serializer=_json_serializer,
        json_deserializer=orjson_module.loads,
    )
    return serialization_options

Base ORM Models

Base classes and mixins for SQLAlchemy models.

Database base models and mixins.

DB_NAMING_CONVENTION = {'ix': '%(column_0_label)s_idx', 'uq': '%(table_name)s_%(column_0_name)s_key', 'ck': '%(table_name)s_%(constraint_name)s_check', 'fk': '%(table_name)s_%(column_0_name)s_fkey', 'pk': '%(table_name)s_pkey'} module-attribute

Base

Bases: DeclarativeBase

Base class for all ORM models.

Source code in sqlalchemy_foundation_kit/base/models.py
class Base(DeclarativeBase):
    """Base class for all ORM models.

    Supplies the shared ``MetaData`` (with the project naming convention) and
    the default Python-type to SQL-type mapping used for ``Mapped[...]``
    annotations.
    """

    # Default column types for annotated attributes:
    #   uuid.UUID          -> PostgreSQL UUID
    #   datetime.datetime  -> TIMESTAMP WITH TIME ZONE
    type_annotation_map: ClassVar[dict[type, TypeEngine[Any]]] = {
        uuid.UUID: postgresql.UUID(),
        datetime.datetime: TIMESTAMP(timezone=True),
    }
    # Single MetaData instance shared by every model deriving from Base;
    # constraint and index names follow DB_NAMING_CONVENTION.
    metadata = MetaData(naming_convention=DB_NAMING_CONVENTION)

BaseTable

Bases: Base

Base table class with __repr__.

Source code in sqlalchemy_foundation_kit/base/models.py
class BaseTable(Base):
    """Abstract declarative base that adds a column-based ``__repr__``."""

    __abstract__ = True

    def __repr__(self) -> str:
        """Render the row as ``<tablename: col1=val1, col2=val2, ...>``.

        Values are read with ``getattr`` using each table column's name.
        """
        column_values = {}
        for col in self.__table__.columns:
            column_values[col.name] = getattr(self, col.name)

        rendered_pairs = [f"{name}={value}" for name, value in column_values.items()]
        return f"<{self.__tablename__}: {', '.join(rendered_pairs)}>"

DatetimeColumnsMixin

Mixin for tables that need created_at and updated_at timestamps.

Control indexing via the __created_at_index__ and __updated_at_index__ class variables in the model. Both default to False.

Source code in sqlalchemy_foundation_kit/base/models.py
class DatetimeColumnsMixin:
    """Mixin for tables that need created_at and updated_at timestamps.

    Control indexing via __created_at_index__ and __updated_at_index__ class variables in the model.
    Defaults to False.

    Both columns get a server-side default; ``updated_at`` additionally
    carries an ``onupdate`` SQL expression.
    """

    # Set to True in a subclass to create an index on the respective column.
    __created_at_index__: ClassVar[bool] = False
    __updated_at_index__: ClassVar[bool] = False

    # declared_attr builds the column per mapped class, which is what lets the
    # index flags above be overridden per model (presumably SQLAlchemy passes
    # the class itself as the first argument here — confirm).
    @declared_attr
    def created_at(self) -> Mapped[datetime.datetime]:
        # NOTE(review): in PostgreSQL, timezone('UTC', now()) yields a timestamp
        # WITHOUT time zone; if the column is timestamptz (see the datetime
        # mapping on Base) the value is re-interpreted in the session TimeZone.
        # Confirm sessions always run with TimeZone=UTC, or use now() directly.
        return mapped_column(
            server_default=func.timezone("UTC", func.now()),
            index=self.__created_at_index__,
        )

    @declared_attr
    def updated_at(self) -> Mapped[datetime.datetime]:
        # onupdate is a SQL expression supplied with UPDATE statements issued
        # through SQLAlchemy; presumably raw SQL updates bypass it — verify if
        # that matters for your writers.
        return mapped_column(
            server_default=func.timezone("UTC", func.now()),
            onupdate=func.timezone("UTC", func.now()),
            index=self.__updated_at_index__,
        )

Example Usage

from uuid import UUID, uuid4
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy_foundation_kit import BaseTable, DatetimeColumnsMixin

class UserDB(BaseTable, DatetimeColumnsMixin):
    """User ORM model with automatic timestamps."""
    __tablename__ = "users"
    __created_at_index__ = True  # Opt in to an index on created_at (off by default)

    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    email: Mapped[str] = mapped_column(unique=True)
    username: Mapped[str]

# What the base classes contribute:
# - created_at: Mapped[datetime] with a server-side default (DatetimeColumnsMixin)
# - updated_at: Mapped[datetime] with a server-side default and an onupdate expression (DatetimeColumnsMixin)
# - __repr__: "<users: id=..., email=..., ...>" style representation (BaseTable)

Custom Types

Custom SQLAlchemy types for PostgreSQL.

Custom SQLAlchemy types.

GenericJSONDict = dict[str, Any] module-attribute

PydanticJSONB

Bases: TypeDecorator

SQLAlchemy TypeDecorator for Pydantic models stored as JSONB.

Validates and serializes values against the supplied Pydantic-compatible type on both write and read paths. A model_type is required — if you need raw dict storage without validation, use SQLAlchemy's built-in JSONB directly (or the GenericJSONDict alias).

Source code in sqlalchemy_foundation_kit/base/types.py
class PydanticJSONB(TypeDecorator):
    """JSONB column type whose values are validated through Pydantic.

    Values are validated against ``model_type`` when bound to a statement and
    again when loaded from a result row. A ``model_type`` is **required** — for
    raw, unvalidated dict storage use SQLAlchemy's built-in ``JSONB`` directly
    (or the :data:`GenericJSONDict` alias).
    """

    impl = JSONB
    cache_ok = True

    def __init__(self, model_type: type[T], *args: Any, **kwargs: Any) -> None:
        """Create the type for a given Pydantic-compatible type.

        Args:
            model_type: Pydantic model class (or any type compatible with
                ``pydantic.TypeAdapter``) used to validate and serialize values.
            *args: Positional arguments forwarded to ``TypeDecorator``.
            **kwargs: Keyword arguments forwarded to ``TypeDecorator``.
        """
        self.model_type = model_type
        self.adapter: TypeAdapter[T] = TypeAdapter(model_type)
        super().__init__(*args, **kwargs)

    def process_bind_param(self, value: Any, dialect: Any) -> Any:
        """Validate ``value`` and convert it to JSON-compatible Python data.

        ``None`` is passed through unchanged.
        """
        if value is not None:
            # Validating first turns plain dicts (e.g. from model_dump()) into
            # the expected model, which avoids Pydantic serialization warnings
            # during the dump below.
            model_instance = self.adapter.validate_python(value)
            return self.adapter.dump_python(model_instance, mode="json")

        return None

    def process_result_value(self, value: Any, dialect: Any) -> Any:
        """Validate a loaded JSONB value, falling back to the raw data.

        ``None`` is passed through unchanged. If validation fails, a warning
        is logged and the raw value is returned instead of raising.
        """
        if value is None:
            return None

        try:
            loaded = self.adapter.validate_python(value)
        except ValidationError:
            logger.warning(
                "Validation error while loading %s from JSONB. Using raw data. "
                "This may indicate legacy data that doesn't match current schema.",
                self.model_type,
            )
            return value

        return loaded

__init__(model_type, *args, **kwargs)

Initialize the type decorator.

Parameters:

Name Type Description Default
model_type type[T]

Pydantic model class (or any type compatible with pydantic.TypeAdapter) used to validate and serialize values.

required
Source code in sqlalchemy_foundation_kit/base/types.py
def __init__(self, model_type: type[T], *args: Any, **kwargs: Any) -> None:
    """Initialize the type decorator.

    Args:
        model_type: Pydantic model class (or any type compatible with
            ``pydantic.TypeAdapter``) used to validate and serialize values.
        *args: Positional arguments forwarded to the parent initializer.
        **kwargs: Keyword arguments forwarded to the parent initializer.
    """
    self.model_type = model_type
    # Built once here and stored so later validation/serialization reuses it.
    self.adapter: TypeAdapter[T] = TypeAdapter(model_type)
    super().__init__(*args, **kwargs)

Example Usage

from uuid import UUID

from pydantic import BaseModel
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy_foundation_kit import PydanticJSONB, BaseTable

class UserPreferences(BaseModel):
    theme: str
    language: str

class UserDB(BaseTable):
    __tablename__ = "users"

    # UUID must be imported (see top of snippet) for this annotation to resolve.
    id: Mapped[UUID] = mapped_column(primary_key=True)
    preferences: Mapped[UserPreferences] = mapped_column(
        PydanticJSONB(UserPreferences)
    )

# The value is validated when it is bound to a statement (write) and again
# when it is loaded from a result row (read).
user = UserDB(
    preferences=UserPreferences(theme="dark", language="en")
)

Session Management

Async session manager with connection pooling and health checks.

Async database session manager with connection pooling.

AsyncSessionManager

Bases: Generic[SessionT]

Manages async database sessions with configurable connection pooling.

Supports two initialization approaches:

  1. Direct constructor — all parameters in constructor with defaults.
  2. Builder pattern — use AsyncSessionManagerBuilder for more readable complex configurations.
Source code in sqlalchemy_foundation_kit/session/manager.py
class AsyncSessionManager(Generic[SessionT]):
    """Manages async database sessions with configurable connection pooling.

    Owns one ``AsyncEngine`` and one ``async_sessionmaker`` bound to it. Can be
    used as an async context manager; leaving the context calls :meth:`aclose`.

    Supports two initialization approaches:

    1. **Direct constructor** — all parameters in constructor with defaults.
    2. **Builder pattern** — use :class:`AsyncSessionManagerBuilder` for more
       readable complex configurations.
    """

    def __init__(
        self,
        url: str,
        echo: bool = False,
        poolclass: str | type = "null",
        session_class: type[SessionT] | None = None,
        expire_on_commit: bool = False,
        connect_args: dict[str, object] | None = None,
        isolation_level: str | None = None,
        pool_settings: PoolSettingsProtocol | None = None,
        use_orjson: bool = False,
        metrics: PostgresMetricsProtocol | None = None,
        on_engine_created: Callable[[AsyncEngine], None] | None = None,
        dispose_timeout: float = DEFAULT_DISPOSE_TIMEOUT_SECONDS,
        **kwargs: object,
    ) -> None:
        """Initialize session manager with direct configuration.

        Args:
            url: Database connection URL (required).
            echo: If True, SQLAlchemy will log all SQL statements (default: False).
            poolclass: SQLAlchemy pool class or name (default: "null").
                Use "queue" for production, "null" for testing.
            session_class: Custom ``AsyncSession`` subclass (default: ``AsyncSession``).
            expire_on_commit: If True, objects expire after commit (default: False).
            connect_args: Arguments passed to the database driver (default: None).
            isolation_level: Default transaction isolation level (default: None).
            pool_settings: Pool configuration settings (default: None).
            use_orjson: If True, use orjson for JSON serialization (default: False).
            metrics: Optional metrics collector (default: None).
            on_engine_created: Optional callback invoked with ``AsyncEngine`` after creation.
                Use for OpenTelemetry instrumentation, custom event listeners, etc.
            dispose_timeout: Maximum seconds to wait for engine disposal in :meth:`aclose`
                (default: 30.0). Lower this in tests or short-lived environments; raise it
                if you have long-running transactions that need more time to settle.
            **kwargs: Additional keyword arguments for ``create_async_engine``.
        """
        self._closed = False
        # Serializes concurrent aclose() calls so disposal is attempted at most once.
        self._close_lock = asyncio.Lock()
        self._dispose_timeout = dispose_timeout
        # Accepts either a registered pool name or a pool class.
        resolved_poolclass = resolve_pool_class(poolclass)
        engine_kwargs = build_engine_kwargs(
            echo=echo,
            poolclass=resolved_poolclass,
            isolation_level=isolation_level,
            pool_settings=pool_settings,
            connect_args=connect_args,
            extra_kwargs=kwargs,
            use_orjson=use_orjson,
        )

        self._engine: AsyncEngine = create_async_engine(url, **engine_kwargs)
        # cast() only narrows the static type to the caller's session class;
        # it has no runtime effect.
        self._session_maker = cast(
            async_sessionmaker[SessionT],
            async_sessionmaker(
                self._engine,
                class_=session_class or AsyncSession,
                expire_on_commit=expire_on_commit,
            ),
        )

        # Truthiness check: a falsy metrics object is treated like None.
        if metrics:
            attach_metrics(self._engine, metrics)

        # The user callback runs last, after metrics have been attached.
        if on_engine_created is not None:
            on_engine_created(self._engine)

    async def aclose(self) -> None:
        """Close the engine and all connections.

        Safe to call more than once: after the first call the manager is marked
        closed and later calls return immediately. A disposal that exceeds
        ``dispose_timeout`` is logged as a warning rather than raised; any other
        exception propagates, but the manager is still marked closed.
        """
        async with self._close_lock:
            if self._closed:
                return

            try:
                # Use shield so disposal runs even if the task is cancelled; timeout avoids indefinite hang.
                await asyncio.wait_for(
                    asyncio.shield(self._engine.dispose()),
                    timeout=self._dispose_timeout,
                )
            # NOTE(review): on Python < 3.11 asyncio.wait_for raises
            # asyncio.TimeoutError, which is not the builtin TimeoutError —
            # confirm the minimum supported Python version is 3.11+.
            except TimeoutError:
                logger.warning(
                    "Engine disposal timed out after %.1f seconds. Some connections may not have closed cleanly.",
                    self._dispose_timeout,
                )
            finally:
                # Always mark as closed to prevent retry loops
                self._closed = True

    def _ensure_not_closed(self) -> None:
        """Ensure that the manager is not closed.

        Raises:
            RuntimeError: If :meth:`aclose` has already run.
        """
        if self._closed:
            raise RuntimeError("AsyncSessionManager is closed")

    async def __aenter__(self) -> AsyncSessionManager[SessionT]:
        """Support for async context manager."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Close the engine on exit."""
        await self.aclose()

    @property
    def engine(self) -> AsyncEngine:
        """Get the underlying engine.

        Does not check whether the manager has been closed.
        """
        return self._engine

    @property
    def session_maker(self) -> async_sessionmaker[SessionT]:
        """Get the session maker.

        Does not check whether the manager has been closed.
        """
        return self._session_maker

    @asynccontextmanager
    async def get_session(self) -> AsyncIterator[SessionT]:
        """Get a new database session.

        No transaction is begun explicitly here; the caller manages
        commit/rollback.

        Yields:
            A new session created by the session maker.

        Raises:
            RuntimeError: If the manager is closed.
        """
        self._ensure_not_closed()
        async with self._session_maker() as session:
            yield session

    @asynccontextmanager
    async def get_transaction(self, isolation_level: str | None = None) -> AsyncIterator[SessionT]:
        """Get a new database session with automatic transaction management.

        The session is wrapped in ``session.begin()``, so the transaction is
        finalized by that context manager when the block exits.

        Args:
            isolation_level: Optional isolation level for the transaction.

        Yields:
            Managed async session with active transaction.

        Raises:
            RuntimeError: If the manager is closed.
        """
        self._ensure_not_closed()
        # NOTE(review): execution_options is forwarded to the session
        # constructor. Confirm the installed SQLAlchemy version accepts it
        # there and that isolation_level takes effect at session level; the
        # documented per-transaction approach is
        # session.connection(execution_options={"isolation_level": ...}).
        options = {"isolation_level": isolation_level} if isolation_level else {}
        async with (
            self._session_maker(execution_options=options) as session,
            session.begin(),
        ):
            yield session

engine property

Get the underlying engine.

session_maker property

Get the session maker.

__aenter__() async

Support for async context manager.

Source code in sqlalchemy_foundation_kit/session/manager.py
async def __aenter__(self) -> AsyncSessionManager[SessionT]:
    """Enter the async context and return this manager unchanged.

    No resources are acquired here; the method exists so the manager can be
    used with ``async with``.
    """
    return self

__aexit__(exc_type, exc_val, exc_tb) async

Close the engine on exit.

Source code in sqlalchemy_foundation_kit/session/manager.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Leave the ``async with`` block by awaiting :meth:`aclose`.

    The exception details are not inspected and nothing is returned, so an
    exception raised inside the block keeps propagating to the caller.
    """
    await self.aclose()

__init__(url, echo=False, poolclass='null', session_class=None, expire_on_commit=False, connect_args=None, isolation_level=None, pool_settings=None, use_orjson=False, metrics=None, on_engine_created=None, dispose_timeout=DEFAULT_DISPOSE_TIMEOUT_SECONDS, **kwargs)

Initialize session manager with direct configuration.

Parameters:

Name Type Description Default
url str

Database connection URL (required).

required
echo bool

If True, SQLAlchemy will log all SQL statements (default: False).

False
poolclass str | type

SQLAlchemy pool class or name (default: "null"). Use "queue" for production, "null" for testing.

'null'
session_class type[SessionT] | None

Custom AsyncSession subclass (default: AsyncSession).

None
expire_on_commit bool

If True, objects expire after commit (default: False).

False
connect_args dict[str, object] | None

Arguments passed to the database driver (default: None).

None
isolation_level str | None

Default transaction isolation level (default: None).

None
pool_settings PoolSettingsProtocol | None

Pool configuration settings (default: None).

None
use_orjson bool

If True, use orjson for JSON serialization (default: False).

False
metrics PostgresMetricsProtocol | None

Optional metrics collector (default: None).

None
on_engine_created Callable[[AsyncEngine], None] | None

Optional callback invoked with AsyncEngine after creation. Use for OpenTelemetry instrumentation, custom event listeners, etc.

None
dispose_timeout float

Maximum seconds to wait for engine disposal in aclose() (default: 30.0). Lower this in tests or short-lived environments; raise it if you have long-running transactions that need more time to settle.

DEFAULT_DISPOSE_TIMEOUT_SECONDS
**kwargs object

Additional keyword arguments for create_async_engine.

{}
Source code in sqlalchemy_foundation_kit/session/manager.py
def __init__(
    self,
    url: str,
    echo: bool = False,
    poolclass: str | type = "null",
    session_class: type[SessionT] | None = None,
    expire_on_commit: bool = False,
    connect_args: dict[str, object] | None = None,
    isolation_level: str | None = None,
    pool_settings: PoolSettingsProtocol | None = None,
    use_orjson: bool = False,
    metrics: PostgresMetricsProtocol | None = None,
    on_engine_created: Callable[[AsyncEngine], None] | None = None,
    dispose_timeout: float = DEFAULT_DISPOSE_TIMEOUT_SECONDS,
    **kwargs: object,
) -> None:
    """Initialize session manager with direct configuration.

    Creates the async engine and its session maker, then attaches metrics
    listeners (when a collector is supplied) and finally invokes the
    ``on_engine_created`` callback (when supplied).

    Args:
        url: Database connection URL (required).
        echo: If True, SQLAlchemy will log all SQL statements (default: False).
        poolclass: SQLAlchemy pool class or name (default: "null").
            Use "queue" for production, "null" for testing.
        session_class: Custom ``AsyncSession`` subclass (default: ``AsyncSession``).
        expire_on_commit: If True, objects expire after commit (default: False).
        connect_args: Arguments passed to the database driver (default: None).
        isolation_level: Default transaction isolation level (default: None).
        pool_settings: Pool configuration settings (default: None).
        use_orjson: If True, use orjson for JSON serialization (default: False).
        metrics: Optional metrics collector (default: None). Any collector that
            is not ``None`` is attached, regardless of its truthiness.
        on_engine_created: Optional callback invoked with ``AsyncEngine`` after creation.
            Use for OpenTelemetry instrumentation, custom event listeners, etc.
        dispose_timeout: Maximum seconds to wait for engine disposal in :meth:`aclose`
            (default: 30.0). Lower this in tests or short-lived environments; raise it
            if you have long-running transactions that need more time to settle.
        **kwargs: Additional keyword arguments for ``create_async_engine``.
    """
    # Shutdown bookkeeping used by aclose().
    self._closed = False
    self._close_lock = asyncio.Lock()
    self._dispose_timeout = dispose_timeout

    resolved_poolclass = resolve_pool_class(poolclass)
    engine_kwargs = build_engine_kwargs(
        echo=echo,
        poolclass=resolved_poolclass,
        isolation_level=isolation_level,
        pool_settings=pool_settings,
        connect_args=connect_args,
        extra_kwargs=kwargs,
        use_orjson=use_orjson,
    )

    self._engine: AsyncEngine = create_async_engine(url, **engine_kwargs)
    self._session_maker = cast(
        async_sessionmaker[SessionT],
        async_sessionmaker(
            self._engine,
            class_=session_class or AsyncSession,
            expire_on_commit=expire_on_commit,
        ),
    )

    # Compare against None rather than relying on truthiness: a collector that
    # defines __bool__/__len__ must not be skipped just because it is "empty".
    if metrics is not None:
        attach_metrics(self._engine, metrics)

    if on_engine_created is not None:
        on_engine_created(self._engine)

aclose() async

Close the engine and all connections.

Source code in sqlalchemy_foundation_kit/session/manager.py
async def aclose(self) -> None:
    """Dispose the engine, at most once per manager.

    The close lock serializes concurrent callers; once ``_closed`` is set,
    further calls return without doing anything. A disposal that exceeds
    ``_dispose_timeout`` is logged as a warning rather than raised, and the
    manager is marked closed whichever way the attempt ends.
    """
    async with self._close_lock:
        if not self._closed:
            try:
                # shield() lets the disposal keep running if this task is cancelled;
                # wait_for() bounds how long we are willing to wait for it.
                disposal = asyncio.shield(self._engine.dispose())
                await asyncio.wait_for(disposal, timeout=self._dispose_timeout)
            except TimeoutError:
                logger.warning(
                    "Engine disposal timed out after %.1f seconds. Some connections may not have closed cleanly.",
                    self._dispose_timeout,
                )
            finally:
                # Mark closed even on failure so callers never retry in a loop.
                self._closed = True

get_session() async

Get a new database session.

Source code in sqlalchemy_foundation_kit/session/manager.py
@asynccontextmanager
async def get_session(self) -> AsyncIterator[SessionT]:
    """Yield a fresh session produced by the session maker.

    ``_ensure_not_closed()`` runs first, before any session is created. The
    session is used as an async context manager, so it is exited when the
    caller's ``async with`` block ends. No transaction is opened here.
    """
    self._ensure_not_closed()
    session_context = self._session_maker()
    async with session_context as session:
        yield session

get_transaction(isolation_level=None) async

Get a new database session with automatic transaction management.

Parameters:

Name Type Description Default
isolation_level str | None

Optional isolation level for the transaction.

None

Yields:

Type Description
AsyncIterator[SessionT]

Managed async session with active transaction.

Source code in sqlalchemy_foundation_kit/session/manager.py
@asynccontextmanager
async def get_transaction(self, isolation_level: str | None = None) -> AsyncIterator[SessionT]:
    """Yield a session that is already inside ``session.begin()``.

    ``_ensure_not_closed()`` runs first. The session is then created with an
    ``execution_options`` mapping and the context returned by
    ``session.begin()`` is entered before the session is handed out; both
    contexts are exited when the caller's block ends.

    Args:
        isolation_level: Optional isolation level for the transaction. When
            falsy, an empty ``execution_options`` mapping is passed instead.

    Yields:
        Managed async session with active transaction.
    """
    self._ensure_not_closed()
    execution_options: dict[str, str] = {}
    if isolation_level:
        execution_options["isolation_level"] = isolation_level
    async with self._session_maker(execution_options=execution_options) as session:
        async with session.begin():
            yield session

attach_metrics(engine, metrics)

Attach metrics event listeners to a SQLAlchemy engine.

Registers event handlers for connection checkout, checkin, and error events to collect pool statistics and connection metrics.

Parameters:

Name Type Description Default
engine AsyncEngine

SQLAlchemy AsyncEngine to attach listeners to.

required
metrics PostgresMetricsProtocol

Metrics collector implementing PostgresMetricsProtocol.

required
Source code in sqlalchemy_foundation_kit/session/manager.py
def attach_metrics(engine: AsyncEngine, metrics: PostgresMetricsProtocol) -> None:
    """Wire a metrics collector into an engine's pool and error events.

    Three listeners are registered: pool ``checkout``, pool ``checkin`` and
    the sync engine's ``handle_error``. Every call into ``metrics`` is routed
    through ``_safe_metric_call`` together with a failure message.

    Args:
        engine: SQLAlchemy ``AsyncEngine`` to attach listeners to.
        metrics: Metrics collector implementing ``PostgresMetricsProtocol``.
    """
    pool = engine.pool

    def _pool_stat(method_name: str) -> Any:
        # Pool classes differ in which counters they expose; a missing one reports 0.
        return getattr(pool, method_name)() if hasattr(pool, method_name) else 0

    def _report_pool_stats() -> None:
        """Send the current size / checked-out / overflow counters to the collector."""
        _safe_metric_call(
            lambda: metrics.record_pool_stats(
                pool_size=_pool_stat("size"),
                pool_checked_out=_pool_stat("checkedout"),
                pool_overflow=_pool_stat("overflow"),
            ),
            "Failed to record database pool stats",
        )

    def _handle_checkout(dbapi_connection: Any, connection_record: Any, connection_proxy: Any) -> None:
        """Report pool stats, then stamp the record with the checkout start time."""
        _report_pool_stats()
        connection_record.info["checkout_start"] = time.perf_counter()

    def _handle_checkin(dbapi_connection: Any, connection_record: Any) -> None:
        """Report pool stats and, if a start stamp exists, the checkout duration."""
        _report_pool_stats()

        record_info = connection_record.info
        if "checkout_start" not in record_info:
            return

        held_for = time.perf_counter() - record_info["checkout_start"]
        _safe_metric_call(
            lambda: metrics.record_checkout(duration=held_for),
            "Failed to record database checkout duration",
        )

    def _handle_error(exception_context: Any) -> None:
        """Report the original exception's class name and whether it is a timeout."""
        original = exception_context.original_exception
        timed_out = isinstance(original, (TimeoutError, SATimeoutError))
        error_name = type(original).__name__
        _safe_metric_call(
            lambda: metrics.record_error(error_type=error_name, is_timeout=timed_out),
            "Failed to record database error metric",
        )

    event.listen(pool, "checkout", _handle_checkout)
    event.listen(pool, "checkin", _handle_checkin)
    event.listen(engine.sync_engine, "handle_error", _handle_error)

Builder pattern for AsyncSessionManager configuration.

Provides a fluent interface for constructing AsyncSessionManager instances with many optional parameters, following the Builder pattern to simplify complex object creation and improve code readability.

AsyncSessionManagerBuilder

Bases: Generic[SessionT]

Builder for AsyncSessionManager construction.

Provides a fluent API for configuring AsyncSessionManager with many optional parameters. This pattern improves code readability and follows the KISS principle by avoiding constructors with 10+ parameters.

Examples:

Basic usage: >>> manager = ( ... AsyncSessionManagerBuilder("postgresql+asyncpg://...") ... .with_pool("queue") ... .with_echo(True) ... .build() ... )

Advanced configuration: >>> manager = ( ... AsyncSessionManagerBuilder[CustomSession]("postgresql+asyncpg://...") ... .with_session_class(CustomSession) ... .with_pool("queue", pool_settings=settings.pool) ... .with_metrics(metrics) ... .with_callbacks(on_engine_created=instrument_engine) ... .with_json_serialization(orjson=True) ... .with_isolation_level("READ COMMITTED") ... .build() ... )

Reusable configuration: >>> builder = ( ... AsyncSessionManagerBuilder("postgresql+asyncpg://...") ... .with_pool("queue") ... .with_metrics(metrics) ... ) >>> manager1 = builder.with_echo(True).build() >>> manager2 = builder.with_echo(False).build()

Source code in sqlalchemy_foundation_kit/session/builder.py
class AsyncSessionManagerBuilder(Generic[SessionT]):
    """Builder for AsyncSessionManager construction.

    Provides a fluent API for configuring AsyncSessionManager with many optional
    parameters. This pattern improves code readability and follows the KISS
    principle by avoiding constructors with 10+ parameters.

    Every ``with_*`` method mutates this builder and returns it, so calls can be
    chained. ``build()`` may be called repeatedly; each manager receives its own
    copy of the accumulated ``connect_args``.

    Examples:
        Basic usage:
            >>> manager = (
            ...     AsyncSessionManagerBuilder("postgresql+asyncpg://...")
            ...     .with_pool("queue")
            ...     .with_echo(True)
            ...     .build()
            ... )

        Advanced configuration:
            >>> manager = (
            ...     AsyncSessionManagerBuilder[CustomSession]("postgresql+asyncpg://...")
            ...     .with_session_class(CustomSession)
            ...     .with_pool("queue", pool_settings=settings.pool)
            ...     .with_metrics(metrics)
            ...     .with_callbacks(on_engine_created=instrument_engine)
            ...     .with_json_serialization(orjson=True)
            ...     .with_isolation_level("READ COMMITTED")
            ...     .build()
            ... )

        Reusable configuration:
            >>> builder = (
            ...     AsyncSessionManagerBuilder("postgresql+asyncpg://...")
            ...     .with_pool("queue")
            ...     .with_metrics(metrics)
            ... )
            >>> manager1 = builder.with_echo(True).build()
            >>> manager2 = builder.with_echo(False).build()
    """

    def __init__(self, url: str) -> None:
        """Initialize builder with database URL (required).

        All other settings start at the defaults assigned below and can be
        changed through the ``with_*`` methods.

        Args:
            url: Database connection URL (required parameter).
        """
        self._url = url
        self._echo: bool = False
        self._poolclass: str | type = "null"
        self._session_class: type[SessionT] | None = None
        self._expire_on_commit: bool = False
        self._connect_args: dict[str, object] | None = None
        self._isolation_level: str | None = None
        self._pool_settings: PoolSettingsProtocol | None = None
        self._use_orjson: bool = False
        self._metrics: PostgresMetricsProtocol | None = None
        self._on_engine_created: Callable[[AsyncEngine], None] | None = None
        # None means "not configured": build() then omits the argument entirely
        # so the manager's own default applies.
        self._dispose_timeout: float | None = None
        self._extra_kwargs: dict[str, object] = {}

    def with_echo(self, echo: bool = True) -> AsyncSessionManagerBuilder[SessionT]:
        """Enable SQL statement logging.

        Args:
            echo: If True, SQLAlchemy logs all SQL statements.

        Returns:
            Self for method chaining.
        """
        self._echo = echo
        return self

    def with_pool(
        self,
        poolclass: str | type,
        pool_settings: PoolSettingsProtocol | None = None,
    ) -> AsyncSessionManagerBuilder[SessionT]:
        """Configure connection pool.

        Both values are overwritten on every call, so omitting ``pool_settings``
        resets previously configured settings to ``None``.

        Args:
            poolclass: Pool class name or type (e.g., "queue", "null").
            pool_settings: Optional pool configuration settings.

        Returns:
            Self for method chaining.
        """
        self._poolclass = poolclass
        self._pool_settings = pool_settings
        return self

    def with_session_class(
        self,
        session_class: type[SessionT],
    ) -> AsyncSessionManagerBuilder[SessionT]:
        """Use custom session class.

        Args:
            session_class: Custom AsyncSession subclass.

        Returns:
            Self for method chaining.
        """
        self._session_class = session_class
        return self

    def with_expire_on_commit(
        self,
        expire: bool = True,
    ) -> AsyncSessionManagerBuilder[SessionT]:
        """Configure object expiration behavior after commit.

        Args:
            expire: If True, all objects expire after commit.

        Returns:
            Self for method chaining.
        """
        self._expire_on_commit = expire
        return self

    def with_connect_args(
        self,
        **connect_args: object,
    ) -> AsyncSessionManagerBuilder[SessionT]:
        """Add database driver connection arguments.

        Arguments accumulate across calls; a key passed again replaces the
        earlier value.

        Args:
            **connect_args: Arguments passed to the database driver.

        Returns:
            Self for method chaining.

        Examples:
            >>> builder.with_connect_args(
            ...     server_settings={"application_name": "myapp"},
            ...     command_timeout=60,
            ... )
        """
        if self._connect_args is None:
            self._connect_args = {}
        self._connect_args.update(connect_args)
        return self

    def with_isolation_level(
        self,
        isolation_level: str,
    ) -> AsyncSessionManagerBuilder[SessionT]:
        """Set default transaction isolation level.

        Args:
            isolation_level: Default isolation level (e.g., "READ COMMITTED").

        Returns:
            Self for method chaining.
        """
        self._isolation_level = isolation_level
        return self

    def with_metrics(
        self,
        metrics: PostgresMetricsProtocol,
    ) -> AsyncSessionManagerBuilder[SessionT]:
        """Enable connection pool metrics collection.

        Args:
            metrics: Metrics collector implementing PostgresMetricsProtocol.

        Returns:
            Self for method chaining.
        """
        self._metrics = metrics
        return self

    def with_callbacks(
        self,
        on_engine_created: Callable[[AsyncEngine], None],
    ) -> AsyncSessionManagerBuilder[SessionT]:
        """Register engine creation callback.

        Useful for attaching instrumentation (OpenTelemetry), custom event
        listeners, or debug hooks right after engine creation.

        Args:
            on_engine_created: Callback invoked with AsyncEngine after creation.

        Returns:
            Self for method chaining.

        Examples:
            >>> from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
            >>> def instrument(engine):
            ...     SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)
            >>> builder.with_callbacks(on_engine_created=instrument)
        """
        self._on_engine_created = on_engine_created
        return self

    def with_json_serialization(
        self,
        orjson: bool = True,
    ) -> AsyncSessionManagerBuilder[SessionT]:
        """Configure JSON serialization backend.

        This method only records the choice and imports nothing itself. Per
        the contract documented on :meth:`build`, an ``ImportError`` for a
        missing orjson installation surfaces when the manager is built.

        Args:
            orjson: If True, use orjson for faster JSON serialization.

        Returns:
            Self for method chaining.
        """
        self._use_orjson = orjson
        return self

    def with_extra_kwargs(self, **kwargs: object) -> AsyncSessionManagerBuilder[SessionT]:
        """Add additional keyword arguments for create_async_engine.

        Arguments accumulate across calls; a key passed again replaces the
        earlier value.

        Args:
            **kwargs: Additional engine configuration.

        Returns:
            Self for method chaining.
        """
        self._extra_kwargs.update(kwargs)
        return self

    def with_dispose_timeout(self, timeout: float) -> AsyncSessionManagerBuilder[SessionT]:
        """Configure how long :meth:`AsyncSessionManager.aclose` waits for engine disposal.

        Args:
            timeout: Maximum seconds to wait for the engine to dispose.

        Returns:
            Self for method chaining.
        """
        self._dispose_timeout = timeout
        return self

    def build(self) -> AsyncSessionManager[SessionT]:
        """Build AsyncSessionManager instance with configured parameters.

        The builder is left untouched and can be reconfigured and built again.

        Returns:
            Configured AsyncSessionManager instance.

        Raises:
            ImportError: If use_orjson=True but orjson is not installed.

        Examples:
            >>> manager = builder.build()
            >>> async with manager.get_session() as session:
            ...     await session.execute(...)
        """
        # Give each manager its own connect_args dict. The builder is meant to be
        # reusable, so later with_connect_args() calls (or any downstream mutation
        # of the dict) must not leak between the builder and managers already built.
        connect_args = None if self._connect_args is None else dict(self._connect_args)
        kwargs: dict[str, object] = {
            "url": self._url,
            "echo": self._echo,
            "poolclass": self._poolclass,
            "session_class": self._session_class,
            "expire_on_commit": self._expire_on_commit,
            "connect_args": connect_args,
            "isolation_level": self._isolation_level,
            "pool_settings": self._pool_settings,
            "use_orjson": self._use_orjson,
            "metrics": self._metrics,
            "on_engine_created": self._on_engine_created,
        }
        # Only forward dispose_timeout when explicitly configured so the
        # manager's own default stays in effect otherwise.
        if self._dispose_timeout is not None:
            kwargs["dispose_timeout"] = self._dispose_timeout
        return AsyncSessionManager(**kwargs, **self._extra_kwargs)  # type: ignore[arg-type]

__init__(url)

Initialize builder with database URL (required).

Parameters:

Name Type Description Default
url str

Database connection URL (required parameter).

required
Source code in sqlalchemy_foundation_kit/session/builder.py
def __init__(self, url: str) -> None:
    """Create a builder for the given database URL.

    Only the URL is required; every other setting starts at the default
    assigned below.

    Args:
        url: Database connection URL (required parameter).
    """
    self._url = url

    # Engine / pool configuration.
    self._echo: bool = False
    self._poolclass: str | type = "null"
    self._pool_settings: PoolSettingsProtocol | None = None
    self._isolation_level: str | None = None
    self._connect_args: dict[str, object] | None = None
    self._use_orjson: bool = False
    self._extra_kwargs: dict[str, object] = {}

    # Session configuration.
    self._session_class: type[SessionT] | None = None
    self._expire_on_commit: bool = False

    # Observability hooks and shutdown behaviour.
    self._metrics: PostgresMetricsProtocol | None = None
    self._on_engine_created: Callable[[AsyncEngine], None] | None = None
    self._dispose_timeout: float | None = None

build()

Build AsyncSessionManager instance with configured parameters.

Returns:

Type Description
AsyncSessionManager[SessionT]

Configured AsyncSessionManager instance.

Raises:

Type Description
ImportError

If use_orjson=True but orjson is not installed.

Examples:

>>> manager = builder.build()
>>> async with manager.get_session() as session:
...     await session.execute(...)
Source code in sqlalchemy_foundation_kit/session/builder.py
def build(self) -> AsyncSessionManager[SessionT]:
    """Build AsyncSessionManager instance with configured parameters.

    The builder is left untouched and can be reconfigured and built again.

    Returns:
        Configured AsyncSessionManager instance.

    Raises:
        ImportError: If use_orjson=True but orjson is not installed.

    Examples:
        >>> manager = builder.build()
        >>> async with manager.get_session() as session:
        ...     await session.execute(...)
    """
    # Give each manager its own connect_args dict. A builder can be reused for
    # several managers, so later with_connect_args() calls (or any downstream
    # mutation of the dict) must not leak between the builder and managers
    # that were already built.
    connect_args = None if self._connect_args is None else dict(self._connect_args)
    kwargs: dict[str, object] = {
        "url": self._url,
        "echo": self._echo,
        "poolclass": self._poolclass,
        "session_class": self._session_class,
        "expire_on_commit": self._expire_on_commit,
        "connect_args": connect_args,
        "isolation_level": self._isolation_level,
        "pool_settings": self._pool_settings,
        "use_orjson": self._use_orjson,
        "metrics": self._metrics,
        "on_engine_created": self._on_engine_created,
    }
    # Only forward dispose_timeout when explicitly configured so the manager's
    # own default stays in effect otherwise.
    if self._dispose_timeout is not None:
        kwargs["dispose_timeout"] = self._dispose_timeout
    return AsyncSessionManager(**kwargs, **self._extra_kwargs)  # type: ignore[arg-type]

with_callbacks(on_engine_created)

Register engine creation callback.

Useful for attaching instrumentation (OpenTelemetry), custom event listeners, or debug hooks right after engine creation.

Parameters:

Name Type Description Default
on_engine_created Callable[[AsyncEngine], None]

Callback invoked with AsyncEngine after creation.

required

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Examples:

>>> from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
>>> def instrument(engine):
...     SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)
>>> builder.with_callbacks(on_engine_created=instrument)
Source code in sqlalchemy_foundation_kit/session/builder.py
def with_callbacks(
    self,
    on_engine_created: Callable[[AsyncEngine], None],
) -> AsyncSessionManagerBuilder[SessionT]:
    """Store the callback to run once the engine has been created.

    Typical uses are instrumentation (OpenTelemetry), extra event listeners
    or debug hooks. A later call replaces the previously stored callback.

    Args:
        on_engine_created: Callback invoked with AsyncEngine after creation.

    Returns:
        Self for method chaining.

    Examples:
        >>> from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
        >>> def instrument(engine):
        ...     SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)
        >>> builder.with_callbacks(on_engine_created=instrument)
    """
    builder = self
    builder._on_engine_created = on_engine_created
    return builder

with_connect_args(**connect_args)

Add database driver connection arguments.

Parameters:

Name Type Description Default
**connect_args object

Arguments passed to the database driver.

{}

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Examples:

>>> builder.with_connect_args(
...     server_settings={"application_name": "myapp"},
...     command_timeout=60,
... )
Source code in sqlalchemy_foundation_kit/session/builder.py
def with_connect_args(
    self,
    **connect_args: object,
) -> AsyncSessionManagerBuilder[SessionT]:
    """Merge driver-level connection arguments into the builder.

    The backing dict is created on first use. Arguments accumulate across
    calls, and a key passed again replaces the earlier value.

    Args:
        **connect_args: Arguments passed to the database driver.

    Returns:
        Self for method chaining.

    Examples:
        >>> builder.with_connect_args(
        ...     server_settings={"application_name": "myapp"},
        ...     command_timeout=60,
        ... )
    """
    accumulated = self._connect_args
    if accumulated is None:
        accumulated = self._connect_args = {}
    accumulated.update(connect_args)
    return self

with_dispose_timeout(timeout)

Configure how long AsyncSessionManager.aclose() waits for engine disposal.

Parameters:

Name Type Description Default
timeout float

Maximum seconds to wait for the engine to dispose.

required

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Source code in sqlalchemy_foundation_kit/session/builder.py
def with_dispose_timeout(self, timeout: float) -> AsyncSessionManagerBuilder[SessionT]:
    """Set the engine-disposal wait used by :meth:`AsyncSessionManager.aclose`.

    Args:
        timeout: Maximum seconds to wait for the engine to dispose.

    Returns:
        Self for method chaining.
    """
    builder = self
    builder._dispose_timeout = timeout
    return builder

with_echo(echo=True)

Enable SQL statement logging.

Parameters:

Name Type Description Default
echo bool

If True, SQLAlchemy logs all SQL statements.

True

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Source code in sqlalchemy_foundation_kit/session/builder.py
def with_echo(self, echo: bool = True) -> AsyncSessionManagerBuilder[SessionT]:
    """Turn SQL statement logging on (the default) or off.

    Args:
        echo: If True, SQLAlchemy logs all SQL statements.

    Returns:
        Self for method chaining.
    """
    builder = self
    builder._echo = echo
    return builder

with_expire_on_commit(expire=True)

Configure object expiration behavior after commit.

Parameters:

Name Type Description Default
expire bool

If True, all objects expire after commit.

True

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Source code in sqlalchemy_foundation_kit/session/builder.py
def with_expire_on_commit(
    self,
    expire: bool = True,
) -> AsyncSessionManagerBuilder[SessionT]:
    """Choose whether objects expire after a commit.

    Args:
        expire: If True, all objects expire after commit.

    Returns:
        Self for method chaining.
    """
    builder = self
    builder._expire_on_commit = expire
    return builder

with_extra_kwargs(**kwargs)

Add additional keyword arguments for create_async_engine.

Parameters:

Name Type Description Default
**kwargs object

Additional engine configuration.

{}

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Source code in sqlalchemy_foundation_kit/session/builder.py
def with_extra_kwargs(self, **kwargs: object) -> AsyncSessionManagerBuilder[SessionT]:
    """Merge extra ``create_async_engine`` keyword arguments into the builder.

    Arguments accumulate across calls, and a key passed again replaces the
    earlier value.

    Args:
        **kwargs: Additional engine configuration.

    Returns:
        Self for method chaining.
    """
    extras = self._extra_kwargs
    extras.update(kwargs)
    return self

with_isolation_level(isolation_level)

Set default transaction isolation level.

Parameters:

Name Type Description Default
isolation_level str

Default isolation level (e.g., "READ COMMITTED").

required

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Source code in sqlalchemy_foundation_kit/session/builder.py
def with_isolation_level(
    self,
    isolation_level: str,
) -> AsyncSessionManagerBuilder[SessionT]:
    """Record the default transaction isolation level.

    Args:
        isolation_level: Default isolation level (e.g., "READ COMMITTED").

    Returns:
        Self for method chaining.
    """
    builder = self
    builder._isolation_level = isolation_level
    return builder

with_json_serialization(orjson=True)

Configure JSON serialization backend.

Parameters:

Name Type Description Default
orjson bool

If True, use orjson for faster JSON serialization.

True

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Raises:

Type Description
ImportError

If orjson=True but orjson is not installed. This method only stores the flag; the error surfaces when build() creates the manager.

Source code in sqlalchemy_foundation_kit/session/builder.py
def with_json_serialization(
    self,
    orjson: bool = True,
) -> AsyncSessionManagerBuilder[SessionT]:
    """Select the JSON serialization backend.

    Only the flag is stored here; nothing is imported or validated by this
    method.

    Args:
        orjson: If True, use orjson for faster JSON serialization.

    Returns:
        Self for method chaining.
    """
    builder = self
    builder._use_orjson = orjson
    return builder

with_metrics(metrics)

Enable connection pool metrics collection.

Parameters:

Name Type Description Default
metrics PostgresMetricsProtocol

Metrics collector implementing PostgresMetricsProtocol.

required

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Source code in sqlalchemy_foundation_kit/session/builder.py
def with_metrics(
    self,
    metrics: PostgresMetricsProtocol,
) -> AsyncSessionManagerBuilder[SessionT]:
    """Record the metrics collector to hand to the manager.

    Args:
        metrics: Metrics collector implementing PostgresMetricsProtocol.

    Returns:
        Self for method chaining.
    """
    builder = self
    builder._metrics = metrics
    return builder

with_pool(poolclass, pool_settings=None)

Configure connection pool.

Parameters:

Name Type Description Default
poolclass str | type

Pool class name or type (e.g., "queue", "null").

required
pool_settings PoolSettingsProtocol | None

Optional pool configuration settings.

None

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Source code in sqlalchemy_foundation_kit/session/builder.py
def with_pool(
    self,
    poolclass: str | type,
    pool_settings: PoolSettingsProtocol | None = None,
) -> AsyncSessionManagerBuilder[SessionT]:
    """Select the connection pool and its settings.

    Both values are overwritten on every call, so omitting ``pool_settings``
    resets previously stored settings to ``None``.

    Args:
        poolclass: Pool class name or type (e.g., "queue", "null").
        pool_settings: Optional pool configuration settings.

    Returns:
        Self for method chaining.
    """
    self._poolclass, self._pool_settings = poolclass, pool_settings
    return self

with_session_class(session_class)

Use custom session class.

Parameters:

Name Type Description Default
session_class type[SessionT]

Custom AsyncSession subclass.

required

Returns:

Type Description
AsyncSessionManagerBuilder[SessionT]

Self for method chaining.

Source code in sqlalchemy_foundation_kit/session/builder.py
def with_session_class(
    self,
    session_class: type[SessionT],
) -> AsyncSessionManagerBuilder[SessionT]:
    """Record the session class the manager should instantiate.

    Args:
        session_class: Custom AsyncSession subclass.

    Returns:
        Self for method chaining.
    """
    builder = self
    builder._session_class = session_class
    return builder

Custom connection class for pgbouncer compatibility (async).

AsyncCConnection

Bases: Connection

Custom async connection class for pgbouncer magic.

This subclass overrides only the private method _get_unique_id so that prepared statement identifiers are unique per connection. That is required when using pgbouncer in transaction mode, where the same server connection may be reused for different logical connections.

See: https://github.com/sqlalchemy/sqlalchemy/issues/6467

Source code in sqlalchemy_foundation_kit/session/connection.py
class AsyncCConnection(asyncpg.Connection):
    """asyncpg connection that names prepared statements with random UUIDs.

    Only the private ``_get_unique_id`` hook is overridden, so that prepared
    statement identifiers are unique per connection. This is needed behind
    pgbouncer in transaction mode, where one server connection may be reused
    for different logical connections.

    See: https://github.com/sqlalchemy/sqlalchemy/issues/6467
    """

    def _get_unique_id(self, prefix: str) -> str:
        """Build a prepared-statement identifier that embeds a fresh UUID4.

        Args:
            prefix: Prefix placed in front of the random part.

        Returns:
            A string of the form ``__asyncpg_<prefix>_<uuid4>__``.
        """
        random_part = uuid.uuid4()
        return f"__asyncpg_{prefix}_{random_part}__"

Retry utilities for database connection startup.

DEFAULT_RETRY_CONFIG = RetryConfig() module-attribute

RetryConfig dataclass

Configuration for retry behavior with exponential backoff.

Attributes:

Name Type Description
max_retries int

Maximum number of attempts before giving up.

retry_delay float

Base delay in seconds; actual delay is retry_delay * 2 ** attempt, capped at max_backoff_delay.

max_backoff_delay float

Maximum delay between retries in seconds. Prevents exponential backoff from growing indefinitely.

Source code in sqlalchemy_foundation_kit/session/retry.py
@dataclass(frozen=True)
class RetryConfig:
    """Immutable settings for retrying with exponential backoff.

    Attributes:
        max_retries: Total number of attempts made before giving up.
        retry_delay: Base delay in seconds. The wait after a failed attempt is
            ``retry_delay * 2 ** attempt``, limited by ``max_backoff_delay``.
        max_backoff_delay: Upper bound, in seconds, for a single wait between
            attempts, so the exponential growth stays bounded.
    """

    max_retries: int = 3
    retry_delay: float = 1.0
    max_backoff_delay: float = 60.0

retry_async_connection(connect_func, service_name, config=DEFAULT_RETRY_CONFIG) async

Retry an async connection callable with exponential backoff.

Parameters:

Name Type Description Default
connect_func Callable[[], Awaitable[None]]

Callable that attempts to establish/test the connection.

required
service_name str

Human-readable service name used in log messages.

required
config RetryConfig

Retry behavior configuration.

DEFAULT_RETRY_CONFIG

Raises:

Type Description
ValueError

If config.max_retries is less than 1.

Exception

Re-raises the last exception when all attempts fail.

Source code in sqlalchemy_foundation_kit/session/retry.py
async def retry_async_connection(
    connect_func: Callable[[], Awaitable[None]],
    service_name: str,
    config: RetryConfig = DEFAULT_RETRY_CONFIG,
) -> None:
    """Retry an async connection callable with exponential backoff.

    ``connect_func`` is awaited up to ``config.max_retries`` times. After each
    failed attempt except the last, the coroutine sleeps for
    ``retry_delay * 2 ** attempt`` seconds (``attempt`` starts at 0), capped at
    ``config.max_backoff_delay``.

    Args:
        connect_func: Callable that attempts to establish/test the connection.
        service_name: Human-readable service name used in log messages.
        config: Retry behavior configuration.

    Raises:
        ValueError: If config.max_retries is less than 1.
        Exception: Re-raises the last exception when all attempts fail.
    """
    import sys  # local import: only needed for the float exponent limit below

    if config.max_retries < 1:
        raise ValueError(f"max_retries must be >= 1, got {config.max_retries}")

    # Largest exponent for which 2.0 ** e is still a finite float. Without the
    # clamp, very large attempt numbers raise OverflowError while computing
    # the delay, even though the result would be capped anyway.
    max_exponent = sys.float_info.max_exp - 1
    last_attempt = config.max_retries - 1

    for attempt in range(config.max_retries):
        try:
            await connect_func()
        except Exception:
            if attempt == last_attempt:
                logger.exception(
                    "%s connection failed after %d attempts",
                    service_name,
                    config.max_retries,
                )
                raise
            logger.warning(
                "%s connection attempt %d failed, retrying...",
                service_name,
                attempt + 1,
            )
            # Float multiplication saturates to inf instead of raising, and
            # min() then applies the configured cap.
            backoff = config.retry_delay * 2.0 ** min(attempt, max_exponent)
            delay = min(backoff, config.max_backoff_delay)
            await asyncio.sleep(delay)
        else:
            logger.info("%s connection successful", service_name)
            return

PostgreSQL advisory locks (async).

try_advisory_xact_lock(session, key) async

Acquire a Postgres transaction-scoped advisory lock.

Uses pg_try_advisory_xact_lock: non-blocking, released automatically at transaction end. String keys are hashed to integers. The key is then truncated to signed 64-bit as Postgres expects.

Parameters:

Name Type Description Default
session AsyncSession

SQLAlchemy AsyncSession within an active transaction.

required
key str | int

Lock identifier (string or integer). Strings are hashed to integers.

required

Returns:

Type Description
bool

True if lock was acquired, False if already held by another session.

Examples:

>>> async with session_maker() as session:
...     async with session.begin():
...         if await try_advisory_xact_lock(session, "my_operation"):
...             # Perform protected operation
...             await session.execute(...)
...             await session.commit()
Source code in sqlalchemy_foundation_kit/session/locks.py
async def try_advisory_xact_lock(session: AsyncSession, key: str | int) -> bool:
    """Acquire a Postgres transaction-scoped advisory lock.

    Uses ``pg_try_advisory_xact_lock``: non-blocking, released automatically
    at transaction end. String keys are hashed to integers. The key is then
    truncated to signed 64-bit as Postgres expects.

    String keys are hashed with SHA-256 (first 8 bytes of the digest) rather
    than the built-in ``hash()``. ``hash()`` of a ``str`` is salted per
    interpreter process, so two processes would derive different lock ids
    from the same string and never contend for the same lock.

    Args:
        session: SQLAlchemy AsyncSession within an active transaction.
        key: Lock identifier (string or integer). Strings are hashed to integers.

    Returns:
        True if lock was acquired, False if already held by another session.

    Examples:
        >>> async with session_maker() as session:
        ...     async with session.begin():
        ...         if await try_advisory_xact_lock(session, "my_operation"):
        ...             # Perform protected operation
        ...             await session.execute(...)
        ...             await session.commit()
    """
    import hashlib  # local import: keeps this block self-contained

    if isinstance(key, str):
        # Process-independent mapping from string to a 64-bit integer.
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        int_key = int.from_bytes(digest[:8], "big")
    else:
        int_key = key

    result = await session.execute(
        text("SELECT pg_try_advisory_xact_lock(:k)"),
        {"k": _to_signed64(int_key)},
    )
    return bool(result.scalar())

Example Usage

# NOTE: the `await` / `async with` statements below must run inside a coroutine.
# NOTE(review): `settings`, `metrics` and `UserDB` are not defined in this
# snippet — presumably provided by the application; confirm before copying.
from sqlalchemy_foundation_kit import create_async_session_manager

# Create session manager
session_manager = create_async_session_manager(
    settings.postgres,
    metrics=metrics,
)

# Transactional context
async with session_manager.get_transaction() as session:
    user = UserDB(email="user@example.com")
    session.add(user)
    # Auto-commit on exit, auto-rollback on exception

# Health check
is_healthy = await session_manager.healthcheck()

# Graceful shutdown
# NOTE(review): `timeout` is presumably in seconds — TODO confirm.
await session_manager.close(timeout=30.0)

Unit of Work

Unit of Work pattern for transactional consistency.

Unit of Work protocols.

AsyncUnitOfWork

Bases: Protocol, Generic[T_co]

Provides transactional context for repository operations.

Provides three modes of operation
  • transaction(): For write operations with automatic commit/rollback.
  • managed_session(): For write operations with manual commit/rollback control.
  • query(): For read-only operations without transaction management.
Source code in sqlalchemy_foundation_kit/uow/protocols.py
class AsyncUnitOfWork(Protocol, Generic[T_co]):
    """Contract for objects that hand out transactional contexts to repositories.

    Three entry points are defined:
        - ``transaction()``: write path with automatic commit/rollback.
        - ``managed_session()``: write path where the caller commits or rolls back **manually**.
        - ``query()``: read-only path without transaction management.
    """

    def transaction(
        self,
        isolation_level: str | None = None,
        flush_before_commit: bool | None = None,
    ) -> AbstractAsyncContextManager[T_co]:
        """Open a transaction context that commits or rolls back automatically.

        Args:
            isolation_level: Optional transaction isolation level.
            flush_before_commit: When True, the session is flushed before commit so
                constraint violations surface inside the transaction. ``None`` defers
                to the implementation's own default (typically set at construction time).
        """
        ...

    def managed_session(
        self,
        isolation_level: str | None = None,
    ) -> AbstractAsyncContextManager[tuple[T_co, Any]]:
        """Open a session whose commit/rollback is driven by the caller.

        In contrast to :meth:`transaction`, nothing is committed automatically on
        success: the caller has to invoke ``session.commit()`` or
        ``session.rollback()``. This suits flows where the commit decision depends
        on several conditions or on external factors.

        The yielded tuple's second element is the underlying session object. It is
        typed as ``Any`` here so the protocol does not leak SQLAlchemy types —
        concrete implementations like ``AsyncSQLAlchemyUnitOfWork`` yield ``AsyncSession``.

        If an exception is raised inside the context, the session is rolled back
        automatically.

        Args:
            isolation_level: Optional transaction isolation level.
        """
        ...

    def query(self, isolation_level: str | None = None) -> AbstractAsyncContextManager[T_co]:
        """Open a read-only query context without transaction management."""
        ...

managed_session(isolation_level=None)

Create a session with manual transaction control.

Unlike transaction(), this does NOT auto-commit on success. The caller must explicitly call session.commit() or session.rollback(). Useful for complex transactional logic where the commit decision depends on multiple conditions or external factors.

The second element of the yielded tuple is the underlying session object (typed as Any in the protocol to avoid leaking SQLAlchemy types — concrete implementations like AsyncSQLAlchemyUnitOfWork yield AsyncSession).

On exception inside the context, the session is automatically rolled back.

Parameters:

Name Type Description Default
isolation_level str | None

Optional transaction isolation level.

None
Source code in sqlalchemy_foundation_kit/uow/protocols.py
def managed_session(
    self,
    isolation_level: str | None = None,
) -> AbstractAsyncContextManager[tuple[T_co, Any]]:
    """Open a session whose commit/rollback is driven by the caller.

    In contrast to :meth:`transaction`, nothing is committed automatically on
    success: the caller has to invoke ``session.commit()`` or
    ``session.rollback()``. This suits flows where the commit decision depends
    on several conditions or on external factors.

    The yielded tuple's second element is the underlying session object. It is
    typed as ``Any`` here so the protocol does not leak SQLAlchemy types —
    concrete implementations like ``AsyncSQLAlchemyUnitOfWork`` yield ``AsyncSession``.

    If an exception is raised inside the context, the session is rolled back
    automatically.

    Args:
        isolation_level: Optional transaction isolation level.
    """
    ...

query(isolation_level=None)

Create a read-only query context without transaction management.

Source code in sqlalchemy_foundation_kit/uow/protocols.py
def query(self, isolation_level: str | None = None) -> AbstractAsyncContextManager[T_co]:
    """Open a read-only query context without transaction management."""
    ...

transaction(isolation_level=None, flush_before_commit=None)

Create a new transaction context with automatic commit/rollback.

Parameters:

Name Type Description Default
isolation_level str | None

Optional transaction isolation level.

None
flush_before_commit bool | None

If True, flush session before commit to surface constraint violations within transaction. If None, the implementation applies its own default (typically configured at construction time).

None
Source code in sqlalchemy_foundation_kit/uow/protocols.py
def transaction(
    self,
    isolation_level: str | None = None,
    flush_before_commit: bool | None = None,
) -> AbstractAsyncContextManager[T_co]:
    """Open a transaction context that commits or rolls back automatically.

    Args:
        isolation_level: Optional transaction isolation level.
        flush_before_commit: When True, the session is flushed before commit so
            constraint violations surface inside the transaction. ``None`` defers
            to the implementation's own default (typically set at construction time).
    """
    ...

AsyncUowTransaction

Bases: Protocol

Transaction-scoped repositories container.

Intended to be extended by concrete transaction types that expose repository attributes for a specific bounded context.

Source code in sqlalchemy_foundation_kit/uow/protocols.py
class AsyncUowTransaction(Protocol):
    """Container for the repositories that live within one transaction.

    The protocol itself declares no members. Concrete transaction types are
    meant to extend it and expose the repository attributes of a specific
    bounded context.
    """

SupportsAdvisoryLock

Bases: Protocol

Capability protocol for transactions supporting advisory locks.

Use this when your transaction needs PostgreSQL advisory lock support. Not all transactions require this capability (e.g., in-memory, read-only).

Source code in sqlalchemy_foundation_kit/uow/protocols.py
class SupportsAdvisoryLock(Protocol):
    """Opt-in capability for transactions that can take advisory locks.

    Depend on this protocol when a transaction needs PostgreSQL advisory lock
    support. It is kept separate because not every transaction requires the
    capability (e.g., in-memory, read-only).
    """

    async def try_advisory_lock(self, key: int) -> bool:
        """Attempt to take the transaction-scoped advisory lock named by ``key``.

        Args:
            key: Integer identifier of the lock.

        Returns:
            ``True`` if the lock was acquired (it is then held for the rest of the
            transaction), ``False`` if another transaction already holds it.
        """
        ...

try_advisory_lock(key) async

Try to acquire a transaction-scoped advisory lock identified by key.

Returns True if the lock was acquired (and is held for the rest of the transaction), False if another transaction already holds it.

Source code in sqlalchemy_foundation_kit/uow/protocols.py
async def try_advisory_lock(self, key: int) -> bool:
    """Attempt to take the transaction-scoped advisory lock named by ``key``.

    Args:
        key: Integer identifier of the lock.

    Returns:
        ``True`` if the lock was acquired (it is then held for the rest of the
        transaction), ``False`` if another transaction already holds it.
    """
    ...

Unit of Work implementation (async SQLAlchemy).

AsyncSQLAlchemyUnitOfWork

Bases: AsyncUnitOfWork[T], Generic[T]

Base async SQLAlchemy Unit of Work.

Provides transactional context for repository operations using SQLAlchemy AsyncSession.

Methods:

Name Description
transaction

For write operations with automatic commit/rollback.

query

For read-only operations without transaction management.

Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
class AsyncSQLAlchemyUnitOfWork(AsyncUnitOfWork[T], Generic[T]):
    """Base async SQLAlchemy Unit of Work.

    Provides transactional context for repository operations using SQLAlchemy AsyncSession.

    Methods:
        open_session(): Extension point that opens the session and applies the isolation level.
        transaction(): For write operations with automatic commit/rollback.
        managed_session(): For write operations with manual commit/rollback control.
        query(): For read-only operations without transaction management.
    """

    def __init__(
        self,
        session_maker: async_sessionmaker[AsyncSession],
        transaction_factory: Callable[[AsyncSession], T],
        *,
        flush_before_commit: bool = True,
    ) -> None:
        """Initialize the unit of work.

        Args:
            session_maker: Async session factory.
            transaction_factory: Callable producing the transaction object exposed to callers.
            flush_before_commit: Default ``flush_before_commit`` policy applied when
                :meth:`transaction` is called without an explicit override.
                Set to ``False`` here once if your service prefers SQLAlchemy's default
                "flush on commit" semantics instead of an early flush.
        """
        self._session_maker = session_maker
        self._transaction_factory = transaction_factory
        self._flush_before_commit = flush_before_commit

    @asynccontextmanager
    async def open_session(
        self,
        isolation_level: IsolationLevel | str | None = None,
    ) -> AsyncIterator[AsyncSession]:
        """Open a session with optional isolation level applied.

        This is the extension point for subclasses that need custom session setup
        (e.g., RLS context, session-level GUCs, custom statement timeouts).
        Override to wrap or augment session creation while preserving isolation handling.

        Used internally by :meth:`transaction`, :meth:`managed_session` and :meth:`query`.

        Args:
            isolation_level: Optional transaction isolation level.

        Yields:
            Configured AsyncSession instance.

        Raises:
            ValueError: If isolation_level is not supported.

        Examples:
            Subclass that sets a session-level GUC for every transaction:

                class TenantUnitOfWork(AsyncSQLAlchemyUnitOfWork):
                    def __init__(self, session_maker, tx_factory, tenant_id):
                        super().__init__(session_maker, tx_factory)
                        self._tenant_id = tenant_id

                    @asynccontextmanager
                    async def open_session(self, isolation_level=None):
                        async with super().open_session(isolation_level) as session:
                            await session.execute(
                                text("SET app.tenant_id = :tid"),
                                {"tid": self._tenant_id},
                            )
                            yield session
        """
        # The session is closed by the session maker's context manager once the
        # caller's block (the code running at the `yield`) has finished.
        async with self._session_maker() as session:
            # Apply isolation level if specified (DRY: using utility function)
            await apply_isolation_level(session, isolation_level)
            yield session

    @asynccontextmanager
    async def transaction(
        self,
        isolation_level: IsolationLevel | str | None = None,
        flush_before_commit: bool | None = None,
    ) -> AsyncIterator[T]:
        """Create a new transaction context with automatic commit/rollback.

        The Unit of Work automatically commits the transaction on successful exit
        and rolls back on exception. This ensures atomic operations.

        Args:
            isolation_level: Optional transaction isolation level.
                Can be an IsolationLevel enum member or a string value.
                Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED".
            flush_before_commit: If True, flush the session before commit to surface
                constraint violations while still inside the transaction. If ``None`` (default),
                falls back to the value passed to the constructor (``True`` unless overridden).

        Raises:
            ValueError: If isolation_level is not supported.

        Examples:
            Write operation with automatic commit:
                async with uow.transaction() as tx:
                    await tx.users.create(...)
                    # Auto-commit on exit, rollback on exception
        """
        # A per-call value wins; otherwise use the policy stored by __init__.
        if flush_before_commit is None:
            flush_before_commit = self._flush_before_commit

        async with self.open_session(isolation_level) as session, session.begin():
            uow = self._transaction_factory(session)
            yield uow
            # Reached only when the caller's block finished without raising: an
            # exception from that block is re-raised at the `yield` above.
            if flush_before_commit:
                # Flush changes before commit to catch constraint violations early
                # while still inside the transaction context.
                try:
                    await session.flush()
                except SQLAlchemyError as e:
                    logger.warning("Database flush failed", extra={"error": str(e)})
                    raise

    @asynccontextmanager
    async def managed_session(
        self,
        isolation_level: IsolationLevel | str | None = None,
    ) -> AsyncIterator[tuple[T, AsyncSession]]:
        """Create a session with manual transaction control.

        Unlike transaction(), this does NOT auto-commit. The caller must
        explicitly call session.commit() or session.rollback(). This is useful
        for complex transactional logic where commit decision depends on multiple
        conditions or external factors.

        A transaction is started automatically, but you have full control over
        when to commit or rollback. If you exit without calling either, SQLAlchemy
        will automatically rollback on session close.

        Args:
            isolation_level: Optional transaction isolation level.
                Can be an IsolationLevel enum member or a string value.
                Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED".

        Yields:
            Tuple of (transaction object, session) for manual control.

        Raises:
            ValueError: If isolation_level is not supported.

        Examples:
            Manual transaction control:
                async with uow.managed_session() as (tx, session):
                    await tx.users.create(...)

                    # Manually decide when to commit
                    if should_commit:
                        await session.commit()
                    else:
                        await session.rollback()

            Conditional commit based on external service:
                async with uow.managed_session() as (tx, session):
                    user = await tx.users.create(...)

                    # Call external service
                    result = await external_api.validate(user)

                    if result.success:
                        await session.commit()
                    else:
                        await session.rollback()

            Multiple operations with intermediate decision:
                async with uow.managed_session() as (tx, session):
                    user = await tx.users.create(...)

                    # First checkpoint
                    await session.flush()

                    # More operations
                    await tx.profiles.create(user_id=user.id)

                    # Final decision
                    await session.commit()

        Note:
            Prefer :meth:`transaction` for the vast majority of use cases — it commits
            automatically and enforces the UoW pattern. This method is an advanced escape
            hatch for scenarios where the commit/rollback decision depends on conditions
            that can only be evaluated after data is written (e.g., external service
            validation). ``session.commit()`` calls belong exclusively at the use-case
            boundary via this method, never inside repository implementations.

        Warning:
            You MUST explicitly call session.commit() or session.rollback().
            Exiting the context without calling either will result in automatic
            rollback when the session closes.
        """
        async with self.open_session(isolation_level) as session:
            # Start transaction WITHOUT context manager - no auto-commit
            await session.begin()
            try:
                # The factory call sits inside the try block, so a failure while
                # building the transaction object is rolled back as well.
                uow = self._transaction_factory(session)
                yield uow, session
            except (Exception, asyncio.CancelledError):
                # Auto-rollback on exception OR cancellation
                await session.rollback()
                raise
            # User must call session.commit() or session.rollback() explicitly

    @asynccontextmanager
    async def query(
        self,
        isolation_level: IsolationLevel | str | None = None,
    ) -> AsyncIterator[T]:
        """Create a read-only query context without transaction management.

        This method is designed for read-only operations and does not start a transaction
        or perform any commit/rollback. It's semantically clearer than managed_session()
        for read operations.

        Args:
            isolation_level: Optional transaction isolation level.
                Can be an IsolationLevel enum member or a string value.
                Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED".

        Raises:
            ValueError: If isolation_level is not supported.

        Examples:
            Read-only query:
                async with uow.query() as qx:
                    users = await qx.users.list_all()
                    user = await qx.users.get_by_id(user_id)
                # No commit/rollback - just closes session

        Note:
            While this method is intended for read-only operations, SQLAlchemy does not enforce
            this at the session level. It's up to the caller to ensure only read operations are performed.
        """
        async with self.open_session(isolation_level) as session:
            # No transaction begin/commit - just yield the session
            uow = self._transaction_factory(session)
            yield uow

__init__(session_maker, transaction_factory, *, flush_before_commit=True)

Initialize the unit of work.

Parameters:

Name Type Description Default
session_maker async_sessionmaker[AsyncSession]

Async session factory.

required
transaction_factory Callable[[AsyncSession], T]

Callable producing the transaction object exposed to callers.

required
flush_before_commit bool

Default flush_before_commit policy applied when transaction() is called without an explicit override. Set this to False once here if your service prefers SQLAlchemy's default "flush on commit" semantics instead of an early flush.

True
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
def __init__(
    self,
    session_maker: async_sessionmaker[AsyncSession],
    transaction_factory: Callable[[AsyncSession], T],
    *,
    flush_before_commit: bool = True,
) -> None:
    """Store the collaborators and the default flush policy.

    Args:
        session_maker: Async session factory.
        transaction_factory: Callable that turns a session into the transaction
            object exposed to callers.
        flush_before_commit: Policy used by :meth:`transaction` whenever it is
            called without an explicit ``flush_before_commit`` override. Pass
            ``False`` once here if your service prefers SQLAlchemy's default
            "flush on commit" semantics instead of an early flush.
    """
    # Nothing is opened or validated at construction time.
    self._session_maker, self._transaction_factory = session_maker, transaction_factory
    self._flush_before_commit = flush_before_commit

managed_session(isolation_level=None) async

Create a session with manual transaction control.

Unlike transaction(), this does NOT auto-commit. The caller must explicitly call session.commit() or session.rollback(). This is useful for complex transactional logic where commit decision depends on multiple conditions or external factors.

A transaction is started automatically, but you have full control over when to commit or rollback. If you exit without calling either, SQLAlchemy will automatically rollback on session close.

Parameters:

Name Type Description Default
isolation_level IsolationLevel | str | None

Optional transaction isolation level. Can be an IsolationLevel enum member or a string value. Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED".

None

Yields:

Type Description
AsyncIterator[tuple[T, AsyncSession]]

Tuple of (transaction object, session) for manual control.

Raises:

Type Description
ValueError

If isolation_level is not supported.

Examples:

Manual transaction control:

async with uow.managed_session() as (tx, session):
    await tx.users.create(...)

    # Manually decide when to commit
    if should_commit:
        await session.commit()
    else:
        await session.rollback()

Conditional commit based on external service:

async with uow.managed_session() as (tx, session):
    user = await tx.users.create(...)

    # Call external service
    result = await external_api.validate(user)

    if result.success:
        await session.commit()
    else:
        await session.rollback()

Multiple operations with intermediate decision:

async with uow.managed_session() as (tx, session):
    user = await tx.users.create(...)

    # First checkpoint
    await session.flush()

    # More operations
    await tx.profiles.create(user_id=user.id)

    # Final decision
    await session.commit()
Note

Prefer transaction() for the vast majority of use cases — it commits automatically and enforces the UoW pattern. This method is an advanced escape hatch for scenarios where the commit/rollback decision depends on conditions that can only be evaluated after data is written (e.g., external service validation). session.commit() calls belong exclusively at the use-case boundary via this method, never inside repository implementations.

Warning

You MUST explicitly call session.commit() or session.rollback(). Exiting the context without calling either will result in automatic rollback when the session closes.

Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
@asynccontextmanager
async def managed_session(
    self,
    isolation_level: IsolationLevel | str | None = None,
) -> AsyncIterator[tuple[T, AsyncSession]]:
    """Open a session in which the caller decides when to commit or roll back.

    A transaction is begun for you, but — unlike transaction() — it is never
    committed automatically. Call session.commit() or session.rollback()
    yourself. This fits flows where the commit decision depends on several
    conditions or on external factors.

    If the block raises (or the task is cancelled), the session is rolled back
    and the error is re-raised. If the block ends without either call,
    SQLAlchemy will automatically rollback on session close.

    Args:
        isolation_level: Optional transaction isolation level.
            Either an IsolationLevel enum member or a string value.
            Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED".

    Yields:
        Tuple of (transaction object, session) for manual control.

    Raises:
        ValueError: If isolation_level is not supported.

    Examples:
        Commit only when an external check passes:
            async with uow.managed_session() as (tx, session):
                user = await tx.users.create(...)

                result = await external_api.validate(user)

                if result.success:
                    await session.commit()
                else:
                    await session.rollback()

        Several writes with an intermediate flush:
            async with uow.managed_session() as (tx, session):
                user = await tx.users.create(...)

                # First checkpoint
                await session.flush()

                await tx.profiles.create(user_id=user.id)

                # Final decision
                await session.commit()

    Note:
        Prefer :meth:`transaction` for the vast majority of use cases — it commits
        automatically and enforces the UoW pattern. This method is an escape hatch
        for cases where the commit/rollback decision can only be made after data
        is written (e.g., external service validation). ``session.commit()`` calls
        belong exclusively at the use-case boundary via this method, never inside
        repository implementations.

    Warning:
        You MUST explicitly call session.commit() or session.rollback().
        Leaving the context without either results in automatic rollback when
        the session closes.
    """
    async with self.open_session(isolation_level) as db_session:
        # begin() is awaited directly instead of being entered as a context
        # manager, so leaving this block never commits on the caller's behalf.
        await db_session.begin()
        try:
            yield self._transaction_factory(db_session), db_session
        except (Exception, asyncio.CancelledError):
            # Covers ordinary errors as well as task cancellation.
            await db_session.rollback()
            raise

open_session(isolation_level=None) async

Open a session with optional isolation level applied.

This is the extension point for subclasses that need custom session setup (e.g., RLS context, session-level GUCs, custom statement timeouts). Override to wrap or augment session creation while preserving isolation handling.

Used internally by transaction(), managed_session(), and query().

Parameters:

Name Type Description Default
isolation_level IsolationLevel | str | None

Optional transaction isolation level.

None

Yields:

Type Description
AsyncIterator[AsyncSession]

Configured AsyncSession instance.

Raises:

Type Description
ValueError

If isolation_level is not supported.

Examples:

Subclass that sets a session-level GUC for every transaction:

class TenantUnitOfWork(AsyncSQLAlchemyUnitOfWork):
    """Unit of Work that tags every opened session with a tenant identifier."""

    def __init__(self, session_maker, tx_factory, tenant_id):
        super().__init__(session_maker, tx_factory)
        self._tenant_id = tenant_id

    @asynccontextmanager
    async def open_session(self, isolation_level=None):
        async with super().open_session(isolation_level) as session:
            # PostgreSQL's SET statement does not accept bind parameters, so the
            # value is passed through set_config() instead. is_local=false keeps
            # the setting session-level, like a plain SET would.
            await session.execute(
                text("SELECT set_config('app.tenant_id', :tid, false)"),
                {"tid": str(self._tenant_id)},
            )
            yield session
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
@asynccontextmanager
async def open_session(
    self,
    isolation_level: IsolationLevel | str | None = None,
) -> AsyncIterator[AsyncSession]:
    """Open a session with optional isolation level applied.

    This is the extension point for subclasses that need custom session setup
    (e.g., RLS context, session-level GUCs, custom statement timeouts).
    Override to wrap or augment session creation while preserving isolation handling.

    Used internally by :meth:`transaction` and :meth:`query`.

    Args:
        isolation_level: Optional transaction isolation level.

    Yields:
        Configured AsyncSession instance.

    Raises:
        ValueError: If isolation_level is not supported.

    Examples:
        Subclass that sets a session-level GUC for every opened session.
        ``set_config()`` is used because PostgreSQL's ``SET`` statement does
        not accept bind parameters:

            class TenantUnitOfWork(AsyncSQLAlchemyUnitOfWork):
                def __init__(self, session_maker, tx_factory, tenant_id):
                    super().__init__(session_maker, tx_factory)
                    self._tenant_id = tenant_id

                @asynccontextmanager
                async def open_session(self, isolation_level=None):
                    async with super().open_session(isolation_level) as session:
                        await session.execute(
                            text("SELECT set_config('app.tenant_id', :tid, false)"),
                            {"tid": str(self._tenant_id)},
                        )
                        yield session
    """
    # The session maker's context manager owns the session lifetime; this
    # generator only configures the session before handing it to the caller.
    async with self._session_maker() as session:
        # Apply isolation level if specified (DRY: using utility function)
        await apply_isolation_level(session, isolation_level)
        yield session

query(isolation_level=None) async

Create a read-only query context without transaction management.

This method is designed for read-only operations and does not start a transaction or perform any commit/rollback. It's semantically clearer than managed_session() for read operations.

Parameters:

Name Type Description Default
isolation_level IsolationLevel | str | None

Optional transaction isolation level. Can be an IsolationLevel enum member or a string value. Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED".

None

Raises:

Type Description
ValueError

If isolation_level is not supported.

Examples:

Read-only query:

async with uow.query() as qx:
    users = await qx.users.list_all()
    user = await qx.users.get_by_id(user_id)
# No commit/rollback - just closes session

Note

While this method is intended for read-only operations, SQLAlchemy does not enforce this at the session level. It's up to the caller to ensure only read operations are performed.

Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
@asynccontextmanager
async def query(
    self,
    isolation_level: IsolationLevel | str | None = None,
) -> AsyncIterator[T]:
    """Yield a repository context for read-only work, without commit/rollback handling.

    The session comes from :meth:`open_session` and is wrapped by the
    transaction factory. This method itself never begins, commits, or rolls
    back anything, which makes it semantically clearer than managed_session()
    for read operations.

    Args:
        isolation_level: Optional transaction isolation level.
            Can be an IsolationLevel enum member or a string value.
            Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED".

    Raises:
        ValueError: If isolation_level is not supported.

    Examples:
        Read-only query:
            async with uow.query() as qx:
                users = await qx.users.list_all()
                user = await qx.users.get_by_id(user_id)
            # No commit/rollback - just closes session

    Note:
        Read-only use is a convention, not something SQLAlchemy enforces at the
        session level. Callers are responsible for issuing only read operations.
    """
    async with self.open_session(isolation_level) as session:
        # Wrap the session and hand it over; no transaction bookkeeping on exit.
        yield self._transaction_factory(session)

transaction(isolation_level=None, flush_before_commit=None) async

Create a new transaction context with automatic commit/rollback.

The Unit of Work automatically commits the transaction on successful exit and rolls back on exception. This ensures atomic operations.

Parameters:

Name Type Description Default
isolation_level IsolationLevel | str | None

Optional transaction isolation level. Can be an IsolationLevel enum member or a string value. Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED".

None
flush_before_commit bool | None

If True, flush the session before commit to surface constraint violations while still inside the transaction. If None (default), falls back to the value passed to the constructor (True unless overridden).

None

Raises:

Type Description
ValueError

If isolation_level is not supported.

Examples:

Write operation with automatic commit:

async with uow.transaction() as tx:
    await tx.users.create(...)
    # Auto-commit on exit, rollback on exception

Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
@asynccontextmanager
async def transaction(
    self,
    isolation_level: IsolationLevel | str | None = None,
    flush_before_commit: bool | None = None,
) -> AsyncIterator[T]:
    """Open a transactional context that commits on success and rolls back on error.

    The body of the ``async with`` block runs inside ``session.begin()``:
    leaving the block normally commits, leaving it with an exception rolls
    back. This keeps the work done inside the block atomic.

    Args:
        isolation_level: Optional transaction isolation level.
            Can be an IsolationLevel enum member or a string value.
            Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED".
        flush_before_commit: If True, flush the session before commit so that
            constraint violations surface while still inside the transaction.
            If ``None`` (default), the value passed to the constructor is used
            (``True`` unless overridden).

    Raises:
        ValueError: If isolation_level is not supported.

    Examples:
        Write operation with automatic commit:
            async with uow.transaction() as tx:
                await tx.users.create(...)
                # Auto-commit on exit, rollback on exception
    """
    # Per-call argument wins; otherwise use the instance-wide default.
    should_flush = self._flush_before_commit if flush_before_commit is None else flush_before_commit

    async with self.open_session(isolation_level) as session:
        async with session.begin():
            yield self._transaction_factory(session)
            if should_flush:
                # Flush explicitly so a failure is logged here, still inside
                # the transaction, before the commit is attempted.
                try:
                    await session.flush()
                except SQLAlchemyError as exc:
                    logger.warning("Database flush failed", extra={"error": str(exc)})
                    raise

AsyncSQLAlchemyUowTransaction

Bases: AsyncUowTransaction

Base async SQLAlchemy transaction-scoped repositories.

This class provides access to the underlying SQLAlchemy session and is intended to be subclassed by services to expose specific repositories.

Example

class IdentityTransaction(AsyncSQLAlchemyUowTransaction):
    @property
    def users(self) -> UserRepository:
        return PostgresUserRepository(self.session)

Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
class AsyncSQLAlchemyUowTransaction(AsyncUowTransaction):
    """Base async SQLAlchemy transaction-scoped repositories.

    This class provides access to the underlying SQLAlchemy session and is
    intended to be subclassed by services to expose specific repositories.

    Example:
        class IdentityTransaction(AsyncSQLAlchemyUowTransaction):
            @property
            def users(self) -> UserRepository:
                return PostgresUserRepository(self.session)
    """

    def __init__(self, session: AsyncSession) -> None:
        """Store the session that repositories created by subclasses will share.

        Args:
            session: The AsyncSession this transaction object wraps.
        """
        self._session = session

    @property
    def session(self) -> AsyncSession:
        """Get the underlying SQLAlchemy async session."""
        # Read-only accessor: the session is set once in __init__.
        return self._session

session property

Get the underlying SQLAlchemy async session.

PostgresAdvisoryLockMixin

Mixin providing PostgreSQL advisory lock support for UoW transactions.

Requires the class to have a session property returning AsyncSession.

Example

class IdentityTransaction(AsyncSQLAlchemyUowTransaction, PostgresAdvisoryLockMixin):
    @property
    def users(self) -> UserRepository:
        return PostgresUserRepository(self.session)

# Now has access to try_advisory_lock method
async with uow.transaction() as tx:
    if await tx.try_advisory_lock(12345):
        # Protected operation
        ...

Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
class PostgresAdvisoryLockMixin:
    """Mixin providing PostgreSQL advisory lock support for UoW transactions.

    Requires the class to have a `session` property returning AsyncSession.

    Example:
        class IdentityTransaction(AsyncSQLAlchemyUowTransaction, PostgresAdvisoryLockMixin):
            @property
            def users(self) -> UserRepository:
                return PostgresUserRepository(self.session)

        # Now has access to try_advisory_lock method
        async with uow.transaction() as tx:
            if await tx.try_advisory_lock(12345):
                # Protected operation
                ...
    """

    # Annotation only: no value is assigned here, so the real `session`
    # must be supplied by the class this mixin is combined with.
    session: AsyncSession  # Type annotation for protocol compliance

    async def try_advisory_lock(self, key: int) -> bool:
        """Acquire a Postgres transaction-scoped advisory lock.

        Delegates to :func:`try_advisory_xact_lock` for actual locking logic.
        The call does not raise when the lock is unavailable; callers must
        check the returned flag.

        Args:
            key: Integer lock key.

        Returns:
            True if lock was acquired, False if already held by another session.
        """
        # NOTE(review): presumably the key must fit PostgreSQL's bigint range — confirm in the helper.
        return await try_advisory_xact_lock(self.session, key)

try_advisory_lock(key) async

Acquire a Postgres transaction-scoped advisory lock.

Delegates to :func:try_advisory_xact_lock for actual locking logic.

Parameters:

Name Type Description Default
key int

Integer lock key.

required

Returns:

Type Description
bool

True if lock was acquired, False if already held by another session.

Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
async def try_advisory_lock(self, key: int) -> bool:
    """Acquire a Postgres transaction-scoped advisory lock.

    Delegates to :func:`try_advisory_xact_lock` for actual locking logic.
    The call does not raise when the lock is unavailable; callers must
    check the returned flag.

    Args:
        key: Integer lock key.

    Returns:
        True if lock was acquired, False if already held by another session.
    """
    # The lock is taken on this object's own session.
    return await try_advisory_xact_lock(self.session, key)

normalize_isolation_level(isolation_level)

Normalize isolation_level to a valid PostgreSQL string or None.

Accepts both enum members and strings. Strings may use either underscores or spaces (e.g., "READ_COMMITTED" or "READ COMMITTED") for convenience.

Parameters:

Name Type Description Default
isolation_level IsolationLevel | str | None

Enum member, string, or None.

required

Returns:

Type Description
str | None

PostgreSQL-form string (with spaces) valid for execution_options, or None.

Raises:

Type Description
ValueError

If isolation_level is not supported.

Examples:

>>> normalize_isolation_level(None) is None
True
>>> normalize_isolation_level(IsolationLevel.READ_COMMITTED)
'READ COMMITTED'
>>> normalize_isolation_level("READ_COMMITTED")
'READ COMMITTED'
>>> normalize_isolation_level("read committed")
'READ COMMITTED'
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
def normalize_isolation_level(
    isolation_level: IsolationLevel | str | None,
) -> str | None:
    """Normalize isolation_level to a valid PostgreSQL string or None.

    Accepts both enum members and strings. Strings are matched
    case-insensitively, may use either underscores or spaces
    (e.g., "READ_COMMITTED" or "READ COMMITTED"), and may carry surrounding
    or repeated whitespace (e.g., a value read from an environment variable).

    Args:
        isolation_level: Enum member, string, or None.

    Returns:
        PostgreSQL-form string (with spaces) valid for execution_options, or None.

    Raises:
        ValueError: If isolation_level is not supported.

    Examples:
        >>> normalize_isolation_level(None) is None
        True
        >>> normalize_isolation_level(IsolationLevel.READ_COMMITTED)
        'READ COMMITTED'
        >>> normalize_isolation_level("READ_COMMITTED")
        'READ COMMITTED'
        >>> normalize_isolation_level("read committed")
        'READ COMMITTED'
        >>> normalize_isolation_level("  read   committed ")
        'READ COMMITTED'
    """
    # Explicit None check (PEP 20: explicit is better than implicit)
    if isolation_level is None:
        return None

    # Fast path for enum members: their values are already in PostgreSQL form.
    if isinstance(isolation_level, IsolationLevel):
        return isolation_level.value

    # Normalize string input: uppercase, underscores to spaces, then collapse
    # any run of whitespace (including leading/trailing) to single spaces.
    normalized = " ".join(str(isolation_level).upper().replace("_", " ").split())

    # Validate against cached valid levels
    if normalized not in _VALID_ISOLATION_LEVELS:
        supported = ", ".join(sorted(_VALID_ISOLATION_LEVELS))
        raise ValueError(f"Invalid isolation level: {isolation_level!r}. Supported values: {supported}")

    return normalized

apply_isolation_level(session, isolation_level) async

Apply isolation level to an async session's connection.

Implementation Detail: We use run_sync() because SQLAlchemy's execution_options() is a synchronous method that configures the underlying DBAPI connection object. We must bridge from async context to sync method via run_sync().

This is a DRY utility to eliminate duplication of isolation level application logic across transaction(), managed_session(), and query() methods.

Parameters:

Name Type Description Default
session AsyncSession

SQLAlchemy AsyncSession to configure.

required
isolation_level IsolationLevel | str | None

Desired isolation level (enum, string, or None).

required

Raises:

Type Description
ValueError

If isolation_level is not supported (raised by normalize_isolation_level).

Examples:

>>> async with session_maker() as session:
...     await apply_isolation_level(session, IsolationLevel.SERIALIZABLE)
...     result = await session.execute(select(User))
...     # Query runs with SERIALIZABLE isolation level

Using string isolation level:

>>> await apply_isolation_level(session, "READ COMMITTED")

No-op when None:

>>> await apply_isolation_level(session, None)  # Does nothing
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
async def apply_isolation_level(
    session: AsyncSession,
    isolation_level: IsolationLevel | str | None,
) -> None:
    """Set the requested isolation level on the session's connection.

    The level is first validated and normalized by
    ``normalize_isolation_level``. When the result is ``None`` nothing is
    touched. Otherwise the session's connection is obtained and
    ``execution_options(isolation_level=...)`` is invoked on it through
    ``run_sync()``, which bridges from the async context to the synchronous call.

    This is a DRY utility shared by ``transaction()``, ``managed_session()``,
    and ``query()`` so the isolation logic lives in one place.

    Args:
        session: SQLAlchemy AsyncSession to configure.
        isolation_level: Desired isolation level (enum, string, or None).

    Raises:
        ValueError: If isolation_level is not supported (raised by normalize_isolation_level).

    Examples:
        >>> async with session_maker() as session:
        ...     await apply_isolation_level(session, IsolationLevel.SERIALIZABLE)
        ...     result = await session.execute(select(User))
        ...     # Query runs with SERIALIZABLE isolation level

        Using string isolation level:
        >>> await apply_isolation_level(session, "READ COMMITTED")

        No-op when None:
        >>> await apply_isolation_level(session, None)  # Does nothing
    """
    level = normalize_isolation_level(isolation_level)
    if level is None:
        # Nothing requested: leave the session's connection untouched.
        return

    def _set_level(sync_conn):
        # Runs on the synchronous side of the async bridge.
        return sync_conn.execution_options(isolation_level=level)

    connection = await session.connection()
    await connection.run_sync(_set_level)

Enums for Unit of Work.

IsolationLevel

Bases: StrEnum

PostgreSQL transaction isolation levels.

Values match PostgreSQL's expected form (with spaces) for use with SQLAlchemy execution_options(isolation_level=...).

Source code in sqlalchemy_foundation_kit/uow/enums.py
class IsolationLevel(StrEnum):
    """PostgreSQL transaction isolation levels.

    Values match PostgreSQL's expected form (with spaces) for use with
    SQLAlchemy execution_options(isolation_level=...).

    Member names use underscores while member values use spaces; because this
    is a StrEnum, each member is itself the space-separated string.
    """

    # NOTE(review): PostgreSQL documents READ UNCOMMITTED as behaving like
    # READ COMMITTED — confirm that exposing it here is intentional.
    READ_UNCOMMITTED = "READ UNCOMMITTED"
    READ_COMMITTED = "READ COMMITTED"
    REPEATABLE_READ = "REPEATABLE READ"
    SERIALIZABLE = "SERIALIZABLE"

Example Usage

from sqlalchemy_foundation_kit import (
    AsyncSQLAlchemyUnitOfWork,
    AsyncSQLAlchemyUowTransaction,
    PostgresAdvisoryLockMixin,
)

# Define transaction with repositories
class MyTransaction(AsyncSQLAlchemyUowTransaction, PostgresAdvisoryLockMixin):
    def __init__(self, session):
        super().__init__(session)
        self._users = None

    @property
    def users(self):
        if self._users is None:
            self._users = PostgresUserRepository(self.session)
        return self._users

# Create UoW
class MyUnitOfWork(AsyncSQLAlchemyUnitOfWork[MyTransaction]):
    def __init__(self, session_maker):
        super().__init__(session_maker, transaction_factory=MyTransaction)

# Use in application
async with uow.transaction() as tx:
    # All operations in single transaction
    user = await tx.users.create(email="user@example.com")

    # Advisory lock
    if await tx.try_advisory_lock("process_payments"):
        await process_payments()

Configuration Protocols

Type-safe configuration protocols.

PostgreSQL configuration.

PostgresSettingsProtocol

Bases: Protocol

Protocol for PostgreSQL configuration.

Organized protocol with grouped settings for connection, pool, and query configuration.

Attributes:

Name Type Description
connection ConnectionSettingsProtocol

Connection parameters (host, port, user, database).

pool PoolSettingsProtocol

Connection pool settings.

query QuerySettingsProtocol

Query execution and transaction settings.

application_name str

Application identifier for connections.

db_schema str | None

Optional PostgreSQL schema name.

use_orjson_serialization bool

Enable orjson for JSON operations.

jit str | None

JIT compilation setting (PgBouncer compatibility).

Examples:

Implementing the protocol:

>>> class MyConfig:
...     connection: ConnectionSettingsProtocol
...     pool: PoolSettingsProtocol
...     query: QuerySettingsProtocol
...     application_name: str = "my-app"
...     db_schema: str | None = None
...     use_orjson_serialization: bool = True
...     jit: str | None = "off"
...
...     def to_dsn(self) -> str:
...         return f"postgresql://{self.connection.user}@{self.connection.host}..."

Using the protocol:

>>> def create_engine(config: PostgresSettingsProtocol):
...     dsn = config.to_dsn()
...     pool_size = config.pool.size
...     echo = config.query.echo

Source code in sqlalchemy_foundation_kit/config/postgres.py
class PostgresSettingsProtocol(Protocol):
    """Protocol for PostgreSQL configuration.

    Organized protocol with grouped settings for connection, pool, and query configuration.

    Attributes:
        connection: Connection parameters (host, port, user, database).
        pool: Connection pool settings.
        query: Query execution and transaction settings.
        application_name: Application identifier for connections.
        db_schema: Optional PostgreSQL schema name.
        use_orjson_serialization: Enable orjson for JSON operations.
        jit: JIT compilation setting (PgBouncer compatibility).

    Examples:
        Implementing the protocol:
            >>> class MyConfig:
            ...     connection: ConnectionSettingsProtocol
            ...     pool: PoolSettingsProtocol
            ...     query: QuerySettingsProtocol
            ...     application_name: str = "my-app"
            ...     db_schema: str | None = None
            ...     use_orjson_serialization: bool = True
            ...     jit: str | None = "off"
            ...
            ...     def to_dsn(self) -> str:
            ...         return f"postgresql://{self.connection.user}@{self.connection.host}..."

        Using the protocol (the pool size lives in ``pool.size``):
            >>> def create_engine(config: PostgresSettingsProtocol):
            ...     dsn = config.to_dsn()
            ...     pool_size = config.pool.size
            ...     echo = config.query.echo
    """

    # Grouped sub-configurations, each described by its own protocol.
    connection: ConnectionSettingsProtocol
    pool: PoolSettingsProtocol
    query: QuerySettingsProtocol
    # Flat, engine-wide settings.
    application_name: str
    db_schema: str | None
    use_orjson_serialization: bool
    jit: str | None

    def to_dsn(self) -> str:
        """Convert config to DSN string.

        Returns PostgreSQL connection string in format:
        postgresql+asyncpg://user:password@host:port/database

        Examples:
            >>> config.to_dsn()
            'postgresql+asyncpg://user:***@localhost:5432/mydb'
        """
        # Protocol stub: implementing classes provide the actual DSN construction.
        ...

to_dsn()

Convert config to DSN string.

Returns PostgreSQL connection string in format: postgresql+asyncpg://user:password@host:port/database

Examples:

>>> config.to_dsn()
'postgresql+asyncpg://user:***@localhost:5432/mydb'
Source code in sqlalchemy_foundation_kit/config/postgres.py
def to_dsn(self) -> str:
    """Convert config to DSN string.

    Returns PostgreSQL connection string in format:
    postgresql+asyncpg://user:password@host:port/database

    Returns:
        The DSN as a ``str``.

    Examples:
        >>> config.to_dsn()
        'postgresql+asyncpg://user:***@localhost:5432/mydb'
    """
    # Protocol stub: implementing classes provide the actual DSN construction.
    ...

ConnectionSettingsProtocol

Bases: Protocol

Protocol for PostgreSQL connection settings.

Defines connection parameters required for establishing database connections.

Attributes:

Name Type Description
host str

PostgreSQL server hostname or IP address.

port int

PostgreSQL server port number.

user str

Database username for authentication.

password PasswordLike | str

Database password — either a plain str or a SecretStr-like object implementing get_secret_value().

database str

Target database name.

Source code in sqlalchemy_foundation_kit/config/postgres.py
class ConnectionSettingsProtocol(Protocol):
    """Protocol for PostgreSQL connection settings.

    Defines connection parameters required for establishing database connections.

    Attributes:
        host: PostgreSQL server hostname or IP address.
        port: PostgreSQL server port number.
        user: Database username for authentication.
        password: Database password — either a plain ``str`` or a SecretStr-like object
            implementing ``get_secret_value()``.
        database: Target database name.
    """

    host: str
    port: int
    user: str
    # Two accepted shapes: consumers must handle a plain str as well as an
    # object exposing get_secret_value().
    password: PasswordLike | str
    database: str

PoolSettingsProtocol

Bases: Protocol

Protocol for PostgreSQL connection pool settings.

Defines pool configuration for SQLAlchemy engine connection management.

Attributes:

Name Type Description
kind str | type

Connection pool implementation (queue, static, etc.).

size int | None

Number of connections to maintain in pool.

max_overflow int | None

Additional connections allowed when pool exhausted.

pre_ping bool

Test connection health before checkout.

recycle int | None

Recycle connections after N seconds.

timeout float | None

Timeout for acquiring connection from pool.

Examples:

>>> pool: PoolSettingsProtocol = ...
>>> if pool.size is not None and pool.size > 100:
...     logger.warning("Large pool size detected")
Source code in sqlalchemy_foundation_kit/config/postgres.py
class PoolSettingsProtocol(Protocol):
    """Protocol for PostgreSQL connection pool settings.

    Defines pool configuration for SQLAlchemy engine connection management.

    Attributes:
        kind: Connection pool implementation (queue, static, etc.).
        size: Number of connections to maintain in pool.
        max_overflow: Additional connections allowed when pool exhausted.
        pre_ping: Test connection health before checkout.
        recycle: Recycle connections after N seconds.
        timeout: Timeout for acquiring connection from pool.

    Examples:
        ``size`` may be ``None``, so guard before comparing:

        >>> pool: PoolSettingsProtocol = ...
        >>> if pool.size is not None and pool.size > 100:
        ...     logger.warning("Large pool size detected")
    """

    # Either a string name or a class object.
    kind: str | type
    # The numeric settings below are all nullable; check for None before use.
    size: int | None
    max_overflow: int | None
    pre_ping: bool
    recycle: int | None
    timeout: float | None

QuerySettingsProtocol

Bases: Protocol

Protocol for PostgreSQL query execution settings.

Defines query behavior, caching, and transaction isolation configuration.

Attributes:

Name Type Description
echo bool

Enable SQL statement logging.

statement_cache_size int | None

Prepared statement cache size.

prepared_statement_cache_size int | None

Server-side prepared statement cache size.

isolation_level str | None

Transaction isolation level.

Examples:

>>> query: QuerySettingsProtocol = ...
>>> if query.echo:
...     logger.info("SQL echo enabled")
Source code in sqlalchemy_foundation_kit/config/postgres.py
class QuerySettingsProtocol(Protocol):
    """Protocol for PostgreSQL query execution settings.

    Defines query behavior, caching, and transaction isolation configuration.

    Attributes:
        echo: Enable SQL statement logging.
        statement_cache_size: Prepared statement cache size.
        prepared_statement_cache_size: Server-side prepared statement cache size.
        isolation_level: Transaction isolation level.

    Examples:
        >>> query: QuerySettingsProtocol = ...
        >>> if query.echo:
        ...     logger.info("SQL echo enabled")
    """

    echo: bool
    # NOTE(review): the two cache-size fields look like they configure different
    # caches (driver-level vs. dialect-level) — confirm against the engine factory.
    statement_cache_size: int | None
    prepared_statement_cache_size: int | None
    # Plain string here (not the IsolationLevel enum); None means no level is configured.
    isolation_level: str | None

Example Usage

from dataclasses import dataclass
from urllib.parse import quote_plus

from pydantic import SecretStr
from sqlalchemy_foundation_kit import PostgresSettingsProtocol
from sqlalchemy_foundation_kit.config.postgres import (
    ConnectionSettingsProtocol,
    PoolSettingsProtocol,
    QuerySettingsProtocol,
)

@dataclass
class MyPostgresConfig:
    """Concrete settings object that structurally satisfies PostgresSettingsProtocol."""

    connection: ConnectionSettingsProtocol
    pool: PoolSettingsProtocol
    query: QuerySettingsProtocol
    application_name: str = "my-service"
    # The remaining attributes are required by PostgresSettingsProtocol as well.
    db_schema: str | None = None
    use_orjson_serialization: bool = True
    jit: str | None = "off"

    def to_dsn(self) -> str:
        """Build the asyncpg DSN from the connection settings."""
        conn = self.connection
        # The password may be a plain str or a SecretStr; unwrap the latter.
        password = conn.password.get_secret_value() if isinstance(conn.password, SecretStr) else conn.password
        # Quote the password so characters like '@' or '/' cannot break the URL.
        return (
            f"postgresql+asyncpg://{conn.user}:{quote_plus(password)}"
            f"@{conn.host}:{conn.port}/{conn.database}"
        )

# Any function annotated with PostgresSettingsProtocol now accepts MyPostgresConfig.

Metrics Protocols

Observability protocols for monitoring.

Metrics protocols for database monitoring.

Protocols are split by capability (ISP). Implementations may satisfy the narrow protocols selectively (e.g., only record errors), and PostgresMetricsProtocol aggregates them for convenience.

PostgresMetricsProtocol

Bases: PoolStatsRecorder, CheckoutRecorder, ErrorRecorder, Protocol

Composite protocol covering all PostgreSQL metrics capabilities.

Aggregates the narrow capability protocols for convenience. Implementations that only need a subset can implement the individual protocols directly.

Examples:

>>> class MyMetrics:
...     def record_pool_stats(self, pool_size: int, pool_checked_out: int, pool_overflow: int) -> None: ...
...     def record_checkout(self, duration: float) -> None: ...
...     def record_error(self, error_type: str, is_timeout: bool = False) -> None: ...
>>> metrics: PostgresMetricsProtocol = MyMetrics()
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
class PostgresMetricsProtocol(PoolStatsRecorder, CheckoutRecorder, ErrorRecorder, Protocol):
    """Composite protocol covering all PostgreSQL metrics capabilities.

    Aggregates the narrow capability protocols for convenience. Implementations
    that only need a subset can implement the individual protocols directly.

    The class declares no members of its own; every method comes from the
    three capability protocols listed as bases.

    Examples:
        >>> class MyMetrics:
        ...     def record_pool_stats(self, pool_size: int, pool_checked_out: int, pool_overflow: int) -> None: ...
        ...     def record_checkout(self, duration: float) -> None: ...
        ...     def record_error(self, error_type: str, is_timeout: bool = False) -> None: ...
        >>> metrics: PostgresMetricsProtocol = MyMetrics()
    """

record_checkout(duration)

Record a database connection checkout from the pool.

Parameters:

Name Type Description Default
duration float

Time taken to acquire the connection from the pool, in seconds.

required
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
def record_checkout(self, duration: float) -> None:
    """Record a database connection checkout from the pool.

    Args:
        duration: Time taken to acquire the connection from the pool, in seconds.

    Returns:
        None.
    """
    # Protocol stub: no behavior here; implementing classes provide it.
    ...

record_error(error_type, is_timeout=False)

Record a database connection or execution error.

Parameters:

Name Type Description Default
error_type str

The type of error that occurred (e.g., "OperationalError").

required
is_timeout bool

True if this error was specifically a connection checkout timeout.

False
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
def record_error(self, error_type: str, is_timeout: bool = False) -> None:
    """Record a database connection or execution error.

    Args:
        error_type: The type of error that occurred (e.g., "OperationalError").
        is_timeout: True if this error was specifically a connection checkout timeout.
            Defaults to False.

    Returns:
        None.
    """
    # Protocol stub: no behavior here; implementing classes provide it.
    ...

record_pool_stats(pool_size, pool_checked_out, pool_overflow)

Record database connection pool statistics.

Parameters:

Name Type Description Default
pool_size int

Current total number of connections in the pool.

required
pool_checked_out int

Number of connections currently in use.

required
pool_overflow int

Number of connections over the configured pool_size.

required
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
def record_pool_stats(
    self,
    pool_size: int,
    pool_checked_out: int,
    pool_overflow: int,
) -> None:
    """Record database connection pool statistics.

    Args:
        pool_size: Current total number of connections in the pool.
        pool_checked_out: Number of connections currently in use.
        pool_overflow: Number of connections over the configured pool_size.

    Returns:
        None.
    """
    # Protocol stub: no behavior here; implementing classes provide it.
    ...

PoolStatsRecorder

Bases: Protocol

Capability protocol for recording pool statistics.

Source code in sqlalchemy_foundation_kit/protocols/metrics.py
class PoolStatsRecorder(Protocol):
    """Capability protocol for recording pool statistics.

    Any object with a matching ``record_pool_stats`` method satisfies it.
    """

    def record_pool_stats(
        self,
        pool_size: int,
        pool_checked_out: int,
        pool_overflow: int,
    ) -> None:
        """Record database connection pool statistics.

        Args:
            pool_size: Current total number of connections in the pool.
            pool_checked_out: Number of connections currently in use.
            pool_overflow: Number of connections over the configured pool_size.

        Returns:
            None.
        """
        # Protocol stub: no behavior here; implementing classes provide it.
        ...

record_pool_stats(pool_size, pool_checked_out, pool_overflow)

Record database connection pool statistics.

Parameters:

Name Type Description Default
pool_size int

Current total number of connections in the pool.

required
pool_checked_out int

Number of connections currently in use.

required
pool_overflow int

Number of connections over the configured pool_size.

required
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
def record_pool_stats(self, pool_size: int, pool_checked_out: int, pool_overflow: int) -> None:
    """Report a snapshot of the connection pool's current usage.

    Declaration only: the body is intentionally empty and implementers supply the behaviour.

    Args:
        pool_size: Current total number of connections in the pool.
        pool_checked_out: Number of connections currently in use.
        pool_overflow: Number of connections over the configured pool_size.
    """
    ...

CheckoutRecorder

Bases: Protocol

Capability protocol for recording connection checkout duration.

Source code in sqlalchemy_foundation_kit/protocols/metrics.py
class CheckoutRecorder(Protocol):
    """Structural interface for objects that can record connection checkout duration."""

    def record_checkout(self, duration: float) -> None:
        """Report how long one connection checkout from the pool took.

        Args:
            duration: Time taken to acquire the connection from the pool, in seconds.
        """
        ...

record_checkout(duration)

Record a database connection checkout from the pool.

Parameters:

Name Type Description Default
duration float

Time taken to acquire the connection from the pool, in seconds.

required
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
def record_checkout(self, duration: float) -> None:
    """Report how long one connection checkout from the pool took.

    Declaration only: the body is intentionally empty and implementers supply the behaviour.

    Args:
        duration: Time taken to acquire the connection from the pool, in seconds.
    """
    ...

ErrorRecorder

Bases: Protocol

Capability protocol for recording database errors.

Source code in sqlalchemy_foundation_kit/protocols/metrics.py
class ErrorRecorder(Protocol):
    """Structural interface for objects that can record database errors."""

    def record_error(self, error_type: str, is_timeout: bool = False) -> None:
        """Report a database connection or execution error.

        Args:
            error_type: The type of error that occurred (e.g., "OperationalError").
            is_timeout: True if this error was specifically a connection checkout timeout.
        """
        ...

record_error(error_type, is_timeout=False)

Record a database connection or execution error.

Parameters:

Name Type Description Default
error_type str

The type of error that occurred (e.g., "OperationalError").

required
is_timeout bool

True if this error was specifically a connection checkout timeout.

False
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
def record_error(self, error_type: str, is_timeout: bool = False) -> None:
    """Report a database connection or execution error.

    Declaration only: the body is intentionally empty and implementers supply the behaviour.

    Args:
        error_type: The type of error that occurred (e.g., "OperationalError").
        is_timeout: True if this error was specifically a connection checkout timeout.
    """
    ...

Example Usage

from sqlalchemy_foundation_kit.protocols import PostgresMetricsProtocol

class CustomMetrics:
    """Custom metrics implementation.

    Each method mirrors the corresponding recorder protocol signature, including
    defaults, so an instance structurally satisfies the metrics protocols.
    """

    def record_pool_stats(
        self,
        pool_size: int,
        pool_checked_out: int,
        pool_overflow: int,
    ) -> None:
        """Record pool statistics (placeholder: replace with a real backend call)."""
        # Record pool statistics
        pass

    def record_checkout(self, duration: float) -> None:
        """Record connection checkout duration, in seconds (placeholder)."""
        # Record connection checkout duration
        pass

    def record_error(self, error_type: str, is_timeout: bool = False) -> None:
        """Record a database error (placeholder).

        ``is_timeout`` defaults to ``False`` to match ``ErrorRecorder.record_error``;
        without the default, callers that pass only ``error_type`` would fail.
        """
        # Record database errors
        pass

Contrib Modules

Optional modules with extra functionality.

contrib.settings

Pydantic-based configuration (requires [settings] extra).

Base PostgreSQL configuration using pydantic-settings.

BasePostgresConfig

Bases: BaseSettings

Base PostgreSQL configuration.

Organized configuration for PostgreSQL database connections with grouped settings.

Attributes:

Name Type Description
connection ConnectionSettings

Connection parameters (host, port, credentials, database).

pool PoolSettings

Connection pool configuration (size, overflow, timeouts).

query QuerySettings

Query execution settings (echo, caching, isolation level).

application_name str

Application name for connection identification.

db_schema str | None

Optional PostgreSQL schema name.

use_orjson_serialization bool

Use orjson for JSON serialization (requires orjson).

jit PostgresJit | None

JIT compilation setting (off/on) for PgBouncer compatibility.

metrics_enabled bool

Enable connection pool metrics collection.

Examples:

>>> config = BasePostgresConfig(
...     connection=ConnectionSettings(
...         host="localhost",
...         user="postgres",
...         password=SecretStr("secret"),
...         database="mydb",
...     ),
...     application_name="my-service",
... )
>>> dsn = config.to_dsn()
>>> host = config.connection.host
>>> pool_size = config.pool.size
Source code in sqlalchemy_foundation_kit/contrib/settings/postgres.py
class BasePostgresConfig(BaseSettings):
    """Base PostgreSQL configuration.

    Organized configuration for PostgreSQL database connections with grouped settings.

    Attributes:
        connection: Connection parameters (host, port, credentials, database).
        pool: Connection pool configuration (size, overflow, timeouts).
        query: Query execution settings (echo, caching, isolation level).
        application_name: Application name for connection identification.
        db_schema: Optional PostgreSQL schema name.
        use_orjson_serialization: Use orjson for JSON serialization (requires orjson).
        jit: JIT compilation setting (off/on) for PgBouncer compatibility.
        metrics_enabled: Enable connection pool metrics collection.

    Examples:
        >>> config = BasePostgresConfig(
        ...     connection=ConnectionSettings(
        ...         host="localhost",
        ...         user="postgres",
        ...         password=SecretStr("secret"),
        ...         database="mydb",
        ...     ),
        ...     application_name="my-service",
        ... )
        >>> dsn = config.to_dsn()
        >>> host = config.connection.host
        >>> pool_size = config.pool.size
    """

    # Grouped configuration
    connection: ConnectionSettings = Field(description="PostgreSQL connection settings")
    pool: PoolSettings = Field(default_factory=PoolSettings, description="Connection pool settings")
    query: QuerySettings = Field(default_factory=QuerySettings, description="Query execution settings")

    # Top-level settings
    application_name: str = Field(description="Application name for PostgreSQL")
    db_schema: str | None = Field(default=None, description="PostgreSQL schema name")
    use_orjson_serialization: bool = Field(
        default=True,
        description="Use orjson for JSON serialization (requires orjson installed)",
    )
    jit: PostgresJit | None = Field(default="off", description="JIT setting (off/on)")
    metrics_enabled: bool = Field(default=False, description="Enable PostgreSQL metrics")

    def __repr__(self) -> str:
        """Return representation with masked password."""
        return f"{self.__class__.__name__}(dsn={self.to_dsn(mask_password=True)!r})"

    def to_dsn(self, driver: str | None = "asyncpg", mask_password: bool = False) -> str:
        """Build PostgreSQL DSN for async connections.

        This library is async-only, so the DSN uses the asyncpg driver unless another
        driver (or ``None``) is passed explicitly.

        Args:
            driver: Driver name (default: 'asyncpg' for async connections).
                Pass None to omit driver suffix.
            mask_password: If True, the password will be masked (default: False).

        Returns:
            PostgreSQL connection string (e.g., 'postgresql+asyncpg://user:pass@host:5432/db').
            The user and the unmasked password are URL-encoded; a bare IPv6 host
            (e.g. '::1') is wrapped in square brackets.

        Examples:
            >>> config.to_dsn()
            'postgresql+asyncpg://user:secret@localhost:5432/mydb'
            >>> config.to_dsn(mask_password=True)
            'postgresql+asyncpg://user:**********@localhost:5432/mydb'
        """
        user = quote_plus(self.connection.user)
        password = "**********" if mask_password else quote_plus(self.connection.password.get_secret_value())
        scheme = f"postgresql+{driver}" if driver else "postgresql"
        host = self.connection.host
        # A bare IPv6 literal contains ':' and would be confused with the port
        # separator, so it has to be enclosed in brackets (RFC 3986 IP-literal).
        if ":" in host and not host.startswith("["):
            host = f"[{host}]"
        return f"{scheme}://{user}:{password}@{host}:{self.connection.port}/{self.connection.database}"
__repr__()

Return representation with masked password.

Source code in sqlalchemy_foundation_kit/contrib/settings/postgres.py
def __repr__(self) -> str:
    """Render as ``ClassName(dsn=...)`` using the password-masked DSN."""
    masked_dsn = self.to_dsn(mask_password=True)
    class_name = self.__class__.__name__
    return f"{class_name}(dsn={masked_dsn!r})"
to_dsn(driver='asyncpg', mask_password=False)

Build PostgreSQL DSN for async connections.

This library is async-only, so the DSN uses the asyncpg driver unless another driver (or None) is passed explicitly.

Parameters:

Name Type Description Default
driver str | None

Driver name (default: 'asyncpg' for async connections). Pass None to omit driver suffix.

'asyncpg'
mask_password bool

If True, the password will be masked (default: False).

False

Returns:

Type Description
str

PostgreSQL connection string (e.g., 'postgresql+asyncpg://user:pass@host:5432/db').

Examples:

>>> config.to_dsn()
'postgresql+asyncpg://user:secret@localhost:5432/mydb'
>>> config.to_dsn(mask_password=True)
'postgresql+asyncpg://user:**********@localhost:5432/mydb'
Source code in sqlalchemy_foundation_kit/contrib/settings/postgres.py
def to_dsn(self, driver: str | None = "asyncpg", mask_password: bool = False) -> str:
    """Build PostgreSQL DSN for async connections.

    This library is async-only, so DSN always includes asyncpg driver by default.

    Args:
        driver: Driver name (default: 'asyncpg' for async connections).
            Pass None to omit driver suffix.
        mask_password: If True, the password will be masked (default: False).

    Returns:
        PostgreSQL connection string (e.g., 'postgresql+asyncpg://user:pass@host:5432/db').

    Examples:
        >>> config.to_dsn()
        'postgresql+asyncpg://user:secret@localhost:5432/mydb'
        >>> config.to_dsn(mask_password=True)
        'postgresql+asyncpg://user:**********@localhost:5432/mydb'
    """
    user = quote_plus(self.connection.user)
    password = "**********" if mask_password else quote_plus(self.connection.password.get_secret_value())
    scheme = f"postgresql+{driver}" if driver else "postgresql"
    return f"{scheme}://{user}:{password}@{self.connection.host}:{self.connection.port}/{self.connection.database}"

ConnectionSettings

Bases: BaseModel

PostgreSQL connection configuration.

Groups all connection-related parameters: host, port, credentials, and database name.

Examples:

>>> connection = ConnectionSettings(
...     host="localhost",
...     port=5432,
...     user="postgres",
...     password=SecretStr("secret"),
...     database="mydb",
... )
Source code in sqlalchemy_foundation_kit/contrib/settings/postgres.py
class ConnectionSettings(BaseModel):
    """PostgreSQL connection configuration.

    Groups all connection-related parameters: host, port, credentials, and database name.
    ``password`` and ``database`` declare no default and therefore have to be supplied;
    the remaining fields fall back to the defaults shown below.

    Examples:
        >>> connection = ConnectionSettings(
        ...     host="localhost",
        ...     port=5432,
        ...     user="postgres",
        ...     password=SecretStr("secret"),
        ...     database="mydb",
        ... )
    """

    # Server address; defaults to "localhost".
    host: str = Field(default="localhost", description="PostgreSQL host")
    # Constrained to the valid TCP port range 1-65535; defaults to 5432.
    port: int = Field(default=5432, ge=1, le=65535, description="PostgreSQL port")
    # Login role; defaults to "postgres".
    user: str = Field(default="postgres", description="PostgreSQL user")
    # Stored as SecretStr; read the raw value with get_secret_value(). No default.
    password: SecretStr = Field(description="PostgreSQL password")
    # Target database name. No default.
    database: str = Field(description="Database name")

PoolSettings

Bases: BaseModel

PostgreSQL connection pool configuration.

Groups all connection pool-related parameters for SQLAlchemy engine.

Examples:

>>> pool = PoolSettings(size=10, max_overflow=20)
Source code in sqlalchemy_foundation_kit/contrib/settings/postgres.py
class PoolSettings(BaseModel):
    """Connection pool options for the SQLAlchemy engine.

    Bundles every pool-related parameter (implementation kind, size, overflow,
    health checks, recycling and checkout timeout) in one model.

    Examples:
        >>> pool = PoolSettings(size=10, max_overflow=20)
    """

    kind: PoolClassStr = Field(default="async_adapted_queue", description="PostgreSQL pool implementation kind")
    size: int = Field(default=10, ge=1, description="Connection pool size")
    max_overflow: int = Field(default=20, ge=0, description="Additional connections when pool is exhausted")
    pre_ping: bool = Field(default=True, description="Check connection health before use (pre-ping)")
    recycle: int = Field(default=3600, ge=-1, description="Recycle connections after N seconds")
    timeout: float = Field(default=30.0, ge=0, description="Seconds to wait before giving up on getting connection")

    @model_validator(mode="after")
    def _validate_pool_settings(self) -> PoolSettings:
        """Reject a static pool that is configured with a non-zero overflow."""
        overflow_permitted = self.kind != "static"
        if overflow_permitted or self.max_overflow <= 0:
            return self
        raise ValueError(f"max_overflow must be 0 for static pool, got {self.max_overflow}")

QuerySettings

Bases: BaseModel

PostgreSQL query and performance configuration.

Groups query execution, caching, and transaction isolation settings.

Examples:

>>> query = QuerySettings(echo=False, isolation_level="READ COMMITTED")
Source code in sqlalchemy_foundation_kit/contrib/settings/postgres.py
class QuerySettings(BaseModel):
    """PostgreSQL query and performance configuration.

    Groups query execution, caching, and transaction isolation settings.
    Every field has a default, so ``QuerySettings()`` is valid as-is.

    Examples:
        >>> query = QuerySettings(echo=False, isolation_level="READ COMMITTED")
    """

    # SQL echo is off by default.
    echo: bool = Field(default=False, description="Echo SQL queries")
    # Both cache sizes default to 0 and must be non-negative.
    # NOTE(review): 0 presumably disables the driver-side caches - confirm against the engine factory.
    statement_cache_size: int = Field(default=0, ge=0, description="Statement cache size")
    prepared_statement_cache_size: int = Field(default=0, ge=0, description="Prepared statement cache size")
    # None leaves the isolation level unset in this model.
    isolation_level: PostgresIsolationLevel | None = Field(default=None, description="Transaction isolation level")

BasePostgresMigrationsConfig

Bases: BaseSettings

Base configuration for PostgreSQL database migrations.

Source code in sqlalchemy_foundation_kit/contrib/settings/postgres.py
class BasePostgresMigrationsConfig(BaseSettings):
    """Base configuration for PostgreSQL database migrations.

    Wraps a single required ``postgres`` section of type ``BasePostgresConfig``.
    """

    # Settings-source behaviour: unknown keys are ignored, nested fields are
    # addressed with "__" in environment variable names, and names are matched
    # case-insensitively.
    model_config = SettingsConfigDict(
        extra="ignore",
        env_nested_delimiter="__",
        case_sensitive=False,
    )

    # Required: no default is declared.
    postgres: BasePostgresConfig

contrib.metrics

Prometheus metrics (requires [metrics] extra).

Postgres metrics using prometheus-client.

PostgresMetrics

Postgres connection pool metrics.

Metrics
  • postgres_db_pool_size: Current database connection pool size.
  • postgres_db_pool_checked_out: Number of connections currently checked out.
  • postgres_db_pool_overflow: Number of connections over pool_size (within max_overflow).
  • postgres_db_connection_checkout_duration_seconds: Time to acquire connection from pool.
  • postgres_db_connection_timeouts_total: Number of connection checkout timeouts.
  • postgres_db_connection_errors_total: Number of connection errors.
Labels
  • error_type: Type of connection error (for errors_total).
Source code in sqlalchemy_foundation_kit/contrib/metrics/postgres.py
class PostgresMetrics:
    """Collector for Postgres connection pool metrics.

    Metrics:
        - postgres_db_pool_size: Current database connection pool size.
        - postgres_db_pool_checked_out: Number of connections currently checked out.
        - postgres_db_pool_overflow: Number of connections over pool_size (within max_overflow).
        - postgres_db_connection_checkout_duration_seconds: Time to acquire connection from pool.
        - postgres_db_connection_timeouts_total: Number of connection checkout timeouts.
        - postgres_db_connection_errors_total: Number of connection errors.

    Labels:
        - error_type: Type of connection error (for errors_total).
    """

    def __init__(self, prefix: str | None = None) -> None:
        """Create the gauges, histogram and counters.

        Args:
            prefix: Metric name prefix.

        Raises:
            ImportError: If prometheus-client is not installed.
        """
        _check_prometheus()

        def named(base_name: str):
            # Every metric name goes through the same prefixing helper.
            return _make_metric_name(base_name, prefix)

        self.pool_size = Gauge(named("postgres_db_pool_size"), "Current database connection pool size")
        self.pool_checked_out = Gauge(
            named("postgres_db_pool_checked_out"),
            "Number of database connections currently checked out",
        )
        self.pool_overflow = Gauge(
            named("postgres_db_pool_overflow"),
            "Number of connections over pool_size (within max_overflow)",
        )
        self.connection_checkout_duration = Histogram(
            named("postgres_db_connection_checkout_duration_seconds"),
            "Time to acquire connection from pool",
            buckets=list(CONNECTION_CHECKOUT_BUCKETS),
        )
        self.connection_timeouts_total = Counter(
            named("postgres_db_connection_timeouts_total"),
            "Number of connection checkout timeouts",
        )
        self.connection_errors_total = Counter(
            named("postgres_db_connection_errors_total"),
            "Number of connection errors",
            ["error_type"],
        )

    def record_pool_stats(self, pool_size: int, pool_checked_out: int, pool_overflow: int) -> None:
        """Set the three pool gauges to the given values, converted to float."""
        readings = (
            ("pool_size", pool_size),
            ("pool_checked_out", pool_checked_out),
            ("pool_overflow", pool_overflow),
        )
        for gauge_attr, reading in readings:
            getattr(self, gauge_attr).set(float(reading))

    def record_checkout(self, duration: float) -> None:
        """Observe one checkout duration on the checkout histogram."""
        histogram = self.connection_checkout_duration
        histogram.observe(duration)

    def record_error(self, error_type: str, is_timeout: bool = False) -> None:
        """Count an error under its ``error_type`` label, plus a timeout when flagged."""
        labelled_errors = self.connection_errors_total.labels(error_type=error_type)
        labelled_errors.inc()
        if not is_timeout:
            return
        self.connection_timeouts_total.inc()
__init__(prefix=None)

Initialize postgres metrics.

Parameters:

Name Type Description Default
prefix str | None

Metric name prefix.

None

Raises:

Type Description
ImportError

If prometheus-client is not installed.

Source code in sqlalchemy_foundation_kit/contrib/metrics/postgres.py
def __init__(self, prefix: str | None = None) -> None:
    """Create the gauges, histogram and counters.

    Args:
        prefix: Metric name prefix.

    Raises:
        ImportError: If prometheus-client is not installed.
    """
    _check_prometheus()

    def named(base_name: str):
        # Every metric name goes through the same prefixing helper.
        return _make_metric_name(base_name, prefix)

    self.pool_size = Gauge(named("postgres_db_pool_size"), "Current database connection pool size")
    self.pool_checked_out = Gauge(
        named("postgres_db_pool_checked_out"),
        "Number of database connections currently checked out",
    )
    self.pool_overflow = Gauge(
        named("postgres_db_pool_overflow"),
        "Number of connections over pool_size (within max_overflow)",
    )
    self.connection_checkout_duration = Histogram(
        named("postgres_db_connection_checkout_duration_seconds"),
        "Time to acquire connection from pool",
        buckets=list(CONNECTION_CHECKOUT_BUCKETS),
    )
    self.connection_timeouts_total = Counter(
        named("postgres_db_connection_timeouts_total"),
        "Number of connection checkout timeouts",
    )
    self.connection_errors_total = Counter(
        named("postgres_db_connection_errors_total"),
        "Number of connection errors",
        ["error_type"],
    )
record_checkout(duration)

Record a database connection checkout from the pool.

Source code in sqlalchemy_foundation_kit/contrib/metrics/postgres.py
def record_checkout(self, duration: float) -> None:
    """Observe one checkout duration on the checkout histogram."""
    histogram = self.connection_checkout_duration
    histogram.observe(duration)
record_error(error_type, is_timeout=False)

Record a database connection or execution error.

Source code in sqlalchemy_foundation_kit/contrib/metrics/postgres.py
def record_error(self, error_type: str, is_timeout: bool = False) -> None:
    """Count an error under its ``error_type`` label, plus a timeout when flagged."""
    labelled_errors = self.connection_errors_total.labels(error_type=error_type)
    labelled_errors.inc()
    if not is_timeout:
        return
    self.connection_timeouts_total.inc()
record_pool_stats(pool_size, pool_checked_out, pool_overflow)

Record database connection pool statistics.

Source code in sqlalchemy_foundation_kit/contrib/metrics/postgres.py
def record_pool_stats(self, pool_size: int, pool_checked_out: int, pool_overflow: int) -> None:
    """Set the three pool gauges to the given values, converted to float."""
    readings = (
        ("pool_size", pool_size),
        ("pool_checked_out", pool_checked_out),
        ("pool_overflow", pool_overflow),
    )
    for gauge_attr, reading in readings:
        getattr(self, gauge_attr).set(float(reading))

contrib.di (Dishka)

Dishka dependency injection providers (requires [dishka] extra).

Database providers for dishka.

AsyncDatabaseProvider

Bases: BaseDishkaProvider

Provider for database dependencies.

Provides
  • AsyncSessionManager: Manages database connections and engine lifecycle.
  • async_sessionmaker[AsyncSession]: Factory for creating database sessions.

Note: PostgresMetricsProtocol must be provided by a separate provider (e.g., :class:PrometheusPostgresMetricsProvider) or registered in the container. If you don't want metrics, simply don't register such a provider.

Customization
  • Pass healthcheck_query=None to skip the startup connectivity check.
  • Pass a custom :class:RetryConfig to tune startup retry behaviour.
  • Subclass and override :meth:create_session_manager to fully customize how the manager is constructed (e.g., to pass extra_server_settings, connection_class, or on_engine_created).
Source code in sqlalchemy_foundation_kit/contrib/di/database.py
class AsyncDatabaseProvider(BaseDishkaProvider):
    """Provider for database dependencies.

    Provides:
        - ``AsyncSessionManager``: Manages database connections and engine lifecycle.
        - ``async_sessionmaker[AsyncSession]``: Factory for creating database sessions.

    Note: ``PostgresMetricsProtocol`` must be provided by a separate provider
    (e.g., :class:`PrometheusPostgresMetricsProvider`) or registered in the container.
    If you don't want metrics, simply don't register such a provider.

    Customization:
        - Pass ``healthcheck_query=None`` to skip the startup connectivity check.
        - Pass a custom :class:`RetryConfig` to tune startup retry behaviour.
        - Subclass and override :meth:`create_session_manager` to fully customize
          how the manager is constructed (e.g., to pass ``extra_server_settings``,
          ``connection_class``, or ``on_engine_created``).
    """

    scope = Scope.APP

    def __init__(
        self,
        healthcheck_query: str | None = DEFAULT_HEALTHCHECK_QUERY,
        retry_config: RetryConfig = DEFAULT_RETRY_CONFIG,
    ) -> None:
        """Initialize provider.

        Args:
            healthcheck_query: SQL executed at startup to verify connectivity.
                Pass ``None`` to skip the healthcheck entirely.
            retry_config: Retry behavior for the startup healthcheck.
        """
        super().__init__()
        self._healthcheck_query = healthcheck_query
        self._retry_config = retry_config

    def create_session_manager(
        self,
        postgres_config: PostgresSettingsProtocol,
        metrics: PostgresMetricsProtocol | None,
    ) -> AsyncSessionManager[AsyncSession]:
        """Build an ``AsyncSessionManager`` for the given config.

        Override this hook to customize session manager construction — for example,
        to pass ``extra_server_settings``, a custom ``connection_class``, or an
        ``on_engine_created`` callback for OpenTelemetry instrumentation.

        Args:
            postgres_config: PostgreSQL configuration.
            metrics: Optional metrics collector.

        Returns:
            Configured ``AsyncSessionManager``.
        """
        return create_async_session_manager(postgres_config, metrics=metrics)

    @provide
    async def get_session_manager(
        self,
        postgres_config: PostgresSettingsProtocol,
        metrics: PostgresMetricsProtocol | None = None,
    ) -> AsyncIterator[AsyncSessionManager[AsyncSession]]:
        """Provide database session manager.

        Builds the manager, optionally runs the startup healthcheck with retries,
        yields the manager, and closes it afterwards. The healthcheck runs inside
        the ``try`` block so that the manager is also closed when the healthcheck
        ultimately fails; the healthcheck error still propagates to the caller.

        Args:
            postgres_config: PostgreSQL configuration.
            metrics: Optional metrics collector.

        Yields:
            The session manager built by :meth:`create_session_manager`.
        """
        manager = self.create_session_manager(postgres_config, metrics)

        try:
            if self._healthcheck_query is not None:
                query = self._healthcheck_query

                async def test_connection() -> None:
                    async with manager.session_maker() as session:
                        await session.execute(text(query))

                await retry_async_connection(
                    connect_func=test_connection,
                    service_name="PostgreSQL",
                    config=self._retry_config,
                )

            yield manager
        finally:
            # Best-effort shutdown: a failure to close is logged, not raised.
            try:
                await manager.aclose()
                logger.info("Database session manager closed successfully")
            except SQLAlchemyError as e:
                logger.warning("Error closing database session manager: %s", e)

    @provide
    def get_session_maker(self, session_manager: AsyncSessionManager[AsyncSession]) -> async_sessionmaker[AsyncSession]:
        """Provide session maker."""
        return session_manager.session_maker  # type: ignore[no-any-return]
__init__(healthcheck_query=DEFAULT_HEALTHCHECK_QUERY, retry_config=DEFAULT_RETRY_CONFIG)

Initialize provider.

Parameters:

Name Type Description Default
healthcheck_query str | None

SQL executed at startup to verify connectivity. Pass None to skip the healthcheck entirely.

DEFAULT_HEALTHCHECK_QUERY
retry_config RetryConfig

Retry behavior for the startup healthcheck.

DEFAULT_RETRY_CONFIG
Source code in sqlalchemy_foundation_kit/contrib/di/database.py
def __init__(
    self,
    healthcheck_query: str | None = DEFAULT_HEALTHCHECK_QUERY,
    retry_config: RetryConfig = DEFAULT_RETRY_CONFIG,
) -> None:
    """Store the startup healthcheck settings on the provider.

    Args:
        healthcheck_query: SQL executed at startup to verify connectivity.
            Pass ``None`` to skip the healthcheck entirely.
        retry_config: Retry behavior for the startup healthcheck.
    """
    super().__init__()
    self._retry_config = retry_config
    self._healthcheck_query = healthcheck_query
__init_subclass__(**kwargs)

Check dishka availability when creating a subclass.

Source code in sqlalchemy_foundation_kit/contrib/di/_base.py
def __init_subclass__(cls, **kwargs: object) -> None:
    """Check dishka availability when creating a subclass.

    Args:
        **kwargs: Class-creation keyword arguments, forwarded unchanged to the parent hook.
    """
    # The parent hook runs first; the dependency check follows.
    # NOTE(review): check_dishka() presumably raises when dishka is missing - confirm.
    super().__init_subclass__(**kwargs)
    check_dishka()
create_session_manager(postgres_config, metrics)

Build an AsyncSessionManager for the given config.

Override this hook to customize session manager construction — for example, to pass extra_server_settings, a custom connection_class, or an on_engine_created callback for OpenTelemetry instrumentation.

Parameters:

Name Type Description Default
postgres_config PostgresSettingsProtocol

PostgreSQL configuration.

required
metrics PostgresMetricsProtocol | None

Optional metrics collector.

required

Returns:

Type Description
AsyncSessionManager[AsyncSession]

Configured AsyncSessionManager.

Source code in sqlalchemy_foundation_kit/contrib/di/database.py
def create_session_manager(
    self,
    postgres_config: PostgresSettingsProtocol,
    metrics: PostgresMetricsProtocol | None,
) -> AsyncSessionManager[AsyncSession]:
    """Construct the ``AsyncSessionManager`` used by this provider.

    This is the extension point for subclasses: override it to pass
    ``extra_server_settings``, a custom ``connection_class``, or an
    ``on_engine_created`` callback (e.g. for OpenTelemetry instrumentation).

    Args:
        postgres_config: PostgreSQL configuration.
        metrics: Optional metrics collector.

    Returns:
        Configured ``AsyncSessionManager``.
    """
    session_manager = create_async_session_manager(postgres_config, metrics=metrics)
    return session_manager
get_session_maker(session_manager)

Provide session maker.

Source code in sqlalchemy_foundation_kit/contrib/di/database.py
@provide
def get_session_maker(self, session_manager: AsyncSessionManager[AsyncSession]) -> async_sessionmaker[AsyncSession]:
    """Expose the session factory held by the session manager."""
    maker = session_manager.session_maker
    return maker  # type: ignore[no-any-return]
get_session_manager(postgres_config, metrics=None) async

Provide database session manager.

Source code in sqlalchemy_foundation_kit/contrib/di/database.py
@provide
async def get_session_manager(
    self,
    postgres_config: PostgresSettingsProtocol,
    metrics: PostgresMetricsProtocol | None = None,
) -> AsyncIterator[AsyncSessionManager[AsyncSession]]:
    """Provide database session manager.

    Builds the manager, optionally runs the startup healthcheck with retries,
    yields the manager, and closes it afterwards. The healthcheck runs inside
    the ``try`` block so that the manager is also closed when the healthcheck
    ultimately fails; the healthcheck error still propagates to the caller.

    Args:
        postgres_config: PostgreSQL configuration.
        metrics: Optional metrics collector.

    Yields:
        The session manager built by ``create_session_manager``.
    """
    manager = self.create_session_manager(postgres_config, metrics)

    try:
        if self._healthcheck_query is not None:
            query = self._healthcheck_query

            async def test_connection() -> None:
                async with manager.session_maker() as session:
                    await session.execute(text(query))

            await retry_async_connection(
                connect_func=test_connection,
                service_name="PostgreSQL",
                config=self._retry_config,
            )

        yield manager
    finally:
        # Best-effort shutdown: a failure to close is logged, not raised.
        try:
            await manager.aclose()
            logger.info("Database session manager closed successfully")
        except SQLAlchemyError as e:
            logger.warning("Error closing database session manager: %s", e)

AsyncUnitOfWorkProvider

Bases: BaseDishkaProvider

Provider for Unit of Work.

Provides
  • AsyncUnitOfWork: Standardized interface for database transactions.

Note: UoW is APP-scoped because it's stateless — it only holds a reference to session_maker (factory). Real database connections are created only when calling uow.transaction(), and are properly closed after the context manager exits.

Customization

Override :meth:create_uow to use a custom transaction class (e.g., one that exposes domain repositories as lazy properties).

Source code in sqlalchemy_foundation_kit/contrib/di/database.py
class AsyncUnitOfWorkProvider(BaseDishkaProvider):
    """Dishka provider that supplies the Unit of Work.

    Provides:
        - ``AsyncUnitOfWork``: Standardized interface for database transactions.

    Note: UoW is APP-scoped because it's stateless — it only holds a reference to
    ``session_maker`` (factory). Real database connections are created only when calling
    ``uow.transaction()``, and are properly closed after the context manager exits.

    Customization:
        Override :meth:`create_uow` to use a custom transaction class
        (e.g., one that exposes domain repositories as lazy properties).
    """

    scope = Scope.APP

    def create_uow(
        self,
        session_maker: async_sessionmaker[AsyncSession],
    ) -> AsyncUnitOfWork[AsyncSQLAlchemyUowTransaction]:
        """Build the UoW instance; subclasses override this to swap in a custom transaction class."""
        unit_of_work = AsyncSQLAlchemyUnitOfWork(
            session_maker,
            transaction_factory=AsyncSQLAlchemyUowTransaction,
        )
        return unit_of_work

    @provide
    def get_uow(
        self,
        session_maker: async_sessionmaker[AsyncSession],
    ) -> AsyncUnitOfWork[AsyncSQLAlchemyUowTransaction]:
        """Provide Unit of Work.

        Delegates construction to :meth:`create_uow`. The returned UoW is stateless
        and safe to reuse; each ``uow.transaction()`` call creates a new database
        session/connection.
        """
        unit_of_work = self.create_uow(session_maker)
        return unit_of_work
__init__()

Check dishka availability when instantiating.

Source code in sqlalchemy_foundation_kit/contrib/di/_base.py
def __init__(self) -> None:
    """Check dishka availability when instantiating.

    The availability check runs before the parent initializer is invoked.
    """
    # Run the check first, before the parent initializer executes.
    check_dishka()
    super().__init__()
__init_subclass__(**kwargs)

Check dishka availability when creating a subclass.

Source code in sqlalchemy_foundation_kit/contrib/di/_base.py
def __init_subclass__(cls, **kwargs: object) -> None:
    """Check dishka availability when creating a subclass.

    Args:
        **kwargs: Class-definition keyword arguments, forwarded unchanged to the
            parent ``__init_subclass__``.
    """
    # The parent hook runs first; the availability check follows.
    super().__init_subclass__(**kwargs)
    check_dishka()
create_uow(session_maker)

Construct the UoW instance. Override to inject a custom transaction class.

Source code in sqlalchemy_foundation_kit/contrib/di/database.py
def create_uow(
    self,
    session_maker: async_sessionmaker[AsyncSession],
) -> AsyncUnitOfWork[AsyncSQLAlchemyUowTransaction]:
    """Build the UoW instance; the hook to override for a custom transaction class."""
    unit_of_work = AsyncSQLAlchemyUnitOfWork(
        session_maker,
        transaction_factory=AsyncSQLAlchemyUowTransaction,
    )
    return unit_of_work
get_uow(session_maker)

Provide Unit of Work.

Returns a stateless UoW instance that can be safely reused. Each call to uow.transaction() creates a new database session/connection.

Source code in sqlalchemy_foundation_kit/contrib/di/database.py
@provide
def get_uow(
    self,
    session_maker: async_sessionmaker[AsyncSession],
) -> AsyncUnitOfWork[AsyncSQLAlchemyUowTransaction]:
    """Provide the Unit of Work.

    The returned UoW is stateless and safe to reuse; every call to
    ``uow.transaction()`` opens its own database session/connection.
    """
    # Construction is delegated to the overridable factory hook.
    unit_of_work = self.create_uow(session_maker)
    return unit_of_work

Metrics providers for dishka.

PrometheusPostgresMetricsProvider

Bases: BaseMetricsProvider

Provider for Prometheus PostgreSQL metrics.

Source code in sqlalchemy_foundation_kit/contrib/di/metrics.py
class PrometheusPostgresMetricsProvider(BaseMetricsProvider):
    """Dishka provider that supplies Prometheus PostgreSQL metrics."""

    @provide
    def get_metrics(
        self,
        metrics: PrometheusMetricsSettingsProtocol,
        default_prefix: str | None,
        postgres: PostgresMetricsSettingsProtocol | None = None,
    ) -> PostgresMetricsProtocol | None:
        """Provide a ``PostgresMetricsProtocol`` implementation, or ``None`` when disabled.

        ``None`` is returned when no Postgres settings are supplied or when their
        ``metrics_enabled`` flag is falsy.
        """
        metrics_wanted = postgres is not None and postgres.metrics_enabled
        if metrics_wanted:
            prefix = _infra_metrics_prefix(default_prefix)
            return PostgresMetrics(prefix=prefix)
        return None
__init__()

Check dishka availability when instantiating.

Source code in sqlalchemy_foundation_kit/contrib/di/_base.py
def __init__(self) -> None:
    """Check dishka availability when instantiating.

    The availability check runs before the parent initializer is invoked.
    """
    # Run the check first, before the parent initializer executes.
    check_dishka()
    super().__init__()
__init_subclass__(**kwargs)

Check dishka availability when creating a subclass.

Source code in sqlalchemy_foundation_kit/contrib/di/_base.py
def __init_subclass__(cls, **kwargs: object) -> None:
    """Check dishka availability when creating a subclass.

    Args:
        **kwargs: Class-definition keyword arguments, forwarded unchanged to the
            parent ``__init_subclass__``.
    """
    # The parent hook runs first; the availability check follows.
    super().__init_subclass__(**kwargs)
    check_dishka()
get_metrics(metrics, default_prefix, postgres=None)

Provide Postgres metrics implementing PostgresMetricsProtocol.

Source code in sqlalchemy_foundation_kit/contrib/di/metrics.py
@provide
def get_metrics(
    self,
    metrics: PrometheusMetricsSettingsProtocol,
    default_prefix: str | None,
    postgres: PostgresMetricsSettingsProtocol | None = None,
) -> PostgresMetricsProtocol | None:
    """Provide a ``PostgresMetricsProtocol`` implementation, or ``None`` when disabled.

    ``None`` is returned when no Postgres settings are supplied or when their
    ``metrics_enabled`` flag is falsy.
    """
    metrics_wanted = postgres is not None and postgres.metrics_enabled
    if metrics_wanted:
        prefix = _infra_metrics_prefix(default_prefix)
        return PostgresMetrics(prefix=prefix)
    return None

contrib.dependency_injector

dependency-injector containers (requires [dependency-injector] extra).

Database containers for dependency-injector.

DatabaseContainer

Bases: BaseDIContainer

Container for database dependencies.

Provides
  • session_manager: Manages database connections and engine lifecycle.
  • session_maker: Factory for creating database sessions.
  • uow: Unit of Work for database transactions.
Configuration
  • postgres_config: PostgreSQL configuration (PostgresSettingsProtocol).
  • metrics: Optional metrics collector (PostgresMetricsProtocol).
  • healthcheck_query: SQL executed at startup (default: "SELECT 1", None to skip).
  • retry_config: Retry behavior for healthcheck (default: RetryConfig()).
Source code in sqlalchemy_foundation_kit/contrib/dependency_injector/database.py
class DatabaseContainer(BaseDIContainer):
    """Container for database dependencies.

    Provides:
        - ``session_manager``: Manages database connections and engine lifecycle.
        - ``session_maker``: Factory for creating database sessions.
        - ``uow``: Unit of Work for database transactions.

    Configuration:
        - ``postgres_config``: PostgreSQL configuration (``PostgresSettingsProtocol``).
        - ``metrics``: Optional metrics collector (``PostgresMetricsProtocol``).
        - ``healthcheck_query``: SQL executed at startup (default: ``"SELECT 1"``, ``None`` to skip).
        - ``retry_config``: Retry behavior for healthcheck (default: ``RetryConfig()``).

    The providers below are wired top-down: each one receives the providers declared
    above it as keyword arguments, so the declaration order matters.
    """

    # Configuration: ``postgres_config`` has no default and must be supplied by the
    # parent container; ``metrics`` falls back to ``None`` when not supplied.
    postgres_config = providers.Dependency()  # type: ignore[misc,var-annotated]
    metrics = providers.Dependency(default=None)  # type: ignore[misc,var-annotated]

    # Healthcheck configuration: plain objects pre-populated with the module defaults.
    healthcheck_query = providers.Object(DEFAULT_HEALTHCHECK_QUERY)  # type: ignore[misc,var-annotated]
    retry_config = providers.Object(DEFAULT_RETRY_CONFIG)  # type: ignore[misc,var-annotated]

    # Session manager (resource: handles lifecycle). Receives all four configuration
    # providers declared above.
    session_manager = providers.Resource(  # type: ignore[misc,var-annotated]
        _create_session_manager_resource,
        postgres_config=postgres_config,
        metrics=metrics,
        healthcheck_query=healthcheck_query,
        retry_config=retry_config,
    )

    # Session maker: obtained from the session manager via ``_get_session_maker``.
    session_maker = providers.Factory(  # type: ignore[misc,var-annotated]
        _get_session_maker,
        session_manager=session_manager,
    )

    # Unit of Work: a singleton built by ``_create_uow`` from the session maker.
    uow = providers.Singleton(  # type: ignore[misc,var-annotated]
        _create_uow,
        session_maker=session_maker,
    )
__init__(*args, **kwargs)

Check dependency-injector availability when instantiating.

Source code in sqlalchemy_foundation_kit/contrib/dependency_injector/_base.py
def __init__(self, *args: object, **kwargs: object) -> None:
    """Check dependency-injector availability when instantiating.

    Args:
        *args: Positional arguments forwarded unchanged to the parent initializer.
        **kwargs: Keyword arguments forwarded unchanged to the parent initializer.
    """
    # Run the check first, before the parent initializer executes.
    check_dependency_injector()
    super().__init__(*args, **kwargs)
__init_subclass__(**kwargs)

Check dependency-injector availability when creating a subclass.

Source code in sqlalchemy_foundation_kit/contrib/dependency_injector/_base.py
def __init_subclass__(cls, **kwargs: object) -> None:
    """Check dependency-injector availability when creating a subclass.

    Args:
        **kwargs: Class-definition keyword arguments, forwarded unchanged to the
            parent ``__init_subclass__``.
    """
    # The parent hook runs first; the availability check follows.
    super().__init_subclass__(**kwargs)
    check_dependency_injector()

AsyncDatabaseResourceProvider

Helper class for managing database session manager lifecycle.

Use this when you need manual control over session manager lifecycle — for example in tests or when not using dependency-injector containers.

Examples:

>>> provider = AsyncDatabaseResourceProvider(config, metrics)
>>> manager = await provider.start()
>>> # Use manager...
>>> await provider.stop()
Source code in sqlalchemy_foundation_kit/contrib/dependency_injector/database.py
class AsyncDatabaseResourceProvider:
    """Helper class for managing database session manager lifecycle.

    Use this when you need manual control over session manager lifecycle —
    for example in tests or when not using dependency-injector containers.

    Examples:
        >>> provider = AsyncDatabaseResourceProvider(config, metrics)
        >>> manager = await provider.start()
        >>> # Use manager...
        >>> await provider.stop()
    """

    def __init__(
        self,
        postgres_config: PostgresSettingsProtocol,
        metrics: PostgresMetricsProtocol | None = None,
        healthcheck_query: str | None = DEFAULT_HEALTHCHECK_QUERY,
        retry_config: RetryConfig = DEFAULT_RETRY_CONFIG,
    ) -> None:
        """Initialize provider.

        Args:
            postgres_config: PostgreSQL configuration.
            metrics: Optional metrics collector.
            healthcheck_query: SQL executed at startup to verify connectivity.
                Pass ``None`` to skip the healthcheck entirely.
            retry_config: Retry behavior for healthcheck.
        """
        self._postgres_config = postgres_config
        self._metrics = metrics
        self._healthcheck_query = healthcheck_query
        self._retry_config = retry_config
        # Set by ``start()`` and reset to ``None`` by ``stop()``.
        self._manager: AsyncSessionManager[AsyncSession] | None = None

    async def start(self) -> AsyncSessionManager[AsyncSession]:
        """Start session manager and perform healthcheck.

        Returns:
            The started session manager. It is also remembered on the provider so
            that :meth:`stop` can close it later.

        Raises:
            Exception: Whatever the healthcheck (including its retries) raises. In
                that case the newly created manager is closed before the error is
                re-raised, and the provider stays in the "not started" state.
        """
        manager = create_async_session_manager(self._postgres_config, metrics=self._metrics)

        if self._healthcheck_query is not None:
            query = self._healthcheck_query

            async def test_connection() -> None:
                async with manager.session_maker() as session:
                    await session.execute(text(query))

            try:
                await retry_async_connection(
                    connect_func=test_connection,
                    service_name="PostgreSQL",
                    config=self._retry_config,
                )
            except BaseException:
                # The manager has not been stored on ``self`` yet, so ``stop()`` could
                # never reach it; close it here instead of leaking it.
                try:
                    await manager.aclose()
                except SQLAlchemyError as close_error:
                    # Best effort: the original healthcheck failure is the one to surface.
                    logger.warning("Error closing database session manager: %s", close_error)
                raise

        self._manager = manager
        return manager

    async def stop(self) -> None:
        """Stop session manager and close connections.

        Does nothing when the provider holds no manager. ``SQLAlchemyError`` raised
        while closing is logged as a warning rather than propagated, and the stored
        manager reference is cleared in every case.
        """
        if self._manager is not None:
            try:
                await self._manager.aclose()
                logger.info("Database session manager closed successfully")
            except SQLAlchemyError as e:
                logger.warning("Error closing database session manager: %s", e)
            finally:
                self._manager = None
__init__(postgres_config, metrics=None, healthcheck_query=DEFAULT_HEALTHCHECK_QUERY, retry_config=DEFAULT_RETRY_CONFIG)

Initialize provider.

Parameters:

Name Type Description Default
postgres_config PostgresSettingsProtocol

PostgreSQL configuration.

required
metrics PostgresMetricsProtocol | None

Optional metrics collector.

None
healthcheck_query str | None

SQL executed at startup to verify connectivity. Pass None to skip the healthcheck entirely.

DEFAULT_HEALTHCHECK_QUERY
retry_config RetryConfig

Retry behavior for healthcheck.

DEFAULT_RETRY_CONFIG
Source code in sqlalchemy_foundation_kit/contrib/dependency_injector/database.py
def __init__(
    self,
    postgres_config: PostgresSettingsProtocol,
    metrics: PostgresMetricsProtocol | None = None,
    healthcheck_query: str | None = DEFAULT_HEALTHCHECK_QUERY,
    retry_config: RetryConfig = DEFAULT_RETRY_CONFIG,
) -> None:
    """Store the settings the provider needs to start a session manager later.

    Args:
        postgres_config: PostgreSQL configuration.
        metrics: Optional metrics collector.
        healthcheck_query: SQL run at startup to verify connectivity;
            ``None`` disables the healthcheck entirely.
        retry_config: Retry behavior applied to the healthcheck.
    """
    # No manager exists until ``start()`` is called.
    self._manager: AsyncSessionManager[AsyncSession] | None = None

    # Connection-related settings.
    self._postgres_config, self._metrics = postgres_config, metrics

    # Healthcheck-related settings.
    self._healthcheck_query, self._retry_config = healthcheck_query, retry_config
start() async

Start session manager and perform healthcheck.

Source code in sqlalchemy_foundation_kit/contrib/dependency_injector/database.py
async def start(self) -> AsyncSessionManager[AsyncSession]:
    """Start session manager and perform healthcheck.

    Returns:
        The started session manager. It is also remembered on the provider so
        that ``stop()`` can close it later.

    Raises:
        Exception: Whatever the healthcheck (including its retries) raises. In
            that case the newly created manager is closed before the error is
            re-raised, and the provider stays in the "not started" state.
    """
    manager = create_async_session_manager(self._postgres_config, metrics=self._metrics)

    if self._healthcheck_query is not None:
        query = self._healthcheck_query

        async def test_connection() -> None:
            async with manager.session_maker() as session:
                await session.execute(text(query))

        try:
            await retry_async_connection(
                connect_func=test_connection,
                service_name="PostgreSQL",
                config=self._retry_config,
            )
        except BaseException:
            # The manager has not been stored on ``self`` yet, so ``stop()`` could
            # never reach it; close it here instead of leaking it.
            try:
                await manager.aclose()
            except SQLAlchemyError as close_error:
                # Best effort: the original healthcheck failure is the one to surface.
                logger.warning("Error closing database session manager: %s", close_error)
            raise

    self._manager = manager
    return manager
stop() async

Stop session manager and close connections.

Source code in sqlalchemy_foundation_kit/contrib/dependency_injector/database.py
async def stop(self) -> None:
    """Close the session manager, if one is currently held.

    ``SQLAlchemyError`` raised while closing is logged as a warning instead of
    being propagated; the stored manager reference is cleared either way.
    """
    # Guard clause: nothing was started, so there is nothing to close.
    if self._manager is None:
        return

    try:
        await self._manager.aclose()
        logger.info("Database session manager closed successfully")
    except SQLAlchemyError as close_error:
        logger.warning("Error closing database session manager: %s", close_error)
    finally:
        self._manager = None

Metrics containers for dependency-injector.

PrometheusMetricsContainer

Bases: BaseDIContainer

Container for Prometheus PostgreSQL metrics.

Provides
  • postgres_metrics: PostgreSQL metrics collector implementing PostgresMetricsProtocol.

Configuration
  • metrics_settings: General prometheus metrics settings (PrometheusMetricsSettingsProtocol).
  • default_prefix: Default prefix for infrastructure metrics (str | None).
  • postgres_settings: PostgreSQL settings with metrics_enabled flag (PostgresMetricsSettingsProtocol).
Source code in sqlalchemy_foundation_kit/contrib/dependency_injector/metrics.py
class PrometheusMetricsContainer(BaseDIContainer):
    """Container for Prometheus PostgreSQL metrics.

    Provides:
        - ``postgres_metrics``: PostgreSQL metrics collector implementing ``PostgresMetricsProtocol``.

    Configuration:
        - ``metrics_settings``: General prometheus metrics settings (``PrometheusMetricsSettingsProtocol``).
        - ``default_prefix``: Default prefix for infrastructure metrics (``str | None``).
        - ``postgres_settings``: PostgreSQL settings with ``metrics_enabled`` flag
          (``PostgresMetricsSettingsProtocol``).
    """

    # Configuration: ``metrics_settings`` and ``default_prefix`` have no default and
    # must be supplied; ``postgres_settings`` falls back to ``None`` when not supplied.
    metrics_settings = providers.Dependency()  # type: ignore[misc,var-annotated]
    default_prefix = providers.Dependency()  # type: ignore[misc,var-annotated]
    postgres_settings = providers.Dependency(default=None)  # type: ignore[misc,var-annotated]

    # Postgres metrics: a singleton built by ``_create_postgres_metrics`` from the
    # three configuration providers above.
    postgres_metrics = providers.Singleton(  # type: ignore[misc,var-annotated]
        _create_postgres_metrics,
        metrics_settings=metrics_settings,
        default_prefix=default_prefix,
        postgres_settings=postgres_settings,
    )
__init__(*args, **kwargs)

Check dependency-injector availability when instantiating.

Source code in sqlalchemy_foundation_kit/contrib/dependency_injector/_base.py
def __init__(self, *args: object, **kwargs: object) -> None:
    """Check dependency-injector availability when instantiating.

    Args:
        *args: Positional arguments forwarded unchanged to the parent initializer.
        **kwargs: Keyword arguments forwarded unchanged to the parent initializer.
    """
    # Run the check first, before the parent initializer executes.
    check_dependency_injector()
    super().__init__(*args, **kwargs)
__init_subclass__(**kwargs)

Check dependency-injector availability when creating a subclass.

Source code in sqlalchemy_foundation_kit/contrib/dependency_injector/_base.py
def __init_subclass__(cls, **kwargs: object) -> None:
    """Check dependency-injector availability when creating a subclass.

    Args:
        **kwargs: Class-definition keyword arguments, forwarded unchanged to the
            parent ``__init_subclass__``.
    """
    # The parent hook runs first; the availability check follows.
    super().__init_subclass__(**kwargs)
    check_dependency_injector()

contrib.telemetry

OpenTelemetry instrumentation (requires [telemetry] extra).

OpenTelemetry instrumentation functions for SQLAlchemy and asyncpg.

instrument_sqlalchemy(engine=None, **kwargs)

Instrument SQLAlchemy engine for OpenTelemetry tracing.

Automatically traces all SQLAlchemy operations including queries, commits, and rollbacks.

Parameters:

Name Type Description Default
engine Any | None

Optional SQLAlchemy engine to instrument. If None, all engines are instrumented.

None
**kwargs Any

Additional keyword arguments passed to SQLAlchemyInstrumentor.

{}

Raises:

Type Description
ImportError

If opentelemetry-instrumentation-sqlalchemy is not installed.

Examples:

>>> from sqlalchemy import create_engine
>>> from sqlalchemy_foundation_kit.contrib.telemetry import instrument_sqlalchemy
>>> engine = create_engine("postgresql://...")
>>> instrument_sqlalchemy(engine=engine)
Source code in sqlalchemy_foundation_kit/contrib/telemetry/instrumentations.py
def instrument_sqlalchemy(engine: Any | None = None, **kwargs: Any) -> None:
    """Instrument SQLAlchemy engine for OpenTelemetry tracing.

    Automatically traces all SQLAlchemy operations including queries,
    commits, and rollbacks.

    Args:
        engine: Optional SQLAlchemy engine to instrument. If None, all engines.
        **kwargs: Additional keyword arguments passed to SQLAlchemyInstrumentor.

    Raises:
        ImportError: If opentelemetry-instrumentation-sqlalchemy is not installed.

    Examples:
        >>> from sqlalchemy import create_engine
        >>> from sqlalchemy_foundation_kit.contrib.telemetry import instrument_sqlalchemy
        >>> engine = create_engine("postgresql://...")
        >>> instrument_sqlalchemy(engine=engine)
    """
    try:
        from opentelemetry.instrumentation.sqlalchemy import (  # noqa: PLC0415
            SQLAlchemyInstrumentor,
        )
    except ImportError as e:
        raise ImportError(
            "opentelemetry-instrumentation-sqlalchemy not installed. "
            "Install with: pip install 'sqlalchemy-foundation-kit[telemetry]'"
        ) from e

    call_kwargs: dict[str, Any] = dict(kwargs)
    if engine is not None:
        call_kwargs["engine"] = engine

    SQLAlchemyInstrumentor().instrument(**call_kwargs)

instrument_asyncpg(**kwargs)

Instrument asyncpg connections for OpenTelemetry tracing.

Automatically traces all asyncpg database operations at the connection level.

Parameters:

Name Type Description Default
**kwargs Any

Additional keyword arguments passed to AsyncPGInstrumentor.

{}

Raises:

Type Description
ImportError

If opentelemetry-instrumentation-asyncpg is not installed.

Examples:

>>> from sqlalchemy_foundation_kit.contrib.telemetry import instrument_asyncpg
>>> instrument_asyncpg()
Source code in sqlalchemy_foundation_kit/contrib/telemetry/instrumentations.py
def instrument_asyncpg(**kwargs: Any) -> None:
    """Enable OpenTelemetry tracing for asyncpg.

    Delegates to ``AsyncPGInstrumentor().instrument(...)`` so that asyncpg database
    operations are traced at the connection level.

    Args:
        **kwargs: Additional keyword arguments passed to AsyncPGInstrumentor.

    Raises:
        ImportError: If opentelemetry-instrumentation-asyncpg is not installed.

    Examples:
        >>> from sqlalchemy_foundation_kit.contrib.telemetry import instrument_asyncpg
        >>> instrument_asyncpg()
    """
    # Imported lazily so the telemetry extra stays optional.
    try:
        from opentelemetry.instrumentation.asyncpg import (  # noqa: PLC0415
            AsyncPGInstrumentor,
        )
    except ImportError as import_error:
        message = (
            "opentelemetry-instrumentation-asyncpg not installed. "
            "Install with: pip install 'sqlalchemy-foundation-kit[telemetry]'"
        )
        raise ImportError(message) from import_error

    instrumentor = AsyncPGInstrumentor()
    instrumentor.instrument(**kwargs)

Unit of Work with OpenTelemetry tracing support.

TracedAsyncUnitOfWork

Bases: AsyncSQLAlchemyUnitOfWork[T], Generic[T]

Unit of Work with automatic OpenTelemetry tracing.

Automatically creates spans for transaction() and query() operations, including transaction attributes (isolation level, duration, outcome).

Example

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider

trace.set_tracer_provider(TracerProvider())

uow = TracedAsyncUnitOfWork(
    session_maker=session_maker,
    transaction_factory=MyTransaction,
    service_name="my-service",
)

async with uow.transaction() as tx:
    # This operation is automatically traced
    user = await tx.users.create(...)

Source code in sqlalchemy_foundation_kit/contrib/telemetry/uow.py
class TracedAsyncUnitOfWork(AsyncSQLAlchemyUnitOfWork[T], Generic[T]):
    """Unit of Work with automatic OpenTelemetry tracing.

    Wraps ``transaction()``, ``managed_session()`` and ``query()`` in spans named
    ``uow.<operation>``. Every span carries a ``db.operation`` attribute, an optional
    ``db.isolation_level`` attribute, and an OK/ERROR status reflecting the outcome.
    The span is made the current span for the duration of the wrapped block, so spans
    started inside it (for example by database instrumentation) become its children.

    When ``HAS_OTEL`` is false no tracer is created and every operation simply
    delegates to the parent implementation.

    Example:
        from opentelemetry import trace
        from opentelemetry.sdk.trace import TracerProvider

        trace.set_tracer_provider(TracerProvider())

        uow = TracedAsyncUnitOfWork(
            session_maker=session_maker,
            transaction_factory=MyTransaction,
            service_name="my-service",
        )

        async with uow.transaction() as tx:
            # This operation is automatically traced
            user = await tx.users.create(...)
    """

    def __init__(
        self,
        session_maker: async_sessionmaker[AsyncSession],
        transaction_factory: Callable[[AsyncSession], T],
        service_name: str = "sqlalchemy-foundation-kit",
        *,
        flush_before_commit: bool = True,
    ) -> None:
        """Initialize traced unit of work.

        Args:
            session_maker: SQLAlchemy async session maker.
            transaction_factory: Factory function to create transaction objects.
            service_name: Service name for OpenTelemetry tracer.
            flush_before_commit: Default ``flush_before_commit`` policy applied when
                :meth:`transaction` is called without an explicit override.
        """
        super().__init__(session_maker, transaction_factory, flush_before_commit=flush_before_commit)
        # ``None`` means tracing is disabled and operations run untraced.
        self._tracer: Tracer | None = trace.get_tracer(service_name) if HAS_OTEL else None

    @staticmethod
    def _isolation_attributes(isolation_level: IsolationLevel | str | None) -> dict[str, str | bool]:
        """Build the span attributes describing the requested isolation level, if any."""
        if isolation_level:
            return {"db.isolation_level": str(isolation_level)}
        return {}

    def _open_span(self, operation: str, attributes: dict[str, str | bool] | None) -> "Span | None":
        """Start the ``uow.<operation>`` span with its attributes; ``None`` when tracing is off."""
        if not self._tracer:
            return None

        span: Span = self._tracer.start_span(f"uow.{operation}")
        span.set_attribute("db.operation", operation)
        if attributes:
            for key, value in attributes.items():
                span.set_attribute(key, value)
        return span

    @asynccontextmanager
    async def _traced(
        self,
        operation: str,
        context_manager: AbstractAsyncContextManager[T],
        attributes: dict[str, str | bool] | None = None,
    ) -> AsyncIterator[T]:
        """Generic tracing wrapper for UoW operations.

        Args:
            operation: Operation name (e.g., "transaction", "query").
            context_manager: Async context manager to wrap with tracing.
            attributes: Optional span attributes to set.

        Yields:
            Transaction object from the wrapped context manager.
        """
        span = self._open_span(operation, attributes)
        if span is None:
            # No tracing available, delegate to wrapped context manager
            async with context_manager as result:
                yield result
            return

        try:
            # Make the span current so nested spans are parented under it. Status and
            # exception recording stay under this method's control, hence the flags.
            with trace.use_span(
                span,
                end_on_exit=False,
                record_exception=False,
                set_status_on_exception=False,
            ):
                async with context_manager as result:
                    yield result

            span.set_status(Status(StatusCode.OK))

        except Exception as e:
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise

        finally:
            span.end()

    @asynccontextmanager
    async def transaction(
        self,
        isolation_level: IsolationLevel | str | None = None,
        flush_before_commit: bool | None = None,
    ) -> AsyncIterator[T]:
        """Create a new transaction context with tracing.

        Automatically creates a span named "uow.transaction" with attributes:
        - db.operation: "transaction"
        - db.isolation_level: The isolation level (if specified)

        Args:
            isolation_level: Optional transaction isolation level.
            flush_before_commit: If True, flush before commit.

        Yields:
            Transaction object with repositories.
        """
        async with self._traced(
            operation="transaction",
            context_manager=super().transaction(
                isolation_level=isolation_level,
                flush_before_commit=flush_before_commit,
            ),
            attributes=self._isolation_attributes(isolation_level),
        ) as tx:
            yield tx

    @asynccontextmanager
    async def managed_session(
        self,
        isolation_level: IsolationLevel | str | None = None,
    ) -> AsyncIterator[tuple[T, AsyncSession]]:
        """Create a session with manual transaction control and tracing.

        Automatically creates a span named "uow.managed_session" with attributes:
        - db.operation: "managed_session"
        - db.isolation_level: The isolation level (if specified)

        Args:
            isolation_level: Optional transaction isolation level.

        Yields:
            Tuple of (transaction object, session) for manual control.
        """
        span = self._open_span("managed_session", self._isolation_attributes(isolation_level))
        if span is None:
            # No tracing available, delegate directly
            async with super().managed_session(isolation_level=isolation_level) as result:
                yield result
            return

        try:
            # Make the span current so nested spans are parented under it. Status and
            # exception recording stay under this method's control, hence the flags.
            with trace.use_span(
                span,
                end_on_exit=False,
                record_exception=False,
                set_status_on_exception=False,
            ):
                async with super().managed_session(isolation_level=isolation_level) as result:
                    yield result

            span.set_status(Status(StatusCode.OK))

        except Exception as e:
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise

        finally:
            span.end()

    @asynccontextmanager
    async def query(
        self,
        isolation_level: IsolationLevel | str | None = None,
    ) -> AsyncIterator[T]:
        """Create a read-only query context with tracing.

        Automatically creates a span named "uow.query" with attributes:
        - db.operation: "query"
        - db.isolation_level: The isolation level (if specified)

        Args:
            isolation_level: Optional transaction isolation level.

        Yields:
            Query object with repositories.
        """
        async with self._traced(
            operation="query",
            context_manager=super().query(isolation_level=isolation_level),
            attributes=self._isolation_attributes(isolation_level),
        ) as qx:
            yield qx
__init__(session_maker, transaction_factory, service_name='sqlalchemy-foundation-kit', *, flush_before_commit=True)

Initialize traced unit of work.

Parameters:

Name Type Description Default
session_maker async_sessionmaker[AsyncSession]

SQLAlchemy async session maker.

required
transaction_factory Callable[[AsyncSession], T]

Factory function to create transaction objects.

required
service_name str

Service name for OpenTelemetry tracer.

'sqlalchemy-foundation-kit'
flush_before_commit bool

Default flush_before_commit policy applied when transaction() is called without an explicit override.

True
Source code in sqlalchemy_foundation_kit/contrib/telemetry/uow.py
def __init__(
    self,
    session_maker: async_sessionmaker[AsyncSession],
    transaction_factory: Callable[[AsyncSession], T],
    service_name: str = "sqlalchemy-foundation-kit",
    *,
    flush_before_commit: bool = True,
) -> None:
    """Set up the unit of work and, when OpenTelemetry is available, its tracer.

    Args:
        session_maker: SQLAlchemy async session maker.
        transaction_factory: Factory function to create transaction objects.
        service_name: Name under which the OpenTelemetry tracer is requested.
        flush_before_commit: Default ``flush_before_commit`` policy applied when
            :meth:`transaction` is called without an explicit override.
    """
    super().__init__(
        session_maker,
        transaction_factory,
        flush_before_commit=flush_before_commit,
    )

    # The tracer stays ``None`` when OpenTelemetry is unavailable.
    tracer: Tracer | None = None
    if HAS_OTEL:
        tracer = trace.get_tracer(service_name)
    self._tracer: Tracer | None = tracer
managed_session(isolation_level=None) async

Create a session with manual transaction control and tracing.

Automatically creates a span named "uow.managed_session" with attributes:
  • db.operation: "managed_session"
  • db.isolation_level: The isolation level (if specified)

Parameters:

Name Type Description Default
isolation_level IsolationLevel | str | None

Optional transaction isolation level.

None

Yields:

Type Description
AsyncIterator[tuple[T, AsyncSession]]

Tuple of (transaction object, session) for manual control.

Source code in sqlalchemy_foundation_kit/contrib/telemetry/uow.py
@asynccontextmanager
async def managed_session(
    self,
    isolation_level: IsolationLevel | str | None = None,
) -> AsyncIterator[tuple[T, AsyncSession]]:
    """Create a session with manual transaction control and tracing.

    Automatically creates a span named "uow.managed_session" with attributes:
    - db.operation: "managed_session"
    - db.isolation_level: The isolation level (if specified)

    The span is made the current span while the caller's block runs, so spans
    started inside it are recorded as its children. Without a tracer the call
    simply delegates to the parent implementation.

    Args:
        isolation_level: Optional transaction isolation level.

    Yields:
        Tuple of (transaction object, session) for manual control.
    """
    if not self._tracer:
        # No tracing available, delegate directly
        async with super().managed_session(isolation_level=isolation_level) as result:
            yield result
        return

    span: Span = self._tracer.start_span("uow.managed_session")
    span.set_attribute("db.operation", "managed_session")
    if isolation_level:
        span.set_attribute("db.isolation_level", str(isolation_level))

    try:
        # Make the span current so nested spans are parented under it. Status and
        # exception recording stay under this method's control, hence the flags.
        with trace.use_span(
            span,
            end_on_exit=False,
            record_exception=False,
            set_status_on_exception=False,
        ):
            async with super().managed_session(isolation_level=isolation_level) as result:
                yield result

        span.set_status(Status(StatusCode.OK))

    except Exception as e:
        span.set_status(Status(StatusCode.ERROR, str(e)))
        span.record_exception(e)
        raise

    finally:
        span.end()
open_session(isolation_level=None) async

Open a session with optional isolation level applied.

This is the extension point for subclasses that need custom session setup (e.g., RLS context, session-level GUCs, custom statement timeouts). Override to wrap or augment session creation while preserving isolation handling.

Used internally by transaction() and query().

Parameters:

Name Type Description Default
isolation_level IsolationLevel | str | None

Optional transaction isolation level.

None

Yields:

Type Description
AsyncIterator[AsyncSession]

Configured AsyncSession instance.

Raises:

Type Description
ValueError

If isolation_level is not supported.

Examples:

Subclass that sets a session-level GUC for every transaction:

class TenantUnitOfWork(AsyncSQLAlchemyUnitOfWork):
    """Example UoW that sets a session-level GUC on every session it opens."""

    def __init__(self, session_maker, tx_factory, tenant_id):
        super().__init__(session_maker, tx_factory)
        self._tenant_id = tenant_id

    @asynccontextmanager
    async def open_session(self, isolation_level=None):
        async with super().open_session(isolation_level) as session:
            # PostgreSQL's SET statement does not accept bind parameters, so the
            # value is passed through set_config() instead (false = session-level).
            await session.execute(
                text("SELECT set_config('app.tenant_id', :tid, false)"),
                {"tid": str(self._tenant_id)},
            )
            yield session
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
@asynccontextmanager
async def open_session(
    self,
    isolation_level: IsolationLevel | str | None = None,
) -> AsyncIterator[AsyncSession]:
    """Open a session with optional isolation level applied.

    This is the extension point for subclasses that need custom session setup
    (e.g., RLS context, session-level GUCs, custom statement timeouts).
    Override to wrap or augment session creation while preserving isolation handling.

    Used internally by :meth:`transaction` and :meth:`query`.

    Args:
        isolation_level: Optional transaction isolation level.

    Yields:
        Configured AsyncSession instance.

    Raises:
        ValueError: If isolation_level is not supported.

    Examples:
        Subclass that sets a session-level GUC for every transaction. PostgreSQL's
        ``SET`` statement does not accept bind parameters, so the value is passed
        through ``set_config()`` instead (``false`` = session-level):

            class TenantUnitOfWork(AsyncSQLAlchemyUnitOfWork):
                def __init__(self, session_maker, tx_factory, tenant_id):
                    super().__init__(session_maker, tx_factory)
                    self._tenant_id = tenant_id

                @asynccontextmanager
                async def open_session(self, isolation_level=None):
                    async with super().open_session(isolation_level) as session:
                        await session.execute(
                            text("SELECT set_config('app.tenant_id', :tid, false)"),
                            {"tid": str(self._tenant_id)},
                        )
                        yield session
    """
    async with self._session_maker() as session:
        # Apply the requested isolation level (if any) via the shared utility
        # before the session is handed to the caller.
        await apply_isolation_level(session, isolation_level)
        yield session
query(isolation_level=None) async

Create a read-only query context with tracing.

Automatically creates a span named "uow.query" with attributes:

- db.operation: "query"
- db.isolation_level: The isolation level (if specified)

Parameters:

Name Type Description Default
isolation_level IsolationLevel | str | None

Optional transaction isolation level.

None

Yields:

Type Description
AsyncIterator[T]

Query object with repositories.

Source code in sqlalchemy_foundation_kit/contrib/telemetry/uow.py
@asynccontextmanager
async def query(
    self,
    isolation_level: IsolationLevel | str | None = None,
) -> AsyncIterator[T]:
    """Open a traced, read-only query context.

    Wraps the parent class's ``query`` context manager in ``self._traced``
    with ``operation="query"``. When an isolation level is supplied, its
    string form is passed along as the ``db.isolation_level`` attribute;
    otherwise no extra attributes are sent.

    The span itself (expected to be named "uow.query" and to carry
    ``db.operation``) is produced by ``_traced``, which is defined outside
    this block.

    Args:
        isolation_level: Optional transaction isolation level.

    Yields:
        Query object with repositories.
    """
    span_attributes: dict[str, str | bool] = (
        {"db.isolation_level": str(isolation_level)} if isolation_level else {}
    )

    traced_query = self._traced(
        operation="query",
        context_manager=super().query(isolation_level=isolation_level),
        attributes=span_attributes,
    )
    async with traced_query as query_ctx:
        yield query_ctx
transaction(isolation_level=None, flush_before_commit=None) async

Create a new transaction context with tracing.

Automatically creates a span named "uow.transaction" with attributes:

- db.operation: "transaction"
- db.isolation_level: The isolation level (if specified)

Parameters:

Name Type Description Default
isolation_level IsolationLevel | str | None

Optional transaction isolation level.

None
flush_before_commit bool | None

If True, flush before commit.

None

Yields:

Type Description
AsyncIterator[T]

Transaction object with repositories.

Source code in sqlalchemy_foundation_kit/contrib/telemetry/uow.py
@asynccontextmanager
async def transaction(
    self,
    isolation_level: IsolationLevel | str | None = None,
    flush_before_commit: bool | None = None,
) -> AsyncIterator[T]:
    """Open a traced transaction context.

    Wraps the parent class's ``transaction`` context manager in
    ``self._traced`` with ``operation="transaction"``. When an isolation
    level is supplied, its string form is passed along as the
    ``db.isolation_level`` attribute; otherwise no extra attributes are sent.

    The span itself (expected to be named "uow.transaction" and to carry
    ``db.operation``) is produced by ``_traced``, which is defined outside
    this block.

    Args:
        isolation_level: Optional transaction isolation level.
        flush_before_commit: If True, flush before commit. Forwarded
            unchanged to the parent implementation.

    Yields:
        Transaction object with repositories.
    """
    span_attributes: dict[str, str | bool] = (
        {"db.isolation_level": str(isolation_level)} if isolation_level else {}
    )

    traced_transaction = self._traced(
        operation="transaction",
        context_manager=super().transaction(
            isolation_level=isolation_level,
            flush_before_commit=flush_before_commit,
        ),
        attributes=span_attributes,
    )
    async with traced_transaction as tx_ctx:
        yield tx_ctx

Engine Utilities

Low-level engine configuration utilities.

SQLAlchemy engine configuration utilities.

PoolClassStr = Literal['null', 'queue', 'singleton_thread', 'async_adapted_queue', 'fallback_async_adapted_queue', 'static'] module-attribute

PoolRegistry

Registry for SQLAlchemy pool classes.

Provides a centralized registry for pool classes that follows the Open/Closed principle:

- Open for extension: custom pools can be registered via PoolRegistry.register
- Closed for modification: built-in pools are only replaced when override=True is passed explicitly

This design allows library users to register custom pool implementations without modifying library code.

Examples:

Register a custom pool class:

>>> class MyCustomPool(QueuePool):
...     pass
>>> PoolRegistry.register("custom", MyCustomPool)
>>> pool = PoolRegistry.resolve("custom")

Override built-in pool (not recommended, but possible):

>>> PoolRegistry.register("queue", MyCustomQueuePool, override=True)

Source code in sqlalchemy_foundation_kit/base/engine.py
class PoolRegistry:
    """Registry for SQLAlchemy pool classes.

    Provides a centralized registry for pool classes that follows the Open/Closed principle:

    - Open for extension: custom pools can be registered via :meth:`register`
    - Closed for modification: existing entries (including the built-ins) are
      only replaced when ``override=True`` is passed explicitly

    Names are case-insensitive: they are lower-cased both when registering and
    when resolving, so ``register("Custom", ...)`` is found by ``resolve("custom")``.

    The registry is a single class-level dict, so registrations are
    process-wide.

    Examples:
        Register a custom pool class:

            >>> class MyCustomPool(QueuePool):
            ...     pass
            >>> PoolRegistry.register("custom", MyCustomPool)
            >>> pool = PoolRegistry.resolve("custom")

        Override built-in pool (not recommended, but possible):

            >>> PoolRegistry.register("queue", MyCustomQueuePool, override=True)
    """

    _pools: ClassVar[dict[str, type]] = {
        "null": NullPool,
        "queue": QueuePool,
        "singleton_thread": SingletonThreadPool,
        "async_adapted_queue": AsyncAdaptedQueuePool,
        "fallback_async_adapted_queue": FallbackAsyncAdaptedQueuePool,
        "static": StaticPool,
    }

    @classmethod
    def register(cls, name: str, pool_class: type, *, override: bool = False) -> None:
        """Register a custom pool class.

        Args:
            name: Pool class identifier. Stored lower-cased, because
                :meth:`resolve` lower-cases the name it looks up.
            pool_class: Pool class type to register.
            override: If True, allows replacing an existing entry, including
                built-in pools (use with caution).

        Raises:
            ValueError: If name (compared case-insensitively) already exists
                and override=False.

        Examples:
            >>> PoolRegistry.register("my_pool", MyCustomPool)
        """
        # Normalize once so the duplicate check and the stored key agree with
        # the lower-cased lookup performed by resolve().
        key = name.lower()
        if key in cls._pools and not override:
            raise ValueError(
                f"Pool class '{name}' is already registered. "
                "Use override=True to replace it (not recommended for built-ins)."
            )
        cls._pools[key] = pool_class

    @classmethod
    def resolve(cls, name: str) -> type:
        """Resolve pool class by name.

        Args:
            name: Pool class identifier (matched case-insensitively).

        Returns:
            Pool class type.

        Raises:
            ValueError: If pool class name is not registered. The message
                lists the registered names.

        Examples:
            >>> pool = PoolRegistry.resolve("queue")
            >>> pool
            <class 'sqlalchemy.pool.QueuePool'>
        """
        try:
            return cls._pools[name.lower()]
        except KeyError as e:
            available = ", ".join(sorted(cls._pools))
            raise ValueError(f"Unknown pool class: {name}. Available: {available}") from e

    @classmethod
    def list_available(cls) -> list[str]:
        """List all registered pool class names.

        Returns:
            Sorted list of registered pool names.

        Examples:
            >>> PoolRegistry.list_available()
            ['async_adapted_queue', 'fallback_async_adapted_queue', 'null', 'queue', ...]
        """
        return sorted(cls._pools)

list_available() classmethod

List all registered pool class names.

Returns:

Type Description
list[str]

Sorted list of registered pool names.

Examples:

>>> PoolRegistry.list_available()
['async_adapted_queue', 'fallback_async_adapted_queue', 'null', 'queue', ...]
Source code in sqlalchemy_foundation_kit/base/engine.py
@classmethod
def list_available(cls) -> list[str]:
    """Return the names of every registered pool class.

    Returns:
        A new list holding the keys of ``cls._pools`` in ascending order.

    Examples:
        >>> PoolRegistry.list_available()
        ['async_adapted_queue', 'fallback_async_adapted_queue', 'null', 'queue', ...]
    """
    names = list(cls._pools)
    names.sort()
    return names

register(name, pool_class, *, override=False) classmethod

Register a custom pool class.

Parameters:

Name Type Description Default
name str

Pool class identifier (lowercase recommended).

required
pool_class type

Pool class type to register.

required
override bool

If True, allows overriding built-in pools (use with caution).

False

Raises:

Type Description
ValueError

If name already exists and override=False.

Examples:

>>> PoolRegistry.register("my_pool", MyCustomPool)
Source code in sqlalchemy_foundation_kit/base/engine.py
@classmethod
def register(cls, name: str, pool_class: type, *, override: bool = False) -> None:
    """Register a custom pool class.

    Args:
        name: Pool class identifier. Stored lower-cased, because lookups
            lower-case the requested name before searching the registry.
        pool_class: Pool class type to register.
        override: If True, allows replacing an existing entry, including
            built-in pools (use with caution).

    Raises:
        ValueError: If name (compared case-insensitively) already exists
            and override=False.

    Examples:
        >>> PoolRegistry.register("my_pool", MyCustomPool)
    """
    # Normalize once so the duplicate check and the stored key agree with the
    # lower-cased lookup; otherwise "Custom" could be registered but never found.
    key = name.lower()
    if key in cls._pools and not override:
        raise ValueError(
            f"Pool class '{name}' is already registered. "
            "Use override=True to replace it (not recommended for built-ins)."
        )
    cls._pools[key] = pool_class

resolve(name) classmethod

Resolve pool class by name.

Parameters:

Name Type Description Default
name str

Pool class identifier.

required

Returns:

Type Description
type

Pool class type.

Raises:

Type Description
ValueError

If pool class name is not registered.

Examples:

>>> pool = PoolRegistry.resolve("queue")
>>> pool
<class 'sqlalchemy.pool.QueuePool'>
Source code in sqlalchemy_foundation_kit/base/engine.py
@classmethod
def resolve(cls, name: str) -> type:
    """Look up a registered pool class by its name.

    The name is lower-cased before the lookup, so matching is
    case-insensitive with respect to lower-case registry keys.

    Args:
        name: Pool class identifier.

    Returns:
        Pool class type.

    Raises:
        ValueError: If pool class name is not registered. The message lists
            the registered names in sorted order, and the original
            ``KeyError`` is chained as the cause.

    Examples:
        >>> pool = PoolRegistry.resolve("queue")
        >>> pool
        <class 'sqlalchemy.pool.QueuePool'>
    """
    lookup_key = name.lower()
    try:
        pool_class = cls._pools[lookup_key]
    except KeyError as missing:
        known_names = ", ".join(sorted(cls._pools))
        raise ValueError(f"Unknown pool class: {name}. Available: {known_names}") from missing
    return pool_class

build_engine_kwargs(echo, poolclass, isolation_level, pool_settings, connect_args, extra_kwargs, use_orjson=False)

Build SQLAlchemy engine keyword arguments.

Parameters:

Name Type Description Default
echo bool

If True, SQLAlchemy will log all SQL statements.

required
poolclass type

SQLAlchemy pool class.

required
isolation_level str | None

Default transaction isolation level.

required
pool_settings PoolSettingsProtocol | None

Pool configuration settings (validated by caller, e.g., Pydantic).

required
connect_args dict[str, object] | None

Arguments passed to the database driver.

required
extra_kwargs dict[str, object]

Additional keyword arguments for create_async_engine.

required
use_orjson bool

If True, use orjson for JSON serialization.

False

Returns:

Type Description
dict[str, object]

Dictionary of engine keyword arguments ready for create_async_engine().

Raises:

Type Description
ImportError

If use_orjson is True but orjson is not installed.

Examples:

>>> kwargs = build_engine_kwargs(
...     echo=False,
...     poolclass=NullPool,
...     isolation_level=None,
...     pool_settings=None,
...     connect_args=None,
...     extra_kwargs={},
...     use_orjson=False,
... )
>>> kwargs["echo"]
False
Source code in sqlalchemy_foundation_kit/base/engine.py
def build_engine_kwargs(
    echo: bool,
    poolclass: type,
    isolation_level: str | None,
    pool_settings: PoolSettingsProtocol | None,
    connect_args: dict[str, object] | None,
    extra_kwargs: dict[str, object],
    use_orjson: bool = False,
) -> dict[str, object]:
    """Assemble the keyword arguments for ``create_async_engine``.

    The result is built in layers, each later layer able to overwrite keys set
    by an earlier one: base options, optional orjson serializers, pool
    settings, driver ``connect_args``, and finally ``extra_kwargs``.

    Args:
        echo: If True, SQLAlchemy will log all SQL statements.
        poolclass: SQLAlchemy pool class.
        isolation_level: Default transaction isolation level.
        pool_settings: Pool configuration settings (validated by caller, e.g., Pydantic).
            When absent, ``pool_pre_ping`` defaults to True.
        connect_args: Arguments passed to the database driver. Entries whose
            value is None are dropped.
        extra_kwargs: Additional keyword arguments for create_async_engine.
            Applied last, so they win over everything computed here.
        use_orjson: If True, use orjson for JSON serialization.

    Returns:
        Dictionary of engine keyword arguments ready for create_async_engine().

    Raises:
        ImportError: If use_orjson is True but orjson is not installed.

    Examples:
        >>> kwargs = build_engine_kwargs(
        ...     echo=False,
        ...     poolclass=NullPool,
        ...     isolation_level=None,
        ...     pool_settings=None,
        ...     connect_args=None,
        ...     extra_kwargs={},
        ...     use_orjson=False,
        ... )
        >>> kwargs["echo"]
        False
    """
    pre_ping = pool_settings.pre_ping if pool_settings else True

    result: dict[str, object] = {}
    result["echo"] = echo
    result["poolclass"] = poolclass
    result["isolation_level"] = isolation_level
    result["pool_pre_ping"] = pre_ping

    if use_orjson:
        # Imported lazily so the optional serialization module is only
        # loaded when orjson support is actually requested.
        from .serialization import configure_orjson_serialization  # noqa: PLC0415

        orjson_options = configure_orjson_serialization()
        result.update(orjson_options)

    if pool_settings:
        _apply_pool_settings(result, poolclass, pool_settings)

    if connect_args:
        provided_args = ((key, value) for key, value in connect_args.items() if value is not None)
        result["connect_args"] = dict(provided_args)

    if extra_kwargs:
        result.update(extra_kwargs)

    return result

resolve_pool_class(poolclass)

Resolve pool class from string name or return the class directly.

Parameters:

Name Type Description Default
poolclass PoolClassStr | str | type

Pool class name (e.g., "null", "queue") or actual class type.

required

Returns:

Type Description
type

Pool class type.

Raises:

Type Description
ValueError

If pool class name is not recognized.

Source code in sqlalchemy_foundation_kit/base/engine.py
def resolve_pool_class(poolclass: PoolClassStr | str | type) -> type:
    """Turn a pool specification into a pool class.

    A string is looked up through ``PoolRegistry.resolve``; anything else is
    assumed to already be a pool class and is returned untouched.

    Args:
        poolclass: Pool class name (e.g., "null", "queue") or actual class type.

    Returns:
        Pool class type.

    Raises:
        ValueError: If pool class name is not recognized.
    """
    return PoolRegistry.resolve(poolclass) if isinstance(poolclass, str) else poolclass

register_pool_class(name, pool_class, *, override=False)

Register a custom pool class.

Convenience wrapper around PoolRegistry.register for users who prefer a functional API over the class-based one.

Parameters:

Name Type Description Default
name str

Pool class identifier (lowercase recommended).

required
pool_class type

Pool class type to register.

required
override bool

If True, allows overriding built-in pools (use with caution).

False

Raises:

Type Description
ValueError

If name already exists and override=False.

Source code in sqlalchemy_foundation_kit/base/engine.py
def register_pool_class(name: str, pool_class: type, *, override: bool = False) -> None:
    """Register a custom pool class.

    Convenience wrapper around :meth:`PoolRegistry.register` for users who prefer
    a functional API over the class-based one. All arguments are forwarded
    unchanged, so the registration lands in the same shared registry.

    Args:
        name: Pool class identifier (lowercase recommended).
        pool_class: Pool class type to register.
        override: If True, allows overriding built-in pools (use with caution).

    Raises:
        ValueError: If name already exists and override=False.

    Examples:
        >>> register_pool_class("my_pool", MyCustomPool)
    """
    PoolRegistry.register(name, pool_class, override=override)

ORM metadata loading utilities.

load_orm_metadata(models_modules, metadata=None)

Load all ORM models metadata synchronously.

Imports specified modules to ensure that all SQLAlchemy models are registered in the metadata. This is useful for migrations and schema introspection with tools like Alembic.

Parameters:

Name Type Description Default
models_modules Iterable[str]

Iterable of module paths to import (e.g., ["myapp.models", "myapp.core.models"]).

required
metadata MetaData | None

Optional specific MetaData object to use. If None, uses Base.metadata.

None

Returns:

Type Description
MetaData

MetaData object containing all registered models from the imported modules.

Examples:

Load models from multiple modules:

>>> from sqlalchemy_foundation_kit.base import load_orm_metadata
>>> metadata = load_orm_metadata([
...     "myapp.users.models",
...     "myapp.orders.models",
...     "myapp.products.models",
... ])
>>> len(metadata.tables)
15

Use with custom metadata:

>>> from sqlalchemy import MetaData
>>> custom_meta = MetaData(schema="public")
>>> metadata = load_orm_metadata(["myapp.models"], metadata=custom_meta)

Typical usage in Alembic env.py:

>>> from sqlalchemy_foundation_kit.base import Base, load_orm_metadata
>>> target_metadata = Base.metadata
>>> load_orm_metadata(["myapp.models"])  # Register all models
>>> # Now target_metadata.tables contains all tables

Source code in sqlalchemy_foundation_kit/base/metadata.py
def load_orm_metadata(models_modules: Iterable[str], metadata: MetaData | None = None) -> MetaData:
    """Load all ORM models metadata synchronously.

    Imports specified modules to ensure that all SQLAlchemy models are
    registered in the metadata. This is useful for migrations and schema
    introspection with tools like Alembic.

    Args:
        models_modules: Iterable of module paths to import (e.g., ["myapp.models", "myapp.core.models"]).
            A single bare string is accepted too and is treated as one module
            path rather than being iterated character by character.
        metadata: Optional specific MetaData object to return. If None, Base.metadata
            is returned. This function only imports modules and returns the
            object; it does not copy tables into ``metadata``, so the models
            must already be declared against it.

    Returns:
        The ``metadata`` argument when given, otherwise ``Base.metadata``.

    Raises:
        ImportError: If one of the module paths cannot be imported
            (propagated from ``import_module``).

    Examples:
        Load models from multiple modules:

            >>> from sqlalchemy_foundation_kit.base import load_orm_metadata
            >>> metadata = load_orm_metadata([
            ...     "myapp.users.models",
            ...     "myapp.orders.models",
            ...     "myapp.products.models",
            ... ])
            >>> len(metadata.tables)
            15

        Use with custom metadata:

            >>> from sqlalchemy import MetaData
            >>> custom_meta = MetaData(schema="public")
            >>> metadata = load_orm_metadata(["myapp.models"], metadata=custom_meta)

        Typical usage in Alembic env.py:

            >>> from sqlalchemy_foundation_kit.base import Base, load_orm_metadata
            >>> target_metadata = Base.metadata
            >>> load_orm_metadata(["myapp.models"])  # Register all models
            >>> # Now target_metadata.tables contains all tables
    """
    # A bare string is itself an Iterable[str]; iterating it would try to
    # import one module per character ("m", "y", ...), so wrap it instead.
    module_paths = (models_modules,) if isinstance(models_modules, str) else models_modules

    for module_path in module_paths:
        import_module(module_path)

    return metadata if metadata is not None else Base.metadata

JSON serialization utilities for SQLAlchemy.

configure_orjson_serialization()

Configure orjson serialization for SQLAlchemy engine.

Returns:

Type Description
dict[str, object]

Dictionary with json_serializer and json_deserializer configured.

Raises:

Type Description
ImportError

If orjson is not installed.

Examples:

>>> config = configure_orjson_serialization()
>>> "json_serializer" in config
True
>>> "json_deserializer" in config
True
Source code in sqlalchemy_foundation_kit/base/serialization.py
def configure_orjson_serialization() -> dict[str, object]:
    """Build the engine options that switch JSON handling to orjson.

    The orjson module is obtained through ``require_optional("orjson", "json")``.
    Serialization goes through the module's ``_json_serializer`` helper and
    deserialization uses ``orjson.loads`` directly.

    Returns:
        Dictionary with json_serializer and json_deserializer configured.

    Raises:
        ImportError: If orjson is not installed (expected to be raised by
            ``require_optional``, which is defined outside this block).

    Examples:
        >>> config = configure_orjson_serialization()
        >>> "json_serializer" in config
        True
        >>> "json_deserializer" in config
        True
    """
    orjson_module = require_optional("orjson", "json")

    serialization_options: dict[str, object] = {"json_serializer": _json_serializer}
    serialization_options["json_deserializer"] = orjson_module.loads
    return serialization_options

Type Hints

Internal type hints used across the library.

Shared type variables for the library.

Centralized location for all TypeVars to avoid duplication and ensure consistency.

SessionT = TypeVar('SessionT', bound='AsyncSession') module-attribute

T = TypeVar('T', bound='AsyncUowTransaction') module-attribute

T_co = TypeVar('T_co', bound='AsyncUowTransaction', covariant=True) module-attribute

__all__ = ['SessionT', 'T', 'T_co'] module-attribute

AsyncUowTransaction

Bases: Protocol

Transaction-scoped repositories container.

Intended to be extended by concrete transaction types that expose repository attributes for a specific bounded context.

Source code in sqlalchemy_foundation_kit/uow/protocols.py
class AsyncUowTransaction(Protocol):
    """Transaction-scoped repositories container.

    Intended to be extended by concrete transaction types that expose
    repository attributes for a specific bounded context.
    """