From 369039a80d4142c89e2eacb7ae6a99b850c1d4e5 Mon Sep 17 00:00:00 2001 From: Danylo_Kriachkov Date: Thu, 11 Jun 2026 12:02:57 +0300 Subject: [PATCH 1/4] feat: add streaming download functionality for large files in AsyncFiles class --- README.md | 10 +++++ aidial_client/_http_client/_async.py | 36 +++++++++++++++ aidial_client/resources/files.py | 28 +++++++++++- tests/resources/files/test_download.py | 61 ++++++++++++++++++++++++++ 4 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 tests/resources/files/test_download.py diff --git a/README.md b/README.md index 98ffb61..5fdd5ad 100644 --- a/README.md +++ b/README.md @@ -492,6 +492,16 @@ result = await async_client.files.download( ) ``` +For large async downloads, use `stream_download()` to process bytes as they arrive without buffering the full response in memory: + +```python +async with async_client.files.stream_download( + url=await async_client.my_files_home() / "relative_folder/my-file.txt" +) as result: + async for bytes_chunk in result: + ... +``` + As a result, you will receive an object of type `FileDownloadResponse`, that you can iterate by byte chunks: ```python diff --git a/aidial_client/_http_client/_async.py b/aidial_client/_http_client/_async.py index 5dcf37c..f82c82f 100644 --- a/aidial_client/_http_client/_async.py +++ b/aidial_client/_http_client/_async.py @@ -120,6 +120,42 @@ async def request( return process_block_response(cast_to=cast_to, response=response) + @asynccontextmanager + async def stream( + self, + *, + options: FinalRequestOptions, + on_http_error: Optional[ + Callable[[httpx.HTTPStatusError], Optional[DialException]] + ] = None, + ) -> AsyncIterator[httpx.Response]: + auth_headers = await self.auth_headers() + request = self._build_request(options, auth_headers) + try: + response = await self._internal_http_client.send( + request, stream=True + ) + except httpx.TimeoutException as err: + raise DialException( + message="Request timed out", + status_code=HTTPStatus.REQUEST_TIMEOUT, + ) from err + except httpx.HTTPError as err: + raise DialException(message=f"Request failed: {err}") from err + + try: + try: + response.raise_for_status() + except httpx.HTTPStatusError as err: + custom_error = on_http_error(err) if on_http_error else None + raised_error = custom_error or self._make_dial_error_from_response( + err.response + ) + raise raised_error from err + yield response + finally: + await response.aclose() + @asynccontextmanager async def stream_sse( self, diff --git a/aidial_client/resources/files.py b/aidial_client/resources/files.py index 47421aa..befc5ba 100644 --- a/aidial_client/resources/files.py +++ b/aidial_client/resources/files.py @@ -1,5 +1,6 @@ from pathlib import PurePosixPath -from typing import Literal, Optional, Union +from contextlib import asynccontextmanager +from typing import AsyncIterator, Literal, Optional, Union from urllib.parse import urljoin import httpx @@ -209,6 +210,31 @@ async def download( response=response, filename=storage_resource.filename ) + @asynccontextmanager + async def stream_download( + self, + url: Union[str, PurePosixPath], + etag_if_match: Optional[str] = None, + ) -> AsyncIterator[FileDownloadResponse]: + storage_resource = self.get_storage_resource(str(url)) + if storage_resource.filename is None: + raise InvalidDialURLError("URL points to a directory, not a file") + async with self.http_client.stream( + options=FinalRequestOptions( + method="GET", + url=urljoin(API_PREFIX, storage_resource.api_path), + headers=remove_none( + { + "If-Match": etag_if_match, + } + ), + ), + on_http_error=_files_error_processor, + ) as response: + yield FileDownloadResponse( + response=response, filename=storage_resource.filename + ) + async def delete( self, url: Union[str, PurePosixPath], diff --git a/tests/resources/files/test_download.py b/tests/resources/files/test_download.py new file mode 100644 index 0000000..34d1e8d --- /dev/null +++ b/tests/resources/files/test_download.py @@ -0,0 +1,61 @@ +from typing import Any, Dict, List, cast +from unittest.mock import AsyncMock + +import httpx +import pytest + +from aidial_client._client import AsyncDial +from aidial_client._exception import InvalidDialURLError +from tests.client_mock import MockStreamIterator + + +@pytest.mark.asyncio +async def test_stream_download_async_streams_and_closes_response(): + captured_requests: List[httpx.Request] = [] + captured_kwargs: List[Dict[str, Any]] = [] + captured_responses: List[httpx.Response] = [] + client = AsyncDial(api_key="dummy", base_url="http://dial.core") + client._get_my_bucket = cast(Any, AsyncMock(return_value="test-bucket")) + + async def send_mock( + request: httpx.Request, *, stream: bool = False, **kwargs: Any + ) -> httpx.Response: + captured_requests.append(request) + captured_kwargs.append({"stream": stream, **kwargs}) + response = httpx.Response( + status_code=200, + request=request, + stream=MockStreamIterator(mock_chunks=[b"hello ", b"world"]), + ) + captured_responses.append(response) + return response + + client._http_client._internal_http_client.send = cast(Any, send_mock) + + async with client.files.stream_download( + url=await client.my_files_home() / "folder/file.txt" + ) as response: + assert response.filename == "file.txt" + assert b"".join([chunk async for chunk in response]) == b"hello world" + + assert captured_requests[0].url.path == "/v1/files/test-bucket/folder/file.txt" + assert captured_kwargs == [{"stream": True}] + assert captured_responses[0].is_closed is True + + +@pytest.mark.asyncio +async def test_stream_download_async_rejects_directory_url(): + client = AsyncDial(api_key="dummy", base_url="http://dial.core") + client._get_my_bucket = cast(Any, AsyncMock(return_value="test-bucket")) + send_mock = AsyncMock() + client._http_client._internal_http_client.send = cast(Any, send_mock) + + with pytest.raises( + InvalidDialURLError, match="URL points to a directory, not a file" + ): + async with client.files.stream_download( + url="files/test-bucket/folder/" + ): + pass + + send_mock.assert_not_called() From c041845601edf363a4bd9bf9313d87973c806ae7 Mon Sep 17 00:00:00 2001 From: Danylo_Kriachkov Date: Mon, 22 Jun 2026 15:55:23 +0300 Subject: [PATCH 2/4] refactor: update type hints for on_http_error and adjust list type annotations in tests --- aidial_client/_http_client/_async.py | 11 ++++++----- tests/resources/files/test_download.py | 12 +++++++----- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/aidial_client/_http_client/_async.py b/aidial_client/_http_client/_async.py index 32b032f..5f8fc43 100644 --- a/aidial_client/_http_client/_async.py +++ b/aidial_client/_http_client/_async.py @@ -116,9 +116,9 @@ async def stream( self, *, options: FinalRequestOptions, - on_http_error: Optional[ - Callable[[httpx.HTTPStatusError], Optional[DialException]] - ] = None, + on_http_error: ( + Callable[[httpx.HTTPStatusError], DialException | None] | None + ) = None, ) -> AsyncIterator[httpx.Response]: auth_headers = await self.auth_headers() request = self._build_request(options, auth_headers) @@ -139,8 +139,9 @@ async def stream( response.raise_for_status() except httpx.HTTPStatusError as err: custom_error = on_http_error(err) if on_http_error else None - raised_error = custom_error or self._make_dial_error_from_response( - err.response + raised_error = ( + custom_error + or self._make_dial_error_from_response(err.response) ) raise raised_error from err yield response diff --git a/tests/resources/files/test_download.py b/tests/resources/files/test_download.py index 34d1e8d..d5320d3 100644 --- a/tests/resources/files/test_download.py +++ b/tests/resources/files/test_download.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, cast +from typing import Any, cast from unittest.mock import AsyncMock import httpx @@ -11,9 +11,9 @@ @pytest.mark.asyncio async def test_stream_download_async_streams_and_closes_response(): - captured_requests: List[httpx.Request] = [] - captured_kwargs: List[Dict[str, Any]] = [] - captured_responses: List[httpx.Response] = [] + captured_requests: list[httpx.Request] = [] + captured_kwargs: list[dict[str, Any]] = [] + captured_responses: list[httpx.Response] = [] client = AsyncDial(api_key="dummy", base_url="http://dial.core") client._get_my_bucket = cast(Any, AsyncMock(return_value="test-bucket")) @@ -38,7 +38,9 @@ async def send_mock( assert response.filename == "file.txt" assert b"".join([chunk async for chunk in response]) == b"hello world" - assert captured_requests[0].url.path == "/v1/files/test-bucket/folder/file.txt" + assert ( + captured_requests[0].url.path == "/v1/files/test-bucket/folder/file.txt" + ) assert captured_kwargs == [{"stream": True}] assert captured_responses[0].is_closed is True From 74c50b312646eae8aef00bc04097bb9e5144d822 Mon Sep 17 00:00:00 2001 From: Danylo_Kriachkov Date: Tue, 23 Jun 2026 14:22:03 +0300 Subject: [PATCH 3/4] refactor: streamline error handling in HTTP clients and enhance download request preparation - Replaced inline error handling with a dedicated `_raise_for_status` method in both synchronous and asynchronous HTTP clients. - Introduced `_prepare_download_request` method in `DialStorageResourceMixin` to encapsulate download request logic, improving code clarity and reusability. - Updated type hints for `on_http_error` to use a consistent `ErrorHandler` type across the codebase. --- aidial_client/_http_client/_async.py | 29 +++--------- aidial_client/_http_client/_base.py | 20 +++++++++ aidial_client/_http_client/_sync.py | 15 ++----- aidial_client/helpers/storage_resource.py | 24 ++++++++++ aidial_client/resources/files.py | 55 ++++------------------- 5 files changed, 63 insertions(+), 80 deletions(-) diff --git a/aidial_client/_http_client/_async.py b/aidial_client/_http_client/_async.py index 5f8fc43..943c156 100644 --- a/aidial_client/_http_client/_async.py +++ b/aidial_client/_http_client/_async.py @@ -1,5 +1,5 @@ import asyncio -from collections.abc import AsyncIterator, Callable, Mapping +from collections.abc import AsyncIterator, Mapping from contextlib import asynccontextmanager, suppress from http import HTTPStatus from typing import Any @@ -8,7 +8,7 @@ from aidial_client._auth import AsyncAuthValue, aget_combined_auth_headers from aidial_client._exception import DialException -from aidial_client._http_client._base import BaseHTTPClient +from aidial_client._http_client._base import BaseHTTPClient, ErrorHandler from aidial_client._internal_types._defaults import NOT_GIVEN, NotGiven from aidial_client._internal_types._generic import ResponseT from aidial_client._internal_types._http_request import FinalRequestOptions @@ -51,8 +51,7 @@ async def request( options: FinalRequestOptions, cast_to: type[ResponseT], remaining_retries: int | None = None, - on_http_error: Callable[[httpx.HTTPStatusError], DialException | None] - | None = None, + on_http_error: ErrorHandler | None = None, ) -> ResponseT: retries = self._remaining_retries(remaining_retries, options) auth_headers = await self.auth_headers() @@ -101,13 +100,7 @@ async def request( cast_to=cast_to, remaining_retries=retries, ) - # Try to get a custom error from response status_code/code/message - custom_error = on_http_error(err) if on_http_error else None - # or fallback to default processing - raised_error = custom_error or self._make_dial_error_from_response( - err.response - ) - raise raised_error from err + self._raise_for_status(response, on_http_error) return process_block_response(cast_to=cast_to, response=response) @@ -116,9 +109,7 @@ async def stream( self, *, options: FinalRequestOptions, - on_http_error: ( - Callable[[httpx.HTTPStatusError], DialException | None] | None - ) = None, + on_http_error: ErrorHandler | None = None, ) -> AsyncIterator[httpx.Response]: auth_headers = await self.auth_headers() request = self._build_request(options, auth_headers) @@ -135,15 +126,7 @@ async def stream( raise DialException(message=f"Request failed: {err}") from err try: - try: - response.raise_for_status() - except httpx.HTTPStatusError as err: - custom_error = on_http_error(err) if on_http_error else None - raised_error = ( - custom_error - or self._make_dial_error_from_response(err.response) - ) - raise raised_error from err + self._raise_for_status(response, on_http_error) yield response finally: await response.aclose() diff --git a/aidial_client/_http_client/_base.py b/aidial_client/_http_client/_base.py index 7723c53..2b00077 100644 --- a/aidial_client/_http_client/_base.py +++ b/aidial_client/_http_client/_base.py @@ -1,4 +1,5 @@ from abc import ABC, abstractmethod +from collections.abc import Callable from http import HTTPStatus from random import uniform from typing import Generic, TypeVar @@ -16,6 +17,8 @@ "_HttpInternalClientT", bound=httpx.Client | httpx.AsyncClient ) +ErrorHandler = Callable[[httpx.HTTPStatusError], DialException | None] + class BaseHTTPClient(ABC, Generic[_HttpInternalClientT, AuthValueT]): _internal_http_client: _HttpInternalClientT @@ -106,6 +109,23 @@ def _calculate_retry_sleep_seconds( timeout = sleep_seconds + uniform(-0.5, 0.5) # noqa: S311 return max(0, timeout) + def _raise_for_status( + self, + response: httpx.Response, + on_http_error: ErrorHandler | None, + ) -> None: + try: + response.raise_for_status() + except httpx.HTTPStatusError as err: + # Try to get a custom error from response status_code/code/message + custom_error = on_http_error(err) if on_http_error else None + # or fallback to default processing + raised_error = ( + custom_error + or self._make_dial_error_from_response(err.response) + ) + raise raised_error from err + def _make_dial_error_from_response( self, response: httpx.Response, diff --git a/aidial_client/_http_client/_sync.py b/aidial_client/_http_client/_sync.py index 40e8cff..31d4796 100644 --- a/aidial_client/_http_client/_sync.py +++ b/aidial_client/_http_client/_sync.py @@ -1,5 +1,5 @@ import time -from collections.abc import Callable, Iterator, Mapping +from collections.abc import Iterator, Mapping from contextlib import contextmanager, suppress from http import HTTPStatus from typing import Any @@ -8,7 +8,7 @@ from aidial_client._auth import SyncAuthValue, get_combined_auth_headers from aidial_client._exception import DialException -from aidial_client._http_client._base import BaseHTTPClient +from aidial_client._http_client._base import BaseHTTPClient, ErrorHandler from aidial_client._internal_types._defaults import NOT_GIVEN, NotGiven from aidial_client._internal_types._generic import ResponseT from aidial_client._internal_types._http_request import FinalRequestOptions @@ -50,8 +50,7 @@ def request( cast_to: type[ResponseT], options: FinalRequestOptions, remaining_retries: int | None = None, - on_http_error: Callable[[httpx.HTTPStatusError], DialException | None] - | None = None, + on_http_error: ErrorHandler | None = None, ) -> ResponseT: retries = self._remaining_retries(remaining_retries, options) auth_headers = self.auth_headers() @@ -101,13 +100,7 @@ def request( cast_to=cast_to, remaining_retries=retries, ) - # Try to get a custom error from response status_code/code/message - custom_error = on_http_error(err) if on_http_error else None - # or fallback to default processing - raised_error = custom_error or self._make_dial_error_from_response( - err.response - ) - raise raised_error from err + self._raise_for_status(response, on_http_error) return process_block_response(cast_to=cast_to, response=response) diff --git a/aidial_client/helpers/storage_resource.py b/aidial_client/helpers/storage_resource.py index 3bf00e5..27c9501 100644 --- a/aidial_client/helpers/storage_resource.py +++ b/aidial_client/helpers/storage_resource.py @@ -5,6 +5,8 @@ from aidial_client._compatibility.pydantic_v1 import BaseModel from aidial_client._constants import API_PREFIX from aidial_client._exception import InvalidDialURLError, NotDialURLError +from aidial_client._internal_types._http_request import FinalRequestOptions +from aidial_client._utils._dict import remove_none from aidial_client.helpers._url import enforce_trailing_slash StorageResourceType = Literal["files", "conversations", "prompts"] @@ -156,3 +158,25 @@ def get_display_name(self, url: str) -> str: Get the display name of the resource from the URL """ return self.get_storage_resource(url).bucket_path + + def _prepare_download_request( + self, + url: str | PurePosixPath, + etag_if_match: str | None, + ) -> tuple[FinalRequestOptions, str]: + storage_resource = self.get_storage_resource(str(url)) + + if storage_resource.filename is None: + raise InvalidDialURLError("URL points to a directory, not a file") + + options = FinalRequestOptions( + method="GET", + url=urljoin(API_PREFIX, storage_resource.api_path), + headers=remove_none( + { + "If-Match": etag_if_match, + } + ), + ) + + return options, storage_resource.filename diff --git a/aidial_client/resources/files.py b/aidial_client/resources/files.py index 8e3507c..5ea97cc 100644 --- a/aidial_client/resources/files.py +++ b/aidial_client/resources/files.py @@ -10,7 +10,6 @@ from aidial_client._exception import ( DialException, EtagMismatchError, - InvalidDialURLError, ResourceNotFoundError, ) from aidial_client._internal_types._generic import NoneType @@ -72,25 +71,13 @@ def download( url: str | PurePosixPath, etag_if_match: str | None = None, ) -> FileDownloadResponse: - storage_resource = self.get_storage_resource(str(url)) - if storage_resource.filename is None: - raise InvalidDialURLError("URL points to a directory, not a file") + options, filename = self._prepare_download_request(url, etag_if_match) response = self.http_client.request( cast_to=httpx.Response, - options=FinalRequestOptions( - method="GET", - url=urljoin(API_PREFIX, storage_resource.api_path), - headers=remove_none( - { - "If-Match": etag_if_match, - } - ), - ), + options=options, on_http_error=_files_error_processor, ) - return FileDownloadResponse( - response=response, filename=storage_resource.filename - ) + return FileDownloadResponse(response=response, filename=filename) def delete( self, @@ -190,25 +177,13 @@ async def download( url: str | PurePosixPath, etag_if_match: str | None = None, ) -> FileDownloadResponse: - storage_resource = self.get_storage_resource(str(url)) - if storage_resource.filename is None: - raise InvalidDialURLError("URL points to a directory, not a file") + options, filename = self._prepare_download_request(url, etag_if_match) response = await self.http_client.request( cast_to=httpx.Response, - options=FinalRequestOptions( - method="GET", - url=urljoin(API_PREFIX, storage_resource.api_path), - headers=remove_none( - { - "If-Match": etag_if_match, - } - ), - ), + options=options, on_http_error=_files_error_processor, ) - return FileDownloadResponse( - response=response, filename=storage_resource.filename - ) + return FileDownloadResponse(response=response, filename=filename) @asynccontextmanager async def stream_download( @@ -216,24 +191,12 @@ async def stream_download( url: str | PurePosixPath, etag_if_match: str | None = None, ) -> AsyncIterator[FileDownloadResponse]: - storage_resource = self.get_storage_resource(str(url)) - if storage_resource.filename is None: - raise InvalidDialURLError("URL points to a directory, not a file") + options, filename = self._prepare_download_request(url, etag_if_match) async with self.http_client.stream( - options=FinalRequestOptions( - method="GET", - url=urljoin(API_PREFIX, storage_resource.api_path), - headers=remove_none( - { - "If-Match": etag_if_match, - } - ), - ), + options=options, on_http_error=_files_error_processor, ) as response: - yield FileDownloadResponse( - response=response, filename=storage_resource.filename - ) + yield FileDownloadResponse(response=response, filename=filename) async def delete( self, From 07abf27a7a43f881b50176ef10de95f1caac0b12 Mon Sep 17 00:00:00 2001 From: Danylo_Kriachkov Date: Tue, 23 Jun 2026 14:24:02 +0300 Subject: [PATCH 4/4] refactor: improve error handling logic in BaseHTTPClient - Simplified the error handling process by consolidating the raised error logic into a single line, enhancing readability and maintainability. --- aidial_client/_http_client/_base.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/aidial_client/_http_client/_base.py b/aidial_client/_http_client/_base.py index 2b00077..5712ab6 100644 --- a/aidial_client/_http_client/_base.py +++ b/aidial_client/_http_client/_base.py @@ -120,9 +120,8 @@ def _raise_for_status( # Try to get a custom error from response status_code/code/message custom_error = on_http_error(err) if on_http_error else None # or fallback to default processing - raised_error = ( - custom_error - or self._make_dial_error_from_response(err.response) + raised_error = custom_error or self._make_dial_error_from_response( + err.response ) raise raised_error from err