Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/components/fragments/FragmentEditor.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ export function FragmentEditor({
})
if (fType === 'prose') {
promises.push(queryClient.invalidateQueries({ queryKey: ['proseChain', storyId] }))
// Prose edits trigger re-analysis, so refresh the freshness indicator's index.
promises.push(queryClient.invalidateQueries({ queryKey: ['librarian-analysis-index', storyId] }))
}
if (fragment?.id) {
promises.push(queryClient.invalidateQueries({ queryKey: ['fragment', storyId, fragment.id] }))
Expand Down Expand Up @@ -206,6 +208,7 @@ export function FragmentEditor({
queryClient.invalidateQueries({ queryKey: ['fragments-archived', storyId] })
if (fType === 'prose') {
queryClient.invalidateQueries({ queryKey: ['proseChain', storyId] })
queryClient.invalidateQueries({ queryKey: ['librarian-analysis-index', storyId] })
}
setSaveStatus('saved')
if (savedStatusTimerRef.current) clearTimeout(savedStatusTimerRef.current)
Expand Down
8 changes: 7 additions & 1 deletion src/server/agents/create-streaming-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { createFragmentTools } from '../llm/tools'
import { reportUsage } from '../llm/token-tracker'
import { createLogger } from '../logging'
import { createEventStream } from './create-event-stream'
import { holdLibrarianAnalysis } from '../librarian/scheduler'
import { compileAgentContext, type CompiledAgentContext } from './compile-agent-context'
import { withBranch } from '../fragments/branches'

Expand Down Expand Up @@ -195,9 +196,14 @@ export function createStreamingRunner<TOpts extends object, TValidated = Record<
? config.messages({ compiled, opts })
: userMessage ? [{ role: 'user' as const, content: userMessage.content }] : []

// 11. Stream
// 11. Stream. Hold analysis for write-enabled runs so multi-step prose edits
// analyze once on the final state, not per edit (see holdLibrarianAnalysis).
const result = await agent.stream({ messages })
const releaseAnalysis = config.readOnly === false
? holdLibrarianAnalysis(storyId)
: () => {}
const streamResult = createEventStream(result.fullStream)
void streamResult.completion.then(releaseAnalysis, releaseAnalysis)

// 12. Track token usage after stream completes
streamResult.completion.then(async () => {
Expand Down
10 changes: 7 additions & 3 deletions src/server/librarian/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { pluginRegistry } from '../plugins/registry'
import { collectPluginTools } from '../plugins/tools'
import { createLogger } from '../logging'
import { createEventStream } from '../agents/create-event-stream'
import { holdLibrarianAnalysis } from './scheduler'
import { compileAgentContext } from '../agents/compile-agent-context'
import { createAgentInstance } from '../agents/agent-instance'
import { getFragmentsByTag } from '../fragments/associations'
Expand Down Expand Up @@ -195,10 +196,13 @@ async function librarianChatInner(
})),
]

// Stream with write tools
// Stream with write tools. Hold analysis so multi-step prose edits analyze once on the
// final state, not per edit (see holdLibrarianAnalysis).
const result = await chatAgent.stream({
messages: aiMessages,
})

return createEventStream(result.fullStream)
const releaseAnalysis = holdLibrarianAnalysis(storyId)
const stream = createEventStream(result.fullStream)
void stream.completion.then(releaseAnalysis, releaseAnalysis)
return stream
}
213 changes: 152 additions & 61 deletions src/server/librarian/scheduler.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
import type { Fragment } from '../fragments/schema'
import { invokeAgent } from '../agents'
import { createLogger } from '../logging'
import { getActiveBranchId, withBranch } from '../fragments/branches'
import { clearAnalysisIndexEntry } from './storage'

const pending = new Map<string, ReturnType<typeof setTimeout>>()
interface QueuedRun {
dataDir: string
fragment: Fragment
branchId: string
}

interface SchedulerState {
running: boolean
/** Latest trigger that arrived while a run was in flight or held; supersedes earlier ones. */
queued: QueuedRun | null
}

const scheduler = new Map<string, SchedulerState>()
const runtimeStatus = new Map<string, LibrarianRuntimeStatus>()
const DEBOUNCE_MS = 2000
/** Per-story count of active agent runs that defer analysis until they finish. */
const holds = new Map<string, number>()
const logger = createLogger('librarian')

export type LibrarianRunStatus = 'idle' | 'scheduled' | 'running' | 'error'
Expand Down Expand Up @@ -37,90 +50,168 @@ function setRuntimeStatus(storyId: string, patch: Partial<LibrarianRuntimeStatus
})
}

/**
* Schedule a librarian analysis for a story. Analysis is one of the longest steps, so a
* run starts immediately when the story is idle. Triggers arriving while a run is in
* flight or held are coalesced to the latest fragment and run once the story settles.
*/
export async function triggerLibrarian(
dataDir: string,
storyId: string,
fragment: Fragment,
): Promise<void> {
const requestLogger = logger.child({ storyId })

// Capture the active branch NOW, before the debounce delay
// Capture the active branch at trigger time, before any in-flight run can switch it.
const branchId = await getActiveBranchId(dataDir, storyId)

// Clear any pending run for this story
const existing = pending.get(storyId)
if (existing) {
requestLogger.debug('Cancelling pending librarian run')
clearTimeout(existing)
const state = scheduler.get(storyId) ?? { running: false, queued: null }
scheduler.set(storyId, state)

const held = (holds.get(storyId) ?? 0) > 0
if (state.running || held) {
requestLogger.debug('Deferring re-analysis', { fragmentId: fragment.id, branchId, reason: held ? 'held' : 'running' })
state.queued = { dataDir, fragment, branchId }
setRuntimeStatus(storyId, {
runStatus: state.running ? 'running' : 'scheduled',
pendingFragmentId: fragment.id,
})
return
}

// Schedule a new run
requestLogger.info('Scheduling librarian run', { fragmentId: fragment.id, branchId, debounceMs: DEBOUNCE_MS })
state.running = true
void runAnalysis(dataDir, storyId, fragment, branchId)
}

async function runAnalysis(
dataDir: string,
storyId: string,
fragment: Fragment,
branchId: string,
): Promise<void> {
const requestLogger = logger.child({ storyId })
setRuntimeStatus(storyId, {
runStatus: 'scheduled',
pendingFragmentId: fragment.id,
runningFragmentId: null,
runStatus: 'running',
pendingFragmentId: null,
runningFragmentId: fragment.id,
lastError: null,
})
pending.set(
storyId,
setTimeout(async () => {
pending.delete(storyId)
setRuntimeStatus(storyId, {
runStatus: 'running',
pendingFragmentId: null,
runningFragmentId: fragment.id,
})
try {
requestLogger.info('Starting librarian analysis...', { fragmentId: fragment.id, branchId })
const startTime = Date.now()
await withBranch(dataDir, storyId, () => invokeAgent({
dataDir,
storyId,
agentName: 'librarian.analyze',
input: { fragmentId: fragment.id },
}), branchId)
const durationMs = Date.now() - startTime
requestLogger.info('Librarian analysis completed', { fragmentId: fragment.id, durationMs })
setRuntimeStatus(storyId, {
runStatus: 'idle',
pendingFragmentId: null,
runningFragmentId: null,
lastError: null,
})
} catch (err) {
const errorMessage = err instanceof Error ? err.message : String(err)
requestLogger.error('Librarian analysis failed', {
fragmentId: fragment.id,
error: errorMessage,
})
setRuntimeStatus(storyId, {
runStatus: 'error',
pendingFragmentId: null,
runningFragmentId: null,
lastError: errorMessage,
})
}
}, DEBOUNCE_MS),
)

let lastError: string | null = null
try {
requestLogger.info('Starting librarian analysis...', { fragmentId: fragment.id, branchId })
const startTime = Date.now()
// Imported lazily: the agents runtime cycles back through the llm tools, and
// it's only needed here at run time.
const { invokeAgent } = await import('../agents')
await withBranch(dataDir, storyId, () => invokeAgent({
dataDir,
storyId,
agentName: 'librarian.analyze',
input: { fragmentId: fragment.id },
}), branchId)
requestLogger.info('Librarian analysis completed', { fragmentId: fragment.id, durationMs: Date.now() - startTime })
} catch (err) {
lastError = err instanceof Error ? err.message : String(err)
requestLogger.error('Librarian analysis failed', { fragmentId: fragment.id, error: lastError })
}

// Drain the latest queued trigger (no idle flicker between coalesced runs, so the UI's
// running → idle/error edge fires once). A held trigger waits for its release to flush.
const state = scheduler.get(storyId)
const held = (holds.get(storyId) ?? 0) > 0
if (state?.queued && !held) {
const next = state.queued
state.queued = null
void runAnalysis(next.dataDir, storyId, next.fragment, next.branchId)
return
}
if (state) state.running = false
setRuntimeStatus(storyId, {
runStatus: lastError ? 'error' : state?.queued ? 'scheduled' : 'idle',
pendingFragmentId: state?.queued?.fragment.id ?? null,
runningFragmentId: null,
lastError,
})
}

/** Clear all pending timers (useful for tests) */
/**
* Suspend librarian analysis for a story while an agent run edits it, returning a release
* function. Otherwise a run editing prose across several tool steps kicks off (and then
* supersedes) a full analysis per step; deferring to run end collapses that to a single
* analysis of the final state. Refcounted for concurrent runs.
*/
export function holdLibrarianAnalysis(storyId: string): () => void {
holds.set(storyId, (holds.get(storyId) ?? 0) + 1)
let released = false
return () => {
if (released) return
released = true
const remaining = (holds.get(storyId) ?? 1) - 1
if (remaining > 0) {
holds.set(storyId, remaining)
return
}
holds.delete(storyId)
const state = scheduler.get(storyId)
if (state && !state.running && state.queued) {
const next = state.queued
state.queued = null
state.running = true
void runAnalysis(next.dataDir, storyId, next.fragment, next.branchId)
}
}
}

function hasMaterialProseChange(before: Fragment, after: Fragment): boolean {
return before.name !== after.name
|| before.description !== after.description
|| before.content !== after.content
}

/**
* Schedule librarian re-analysis after a prose fragment changes, from any code path
* (HTTP route or librarian tool). No-ops for non-prose or immaterial changes; marks the
* analysis stale for the UI indicator and schedules the run.
*/
export function reanalyzeAfterProseChange(
dataDir: string,
storyId: string,
before: Fragment,
after: Fragment,
): void {
if (after.type !== 'prose' || !hasMaterialProseChange(before, after)) return
clearAnalysisIndexEntry(dataDir, storyId, after.id).catch(() => {})
Promise.resolve(triggerLibrarian(dataDir, storyId, after)).catch((err) => {
logger.child({ storyId }).error('triggerLibrarian failed after prose change', {
fragmentId: after.id,
error: err instanceof Error ? err.message : String(err),
})
})
}

/** Reset all scheduler bookkeeping (useful for tests). Does not abort an in-flight run. */
export function clearPending(): void {
for (const [storyId, timer] of pending.entries()) {
clearTimeout(timer)
for (const [storyId, state] of scheduler.entries()) {
state.queued = null
state.running = false
setRuntimeStatus(storyId, {
runStatus: 'idle',
pendingFragmentId: null,
runningFragmentId: null,
})
}
pending.clear()
scheduler.clear()
holds.clear()
}

/** Get the number of pending runs (useful for tests) */
/** Number of stories with a running or queued analysis (useful for tests). */
export function getPendingCount(): number {
return pending.size
let count = 0
for (const state of scheduler.values()) {
if (state.running || state.queued) count++
}
return count
}

export function getLibrarianRuntimeStatus(storyId: string): LibrarianRuntimeStatus {
Expand Down
4 changes: 4 additions & 0 deletions src/server/llm/tools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { createLogger } from '../logging'
import type { Fragment } from '../fragments/schema'
import { generateFragmentId } from '@/lib/fragment-ids'
import { checkFragmentWrite, isFragmentLocked } from '../fragments/protection'
import { reanalyzeAfterProseChange } from '../librarian/scheduler'
import { capitalize, pluralize } from './agents'

const logger = createLogger('llm-tools')
Expand Down Expand Up @@ -313,6 +314,7 @@ export function createFragmentTools(
if (!updated) {
return { error: `Fragment not found: ${fragmentId}` }
}
reanalyzeAfterProseChange(dataDir, storyId, fragment, updated)
return { ok: true, id: fragmentId }
}),
})
Expand Down Expand Up @@ -346,6 +348,7 @@ export function createFragmentTools(
if (!updated) {
return { error: `Fragment not found: ${fragmentId}` }
}
reanalyzeAfterProseChange(dataDir, storyId, fragment, updated)
return { ok: true, id: fragmentId }
}),
})
Expand Down Expand Up @@ -403,6 +406,7 @@ export function createFragmentTools(
updatedAt: new Date().toISOString(),
}
await updateFragment(dataDir, storyId, updated)
reanalyzeAfterProseChange(dataDir, storyId, f, updated)
edited.push(f.id)
}
}
Expand Down
Loading