Source code for pyfetcher.contracts.policy
"""Policy models for :mod:`pyfetcher`.
Purpose:
Provide serializable policy objects that control retries, timeouts,
connection pooling, and streaming behavior.
Design:
- Policies are explicit and reusable across transports.
- Policies are serializable so they can be persisted or queued later.
- Backend-specific conversion happens outside these models.
Examples:
::
>>> retry = RetryPolicy(attempts=4)
>>> retry.attempts
4
"""
from __future__ import annotations
from pydantic import BaseModel, ConfigDict, Field
[docs]
class RetryPolicy(BaseModel):
"""Retry policy shared by fetch services.
Controls how failed requests are retried using exponential backoff. Status
codes that trigger retries are configurable, as is whether connection-level
errors should be retried.
Args:
attempts: Total number of attempts including the first call.
wait_base_seconds: Base exponential backoff delay in seconds.
wait_max_seconds: Maximum delay between attempts in seconds.
retry_status_codes: HTTP status codes that should trigger retries.
retry_on_connection_errors: Whether connection errors should retry.
reraise: Whether the final failure should be re-raised to the caller.
Examples:
::
>>> RetryPolicy(attempts=3).attempts
3
"""
model_config = ConfigDict(extra="forbid", frozen=True)
attempts: int = 3
wait_base_seconds: float = 0.5
wait_max_seconds: float = 8.0
retry_status_codes: set[int] = Field(
default_factory=lambda: {408, 409, 425, 429, 500, 502, 503, 504},
)
retry_on_connection_errors: bool = True
reraise: bool = True
[docs]
class TimeoutPolicy(BaseModel):
"""Timeout policy shared by fetch services.
Provides granular timeout control for different phases of an HTTP request.
The ``total_seconds`` value acts as an overall budget that caps the entire
operation regardless of the per-phase values.
Args:
total_seconds: Overall timeout budget in seconds.
connect_seconds: Maximum time to establish a TCP connection.
read_seconds: Maximum time to receive the response body.
write_seconds: Maximum time to send the request body.
pool_seconds: Maximum time to acquire a connection from the pool.
Examples:
::
>>> TimeoutPolicy().total_seconds
30.0
"""
model_config = ConfigDict(extra="forbid", frozen=True)
total_seconds: float = 30.0
connect_seconds: float = 10.0
read_seconds: float = 30.0
write_seconds: float = 30.0
pool_seconds: float = 10.0
[docs]
class PoolPolicy(BaseModel):
"""Connection pooling and concurrency policy.
Controls connection pool sizing and keepalive behavior for transport
backends, as well as the concurrency limit used for async batch
operations.
Args:
max_connections: Maximum total connections across all hosts.
max_keepalive_connections: Maximum keepalive connections where supported.
keepalive_expiry_seconds: Time-to-live for idle keepalive connections.
max_connections_per_host: Maximum connections to a single host.
concurrency: Maximum in-flight tasks for async batching.
Examples:
::
>>> PoolPolicy(concurrency=8).concurrency
8
"""
model_config = ConfigDict(extra="forbid", frozen=True)
max_connections: int = 100
max_keepalive_connections: int = 20
keepalive_expiry_seconds: float = 10.0
max_connections_per_host: int = 10
concurrency: int = 8
[docs]
class StreamPolicy(BaseModel):
"""Streaming behavior policy.
Controls chunk sizing and optional byte limits for streaming operations.
The ``decode_text`` flag signals downstream consumers that text decoding
is desired.
Args:
chunk_size: Size in bytes of each emitted chunk.
decode_text: Whether downstream consumers expect text decoding.
max_bytes: Optional cap on total consumed bytes (``None`` for unlimited).
Examples:
::
>>> StreamPolicy().chunk_size
65536
"""
model_config = ConfigDict(extra="forbid", frozen=True)
chunk_size: int = 65_536
decode_text: bool = False
max_bytes: int | None = None