diff --git a/index.ts b/index.ts index 1eef8e5..2e2f361 100644 --- a/index.ts +++ b/index.ts @@ -1,6 +1,7 @@ -import { execFileSync } from "node:child_process" +import { execFileSync, exec as nodeExec } from "node:child_process" import { createHash } from "node:crypto" import fs from "node:fs" +import { readFile } from "node:fs/promises" import os from "node:os" import path from "node:path" import type { @@ -38,7 +39,7 @@ const REQUESTER_BRIDGE_CAPABILITY_LEGACY = "openclaw.tool.invoke" const REQUESTER_BRIDGE_DEFAULT_TTL_SECONDS = 5 * 60 const GIT_PROBE_TIMEOUT_MS = 250 const LEASE_REQUEST_TIMEOUT_MS = 1200 -const POLL_REQUEST_TIMEOUT_MS = 1200 +const POLL_REQUEST_TIMEOUT_MS = 10_000 const DEFAULT_POLL_INTERVAL_MS = 1000 const DEFAULT_GATEWAY_URL = "http://127.0.0.1:18789" const MAX_ECHO_BYTES = 4096 @@ -344,20 +345,67 @@ function requesterGatewayToken(): string | undefined { ?? safeString(process.env.OPENCLAW_GATEWAY_PASSWORD) } -async function invokeRequesterLocalTool(tool: string, args: Record): Promise<{ ok: true; result: unknown } | { ok: false; error: { code: string; message: string } }> { +function execCommand(command: string, options: { cwd?: string; timeout: number; maxBuffer: number; shell: string }): Promise<{ stdout: string; stderr: string }> { + return new Promise((resolve, reject) => { + nodeExec(command, options, (error, stdout, stderr) => { + if (error) { + ;(error as Error & { stdout?: string; stderr?: string }).stdout = stdout + ;(error as Error & { stdout?: string; stderr?: string }).stderr = stderr + reject(error) + return + } + resolve({ stdout, stderr }) + }) + }) +} + +function textToolResult(text: string): { content: Array<{ type: "text"; text: string }> } { + return { content: [{ type: "text", text }] } +} + +async function invokeRequesterHostTool(tool: string, args: Record): Promise<{ ok: true; result: unknown } | { ok: false; error: { code: string; message: string } }> { + try { + if (tool === "exec") { + const command = safeString(args.command) + if (!command) return { ok: false, error: { code: "invalid_request", message: "exec requires command" } } + const cwd = safeString(args.workdir) ?? safeString(args.cwd) ?? undefined + const timeout = typeof args.timeout === "number" && Number.isFinite(args.timeout) ? Math.max(1, Math.min(args.timeout, 600)) * 1000 : POLL_REQUEST_TIMEOUT_MS + const { stdout, stderr } = await execCommand(command, { cwd, timeout, maxBuffer: 1024 * 1024, shell: "/bin/bash" }) + return { ok: true, result: textToolResult(`${stdout ?? ""}${stderr ?? ""}`) } + } + if (tool === "read") { + const filePath = safeString(args.path) + if (!filePath) return { ok: false, error: { code: "invalid_request", message: "read requires path" } } + const raw = await readFile(filePath, "utf8") + const offset = typeof args.offset === "number" && Number.isFinite(args.offset) ? Math.max(1, Math.floor(args.offset)) : 1 + const limit = typeof args.limit === "number" && Number.isFinite(args.limit) ? Math.max(1, Math.floor(args.limit)) : 2000 + const lines = raw.split(/\r?\n/).slice(offset - 1, offset - 1 + limit).join("\n") + return { ok: true, result: textToolResult(lines) } + } + return invokeRequesterLocalHttpTool(tool, args) + } catch (err) { + const name = err instanceof Error && err.name ? err.name : "requester_host_tool_failed" + const message = err instanceof Error && err.message ? err.message : String(err) + return { ok: false, error: { code: name, message } } + } +} + +async function invokeRequesterLocalHttpTool(tool: string, args: Record): Promise<{ ok: true; result: unknown } | { ok: false; error: { code: string; message: string } }> { const controller = new AbortController() const timer = setTimeout(() => controller.abort(), POLL_REQUEST_TIMEOUT_MS) try { const headers: Record = { "Content-Type": "application/json" } const token = requesterGatewayToken() if (token) headers.Authorization = `Bearer ${token}` + if (isDev) console.debug(`[taas-affinity] local http tool invoke tool=${tool}`) const response = await fetch(`${requesterGatewayUrl().replace(/\/+$/, "")}/tools/invoke`, { method: "POST", headers, - body: JSON.stringify({ tool, args }), + body: JSON.stringify({ name: tool, arguments: args, tool, args }), signal: controller.signal, }) - const json = await response.json().catch(() => undefined) as { ok?: boolean; result?: unknown; error?: { type?: string; code?: string; message?: string } } | undefined + const json = await response.json().catch(() => undefined) as { ok?: boolean; result?: unknown; error?: { code?: string; type?: string; message?: string } } | undefined + if (isDev) console.debug(`[taas-affinity] local http tool response status=${response.status} ok=${json?.ok}`) if (!response.ok || !json || json.ok !== true) { const code = json?.error?.code ?? json?.error?.type ?? `http_${response.status}` const message = json?.error?.message ?? `Requester local tool invoke failed: ${code}` @@ -387,7 +435,7 @@ async function executeSafeBridgeOperation(operation: BridgePollOperation): Promi if (tool === "bridge.echo") { return { ok: true, result: { echo: boundedJson(toolArgs), scaffold: true } } } - return invokeRequesterLocalTool(tool, toolArgs) + return invokeRequesterHostTool(tool, toolArgs) } async function postBridgeResult(resultsUrl: string, descriptor: Record, operation: BridgePollOperation, outcome: Awaited>): Promise { @@ -1014,28 +1062,33 @@ function runAbortCheck(agentsDir?: string): void { // ── Background task scheduler ──────────────────────────────────────────────── const backgroundTimers: (NodeJS.Timeout)[] = [] +function trackBackgroundTimer(timer: NodeJS.Timeout): void { + timer.unref?.() + backgroundTimers.push(timer) +} + function startBackgroundTasks(): void { // Trash sweeper — randomised initial delay (0-30s) to stagger const sweepDelay = Math.floor(Math.random() * 30_000) const sweepInit = setTimeout(() => { runTrashSweep() - backgroundTimers.push(setInterval(() => runTrashSweep(), SWEEP_INTERVAL_MS)) + trackBackgroundTimer(setInterval(() => runTrashSweep(), SWEEP_INTERVAL_MS)) }, sweepDelay) - backgroundTimers.push(sweepInit) + trackBackgroundTimer(sweepInit) // Stuck-run status writer — 5s initial delay, then every 30s const statusInit = setTimeout(() => { writeRunStatus() - backgroundTimers.push(setInterval(() => writeRunStatus(), STATUS_INTERVAL_MS)) + trackBackgroundTimer(setInterval(() => writeRunStatus(), STATUS_INTERVAL_MS)) }, 5_000) - backgroundTimers.push(statusInit) + trackBackgroundTimer(statusInit) // Zombie auto-abort — 10s initial delay, then every AUTO_ABORT_CHECK_INTERVAL_MS const abortInit = setTimeout(() => { runAbortCheck() - backgroundTimers.push(setInterval(() => runAbortCheck(), AUTO_ABORT_CHECK_INTERVAL_MS)) + trackBackgroundTimer(setInterval(() => runAbortCheck(), AUTO_ABORT_CHECK_INTERVAL_MS)) }, 10_000) - backgroundTimers.push(abortInit) + trackBackgroundTimer(abortInit) } const LAST_ROUTE_LIMIT = 256 @@ -1233,7 +1286,7 @@ export default { api.registerProvider({ id: "taas-affinity-hook", label: "CloudSigma TaaS Token Cache Optimizer", - hookAliases: ["cloudsigma", "cloudsigma-staging"], + hookAliases: ["cloudsigma", "cloudsigma-staging", "openai-completions"], auth: [], wrapStreamFn: buildWrapper, resolveTransportTurnState: buildTransportTurnState, diff --git a/openclaw.plugin.json b/openclaw.plugin.json index db3581f..a66b880 100644 --- a/openclaw.plugin.json +++ b/openclaw.plugin.json @@ -1,10 +1,15 @@ { - "id": "openclaw-taas-affinity", - "providers": ["cloudsigma", "cloudsigma-staging"], - "configSchema": { - "type": "object", - "additionalProperties": false, - "properties": {}, - "required": [] - } + "id": "openclaw-taas-affinity", + "providers": [ + "cloudsigma", + "cloudsigma-staging" + ], + "configSchema": { + "type": "object", + "additionalProperties": false, + "properties": {}, + "required": [] + }, + "name": "CloudSigma TaaS Affinity", + "description": "Provider plugin: injects X-Session-Id header for TaaS prompt-cache affinity" } diff --git a/test/requester-bridge.test.ts b/test/requester-bridge.test.ts index 8c5e2cd..4180b37 100644 --- a/test/requester-bridge.test.ts +++ b/test/requester-bridge.test.ts @@ -335,7 +335,7 @@ test("plugin executes non-scaffold requester tools through requester-local gatew try { await runPayload(captureWrapper(plugin), { messages: [] }, { baseUrl: `http://127.0.0.1:${taasAddress.port}` }) await new Promise((resolve) => setTimeout(resolve, 150)) - assert.deepEqual(gatewayBody, { tool: "prd_list", args: { query: "requester bridge" } }) + assert.deepEqual(gatewayBody, { name: "prd_list", arguments: { query: "requester bridge" }, tool: "prd_list", args: { query: "requester bridge" } }) assert.equal(resultBody.operation_id, "bro_tool") assert.equal(resultBody.ok, true) assert.deepEqual(resultBody.result, { rows: [{ title: "PRD" }] }) diff --git a/test/smoke.mjs b/test/smoke.mjs index 550ff31..6ba9469 100644 --- a/test/smoke.mjs +++ b/test/smoke.mjs @@ -13,7 +13,7 @@ plugin.register({ assert.ok(provider, "provider should be registered") assert.equal(provider.id, "taas-affinity-hook") -assert.deepEqual(provider.hookAliases, ["cloudsigma", "cloudsigma-staging"]) +assert.deepEqual(provider.hookAliases, ["cloudsigma", "cloudsigma-staging", "openai-completions"]) assert.equal(typeof provider.wrapStreamFn, "function") assert.equal(typeof provider.resolveTransportTurnState, "function") @@ -224,3 +224,7 @@ console.log("autorouter capture smoke ok") } console.log("per-agent keying smoke ok") + +// The plugin starts background maintenance timers during registration; the smoke +// test has completed all assertions by this point, so exit explicitly. +process.exit(0)