diff --git a/src/components/fragments/FragmentEditor.tsx b/src/components/fragments/FragmentEditor.tsx index 2acf5932..6131f8a4 100644 --- a/src/components/fragments/FragmentEditor.tsx +++ b/src/components/fragments/FragmentEditor.tsx @@ -134,6 +134,8 @@ export function FragmentEditor({ }) if (fType === 'prose') { promises.push(queryClient.invalidateQueries({ queryKey: ['proseChain', storyId] })) + // Prose edits trigger re-analysis, so refresh the freshness indicator's index. + promises.push(queryClient.invalidateQueries({ queryKey: ['librarian-analysis-index', storyId] })) } if (fragment?.id) { promises.push(queryClient.invalidateQueries({ queryKey: ['fragment', storyId, fragment.id] })) @@ -206,6 +208,7 @@ export function FragmentEditor({ queryClient.invalidateQueries({ queryKey: ['fragments-archived', storyId] }) if (fType === 'prose') { queryClient.invalidateQueries({ queryKey: ['proseChain', storyId] }) + queryClient.invalidateQueries({ queryKey: ['librarian-analysis-index', storyId] }) } setSaveStatus('saved') if (savedStatusTimerRef.current) clearTimeout(savedStatusTimerRef.current) diff --git a/src/server/agents/create-streaming-runner.ts b/src/server/agents/create-streaming-runner.ts index 81d54edb..58475c10 100644 --- a/src/server/agents/create-streaming-runner.ts +++ b/src/server/agents/create-streaming-runner.ts @@ -17,6 +17,7 @@ import { createFragmentTools } from '../llm/tools' import { reportUsage } from '../llm/token-tracker' import { createLogger } from '../logging' import { createEventStream } from './create-event-stream' +import { holdLibrarianAnalysis } from '../librarian/scheduler' import { compileAgentContext, type CompiledAgentContext } from './compile-agent-context' import { withBranch } from '../fragments/branches' @@ -195,9 +196,14 @@ export function createStreamingRunner {} const streamResult = createEventStream(result.fullStream) + void streamResult.completion.then(releaseAnalysis, releaseAnalysis) // 12. Track token usage after stream completes streamResult.completion.then(async () => { diff --git a/src/server/librarian/chat.ts b/src/server/librarian/chat.ts index 98b91904..73860048 100644 --- a/src/server/librarian/chat.ts +++ b/src/server/librarian/chat.ts @@ -8,6 +8,7 @@ import { pluginRegistry } from '../plugins/registry' import { collectPluginTools } from '../plugins/tools' import { createLogger } from '../logging' import { createEventStream } from '../agents/create-event-stream' +import { holdLibrarianAnalysis } from './scheduler' import { compileAgentContext } from '../agents/compile-agent-context' import { createAgentInstance } from '../agents/agent-instance' import { getFragmentsByTag } from '../fragments/associations' @@ -195,10 +196,13 @@ async function librarianChatInner( })), ] - // Stream with write tools + // Stream with write tools. Hold analysis so multi-step prose edits analyze once on the + // final state, not per edit (see holdLibrarianAnalysis). const result = await chatAgent.stream({ messages: aiMessages, }) - - return createEventStream(result.fullStream) + const releaseAnalysis = holdLibrarianAnalysis(storyId) + const stream = createEventStream(result.fullStream) + void stream.completion.then(releaseAnalysis, releaseAnalysis) + return stream } diff --git a/src/server/librarian/scheduler.ts b/src/server/librarian/scheduler.ts index 25dbd7fb..b5afc6f7 100644 --- a/src/server/librarian/scheduler.ts +++ b/src/server/librarian/scheduler.ts @@ -1,11 +1,24 @@ import type { Fragment } from '../fragments/schema' -import { invokeAgent } from '../agents' import { createLogger } from '../logging' import { getActiveBranchId, withBranch } from '../fragments/branches' +import { clearAnalysisIndexEntry } from './storage' -const pending = new Map>() +interface QueuedRun { + dataDir: string + fragment: Fragment + branchId: string +} + +interface SchedulerState { + running: boolean + /** Latest trigger that arrived while a run was in flight or held; supersedes earlier ones. */ + queued: QueuedRun | null +} + +const scheduler = new Map() const runtimeStatus = new Map() -const DEBOUNCE_MS = 2000 +/** Per-story count of active agent runs that defer analysis until they finish. */ +const holds = new Map() const logger = createLogger('librarian') export type LibrarianRunStatus = 'idle' | 'scheduled' | 'running' | 'error' @@ -37,6 +50,11 @@ function setRuntimeStatus(storyId: string, patch: Partial { const requestLogger = logger.child({ storyId }) - // Capture the active branch NOW, before the debounce delay + // Capture the active branch at trigger time, before any in-flight run can switch it. const branchId = await getActiveBranchId(dataDir, storyId) - // Clear any pending run for this story - const existing = pending.get(storyId) - if (existing) { - requestLogger.debug('Cancelling pending librarian run') - clearTimeout(existing) + const state = scheduler.get(storyId) ?? { running: false, queued: null } + scheduler.set(storyId, state) + + const held = (holds.get(storyId) ?? 0) > 0 + if (state.running || held) { + requestLogger.debug('Deferring re-analysis', { fragmentId: fragment.id, branchId, reason: held ? 'held' : 'running' }) + state.queued = { dataDir, fragment, branchId } + setRuntimeStatus(storyId, { + runStatus: state.running ? 'running' : 'scheduled', + pendingFragmentId: fragment.id, + }) + return } - // Schedule a new run - requestLogger.info('Scheduling librarian run', { fragmentId: fragment.id, branchId, debounceMs: DEBOUNCE_MS }) + state.running = true + void runAnalysis(dataDir, storyId, fragment, branchId) +} + +async function runAnalysis( + dataDir: string, + storyId: string, + fragment: Fragment, + branchId: string, +): Promise { + const requestLogger = logger.child({ storyId }) setRuntimeStatus(storyId, { - runStatus: 'scheduled', - pendingFragmentId: fragment.id, - runningFragmentId: null, + runStatus: 'running', + pendingFragmentId: null, + runningFragmentId: fragment.id, lastError: null, }) - pending.set( - storyId, - setTimeout(async () => { - pending.delete(storyId) - setRuntimeStatus(storyId, { - runStatus: 'running', - pendingFragmentId: null, - runningFragmentId: fragment.id, - }) - try { - requestLogger.info('Starting librarian analysis...', { fragmentId: fragment.id, branchId }) - const startTime = Date.now() - await withBranch(dataDir, storyId, () => invokeAgent({ - dataDir, - storyId, - agentName: 'librarian.analyze', - input: { fragmentId: fragment.id }, - }), branchId) - const durationMs = Date.now() - startTime - requestLogger.info('Librarian analysis completed', { fragmentId: fragment.id, durationMs }) - setRuntimeStatus(storyId, { - runStatus: 'idle', - pendingFragmentId: null, - runningFragmentId: null, - lastError: null, - }) - } catch (err) { - const errorMessage = err instanceof Error ? err.message : String(err) - requestLogger.error('Librarian analysis failed', { - fragmentId: fragment.id, - error: errorMessage, - }) - setRuntimeStatus(storyId, { - runStatus: 'error', - pendingFragmentId: null, - runningFragmentId: null, - lastError: errorMessage, - }) - } - }, DEBOUNCE_MS), - ) + + let lastError: string | null = null + try { + requestLogger.info('Starting librarian analysis...', { fragmentId: fragment.id, branchId }) + const startTime = Date.now() + // Imported lazily: the agents runtime cycles back through the llm tools, and + // it's only needed here at run time. + const { invokeAgent } = await import('../agents') + await withBranch(dataDir, storyId, () => invokeAgent({ + dataDir, + storyId, + agentName: 'librarian.analyze', + input: { fragmentId: fragment.id }, + }), branchId) + requestLogger.info('Librarian analysis completed', { fragmentId: fragment.id, durationMs: Date.now() - startTime }) + } catch (err) { + lastError = err instanceof Error ? err.message : String(err) + requestLogger.error('Librarian analysis failed', { fragmentId: fragment.id, error: lastError }) + } + + // Drain the latest queued trigger (no idle flicker between coalesced runs, so the UI's + // running → idle/error edge fires once). A held trigger waits for its release to flush. + const state = scheduler.get(storyId) + const held = (holds.get(storyId) ?? 0) > 0 + if (state?.queued && !held) { + const next = state.queued + state.queued = null + void runAnalysis(next.dataDir, storyId, next.fragment, next.branchId) + return + } + if (state) state.running = false + setRuntimeStatus(storyId, { + runStatus: lastError ? 'error' : state?.queued ? 'scheduled' : 'idle', + pendingFragmentId: state?.queued?.fragment.id ?? null, + runningFragmentId: null, + lastError, + }) } -/** Clear all pending timers (useful for tests) */ +/** + * Suspend librarian analysis for a story while an agent run edits it, returning a release + * function. Otherwise a run editing prose across several tool steps kicks off (and then + * supersedes) a full analysis per step; deferring to run end collapses that to a single + * analysis of the final state. Refcounted for concurrent runs. + */ +export function holdLibrarianAnalysis(storyId: string): () => void { + holds.set(storyId, (holds.get(storyId) ?? 0) + 1) + let released = false + return () => { + if (released) return + released = true + const remaining = (holds.get(storyId) ?? 1) - 1 + if (remaining > 0) { + holds.set(storyId, remaining) + return + } + holds.delete(storyId) + const state = scheduler.get(storyId) + if (state && !state.running && state.queued) { + const next = state.queued + state.queued = null + state.running = true + void runAnalysis(next.dataDir, storyId, next.fragment, next.branchId) + } + } +} + +function hasMaterialProseChange(before: Fragment, after: Fragment): boolean { + return before.name !== after.name + || before.description !== after.description + || before.content !== after.content +} + +/** + * Schedule librarian re-analysis after a prose fragment changes, from any code path + * (HTTP route or librarian tool). No-ops for non-prose or immaterial changes; marks the + * analysis stale for the UI indicator and schedules the run. + */ +export function reanalyzeAfterProseChange( + dataDir: string, + storyId: string, + before: Fragment, + after: Fragment, +): void { + if (after.type !== 'prose' || !hasMaterialProseChange(before, after)) return + clearAnalysisIndexEntry(dataDir, storyId, after.id).catch(() => {}) + Promise.resolve(triggerLibrarian(dataDir, storyId, after)).catch((err) => { + logger.child({ storyId }).error('triggerLibrarian failed after prose change', { + fragmentId: after.id, + error: err instanceof Error ? err.message : String(err), + }) + }) +} + +/** Reset all scheduler bookkeeping (useful for tests). Does not abort an in-flight run. */ export function clearPending(): void { - for (const [storyId, timer] of pending.entries()) { - clearTimeout(timer) + for (const [storyId, state] of scheduler.entries()) { + state.queued = null + state.running = false setRuntimeStatus(storyId, { runStatus: 'idle', pendingFragmentId: null, runningFragmentId: null, }) } - pending.clear() + scheduler.clear() + holds.clear() } -/** Get the number of pending runs (useful for tests) */ +/** Number of stories with a running or queued analysis (useful for tests). */ export function getPendingCount(): number { - return pending.size + let count = 0 + for (const state of scheduler.values()) { + if (state.running || state.queued) count++ + } + return count } export function getLibrarianRuntimeStatus(storyId: string): LibrarianRuntimeStatus { diff --git a/src/server/llm/tools.ts b/src/server/llm/tools.ts index 16f0ac3f..b94a3682 100644 --- a/src/server/llm/tools.ts +++ b/src/server/llm/tools.ts @@ -16,6 +16,7 @@ import { createLogger } from '../logging' import type { Fragment } from '../fragments/schema' import { generateFragmentId } from '@/lib/fragment-ids' import { checkFragmentWrite, isFragmentLocked } from '../fragments/protection' +import { reanalyzeAfterProseChange } from '../librarian/scheduler' import { capitalize, pluralize } from './agents' const logger = createLogger('llm-tools') @@ -313,6 +314,7 @@ export function createFragmentTools( if (!updated) { return { error: `Fragment not found: ${fragmentId}` } } + reanalyzeAfterProseChange(dataDir, storyId, fragment, updated) return { ok: true, id: fragmentId } }), }) @@ -346,6 +348,7 @@ export function createFragmentTools( if (!updated) { return { error: `Fragment not found: ${fragmentId}` } } + reanalyzeAfterProseChange(dataDir, storyId, fragment, updated) return { ok: true, id: fragmentId } }), }) @@ -403,6 +406,7 @@ export function createFragmentTools( updatedAt: new Date().toISOString(), } await updateFragment(dataDir, storyId, updated) + reanalyzeAfterProseChange(dataDir, storyId, f, updated) edited.push(f.id) } } diff --git a/src/server/routes/fragments.ts b/src/server/routes/fragments.ts index a991b7a2..abe9139a 100644 --- a/src/server/routes/fragments.ts +++ b/src/server/routes/fragments.ts @@ -22,22 +22,12 @@ import { } from '../fragments/associations' import { generateFragmentId } from '@/lib/fragment-ids' import { registry } from '../fragments/registry' -import { triggerLibrarian } from '../librarian/scheduler' -import { clearAnalysisIndexEntry } from '../librarian/storage' -import { createLogger } from '../logging' +import { reanalyzeAfterProseChange } from '../librarian/scheduler' import { installFragmentBundle } from '../erratanet/pack-install' import type { Fragment } from '../fragments/schema' import type { FragmentBundleData } from '@/lib/fragment-clipboard' -function hasMaterialProseChange(before: Fragment, after: Fragment): boolean { - return before.name !== after.name - || before.description !== after.description - || before.content !== after.content -} - export function fragmentRoutes(dataDir: string) { - const logger = createLogger('api:fragments', { dataDir }) - return new Elysia({ detail: { tags: ['Fragments'] } }) .post('/stories/:storyId/fragments', async ({ params, body, set }) => { const story = await getStory(dataDir, params.storyId) @@ -141,7 +131,6 @@ export function fragmentRoutes(dataDir: string) { }, { detail: { summary: 'Get a fragment by ID' } }) .put('/stories/:storyId/fragments/:fragmentId', async ({ params, body, set }) => { - const requestLogger = logger.child({ storyId: params.storyId, extra: { fragmentId: params.fragmentId } }) const existing = await getFragment( dataDir, params.storyId, @@ -176,14 +165,7 @@ export function fragmentRoutes(dataDir: string) { } await updateFragment(dataDir, params.storyId, updated) - if (existing.type === 'prose' && hasMaterialProseChange(existing, updated)) { - clearAnalysisIndexEntry(dataDir, params.storyId, updated.id).catch(() => {}) - Promise.resolve(triggerLibrarian(dataDir, params.storyId, updated)).catch((err) => { - requestLogger.error('triggerLibrarian failed after prose update', { - error: err instanceof Error ? err.message : String(err), - }) - }) - } + reanalyzeAfterProseChange(dataDir, params.storyId, existing, updated) return updated }, { @@ -200,7 +182,6 @@ export function fragmentRoutes(dataDir: string) { }) .patch('/stories/:storyId/fragments/:fragmentId', async ({ params, body, set }) => { - const requestLogger = logger.child({ storyId: params.storyId, extra: { fragmentId: params.fragmentId } }) const existing = await getFragment( dataDir, params.storyId, @@ -223,14 +204,7 @@ export function fragmentRoutes(dataDir: string) { return { error: 'Fragment not found' } } - if (existing.type === 'prose' && hasMaterialProseChange(existing, updated)) { - clearAnalysisIndexEntry(dataDir, params.storyId, updated.id).catch(() => {}) - Promise.resolve(triggerLibrarian(dataDir, params.storyId, updated)).catch((err) => { - requestLogger.error('triggerLibrarian failed after prose edit', { - error: err instanceof Error ? err.message : String(err), - }) - }) - } + reanalyzeAfterProseChange(dataDir, params.storyId, existing, updated) return updated }, { diff --git a/tests/api/prose-edit-reanalysis.test.ts b/tests/api/prose-edit-reanalysis.test.ts index 05b6f46e..2f1e758c 100644 --- a/tests/api/prose-edit-reanalysis.test.ts +++ b/tests/api/prose-edit-reanalysis.test.ts @@ -1,21 +1,16 @@ import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest' import { createTempDir } from '../setup' -vi.mock('@/server/librarian/scheduler', () => ({ - triggerLibrarian: vi.fn().mockResolvedValue(undefined), - getLibrarianRuntimeStatus: vi.fn(() => ({ - runStatus: 'idle', - pendingFragmentId: null, - runningFragmentId: null, - lastError: null, - updatedAt: new Date().toISOString(), - })), +// Analysis ultimately fires through invokeAgent; stub it so the real gating runs without an LLM +vi.mock('@/server/agents', () => ({ + invokeAgent: vi.fn().mockResolvedValue(undefined), })) -import { triggerLibrarian } from '@/server/librarian/scheduler' +import { invokeAgent } from '@/server/agents' +import { clearPending } from '@/server/librarian/scheduler' import { createApp } from '@/server/api' -const mockedTriggerLibrarian = vi.mocked(triggerLibrarian) +const mockedInvokeAgent = vi.mocked(invokeAgent) let dataDir: string let cleanup: () => Promise @@ -26,10 +21,12 @@ beforeEach(async () => { dataDir = tmp.path cleanup = tmp.cleanup app = createApp(dataDir) - mockedTriggerLibrarian.mockClear() + clearPending() + mockedInvokeAgent.mockClear() }) afterEach(async () => { + clearPending() await cleanup() }) @@ -71,12 +68,13 @@ describe('prose edit reanalysis trigger', () => { }, 'PUT') expect(res.status).toBe(200) - expect(mockedTriggerLibrarian).toHaveBeenCalledTimes(1) - expect(mockedTriggerLibrarian).toHaveBeenCalledWith( + await vi.waitFor(() => expect(mockedInvokeAgent).toHaveBeenCalledTimes(1)) + expect(mockedInvokeAgent).toHaveBeenCalledWith({ dataDir, storyId, - expect.objectContaining({ id: fragmentId, type: 'prose' }), - ) + agentName: 'librarian.analyze', + input: { fragmentId }, + }) }) it('does not trigger librarian on prose PUT when only sticky changes', async () => { @@ -90,7 +88,7 @@ describe('prose edit reanalysis trigger', () => { }, 'PUT') expect(res.status).toBe(200) - expect(mockedTriggerLibrarian).not.toHaveBeenCalled() + expect(mockedInvokeAgent).not.toHaveBeenCalled() }) it('triggers librarian on prose PATCH when text replacement changes content', async () => { @@ -102,12 +100,13 @@ describe('prose edit reanalysis trigger', () => { }, 'PATCH') expect(res.status).toBe(200) - expect(mockedTriggerLibrarian).toHaveBeenCalledTimes(1) - expect(mockedTriggerLibrarian).toHaveBeenCalledWith( + await vi.waitFor(() => expect(mockedInvokeAgent).toHaveBeenCalledTimes(1)) + expect(mockedInvokeAgent).toHaveBeenCalledWith({ dataDir, storyId, - expect.objectContaining({ id: fragmentId, type: 'prose' }), - ) + agentName: 'librarian.analyze', + input: { fragmentId }, + }) }) it('does not trigger librarian on prose PATCH when replacement is a no-op', async () => { @@ -119,7 +118,7 @@ describe('prose edit reanalysis trigger', () => { }, 'PATCH') expect(res.status).toBe(200) - expect(mockedTriggerLibrarian).not.toHaveBeenCalled() + expect(mockedInvokeAgent).not.toHaveBeenCalled() }) it('does not trigger librarian on non-prose PUT updates', async () => { @@ -138,6 +137,6 @@ describe('prose edit reanalysis trigger', () => { }, 'PUT') expect(res.status).toBe(200) - expect(mockedTriggerLibrarian).not.toHaveBeenCalled() + expect(mockedInvokeAgent).not.toHaveBeenCalled() }) }) diff --git a/tests/librarian/scheduler.test.ts b/tests/librarian/scheduler.test.ts index 9f333db1..b0181b6b 100644 --- a/tests/librarian/scheduler.test.ts +++ b/tests/librarian/scheduler.test.ts @@ -5,15 +5,20 @@ vi.mock('@/server/agents', () => ({ invokeAgent: vi.fn(), })) -// Mock the branches module — scheduler now resolves branch before debounce +// Mock the branches module — scheduler resolves the active branch before running vi.mock('@/server/fragments/branches', () => ({ getActiveBranchId: vi.fn().mockResolvedValue('main'), withBranch: vi.fn((_dataDir: string, _storyId: string, fn: () => Promise, _branchId?: string) => fn()), })) +// Mock librarian storage so reanalyzeAfterProseChange's index clear doesn't touch disk +vi.mock('@/server/librarian/storage', () => ({ + clearAnalysisIndexEntry: vi.fn(() => Promise.resolve()), +})) + // Import mocked modules AFTER vi.mock (vitest hoists mocks to top) import { invokeAgent } from '@/server/agents' -import { triggerLibrarian, clearPending, getPendingCount, getLibrarianRuntimeStatus } from '@/server/librarian/scheduler' +import { triggerLibrarian, reanalyzeAfterProseChange, holdLibrarianAnalysis, clearPending, getPendingCount, getLibrarianRuntimeStatus } from '@/server/librarian/scheduler' import type { Fragment } from '@/server/fragments/schema' const mockedInvokeAgent = vi.mocked(invokeAgent) @@ -37,65 +42,74 @@ function makeFragment(id: string): Fragment { } } +function analysisResult() { + return { + runId: 'ar-test', + output: { + id: 'la-test', + createdAt: new Date().toISOString(), + fragmentId: 'pr-0001', + summaryUpdate: '', + mentionedCharacters: [], + contradictions: [], + fragmentSuggestions: [], + timelineEvents: [], + }, + trace: [], + activityId: 'act-test', + } +} + +/** A controllable in-flight run: resolves only when the returned trigger is called. */ +function deferredRun() { + let release!: () => void + mockedInvokeAgent.mockImplementationOnce( + () => new Promise((resolve) => { release = () => resolve(analysisResult()) }), + ) + return () => release() +} + describe('librarian scheduler', () => { beforeEach(() => { - vi.useFakeTimers() vi.clearAllMocks() clearPending() - mockedInvokeAgent.mockResolvedValue({ - runId: 'ar-test', - output: { - id: 'la-test', - createdAt: new Date().toISOString(), - fragmentId: 'pr-0001', - summaryUpdate: '', - mentionedCharacters: [], - contradictions: [], - fragmentSuggestions: [], - timelineEvents: [], - }, - trace: [], - activityId: 'act-test', - }) + mockedInvokeAgent.mockResolvedValue(analysisResult()) }) afterEach(() => { clearPending() - vi.useRealTimers() }) - it('triggers librarian analyze agent after debounce period', async () => { + it('runs the analyze agent immediately, without a debounce delay', async () => { await triggerLibrarian('/data', 'story-1', makeFragment('pr-0001')) - expect(mockedInvokeAgent).not.toHaveBeenCalled() - expect(getPendingCount()).toBe(1) - expect(getLibrarianRuntimeStatus('story-1').runStatus).toBe('scheduled') - - await vi.advanceTimersByTimeAsync(2000) - - expect(mockedInvokeAgent).toHaveBeenCalledTimes(1) + await vi.waitFor(() => expect(mockedInvokeAgent).toHaveBeenCalledTimes(1)) expect(mockedInvokeAgent).toHaveBeenCalledWith({ dataDir: '/data', storyId: 'story-1', agentName: 'librarian.analyze', input: { fragmentId: 'pr-0001' }, }) + await vi.waitFor(() => expect(getLibrarianRuntimeStatus('story-1').runStatus).toBe('idle')) expect(getPendingCount()).toBe(0) - expect(getLibrarianRuntimeStatus('story-1').runStatus).toBe('idle') }) - it('debounces multiple rapid triggers for the same story', async () => { + it('coalesces triggers that arrive while a run is in flight (latest wins)', async () => { + const release = deferredRun() + await triggerLibrarian('/data', 'story-1', makeFragment('pr-0001')) + await vi.waitFor(() => expect(mockedInvokeAgent).toHaveBeenCalledTimes(1)) + + // These arrive while the first run is still in flight; only the last survives. await triggerLibrarian('/data', 'story-1', makeFragment('pr-0002')) await triggerLibrarian('/data', 'story-1', makeFragment('pr-0003')) - + expect(mockedInvokeAgent).toHaveBeenCalledTimes(1) expect(getPendingCount()).toBe(1) - await vi.advanceTimersByTimeAsync(2000) + release() - // Only the last fragment should be used - expect(mockedInvokeAgent).toHaveBeenCalledTimes(1) - expect(mockedInvokeAgent).toHaveBeenCalledWith({ + await vi.waitFor(() => expect(mockedInvokeAgent).toHaveBeenCalledTimes(2)) + expect(mockedInvokeAgent).toHaveBeenLastCalledWith({ dataDir: '/data', storyId: 'story-1', agentName: 'librarian.analyze', @@ -107,11 +121,7 @@ describe('librarian scheduler', () => { await triggerLibrarian('/data', 'story-1', makeFragment('pr-0001')) await triggerLibrarian('/data', 'story-2', makeFragment('pr-0002')) - expect(getPendingCount()).toBe(2) - - await vi.advanceTimersByTimeAsync(2000) - - expect(mockedInvokeAgent).toHaveBeenCalledTimes(2) + await vi.waitFor(() => expect(mockedInvokeAgent).toHaveBeenCalledTimes(2)) expect(mockedInvokeAgent).toHaveBeenCalledWith({ dataDir: '/data', storyId: 'story-1', @@ -130,30 +140,108 @@ describe('librarian scheduler', () => { const consoleSpy = vi.spyOn(console, 'error').mockImplementation(() => {}) mockedInvokeAgent.mockRejectedValue(new Error('LLM failed')) + // Should not throw await triggerLibrarian('/data', 'story-1', makeFragment('pr-0001')) - // Should not throw - await vi.advanceTimersByTimeAsync(2000) - - // Should log error with new structured format - expect(consoleSpy).toHaveBeenCalled() - const errorCall = consoleSpy.mock.calls.find(call => - call[0]?.includes && call[0].includes('Librarian analysis failed') - ) - expect(errorCall).toBeDefined() + await vi.waitFor(() => { + const errorCall = consoleSpy.mock.calls.find(call => + call[0]?.includes && call[0].includes('Librarian analysis failed'), + ) + expect(errorCall).toBeDefined() + }) + expect(getLibrarianRuntimeStatus('story-1').runStatus).toBe('error') consoleSpy.mockRestore() }) - it('clearPending cancels all pending runs', async () => { + describe('reanalyzeAfterProseChange', () => { + it('runs re-analysis when prose content materially changes', async () => { + const before = makeFragment('pr-0001') + const after = { ...before, content: 'rewritten content' } + + reanalyzeAfterProseChange('/data', 'story-1', before, after) + + await vi.waitFor(() => expect(mockedInvokeAgent).toHaveBeenCalledTimes(1)) + expect(mockedInvokeAgent).toHaveBeenCalledWith({ + dataDir: '/data', + storyId: 'story-1', + agentName: 'librarian.analyze', + input: { fragmentId: 'pr-0001' }, + }) + }) + + it('ignores non-prose fragments', async () => { + const before = { ...makeFragment('ch-0001'), type: 'character' as const } + const after = { ...before, content: 'changed' } + + reanalyzeAfterProseChange('/data', 'story-1', before, after) + + expect(getPendingCount()).toBe(0) + expect(mockedInvokeAgent).not.toHaveBeenCalled() + }) + + it('ignores changes that do not affect analyzed content', async () => { + const before = makeFragment('pr-0001') + const after = { ...before, sticky: true, updatedAt: new Date(Date.now() + 1000).toISOString() } + + reanalyzeAfterProseChange('/data', 'story-1', before, after) + + expect(getPendingCount()).toBe(0) + expect(mockedInvokeAgent).not.toHaveBeenCalled() + }) + }) + + describe('holdLibrarianAnalysis', () => { + it('defers analysis while held, then runs once with the latest fragment on release', async () => { + const release = holdLibrarianAnalysis('story-1') + + // Several edits across an agent run's tool steps; none start while held. + await triggerLibrarian('/data', 'story-1', makeFragment('pr-0001')) + await triggerLibrarian('/data', 'story-1', makeFragment('pr-0002')) + expect(mockedInvokeAgent).not.toHaveBeenCalled() + expect(getLibrarianRuntimeStatus('story-1').runStatus).toBe('scheduled') + + release() + + await vi.waitFor(() => expect(mockedInvokeAgent).toHaveBeenCalledTimes(1)) + expect(mockedInvokeAgent).toHaveBeenCalledWith({ + dataDir: '/data', + storyId: 'story-1', + agentName: 'librarian.analyze', + input: { fragmentId: 'pr-0002' }, + }) + }) + + it('releasing with nothing queued does not run analysis', async () => { + const release = holdLibrarianAnalysis('story-1') + release() + await Promise.resolve() + expect(mockedInvokeAgent).not.toHaveBeenCalled() + expect(getPendingCount()).toBe(0) + }) + + it('refcounts nested holds — analysis waits for the last release', async () => { + const release1 = holdLibrarianAnalysis('story-1') + const release2 = holdLibrarianAnalysis('story-1') + + await triggerLibrarian('/data', 'story-1', makeFragment('pr-0001')) + release1() + expect(mockedInvokeAgent).not.toHaveBeenCalled() + + release2() + await vi.waitFor(() => expect(mockedInvokeAgent).toHaveBeenCalledTimes(1)) + }) + }) + + it('clearPending resets in-flight and queued bookkeeping', async () => { + // Runs that never settle, so they stay "in flight" for the assertion. + mockedInvokeAgent.mockImplementation(() => new Promise(() => {})) + await triggerLibrarian('/data', 'story-1', makeFragment('pr-0001')) await triggerLibrarian('/data', 'story-2', makeFragment('pr-0002')) + await vi.waitFor(() => expect(mockedInvokeAgent).toHaveBeenCalledTimes(2)) expect(getPendingCount()).toBe(2) clearPending() expect(getPendingCount()).toBe(0) - - await vi.advanceTimersByTimeAsync(5000) - - expect(mockedInvokeAgent).not.toHaveBeenCalled() }) }) diff --git a/tests/llm/generation.test.ts b/tests/llm/generation.test.ts index 713d2ea8..f24c6a16 100644 --- a/tests/llm/generation.test.ts +++ b/tests/llm/generation.test.ts @@ -33,6 +33,13 @@ vi.mock('ai', async () => { } }) +// Stub only the index's invokeAgent so the librarian run the scheduler now fires stays +// in-flight; generation.ts imports its own from '../agents/runner', so it's unaffected. +vi.mock('@/server/agents', async (importOriginal) => { + const actual = await importOriginal() + return { ...actual, invokeAgent: vi.fn(() => new Promise(() => {})) } +}) + import { createApp } from '@/server/api' function makeStory(settingsOverrides?: Partial): StoryMeta {