Advanced Usage¶
Advanced patterns and techniques for deadline-budget.
Dynamic Timeout Caps¶
Calculate timeout caps based on runtime conditions:
from deadline_budget import DeadlineBudget
def get_timeout_cap(operation: str, priority: str) -> float:
"""Get timeout cap based on operation and priority."""
caps = {
("create_user", "high"): 5.0,
("create_user", "low"): 2.0,
("send_email", "high"): 3.0,
("send_email", "low"): 1.0,
}
return caps.get((operation, priority), 1.0)
budget = DeadlineBudget(total_seconds=10.0, safety_margin=0.5)
# Dynamic cap based on priority
timeout = budget.timeout_for(cap=get_timeout_cap("create_user", priority="high"))
await identity_service.create_user(email="...", timeout=timeout)
Nested Orchestrations¶
Pass budgets to nested orchestrations:
from deadline_budget import BudgetContext
async def register_user(ctx: BudgetContext, email: str, password: str) -> User:
"""Nested orchestration that receives parent budget."""
user = await identity_service.create_user(
email=email,
timeout=ctx.timeout_for_call("identity_create_user"),
)
await credential_service.set_password(
user_id=user.id,
password=password,
timeout=ctx.timeout_for_call("credential_set_password"),
)
return user
async def register_and_notify(email: str, password: str) -> User:
"""Parent orchestration."""
ctx = BudgetContext.create(
total_seconds=15.0,
safety_margin=0.5,
call_caps={
"identity_create_user": 3.0,
"credential_set_password": 3.0,
"notification_send": 2.0,
},
)
# Pass budget to nested orchestration
user = await register_user(ctx, email, password)
# Continue using same budget
await notification_service.send_welcome(
user_id=user.id,
timeout=ctx.timeout_for_call("notification_send"),
)
return user
Progressive Timeout Caps¶
Reduce timeout caps as budget depletes:
from deadline_budget import DeadlineBudget
budget = DeadlineBudget(total_seconds=10.0, safety_margin=0.5)
# Early calls: generous caps
timeout1 = budget.timeout_for(cap=5.0)
await service_a.call(timeout=timeout1)
# Later calls: tighter caps
timeout2 = budget.timeout_for(cap=3.0)
await service_b.call(timeout=timeout2)
# Final calls: minimal caps
timeout3 = budget.timeout_for(cap=1.0)
await service_c.call(timeout=timeout3)
Conditional Logic Based on Remaining Budget¶
from deadline_budget import BudgetContext
ctx = BudgetContext.create(total_seconds=10.0, safety_margin=0.5)
# Step 1: Always execute
user = await identity_service.create_user(
email="...",
timeout=ctx.timeout_for_call("identity_create_user"),
)
# Step 2: Only if sufficient budget remains
if ctx.remaining() > 2.0:
await analytics_service.track_signup(
user_id=user.id,
timeout=ctx.timeout_for_call("analytics_track"),
)
else:
logger.warning("Skipping analytics due to insufficient budget")
Metrics Collection¶
Track budget consumption for monitoring:
from deadline_budget import BudgetContext, DeadlineExceededError
import time
async def instrumented_orchestration():
ctx = BudgetContext.create(total_seconds=10.0, safety_margin=0.5)
start = time.perf_counter()
try:
await step_1(ctx)
await step_2(ctx)
await step_3(ctx)
# Collect metrics
elapsed = ctx.elapsed()
remaining = ctx.remaining()
utilization = (elapsed / 10.0) * 100
metrics.record({
"elapsed_seconds": elapsed,
"remaining_seconds": remaining,
"budget_utilization_percent": utilization,
})
except DeadlineExceededError as e:
metrics.increment("deadline_exceeded")
raise
Custom Budget Strategies¶
Create custom budget allocation strategies:
from deadline_budget import DeadlineBudget
class AdaptiveBudget:
"""Budget that adjusts caps based on historical performance."""
def __init__(self, total_seconds: float, call_history: dict[str, float]):
self.budget = DeadlineBudget(total_seconds=total_seconds, safety_margin=0.5)
self.call_history = call_history # Historical avg duration per call
def timeout_for_call(self, call_name: str) -> float:
"""Use historical avg + 50% buffer as cap."""
avg_duration = self.call_history.get(call_name, 1.0)
cap = avg_duration * 1.5
return self.budget.timeout_for(cap=cap)
# Usage
history = {
"identity_create_user": 1.8, # Historical avg: 1.8s
"credential_set_password": 1.2,
}
adaptive = AdaptiveBudget(total_seconds=10.0, call_history=history)
timeout = adaptive.timeout_for_call("identity_create_user") # cap = 1.8 * 1.5 = 2.7s
Error Recovery with Budget¶
Retry with reduced timeouts on failure:
from deadline_budget import DeadlineBudget
import asyncio
async def call_with_retry(
budget: DeadlineBudget,
func,
max_retries: int = 2,
cap: float = 5.0,
):
"""Retry with progressively smaller timeouts."""
for attempt in range(max_retries + 1):
if budget.expired():
raise DeadlineExceededError("Budget exhausted before retry")
# Reduce cap on retries
retry_cap = cap / (attempt + 1)
timeout = budget.timeout_for(cap=retry_cap)
try:
return await func(timeout=timeout)
except asyncio.TimeoutError:
if attempt == max_retries:
raise
continue
# Usage
budget = DeadlineBudget(total_seconds=10.0, safety_margin=0.5)
result = await call_with_retry(
budget,
lambda timeout: identity_service.create_user(email="...", timeout=timeout),
max_retries=2,
cap=5.0,
)
Integration with Observability¶
Integrate with OpenTelemetry or other tracing:
from deadline_budget import BudgetContext
from opentelemetry import trace
tracer = trace.get_tracer(__name__)
async def traced_orchestration():
ctx = BudgetContext.create(total_seconds=10.0, safety_margin=0.5)
with tracer.start_as_current_span("orchestration") as span:
span.set_attribute("budget.total", 10.0)
span.set_attribute("budget.safety_margin", 0.5)
await step_1(ctx)
span.set_attribute("budget.after_step_1", ctx.remaining())
await step_2(ctx)
span.set_attribute("budget.after_step_2", ctx.remaining())
span.set_attribute("budget.final_elapsed", ctx.elapsed())
span.set_attribute("budget.final_remaining", ctx.remaining())
Testing with Budgets¶
Mock time for deterministic tests:
from unittest.mock import patch
from deadline_budget import DeadlineBudget
import time
def test_budget_exhaustion():
with patch("time.perf_counter") as mock_time:
mock_time.return_value = 0.0
budget = DeadlineBudget(total_seconds=10.0, safety_margin=0.5)
# Simulate 5s elapsed
mock_time.return_value = 5.0
assert budget.remaining() == 4.5 # 10 - 0.5 safety - 5 elapsed
# Simulate 10s elapsed (budget exhausted)
mock_time.return_value = 10.0
assert budget.expired()
Best Practices¶
- Always set safety margins — Network latency and overhead can cause violations
- Monitor budget utilization — Track metrics to tune caps
- Fail fast — Check expiration after critical sections
- Reserve for cleanup — Use
reserve_for_nextfor final operations - Pass budgets down — Share budgets with nested orchestrations
- Log budget state — Include elapsed/remaining in error logs