# Source code for pyfetcher.transports.httpx

"""HTTPX transport implementation for :mod:`pyfetcher`.

Purpose:
    Provide pooled synchronous and asynchronous fetching using ``httpx`` as
    the underlying HTTP client library.

Design:
    - One transport instance owns long-lived clients and pooling state.
    - Request conversion stays thin because the main contracts are
      transport-agnostic.
    - Streaming is supported via ``httpx.AsyncClient.stream``.
    - Clients are lazily initialized on first use with settings derived
      from the request's embedded policies.

Examples:
    ::

        >>> transport = HttpxTransport()
        >>> hasattr(transport, "fetch")
        True
"""

from __future__ import annotations

from collections.abc import AsyncIterator
from time import perf_counter
from typing import Any

import httpx

from pyfetcher.contracts.policy import PoolPolicy, TimeoutPolicy
from pyfetcher.contracts.request import FetchRequest
from pyfetcher.contracts.response import FetchResponse, StreamChunk


def _build_timeout(timeout: TimeoutPolicy) -> httpx.Timeout:
    """Translate a :class:`TimeoutPolicy` into its ``httpx`` counterpart.

    The policy's overall budget becomes the default timeout, and each
    per-phase field (connect, read, write, pool) is forwarded to the
    keyword of the same name.

    Args:
        timeout: Policy carrying the overall and per-phase limits.

    Returns:
        An ``httpx.Timeout`` built from the policy's fields.

    Examples:
        ::

            >>> isinstance(_build_timeout(TimeoutPolicy()), httpx.Timeout)
            True
    """
    overall = timeout.total_seconds
    per_phase = {
        "connect": timeout.connect_seconds,
        "read": timeout.read_seconds,
        "write": timeout.write_seconds,
        "pool": timeout.pool_seconds,
    }
    return httpx.Timeout(timeout=overall, **per_phase)


def _build_limits(pool: PoolPolicy) -> httpx.Limits:
    """Translate a :class:`PoolPolicy` into its ``httpx`` counterpart.

    Args:
        pool: Policy carrying the connection-pool sizing and keep-alive
            expiry values.

    Returns:
        An ``httpx.Limits`` built from the policy's fields.

    Examples:
        ::

            >>> isinstance(_build_limits(PoolPolicy()), httpx.Limits)
            True
    """
    connection_cap = pool.max_connections
    keepalive_cap = pool.max_keepalive_connections
    expiry = pool.keepalive_expiry_seconds
    return httpx.Limits(
        max_connections=connection_cap,
        max_keepalive_connections=keepalive_cap,
        keepalive_expiry=expiry,
    )


class HttpxTransport:
    """Combined sync/async HTTPX transport with pooled client management.

    Manages long-lived ``httpx.Client`` and ``httpx.AsyncClient`` instances
    that are lazily initialized from the first request's embedded policies.
    Supports synchronous fetch, asynchronous fetch, and async streaming.

    Note:
        An internally created client keeps the settings of the request that
        triggered its creation; later requests reuse that client unchanged.

    Args:
        sync_client: Optional externally managed sync client (caller owns
            lifecycle).
        async_client: Optional externally managed async client (caller owns
            lifecycle).

    Examples:
        ::

            >>> transport = HttpxTransport()
            >>> hasattr(transport, "afetch")
            True
    """

    def __init__(
        self,
        *,
        sync_client: httpx.Client | None = None,
        async_client: httpx.AsyncClient | None = None,
    ) -> None:
        self._sync_client = sync_client
        self._async_client = async_client
        # Only clients created by this transport are closed by
        # close()/aclose(); injected clients stay under the caller's control.
        self._owns_sync_client = sync_client is None
        self._owns_async_client = async_client is None

    @staticmethod
    def _client_options(request: FetchRequest) -> dict[str, Any]:
        """Build the constructor keywords shared by both client flavours.

        Args:
            request: Fetch request whose policies configure the client.

        Returns:
            Keyword arguments for ``httpx.Client`` / ``httpx.AsyncClient``.
        """
        return {
            "timeout": _build_timeout(request.timeout),
            "limits": _build_limits(request.pool),
            "follow_redirects": request.allow_redirects,
            "verify": request.verify_ssl,
            "http2": request.http2,
        }

    @staticmethod
    def _request_options(request: FetchRequest) -> dict[str, Any]:
        """Build the per-request keywords shared by fetch, afetch and astream.

        Args:
            request: The fetch request being executed.

        Returns:
            Keyword arguments for ``request(...)`` / ``stream(...)``.
        """
        return {
            "params": request.params,
            "headers": request.headers,
            "content": request.data,
            "json": request.json_data,
        }

    @staticmethod
    def _to_response(
        request: FetchRequest,
        response: httpx.Response,
        elapsed_ms: float,
    ) -> FetchResponse:
        """Normalize an ``httpx`` response into a :class:`FetchResponse`.

        Args:
            request: The originating fetch request.
            response: The fully read ``httpx`` response.
            elapsed_ms: Wall-clock duration of the request in milliseconds.

        Returns:
            The transport-agnostic response object.
        """
        return FetchResponse(
            request_url=request.url.unicode_string(),
            final_url=str(response.url),
            status_code=response.status_code,
            headers=dict(response.headers),
            content_type=response.headers.get("content-type"),
            text=response.text,
            body=response.content,
            backend="httpx",
            elapsed_ms=elapsed_ms,
        )

    def _get_sync_client(self, request: FetchRequest) -> httpx.Client:
        """Get or lazily create the synchronous HTTPX client.

        Args:
            request: Fetch request whose policies configure the client.
                Only consulted when no client exists yet.

        Returns:
            A configured ``httpx.Client`` instance.
        """
        if self._sync_client is None:
            self._sync_client = httpx.Client(**self._client_options(request))
        return self._sync_client

    def _get_async_client(self, request: FetchRequest) -> httpx.AsyncClient:
        """Get or lazily create the asynchronous HTTPX client.

        Args:
            request: Fetch request whose policies configure the client.
                Only consulted when no client exists yet.

        Returns:
            A configured ``httpx.AsyncClient`` instance.
        """
        if self._async_client is None:
            self._async_client = httpx.AsyncClient(**self._client_options(request))
        return self._async_client

    def fetch(self, request: FetchRequest) -> FetchResponse:
        """Fetch a request synchronously.

        Args:
            request: The fetch request to execute.

        Returns:
            A normalized :class:`~pyfetcher.contracts.response.FetchResponse`.

        Raises:
            httpx.HTTPStatusError: If the response status indicates an error.
        """
        client = self._get_sync_client(request)
        start = perf_counter()
        response = client.request(
            request.method,
            request.url.unicode_string(),
            **self._request_options(request),
        )
        # Capture the duration before raise_for_status so the measurement
        # covers only the request itself.
        elapsed_ms = (perf_counter() - start) * 1000.0
        response.raise_for_status()
        return self._to_response(request, response, elapsed_ms)

    async def afetch(self, request: FetchRequest) -> FetchResponse:
        """Fetch a request asynchronously.

        Args:
            request: The fetch request to execute.

        Returns:
            A normalized :class:`~pyfetcher.contracts.response.FetchResponse`.

        Raises:
            httpx.HTTPStatusError: If the response status indicates an error.
        """
        client = self._get_async_client(request)
        start = perf_counter()
        response = await client.request(
            request.method,
            request.url.unicode_string(),
            **self._request_options(request),
        )
        # Capture the duration before raise_for_status so the measurement
        # covers only the request itself.
        elapsed_ms = (perf_counter() - start) * 1000.0
        response.raise_for_status()
        return self._to_response(request, response, elapsed_ms)

    async def astream(self, request: FetchRequest) -> AsyncIterator[StreamChunk]:
        """Stream a request asynchronously.

        Yields chunks of the response body as :class:`StreamChunk` objects,
        each carrying the raw bytes and a zero-based index.

        Args:
            request: The fetch request to stream.

        Yields:
            :class:`~pyfetcher.contracts.response.StreamChunk` objects.

        Raises:
            httpx.HTTPStatusError: If the response status indicates an error.
        """
        client = self._get_async_client(request)
        request_url = request.url.unicode_string()
        async with client.stream(
            request.method,
            request_url,
            **self._request_options(request),
        ) as response:
            # Fail before yielding anything when the status is an error.
            response.raise_for_status()
            final_url = str(response.url)
            index = 0
            async for chunk in response.aiter_bytes(chunk_size=request.stream.chunk_size):
                yield StreamChunk(
                    request_url=request_url,
                    final_url=final_url,
                    backend="httpx",
                    index=index,
                    data=chunk,
                )
                index += 1

    def close(self) -> None:
        """Close the owned sync client if present.

        Only closes clients that were created internally (not externally
        provided via the constructor). The async client is not touched;
        use :meth:`aclose` for it.
        """
        if self._owns_sync_client and self._sync_client is not None:
            self._sync_client.close()
            # Drop the reference so a later fetch lazily builds a new client.
            self._sync_client = None

    async def aclose(self) -> None:
        """Close the owned async client if present.

        Only closes clients that were created internally (not externally
        provided via the constructor). The sync client is not touched;
        use :meth:`close` for it.
        """
        if self._owns_async_client and self._async_client is not None:
            await self._async_client.aclose()
            # Drop the reference so a later afetch lazily builds a new client.
            self._async_client = None