Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions core/clock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
type Event,
type LeaseExpired,
type PhaseTransition,
type RetryScheduled,
type SystemState,
type Task,
type TaskExhausted,
Expand All @@ -18,6 +19,14 @@ function backoffDue(task: Task, now: number): boolean {
return task.condition === "retryWait" && task.retryAfter !== null && task.retryAfter <= now;
}

/**
 * Reports whether the task names a lease holder.
 *
 * @param task Task whose `leasedTo` field is inspected.
 * @returns `true` only when `leasedTo` is a string with at least one
 *   non-whitespace character; `null`, missing, empty and blank values all
 *   yield `false`.
 */
function hasActiveAgent(task: Task): boolean {
  const holder = task.leasedTo;
  if (typeof holder !== "string") {
    return false;
  }
  return holder.trim() !== "";
}

/**
 * Detects an "active" task whose lease bookkeeping is incomplete.
 *
 * @param task Task to inspect.
 * @returns `true` when the task's condition is `"active"` but it either has no
 *   usable lease holder (per `hasActiveAgent`) or has a `null`
 *   `leaseExpiresAt`; `false` for every non-active task.
 */
function malformedActiveLease(task: Task): boolean {
  // Only active tasks are expected to carry a lease at all.
  if (task.condition !== "active") {
    return false;
  }
  const leaseHasDeadline = task.leaseExpiresAt !== null;
  return !(hasActiveAgent(task) && leaseHasDeadline);
}

function childrenCompleteReady(state: SystemState, task: Task): { allTerminal: boolean; anyDone: boolean } {
if (task.children.length === 0) {
return { allTerminal: false, anyDone: false };
Expand All @@ -39,6 +48,7 @@ function childrenCompleteReady(state: SystemState, task: Task): { allTerminal: b

export class CoreClock {
private readonly sourceId: string;
private static readonly LEAKED_LEASE_BACKOFF_MS = 1_000;

public constructor(sourceId = "core-clock") {
this.sourceId = sourceId;
Expand Down Expand Up @@ -103,11 +113,22 @@ export class CoreClock {
}
}

if (
task.condition === "active" &&
task.leaseExpiresAt !== null &&
task.leaseExpiresAt <= now
) {
if (malformedActiveLease(task)) {
const retryScheduled: RetryScheduled = {
type: "RetryScheduled",
taskId: task.id,
ts: now,
fenceToken: task.currentFenceToken,
reason: "orphaned_on_restart",
retryAfter: now + CoreClock.LEAKED_LEASE_BACKOFF_MS,
phase: task.phase,
attemptNumber: Math.max(1, task.attempts[task.phase].used),
};
due.push(retryScheduled);
continue;
}

if (task.condition === "active" && task.leaseExpiresAt !== null && task.leaseExpiresAt <= now) {
const leaseExpired: LeaseExpired = {
type: "LeaseExpired",
taskId: task.id,
Expand Down
43 changes: 43 additions & 0 deletions core/test/scenarios.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1405,6 +1405,49 @@ test("Scenario S: LeaseExpired auto-emission transitions leased task to retryWai
assert.equal(state.tasks.T1800?.leasedTo, null);
});

test("Scenario S2: malformed active task with missing lease is retried by the clock", () => {
  const taskId = "T1801";

  // Arrange: create and lease the task, then strip the lease deadline to
  // simulate an active task whose lease record is incomplete.
  const leased = mustReduce(
    mustReduce(createInitialState(), createTask(taskId, 1)),
    lease(taskId, 2, 1, "analysis", "analyst", "a-1801"),
  );
  leased.tasks[taskId]!.leaseExpiresAt = null;

  // Act: ask the clock which events are due.
  const dueEvents = new CoreClock().collectDueEvents(leased, 10_000);
  const recovery = dueEvents.find(
    (candidate) => candidate.type === "RetryScheduled" && candidate.taskId === taskId,
  );

  // Assert: the clock proposes a retry for the malformed task.
  assert.ok(recovery);
  if (recovery === undefined || recovery.type !== "RetryScheduled") {
    // Also narrows `recovery` to the RetryScheduled variant for the checks below.
    assert.fail("expected RetryScheduled");
  }
  assert.equal(recovery.reason, "orphaned_on_restart");

  // Applying the proposed event moves the task to retryWait with no lease.
  const recovered = mustReduce(leased, recovery);
  const after = recovered.tasks[taskId];
  assert.equal(after?.condition, "retryWait");
  assert.equal(after?.leasedTo, null);
  assert.equal(after?.leaseExpiresAt, null);
});

test("Scenario S3: malformed active task with empty agent is retried by the clock", () => {
  const taskId = "T1802";

  // Arrange: create and lease the task, then blank out the lease holder to
  // simulate an active task that nobody actually owns.
  const leased = mustReduce(
    mustReduce(createInitialState(), createTask(taskId, 1)),
    lease(taskId, 2, 1, "analysis", "analyst", "a-1802"),
  );
  leased.tasks[taskId]!.leasedTo = "";

  // Act: ask the clock which events are due.
  const dueEvents = new CoreClock().collectDueEvents(leased, 10_000);
  const recovery = dueEvents.find(
    (candidate) => candidate.type === "RetryScheduled" && candidate.taskId === taskId,
  );

  // Assert: the clock proposes a retry for the malformed task.
  assert.ok(recovery);
  if (recovery === undefined || recovery.type !== "RetryScheduled") {
    // Also narrows `recovery` to the RetryScheduled variant for the checks below.
    assert.fail("expected RetryScheduled");
  }
  assert.equal(recovery.reason, "orphaned_on_restart");

  // Applying the proposed event moves the task to retryWait and clears the holder.
  const recovered = mustReduce(leased, recovery);
  const after = recovered.tasks[taskId];
  assert.equal(after?.condition, "retryWait");
  assert.equal(after?.leasedTo, null);
});

test("Scenario T: WaitResolved(block) blocks task and propagates summary to parent", () => {
let state = createInitialState();
state = mustReduce(state, createTask("T1900", 1));
Expand Down
28 changes: 28 additions & 0 deletions core/test/validator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,34 @@ test("validator rejects non-monotonic fence token", () => {
assert.equal(error.code, "fence_not_monotonic");
});

test("validator rejects LeaseGranted with empty agent id", () => {
  const baseline = bootstrapState();

  // A whitespace-only agent id, used both on the event and in its agent context.
  const blankAgentId = " ";
  const sessionId = "sess-1";

  const whitespaceAgentLease: Event = {
    type: "LeaseGranted",
    taskId: "T10",
    ts: 2,
    fenceToken: 1,
    agentId: blankAgentId,
    phase: "analysis",
    leaseTimeout: 60_000,
    sessionId,
    sessionType: "fresh",
    contextBudget: 512,
    agentContext: {
      sessionId,
      agentId: blankAgentId,
      memoryRef: null,
      contextTokens: null,
      modelId: "test",
    },
  };

  // The validator must refuse the lease and name the agent id as the cause.
  const rejection = validateEvent(baseline, whitespaceAgentLease);
  assert.ok(rejection);
  assert.equal(rejection.code, "invalid_agent_id");
});

test("validator enforces failure summary on TaskFailed", () => {
const state = bootstrapState();

Expand Down
13 changes: 13 additions & 0 deletions core/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,14 @@ function validateSessionPolicy(
return null;
}

/**
 * Checks that a LeaseGranted event names an agent.
 *
 * @param event The LeaseGranted event under validation.
 * @returns An `invalid_agent_id` validation error when `nonEmptyText` rejects
 *   `event.agentId`, otherwise `null`.
 */
function validateLeaseAgent(event: Extract<Event, { type: "LeaseGranted" }>): ValidationError | null {
  // NOTE(review): nonEmptyText is defined elsewhere in this file; presumably it
  // also rejects whitespace-only ids — confirm against its definition.
  return nonEmptyText(event.agentId)
    ? null
    : mkError(event, "invalid_agent_id", "LeaseGranted.agentId must be non-empty.");
}

function validateWaitAction(event: Extract<Event, { type: "WaitResolved" }>): ValidationError | null {
if (event.action === "block" && !validFailureSummary(event.summary)) {
return mkError(
Expand Down Expand Up @@ -493,6 +501,11 @@ export function validateEvent(state: SystemState, event: Event): ValidationError
return mkError(event, "invalid_context_budget", "Context budget must be a positive integer.");
}

const agentError = validateLeaseAgent(event);
if (agentError) {
return agentError;
}

const attempt = task.attempts[event.phase];
if (attempt.used >= attempt.max) {
return mkError(event, "attempt_budget_exhausted", "Attempt budget exhausted for current phase.", {
Expand Down
24 changes: 17 additions & 7 deletions middle/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -712,11 +712,15 @@ function handleClaimTask(
return { status: 409, body: { error: "terminal_task", message: `Task ${taskId} is terminal` } };
}

const agentId = typeof b.agentId === "string" && b.agentId.trim().length > 0
? b.agentId.trim()
: typeof b.agent === "string" && b.agent.trim().length > 0
? b.agent.trim()
: "unknown";
const requestedAgentId = typeof b.agentId === "string"
? b.agentId
: typeof b.agent === "string"
? b.agent
: null;
if (requestedAgentId !== null && requestedAgentId.trim().length === 0) {
return { status: 400, body: { error: "invalid_agent_id", message: "agentId must be non-empty." } };
}
const agentId = requestedAgentId?.trim() || "unknown";

const fenceToken = task.currentFenceToken + 1;
const sessionId = crypto.randomUUID();
Expand Down Expand Up @@ -971,8 +975,11 @@ function handleListTasks(
assignee: t.metadata["assignee"] ?? null,
reviewer: t.metadata["reviewer"] ?? null,
priority: t.metadata["priority"] ?? "medium",
activeAgent: t.leasedTo,
leaseExpiresAt: t.leaseExpiresAt,
createdAt: t.createdAt,
updatedAt: t.updatedAt,
updatedAtMs: t.updatedAt,
}));

return { status: 200, body: { tasks: summaries } };
Expand Down Expand Up @@ -2900,8 +2907,11 @@ function collectAttentionTasks(core: Core): {
(t) =>
!t.terminal &&
t.condition === "active" &&
t.leaseExpiresAt !== null &&
t.leaseExpiresAt <= now,
(
t.leaseExpiresAt === null ||
(typeof t.leasedTo !== "string" || t.leasedTo.trim().length === 0) ||
t.leaseExpiresAt <= now
),
)
.map(toSummary);

Expand Down
44 changes: 44 additions & 0 deletions middle/test/http.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,35 @@ describe("HTTP API", () => {
assert.equal(body.tasks.length, 2);
});

test("GET /tasks summaries include active agent and lease metadata", async () => {
  // Fields of a task summary that this test inspects.
  type LeaseSummary = {
    id: string;
    activeAgent: string | null;
    leaseExpiresAt: number | null;
    updatedAtMs: number | null;
  };

  // Create a task and have "coder" claim it so that it carries a lease.
  await request("POST", "/tasks", {
    title: "Leased task",
    description: "Should expose lease info in summary",
    assignee: "coder",
  });
  const claimRes = await request("POST", "/tasks/1/claim", {
    agentId: "coder",
    source: "test",
  });
  assert.equal(claimRes.status, 200);

  const listRes = await request("GET", "/tasks");
  assert.equal(listRes.status, 200);

  const { tasks } = listRes.body as { tasks: Array<LeaseSummary> };
  const summary = tasks.find((entry) => entry.id === "1");
  assert.ok(summary);

  // The summary must expose the lease holder, a numeric expiry and a positive
  // millisecond update timestamp.
  assert.equal(summary.activeAgent, "coder");
  assert.equal(typeof summary.leaseExpiresAt, "number");
  const hasPositiveUpdatedAt = summary.updatedAtMs !== null && summary.updatedAtMs > 0;
  assert.equal(hasPositiveUpdatedAt, true);
});

test("GET /dispatchable lists dispatchable tasks", async () => {
await request("POST", "/tasks", {
title: "Ready task",
Expand All @@ -243,6 +272,21 @@ describe("HTTP API", () => {
assert.equal(res.status, 404);
});

test("POST /tasks/:id/claim rejects blank agent id", async () => {
  await request("POST", "/tasks", {
    title: "Claim validation",
    description: "Blank claim agent should fail",
  });

  // A whitespace-only agentId must be refused rather than silently defaulted.
  const claimRes = await request("POST", "/tasks/1/claim", {
    agentId: " ",
    source: "test",
  });

  assert.equal(claimRes.status, 400);
  const { error } = claimRes.body as { error: string };
  assert.equal(error, "invalid_agent_id");
});

test("PATCH /tasks/:id/metadata updates priority", async () => {
await request("POST", "/tasks", {
title: "Metadata test",
Expand Down
94 changes: 94 additions & 0 deletions scripts/recover-leaked-leases.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/** Phase names used by this script; they also key the per-phase attempt budgets. */
type Phase = "analysis" | "decomposition" | "execution" | "review";

/** Attempt counters for one phase (presumably attempts consumed vs. allowed — confirm against the core types). */
type AttemptBudget = { used: number; max: number };

/** Shape this script expects for each task in the task API response. */
type TaskRecord = {
id: string;
title: string;
phase: Phase | null;
condition: string | null;
// Non-null values cause the task to be skipped by the recovery scan.
terminal: string | null;
// Lease holder; null, empty or blank counts as a leaked lease for active tasks.
leasedTo: string | null;
// Compared against Date.now() in the recovery scan; null counts as a leaked lease.
leaseExpiresAt: number | null;
// Echoed back as `fenceToken` on the RetryScheduled event.
currentFenceToken: number;
attempts: Record<Phase, AttemptBudget>;
};

/** Envelope this script expects from `GET /tasks?full=true`. */
type TasksResponse = { tasks: TaskRecord[] };

// Base URL of the task API; overridable through the TASKCORE_BASE_URL environment variable.
const apiBase = process.env["TASKCORE_BASE_URL"] ?? "http://127.0.0.1:18800";
// When `--dry-run` is on the command line, leaked tasks are only reported, never posted.
const dryRun = process.argv.includes("--dry-run");
// Delay added to the current time to form `retryAfter` on each recovery event.
const retryDelayMs = 1_000;

/**
 * Type guard for text that contains at least one non-whitespace character.
 *
 * @param value Candidate text; may be `null` or `undefined`.
 * @returns `true` (narrowing `value` to `string`) when it is a string that is
 *   not empty after trimming, otherwise `false`.
 */
function nonEmptyText(value: string | null | undefined): value is string {
  if (typeof value !== "string") {
    return false;
  }
  return value.trim() !== "";
}

/**
 * Chooses the retry reason to report for a leaked lease.
 *
 * @param task Task whose lease deadline is inspected.
 * @param now Reference time, compared directly against `task.leaseExpiresAt`.
 * @returns `"lease_expired"` when the task has a deadline that is at or before
 *   `now`; `"orphaned_on_restart"` in every other case (no deadline, or a
 *   deadline still in the future).
 */
function leakedLeaseReason(task: TaskRecord, now: number): "lease_expired" | "orphaned_on_restart" {
  const deadline = task.leaseExpiresAt;
  const deadlinePassed = deadline !== null && deadline <= now;
  return deadlinePassed ? "lease_expired" : "orphaned_on_restart";
}

/**
 * Derives the attempt number to report for a task in the given phase.
 *
 * @param task Task whose per-phase attempt budgets are read.
 * @param phase Phase whose budget is looked up.
 * @returns The phase's `used` count, floored at 1; a missing budget or missing
 *   `used` value is treated as 0 and therefore also yields 1.
 */
function attemptNumber(task: TaskRecord, phase: Phase): number {
  const budget = task.attempts[phase];
  const consumed = budget?.used ?? 0;
  return Math.max(1, consumed);
}

/**
 * Finds active, non-terminal tasks whose lease is missing, blank or expired and
 * posts a RetryScheduled event for each one. With `--dry-run` the tasks are
 * only listed and nothing is posted.
 *
 * @returns A promise that resolves once every leaked task has been handled.
 * @throws Error when the task list cannot be fetched, when the response does
 *   not contain a `tasks` array, or when a recovery POST is rejected. The first
 *   rejected POST aborts the run, so later tasks are left untouched.
 */
async function main(): Promise<void> {
  const now = Date.now();
  const response = await fetch(`${apiBase}/tasks?full=true`);
  if (!response.ok) {
    throw new Error(`Failed to fetch tasks: ${response.status} ${response.statusText}`);
  }

  // Check the envelope before trusting it: an error page or a changed API shape
  // should fail with a clear message instead of a TypeError inside filter().
  const body: unknown = await response.json();
  const tasks = typeof body === "object" && body !== null
    ? (body as Partial<TasksResponse>).tasks
    : undefined;
  if (!Array.isArray(tasks)) {
    throw new Error(`Unexpected response from ${apiBase}/tasks: missing "tasks" array`);
  }

  // The type predicate records that `phase` is non-null for every selected
  // task, so the loop below can pass it on without a null check.
  const leaked = tasks.filter(
    (task): task is TaskRecord & { phase: Phase } =>
      task.terminal === null &&
      task.phase !== null &&
      task.condition === "active" &&
      (!nonEmptyText(task.leasedTo) || task.leaseExpiresAt === null || task.leaseExpiresAt <= now),
  );

  if (leaked.length === 0) {
    process.stdout.write("No leaked active leases found.\n");
    return;
  }

  for (const task of leaked) {
    const reason = leakedLeaseReason(task, now);
    // One timestamp per event keeps `retryAfter` exactly `retryDelayMs` after `ts`.
    const ts = Date.now();
    const payload = {
      type: "RetryScheduled",
      taskId: task.id,
      ts,
      fenceToken: task.currentFenceToken,
      reason,
      retryAfter: ts + retryDelayMs,
      phase: task.phase,
      attemptNumber: attemptNumber(task, task.phase),
    };

    if (dryRun) {
      process.stdout.write(`[dry-run] would recover T${task.id} (${task.title}) with ${reason}\n`);
      continue;
    }

    // Encode the id so unusual characters cannot alter the request path.
    const recoverResponse = await fetch(`${apiBase}/tasks/${encodeURIComponent(task.id)}/events`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(payload),
    });
    if (!recoverResponse.ok) {
      const errorBody = await recoverResponse.text();
      throw new Error(`Failed to recover T${task.id}: ${recoverResponse.status} ${errorBody}`);
    }

    process.stdout.write(`Recovered T${task.id} (${task.title}) with ${reason}\n`);
  }
}

// Script entry point: run the recovery and, on any failure, print the message
// to stderr and mark the process as failed via a non-zero exit code.
void main().catch((failure: unknown) => {
  let detail: string;
  if (failure instanceof Error) {
    detail = failure.message;
  } else {
    detail = String(failure);
  }
  process.stderr.write(`${detail}\n`);
  process.exitCode = 1;
});
Loading