From 0917a217c5bec5a1fd1805c9878e7f33cc2e57b8 Mon Sep 17 00:00:00 2001 From: "codex.41 [OpenClaw]" Date: Wed, 18 Mar 2026 15:17:30 +0000 Subject: [PATCH] Fix leaked active leases and restore lease summary fields --- core/clock.ts | 31 +++++++++-- core/test/scenarios.test.ts | 43 +++++++++++++++ core/test/validator.test.ts | 28 ++++++++++ core/validator.ts | 13 +++++ middle/http.ts | 24 +++++--- middle/test/http.test.ts | 44 +++++++++++++++ scripts/recover-leaked-leases.ts | 94 ++++++++++++++++++++++++++++++++ 7 files changed, 265 insertions(+), 12 deletions(-) create mode 100644 scripts/recover-leaked-leases.ts diff --git a/core/clock.ts b/core/clock.ts index f77ff86..4809ff5 100644 --- a/core/clock.ts +++ b/core/clock.ts @@ -8,6 +8,7 @@ import { type Event, type LeaseExpired, type PhaseTransition, + type RetryScheduled, type SystemState, type Task, type TaskExhausted, @@ -18,6 +19,14 @@ function backoffDue(task: Task, now: number): boolean { return task.condition === "retryWait" && task.retryAfter !== null && task.retryAfter <= now; } +function hasActiveAgent(task: Task): boolean { + return typeof task.leasedTo === "string" && task.leasedTo.trim().length > 0; +} + +function malformedActiveLease(task: Task): boolean { + return task.condition === "active" && (!hasActiveAgent(task) || task.leaseExpiresAt === null); +} + function childrenCompleteReady(state: SystemState, task: Task): { allTerminal: boolean; anyDone: boolean } { if (task.children.length === 0) { return { allTerminal: false, anyDone: false }; @@ -39,6 +48,7 @@ function childrenCompleteReady(state: SystemState, task: Task): { allTerminal: b export class CoreClock { private readonly sourceId: string; + private static readonly LEAKED_LEASE_BACKOFF_MS = 1_000; public constructor(sourceId = "core-clock") { this.sourceId = sourceId; @@ -103,11 +113,22 @@ export class CoreClock { } } - if ( - task.condition === "active" && - task.leaseExpiresAt !== null && - task.leaseExpiresAt <= now - ) { + if (malformedActiveLease(task)) { + const retryScheduled: RetryScheduled = { + type: "RetryScheduled", + taskId: task.id, + ts: now, + fenceToken: task.currentFenceToken, + reason: "orphaned_on_restart", + retryAfter: now + CoreClock.LEAKED_LEASE_BACKOFF_MS, + phase: task.phase, + attemptNumber: Math.max(1, task.attempts[task.phase].used), + }; + due.push(retryScheduled); + continue; + } + + if (task.condition === "active" && task.leaseExpiresAt !== null && task.leaseExpiresAt <= now) { const leaseExpired: LeaseExpired = { type: "LeaseExpired", taskId: task.id, diff --git a/core/test/scenarios.test.ts b/core/test/scenarios.test.ts index 1ac82a1..a585171 100644 --- a/core/test/scenarios.test.ts +++ b/core/test/scenarios.test.ts @@ -1405,6 +1405,49 @@ test("Scenario S: LeaseExpired auto-emission transitions leased task to retryWai assert.equal(state.tasks.T1800?.leasedTo, null); }); +test("Scenario S2: malformed active task with missing lease is retried by the clock", () => { + let state = createInitialState(); + state = mustReduce(state, createTask("T1801", 1)); + state = mustReduce(state, lease("T1801", 2, 1, "analysis", "analyst", "a-1801")); + + state.tasks.T1801!.leaseExpiresAt = null; + + const clock = new CoreClock(); + const due = clock.collectDueEvents(state, 10_000); + const retryScheduled = due.find((event) => event.type === "RetryScheduled" && event.taskId === "T1801"); + assert.ok(retryScheduled); + if (!retryScheduled || retryScheduled.type !== "RetryScheduled") { + assert.fail("expected RetryScheduled"); + } + assert.equal(retryScheduled.reason, "orphaned_on_restart"); + + state = mustReduce(state, retryScheduled); + assert.equal(state.tasks.T1801?.condition, "retryWait"); + assert.equal(state.tasks.T1801?.leasedTo, null); + assert.equal(state.tasks.T1801?.leaseExpiresAt, null); +}); + +test("Scenario S3: malformed active task with empty agent is retried by the clock", () => { + let state = createInitialState(); + state = mustReduce(state, createTask("T1802", 1)); + state = mustReduce(state, lease("T1802", 2, 1, "analysis", "analyst", "a-1802")); + + state.tasks.T1802!.leasedTo = ""; + + const clock = new CoreClock(); + const due = clock.collectDueEvents(state, 10_000); + const retryScheduled = due.find((event) => event.type === "RetryScheduled" && event.taskId === "T1802"); + assert.ok(retryScheduled); + if (!retryScheduled || retryScheduled.type !== "RetryScheduled") { + assert.fail("expected RetryScheduled"); + } + assert.equal(retryScheduled.reason, "orphaned_on_restart"); + + state = mustReduce(state, retryScheduled); + assert.equal(state.tasks.T1802?.condition, "retryWait"); + assert.equal(state.tasks.T1802?.leasedTo, null); +}); + test("Scenario T: WaitResolved(block) blocks task and propagates summary to parent", () => { let state = createInitialState(); state = mustReduce(state, createTask("T1900", 1)); diff --git a/core/test/validator.test.ts b/core/test/validator.test.ts index eb27d25..4638cc9 100644 --- a/core/test/validator.test.ts +++ b/core/test/validator.test.ts @@ -63,6 +63,34 @@ test("validator rejects non-monotonic fence token", () => { assert.equal(error.code, "fence_not_monotonic"); }); +test("validator rejects LeaseGranted with empty agent id", () => { + const state = bootstrapState(); + + const invalidLease: Event = { + type: "LeaseGranted", + taskId: "T10", + ts: 2, + fenceToken: 1, + agentId: " ", + phase: "analysis", + leaseTimeout: 60_000, + sessionId: "sess-1", + sessionType: "fresh", + contextBudget: 512, + agentContext: { + sessionId: "sess-1", + agentId: " ", + memoryRef: null, + contextTokens: null, + modelId: "test", + }, + }; + + const error = validateEvent(state, invalidLease); + assert.ok(error); + assert.equal(error.code, "invalid_agent_id"); +}); + test("validator enforces failure summary on TaskFailed", () => { const state = bootstrapState(); diff --git a/core/validator.ts b/core/validator.ts index 5c6d4fd..eb7ec6f 100644 --- a/core/validator.ts +++ b/core/validator.ts @@ -321,6 +321,14 @@ function validateSessionPolicy( return null; } +function validateLeaseAgent(event: Extract): ValidationError | null { + if (!nonEmptyText(event.agentId)) { + return mkError(event, "invalid_agent_id", "LeaseGranted.agentId must be non-empty."); + } + + return null; +} + function validateWaitAction(event: Extract): ValidationError | null { if (event.action === "block" && !validFailureSummary(event.summary)) { return mkError( @@ -493,6 +501,11 @@ export function validateEvent(state: SystemState, event: Event): ValidationError return mkError(event, "invalid_context_budget", "Context budget must be a positive integer."); } + const agentError = validateLeaseAgent(event); + if (agentError) { + return agentError; + } + const attempt = task.attempts[event.phase]; if (attempt.used >= attempt.max) { return mkError(event, "attempt_budget_exhausted", "Attempt budget exhausted for current phase.", { diff --git a/middle/http.ts b/middle/http.ts index 2278a5f..6c471a3 100644 --- a/middle/http.ts +++ b/middle/http.ts @@ -712,11 +712,15 @@ function handleClaimTask( return { status: 409, body: { error: "terminal_task", message: `Task ${taskId} is terminal` } }; } - const agentId = typeof b.agentId === "string" && b.agentId.trim().length > 0 - ? b.agentId.trim() - : typeof b.agent === "string" && b.agent.trim().length > 0 - ? b.agent.trim() - : "unknown"; + const requestedAgentId = typeof b.agentId === "string" + ? b.agentId + : typeof b.agent === "string" + ? b.agent + : null; + if (requestedAgentId !== null && requestedAgentId.trim().length === 0) { + return { status: 400, body: { error: "invalid_agent_id", message: "agentId must be non-empty." } }; + } + const agentId = requestedAgentId?.trim() || "unknown"; const fenceToken = task.currentFenceToken + 1; const sessionId = crypto.randomUUID(); @@ -971,8 +975,11 @@ function handleListTasks( assignee: t.metadata["assignee"] ?? null, reviewer: t.metadata["reviewer"] ?? null, priority: t.metadata["priority"] ?? "medium", + activeAgent: t.leasedTo, + leaseExpiresAt: t.leaseExpiresAt, createdAt: t.createdAt, updatedAt: t.updatedAt, + updatedAtMs: t.updatedAt, })); return { status: 200, body: { tasks: summaries } }; @@ -2900,8 +2907,11 @@ function collectAttentionTasks(core: Core): { (t) => !t.terminal && t.condition === "active" && - t.leaseExpiresAt !== null && - t.leaseExpiresAt <= now, + ( + t.leaseExpiresAt === null || + (typeof t.leasedTo !== "string" || t.leasedTo.trim().length === 0) || + t.leaseExpiresAt <= now + ), ) .map(toSummary); diff --git a/middle/test/http.test.ts b/middle/test/http.test.ts index f9e3ae9..af87f5b 100644 --- a/middle/test/http.test.ts +++ b/middle/test/http.test.ts @@ -220,6 +220,35 @@ describe("HTTP API", () => { assert.equal(body.tasks.length, 2); }); + test("GET /tasks summaries include active agent and lease metadata", async () => { + await request("POST", "/tasks", { + title: "Leased task", + description: "Should expose lease info in summary", + assignee: "coder", + }); + const claimRes = await request("POST", "/tasks/1/claim", { + agentId: "coder", + source: "test", + }); + assert.equal(claimRes.status, 200); + + const res = await request("GET", "/tasks"); + assert.equal(res.status, 200); + const body = res.body as { + tasks: Array<{ + id: string; + activeAgent: string | null; + leaseExpiresAt: number | null; + updatedAtMs: number | null; + }>; + }; + const task = body.tasks.find((entry) => entry.id === "1"); + assert.ok(task); + assert.equal(task.activeAgent, "coder"); + assert.equal(typeof task.leaseExpiresAt, "number"); + assert.equal(task.updatedAtMs !== null && task.updatedAtMs > 0, true); + }); + test("GET /dispatchable lists dispatchable tasks", async () => { await request("POST", "/tasks", { title: "Ready task", @@ -243,6 +272,21 @@ describe("HTTP API", () => { assert.equal(res.status, 404); }); + test("POST /tasks/:id/claim rejects blank agent id", async () => { + await request("POST", "/tasks", { + title: "Claim validation", + description: "Blank claim agent should fail", + }); + + const res = await request("POST", "/tasks/1/claim", { + agentId: " ", + source: "test", + }); + assert.equal(res.status, 400); + const body = res.body as { error: string }; + assert.equal(body.error, "invalid_agent_id"); + }); + test("PATCH /tasks/:id/metadata updates priority", async () => { await request("POST", "/tasks", { title: "Metadata test", diff --git a/scripts/recover-leaked-leases.ts b/scripts/recover-leaked-leases.ts new file mode 100644 index 0000000..781f918 --- /dev/null +++ b/scripts/recover-leaked-leases.ts @@ -0,0 +1,94 @@ +type Phase = "analysis" | "decomposition" | "execution" | "review"; + +type AttemptBudget = { used: number; max: number }; + +type TaskRecord = { + id: string; + title: string; + phase: Phase | null; + condition: string | null; + terminal: string | null; + leasedTo: string | null; + leaseExpiresAt: number | null; + currentFenceToken: number; + attempts: Record; +}; + +type TasksResponse = { tasks: TaskRecord[] }; + +const apiBase = process.env["TASKCORE_BASE_URL"] ?? "http://127.0.0.1:18800"; +const dryRun = process.argv.includes("--dry-run"); +const retryDelayMs = 1_000; + +function nonEmptyText(value: string | null | undefined): value is string { + return typeof value === "string" && value.trim().length > 0; +} + +function leakedLeaseReason(task: TaskRecord, now: number): "lease_expired" | "orphaned_on_restart" { + if (task.leaseExpiresAt !== null && task.leaseExpiresAt <= now) { + return "lease_expired"; + } + return "orphaned_on_restart"; +} + +function attemptNumber(task: TaskRecord, phase: Phase): number { + return Math.max(1, task.attempts[phase]?.used ?? 0); +} + +async function main(): Promise { + const now = Date.now(); + const response = await fetch(`${apiBase}/tasks?full=true`); + if (!response.ok) { + throw new Error(`Failed to fetch tasks: ${response.status} ${response.statusText}`); + } + + const body = await response.json() as TasksResponse; + const leaked = body.tasks.filter((task) => + task.terminal === null && + task.phase !== null && + task.condition === "active" && + (!nonEmptyText(task.leasedTo) || task.leaseExpiresAt === null || task.leaseExpiresAt <= now) + ); + + if (leaked.length === 0) { + process.stdout.write("No leaked active leases found.\n"); + return; + } + + for (const task of leaked) { + const reason = leakedLeaseReason(task, now); + const payload = { + type: "RetryScheduled", + taskId: task.id, + ts: Date.now(), + fenceToken: task.currentFenceToken, + reason, + retryAfter: Date.now() + retryDelayMs, + phase: task.phase, + attemptNumber: attemptNumber(task, task.phase), + }; + + if (dryRun) { + process.stdout.write(`[dry-run] would recover T${task.id} (${task.title}) with ${reason}\n`); + continue; + } + + const recoverResponse = await fetch(`${apiBase}/tasks/${task.id}/events`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(payload), + }); + if (!recoverResponse.ok) { + const errorBody = await recoverResponse.text(); + throw new Error(`Failed to recover T${task.id}: ${recoverResponse.status} ${errorBody}`); + } + + process.stdout.write(`Recovered T${task.id} (${task.title}) with ${reason}\n`); + } +} + +void main().catch((error) => { + const message = error instanceof Error ? error.message : String(error); + process.stderr.write(`${message}\n`); + process.exitCode = 1; +});