Source code for pyfetcher.transports.base
"""Base transport protocols for :mod:`pyfetcher`.
Purpose:
Define the minimal sync/async transport interfaces consumed by fetch
services. Implementations provide backend-specific HTTP execution while
conforming to these duck-typed protocols.
Design:
- Sync and async protocols are distinct to avoid forcing implementers
to provide both.
- Streaming is modeled explicitly for async transports.
- Implementations own backend-specific session/client lifecycles.
Examples:
::
>>> hasattr(SyncTransport, "fetch")
True
"""
from __future__ import annotations
from collections.abc import AsyncIterator
from typing import Protocol
from pyfetcher.contracts.request import FetchRequest
from pyfetcher.contracts.response import FetchResponse, StreamChunk
[docs]
class SyncTransport(Protocol):
"""Protocol for synchronous fetch transports.
Implementations must provide a ``fetch`` method that accepts a
:class:`~pyfetcher.contracts.request.FetchRequest` and returns a
normalized :class:`~pyfetcher.contracts.response.FetchResponse`.
"""
[docs]
def fetch(self, request: FetchRequest) -> FetchResponse:
"""Fetch a request synchronously.
Args:
request: The fetch request to execute.
Returns:
A normalized fetch response.
"""
...
[docs]
class AsyncTransport(Protocol):
"""Protocol for asynchronous fetch transports.
Implementations must provide ``afetch`` for full responses and
``astream`` for chunked streaming.
"""
[docs]
async def afetch(self, request: FetchRequest) -> FetchResponse:
"""Fetch a request asynchronously.
Args:
request: The fetch request to execute.
Returns:
A normalized fetch response.
"""
...
[docs]
async def astream(self, request: FetchRequest) -> AsyncIterator[StreamChunk]:
"""Stream a request asynchronously.
Args:
request: The fetch request to stream.
Returns:
An async iterator yielding :class:`StreamChunk` objects.
"""
...