API Reference¶
Complete API documentation auto-generated from source code.
Core Module¶
The main sqlalchemy_foundation_kit module exports all core functionality.
sqlalchemy_foundation_kit
¶
Foundation layer for SQLAlchemy-based services with UoW, session management, and observability.
UnConstrainedEnum = partial(Enum, native_enum=False, create_constraint=False, validate_strings=True, values_callable=_extract_enum_values)
module-attribute
¶
GenericJSONDict = dict[str, Any]
module-attribute
¶
DB_NAMING_CONVENTION = {'ix': '%(column_0_label)s_idx', 'uq': '%(table_name)s_%(column_0_name)s_key', 'ck': '%(table_name)s_%(constraint_name)s_check', 'fk': '%(table_name)s_%(column_0_name)s_fkey', 'pk': '%(table_name)s_pkey'}
module-attribute
¶
Base
¶
Bases: DeclarativeBase
Base class for all ORM models.
Source code in sqlalchemy_foundation_kit/base/models.py
BaseTable
¶
DatetimeColumnsMixin
¶
Mixin for tables that need created_at and updated_at timestamps.
Control indexing via created_at_index and updated_at_index class variables in the model. Defaults to False.
Source code in sqlalchemy_foundation_kit/base/models.py
PydanticJSONB
¶
Bases: TypeDecorator
SQLAlchemy TypeDecorator for Pydantic models stored as JSONB.
Validates and serializes values against the supplied Pydantic-compatible type
on both write and read paths. A model_type is required — if you need
raw dict storage without validation, use SQLAlchemy's built-in JSONB directly
(or the :data:GenericJSONDict alias).
Source code in sqlalchemy_foundation_kit/base/types.py
__init__(model_type, *args, **kwargs)
¶
Initialize the type decorator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
model_type
|
type[T]
|
Pydantic model class (or any type compatible with
|
required |
Source code in sqlalchemy_foundation_kit/base/types.py
AsyncSessionManager
¶
Manages async database sessions with configurable connection pooling.
Supports two initialization approaches:
- Direct constructor — all parameters in constructor with defaults.
- Builder pattern — use :class:
AsyncSessionManagerBuilderfor more readable complex configurations.
Source code in sqlalchemy_foundation_kit/session/manager.py
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 | |
engine
property
¶
Get the underlying engine.
session_maker
property
¶
Get the session maker.
__aenter__()
async
¶
__aexit__(exc_type, exc_val, exc_tb)
async
¶
Close the engine on exit.
__init__(url, echo=False, poolclass='null', session_class=None, expire_on_commit=False, connect_args=None, isolation_level=None, pool_settings=None, use_orjson=False, metrics=None, on_engine_created=None, dispose_timeout=DEFAULT_DISPOSE_TIMEOUT_SECONDS, **kwargs)
¶
Initialize session manager with direct configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
url
|
str
|
Database connection URL (required). |
required |
echo
|
bool
|
If True, SQLAlchemy will log all SQL statements (default: False). |
False
|
poolclass
|
str | type
|
SQLAlchemy pool class or name (default: "null"). Use "queue" for production, "null" for testing. |
'null'
|
session_class
|
type[SessionT] | None
|
Custom |
None
|
expire_on_commit
|
bool
|
If True, objects expire after commit (default: False). |
False
|
connect_args
|
dict[str, object] | None
|
Arguments passed to the database driver (default: None). |
None
|
isolation_level
|
str | None
|
Default transaction isolation level (default: None). |
None
|
pool_settings
|
PoolSettingsProtocol | None
|
Pool configuration settings (default: None). |
None
|
use_orjson
|
bool
|
If True, use orjson for JSON serialization (default: False). |
False
|
metrics
|
PostgresMetricsProtocol | None
|
Optional metrics collector (default: None). |
None
|
on_engine_created
|
Callable[[AsyncEngine], None] | None
|
Optional callback invoked with |
None
|
dispose_timeout
|
float
|
Maximum seconds to wait for engine disposal in :meth: |
DEFAULT_DISPOSE_TIMEOUT_SECONDS
|
**kwargs
|
object
|
Additional keyword arguments for |
{}
|
Source code in sqlalchemy_foundation_kit/session/manager.py
aclose()
async
¶
Close the engine and all connections.
Source code in sqlalchemy_foundation_kit/session/manager.py
get_session()
async
¶
Get a new database session.
get_transaction(isolation_level=None)
async
¶
Get a new database session with automatic transaction management.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
str | None
|
Optional isolation level for the transaction. |
None
|
Yields:
| Type | Description |
|---|---|
AsyncIterator[SessionT]
|
Managed async session with active transaction. |
Source code in sqlalchemy_foundation_kit/session/manager.py
AsyncSessionManagerBuilder
¶
Builder for AsyncSessionManager construction.
Provides a fluent API for configuring AsyncSessionManager with many optional parameters. This pattern improves code readability and follows KISS principle by avoiding constructors with 10+ parameters.
Examples:
Basic usage: >>> manager = ( ... AsyncSessionManagerBuilder("postgresql+asyncpg://...") ... .with_pool("queue") ... .with_echo(True) ... .build() ... )
Advanced configuration: >>> manager = ( ... AsyncSessionManagerBuilderCustomSession ... .with_session_class(CustomSession) ... .with_pool("queue", pool_settings=settings.pool) ... .with_metrics(metrics) ... .with_callbacks(on_engine_created=instrument_engine) ... .with_json_serialization(orjson=True) ... .with_isolation_level("READ COMMITTED") ... .build() ... )
Reusable configuration: >>> builder = ( ... AsyncSessionManagerBuilder("postgresql+asyncpg://...") ... .with_pool("queue") ... .with_metrics(metrics) ... ) >>> manager1 = builder.with_echo(True).build() >>> manager2 = builder.with_echo(False).build()
Source code in sqlalchemy_foundation_kit/session/builder.py
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 | |
__init__(url)
¶
Initialize builder with database URL (required).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
url
|
str
|
Database connection URL (required parameter). |
required |
Source code in sqlalchemy_foundation_kit/session/builder.py
build()
¶
Build AsyncSessionManager instance with configured parameters.
Returns:
| Type | Description |
|---|---|
AsyncSessionManager[SessionT]
|
Configured AsyncSessionManager instance. |
Raises:
| Type | Description |
|---|---|
ImportError
|
If use_orjson=True but orjson is not installed. |
Examples:
>>> manager = builder.build()
>>> async with manager.get_session() as session:
... await session.execute(...)
Source code in sqlalchemy_foundation_kit/session/builder.py
with_callbacks(on_engine_created)
¶
Register engine creation callback.
Useful for attaching instrumentation (OpenTelemetry), custom event listeners, or debug hooks right after engine creation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
on_engine_created
|
Callable[[AsyncEngine], None]
|
Callback invoked with AsyncEngine after creation. |
required |
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Examples:
>>> from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
>>> def instrument(engine):
... SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)
>>> builder.with_callbacks(on_engine_created=instrument)
Source code in sqlalchemy_foundation_kit/session/builder.py
with_connect_args(**connect_args)
¶
Add database driver connection arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
**connect_args
|
object
|
Arguments passed to the database driver. |
{}
|
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Examples:
>>> builder.with_connect_args(
... server_settings={"application_name": "myapp"},
... command_timeout=60,
... )
Source code in sqlalchemy_foundation_kit/session/builder.py
with_dispose_timeout(timeout)
¶
Configure how long :meth:AsyncSessionManager.aclose waits for engine disposal.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
timeout
|
float
|
Maximum seconds to wait for the engine to dispose. |
required |
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Source code in sqlalchemy_foundation_kit/session/builder.py
with_echo(echo=True)
¶
Enable SQL statement logging.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
echo
|
bool
|
If True, SQLAlchemy logs all SQL statements. |
True
|
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Source code in sqlalchemy_foundation_kit/session/builder.py
with_expire_on_commit(expire=True)
¶
Configure object expiration behavior after commit.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
expire
|
bool
|
If True, all objects expire after commit. |
True
|
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Source code in sqlalchemy_foundation_kit/session/builder.py
with_extra_kwargs(**kwargs)
¶
Add additional keyword arguments for create_async_engine.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
**kwargs
|
object
|
Additional engine configuration. |
{}
|
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Source code in sqlalchemy_foundation_kit/session/builder.py
with_isolation_level(isolation_level)
¶
Set default transaction isolation level.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
str
|
Default isolation level (e.g., "READ COMMITTED"). |
required |
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Source code in sqlalchemy_foundation_kit/session/builder.py
with_json_serialization(orjson=True)
¶
Configure JSON serialization backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
orjson
|
bool
|
If True, use orjson for faster JSON serialization. |
True
|
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Raises:
| Type | Description |
|---|---|
ImportError
|
If orjson=True but orjson is not installed. |
Source code in sqlalchemy_foundation_kit/session/builder.py
with_metrics(metrics)
¶
Enable connection pool metrics collection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
metrics
|
PostgresMetricsProtocol
|
Metrics collector implementing PostgresMetricsProtocol. |
required |
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Source code in sqlalchemy_foundation_kit/session/builder.py
with_pool(poolclass, pool_settings=None)
¶
Configure connection pool.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
poolclass
|
str | type
|
Pool class name or type (e.g., "queue", "null"). |
required |
pool_settings
|
PoolSettingsProtocol | None
|
Optional pool configuration settings. |
None
|
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Source code in sqlalchemy_foundation_kit/session/builder.py
with_session_class(session_class)
¶
Use custom session class.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session_class
|
type[SessionT]
|
Custom AsyncSession subclass. |
required |
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Source code in sqlalchemy_foundation_kit/session/builder.py
AsyncCConnection
¶
Bases: Connection
Custom async connection class for pgbouncer magic.
This subclass overrides only the private method _get_unique_id so that prepared statement identifiers are unique per connection. That is required when using pgbouncer in transaction mode, where the same server connection may be reused for different logical connections.
See: https://github.com/sqlalchemy/sqlalchemy/issues/6467
Source code in sqlalchemy_foundation_kit/session/connection.py
AsyncUnitOfWork
¶
Bases: Protocol, Generic[T_co]
Provides transactional context for repository operations.
Provides three modes of operation
transaction(): For write operations with automatic commit/rollback.managed_session(): For write operations with manual commit/rollback control.query(): For read-only operations without transaction management.
Source code in sqlalchemy_foundation_kit/uow/protocols.py
managed_session(isolation_level=None)
¶
Create a session with manual transaction control.
Unlike :meth:transaction, this does NOT auto-commit on success. The caller
must explicitly call session.commit() or session.rollback(). Useful for
complex transactional logic where the commit decision depends on multiple
conditions or external factors.
The second element of the yielded tuple is the underlying session object
(typed as Any in the protocol to avoid leaking SQLAlchemy types — concrete
implementations like AsyncSQLAlchemyUnitOfWork yield AsyncSession).
On exception inside the context, the session is automatically rolled back.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
str | None
|
Optional transaction isolation level. |
None
|
Source code in sqlalchemy_foundation_kit/uow/protocols.py
query(isolation_level=None)
¶
transaction(isolation_level=None, flush_before_commit=None)
¶
Create a new transaction context with automatic commit/rollback.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
str | None
|
Optional transaction isolation level. |
None
|
flush_before_commit
|
bool | None
|
If True, flush session before commit to surface
constraint violations within transaction. If |
None
|
Source code in sqlalchemy_foundation_kit/uow/protocols.py
AsyncSQLAlchemyUnitOfWork
¶
Bases: AsyncUnitOfWork[T], Generic[T]
Base async SQLAlchemy Unit of Work.
Provides transactional context for repository operations using SQLAlchemy AsyncSession.
Methods:
| Name | Description |
|---|---|
transaction |
For write operations with automatic commit/rollback. |
query |
For read-only operations without transaction management. |
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 | |
__init__(session_maker, transaction_factory, *, flush_before_commit=True)
¶
Initialize the unit of work.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session_maker
|
async_sessionmaker[AsyncSession]
|
Async session factory. |
required |
transaction_factory
|
Callable[[AsyncSession], T]
|
Callable producing the transaction object exposed to callers. |
required |
flush_before_commit
|
bool
|
Default |
True
|
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
managed_session(isolation_level=None)
async
¶
Create a session with manual transaction control.
Unlike transaction(), this does NOT auto-commit. The caller must explicitly call session.commit() or session.rollback(). This is useful for complex transactional logic where commit decision depends on multiple conditions or external factors.
A transaction is started automatically, but you have full control over when to commit or rollback. If you exit without calling either, SQLAlchemy will automatically rollback on session close.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
IsolationLevel | str | None
|
Optional transaction isolation level. Can be an IsolationLevel enum member or a string value. Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED". |
None
|
Yields:
| Type | Description |
|---|---|
AsyncIterator[tuple[T, AsyncSession]]
|
Tuple of (transaction object, session) for manual control. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If isolation_level is not supported. |
Examples:
Manual transaction control: async with uow.managed_session() as (tx, session): await tx.users.create(...)
# Manually decide when to commit
if should_commit:
await session.commit()
else:
await session.rollback()
Conditional commit based on external service: async with uow.managed_session() as (tx, session): user = await tx.users.create(...)
# Call external service
result = await external_api.validate(user)
if result.success:
await session.commit()
else:
await session.rollback()
Multiple operations with intermediate decision: async with uow.managed_session() as (tx, session): user = await tx.users.create(...)
# First checkpoint
await session.flush()
# More operations
await tx.profiles.create(user_id=user.id)
# Final decision
await session.commit()
Note
Prefer :meth:transaction for the vast majority of use cases — it commits
automatically and enforces the UoW pattern. This method is an advanced escape
hatch for scenarios where the commit/rollback decision depends on conditions
that can only be evaluated after data is written (e.g., external service
validation). session.commit() calls belong exclusively at the use-case
boundary via this method, never inside repository implementations.
Warning
You MUST explicitly call session.commit() or session.rollback(). Exiting the context without calling either will result in automatic rollback when the session closes.
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 | |
open_session(isolation_level=None)
async
¶
Open a session with optional isolation level applied.
This is the extension point for subclasses that need custom session setup (e.g., RLS context, session-level GUCs, custom statement timeouts). Override to wrap or augment session creation while preserving isolation handling.
Used internally by :meth:transaction and :meth:query.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
IsolationLevel | str | None
|
Optional transaction isolation level. |
None
|
Yields:
| Type | Description |
|---|---|
AsyncIterator[AsyncSession]
|
Configured AsyncSession instance. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If isolation_level is not supported. |
Examples:
Subclass that sets a session-level GUC for every transaction:
class TenantUnitOfWork(AsyncSQLAlchemyUnitOfWork):
def __init__(self, session_maker, tx_factory, tenant_id):
super().__init__(session_maker, tx_factory)
self._tenant_id = tenant_id
@asynccontextmanager
async def open_session(self, isolation_level=None):
async with super().open_session(isolation_level) as session:
await session.execute(
text("SET app.tenant_id = :tid"),
{"tid": self._tenant_id},
)
yield session
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
query(isolation_level=None)
async
¶
Create a read-only query context without transaction management.
This method is designed for read-only operations and does not start a transaction or perform any commit/rollback. It's semantically clearer than managed_session() for read operations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
IsolationLevel | str | None
|
Optional transaction isolation level. Can be an IsolationLevel enum member or a string value. Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED". |
None
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If isolation_level is not supported. |
Examples:
Read-only query: async with uow.query() as qx: users = await qx.users.list_all() user = await qx.users.get_by_id(user_id) # No commit/rollback - just closes session
Note
While this method is intended for read-only operations, SQLAlchemy does not enforce this at the session level. It's up to the caller to ensure only read operations are performed.
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
transaction(isolation_level=None, flush_before_commit=None)
async
¶
Create a new transaction context with automatic commit/rollback.
The Unit of Work automatically commits the transaction on successful exit and rolls back on exception. This ensures atomic operations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
IsolationLevel | str | None
|
Optional transaction isolation level. Can be an IsolationLevel enum member or a string value. Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED". |
None
|
flush_before_commit
|
bool | None
|
If True, flush the session before commit to surface
constraint violations while still inside the transaction. If |
None
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If isolation_level is not supported. |
Examples:
Write operation with automatic commit: async with uow.transaction() as tx: await tx.users.create(...) # Auto-commit on exit, rollback on exception
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
AsyncUowTransaction
¶
Bases: Protocol
Transaction-scoped repositories container.
Intended to be extended by concrete transaction types that expose repository attributes for a specific bounded context.
AsyncSQLAlchemyUowTransaction
¶
Bases: AsyncUowTransaction
Base async SQLAlchemy transaction-scoped repositories.
This class provides access to the underlying SQLAlchemy session and is intended to be subclassed by services to expose specific repositories.
Example
class IdentityTransaction(AsyncSQLAlchemyUowTransaction): @property def users(self) -> UserRepository: return PostgresUserRepository(self.session)
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
session
property
¶
Get the underlying SQLAlchemy async session.
IsolationLevel
¶
Bases: StrEnum
PostgreSQL transaction isolation levels.
Values match PostgreSQL's expected form (with spaces) for use with SQLAlchemy execution_options(isolation_level=...).
Source code in sqlalchemy_foundation_kit/uow/enums.py
PostgresAdvisoryLockMixin
¶
Mixin providing PostgreSQL advisory lock support for UoW transactions.
Requires the class to have a session property returning AsyncSession.
Example
class IdentityTransaction(AsyncSQLAlchemyUowTransaction, PostgresAdvisoryLockMixin): @property def users(self) -> UserRepository: return PostgresUserRepository(self.session)
Now has access to try_advisory_lock method¶
async with uow.transaction() as tx: if await tx.try_advisory_lock(12345): # Protected operation ...
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
try_advisory_lock(key)
async
¶
Acquire a Postgres transaction-scoped advisory lock.
Delegates to :func:try_advisory_xact_lock for actual locking logic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
int
|
Integer lock key. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if lock was acquired, False if already held by another session. |
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
SupportsAdvisoryLock
¶
Bases: Protocol
Capability protocol for transactions supporting advisory locks.
Use this when your transaction needs PostgreSQL advisory lock support. Not all transactions require this capability (e.g., in-memory, read-only).
Source code in sqlalchemy_foundation_kit/uow/protocols.py
try_advisory_lock(key)
async
¶
Try to acquire a transaction-scoped advisory lock identified by key.
Returns True if the lock was acquired (and is held for the rest of the
transaction), False if another transaction already holds it.
Source code in sqlalchemy_foundation_kit/uow/protocols.py
RetryConfig
dataclass
¶
Configuration for retry behavior with exponential backoff.
Attributes:
| Name | Type | Description |
|---|---|---|
max_retries |
int
|
Maximum number of attempts before giving up. |
retry_delay |
float
|
Base delay in seconds; actual delay is |
max_backoff_delay |
float
|
Maximum delay between retries in seconds. Prevents exponential backoff from growing indefinitely. |
Source code in sqlalchemy_foundation_kit/session/retry.py
PostgresSettingsProtocol
¶
Bases: Protocol
Protocol for PostgreSQL configuration.
Organized protocol with grouped settings for connection, pool, and query configuration.
Attributes:
| Name | Type | Description |
|---|---|---|
connection |
ConnectionSettingsProtocol
|
Connection parameters (host, port, user, database). |
pool |
PoolSettingsProtocol
|
Connection pool settings. |
query |
QuerySettingsProtocol
|
Query execution and transaction settings. |
application_name |
str
|
Application identifier for connections. |
db_schema |
str | None
|
Optional PostgreSQL schema name. |
use_orjson_serialization |
bool
|
Enable orjson for JSON operations. |
jit |
str | None
|
JIT compilation setting (PgBouncer compatibility). |
Examples:
Implementing the protocol: >>> class MyConfig: ... connection: ConnectionSettingsProtocol ... pool: PoolSettingsProtocol ... query: QuerySettingsProtocol ... application_name: str = "my-app" ... db_schema: str | None = None ... use_orjson_serialization: bool = True ... jit: str | None = "off" ... ... def to_dsn(self) -> str: ... return f"postgresql://{self.connection.user}@{self.connection.host}..."
Using the protocol: >>> def create_engine(config: PostgresSettingsProtocol): ... dsn = config.to_dsn() ... pool_size = config.pool.pool_size ... echo = config.query.echo
Source code in sqlalchemy_foundation_kit/config/postgres.py
to_dsn()
¶
Convert config to DSN string.
Returns PostgreSQL connection string in format: postgresql+asyncpg://user:password@host:port/database
Examples:
Source code in sqlalchemy_foundation_kit/config/postgres.py
ConnectionSettingsProtocol
¶
Bases: Protocol
Protocol for PostgreSQL connection settings.
Defines connection parameters required for establishing database connections.
Attributes:
| Name | Type | Description |
|---|---|---|
host |
str
|
PostgreSQL server hostname or IP address. |
port |
int
|
PostgreSQL server port number. |
user |
str
|
Database username for authentication. |
password |
PasswordLike | str
|
Database password — either a plain |
database |
str
|
Target database name. |
Source code in sqlalchemy_foundation_kit/config/postgres.py
PoolSettingsProtocol
¶
Bases: Protocol
Protocol for PostgreSQL connection pool settings.
Defines pool configuration for SQLAlchemy engine connection management.
Attributes:
| Name | Type | Description |
|---|---|---|
kind |
str | type
|
Connection pool implementation (queue, static, etc.). |
size |
int | None
|
Number of connections to maintain in pool. |
max_overflow |
int | None
|
Additional connections allowed when pool exhausted. |
pre_ping |
bool
|
Test connection health before checkout. |
recycle |
int | None
|
Recycle connections after N seconds. |
timeout |
float | None
|
Timeout for acquiring connection from pool. |
Examples:
>>> pool: PoolSettingsProtocol = ...
>>> if pool.size > 100:
... logger.warning("Large pool size detected")
Source code in sqlalchemy_foundation_kit/config/postgres.py
QuerySettingsProtocol
¶
Bases: Protocol
Protocol for PostgreSQL query execution settings.
Defines query behavior, caching, and transaction isolation configuration.
Attributes:
| Name | Type | Description |
|---|---|---|
echo |
bool
|
Enable SQL statement logging. |
statement_cache_size |
int | None
|
Prepared statement cache size. |
prepared_statement_cache_size |
int | None
|
Server-side prepared statement cache size. |
isolation_level |
str | None
|
Transaction isolation level. |
Examples:
Source code in sqlalchemy_foundation_kit/config/postgres.py
PostgresMetricsProtocol
¶
Bases: PoolStatsRecorder, CheckoutRecorder, ErrorRecorder, Protocol
Composite protocol covering all PostgreSQL metrics capabilities.
Aggregates the narrow capability protocols for convenience. Implementations that only need a subset can implement the individual protocols directly.
Examples:
>>> class MyMetrics:
... def record_pool_stats(self, pool_size: int, pool_checked_out: int, pool_overflow: int) -> None: ...
... def record_checkout(self, duration: float) -> None: ...
... def record_error(self, error_type: str, is_timeout: bool = False) -> None: ...
>>> metrics: PostgresMetricsProtocol = MyMetrics()
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
record_checkout(duration)
¶
Record a database connection checkout from the pool.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
duration
|
float
|
Time taken to acquire the connection from the pool, in seconds. |
required |
record_error(error_type, is_timeout=False)
¶
Record a database connection or execution error.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
error_type
|
str
|
The type of error that occurred (e.g., "OperationalError"). |
required |
is_timeout
|
bool
|
True if this error was specifically a connection checkout timeout. |
False
|
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
record_pool_stats(pool_size, pool_checked_out, pool_overflow)
¶
Record database connection pool statistics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool_size
|
int
|
Current total number of connections in the pool. |
required |
pool_checked_out
|
int
|
Number of connections currently in use. |
required |
pool_overflow
|
int
|
Number of connections over the configured pool_size. |
required |
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
create_async_session_manager(postgres_config, application_name=None, metrics=None, on_engine_created=None, connection_class=None, extra_server_settings=None, extra_connect_args=None, **kwargs)
¶
Create async session manager with PostgreSQL-specific configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
postgres_config
|
PostgresSettingsProtocol
|
PostgreSQL configuration implementing PostgresSettingsProtocol. |
required |
application_name
|
str | None
|
Optional custom application name. If None, uses postgres_config.application_name. |
None
|
metrics
|
PostgresMetricsProtocol | None
|
Optional metrics collector for connection pool monitoring. |
None
|
on_engine_created
|
Callable[[AsyncEngine], None] | None
|
Optional callback invoked with the AsyncEngine right after creation. Use it to attach OpenTelemetry instrumentation, custom listeners, etc. |
None
|
connection_class
|
type[Connection] | None
|
Custom asyncpg Connection subclass. Defaults to |
None
|
extra_server_settings
|
dict[str, str] | None
|
Additional PostgreSQL |
None
|
extra_connect_args
|
dict[str, object] | None
|
Additional asyncpg |
None
|
**kwargs
|
Any
|
Additional keyword arguments passed to AsyncSessionManager. |
{}
|
Returns:
| Type | Description |
|---|---|
AsyncSessionManager[AsyncSession]
|
Configured AsyncSessionManager instance. |
Examples:
Basic usage: >>> manager = create_async_session_manager(postgres_config)
With metrics: >>> from sqlalchemy_foundation_kit.contrib.metrics import PostgresMetrics >>> manager = create_async_session_manager(postgres_config, metrics=PostgresMetrics())
With custom server settings and command timeout: >>> manager = create_async_session_manager( ... postgres_config, ... extra_server_settings={"statement_timeout": "30000", "timezone": "UTC"}, ... extra_connect_args={"command_timeout": 60}, ... )
With OpenTelemetry tracing bound to this engine: >>> from sqlalchemy_foundation_kit.contrib.telemetry import instrument_engine >>> manager = create_async_session_manager( ... postgres_config, ... on_engine_created=instrument_engine, ... )
Source code in sqlalchemy_foundation_kit/session/factories.py
try_advisory_xact_lock(session, key)
async
¶
Acquire a Postgres transaction-scoped advisory lock.
Uses pg_try_advisory_xact_lock: non-blocking, released automatically
at transaction end. String keys are hashed to integers. The key is then
truncated to signed 64-bit as Postgres expects.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
AsyncSession
|
SQLAlchemy AsyncSession within an active transaction. |
required |
key
|
str | int
|
Lock identifier (string or integer). Strings are hashed to integers. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if lock was acquired, False if already held by another session. |
Examples:
>>> async with session_maker() as session:
... async with session.begin():
... if await try_advisory_xact_lock(session, "my_operation"):
... # Perform protected operation
... await session.execute(...)
... await session.commit()
Source code in sqlalchemy_foundation_kit/session/locks.py
retry_async_connection(connect_func, service_name, config=DEFAULT_RETRY_CONFIG)
async
¶
Retry an async connection callable with exponential backoff.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
connect_func
|
Callable[[], Awaitable[None]]
|
Callable that attempts to establish/test the connection. |
required |
service_name
|
str
|
Human-readable service name used in log messages. |
required |
config
|
RetryConfig
|
Retry behavior configuration. |
DEFAULT_RETRY_CONFIG
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If config.max_retries is less than 1. |
Exception
|
Re-raises the last exception when all attempts fail. |
Source code in sqlalchemy_foundation_kit/session/retry.py
build_engine_kwargs(echo, poolclass, isolation_level, pool_settings, connect_args, extra_kwargs, use_orjson=False)
¶
Build SQLAlchemy engine keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
echo
|
bool
|
If True, SQLAlchemy will log all SQL statements. |
required |
poolclass
|
type
|
SQLAlchemy pool class. |
required |
isolation_level
|
str | None
|
Default transaction isolation level. |
required |
pool_settings
|
PoolSettingsProtocol | None
|
Pool configuration settings (validated by caller, e.g., Pydantic). |
required |
connect_args
|
dict[str, object] | None
|
Arguments passed to the database driver. |
required |
extra_kwargs
|
dict[str, object]
|
Additional keyword arguments for create_async_engine. |
required |
use_orjson
|
bool
|
If True, use orjson for JSON serialization. |
False
|
Returns:
| Type | Description |
|---|---|
dict[str, object]
|
Dictionary of engine keyword arguments ready for create_async_engine(). |
Raises:
| Type | Description |
|---|---|
ImportError
|
If use_orjson is True but orjson is not installed. |
Examples:
>>> kwargs = build_engine_kwargs(
... echo=False,
... poolclass=NullPool,
... isolation_level=None,
... pool_settings=None,
... connect_args=None,
... extra_kwargs={},
... use_orjson=False,
... )
>>> kwargs["echo"]
False
Source code in sqlalchemy_foundation_kit/base/engine.py
resolve_pool_class(poolclass)
¶
Resolve pool class from string name or return the class directly.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
poolclass
|
PoolClassStr | str | type
|
Pool class name (e.g., "null", "queue") or actual class type. |
required |
Returns:
| Type | Description |
|---|---|
type
|
Pool class type. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pool class name is not recognized. |
Source code in sqlalchemy_foundation_kit/base/engine.py
register_pool_class(name, pool_class, *, override=False)
¶
Register a custom pool class.
Convenience wrapper around :meth:PoolRegistry.register for users who prefer
a functional API over the class-based one.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Pool class identifier (lowercase recommended). |
required |
pool_class
|
type
|
Pool class type to register. |
required |
override
|
bool
|
If True, allows overriding built-in pools (use with caution). |
False
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If name already exists and override=False. |
Source code in sqlalchemy_foundation_kit/base/engine.py
load_orm_metadata(models_modules, metadata=None)
¶
Load all ORM models metadata synchronously.
Imports specified modules to ensure that all SQLAlchemy models are registered in the metadata. This is useful for migrations and schema introspection with tools like Alembic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
models_modules
|
Iterable[str]
|
Iterable of module paths to import (e.g., ["myapp.models", "myapp.core.models"]). |
required |
metadata
|
MetaData | None
|
Optional specific MetaData object to use. If None, uses Base.metadata. |
None
|
Returns:
| Type | Description |
|---|---|
MetaData
|
MetaData object containing all registered models from the imported modules. |
Examples:
Load models from multiple modules: >>> from sqlalchemy_foundation_kit.base import load_orm_metadata >>> metadata = load_orm_metadata([ ... "myapp.users.models", ... "myapp.orders.models", ... "myapp.products.models", ... ]) >>> len(metadata.tables) 15
Use with custom metadata: >>> from sqlalchemy import MetaData >>> custom_meta = MetaData(schema="public") >>> metadata = load_orm_metadata(["myapp.models"], metadata=custom_meta)
Typical usage in Alembic env.py: >>> from sqlalchemy_foundation_kit.base import Base, load_orm_metadata >>> target_metadata = Base.metadata >>> load_orm_metadata(["myapp.models"]) # Register all models >>> # Now target_metadata.tables contains all tables
Source code in sqlalchemy_foundation_kit/base/metadata.py
configure_orjson_serialization()
¶
Configure orjson serialization for SQLAlchemy engine.
Returns:
| Type | Description |
|---|---|
dict[str, object]
|
Dictionary with json_serializer and json_deserializer configured. |
Raises:
| Type | Description |
|---|---|
ImportError
|
If orjson is not installed. |
Examples:
>>> config = configure_orjson_serialization()
>>> "json_serializer" in config
True
>>> "json_deserializer" in config
True
Source code in sqlalchemy_foundation_kit/base/serialization.py
Base ORM Models¶
Base classes and mixins for SQLAlchemy models.
Database base models and mixins.
DB_NAMING_CONVENTION = {'ix': '%(column_0_label)s_idx', 'uq': '%(table_name)s_%(column_0_name)s_key', 'ck': '%(table_name)s_%(constraint_name)s_check', 'fk': '%(table_name)s_%(column_0_name)s_fkey', 'pk': '%(table_name)s_pkey'}
module-attribute
¶
Base
¶
Bases: DeclarativeBase
Base class for all ORM models.
Source code in sqlalchemy_foundation_kit/base/models.py
BaseTable
¶
DatetimeColumnsMixin
¶
Mixin for tables that need created_at and updated_at timestamps.
Control indexing via created_at_index and updated_at_index class variables in the model. Defaults to False.
Source code in sqlalchemy_foundation_kit/base/models.py
Example Usage¶
from uuid import UUID, uuid4
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy_foundation_kit import BaseTable, DatetimeColumnsMixin
class UserDB(BaseTable, DatetimeColumnsMixin):
"""User ORM model with automatic timestamps."""
__tablename__ = "users"
__created_at_index__ = True # Creates index on created_at
id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
email: Mapped[str] = mapped_column(unique=True)
username: Mapped[str]
# Provides:
# - created_at: Mapped[datetime] (server default)
# - updated_at: Mapped[datetime] (auto-updated)
# - __repr__: String representation
Custom Types¶
Custom SQLAlchemy types for PostgreSQL.
Custom SQLAlchemy types.
GenericJSONDict = dict[str, Any]
module-attribute
¶
PydanticJSONB
¶
Bases: TypeDecorator
SQLAlchemy TypeDecorator for Pydantic models stored as JSONB.
Validates and serializes values against the supplied Pydantic-compatible type
on both write and read paths. A model_type is required — if you need
raw dict storage without validation, use SQLAlchemy's built-in JSONB directly
(or the :data:GenericJSONDict alias).
Source code in sqlalchemy_foundation_kit/base/types.py
__init__(model_type, *args, **kwargs)
¶
Initialize the type decorator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
model_type
|
type[T]
|
Pydantic model class (or any type compatible with
|
required |
Source code in sqlalchemy_foundation_kit/base/types.py
Example Usage¶
from pydantic import BaseModel
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy_foundation_kit import PydanticJSONB, BaseTable
class UserPreferences(BaseModel):
theme: str
language: str
class UserDB(BaseTable):
__tablename__ = "users"
id: Mapped[UUID] = mapped_column(primary_key=True)
preferences: Mapped[UserPreferences] = mapped_column(
PydanticJSONB(UserPreferences)
)
# Automatically validated on read/write
user = UserDB(
preferences=UserPreferences(theme="dark", language="en")
)
Session Management¶
Async session manager with connection pooling and health checks.
Async database session manager with connection pooling.
AsyncSessionManager
¶
Manages async database sessions with configurable connection pooling.
Supports two initialization approaches:
- Direct constructor — all parameters in constructor with defaults.
- Builder pattern — use :class:
AsyncSessionManagerBuilderfor more readable complex configurations.
Source code in sqlalchemy_foundation_kit/session/manager.py
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 | |
engine
property
¶
Get the underlying engine.
session_maker
property
¶
Get the session maker.
__aenter__()
async
¶
__aexit__(exc_type, exc_val, exc_tb)
async
¶
Close the engine on exit.
__init__(url, echo=False, poolclass='null', session_class=None, expire_on_commit=False, connect_args=None, isolation_level=None, pool_settings=None, use_orjson=False, metrics=None, on_engine_created=None, dispose_timeout=DEFAULT_DISPOSE_TIMEOUT_SECONDS, **kwargs)
¶
Initialize session manager with direct configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
url
|
str
|
Database connection URL (required). |
required |
echo
|
bool
|
If True, SQLAlchemy will log all SQL statements (default: False). |
False
|
poolclass
|
str | type
|
SQLAlchemy pool class or name (default: "null"). Use "queue" for production, "null" for testing. |
'null'
|
session_class
|
type[SessionT] | None
|
Custom |
None
|
expire_on_commit
|
bool
|
If True, objects expire after commit (default: False). |
False
|
connect_args
|
dict[str, object] | None
|
Arguments passed to the database driver (default: None). |
None
|
isolation_level
|
str | None
|
Default transaction isolation level (default: None). |
None
|
pool_settings
|
PoolSettingsProtocol | None
|
Pool configuration settings (default: None). |
None
|
use_orjson
|
bool
|
If True, use orjson for JSON serialization (default: False). |
False
|
metrics
|
PostgresMetricsProtocol | None
|
Optional metrics collector (default: None). |
None
|
on_engine_created
|
Callable[[AsyncEngine], None] | None
|
Optional callback invoked with |
None
|
dispose_timeout
|
float
|
Maximum seconds to wait for engine disposal in :meth: |
DEFAULT_DISPOSE_TIMEOUT_SECONDS
|
**kwargs
|
object
|
Additional keyword arguments for |
{}
|
Source code in sqlalchemy_foundation_kit/session/manager.py
aclose()
async
¶
Close the engine and all connections.
Source code in sqlalchemy_foundation_kit/session/manager.py
get_session()
async
¶
Get a new database session.
get_transaction(isolation_level=None)
async
¶
Get a new database session with automatic transaction management.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
str | None
|
Optional isolation level for the transaction. |
None
|
Yields:
| Type | Description |
|---|---|
AsyncIterator[SessionT]
|
Managed async session with active transaction. |
Source code in sqlalchemy_foundation_kit/session/manager.py
attach_metrics(engine, metrics)
¶
Attach metrics event listeners to a SQLAlchemy engine.
Registers event handlers for connection checkout, checkin, and error events to collect pool statistics and connection metrics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
engine
|
AsyncEngine
|
SQLAlchemy |
required |
metrics
|
PostgresMetricsProtocol
|
Metrics collector implementing |
required |
Source code in sqlalchemy_foundation_kit/session/manager.py
Builder pattern for AsyncSessionManager configuration.
Provides a fluent interface for constructing AsyncSessionManager instances with many optional parameters, following the Builder pattern to simplify complex object creation and improve code readability.
AsyncSessionManagerBuilder
¶
Builder for AsyncSessionManager construction.
Provides a fluent API for configuring AsyncSessionManager with many optional parameters. This pattern improves code readability and follows KISS principle by avoiding constructors with 10+ parameters.
Examples:
Basic usage: >>> manager = ( ... AsyncSessionManagerBuilder("postgresql+asyncpg://...") ... .with_pool("queue") ... .with_echo(True) ... .build() ... )
Advanced configuration: >>> manager = ( ... AsyncSessionManagerBuilderCustomSession ... .with_session_class(CustomSession) ... .with_pool("queue", pool_settings=settings.pool) ... .with_metrics(metrics) ... .with_callbacks(on_engine_created=instrument_engine) ... .with_json_serialization(orjson=True) ... .with_isolation_level("READ COMMITTED") ... .build() ... )
Reusable configuration: >>> builder = ( ... AsyncSessionManagerBuilder("postgresql+asyncpg://...") ... .with_pool("queue") ... .with_metrics(metrics) ... ) >>> manager1 = builder.with_echo(True).build() >>> manager2 = builder.with_echo(False).build()
Source code in sqlalchemy_foundation_kit/session/builder.py
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 | |
__init__(url)
¶
Initialize builder with database URL (required).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
url
|
str
|
Database connection URL (required parameter). |
required |
Source code in sqlalchemy_foundation_kit/session/builder.py
build()
¶
Build AsyncSessionManager instance with configured parameters.
Returns:
| Type | Description |
|---|---|
AsyncSessionManager[SessionT]
|
Configured AsyncSessionManager instance. |
Raises:
| Type | Description |
|---|---|
ImportError
|
If use_orjson=True but orjson is not installed. |
Examples:
>>> manager = builder.build()
>>> async with manager.get_session() as session:
... await session.execute(...)
Source code in sqlalchemy_foundation_kit/session/builder.py
with_callbacks(on_engine_created)
¶
Register engine creation callback.
Useful for attaching instrumentation (OpenTelemetry), custom event listeners, or debug hooks right after engine creation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
on_engine_created
|
Callable[[AsyncEngine], None]
|
Callback invoked with AsyncEngine after creation. |
required |
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Examples:
>>> from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
>>> def instrument(engine):
... SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)
>>> builder.with_callbacks(on_engine_created=instrument)
Source code in sqlalchemy_foundation_kit/session/builder.py
with_connect_args(**connect_args)
¶
Add database driver connection arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
**connect_args
|
object
|
Arguments passed to the database driver. |
{}
|
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Examples:
>>> builder.with_connect_args(
... server_settings={"application_name": "myapp"},
... command_timeout=60,
... )
Source code in sqlalchemy_foundation_kit/session/builder.py
with_dispose_timeout(timeout)
¶
Configure how long :meth:AsyncSessionManager.aclose waits for engine disposal.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
timeout
|
float
|
Maximum seconds to wait for the engine to dispose. |
required |
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Source code in sqlalchemy_foundation_kit/session/builder.py
with_echo(echo=True)
¶
Enable SQL statement logging.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
echo
|
bool
|
If True, SQLAlchemy logs all SQL statements. |
True
|
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Source code in sqlalchemy_foundation_kit/session/builder.py
with_expire_on_commit(expire=True)
¶
Configure object expiration behavior after commit.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
expire
|
bool
|
If True, all objects expire after commit. |
True
|
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Source code in sqlalchemy_foundation_kit/session/builder.py
with_extra_kwargs(**kwargs)
¶
Add additional keyword arguments for create_async_engine.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
**kwargs
|
object
|
Additional engine configuration. |
{}
|
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Source code in sqlalchemy_foundation_kit/session/builder.py
with_isolation_level(isolation_level)
¶
Set default transaction isolation level.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
str
|
Default isolation level (e.g., "READ COMMITTED"). |
required |
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Source code in sqlalchemy_foundation_kit/session/builder.py
with_json_serialization(orjson=True)
¶
Configure JSON serialization backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
orjson
|
bool
|
If True, use orjson for faster JSON serialization. |
True
|
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Raises:
| Type | Description |
|---|---|
ImportError
|
If orjson=True but orjson is not installed. |
Source code in sqlalchemy_foundation_kit/session/builder.py
with_metrics(metrics)
¶
Enable connection pool metrics collection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
metrics
|
PostgresMetricsProtocol
|
Metrics collector implementing PostgresMetricsProtocol. |
required |
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Source code in sqlalchemy_foundation_kit/session/builder.py
with_pool(poolclass, pool_settings=None)
¶
Configure connection pool.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
poolclass
|
str | type
|
Pool class name or type (e.g., "queue", "null"). |
required |
pool_settings
|
PoolSettingsProtocol | None
|
Optional pool configuration settings. |
None
|
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Source code in sqlalchemy_foundation_kit/session/builder.py
with_session_class(session_class)
¶
Use custom session class.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session_class
|
type[SessionT]
|
Custom AsyncSession subclass. |
required |
Returns:
| Type | Description |
|---|---|
AsyncSessionManagerBuilder[SessionT]
|
Self for method chaining. |
Source code in sqlalchemy_foundation_kit/session/builder.py
Custom connection class for pgbouncer compatibility (async).
AsyncCConnection
¶
Bases: Connection
Custom async connection class for pgbouncer magic.
This subclass overrides only the private method _get_unique_id so that prepared statement identifiers are unique per connection. That is required when using pgbouncer in transaction mode, where the same server connection may be reused for different logical connections.
See: https://github.com/sqlalchemy/sqlalchemy/issues/6467
Source code in sqlalchemy_foundation_kit/session/connection.py
Retry utilities for database connection startup.
DEFAULT_RETRY_CONFIG = RetryConfig()
module-attribute
¶
RetryConfig
dataclass
¶
Configuration for retry behavior with exponential backoff.
Attributes:
| Name | Type | Description |
|---|---|---|
max_retries |
int
|
Maximum number of attempts before giving up. |
retry_delay |
float
|
Base delay in seconds; actual delay is |
max_backoff_delay |
float
|
Maximum delay between retries in seconds. Prevents exponential backoff from growing indefinitely. |
Source code in sqlalchemy_foundation_kit/session/retry.py
retry_async_connection(connect_func, service_name, config=DEFAULT_RETRY_CONFIG)
async
¶
Retry an async connection callable with exponential backoff.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
connect_func
|
Callable[[], Awaitable[None]]
|
Callable that attempts to establish/test the connection. |
required |
service_name
|
str
|
Human-readable service name used in log messages. |
required |
config
|
RetryConfig
|
Retry behavior configuration. |
DEFAULT_RETRY_CONFIG
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If config.max_retries is less than 1. |
Exception
|
Re-raises the last exception when all attempts fail. |
Source code in sqlalchemy_foundation_kit/session/retry.py
PostgreSQL advisory locks (async).
try_advisory_xact_lock(session, key)
async
¶
Acquire a Postgres transaction-scoped advisory lock.
Uses pg_try_advisory_xact_lock: non-blocking, released automatically
at transaction end. String keys are hashed to integers. The key is then
truncated to signed 64-bit as Postgres expects.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
AsyncSession
|
SQLAlchemy AsyncSession within an active transaction. |
required |
key
|
str | int
|
Lock identifier (string or integer). Strings are hashed to integers. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if lock was acquired, False if already held by another session. |
Examples:
>>> async with session_maker() as session:
... async with session.begin():
... if await try_advisory_xact_lock(session, "my_operation"):
... # Perform protected operation
... await session.execute(...)
... await session.commit()
Source code in sqlalchemy_foundation_kit/session/locks.py
Example Usage¶
from sqlalchemy_foundation_kit import create_async_session_manager
# Create session manager
session_manager = create_async_session_manager(
settings.postgres,
metrics=metrics,
)
# Transactional context
async with session_manager.get_transaction() as session:
user = UserDB(email="user@example.com")
session.add(user)
# Auto-commit on exit, auto-rollback on exception
# Health check
is_healthy = await session_manager.healthcheck()
# Graceful shutdown
await session_manager.close(timeout=30.0)
Unit of Work¶
Unit of Work pattern for transactional consistency.
Unit of Work protocols.
AsyncUnitOfWork
¶
Bases: Protocol, Generic[T_co]
Provides transactional context for repository operations.
Provides three modes of operation
transaction(): For write operations with automatic commit/rollback.managed_session(): For write operations with manual commit/rollback control.query(): For read-only operations without transaction management.
Source code in sqlalchemy_foundation_kit/uow/protocols.py
managed_session(isolation_level=None)
¶
Create a session with manual transaction control.
Unlike :meth:transaction, this does NOT auto-commit on success. The caller
must explicitly call session.commit() or session.rollback(). Useful for
complex transactional logic where the commit decision depends on multiple
conditions or external factors.
The second element of the yielded tuple is the underlying session object
(typed as Any in the protocol to avoid leaking SQLAlchemy types — concrete
implementations like AsyncSQLAlchemyUnitOfWork yield AsyncSession).
On exception inside the context, the session is automatically rolled back.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
str | None
|
Optional transaction isolation level. |
None
|
Source code in sqlalchemy_foundation_kit/uow/protocols.py
query(isolation_level=None)
¶
transaction(isolation_level=None, flush_before_commit=None)
¶
Create a new transaction context with automatic commit/rollback.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
str | None
|
Optional transaction isolation level. |
None
|
flush_before_commit
|
bool | None
|
If True, flush session before commit to surface
constraint violations within transaction. If |
None
|
Source code in sqlalchemy_foundation_kit/uow/protocols.py
AsyncUowTransaction
¶
Bases: Protocol
Transaction-scoped repositories container.
Intended to be extended by concrete transaction types that expose repository attributes for a specific bounded context.
SupportsAdvisoryLock
¶
Bases: Protocol
Capability protocol for transactions supporting advisory locks.
Use this when your transaction needs PostgreSQL advisory lock support. Not all transactions require this capability (e.g., in-memory, read-only).
Source code in sqlalchemy_foundation_kit/uow/protocols.py
try_advisory_lock(key)
async
¶
Try to acquire a transaction-scoped advisory lock identified by key.
Returns True if the lock was acquired (and is held for the rest of the
transaction), False if another transaction already holds it.
Source code in sqlalchemy_foundation_kit/uow/protocols.py
Unit of Work implementation (async SQLAlchemy).
AsyncSQLAlchemyUnitOfWork
¶
Bases: AsyncUnitOfWork[T], Generic[T]
Base async SQLAlchemy Unit of Work.
Provides transactional context for repository operations using SQLAlchemy AsyncSession.
Methods:
| Name | Description |
|---|---|
transaction |
For write operations with automatic commit/rollback. |
query |
For read-only operations without transaction management. |
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 | |
__init__(session_maker, transaction_factory, *, flush_before_commit=True)
¶
Initialize the unit of work.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session_maker
|
async_sessionmaker[AsyncSession]
|
Async session factory. |
required |
transaction_factory
|
Callable[[AsyncSession], T]
|
Callable producing the transaction object exposed to callers. |
required |
flush_before_commit
|
bool
|
Default |
True
|
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
managed_session(isolation_level=None)
async
¶
Create a session with manual transaction control.
Unlike transaction(), this does NOT auto-commit. The caller must explicitly call session.commit() or session.rollback(). This is useful for complex transactional logic where commit decision depends on multiple conditions or external factors.
A transaction is started automatically, but you have full control over when to commit or rollback. If you exit without calling either, SQLAlchemy will automatically rollback on session close.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
IsolationLevel | str | None
|
Optional transaction isolation level. Can be an IsolationLevel enum member or a string value. Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED". |
None
|
Yields:
| Type | Description |
|---|---|
AsyncIterator[tuple[T, AsyncSession]]
|
Tuple of (transaction object, session) for manual control. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If isolation_level is not supported. |
Examples:
Manual transaction control: async with uow.managed_session() as (tx, session): await tx.users.create(...)
# Manually decide when to commit
if should_commit:
await session.commit()
else:
await session.rollback()
Conditional commit based on external service: async with uow.managed_session() as (tx, session): user = await tx.users.create(...)
# Call external service
result = await external_api.validate(user)
if result.success:
await session.commit()
else:
await session.rollback()
Multiple operations with intermediate decision: async with uow.managed_session() as (tx, session): user = await tx.users.create(...)
# First checkpoint
await session.flush()
# More operations
await tx.profiles.create(user_id=user.id)
# Final decision
await session.commit()
Note
Prefer :meth:transaction for the vast majority of use cases — it commits
automatically and enforces the UoW pattern. This method is an advanced escape
hatch for scenarios where the commit/rollback decision depends on conditions
that can only be evaluated after data is written (e.g., external service
validation). session.commit() calls belong exclusively at the use-case
boundary via this method, never inside repository implementations.
Warning
You MUST explicitly call session.commit() or session.rollback(). Exiting the context without calling either will result in automatic rollback when the session closes.
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 | |
open_session(isolation_level=None)
async
¶
Open a session with optional isolation level applied.
This is the extension point for subclasses that need custom session setup (e.g., RLS context, session-level GUCs, custom statement timeouts). Override to wrap or augment session creation while preserving isolation handling.
Used internally by :meth:transaction and :meth:query.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
IsolationLevel | str | None
|
Optional transaction isolation level. |
None
|
Yields:
| Type | Description |
|---|---|
AsyncIterator[AsyncSession]
|
Configured AsyncSession instance. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If isolation_level is not supported. |
Examples:
Subclass that sets a session-level GUC for every transaction:
class TenantUnitOfWork(AsyncSQLAlchemyUnitOfWork):
def __init__(self, session_maker, tx_factory, tenant_id):
super().__init__(session_maker, tx_factory)
self._tenant_id = tenant_id
@asynccontextmanager
async def open_session(self, isolation_level=None):
async with super().open_session(isolation_level) as session:
await session.execute(
text("SET app.tenant_id = :tid"),
{"tid": self._tenant_id},
)
yield session
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
query(isolation_level=None)
async
¶
Create a read-only query context without transaction management.
This method is designed for read-only operations and does not start a transaction or perform any commit/rollback. It's semantically clearer than managed_session() for read operations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
IsolationLevel | str | None
|
Optional transaction isolation level. Can be an IsolationLevel enum member or a string value. Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED". |
None
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If isolation_level is not supported. |
Examples:
Read-only query: async with uow.query() as qx: users = await qx.users.list_all() user = await qx.users.get_by_id(user_id) # No commit/rollback - just closes session
Note
While this method is intended for read-only operations, SQLAlchemy does not enforce this at the session level. It's up to the caller to ensure only read operations are performed.
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
transaction(isolation_level=None, flush_before_commit=None)
async
¶
Create a new transaction context with automatic commit/rollback.
The Unit of Work automatically commits the transaction on successful exit and rolls back on exception. This ensures atomic operations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
IsolationLevel | str | None
|
Optional transaction isolation level. Can be an IsolationLevel enum member or a string value. Supported values: "READ_COMMITTED", "REPEATABLE_READ", "SERIALIZABLE", "READ_UNCOMMITTED". |
None
|
flush_before_commit
|
bool | None
|
If True, flush the session before commit to surface
constraint violations while still inside the transaction. If |
None
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If isolation_level is not supported. |
Examples:
Write operation with automatic commit: async with uow.transaction() as tx: await tx.users.create(...) # Auto-commit on exit, rollback on exception
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
AsyncSQLAlchemyUowTransaction
¶
Bases: AsyncUowTransaction
Base async SQLAlchemy transaction-scoped repositories.
This class provides access to the underlying SQLAlchemy session and is intended to be subclassed by services to expose specific repositories.
Example
class IdentityTransaction(AsyncSQLAlchemyUowTransaction): @property def users(self) -> UserRepository: return PostgresUserRepository(self.session)
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
session
property
¶
Get the underlying SQLAlchemy async session.
PostgresAdvisoryLockMixin
¶
Mixin providing PostgreSQL advisory lock support for UoW transactions.
Requires the class to have a session property returning AsyncSession.
Example
class IdentityTransaction(AsyncSQLAlchemyUowTransaction, PostgresAdvisoryLockMixin): @property def users(self) -> UserRepository: return PostgresUserRepository(self.session)
Now has access to try_advisory_lock method¶
async with uow.transaction() as tx: if await tx.try_advisory_lock(12345): # Protected operation ...
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
try_advisory_lock(key)
async
¶
Acquire a Postgres transaction-scoped advisory lock.
Delegates to :func:try_advisory_xact_lock for actual locking logic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
int
|
Integer lock key. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if lock was acquired, False if already held by another session. |
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
normalize_isolation_level(isolation_level)
¶
Normalize isolation_level to a valid PostgreSQL string or None.
Accepts both enum members and strings. Strings may use either underscores or spaces (e.g., "READ_COMMITTED" or "READ COMMITTED") for convenience.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
IsolationLevel | str | None
|
Enum member, string, or None. |
required |
Returns:
| Type | Description |
|---|---|
str | None
|
PostgreSQL-form string (with spaces) valid for execution_options, or None. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If isolation_level is not supported. |
Examples:
>>> normalize_isolation_level(None)
None
>>> normalize_isolation_level(IsolationLevel.READ_COMMITTED)
'READ COMMITTED'
>>> normalize_isolation_level("READ_COMMITTED")
'READ COMMITTED'
>>> normalize_isolation_level("read committed")
'READ COMMITTED'
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
apply_isolation_level(session, isolation_level)
async
¶
Apply isolation level to an async session's connection.
Implementation Detail:
We use run_sync() because SQLAlchemy's execution_options() is a
synchronous method that configures the underlying DBAPI connection object.
We must bridge from async context to sync method via run_sync().
This is a DRY utility to eliminate duplication of isolation level application
logic across transaction(), managed_session(), and query() methods.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
AsyncSession
|
SQLAlchemy AsyncSession to configure. |
required |
isolation_level
|
IsolationLevel | str | None
|
Desired isolation level (enum, string, or None). |
required |
Raises:
| Type | Description |
|---|---|
ValueError
|
If isolation_level is not supported (raised by normalize_isolation_level). |
Examples:
>>> async with session_maker() as session:
... await apply_isolation_level(session, IsolationLevel.SERIALIZABLE)
... result = await session.execute(select(User))
... # Query runs with SERIALIZABLE isolation level
Using string isolation level:
No-op when None:
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
Enums for Unit of Work.
IsolationLevel
¶
Bases: StrEnum
PostgreSQL transaction isolation levels.
Values match PostgreSQL's expected form (with spaces) for use with SQLAlchemy execution_options(isolation_level=...).
Source code in sqlalchemy_foundation_kit/uow/enums.py
Example Usage¶
from sqlalchemy_foundation_kit import (
AsyncSQLAlchemyUnitOfWork,
AsyncSQLAlchemyUowTransaction,
PostgresAdvisoryLockMixin,
)
# Define transaction with repositories
class MyTransaction(AsyncSQLAlchemyUowTransaction, PostgresAdvisoryLockMixin):
def __init__(self, session):
super().__init__(session)
self._users = None
@property
def users(self):
if self._users is None:
self._users = PostgresUserRepository(self.session)
return self._users
# Create UoW
class MyUnitOfWork(AsyncSQLAlchemyUnitOfWork[MyTransaction]):
def __init__(self, session_maker):
super().__init__(session_maker, transaction_factory=MyTransaction)
# Use in application
async with uow.transaction() as tx:
# All operations in single transaction
user = await tx.users.create(email="user@example.com")
# Advisory lock
if await tx.try_advisory_lock("process_payments"):
await process_payments()
Configuration Protocols¶
Type-safe configuration protocols.
PostgreSQL configuration.
PostgresSettingsProtocol
¶
Bases: Protocol
Protocol for PostgreSQL configuration.
Organized protocol with grouped settings for connection, pool, and query configuration.
Attributes:
| Name | Type | Description |
|---|---|---|
connection |
ConnectionSettingsProtocol
|
Connection parameters (host, port, user, database). |
pool |
PoolSettingsProtocol
|
Connection pool settings. |
query |
QuerySettingsProtocol
|
Query execution and transaction settings. |
application_name |
str
|
Application identifier for connections. |
db_schema |
str | None
|
Optional PostgreSQL schema name. |
use_orjson_serialization |
bool
|
Enable orjson for JSON operations. |
jit |
str | None
|
JIT compilation setting (PgBouncer compatibility). |
Examples:
Implementing the protocol: >>> class MyConfig: ... connection: ConnectionSettingsProtocol ... pool: PoolSettingsProtocol ... query: QuerySettingsProtocol ... application_name: str = "my-app" ... db_schema: str | None = None ... use_orjson_serialization: bool = True ... jit: str | None = "off" ... ... def to_dsn(self) -> str: ... return f"postgresql://{self.connection.user}@{self.connection.host}..."
Using the protocol: >>> def create_engine(config: PostgresSettingsProtocol): ... dsn = config.to_dsn() ... pool_size = config.pool.pool_size ... echo = config.query.echo
Source code in sqlalchemy_foundation_kit/config/postgres.py
to_dsn()
¶
Convert config to DSN string.
Returns PostgreSQL connection string in format: postgresql+asyncpg://user:password@host:port/database
Examples:
Source code in sqlalchemy_foundation_kit/config/postgres.py
ConnectionSettingsProtocol
¶
Bases: Protocol
Protocol for PostgreSQL connection settings.
Defines connection parameters required for establishing database connections.
Attributes:
| Name | Type | Description |
|---|---|---|
host |
str
|
PostgreSQL server hostname or IP address. |
port |
int
|
PostgreSQL server port number. |
user |
str
|
Database username for authentication. |
password |
PasswordLike | str
|
Database password — either a plain |
database |
str
|
Target database name. |
Source code in sqlalchemy_foundation_kit/config/postgres.py
PoolSettingsProtocol
¶
Bases: Protocol
Protocol for PostgreSQL connection pool settings.
Defines pool configuration for SQLAlchemy engine connection management.
Attributes:
| Name | Type | Description |
|---|---|---|
kind |
str | type
|
Connection pool implementation (queue, static, etc.). |
size |
int | None
|
Number of connections to maintain in pool. |
max_overflow |
int | None
|
Additional connections allowed when pool exhausted. |
pre_ping |
bool
|
Test connection health before checkout. |
recycle |
int | None
|
Recycle connections after N seconds. |
timeout |
float | None
|
Timeout for acquiring connection from pool. |
Examples:
>>> pool: PoolSettingsProtocol = ...
>>> if pool.size > 100:
... logger.warning("Large pool size detected")
Source code in sqlalchemy_foundation_kit/config/postgres.py
QuerySettingsProtocol
¶
Bases: Protocol
Protocol for PostgreSQL query execution settings.
Defines query behavior, caching, and transaction isolation configuration.
Attributes:
| Name | Type | Description |
|---|---|---|
echo |
bool
|
Enable SQL statement logging. |
statement_cache_size |
int | None
|
Prepared statement cache size. |
prepared_statement_cache_size |
int | None
|
Server-side prepared statement cache size. |
isolation_level |
str | None
|
Transaction isolation level. |
Examples:
Source code in sqlalchemy_foundation_kit/config/postgres.py
Example Usage¶
from dataclasses import dataclass
from pydantic import SecretStr
from sqlalchemy_foundation_kit import PostgresSettingsProtocol
@dataclass
class MyPostgresConfig:
connection: ConnectionSettingsProtocol
pool: PoolSettingsProtocol
query: QuerySettingsProtocol
application_name: str = "my-service"
def to_dsn(self) -> str:
return f"postgresql+asyncpg://..."
Metrics Protocols¶
Observability protocols for monitoring.
Metrics protocols for database monitoring.
Protocols are split by capability (ISP). Implementations may satisfy
the narrow protocols selectively (e.g., only record errors), and
PostgresMetricsProtocol aggregates them for convenience.
PostgresMetricsProtocol
¶
Bases: PoolStatsRecorder, CheckoutRecorder, ErrorRecorder, Protocol
Composite protocol covering all PostgreSQL metrics capabilities.
Aggregates the narrow capability protocols for convenience. Implementations that only need a subset can implement the individual protocols directly.
Examples:
>>> class MyMetrics:
... def record_pool_stats(self, pool_size: int, pool_checked_out: int, pool_overflow: int) -> None: ...
... def record_checkout(self, duration: float) -> None: ...
... def record_error(self, error_type: str, is_timeout: bool = False) -> None: ...
>>> metrics: PostgresMetricsProtocol = MyMetrics()
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
record_checkout(duration)
¶
Record a database connection checkout from the pool.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
duration
|
float
|
Time taken to acquire the connection from the pool, in seconds. |
required |
record_error(error_type, is_timeout=False)
¶
Record a database connection or execution error.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
error_type
|
str
|
The type of error that occurred (e.g., "OperationalError"). |
required |
is_timeout
|
bool
|
True if this error was specifically a connection checkout timeout. |
False
|
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
record_pool_stats(pool_size, pool_checked_out, pool_overflow)
¶
Record database connection pool statistics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool_size
|
int
|
Current total number of connections in the pool. |
required |
pool_checked_out
|
int
|
Number of connections currently in use. |
required |
pool_overflow
|
int
|
Number of connections over the configured pool_size. |
required |
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
PoolStatsRecorder
¶
Bases: Protocol
Capability protocol for recording pool statistics.
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
record_pool_stats(pool_size, pool_checked_out, pool_overflow)
¶
Record database connection pool statistics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool_size
|
int
|
Current total number of connections in the pool. |
required |
pool_checked_out
|
int
|
Number of connections currently in use. |
required |
pool_overflow
|
int
|
Number of connections over the configured pool_size. |
required |
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
CheckoutRecorder
¶
Bases: Protocol
Capability protocol for recording connection checkout duration.
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
ErrorRecorder
¶
Bases: Protocol
Capability protocol for recording database errors.
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
record_error(error_type, is_timeout=False)
¶
Record a database connection or execution error.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
error_type
|
str
|
The type of error that occurred (e.g., "OperationalError"). |
required |
is_timeout
|
bool
|
True if this error was specifically a connection checkout timeout. |
False
|
Source code in sqlalchemy_foundation_kit/protocols/metrics.py
Example Usage¶
from sqlalchemy_foundation_kit.protocols import PostgresMetricsProtocol
class CustomMetrics:
"""Custom metrics implementation."""
def record_pool_stats(
self,
pool_size: int,
pool_checked_out: int,
pool_overflow: int,
) -> None:
# Record pool statistics
pass
def record_checkout(self, duration: float) -> None:
# Record connection checkout duration
pass
def record_error(self, error_type: str, is_timeout: bool) -> None:
# Record database errors
pass
Contrib Modules¶
Optional modules with extra functionality.
contrib.settings¶
Pydantic-based configuration (requires [settings] extra).
Base PostgreSQL configuration using pydantic-settings.
BasePostgresConfig
¶
Bases: BaseSettings
Base PostgreSQL configuration.
Organized configuration for PostgreSQL database connections with grouped settings.
Attributes:
| Name | Type | Description |
|---|---|---|
connection |
ConnectionSettings
|
Connection parameters (host, port, credentials, database). |
pool |
PoolSettings
|
Connection pool configuration (size, overflow, timeouts). |
query |
QuerySettings
|
Query execution settings (echo, caching, isolation level). |
application_name |
str
|
Application name for connection identification. |
db_schema |
str | None
|
Optional PostgreSQL schema name. |
use_orjson_serialization |
bool
|
Use orjson for JSON serialization (requires orjson). |
jit |
PostgresJit | None
|
JIT compilation setting (off/on) for PgBouncer compatibility. |
metrics_enabled |
bool
|
Enable connection pool metrics collection. |
Examples:
>>> config = BasePostgresConfig(
... connection=ConnectionSettings(
... host="localhost",
... user="postgres",
... password=SecretStr("secret"),
... database="mydb",
... ),
... application_name="my-service",
... )
>>> dsn = config.to_dsn()
>>> host = config.connection.host
>>> pool_size = config.pool.size
Source code in sqlalchemy_foundation_kit/contrib/settings/postgres.py
__repr__()
¶
to_dsn(driver='asyncpg', mask_password=False)
¶
Build PostgreSQL DSN for async connections.
This library is async-only, so DSN always includes asyncpg driver by default.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
driver
|
str | None
|
Driver name (default: 'asyncpg' for async connections). Pass None to omit driver suffix. |
'asyncpg'
|
mask_password
|
bool
|
If True, the password will be masked (default: False). |
False
|
Returns:
| Type | Description |
|---|---|
str
|
PostgreSQL connection string (e.g., 'postgresql+asyncpg://user:pass@host:5432/db'). |
Examples:
>>> config.to_dsn()
'postgresql+asyncpg://user:secret@localhost:5432/mydb'
>>> config.to_dsn(mask_password=True)
'postgresql+asyncpg://user:**********@localhost:5432/mydb'
Source code in sqlalchemy_foundation_kit/contrib/settings/postgres.py
ConnectionSettings
¶
Bases: BaseModel
PostgreSQL connection configuration.
Groups all connection-related parameters: host, port, credentials, and database name.
Examples:
>>> connection = ConnectionSettings(
... host="localhost",
... port=5432,
... user="postgres",
... password=SecretStr("secret"),
... database="mydb",
... )
Source code in sqlalchemy_foundation_kit/contrib/settings/postgres.py
PoolSettings
¶
Bases: BaseModel
PostgreSQL connection pool configuration.
Groups all connection pool-related parameters for SQLAlchemy engine.
Examples:
Source code in sqlalchemy_foundation_kit/contrib/settings/postgres.py
QuerySettings
¶
Bases: BaseModel
PostgreSQL query and performance configuration.
Groups query execution, caching, and transaction isolation settings.
Examples:
Source code in sqlalchemy_foundation_kit/contrib/settings/postgres.py
BasePostgresMigrationsConfig
¶
Bases: BaseSettings
Base configuration for PostgreSQL database migrations.
Source code in sqlalchemy_foundation_kit/contrib/settings/postgres.py
contrib.metrics¶
Prometheus metrics (requires [metrics] extra).
Postgres metrics using prometheus-client.
PostgresMetrics
¶
Postgres connection pool metrics.
Metrics
- postgres_db_pool_size: Current database connection pool size.
- postgres_db_pool_checked_out: Number of connections currently checked out.
- postgres_db_pool_overflow: Number of connections over pool_size (within max_overflow).
- postgres_db_connection_checkout_duration_seconds: Time to acquire connection from pool.
- postgres_db_connection_timeouts_total: Number of connection checkout timeouts.
- postgres_db_connection_errors_total: Number of connection errors.
Labels
- error_type: Type of connection error (for errors_total).
Source code in sqlalchemy_foundation_kit/contrib/metrics/postgres.py
__init__(prefix=None)
¶
Initialize postgres metrics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
prefix
|
str | None
|
Metric name prefix. |
None
|
Raises:
| Type | Description |
|---|---|
ImportError
|
If prometheus-client is not installed. |
Source code in sqlalchemy_foundation_kit/contrib/metrics/postgres.py
record_checkout(duration)
¶
Record a database connection checkout from the pool.
record_error(error_type, is_timeout=False)
¶
Record a database connection or execution error.
Source code in sqlalchemy_foundation_kit/contrib/metrics/postgres.py
record_pool_stats(pool_size, pool_checked_out, pool_overflow)
¶
Record database connection pool statistics.
Source code in sqlalchemy_foundation_kit/contrib/metrics/postgres.py
contrib.di (Dishka)¶
Dishka dependency injection providers (requires [dishka] extra).
Database providers for dishka.
AsyncDatabaseProvider
¶
Bases: BaseDishkaProvider
Provider for database dependencies.
Provides
AsyncSessionManager: Manages database connections and engine lifecycle.async_sessionmaker[AsyncSession]: Factory for creating database sessions.
Note: PostgresMetricsProtocol must be provided by a separate provider
(e.g., :class:PrometheusPostgresMetricsProvider) or registered in the container.
If you don't want metrics, simply don't register such a provider.
Customization
- Pass
healthcheck_query=Noneto skip the startup connectivity check. - Pass a custom :class:
RetryConfigto tune startup retry behaviour. - Subclass and override :meth:
create_session_managerto fully customize how the manager is constructed (e.g., to passextra_server_settings,connection_class, oron_engine_created).
Source code in sqlalchemy_foundation_kit/contrib/di/database.py
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 | |
__init__(healthcheck_query=DEFAULT_HEALTHCHECK_QUERY, retry_config=DEFAULT_RETRY_CONFIG)
¶
Initialize provider.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
healthcheck_query
|
str | None
|
SQL executed at startup to verify connectivity.
Pass |
DEFAULT_HEALTHCHECK_QUERY
|
retry_config
|
RetryConfig
|
Retry behavior for the startup healthcheck. |
DEFAULT_RETRY_CONFIG
|
Source code in sqlalchemy_foundation_kit/contrib/di/database.py
__init_subclass__(**kwargs)
¶
create_session_manager(postgres_config, metrics)
¶
Build an AsyncSessionManager for the given config.
Override this hook to customize session manager construction — for example,
to pass extra_server_settings, a custom connection_class, or an
on_engine_created callback for OpenTelemetry instrumentation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
postgres_config
|
PostgresSettingsProtocol
|
PostgreSQL configuration. |
required |
metrics
|
PostgresMetricsProtocol | None
|
Optional metrics collector. |
required |
Returns:
| Type | Description |
|---|---|
AsyncSessionManager[AsyncSession]
|
Configured |
Source code in sqlalchemy_foundation_kit/contrib/di/database.py
get_session_maker(session_manager)
¶
Provide session maker.
Source code in sqlalchemy_foundation_kit/contrib/di/database.py
get_session_manager(postgres_config, metrics=None)
async
¶
Provide database session manager.
Source code in sqlalchemy_foundation_kit/contrib/di/database.py
AsyncUnitOfWorkProvider
¶
Bases: BaseDishkaProvider
Provider for Unit of Work.
Provides
AsyncUnitOfWork: Standardized interface for database transactions.
Note: UoW is APP-scoped because it's stateless — it only holds a reference to
session_maker (factory). Real database connections are created only when calling
uow.transaction(), and are properly closed after the context manager exits.
Customization
Override :meth:create_uow to use a custom transaction class
(e.g., one that exposes domain repositories as lazy properties).
Source code in sqlalchemy_foundation_kit/contrib/di/database.py
__init__()
¶
__init_subclass__(**kwargs)
¶
create_uow(session_maker)
¶
Construct the UoW instance. Override to inject a custom transaction class.
Source code in sqlalchemy_foundation_kit/contrib/di/database.py
get_uow(session_maker)
¶
Provide Unit of Work.
Returns a stateless UoW instance that can be safely reused.
Each call to uow.transaction() creates a new database session/connection.
Source code in sqlalchemy_foundation_kit/contrib/di/database.py
Metrics providers for dishka.
PrometheusPostgresMetricsProvider
¶
Bases: BaseMetricsProvider
Provider for Prometheus PostgreSQL metrics.
Source code in sqlalchemy_foundation_kit/contrib/di/metrics.py
__init__()
¶
__init_subclass__(**kwargs)
¶
get_metrics(metrics, default_prefix, postgres=None)
¶
Provide Postgres metrics implementing PostgresMetricsProtocol.
Source code in sqlalchemy_foundation_kit/contrib/di/metrics.py
contrib.dependency_injector¶
dependency-injector containers (requires [dependency-injector] extra).
Database containers for dependency-injector.
DatabaseContainer
¶
Bases: BaseDIContainer
Container for database dependencies.
Provides
session_manager: Manages database connections and engine lifecycle.session_maker: Factory for creating database sessions.uow: Unit of Work for database transactions.
Configuration
postgres_config: PostgreSQL configuration (PostgresSettingsProtocol).metrics: Optional metrics collector (PostgresMetricsProtocol).healthcheck_query: SQL executed at startup (default:"SELECT 1",Noneto skip).retry_config: Retry behavior for healthcheck (default:RetryConfig()).
Source code in sqlalchemy_foundation_kit/contrib/dependency_injector/database.py
AsyncDatabaseResourceProvider
¶
Helper class for managing database session manager lifecycle.
Use this when you need manual control over session manager lifecycle — for example in tests or when not using dependency-injector containers.
Examples:
>>> provider = AsyncDatabaseResourceProvider(config, metrics)
>>> manager = await provider.start()
>>> # Use manager...
>>> await provider.stop()
Source code in sqlalchemy_foundation_kit/contrib/dependency_injector/database.py
__init__(postgres_config, metrics=None, healthcheck_query=DEFAULT_HEALTHCHECK_QUERY, retry_config=DEFAULT_RETRY_CONFIG)
¶
Initialize provider.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
postgres_config
|
PostgresSettingsProtocol
|
PostgreSQL configuration. |
required |
metrics
|
PostgresMetricsProtocol | None
|
Optional metrics collector. |
None
|
healthcheck_query
|
str | None
|
SQL executed at startup to verify connectivity.
Pass |
DEFAULT_HEALTHCHECK_QUERY
|
retry_config
|
RetryConfig
|
Retry behavior for healthcheck. |
DEFAULT_RETRY_CONFIG
|
Source code in sqlalchemy_foundation_kit/contrib/dependency_injector/database.py
start()
async
¶
Start session manager and perform healthcheck.
Source code in sqlalchemy_foundation_kit/contrib/dependency_injector/database.py
stop()
async
¶
Stop session manager and close connections.
Source code in sqlalchemy_foundation_kit/contrib/dependency_injector/database.py
Metrics containers for dependency-injector.
PrometheusMetricsContainer
¶
Bases: BaseDIContainer
Container for Prometheus PostgreSQL metrics.
Provides: - postgres_metrics: PostgreSQL metrics collector implementing PostgresMetricsProtocol.
Configuration
- metrics_settings: General prometheus metrics settings (PrometheusMetricsSettingsProtocol).
- default_prefix: Default prefix for infrastructure metrics (str | None).
- postgres_settings: PostgreSQL settings with metrics_enabled flag (PostgresMetricsSettingsProtocol).
Source code in sqlalchemy_foundation_kit/contrib/dependency_injector/metrics.py
contrib.telemetry¶
OpenTelemetry instrumentation (requires [telemetry] extra).
OpenTelemetry instrumentation functions for SQLAlchemy and asyncpg.
instrument_sqlalchemy(engine=None, **kwargs)
¶
Instrument SQLAlchemy engine for OpenTelemetry tracing.
Automatically traces all SQLAlchemy operations including queries, commits, and rollbacks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
engine
|
Any | None
|
Optional SQLAlchemy engine to instrument. If None, all engines. |
None
|
**kwargs
|
Any
|
Additional keyword arguments passed to SQLAlchemyInstrumentor. |
{}
|
Raises:
| Type | Description |
|---|---|
ImportError
|
If opentelemetry-instrumentation-sqlalchemy is not installed. |
Examples:
>>> from sqlalchemy import create_engine
>>> from sqlalchemy_foundation_kit.contrib.telemetry import instrument_sqlalchemy
>>> engine = create_engine("postgresql://...")
>>> instrument_sqlalchemy(engine=engine)
Source code in sqlalchemy_foundation_kit/contrib/telemetry/instrumentations.py
instrument_asyncpg(**kwargs)
¶
Instrument asyncpg connections for OpenTelemetry tracing.
Automatically traces all asyncpg database operations at the connection level.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
**kwargs
|
Any
|
Additional keyword arguments passed to AsyncPGInstrumentor. |
{}
|
Raises:
| Type | Description |
|---|---|
ImportError
|
If opentelemetry-instrumentation-asyncpg is not installed. |
Examples:
>>> from sqlalchemy_foundation_kit.contrib.telemetry import instrument_asyncpg
>>> instrument_asyncpg()
Source code in sqlalchemy_foundation_kit/contrib/telemetry/instrumentations.py
Unit of Work with OpenTelemetry tracing support.
TracedAsyncUnitOfWork
¶
Bases: AsyncSQLAlchemyUnitOfWork[T], Generic[T]
Unit of Work with automatic OpenTelemetry tracing.
Automatically creates spans for transaction() and query() operations, including transaction attributes (isolation level, duration, outcome).
Example
from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider
trace.set_tracer_provider(TracerProvider())
uow = TracedAsyncUnitOfWork( session_maker=session_maker, transaction_factory=MyTransaction, service_name="my-service", )
async with uow.transaction() as tx: # This operation is automatically traced user = await tx.users.create(...)
Source code in sqlalchemy_foundation_kit/contrib/telemetry/uow.py
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 | |
__init__(session_maker, transaction_factory, service_name='sqlalchemy-foundation-kit', *, flush_before_commit=True)
¶
Initialize traced unit of work.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session_maker
|
async_sessionmaker[AsyncSession]
|
SQLAlchemy async session maker. |
required |
transaction_factory
|
Callable[[AsyncSession], T]
|
Factory function to create transaction objects. |
required |
service_name
|
str
|
Service name for OpenTelemetry tracer. |
'sqlalchemy-foundation-kit'
|
flush_before_commit
|
bool
|
Default |
True
|
Source code in sqlalchemy_foundation_kit/contrib/telemetry/uow.py
managed_session(isolation_level=None)
async
¶
Create a session with manual transaction control and tracing.
Automatically creates a span named "uow.managed_session" with attributes: - db.operation: "managed_session" - db.isolation_level: The isolation level (if specified)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
IsolationLevel | str | None
|
Optional transaction isolation level. |
None
|
Yields:
| Type | Description |
|---|---|
AsyncIterator[tuple[T, AsyncSession]]
|
Tuple of (transaction object, session) for manual control. |
Source code in sqlalchemy_foundation_kit/contrib/telemetry/uow.py
open_session(isolation_level=None)
async
¶
Open a session with optional isolation level applied.
This is the extension point for subclasses that need custom session setup (e.g., RLS context, session-level GUCs, custom statement timeouts). Override to wrap or augment session creation while preserving isolation handling.
Used internally by :meth:transaction and :meth:query.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
IsolationLevel | str | None
|
Optional transaction isolation level. |
None
|
Yields:
| Type | Description |
|---|---|
AsyncIterator[AsyncSession]
|
Configured AsyncSession instance. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If isolation_level is not supported. |
Examples:
Subclass that sets a session-level GUC for every transaction:
class TenantUnitOfWork(AsyncSQLAlchemyUnitOfWork):
def __init__(self, session_maker, tx_factory, tenant_id):
super().__init__(session_maker, tx_factory)
self._tenant_id = tenant_id
@asynccontextmanager
async def open_session(self, isolation_level=None):
async with super().open_session(isolation_level) as session:
await session.execute(
text("SET app.tenant_id = :tid"),
{"tid": self._tenant_id},
)
yield session
Source code in sqlalchemy_foundation_kit/uow/sqlalchemy.py
query(isolation_level=None)
async
¶
Create a read-only query context with tracing.
Automatically creates a span named "uow.query" with attributes: - db.operation: "query" - db.isolation_level: The isolation level (if specified)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
IsolationLevel | str | None
|
Optional transaction isolation level. |
None
|
Yields:
| Type | Description |
|---|---|
AsyncIterator[T]
|
Query object with repositories. |
Source code in sqlalchemy_foundation_kit/contrib/telemetry/uow.py
transaction(isolation_level=None, flush_before_commit=None)
async
¶
Create a new transaction context with tracing.
Automatically creates a span named "uow.transaction" with attributes: - db.operation: "transaction" - db.isolation_level: The isolation level (if specified)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
isolation_level
|
IsolationLevel | str | None
|
Optional transaction isolation level. |
None
|
flush_before_commit
|
bool | None
|
If True, flush before commit. |
None
|
Yields:
| Type | Description |
|---|---|
AsyncIterator[T]
|
Transaction object with repositories. |
Source code in sqlalchemy_foundation_kit/contrib/telemetry/uow.py
Engine Utilities¶
Low-level engine configuration utilities.
SQLAlchemy engine configuration utilities.
PoolClassStr = Literal['null', 'queue', 'singleton_thread', 'async_adapted_queue', 'fallback_async_adapted_queue', 'static']
module-attribute
¶
PoolRegistry
¶
Registry for SQLAlchemy pool classes.
Provides a centralized registry for pool classes that follows the Open/Closed principle:
- Open for extension: custom pools can be registered via :meth:register
- Closed for modification: built-in pools are immutable
This design allows library users to register custom pool implementations without modifying library code.
Examples:
Register a custom pool class: >>> class MyCustomPool(QueuePool): ... pass >>> PoolRegistry.register("custom", MyCustomPool) >>> pool = PoolRegistry.resolve("custom")
Override built-in pool (not recommended, but possible): >>> PoolRegistry.register("queue", MyCustomQueuePool, override=True)
Source code in sqlalchemy_foundation_kit/base/engine.py
list_available()
classmethod
¶
register(name, pool_class, *, override=False)
classmethod
¶
Register a custom pool class.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Pool class identifier (lowercase recommended). |
required |
pool_class
|
type
|
Pool class type to register. |
required |
override
|
bool
|
If True, allows overriding built-in pools (use with caution). |
False
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If name already exists and override=False. |
Examples:
Source code in sqlalchemy_foundation_kit/base/engine.py
resolve(name)
classmethod
¶
Resolve pool class by name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Pool class identifier. |
required |
Returns:
| Type | Description |
|---|---|
type
|
Pool class type. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pool class name is not registered. |
Examples:
Source code in sqlalchemy_foundation_kit/base/engine.py
build_engine_kwargs(echo, poolclass, isolation_level, pool_settings, connect_args, extra_kwargs, use_orjson=False)
¶
Build SQLAlchemy engine keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
echo
|
bool
|
If True, SQLAlchemy will log all SQL statements. |
required |
poolclass
|
type
|
SQLAlchemy pool class. |
required |
isolation_level
|
str | None
|
Default transaction isolation level. |
required |
pool_settings
|
PoolSettingsProtocol | None
|
Pool configuration settings (validated by caller, e.g., Pydantic). |
required |
connect_args
|
dict[str, object] | None
|
Arguments passed to the database driver. |
required |
extra_kwargs
|
dict[str, object]
|
Additional keyword arguments for create_async_engine. |
required |
use_orjson
|
bool
|
If True, use orjson for JSON serialization. |
False
|
Returns:
| Type | Description |
|---|---|
dict[str, object]
|
Dictionary of engine keyword arguments ready for create_async_engine(). |
Raises:
| Type | Description |
|---|---|
ImportError
|
If use_orjson is True but orjson is not installed. |
Examples:
>>> kwargs = build_engine_kwargs(
... echo=False,
... poolclass=NullPool,
... isolation_level=None,
... pool_settings=None,
... connect_args=None,
... extra_kwargs={},
... use_orjson=False,
... )
>>> kwargs["echo"]
False
Source code in sqlalchemy_foundation_kit/base/engine.py
resolve_pool_class(poolclass)
¶
Resolve pool class from string name or return the class directly.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
poolclass
|
PoolClassStr | str | type
|
Pool class name (e.g., "null", "queue") or actual class type. |
required |
Returns:
| Type | Description |
|---|---|
type
|
Pool class type. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pool class name is not recognized. |
Source code in sqlalchemy_foundation_kit/base/engine.py
register_pool_class(name, pool_class, *, override=False)
¶
Register a custom pool class.
Convenience wrapper around :meth:PoolRegistry.register for users who prefer
a functional API over the class-based one.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Pool class identifier (lowercase recommended). |
required |
pool_class
|
type
|
Pool class type to register. |
required |
override
|
bool
|
If True, allows overriding built-in pools (use with caution). |
False
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If name already exists and override=False. |
Source code in sqlalchemy_foundation_kit/base/engine.py
ORM metadata loading utilities.
load_orm_metadata(models_modules, metadata=None)
¶
Load all ORM models metadata synchronously.
Imports specified modules to ensure that all SQLAlchemy models are registered in the metadata. This is useful for migrations and schema introspection with tools like Alembic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
models_modules
|
Iterable[str]
|
Iterable of module paths to import (e.g., ["myapp.models", "myapp.core.models"]). |
required |
metadata
|
MetaData | None
|
Optional specific MetaData object to use. If None, uses Base.metadata. |
None
|
Returns:
| Type | Description |
|---|---|
MetaData
|
MetaData object containing all registered models from the imported modules. |
Examples:
Load models from multiple modules: >>> from sqlalchemy_foundation_kit.base import load_orm_metadata >>> metadata = load_orm_metadata([ ... "myapp.users.models", ... "myapp.orders.models", ... "myapp.products.models", ... ]) >>> len(metadata.tables) 15
Use with custom metadata: >>> from sqlalchemy import MetaData >>> custom_meta = MetaData(schema="public") >>> metadata = load_orm_metadata(["myapp.models"], metadata=custom_meta)
Typical usage in Alembic env.py: >>> from sqlalchemy_foundation_kit.base import Base, load_orm_metadata >>> target_metadata = Base.metadata >>> load_orm_metadata(["myapp.models"]) # Register all models >>> # Now target_metadata.tables contains all tables
Source code in sqlalchemy_foundation_kit/base/metadata.py
JSON serialization utilities for SQLAlchemy.
configure_orjson_serialization()
¶
Configure orjson serialization for SQLAlchemy engine.
Returns:
| Type | Description |
|---|---|
dict[str, object]
|
Dictionary with json_serializer and json_deserializer configured. |
Raises:
| Type | Description |
|---|---|
ImportError
|
If orjson is not installed. |
Examples:
>>> config = configure_orjson_serialization()
>>> "json_serializer" in config
True
>>> "json_deserializer" in config
True
Source code in sqlalchemy_foundation_kit/base/serialization.py
Type Hints¶
Internal type hints used across the library.
Shared type variables for the library.
Centralized location for all TypeVars to avoid duplication and ensure consistency.