Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions backend/cli/src/auth/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import path from "path"
import { Global } from "../global"
import { JsonStore } from "../util/jsonstore"
import z from "zod"

export const OAUTH_DUMMY_KEY = "synsc-oauth-dummy-key"
Expand Down Expand Up @@ -42,8 +43,7 @@ export namespace Auth {
}

export async function all(): Promise<Record<string, Info>> {
const file = Bun.file(filepath)
const data = await file.json().catch(() => ({}) as Record<string, unknown>)
const data = await JsonStore.read(filepath)
return Object.entries(data).reduce(
(acc, [key, value]) => {
const parsed = Info.safeParse(value)
Expand All @@ -56,15 +56,12 @@ export namespace Auth {
}

export async function set(key: string, info: Info) {
const file = Bun.file(filepath)
const data = await all()
await Bun.write(file, JSON.stringify({ ...data, [key]: info }, null, 2), { mode: 0o600 })
await JsonStore.update(filepath, (data) => ({ ...data, [key]: info }))
}

export async function remove(key: string) {
const file = Bun.file(filepath)
const data = await all()
delete data[key]
await Bun.write(file, JSON.stringify(data, null, 2), { mode: 0o600 })
await JsonStore.update(filepath, (data) => {
delete data[key]
})
}
}
81 changes: 47 additions & 34 deletions backend/cli/src/mcp/auth.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import path from "path"
import z from "zod"
import { Global } from "../global"
import { JsonStore } from "../util/jsonstore"

export namespace McpAuth {
export const Tokens = z.object({
Expand Down Expand Up @@ -53,57 +54,70 @@ export namespace McpAuth {
}

export async function all(): Promise<Record<string, Entry>> {
const file = Bun.file(filepath)
return file.json().catch(() => ({}))
return (await JsonStore.read(filepath)) as Record<string, Entry>
}

/** Atomic read-modify-write of one server's entry. Serialized behind the
* store lock so concurrent updates can't drop each other's writes. */
async function update(mcpName: string, fn: (entry: Entry) => void, serverUrl?: string): Promise<void> {
await JsonStore.update(filepath, (data) => {
const entry = (data[mcpName] ?? {}) as Entry
fn(entry)
// Always update serverUrl if provided
if (serverUrl) entry.serverUrl = serverUrl
data[mcpName] = entry
})
}

export async function set(mcpName: string, entry: Entry, serverUrl?: string): Promise<void> {
const file = Bun.file(filepath)
const data = await all()
// Always update serverUrl if provided
if (serverUrl) {
entry.serverUrl = serverUrl
}
await Bun.write(file, JSON.stringify({ ...data, [mcpName]: entry }, null, 2), { mode: 0o600 })
if (serverUrl) entry.serverUrl = serverUrl
await JsonStore.update(filepath, (data) => ({ ...data, [mcpName]: entry }))
}

export async function remove(mcpName: string): Promise<void> {
const file = Bun.file(filepath)
const data = await all()
delete data[mcpName]
await Bun.write(file, JSON.stringify(data, null, 2), { mode: 0o600 })
await JsonStore.update(filepath, (data) => {
delete data[mcpName]
})
}

export async function updateTokens(mcpName: string, tokens: Tokens, serverUrl?: string): Promise<void> {
const entry = (await get(mcpName)) ?? {}
entry.tokens = tokens
await set(mcpName, entry, serverUrl)
await update(
mcpName,
(entry) => {
entry.tokens = tokens
},
serverUrl,
)
}

export async function updateClientInfo(mcpName: string, clientInfo: ClientInfo, serverUrl?: string): Promise<void> {
const entry = (await get(mcpName)) ?? {}
entry.clientInfo = clientInfo
await set(mcpName, entry, serverUrl)
await update(
mcpName,
(entry) => {
entry.clientInfo = clientInfo
},
serverUrl,
)
}

export async function updateCodeVerifier(mcpName: string, codeVerifier: string): Promise<void> {
const entry = (await get(mcpName)) ?? {}
entry.codeVerifier = codeVerifier
await set(mcpName, entry)
await update(mcpName, (entry) => {
entry.codeVerifier = codeVerifier
})
}

export async function clearCodeVerifier(mcpName: string): Promise<void> {
const entry = await get(mcpName)
if (entry) {
delete entry.codeVerifier
await set(mcpName, entry)
}
await JsonStore.update(filepath, (data) => {
const entry = data[mcpName] as Entry | undefined
if (entry) delete entry.codeVerifier
})
}

export async function updateOAuthState(mcpName: string, oauthState: string): Promise<void> {
const entry = (await get(mcpName)) ?? {}
entry.oauthState = oauthState
await set(mcpName, entry)
await update(mcpName, (entry) => {
entry.oauthState = oauthState
})
}

export async function getOAuthState(mcpName: string): Promise<string | undefined> {
Expand All @@ -112,11 +126,10 @@ export namespace McpAuth {
}

export async function clearOAuthState(mcpName: string): Promise<void> {
const entry = await get(mcpName)
if (entry) {
delete entry.oauthState
await set(mcpName, entry)
}
await JsonStore.update(filepath, (data) => {
const entry = data[mcpName] as Entry | undefined
if (entry) delete entry.oauthState
})
}

/**
Expand Down
95 changes: 88 additions & 7 deletions backend/cli/src/mcp/oauth-provider.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import type { OAuthClientProvider } from "@modelcontextprotocol/sdk/client/auth.js"
import {
discoverAuthorizationServerMetadata,
discoverOAuthProtectedResourceMetadata,
refreshAuthorization,
selectResourceURL,
type OAuthClientProvider,
} from "@modelcontextprotocol/sdk/client/auth.js"
import type {
OAuthClientMetadata,
OAuthTokens,
Expand All @@ -10,6 +16,12 @@ import { Log } from "../util/log"

const log = Log.create({ service: "mcp.oauth" })

// Single-flight token refresh per server. Servers that rotate the refresh
// token invalidate the old one on every refresh, so two concurrent refreshes
// in this process would leave one caller holding a revoked token. Mirrors the
// codex recovery pattern in plugin/codex.ts.
const refreshing = new Map<string, Promise<McpAuth.Tokens | undefined>>()

const OAUTH_CALLBACK_PORT = 19876
const OAUTH_CALLBACK_PATH = "/mcp/oauth/callback"

Expand Down Expand Up @@ -96,14 +108,83 @@ export class McpOAuthProvider implements OAuthClientProvider {
const entry = await McpAuth.getForUrl(this.mcpName, this.serverUrl)
if (!entry?.tokens) return undefined

const expired = entry.tokens.expiresAt !== undefined && entry.tokens.expiresAt < Date.now() / 1000
if (!expired || !entry.tokens.refreshToken) return this.format(entry.tokens)

const refreshed = await this.single(entry.tokens.refreshToken)
if (refreshed) return this.format(refreshed)

// Refresh failed even after recovery — hand back the stored tokens so
// the SDK's own auth flow surfaces re-authentication.
return this.format(entry.tokens)
}

private format(tokens: McpAuth.Tokens): OAuthTokens {
return {
access_token: entry.tokens.accessToken,
access_token: tokens.accessToken,
token_type: "Bearer",
refresh_token: entry.tokens.refreshToken,
expires_in: entry.tokens.expiresAt
? Math.max(0, Math.floor(entry.tokens.expiresAt - Date.now() / 1000))
: undefined,
scope: entry.tokens.scope,
refresh_token: tokens.refreshToken,
expires_in: tokens.expiresAt ? Math.max(0, Math.floor(tokens.expiresAt - Date.now() / 1000)) : undefined,
scope: tokens.scope,
}
}

/** Single-flight wrapper: concurrent callers share one refresh round-trip. */
private single(refreshToken: string): Promise<McpAuth.Tokens | undefined> {
const inflight = refreshing.get(this.mcpName)
if (inflight) return inflight
const started = this.recover(refreshToken).finally(() => {
refreshing.delete(this.mcpName)
})
refreshing.set(this.mcpName, started)
return started
}

/** Refresh with cross-process recovery. The single-flight guard only
* covers this process; when another openscience process wins a refresh
* race against a rotating-refresh server, it has already persisted the
* rotated pair. Re-read the store before surfacing re-auth, and retry
* once with the rotated token. */
private async recover(refreshToken: string): Promise<McpAuth.Tokens | undefined> {
try {
return await this.refresh(refreshToken)
} catch (error) {
const latest = (await McpAuth.getForUrl(this.mcpName, this.serverUrl))?.tokens
const valid = latest?.expiresAt === undefined || latest.expiresAt > Date.now() / 1000
if (latest?.accessToken && valid) return latest
if (latest?.refreshToken && latest.refreshToken !== refreshToken) {
const retried = await this.refresh(latest.refreshToken).catch(() => undefined)
if (retried) return retried
}
log.warn("token refresh failed", {
mcpName: this.mcpName,
error: error instanceof Error ? error.message : String(error),
})
return undefined
}
}

/** One refresh round-trip via the SDK helpers, persisted on success. */
private async refresh(refreshToken: string): Promise<McpAuth.Tokens> {
const client = await this.clientInformation()
if (!client) throw new Error(`no OAuth client information for MCP server: ${this.mcpName}`)
const metadata = await discoverOAuthProtectedResourceMetadata(this.serverUrl).catch(() => undefined)
const issuer = metadata?.authorization_servers?.[0] ?? new URL("/", this.serverUrl)
const server = await discoverAuthorizationServerMetadata(issuer)
const resource = await selectResourceURL(this.serverUrl, this, metadata)
const tokens = await refreshAuthorization(issuer, {
metadata: server,
clientInformation: client,
refreshToken,
resource,
})
await this.saveTokens(tokens)
log.info("refreshed oauth tokens", { mcpName: this.mcpName })
return {
accessToken: tokens.access_token,
refreshToken: tokens.refresh_token,
expiresAt: tokens.expires_in ? Date.now() / 1000 + tokens.expires_in : undefined,
scope: tokens.scope,
}
}

Expand Down
71 changes: 47 additions & 24 deletions backend/cli/src/openscience/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { fileURLToPath } from "url"
import { randomUUID } from "crypto"
import { Global } from "../global"
import { Log } from "../util/log"
import { Lock } from "../util/lock"
import { Env } from "../env"
import { Auth } from "../auth"
import { DEFAULT_MANAGED_API_BASE, MANAGED_API_BASE } from "../endpoints"
Expand Down Expand Up @@ -1158,6 +1159,9 @@ export namespace OpenScience {

async function persistToQueue(params: UsageParams) {
try {
// Serialize against flushPendingUsage so an append can't land between
// the flusher's read and its final rewrite (which would delete it).
using _ = await Lock.write(pendingQueuePath)
await fs.appendFile(pendingQueuePath, JSON.stringify(params) + "\n")
log.info("usage queued for retry", { service: params.service })
} catch (e) {
Expand Down Expand Up @@ -1247,40 +1251,59 @@ export namespace OpenScience {
return null
}

/** Retry any queued usage reports from previous failures (called at startup) */
/** Retry any queued usage reports from previous failures (called at startup).
* Holds the queue lock across the whole read → send → rewrite cycle so a
* concurrent flush in this process can't double-send, and the rewrite
* drops only the lines actually read so an append landing mid-flush
* survives. Best-effort: never throws. */
export async function flushPendingUsage(): Promise<void> {
let lines: string[]
try {
const raw = await fs.readFile(pendingQueuePath, "utf-8")
lines = raw.split("\n").filter(Boolean)
} catch {
return
}
if (!lines.length) return
using _ = await Lock.write(pendingQueuePath)
const raw = await fs.readFile(pendingQueuePath, "utf-8").catch(() => "")
const lines = raw.split("\n").filter(Boolean)
if (!lines.length) return

const session = await getSession()
if (!session) return
const session = await getSession()
if (!session) return

const remaining: string[] = []
for (const line of lines) {
try {
const params: UsageParams = JSON.parse(line)
const result = await sendReport(params, session)
if (!result.ok && !result.permanent) {
remaining.push(line)
const retry: string[] = []
for (const line of lines) {
try {
const params: UsageParams = JSON.parse(line)
const result = await sendReport(params, session)
if (!result.ok && !result.permanent) {
retry.push(line)
}
} catch {
// malformed line, drop it
}
} catch {
// malformed line, drop it
}
}

try {
// Re-read before rewriting: another process may have appended while
// reports were sending. Remove only the lines read above (counted, so
// a report queued twice is removed exactly twice) and keep the rest.
const consumed = new Map<string, number>()
for (const line of lines) consumed.set(line, (consumed.get(line) ?? 0) + 1)
const current = await fs.readFile(pendingQueuePath, "utf-8").catch(() => "")
const appended = current
.split("\n")
.filter(Boolean)
.filter((line) => {
const count = consumed.get(line) ?? 0
if (!count) return true
consumed.set(line, count - 1)
return false
})

const remaining = [...retry, ...appended]
if (remaining.length) {
await fs.writeFile(pendingQueuePath, remaining.join("\n") + "\n")
} else {
await fs.unlink(pendingQueuePath)
return
}
} catch {}
await fs.unlink(pendingQueuePath).catch(() => {})
} catch (e) {
log.warn("usage queue flush failed", { error: e instanceof Error ? e.message : String(e) })
}
}

// === Learned Skills (RSI) ===
Expand Down
Loading
Loading