Skip to content

API Reference

Auto-generated from source using mkdocstrings.

pg_partsmith

Top-level public API: entities, enums, exceptions, and period calculators.

Entities

Represents a time period for partition boundaries.

Attributes:

Name Type Description
year int

Year component.

month int | None

Month component (1-12), optional.

day int | None

Day component (1-31), optional.

week int | None

ISO week number (1-53), optional.

Source code in pg_partsmith/entities.py
@dataclass(frozen=True)
@functools.total_ordering
class Period:
    """Represents a time period for partition boundaries.

    Exactly one granularity is encoded per instance — yearly, monthly, daily,
    or ISO-weekly; ``week`` is mutually exclusive with ``month``/``day``
    (enforced in ``__post_init__``).

    Attributes:
        year: Year component.
        month: Month component (1-12), optional.
        day: Day component (1-31), optional.
        week: ISO week number (1-53), optional.
    """

    year: int
    month: int | None = None
    day: int | None = None
    week: int | None = None

    def __post_init__(self) -> None:
        """Validate period components; the first failing check raises ValueError."""
        self._check_kind_consistency()
        self._check_month_range()
        self._check_day_resolves_to_real_date()
        self._check_week_resolves_to_real_iso_week()

    def _check_kind_consistency(self) -> None:
        # Weekly periods use the ISO calendar and cannot mix with month/day.
        if self.week is not None and (self.month is not None or self.day is not None):
            msg = "Period cannot have both week and month/day"
            raise ValueError(msg)

    def _check_month_range(self) -> None:
        if self.month is None:
            return
        if not MIN_MONTH <= self.month <= MAX_MONTH:
            msg = f"Month must be between {MIN_MONTH} and {MAX_MONTH}, got {self.month}"
            raise ValueError(msg)

    def _check_day_resolves_to_real_date(self) -> None:
        if self.day is None:
            return
        if self.month is None:
            msg = "Month is required when day is specified"
            raise ValueError(msg)
        try:
            # date() rejects impossible combinations such as Feb 30.
            date(self.year, self.month, self.day)
        except ValueError as exc:
            msg = f"Invalid date: {self.year}-{self.month}-{self.day}"
            raise ValueError(msg) from exc

    def _check_week_resolves_to_real_iso_week(self) -> None:
        if self.week is None:
            return
        if not MIN_ISO_WEEK <= self.week <= MAX_ISO_WEEK:
            msg = f"Week must be between {MIN_ISO_WEEK} and {MAX_ISO_WEEK}, got {self.week}"
            raise ValueError(msg)
        try:
            # Week 53 only exists in some ISO years; let the calendar decide.
            date.fromisocalendar(self.year, self.week, 1)
        except ValueError as exc:
            msg = f"Invalid ISO week: {self.year:04d}-W{self.week:02d}"
            raise ValueError(msg) from exc

    def to_date(self, day: int = 1) -> date:
        """Convert period to date.

        Args:
            day: Day of month (default: 1). Ignored for weekly periods and
                when self.day is already set.

        Returns:
            Date object representing this period. For weekly periods returns the
            Monday of the ISO week.
        """
        if self.week is not None:
            # ISO weekday 1 == Monday.
            return date.fromisocalendar(self.year, self.week, 1)

        month = self.month if self.month is not None else 1
        day_value = self.day if self.day is not None else day

        return date(self.year, month, day_value)

    def __add__(self, offset: int) -> Period:
        """Add offset to period (implementation depends on granularity).

        Args:
            offset: Number of periods to add.

        Returns:
            New Period object. Returns NotImplemented for non-integer offsets
            so Python raises the standard TypeError.
        """
        # Fix: without this guard a non-int offset (e.g. a float) would fall
        # through to `Period(year=self.year + offset)` for yearly periods and
        # silently build a Period with a non-integer year, because the frozen
        # dataclass does not type-check its fields.
        if not isinstance(offset, int):
            return NotImplemented

        if self.week is not None:
            monday = date.fromisocalendar(self.year, self.week, 1)
            new_date = monday + timedelta(weeks=offset)
            iso_year, iso_week, _ = new_date.isocalendar()
            return Period(year=iso_year, week=iso_week)

        if self.day is not None and self.month is not None:
            d = date(self.year, self.month, self.day) + timedelta(days=offset)
            return Period(year=d.year, month=d.month, day=d.day)

        if self.month is not None:
            # 0-based month arithmetic handles year rollover in both directions.
            total_months = self.year * 12 + self.month - 1 + offset
            new_year = total_months // 12
            new_month = (total_months % 12) + 1
            return Period(year=new_year, month=new_month)

        return Period(year=self.year + offset)

    def __sub__(self, offset: int) -> Period:
        """Subtract offset from period.

        Args:
            offset: Number of periods to subtract.

        Returns:
            New Period object. Returns NotImplemented for non-integer offsets.
        """
        if not isinstance(offset, int):
            return NotImplemented
        return self.__add__(-offset)

    def __lt__(self, other: Period) -> bool:
        """Compare periods of equal granularity; others are unordered."""
        # Required by Python's data model: returning NotImplemented lets Python
        # try the reflected comparison on `other`.
        if not isinstance(other, Period):  # type: ignore[unreachable]
            return NotImplemented  # type: ignore[unreachable]

        self_kind = self._granularity_key(self)
        other_kind = self._granularity_key(other)
        if self_kind != other_kind:
            return NotImplemented

        if self_kind == "year":
            return self.year < other.year

        if self_kind == "month":
            return (self.year, self.month) < (other.year, other.month)

        if self_kind == "day":
            return (self.year, self.month, self.day) < (other.year, other.month, other.day)

        # week
        return (self.year, self.week) < (other.year, other.week)

    @staticmethod
    def _granularity_key(p: Period) -> str:
        # Week is tested first because it excludes month/day by construction.
        if p.week is not None:
            return "week"
        if p.day is not None:
            return "day"
        if p.month is not None:
            return "month"
        return "year"

    def __str__(self) -> str:
        """String representation (YYYY, YYYY_MM, YYYY_MM_DD, or YYYY_wWW)."""
        if self.month is not None and self.day is not None:
            return f"{self.year:04d}_{self.month:02d}_{self.day:02d}"
        if self.month is not None:
            return f"{self.year:04d}_{self.month:02d}"
        if self.week is not None:
            return f"{self.year:04d}_w{self.week:02d}"
        return f"{self.year:04d}"

__add__(offset)

Add offset to period (implementation depends on granularity).

Parameters:

Name Type Description Default
offset int

Number of periods to add.

required

Returns:

Type Description
Period

New Period object.

Source code in pg_partsmith/entities.py
def __add__(self, offset: int) -> Period:
    """Add offset to period (implementation depends on granularity).

    Args:
        offset: Number of periods to add.

    Returns:
        New Period object.
    """
    if self.week is not None:
        # Weekly: shift the ISO-week Monday, then re-derive (iso_year, iso_week).
        monday = date.fromisocalendar(self.year, self.week, 1)
        new_date = monday + timedelta(weeks=offset)
        iso_year, iso_week, _ = new_date.isocalendar()
        return Period(year=iso_year, week=iso_week)

    if self.day is not None and self.month is not None:
        # Daily: plain calendar-date arithmetic handles month/year rollover.
        d = date(self.year, self.month, self.day) + timedelta(days=offset)
        return Period(year=d.year, month=d.month, day=d.day)

    if self.month is not None:
        # Monthly: arithmetic on a 0-based month count, then split back into
        # year and 1-based month.
        total_months = self.year * 12 + self.month - 1 + offset
        new_year = total_months // 12
        new_month = (total_months % 12) + 1
        return Period(year=new_year, month=new_month)

    # Yearly granularity.
    return Period(year=self.year + offset)

__lt__(other)

Compare periods.

Source code in pg_partsmith/entities.py
def __lt__(self, other: Period) -> bool:
    """Compare periods."""
    # Required by Python's data model: returning NotImplemented lets Python
    # try the reflected comparison on `other`.
    if not isinstance(other, Period):  # type: ignore[unreachable]
        return NotImplemented  # type: ignore[unreachable]

    # Periods of different granularities (e.g. monthly vs weekly) are not
    # ordered relative to each other.
    self_kind = self._granularity_key(self)
    other_kind = self._granularity_key(other)
    if self_kind != other_kind:
        return NotImplemented

    if self_kind == "year":
        return self.year < other.year

    if self_kind == "month":
        # Tuple comparison is lexicographic: year first, then month.
        return (self.year, self.month) < (other.year, other.month)

    if self_kind == "day":
        return (self.year, self.month, self.day) < (other.year, other.month, other.day)

    # week
    return (self.year, self.week) < (other.year, other.week)

__post_init__()

Validate period components.

Source code in pg_partsmith/entities.py
def __post_init__(self) -> None:
    """Validate period components."""
    # Checks run in order; the first failure raises ValueError.
    self._check_kind_consistency()
    self._check_month_range()
    self._check_day_resolves_to_real_date()
    self._check_week_resolves_to_real_iso_week()

__str__()

String representation.

Source code in pg_partsmith/entities.py
def __str__(self) -> str:
    """String representation."""
    # These patterns double as generated partition-name suffixes:
    # YYYY_MM_DD, YYYY_MM, YYYY_wWW, or bare YYYY depending on granularity.
    if self.month is not None and self.day is not None:
        return f"{self.year:04d}_{self.month:02d}_{self.day:02d}"
    if self.month is not None:
        return f"{self.year:04d}_{self.month:02d}"
    if self.week is not None:
        return f"{self.year:04d}_w{self.week:02d}"
    return f"{self.year:04d}"

__sub__(offset)

Subtract offset from period.

Parameters:

Name Type Description Default
offset int

Number of periods to subtract.

required

Returns:

Type Description
Period

New Period object.

Source code in pg_partsmith/entities.py
def __sub__(self, offset: int) -> Period:
    """Subtract offset from period.

    Args:
        offset: Number of periods to subtract.

    Returns:
        New Period object.
    """
    # Delegates to __add__ with a negated offset.
    return self.__add__(-offset)

to_date(day=1)

Convert period to date.

Parameters:

Name Type Description Default
day int

Day of month (default: 1). Ignored for weekly periods and when self.day is already set.

1

Returns:

Type Description
date

Date object representing this period. For weekly periods returns the

date

Monday of the ISO week.

Source code in pg_partsmith/entities.py
def to_date(self, day: int = 1) -> date:
    """Convert period to date.

    Args:
        day: Day of month (default: 1). Ignored for weekly periods and
            when self.day is already set.

    Returns:
        Date object representing this period. For weekly periods returns the
        Monday of the ISO week.
    """
    if self.week is not None:
        # ISO weekday 1 == Monday.
        return date.fromisocalendar(self.year, self.week, 1)

    # Year-only periods default to January; the `day` argument is used only
    # when the period itself does not carry a day.
    month = self.month if self.month is not None else 1
    day_value = self.day if self.day is not None else day

    return date(self.year, month, day_value)

Bases: BaseModel

Metadata about a partition.

Attributes:

Name Type Description
name StrippedNonEmptyStr

Partition table name.

partition_type PartitionType

Type of partition (RANGE, LIST, HASH).

from_value str | None

Start boundary value (for RANGE).

to_value str | None

End boundary value (for RANGE).

boundaries_expr str | None

Raw boundary expression as reported by PostgreSQL (pg_get_expr(relpartbound, oid)). Useful when parsing boundaries fails but the partition is still attached.

is_attached bool

Whether partition is currently attached to parent table.

is_default bool

Whether this is the DEFAULT partition (no explicit boundaries).

parent_table StrippedNonEmptyStr | None

Name of parent partitioned table.

Source code in pg_partsmith/entities.py
class PartitionInfo(BaseModel):
    """Metadata describing one partition of a partitioned table.

    Attributes:
        name: Partition table name.
        partition_type: Type of partition (RANGE, LIST, HASH).
        from_value: Start boundary value (for RANGE).
        to_value: End boundary value (for RANGE).
        boundaries_expr: Raw boundary expression as reported by PostgreSQL
            (``pg_get_expr(relpartbound, oid)``); a fallback when boundary
            parsing fails but the partition is still attached.
        is_attached: Whether partition is currently attached to parent table.
        is_default: Whether this is the DEFAULT partition (no explicit boundaries).
        parent_table: Name of parent partitioned table.
    """

    model_config = ConfigDict(frozen=True)

    name: StrippedNonEmptyStr
    partition_type: PartitionType
    from_value: str | None = None
    to_value: str | None = None
    boundaries_expr: str | None = None
    is_attached: bool = True
    is_default: bool = False
    parent_table: StrippedNonEmptyStr | None = None

    @model_validator(mode="after")
    def validate_range_boundaries(self) -> PartitionInfo:
        """Ensure attached RANGE partitions carry some boundary information.

        Detached (orphaned) partitions may have lost their boundary metadata
        from the catalog, so ``None`` boundaries are tolerated for them.

        Attached partitions may satisfy the invariant either with parsed
        boundaries (``from_value`` + ``to_value``) or with the raw boundaries
        expression, keeping them usable when expression parsing fails.
        """
        if not self._requires_boundaries():
            return self
        if self._has_parsed_boundaries() or self._has_raw_boundaries():
            return self
        msg = "Attached RANGE partitions must have from_value/to_value or boundaries_expr"
        raise ValueError(msg)

    def _requires_boundaries(self) -> bool:
        # Only attached, non-default RANGE partitions must carry boundaries.
        is_range = self.partition_type == PartitionType.RANGE
        return is_range and self.is_attached and not self.is_default

    def _has_parsed_boundaries(self) -> bool:
        # True only when both parsed boundary values are present.
        return None not in (self.from_value, self.to_value)

    def _has_raw_boundaries(self) -> bool:
        raw = self.boundaries_expr
        return raw is not None and raw.strip() != ""

validate_range_boundaries()

Validate that attached RANGE partitions have boundaries.

Detached (orphaned) partitions may have lost their boundary metadata from the catalog and are allowed to carry `None` boundaries.

For attached partitions we accept either parsed boundaries (`from_value` + `to_value`) OR a raw boundaries expression so that callers can still reason about partitions even when expression parsing fails.

Source code in pg_partsmith/entities.py
@model_validator(mode="after")
def validate_range_boundaries(self) -> PartitionInfo:
    """Validate that attached RANGE partitions have boundaries.

    Detached (orphaned) partitions may have lost their boundary metadata
    from the catalog and are allowed to carry ``None`` boundaries.

    For attached partitions we accept either parsed boundaries
    (``from_value`` + ``to_value``) OR a raw boundaries expression so that
    callers can still reason about partitions even when expression parsing
    fails.
    """
    # Either representation of the boundaries satisfies the invariant.
    if self._requires_boundaries() and not (self._has_parsed_boundaries() or self._has_raw_boundaries()):
        msg = "Attached RANGE partitions must have from_value/to_value or boundaries_expr"
        raise ValueError(msg)
    return self

Bases: BaseModel

Configuration for table partitioning maintenance.

Only TIME_BASED (RANGE by date/time) partitioning is currently supported. VALUE_BASED and HASH_BASED strategies are reserved for future use and will raise a ValueError at construction time.

Attributes:

Name Type Description
schema

Optional schema name for the partitioned table. When set, all DDL and catalogue queries are schema-qualified, making behaviour deterministic in databases with multiple schemas.

table_name StrippedNonEmptyStr

Name of the partitioned table (lowercase, max 63 chars minus the longest generated partition suffix).

partition_type PartitionType

Type of partitioning (RANGE, LIST, HASH).

partition_strategy PartitionStrategy

Strategy for partitioning.

partition_column StrippedNonEmptyStr

Column used for partitioning.

granularity PartitionGranularity | None

Time granularity (for TIME_BASED strategy).

create_ahead_count PositiveInt

Number of periods to ensure exist, including the current period.

retention_count PositiveInt

Number of partitions to retain.

auto_attach_after_create bool

Whether to attach immediately after creation.

Source code in pg_partsmith/entities.py
class TablePartitionConfig(BaseModel):
    """Configuration for table partitioning maintenance.

    Only TIME_BASED (RANGE by date/time) partitioning is currently supported.
    VALUE_BASED and HASH_BASED strategies are reserved for future use and will
    raise a ValueError at construction time.

    Attributes:
        schema: Optional schema name for the partitioned table. When set, all
            DDL and catalogue queries are schema-qualified, making behaviour
            deterministic in databases with multiple schemas.
        table_name: Name of the partitioned table (lowercase, max 63 chars minus
            the longest generated partition suffix).
        partition_type: Type of partitioning (RANGE, LIST, HASH).
        partition_strategy: Strategy for partitioning.
        partition_column: Column used for partitioning.
        granularity: Time granularity (for TIME_BASED strategy).
        create_ahead_count: Number of periods to ensure exist, including the current period.
        retention_count: Number of partitions to retain.
        auto_attach_after_create: Whether to attach immediately after creation.
    """

    model_config = ConfigDict(frozen=True)

    # NOTE: We store the value under a different field name to avoid Pydantic's
    # warning about shadowing BaseModel.schema(). Externally, the public API is
    # still `schema=...` and `config.db_schema`.
    schema_name: StrippedNonEmptyStr | None = Field(default=None, alias="schema")
    table_name: StrippedNonEmptyStr
    partition_type: PartitionType
    partition_strategy: PartitionStrategy
    partition_column: StrippedNonEmptyStr
    granularity: PartitionGranularity | None = None
    create_ahead_count: PositiveInt = Field(
        default=DEFAULT_CREATE_AHEAD_COUNT,
        description="Number of periods to ensure exist, including the current period",
    )
    retention_count: PositiveInt = Field(default=DEFAULT_RETENTION_COUNT, description="Number of partitions to retain")
    auto_attach_after_create: bool = True

    @property
    def db_schema(self) -> StrippedNonEmptyStr | None:
        """PostgreSQL schema name (public accessor for the aliased field above)."""
        return self.schema_name

    @field_validator("table_name", "partition_column")
    @classmethod
    def validate_identifier(cls, v: str) -> str:
        """Validate and normalise SQL identifiers."""
        result = _validate_pg_identifier(v)
        if result is None:
            # A None result means the identifier normalised to nothing — reject.
            msg = "SQL identifier cannot be empty"
            raise ValueError(msg)
        return result

    @field_validator("schema_name")
    @classmethod
    def validate_schema(cls, v: str | None) -> str | None:
        """Validate and normalise schema name."""
        # Schema is optional; unlike validate_identifier, a None result is acceptable.
        return _validate_pg_identifier(v)

    @model_validator(mode="after")
    def validate_strategy_requirements(self) -> TablePartitionConfig:
        """Validate strategy-specific requirements."""
        # Reserved strategies fail fast at construction time.
        if self.partition_strategy in (PartitionStrategy.VALUE_BASED, PartitionStrategy.HASH_BASED):
            msg = (
                f"{self.partition_strategy.value!r} strategy is not yet implemented. "
                "Only TIME_BASED is currently supported."
            )
            raise ValueError(msg)

        if self.partition_strategy == PartitionStrategy.TIME_BASED:
            if self.granularity is None:
                msg = "TIME_BASED strategy requires granularity"
                raise ValueError(msg)
            if self.partition_type != PartitionType.RANGE:
                msg = "TIME_BASED strategy requires RANGE partition type"
                raise ValueError(msg)

            # Validate that generated partition names will not exceed PostgreSQL's
            # 63-byte identifier limit (max_identifier_length default).
            # Suffix templates mirror Period.__str__ plus a "__" separator.
            suffix_len = {
                PartitionGranularity.DAY: len("__0000_00_00"),
                PartitionGranularity.WEEK: len("__0000_w00"),
                PartitionGranularity.MONTH: len("__0000_00"),
                PartitionGranularity.YEAR: len("__0000"),
            }[self.granularity]
            if len(self.table_name) + suffix_len > MAX_IDENTIFIER_LENGTH:
                msg = (
                    f"table_name {self.table_name!r} is too long for "
                    f"{self.granularity.value} granularity: "
                    f"table_name ({len(self.table_name)}) + suffix ({suffix_len}) = "
                    f"{len(self.table_name) + suffix_len} > {MAX_IDENTIFIER_LENGTH} bytes."
                )
                raise ValueError(msg)

        return self

db_schema property

PostgreSQL schema name.

validate_identifier(v) classmethod

Validate and normalise SQL identifiers.

Source code in pg_partsmith/entities.py
@field_validator("table_name", "partition_column")
@classmethod
def validate_identifier(cls, v: str) -> str:
    """Validate and normalise SQL identifiers."""
    result = _validate_pg_identifier(v)
    if result is None:
        # A None result means the identifier normalised to nothing — reject.
        msg = "SQL identifier cannot be empty"
        raise ValueError(msg)
    return result

validate_schema(v) classmethod

Validate and normalise schema name.

Source code in pg_partsmith/entities.py
@field_validator("schema_name")
@classmethod
def validate_schema(cls, v: str | None) -> str | None:
    """Validate and normalise schema name."""
    # Schema is optional; unlike validate_identifier, a None result is acceptable.
    return _validate_pg_identifier(v)

validate_strategy_requirements()

Validate strategy-specific requirements.

Source code in pg_partsmith/entities.py
@model_validator(mode="after")
def validate_strategy_requirements(self) -> TablePartitionConfig:
    """Validate strategy-specific requirements."""
    # Reserved strategies fail fast at construction time.
    if self.partition_strategy in (PartitionStrategy.VALUE_BASED, PartitionStrategy.HASH_BASED):
        msg = (
            f"{self.partition_strategy.value!r} strategy is not yet implemented. "
            "Only TIME_BASED is currently supported."
        )
        raise ValueError(msg)

    if self.partition_strategy == PartitionStrategy.TIME_BASED:
        if self.granularity is None:
            msg = "TIME_BASED strategy requires granularity"
            raise ValueError(msg)
        if self.partition_type != PartitionType.RANGE:
            msg = "TIME_BASED strategy requires RANGE partition type"
            raise ValueError(msg)

        # Validate that generated partition names will not exceed PostgreSQL's
        # 63-byte identifier limit (max_identifier_length default).
        # Suffix templates mirror Period.__str__ plus a "__" separator.
        suffix_len = {
            PartitionGranularity.DAY: len("__0000_00_00"),
            PartitionGranularity.WEEK: len("__0000_w00"),
            PartitionGranularity.MONTH: len("__0000_00"),
            PartitionGranularity.YEAR: len("__0000"),
        }[self.granularity]
        if len(self.table_name) + suffix_len > MAX_IDENTIFIER_LENGTH:
            msg = (
                f"table_name {self.table_name!r} is too long for "
                f"{self.granularity.value} granularity: "
                f"table_name ({len(self.table_name)}) + suffix ({suffix_len}) = "
                f"{len(self.table_name) + suffix_len} > {MAX_IDENTIFIER_LENGTH} bytes."
            )
            raise ValueError(msg)

    return self

Bases: BaseModel

Result of partition maintenance operation.

Attributes:

Name Type Description
created_count NonNegativeInt

Number of partitions created.

detached_count NonNegativeInt

Number of partitions detached in this run.

dropped_count NonNegativeInt

Number of partitions dropped.

duration_ms NonNegativeInt

Duration of maintenance in milliseconds.

error str | None

Fatal error message (set when the whole maintenance run fails).

Source code in pg_partsmith/entities.py
class MaintenanceResult(BaseModel):
    """Result of partition maintenance operation.

    Attributes:
        created_count: Number of partitions created.
        detached_count: Number of partitions detached in this run.
        dropped_count: Number of partitions dropped.
        duration_ms: Duration of maintenance in milliseconds.
        error: Fatal error message (set when the whole maintenance run fails).
    """

    model_config = ConfigDict(frozen=True)

    # All counters default to 0, so a run that performed no work needs no
    # special-casing by callers.
    created_count: NonNegativeInt = 0
    detached_count: NonNegativeInt = 0
    dropped_count: NonNegativeInt = 0
    duration_ms: NonNegativeInt = 0
    error: str | None = None

    @property
    def success(self) -> bool:
        """True only when there is no fatal error."""
        # Success is defined solely by the absence of a fatal error message;
        # the counters are informational.
        return self.error is None

success property

True only when there is no fatal error.

Exceptions

Domain exceptions for partition management.

PartitionError

Bases: Exception

Base exception for partition-related errors.

Source code in pg_partsmith/exceptions.py
class PartitionError(Exception):
    """Common root for every partition-related error in this package."""

PartitionAlreadyExistsError

Bases: PartitionError

Raised when attempting to create a partition that already exists.

Source code in pg_partsmith/exceptions.py
class PartitionAlreadyExistsError(PartitionError):
    """Raised when attempting to create a partition that already exists."""

    def __init__(self, partition_name: str) -> None:
        # Keep the offending name on the instance for programmatic handling.
        self.partition_name = partition_name
        super().__init__(f"Partition already exists: {partition_name}")

PartitionNotFoundError

Bases: PartitionError

Raised when a partition is not found.

Source code in pg_partsmith/exceptions.py
class PartitionNotFoundError(PartitionError):
    """Raised when a partition is not found."""

    def __init__(self, partition_name: str) -> None:
        # Keep the missing name on the instance for programmatic handling.
        self.partition_name = partition_name
        super().__init__(f"Partition not found: {partition_name}")

PartitionAttachedError

Bases: PartitionError

Raised when attempting to drop an attached partition.

Source code in pg_partsmith/exceptions.py
class PartitionAttachedError(PartitionError):
    """Raised when attempting to drop an attached partition."""

    def __init__(self, partition_name: str, table_name: str) -> None:
        # Store both identifiers so callers can react without parsing the message.
        self.partition_name = partition_name
        self.table_name = table_name
        super().__init__(f"Partition {partition_name} is still attached to table {table_name}")

PartitionDetachInProgressError

Bases: PartitionError

Raised when detach operation is in progress.

Source code in pg_partsmith/exceptions.py
class PartitionDetachInProgressError(PartitionError):
    """Raised when detach operation is in progress."""

    def __init__(self, partition_name: str) -> None:
        # Keep the partition name for programmatic handling.
        self.partition_name = partition_name
        super().__init__(f"Detach operation in progress for partition: {partition_name}")

InvalidPartitionConfigError

Bases: PartitionError

Raised when partition configuration is invalid.

Source code in pg_partsmith/exceptions.py
class InvalidPartitionConfigError(PartitionError):
    """Raised when partition configuration is invalid."""

    def __init__(self, message: str) -> None:
        # Prefix the caller-supplied detail with a stable, searchable header.
        prefixed = f"Invalid partition configuration: {message}"
        super().__init__(prefixed)

LockAcquisitionError

Bases: PartitionError

Raised when unable to acquire lock for partition operation.

Source code in pg_partsmith/exceptions.py
class LockAcquisitionError(PartitionError):
    """Raised when unable to acquire lock for partition operation."""

    def __init__(self, table_name: str, reason: str | None = None) -> None:
        base = f"Failed to acquire lock for table {table_name}"
        # A falsy reason (None or empty string) is omitted from the message.
        super().__init__(f"{base}: {reason}" if reason else base)
        self.table_name = table_name
        self.reason = reason

DropRetryExhaustedError

Bases: PartitionError

Raised when all drop_partition retry attempts are exhausted.

This means PostgreSQL returned a retryable error (deadlock, lock timeout, or query cancellation) on every attempt. Inspect `cause` for the last underlying error.

Source code in pg_partsmith/exceptions.py
class DropRetryExhaustedError(PartitionError):
    """Raised when all drop_partition retry attempts are exhausted.

    This means PostgreSQL returned a retryable error (deadlock, lock timeout,
    or query cancellation) on every attempt.  Inspect ``cause`` for the last
    underlying error.
    """

    def __init__(self, partition_name: str, attempts: int, cause: BaseException | None = None) -> None:
        details = (
            f"Failed to drop partition {partition_name!r} after {attempts} attempt(s) "
            "due to persistent lock contention or deadlock"
        )
        if cause:
            # Append the last underlying error when one was captured.
            details = f"{details}: {cause}"
        super().__init__(details)
        self.partition_name = partition_name
        self.attempts = attempts
        self.cause = cause

UnmanagedPartitionDropError

Bases: PartitionError

Raised when attempting to drop a table not managed by this library.

Source code in pg_partsmith/exceptions.py
class UnmanagedPartitionDropError(PartitionError):
    """Raised when attempting to drop a table not managed by this library."""

    def __init__(self, partition_name: str) -> None:
        # Keep the table name for programmatic handling.
        self.partition_name = partition_name
        super().__init__(
            f"Refusing to drop unmanaged table {partition_name!r}; set drop_allow_unmanaged=True to override."
        )

Period strategies

Bases: ABC

Base class for all period calculators.

Implements common logic for period calculations and defines the interface for granularity-specific strategies. Subclass and override any method to customise behaviour.

Subclasses must define _NAME_PATTERN (a compiled regex) and implement _period_from_match to construct a Period from regex groups. Group 1 is conventionally the table name; subsequent groups encode the period.

Source code in pg_partsmith/strategies/base.py
class BasePeriodCalculator(ABC):
    """Common base for granularity-specific period calculators.

    Implements the shared period arithmetic and defines the interface the
    concrete strategies must provide. Any method may be overridden to
    customise behaviour.

    Subclasses must define ``_NAME_PATTERN`` (a compiled regex) and implement
    ``_period_from_match`` to build a ``Period`` from regex groups. By
    convention group 1 is the table name; the remaining groups encode the
    period.
    """

    _NAME_PATTERN: ClassVar[re.Pattern[str]]

    @abstractmethod
    def current_period(self) -> Period:
        """Return the current period based on the current UTC time."""
        ...

    @abstractmethod
    def format_partition_name(self, table_name: str, period: Period) -> str:
        """Format partition name for a given table and period."""
        ...

    @abstractmethod
    def get_boundaries(self, period: Period) -> tuple[str, str]:
        """Return ``(from_value, to_value)`` boundaries for a period as ISO strings."""
        ...

    @abstractmethod
    def _period_from_match(self, match: re.Match[str]) -> Period:
        """Build a ``Period`` from a successful ``_NAME_PATTERN`` match.

        Subclasses may raise ``ValueError`` for invalid calendar values; the
        public ``parse_partition_name`` translates that into ``None``.
        """
        ...

    def parse_partition_name(self, partition_name: str) -> Period | None:
        """Parse period from a partition name.

        Returns ``None`` if the name does not match ``_NAME_PATTERN`` or encodes
        an invalid calendar value (e.g. month 13).
        """
        if (found := self._NAME_PATTERN.match(partition_name)) is None:
            return None
        try:
            return self._period_from_match(found)
        except ValueError:
            # Pattern matched, but the encoded values are not a real date/week.
            return None

    def next_periods(self, count: int) -> list[Period]:
        """Generate N periods starting from the current period (inclusive)."""
        if count <= 0:
            msg = "Count must be positive"
            raise ValueError(msg)

        start = self.current_period()
        periods: list[Period] = []
        for step in range(count):
            # step 0 yields the current period itself.
            periods.append(self.period_after(start, step))
        return periods

    def period_after(self, reference: Period, offset: int) -> Period:
        """Return the period ``offset`` steps after ``reference``."""
        if offset >= 0:
            return reference + offset
        msg = "Offset must be non-negative"
        raise ValueError(msg)

    def period_before(self, reference: Period, offset: int) -> Period:
        """Return the period ``offset`` steps before ``reference``."""
        if offset >= 0:
            return reference - offset
        msg = "Offset must be non-negative"
        raise ValueError(msg)

current_period() abstractmethod

Return the current period based on the current UTC time.

Source code in pg_partsmith/strategies/base.py
@abstractmethod
def current_period(self) -> Period:
    """Return the current period based on the current UTC time."""
    ...

format_partition_name(table_name, period) abstractmethod

Format partition name for a given table and period.

Source code in pg_partsmith/strategies/base.py
@abstractmethod
def format_partition_name(self, table_name: str, period: Period) -> str:
    """Format partition name for a given table and period."""
    ...

get_boundaries(period) abstractmethod

Return (from_value, to_value) boundaries for a period as ISO strings.

Source code in pg_partsmith/strategies/base.py
@abstractmethod
def get_boundaries(self, period: Period) -> tuple[str, str]:
    """Return ``(from_value, to_value)`` boundaries for a period as ISO strings."""
    ...

next_periods(count)

Generate N periods starting from the current period (inclusive).

Source code in pg_partsmith/strategies/base.py
def next_periods(self, count: int) -> list[Period]:
    """Generate N periods starting from the current period (inclusive)."""
    if count <= 0:
        msg = "Count must be positive"
        raise ValueError(msg)

    current = self.current_period()
    return [self.period_after(current, i) for i in range(count)]

parse_partition_name(partition_name)

Parse period from a partition name.

Returns None if the name does not match _NAME_PATTERN or encodes an invalid calendar value (e.g. month 13).

Source code in pg_partsmith/strategies/base.py
def parse_partition_name(self, partition_name: str) -> Period | None:
    """Parse period from a partition name.

    Returns ``None`` if the name does not match ``_NAME_PATTERN`` or encodes
    an invalid calendar value (e.g. month 13).
    """
    match = self._NAME_PATTERN.match(partition_name)
    if not match:
        return None
    try:
        return self._period_from_match(match)
    except ValueError:
        return None

period_after(reference, offset)

Return the period offset steps after reference.

Source code in pg_partsmith/strategies/base.py
def period_after(self, reference: Period, offset: int) -> Period:
    """Return the period ``offset`` steps after ``reference``."""
    if offset < 0:
        msg = "Offset must be non-negative"
        raise ValueError(msg)

    return reference + offset

period_before(reference, offset)

Return the period offset steps before reference.

Source code in pg_partsmith/strategies/base.py
def period_before(self, reference: Period, offset: int) -> Period:
    """Return the period ``offset`` steps before ``reference``."""
    if offset < 0:
        msg = "Offset must be non-negative"
        raise ValueError(msg)

    return reference - offset

Bases: BasePeriodCalculator

Calculator for daily partitions.

Generates partitions with day granularity. Partition naming: {table}__{YYYY}_{MM}_{DD}, with zero-padded month and day components.

Source code in pg_partsmith/strategies/day.py
class DayPeriodCalculator(BasePeriodCalculator):
    """Calculator for daily partitions.

    Generates partitions with day granularity.
    Partition naming: ``{table}__{YYYY}_{MM}_{DD}``
    """

    _NAME_PATTERN: ClassVar[re.Pattern[str]] = re.compile(r"^(.+)__(\d{4})_(\d{2})_(\d{2})$")

    def current_period(self) -> Period:
        """Get current day period."""
        now = utc_now()
        return Period(year=now.year, month=now.month, day=now.day)

    def format_partition_name(self, table_name: str, period: Period) -> str:
        """Format partition name: ``table__YYYY_MM_DD``."""
        if period.month is None or period.day is None:
            msg = "Month and day are required for DayPeriodCalculator"
            raise ValueError(msg)
        return f"{table_name}__{period.year:04d}_{period.month:02d}_{period.day:02d}"

    def _period_from_match(self, match: re.Match[str]) -> Period:
        return Period(
            year=int(match.group(2)),
            month=int(match.group(3)),
            day=int(match.group(4)),
        )

    def get_boundaries(self, period: Period) -> tuple[str, str]:
        """Get day boundaries as ``(start_date, end_date)`` in ISO format."""
        if period.month is None or period.day is None:
            msg = "Month and day are required for DayPeriodCalculator"
            raise ValueError(msg)

        start_date = datetime(period.year, period.month, period.day, tzinfo=UTC)
        end_date = start_date + timedelta(days=1)

        return (start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))

current_period()

Get current day period.

Source code in pg_partsmith/strategies/day.py
def current_period(self) -> Period:
    """Get current day period."""
    now = utc_now()
    return Period(year=now.year, month=now.month, day=now.day)

format_partition_name(table_name, period)

Format partition name: table__YYYY_MM_DD.

Source code in pg_partsmith/strategies/day.py
def format_partition_name(self, table_name: str, period: Period) -> str:
    """Format partition name: ``table__YYYY_MM_DD``."""
    if period.month is None or period.day is None:
        msg = "Month and day are required for DayPeriodCalculator"
        raise ValueError(msg)
    return f"{table_name}__{period.year:04d}_{period.month:02d}_{period.day:02d}"

get_boundaries(period)

Get day boundaries as (start_date, end_date) in ISO format, where end_date is the start of the following day.

Source code in pg_partsmith/strategies/day.py
def get_boundaries(self, period: Period) -> tuple[str, str]:
    """Get day boundaries as ``(start_date, end_date)`` in ISO format."""
    if period.month is None or period.day is None:
        msg = "Month and day are required for DayPeriodCalculator"
        raise ValueError(msg)

    start_date = datetime(period.year, period.month, period.day, tzinfo=UTC)
    end_date = start_date + timedelta(days=1)

    return (start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))

next_periods(count)

Generate N periods starting from the current period (inclusive).

Source code in pg_partsmith/strategies/base.py
def next_periods(self, count: int) -> list[Period]:
    """Generate N periods starting from the current period (inclusive)."""
    if count <= 0:
        msg = "Count must be positive"
        raise ValueError(msg)

    current = self.current_period()
    return [self.period_after(current, i) for i in range(count)]

parse_partition_name(partition_name)

Parse period from a partition name.

Returns None if the name does not match _NAME_PATTERN or encodes an invalid calendar value (e.g. month 13).

Source code in pg_partsmith/strategies/base.py
def parse_partition_name(self, partition_name: str) -> Period | None:
    """Parse period from a partition name.

    Returns ``None`` if the name does not match ``_NAME_PATTERN`` or encodes
    an invalid calendar value (e.g. month 13).
    """
    match = self._NAME_PATTERN.match(partition_name)
    if not match:
        return None
    try:
        return self._period_from_match(match)
    except ValueError:
        return None

period_after(reference, offset)

Return the period offset steps after reference.

Source code in pg_partsmith/strategies/base.py
def period_after(self, reference: Period, offset: int) -> Period:
    """Return the period ``offset`` steps after ``reference``."""
    if offset < 0:
        msg = "Offset must be non-negative"
        raise ValueError(msg)

    return reference + offset

period_before(reference, offset)

Return the period offset steps before reference.

Source code in pg_partsmith/strategies/base.py
def period_before(self, reference: Period, offset: int) -> Period:
    """Return the period ``offset`` steps before ``reference``."""
    if offset < 0:
        msg = "Offset must be non-negative"
        raise ValueError(msg)

    return reference - offset

Bases: BasePeriodCalculator

Calculator for weekly partitions.

Generates partitions with ISO-week granularity. Partition naming: {table}__{YYYY}_w{WW}, where {YYYY} is the ISO week-based year (which may differ from the calendar year near year boundaries) and {WW} is the zero-padded ISO week number.

Source code in pg_partsmith/strategies/week.py
class WeekPeriodCalculator(BasePeriodCalculator):
    """Calculator for weekly partitions.

    Generates partitions with ISO-week granularity.
    Partition naming: ``{table}__{YYYY}_w{WW}``
    """

    _NAME_PATTERN: ClassVar[re.Pattern[str]] = re.compile(r"^(.+)__(\d{4})_w(\d{2})$")

    def current_period(self) -> Period:
        """Get current ISO-week period."""
        now = utc_now()
        iso_year, iso_week, _ = now.isocalendar()
        return Period(year=iso_year, week=iso_week)

    def format_partition_name(self, table_name: str, period: Period) -> str:
        """Format partition name: ``table__YYYY_wWW``."""
        if period.week is None:
            msg = "Week is required for WeekPeriodCalculator"
            raise ValueError(msg)
        return f"{table_name}__{period.year:04d}_w{period.week:02d}"

    def _period_from_match(self, match: re.Match[str]) -> Period:
        return Period(year=int(match.group(2)), week=int(match.group(3)))

    def get_boundaries(self, period: Period) -> tuple[str, str]:
        """Get ISO-week boundaries (Monday to Monday) as ISO date strings."""
        if period.week is None:
            msg = "Week is required for WeekPeriodCalculator"
            raise ValueError(msg)

        date_str = f"{period.year:04d}-W{period.week:02d}-1"
        start_date = datetime.strptime(date_str, "%G-W%V-%u").replace(tzinfo=UTC)
        end_date = start_date + timedelta(weeks=1)

        return (start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))

current_period()

Get current ISO-week period.

Source code in pg_partsmith/strategies/week.py
def current_period(self) -> Period:
    """Get current ISO-week period."""
    now = utc_now()
    iso_year, iso_week, _ = now.isocalendar()
    return Period(year=iso_year, week=iso_week)

format_partition_name(table_name, period)

Format partition name: table__YYYY_wWW.

Source code in pg_partsmith/strategies/week.py
def format_partition_name(self, table_name: str, period: Period) -> str:
    """Format partition name: ``table__YYYY_wWW``."""
    if period.week is None:
        msg = "Week is required for WeekPeriodCalculator"
        raise ValueError(msg)
    return f"{table_name}__{period.year:04d}_w{period.week:02d}"

get_boundaries(period)

Get ISO-week boundaries (Monday to Monday) as ISO date strings.

Source code in pg_partsmith/strategies/week.py
def get_boundaries(self, period: Period) -> tuple[str, str]:
    """Get ISO-week boundaries (Monday to Monday) as ISO date strings."""
    if period.week is None:
        msg = "Week is required for WeekPeriodCalculator"
        raise ValueError(msg)

    date_str = f"{period.year:04d}-W{period.week:02d}-1"
    start_date = datetime.strptime(date_str, "%G-W%V-%u").replace(tzinfo=UTC)
    end_date = start_date + timedelta(weeks=1)

    return (start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))

next_periods(count)

Generate N periods starting from the current period (inclusive).

Source code in pg_partsmith/strategies/base.py
def next_periods(self, count: int) -> list[Period]:
    """Generate N periods starting from the current period (inclusive)."""
    if count <= 0:
        msg = "Count must be positive"
        raise ValueError(msg)

    current = self.current_period()
    return [self.period_after(current, i) for i in range(count)]

parse_partition_name(partition_name)

Parse period from a partition name.

Returns None if the name does not match _NAME_PATTERN or encodes an invalid calendar value (e.g. month 13).

Source code in pg_partsmith/strategies/base.py
def parse_partition_name(self, partition_name: str) -> Period | None:
    """Parse period from a partition name.

    Returns ``None`` if the name does not match ``_NAME_PATTERN`` or encodes
    an invalid calendar value (e.g. month 13).
    """
    match = self._NAME_PATTERN.match(partition_name)
    if not match:
        return None
    try:
        return self._period_from_match(match)
    except ValueError:
        return None

period_after(reference, offset)

Return the period offset steps after reference.

Source code in pg_partsmith/strategies/base.py
def period_after(self, reference: Period, offset: int) -> Period:
    """Return the period ``offset`` steps after ``reference``."""
    if offset < 0:
        msg = "Offset must be non-negative"
        raise ValueError(msg)

    return reference + offset

period_before(reference, offset)

Return the period offset steps before reference.

Source code in pg_partsmith/strategies/base.py
def period_before(self, reference: Period, offset: int) -> Period:
    """Return the period ``offset`` steps before ``reference``."""
    if offset < 0:
        msg = "Offset must be non-negative"
        raise ValueError(msg)

    return reference - offset

Bases: BasePeriodCalculator

Calculator for monthly partitions.

Generates partitions with month granularity. Partition naming: {table}__{YYYY}_{MM}

Source code in pg_partsmith/strategies/month.py
class MonthPeriodCalculator(BasePeriodCalculator):
    """Calculator for monthly partitions.

    Generates partitions with month granularity.
    Partition naming: ``{table}__{YYYY}_{MM}``
    """

    _NAME_PATTERN: ClassVar[re.Pattern[str]] = re.compile(r"^(.+)__(\d{4})_(\d{2})$")

    def current_period(self) -> Period:
        """Get current month period."""
        now = utc_now()
        return Period(year=now.year, month=now.month)

    def format_partition_name(self, table_name: str, period: Period) -> str:
        """Format partition name: ``table__YYYY_MM``."""
        if period.month is None:
            msg = "Month is required for MonthPeriodCalculator"
            raise ValueError(msg)
        return f"{table_name}__{period.year:04d}_{period.month:02d}"

    def _period_from_match(self, match: re.Match[str]) -> Period:
        return Period(year=int(match.group(2)), month=int(match.group(3)))

    def get_boundaries(self, period: Period) -> tuple[str, str]:
        """Get month boundaries as ``(start_date, end_date)`` in ISO format."""
        if period.month is None:
            msg = "Month is required for MonthPeriodCalculator"
            raise ValueError(msg)

        start_date = datetime(period.year, period.month, 1, tzinfo=UTC)
        end_date = start_date + relativedelta(months=1)

        return (start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))

current_period()

Get current month period.

Source code in pg_partsmith/strategies/month.py
def current_period(self) -> Period:
    """Get current month period."""
    now = utc_now()
    return Period(year=now.year, month=now.month)

format_partition_name(table_name, period)

Format partition name: table__YYYY_MM.

Source code in pg_partsmith/strategies/month.py
def format_partition_name(self, table_name: str, period: Period) -> str:
    """Format partition name: ``table__YYYY_MM``."""
    if period.month is None:
        msg = "Month is required for MonthPeriodCalculator"
        raise ValueError(msg)
    return f"{table_name}__{period.year:04d}_{period.month:02d}"

get_boundaries(period)

Get month boundaries as (start_date, end_date) in ISO format.

Source code in pg_partsmith/strategies/month.py
def get_boundaries(self, period: Period) -> tuple[str, str]:
    """Get month boundaries as ``(start_date, end_date)`` in ISO format."""
    if period.month is None:
        msg = "Month is required for MonthPeriodCalculator"
        raise ValueError(msg)

    start_date = datetime(period.year, period.month, 1, tzinfo=UTC)
    end_date = start_date + relativedelta(months=1)

    return (start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))

next_periods(count)

Generate N periods starting from the current period (inclusive).

Source code in pg_partsmith/strategies/base.py
def next_periods(self, count: int) -> list[Period]:
    """Generate N periods starting from the current period (inclusive)."""
    if count <= 0:
        msg = "Count must be positive"
        raise ValueError(msg)

    current = self.current_period()
    return [self.period_after(current, i) for i in range(count)]

parse_partition_name(partition_name)

Parse period from a partition name.

Returns None if the name does not match _NAME_PATTERN or encodes an invalid calendar value (e.g. month 13).

Source code in pg_partsmith/strategies/base.py
def parse_partition_name(self, partition_name: str) -> Period | None:
    """Parse period from a partition name.

    Returns ``None`` if the name does not match ``_NAME_PATTERN`` or encodes
    an invalid calendar value (e.g. month 13).
    """
    match = self._NAME_PATTERN.match(partition_name)
    if not match:
        return None
    try:
        return self._period_from_match(match)
    except ValueError:
        return None

period_after(reference, offset)

Return the period offset steps after reference.

Source code in pg_partsmith/strategies/base.py
def period_after(self, reference: Period, offset: int) -> Period:
    """Return the period ``offset`` steps after ``reference``."""
    if offset < 0:
        msg = "Offset must be non-negative"
        raise ValueError(msg)

    return reference + offset

period_before(reference, offset)

Return the period offset steps before reference.

Source code in pg_partsmith/strategies/base.py
def period_before(self, reference: Period, offset: int) -> Period:
    """Return the period ``offset`` steps before ``reference``."""
    if offset < 0:
        msg = "Offset must be non-negative"
        raise ValueError(msg)

    return reference - offset

Bases: BasePeriodCalculator

Calculator for yearly partitions.

Generates partitions with year granularity. Partition naming: {table}__{YYYY}

Source code in pg_partsmith/strategies/year.py
class YearPeriodCalculator(BasePeriodCalculator):
    """Calculator for yearly partitions.

    Generates partitions with year granularity.
    Partition naming: ``{table}__{YYYY}``
    """

    _NAME_PATTERN: ClassVar[re.Pattern[str]] = re.compile(r"^(.+)__(\d{4})$")

    def current_period(self) -> Period:
        """Get current year period."""
        now = utc_now()
        return Period(year=now.year)

    def format_partition_name(self, table_name: str, period: Period) -> str:
        """Format partition name: ``table__YYYY``."""
        return f"{table_name}__{period.year:04d}"

    def _period_from_match(self, match: re.Match[str]) -> Period:
        return Period(year=int(match.group(2)))

    def get_boundaries(self, period: Period) -> tuple[str, str]:
        """Get year boundaries as ``(start_date, end_date)`` in ISO format."""
        start_date = datetime(period.year, 1, 1, tzinfo=UTC)
        end_date = start_date + relativedelta(years=1)

        return (start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))

current_period()

Get current year period.

Source code in pg_partsmith/strategies/year.py
def current_period(self) -> Period:
    """Get current year period."""
    now = utc_now()
    return Period(year=now.year)

format_partition_name(table_name, period)

Format partition name: table__YYYY.

Source code in pg_partsmith/strategies/year.py
def format_partition_name(self, table_name: str, period: Period) -> str:
    """Format partition name: ``table__YYYY``."""
    return f"{table_name}__{period.year:04d}"

get_boundaries(period)

Get year boundaries as (start_date, end_date) in ISO format.

Source code in pg_partsmith/strategies/year.py
def get_boundaries(self, period: Period) -> tuple[str, str]:
    """Get year boundaries as ``(start_date, end_date)`` in ISO format."""
    start_date = datetime(period.year, 1, 1, tzinfo=UTC)
    end_date = start_date + relativedelta(years=1)

    return (start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))

next_periods(count)

Generate N periods starting from the current period (inclusive).

Source code in pg_partsmith/strategies/base.py
def next_periods(self, count: int) -> list[Period]:
    """Generate N periods starting from the current period (inclusive)."""
    if count <= 0:
        msg = "Count must be positive"
        raise ValueError(msg)

    current = self.current_period()
    return [self.period_after(current, i) for i in range(count)]

parse_partition_name(partition_name)

Parse period from a partition name.

Returns None if the name does not match _NAME_PATTERN or encodes an invalid calendar value (e.g. month 13).

Source code in pg_partsmith/strategies/base.py
def parse_partition_name(self, partition_name: str) -> Period | None:
    """Parse period from a partition name.

    Returns ``None`` if the name does not match ``_NAME_PATTERN`` or encodes
    an invalid calendar value (e.g. month 13).
    """
    match = self._NAME_PATTERN.match(partition_name)
    if not match:
        return None
    try:
        return self._period_from_match(match)
    except ValueError:
        return None

period_after(reference, offset)

Return the period offset steps after reference.

Source code in pg_partsmith/strategies/base.py
def period_after(self, reference: Period, offset: int) -> Period:
    """Return the period ``offset`` steps after ``reference``."""
    if offset < 0:
        msg = "Offset must be non-negative"
        raise ValueError(msg)

    return reference + offset

period_before(reference, offset)

Return the period offset steps before reference.

Source code in pg_partsmith/strategies/base.py
def period_before(self, reference: Period, offset: int) -> Period:
    """Return the period ``offset`` steps before ``reference``."""
    if offset < 0:
        msg = "Offset must be non-negative"
        raise ValueError(msg)

    return reference - offset

pg_partsmith.aio

Async implementations: service, maintainer, repositories, lock managers, and hooks.

Service and maintainer

Service for managing the full partition lifecycle.

Orchestrates partition creation, detachment, and deletion by delegating to specialized component services.

Source code in pg_partsmith/aio/service.py
class PartitionLifecycleService:
    """Service for managing the full partition lifecycle.

    Orchestrates partition creation, detachment, and deletion by delegating
    to specialized component services.
    """

    def __init__(
        self,
        repo: PartitionRepository,
        metadata: PartitionMetadataProvider,
        locks: LockManager,
        period_calculator: PeriodCalculator[Period],
        hooks: list[PartitionLifecycleHooks] | None = None,
    ) -> None:
        """Initialize the partition lifecycle service.

        Args:
            repo: DDL operations on partitions (create / attach / detach / drop).
            metadata: Read-only access to PostgreSQL catalog data.
            locks: Distributed lock manager preventing concurrent maintenance runs.
            period_calculator: Strategy for determining partition names and boundaries.
            hooks: Optional list of lifecycle hooks called around each step.
        """
        self._locks = locks
        self._metadata = metadata

        # Component services
        self._validation_service = PartitionValidationService(metadata)
        self._creation_service = PartitionCreationService(repo, metadata, period_calculator, hooks)
        self._pruning_service = PartitionPruningService(metadata, period_calculator)
        self._detachment_service = PartitionDetachmentService(repo, hooks)
        self._deletion_service = PartitionDeletionService(repo, hooks)

    async def create_future_partitions(self, config: TablePartitionConfig) -> list[PartitionInfo]:
        """Create partitions for future periods.

        Ensures partitions exist for the next ``config.create_ahead_count`` periods
        starting from the current period (inclusive). Idempotent: existing partitions
        are skipped.

        Args:
            config: Table partitioning configuration.

        Returns:
            List of newly created partitions (empty if all already existed).

        Raises:
            PartitionAlreadyExistsError: If a partition exists with conflicting boundaries.
            InvalidPartitionConfigError: If ``config`` is incompatible with the parent table.
        """
        return await self._creation_service.create_future_partitions(config)

    async def get_partitions_for_pruning(self, config: TablePartitionConfig) -> list[PartitionInfo]:
        """Return partitions older than ``config.retention_count`` periods.

        Args:
            config: Table partitioning configuration.

        Returns:
            Partitions that are eligible for detach + drop, sorted oldest first.
        """
        return await self._pruning_service.get_partitions_for_pruning(config)

    async def detach_old_partitions(
        self,
        table_name: str,
        partitions: list[PartitionInfo],
    ) -> list[str]:
        """Detach attached partitions from their parent table.

        Args:
            table_name: Qualified parent table name.
            partitions: Attached partitions to detach.

        Returns:
            Names of successfully detached partitions.

        Raises:
            PartitionDetachInProgressError: If a concurrent detach is in progress.
        """
        return await self._detachment_service.detach_old_partitions(table_name, partitions)

    async def drop_detached_partitions(
        self,
        table_name: str,
        partition_names: list[str],
    ) -> int:
        """Drop previously detached, marker-tagged partitions.

        Attached partitions are skipped with a warning (they raise
        ``PartitionAttachedError`` internally). Unmanaged tables are refused
        unless the underlying repository is configured otherwise.

        Args:
            table_name: Qualified parent table name (used for hook context).
            partition_names: Names of partitions to drop.

        Returns:
            Number of partitions actually dropped.
        """
        return await self._deletion_service.drop_detached_partitions(table_name, partition_names)

    async def maintain_lifecycle(
        self,
        config: TablePartitionConfig,
        *,
        skip_create: bool = False,
        skip_detach: bool = False,
        skip_drop: bool = False,
    ) -> MaintenanceResult:
        """Run create + detach + drop in a single locked maintenance window.

        The whole sequence runs under a single distributed lock acquired through
        the configured :class:`LockManager`, so concurrent maintainers do not
        race on the same parent table.

        Args:
            config: Table partitioning configuration.
            skip_create: Skip the create-ahead step.
            skip_detach: Skip detaching old partitions (orphans are still dropped).
            skip_drop: Skip dropping detached partitions.

        Returns:
            ``MaintenanceResult`` with the per-step counters; ``error`` is unset
            because exceptions propagate from this method (the maintainer is
            responsible for catching them).

        Raises:
            LockAcquisitionError: If the table-level maintenance lock is unavailable.
            InvalidPartitionConfigError: If ``config`` does not match the parent table.
        """
        qualified_parent = qualify(config.db_schema, config.table_name)

        created_count = 0
        detached_count = 0
        dropped_count = 0

        async with self._locks.acquire_lock(qualified_parent):
            await self._validation_service.validate_config(config)

            # Optimization: fetch all partitions once
            all_partitions = await self._metadata.list_partitions(qualified_parent)

            if not skip_create:
                created = await self._creation_service.create_future_partitions(
                    config, existing_partitions=all_partitions
                )
                created_count = len(created)
                if created:
                    all_partitions.extend(created)

            partitions_to_prune = await self._pruning_service.identify_partitions_to_prune(config, all_partitions)

            if not partitions_to_prune:
                return MaintenanceResult(created_count=created_count)

            attached_to_detach = [p for p in partitions_to_prune if p.is_attached]
            orphan_names = [p.name for p in partitions_to_prune if not p.is_attached]

            names_to_drop = orphan_names
            if not skip_detach:
                detached_names = await self._detachment_service.detach_old_partitions(
                    qualified_parent,
                    attached_to_detach,
                )
                detached_count = len(detached_names)
                names_to_drop = orphan_names + detached_names

            if not skip_drop and names_to_drop:
                dropped_count = await self._deletion_service.drop_detached_partitions(
                    qualified_parent,
                    names_to_drop,
                )

        return MaintenanceResult(
            created_count=created_count,
            detached_count=detached_count,
            dropped_count=dropped_count,
        )

__init__(repo, metadata, locks, period_calculator, hooks=None)

Initialize the partition lifecycle service.

Parameters:

Name Type Description Default
repo PartitionRepository

DDL operations on partitions (create / attach / detach / drop).

required
metadata PartitionMetadataProvider

Read-only access to PostgreSQL catalog data.

required
locks LockManager

Distributed lock manager preventing concurrent maintenance runs.

required
period_calculator PeriodCalculator[Period]

Strategy for determining partition names and boundaries.

required
hooks list[PartitionLifecycleHooks] | None

Optional list of lifecycle hooks called around each step.

None
Source code in pg_partsmith/aio/service.py
def __init__(
    self,
    repo: PartitionRepository,
    metadata: PartitionMetadataProvider,
    locks: LockManager,
    period_calculator: PeriodCalculator[Period],
    hooks: list[PartitionLifecycleHooks] | None = None,
) -> None:
    """Initialize the partition lifecycle service.

    Args:
        repo: DDL operations on partitions (create / attach / detach / drop).
        metadata: Read-only access to PostgreSQL catalog data.
        locks: Distributed lock manager preventing concurrent maintenance runs.
        period_calculator: Strategy for determining partition names and boundaries.
        hooks: Optional list of lifecycle hooks called around each step.
    """
    self._locks = locks
    self._metadata = metadata

    # Component services
    self._validation_service = PartitionValidationService(metadata)
    self._creation_service = PartitionCreationService(repo, metadata, period_calculator, hooks)
    self._pruning_service = PartitionPruningService(metadata, period_calculator)
    self._detachment_service = PartitionDetachmentService(repo, hooks)
    self._deletion_service = PartitionDeletionService(repo, hooks)

create_future_partitions(config) async

Create partitions for future periods.

Ensures partitions exist for the next config.create_ahead_count periods starting from the current period (inclusive). Idempotent: existing partitions are skipped.

Parameters:

Name Type Description Default
config TablePartitionConfig

Table partitioning configuration.

required

Returns:

Type Description
list[PartitionInfo]

List of newly created partitions (empty if all already existed).

Raises:

Type Description
PartitionAlreadyExistsError

If a partition exists with conflicting boundaries.

InvalidPartitionConfigError

If config is incompatible with the parent table.

Source code in pg_partsmith/aio/service.py
async def create_future_partitions(self, config: TablePartitionConfig) -> list[PartitionInfo]:
    """Ensure partitions exist for the upcoming periods.

    Covers the next ``config.create_ahead_count`` periods, counting from the
    current period inclusive. Safe to re-run: partitions that already exist
    are left untouched.

    Args:
        config: Table partitioning configuration.

    Returns:
        Partitions created by this call; empty when everything already existed.

    Raises:
        PartitionAlreadyExistsError: A partition exists with conflicting boundaries.
        InvalidPartitionConfigError: ``config`` is incompatible with the parent table.
    """
    # Thin facade: the creation sub-service owns the actual work.
    creation = self._creation_service
    return await creation.create_future_partitions(config)

detach_old_partitions(table_name, partitions) async

Detach attached partitions from their parent table.

Parameters:

Name Type Description Default
table_name str

Qualified parent table name.

required
partitions list[PartitionInfo]

Attached partitions to detach.

required

Returns:

Type Description
list[str]

Names of successfully detached partitions.

Raises:

Type Description
PartitionDetachInProgressError

If a concurrent detach is in progress.

Source code in pg_partsmith/aio/service.py
async def detach_old_partitions(
    self,
    table_name: str,
    partitions: list[PartitionInfo],
) -> list[str]:
    """Detach the given attached partitions from ``table_name``.

    Args:
        table_name: Qualified parent table name.
        partitions: Attached partitions to detach.

    Returns:
        Names of the partitions that were successfully detached.

    Raises:
        PartitionDetachInProgressError: If a concurrent detach is in progress.
    """
    # Thin facade: the detachment sub-service owns the actual DDL work.
    detachment = self._detachment_service
    return await detachment.detach_old_partitions(table_name, partitions)

drop_detached_partitions(table_name, partition_names) async

Drop previously detached, marker-tagged partitions.

Attached partitions are skipped with a warning (they raise PartitionAttachedError internally). Unmanaged tables are refused unless the underlying repository is configured otherwise.

Parameters:

Name Type Description Default
table_name str

Qualified parent table name (used for hook context).

required
partition_names list[str]

Names of partitions to drop.

required

Returns:

Type Description
int

Number of partitions actually dropped.

Source code in pg_partsmith/aio/service.py
async def drop_detached_partitions(
    self,
    table_name: str,
    partition_names: list[str],
) -> int:
    """Drop previously detached, marker-tagged partitions.

    Attached partitions are skipped with a warning (they raise
    ``PartitionAttachedError`` internally). Unmanaged tables are refused
    unless the underlying repository is configured otherwise.

    Args:
        table_name: Qualified parent table name (used for hook context).
        partition_names: Names of partitions to drop.

    Returns:
        Number of partitions actually dropped.
    """
    # Thin facade: the deletion sub-service owns the actual drop logic.
    deletion = self._deletion_service
    return await deletion.drop_detached_partitions(table_name, partition_names)

get_partitions_for_pruning(config) async

Return partitions older than config.retention_count periods.

Parameters:

Name Type Description Default
config TablePartitionConfig

Table partitioning configuration.

required

Returns:

Type Description
list[PartitionInfo]

Partitions that are eligible for detach + drop, sorted oldest first.

Source code in pg_partsmith/aio/service.py
async def get_partitions_for_pruning(self, config: TablePartitionConfig) -> list[PartitionInfo]:
    """Return partitions older than ``config.retention_count`` periods.

    Args:
        config: Table partitioning configuration.

    Returns:
        Partitions that are eligible for detach + drop, sorted oldest first.
    """
    # Thin facade: the pruning sub-service computes eligibility.
    pruning = self._pruning_service
    return await pruning.get_partitions_for_pruning(config)

maintain_lifecycle(config, *, skip_create=False, skip_detach=False, skip_drop=False) async

Run create + detach + drop in a single locked maintenance window.

The whole sequence runs under a single distributed lock acquired through the configured LockManager, so concurrent maintainers do not race on the same parent table.

Parameters:

Name Type Description Default
config TablePartitionConfig

Table partitioning configuration.

required
skip_create bool

Skip the create-ahead step.

False
skip_detach bool

Skip detaching old partitions (orphans are still dropped).

False
skip_drop bool

Skip dropping detached partitions.

False

Returns:

Type Description
MaintenanceResult

MaintenanceResult with the per-step counters; error is unset because exceptions propagate from this method (the maintainer is responsible for catching them).

Raises:

Type Description
LockAcquisitionError

If the table-level maintenance lock is unavailable.

InvalidPartitionConfigError

If config does not match the parent table.

Source code in pg_partsmith/aio/service.py
async def maintain_lifecycle(
    self,
    config: TablePartitionConfig,
    *,
    skip_create: bool = False,
    skip_detach: bool = False,
    skip_drop: bool = False,
) -> MaintenanceResult:
    """Run create + detach + drop as one locked maintenance pass.

    Every step executes under a single distributed lock obtained from the
    configured :class:`LockManager`, so two maintainers can never race on
    the same parent table.

    Args:
        config: Table partitioning configuration.
        skip_create: Skip the create-ahead step.
        skip_detach: Skip detaching old partitions (orphans are still dropped).
        skip_drop: Skip dropping detached partitions.

    Returns:
        ``MaintenanceResult`` with the per-step counters; ``error`` is unset
        because exceptions propagate from this method (the maintainer is
        responsible for catching them).

    Raises:
        LockAcquisitionError: If the table-level maintenance lock is unavailable.
        InvalidPartitionConfigError: If ``config`` does not match the parent table.
    """
    parent = qualify(config.db_schema, config.table_name)
    n_created = n_detached = n_dropped = 0

    async with self._locks.acquire_lock(parent):
        await self._validation_service.validate_config(config)

        # Single catalog scan shared by the create and prune phases.
        partitions = await self._metadata.list_partitions(parent)

        if not skip_create:
            fresh = await self._creation_service.create_future_partitions(
                config, existing_partitions=partitions
            )
            n_created = len(fresh)
            if fresh:
                partitions.extend(fresh)

        prune_candidates = await self._pruning_service.identify_partitions_to_prune(config, partitions)

        if not prune_candidates:
            # Nothing past retention: the detach/drop phases have no work.
            return MaintenanceResult(created_count=n_created)

        still_attached = [p for p in prune_candidates if p.is_attached]
        orphans = [p.name for p in prune_candidates if not p.is_attached]

        # Orphans (already detached) are always drop candidates; freshly
        # detached partitions join them unless detaching is skipped.
        drop_targets = orphans
        if not skip_detach:
            just_detached = await self._detachment_service.detach_old_partitions(
                parent,
                still_attached,
            )
            n_detached = len(just_detached)
            drop_targets = orphans + just_detached

        if not skip_drop and drop_targets:
            n_dropped = await self._deletion_service.drop_detached_partitions(
                parent,
                drop_targets,
            )

    return MaintenanceResult(
        created_count=n_created,
        detached_count=n_detached,
        dropped_count=n_dropped,
    )

Orchestrator for partition lifecycle maintenance.

Wraps a lifecycle service with timing, logging, and error handling. Operational failures are logged and re-raised by run_maintenance. Use run_maintenance_safe (or the maintain_partitions helper) when you need a scheduler-friendly API that always returns MaintenanceResult.

Source code in pg_partsmith/aio/maintainer.py
class PartitionMaintainer:
    """Orchestrator for partition lifecycle maintenance.

    Wraps a lifecycle service with timing, logging, and error handling.
    Operational failures are logged and re-raised by ``run_maintenance``.
    Use ``run_maintenance_safe`` (or the ``maintain_partitions`` helper) when
    you need a scheduler-friendly API that always returns ``MaintenanceResult``.
    """

    def __init__(
        self,
        lifecycle_service: PartitionLifecycle,
    ) -> None:
        """Initialize maintainer.

        Args:
            lifecycle_service: Partition lifecycle service.
        """
        self._service = lifecycle_service

    @staticmethod
    def _elapsed_ms(start_time: float) -> int:
        """Return whole milliseconds elapsed since ``start_time`` (a ``perf_counter`` value)."""
        return int((time.perf_counter() - start_time) * 1000)

    @staticmethod
    def _format_error(exc: BaseException) -> str:
        """Format an exception as ``"Type: message"``, or just ``"Type"`` when the message is empty."""
        msg = str(exc)
        return f"{type(exc).__name__}: {msg}" if msg else type(exc).__name__

    async def run_maintenance(
        self,
        config: TablePartitionConfig,
        *,
        skip_create: bool = False,
        skip_detach: bool = False,
        skip_drop: bool = False,
    ) -> MaintenanceResult:
        """Execute full partition lifecycle maintenance.

        Args:
            config: Table partition configuration.
            skip_create: Skip creating future partitions.
            skip_detach: Skip detaching old partitions.
            skip_drop: Skip dropping detached partitions.

        Returns:
            Maintenance result with counts and duration.

        Raises:
            asyncio.CancelledError: Propagated after being logged.
            Exception: Propagated after being logged.
        """
        start_time = time.perf_counter()
        qualified_table = qualify(config.db_schema, config.table_name)

        logger.info(
            "Starting partition maintenance",
            extra={
                "table_name": qualified_table,
                "skip_create": skip_create,
                "skip_detach": skip_detach,
                "skip_drop": skip_drop,
            },
        )

        try:
            result = await self._service.maintain_lifecycle(
                config, skip_create=skip_create, skip_detach=skip_detach, skip_drop=skip_drop
            )

            duration_ms = self._elapsed_ms(start_time)
            result = result.model_copy(update={"duration_ms": duration_ms})

            logger.info(
                "Partition maintenance completed successfully",
                extra={
                    "table_name": qualified_table,
                    "created_count": result.created_count,
                    "detached_count": result.detached_count,
                    "dropped_count": result.dropped_count,
                    "duration_ms": duration_ms,
                    "duration": format_duration_ms(duration_ms),
                },
            )
        except (asyncio.CancelledError, KeyboardInterrupt, SystemExit):
            logger.info(
                "Partition maintenance was interrupted by system signal",
                extra={
                    "table_name": qualified_table,
                    "duration_ms": self._elapsed_ms(start_time),
                },
            )
            raise
        except (RuntimeError, ValueError) as e:
            logger.warning(
                "Partition maintenance failed (recoverable error)",
                extra={
                    "table_name": qualified_table,
                    "duration_ms": self._elapsed_ms(start_time),
                    "error": str(e),
                    "error_type": type(e).__name__,
                },
            )
            raise
        except Exception:
            logger.exception(
                "Partition maintenance raised unexpected exception",
                extra={
                    "table_name": qualified_table,
                    "duration_ms": self._elapsed_ms(start_time),
                },
            )
            raise
        else:
            return result

    async def run_maintenance_safe(
        self,
        config: TablePartitionConfig,
        *,
        skip_create: bool = False,
        skip_detach: bool = False,
        skip_drop: bool = False,
    ) -> MaintenanceResult:
        """Run maintenance and always return ``MaintenanceResult``, never raise.

        Scheduler-friendly wrapper around :meth:`run_maintenance`. Any exception
        — including ``asyncio.CancelledError`` — is captured and reported via
        ``result.error``; the ``duration_ms`` field always reflects the elapsed
        time even on failure.

        Args:
            config: Table partitioning configuration.
            skip_create: Skip creating future partitions.
            skip_detach: Skip detaching old partitions.
            skip_drop: Skip dropping detached partitions.

        Returns:
            ``MaintenanceResult`` with counts on success or ``error`` set on failure.
        """
        start_time = time.perf_counter()
        try:
            return await self.run_maintenance(
                config,
                skip_create=skip_create,
                skip_detach=skip_detach,
                skip_drop=skip_drop,
            )
        # Deliberately NOT `except BaseException`: that would also swallow
        # e.g. GeneratorExit. The tuple mirrors the exact set handled before.
        except (asyncio.CancelledError, KeyboardInterrupt, SystemExit, Exception) as e:
            return MaintenanceResult(
                duration_ms=self._elapsed_ms(start_time),
                error=self._format_error(e),
            )

run_maintenance(config, *, skip_create=False, skip_detach=False, skip_drop=False) async

Execute full partition lifecycle maintenance.

Parameters:

Name Type Description Default
config TablePartitionConfig

Table partition configuration.

required
skip_create bool

Skip creating future partitions.

False
skip_detach bool

Skip detaching old partitions.

False
skip_drop bool

Skip dropping detached partitions.

False

Returns:

Type Description
MaintenanceResult

Maintenance result with counts and duration.

Raises:

Type Description
CancelledError

Propagated after being logged.

Exception

Propagated after being logged.

Source code in pg_partsmith/aio/maintainer.py
async def run_maintenance(
    self,
    config: TablePartitionConfig,
    *,
    skip_create: bool = False,
    skip_detach: bool = False,
    skip_drop: bool = False,
) -> MaintenanceResult:
    """Execute full partition lifecycle maintenance.

    Args:
        config: Table partition configuration.
        skip_create: Skip creating future partitions.
        skip_detach: Skip detaching old partitions.
        skip_drop: Skip dropping detached partitions.

    Returns:
        Maintenance result with counts and duration.

    Raises:
        asyncio.CancelledError: Propagated after being logged.
        Exception: Propagated after being logged.
    """
    started = time.perf_counter()
    table = qualify(config.db_schema, config.table_name)

    def elapsed_ms() -> int:
        # Whole milliseconds since this run started.
        return int((time.perf_counter() - started) * 1000)

    logger.info(
        "Starting partition maintenance",
        extra={
            "table_name": table,
            "skip_create": skip_create,
            "skip_detach": skip_detach,
            "skip_drop": skip_drop,
        },
    )

    try:
        outcome = await self._service.maintain_lifecycle(
            config, skip_create=skip_create, skip_detach=skip_detach, skip_drop=skip_drop
        )
        duration_ms = elapsed_ms()
        outcome = outcome.model_copy(update={"duration_ms": duration_ms})
        logger.info(
            "Partition maintenance completed successfully",
            extra={
                "table_name": table,
                "created_count": outcome.created_count,
                "detached_count": outcome.detached_count,
                "dropped_count": outcome.dropped_count,
                "duration_ms": duration_ms,
                "duration": format_duration_ms(duration_ms),
            },
        )
    except (asyncio.CancelledError, KeyboardInterrupt, SystemExit):
        # Shutdown signals: record and let them propagate.
        logger.info(
            "Partition maintenance was interrupted by system signal",
            extra={"table_name": table, "duration_ms": elapsed_ms()},
        )
        raise
    except (RuntimeError, ValueError) as exc:
        # Expected operational failures: warn (no traceback) and re-raise.
        logger.warning(
            "Partition maintenance failed (recoverable error)",
            extra={
                "table_name": table,
                "duration_ms": elapsed_ms(),
                "error": str(exc),
                "error_type": type(exc).__name__,
            },
        )
        raise
    except Exception:
        # Anything else is unexpected: log with traceback and re-raise.
        logger.exception(
            "Partition maintenance raised unexpected exception",
            extra={"table_name": table, "duration_ms": elapsed_ms()},
        )
        raise
    else:
        return outcome

run_maintenance_safe(config, *, skip_create=False, skip_detach=False, skip_drop=False) async

Run maintenance and always return MaintenanceResult, never raise.

Scheduler-friendly wrapper around run_maintenance. Any exception — including asyncio.CancelledError — is captured and reported via result.error; the duration_ms field always reflects the elapsed time even on failure.

Parameters:

Name Type Description Default
config TablePartitionConfig

Table partitioning configuration.

required
skip_create bool

Skip creating future partitions.

False
skip_detach bool

Skip detaching old partitions.

False
skip_drop bool

Skip dropping detached partitions.

False

Returns:

Type Description
MaintenanceResult

MaintenanceResult with counts on success or error set on failure.

Source code in pg_partsmith/aio/maintainer.py
async def run_maintenance_safe(
    self,
    config: TablePartitionConfig,
    *,
    skip_create: bool = False,
    skip_detach: bool = False,
    skip_drop: bool = False,
) -> MaintenanceResult:
    """Run maintenance and always return ``MaintenanceResult``, never raise.

    Scheduler-friendly wrapper around :meth:`run_maintenance`. Any exception
    — including ``asyncio.CancelledError`` — is captured and reported via
    ``result.error``; the ``duration_ms`` field always reflects the elapsed
    time even on failure.

    Args:
        config: Table partitioning configuration.
        skip_create: Skip creating future partitions.
        skip_detach: Skip detaching old partitions.
        skip_drop: Skip dropping detached partitions.

    Returns:
        ``MaintenanceResult`` with counts on success or ``error`` set on failure.
    """
    start_time = time.perf_counter()
    try:
        return await self.run_maintenance(
            config,
            skip_create=skip_create,
            skip_detach=skip_detach,
            skip_drop=skip_drop,
        )
    # The original two handlers had byte-identical bodies; a single tuple keeps
    # the exact same caught set. Deliberately NOT `except BaseException`, which
    # would additionally swallow e.g. GeneratorExit.
    except (asyncio.CancelledError, KeyboardInterrupt, SystemExit, Exception) as e:
        duration_ms = int((time.perf_counter() - start_time) * 1000)
        msg = str(e)
        error = f"{type(e).__name__}: {msg}" if msg else type(e).__name__
        return MaintenanceResult(duration_ms=duration_ms, error=error)

PostgreSQL implementations

PostgreSQL implementation of partition repository.

Facade that delegates to specialized helper classes for improved maintenance and SRP.

Source code in pg_partsmith/aio/repositories/repository.py
class PostgresPartitionRepository:
    """PostgreSQL implementation of partition repository.

    Facade that delegates to specialized helper classes for improved
    maintenance and SRP.
    """

    def __init__(
        self,
        engine: AsyncEngine,
        *,
        ddl_timezone: str | None = DEFAULT_DDL_TIMEZONE,
        ddl_timeout_seconds: float = DEFAULT_DDL_TIMEOUT_SECONDS,
        marker_prefix: str | None = None,
        drop_allow_unmanaged: bool = False,
        drop_lock_timeout_ms: int = DEFAULT_DROP_LOCK_TIMEOUT_MS,
        drop_max_retries: int = DEFAULT_DROP_MAX_RETRIES,
        drop_retry_delay: float = DEFAULT_DROP_RETRY_DELAY,
        drop_max_backoff: float = DEFAULT_DROP_MAX_BACKOFF,
    ) -> None:
        """Validate configuration and wire up the DDL helper objects."""
        self._engine = engine
        self._marker_prefix = orphan_comment_prefix(marker_prefix=marker_prefix)

        # Validate configuration eagerly so bad values fail at construction
        # time (validation order kept stable on purpose: the first offending
        # argument is the one reported).
        self._ddl_timeout_seconds = validate_ddl_timeout(ddl_timeout_seconds)
        self._ddl_timezone = validate_timezone(ddl_timezone)
        self._drop_allow_unmanaged = bool(drop_allow_unmanaged)
        self._drop_lock_timeout_ms = validate_int(drop_lock_timeout_ms, "drop_lock_timeout_ms", min_val=0)
        self._drop_max_retries = validate_int(drop_max_retries, "drop_max_retries", min_val=1)
        self._drop_retry_delay = validate_float(drop_retry_delay, "drop_retry_delay", min_val=0.0)
        self._drop_max_backoff = validate_float(drop_max_backoff, "drop_max_backoff", min_val=0.0)

        # Helper objects; the remover composes the resolver and FK manager.
        resolver = PartitionRelationResolver(engine)
        fk_manager = PartitionForeignKeyManager(engine, self._ddl_timeout_seconds)
        self._resolver = resolver
        self._fk_manager = fk_manager
        self._creator = PartitionCreator(engine, self._ddl_timeout_seconds, self._ddl_timezone)
        self._remover = PartitionRemover(
            engine=engine,
            ddl_timeout=self._ddl_timeout_seconds,
            drop_lock_timeout_ms=self._drop_lock_timeout_ms,
            drop_max_retries=self._drop_max_retries,
            drop_retry_delay=self._drop_retry_delay,
            drop_max_backoff=self._drop_max_backoff,
            marker_prefix=self._marker_prefix,
            resolver=resolver,
            fk_manager=fk_manager,
            allow_unmanaged=self._drop_allow_unmanaged,
        )

    # -- Public API: each method forwards to the matching helper object. -----

    async def create_partition(
        self, config: TablePartitionConfig, partition_name: str, from_value: str, to_value: str
    ) -> PartitionInfo:
        """Create ``partition_name`` with the given bounds via the creator helper."""
        return await self._creator.create(config, partition_name, from_value, to_value)

    async def attach_partition(self, table_name: str, partition_name: str, from_value: str, to_value: str) -> None:
        """Attach an existing table as a partition of ``table_name`` via the creator helper."""
        await self._creator.attach(table_name, partition_name, from_value, to_value)

    async def detach_partition(self, table_name: str, partition_name: str, *, concurrent: bool = True) -> None:
        """Detach ``partition_name`` from ``table_name``; ``concurrent`` is forwarded to the remover."""
        await self._remover.detach(table_name, partition_name, concurrent=concurrent)

    async def drop_partition(self, partition_name: str) -> None:
        """Drop ``partition_name`` via the remover helper."""
        await self._remover.drop(partition_name)

    async def partition_exists(self, partition_name: str) -> bool:
        """Return True when ``partition_name`` exists, per the resolver helper."""
        return await self._resolver.exists(partition_name)

    async def is_partition_attached(self, table_name: str, partition_name: str) -> bool:
        """Return True when ``partition_name`` is attached to ``table_name``, per the resolver."""
        return await self._resolver.is_attached(table_name, partition_name)

    async def reconcile_default_rows(
        self,
        *,
        default_partition_name: str,
        target_partition_name: str,
        partition_column: str,
        from_value: str,
        to_value: str,
    ) -> int:
        """Reconcile default-partition rows against the target's bounds.

        Delegates to the creator helper; the returned int is its row count
        (presumably rows moved — confirm in ``PartitionCreator``).
        """
        return await self._creator.reconcile_default_rows(
            default_partition_name=default_partition_name,
            target_partition_name=target_partition_name,
            partition_column=partition_column,
            from_value=from_value,
            to_value=to_value,
        )

Provider for PostgreSQL partition metadata.

Queries pg_catalog to retrieve information about table partitioning. Override any method to customise catalog queries for your schema setup.

Each method opens its own read-only connection from the engine pool so it is safe to call outside any existing transaction.

Source code in pg_partsmith/aio/metadata.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
class PostgresMetadataProvider:
    """Provider for PostgreSQL partition metadata.

    Queries pg_catalog to retrieve information about table partitioning.
    Override any method to customise catalog queries for your schema setup.

    Each method opens its own read-only connection from the engine pool so it
    is safe to call outside any existing transaction.
    """

    def __init__(self, engine: AsyncEngine, *, marker_prefix: str | None = None) -> None:
        """Initialize provider.

        Args:
            engine: SQLAlchemy async engine.
            marker_prefix: Optional COMMENT marker prefix for orphaned partitions.
                When None, the library default prefix is used. Pass the same
                value to both repository and metadata provider if you override it.
        """
        self._engine = engine
        # Resolve the prefix once up front; ``list_partitions`` uses it to
        # rebuild the exact COMMENT marker that detach writes on orphans.
        self._marker_prefix = orphan_comment_prefix(marker_prefix=marker_prefix)

    @staticmethod
    def _maybe_decode(val: str | bytes | None, encoding: str = "utf-8") -> str | None:
        """Decode bytes to string if needed.

        Some DBAPI drivers hand catalog values back as ``bytes``; this
        normalises them to ``str``, passing ``str`` and ``None`` through
        unchanged. ``errors="replace"`` keeps a bad byte sequence from
        raising while reading metadata.
        """
        if val is None:
            return None
        if isinstance(val, bytes):
            return val.decode(encoding, errors="replace")
        return val

    async def get_partition_type(self, table_name: str) -> PartitionType | None:
        """Get partition type for a table.

        Returns:
            The mapped PartitionType, or None when the table does not exist
            or is not partitioned (to_regclass yields NULL → no row).
        """
        async with self._engine.connect() as conn:
            result = await conn.execute(
                text(
                    """
                    SELECT partstrat
                    FROM pg_partitioned_table t
                    WHERE t.partrelid = to_regclass(:table_name)
                    """
                ),
                {"table_name": to_regclass_argument(table_name)},
            )
            # partstrat is a single catalog code character, hence ASCII decode.
            strat = self._maybe_decode(result.scalar(), encoding="ascii")

        # Map the catalog strategy code to the library enum; anything
        # unrecognised falls through to None.
        if strat == "r":
            return PartitionType.RANGE
        if strat == "l":
            return PartitionType.LIST
        if strat == "h":
            return PartitionType.HASH
        return None

    async def get_partition_column(self, table_name: str) -> str | None:
        """Get partition column for a table.

        Returns:
            The single partition key column name, or None when the table is
            missing or not partitioned.

        Raises:
            ValueError: If the table uses a composite (multi-column) partition
                key.  Only single-column keys are supported by this library.
        """
        async with self._engine.connect() as conn:
            result = await conn.execute(
                text(
                    """
                    SELECT a.attname
                    FROM pg_partitioned_table t
                    JOIN pg_attribute a ON a.attrelid = t.partrelid AND a.attnum = ANY(t.partattrs)
                    WHERE t.partrelid = to_regclass(:table_name)
                    ORDER BY a.attnum
                    """
                ),
                {"table_name": to_regclass_argument(table_name)},
            )
            rows = result.fetchall()

        if not rows:
            return None

        # One row per key column: more than one row means a composite key,
        # which this library explicitly rejects.
        if len(rows) > 1:
            cols = [r[0] for r in rows]
            msg = (
                f"Table {table_name!r} uses a composite partition key {cols!r}. "
                "Only single-column partition keys are supported."
            )
            raise ValueError(msg)

        return self._maybe_decode(rows[0][0])

    async def list_partitions(self, table_name: str) -> list[PartitionInfo]:
        """List all partitions for a table, including orphaned detached ones.

        Orphaned partitions are detached-but-not-dropped tables previously
        detached by this library. They are detected by a COMMENT marker set on
        successful detach and returned with ``is_attached=False`` and ``None``
        boundaries.
        """
        parent_schema, _ = split_qualified_name(table_name)
        # Echo the caller's naming style: only schema-qualify returned
        # partition names when the caller passed a qualified parent name.
        want_qualified = parent_schema is not None

        async with self._engine.connect() as conn:
            # 1. Get partition type and canonical parent name in one query
            parent_info_result = await conn.execute(
                text(
                    """
                    SELECT
                        pt.partstrat,
                        ns.nspname || '.' || c.relname AS qualified_name
                    FROM pg_class c
                    JOIN pg_namespace ns ON c.relnamespace = ns.oid
                    LEFT JOIN pg_partitioned_table pt ON pt.partrelid = c.oid
                    WHERE c.oid = to_regclass(:name)
                    """
                ),
                {"name": to_regclass_argument(table_name)},
            )
            parent_row = parent_info_result.fetchone()
            if not parent_row:
                # Table does not exist (to_regclass returned NULL).
                return []

            strat = self._maybe_decode(parent_row[0], encoding="ascii")
            # Canonical schema-qualified parent name; falls back to the input
            # name if the catalog value is missing.
            parent_qualified = self._maybe_decode(parent_row[1]) or table_name

            partition_type: PartitionType | None = None
            if strat == "r":
                partition_type = PartitionType.RANGE
            elif strat == "l":
                partition_type = PartitionType.LIST
            elif strat == "h":
                partition_type = PartitionType.HASH

            if not partition_type:
                # Table exists but is not partitioned (LEFT JOIN gave NULL).
                return []

            # 2. Get attached partitions
            attached_result = await conn.execute(
                text(
                    """
                    SELECT
                        ns.nspname AS partition_schema,
                        child.relname AS partition_name,
                        pg_get_expr(child.relpartbound, child.oid) AS boundaries,
                        child.relispartition AS is_attached
                    FROM pg_inherits inh
                    JOIN pg_class child ON inh.inhrelid = child.oid
                    JOIN pg_namespace ns ON child.relnamespace = ns.oid
                    WHERE inh.inhparent = to_regclass(:table_name)
                    ORDER BY ns.nspname, child.relname
                    """
                ),
                {"table_name": to_regclass_argument(table_name)},
            )
            attached_rows = attached_result.fetchall()

            # 3. Get orphan partitions
            # Match tables whose table-level COMMENT's first line equals the
            # marker written on detach, and that have no pg_inherits parent.
            orphan_result = await conn.execute(
                text(
                    """
                    SELECT
                        ns.nspname AS partition_schema,
                        c.relname AS partition_name
                    FROM pg_class c
                    JOIN pg_namespace ns ON c.relnamespace = ns.oid
                    JOIN pg_description d
                      ON d.objoid = c.oid
                     AND d.classoid = 'pg_class'::regclass
                     AND d.objsubid = 0
                    WHERE c.relkind = 'r'
                      AND c.relispartition = false
                      AND split_part(d.description, E'\\n', 1) = :marker
                       AND NOT EXISTS (
                           SELECT 1
                           FROM pg_inherits inh
                           WHERE inh.inhrelid = c.oid
                       )
                    ORDER BY ns.nspname, c.relname
                    """
                ),
                {
                    "marker": orphan_table_comment(parent_qualified, marker_prefix=self._marker_prefix),
                },
            )
            orphan_rows = orphan_result.fetchall()

        partitions: list[PartitionInfo] = []

        for row in attached_rows:
            # Inline decode (rather than _maybe_decode) keeps the str|bytes
            # narrowing visible to the type checker for each column.
            relname: str | bytes = row.partition_name
            if isinstance(relname, bytes):
                relname = relname.decode("utf-8", errors="replace")

            part_schema: str | bytes = row.partition_schema
            if isinstance(part_schema, bytes):
                part_schema = part_schema.decode("utf-8", errors="replace")

            name = qualify(part_schema if want_qualified else None, relname)

            boundaries = row.boundaries
            if isinstance(boundaries, bytes):
                boundaries = boundaries.decode("utf-8", errors="replace")

            boundaries_str = str(boundaries) if boundaries is not None else ""
            # DEFAULT partitions have no range: report None boundaries.
            is_default = boundaries_str.strip().upper() == "DEFAULT"
            from_val, to_val = (None, None) if is_default else self._parse_boundaries(boundaries_str)
            partitions.append(
                PartitionInfo(
                    name=name,
                    partition_type=partition_type,
                    from_value=from_val,
                    to_value=to_val,
                    boundaries_expr=boundaries_str if boundaries_str else None,
                    is_attached=row.is_attached,
                    is_default=is_default,
                    parent_table=table_name,
                )
            )

        for row in orphan_rows:
            orphan_relname: str | bytes = row.partition_name
            if isinstance(orphan_relname, bytes):
                orphan_relname = orphan_relname.decode("utf-8", errors="replace")

            orphan_schema: str | bytes = row.partition_schema
            if isinstance(orphan_schema, bytes):
                orphan_schema = orphan_schema.decode("utf-8", errors="replace")

            name = qualify(orphan_schema if want_qualified else None, orphan_relname)
            # Orphans carry no boundary info: detached tables have no
            # relpartbound, so boundaries are reported as None.
            partitions.append(
                PartitionInfo(
                    name=name,
                    partition_type=partition_type,
                    from_value=None,
                    to_value=None,
                    is_attached=False,
                    parent_table=table_name,
                )
            )

        return partitions

    async def partition_exists(self, partition_name: str) -> bool:
        """Check if a partition table exists in pg_class.

        Only relkind 'r' is matched, so the check is for plain tables
        specifically, not views or other relation kinds.

        Args:
            partition_name: Partition table name.

        Returns:
            True if the table exists as a regular table.
        """
        async with self._engine.connect() as conn:
            result = await conn.execute(
                text(
                    """
                    SELECT EXISTS (
                        SELECT 1
                        FROM pg_class
                        WHERE oid = to_regclass(:partition_name)
                          AND relkind = 'r'
                    )
                    """
                ),
                {"partition_name": to_regclass_argument(partition_name)},
            )
            return bool(result.scalar())

    async def is_partition_attached(self, table_name: str, partition_name: str) -> bool:
        """Check if a partition is currently attached to its parent via pg_inherits.

        Requires both the pg_inherits link to the given parent and
        ``relispartition = true`` on the child.

        Args:
            table_name: Parent table name.
            partition_name: Partition table name.

        Returns:
            True if the partition is attached.
        """
        async with self._engine.connect() as conn:
            result = await conn.execute(
                text(
                    """
                    SELECT EXISTS (
                        SELECT 1
                        FROM pg_inherits inh
                        JOIN pg_class child ON inh.inhrelid = child.oid
                        WHERE inh.inhparent = to_regclass(:table_name)
                          AND inh.inhrelid = to_regclass(:partition_name)
                          AND child.relispartition = true
                    )
                    """
                ),
                {
                    "table_name": to_regclass_argument(table_name),
                    "partition_name": to_regclass_argument(partition_name),
                },
            )
            return bool(result.scalar())

    async def get_partition_boundaries(self, partition_name: str) -> tuple[str, str] | None:
        """Get partition boundaries.

        Args:
            partition_name: Partition table name.

        Returns:
            Tuple of (from_value, to_value) or None if not a range partition.
            DEFAULT partitions and unparseable bound expressions also yield
            None, since parsing produces a None side.
        """
        async with self._engine.connect() as conn:
            result = await conn.execute(
                text(
                    """
                    SELECT pg_get_expr(relpartbound, oid)
                    FROM pg_class
                    WHERE oid = to_regclass(:partition_name)
                    """
                ),
                {"partition_name": to_regclass_argument(partition_name)},
            )
            boundaries_expr = result.scalar()

        if not boundaries_expr:
            # Missing table, or a table with no partition bound.
            return None

        if isinstance(boundaries_expr, bytes):
            boundaries_expr = boundaries_expr.decode("utf-8", errors="replace")

        from_val, to_val = self._parse_boundaries(boundaries_expr)
        # Only return a tuple when BOTH sides parsed; a half-parsed bound is
        # treated the same as "not a range partition".
        if from_val is not None and to_val is not None:
            return from_val, to_val

        return None

    def _parse_boundaries(self, boundaries_expr: str | None) -> tuple[str | None, str | None]:
        """Parse boundary values from pg_get_expr output.

        ``pg_get_expr(relpartbound, oid)`` can include casts and varying
        whitespace depending on PostgreSQL version and the partition key type.
        This parser aims to extract stable "boundary values" for the most common
        cases, without trying to fully parse SQL expressions.

        Examples:
          FOR VALUES FROM ('2024-01-01') TO ('2024-02-01')
          FOR VALUES FROM ('2024-01-01'::date) TO ('2024-02-01'::date)
          FOR VALUES FROM (1::bigint) TO (5::bigint)
          FOR VALUES FROM (MINVALUE) TO (MAXVALUE)

        Returns:
            ``(from_value, to_value)``; either or both sides are None when
            the expression is empty, DEFAULT, or doesn't split cleanly.
        """
        if not boundaries_expr:
            return None, None

        if boundaries_expr.strip().upper() == "DEFAULT":
            return None, None

        # Split into FROM and TO parts by finding ") TO (" which is the most
        # reliable separator for range boundaries.
        parts = _BOUNDARY_SEP_PATTERN.split(boundaries_expr)
        if len(parts) != 2:
            # Not a two-sided range expression: give up rather than guess.
            return None, None

        from_part = parts[0]
        to_part = parts[1]

        # Strip "FOR VALUES FROM (" prefix from the first part
        from_part = _FROM_PREFIX_PATTERN.sub("", from_part)
        # Strip trailing ")" from the second part
        to_part = _TRAILING_PAREN_PATTERN.sub("", to_part)

        def _strip_outer_parens(s: str) -> str:
            s = s.strip()
            # pg_get_expr sometimes wraps constants in extra parentheses
            while s.startswith("(") and s.endswith(")") and len(s) > 1:
                s = s[1:-1].strip()
            return s

        def _normalize(expr: str) -> str:
            expr = _strip_outer_parens(expr)

            # CAST(x AS type) → x
            cast_match = _CAST_PATTERN.match(expr)
            if cast_match:
                expr = _strip_outer_parens(cast_match.group("inner"))

            # Prefer extracting a string literal if present.
            # Doubled single quotes are SQL escapes; un-escape them.
            str_match = _STR_LITERAL_PATTERN.search(expr)
            if str_match:
                return str_match.group("s").replace("''", "'")

            # Strip ::type casts.
            if "::" in expr:
                expr = expr.split("::", 1)[0].strip()

            return expr.strip()

        return _normalize(from_part), _normalize(to_part)

    async def get_default_partition(self, table_name: str) -> PartitionInfo | None:
        """Get DEFAULT partition for a table if it exists and is attached.

        Implemented on top of :meth:`list_partitions`, so it inherits that
        method's handling of missing/unpartitioned tables (empty list → None).

        Args:
            table_name: Parent table name.

        Returns:
            PartitionInfo with is_default=True, or None if no default partition exists.
        """
        all_partitions = await self.list_partitions(table_name)
        defaults = [p for p in all_partitions if p.is_default and p.is_attached]
        return defaults[0] if defaults else None

__init__(engine, *, marker_prefix=None)

Initialize provider.

Parameters:

Name Type Description Default
engine AsyncEngine

SQLAlchemy async engine.

required
marker_prefix str | None

Optional COMMENT marker prefix for orphaned partitions. When None, the library default prefix is used. Pass the same value to both repository and metadata provider if you override it.

None
Source code in pg_partsmith/aio/metadata.py
def __init__(self, engine: AsyncEngine, *, marker_prefix: str | None = None) -> None:
    """Initialize provider.

    Args:
        engine: SQLAlchemy async engine.
        marker_prefix: Optional COMMENT marker prefix for orphaned partitions.
            When None, the library default prefix is used. Pass the same
            value to both repository and metadata provider if you override it.
    """
    self._engine = engine
    self._marker_prefix = orphan_comment_prefix(marker_prefix=marker_prefix)

get_default_partition(table_name) async

Get DEFAULT partition for a table if it exists and is attached.

Parameters:

Name Type Description Default
table_name str

Parent table name.

required

Returns:

Type Description
PartitionInfo | None

PartitionInfo with is_default=True, or None if no default partition exists.

Source code in pg_partsmith/aio/metadata.py
async def get_default_partition(self, table_name: str) -> PartitionInfo | None:
    """Get DEFAULT partition for a table if it exists and is attached.

    Args:
        table_name: Parent table name.

    Returns:
        PartitionInfo with is_default=True, or None if no default partition exists.
    """
    all_partitions = await self.list_partitions(table_name)
    defaults = [p for p in all_partitions if p.is_default and p.is_attached]
    return defaults[0] if defaults else None

get_partition_boundaries(partition_name) async

Get partition boundaries.

Parameters:

Name Type Description Default
partition_name str

Partition table name.

required

Returns:

Type Description
tuple[str, str] | None

Tuple of (from_value, to_value) or None if not a range partition.

Source code in pg_partsmith/aio/metadata.py
async def get_partition_boundaries(self, partition_name: str) -> tuple[str, str] | None:
    """Get partition boundaries.

    Args:
        partition_name: Partition table name.

    Returns:
        Tuple of (from_value, to_value) or None if not a range partition.
    """
    async with self._engine.connect() as conn:
        result = await conn.execute(
            text(
                """
                SELECT pg_get_expr(relpartbound, oid)
                FROM pg_class
                WHERE oid = to_regclass(:partition_name)
                """
            ),
            {"partition_name": to_regclass_argument(partition_name)},
        )
        boundaries_expr = result.scalar()

    if not boundaries_expr:
        return None

    if isinstance(boundaries_expr, bytes):
        boundaries_expr = boundaries_expr.decode("utf-8", errors="replace")

    from_val, to_val = self._parse_boundaries(boundaries_expr)
    if from_val is not None and to_val is not None:
        return from_val, to_val

    return None

get_partition_column(table_name) async

Get partition column for a table.

Raises:

Type Description
ValueError

If the table uses a composite (multi-column) partition key. Only single-column keys are supported by this library.

Source code in pg_partsmith/aio/metadata.py
async def get_partition_column(self, table_name: str) -> str | None:
    """Get partition column for a table.

    Raises:
        ValueError: If the table uses a composite (multi-column) partition
            key.  Only single-column keys are supported by this library.
    """
    async with self._engine.connect() as conn:
        result = await conn.execute(
            text(
                """
                SELECT a.attname
                FROM pg_partitioned_table t
                JOIN pg_attribute a ON a.attrelid = t.partrelid AND a.attnum = ANY(t.partattrs)
                WHERE t.partrelid = to_regclass(:table_name)
                ORDER BY a.attnum
                """
            ),
            {"table_name": to_regclass_argument(table_name)},
        )
        rows = result.fetchall()

    if not rows:
        return None

    if len(rows) > 1:
        cols = [r[0] for r in rows]
        msg = (
            f"Table {table_name!r} uses a composite partition key {cols!r}. "
            "Only single-column partition keys are supported."
        )
        raise ValueError(msg)

    return self._maybe_decode(rows[0][0])

get_partition_type(table_name) async

Get partition type for a table.

Source code in pg_partsmith/aio/metadata.py
async def get_partition_type(self, table_name: str) -> PartitionType | None:
    """Get partition type for a table."""
    async with self._engine.connect() as conn:
        result = await conn.execute(
            text(
                """
                SELECT partstrat
                FROM pg_partitioned_table t
                WHERE t.partrelid = to_regclass(:table_name)
                """
            ),
            {"table_name": to_regclass_argument(table_name)},
        )
        strat = self._maybe_decode(result.scalar(), encoding="ascii")

    if strat == "r":
        return PartitionType.RANGE
    if strat == "l":
        return PartitionType.LIST
    if strat == "h":
        return PartitionType.HASH
    return None

is_partition_attached(table_name, partition_name) async

Check if a partition is currently attached to its parent via pg_inherits.

Parameters:

Name Type Description Default
table_name str

Parent table name.

required
partition_name str

Partition table name.

required

Returns:

Type Description
bool

True if the partition is attached.

Source code in pg_partsmith/aio/metadata.py
async def is_partition_attached(self, table_name: str, partition_name: str) -> bool:
    """Check if a partition is currently attached to its parent via pg_inherits.

    Args:
        table_name: Parent table name.
        partition_name: Partition table name.

    Returns:
        True if the partition is attached.
    """
    async with self._engine.connect() as conn:
        result = await conn.execute(
            text(
                """
                SELECT EXISTS (
                    SELECT 1
                    FROM pg_inherits inh
                    JOIN pg_class child ON inh.inhrelid = child.oid
                    WHERE inh.inhparent = to_regclass(:table_name)
                      AND inh.inhrelid = to_regclass(:partition_name)
                      AND child.relispartition = true
                )
                """
            ),
            {
                "table_name": to_regclass_argument(table_name),
                "partition_name": to_regclass_argument(partition_name),
            },
        )
        return bool(result.scalar())

list_partitions(table_name) async

List all partitions for a table, including orphaned detached ones.

Orphaned partitions are detached-but-not-dropped tables previously detached by this library. They are detected by a COMMENT marker set on successful detach and returned with is_attached=False and None boundaries.

Source code in pg_partsmith/aio/metadata.py
async def list_partitions(self, table_name: str) -> list[PartitionInfo]:
    """List all partitions for a table, including orphaned detached ones.

    Orphaned partitions are detached-but-not-dropped tables previously
    detached by this library. They are detected by a COMMENT marker set on
    successful detach and returned with ``is_attached=False`` and ``None``
    boundaries.
    """
    parent_schema, _ = split_qualified_name(table_name)
    want_qualified = parent_schema is not None

    async with self._engine.connect() as conn:
        # 1. Get partition type and canonical parent name in one query
        parent_info_result = await conn.execute(
            text(
                """
                SELECT
                    pt.partstrat,
                    ns.nspname || '.' || c.relname AS qualified_name
                FROM pg_class c
                JOIN pg_namespace ns ON c.relnamespace = ns.oid
                LEFT JOIN pg_partitioned_table pt ON pt.partrelid = c.oid
                WHERE c.oid = to_regclass(:name)
                """
            ),
            {"name": to_regclass_argument(table_name)},
        )
        parent_row = parent_info_result.fetchone()
        if not parent_row:
            return []

        strat = self._maybe_decode(parent_row[0], encoding="ascii")
        parent_qualified = self._maybe_decode(parent_row[1]) or table_name

        partition_type: PartitionType | None = None
        if strat == "r":
            partition_type = PartitionType.RANGE
        elif strat == "l":
            partition_type = PartitionType.LIST
        elif strat == "h":
            partition_type = PartitionType.HASH

        if not partition_type:
            return []

        # 2. Get attached partitions
        attached_result = await conn.execute(
            text(
                """
                SELECT
                    ns.nspname AS partition_schema,
                    child.relname AS partition_name,
                    pg_get_expr(child.relpartbound, child.oid) AS boundaries,
                    child.relispartition AS is_attached
                FROM pg_inherits inh
                JOIN pg_class child ON inh.inhrelid = child.oid
                JOIN pg_namespace ns ON child.relnamespace = ns.oid
                WHERE inh.inhparent = to_regclass(:table_name)
                ORDER BY ns.nspname, child.relname
                """
            ),
            {"table_name": to_regclass_argument(table_name)},
        )
        attached_rows = attached_result.fetchall()

        # 3. Get orphan partitions
        orphan_result = await conn.execute(
            text(
                """
                SELECT
                    ns.nspname AS partition_schema,
                    c.relname AS partition_name
                FROM pg_class c
                JOIN pg_namespace ns ON c.relnamespace = ns.oid
                JOIN pg_description d
                  ON d.objoid = c.oid
                 AND d.classoid = 'pg_class'::regclass
                 AND d.objsubid = 0
                WHERE c.relkind = 'r'
                  AND c.relispartition = false
                  AND split_part(d.description, E'\\n', 1) = :marker
                   AND NOT EXISTS (
                       SELECT 1
                       FROM pg_inherits inh
                       WHERE inh.inhrelid = c.oid
                   )
                ORDER BY ns.nspname, c.relname
                """
            ),
            {
                "marker": orphan_table_comment(parent_qualified, marker_prefix=self._marker_prefix),
            },
        )
        orphan_rows = orphan_result.fetchall()

    partitions: list[PartitionInfo] = []

    for row in attached_rows:
        relname: str | bytes = row.partition_name
        if isinstance(relname, bytes):
            relname = relname.decode("utf-8", errors="replace")

        part_schema: str | bytes = row.partition_schema
        if isinstance(part_schema, bytes):
            part_schema = part_schema.decode("utf-8", errors="replace")

        name = qualify(part_schema if want_qualified else None, relname)

        boundaries = row.boundaries
        if isinstance(boundaries, bytes):
            boundaries = boundaries.decode("utf-8", errors="replace")

        boundaries_str = str(boundaries) if boundaries is not None else ""
        is_default = boundaries_str.strip().upper() == "DEFAULT"
        from_val, to_val = (None, None) if is_default else self._parse_boundaries(boundaries_str)
        partitions.append(
            PartitionInfo(
                name=name,
                partition_type=partition_type,
                from_value=from_val,
                to_value=to_val,
                boundaries_expr=boundaries_str if boundaries_str else None,
                is_attached=row.is_attached,
                is_default=is_default,
                parent_table=table_name,
            )
        )

    for row in orphan_rows:
        orphan_relname: str | bytes = row.partition_name
        if isinstance(orphan_relname, bytes):
            orphan_relname = orphan_relname.decode("utf-8", errors="replace")

        orphan_schema: str | bytes = row.partition_schema
        if isinstance(orphan_schema, bytes):
            orphan_schema = orphan_schema.decode("utf-8", errors="replace")

        name = qualify(orphan_schema if want_qualified else None, orphan_relname)
        partitions.append(
            PartitionInfo(
                name=name,
                partition_type=partition_type,
                from_value=None,
                to_value=None,
                is_attached=False,
                parent_table=table_name,
            )
        )

    return partitions

partition_exists(partition_name) async

Check if a partition table exists in pg_class.

Parameters:

Name Type Description Default
partition_name str

Partition table name.

required

Returns:

Type Description
bool

True if the table exists as a regular table.

Source code in pg_partsmith/aio/metadata.py
async def partition_exists(self, partition_name: str) -> bool:
    """Check if a partition table exists in pg_class.

    Args:
        partition_name: Partition table name.

    Returns:
        True if the table exists as a regular table.
    """
    async with self._engine.connect() as conn:
        result = await conn.execute(
            text(
                """
                SELECT EXISTS (
                    SELECT 1
                    FROM pg_class
                    WHERE oid = to_regclass(:partition_name)
                      AND relkind = 'r'
                )
                """
            ),
            {"partition_name": to_regclass_argument(partition_name)},
        )
        return bool(result.scalar())

Lock managers

Lock manager using PostgreSQL advisory locks.

Holds the advisory lock on a dedicated AUTOCOMMIT connection from the engine pool. This guarantees the lock survives any number of commits or rollbacks on the caller's session, which is required when the caller needs to commit DDL (e.g. ATTACH PARTITION) before running DETACH PARTITION CONCURRENTLY.

Override _compute_lock_id to customise the lock ID derivation.

Source code in pg_partsmith/aio/lock/postgres.py
class PostgresAdvisoryLockManager:
    """Lock manager using PostgreSQL advisory locks.

    Holds the advisory lock on a dedicated AUTOCOMMIT connection from the
    engine pool. This guarantees the lock survives any number of commits or
    rollbacks on the caller's session, which is required when the caller
    needs to commit DDL (e.g. ATTACH PARTITION) before running
    DETACH PARTITION CONCURRENTLY.

    Override `_compute_lock_id` to customise the lock ID derivation.
    """

    def __init__(
        self,
        engine: AsyncEngine,
        prefix: str = "partitioner",
        acquire_min_interval_seconds: float = 0.0,
    ) -> None:
        """Initialize lock manager.

        Args:
            engine: SQLAlchemy async engine used to open a dedicated connection
                for the advisory lock.
            prefix: Prefix for lock key generation.
            acquire_min_interval_seconds: Minimum seconds between acquire attempts
                per table (rate limiting). 0 disables.
        """
        self._engine = engine
        self._prefix = prefix
        # Clamp negative values to 0 (rate limiting disabled).
        self._acquire_min_interval = max(0.0, acquire_min_interval_seconds)
        # Per-table timestamp of the last acquire attempt (time.monotonic values).
        self._last_acquire_time: dict[str, float] = {}
        # Serialises rate-limit bookkeeping across concurrent acquirers.
        self._rate_limit_lock = asyncio.Lock()

    def _compute_lock_id(self, table_name: str) -> int:
        """Compute the advisory lock ID for a table name.

        Override this method to customise the ID derivation strategy.

        Args:
            table_name: Table name to lock.

        Returns:
            Advisory lock ID.
        """
        return calculate_lock_id(table_name, prefix=self._prefix)

    def acquire_lock(self, table_name: str) -> AbstractAsyncContextManager[None]:
        """Acquire advisory lock for a table.

        Opens a dedicated AUTOCOMMIT connection from the engine pool and
        acquires a session-level advisory lock on it. The lock is released
        when the context manager exits, with cancellation-safe cleanup.

        Args:
            table_name: Table name to lock.

        Returns:
            Async context manager for the lock.

        Raises:
            LockAcquisitionError: If the lock cannot be acquired.
        """
        return self._lock_scope(table_name)

    @asynccontextmanager
    async def _lock_scope(self, table_name: str) -> AsyncIterator[None]:
        """Internal acquire/release flow for a single advisory lock."""
        await self._respect_rate_limit(table_name)
        lock_id = self._compute_lock_id(table_name)

        async with self._engine.connect() as base_conn:
            # AUTOCOMMIT decouples the session-level lock from any transaction
            # the caller may commit or roll back while holding it.
            conn = await base_conn.execution_options(isolation_level="AUTOCOMMIT")
            await self._try_acquire(conn, lock_id, table_name)

            # Remember the body's exception so a release failure cannot mask it.
            body_exc: BaseException | None = None
            try:
                yield
            except BaseException as exc:
                body_exc = exc
                raise
            finally:
                await self._release_safely(conn, lock_id, table_name, body_exc)

    async def _respect_rate_limit(self, table_name: str) -> None:
        """Sleep enough to enforce the configured min-interval between acquires."""
        if self._acquire_min_interval <= 0:
            return
        async with self._rate_limit_lock:
            now = time.monotonic()
            last = self._last_acquire_time.get(table_name, 0.0)
            delay = last + self._acquire_min_interval - now
            if delay > 0:
                await asyncio.sleep(delay)
            # Re-read the clock after sleeping so the recorded value is the
            # actual acquire time, not the scheduled one.
            self._last_acquire_time[table_name] = time.monotonic()

    async def _try_acquire(self, conn: AsyncConnection, lock_id: int, table_name: str) -> None:
        """Run ``pg_try_advisory_lock`` and raise if not granted."""
        # pg_try_advisory_lock returns false immediately instead of blocking.
        result = await conn.execute(text("SELECT pg_try_advisory_lock(:lock_id)"), {"lock_id": lock_id})
        if not result.scalar():
            raise LockAcquisitionError(table_name, "advisory lock unavailable")

    async def _release_safely(
        self,
        conn: AsyncConnection,
        lock_id: int,
        table_name: str,
        body_exc: BaseException | None,
    ) -> None:
        """Release the lock with shielding so cancellation cannot leak a held lock."""
        try:
            await asyncio.shield(self._unlock(conn, lock_id, table_name))
        except (asyncio.CancelledError, KeyboardInterrupt, SystemExit):
            # Defensively invalidate so the connection is not returned to the pool with a dangling lock.
            with contextlib.suppress(Exception):
                await asyncio.shield(conn.invalidate())
            raise
        except Exception:
            # Body exception takes precedence; otherwise propagate the unlock failure.
            if body_exc is None:
                raise
            logger.warning(
                "Failed to release advisory lock",
                extra={"table_name": table_name, "lock_id": lock_id},
            )

    async def _unlock(self, conn: AsyncConnection, lock_id: int, table_name: str) -> None:
        """Run ``pg_advisory_unlock``; invalidate the connection on any failure."""
        try:
            await conn.execute(text("SELECT pg_advisory_unlock(:lock_id)"), {"lock_id": lock_id})
        except (asyncio.CancelledError, KeyboardInterrupt, SystemExit):
            raise
        except (SQLAlchemyError, OSError) as e:
            logger.warning(
                "Failed to release advisory lock (recoverable)",
                extra={"table_name": table_name, "lock_id": lock_id, "error": str(e)},
            )
            # Invalidation drops the physical connection, which releases the
            # session-level advisory lock server-side.
            with contextlib.suppress(Exception):
                await conn.invalidate()
            raise
        except Exception:
            logger.exception(
                "Unexpected error while releasing advisory lock",
                extra={"table_name": table_name, "lock_id": lock_id},
            )
            with contextlib.suppress(Exception):
                await conn.invalidate()
            raise

    async def is_locked(self, table_name: str) -> bool:
        """Check if lock is held by any session.

        Args:
            table_name: Table name.

        Returns:
            True if the advisory lock for the given table is currently held.
        """
        lock_id = self._compute_lock_id(table_name)
        # Split 64-bit lock_id into classid and objid as stored in pg_locks for int8 advisory locks (objsubid=1).
        # Python ints are unbounded, so emulate int4 two's-complement sign
        # extension to match the CAST(... AS int4) comparisons in the query.
        class_id = (lock_id >> 32) & 0xFFFFFFFF
        if class_id > 0x7FFFFFFF:
            class_id -= 0x100000000
        obj_id = lock_id & 0xFFFFFFFF
        if obj_id > 0x7FFFFFFF:
            obj_id -= 0x100000000

        async with self._engine.connect() as base_conn:
            conn = await base_conn.execution_options(isolation_level="AUTOCOMMIT")
            result = await conn.execute(
                text(
                    """
                    SELECT count(*)
                    FROM pg_locks
                    WHERE locktype = 'advisory'
                      AND granted = true
                      AND database = (SELECT oid FROM pg_database WHERE datname = current_database())
                      AND classid = CAST(:class_id AS int4)
                      AND objid = CAST(:obj_id AS int4)
                      AND objsubid = 1
                    """
                ),
                {"class_id": class_id, "obj_id": obj_id},
            )
            count = result.scalar()
        return bool(count is not None and count > 0)

__init__(engine, prefix='partitioner', acquire_min_interval_seconds=0.0)

Initialize lock manager.

Parameters:

Name Type Description Default
engine AsyncEngine

SQLAlchemy async engine used to open a dedicated connection for the advisory lock.

required
prefix str

Prefix for lock key generation.

'partitioner'
acquire_min_interval_seconds float

Minimum seconds between acquire attempts per table (rate limiting). 0 disables.

0.0
Source code in pg_partsmith/aio/lock/postgres.py
def __init__(
    self,
    engine: AsyncEngine,
    prefix: str = "partitioner",
    acquire_min_interval_seconds: float = 0.0,
) -> None:
    """Set up the advisory-lock manager state.

    Args:
        engine: SQLAlchemy async engine used to open a dedicated connection
            for the advisory lock.
        prefix: Prefix for lock key generation.
        acquire_min_interval_seconds: Minimum seconds between acquire attempts
            per table (rate limiting). 0 disables.
    """
    self._engine = engine
    self._prefix = prefix
    # Anything not strictly positive means "rate limiting disabled".
    self._acquire_min_interval = (
        acquire_min_interval_seconds if acquire_min_interval_seconds > 0.0 else 0.0
    )
    # Per-table monotonic timestamps of the most recent acquire attempt.
    self._last_acquire_time: dict[str, float] = {}
    # Guards the rate-limit bookkeeping against concurrent acquirers.
    self._rate_limit_lock = asyncio.Lock()

acquire_lock(table_name)

Acquire advisory lock for a table.

Opens a dedicated AUTOCOMMIT connection from the engine pool and acquires a session-level advisory lock on it. The lock is released when the context manager exits, with cancellation-safe cleanup.

Parameters:

Name Type Description Default
table_name str

Table name to lock.

required

Returns:

Type Description
AbstractAsyncContextManager[None]

Async context manager for the lock.

Raises:

Type Description
LockAcquisitionError

If the lock cannot be acquired.

Source code in pg_partsmith/aio/lock/postgres.py
def acquire_lock(self, table_name: str) -> AbstractAsyncContextManager[None]:
    """Hand out an async context manager guarding *table_name* with an advisory lock.

    A dedicated AUTOCOMMIT connection is checked out from the engine pool
    and a session-level advisory lock is taken on it. Exiting the context
    releases the lock, with cancellation-safe cleanup.

    Args:
        table_name: Table name to lock.

    Returns:
        Async context manager for the lock.

    Raises:
        LockAcquisitionError: If the lock cannot be acquired.
    """
    # All acquire/release mechanics live in the internal scope helper.
    return self._lock_scope(table_name)

is_locked(table_name) async

Check if lock is held by any session.

Parameters:

Name Type Description Default
table_name str

Table name.

required

Returns:

Type Description
bool

True if the advisory lock for the given table is currently held.

Source code in pg_partsmith/aio/lock/postgres.py
async def is_locked(self, table_name: str) -> bool:
    """Check if lock is held by any session.

    Args:
        table_name: Table name.

    Returns:
        True if the advisory lock for the given table is currently held.
    """
    lock_id = self._compute_lock_id(table_name)

    def to_int4(value: int) -> int:
        # pg_locks exposes an int8 advisory lock as two signed int4 halves
        # (classid = high word, objid = low word, objsubid = 1), so sign-extend.
        return value - 0x100000000 if value > 0x7FFFFFFF else value

    class_id = to_int4((lock_id >> 32) & 0xFFFFFFFF)
    obj_id = to_int4(lock_id & 0xFFFFFFFF)

    async with self._engine.connect() as base_conn:
        conn = await base_conn.execution_options(isolation_level="AUTOCOMMIT")
        result = await conn.execute(
            text(
                """
                SELECT count(*)
                FROM pg_locks
                WHERE locktype = 'advisory'
                  AND granted = true
                  AND database = (SELECT oid FROM pg_database WHERE datname = current_database())
                  AND classid = CAST(:class_id AS int4)
                  AND objid = CAST(:obj_id AS int4)
                  AND objsubid = 1
                """
            ),
            {"class_id": class_id, "obj_id": obj_id},
        )
        held = result.scalar()
    return bool(held is not None and held > 0)

Lock manager using Redis for distributed coordination.

Uses SET NX EX to acquire the lock and a background renewal task to extend the TTL while the lock is held, preventing expiry during long DDL operations (e.g. DETACH PARTITION CONCURRENTLY). The lock is released atomically via a Lua script that checks the ownership token, so it is safe even if Redis restarts during the renewal window.

The renewal interval is ttl_seconds // 3 (with random jitter to avoid thundering herds). If renewal fails — e.g. Redis is unreachable or another holder takes over — the watchdog logs a warning and cancels the holder task, forcing the maintenance run to stop (fail-safe).

For production use you may want to subclass and override acquire_lock to use Redlock or another algorithm with stronger guarantees.

Raises:

Type Description
ImportError

If the redis-locks optional dependency is not installed.

Source code in pg_partsmith/aio/lock/redis.py
class RedisDistributedLockManager:
    """Lock manager using Redis for distributed coordination.

    Uses ``SET NX EX`` to acquire the lock and a background renewal task to
    extend the TTL while the lock is held, preventing expiry during long DDL
    operations (e.g. ``DETACH PARTITION CONCURRENTLY``). The lock is released
    atomically via a Lua script that checks the ownership token, so it is safe
    even if Redis restarts during the renewal window.

    The renewal interval is ``ttl_seconds // 3`` (with random jitter to avoid
    thundering herds). If renewal fails — e.g. Redis is unreachable or another
    holder takes over — the watchdog logs a warning and cancels the holder
    task, forcing the maintenance run to stop (fail-safe).

    For production use you may want to subclass and override ``acquire_lock``
    to use Redlock or another algorithm with stronger guarantees.

    Raises:
        ImportError: If the ``redis-locks`` optional dependency is not installed.
    """

    def __init__(
        self,
        redis_client: RedisClientProtocol,
        prefix: str = "partitioner:lock",
        ttl_seconds: int = 300,
        acquire_min_interval_seconds: float = 0.0,
    ) -> None:
        """Initialize lock manager.

        Args:
            redis_client: Redis client instance.
            prefix: Prefix for Redis keys.
            ttl_seconds: Lock time-to-live in seconds. The lock is automatically
                renewed every ``ttl_seconds // 3`` seconds so that it does not
                expire during long DDL operations.
            acquire_min_interval_seconds: Minimum seconds between acquire attempts
                per table (rate limiting). 0 disables.

        Raises:
            ImportError: If ``redis-py`` is not installed.
            ValueError: If ``ttl_seconds`` is below the minimum.
        """
        if not _redis_available:
            msg = (
                "redis-py is required for RedisDistributedLockManager. "
                "Install it with: pip install pg-partsmith[redis-locks]"
            )
            raise ImportError(msg)

        if ttl_seconds < _MIN_TTL_SECONDS:
            msg = f"ttl_seconds must be >= {_MIN_TTL_SECONDS}, got {ttl_seconds!r}"
            raise ValueError(msg)

        self._redis = redis_client
        self._prefix = prefix
        self._ttl = ttl_seconds
        # Renew at roughly a third of the TTL, but never less than every second.
        self._renew_interval = max(1, ttl_seconds // 3)

        # Scripts are registered once so releases/renewals use EVALSHA.
        self._unlock_script = self._redis.register_script(_UNLOCK_LUA)
        self._renew_script = self._redis.register_script(_RENEW_LUA)
        self._acquire_min_interval = max(0.0, acquire_min_interval_seconds)
        self._last_acquire_time: dict[str, float] = {}
        self._rate_limit_lock = asyncio.Lock()

    def _get_lock_key(self, table_name: str) -> str:
        # One Redis key per table, namespaced by the configured prefix.
        return f"{self._prefix}:{table_name}"

    def acquire_lock(self, table_name: str) -> AbstractAsyncContextManager[None]:
        """Acquire Redis lock with automatic TTL renewal.

        Args:
            table_name: Table name.

        Returns:
            Async context manager for the lock.

        Raises:
            LockAcquisitionError: If the lock is already held.
        """
        return self._lock_scope(table_name)

    @asynccontextmanager
    async def _lock_scope(self, table_name: str) -> AsyncIterator[None]:
        """Internal acquire/release flow for a single Redis lock."""
        await self._respect_rate_limit(table_name)

        key = self._get_lock_key(table_name)
        # Random token identifies this holder so only we can renew/release the key.
        token = secrets.token_hex(16)

        if not await self._redis.set(key, token, ex=self._ttl, nx=True):
            raise LockAcquisitionError(table_name, "Redis lock unavailable")

        holder_task = asyncio.current_task()
        if holder_task is None:
            raise RuntimeError("Could not determine current asyncio task")

        watchdog = asyncio.create_task(
            self._renewal_watchdog(key, token, table_name, holder_task),
            name=f"redis-lock-watchdog:{key}",
        )
        try:
            yield
        finally:
            # Stop renewing first, then release; shield so cancellation of the
            # holder cannot skip the unlock.
            await self._cancel_watchdog(watchdog)
            await asyncio.shield(self._release_safely(key, token, table_name))

    async def _respect_rate_limit(self, table_name: str) -> None:
        """Sleep enough to enforce the configured min-interval between acquires."""
        if self._acquire_min_interval <= 0:
            return
        async with self._rate_limit_lock:
            now = time.monotonic()
            last = self._last_acquire_time.get(table_name, 0.0)
            delay = last + self._acquire_min_interval - now
            if delay > 0:
                await asyncio.sleep(delay)
            # Record the actual (post-sleep) acquire time.
            self._last_acquire_time[table_name] = time.monotonic()

    async def _renewal_watchdog(
        self,
        key: str,
        token: str,
        table_name: str,
        holder_task: asyncio.Task[Any],
    ) -> None:
        """Periodically extend the lock TTL until cancelled or renewal fails."""
        # Runs until _cancel_watchdog cancels it, or a renewal failure makes it
        # cancel the holder task and return.
        while True:
            jitter = random.uniform(*_RENEW_JITTER_RANGE)  # noqa: S311
            await asyncio.sleep(self._renew_interval * jitter)
            try:
                renewed = await self._renew_script(keys=[key], args=[token, str(self._ttl)])
            except (asyncio.CancelledError, KeyboardInterrupt, SystemExit):
                raise
            except (OSError, ConnectionError, TimeoutError, RuntimeError) as exc:
                self._fail_holder(holder_task, key, table_name, "recoverable error", exc)
                return
            except Exception as exc:
                self._fail_holder(holder_task, key, table_name, "unexpected exception", exc)
                return

            # A falsy reply means the token no longer owns the key (lock lost).
            if not renewed:
                self._fail_holder(holder_task, key, table_name, "lock lost", None)
                return

    def _fail_holder(
        self,
        holder_task: asyncio.Task[Any],
        key: str,
        table_name: str,
        reason: str,
        exc: Exception | None,
    ) -> None:
        """Log the renewal failure and cancel the holder task (fail-safe)."""
        extra: dict[str, str] = {"table_name": table_name, "key": key, "reason": reason}
        if exc is not None:
            extra["error"] = str(exc)
            logger.warning(
                f"Redis lock renewal failed: {reason}; cancelling maintenance task",
                extra=extra,
                exc_info=True,
            )
        else:
            logger.warning(
                f"Redis lock renewal failed: {reason}; cancelling maintenance task",
                extra=extra,
            )
        if not holder_task.done():
            holder_task.cancel()

    async def _cancel_watchdog(self, watchdog: asyncio.Task[None]) -> None:
        """Cancel the renewal watchdog and absorb its CancelledError."""
        watchdog.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await watchdog

    async def _release_safely(self, key: str, token: str, table_name: str) -> None:
        """Release the lock; failures are logged but do not propagate.

        If unlock fails, the TTL will eventually expire the lock.
        """
        try:
            await self._unlock_script(keys=[key], args=[token])
        except (asyncio.CancelledError, KeyboardInterrupt, SystemExit):
            raise
        except (OSError, ConnectionError, TimeoutError, RuntimeError) as exc:
            logger.warning(
                "Failed to release Redis lock (recoverable); TTL will expire it eventually",
                extra={
                    "table_name": table_name,
                    "key": key,
                    "error": str(exc),
                    "error_type": type(exc).__name__,
                },
            )
        except Exception:
            logger.exception(
                "Unexpected failure while releasing Redis lock",
                extra={"table_name": table_name, "key": key},
            )

    async def is_locked(self, table_name: str) -> bool:
        """Return True if the Redis lock for ``table_name`` is currently held."""
        key = self._get_lock_key(table_name)
        return bool(await self._redis.exists(key))

__init__(redis_client, prefix='partitioner:lock', ttl_seconds=300, acquire_min_interval_seconds=0.0)

Initialize lock manager.

Parameters:

Name Type Description Default
redis_client RedisClientProtocol

Redis client instance.

required
prefix str

Prefix for Redis keys.

'partitioner:lock'
ttl_seconds int

Lock time-to-live in seconds. The lock is automatically renewed every ttl_seconds // 3 seconds so that it does not expire during long DDL operations.

300
acquire_min_interval_seconds float

Minimum seconds between acquire attempts per table (rate limiting). 0 disables.

0.0

Raises:

Type Description
ImportError

If redis-py is not installed.

ValueError

If ttl_seconds is below the minimum.

Source code in pg_partsmith/aio/lock/redis.py
def __init__(
    self,
    redis_client: RedisClientProtocol,
    prefix: str = "partitioner:lock",
    ttl_seconds: int = 300,
    acquire_min_interval_seconds: float = 0.0,
) -> None:
    """Create the Redis-backed lock manager.

    Args:
        redis_client: Redis client instance.
        prefix: Prefix for Redis keys.
        ttl_seconds: Lock time-to-live in seconds. The lock is automatically
            renewed every ``ttl_seconds // 3`` seconds so that it does not
            expire during long DDL operations.
        acquire_min_interval_seconds: Minimum seconds between acquire attempts
            per table (rate limiting). 0 disables.

    Raises:
        ImportError: If ``redis-py`` is not installed.
        ValueError: If ``ttl_seconds`` is below the minimum.
    """
    # Fail fast when the optional redis dependency is missing.
    if not _redis_available:
        msg = (
            "redis-py is required for RedisDistributedLockManager. "
            "Install it with: pip install pg-partsmith[redis-locks]"
        )
        raise ImportError(msg)

    # Too small a TTL risks the lock expiring mid-operation.
    if ttl_seconds < _MIN_TTL_SECONDS:
        msg = f"ttl_seconds must be >= {_MIN_TTL_SECONDS}, got {ttl_seconds!r}"
        raise ValueError(msg)

    self._redis = redis_client
    self._prefix = prefix
    self._ttl = ttl_seconds
    # Renew at roughly a third of the TTL; never less often than once per second.
    third_of_ttl = ttl_seconds // 3
    self._renew_interval = third_of_ttl if third_of_ttl > 1 else 1

    # Register the Lua scripts up front so release/renew can use EVALSHA.
    self._unlock_script = self._redis.register_script(_UNLOCK_LUA)
    self._renew_script = self._redis.register_script(_RENEW_LUA)
    # Anything not strictly positive disables rate limiting.
    self._acquire_min_interval = (
        acquire_min_interval_seconds if acquire_min_interval_seconds > 0.0 else 0.0
    )
    self._last_acquire_time: dict[str, float] = {}
    self._rate_limit_lock = asyncio.Lock()

acquire_lock(table_name)

Acquire Redis lock with automatic TTL renewal.

Parameters:

Name Type Description Default
table_name str

Table name.

required

Returns:

Type Description
AbstractAsyncContextManager[None]

Async context manager for the lock.

Raises:

Type Description
LockAcquisitionError

If the lock is already held.

Source code in pg_partsmith/aio/lock/redis.py
def acquire_lock(self, table_name: str) -> AbstractAsyncContextManager[None]:
    """Hand out an async context manager holding the Redis lock for *table_name*.

    The lock's TTL is renewed automatically in the background while the
    context is open.

    Args:
        table_name: Table name.

    Returns:
        Async context manager for the lock.

    Raises:
        LockAcquisitionError: If the lock is already held.
    """
    # Delegate the full acquire/renew/release flow to the internal scope.
    return self._lock_scope(table_name)

is_locked(table_name) async

Return True if the Redis lock for table_name is currently held.

Source code in pg_partsmith/aio/lock/redis.py
async def is_locked(self, table_name: str) -> bool:
    """Return True if the Redis lock for ``table_name`` is currently held."""
    key = self._get_lock_key(table_name)
    return bool(await self._redis.exists(key))

Hooks

No-op base implementation of partition lifecycle hooks.

Subclass and override only the methods you need. All methods are no-ops by default so you can selectively add behaviour without implementing every step.

Source code in pg_partsmith/aio/hooks.py
class BasePartitionLifecycleHooks:
    """Default, do-nothing implementation of the partition lifecycle hooks.

    Subclass it and override just the callbacks you care about; every hook
    here is a no-op, so nothing else has to be implemented.
    """

    async def before_create(
        self,
        config: TablePartitionConfig,
        partition_name: str,
        from_value: str,
        to_value: str,
    ) -> None:
        """Invoked just before a new partition is created.

        Args:
            config: Table partition configuration.
            partition_name: Name the new partition will be given.
            from_value: Start boundary value.
            to_value: End boundary value.
        """

    async def after_create(
        self,
        config: TablePartitionConfig,
        partition: PartitionInfo,
    ) -> None:
        """Invoked once a partition has been created (and possibly attached).

        Args:
            config: Table partition configuration.
            partition: Info about the newly created partition.
        """

    async def before_detach(
        self,
        table_name: str,
        partition: PartitionInfo,
    ) -> None:
        """Invoked just before a partition is detached from its parent.

        A natural spot to export or archive rows while the partition is
        still reachable through the parent table's indexes and constraints.

        Args:
            table_name: Parent table name.
            partition: Info about the partition being detached.
        """

    async def after_detach(
        self,
        table_name: str,
        partition_name: str,
    ) -> None:
        """Invoked once a partition has been detached.

        Args:
            table_name: Parent table name.
            partition_name: Name of the detached partition.
        """

    async def before_drop(
        self,
        table_name: str,
        partition_name: str,
    ) -> None:
        """Invoked just before a partition table is dropped.

        Final opportunity to read or export the partition's data before it
        is destroyed for good.

        Args:
            table_name: Parent table name.
            partition_name: Name of the partition about to be dropped.
        """

    async def after_drop(
        self,
        table_name: str,
        partition_name: str,
    ) -> None:
        """Invoked once a partition table has been dropped.

        Args:
            table_name: Parent table name.
            partition_name: Name of the dropped partition.
        """

after_create(config, partition) async

Called after a partition has been created (and optionally attached).

Parameters:

Name Type Description Default
config TablePartitionConfig

Table partition configuration.

required
partition PartitionInfo

Info about the newly created partition.

required
Source code in pg_partsmith/aio/hooks.py
async def after_create(
    self,
    config: TablePartitionConfig,
    partition: PartitionInfo,
) -> None:
    """Invoked once a partition has been created (and possibly attached).

    No-op by default; override to react to new partitions.

    Args:
        config: Table partition configuration.
        partition: Info about the newly created partition.
    """

after_detach(table_name, partition_name) async

Called after a partition has been detached.

Parameters:

Name Type Description Default
table_name str

Parent table name.

required
partition_name str

Name of the detached partition.

required
Source code in pg_partsmith/aio/hooks.py
async def after_detach(
    self,
    table_name: str,
    partition_name: str,
) -> None:
    """Invoked once a partition has been detached.

    No-op by default; override to react to detachment.

    Args:
        table_name: Parent table name.
        partition_name: Name of the detached partition.
    """

after_drop(table_name, partition_name) async

Called after a partition table has been dropped.

Parameters:

Name Type Description Default
table_name str

Parent table name.

required
partition_name str

Name of the dropped partition.

required
Source code in pg_partsmith/aio/hooks.py
async def after_drop(
    self,
    table_name: str,
    partition_name: str,
) -> None:
    """Invoked once a partition table has been dropped.

    No-op by default; override to react to the drop.

    Args:
        table_name: Parent table name.
        partition_name: Name of the dropped partition.
    """

before_create(config, partition_name, from_value, to_value) async

Called before a partition is created.

Parameters:

Name Type Description Default
config TablePartitionConfig

Table partition configuration.

required
partition_name str

Name the new partition will be given.

required
from_value str

Start boundary value.

required
to_value str

End boundary value.

required
Source code in pg_partsmith/aio/hooks.py
async def before_create(
    self,
    config: TablePartitionConfig,
    partition_name: str,
    from_value: str,
    to_value: str,
) -> None:
    """Invoked just before a new partition is created.

    No-op by default; override to prepare for the new partition.

    Args:
        config: Table partition configuration.
        partition_name: Name the new partition will be given.
        from_value: Start boundary value.
        to_value: End boundary value.
    """

before_detach(table_name, partition) async

Called before a partition is detached from its parent table.

This is a good place to export or archive data while the partition is still accessible via the parent table's indexes and constraints.

Parameters:

Name Type Description Default
table_name str

Parent table name.

required
partition PartitionInfo

Info about the partition being detached.

required
Source code in pg_partsmith/aio/hooks.py
async def before_detach(
    self,
    table_name: str,
    partition: PartitionInfo,
) -> None:
    """Invoked just before a partition is detached from its parent.

    A natural spot to export or archive rows while the partition is still
    reachable through the parent table's indexes and constraints. No-op by
    default.

    Args:
        table_name: Parent table name.
        partition: Info about the partition being detached.
    """

before_drop(table_name, partition_name) async

Called before a partition table is dropped.

This is the last chance to read or export data from the partition before it is permanently destroyed.

Parameters:

Name Type Description Default
table_name str

Parent table name.

required
partition_name str

Name of the partition about to be dropped.

required
Source code in pg_partsmith/aio/hooks.py
async def before_drop(
    self,
    table_name: str,
    partition_name: str,
) -> None:
    """Invoked just before a partition table is dropped.

    Final opportunity to read or export the partition's data before it is
    destroyed for good. No-op by default.

    Args:
        table_name: Parent table name.
        partition_name: Name of the partition about to be dropped.
    """