Skip to content

API Reference

Auto-generated from source using mkdocstrings.

redis_client_kit

Redis client infrastructure with optional Pydantic, Prometheus, and Dishka support.

RedisMetricsProtocol

Bases: Protocol

Protocol for Redis metrics collection.

Defines the interface for collecting Redis client metrics. Implementation can use any metrics library (Prometheus, StatsD, etc.).

Example implementation: RedisMetrics from redis_client_kit.metrics

Source code in redis_client_kit/protocols.py
class RedisMetricsProtocol(Protocol):
    """Protocol for Redis metrics collection.

    Defines the interface for collecting Redis client metrics.
    Implementation can use any metrics library (Prometheus, StatsD, etc.).

    Example implementation: RedisMetrics from redis_client_kit.metrics
    """

    def record_command(
        self,
        command: str,
        status: str,
        duration: float,
    ) -> None:
        """Record a completed Redis command.

        Args:
            command: Redis command name (e.g., "GET", "SET", "HGETALL")
            status: Execution status ("success" or "error")
            duration: Command execution duration in seconds
        """
        ...

    def record_error(
        self,
        error_type: str,
    ) -> None:
        """Record a Redis connection or execution error.

        Args:
            error_type: Error type name (e.g., "ConnectionError", "TimeoutError")
        """
        ...

    def record_pool_stats(
        self,
        pool_size: int,
        pool_checked_out: int,
    ) -> None:
        """Record Redis connection pool statistics.

        Args:
            pool_size: Total number of connections in the pool
            pool_checked_out: Number of connections currently in use
        """
        ...

record_command(command, status, duration)

Record a completed Redis command.

Parameters:

Name Type Description Default
command str

Redis command name (e.g., "GET", "SET", "HGETALL")

required
status str

Execution status ("success" or "error")

required
duration float

Command execution duration in seconds

required
Source code in redis_client_kit/protocols.py
def record_command(
    self,
    command: str,
    status: str,
    duration: float,
) -> None:
    """Record a completed Redis command.

    Args:
        command: Redis command name (e.g., "GET", "SET", "HGETALL")
        status: Execution status ("success" or "error")
        duration: Command execution duration in seconds
    """
    ...

record_error(error_type)

Record a Redis connection or execution error.

Parameters:

Name Type Description Default
error_type str

Error type name (e.g., "ConnectionError", "TimeoutError")

required
Source code in redis_client_kit/protocols.py
def record_error(
    self,
    error_type: str,
) -> None:
    """Record a Redis connection or execution error.

    Args:
        error_type: Error type name (e.g., "ConnectionError", "TimeoutError")
    """
    ...

record_pool_stats(pool_size, pool_checked_out)

Record Redis connection pool statistics.

Parameters:

Name Type Description Default
pool_size int

Total number of connections in the pool

required
pool_checked_out int

Number of connections currently in use

required
Source code in redis_client_kit/protocols.py
def record_pool_stats(
    self,
    pool_size: int,
    pool_checked_out: int,
) -> None:
    """Record Redis connection pool statistics.

    Args:
        pool_size: Total number of connections in the pool
        pool_checked_out: Number of connections currently in use
    """
    ...

RedisSettingsProtocol

Bases: Protocol

Protocol for Redis configuration with grouped settings.

Examples:

Single node setup:

>>> class Settings(RedisSettingsProtocol):
...     connection = RedisConnectionSettings(host="localhost", port=6379, db=0)
...     cluster = RedisClusterSettings(enabled=False)
...     # ... other attributes ...

Redis Cluster setup:

>>> class ClusterSettings(RedisSettingsProtocol):
...     connection = RedisConnectionSettings(...)
...     cluster = RedisClusterSettings(
...         enabled=True,
...         nodes=["redis-1:6379", "redis-2:6379", "redis-3:6379"]
...     )
...     # ... other attributes ...
Source code in redis_client_kit/config.py
class RedisSettingsProtocol(Protocol):
    """Protocol for Redis configuration with grouped settings.

    Examples:
        Single node setup:
        >>> class Settings(RedisSettingsProtocol):
        ...     connection = RedisConnectionSettings(host="localhost", port=6379, db=0)
        ...     cluster = RedisClusterSettings(enabled=False)
        ...     # ... other attributes ...

        Redis Cluster setup:
        >>> class ClusterSettings(RedisSettingsProtocol):
        ...     connection = RedisConnectionSettings(...)
        ...     cluster = RedisClusterSettings(
        ...         enabled=True,
        ...         nodes=["redis-1:6379", "redis-2:6379", "redis-3:6379"]
        ...     )
        ...     # ... other attributes ...
    """

    connection: RedisConnectionProtocol
    cluster: RedisClusterProtocol
    pool: RedisPoolProtocol
    retry: RedisRetryProtocol
    ssl: RedisSSLProtocol
    response: RedisResponseProtocol
    health_check_interval: int | None

build_base_redis_kwargs(settings)

Build base Redis client keyword arguments.

Source code in redis_client_kit/utils.py
def build_base_redis_kwargs(settings: RedisSettingsProtocol) -> dict[str, object]:
    """Build base Redis client keyword arguments."""
    retry = build_redis_retry(settings)

    if settings.ssl.enabled:
        if settings.ssl.ca_certs:
            _validate_pem_format(settings.ssl.ca_certs, "CERTIFICATE")
        if settings.ssl.certfile:
            _validate_pem_format(settings.ssl.certfile, "CERTIFICATE")
        if settings.ssl.keyfile:
            _validate_pem_format(settings.ssl.keyfile, "PRIVATE KEY")

    kwargs: dict[str, object] = {
        "password": settings.connection.get_password(),
        "max_connections": settings.pool.max_connections,
        "socket_timeout": settings.pool.socket_timeout,
        "socket_connect_timeout": settings.pool.socket_connect_timeout,
        "socket_keepalive": settings.pool.socket_keepalive,
        "socket_keepalive_options": settings.pool.socket_keepalive_options,
        "health_check_interval": settings.health_check_interval,
        "decode_responses": settings.response.decode_responses,
        "encoding": settings.response.encoding,
        "client_name": settings.connection.client_name,
        "protocol": settings.connection.protocol,
        # SSL
        "ssl": settings.ssl.enabled,
        "ssl_cert_reqs": settings.ssl.cert_reqs,
        "ssl_ca_certs": settings.ssl.ca_certs,
        "ssl_certfile": settings.ssl.certfile,
        "ssl_keyfile": settings.ssl.keyfile,
    }

    if settings.cluster.enabled:
        kwargs["require_full_coverage"] = settings.cluster.require_full_coverage
        kwargs["read_from_replicas"] = settings.cluster.read_from_replicas

    if retry is not None:
        kwargs["retry"] = retry

    return kwargs

build_redis_retry(settings)

Build Redis Retry object from settings.

Source code in redis_client_kit/utils.py
def build_redis_retry(settings: RedisSettingsProtocol) -> Retry | None:
    """Build Redis Retry object from settings."""
    if settings.retry.enabled and settings.retry.max_attempts:
        return Retry(
            backoff=ExponentialBackoff(
                cap=settings.retry.backoff_cap,
                base=settings.retry.backoff_base,
            ),
            retries=settings.retry.max_attempts,
        )
    return None

check_async_redis_health(client) async

Check async Redis connection health.

Parameters:

Name Type Description Default
client AsyncRedisClient

Redis client (single or cluster) to check

required

Returns:

Type Description
bool

True if Redis is healthy, False otherwise

Source code in redis_client_kit/aio/lifecycle.py
async def check_async_redis_health(client: AsyncRedisClient) -> bool:
    """Check async Redis connection health.

    Args:
        client: Redis client (single or cluster) to check

    Returns:
        True if Redis is healthy, False otherwise
    """
    try:
        result: Any = await client.ping()  # type: ignore[misc]

        # Redis Cluster ping returns dict[str, bool] (node_id -> success); single node returns bool.
        if isinstance(result, dict) and result and all(isinstance(k, str) for k in result):
            return all(bool(v) for v in result.values())

        return bool(result)
    except (RedisError, RedisClusterException, OSError, ConnectionError, TimeoutError) as e:
        logger.warning(
            "Async Redis health check failed",
            extra={"error": str(e), "error_type": type(e).__name__},
        )
        return False
    except (asyncio.CancelledError, KeyboardInterrupt, SystemExit):
        raise
    except Exception:
        # Catch unexpected errors like encoding/decoding or internal redis-py bugs.
        # We return False to indicate unhealthy state but log the full exception for debugging.
        logger.exception("Unexpected error during Redis health check")
        return False

check_redis_health(client)

Check sync Redis connection health.

Parameters:

Name Type Description Default
client SyncRedisClient

Redis client (single or cluster) to check

required

Returns:

Type Description
bool

True if Redis is healthy, False otherwise

Source code in redis_client_kit/sync/lifecycle.py
def check_redis_health(client: SyncRedisClient) -> bool:
    """Check sync Redis connection health.

    Args:
        client: Redis client (single or cluster) to check

    Returns:
        True if Redis is healthy, False otherwise
    """
    try:
        result: Any = client.ping()  # type: ignore[misc]

        # Redis Cluster ping returns dict[str, bool] (node_id -> success); single node returns bool.
        if isinstance(result, dict) and result and all(isinstance(k, str) for k in result):
            return all(bool(v) for v in result.values())

        return bool(result)
    except (RedisError, RedisClusterException, OSError, ConnectionError, TimeoutError) as e:
        logger.warning(
            "Redis health check failed",
            extra={"error": str(e), "error_type": type(e).__name__},
        )
        return False
    except (KeyboardInterrupt, SystemExit):
        raise
    except Exception:
        # Catch unexpected errors like encoding/decoding or internal redis-py bugs.
        # We return False to indicate unhealthy state but log the full exception for debugging.
        logger.exception("Unexpected error during Redis health check")
        return False

close_async_redis_client(client) async

Close async Redis client and release pool connections.

Parameters:

Name Type Description Default
client AsyncRedisClient

Redis client (single or cluster) to close

required
Note

Uses asyncio.shield to ensure close operation completes even if task is cancelled.

Source code in redis_client_kit/aio/lifecycle.py
async def close_async_redis_client(client: AsyncRedisClient) -> None:
    """Close async Redis client and release pool connections.

    Args:
        client: Redis client (single or cluster) to close

    Note:
        Uses asyncio.shield to ensure close operation completes even if task is cancelled.
    """
    try:
        await asyncio.wait_for(asyncio.shield(client.aclose()), timeout=ACLOSE_TIMEOUT_S)
    except TimeoutError:
        logger.warning("Redis client aclose timed out", extra={"timeout_s": ACLOSE_TIMEOUT_S})
    except (asyncio.CancelledError, KeyboardInterrupt):
        raise
    except Exception as e:
        logger.warning("Error closing Redis client", extra={"error": str(e)})
    else:
        logger.info("Closed Redis client")

close_redis_client(client)

Close sync Redis client and release pool connections.

Parameters:

Name Type Description Default
client SyncRedisClient

Redis client (single or cluster) to close

required
Source code in redis_client_kit/sync/lifecycle.py
def close_redis_client(client: SyncRedisClient) -> None:
    """Close sync Redis client and release pool connections.

    Args:
        client: Redis client (single or cluster) to close
    """
    try:
        client.close()
    except (KeyboardInterrupt, SystemExit):
        raise
    except Exception as e:
        logger.warning("Error closing Redis client", extra={"error": str(e)})
    else:
        logger.info("Closed Redis client")

create_async_redis_client(settings, metrics=None)

Create async Redis client with connection pool.

Returns redis-py client (Redis or RedisCluster) with optional metrics instrumentation. - If metrics is None: returns plain Redis/RedisCluster (zero overhead) - If metrics provided: returns InstrumentedRedis/InstrumentedRedisCluster

Automatically selects single or cluster mode based on settings.

Parameters:

Name Type Description Default
settings RedisSettingsProtocol

Redis configuration

required
metrics RedisMetricsProtocol | None

Optional Prometheus metrics collector (None = no instrumentation)

None

Returns:

Type Description
AsyncRedisClient

Redis, RedisCluster, InstrumentedRedis, or InstrumentedRedisCluster instance

Source code in redis_client_kit/aio/factory.py
def create_async_redis_client(
    settings: RedisSettingsProtocol, metrics: RedisMetricsProtocol | None = None
) -> AsyncRedisClient:
    """Create async Redis client with connection pool.

    Returns redis-py client (Redis or RedisCluster) with optional metrics instrumentation.
    - If metrics is None: returns plain Redis/RedisCluster (zero overhead)
    - If metrics provided: returns InstrumentedRedis/InstrumentedRedisCluster

    Automatically selects single or cluster mode based on settings.

    Args:
        settings: Redis configuration
        metrics: Optional Prometheus metrics collector (None = no instrumentation)

    Returns:
        Redis, RedisCluster, InstrumentedRedis, or InstrumentedRedisCluster instance
    """
    if settings.cluster.enabled:
        return _create_async_cluster_client(settings, metrics=metrics)

    return _create_async_single_client(settings, metrics=metrics)

create_redis_client(settings, metrics=None)

Create sync Redis client with connection pool.

Returns redis-py client (Redis or RedisCluster) with optional metrics instrumentation. - If metrics is None: returns plain Redis/RedisCluster (zero overhead) - If metrics provided: returns InstrumentedRedis/InstrumentedRedisCluster

Automatically selects single or cluster mode based on settings.

Parameters:

Name Type Description Default
settings RedisSettingsProtocol

Redis configuration

required
metrics RedisMetricsProtocol | None

Optional Prometheus metrics collector (None = no instrumentation)

None

Returns:

Type Description
SyncRedisClient

Redis, RedisCluster, InstrumentedRedis, or InstrumentedRedisCluster instance

Source code in redis_client_kit/sync/factory.py
def create_redis_client(
    settings: RedisSettingsProtocol, metrics: RedisMetricsProtocol | None = None
) -> SyncRedisClient:
    """Create sync Redis client with connection pool.

    Returns redis-py client (Redis or RedisCluster) with optional metrics instrumentation.
    - If metrics is None: returns plain Redis/RedisCluster (zero overhead)
    - If metrics provided: returns InstrumentedRedis/InstrumentedRedisCluster

    Automatically selects single or cluster mode based on settings.

    Args:
        settings: Redis configuration
        metrics: Optional Prometheus metrics collector (None = no instrumentation)

    Returns:
        Redis, RedisCluster, InstrumentedRedis, or InstrumentedRedisCluster instance
    """
    if settings.cluster.enabled:
        return _create_cluster_client(settings, metrics=metrics)

    return _create_single_client(settings, metrics=metrics)

parse_redis_url_node(node)

Parse Redis node from host:port, [ipv6]:port or redis:// URL string.

Source code in redis_client_kit/utils.py
def parse_redis_url_node(node: str) -> tuple[str, int]:
    """Parse Redis node from host:port, [ipv6]:port or redis:// URL string."""
    normalized_node = node.strip()
    if not normalized_node:
        raise ValueError("Redis node cannot be empty")

    # If already a URL, don't prepend scheme
    url = normalized_node if normalized_node.startswith(("redis://", "rediss://")) else f"redis://{normalized_node}"

    parsed = urlparse(url)
    host = parsed.hostname
    port = parsed.port

    if host is None or port is None:
        raise ValueError(f"Invalid Redis node: {node!r}")

    # Validate port range
    if not (1 <= port <= 65535):
        raise ValueError(f"Invalid Redis port {port}: must be between 1 and 65535")

    return host, port