"""Async object storage client for :mod:`pyfetcher.store`.
Purpose:
Provide an async client for MinIO/S3 operations: upload, download,
delete, list, and presigned URL generation. Uses aioboto3 for async
S3 compatibility.
"""
from __future__ import annotations
from pathlib import Path
from typing import Any
import aioboto3
from pyfetcher.config import PyfetcherConfig
class ObjectStoreClient:
    """Async object storage client for MinIO/S3.

    Provides upload, download, delete, list, and presigned URL operations
    using aioboto3's async S3 interface against a MinIO endpoint. Every
    operation opens its own S3 client from the shared session and closes it
    when the operation finishes.

    Args:
        config: Application configuration with MinIO connection details.
            A default ``PyfetcherConfig()`` is built when omitted.
    """

    # Error codes on a failed HEAD request that mean "the bucket is absent".
    _MISSING_BUCKET_CODES = frozenset({"404", "NoSuchBucket", "NotFound"})

    # Page size requested per ``list_objects_v2`` call; the S3 API returns at
    # most 1000 keys per response, so larger listings must be paginated.
    _LIST_PAGE_SIZE = 1000

    def __init__(self, config: PyfetcherConfig | None = None) -> None:
        self._config = config or PyfetcherConfig()
        self._session = aioboto3.Session()

    def _client_kwargs(self) -> dict[str, Any]:
        """Build kwargs for the S3 client context manager.

        Returns:
            Keyword arguments for ``session.client``: the service name, the
            endpoint URL (``https`` when ``minio_secure`` is truthy, ``http``
            otherwise) and the access/secret key pair from the config.
        """
        scheme = "https" if self._config.minio_secure else "http"
        return {
            "service_name": "s3",
            "endpoint_url": f"{scheme}://{self._config.minio_endpoint}",
            "aws_access_key_id": self._config.minio_access_key,
            "aws_secret_access_key": self._config.minio_secret_key,
        }

    def _resolve_bucket(self, bucket: str | None) -> str:
        """Return ``bucket`` if given, otherwise the configured default bucket."""
        return bucket or self._config.minio_bucket

    async def ensure_bucket(self, bucket: str | None = None) -> None:
        """Create the bucket if it doesn't exist.

        Args:
            bucket: Bucket name. Defaults to config bucket.

        Raises:
            Exception: Any failure of the existence check other than a
                "not found" client error is propagated unchanged, so that
                authentication or connectivity problems are not mistaken
                for a missing bucket.
        """
        bucket = self._resolve_bucket(bucket)
        async with self._session.client(**self._client_kwargs()) as s3:
            try:
                await s3.head_bucket(Bucket=bucket)
            except s3.exceptions.ClientError as exc:
                code = str(exc.response.get("Error", {}).get("Code", ""))
                if code not in self._MISSING_BUCKET_CODES:
                    raise
                await s3.create_bucket(Bucket=bucket)

    async def upload_bytes(
        self,
        key: str,
        data: bytes,
        *,
        bucket: str | None = None,
        content_type: str | None = None,
    ) -> str:
        """Upload bytes to object storage.

        Args:
            key: Object key.
            data: Bytes to upload.
            bucket: Bucket name. Defaults to config bucket.
            content_type: Optional MIME type.

        Returns:
            The object key.
        """
        bucket = self._resolve_bucket(bucket)
        # Only send ContentType when the caller supplied one.
        extra: dict[str, str] = {"ContentType": content_type} if content_type else {}
        async with self._session.client(**self._client_kwargs()) as s3:
            await s3.put_object(Bucket=bucket, Key=key, Body=data, **extra)
        return key

    async def upload_file(
        self,
        key: str,
        file_path: str | Path,
        *,
        bucket: str | None = None,
        content_type: str | None = None,
    ) -> str:
        """Upload a local file to object storage.

        Args:
            key: Object key.
            file_path: Local file path.
            bucket: Bucket name. Defaults to config bucket.
            content_type: Optional MIME type.

        Returns:
            The object key.
        """
        bucket = self._resolve_bucket(bucket)
        # ExtraArgs is passed as None (not an empty dict) when there is
        # nothing to set.
        extra_args: dict[str, str] | None = (
            {"ContentType": content_type} if content_type else None
        )
        async with self._session.client(**self._client_kwargs()) as s3:
            await s3.upload_file(str(file_path), bucket, key, ExtraArgs=extra_args)
        return key

    async def download_bytes(self, key: str, *, bucket: str | None = None) -> bytes:
        """Download an object as bytes.

        Args:
            key: Object key.
            bucket: Bucket name. Defaults to config bucket.

        Returns:
            The object contents as bytes.
        """
        bucket = self._resolve_bucket(bucket)
        async with self._session.client(**self._client_kwargs()) as s3:
            response = await s3.get_object(Bucket=bucket, Key=key)
            # Read inside the body's own context manager so the underlying
            # stream is released even if the read fails.
            async with response["Body"] as stream:
                return await stream.read()

    async def presigned_get_url(
        self, key: str, *, bucket: str | None = None, expires_in: int = 3600
    ) -> str:
        """Generate a presigned GET URL.

        Args:
            key: Object key.
            bucket: Bucket name. Defaults to config bucket.
            expires_in: URL expiration in seconds.

        Returns:
            A presigned URL string.
        """
        bucket = self._resolve_bucket(bucket)
        async with self._session.client(**self._client_kwargs()) as s3:
            return await s3.generate_presigned_url(
                "get_object",
                Params={"Bucket": bucket, "Key": key},
                ExpiresIn=expires_in,
            )

    async def delete(self, key: str, *, bucket: str | None = None) -> None:
        """Delete an object.

        Args:
            key: Object key.
            bucket: Bucket name. Defaults to config bucket.
        """
        bucket = self._resolve_bucket(bucket)
        async with self._session.client(**self._client_kwargs()) as s3:
            await s3.delete_object(Bucket=bucket, Key=key)

    async def list_keys(
        self, prefix: str = "", *, bucket: str | None = None, max_keys: int = 1000
    ) -> list[str]:
        """List object keys under a prefix.

        Results are fetched page by page, following the continuation token,
        until ``max_keys`` keys have been collected or the listing is
        exhausted. This makes values of ``max_keys`` above the per-response
        limit work as documented.

        Args:
            prefix: Key prefix filter.
            bucket: Bucket name. Defaults to config bucket.
            max_keys: Maximum keys to return.

        Returns:
            A list of matching object keys.
        """
        bucket = self._resolve_bucket(bucket)
        keys: list[str] = []
        request: dict[str, Any] = {"Bucket": bucket, "Prefix": prefix}
        async with self._session.client(**self._client_kwargs()) as s3:
            while True:
                # Ask only for what is still missing, capped at one page.
                page_size = min(max_keys - len(keys), self._LIST_PAGE_SIZE)
                response = await s3.list_objects_v2(**request, MaxKeys=page_size)
                keys.extend(obj["Key"] for obj in response.get("Contents", []))
                token = response.get("NextContinuationToken")
                if len(keys) >= max_keys or not response.get("IsTruncated") or not token:
                    break
                request["ContinuationToken"] = token
        return keys