diff --git a/internal/agent/build.go b/internal/agent/build.go index c93fecc..e22138a 100644 --- a/internal/agent/build.go +++ b/internal/agent/build.go @@ -39,6 +39,10 @@ type BuildParams struct { PermissionDecider PermDecisionFunc InteractionFunc tool.InteractionFunc ToolProgress func(toolCallID string, msg string) + + // OnEvent observes every agent lifecycle event synchronously, alongside + // outbox delivery. Used by the trace recorder; nil leaves recording off. + OnEvent func(core.Event) } func buildAgent(p BuildParams) (core.Agent, *PermissionBridge, error) { @@ -87,7 +91,7 @@ func buildAgent(p BuildParams) (core.Agent, *PermissionBridge, error) { })) tools := tool.AdaptToolRegistry(schemas, cwdFunc, adaptOpts...) for _, t := range p.MCPTools { - tools.Add(t) + tools.Add(t, "mcp:"+t.Name()) } compactClient := client @@ -111,6 +115,7 @@ func buildAgent(p BuildParams) (core.Agent, *PermissionBridge, error) { Tools: tool.WithPermission(tools, pb.PermissionFunc()), CompactFunc: compactFunc, CWD: p.CWD, + OnEvent: p.OnEvent, }) return ag, pb, nil diff --git a/internal/app/agent.go b/internal/app/agent.go index a70ed16..9253895 100644 --- a/internal/app/agent.go +++ b/internal/app/agent.go @@ -61,11 +61,18 @@ func (m *model) buildAgentParams() agent.BuildParams { mcpTools = mcp.AsCoreTools(schemas, mcpCaller) } + maxTokens := kit.GetMaxTokens(m.services.LLM.Store(), m.env.CurrentModel, setting.DefaultMaxTokens) + var onEvent func(core.Event) + if rec := m.services.Session.NewRecorder("main", m.env.LLMProvider.Name(), m.env.GetModelID(), maxTokens); rec != nil { + onEvent = rec.OnAgentEvent + } + return agent.BuildParams{ Provider: m.env.LLMProvider, ModelID: m.env.GetModelID(), - MaxTokens: kit.GetMaxTokens(m.services.LLM.Store(), m.env.CurrentModel, setting.DefaultMaxTokens), + MaxTokens: maxTokens, ThinkingEffort: m.env.EffectiveThinkingEffort(), + OnEvent: onEvent, CWD: m.env.CWD, CWDFunc: func() string { return m.env.CWD }, diff --git a/internal/core/agent.go 
b/internal/core/agent.go index 8df891c..650333d 100644 --- a/internal/core/agent.go +++ b/internal/core/agent.go @@ -133,7 +133,7 @@ func NewAgent(cfg Config) Agent { outbox = make(chan Event, cfg.OutboxBuf) } - return &agent{ + a := &agent{ id: cfg.ID, agentType: cfg.AgentType, color: cfg.Color, @@ -148,6 +148,16 @@ func NewAgent(cfg Config) Agent { outbox: outbox, onEvent: cfg.OnEvent, } + // Mirror system + tools mutations onto the event bus. Attach after + // construction so each registry replays its initial members back to the + // observer — the recorder sees a complete event chain from t0. + cfg.System.SetObserver(func(c SystemChange) { + a.emitTelemetry(SystemChangeEvent(a.id, c)) + }) + cfg.Tools.SetObserver(func(c ToolsChange) { + a.emitTelemetry(ToolsChangeEvent(a.id, c)) + }) + return a } // Result represents the outcome of one completed turn (end_turn). @@ -178,6 +188,15 @@ const ( OnMessage EventType = "Message" // message received on inbox (Message in Data) OnTurn EventType = "Turn" // think+act cycle completed (Result in Data) OnCompact EventType = "Compact" // conversation compacted (CompactInfo in Data) + + // OnSystemChange fires when a system-prompt section is added, replaced, + // or removed. Data is SystemChange. Non-critical telemetry — never blocks + // the outbox on backpressure. + OnSystemChange EventType = "SystemChange" + + // OnToolsChange fires when a tool is registered or unregistered. Data is + // ToolsChange. Like OnSystemChange, non-blocking telemetry. + OnToolsChange EventType = "ToolsChange" ) // Event carries context for an agent lifecycle point. @@ -205,6 +224,50 @@ type CompactInfo struct { OriginalCount int } +// InferenceContext is the PreInfer payload — what was about to be sent to the +// LLM, expressed as content-addressed digests so consumers (trace recorder, +// debug logger) can reference inputs without copying them on every turn. 
+type InferenceContext struct { + SystemDigest string // sha256 of rendered system prompt + ToolsDigest string // sha256 of canonicalized tool schemas + MessageIDs []string // active chain at request time, in send order +} + +func (e Event) InferenceContext() (InferenceContext, bool) { + ic, ok := e.Data.(InferenceContext) + return ic, ok +} + +// SystemChange describes one mutation to the system prompt's section map. +// Emitted on Use/Drop. The recorder translates these into +// system.section.added / system.section.removed records. +type SystemChange struct { + Name string // section name (stable across mutations) + Slot int // render slot + Content string // rendered content; empty when Removed + Removed bool // true on Drop, false on Use + Caller string // who triggered the mutation (e.g. "system:init", "command:/identity") +} + +func (e Event) SystemChange() (SystemChange, bool) { + c, ok := e.Data.(SystemChange) + return c, ok +} + +// ToolsChange describes one mutation to the tool registry. On removal, +// Schema.Name carries the dropped tool's name and other fields are zero. +type ToolsChange struct { + Schema ToolSchema // populated on Add (zero on Remove) + Name string // populated on Remove (empty on Add) + Removed bool // true on Remove, false on Add + Caller string // who triggered the mutation +} + +func (e Event) ToolsChange() (ToolsChange, bool) { + c, ok := e.Data.(ToolsChange) + return c, ok +} + // Typed event constructors — enforce correct Data types at construction. 
func StartEvent(agentID string) Event { return Event{Type: OnStart, Source: agentID} } @@ -214,7 +277,9 @@ func StopEvent(agentID string, err error) Event { func ChunkEvent(agentID string, c Chunk) Event { return Event{Type: OnChunk, Source: agentID, Data: c} } func MessageEvent(msg Message) Event { return Event{Type: OnMessage, Source: msg.From, Data: msg} } func TurnEvent(agentID string, r Result) Event { return Event{Type: OnTurn, Source: agentID, Data: r} } -func PreInferEvent(agentID string) Event { return Event{Type: PreInfer, Source: agentID} } +func PreInferEvent(agentID string, ctx InferenceContext) Event { + return Event{Type: PreInfer, Source: agentID, Data: ctx} +} func PostInferEvent(agentID string, r *InferResponse) Event { return Event{Type: PostInfer, Source: agentID, Data: r} } @@ -223,3 +288,11 @@ func PostToolEvent(tr ToolResult) Event { return Event{Type: PostTool, Source: t func CompactEvent(agentID string, info CompactInfo) Event { return Event{Type: OnCompact, Source: agentID, Data: info} } + +func SystemChangeEvent(agentID string, c SystemChange) Event { + return Event{Type: OnSystemChange, Source: agentID, Data: c} +} + +func ToolsChangeEvent(agentID string, c ToolsChange) Event { + return Event{Type: OnToolsChange, Source: agentID, Data: c} +} diff --git a/internal/core/agent_impl.go b/internal/core/agent_impl.go index 725d594..fc4f212 100644 --- a/internal/core/agent_impl.go +++ b/internal/core/agent_impl.go @@ -210,8 +210,6 @@ func (a *agent) ThinkAct(ctx context.Context) (*Result, error) { } } - a.emit(ctx, PreInferEvent(a.id)) - resp, err := a.streamInfer(ctx) if err != nil { // Reactive compaction: if prompt too long, compact and retry @@ -424,11 +422,24 @@ func ToolCallIDFromContext(ctx context.Context) string { // --- internals --- // streamInfer calls the LLM, streams chunks to outbox, returns the final response. 
+// +// Emits PreInferEvent (with input digests) before the LLM call so observers +// can record exactly what was sent without copying the bytes on every turn. func (a *agent) streamInfer(ctx context.Context) (*InferResponse, error) { + sys := a.system.Prompt() + msgs := a.snapshot() + tools := a.tools.Schemas() + + a.emit(ctx, PreInferEvent(a.id, InferenceContext{ + SystemDigest: sha256Hex([]byte(sys)), + ToolsDigest: toolsDigest(tools), + MessageIDs: messageIDs(msgs), + })) + chunks, err := a.llm.Infer(ctx, InferRequest{ - System: a.system.Prompt(), - Messages: a.snapshot(), - Tools: a.tools.Schemas(), + System: sys, + Messages: msgs, + Tools: tools, }) if err != nil { return nil, fmt.Errorf("infer: %w", err) @@ -475,6 +486,23 @@ func (a *agent) emit(ctx context.Context, event Event) { } } +// emitTelemetry delivers a fire-and-forget event: synchronously to onEvent, +// non-blocking to the outbox (dropped if full). Used for events whose +// consumers tolerate misses (system changes, hot-path tracing) and which can +// fire from goroutines without a useful ctx (e.g. system observer callbacks). +func (a *agent) emitTelemetry(event Event) { + if a.onEvent != nil { + a.onEvent(event) + } + if a.outbox == nil || a.closed.Load() { + return + } + select { + case a.outbox <- event: + default: + } +} + // emitFinal sends a critical event that must be delivered even on ctx cancellation. // Used for StopEvent — consumers rely on it for cleanup/session saving. // No-op when outbox is nil. Blocks up to 5 seconds; logs a warning if delivery fails. diff --git a/internal/core/digest.go b/internal/core/digest.go new file mode 100644 index 0000000..1312b27 --- /dev/null +++ b/internal/core/digest.go @@ -0,0 +1,51 @@ +package core + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "sort" +) + +// sha256Hex returns "sha256:" + lowercase hex of the SHA-256 sum of b. +// The prefix makes algorithm choice explicit on the wire. 
+func sha256Hex(b []byte) string { + sum := sha256.Sum256(b) + return "sha256:" + hex.EncodeToString(sum[:]) +} + +// toolsDigest canonicalizes a tool schema list (sort by Name, marshal each) +// and returns its sha256. Stable across runs as long as schemas are stable. +func toolsDigest(schemas []ToolSchema) string { + if len(schemas) == 0 { + return sha256Hex(nil) + } + sorted := make([]ToolSchema, len(schemas)) + copy(sorted, schemas) + sort.Slice(sorted, func(i, j int) bool { return sorted[i].Name < sorted[j].Name }) + + b, err := json.Marshal(sorted) + if err != nil { + // Marshal can only fail on unsupported types; ToolSchema fields are + // JSON-safe. Fall back to a digest of the names so the value is still + // stable rather than empty. + names := make([]string, len(sorted)) + for i, s := range sorted { + names[i] = s.Name + } + b, _ = json.Marshal(names) + } + return sha256Hex(b) +} + +// messageIDs extracts non-empty message IDs from the conversation snapshot, +// in send order. Empty IDs (legacy data) are skipped rather than padded. +func messageIDs(msgs []Message) []string { + out := make([]string, 0, len(msgs)) + for _, m := range msgs { + if m.ID != "" { + out = append(out, m.ID) + } + } + return out +} diff --git a/internal/core/system.go b/internal/core/system.go index 8267f7a..271e6ff 100644 --- a/internal/core/system.go +++ b/internal/core/system.go @@ -19,13 +19,26 @@ type System interface { // invalidated only when sections change. Prompt() string - // Use registers or replaces a section by Name. - Use(Section) + // Use registers or replaces a section by Name. Caller is a short tag + // describing what triggered the mutation (e.g. "system:init", + // "command:/identity") and surfaces in the trace. + Use(sec Section, caller string) // Drop removes a section by Name. No-op if absent. - Drop(name string) + Drop(name, caller string) // Refresh marks one section's rendered output stale. 
Use after the // section's underlying state changed but the Section value did not. - Refresh(name string) + Refresh(name, caller string) + + // Sections returns a snapshot of currently registered sections in + // render order (slot ascending, insertion order ascending). Used by + // observers that attach after construction to replay the existing state. + Sections() []Section + + // SetObserver installs a callback invoked synchronously on every + // subsequent mutation. Attaching also replays existing sections as + // "added" events so the observer sees a complete history starting + // from the moment of attachment. + SetObserver(fn func(SystemChange)) } diff --git a/internal/core/system/builder_test.go b/internal/core/system/builder_test.go index 2121653..35e4a05 100644 --- a/internal/core/system/builder_test.go +++ b/internal/core/system/builder_test.go @@ -188,13 +188,13 @@ func TestSystemUseDropRefresh(t *testing.T) { sys.Use(core.Section{ Slot: core.SlotEnvironment, Name: "test-section", Source: core.Dynamic, Render: func() string { return "TEST_SECTION_BODY" }, - }) + }, "test") if !strings.Contains(sys.Prompt(), "TEST_SECTION_BODY") { t.Error("Use should add a new section's content to Prompt()") } // Drop: remove it. - sys.Drop("test-section") + sys.Drop("test-section", "test") if strings.Contains(sys.Prompt(), "TEST_SECTION_BODY") { t.Error("Drop should remove the section from Prompt()") } diff --git a/internal/core/system/catalog.go b/internal/core/system/catalog.go index 405b430..22741e9 100644 --- a/internal/core/system/catalog.go +++ b/internal/core/system/catalog.go @@ -57,13 +57,14 @@ func loadEmbedOptional(path string) string { // applyDefaults registers the always-on sections for a Scope. // Options passed to Build can override identity by Name. 
func applyDefaults(sys core.System, scope core.Scope) { - sys.Use(defaultIdentity()) - sys.Use(policy()) - sys.Use(guidelines("tools", cachedTools)) + const caller = "system:init" + sys.Use(defaultIdentity(), caller) + sys.Use(policy(), caller) + sys.Use(guidelines("tools", cachedTools), caller) if scope == core.ScopeMain { // Task tracking + interactive questions are main-agent behaviors. - sys.Use(guidelines("tasks", cachedTasks)) - sys.Use(guidelines("questions", cachedQuestions)) + sys.Use(guidelines("tasks", cachedTasks), caller) + sys.Use(guidelines("questions", cachedQuestions), caller) } } @@ -139,7 +140,7 @@ func WithIdentity(text string) Option { if text == "" { return } - sys.Use(identitySection(text)) + sys.Use(identitySection(text), "system:init") } } @@ -148,10 +149,10 @@ func WithIdentity(text string) Option { func SwapIdentity(sys core.System, text string) { text = strings.TrimSpace(text) if text == "" { - sys.Use(defaultIdentity()) + sys.Use(defaultIdentity(), "command:identity") return } - sys.Use(identitySection(text)) + sys.Use(identitySection(text), "command:identity") } // identitySection builds the slot-0 identity Section for a user-defined persona. 
@@ -178,7 +179,7 @@ func WithProvider(name string) Option { Render: func() string { return wrap("provider", map[string]string{"name": name}, body) }, - }) + }, "system:init") } } @@ -188,7 +189,7 @@ func WithGitGuidelines(isGit bool) Option { if !isGit { return } - sys.Use(guidelines("git", cachedGit)) + sys.Use(guidelines("git", cachedGit), "system:init") } } @@ -216,7 +217,7 @@ func WithSubagentIdentity(b SubagentBrief) Option { sys.Use(core.Section{ Slot: core.SlotIdentity, Name: "identity", Source: core.Injected, Render: func() string { return renderSubagentIdentity(b) }, - }) + }, "subagent:init") } } @@ -278,7 +279,7 @@ func WithEnvironment(env Environment) Option { sys.Use(core.Section{ Slot: core.SlotEnvironment, Name: "environment", Source: core.Dynamic, Render: func() string { return renderEnvironment(env) }, - }) + }, "system:init") } } diff --git a/internal/core/system_impl.go b/internal/core/system_impl.go index 4cda4d6..b41cb27 100644 --- a/internal/core/system_impl.go +++ b/internal/core/system_impl.go @@ -21,6 +21,11 @@ type system struct { counter int // monotonic insertion sequence cached string dirty bool + + // observer is invoked synchronously on every Use/Drop after the mutation + // applies. Set via SetObserver; nil until then. Reads happen on the same + // goroutine that mutated, so no extra locking around the call. + observer func(SystemChange) } type sectionEntry struct { @@ -57,9 +62,8 @@ func (s *system) Prompt() string { return s.cached } -func (s *system) Use(sec Section) { +func (s *system) Use(sec Section, caller string) { s.mu.Lock() - defer s.mu.Unlock() if e, ok := s.sections[sec.Name]; ok { // Replacing an existing section: preserve its insertion order so // position in the prompt does not jump around on hot updates. 
@@ -70,24 +74,102 @@ s.sections[sec.Name] = &sectionEntry{def: sec, inserted: s.counter} } s.dirty = true + obs := s.observer + s.mu.Unlock() + + if obs != nil { + obs(SystemChange{ + Name: sec.Name, + Slot: int(sec.Slot), + Content: renderSection(sec), + Caller: caller, + }) + } } -func (s *system) Drop(name string) { +func (s *system) Drop(name, caller string) { s.mu.Lock() - defer s.mu.Unlock() - if _, ok := s.sections[name]; ok { + _, existed := s.sections[name] + if existed { delete(s.sections, name) s.dirty = true } + obs := s.observer + s.mu.Unlock() + + if existed && obs != nil { + obs(SystemChange{Name: name, Removed: true, Caller: caller}) + } } -func (s *system) Refresh(name string) { +func (s *system) Refresh(name, caller string) { s.mu.Lock() - defer s.mu.Unlock() + var ( + obs func(SystemChange) + changed bool + sec Section + ) if e, ok := s.sections[name]; ok { e.fresh = false s.dirty = true + changed = true + sec = e.def + obs = s.observer + } + s.mu.Unlock() + + if changed && obs != nil { + obs(SystemChange{ + Name: sec.Name, + Slot: int(sec.Slot), + Content: renderSection(sec), + Caller: caller, + }) + } +} + +// Sections returns a snapshot of currently registered sections in render order. +func (s *system) Sections() []Section { + s.mu.RLock() + defer s.mu.RUnlock() + entries := s.sortedEntries() + out := make([]Section, 0, len(entries)) + for _, e := range entries { + out = append(out, e.def) + } + return out +} + +// SetObserver attaches an observer for section mutations. On attach, the +// existing sections are replayed as synthetic "added" events so the observer +// sees a complete history starting from this moment.
+func (s *system) SetObserver(fn func(SystemChange)) { + s.mu.Lock() + s.observer = fn + entries := s.sortedEntries() + s.mu.Unlock() + + if fn == nil { + return + } + for _, e := range entries { + fn(SystemChange{ + Name: e.def.Name, + Slot: int(e.def.Slot), + Content: renderSection(e.def), + Caller: "system:init", + }) + } +} + +// renderSection invokes a Section's Render func once, returning "" on nil. +// Used by observers that need the rendered content without going through the +// cached-Prompt path (which joins all sections). +func renderSection(sec Section) string { + if sec.Render == nil { + return "" } + return sec.Render() } // sortedEntries returns entries in render order (Slot, insertion order). diff --git a/internal/core/tool.go b/internal/core/tool.go index f353e2d..eb6cacc 100644 --- a/internal/core/tool.go +++ b/internal/core/tool.go @@ -34,7 +34,15 @@ type ToolSchema struct { type Tools interface { Get(name string) Tool All() []Tool - Add(tool Tool) - Remove(name string) + // Add registers (or replaces) a tool. Caller tags the mutation source + // (e.g. "mcp:weather", "agent:init") for trace records. + Add(tool Tool, caller string) + // Remove unregisters a tool by name. No-op if absent. + Remove(name, caller string) Schemas() []ToolSchema + + // SetObserver installs a callback invoked synchronously on every + // subsequent Add/Remove. Attaching also replays existing tools as + // synthetic Add events so the observer sees the full registry from t0. + SetObserver(fn func(ToolsChange)) } diff --git a/internal/core/tool_impl.go b/internal/core/tool_impl.go index 4080c44..589b863 100644 --- a/internal/core/tool_impl.go +++ b/internal/core/tool_impl.go @@ -13,6 +13,8 @@ type toolSet struct { tools map[string]Tool dirty bool // true when schemas cache needs rebuild cache []ToolSchema // cached schemas + + observer func(ToolsChange) // invoked after Add/Remove; nil until SetObserver } // NewTools creates an empty tool set. 
@@ -44,20 +46,52 @@ func (s *toolSet) All() []Tool { return out } -func (s *toolSet) Add(tool Tool) { +func (s *toolSet) Add(tool Tool, caller string) { s.mu.Lock() - defer s.mu.Unlock() s.tools[tool.Name()] = tool s.dirty = true + obs := s.observer + s.mu.Unlock() + + if obs != nil { + obs(ToolsChange{Schema: tool.Schema(), Caller: caller}) + } } -func (s *toolSet) Remove(name string) { +func (s *toolSet) Remove(name, caller string) { s.mu.Lock() - defer s.mu.Unlock() - if _, ok := s.tools[name]; ok { + _, existed := s.tools[name] + if existed { delete(s.tools, name) s.dirty = true } + obs := s.observer + s.mu.Unlock() + + if existed && obs != nil { + obs(ToolsChange{Name: name, Removed: true, Caller: caller}) + } +} + +// SetObserver attaches a callback invoked on every Add/Remove. On attach, +// existing tools are replayed as Add events with caller="tools:init". +func (s *toolSet) SetObserver(fn func(ToolsChange)) { + s.mu.Lock() + s.observer = fn + snapshot := make([]Tool, 0, len(s.tools)) + for _, t := range s.tools { + snapshot = append(snapshot, t) + } + s.mu.Unlock() + + if fn == nil { + return + } + // Stable order so replay is reproducible across processes. + sort.Slice(snapshot, func(i, j int) bool { return snapshot[i].Name() < snapshot[j].Name() }) + for _, t := range snapshot { + fn(ToolsChange{Schema: t.Schema(), Caller: "tools:init"}) + } } func (s *toolSet) Schemas() []ToolSchema { diff --git a/internal/session/message_convert.go b/internal/session/message_convert.go index d862ad8..2b3dfe1 100644 --- a/internal/session/message_convert.go +++ b/internal/session/message_convert.go @@ -13,6 +13,47 @@ import ( var inlineImageTokenPattern = regexp.MustCompile(`\[Image #(\d+)\]`) +// systemReminderRe matches a complete <system-reminder>...</system-reminder> +// block including tags. Reminders are appended verbatim by reminder.AttachToContent +// and hook UserPromptSubmit additionalContext flows through the same path, so +// recognizing the wrapper suffices to mark harness-injected content.
+var systemReminderRe = regexp.MustCompile(`(?s)<system-reminder>.*?</system-reminder>`) + +const SourceReminder = "reminder" + +// splitTextByProvenance returns ContentBlocks that together preserve the input +// byte-for-byte, marking <system-reminder> blocks with Source="reminder" so +// traces can distinguish user-typed text from harness-injected reminders / +// hook additionalContext. Empty input returns nil. +// +// The function never trims whitespace — concatenating all returned Text fields +// in order reproduces the original input. This keeps the read path +// (extractUserContent, which joins text blocks) round-trip safe. +func splitTextByProvenance(text string) []ContentBlock { + if text == "" { + return nil + } + matches := systemReminderRe.FindAllStringIndex(text, -1) + if len(matches) == 0 { + return []ContentBlock{{Type: "text", Text: text}} + } + + blocks := make([]ContentBlock, 0, 2*len(matches)+1) + cursor := 0 + for _, m := range matches { + start, end := m[0], m[1] + if start > cursor { + blocks = append(blocks, ContentBlock{Type: "text", Text: text[cursor:start]}) + } + blocks = append(blocks, ContentBlock{Type: "text", Text: text[start:end], Source: SourceReminder}) + cursor = end + } + if cursor < len(text) { + blocks = append(blocks, ContentBlock{Type: "text", Text: text[cursor:]}) + } + return blocks +} + func messagesToEntries(msgs []core.Message) []Entry { entries := make([]Entry, 0, len(msgs)) var prevUUID string @@ -106,9 +147,7 @@ func userContentToBlocks(content, displayContent string, images []core.Image) [] ImageSource: &ImageSource{Type: "base64", MediaType: img.MediaType, Data: img.Data}, }) } - if content != "" { - blocks = append(blocks, ContentBlock{Type: "text", Text: content}) - } + blocks = append(blocks, splitTextByProvenance(content)...)
return blocks } @@ -123,9 +162,8 @@ func interleavedUserContentToBlocks(content, displayContent string, images []cor start, end := match[0], match[1] idStart, idEnd := match[2], match[3] - textPart := displayContent[last:start] - if textPart != "" { - blocks = append(blocks, ContentBlock{Type: "text", Text: textPart}) + if textPart := displayContent[last:start]; textPart != "" { + blocks = append(blocks, splitTextByProvenance(textPart)...) } id, err := strconv.Atoi(displayContent[idStart:idEnd]) @@ -143,11 +181,11 @@ func interleavedUserContentToBlocks(content, displayContent string, images []cor } if tail := displayContent[last:]; tail != "" { - blocks = append(blocks, ContentBlock{Type: "text", Text: tail}) + blocks = append(blocks, splitTextByProvenance(tail)...) } if len(blocks) == 0 && content != "" { - blocks = append(blocks, ContentBlock{Type: "text", Text: content}) + blocks = append(blocks, splitTextByProvenance(content)...) } return blocks diff --git a/internal/session/message_convert_test.go b/internal/session/message_convert_test.go index 5d23a61..7716d71 100644 --- a/internal/session/message_convert_test.go +++ b/internal/session/message_convert_test.go @@ -1,11 +1,61 @@ package session import ( + "strings" "testing" "github.com/genai-io/gen-code/internal/core" ) +// A user message that contains harness-injected <system-reminder> blocks must +// be persisted as multiple ContentBlocks: user-typed text with empty Source, +// reminder text with Source="reminder". Concatenating the text fields must +// reproduce the input byte-for-byte (round-trip safety for read path).
+func Test_userContentToBlocks_splitsByProvenance(t *testing.T) { + const reminder1 = "<system-reminder>\nskills directory\n</system-reminder>" + const reminder2 = "<system-reminder>\nuser memory\n</system-reminder>" + input := "hello\n\n" + reminder1 + "\n\n" + reminder2 + + blocks := userContentToBlocks(input, "", nil) + + var sb strings.Builder + var reminderCount, userCount int + for _, b := range blocks { + if b.Type != "text" { + t.Fatalf("unexpected block type: %q", b.Type) + } + sb.WriteString(b.Text) + switch b.Source { + case SourceReminder: + reminderCount++ + if !strings.HasPrefix(b.Text, "<system-reminder>") { + t.Errorf("reminder block missing wrapper: %q", b.Text) + } + case "": + userCount++ + default: + t.Fatalf("unexpected Source: %q", b.Source) + } + } + if sb.String() != input { + t.Fatalf("round-trip mismatch:\n got: %q\nwant: %q", sb.String(), input) + } + if reminderCount != 2 { + t.Fatalf("reminder block count = %d, want 2", reminderCount) + } + if userCount == 0 { + t.Fatalf("expected at least one user block, got %d", userCount) + } +} + +// Plain user content with no reminders produces exactly one user-text block. +func Test_userContentToBlocks_plainTextOneBlock(t *testing.T) { + blocks := userContentToBlocks("just a question", "", nil) + if len(blocks) != 1 || blocks[0].Type != "text" || blocks[0].Source != "" { + t.Fatalf("expected 1 user-text block, got %+v", blocks) + } +} + func Test_messagesToEntries_roundtrip(t *testing.T) { // Test that messagesToEntries -> EntriesToMessages roundtrips correctly.
msgs := []core.Message{ diff --git a/internal/session/recorder.go b/internal/session/recorder.go new file mode 100644 index 0000000..b9ace74 --- /dev/null +++ b/internal/session/recorder.go @@ -0,0 +1,258 @@ +package session + +import ( + "context" + "encoding/json" + "sync" + "sync/atomic" + "time" + + "go.uber.org/zap" + + "github.com/genai-io/gen-code/internal/core" + "github.com/genai-io/gen-code/internal/log" + "github.com/genai-io/gen-code/internal/session/transcript" +) + +// Recorder turns core.Agent lifecycle events into transcript records. +// +// One Recorder is bound to one (sessionID, agentID) pair. The OnAgentEvent +// method is meant to be passed as core.Config.OnEvent — it's called +// synchronously from the agent goroutine, so handlers stay fast. +// +// The recorder is additive: existing code paths (Store.Save) keep persisting +// messages and state. The recorder writes only event-driven records +// (inference.requested / inference.responded) that the snapshot path can't +// observe. +type Recorder struct { + fs *transcript.FileStore + sessionID string + agentID string + provider string + model string + maxTokens int + + turn atomic.Int64 + + mu sync.Mutex + lastRequest *requestState +} + +type requestState struct { + turn int + startedAt time.Time + messageIDs []string +} + +// RecorderOptions configures a Recorder. provider/model/maxTokens are captured +// at construction time and stamped on every inference record. Mid-session +// model swaps will be handled by a future model.changed event, not by mutating +// the recorder. +// +// Cwd and ProjectID are needed at construction so the recorder can write +// session.started before any telemetry — see NewRecorder for why. 
+type RecorderOptions struct { + FileStore *transcript.FileStore + SessionID string + AgentID string + Provider string + Model string + MaxTokens int + Cwd string + ProjectID string +} + +// NewRecorder constructs a Recorder and ensures session.started lands on +// disk BEFORE the recorder can be invoked. This matters because the caller +// (core.NewAgent → System/Tools.SetObserver) immediately replays existing +// sections/tools as synthetic events; those events create the transcript +// file via AppendSystemSection / AppendTools. If session.started hasn't +// been written first, the subsequent Store.Save → Start call short-circuits +// on fileExists and provider/model/parent metadata is lost. +// +// Start is idempotent — a no-op on existing files — so callers (Store.Save) +// that also invoke it stay correct. +func NewRecorder(opts RecorderOptions) *Recorder { + if opts.FileStore != nil && opts.SessionID != "" { + _ = opts.FileStore.Start(context.Background(), transcript.StartCommand{ + SessionID: opts.SessionID, + ProjectID: opts.ProjectID, + Cwd: opts.Cwd, + Provider: opts.Provider, + Model: opts.Model, + Time: time.Now(), + }) + } + return &Recorder{ + fs: opts.FileStore, + sessionID: opts.SessionID, + agentID: opts.AgentID, + provider: opts.Provider, + model: opts.Model, + maxTokens: opts.MaxTokens, + } +} + +// OnAgentEvent is the core.Config.OnEvent callback. It dispatches by event +// type and writes the corresponding transcript record. Errors are logged +// rather than propagated — failing to record telemetry must not break the +// running session. 
+func (r *Recorder) OnAgentEvent(ev core.Event) { + if r == nil || r.fs == nil || r.sessionID == "" { + return + } + switch ev.Type { + case core.PreInfer: + r.onPreInfer(ev) + case core.PostInfer: + r.onPostInfer(ev) + case core.OnSystemChange: + r.onSystemChange(ev) + case core.OnToolsChange: + r.onToolsChange(ev) + } +} + +func (r *Recorder) onToolsChange(ev core.Event) { + c, ok := ev.ToolsChange() + if !ok { + return + } + typ := transcript.ToolsAdded + payload := transcript.ToolsRecord{Caller: c.Caller} + if c.Removed { + typ = transcript.ToolsRemoved + payload.Name = c.Name + } else { + payload.Schema = toolSchemaView(c.Schema) + } + err := r.fs.AppendTools(context.Background(), transcript.AppendToolsCommand{ + SessionID: r.sessionID, + AgentID: r.agentID, + Time: time.Now(), + Type: typ, + Record: payload, + }) + if err != nil { + log.Logger().Warn("recorder: append tools change failed", zap.Error(err)) + } +} + +func toolSchemaView(s core.ToolSchema) *transcript.ToolSchemaView { + view := &transcript.ToolSchemaView{ + Name: s.Name, + Description: s.Description, + } + if s.Parameters != nil { + if data, err := json.Marshal(s.Parameters); err == nil { + view.Parameters = data + } + } + return view +} + +func (r *Recorder) onSystemChange(ev core.Event) { + c, ok := ev.SystemChange() + if !ok { + return + } + typ := transcript.SystemSectionAdded + if c.Removed { + typ = transcript.SystemSectionRemoved + } + err := r.fs.AppendSystemSection(context.Background(), transcript.AppendSystemSectionCommand{ + SessionID: r.sessionID, + AgentID: r.agentID, + Time: time.Now(), + Type: typ, + Record: transcript.SystemSectionRecord{ + Name: c.Name, + Slot: c.Slot, + Content: c.Content, + Caller: c.Caller, + }, + }) + if err != nil { + log.Logger().Warn("recorder: append system section failed", zap.Error(err)) + } +} + +func (r *Recorder) onPreInfer(ev core.Event) { + ic, ok := ev.InferenceContext() + if !ok { + return + } + + turn := int(r.turn.Add(1)) + now := time.Now() + 
+ r.mu.Lock() + r.lastRequest = &requestState{ + turn: turn, + startedAt: now, + messageIDs: ic.MessageIDs, + } + r.mu.Unlock() + + err := r.fs.AppendInference(context.Background(), transcript.AppendInferenceCommand{ + SessionID: r.sessionID, + AgentID: r.agentID, + Time: now, + Type: transcript.InferenceRequested, + Record: transcript.InferenceRecord{ + Turn: turn, + Provider: r.provider, + Model: r.model, + MaxTokens: r.maxTokens, + SystemDigest: ic.SystemDigest, + ToolsDigest: ic.ToolsDigest, + MessageIDs: ic.MessageIDs, + }, + }) + if err != nil { + log.Logger().Warn("recorder: append inference.requested failed", zap.Error(err)) + } +} + +func (r *Recorder) onPostInfer(ev core.Event) { + resp, ok := ev.Response() + if !ok || resp == nil { + return + } + + r.mu.Lock() + prev := r.lastRequest + r.lastRequest = nil + r.mu.Unlock() + + now := time.Now() + var turn int + var latencyMs int64 + if prev != nil { + turn = prev.turn + latencyMs = now.Sub(prev.startedAt).Milliseconds() + } + + err := r.fs.AppendInference(context.Background(), transcript.AppendInferenceCommand{ + SessionID: r.sessionID, + AgentID: r.agentID, + Time: now, + Type: transcript.InferenceResponded, + Record: transcript.InferenceRecord{ + Turn: turn, + Provider: r.provider, + Model: r.model, + StopReason: string(resp.StopReason), + LatencyMs: latencyMs, + Usage: &transcript.InferenceUsage{ + InputTokens: resp.TokensIn, + OutputTokens: resp.TokensOut, + CacheCreateTokens: resp.CacheCreateTokens, + CacheReadTokens: resp.CacheReadTokens, + }, + }, + }) + if err != nil { + log.Logger().Warn("recorder: append inference.responded failed", zap.Error(err)) + } +} diff --git a/internal/session/recorder_lifecycle_test.go b/internal/session/recorder_lifecycle_test.go new file mode 100644 index 0000000..adc29d7 --- /dev/null +++ b/internal/session/recorder_lifecycle_test.go @@ -0,0 +1,72 @@ +package session + +import ( + "encoding/json" + "os" + "path/filepath" + "strings" + "testing" + + 
"github.com/genai-io/gen-code/internal/core" + "github.com/genai-io/gen-code/internal/session/transcript" +) + +// Regression: NewRecorder must write session.started before returning, so +// that subsequent telemetry writes from SetObserver replay don't beat it to +// the disk. If session.started were written lazily (or only by Store.Save), +// the transcript file would already exist when Save calls Start — Start +// would short-circuit and provider/model metadata would be lost. +func TestRecorder_WritesSessionStartedBeforeTelemetry(t *testing.T) { + dir := t.TempDir() + fs, err := transcript.NewFileStore(dir, "proj-1") + if err != nil { + t.Fatalf("NewFileStore: %v", err) + } + // Construct the recorder — should write session.started synchronously. + rec := NewRecorder(RecorderOptions{ + FileStore: fs, + SessionID: "sess", + AgentID: "main", + Provider: "anthropic", + Model: "claude-x", + MaxTokens: 4096, + Cwd: "/tmp/project", + ProjectID: "proj-1", + }) + + // Fire a SystemChange right after — mirroring what SetObserver replay + // does inside core.NewAgent. 
+ rec.OnAgentEvent(core.Event{Type: core.OnSystemChange, Data: core.SystemChange{ + Name: "identity", Slot: 0, Content: "You are X", Caller: "system:init", + }}) + + data, err := os.ReadFile(filepath.Join(dir, "transcripts", "sess.jsonl")) + if err != nil { + t.Fatalf("ReadFile: %v", err) + } + var firstType, foundProvider, foundModel string + for i, line := range strings.Split(string(data), "\n") { + if line == "" { + continue + } + var raw map[string]any + if err := json.Unmarshal([]byte(line), &raw); err != nil { + t.Fatalf("line %d not JSON: %v", i, err) + } + if i == 0 { + firstType, _ = raw["type"].(string) + } + if t, _ := raw["type"].(string); t == "session.started" { + if sess, ok := raw["session"].(map[string]any); ok { + foundProvider, _ = sess["provider"].(string) + foundModel, _ = sess["model"].(string) + } + } + } + if firstType != "session.started" { + t.Fatalf("first record type = %q, want session.started", firstType) + } + if foundProvider != "anthropic" || foundModel != "claude-x" { + t.Fatalf("session.started provider=%q model=%q, want anthropic/claude-x", foundProvider, foundModel) + } +} diff --git a/internal/session/recorder_test.go b/internal/session/recorder_test.go new file mode 100644 index 0000000..2ef6ef6 --- /dev/null +++ b/internal/session/recorder_test.go @@ -0,0 +1,250 @@ +package session + +import ( + "bufio" + "context" + "encoding/json" + "os" + "strings" + "testing" + "time" + + "github.com/genai-io/gen-code/internal/core" + "github.com/genai-io/gen-code/internal/session/transcript" +) + +// One turn through PreInfer + PostInfer must produce one inference.requested +// and one inference.responded record, with the request's MessageIDs/digests +// preserved and the response's usage/stop_reason populated. The turn counter +// links the pair. 
+func TestRecorderWritesRequestedAndRespondedPerTurn(t *testing.T) { + dir := t.TempDir() + fs, err := transcript.NewFileStore(dir, "proj-1") + if err != nil { + t.Fatalf("NewFileStore: %v", err) + } + if err := fs.Start(context.Background(), transcript.StartCommand{ + SessionID: "sess-1", Cwd: "/tmp", Provider: "anthropic", Model: "claude-x", Time: time.Now(), + }); err != nil { + t.Fatalf("Start: %v", err) + } + + rec := NewRecorder(RecorderOptions{ + FileStore: fs, SessionID: "sess-1", AgentID: "main", + Provider: "anthropic", Model: "claude-x", MaxTokens: 4096, + }) + + rec.OnAgentEvent(core.Event{Type: core.PreInfer, Source: "main", Data: core.InferenceContext{ + SystemDigest: "sha256:sys", ToolsDigest: "sha256:tools", MessageIDs: []string{"m1", "m2"}, + }}) + rec.OnAgentEvent(core.Event{Type: core.PostInfer, Source: "main", Data: &core.InferResponse{ + StopReason: core.StopEndTurn, TokensIn: 42, TokensOut: 8, CacheReadTokens: 10, + }}) + + tx, err := fs.Load(context.Background(), "sess-1") + if err != nil { + t.Fatalf("Load: %v", err) + } + // Project() flattens to messages; inspect raw records via a re-load instead. + // Easiest path: read the file directly via Load and look at the messages — + // but inference records aren't messages. Use the file to read raw records. + _ = tx + + // Read raw records from disk to assert on inference records. 
+ records := readAllRecords(t, dir, "sess-1") + + var reqs, resps []transcript.Record + for _, r := range records { + switch r.Type { + case transcript.InferenceRequested: + reqs = append(reqs, r) + case transcript.InferenceResponded: + resps = append(resps, r) + } + } + if len(reqs) != 1 { + t.Fatalf("inference.requested count = %d, want 1", len(reqs)) + } + if len(resps) != 1 { + t.Fatalf("inference.responded count = %d, want 1", len(resps)) + } + + req := reqs[0].Inference + if req == nil { + t.Fatal("request record missing Inference payload") + } + if req.Turn != 1 { + t.Fatalf("requested.Turn = %d, want 1", req.Turn) + } + if req.SystemDigest != "sha256:sys" || req.ToolsDigest != "sha256:tools" { + t.Fatalf("requested digests = %+v", req) + } + if len(req.MessageIDs) != 2 || req.MessageIDs[0] != "m1" { + t.Fatalf("requested.MessageIDs = %+v", req.MessageIDs) + } + + resp := resps[0].Inference + if resp == nil { + t.Fatal("response record missing Inference payload") + } + if resp.Turn != 1 { + t.Fatalf("responded.Turn = %d, want 1 (must match request)", resp.Turn) + } + if resp.StopReason != string(core.StopEndTurn) { + t.Fatalf("responded.StopReason = %q", resp.StopReason) + } + if resp.Usage == nil || resp.Usage.InputTokens != 42 || resp.Usage.OutputTokens != 8 || resp.Usage.CacheReadTokens != 10 { + t.Fatalf("responded.Usage = %+v", resp.Usage) + } +} + +// System mutations must produce one system.section.added per added/replaced +// section and one system.section.removed per dropped section. Caller info is +// preserved verbatim so trace consumers can answer "who changed this?". 
+func TestRecorderWritesSystemSectionEvents(t *testing.T) { + dir := t.TempDir() + fs, err := transcript.NewFileStore(dir, "proj-1") + if err != nil { + t.Fatalf("NewFileStore: %v", err) + } + if err := fs.Start(context.Background(), transcript.StartCommand{ + SessionID: "sess-sys", Cwd: "/tmp", Provider: "p", Model: "m", Time: time.Now(), + }); err != nil { + t.Fatalf("Start: %v", err) + } + + rec := NewRecorder(RecorderOptions{ + FileStore: fs, SessionID: "sess-sys", AgentID: "main", + Provider: "p", Model: "m", MaxTokens: 1, + }) + + rec.OnAgentEvent(core.Event{Type: core.OnSystemChange, Data: core.SystemChange{ + Name: "identity", Slot: 0, Content: "You are X", Caller: "system:init", + }}) + rec.OnAgentEvent(core.Event{Type: core.OnSystemChange, Data: core.SystemChange{ + Name: "identity", Slot: 0, Content: "You are Y", Caller: "command:/identity", + }}) + rec.OnAgentEvent(core.Event{Type: core.OnSystemChange, Data: core.SystemChange{ + Name: "policy", Removed: true, Caller: "test:teardown", + }}) + + records := readAllRecords(t, dir, "sess-sys") + var added, removed []transcript.Record + for _, r := range records { + switch r.Type { + case transcript.SystemSectionAdded: + added = append(added, r) + case transcript.SystemSectionRemoved: + removed = append(removed, r) + } + } + if len(added) != 2 { + t.Fatalf("system.section.added count = %d, want 2", len(added)) + } + if len(removed) != 1 { + t.Fatalf("system.section.removed count = %d, want 1", len(removed)) + } + if added[1].System.Caller != "command:/identity" || added[1].System.Content != "You are Y" { + t.Fatalf("replaced section payload = %+v", added[1].System) + } + if removed[0].System.Name != "policy" || removed[0].System.Caller != "test:teardown" { + t.Fatalf("removed section payload = %+v", removed[0].System) + } +} + +// Tool registry changes must produce one tools.added per Add and one +// tools.removed per Remove. Schema/Name and Caller are preserved. 
+func TestRecorderWritesToolsChangeEvents(t *testing.T) { + dir := t.TempDir() + fs, err := transcript.NewFileStore(dir, "proj-1") + if err != nil { + t.Fatalf("NewFileStore: %v", err) + } + if err := fs.Start(context.Background(), transcript.StartCommand{ + SessionID: "sess-tools", Cwd: "/tmp", Provider: "p", Model: "m", Time: time.Now(), + }); err != nil { + t.Fatalf("Start: %v", err) + } + + rec := NewRecorder(RecorderOptions{ + FileStore: fs, SessionID: "sess-tools", AgentID: "main", + Provider: "p", Model: "m", MaxTokens: 1, + }) + + rec.OnAgentEvent(core.Event{Type: core.OnToolsChange, Data: core.ToolsChange{ + Schema: core.ToolSchema{Name: "Read", Description: "read a file"}, + Caller: "tools:init", + }}) + rec.OnAgentEvent(core.Event{Type: core.OnToolsChange, Data: core.ToolsChange{ + Schema: core.ToolSchema{Name: "Bash", Description: "run shell"}, + Caller: "mcp:Bash", + }}) + rec.OnAgentEvent(core.Event{Type: core.OnToolsChange, Data: core.ToolsChange{ + Name: "Bash", Removed: true, Caller: "mode:plan", + }}) + + records := readAllRecords(t, dir, "sess-tools") + var added, removed []transcript.Record + for _, r := range records { + switch r.Type { + case transcript.ToolsAdded: + added = append(added, r) + case transcript.ToolsRemoved: + removed = append(removed, r) + } + } + if len(added) != 2 { + t.Fatalf("tools.added count = %d, want 2", len(added)) + } + if len(removed) != 1 { + t.Fatalf("tools.removed count = %d, want 1", len(removed)) + } + if added[0].Tools.Schema == nil || added[0].Tools.Schema.Name != "Read" || added[0].Tools.Caller != "tools:init" { + t.Fatalf("first added record = %+v", added[0].Tools) + } + if removed[0].Tools.Name != "Bash" || removed[0].Tools.Caller != "mode:plan" { + t.Fatalf("removed record = %+v", removed[0].Tools) + } +} + +// A nil-safe recorder (no FileStore) must accept events without panicking. 
+func TestRecorderNilSafe(t *testing.T) { + var rec *Recorder + rec.OnAgentEvent(core.Event{Type: core.PreInfer}) + + empty := NewRecorder(RecorderOptions{}) + empty.OnAgentEvent(core.Event{Type: core.PreInfer}) +} + +func readAllRecords(t *testing.T, baseDir, sessionID string) []transcript.Record { + t.Helper() + fs, err := transcript.NewFileStore(baseDir, "proj-1") + if err != nil { + t.Fatalf("NewFileStore (reread): %v", err) + } + path := fs.TranscriptPath(sessionID) + f, err := os.Open(path) + if err != nil { + t.Fatalf("open transcript: %v", err) + } + defer f.Close() + + var out []transcript.Record + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 1024*1024), 16*1024*1024) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + var r transcript.Record + if err := json.Unmarshal([]byte(line), &r); err != nil { + t.Fatalf("decode record: %v", err) + } + out = append(out, r) + } + if err := scanner.Err(); err != nil { + t.Fatalf("scan transcript: %v", err) + } + return out +} diff --git a/internal/session/service.go b/internal/session/service.go index 64f1625..04d13f9 100644 --- a/internal/session/service.go +++ b/internal/session/service.go @@ -26,6 +26,9 @@ type Service interface { LoadLatest() (*Snapshot, error) List() ([]*SessionMetadata, error) Fork(id string) (*Snapshot, error) + + // tracing + NewRecorder(agentID, provider, model string, maxTokens int) *Recorder } // Compile-time check: *Setup implements Service. diff --git a/internal/session/setup.go b/internal/session/setup.go index 1d2e621..7ce0248 100644 --- a/internal/session/setup.go +++ b/internal/session/setup.go @@ -134,3 +134,27 @@ func (s *Setup) Fork(id string) (*Snapshot, error) { } return st.Fork(id) } + +// NewRecorder binds a Recorder to the current session and transcript store. 
+// Returns nil if the store is not initialized — callers can pass the nil +// result through to core.Config.OnEvent safely because Recorder.OnAgentEvent +// is nil-safe. +func (s *Setup) NewRecorder(agentID, provider, model string, maxTokens int) *Recorder { + s.mu.RLock() + st := s.Store + sessionID := s.SessionID + s.mu.RUnlock() + if st == nil || st.transcriptStore == nil || sessionID == "" { + return nil + } + return NewRecorder(RecorderOptions{ + FileStore: st.transcriptStore, + SessionID: sessionID, + AgentID: agentID, + Provider: provider, + Model: model, + MaxTokens: maxTokens, + Cwd: st.cwd, + ProjectID: st.projectID, + }) +} diff --git a/internal/session/transcript/fs_store.go b/internal/session/transcript/fs_store.go index 7712829..3d4beb9 100644 --- a/internal/session/transcript/fs_store.go +++ b/internal/session/transcript/fs_store.go @@ -149,6 +149,78 @@ func (s *FileStore) PatchState(ctx context.Context, cmd PatchStateCommand) error return s.refreshIndexLocked(cmd.SessionID) } +func (s *FileStore) AppendTools(ctx context.Context, cmd AppendToolsCommand) error { + s.mu.Lock() + defer s.mu.Unlock() + + if err := ctx.Err(); err != nil { + return err + } + if cmd.Type != ToolsAdded && cmd.Type != ToolsRemoved { + return fmt.Errorf("append tools: unexpected type %q", cmd.Type) + } + + rec := cmd.Record + full := Record{ + ID: fmt.Sprintf("%s:tools:%d", cmd.SessionID, cmd.Time.UnixNano()), + SessionID: cmd.SessionID, + Time: cmd.Time, + Type: cmd.Type, + AgentID: cmd.AgentID, + Tools: &rec, + } + return s.appendRecord(s.transcriptPath(cmd.SessionID), full, false) +} + +func (s *FileStore) AppendSystemSection(ctx context.Context, cmd AppendSystemSectionCommand) error { + s.mu.Lock() + defer s.mu.Unlock() + + if err := ctx.Err(); err != nil { + return err + } + if cmd.Type != SystemSectionAdded && cmd.Type != SystemSectionRemoved { + return fmt.Errorf("append system section: unexpected type %q", cmd.Type) + } + + rec := cmd.Record + full := Record{ + ID: 
fmt.Sprintf("%s:system:%d", cmd.SessionID, cmd.Time.UnixNano()), + SessionID: cmd.SessionID, + Time: cmd.Time, + Type: cmd.Type, + AgentID: cmd.AgentID, + System: &rec, + } + return s.appendRecord(s.transcriptPath(cmd.SessionID), full, false) +} + +func (s *FileStore) AppendInference(ctx context.Context, cmd AppendInferenceCommand) error { + s.mu.Lock() + defer s.mu.Unlock() + + if err := ctx.Err(); err != nil { + return err + } + if cmd.Type != InferenceRequested && cmd.Type != InferenceResponded { + return fmt.Errorf("append inference: unexpected type %q", cmd.Type) + } + + rec := cmd.Record + full := Record{ + ID: fmt.Sprintf("%s:inference:%d", cmd.SessionID, cmd.Time.UnixNano()), + SessionID: cmd.SessionID, + Time: cmd.Time, + Type: cmd.Type, + AgentID: cmd.AgentID, + Inference: &rec, + } + // inference.responded sync-flushes the file so any preceding telemetry + // (system.section.*, tools.*, inference.requested) from this turn lands + // on disk together; inference.requested itself stays in the page cache. + return s.appendRecord(s.transcriptPath(cmd.SessionID), full, cmd.Type == InferenceResponded) +} + func (s *FileStore) Compact(ctx context.Context, cmd CompactCommand) error { s.mu.Lock() defer s.mu.Unlock() @@ -355,15 +427,19 @@ func (s *FileStore) indexPath() string { } // appendRecord writes one record to the JSONL. fsync is gated by sync so -// hot-path telemetry can be buffered in the OS page cache and rolled up to -// disk at turn boundaries. +// hot-path telemetry (system.section.*, tools.*, inference.requested) can be +// buffered in the OS page cache and rolled up to disk at turn boundaries. // // Durability classes: -// - sync=true: user input (message.appended), lifecycle events -// (session.started, session.compacted), and turn-completion writes. +// - sync=true: user input (message.appended), turn-completion +// (inference.responded), lifecycle events (session.started, session.compacted). // A crash after these must not lose them. 
-// - sync=false: pure telemetry (state.patched today; more in follow-up -// commits). Worst case on crash: in-flight turn's telemetry is lost. +// - sync=false: pure telemetry. Worst case on crash: the in-flight turn's +// telemetry is lost, but the message/state of preceding turns is intact +// because their write fsynced. +// +// Single-process per file: the append+close pair preserves order regardless +// of fsync; durability is the only thing the flag toggles. func (s *FileStore) appendRecord(path string, rec Record, sync bool) error { if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { return fmt.Errorf("create transcript dir: %w", err) diff --git a/internal/session/transcript/records.go b/internal/session/transcript/records.go index ef489b2..4ae5c8c 100644 --- a/internal/session/transcript/records.go +++ b/internal/session/transcript/records.go @@ -8,11 +8,17 @@ import ( // Record type values follow . (past tense), lowercase, // dot-separated. See docs/tracing.md for the full taxonomy. 
const ( - SessionStarted = "session.started" - SessionForked = "session.forked" - SessionCompacted = "session.compacted" - MessageAppended = "message.appended" - StatePatched = "state.patched" + SessionStarted = "session.started" + SessionForked = "session.forked" + SessionCompacted = "session.compacted" + MessageAppended = "message.appended" + StatePatched = "state.patched" + InferenceRequested = "inference.requested" + InferenceResponded = "inference.responded" + SystemSectionAdded = "system.section.added" + SystemSectionRemoved = "system.section.removed" + ToolsAdded = "tools.added" + ToolsRemoved = "tools.removed" ) const ( @@ -37,9 +43,12 @@ type Record struct { GitBranch string `json:"gitBranch,omitempty"` AgentID string `json:"agentId,omitempty"` - Message *MessageRecord `json:"message,omitempty"` - State *StateRecord `json:"state,omitempty"` - Session *SessionRecord `json:"session,omitempty"` + Message *MessageRecord `json:"message,omitempty"` + State *StateRecord `json:"state,omitempty"` + Session *SessionRecord `json:"session,omitempty"` + Inference *InferenceRecord `json:"inference,omitempty"` + System *SystemSectionRecord `json:"system,omitempty"` + Tools *ToolsRecord `json:"tools,omitempty"` } type MessageRecord struct { @@ -68,6 +77,63 @@ type SessionRecord struct { BoundaryID string `json:"boundaryId,omitempty"` } +// InferenceRecord carries the payload for inference.requested / +// inference.responded. The "requested" side captures the digests of what was +// sent to the LLM (system prompt, tools, active message chain); the "responded" +// side captures stop reason, latency, and token usage. Big fields live in the +// digests — full payloads are reconstructible by replaying preceding records. 
+type InferenceRecord struct { + Turn int `json:"turn"` + Provider string `json:"provider,omitempty"` + Model string `json:"model,omitempty"` + MaxTokens int `json:"maxTokens,omitempty"` + SystemDigest string `json:"systemDigest,omitempty"` + ToolsDigest string `json:"toolsDigest,omitempty"` + + // MessageIDs is the active chain at request time, in send order. + // Recorded on inference.requested only. + MessageIDs []string `json:"messageIds,omitempty"` + + // Response fields — populated on inference.responded only. + StopReason string `json:"stopReason,omitempty"` + LatencyMs int64 `json:"latencyMs,omitempty"` + Usage *InferenceUsage `json:"usage,omitempty"` +} + +type InferenceUsage struct { + InputTokens int `json:"inputTokens"` + OutputTokens int `json:"outputTokens"` + CacheCreateTokens int `json:"cacheCreateTokens,omitempty"` + CacheReadTokens int `json:"cacheReadTokens,omitempty"` +} + +// SystemSectionRecord carries the payload for system.section.added / +// system.section.removed. On removal, Content is empty. +type SystemSectionRecord struct { + Name string `json:"name"` + Slot int `json:"slot,omitempty"` + Content string `json:"content,omitempty"` + Caller string `json:"caller,omitempty"` +} + +// ToolSchemaView mirrors a tool schema in transcript-local types so the +// transcript package stays free of cross-package imports. Recorder converts +// from the runtime ToolSchema before writing. +type ToolSchemaView struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + Parameters json.RawMessage `json:"parameters,omitempty"` +} + +// ToolsRecord carries the payload for tools.added / tools.removed. +// One tool per record (Add/Remove fire individually); Schema is set on +// "added", Name on "removed". 
+type ToolsRecord struct { + Schema *ToolSchemaView `json:"schema,omitempty"` + Name string `json:"name,omitempty"` + Caller string `json:"caller,omitempty"` +} + type ContentBlock struct { Type string `json:"type"` diff --git a/internal/session/transcript/store.go b/internal/session/transcript/store.go index d542a9c..ddc1978 100644 --- a/internal/session/transcript/store.go +++ b/internal/session/transcript/store.go @@ -51,6 +51,35 @@ type ForkCommand struct { Time time.Time } +// AppendInferenceCommand writes either an inference.requested or +// inference.responded record. Type selects which; Record carries the payload. +type AppendInferenceCommand struct { + SessionID string + AgentID string + Time time.Time + Type string // InferenceRequested or InferenceResponded + Record InferenceRecord +} + +// AppendSystemSectionCommand writes system.section.added or +// system.section.removed. Removed sections drop Content from Record. +type AppendSystemSectionCommand struct { + SessionID string + AgentID string + Time time.Time + Type string // SystemSectionAdded or SystemSectionRemoved + Record SystemSectionRecord +} + +// AppendToolsCommand writes tools.added or tools.removed. 
+type AppendToolsCommand struct { + SessionID string + AgentID string + Time time.Time + Type string // ToolsAdded or ToolsRemoved + Record ToolsRecord +} + type ListOptions struct { Limit int IncludeSidechain bool diff --git a/internal/subagent/executor.go b/internal/subagent/executor.go index fc7543a..5aa6a49 100644 --- a/internal/subagent/executor.go +++ b/internal/subagent/executor.go @@ -285,7 +285,7 @@ func (e *Executor) buildAgent(ctx context.Context, rc *runConfig, agentCwd strin if e.mcpRegistry != nil { mcpCaller := mcp.NewCaller(e.mcpRegistry) for _, t := range mcp.AsCoreTools(schemas, mcpCaller) { - tools.Add(t) + tools.Add(t, "mcp:"+t.Name()) } } diff --git a/internal/subagent/executor_test.go b/internal/subagent/executor_test.go index eb40977..2da6cbe 100644 --- a/internal/subagent/executor_test.go +++ b/internal/subagent/executor_test.go @@ -554,7 +554,9 @@ func (s *stubLLM) InputLimit() int { return 0 } // stubSystem is a minimal core.System for tests. type stubSystem struct{} -func (s *stubSystem) Prompt() string { return "" } -func (s *stubSystem) Use(_ core.Section) {} -func (s *stubSystem) Drop(_ string) {} -func (s *stubSystem) Refresh(_ string) {} +func (s *stubSystem) Prompt() string { return "" } +func (s *stubSystem) Use(_ core.Section, _ string) {} +func (s *stubSystem) Drop(_, _ string) {} +func (s *stubSystem) Refresh(_, _ string) {} +func (s *stubSystem) Sections() []core.Section { return nil } +func (s *stubSystem) SetObserver(_ func(core.SystemChange)) {} diff --git a/internal/subagent/progress_tools.go b/internal/subagent/progress_tools.go index 374b1e3..9a852ec 100644 --- a/internal/subagent/progress_tools.go +++ b/internal/subagent/progress_tools.go @@ -19,10 +19,11 @@ func (p *progressTools) Get(name string) core.Tool { } return &progressTool{inner: t, onExec: p.onExec} } -func (p *progressTools) All() []core.Tool { return p.inner.All() } -func (p *progressTools) Add(t core.Tool) { p.inner.Add(t) } -func (p *progressTools) 
Remove(name string) { p.inner.Remove(name) } -func (p *progressTools) Schemas() []core.ToolSchema { return p.inner.Schemas() } +func (p *progressTools) All() []core.Tool { return p.inner.All() } +func (p *progressTools) Add(t core.Tool, caller string) { p.inner.Add(t, caller) } +func (p *progressTools) Remove(name, caller string) { p.inner.Remove(name, caller) } +func (p *progressTools) Schemas() []core.ToolSchema { return p.inner.Schemas() } +func (p *progressTools) SetObserver(fn func(core.ToolsChange)) { p.inner.SetObserver(fn) } type progressTool struct { inner core.Tool diff --git a/internal/tool/permission.go b/internal/tool/permission.go index f574837..a774257 100644 --- a/internal/tool/permission.go +++ b/internal/tool/permission.go @@ -32,10 +32,11 @@ func (pt *permissionTools) Get(name string) core.Tool { return &permissionTool{inner: t, check: pt.check} } -func (pt *permissionTools) All() []core.Tool { return pt.inner.All() } -func (pt *permissionTools) Add(tool core.Tool) { pt.inner.Add(tool) } -func (pt *permissionTools) Remove(name string) { pt.inner.Remove(name) } -func (pt *permissionTools) Schemas() []core.ToolSchema { return pt.inner.Schemas() } +func (pt *permissionTools) All() []core.Tool { return pt.inner.All() } +func (pt *permissionTools) Add(tool core.Tool, caller string) { pt.inner.Add(tool, caller) } +func (pt *permissionTools) Remove(name, caller string) { pt.inner.Remove(name, caller) } +func (pt *permissionTools) Schemas() []core.ToolSchema { return pt.inner.Schemas() } +func (pt *permissionTools) SetObserver(fn func(core.ToolsChange)) { pt.inner.SetObserver(fn) } // permissionTool wraps a single core.Tool with permission checking. type permissionTool struct {