-
Notifications
You must be signed in to change notification settings - Fork 87
LCORE-2309: Added Pydantic AI Bridge #1817
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
3def1bc
2c38478
9926b0c
5460b50
7d77ef4
45ba392
53e1b86
fe39d3e
d4c320d
773aca1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| """Pydantic AI integrations/extensions for Lightspeed Core Stack.""" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| """Pydantic AI provider for Llama Stack.""" | ||
|
|
||
| from pydantic_ai_lightspeed.llamastack._provider import LlamaStackProvider | ||
|
|
||
| __all__ = ["LlamaStackProvider"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,123 @@ | ||
| """Llama Stack provider implementation for Pydantic AI.""" | ||
|
|
||
| from __future__ import annotations as _annotations | ||
|
|
||
| from typing import TYPE_CHECKING | ||
|
|
||
| import httpx | ||
| from openai import AsyncOpenAI | ||
| from pydantic_ai import ModelProfile | ||
| from pydantic_ai.models import create_async_http_client | ||
| from pydantic_ai.profiles.openai import openai_model_profile | ||
| from pydantic_ai.providers import Provider | ||
|
|
||
| from pydantic_ai_lightspeed.llamastack._transport import LlamaStackLibraryTransport | ||
|
|
||
| if TYPE_CHECKING: | ||
| from llama_stack.core.library_client import AsyncLlamaStackAsLibraryClient | ||
|
|
||
| DEFAULT_BASE_URL = "http://localhost:8321/v1" | ||
|
|
||
|
|
||
| class LlamaStackProvider(Provider[AsyncOpenAI]): | ||
| """Provider for Llama Stack — connects to a Llama Stack server's OpenAI-compatible API. | ||
|
|
||
| Supports two modes: | ||
|
|
||
| 1. **Server mode** — connect to a running Llama Stack server via HTTP | ||
| 2. **Library mode** — run Llama Stack in-process via ``AsyncLlamaStackAsLibraryClient`` | ||
| """ | ||
|
|
||
| @property | ||
| def name(self) -> str: | ||
| """The provider name.""" | ||
| return "llama-stack" | ||
|
|
||
| @property | ||
| def base_url(self) -> str: | ||
| """The base URL for the provider API.""" | ||
| return str(self._client.base_url) | ||
|
|
||
| @property | ||
| def client(self) -> AsyncOpenAI: | ||
| """The OpenAI-compatible client for the provider.""" | ||
| return self._client | ||
|
|
||
| @staticmethod | ||
| def model_profile(model_name: str) -> ModelProfile | None: | ||
| """Return the model profile for the named model, if available.""" | ||
| return openai_model_profile(model_name) | ||
|
|
||
| def __init__( | ||
| self, | ||
| *, | ||
| base_url: str | None = None, | ||
| api_key: str | None = None, | ||
| library_client: AsyncLlamaStackAsLibraryClient | None = None, | ||
| http_client: httpx.AsyncClient | None = None, | ||
| ) -> None: | ||
| """Create a new Llama Stack provider. | ||
|
|
||
| Args: | ||
| base_url: The base URL for the Llama Stack server (OpenAI-compatible endpoint). | ||
| Defaults to ``http://localhost:8321/v1``. | ||
| Must be ``None`` when ``library_client`` is provided. | ||
| api_key: The API key for authentication. Defaults to ``'not-needed'`` since | ||
| local Llama Stack servers typically don't require one. | ||
| Must be ``None`` when ``library_client`` is provided. | ||
| library_client: An initialized ``AsyncLlamaStackAsLibraryClient`` for library mode. | ||
| When provided, requests are dispatched in-process (no server needed). | ||
| Mutually exclusive with ``base_url``, ``api_key``, and ``http_client``. | ||
| http_client: An existing ``httpx.AsyncClient`` to use for making HTTP requests. | ||
| Must be ``None`` when ``library_client`` is provided. | ||
| """ | ||
| if library_client is not None: | ||
|
github-advanced-security[bot] marked this conversation as resolved.
Fixed
|
||
| if base_url is not None: | ||
| raise ValueError("Cannot provide both `library_client` and `base_url`") | ||
| if api_key is not None: | ||
| raise ValueError("Cannot provide both `library_client` and `api_key`") | ||
| if http_client is not None: | ||
| raise ValueError( | ||
| "Cannot provide both `library_client` and `http_client`" | ||
| ) | ||
|
|
||
| self._library_client = library_client | ||
| transport = LlamaStackLibraryTransport(library_client) | ||
| lib_http_client = httpx.AsyncClient( | ||
| transport=transport, | ||
Check warningCode scanning / Bandit Call to httpx without timeout Warning
Call to httpx without timeout
|
||
|
Comment on lines
+86
to
+87
|
||
| base_url="http://llama-stack-library", | ||
| timeout=httpx.Timeout(None), | ||
| ) | ||
| self._client = AsyncOpenAI( | ||
| http_client=lib_http_client, | ||
| base_url="http://llama-stack-library/v1", | ||
| api_key="not-needed", | ||
| ) | ||
| else: | ||
| base_url = base_url or DEFAULT_BASE_URL | ||
| api_key = api_key or "not-needed" | ||
|
|
||
| if http_client is not None: | ||
| self._client = AsyncOpenAI( | ||
| base_url=base_url, api_key=api_key, http_client=http_client | ||
| ) | ||
| else: | ||
| oai_http_client = create_async_http_client() | ||
| self._client = AsyncOpenAI( | ||
| base_url=base_url, api_key=api_key, http_client=oai_http_client | ||
| ) | ||
|
|
||
| def __repr__(self) -> str: | ||
| """Return a string representation of the provider.""" | ||
| return f"LlamaStackProvider(name={self.name!r}, base_url={self.base_url!r})" | ||
|
|
||
| def _set_http_client(self, http_client: httpx.AsyncClient) -> None: | ||
| """Inject an httpx.AsyncClient into the underlying OpenAI client. | ||
|
|
||
| Replaces the internal HTTP transport by assigning directly to the | ||
| protected ``self._client._client`` attribute of the AsyncOpenAI instance. | ||
|
|
||
| Args: | ||
| http_client: The async HTTP client to use for subsequent requests. | ||
| """ | ||
| self._client._client = http_client # pyright: ignore[reportPrivateUsage] # pylint: disable=protected-access | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,197 @@ | ||
| """httpx transport that routes OpenAI-compatible requests through a Llama Stack library client.""" | ||
|
|
||
| from __future__ import annotations as _annotations | ||
|
|
||
| import json | ||
| from collections.abc import AsyncGenerator, AsyncIterator | ||
| from typing import Any | ||
|
|
||
| import httpx | ||
| from llama_stack.core.library_client import ( | ||
| AsyncLlamaStackAsLibraryClient, | ||
| convert_pydantic_to_json_value, | ||
| ) | ||
| from llama_stack.core.request_headers import ( | ||
| PROVIDER_DATA_VAR, | ||
| request_provider_data_context, | ||
| ) | ||
| from llama_stack.core.server.routes import find_matching_route | ||
| from llama_stack.core.utils.context import preserve_contexts_async_generator | ||
|
|
||
|
|
||
| class _AsyncByteStream(httpx.AsyncByteStream): | ||
| """Wraps an async byte generator as an httpx AsyncByteStream.""" | ||
|
|
||
| def __init__(self, gen: AsyncGenerator[bytes, None]) -> None: | ||
| """Store an async generator that yields raw bytes for streaming. | ||
|
|
||
| Args: | ||
| gen: An async generator producing byte chunks to stream. | ||
| """ | ||
| self._gen = gen | ||
|
|
||
| async def __aiter__(self) -> AsyncIterator[bytes]: | ||
| """Yield bytes chunks from the wrapped generator. | ||
|
|
||
| Returns: | ||
| An async iterator of bytes fulfilling the httpx.AsyncByteStream contract. | ||
| """ | ||
| async for chunk in self._gen: | ||
| yield chunk | ||
|
Comment on lines
+22
to
+40
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🌐 Web query:
💡 Result: In httpx 0.27.0, the expected lifecycle for custom AsyncByteStream implementations is that consuming the response body to completion automatically closes the stream, and explicit Response.aclose is what closes the response and releases the connection. When a custom stream needs cleanup (e.g., releasing resources), it should implement aclose (and ensure it’s safe to call). 1) Should closing an httpx.Response propagate an aclose call to the stream? Yes. - Response.aclose is implemented to “Close the response and release the connection” and it explicitly calls await self.stream.aclose for AsyncByteStream instances [1]. - The AsyncByteStream base class’s aread method ensures that, when the stream is read, it calls await self.aclose in a finally block [2]. - The Response streaming methods are documented as being closed automatically when used inside the client’s.stream context manager; alternatively, in manual mode you must call Response.aclose eventually, otherwise connections may be left open [3]. So: Response close/aclose drives stream cleanup via AsyncByteStream.aclose. 2) Is implementing aclose recommended when wrapping an async generator? Yes, if your wrapper is a custom AsyncByteStream that owns resources or needs generator cleanup. The AsyncByteStream interface includes an aclose method specifically for cleanup, and the framework relies on it to be awaited at the end of reading [2]. Additionally, httpx’s own AsyncIteratorByteStream wrapper records whether the provided iterable is an async generator and enforces “StreamConsumed” semantics for async generators after they’ve been iterated once [4]. That wrapper’s code also treats aread/iteration completion as a point at which aclose is expected to happen (via the base-class aread finally calling aclose) [2][4]. Practical guidance for custom wrappers around async generators: - Implement AsyncByteStream.aclose and ensure it properly closes the underlying generator (e.g., by calling await agen.aclose in your wrapper’s aclose, or using a pattern equivalent to that), so that early Response.aclose or early loop exit triggers generator cleanup via stream.aclose [1]. - If your wrapper doesn’t implement aclose, you won’t get cleanup when Response.aclose calls into the stream, beyond whatever implicit behavior happens to occur during iteration [2][1]. References (from httpx code/docs): AsyncByteStream.aclose contract and aread finally behavior [2]; Response.aclose calling await self.stream.aclose [1]; Response streaming context/manual-mode cleanup expectations [3]; AsyncIteratorByteStream’s handling of async generators [4]. Citations:
🏁 Script executed: #!/bin/bash
set -euo pipefail
# Locate the target file and print the relevant sections around _AsyncByteStream and header logic.
FILE="src/pydantic_ai_lightspeed/llamastack/_transport.py"
echo "== File exists =="
ls -l "$FILE"
echo
echo "== _AsyncByteStream definition =="
rg -n "_AsyncByteStream" "$FILE"
rg -n "class _AsyncByteStream|def __aiter__" "$FILE"
echo
# Print a reasonable window around the class definition
LINE=$(rg -n "class _AsyncByteStream" "$FILE" | head -n1 | cut -d: -f1)
START=$((LINE-20))
END=$((LINE+80))
sed -n "${START},${END}p" "$FILE" | nl -ba
echo
echo "== Search for provider-data header injection =="
rg -n "provider-data|Provider-Data|provider_data|ProviderData" "$FILE" || true
echo
echo "== Any header casing / duplicate handling nearby =="
# Print around the first match if found
FIRST=$(rg -n "provider-data|Provider-Data|provider_data|ProviderData" "$FILE" | head -n1 | cut -d: -f1 || true)
if [[ -n "${FIRST:-}" ]]; then
START=$((FIRST-30))
END=$((FIRST+80))
sed -n "${START},${END}p" "$FILE" | nl -ba
fi
echo
echo "== Any existing aclose implementation in this file =="
rg -n "def aclose|aclose\(" "$FILE" || trueRepository: lightspeed-core/lightspeed-stack Length of output: 505 🏁 Script executed: #!/bin/bash
set -euo pipefail
FILE="src/pydantic_ai_lightspeed/llamastack/_transport.py"
echo "== _AsyncByteStream definition =="
LINE=$(rg -n "class _AsyncByteStream" "$FILE" | head -n1 | cut -d: -f1)
START=$((LINE-15))
END=$((LINE+90))
sed -n "${START},${END}p" "$FILE" | cat -n -w3 -s ':'
echo
echo "== Search for provider-data header injection =="
rg -n "provider-data|Provider-Data|provider_data|ProviderData" "$FILE" || true
echo
echo "== Any existing aclose implementation in this file =="
rg -n "def aclose|aclose\(" "$FILE" || trueRepository: lightspeed-core/lightspeed-stack Length of output: 194 🏁 Script executed: #!/bin/bash
set -euo pipefail
FILE="src/pydantic_ai_lightspeed/llamastack/_transport.py"
echo "== _AsyncByteStream definition (context) =="
LINE=$(rg -n "class _AsyncByteStream" "$FILE" | head -n1 | cut -d: -f1)
START=$((LINE-15))
END=$((LINE+90))
sed -n "${START},${END}p" "$FILE" | cat -n
echo
echo "== provider-data header injection hits =="
rg -n "provider-data|Provider-Data|provider_data|ProviderData" "$FILE" || true
echo
echo "== Any existing aclose implementation / calls =="
rg -n "def aclose|aclose\(" "$FILE" || trueRepository: lightspeed-core/lightspeed-stack Length of output: 4951 🏁 Script executed: #!/bin/bash
set -euo pipefail
FILE="src/pydantic_ai_lightspeed/llamastack/_transport.py"
sed -n '1,40p' "$FILE" | cat -nRepository: lightspeed-core/lightspeed-stack Length of output: 1652 Add
🤖 Prompt for AI Agents |
||
|
|
||
|
|
||
| class LlamaStackLibraryTransport(httpx.AsyncBaseTransport): | ||
| """Custom httpx transport that dispatches requests through a Llama Stack library client. | ||
|
|
||
| Instead of making real HTTP calls, this transport routes requests directly | ||
| to the Llama Stack's in-process route handlers via the library client's | ||
| route matching and body conversion logic. | ||
| """ | ||
|
|
||
| def __init__(self, client: AsyncLlamaStackAsLibraryClient) -> None: | ||
| """Initialize the transport with a Llama Stack library client. | ||
|
|
||
| Args: | ||
| client: An initialized ``AsyncLlamaStackAsLibraryClient`` whose route | ||
| handlers will receive dispatched requests. | ||
| """ | ||
| self._client = client | ||
|
|
||
| async def handle_async_request(self, request: httpx.Request) -> httpx.Response: | ||
| """Dispatch an httpx request to the in-process Llama Stack route handlers. | ||
|
|
||
| Args: | ||
| request: The outgoing httpx request to route. | ||
|
|
||
| Returns: | ||
| An httpx response built from the matched route handler result. | ||
|
|
||
| Raises: | ||
| RuntimeError: If the library client has not been initialized. | ||
| """ | ||
| if self._client.route_impls is None: | ||
| raise RuntimeError( | ||
| "Llama Stack library client not initialized. Call initialize() first." | ||
| ) | ||
|
|
||
| method = request.method | ||
| path = request.url.raw_path.decode("utf-8") | ||
|
|
||
| body = json.loads(request.content) if request.content else {} | ||
|
|
||
| headers: dict[str, str] = { | ||
|
github-advanced-security[bot] marked this conversation as resolved.
Fixed
|
||
| k.decode("utf-8") if isinstance(k, bytes) else k: ( | ||
| v.decode("utf-8") if isinstance(v, bytes) else v | ||
| ) | ||
| for k, v in request.headers.raw | ||
| } | ||
|
|
||
| if self._client.provider_data: | ||
| keys = ["X-LlamaStack-Provider-Data", "x-llamastack-provider-data"] | ||
| if all(key not in headers for key in keys): | ||
| headers["X-LlamaStack-Provider-Data"] = json.dumps( | ||
| self._client.provider_data | ||
| ) | ||
|
Comment on lines
+89
to
+94
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Normalize header names before deciding whether to inject provider data. HTTP header names are case-insensitive. This check only recognizes two spellings, so a request that already carries Proposed fix if self._client.provider_data:
- keys = ["X-LlamaStack-Provider-Data", "x-llamastack-provider-data"]
- if all(key not in headers for key in keys):
+ normalized_headers = {key.lower() for key in headers}
+ if "x-llamastack-provider-data" not in normalized_headers:
headers["X-LlamaStack-Provider-Data"] = json.dumps(
self._client.provider_data
)🤖 Prompt for AI Agents |
||
|
|
||
| with request_provider_data_context(headers): | ||
| is_stream = body.get("stream", False) | ||
|
|
||
| if is_stream: | ||
| return await self._handle_streaming(request, method, path, body) | ||
| return await self._handle_non_streaming(request, method, path, body) | ||
|
|
||
| async def _handle_non_streaming( | ||
| self, | ||
| request: httpx.Request, | ||
| method: str, | ||
| path: str, | ||
| body: dict[str, Any], | ||
| ) -> httpx.Response: | ||
| """Dispatch a non-streaming request to the matched route handler. | ||
|
|
||
| Args: | ||
| request: The original httpx request (attached to the response). | ||
| method: The HTTP method (e.g. ``"POST"``). | ||
| path: The decoded URL path used for route matching. | ||
| body: The parsed JSON request body. | ||
|
|
||
| Returns: | ||
| An httpx.Response containing the JSON-serialized handler result. | ||
|
|
||
| Raises: | ||
| RuntimeError: If route_impls is not initialized. | ||
| """ | ||
| if self._client.route_impls is None: | ||
| raise RuntimeError("route_impls is not initialized") | ||
|
|
||
| matched_func, path_params, _, _ = find_matching_route( | ||
| method, path, self._client.route_impls | ||
| ) | ||
| merged_body = {**body, **path_params} | ||
| merged_body = self._client._convert_body( # pylint: disable=protected-access | ||
| matched_func, merged_body | ||
| ) | ||
|
|
||
| result = await matched_func(**merged_body) | ||
|
|
||
| json_content = json.dumps(convert_pydantic_to_json_value(result)) | ||
| status_code = httpx.codes.OK | ||
|
|
||
| if method.upper() == "DELETE" and result is None: | ||
| status_code = httpx.codes.NO_CONTENT | ||
|
github-advanced-security[bot] marked this conversation as resolved.
Fixed
|
||
| json_content = "" | ||
|
|
||
| return httpx.Response( | ||
| status_code=status_code, | ||
| content=json_content.encode("utf-8"), | ||
| headers={"Content-Type": "application/json"}, | ||
| request=request, | ||
| ) | ||
|
|
||
| async def _handle_streaming( | ||
| self, | ||
| request: httpx.Request, | ||
| method: str, | ||
| path: str, | ||
| body: dict[str, Any], | ||
| ) -> httpx.Response: | ||
| """Dispatch a streaming request and return an SSE event-stream response. | ||
|
|
||
| Args: | ||
| request: The original httpx request (attached to the response). | ||
| method: The HTTP method (e.g. ``"POST"``). | ||
| path: The decoded URL path used for route matching. | ||
| body: The parsed JSON request body (must contain ``stream: True``). | ||
|
|
||
| Returns: | ||
| An httpx.Response with a streaming body of SSE-formatted chunks. | ||
|
|
||
| Raises: | ||
| RuntimeError: If route_impls is not initialized. | ||
| """ | ||
| if self._client.route_impls is None: | ||
| raise RuntimeError("route_impls is not initialized") | ||
|
|
||
| func, path_params, _, _ = find_matching_route( | ||
| method, path, self._client.route_impls | ||
| ) | ||
| merged_body = {**body, **path_params} | ||
| merged_body = self._client._convert_body( # pylint: disable=protected-access | ||
| func, merged_body | ||
| ) | ||
|
|
||
| result = await func(**merged_body) | ||
|
|
||
| async def gen() -> AsyncGenerator[bytes, None]: | ||
| async for chunk in result: | ||
| data = json.dumps(convert_pydantic_to_json_value(chunk)) | ||
| yield f"data: {data}\n\n".encode("utf-8") | ||
|
|
||
| wrapped_gen = preserve_contexts_async_generator(gen(), [PROVIDER_DATA_VAR]) | ||
|
|
||
| return httpx.Response( | ||
| status_code=httpx.codes.OK, | ||
| stream=_AsyncByteStream(wrapped_gen), | ||
| headers={"Content-Type": "text/event-stream"}, | ||
| request=request, | ||
| ) | ||
Uh oh!
There was an error while loading. Please reload this page.