Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,46 @@ apimux completion fish > ~/.config/fish/completions/apimux.fish
apimux completion powershell | Out-String | Invoke-Expression
```

## Caller Context (for billing webhook callers)

API keys that have a billing webhook configured on the apimux-service side
need each request to carry caller-provided attribution context so the
receiver can settle the call against the right subject. `apimux` scans the
process environment for any variable prefixed `APIMUX_CALLER_CTX_` and
forwards it transparently as the matching `X-Apimux-Caller-Ctx-*` HTTP
header — the prefix convention is generic by design so apimux never has
to know which keys a particular downstream uses.

The naming rule:

```
APIMUX_CALLER_CTX_<SUFFIX> → X-Apimux-Caller-Ctx-<Suffix-With-Dashes>
```

For Kamay's apimux receiver, that looks like:

```bash
APIMUX_CALLER_CTX_USER_ID="usr_abc" \
APIMUX_CALLER_CTX_WORKSPACE_ID="ws_def" \
APIMUX_CALLER_CTX_CONVERSATION_ID="conv_xyz" \
APIMUX_CALLER_CTX_MESSAGE_ID="msg_qrs" \
apimux reddit search --query "headphones"
```

A different downstream might use entirely different keys (the CLI does not
care):

```bash
APIMUX_CALLER_CTX_ACCOUNT_ID="acct_abc" \
APIMUX_CALLER_CTX_PROJECT_ID="proj_def" \
apimux reddit search --query "..."
```

Unset / empty / whitespace-only values are treated as absent — no header
is sent and apimux-service records SQL NULL on `api_call_logs.caller_context`.
If the apikey has no webhook configured the headers are still persisted
for reconciliation but no webhook event is enqueued.

## Repo Layout

- `cmd/apimux`: entrypoint
Expand Down
45 changes: 45 additions & 0 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net/http"
"os"
"strings"
"time"
)
Expand Down Expand Up @@ -80,6 +81,7 @@ func (c *Client) ExecuteCapability(ctx context.Context, capability string, param
if c.apiKey != "" {
req.Header.Set("Authorization", "Bearer "+c.apiKey)
}
applyCallerContextHeaders(req)

resp, err := c.httpClient.Do(req)
if err != nil {
Expand All @@ -101,6 +103,49 @@ func (c *Client) ExecuteCapability(ctx context.Context, capability string, param
}, nil
}

// callerCtxEnvPrefix matches the apimux-side header prefix one-to-one.
// Any environment variable starting with this is forwarded as an
// X-Apimux-Caller-Ctx-* header to the capability request. The naming is
// deliberately generic — apimux does not interpret the keys, the
// configured webhook receiver decides what schema it expects.
const callerCtxEnvPrefix = "APIMUX_CALLER_CTX_"

// applyCallerContextHeaders scans the process environment for every
// variable prefixed with APIMUX_CALLER_CTX_ and copies its value into the
// matching X-Apimux-Caller-Ctx-* HTTP header on req. Empty / whitespace-only
// values are dropped — apimux-service treats absent and empty header
// values identically and persists SQL NULL on api_call_logs.caller_context
// in either case, so emitting blanks would only pollute the wire.
//
// Header naming: strip the env prefix, replace '_' with '-', and let Go's
// http.Header canonicalizer normalize the casing. apimux extracts the key
// back out by lowercasing and swapping '-' for '_', so:
//
// APIMUX_CALLER_CTX_USER_ID -> X-Apimux-Caller-Ctx-USER-ID
// -> canonical X-Apimux-Caller-Ctx-User-Id
// -> jsonb key "user_id"
func applyCallerContextHeaders(req *http.Request) {
for _, kv := range os.Environ() {
eq := strings.IndexByte(kv, '=')
if eq <= 0 {
continue
}
name, value := kv[:eq], kv[eq+1:]
if !strings.HasPrefix(name, callerCtxEnvPrefix) {
continue
}
v := strings.TrimSpace(value)
if v == "" {
continue
}
suffix := name[len(callerCtxEnvPrefix):]
if suffix == "" {
continue
}
req.Header.Set("X-Apimux-Caller-Ctx-"+strings.ReplaceAll(suffix, "_", "-"), v)
}
}

func (c *Client) ListSchemas(ctx context.Context) (Response, error) {
return c.doRequest(ctx, http.MethodGet, c.baseURL+"/v1/schema", nil)
}
Expand Down
163 changes: 163 additions & 0 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,166 @@ type roundTripFunc func(*http.Request) (*http.Response, error)
func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
return f(req)
}

// TestExecuteCapabilityForwardsCallerContextEnvVarsAsHeaders pins the
// contract with apimux-service: every APIMUX_CALLER_CTX_* env var present
// in the process must surface as the matching X-Apimux-Caller-Ctx-* header
// on the outbound request. apimux groups them into a single caller_context
// jsonb on api_call_logs, so missing this transparently disables the
// billing webhook for that call.
func TestExecuteCapabilityForwardsCallerContextEnvVarsAsHeaders(t *testing.T) {
t.Setenv("APIMUX_CALLER_CTX_USER_ID", "usr_alpha")
t.Setenv("APIMUX_CALLER_CTX_WORKSPACE_ID", "ws_beta")
t.Setenv("APIMUX_CALLER_CTX_CONVERSATION_ID", "conv_gamma")
t.Setenv("APIMUX_CALLER_CTX_MESSAGE_ID", "msg_delta")

var got http.Header
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
got = r.Header.Clone()
_, _ = w.Write([]byte(`{"ok":true}`))
}))
defer server.Close()

client := NewWithHTTPClient(Config{BaseURL: server.URL, APIKey: "k"}, server.Client())
if _, err := client.ExecuteCapability(context.Background(), "reddit.search", nil); err != nil {
t.Fatalf("ExecuteCapability error = %v", err)
}

want := map[string]string{
"X-Apimux-Caller-Ctx-User-Id": "usr_alpha",
"X-Apimux-Caller-Ctx-Workspace-Id": "ws_beta",
"X-Apimux-Caller-Ctx-Conversation-Id": "conv_gamma",
"X-Apimux-Caller-Ctx-Message-Id": "msg_delta",
}
for header, expected := range want {
if got.Get(header) != expected {
t.Errorf("header %s = %q, want %q", header, got.Get(header), expected)
}
}
}

// TestExecuteCapabilityOmitsCallerContextHeadersWhenEnvUnset is the
// symmetric guarantee: a fresh process without any APIMUX_CALLER_CTX_* env
// vars must NOT attach the headers. apimux-service treats absent and empty
// equivalently, but emitting a blank header would still leak the key name
// into access logs / curl traces.
func TestExecuteCapabilityOmitsCallerContextHeadersWhenEnvUnset(t *testing.T) {
t.Setenv("APIMUX_CALLER_CTX_USER_ID", "")
t.Setenv("APIMUX_CALLER_CTX_WORKSPACE_ID", "")
t.Setenv("APIMUX_CALLER_CTX_CONVERSATION_ID", "")
t.Setenv("APIMUX_CALLER_CTX_MESSAGE_ID", "")

var got http.Header
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
got = r.Header.Clone()
_, _ = w.Write([]byte(`{"ok":true}`))
}))
defer server.Close()

client := NewWithHTTPClient(Config{BaseURL: server.URL, APIKey: "k"}, server.Client())
if _, err := client.ExecuteCapability(context.Background(), "reddit.search", nil); err != nil {
t.Fatalf("ExecuteCapability error = %v", err)
}

for _, h := range []string{
"X-Apimux-Caller-Ctx-User-Id",
"X-Apimux-Caller-Ctx-Workspace-Id",
"X-Apimux-Caller-Ctx-Conversation-Id",
"X-Apimux-Caller-Ctx-Message-Id",
} {
if v, ok := got[http.CanonicalHeaderKey(h)]; ok {
t.Errorf("expected %s absent when env unset, got %v", h, v)
}
}
}

// TestExecuteCapabilityForwardsPartialCallerContext covers the realistic
// case where the caller only knows a user id (e.g. ad-hoc CLI invocation
// outside a conversation). The available headers MUST flow; the missing
// ones MUST NOT be padded with empty values.
func TestExecuteCapabilityForwardsPartialCallerContext(t *testing.T) {
t.Setenv("APIMUX_CALLER_CTX_USER_ID", "usr_alpha")
t.Setenv("APIMUX_CALLER_CTX_WORKSPACE_ID", "")
t.Setenv("APIMUX_CALLER_CTX_CONVERSATION_ID", "conv_gamma")
t.Setenv("APIMUX_CALLER_CTX_MESSAGE_ID", "")

var got http.Header
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
got = r.Header.Clone()
_, _ = w.Write([]byte(`{"ok":true}`))
}))
defer server.Close()

client := NewWithHTTPClient(Config{BaseURL: server.URL, APIKey: "k"}, server.Client())
if _, err := client.ExecuteCapability(context.Background(), "reddit.search", nil); err != nil {
t.Fatalf("ExecuteCapability error = %v", err)
}

if got.Get("X-Apimux-Caller-Ctx-User-Id") != "usr_alpha" {
t.Errorf("user id header missing or wrong: %q", got.Get("X-Apimux-Caller-Ctx-User-Id"))
}
if got.Get("X-Apimux-Caller-Ctx-Conversation-Id") != "conv_gamma" {
t.Errorf("conversation id header missing or wrong: %q", got.Get("X-Apimux-Caller-Ctx-Conversation-Id"))
}
if _, ok := got[http.CanonicalHeaderKey("X-Apimux-Caller-Ctx-Workspace-Id")]; ok {
t.Errorf("workspace id header should be absent for empty env var")
}
if _, ok := got[http.CanonicalHeaderKey("X-Apimux-Caller-Ctx-Message-Id")]; ok {
t.Errorf("message id header should be absent for empty env var")
}
}

// TestExecuteCapabilityTrimsWhitespaceFromCallerContext guards against the
// common deployment mistake of injecting env values with trailing newlines
// (e.g. from `echo "$id"` rather than `printf %s`). A trimmed empty string
// must NOT produce a header.
func TestExecuteCapabilityTrimsWhitespaceFromCallerContext(t *testing.T) {
t.Setenv("APIMUX_CALLER_CTX_USER_ID", " usr_alpha\n")
t.Setenv("APIMUX_CALLER_CTX_WORKSPACE_ID", " ")
t.Setenv("APIMUX_CALLER_CTX_CONVERSATION_ID", "")
t.Setenv("APIMUX_CALLER_CTX_MESSAGE_ID", "")

var got http.Header
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
got = r.Header.Clone()
_, _ = w.Write([]byte(`{"ok":true}`))
}))
defer server.Close()

client := NewWithHTTPClient(Config{BaseURL: server.URL, APIKey: "k"}, server.Client())
if _, err := client.ExecuteCapability(context.Background(), "reddit.search", nil); err != nil {
t.Fatalf("ExecuteCapability error = %v", err)
}

if got.Get("X-Apimux-Caller-Ctx-User-Id") != "usr_alpha" {
t.Errorf("user id should be trimmed to %q, got %q", "usr_alpha", got.Get("X-Apimux-Caller-Ctx-User-Id"))
}
if _, ok := got[http.CanonicalHeaderKey("X-Apimux-Caller-Ctx-Workspace-Id")]; ok {
t.Errorf("whitespace-only env value must not produce a header")
}
}

// TestExecuteCapabilityForwardsArbitraryCallerContextKey demonstrates the
// receiver-agnostic property: a downstream that wants to bill on a key
// apimux has never heard of (here APIMUX_CALLER_CTX_ACCOUNT_ID) just sets
// the env var, and the header flows. apimux persists whatever it gets;
// nothing in this codebase needs to know about "account_id".
func TestExecuteCapabilityForwardsArbitraryCallerContextKey(t *testing.T) {
t.Setenv("APIMUX_CALLER_CTX_ACCOUNT_ID", "acct_xyz")

var got http.Header
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
got = r.Header.Clone()
_, _ = w.Write([]byte(`{"ok":true}`))
}))
defer server.Close()

client := NewWithHTTPClient(Config{BaseURL: server.URL, APIKey: "k"}, server.Client())
if _, err := client.ExecuteCapability(context.Background(), "reddit.search", nil); err != nil {
t.Fatalf("ExecuteCapability error = %v", err)
}

if got.Get("X-Apimux-Caller-Ctx-Account-Id") != "acct_xyz" {
t.Errorf("arbitrary caller-ctx key not forwarded: %q", got.Get("X-Apimux-Caller-Ctx-Account-Id"))
}
}
Loading