Quick Start¶
This guide will get you up and running with sqlalchemy-foundation-kit in 5 minutes.
Installation¶
Install the library with the extras you need:
# Basic installation (core features only)
pip install sqlalchemy-foundation-kit
# With Pydantic settings (recommended)
pip install sqlalchemy-foundation-kit[settings]
# With Prometheus metrics
pip install sqlalchemy-foundation-kit[settings,metrics]
# With OpenTelemetry tracing
pip install sqlalchemy-foundation-kit[settings,telemetry]
# All features
pip install sqlalchemy-foundation-kit[all]
Basic Usage¶
1. Define Your Configuration¶
Using BasePostgresConfig from contrib.settings:
from pydantic import SecretStr
from pydantic_settings import BaseSettings
from sqlalchemy_foundation_kit.contrib.settings import BasePostgresConfig
class Settings(BaseSettings):
    """Application settings exposing the PostgreSQL configuration as ``postgres``."""

    postgres: BasePostgresConfig = BasePostgresConfig(
        # Connection parameters.
        connection={
            "host": "localhost",
            "port": 5432,
            "user": "postgres",
            "password": SecretStr("secret"),
            "database": "mydb",
        },
        # Connection-pool options.
        pool={
            "size": 10,
            "max_overflow": 20,
            "timeout": 30.0,
            "pre_ping": True,
        },
        # Query-level options.
        query={
            "echo": False,
            "statement_cache_size": 0,
        },
        application_name="my-service",
    )


# Module-level settings instance used by the examples that follow.
settings = Settings()
Or implement PostgresSettingsProtocol directly if you don't want pydantic-settings:
from dataclasses import dataclass, field

from pydantic import SecretStr
@dataclass
class ConnectionSettings:
    """Connection parameters for the PostgreSQL server."""

    # Server address.
    host: str = "localhost"
    port: int = 5432
    # Credentials; the password is wrapped in ``SecretStr``.
    user: str = "postgres"
    password: SecretStr = SecretStr("secret")
    # Target database name.
    database: str = "mydb"
@dataclass
class PoolSettings:
    """Connection-pool options with the defaults used in this guide."""

    # Pool implementation selector.
    kind: str = "async_adapted_queue"
    # Pool sizing.
    size: int = 10
    max_overflow: int = 20
    # Liveness checking and checkout timeout.
    pre_ping: bool = True
    timeout: float = 30.0
@dataclass
class QuerySettings:
    """Query-level options with the defaults used in this guide."""

    echo: bool = False
    statement_cache_size: int = 0
@dataclass
class PostgresConfig:
    """Plain-dataclass settings object grouping connection, pool and query options.

    The nested sections are created through ``default_factory`` so that every
    ``PostgresConfig`` gets its own section objects. Using a dataclass instance
    directly as a default is rejected by ``dataclasses`` on Python 3.11+
    (unhashable default) and silently shared between instances before that.
    """

    connection: ConnectionSettings = field(default_factory=ConnectionSettings)
    pool: PoolSettings = field(default_factory=PoolSettings)
    query: QuerySettings = field(default_factory=QuerySettings)
    application_name: str = "my-service"

    def to_dsn(self) -> str:
        """Return the ``postgresql+asyncpg`` DSN built from ``connection``."""
        conn = self.connection
        # Accept either a SecretStr-like object or a plain string password.
        password = conn.password
        if hasattr(password, "get_secret_value"):
            password = password.get_secret_value()
        return f"postgresql+asyncpg://{conn.user}:{password}@{conn.host}:{conn.port}/{conn.database}"
2. Define ORM Models¶
Use BaseTable and DatetimeColumnsMixin for automatic naming conventions and timestamps:
from uuid import UUID, uuid4
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy_foundation_kit import BaseTable, DatetimeColumnsMixin
class UserDB(BaseTable, DatetimeColumnsMixin):
    """User ORM model mapped to the ``users`` table."""

    __tablename__ = "users"
    __created_at_index__ = True  # Creates index on created_at

    # Primary key, generated client-side with uuid4.
    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    email: Mapped[str] = mapped_column(unique=True, index=True)
    username: Mapped[str]
    is_active: Mapped[bool] = mapped_column(default=True)

    # DatetimeColumnsMixin provides:
    # - created_at: Mapped[datetime] (with server default)
    # - updated_at: Mapped[datetime] (auto-updated on changes)
3. Create Session Manager¶
AsyncSessionManager handles connection pooling, health checks, and graceful shutdown:
from sqlalchemy_foundation_kit import create_async_session_manager
async def main():
    """Create one user inside a transaction, then close the session manager."""
    # Create session manager
    session_manager = create_async_session_manager(settings.postgres)
    try:
        # Use transactional context
        async with session_manager.get_transaction() as session:
            user = UserDB(
                email="john@example.com",
                username="john_doe",
            )
            session.add(user)
            # Auto-commit on exit
            # Auto-rollback on exception
    finally:
        # Graceful shutdown (wait for connections to close). Runs in
        # ``finally`` so the manager is closed even when the transaction raised.
        await session_manager.close()
4. Use Unit of Work Pattern¶
The Unit of Work pattern ensures transactional consistency across multiple repository operations:
from sqlalchemy_foundation_kit import (
AsyncSQLAlchemyUnitOfWork,
AsyncSQLAlchemyUowTransaction,
)
# 1. Define your transaction with repositories
class MyTransaction(AsyncSQLAlchemyUowTransaction):
    """Transaction that exposes repositories bound to its session."""

    def __init__(self, session):
        super().__init__(session)
        # Repository is created on first access, see ``users``.
        self._users = None

    @property
    def users(self):
        """Lazy-loaded user repository."""
        repo = self._users
        if repo is None:
            repo = self._users = PostgresUserRepository(self.session)
        return repo
# 2. Define repository (returns domain entities, not ORM models)
from uuid import UUID
from typing import Protocol
class User:  # Domain entity
    """Domain entity for a user, independent of the ORM model.

    An explicit constructor is required because the repository builds
    entities with ``User(id=..., email=..., username=...)``; a class that only
    declares annotations accepts no arguments.
    """

    id: UUID
    email: str
    username: str

    def __init__(self, id: UUID, email: str, username: str):
        self.id = id
        self.email = email
        self.username = username
class UserRepository(Protocol):
    """Structural interface for user persistence, expressed in domain entities."""

    async def create(self, email: str, username: str) -> User:
        """Create a user and return it as a domain entity."""
        ...

    async def get_by_id(self, user_id: UUID) -> User | None:
        """Return the user with ``user_id``, or ``None``."""
        ...
class PostgresUserRepository:
    """User repository backed by a SQLAlchemy session.

    Works with ``UserDB`` rows internally and returns ``User`` domain entities.
    """

    def __init__(self, session):
        self._session = session

    def _to_entity(self, db_model: UserDB) -> User:
        """Copy the persisted fields of ``db_model`` into a ``User`` entity."""
        return User(
            id=db_model.id,
            email=db_model.email,
            username=db_model.username,
        )

    async def create(self, email: str, username: str) -> User:
        """Add a new ``UserDB`` row, flush it, and return it as a ``User``."""
        user_db = UserDB(email=email, username=username)
        self._session.add(user_db)
        # Flush so the pending row is sent to the database before mapping it.
        await self._session.flush()
        return self._to_entity(user_db)

    async def get_by_id(self, user_id: UUID) -> User | None:
        """Return the user with ``user_id`` as a ``User``, or ``None``."""
        # Imported here so the snippet is self-contained: no earlier import
        # block in this guide brings ``select`` into scope.
        from sqlalchemy import select

        result = await self._session.execute(
            select(UserDB).where(UserDB.id == user_id)
        )
        db_model = result.scalar_one_or_none()
        return self._to_entity(db_model) if db_model else None
# 3. Create UoW
class MyUnitOfWork(AsyncSQLAlchemyUnitOfWork[MyTransaction]):
    """Unit of work whose transactions are built by ``MyTransaction``."""

    def __init__(self, session_maker):
        super().__init__(session_maker, transaction_factory=MyTransaction)
# 4. Use in application layer
class CreateUserUseCase:
    """Application-layer use case that creates a user through the unit of work."""

    def __init__(self, uow: MyUnitOfWork):
        self._uow = uow

    async def execute(self, email: str, username: str) -> User:
        """Create the user inside a single transaction and return it."""
        async with self._uow.transaction() as txn:
            # All operations in single transaction
            created = await txn.users.create(email=email, username=username)
            # Add outbox event in same transaction
            # await tx.outbox.create(UserCreatedEvent(...))
            return created  # Transaction auto-committed
Complete Example¶
Here's a complete working example:
from uuid import UUID, uuid4
from pydantic import SecretStr
from pydantic_settings import BaseSettings
from sqlalchemy import select
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy_foundation_kit import (
create_async_session_manager,
AsyncSQLAlchemyUnitOfWork,
AsyncSQLAlchemyUowTransaction,
BaseTable,
DatetimeColumnsMixin,
)
from sqlalchemy_foundation_kit.contrib.settings import BasePostgresConfig
# Configuration
class Settings(BaseSettings):
    """Application settings exposing the PostgreSQL configuration as ``postgres``."""

    postgres: BasePostgresConfig = BasePostgresConfig(
        # Only the connection section is given here.
        connection={
            "host": "localhost",
            "port": 5432,
            "user": "postgres",
            "password": SecretStr("secret"),
            "database": "mydb",
        }
    )


# Module-level settings instance read by ``main``.
settings = Settings()
# ORM Model
class UserDB(BaseTable, DatetimeColumnsMixin):
    """User ORM model mapped to the ``users`` table."""

    __tablename__ = "users"
    __created_at_index__ = True

    # Primary key, generated client-side with uuid4.
    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    email: Mapped[str] = mapped_column(unique=True)
    username: Mapped[str]
# Domain Entity
class User:
    """Domain entity for a user, independent of the ORM model."""

    def __init__(self, id: UUID, email: str, username: str):
        # Store the three identifying fields as plain attributes.
        self.id, self.email, self.username = id, email, username
# Repository
class PostgresUserRepository:
    """User repository backed by a SQLAlchemy session.

    Works with ``UserDB`` rows internally and returns ``User`` domain entities.
    """

    def __init__(self, session):
        self._session = session

    def _to_entity(self, db_model: UserDB) -> User:
        """Copy the persisted fields of ``db_model`` into a ``User`` entity."""
        return User(id=db_model.id, email=db_model.email, username=db_model.username)

    async def create(self, email: str, username: str) -> User:
        """Add a new ``UserDB`` row, flush it, and return it as a ``User``."""
        row = UserDB(email=email, username=username)
        self._session.add(row)
        await self._session.flush()
        return self._to_entity(row)

    async def get_by_email(self, email: str) -> User | None:
        """Return the user with ``email`` as a ``User``, or ``None``."""
        stmt = select(UserDB).where(UserDB.email == email)
        row = (await self._session.execute(stmt)).scalar_one_or_none()
        if not row:
            return None
        return self._to_entity(row)
# Transaction
class MyTransaction(AsyncSQLAlchemyUowTransaction):
    """Transaction that exposes repositories bound to its session."""

    def __init__(self, session):
        super().__init__(session)
        # Repository is created on first access, see ``users``.
        self._users = None

    @property
    def users(self):
        """User repository, created on first access and then reused."""
        repo = self._users
        if repo is None:
            repo = self._users = PostgresUserRepository(self.session)
        return repo
# Unit of Work
class MyUnitOfWork(AsyncSQLAlchemyUnitOfWork[MyTransaction]):
    """Unit of work whose transactions are built by ``MyTransaction``."""

    def __init__(self, session_maker):
        super().__init__(
            session_maker,
            transaction_factory=MyTransaction,
        )
# Use Case
class CreateUserUseCase:
    """Application-layer use case that creates a user unless the email is taken."""

    def __init__(self, uow: MyUnitOfWork):
        self._uow = uow

    async def execute(self, email: str, username: str) -> User:
        """Create and return the user.

        Raises:
            ValueError: if a user with ``email`` is already stored.
        """
        async with self._uow.transaction() as txn:
            repo = txn.users
            # Reject duplicates before attempting the insert.
            if await repo.get_by_email(email):
                raise ValueError(f"User with email {email} already exists")
            return await repo.create(email=email, username=username)
# Main application
async def main():
    """Wire session manager, unit of work and use case, then create one user.

    The session manager is closed in ``finally`` so that it is released even
    when something other than the handled ``ValueError`` is raised.
    """
    # Create session manager
    session_manager = create_async_session_manager(settings.postgres)
    try:
        # Create UoW
        uow = MyUnitOfWork(session_manager.session_maker)
        # Create use case
        use_case = CreateUserUseCase(uow)
        # Execute
        try:
            user = await use_case.execute(
                email="john@example.com",
                username="john_doe",
            )
        except ValueError as e:
            print(f"Error: {e}")
        else:
            print(f"Created user: {user.email}")
    finally:
        # Cleanup
        await session_manager.close()


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
Next Steps¶
- Configuration — Learn about all configuration options
- Advanced Usage — Metrics, telemetry, DI, advisory locks
- API Reference — Complete API documentation
Common Patterns¶
Health Check¶
async def health_check():
    """Report database connectivity as a one-key status mapping."""
    status = "unhealthy"
    if await session_manager.healthcheck():
        status = "healthy"
    return {"database": status}
Graceful Shutdown¶
import signal
async def shutdown(session_manager: AsyncSessionManager):
    """Close ``session_manager`` with a 30.0 timeout, printing progress messages."""
    close_timeout = 30.0
    print("Shutting down...")
    await session_manager.close(timeout=close_timeout)
    print("Database connections closed")
# Strong references to in-flight shutdown tasks. The event loop only keeps
# weak references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_shutdown_tasks = set()


def _request_shutdown():
    """Schedule ``shutdown`` on the running loop and keep the task alive."""
    task = asyncio.create_task(shutdown(session_manager))
    _shutdown_tasks.add(task)
    # Drop the reference once the task is done.
    task.add_done_callback(_shutdown_tasks.discard)


# Register signal handler
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGTERM, _request_shutdown)
Retry on Connection Error¶
from sqlalchemy_foundation_kit import retry_async_connection, DEFAULT_RETRY_CONFIG
@retry_async_connection(config=DEFAULT_RETRY_CONFIG)
async def fetch_user(session, user_id: UUID):
    """Retries on connection errors."""
    by_id = select(UserDB).where(UserDB.id == user_id)
    rows = await session.execute(by_id)
    return rows.scalar_one_or_none()
Custom JSON Type¶
from pydantic import BaseModel
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy_foundation_kit import PydanticJSONB
class UserMetadata(BaseModel):
    """Pydantic model for the per-user metadata document."""

    theme: str
    language: str
class UserDB(BaseTable):
    """ORM model storing a ``UserMetadata`` model in a ``PydanticJSONB`` column."""

    __tablename__ = "users"

    id: Mapped[UUID] = mapped_column(primary_key=True)
    # ``metadata`` is a reserved attribute name in SQLAlchemy's Declarative API,
    # so the Python attribute is named ``user_metadata`` while the database
    # column keeps the name "metadata".
    user_metadata: Mapped[UserMetadata] = mapped_column(
        "metadata",
        PydanticJSONB(UserMetadata),
    )


# Usage
user = UserDB(
    user_metadata=UserMetadata(theme="dark", language="en"),
)
# Automatically serialized to JSONB in database