Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 66 additions & 13 deletions index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { execFileSync } from "node:child_process"
import { execFileSync, exec as nodeExec } from "node:child_process"
import { createHash } from "node:crypto"
import fs from "node:fs"
import { readFile } from "node:fs/promises"
import os from "node:os"
import path from "node:path"
import type {
Expand Down Expand Up @@ -38,7 +39,7 @@ const REQUESTER_BRIDGE_CAPABILITY_LEGACY = "openclaw.tool.invoke"
const REQUESTER_BRIDGE_DEFAULT_TTL_SECONDS = 5 * 60
const GIT_PROBE_TIMEOUT_MS = 250
const LEASE_REQUEST_TIMEOUT_MS = 1200
const POLL_REQUEST_TIMEOUT_MS = 1200
const POLL_REQUEST_TIMEOUT_MS = 10_000
const DEFAULT_POLL_INTERVAL_MS = 1000
const DEFAULT_GATEWAY_URL = "http://127.0.0.1:18789"
const MAX_ECHO_BYTES = 4096
Expand Down Expand Up @@ -344,20 +345,67 @@ function requesterGatewayToken(): string | undefined {
?? safeString(process.env.OPENCLAW_GATEWAY_PASSWORD)
}

async function invokeRequesterLocalTool(tool: string, args: Record<string, unknown>): Promise<{ ok: true; result: unknown } | { ok: false; error: { code: string; message: string } }> {
function execCommand(command: string, options: { cwd?: string; timeout: number; maxBuffer: number; shell: string }): Promise<{ stdout: string; stderr: string }> {
return new Promise((resolve, reject) => {
nodeExec(command, options, (error, stdout, stderr) => {
if (error) {
;(error as Error & { stdout?: string; stderr?: string }).stdout = stdout
;(error as Error & { stdout?: string; stderr?: string }).stderr = stderr
reject(error)
return
}
resolve({ stdout, stderr })
})
})
}

function textToolResult(text: string): { content: Array<{ type: "text"; text: string }> } {
return { content: [{ type: "text", text }] }
}

async function invokeRequesterHostTool(tool: string, args: Record<string, unknown>): Promise<{ ok: true; result: unknown } | { ok: false; error: { code: string; message: string } }> {
try {
if (tool === "exec") {
const command = safeString(args.command)
if (!command) return { ok: false, error: { code: "invalid_request", message: "exec requires command" } }
const cwd = safeString(args.workdir) ?? safeString(args.cwd) ?? undefined
const timeout = typeof args.timeout === "number" && Number.isFinite(args.timeout) ? Math.max(1, Math.min(args.timeout, 600)) * 1000 : POLL_REQUEST_TIMEOUT_MS
const { stdout, stderr } = await execCommand(command, { cwd, timeout, maxBuffer: 1024 * 1024, shell: "/bin/bash" })
return { ok: true, result: textToolResult(`${stdout ?? ""}${stderr ?? ""}`) }
}
if (tool === "read") {
const filePath = safeString(args.path)
if (!filePath) return { ok: false, error: { code: "invalid_request", message: "read requires path" } }
const raw = await readFile(filePath, "utf8")
const offset = typeof args.offset === "number" && Number.isFinite(args.offset) ? Math.max(1, Math.floor(args.offset)) : 1
const limit = typeof args.limit === "number" && Number.isFinite(args.limit) ? Math.max(1, Math.floor(args.limit)) : 2000
const lines = raw.split(/\r?\n/).slice(offset - 1, offset - 1 + limit).join("\n")
return { ok: true, result: textToolResult(lines) }
}
return invokeRequesterLocalHttpTool(tool, args)
} catch (err) {
const name = err instanceof Error && err.name ? err.name : "requester_host_tool_failed"
const message = err instanceof Error && err.message ? err.message : String(err)
return { ok: false, error: { code: name, message } }
}
}

async function invokeRequesterLocalHttpTool(tool: string, args: Record<string, unknown>): Promise<{ ok: true; result: unknown } | { ok: false; error: { code: string; message: string } }> {
const controller = new AbortController()
const timer = setTimeout(() => controller.abort(), POLL_REQUEST_TIMEOUT_MS)
try {
const headers: Record<string, string> = { "Content-Type": "application/json" }
const token = requesterGatewayToken()
if (token) headers.Authorization = `Bearer ${token}`
if (isDev) console.debug(`[taas-affinity] local http tool invoke tool=${tool}`)
const response = await fetch(`${requesterGatewayUrl().replace(/\/+$/, "")}/tools/invoke`, {
method: "POST",
headers,
body: JSON.stringify({ tool, args }),
body: JSON.stringify({ name: tool, arguments: args, tool, args }),
signal: controller.signal,
})
const json = await response.json().catch(() => undefined) as { ok?: boolean; result?: unknown; error?: { type?: string; code?: string; message?: string } } | undefined
const json = await response.json().catch(() => undefined) as { ok?: boolean; result?: unknown; error?: { code?: string; type?: string; message?: string } } | undefined
if (isDev) console.debug(`[taas-affinity] local http tool response status=${response.status} ok=${json?.ok}`)
if (!response.ok || !json || json.ok !== true) {
const code = json?.error?.code ?? json?.error?.type ?? `http_${response.status}`
const message = json?.error?.message ?? `Requester local tool invoke failed: ${code}`
Expand Down Expand Up @@ -387,7 +435,7 @@ async function executeSafeBridgeOperation(operation: BridgePollOperation): Promi
if (tool === "bridge.echo") {
return { ok: true, result: { echo: boundedJson(toolArgs), scaffold: true } }
}
return invokeRequesterLocalTool(tool, toolArgs)
return invokeRequesterHostTool(tool, toolArgs)
}

async function postBridgeResult(resultsUrl: string, descriptor: Record<string, unknown>, operation: BridgePollOperation, outcome: Awaited<ReturnType<typeof executeSafeBridgeOperation>>): Promise<void> {
Expand Down Expand Up @@ -1014,28 +1062,33 @@ function runAbortCheck(agentsDir?: string): void {
// ── Background task scheduler ────────────────────────────────────────────────
const backgroundTimers: (NodeJS.Timeout)[] = []

function trackBackgroundTimer(timer: NodeJS.Timeout): void {
timer.unref?.()
backgroundTimers.push(timer)
}

function startBackgroundTasks(): void {
// Trash sweeper — randomised initial delay (0-30s) to stagger
const sweepDelay = Math.floor(Math.random() * 30_000)
const sweepInit = setTimeout(() => {
runTrashSweep()
backgroundTimers.push(setInterval(() => runTrashSweep(), SWEEP_INTERVAL_MS))
trackBackgroundTimer(setInterval(() => runTrashSweep(), SWEEP_INTERVAL_MS))
}, sweepDelay)
backgroundTimers.push(sweepInit)
trackBackgroundTimer(sweepInit)

// Stuck-run status writer — 5s initial delay, then every 30s
const statusInit = setTimeout(() => {
writeRunStatus()
backgroundTimers.push(setInterval(() => writeRunStatus(), STATUS_INTERVAL_MS))
trackBackgroundTimer(setInterval(() => writeRunStatus(), STATUS_INTERVAL_MS))
}, 5_000)
backgroundTimers.push(statusInit)
trackBackgroundTimer(statusInit)

// Zombie auto-abort — 10s initial delay, then every AUTO_ABORT_CHECK_INTERVAL_MS
const abortInit = setTimeout(() => {
runAbortCheck()
backgroundTimers.push(setInterval(() => runAbortCheck(), AUTO_ABORT_CHECK_INTERVAL_MS))
trackBackgroundTimer(setInterval(() => runAbortCheck(), AUTO_ABORT_CHECK_INTERVAL_MS))
}, 10_000)
backgroundTimers.push(abortInit)
trackBackgroundTimer(abortInit)
}

const LAST_ROUTE_LIMIT = 256
Expand Down Expand Up @@ -1233,7 +1286,7 @@ export default {
api.registerProvider({
id: "taas-affinity-hook",
label: "CloudSigma TaaS Token Cache Optimizer",
hookAliases: ["cloudsigma", "cloudsigma-staging"],
hookAliases: ["cloudsigma", "cloudsigma-staging", "openai-completions"],
auth: [],
wrapStreamFn: buildWrapper,
resolveTransportTurnState: buildTransportTurnState,
Expand Down
21 changes: 13 additions & 8 deletions openclaw.plugin.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
{
"id": "openclaw-taas-affinity",
"providers": ["cloudsigma", "cloudsigma-staging"],
"configSchema": {
"type": "object",
"additionalProperties": false,
"properties": {},
"required": []
}
"id": "openclaw-taas-affinity",
"providers": [
"cloudsigma",
"cloudsigma-staging"
],
"configSchema": {
"type": "object",
"additionalProperties": false,
"properties": {},
"required": []
},
"name": "CloudSigma TaaS Affinity",
"description": "Provider plugin: injects X-Session-Id header for TaaS prompt-cache affinity"
}
2 changes: 1 addition & 1 deletion test/requester-bridge.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ test("plugin executes non-scaffold requester tools through requester-local gatew
try {
await runPayload(captureWrapper(plugin), { messages: [] }, { baseUrl: `http://127.0.0.1:${taasAddress.port}` })
await new Promise((resolve) => setTimeout(resolve, 150))
assert.deepEqual(gatewayBody, { tool: "prd_list", args: { query: "requester bridge" } })
assert.deepEqual(gatewayBody, { name: "prd_list", arguments: { query: "requester bridge" }, tool: "prd_list", args: { query: "requester bridge" } })
assert.equal(resultBody.operation_id, "bro_tool")
assert.equal(resultBody.ok, true)
assert.deepEqual(resultBody.result, { rows: [{ title: "PRD" }] })
Expand Down
6 changes: 5 additions & 1 deletion test/smoke.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ plugin.register({

assert.ok(provider, "provider should be registered")
assert.equal(provider.id, "taas-affinity-hook")
assert.deepEqual(provider.hookAliases, ["cloudsigma", "cloudsigma-staging"])
assert.deepEqual(provider.hookAliases, ["cloudsigma", "cloudsigma-staging", "openai-completions"])
assert.equal(typeof provider.wrapStreamFn, "function")
assert.equal(typeof provider.resolveTransportTurnState, "function")

Expand Down Expand Up @@ -224,3 +224,7 @@ console.log("autorouter capture smoke ok")
}

console.log("per-agent keying smoke ok")

// The plugin starts background maintenance timers during registration; the smoke
// test has completed all assertions by this point, so exit explicitly.
process.exit(0)
Loading