Source code for pyfetcher.fetch.stream
"""Streaming convenience helpers for :mod:`pyfetcher`.
Purpose:
Provide small helpers for consuming async stream chunks into a single
bytes object.
Examples:
::
>>> collect_bytes([b"a", b"b"])
b'ab'
"""
from __future__ import annotations
from collections.abc import AsyncIterable
[docs]
def collect_bytes(chunks: list[bytes]) -> bytes:
"""Concatenate a list of byte chunks into a single bytes object.
Args:
chunks: List of byte chunks.
Returns:
All chunks concatenated.
Examples:
::
>>> collect_bytes([b"a", b"b"])
b'ab'
"""
return b"".join(chunks)
[docs]
async def collect_stream_bytes(chunks: AsyncIterable[bytes]) -> bytes:
"""Collect an async byte stream into a single bytes object.
Iterates the async iterable and accumulates all bytes into a single
contiguous buffer.
Args:
chunks: Async iterable yielding byte chunks.
Returns:
All chunks concatenated.
Examples:
::
>>> async def _demo():
... async def _gen():
... yield b"a"
... yield b"b"
... return await collect_stream_bytes(_gen())
"""
collected = bytearray()
async for chunk in chunks:
collected.extend(chunk)
return bytes(collected)