diff --git a/backend/cli/src/auth/index.ts b/backend/cli/src/auth/index.ts index dc1d86d..402f8e2 100644 --- a/backend/cli/src/auth/index.ts +++ b/backend/cli/src/auth/index.ts @@ -1,5 +1,6 @@ import path from "path" import { Global } from "../global" +import { JsonStore } from "../util/jsonstore" import z from "zod" export const OAUTH_DUMMY_KEY = "synsc-oauth-dummy-key" @@ -42,8 +43,7 @@ export namespace Auth { } export async function all(): Promise> { - const file = Bun.file(filepath) - const data = await file.json().catch(() => ({}) as Record) + const data = await JsonStore.read(filepath) return Object.entries(data).reduce( (acc, [key, value]) => { const parsed = Info.safeParse(value) @@ -56,15 +56,12 @@ export namespace Auth { } export async function set(key: string, info: Info) { - const file = Bun.file(filepath) - const data = await all() - await Bun.write(file, JSON.stringify({ ...data, [key]: info }, null, 2), { mode: 0o600 }) + await JsonStore.update(filepath, (data) => ({ ...data, [key]: info })) } export async function remove(key: string) { - const file = Bun.file(filepath) - const data = await all() - delete data[key] - await Bun.write(file, JSON.stringify(data, null, 2), { mode: 0o600 }) + await JsonStore.update(filepath, (data) => { + delete data[key] + }) } } diff --git a/backend/cli/src/mcp/auth.ts b/backend/cli/src/mcp/auth.ts index 0f91a35..f636050 100644 --- a/backend/cli/src/mcp/auth.ts +++ b/backend/cli/src/mcp/auth.ts @@ -1,6 +1,7 @@ import path from "path" import z from "zod" import { Global } from "../global" +import { JsonStore } from "../util/jsonstore" export namespace McpAuth { export const Tokens = z.object({ @@ -53,57 +54,70 @@ export namespace McpAuth { } export async function all(): Promise> { - const file = Bun.file(filepath) - return file.json().catch(() => ({})) + return (await JsonStore.read(filepath)) as Record + } + + /** Atomic read-modify-write of one server's entry. Serialized behind the + * store lock so concurrent updates can't drop each other's writes. */ + async function update(mcpName: string, fn: (entry: Entry) => void, serverUrl?: string): Promise { + await JsonStore.update(filepath, (data) => { + const entry = (data[mcpName] ?? {}) as Entry + fn(entry) + // Always update serverUrl if provided + if (serverUrl) entry.serverUrl = serverUrl + data[mcpName] = entry + }) } export async function set(mcpName: string, entry: Entry, serverUrl?: string): Promise { - const file = Bun.file(filepath) - const data = await all() // Always update serverUrl if provided - if (serverUrl) { - entry.serverUrl = serverUrl - } - await Bun.write(file, JSON.stringify({ ...data, [mcpName]: entry }, null, 2), { mode: 0o600 }) + if (serverUrl) entry.serverUrl = serverUrl + await JsonStore.update(filepath, (data) => ({ ...data, [mcpName]: entry })) } export async function remove(mcpName: string): Promise { - const file = Bun.file(filepath) - const data = await all() - delete data[mcpName] - await Bun.write(file, JSON.stringify(data, null, 2), { mode: 0o600 }) + await JsonStore.update(filepath, (data) => { + delete data[mcpName] + }) } export async function updateTokens(mcpName: string, tokens: Tokens, serverUrl?: string): Promise { - const entry = (await get(mcpName)) ?? {} - entry.tokens = tokens - await set(mcpName, entry, serverUrl) + await update( + mcpName, + (entry) => { + entry.tokens = tokens + }, + serverUrl, + ) } export async function updateClientInfo(mcpName: string, clientInfo: ClientInfo, serverUrl?: string): Promise { - const entry = (await get(mcpName)) ?? {} - entry.clientInfo = clientInfo - await set(mcpName, entry, serverUrl) + await update( + mcpName, + (entry) => { + entry.clientInfo = clientInfo + }, + serverUrl, + ) } export async function updateCodeVerifier(mcpName: string, codeVerifier: string): Promise { - const entry = (await get(mcpName)) ?? {} - entry.codeVerifier = codeVerifier - await set(mcpName, entry) + await update(mcpName, (entry) => { + entry.codeVerifier = codeVerifier + }) } export async function clearCodeVerifier(mcpName: string): Promise { - const entry = await get(mcpName) - if (entry) { - delete entry.codeVerifier - await set(mcpName, entry) - } + await JsonStore.update(filepath, (data) => { + const entry = data[mcpName] as Entry | undefined + if (entry) delete entry.codeVerifier + }) } export async function updateOAuthState(mcpName: string, oauthState: string): Promise { - const entry = (await get(mcpName)) ?? {} - entry.oauthState = oauthState - await set(mcpName, entry) + await update(mcpName, (entry) => { + entry.oauthState = oauthState + }) } export async function getOAuthState(mcpName: string): Promise { @@ -112,11 +126,10 @@ export namespace McpAuth { } export async function clearOAuthState(mcpName: string): Promise { - const entry = await get(mcpName) - if (entry) { - delete entry.oauthState - await set(mcpName, entry) - } + await JsonStore.update(filepath, (data) => { + const entry = data[mcpName] as Entry | undefined + if (entry) delete entry.oauthState + }) } /** diff --git a/backend/cli/src/mcp/oauth-provider.ts b/backend/cli/src/mcp/oauth-provider.ts index f3a2017..391d802 100644 --- a/backend/cli/src/mcp/oauth-provider.ts +++ b/backend/cli/src/mcp/oauth-provider.ts @@ -1,4 +1,10 @@ -import type { OAuthClientProvider } from "@modelcontextprotocol/sdk/client/auth.js" +import { + discoverAuthorizationServerMetadata, + discoverOAuthProtectedResourceMetadata, + refreshAuthorization, + selectResourceURL, + type OAuthClientProvider, +} from "@modelcontextprotocol/sdk/client/auth.js" import type { OAuthClientMetadata, OAuthTokens, @@ -10,6 +16,12 @@ import { Log } from "../util/log" const log = Log.create({ service: "mcp.oauth" }) +// Single-flight token refresh per server. Servers that rotate the refresh +// token invalidate the old one on every refresh, so two concurrent refreshes +// in this process would leave one caller holding a revoked token. Mirrors the +// codex recovery pattern in plugin/codex.ts. +const refreshing = new Map>() + const OAUTH_CALLBACK_PORT = 19876 const OAUTH_CALLBACK_PATH = "/mcp/oauth/callback" @@ -96,14 +108,83 @@ export class McpOAuthProvider implements OAuthClientProvider { const entry = await McpAuth.getForUrl(this.mcpName, this.serverUrl) if (!entry?.tokens) return undefined + const expired = entry.tokens.expiresAt !== undefined && entry.tokens.expiresAt < Date.now() / 1000 + if (!expired || !entry.tokens.refreshToken) return this.format(entry.tokens) + + const refreshed = await this.single(entry.tokens.refreshToken) + if (refreshed) return this.format(refreshed) + + // Refresh failed even after recovery — hand back the stored tokens so + // the SDK's own auth flow surfaces re-authentication. + return this.format(entry.tokens) + } + + private format(tokens: McpAuth.Tokens): OAuthTokens { return { - access_token: entry.tokens.accessToken, + access_token: tokens.accessToken, token_type: "Bearer", - refresh_token: entry.tokens.refreshToken, - expires_in: entry.tokens.expiresAt - ? Math.max(0, Math.floor(entry.tokens.expiresAt - Date.now() / 1000)) - : undefined, - scope: entry.tokens.scope, + refresh_token: tokens.refreshToken, + expires_in: tokens.expiresAt ? Math.max(0, Math.floor(tokens.expiresAt - Date.now() / 1000)) : undefined, + scope: tokens.scope, + } + } + + /** Single-flight wrapper: concurrent callers share one refresh round-trip. */ + private single(refreshToken: string): Promise { + const inflight = refreshing.get(this.mcpName) + if (inflight) return inflight + const started = this.recover(refreshToken).finally(() => { + refreshing.delete(this.mcpName) + }) + refreshing.set(this.mcpName, started) + return started + } + + /** Refresh with cross-process recovery. The single-flight guard only + * covers this process; when another openscience process wins a refresh + * race against a rotating-refresh server, it has already persisted the + * rotated pair. Re-read the store before surfacing re-auth, and retry + * once with the rotated token. */ + private async recover(refreshToken: string): Promise { + try { + return await this.refresh(refreshToken) + } catch (error) { + const latest = (await McpAuth.getForUrl(this.mcpName, this.serverUrl))?.tokens + const valid = latest?.expiresAt === undefined || latest.expiresAt > Date.now() / 1000 + if (latest?.accessToken && valid) return latest + if (latest?.refreshToken && latest.refreshToken !== refreshToken) { + const retried = await this.refresh(latest.refreshToken).catch(() => undefined) + if (retried) return retried + } + log.warn("token refresh failed", { + mcpName: this.mcpName, + error: error instanceof Error ? error.message : String(error), + }) + return undefined + } + } + + /** One refresh round-trip via the SDK helpers, persisted on success. */ + private async refresh(refreshToken: string): Promise { + const client = await this.clientInformation() + if (!client) throw new Error(`no OAuth client information for MCP server: ${this.mcpName}`) + const metadata = await discoverOAuthProtectedResourceMetadata(this.serverUrl).catch(() => undefined) + const issuer = metadata?.authorization_servers?.[0] ?? new URL("/", this.serverUrl) + const server = await discoverAuthorizationServerMetadata(issuer) + const resource = await selectResourceURL(this.serverUrl, this, metadata) + const tokens = await refreshAuthorization(issuer, { + metadata: server, + clientInformation: client, + refreshToken, + resource, + }) + await this.saveTokens(tokens) + log.info("refreshed oauth tokens", { mcpName: this.mcpName }) + return { + accessToken: tokens.access_token, + refreshToken: tokens.refresh_token, + expiresAt: tokens.expires_in ? Date.now() / 1000 + tokens.expires_in : undefined, + scope: tokens.scope, } } diff --git a/backend/cli/src/openscience/index.ts b/backend/cli/src/openscience/index.ts index 61dfaef..34a842d 100644 --- a/backend/cli/src/openscience/index.ts +++ b/backend/cli/src/openscience/index.ts @@ -7,6 +7,7 @@ import { fileURLToPath } from "url" import { randomUUID } from "crypto" import { Global } from "../global" import { Log } from "../util/log" +import { Lock } from "../util/lock" import { Env } from "../env" import { Auth } from "../auth" import { DEFAULT_MANAGED_API_BASE, MANAGED_API_BASE } from "../endpoints" @@ -1158,6 +1159,9 @@ export namespace OpenScience { async function persistToQueue(params: UsageParams) { try { + // Serialize against flushPendingUsage so an append can't land between + // the flusher's read and its final rewrite (which would delete it). + using _ = await Lock.write(pendingQueuePath) await fs.appendFile(pendingQueuePath, JSON.stringify(params) + "\n") log.info("usage queued for retry", { service: params.service }) } catch (e) { @@ -1247,40 +1251,59 @@ export namespace OpenScience { return null } - /** Retry any queued usage reports from previous failures (called at startup) */ + /** Retry any queued usage reports from previous failures (called at startup). + * Holds the queue lock across the whole read → send → rewrite cycle so a + * concurrent flush in this process can't double-send, and the rewrite + * drops only the lines actually read so an append landing mid-flush + * survives. Best-effort: never throws. */ export async function flushPendingUsage(): Promise { - let lines: string[] try { - const raw = await fs.readFile(pendingQueuePath, "utf-8") - lines = raw.split("\n").filter(Boolean) - } catch { - return - } - if (!lines.length) return + using _ = await Lock.write(pendingQueuePath) + const raw = await fs.readFile(pendingQueuePath, "utf-8").catch(() => "") + const lines = raw.split("\n").filter(Boolean) + if (!lines.length) return - const session = await getSession() - if (!session) return + const session = await getSession() + if (!session) return - const remaining: string[] = [] - for (const line of lines) { - try { - const params: UsageParams = JSON.parse(line) - const result = await sendReport(params, session) - if (!result.ok && !result.permanent) { - remaining.push(line) + const retry: string[] = [] + for (const line of lines) { + try { + const params: UsageParams = JSON.parse(line) + const result = await sendReport(params, session) + if (!result.ok && !result.permanent) { + retry.push(line) + } + } catch { + // malformed line, drop it } - } catch { - // malformed line, drop it } - } - try { + // Re-read before rewriting: another process may have appended while + // reports were sending. Remove only the lines read above (counted, so + // a report queued twice is removed exactly twice) and keep the rest. + const consumed = new Map() + for (const line of lines) consumed.set(line, (consumed.get(line) ?? 0) + 1) + const current = await fs.readFile(pendingQueuePath, "utf-8").catch(() => "") + const appended = current + .split("\n") + .filter(Boolean) + .filter((line) => { + const count = consumed.get(line) ?? 0 + if (!count) return true + consumed.set(line, count - 1) + return false + }) + + const remaining = [...retry, ...appended] if (remaining.length) { await fs.writeFile(pendingQueuePath, remaining.join("\n") + "\n") - } else { - await fs.unlink(pendingQueuePath) + return } - } catch {} + await fs.unlink(pendingQueuePath).catch(() => {}) + } catch (e) { + log.warn("usage queue flush failed", { error: e instanceof Error ? e.message : String(e) }) + } } // === Learned Skills (RSI) === diff --git a/backend/cli/src/util/jsonstore.ts b/backend/cli/src/util/jsonstore.ts new file mode 100644 index 0000000..b8c6c8b --- /dev/null +++ b/backend/cli/src/util/jsonstore.ts @@ -0,0 +1,70 @@ +import fs from "fs/promises" +import { Lock } from "./lock" + +/** + * Shared persistence for small JSON-object credential stores (auth.json, + * mcp-auth.json). Every write goes through a temp file in the same directory + * followed by a rename, so a crash or concurrent reader never observes a torn + * file, and every read-modify-write cycle is serialized behind the in-process + * Lock keyed by file path. + * + * A file that exists but cannot be parsed is fatal on the write path: + * proceeding with `{}` would rewrite the store containing only the entry + * being saved and silently destroy every other credential. The corrupt file + * is backed up alongside and the write throws. The read path degrades to + * `{}` so the CLI still boots. + */ +export namespace JsonStore { + async function parse(filepath: string): Promise> { + const file = Bun.file(filepath) + if (!(await file.exists())) return {} + const text = await file.text() + if (!text.trim()) return {} + return JSON.parse(text) as Record + } + + /** Read path: a missing, empty, or corrupt file degrades to `{}`. */ + export async function read(filepath: string): Promise> { + using _ = await Lock.read(filepath) + return await parse(filepath).catch(() => ({}) as Record) + } + + /** Write path: refuse to build on a file that exists but cannot be parsed. */ + async function load(filepath: string): Promise> { + try { + return await parse(filepath) + } catch (error) { + const backup = `${filepath}.corrupt-${process.pid}` + await fs.copyFile(filepath, backup).catch(() => {}) + const reason = error instanceof Error ? error.message : String(error) + throw new Error( + `${filepath} exists but could not be parsed (${reason}). ` + + `Refusing to overwrite it — that would discard every other entry. ` + + `The unmodified file was backed up to ${backup}; repair or remove ${filepath} and retry.`, + ) + } + } + + async function replace(filepath: string, data: Record) { + const temp = `${filepath}.${process.pid}.tmp` + await Bun.write(temp, JSON.stringify(data, null, 2), { mode: 0o600 }) + try { + await fs.rename(temp, filepath) + } catch (error) { + await fs.unlink(temp).catch(() => {}) + throw error + } + } + + /** Serialized, atomic read-modify-write. The callback may mutate `data` in + * place or return a replacement object. */ + export async function update( + filepath: string, + fn: (data: Record) => Record | void, + ): Promise { + using _ = await Lock.write(filepath) + const data = await load(filepath) + const next = fn(data) ?? data + await replace(filepath, next) + } +} diff --git a/backend/cli/test/auth/auth.test.ts b/backend/cli/test/auth/auth.test.ts new file mode 100644 index 0000000..4fb4cec --- /dev/null +++ b/backend/cli/test/auth/auth.test.ts @@ -0,0 +1,72 @@ +import { test, expect, beforeEach, afterAll } from "bun:test" +import path from "path" +import fs from "fs/promises" +import { Global } from "../../src/global" +import { Auth } from "../../src/auth" + +const filepath = path.join(Global.Path.data, "auth.json") + +async function clean() { + await fs.mkdir(Global.Path.data, { recursive: true }) + const entries = await fs.readdir(Global.Path.data) + await Promise.all( + entries + .filter((name) => name.startsWith("auth.json")) + .map((name) => fs.rm(path.join(Global.Path.data, name), { force: true })), + ) +} + +beforeEach(clean) +afterAll(clean) + +test("concurrent set calls keep every provider", async () => { + await Promise.all([ + Auth.set("provider-a", { type: "api", key: "key-a" }), + Auth.set("provider-b", { type: "api", key: "key-b" }), + Auth.set("provider-c", { type: "oauth", refresh: "refresh-c", access: "access-c", expires: 123 }), + ]) + const all = await Auth.all() + expect(Object.keys(all).sort()).toEqual(["provider-a", "provider-b", "provider-c"]) + expect(all["provider-a"]).toEqual({ type: "api", key: "key-a" }) + + const leftover = (await fs.readdir(Global.Path.data)).filter((name) => name.endsWith(".tmp")) + expect(leftover).toEqual([]) +}) + +test("set on a corrupt auth.json throws and leaves a backup instead of wiping", async () => { + const corrupt = '{"anthropic": {"type": "api", "key": "sk-real"' + await fs.writeFile(filepath, corrupt) + + await expect(Auth.set("openai", { type: "api", key: "sk-new" })).rejects.toThrow(/backed up/) + + // Original file untouched, backup created alongside + expect(await Bun.file(filepath).text()).toBe(corrupt) + expect(await Bun.file(`${filepath}.corrupt-${process.pid}`).text()).toBe(corrupt) + + // Read path still degrades to {} so the CLI can boot + expect(await Auth.all()).toEqual({}) +}) + +test("remove on a corrupt auth.json throws and leaves a backup", async () => { + const corrupt = "not json at all" + await fs.writeFile(filepath, corrupt) + + await expect(Auth.remove("anthropic")).rejects.toThrow(/backed up/) + expect(await Bun.file(filepath).text()).toBe(corrupt) + expect(await Bun.file(`${filepath}.corrupt-${process.pid}`).text()).toBe(corrupt) +}) + +test("remove drops only the named provider", async () => { + await Auth.set("provider-a", { type: "api", key: "key-a" }) + await Auth.set("provider-b", { type: "api", key: "key-b" }) + await Auth.remove("provider-a") + const all = await Auth.all() + expect(Object.keys(all)).toEqual(["provider-b"]) +}) + +test("set preserves entries it does not understand", async () => { + await fs.writeFile(filepath, JSON.stringify({ future: { type: "hologram", shard: 7 } })) + await Auth.set("provider-a", { type: "api", key: "key-a" }) + const raw = (await Bun.file(filepath).json()) as Record + expect(Object.keys(raw).sort()).toEqual(["future", "provider-a"]) +}) diff --git a/backend/cli/test/mcp/auth.test.ts b/backend/cli/test/mcp/auth.test.ts new file mode 100644 index 0000000..172486d --- /dev/null +++ b/backend/cli/test/mcp/auth.test.ts @@ -0,0 +1,53 @@ +import { test, expect, beforeEach, afterAll } from "bun:test" +import path from "path" +import fs from "fs/promises" +import { Global } from "../../src/global" +import { McpAuth } from "../../src/mcp/auth" + +const filepath = path.join(Global.Path.data, "mcp-auth.json") + +async function clean() { + await fs.mkdir(Global.Path.data, { recursive: true }) + const entries = await fs.readdir(Global.Path.data) + await Promise.all( + entries + .filter((name) => name.startsWith("mcp-auth.json")) + .map((name) => fs.rm(path.join(Global.Path.data, name), { force: true })), + ) +} + +beforeEach(clean) +afterAll(clean) + +test("concurrent updates keep every server entry", async () => { + await Promise.all([ + McpAuth.updateTokens("server-a", { accessToken: "token-a" }, "https://a.example"), + McpAuth.updateTokens("server-b", { accessToken: "token-b" }, "https://b.example"), + McpAuth.updateClientInfo("server-c", { clientId: "client-c" }, "https://c.example"), + ]) + const all = await McpAuth.all() + expect(Object.keys(all).sort()).toEqual(["server-a", "server-b", "server-c"]) + expect(all["server-a"].tokens?.accessToken).toBe("token-a") + expect(all["server-c"].clientInfo?.clientId).toBe("client-c") +}) + +test("updateTokens does not drop the rest of an entry", async () => { + await McpAuth.updateClientInfo("server-a", { clientId: "client-a" }, "https://a.example") + await McpAuth.updateTokens("server-a", { accessToken: "token-a" }) + const entry = await McpAuth.get("server-a") + expect(entry?.clientInfo?.clientId).toBe("client-a") + expect(entry?.tokens?.accessToken).toBe("token-a") + expect(entry?.serverUrl).toBe("https://a.example") +}) + +test("set on a corrupt mcp-auth.json throws and leaves a backup", async () => { + const corrupt = "{ definitely not json" + await fs.writeFile(filepath, corrupt) + + await expect(McpAuth.set("server-a", { tokens: { accessToken: "token-a" } })).rejects.toThrow(/backed up/) + expect(await Bun.file(filepath).text()).toBe(corrupt) + expect(await Bun.file(`${filepath}.corrupt-${process.pid}`).text()).toBe(corrupt) + + // Read path still degrades to {} + expect(await McpAuth.all()).toEqual({}) +}) diff --git a/backend/cli/test/mcp/oauth-refresh.test.ts b/backend/cli/test/mcp/oauth-refresh.test.ts new file mode 100644 index 0000000..2a3b7cc --- /dev/null +++ b/backend/cli/test/mcp/oauth-refresh.test.ts @@ -0,0 +1,151 @@ +import { test, expect, beforeEach } from "bun:test" +import path from "path" +import fs from "fs/promises" +import { Global } from "../../src/global" +import { McpAuth } from "../../src/mcp/auth" +import { McpOAuthProvider } from "../../src/mcp/oauth-provider" + +beforeEach(async () => { + await fs.mkdir(Global.Path.data, { recursive: true }) + const entries = await fs.readdir(Global.Path.data) + await Promise.all( + entries + .filter((name) => name.startsWith("mcp-auth.json")) + .map((name) => fs.rm(path.join(Global.Path.data, name), { force: true })), + ) +}) + +// Real OAuth authorization server on a loopback port. Serves discovery +// metadata and a token endpoint whose behavior each test controls. +function serve(token: (params: URLSearchParams, origin: string) => Promise | Response) { + return Bun.serve({ + port: 0, + hostname: "127.0.0.1", + async fetch(req) { + const url = new URL(req.url) + if (url.pathname === "/.well-known/oauth-authorization-server") { + return Response.json({ + issuer: url.origin, + authorization_endpoint: `${url.origin}/authorize`, + token_endpoint: `${url.origin}/token`, + response_types_supported: ["code"], + grant_types_supported: ["authorization_code", "refresh_token"], + token_endpoint_auth_methods_supported: ["none"], + }) + } + if (url.pathname === "/token") { + return token(new URLSearchParams(await req.text()), url.origin) + } + return new Response("not found", { status: 404 }) + }, + }) +} + +function provider(name: string, url: string) { + return new McpOAuthProvider(name, url, { clientId: "client-1" }, { onRedirect: async () => {} }) +} + +test("expired tokens refresh once across concurrent callers", async () => { + const name = "refresh-single-flight" + const counter = { refreshes: 0 } + const server = serve((params) => { + expect(params.get("grant_type")).toBe("refresh_token") + expect(params.get("refresh_token")).toBe("rotate-1") + counter.refreshes++ + return Response.json({ + access_token: "fresh-access", + token_type: "Bearer", + refresh_token: "rotate-2", + expires_in: 3600, + }) + }) + const url = `http://127.0.0.1:${server.port}` + await McpAuth.set( + name, + { tokens: { accessToken: "stale-access", refreshToken: "rotate-1", expiresAt: Date.now() / 1000 - 60 } }, + url, + ) + + const client = provider(name, url) + const [first, second] = await Promise.all([client.tokens(), client.tokens()]) + expect(counter.refreshes).toBe(1) + expect(first?.access_token).toBe("fresh-access") + expect(second?.access_token).toBe("fresh-access") + + const saved = await McpAuth.get(name) + expect(saved?.tokens?.accessToken).toBe("fresh-access") + expect(saved?.tokens?.refreshToken).toBe("rotate-2") + + server.stop(true) + await McpAuth.remove(name) +}) + +test("failed refresh recovers with the rotated token another process persisted", async () => { + const name = "refresh-recovery" + const attempts: string[] = [] + const server = serve(async (params) => { + const sent = params.get("refresh_token") ?? "" + attempts.push(sent) + if (sent === "revoked-1") { + // Simulate the winning process: it already persisted the rotated pair + // (with an expired access token, so recovery must retry the refresh). + await McpAuth.updateTokens(name, { + accessToken: "stale-access", + refreshToken: "rotated-2", + expiresAt: Date.now() / 1000 - 60, + }) + return Response.json({ error: "invalid_grant" }, { status: 400 }) + } + expect(sent).toBe("rotated-2") + return Response.json({ + access_token: "recovered-access", + token_type: "Bearer", + refresh_token: "rotated-3", + expires_in: 3600, + }) + }) + const url = `http://127.0.0.1:${server.port}` + await McpAuth.set( + name, + { tokens: { accessToken: "stale-access", refreshToken: "revoked-1", expiresAt: Date.now() / 1000 - 60 } }, + url, + ) + + const tokens = await provider(name, url).tokens() + expect(attempts).toEqual(["revoked-1", "rotated-2"]) + expect(tokens?.access_token).toBe("recovered-access") + + const saved = await McpAuth.get(name) + expect(saved?.tokens?.refreshToken).toBe("rotated-3") + + server.stop(true) + await McpAuth.remove(name) +}) + +test("failed refresh uses a still-valid access token another process persisted", async () => { + const name = "refresh-reuse" + const attempts: string[] = [] + const server = serve(async (params) => { + attempts.push(params.get("refresh_token") ?? "") + // Simulate the winning process persisting a fresh, unexpired pair. + await McpAuth.updateTokens(name, { + accessToken: "winner-access", + refreshToken: "winner-refresh", + expiresAt: Date.now() / 1000 + 3600, + }) + return Response.json({ error: "invalid_grant" }, { status: 400 }) + }) + const url = `http://127.0.0.1:${server.port}` + await McpAuth.set( + name, + { tokens: { accessToken: "stale-access", refreshToken: "revoked-1", expiresAt: Date.now() / 1000 - 60 } }, + url, + ) + + const tokens = await provider(name, url).tokens() + expect(attempts).toEqual(["revoked-1"]) + expect(tokens?.access_token).toBe("winner-access") + + server.stop(true) + await McpAuth.remove(name) +}) diff --git a/backend/cli/test/openscience-usage.test.ts b/backend/cli/test/openscience-usage.test.ts new file mode 100644 index 0000000..a56ea36 --- /dev/null +++ b/backend/cli/test/openscience-usage.test.ts @@ -0,0 +1,32 @@ +import { test, expect, afterEach } from "bun:test" +import path from "path" +import fs from "fs/promises" +import { Global } from "../src/global" +import { OpenScience } from "../src/openscience" + +const queue = path.join(Global.Path.data, "usage-queue.jsonl") +const session = path.join(Global.Path.data, "openscience-session.json") + +afterEach(async () => { + await fs.rm(queue, { force: true }).catch(() => {}) + await fs.rm(session, { force: true }).catch(() => {}) +}) + +test("flushPendingUsage without a session leaves the queue intact", async () => { + await fs.mkdir(Global.Path.data, { recursive: true }) + await fs.rm(session, { force: true }).catch(() => {}) + const line = JSON.stringify({ service: "test", event_type: "tokens", tokens_used: 1 }) + "\n" + await fs.writeFile(queue, line) + + await OpenScience.flushPendingUsage() + expect(await fs.readFile(queue, "utf-8")).toBe(line) +}) + +test("flushPendingUsage drops malformed lines and removes the empty queue", async () => { + await fs.mkdir(Global.Path.data, { recursive: true }) + await Bun.write(session, JSON.stringify({ api_key: "thk_test.secret", user_id: "user-1" })) + await fs.writeFile(queue, "not-json\nalso not json\n") + + await OpenScience.flushPendingUsage() + expect(await Bun.file(queue).exists()).toBe(false) +})