diff --git a/docs.json b/docs.json index 5ab241a1..8c9ef6b2 100644 --- a/docs.json +++ b/docs.json @@ -124,7 +124,8 @@ "pages": [ "guides/projects/private-rag-bot", "guides/projects/private-research-agent", - "guides/projects/security-code-reviewer" + "guides/projects/security-code-reviewer", + "guides/projects/rust-llm-gateway" ] } ] diff --git a/guides/projects/rust-llm-gateway.mdx b/guides/projects/rust-llm-gateway.mdx new file mode 100644 index 00000000..fa5b0423 --- /dev/null +++ b/guides/projects/rust-llm-gateway.mdx @@ -0,0 +1,1240 @@ +--- +title: "Building a Rust LLM Gateway" +slug: rust-llm-gateway-venice +"og:title": "Building a Rust LLM Gateway with Venice AI" +"og:description": "A practical guide to building an OpenAI-compatible Rust LLM gateway with Axum, Postgres, SQLx, streaming responses, rate limits, and OpenTelemetry." + +--- +import { AuthorByline } from "/snippets/authorByline.jsx"; + + + +Most AI apps start by calling a model API directly. That works well for prototypes, but once multiple apps, services, or customers need access, direct provider calls get harder to manage. Every service needs a provider key, every client needs to learn provider-specific behaviour, and every team ends up solving authentication, limits, and observability slightly differently. + +An LLM gateway gives us one place to authenticate callers, enforce rate limits, hide upstream provider keys, record telemetry, and keep a stable API for our own applications. In this tutorial, we'll build one in Rust with Axum, Postgres, SQLx, and the Venice AI API. + +By the end, you'll have a gateway that exposes an OpenAI-compatible `/v1/chat/completions` endpoint, accepts your own bearer tokens, forwards requests to Venice, supports streaming responses, and emits useful OpenTelemetry spans and metrics. + +Interested in the full code implementation? Check out [the GitHub repo.](https://github.com/joshua-mo-143/venice-llm-gateway-demo) + +## Pre-requisites + +- Rust 1.92+ +- Docker and Docker Compose +- A Venice API key +- `curl` +- Basic familiarity with Rust web services + +Before we start, export your Venice API key: + +```bash +export VENICE_API_KEY="your-venice-api-key" +``` + +We will never expose this key to client apps. The gateway will hold it server-side and clients will authenticate with gateway-specific API keys instead. + +## What We're Building + +The reference implementation is a small Rust service with a few clear parts: + + +| Part | What it does | +| -------------- | ------------------------------------------------------------------------------------- | +| Axum router | Serves `/healthz` and `/v1/chat/completions` | +| Auth extractor | Validates gateway bearer tokens against hashed keys in Postgres | +| Rate limiter | Uses fixed windows stored in Postgres | +| Venice client | Proxies non-streaming and streaming chat completion requests | +| Telemetry | Records GenAI spans, token usage, Venice billing cost, latency, and streaming timings | +| Docker Compose | Runs Postgres and the gateway locally | + + +![Architecture diagram showing a client calling the Rust gateway, Postgres, Venice AI, and OpenTelemetry](/images/guides/llm-gateway-architecture.svg) + +*A client sends an OpenAI-compatible request to the gateway. The gateway authenticates the caller, checks rate limits, forwards the request to Venice, and records telemetry along the way.* + +As part of the gateway, we'll be ensuring that this service remains horizontally scalable with the least amount of surface area covered when it comes to the API itself. There's a few reasons for this - one of them primarily being that if you have a very high amount of throughput for example, you will almost certainly want to use replicas (ie, spin up more than 1 of the same service). This means that if you aren't already, architecturally you'll want to put your original service and the replicas behind a load balancer so that if one container or service goes down, the entire service does not experience an outage. + +Additionally, we will also assume that we own API key creation in some manner, although the gateway service shouldn't be minting them in isolation. This will be represented as a Postgres table that we seed when used locally. In production, this would typically be handled by the authentication service. Although it is possible to handle creating an API key upstream for every user who uses your LLM gateway, in practice this is not generally advisable. By offloading this responsibility to the upstream service, you also offload any control you would normally have - which means you can't fully enforce things like rate limiting and spend caps. + +The source tree stays intentionally small: + +```text +. +├── Cargo.toml +├── Dockerfile +├── docker-compose.yml +├── migrations/ +│ └── 0001_api_keys.sql +├── scripts/ +│ ├── seed_api_key.sh +│ └── smoke_chat.sh +└── src/ + ├── auth.rs + ├── config.rs + ├── error.rs + ├── main.rs + ├── rate_limit.rs + ├── router.rs + ├── sse.rs + ├── state.rs + ├── telemetry.rs + └── venice.rs +``` + +Without any further ado, let's get building. + +## Creating the Rust service + +Start with a new Rust binary project: + +```bash +cargo new llm-gateway +cd llm-gateway +``` + +Add the dependencies we need in `Cargo.toml` - explanations added in the code snippet: + +```toml +[dependencies] +# Web backend +axum = "0.8.9" +tower = "0.5.3" +tower-http = { version = "0.6.11", features = ["trace", "request-id", "sensitive-headers", "timeout", "limit"] } + +## Observability +opentelemetry = "0.32.0" +opentelemetry-otlp = "0.32.0" +opentelemetry_sdk = "0.32.1" +tracing = "0.1.44" +tracing-opentelemetry = "0.33.0" +tracing-subscriber = { version = "0.3.23", features = ["env-filter", "json"] } + +# HTTP request/response handling +reqwest = { version = "0.12.28", default-features = false, features = ["json", "stream", "rustls-tls", "http2", "charset"] } +reqwest-eventsource = "0.6.0" +bytes = "1.11.1" +futures-util = "0.3.32" + +# Serialization +serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.150" + +# Hashing/crypto +sha2 = "0.11.0" +subtle = "2.6.1" + +# SQL +sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "uuid", "time", "macros", "migrate"] } + +# utils +thiserror = "2.0.18" # a crate for writing ergonomic errors +time = { version = "0.3.47", features = ["serde", "formatting", "macros"] } # time +tokio = { version = "1.52.3", features = ["macros", "rt-multi-thread", "signal", "net", "time"] } # the most popular async Rust runtime +uuid = { version = "1.23.3", features = ["serde", "v4"] } +``` + +## Loading configuration + +The gateway reads everything from environment variables. For infrastructure code like this, environment variables are a good default because the same binary can run locally, in Docker Compose, or in a hosted environment without a separate config file format. Additionally, many providers will allow you to store your own environment variables as secrets in their own container runtime. This is often much safer than trying to use something like `dotenv` (or `dotenvy` in Rust, as the original `dotenv` crate is mostly deprecated). + +Create `src/config.rs`: + +```rust +use std::{env, net::SocketAddr, str::FromStr, time::Duration}; + +#[derive(Clone, Debug)] +pub struct Config { + pub bind_addr: SocketAddr, + pub database_url: String, + pub venice_api_key: String, + pub venice_base_url: String, + pub service_name: String, + pub otlp_endpoint: Option, + pub request_timeout: Duration, + pub request_body_limit_bytes: usize, + pub venice_max_retries: u32, + pub capture_genai_content: bool, +} + +impl Config { + pub fn from_env() -> Result { + Ok(Self { + bind_addr: parse_env("BIND_ADDR", "0.0.0.0:3000")?, + database_url: required_env("DATABASE_URL")?, + venice_api_key: required_env("VENICE_API_KEY")?, + venice_base_url: env::var("VENICE_BASE_URL") + .unwrap_or_else(|_| "https://api.venice.ai/api/v1".to_owned()), + service_name: env::var("OTEL_SERVICE_NAME") + .unwrap_or_else(|_| "llm-gateway".to_owned()), + otlp_endpoint: optional_nonempty_env("OTEL_EXPORTER_OTLP_ENDPOINT"), + request_timeout: Duration::from_secs(parse_env("REQUEST_TIMEOUT_SECONDS", "120")?), + request_body_limit_bytes: parse_env("REQUEST_BODY_LIMIT_BYTES", "1048576")?, + venice_max_retries: parse_env("VENICE_MAX_RETRIES", "2")?, + capture_genai_content: parse_env("CAPTURE_GENAI_CONTENT", "false")?, + }) + } +} +``` + +While there are a lot of possible values being parsed here from environment variables, generally speaking you only need two: +- The database URL +- Your Venice API key + +Two defaults matter here. `VENICE_BASE_URL` points at `https://api.venice.ai/api/v1`, and `CAPTURE_GENAI_CONTENT` defaults to `false` so prompt content is not logged unless you intentionally enable it. + +That second default is the more important one. A gateway can see every prompt and response flowing through it, but observability should not automatically become content capture. In most production systems, token counts, latency, model names, status codes, and billing metadata are enough for operations. Generally speaking, logging prompts and conversations in production may not only be a privacy liability - it can also be a storage liability. Adding them means creating spans and traces with an extremely high level of cardinality (ie, the uniqueness of data within a dataset). This can make searching through your observability data very inexpensive, in addition to potentially hampering performance when searching through the data. + +## Creating the database schema + +Next, create `migrations/0001_api_keys.sql`. + +We will store only the first 12 characters of each gateway API key as a lookup prefix, plus the SHA-256 hash of the full key. That lets the gateway find a candidate row quickly without storing raw credentials. + +The prefix is not secret. It exists for indexing. The hash is what proves that the caller presented the full key. This is the same basic shape used by many API-key systems: show the raw key once, store a non-reversible representation, and keep a short prefix around for lookup and support workflows. + +```sql +CREATE EXTENSION IF NOT EXISTS pgcrypto; + +CREATE TABLE api_keys + ( api_key_id uuid PRIMARY KEY DEFAULT gen_random_uuid() + , subject_id uuid NOT NULL + , name text NOT NULL + , key_prefix text NOT NULL + , key_hash bytea NOT NULL + , rate_limit_requests integer NOT NULL DEFAULT 60 + , rate_limit_window_seconds integer NOT NULL DEFAULT 60 + , is_active boolean NOT NULL DEFAULT true + , created_at timestamptz NOT NULL DEFAULT now() + , last_used_at timestamptz + , revoked_at timestamptz + , CONSTRAINT api_keys_prefix_length CHECK (length(key_prefix) = 12) + , CONSTRAINT api_keys_hash_sha256 CHECK (length(key_hash) = 32) + , CONSTRAINT api_keys_rate_limit_requests_positive CHECK (rate_limit_requests > 0) + , CONSTRAINT api_keys_rate_limit_window_positive CHECK (rate_limit_window_seconds > 0) + , CONSTRAINT api_keys_revoked_inactive CHECK (revoked_at IS NULL OR NOT is_active) + ); + +CREATE UNIQUE INDEX api_keys_key_prefix_key +ON api_keys (key_prefix); + +CREATE UNIQUE INDEX api_keys_key_hash_key +ON api_keys (key_hash); +``` + +Now add a table for fixed-window rate limiting: + +```sql +CREATE TABLE api_key_rate_limit_windows + ( api_key_id uuid NOT NULL + REFERENCES api_keys (api_key_id) + ON DELETE RESTRICT + ON UPDATE RESTRICT + , window_start_at timestamptz NOT NULL + , window_seconds integer NOT NULL + , request_count integer NOT NULL + , updated_at timestamptz NOT NULL DEFAULT now() + , PRIMARY KEY (api_key_id, window_start_at, window_seconds) + , CONSTRAINT api_key_rate_limit_windows_window_positive CHECK (window_seconds > 0) + , CONSTRAINT api_key_rate_limit_windows_request_count_nonnegative CHECK (request_count >= 0) + ); +``` + +This schema is small, but it gives us the important invariants: + +- API keys are never stored in plaintext. +- Rate limit settings must be positive. +- Revoked keys cannot remain active. +- A rate limit window is uniquely identified by key, start time, and window length. + +Keeping those invariants in Postgres is useful because every caller has to pass through this database state. Even if we later add an admin API, a background key-rotation job, or a migration that imports keys from another system, the database still rejects impossible states like an active key with a revocation timestamp. + +## Building the Venice client + +Next, we will create `src/venice.rs`. The client only needs to know the upstream chat completions URL, the Venice API key, and how many times to retry transient failures. + +Keeping this wrapper small is intentional - the gateway should not reimplement Venice's entire API. At a basic level, the gateway's job is to attach the server-side credential, apply a timeout, retry requests that are safe to retry, and return the upstream response in a form the router can forward. + +```rust +use std::time::Duration; + +use reqwest::{Client, RequestBuilder, Response, StatusCode}; +use reqwest_eventsource::{EventSource, RequestBuilderExt}; +use serde_json::Value; +use tokio::time::sleep; + +use crate::{config::Config, error::AppError}; + +#[derive(Clone)] +pub struct VeniceClient { + http: Client, + api_key: String, + chat_completions_url: String, + max_retries: u32, +} + +impl VeniceClient { + pub fn new(config: &Config) -> Result { + let http = Client::builder() + .timeout(config.request_timeout) + .user_agent(concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"))) + .build()?; + + Ok(Self { + http, + api_key: config.venice_api_key.clone(), + chat_completions_url: format!( + "{}/chat/completions", + config.venice_base_url.trim_end_matches('/') + ), + max_retries: config.venice_max_retries, + }) + } +} +``` + +For non-streaming requests, we can retry connection errors, timeouts, and transient HTTP status codes: + +```rust +impl VeniceClient { + pub async fn chat_completions(&self, payload: &Value) -> Result { + let mut attempt = 0; + + loop { + let result = self.chat_request(payload).send().await; + + match result { + Ok(response) + if should_retry_status(response.status()) && attempt < self.max_retries => + { + attempt += 1; + sleep(retry_delay(attempt)).await; + } + Ok(response) => return Ok(response), + Err(error) if should_retry_error(&error) && attempt < self.max_retries => { + attempt += 1; + sleep(retry_delay(attempt)).await; + } + Err(error) => return Err(error.into()), + } + } + } +} + +fn should_retry_status(status: StatusCode) -> bool { + matches!( + status, + StatusCode::REQUEST_TIMEOUT + | StatusCode::TOO_MANY_REQUESTS + | StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT + ) +} +``` + +Retries are only applied to the non-streaming path. Once a streaming response has started, retrying inside the gateway could duplicate partial output or confuse clients that already received chunks. For streaming, the better default is to surface the error and let the caller decide whether to retry the whole request. + +For streaming, we create an `EventSource` from the same request: + +```rust +impl VeniceClient { + pub fn chat_completions_eventsource(&self, payload: &Value) -> Result { + self.chat_request(payload) + .eventsource() + .map_err(|error| AppError::EventSourceSetup(error.to_string())) + } + + fn chat_request(&self, payload: &Value) -> RequestBuilder { + self.http + .post(&self.chat_completions_url) + .bearer_auth(&self.api_key) + .json(payload) + } +} +``` + +Venice's chat completions endpoint is OpenAI-compatible, so the gateway can accept a familiar body: + +```json +{ + "model": "zai-org-glm-5-1", + "messages": [ + { + "role": "user", + "content": "Say hello from behind a Rust gateway." + } + ] +} +``` + +You can swap the model for any chat-capable model available to your Venice account. + +Notice that the request body is still a `serde_json::Value`. That is a deliberate compatibility choice. If we model every possible chat-completion field in Rust, we have to keep the gateway updated every time the upstream API adds a useful option. By parsing only what we need elsewhere, we let newer Venice parameters pass through without a gateway release. + +## Sharing application state + +Create `src/state.rs`: + +```rust +use std::sync::Arc; + +use sqlx::PgPool; + +use crate::{config::Config, error::AppError, venice::VeniceClient}; + +#[derive(Clone)] +pub struct AppState { + pub config: Arc, + pub db: PgPool, + pub venice: VeniceClient, +} + +impl AppState { + pub fn new(config: Config, db: PgPool) -> Result { + let venice = VeniceClient::new(&config)?; + + Ok(Self { + config: Arc::new(config), + db, + venice, + }) + } +} +``` + +Axum clones state into handlers, so the state itself should be cheap to clone. `PgPool` is already a shared pool handle, and `Arc` keeps configuration cheap as well. + +This gives every handler access to the same three things: immutable configuration, pooled database connections, and the Venice client. Keeping them in one `AppState` also makes testing simpler later because handlers receive their dependencies through Axum state instead of reading globals. + +## Authenticating gateway API keys + +The client sends its gateway key like this: + +```http +Authorization: Bearer llmg_dev_0123456789abcdef +``` + +Create `src/auth.rs` and implement an Axum extractor. The extractor lets protected handlers declare that they require an authenticated key: + +```rust +use axum::{ + extract::FromRequestParts, + http::{HeaderMap, header, request::Parts}, +}; +use sha2::{Digest, Sha256}; +use sqlx::{FromRow, PgPool}; +use subtle::ConstantTimeEq; +use uuid::Uuid; + +use crate::{error::AppError, state::AppState}; + +const KEY_PREFIX_BYTES: usize = 12; + +#[derive(Clone, Debug)] +pub struct AuthenticatedApiKey { + pub api_key_id: Uuid, + pub subject_id: Uuid, + pub rate_limit_requests: i32, + pub rate_limit_window_seconds: i32, +} + +impl FromRequestParts for AuthenticatedApiKey { + type Rejection = AppError; + + async fn from_request_parts( + parts: &mut Parts, + state: &AppState, + ) -> Result { + authenticate(&state.db, &parts.headers).await + } +} +``` + +The actual authentication flow is: + +1. Parse the bearer token. +2. Take the first 12 bytes as the key prefix. +3. Hash the full candidate token with SHA-256. +4. Load the active key row by prefix. +5. Compare the stored hash and candidate hash in constant time. + +```rust +async fn authenticate(db: &PgPool, headers: &HeaderMap) -> Result { + let token = bearer_token(headers)?; + let key_prefix = token + .get(..KEY_PREFIX_BYTES) + .ok_or(AppError::Unauthorized)?; + let candidate_hash: [u8; 32] = Sha256::digest(token.as_bytes()).into(); + + let stored = sqlx::query_as::<_, StoredApiKey>( + r#" + SELECT api_key_id, subject_id, key_hash, rate_limit_requests, rate_limit_window_seconds + FROM api_keys + WHERE key_prefix = $1 + AND is_active + AND revoked_at IS NULL + "#, + ) + .bind(key_prefix) + .fetch_optional(db) + .await?; + + let stored = stored.ok_or(AppError::Unauthorized)?; + + if stored.key_hash.ct_eq(candidate_hash.as_slice()).unwrap_u8() != 1 { + return Err(AppError::Unauthorized); + } + + Ok(AuthenticatedApiKey { + api_key_id: stored.api_key_id, + subject_id: stored.subject_id, + rate_limit_requests: stored.rate_limit_requests, + rate_limit_window_seconds: stored.rate_limit_window_seconds, + }) +} +``` + +This keeps upstream and gateway credentials separate. Your production apps can rotate gateway keys without changing the Venice API key, and the Venice key never needs to leave the server. + +The extractor pattern is helpful because authentication becomes part of the handler's type signature. A route that accepts `AuthenticatedApiKey` cannot accidentally skip auth inside the function body; Axum has to construct that value before the handler runs. That keeps the protected path easy to audit. + +## Adding fixed-window rate limits + +Create `src/rate_limit.rs`. The rate limiter uses one SQL statement to insert a new window or increment the existing one: + +```rust +pub async fn check(db: &PgPool, api_key: &AuthenticatedApiKey) -> Result<(), AppError> { + let window = sqlx::query_as::<_, RateLimitWindow>( + r#" + WITH current_window AS + ( SELECT + to_timestamp( + floor(extract(epoch FROM now()) / $2::double precision) * $2 + )::timestamptz AS window_start_at + ) + INSERT INTO api_key_rate_limit_windows + (api_key_id, window_start_at, window_seconds, request_count) + SELECT $1, current_window.window_start_at, $2, 1 + FROM current_window + ON CONFLICT (api_key_id, window_start_at, window_seconds) + DO UPDATE + SET request_count = api_key_rate_limit_windows.request_count + 1, + updated_at = now() + WHERE api_key_rate_limit_windows.request_count < $3 + RETURNING request_count, window_start_at + "#, + ) + .bind(api_key.api_key_id) + .bind(api_key.rate_limit_window_seconds) + .bind(api_key.rate_limit_requests) + .fetch_optional(db) + .await?; + + let _window = window.ok_or_else(|| AppError::RateLimited { + retry_after: retry_after(api_key.rate_limit_window_seconds), + })?; + + Ok(()) +} +``` + +The `WHERE api_key_rate_limit_windows.request_count < $3` clause is the important bit. When the window is already full, Postgres does not update the row and `RETURNING` produces no row. The handler can turn that into a `429 Too Many Requests` response with a `Retry-After` header. + +A fixed window is not the most sophisticated rate limiter, but it is easy to explain, easy to inspect, and good enough for a gateway tutorial. The tradeoff is that traffic can bunch up around window boundaries. If you need smoother behaviour at scale, a token bucket or sliding-window limiter backed by Redis is a natural next step. + +## Returning OpenAI-style errors + +Create `src/error.rs` and make application errors implement `IntoResponse`: + +```rust +#[derive(Debug, thiserror::Error)] +pub enum AppError { + #[error("missing or invalid bearer token")] + Unauthorized, + #[error("rate limit exceeded")] + RateLimited { retry_after: Duration }, + #[error("bad request: {0}")] + BadRequest(String), + #[error("database error: {0}")] + Database(#[from] sqlx::Error), + #[error("upstream request failed: {0}")] + UpstreamTransport(#[from] reqwest::Error), + #[error("failed to create upstream event source: {0}")] + EventSourceSetup(String), + #[error("upstream event source failed: {0}")] + EventSource(String), + #[error("upstream returned an error")] + Upstream { + status: StatusCode, + body: serde_json::Value, + }, +} +``` + +For gateway-generated errors, return a JSON body shaped like common model API errors: + +```json +{ + "error": { + "message": "rate limit exceeded", + "type": "rate_limit_error", + "code": null + } +} +``` + +For upstream Venice errors, preserve the upstream status code and body. That makes debugging much easier for clients because provider-level validation errors still look like provider-level validation errors. + +This split keeps the gateway honest about where an error came from. If the gateway rejects a request because the bearer token is missing or the caller is over limit, it returns a gateway-shaped error. If Venice rejects the model request, we preserve the upstream body so client developers can see the provider's validation message instead of a generic proxy failure. + +## Building the router + +Now we can wire the HTTP routes in `src/router.rs`: + +```rust +use axum::{ + Json, Router, + body::Body, + extract::{DefaultBodyLimit, State}, + http::{StatusCode, header::{CACHE_CONTROL, CONTENT_TYPE}}, + response::{IntoResponse, Response}, + routing::{get, post}, +}; +use serde::Serialize; +use serde_json::Value; +use tower::ServiceBuilder; +use tower_http::trace::TraceLayer; + +use crate::{ + auth, + error::AppError, + rate_limit, sse, + state::AppState, + telemetry::{self, GenAiRequest}, +}; + +pub fn build(state: AppState) -> Router { + let body_limit = state.config.request_body_limit_bytes; + + Router::new() + .route("/healthz", get(healthz)) + .route("/v1/chat/completions", post(chat_completions)) + .with_state(state) + .layer(DefaultBodyLimit::max(body_limit)) + .layer(ServiceBuilder::new().layer(TraceLayer::new_for_http())) +} + +async fn healthz() -> impl IntoResponse { + Json(Health { status: "ok" }) +} + +#[derive(Serialize)] +struct Health { + status: &'static str, +} +``` + +The chat handler starts by requiring an `AuthenticatedApiKey`. If authentication fails, Axum never enters the handler body: + +```rust +async fn chat_completions( + api_key: auth::AuthenticatedApiKey, + State(state): State, + Json(payload): Json, +) -> Result { + rate_limit::check(&state.db, &api_key).await?; + + let request = GenAiRequest::from_payload(&payload).map_err(AppError::BadRequest)?; + let span = telemetry::span_for_request(&request); + + if request.stream { + let eventsource = state.venice.chat_completions_eventsource(&payload)?; + let stream = sse::observe_eventsource( + eventsource, + request, + span, + None, + api_key.api_key_id, + ); + + return Ok(Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, "text/event-stream") + .header(CACHE_CONTROL, "no-cache") + .body(Body::from_stream(stream)) + .expect("static streaming response headers are valid")); + } + + let upstream = state.venice.chat_completions(&payload).await?; + let status = upstream.status(); + + if !status.is_success() { + let bytes = upstream.bytes().await?; + return Err(AppError::Upstream { + status, + body: upstream_body_from_bytes(&bytes), + }); + } + + let bytes = upstream.bytes().await?; + let metadata = telemetry::metadata_from_bytes(&bytes); + span.in_scope(|| { + telemetry::record_response(&request, &span, &metadata, None); + telemetry::record_billing(api_key.api_key_id, &metadata); + }); + + Ok(Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, "application/json") + .body(Body::from(bytes)) + .expect("static json response headers are valid")) +} +``` + +The gateway validates only the fields it needs for gateway behavior: `model`, `messages`, and `stream`. Everything else in the JSON body passes through to Venice. That keeps the gateway compatible with provider features you may want to use later. + +The handler also makes the two response modes explicit. Non-streaming requests wait for Venice to return a full JSON response, then record response metadata before sending the bytes downstream. Streaming requests return immediately with a `text/event-stream` body backed by an async stream. That split keeps the non-streaming path simple while giving the streaming path enough control to observe chunks as they pass through. + +## Supporting streaming responses + +Streaming chat completions use server-sent events. Venice sends SSE data, and the gateway relays that data back to the client. + +The gateway should avoid buffering the whole stream because that would defeat the purpose of streaming. Users care about the time to first token, not only the time to final token. By forwarding each upstream event as it arrives, clients can render partial output while the model is still generating. + +Create `src/sse.rs`: + +```rust +use bytes::Bytes; +use futures_util::{Stream, StreamExt, stream}; +use reqwest_eventsource::{Event, EventSource}; +use uuid::Uuid; + +use crate::{ + error::AppError, + telemetry::{self, GenAiRequest, GenAiResponseMetadata}, +}; + +pub fn observe_eventsource( + eventsource: EventSource, + request: GenAiRequest, + span: tracing::Span, + pending_event: Option, + api_key_id: Uuid, +) -> impl Stream> + Send + 'static { + let state = StreamState { + eventsource, + pending_event, + request, + span, + api_key_id, + metadata: GenAiResponseMetadata::default(), + first_chunk_at: None, + previous_chunk_at: None, + }; + + stream::unfold(state, |mut state| async move { + loop { + let event = match state.pending_event.take() { + Some(event) => Some(Ok(event)), + None => state.eventsource.next().await, + }; + + match event { + Some(Ok(Event::Open)) => continue, + Some(Ok(Event::Message(message))) => { + let bytes = state.observe_message(&message.data); + if message.data == "[DONE]" { + state.eventsource.close(); + } + return Some((Ok(bytes), state)); + } + Some(Err(error)) => { + return Some((Err(AppError::EventSource(error.to_string())), state)); + } + None => { + telemetry::record_response(&state.request, &state.span, &state.metadata, None); + telemetry::record_billing(state.api_key_id, &state.metadata); + return None; + } + } + } + }) +} +``` + +Each message is encoded back into SSE format: + +```rust +fn encode_sse_data(data: &str) -> Bytes { + let mut encoded = String::new(); + for line in data.lines() { + encoded.push_str("data: "); + encoded.push_str(line); + encoded.push('\n'); + } + if data.is_empty() { + encoded.push_str("data: \n"); + } + encoded.push('\n'); + Bytes::from(encoded) +} +``` + +This preserves the client experience that OpenAI-compatible SDKs expect: chunks arrive as `data: ...` events, and the stream ends with `data: [DONE]`. + +The stream observer is also where we can collect metadata without changing what the client sees. Each chunk is forwarded in SSE format, but the gateway can still watch for response IDs, finish reasons, token usage, cost fields, and timing information as those chunks pass through. + +## Recording GenAI telemetry + +Gateways are useful because every request passes through one place. That makes them a great spot to record model, latency, token usage, finish reasons, billing cost, and streaming timings. + +Create `src/telemetry.rs` and start by parsing the request: + +```rust +#[derive(Clone, Debug)] +pub struct GenAiRequest { + pub model: String, + pub stream: bool, + pub started_at: Instant, +} + +#[derive(Clone, Debug, Default, PartialEq)] +pub struct GenAiResponseMetadata { + pub response_id: Option, + pub response_model: Option, + pub finish_reasons: Vec, + pub input_tokens: Option, + pub output_tokens: Option, + pub cost_diem: Option, + pub cost_usd: Option, +} + +impl GenAiRequest { + pub fn from_payload(payload: &Value) -> Result { + let model = payload + .get("model") + .and_then(Value::as_str) + .filter(|model| !model.trim().is_empty()) + .ok_or_else(|| "request body must include a non-empty model".to_owned())?; + + let messages = payload + .get("messages") + .and_then(Value::as_array) + .filter(|messages| !messages.is_empty()) + .ok_or_else(|| "request body must include at least one message".to_owned())?; + + if messages + .iter() + .any(|message| message.get("role").and_then(Value::as_str).is_none()) + { + return Err("every message must include a role".to_owned()); + } + + Ok(Self { + model: model.to_owned(), + stream: payload + .get("stream") + .and_then(Value::as_bool) + .unwrap_or(false), + started_at: Instant::now(), + }) + } +} +``` + +Then create a span using GenAI semantic attributes: + +```rust +pub fn span_for_request(request: &GenAiRequest) -> Span { + tracing::info_span!( + "gen_ai.client", + "otel.name" = %format!("chat {}", request.model), + "otel.kind" = "client", + "gen_ai.operation.name" = "chat", + "gen_ai.provider.name" = "venice", + "gen_ai.request.model" = %request.model, + "gen_ai.request.stream" = request.stream, + "server.address" = "api.venice.ai", + "gen_ai.response.id" = tracing::field::Empty, + "gen_ai.response.model" = tracing::field::Empty, + "gen_ai.response.finish_reasons" = tracing::field::Empty, + "gen_ai.response.time_to_first_chunk" = tracing::field::Empty, + "gen_ai.usage.input_tokens" = tracing::field::Empty, + "gen_ai.usage.output_tokens" = tracing::field::Empty, + "venice.cost.diem" = tracing::field::Empty, + "venice.cost.usd" = tracing::field::Empty, + "error.type" = tracing::field::Empty, + "otel.status_code" = tracing::field::Empty, + ) +} +``` + +When a non-streaming response comes back, deserialize the known response metadata fields into structs. The gateway still forwards the original bytes to the client, but telemetry does not need to walk arbitrary JSON. The billing log uses the gateway key UUID rather than the plaintext bearer token, and the request ID comes from Venice's response `id`: + +```rust +#[derive(Debug, Default, Deserialize)] +struct ChatCompletionResponse { + id: Option, + model: Option, + #[serde(default)] + choices: Vec, + usage: Option, + cost: Option, +} + +#[derive(Debug, Deserialize)] +struct ChatCompletionChoice { + finish_reason: Option, +} + +#[derive(Debug, Deserialize)] +struct ChatCompletionUsage { + prompt_tokens: Option, + input_tokens: Option, + completion_tokens: Option, + output_tokens: Option, +} + +#[derive(Debug, Deserialize)] +struct VeniceCost { + #[serde(default, deserialize_with = "optional_finite_f64")] + diem: Option, + #[serde(default, deserialize_with = "optional_finite_f64")] + usd: Option, +} + +pub fn metadata_from_bytes(bytes: &[u8]) -> GenAiResponseMetadata { + serde_json::from_slice::(bytes) + .map(GenAiResponseMetadata::from) + .unwrap_or_default() +} + +pub fn record_billing(api_key_id: Uuid, metadata: &GenAiResponseMetadata) { + let (Some(cost_diem), Some(cost_usd)) = (metadata.cost_diem, metadata.cost_usd) else { + return; + }; + + tracing::info!( + api_key_id = %api_key_id, + request_id = metadata.response_id.as_deref().unwrap_or("unknown"), + cost_diem, + cost_usd, + "venice.billing" + ); +} +``` + +For streaming responses, record time to first chunk and time between output chunks as the SSE stream is relayed. These metrics are especially useful when you care about perceived latency, not just total request time. + +Telemetry is where a gateway becomes more than a proxy. Once spans include the requested model, upstream model, token counts, finish reasons, status, and per-key billing logs, you can answer practical operational questions: which clients are spending the most, which models are slowest, whether streaming is improving perceived latency, and whether errors are coming from auth, rate limits, transport, or the model provider. + +## Starting the server + +Now wire everything together in `src/main.rs`: + +```rust +mod auth; +mod config; +mod error; +mod rate_limit; +mod router; +mod sse; +mod state; +mod telemetry; +mod venice; + +use sqlx::postgres::PgPoolOptions; +use tokio::net::TcpListener; +use tracing::info; + +use crate::{config::Config, state::AppState}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let config = Config::from_env()?; + let _telemetry = telemetry::init(&config)?; + + let db = PgPoolOptions::new() + .max_connections(10) + .connect(&config.database_url) + .await?; + + sqlx::migrate!("./migrations").run(&db).await?; + + let bind_addr = config.bind_addr; + let state = AppState::new(config, db)?; + let app = router::build(state); + let listener = TcpListener::bind(bind_addr).await?; + + info!(%bind_addr, "starting gateway"); + + axum::serve(listener, app) + .with_graceful_shutdown(shutdown_signal()) + .await?; + + Ok(()) +} +``` + +On startup, the gateway: + +1. Reads configuration. +2. Initializes telemetry. +3. Connects to Postgres. +4. Runs SQLx migrations. +5. Builds shared app state. +6. Starts the Axum server. + +Running migrations on startup is convenient for this tutorial because `docker compose up` can bring the whole stack to a working state. In a larger production deployment, you may prefer to run migrations as a separate release step so schema changes are reviewed and applied before new gateway instances start. + +## Seeding a local gateway key + +For local development, create `scripts/seed_api_key.sh`. The script inserts a gateway API key into Postgres by storing its prefix and SHA-256 hash: + +```sh +#!/bin/sh +set -eu + +DATABASE_URL="${DATABASE_URL:-postgres://llm_gateway:llm_gateway@localhost:5432/llm_gateway}" +GATEWAY_API_KEY="${GATEWAY_API_KEY:-llmg_dev_0123456789abcdef}" +SUBJECT_ID="${SUBJECT_ID:-00000000-0000-0000-0000-000000000001}" + +psql "$DATABASE_URL" \ + -v ON_ERROR_STOP=1 \ + -v gateway_api_key="$GATEWAY_API_KEY" \ + -v subject_id="$SUBJECT_ID" <<'SQL' +INSERT INTO api_keys + (subject_id, name, key_prefix, key_hash, rate_limit_requests, rate_limit_window_seconds) +VALUES + ( :'subject_id'::uuid + , 'local development key' + , left(:'gateway_api_key', 12) + , digest(:'gateway_api_key', 'sha256') + , 60 + , 60 + ) +ON CONFLICT (key_prefix) +DO UPDATE +SET key_hash = EXCLUDED.key_hash, + is_active = true, + revoked_at = NULL; +SQL +``` + +The default local key is: + +```text +llmg_dev_0123456789abcdef +``` + +For a real deployment, generate longer random keys, show them once to the caller, and store only the hash. + +The seed script is intentionally boring because local credentials should be easy to recreate. The production version is where you would add stronger key generation, audit logging, expiration, and a one-time display flow. + +## Running locally + +To run locally, we'll use Docker Compose to start both the gateway and Postgres. This keeps the tutorial reproducible: readers do not need a manually configured database, and the gateway can use the same `DATABASE_URL` shape it would use in a containerized deployment. + +```yaml +name: llm-gateway + +services: + postgres: + image: postgres:17-alpine + environment: + POSTGRES_DB: llm_gateway + POSTGRES_USER: llm_gateway + POSTGRES_PASSWORD: llm_gateway + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U llm_gateway -d llm_gateway"] + interval: 5s + timeout: 5s + retries: 10 + + gateway: + build: . + depends_on: + postgres: + condition: service_healthy + environment: + BIND_ADDR: 0.0.0.0:3000 + DATABASE_URL: postgres://llm_gateway:llm_gateway@postgres:5432/llm_gateway + OTEL_SERVICE_NAME: llm-gateway + VENICE_API_KEY: ${VENICE_API_KEY:?set VENICE_API_KEY in your shell or .env} + ports: + - "3000:3000" + healthcheck: + test: ["CMD", "curl", "-fsS", "http://localhost:3000/healthz"] + interval: 5s + timeout: 5s + retries: 10 +``` + +We'll also need a small Dockerfile that builds the Rust binary and copies it into a smaller runtime image: + +```dockerfile +FROM rust:1.92-bookworm AS builder + +WORKDIR /app + +COPY Cargo.toml Cargo.lock ./ +COPY migrations ./migrations +COPY src ./src + +RUN cargo build --release + +FROM debian:bookworm-slim + +RUN apt-get update \ + && apt-get install -y --no-install-recommends ca-certificates curl \ + && rm -rf /var/lib/apt/lists/* + +COPY --from=builder /app/target/release/llm-gateway /usr/local/bin/llm-gateway + +EXPOSE 3000 + +CMD ["llm-gateway"] +``` + +To run the stack, use the following command: + +```bash +docker compose up --build +``` + +Don't forget you can also run this detached with the `-d` flag if you want to use your terminal for other things after (and then use `docker compose down` to remove it). + +In another terminal, seed the development gateway key: + +```bash +docker compose --profile seed run --rm seed +``` + +If your machine already has Postgres running on port `5432`, remove the host port mapping for the Compose Postgres service. The gateway only needs to reach Postgres on Docker's internal network. + +The important thing to keep in mind is that the Venice API key belongs only in the gateway environment. Client requests should use the seeded gateway key. That separation is the whole point of putting a gateway in front of the model provider. + +## Testing the gateway + +First, check health: + +```bash +curl http://localhost:3000/healthz +``` + +You should see: + +```json +{"status":"ok"} +``` + +Now send a non-streaming chat completion request: + +```bash +curl http://localhost:3000/v1/chat/completions \ + -H "Authorization: Bearer llmg_dev_0123456789abcdef" \ + -H "Content-Type: application/json" \ + -d '{ + "model": "llama-3.3-70b", + "messages": [ + { + "role": "user", + "content": "Reply with one short sentence saying the gateway works." + } + ] + }' +``` + +The response should look like an OpenAI-compatible chat completion: + +```json +{ + "id": "chatcmpl-...", + "object": "chat.completion", + "model": "llama-3.3-70b", + "choices": [ + { + "message": { + "role": "assistant", + "content": "The gateway is working." + }, + "finish_reason": "stop" + } + ], + "usage": { + "prompt_tokens": 123, + "completion_tokens": 5, + "total_tokens": 128 + }, + "cost": { + "diem": 0.42, + "usd": 0.0012 + } +} +``` + +For streaming: + +```bash +curl -N http://localhost:3000/v1/chat/completions \ + -H "Authorization: Bearer llmg_dev_0123456789abcdef" \ + -H "Content-Type: application/json" \ + -d '{ + "model": "llama-3.3-70b", + "stream": true, + "messages": [ + { + "role": "user", + "content": "Write one short sentence confirming streaming works." + } + ] + }' +``` + +You should see SSE chunks: + +```text +data: {"id":"chatcmpl-...","object":"chat.completion.chunk",...} + +data: {"id":"chatcmpl-...","object":"chat.completion.chunk",...} + +data: [DONE] +``` + +The repo also includes a smoke test script: + +```bash +sh ./scripts/smoke_chat.sh +``` + +For local code quality checks, run: + +```bash +cargo fmt --check +cargo test +cargo clippy --all-targets --all-features -- -D warnings +``` + +Testing both response modes matters because they exercise different proxy paths. The non-streaming test proves that auth, rate limiting, upstream forwarding, and JSON response telemetry work. The streaming test proves that the gateway can keep an SSE connection open and forward chunks without buffering the final answer first. + +## Extending this gateway + +This gateway is intentionally small, but it gives you a solid foundation. Good next steps include: + +- Add per-subject budgets and monthly spend limits. +- Support multiple upstream providers behind the same OpenAI-compatible API. +- Store request metadata for audit logs while keeping prompt logging disabled by default. +- Add an admin API for creating, revoking, and rotating gateway keys. +- Add model allowlists per API key. +- Add Redis or another shared store if you need lower-latency rate limiting across many gateway instances. + +The main design idea is to keep policy in the gateway and inference in Venice. That lets client apps use a familiar API while your platform keeps control over keys, usage, limits, and observability. + +## Finishing up + +Thanks for reading! Hopefully this helped you see how to build a practical LLM gateway in Rust without turning it into a huge platform project. + +By combining Axum, Postgres, SQLx, OpenTelemetry, and Venice's OpenAI-compatible chat completions API, we can build a gateway that is small enough to understand and useful enough to sit in front of real applications. diff --git a/images/guides/llm-gateway-architecture.svg b/images/guides/llm-gateway-architecture.svg new file mode 100644 index 00000000..830e3221 --- /dev/null +++ b/images/guides/llm-gateway-architecture.svg @@ -0,0 +1,62 @@ + + Rust LLM gateway architecture + A client sends an OpenAI-compatible request to a Rust Axum gateway, which checks Postgres for authentication and rate limits, forwards to Venice AI, and exports telemetry. + + + + + + + + + Rust LLM Gateway + The gateway keeps provider credentials server-side while enforcing policy and recording telemetry for every request. + + + Client app + Bearer gateway key + OpenAI-compatible JSON + + + Rust gateway + Axum router + Auth extractor + Fixed-window limiter + Venice proxy client + SSE stream observer + + + Postgres + Key hashes and limits + + + Venice AI + Chat completions + Streaming responses + + + Telemetry + OpenTelemetry + Billing logs + + + Provider key stays inside the gateway + + + + + + + + diff --git a/llms.txt b/llms.txt index e7313506..ec55a6ad 100644 --- a/llms.txt +++ b/llms.txt @@ -142,6 +142,7 @@ Venice offers four tiers of privacy: **Anonymized** (third-party models with ide - [Building a Private RAG Bot](https://docs.venice.ai/guides/projects/private-rag-bot): Modern private RAG with Venice embeddings, Qdrant vector search, FastEmbed re-ranking, and Venice chat completions - [Building a Private Research Agent](https://docs.venice.ai/guides/projects/private-research-agent): Python research agent that plans searches, reads sources with Venice's scrape API, extracts evidence, and writes cited Markdown reports - [Building a Codebase Security Reviewer](https://docs.venice.ai/guides/projects/security-code-reviewer): Python security agent that finds vulnerabilities and chains them into exploit paths using Venice, an AST repo map, and Pydantic guardrails +- [Building a Rust LLM Gateway](https://docs.venice.ai/guides/projects/rust-llm-gateway): OpenAI-compatible Rust gateway with Axum, Postgres-backed API keys, fixed-window rate limits, streaming responses, and OpenTelemetry ## Key Features