Source code for pyfetcher.transports.curl_cffi

"""curl_cffi transport implementation for :mod:`pyfetcher`.

Purpose:
    Provide a transport backed by ``curl_cffi`` that impersonates real browser
    TLS fingerprints (JA3/JA4), making requests appear to originate from
    genuine browsers at the network layer. This is the most effective approach
    for bypassing TLS-based bot detection (e.g. Cloudflare, Akamai).

Design:
    - Uses ``curl_cffi.requests.Session`` for synchronous requests.
    - Uses ``curl_cffi.requests.AsyncSession`` for asynchronous requests.
    - The ``impersonate`` parameter selects which browser's TLS fingerprint
      to match (e.g. ``'chrome120'``, ``'firefox'``, ``'safari'``).
    - Sessions are lazily initialized on first use.
    - ``curl_cffi`` is imported lazily so it remains an optional dependency.

Examples:
    ::

        >>> transport = CurlCffiTransport(impersonate="chrome120")
        >>> hasattr(transport, "fetch")
        True
"""

from __future__ import annotations

from collections.abc import AsyncIterator
from time import perf_counter

from pyfetcher.contracts.request import FetchRequest
from pyfetcher.contracts.response import FetchResponse, StreamChunk

#: Supported curl_cffi browser impersonation targets.
CURL_CFFI_TARGETS: list[str] = [
    "chrome99",
    "chrome100",
    "chrome101",
    "chrome104",
    "chrome107",
    "chrome110",
    "chrome116",
    "chrome119",
    "chrome120",
    "chrome123",
    "chrome124",
    "chrome131",
    "chrome133",
    "edge99",
    "edge101",
    "firefox95",
    "firefox100",
    "firefox102",
    "firefox109",
    "firefox117",
    "safari15_3",
    "safari15_5",
    "safari17_0",
    "safari17_2_1",
    "safari18_0",
]


[docs] class CurlCffiTransport: """Transport using curl_cffi for browser TLS fingerprint impersonation. Matches real browser JA3/JA4 TLS fingerprints at the network level, making requests indistinguishable from genuine browser traffic to TLS-based bot detection systems. Args: impersonate: Browser TLS fingerprint target (e.g. ``'chrome120'``, ``'firefox'``, ``'safari'``). See :data:`CURL_CFFI_TARGETS` for all options. sync_session: Optional externally managed sync session. async_session: Optional externally managed async session. Examples: :: >>> transport = CurlCffiTransport(impersonate="chrome120") >>> hasattr(transport, "afetch") True """ def __init__( self, *, impersonate: str = "chrome120", sync_session: object | None = None, async_session: object | None = None, ) -> None: self.impersonate = impersonate self._sync_session = sync_session self._async_session = async_session self._owns_sync = sync_session is None self._owns_async = async_session is None def _get_sync_session(self) -> object: """Get or lazily create the synchronous curl_cffi session. Returns: A ``curl_cffi.requests.Session`` instance. Raises: ImportError: If ``curl_cffi`` is not installed. """ if self._sync_session is None: from curl_cffi.requests import Session # type: ignore[import-untyped] self._sync_session = Session(impersonate=self.impersonate) return self._sync_session def _get_async_session(self) -> object: """Get or lazily create the asynchronous curl_cffi session. Returns: A ``curl_cffi.requests.AsyncSession`` instance. Raises: ImportError: If ``curl_cffi`` is not installed. """ if self._async_session is None: from curl_cffi.requests import AsyncSession # type: ignore[import-untyped] self._async_session = AsyncSession(impersonate=self.impersonate) return self._async_session
[docs] def fetch(self, request: FetchRequest) -> FetchResponse: """Fetch a request synchronously with browser TLS impersonation. Args: request: The fetch request to execute. Returns: A normalized :class:`~pyfetcher.contracts.response.FetchResponse`. Raises: ImportError: If ``curl_cffi`` is not installed. """ session = self._get_sync_session() start = perf_counter() response = session.request( # type: ignore[union-attr] request.method, request.url.unicode_string(), params=request.params or None, headers=request.headers or None, data=request.data, json=request.json_data, allow_redirects=request.allow_redirects, verify=request.verify_ssl, timeout=request.timeout.total_seconds, ) elapsed_ms = (perf_counter() - start) * 1000.0 response.raise_for_status() return FetchResponse( request_url=request.url.unicode_string(), final_url=str(response.url), status_code=response.status_code, headers=dict(response.headers), content_type=response.headers.get("content-type"), text=response.text, body=response.content, backend="curl_cffi", elapsed_ms=elapsed_ms, )
[docs] async def afetch(self, request: FetchRequest) -> FetchResponse: """Fetch a request asynchronously with browser TLS impersonation. Args: request: The fetch request to execute. Returns: A normalized :class:`~pyfetcher.contracts.response.FetchResponse`. Raises: ImportError: If ``curl_cffi`` is not installed. """ session = self._get_async_session() start = perf_counter() response = await session.request( # type: ignore[union-attr] request.method, request.url.unicode_string(), params=request.params or None, headers=request.headers or None, data=request.data, json=request.json_data, allow_redirects=request.allow_redirects, verify=request.verify_ssl, timeout=request.timeout.total_seconds, ) elapsed_ms = (perf_counter() - start) * 1000.0 response.raise_for_status() return FetchResponse( request_url=request.url.unicode_string(), final_url=str(response.url), status_code=response.status_code, headers=dict(response.headers), content_type=response.headers.get("content-type"), text=response.text, body=response.content, backend="curl_cffi", elapsed_ms=elapsed_ms, )
[docs] async def astream(self, request: FetchRequest) -> AsyncIterator[StreamChunk]: """Stream a request asynchronously with browser TLS impersonation. Args: request: The fetch request to stream. Yields: :class:`~pyfetcher.contracts.response.StreamChunk` objects. Raises: ImportError: If ``curl_cffi`` is not installed. """ session = self._get_async_session() response = await session.request( # type: ignore[union-attr] request.method, request.url.unicode_string(), params=request.params or None, headers=request.headers or None, data=request.data, json=request.json_data, allow_redirects=request.allow_redirects, verify=request.verify_ssl, timeout=request.timeout.total_seconds, stream=True, ) response.raise_for_status() index = 0 async for chunk in response.aiter_content(): # type: ignore[union-attr] yield StreamChunk( request_url=request.url.unicode_string(), final_url=str(response.url), backend="curl_cffi", index=index, data=chunk if isinstance(chunk, bytes) else chunk.encode(), ) index += 1
[docs] def close(self) -> None: """Close the owned sync session if present.""" if self._owns_sync and self._sync_session is not None: close = getattr(self._sync_session, "close", None) if callable(close): close() self._sync_session = None
[docs] async def aclose(self) -> None: """Close the owned async session if present.""" if self._owns_async and self._async_session is not None: aclose = getattr(self._async_session, "close", None) if callable(aclose): await aclose() self._async_session = None