From 7e60f8fa401cfd514bc7280f99c95a9af1a33645 Mon Sep 17 00:00:00 2001 From: Gold Date: Mon, 22 Jun 2026 12:13:53 -0300 Subject: [PATCH] fix(timeouts): make consolidation timeouts configurable --- .env.example | 2 + README.md | 6 ++ deploy/coolify/entrypoint.sh | 2 +- deploy/fly/entrypoint.sh | 2 +- deploy/railway/entrypoint.sh | 2 +- deploy/render/entrypoint.sh | 2 +- eval/scripts/sandbox.sh | 2 +- iii-config.docker.yaml | 2 +- iii-config.yaml | 2 +- src/config.ts | 31 ++++++ src/functions/consolidate.ts | 18 +++- src/index.ts | 7 +- test/config-timeouts.test.ts | 192 +++++++++++++++++++++++++++++++++++ 13 files changed, 260 insertions(+), 10 deletions(-) create mode 100644 test/config-timeouts.test.ts diff --git a/.env.example b/.env.example index 77ca0f3a3..53e0f6c34 100644 --- a/.env.example +++ b/.env.example @@ -50,6 +50,8 @@ # embedding). The OpenAI LLM path also honours the OpenAI-scoped # OPENAI_TIMEOUT_MS alias for back-compat with v0.9.17 (precedence). # AGENTMEMORY_LLM_TIMEOUT_MS=60000 # Default: 60 000 ms (60 s) +# AGENTMEMORY_CONSOLIDATION_COMPRESS_TIMEOUT_MS=120000 # Optional: consolidate per-concept LLM timeout (1 000..300 000 ms). +# AGENTMEMORY_INVOCATION_TIMEOUT_MS=600000 # Optional: worker invocation timeout (1 000..600 000 ms). # Opt-in Claude-subscription fallback (spawns @anthropic-ai/claude-agent-sdk # child sessions). Off by default — the agent-sdk fallback can trigger diff --git a/README.md b/README.md index c4ec2c1e0..64a6d3217 100644 --- a/README.md +++ b/README.md @@ -1430,6 +1430,12 @@ Create `~/.agentmemory/.env`: # with v0.9.17. # Increase for slow networks or large batch calls; # decrease to fail-fast on rate-limit holds. +# AGENTMEMORY_CONSOLIDATION_COMPRESS_TIMEOUT_MS=120000 + # Optional: per-concept consolidate timeout in ms. + # Bounded to 1 000..300 000. +# AGENTMEMORY_INVOCATION_TIMEOUT_MS=600000 + # Optional: worker invocation timeout in ms. + # Bounded to 1 000..600 000. # Search tuning # BM25_WEIGHT=0.4 diff --git a/deploy/coolify/entrypoint.sh b/deploy/coolify/entrypoint.sh index ffdd63339..76577b6f6 100755 --- a/deploy/coolify/entrypoint.sh +++ b/deploy/coolify/entrypoint.sh @@ -29,7 +29,7 @@ workers: config: port: 3111 host: 0.0.0.0 - default_timeout: 180000 + default_timeout: 600000 cors: allowed_origins: - "http://localhost:3111" diff --git a/deploy/fly/entrypoint.sh b/deploy/fly/entrypoint.sh index 5fd4cc268..fcff689a0 100755 --- a/deploy/fly/entrypoint.sh +++ b/deploy/fly/entrypoint.sh @@ -29,7 +29,7 @@ workers: config: port: 3111 host: 0.0.0.0 - default_timeout: 180000 + default_timeout: 600000 cors: allowed_origins: - "http://localhost:3111" diff --git a/deploy/railway/entrypoint.sh b/deploy/railway/entrypoint.sh index ffdd63339..76577b6f6 100755 --- a/deploy/railway/entrypoint.sh +++ b/deploy/railway/entrypoint.sh @@ -29,7 +29,7 @@ workers: config: port: 3111 host: 0.0.0.0 - default_timeout: 180000 + default_timeout: 600000 cors: allowed_origins: - "http://localhost:3111" diff --git a/deploy/render/entrypoint.sh b/deploy/render/entrypoint.sh index ffdd63339..76577b6f6 100755 --- a/deploy/render/entrypoint.sh +++ b/deploy/render/entrypoint.sh @@ -29,7 +29,7 @@ workers: config: port: 3111 host: 0.0.0.0 - default_timeout: 180000 + default_timeout: 600000 cors: allowed_origins: - "http://localhost:3111" diff --git a/eval/scripts/sandbox.sh b/eval/scripts/sandbox.sh index 5d4023300..571ae80c7 100755 --- a/eval/scripts/sandbox.sh +++ b/eval/scripts/sandbox.sh @@ -41,7 +41,7 @@ workers: config: port: $SANDBOX_PORT host: 127.0.0.1 - default_timeout: 180000 + default_timeout: 600000 cors: allowed_origins: ["http://localhost:$SANDBOX_PORT", "http://127.0.0.1:$SANDBOX_PORT"] allowed_methods: [GET, POST, PUT, DELETE, OPTIONS] diff --git a/iii-config.docker.yaml b/iii-config.docker.yaml index 682236ec3..792648476 100644 --- a/iii-config.docker.yaml +++ b/iii-config.docker.yaml @@ -3,7 +3,7 @@ workers: config: port: 3111 host: 0.0.0.0 - default_timeout: 180000 + default_timeout: 600000 cors: allowed_origins: ["http://localhost:3111", "http://localhost:3113", "http://127.0.0.1:3111", "http://127.0.0.1:3113"] allowed_methods: [GET, POST, PUT, DELETE, OPTIONS] diff --git a/iii-config.yaml b/iii-config.yaml index 62892f514..9a0a4b53b 100644 --- a/iii-config.yaml +++ b/iii-config.yaml @@ -3,7 +3,7 @@ workers: config: port: 3111 host: 127.0.0.1 - default_timeout: 180000 + default_timeout: 600000 cors: allowed_origins: ["http://localhost:3111", "http://localhost:3113", "http://127.0.0.1:3111", "http://127.0.0.1:3113"] allowed_methods: [GET, POST, PUT, DELETE, OPTIONS] diff --git a/src/config.ts b/src/config.ts index f68da2e31..01d004d1d 100644 --- a/src/config.ts +++ b/src/config.ts @@ -16,6 +16,37 @@ function safeParseInt(value: string | undefined, fallback: number): number { return Number.isNaN(parsed) ? fallback : parsed; } +type PositiveIntOptions = { + min?: number; + max?: number; +}; + +export function parsePositiveInt( + value: string | undefined, + options: PositiveIntOptions = {}, +): number | undefined { + if (!value) return undefined; + const trimmed = value.trim(); + if (!/^\d+$/.test(trimmed)) return undefined; + const parsed = Number(trimmed); + if (!Number.isFinite(parsed) || parsed <= 0) return undefined; + if (options.min !== undefined && parsed < options.min) return undefined; + if (options.max !== undefined && parsed > options.max) return undefined; + return parsed; +} + +export function resolveTimeoutMs( + values: Array, + fallback: number, + options: PositiveIntOptions = {}, +): number { + for (const value of values) { + const parsed = parsePositiveInt(value, options); + if (parsed !== undefined) return parsed; + } + return fallback; +} + const DATA_DIR = join(homedir(), ".agentmemory"); const ENV_FILE = join(DATA_DIR, ".env"); diff --git a/src/functions/consolidate.ts b/src/functions/consolidate.ts index d394e1766..eb658c5f1 100644 --- a/src/functions/consolidate.ts +++ b/src/functions/consolidate.ts @@ -7,6 +7,7 @@ import type { } from "../types.js"; import { KV, generateId } from "../state/schema.js"; import { StateKV } from "../state/kv.js"; +import { getEnvVar, resolveTimeoutMs } from "../config.js"; import { recordAudit } from "./audit.js"; const CONSOLIDATION_SYSTEM = `You are a memory consolidation engine. Given a set of related observations from coding sessions, synthesize them into a single long-term memory. @@ -138,6 +139,17 @@ export function registerConsolidateFunction( ) .join("\n\n"); + const consolidationCompressTimeoutMs = resolveTimeoutMs( + [ + getEnvVar("AGENTMEMORY_CONSOLIDATION_COMPRESS_TIMEOUT_MS"), + getEnvVar("OPENAI_TIMEOUT_MS"), + getEnvVar("AGENTMEMORY_LLM_TIMEOUT_MS"), + ], + 120_000, + { min: 1_000, max: 300_000 }, + ); + llmCallCount++; + try { const response = await Promise.race([ provider.compress( @@ -145,10 +157,12 @@ export function registerConsolidateFunction( `Concept: "${concept}"\n\nObservations:\n${prompt}`, ), new Promise((_, reject) => - setTimeout(() => reject(new Error("compress timeout")), 30_000), + setTimeout( + () => reject(new Error("compress timeout")), + consolidationCompressTimeoutMs, + ), ), ]); - llmCallCount++; const parsed = parseMemoryXml(response, sessionIds); if (!parsed) continue; diff --git a/src/index.ts b/src/index.ts index 4233e8a67..bef942492 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,6 +2,7 @@ import { registerWorker } from "iii-sdk"; import { loadConfig, getEnvVar, + resolveTimeoutMs, loadEmbeddingConfig, loadFallbackConfig, loadClaudeBridgeConfig, @@ -194,7 +195,11 @@ async function main() { const sdk = registerWorker(config.engineUrl, { workerName: "agentmemory", - invocationTimeoutMs: 180000, + invocationTimeoutMs: resolveTimeoutMs( + [getEnvVar("AGENTMEMORY_INVOCATION_TIMEOUT_MS")], + 600_000, + { min: 1_000, max: 600_000 }, + ), otel: { serviceName: OTEL_CONFIG.serviceName, serviceVersion: OTEL_CONFIG.serviceVersion, diff --git a/test/config-timeouts.test.ts b/test/config-timeouts.test.ts new file mode 100644 index 000000000..b4b3d0a0c --- /dev/null +++ b/test/config-timeouts.test.ts @@ -0,0 +1,192 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import type { MemoryProvider } from "../src/types.js"; +import { registerConsolidateFunction } from "../src/functions/consolidate.js"; +import { KV } from "../src/state/schema.js"; +import type { CompressedObservation, Session } from "../src/types.js"; + +vi.mock("../src/logger.js", () => ({ + logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn() }, +})); + +vi.mock("../src/functions/audit.js", () => ({ + recordAudit: vi.fn(), +})); + +function makeMockKV() { + const store = new Map>(); + return { + get: async (scope: string, key: string): Promise => + (store.get(scope)?.get(key) as T) ?? null, + set: async (scope: string, key: string, data: T): Promise => { + if (!store.has(scope)) store.set(scope, new Map()); + store.get(scope)!.set(key, data); + return data; + }, + delete: async (scope: string, key: string): Promise => { + store.get(scope)?.delete(key); + }, + list: async (scope: string): Promise => { + const entries = store.get(scope); + return entries ? (Array.from(entries.values()) as T[]) : []; + }, + }; +} + +function makeMockSdk() { + const functions = new Map(); + return { + registerFunction: (id: string, handler: Function) => { + functions.set(id, handler); + }, + registerTrigger: () => {}, + trigger: async (id: string, payload: unknown) => { + const fn = functions.get(id); + if (!fn) throw new Error(`No function registered: ${id}`); + return fn(payload); + }, + }; +} + +function makeSession(id: string): Session { + return { + id, + project: "agentmemory", + cwd: "/tmp/agentmemory", + startedAt: new Date().toISOString(), + status: "completed", + observationCount: 5, + }; +} + +function makeObs(id: string, sessionId: string, concept: string): CompressedObservation { + return { + id, + sessionId, + timestamp: new Date().toISOString(), + type: "decision", + title: `obs ${id}`, + facts: ["fact"], + narrative: "narrative", + concepts: [concept], + files: ["src/index.ts"], + importance: 8, + }; +} + +async function seedObservations( + kv: ReturnType, + sessionId: string, + concepts: string[], +) { + for (const concept of concepts) { + for (let i = 0; i < 3; i++) { + await kv.set( + KV.observations(sessionId), + `${concept}_${i}`, + makeObs(`${concept}_${i}`, sessionId, concept), + ); + } + } +} + +describe("timeout configurability regressions", () => { + afterEach(() => { + delete process.env["AGENTMEMORY_CONSOLIDATION_COMPRESS_TIMEOUT_MS"]; + delete process.env["AGENTMEMORY_LLM_TIMEOUT_MS"]; + delete process.env["OPENAI_TIMEOUT_MS"]; + delete process.env["AGENTMEMORY_INVOCATION_TIMEOUT_MS"]; + }); + + it("parsePositiveInt accepts digits only, supports bounds, and rejects malformed values", async () => { + vi.resetModules(); + const { parsePositiveInt } = await import("../src/config.js"); + expect(parsePositiveInt("600000")).toBe(600000); + expect(parsePositiveInt("999", { min: 1_000 })).toBeUndefined(); + expect(parsePositiveInt("600001", { max: 600_000 })).toBeUndefined(); + expect(parsePositiveInt("30ms")).toBeUndefined(); + expect(parsePositiveInt("1_000")).toBeUndefined(); + expect(parsePositiveInt(undefined)).toBeUndefined(); + }); + + it("resolveTimeoutMs honors precedence and falls back when values are invalid or out of range", async () => { + vi.resetModules(); + const { resolveTimeoutMs } = await import("../src/config.js"); + + expect(resolveTimeoutMs(["1500", "2500"], 60_000, { min: 1_000, max: 10_000 })).toBe(1500); + expect(resolveTimeoutMs(["999", "2500"], 60_000, { min: 1_000, max: 10_000 })).toBe(2500); + expect(resolveTimeoutMs(["700000"], 60_000, { min: 1_000, max: 600_000 })).toBe(60_000); + expect(resolveTimeoutMs(["bad", undefined], 60_000, { min: 1_000, max: 600_000 })).toBe(60_000); + }); + + it("mem::consolidate honors AGENTMEMORY_CONSOLIDATION_COMPRESS_TIMEOUT_MS", async () => { + vi.useFakeTimers(); + process.env["AGENTMEMORY_CONSOLIDATION_COMPRESS_TIMEOUT_MS"] = "1000"; + + const sdk = makeMockSdk(); + const kv = makeMockKV(); + const provider: MemoryProvider = { + name: "mock", + compress: vi.fn().mockImplementation(() => new Promise(() => {})), + embed: vi.fn().mockResolvedValue(new Float32Array(384)), + embedBatch: vi.fn().mockResolvedValue([]), + dimensions: 384, + compressionModel: "mock", + }; + + const session = makeSession("sess_timeout"); + await kv.set(KV.sessions, session.id, session); + await seedObservations(kv, session.id, ["timeouts"]); + + registerConsolidateFunction(sdk as never, kv as never, provider as never); + const pending = sdk.trigger("mem::consolidate", { + project: "agentmemory", + minObservations: 1, + }) as Promise<{ consolidated: number; totalObservations: number }>; + + await vi.advanceTimersByTimeAsync(1_000); + const result = await pending; + + expect(result.totalObservations).toBe(3); + expect(result.consolidated).toBe(0); + expect(provider.compress).toHaveBeenCalledOnce(); + vi.useRealTimers(); + }); + + it("mem::consolidate counts timed-out attempts toward the LLM budget", async () => { + vi.useFakeTimers(); + process.env["AGENTMEMORY_CONSOLIDATION_COMPRESS_TIMEOUT_MS"] = "1000"; + + const sdk = makeMockSdk(); + const kv = makeMockKV(); + const provider: MemoryProvider = { + name: "mock", + compress: vi.fn().mockImplementation(() => new Promise(() => {})), + embed: vi.fn().mockResolvedValue(new Float32Array(384)), + embedBatch: vi.fn().mockResolvedValue([]), + dimensions: 384, + compressionModel: "mock", + }; + + const session = makeSession("sess_budget"); + await kv.set(KV.sessions, session.id, session); + await seedObservations( + kv, + session.id, + Array.from({ length: 12 }, (_, i) => `concept_${i}`), + ); + + registerConsolidateFunction(sdk as never, kv as never, provider as never); + const pending = sdk.trigger("mem::consolidate", { + project: "agentmemory", + minObservations: 1, + }) as Promise<{ consolidated: number; totalObservations: number }>; + + await vi.advanceTimersByTimeAsync(10_000); + const result = await pending; + + expect(result.totalObservations).toBe(36); + expect(result.consolidated).toBe(0); + expect(provider.compress).toHaveBeenCalledTimes(10); + vi.useRealTimers(); + }); +});