From f0e52cbf6d0f2a2f492fc6162197426e914e801f Mon Sep 17 00:00:00 2001 From: Meng Yan Date: Fri, 15 May 2026 18:00:23 +0800 Subject: [PATCH 1/2] feat: trace recorder for inference / system / tools + content provenance Adds session.Recorder, a synchronous observer on core.Agent's event bus that translates lifecycle events into transcript records. Wired via core.Config.OnEvent (new BuildParams.OnEvent passthrough; constructed by Session.NewRecorder at agent start). New records per turn: inference.requested digests of system prompt / tool list / message chain inference.responded stop reason, latency, token usage System observability: Use(sec, caller), Drop(name, caller) on core.System require explicit caller strings (system:init, command:identity, subagent:init). System.SetObserver replays existing sections on attach so the event chain is complete from t0. Recorder writes system.section.added/removed. Tools observability: Same pattern on core.Tools: Add/Remove gain caller, SetObserver replays. MCP registrations pass caller="mcp:". Both wrappers (permissionTools, progressTools) pass-through. Recorder writes tools.added (with schema) / tools.removed (with name). Content provenance: splitTextByProvenance splits user-message content on boundaries and tags those blocks with Source="reminder". Round-trip safe: extractUserContent concatenates back, preserving core.Message.Content byte-for-byte. Non-blocking telemetry: agent.emitTelemetry uses select-default on the outbox so observer-fired events never deadlock the agent goroutine on a slow TUI consumer. Tests: 4 new Recorder tests + 2 provenance tests. --- internal/agent/build.go | 7 +- internal/app/agent.go | 9 +- internal/core/agent.go | 77 ++++++- internal/core/agent_impl.go | 38 +++- internal/core/digest.go | 51 +++++ internal/core/system.go | 21 +- internal/core/system/builder_test.go | 4 +- internal/core/system/catalog.go | 25 +-- internal/core/system_impl.go | 96 ++++++++- internal/core/tool.go | 12 +- internal/core/tool_impl.go | 44 +++- internal/session/message_convert.go | 54 ++++- internal/session/message_convert_test.go | 50 +++++ internal/session/recorder.go | 233 +++++++++++++++++++++ internal/session/recorder_test.go | 250 +++++++++++++++++++++++ internal/session/service.go | 3 + internal/session/setup.go | 22 ++ internal/session/transcript/fs_store.go | 88 +++++++- internal/session/transcript/records.go | 82 +++++++- internal/session/transcript/store.go | 29 +++ internal/subagent/executor.go | 2 +- internal/subagent/executor_test.go | 10 +- internal/subagent/progress_tools.go | 9 +- internal/tool/permission.go | 9 +- 24 files changed, 1149 insertions(+), 76 deletions(-) create mode 100644 internal/core/digest.go create mode 100644 internal/session/recorder.go create mode 100644 internal/session/recorder_test.go diff --git a/internal/agent/build.go b/internal/agent/build.go index c93fecc8..e22138a3 100644 --- a/internal/agent/build.go +++ b/internal/agent/build.go @@ -39,6 +39,10 @@ type BuildParams struct { PermissionDecider PermDecisionFunc InteractionFunc tool.InteractionFunc ToolProgress func(toolCallID string, msg string) + + // OnEvent observes every agent lifecycle event synchronously, alongside + // outbox delivery. Used by the trace recorder; nil leaves recording off. + OnEvent func(core.Event) } func buildAgent(p BuildParams) (core.Agent, *PermissionBridge, error) { @@ -87,7 +91,7 @@ func buildAgent(p BuildParams) (core.Agent, *PermissionBridge, error) { })) tools := tool.AdaptToolRegistry(schemas, cwdFunc, adaptOpts...) for _, t := range p.MCPTools { - tools.Add(t) + tools.Add(t, "mcp:"+t.Name()) } compactClient := client @@ -111,6 +115,7 @@ func buildAgent(p BuildParams) (core.Agent, *PermissionBridge, error) { Tools: tool.WithPermission(tools, pb.PermissionFunc()), CompactFunc: compactFunc, CWD: p.CWD, + OnEvent: p.OnEvent, }) return ag, pb, nil diff --git a/internal/app/agent.go b/internal/app/agent.go index a70ed164..9253895d 100644 --- a/internal/app/agent.go +++ b/internal/app/agent.go @@ -61,11 +61,18 @@ func (m *model) buildAgentParams() agent.BuildParams { mcpTools = mcp.AsCoreTools(schemas, mcpCaller) } + maxTokens := kit.GetMaxTokens(m.services.LLM.Store(), m.env.CurrentModel, setting.DefaultMaxTokens) + var onEvent func(core.Event) + if rec := m.services.Session.NewRecorder("main", m.env.LLMProvider.Name(), m.env.GetModelID(), maxTokens); rec != nil { + onEvent = rec.OnAgentEvent + } + return agent.BuildParams{ Provider: m.env.LLMProvider, ModelID: m.env.GetModelID(), - MaxTokens: kit.GetMaxTokens(m.services.LLM.Store(), m.env.CurrentModel, setting.DefaultMaxTokens), + MaxTokens: maxTokens, ThinkingEffort: m.env.EffectiveThinkingEffort(), + OnEvent: onEvent, CWD: m.env.CWD, CWDFunc: func() string { return m.env.CWD }, diff --git a/internal/core/agent.go b/internal/core/agent.go index 8df891cb..650333d2 100644 --- a/internal/core/agent.go +++ b/internal/core/agent.go @@ -133,7 +133,7 @@ func NewAgent(cfg Config) Agent { outbox = make(chan Event, cfg.OutboxBuf) } - return &agent{ + a := &agent{ id: cfg.ID, agentType: cfg.AgentType, color: cfg.Color, @@ -148,6 +148,16 @@ func NewAgent(cfg Config) Agent { outbox: outbox, onEvent: cfg.OnEvent, } + // Mirror system + tools mutations onto the event bus. Attach after + // construction so each registry replays its initial members back to the + // observer — the recorder sees a complete event chain from t0. + cfg.System.SetObserver(func(c SystemChange) { + a.emitTelemetry(SystemChangeEvent(a.id, c)) + }) + cfg.Tools.SetObserver(func(c ToolsChange) { + a.emitTelemetry(ToolsChangeEvent(a.id, c)) + }) + return a } // Result represents the outcome of one completed turn (end_turn). @@ -178,6 +188,15 @@ const ( OnMessage EventType = "Message" // message received on inbox (Message in Data) OnTurn EventType = "Turn" // think+act cycle completed (Result in Data) OnCompact EventType = "Compact" // conversation compacted (CompactInfo in Data) + + // OnSystemChange fires when a system-prompt section is added, replaced, + // or removed. Data is SystemChange. Non-critical telemetry — never blocks + // the outbox on backpressure. + OnSystemChange EventType = "SystemChange" + + // OnToolsChange fires when a tool is registered or unregistered. Data is + // ToolsChange. Like OnSystemChange, non-blocking telemetry. + OnToolsChange EventType = "ToolsChange" ) // Event carries context for an agent lifecycle point. @@ -205,6 +224,50 @@ type CompactInfo struct { OriginalCount int } +// InferenceContext is the PreInfer payload — what was about to be sent to the +// LLM, expressed as content-addressed digests so consumers (trace recorder, +// debug logger) can reference inputs without copying them on every turn. +type InferenceContext struct { + SystemDigest string // sha256 of rendered system prompt + ToolsDigest string // sha256 of canonicalized tool schemas + MessageIDs []string // active chain at request time, in send order +} + +func (e Event) InferenceContext() (InferenceContext, bool) { + ic, ok := e.Data.(InferenceContext) + return ic, ok +} + +// SystemChange describes one mutation to the system prompt's section map. +// Emitted on Use/Drop. The recorder translates these into +// system.section.added / system.section.removed records. +type SystemChange struct { + Name string // section name (stable across mutations) + Slot int // render slot + Content string // rendered content; empty when Removed + Removed bool // true on Drop, false on Use + Caller string // who triggered the mutation (e.g. "system:init", "command:/identity") +} + +func (e Event) SystemChange() (SystemChange, bool) { + c, ok := e.Data.(SystemChange) + return c, ok +} + +// ToolsChange describes one mutation to the tool registry. On removal, +// Schema.Name carries the dropped tool's name and other fields are zero. +type ToolsChange struct { + Schema ToolSchema // populated on Add (zero on Remove) + Name string // populated on Remove (empty on Add) + Removed bool // true on Remove, false on Add + Caller string // who triggered the mutation +} + +func (e Event) ToolsChange() (ToolsChange, bool) { + c, ok := e.Data.(ToolsChange) + return c, ok +} + // Typed event constructors — enforce correct Data types at construction. func StartEvent(agentID string) Event { return Event{Type: OnStart, Source: agentID} } @@ -214,7 +277,9 @@ func StopEvent(agentID string, err error) Event { func ChunkEvent(agentID string, c Chunk) Event { return Event{Type: OnChunk, Source: agentID, Data: c} } func MessageEvent(msg Message) Event { return Event{Type: OnMessage, Source: msg.From, Data: msg} } func TurnEvent(agentID string, r Result) Event { return Event{Type: OnTurn, Source: agentID, Data: r} } -func PreInferEvent(agentID string) Event { return Event{Type: PreInfer, Source: agentID} } +func PreInferEvent(agentID string, ctx InferenceContext) Event { + return Event{Type: PreInfer, Source: agentID, Data: ctx} +} func PostInferEvent(agentID string, r *InferResponse) Event { return Event{Type: PostInfer, Source: agentID, Data: r} } @@ -223,3 +288,11 @@ func PostToolEvent(tr ToolResult) Event { return Event{Type: PostTool, Source: t func CompactEvent(agentID string, info CompactInfo) Event { return Event{Type: OnCompact, Source: agentID, Data: info} } + +func SystemChangeEvent(agentID string, c SystemChange) Event { + return Event{Type: OnSystemChange, Source: agentID, Data: c} +} + +func ToolsChangeEvent(agentID string, c ToolsChange) Event { + return Event{Type: OnToolsChange, Source: agentID, Data: c} +} diff --git a/internal/core/agent_impl.go b/internal/core/agent_impl.go index 725d5942..fc4f2125 100644 --- a/internal/core/agent_impl.go +++ b/internal/core/agent_impl.go @@ -210,8 +210,6 @@ func (a *agent) ThinkAct(ctx context.Context) (*Result, error) { } } - a.emit(ctx, PreInferEvent(a.id)) - resp, err := a.streamInfer(ctx) if err != nil { // Reactive compaction: if prompt too long, compact and retry @@ -424,11 +422,24 @@ func ToolCallIDFromContext(ctx context.Context) string { // --- internals --- // streamInfer calls the LLM, streams chunks to outbox, returns the final response. +// +// Emits PreInferEvent (with input digests) before the LLM call so observers +// can record exactly what was sent without copying the bytes on every turn. func (a *agent) streamInfer(ctx context.Context) (*InferResponse, error) { + sys := a.system.Prompt() + msgs := a.snapshot() + tools := a.tools.Schemas() + + a.emit(ctx, PreInferEvent(a.id, InferenceContext{ + SystemDigest: sha256Hex([]byte(sys)), + ToolsDigest: toolsDigest(tools), + MessageIDs: messageIDs(msgs), + })) + chunks, err := a.llm.Infer(ctx, InferRequest{ - System: a.system.Prompt(), - Messages: a.snapshot(), - Tools: a.tools.Schemas(), + System: sys, + Messages: msgs, + Tools: tools, }) if err != nil { return nil, fmt.Errorf("infer: %w", err) @@ -475,6 +486,23 @@ func (a *agent) emit(ctx context.Context, event Event) { } } +// emitTelemetry delivers a fire-and-forget event: synchronously to onEvent, +// non-blocking to the outbox (dropped if full). Used for events whose +// consumers tolerate misses (system changes, hot-path tracing) and which can +// fire from goroutines without a useful ctx (e.g. system observer callbacks). +func (a *agent) emitTelemetry(event Event) { + if a.onEvent != nil { + a.onEvent(event) + } + if a.outbox == nil || a.closed.Load() { + return + } + select { + case a.outbox <- event: + default: + } +} + // emitFinal sends a critical event that must be delivered even on ctx cancellation. // Used for StopEvent — consumers rely on it for cleanup/session saving. // No-op when outbox is nil. Blocks up to 5 seconds; logs a warning if delivery fails. diff --git a/internal/core/digest.go b/internal/core/digest.go new file mode 100644 index 00000000..1312b276 --- /dev/null +++ b/internal/core/digest.go @@ -0,0 +1,51 @@ +package core + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "sort" +) + +// sha256Hex returns "sha256:" + lowercase hex of the SHA-256 sum of b. +// The prefix makes algorithm choice explicit on the wire. +func sha256Hex(b []byte) string { + sum := sha256.Sum256(b) + return "sha256:" + hex.EncodeToString(sum[:]) +} + +// toolsDigest canonicalizes a tool schema list (sort by Name, marshal each) +// and returns its sha256. Stable across runs as long as schemas are stable. +func toolsDigest(schemas []ToolSchema) string { + if len(schemas) == 0 { + return sha256Hex(nil) + } + sorted := make([]ToolSchema, len(schemas)) + copy(sorted, schemas) + sort.Slice(sorted, func(i, j int) bool { return sorted[i].Name < sorted[j].Name }) + + b, err := json.Marshal(sorted) + if err != nil { + // Marshal can only fail on unsupported types; ToolSchema fields are + // JSON-safe. Fall back to a digest of the names so the value is still + // stable rather than empty. + names := make([]string, len(sorted)) + for i, s := range sorted { + names[i] = s.Name + } + b, _ = json.Marshal(names) + } + return sha256Hex(b) +} + +// messageIDs extracts non-empty message IDs from the conversation snapshot, +// in send order. Empty IDs (legacy data) are skipped rather than padded. +func messageIDs(msgs []Message) []string { + out := make([]string, 0, len(msgs)) + for _, m := range msgs { + if m.ID != "" { + out = append(out, m.ID) + } + } + return out +} diff --git a/internal/core/system.go b/internal/core/system.go index 8267f7ab..271e6ff0 100644 --- a/internal/core/system.go +++ b/internal/core/system.go @@ -19,13 +19,26 @@ type System interface { // invalidated only when sections change. Prompt() string - // Use registers or replaces a section by Name. - Use(Section) + // Use registers or replaces a section by Name. Caller is a short tag + // describing what triggered the mutation (e.g. "system:init", + // "command:/identity") and surfaces in the trace. + Use(sec Section, caller string) // Drop removes a section by Name. No-op if absent. - Drop(name string) + Drop(name, caller string) // Refresh marks one section's rendered output stale. Use after the // section's underlying state changed but the Section value did not. - Refresh(name string) + Refresh(name, caller string) + + // Sections returns a snapshot of currently registered sections in + // render order (slot ascending, insertion order ascending). Used by + // observers that attach after construction to replay the existing state. + Sections() []Section + + // SetObserver installs a callback invoked synchronously on every + // subsequent mutation. Attaching also replays existing sections as + // "added" events so the observer sees a complete history starting + // from the moment of attachment. + SetObserver(fn func(SystemChange)) } diff --git a/internal/core/system/builder_test.go b/internal/core/system/builder_test.go index 21216530..35e4a05f 100644 --- a/internal/core/system/builder_test.go +++ b/internal/core/system/builder_test.go @@ -188,13 +188,13 @@ func TestSystemUseDropRefresh(t *testing.T) { sys.Use(core.Section{ Slot: core.SlotEnvironment, Name: "test-section", Source: core.Dynamic, Render: func() string { return "TEST_SECTION_BODY" }, - }) + }, "test") if !strings.Contains(sys.Prompt(), "TEST_SECTION_BODY") { t.Error("Use should add a new section's content to Prompt()") } // Drop: remove it. - sys.Drop("test-section") + sys.Drop("test-section", "test") if strings.Contains(sys.Prompt(), "TEST_SECTION_BODY") { t.Error("Drop should remove the section from Prompt()") } diff --git a/internal/core/system/catalog.go b/internal/core/system/catalog.go index 405b430a..22741e9c 100644 --- a/internal/core/system/catalog.go +++ b/internal/core/system/catalog.go @@ -57,13 +57,14 @@ func loadEmbedOptional(path string) string { // applyDefaults registers the always-on sections for a Scope. // Options passed to Build can override identity by Name. func applyDefaults(sys core.System, scope core.Scope) { - sys.Use(defaultIdentity()) - sys.Use(policy()) - sys.Use(guidelines("tools", cachedTools)) + const caller = "system:init" + sys.Use(defaultIdentity(), caller) + sys.Use(policy(), caller) + sys.Use(guidelines("tools", cachedTools), caller) if scope == core.ScopeMain { // Task tracking + interactive questions are main-agent behaviors. - sys.Use(guidelines("tasks", cachedTasks)) - sys.Use(guidelines("questions", cachedQuestions)) + sys.Use(guidelines("tasks", cachedTasks), caller) + sys.Use(guidelines("questions", cachedQuestions), caller) } } @@ -139,7 +140,7 @@ func WithIdentity(text string) Option { if text == "" { return } - sys.Use(identitySection(text)) + sys.Use(identitySection(text), "system:init") } } @@ -148,10 +149,10 @@ func WithIdentity(text string) Option { func SwapIdentity(sys core.System, text string) { text = strings.TrimSpace(text) if text == "" { - sys.Use(defaultIdentity()) + sys.Use(defaultIdentity(), "command:identity") return } - sys.Use(identitySection(text)) + sys.Use(identitySection(text), "command:identity") } // identitySection builds the slot-0 identity Section for a user-defined persona. @@ -178,7 +179,7 @@ func WithProvider(name string) Option { Render: func() string { return wrap("provider", map[string]string{"name": name}, body) }, - }) + }, "system:init") } } @@ -188,7 +189,7 @@ func WithGitGuidelines(isGit bool) Option { if !isGit { return } - sys.Use(guidelines("git", cachedGit)) + sys.Use(guidelines("git", cachedGit), "system:init") } } @@ -216,7 +217,7 @@ func WithSubagentIdentity(b SubagentBrief) Option { sys.Use(core.Section{ Slot: core.SlotIdentity, Name: "identity", Source: core.Injected, Render: func() string { return renderSubagentIdentity(b) }, - }) + }, "subagent:init") } } @@ -278,7 +279,7 @@ func WithEnvironment(env Environment) Option { sys.Use(core.Section{ Slot: core.SlotEnvironment, Name: "environment", Source: core.Dynamic, Render: func() string { return renderEnvironment(env) }, - }) + }, "system:init") } } diff --git a/internal/core/system_impl.go b/internal/core/system_impl.go index 4cda4d61..b41cb270 100644 --- a/internal/core/system_impl.go +++ b/internal/core/system_impl.go @@ -21,6 +21,11 @@ type system struct { counter int // monotonic insertion sequence cached string dirty bool + + // observer is invoked synchronously on every Use/Drop after the mutation + // applies. Set via SetObserver; nil until then. Reads happen on the same + // goroutine that mutated, so no extra locking around the call. + observer func(SystemChange) } type sectionEntry struct { @@ -57,9 +62,8 @@ func (s *system) Prompt() string { return s.cached } -func (s *system) Use(sec Section) { +func (s *system) Use(sec Section, caller string) { s.mu.Lock() - defer s.mu.Unlock() if e, ok := s.sections[sec.Name]; ok { // Replacing an existing section: preserve its insertion order so // position in the prompt does not jump around on hot updates. @@ -70,24 +74,102 @@ func (s *system) Use(sec Section) { s.sections[sec.Name] = §ionEntry{def: sec, inserted: s.counter} } s.dirty = true + obs := s.observer + s.mu.Unlock() + + if obs != nil { + obs(SystemChange{ + Name: sec.Name, + Slot: int(sec.Slot), + Content: renderSection(sec), + Caller: caller, + }) + } } -func (s *system) Drop(name string) { +func (s *system) Drop(name, caller string) { s.mu.Lock() - defer s.mu.Unlock() - if _, ok := s.sections[name]; ok { + _, existed := s.sections[name] + if existed { delete(s.sections, name) s.dirty = true } + obs := s.observer + s.mu.Unlock() + + if existed && obs != nil { + obs(SystemChange{Name: name, Removed: true, Caller: caller}) + } } -func (s *system) Refresh(name string) { +func (s *system) Refresh(name, caller string) { s.mu.Lock() - defer s.mu.Unlock() + var ( + obs func(SystemChange) + changed bool + sec Section + ) if e, ok := s.sections[name]; ok { e.fresh = false s.dirty = true + changed = true + sec = e.def + obs = s.observer + } + s.mu.Unlock() + + if changed && obs != nil { + obs(SystemChange{ + Name: sec.Name, + Slot: int(sec.Slot), + Content: renderSection(sec), + Caller: caller, + }) + } +} + +// Sections returns a snapshot of currently registered sections in render order. +func (s *system) Sections() []Section { + s.mu.RLock() + defer s.mu.RUnlock() + entries := s.sortedEntries() + out := make([]Section, 0, len(entries)) + for _, e := range entries { + out = append(out, e.def) + } + return out +} + +// SetObserver attaches an observer for section mutations. On attach, the +// existing sections are replayed as synthetic "added" events so the observer +// sees a complete history starting from this moment. +func (s *system) SetObserver(fn func(SystemChange)) { + s.mu.Lock() + s.observer = fn + entries := s.sortedEntries() + s.mu.Unlock() + + if fn == nil { + return + } + for _, e := range entries { + fn(SystemChange{ + Name: e.def.Name, + Slot: int(e.def.Slot), + Content: renderSection(e.def), + Caller: "system:init", + }) + } +} + +// renderSection invokes a Section's Render func once, returning "" on nil. +// Used by observers that need the rendered content without going through the +// cached-Prompt path (which joins all sections). +func renderSection(sec Section) string { + if sec.Render == nil { + return "" } + return sec.Render() } // sortedEntries returns entries in render order (Slot, insertion order). diff --git a/internal/core/tool.go b/internal/core/tool.go index f353e2dc..eb6cacc7 100644 --- a/internal/core/tool.go +++ b/internal/core/tool.go @@ -34,7 +34,15 @@ type ToolSchema struct { type Tools interface { Get(name string) Tool All() []Tool - Add(tool Tool) - Remove(name string) + // Add registers (or replaces) a tool. Caller tags the mutation source + // (e.g. "mcp:weather", "agent:init") for trace records. + Add(tool Tool, caller string) + // Remove unregisters a tool by name. No-op if absent. + Remove(name, caller string) Schemas() []ToolSchema + + // SetObserver installs a callback invoked synchronously on every + // subsequent Add/Remove. Attaching also replays existing tools as + // synthetic Add events so the observer sees the full registry from t0. + SetObserver(fn func(ToolsChange)) } diff --git a/internal/core/tool_impl.go b/internal/core/tool_impl.go index 4080c441..589b8630 100644 --- a/internal/core/tool_impl.go +++ b/internal/core/tool_impl.go @@ -13,6 +13,8 @@ type toolSet struct { tools map[string]Tool dirty bool // true when schemas cache needs rebuild cache []ToolSchema // cached schemas + + observer func(ToolsChange) // invoked after Add/Remove; nil until SetObserver } // NewTools creates an empty tool set. @@ -44,20 +46,52 @@ func (s *toolSet) All() []Tool { return out } -func (s *toolSet) Add(tool Tool) { +func (s *toolSet) Add(tool Tool, caller string) { s.mu.Lock() - defer s.mu.Unlock() s.tools[tool.Name()] = tool s.dirty = true + obs := s.observer + s.mu.Unlock() + + if obs != nil { + obs(ToolsChange{Schema: tool.Schema(), Caller: caller}) + } } -func (s *toolSet) Remove(name string) { +func (s *toolSet) Remove(name, caller string) { s.mu.Lock() - defer s.mu.Unlock() - if _, ok := s.tools[name]; ok { + _, existed := s.tools[name] + if existed { delete(s.tools, name) s.dirty = true } + obs := s.observer + s.mu.Unlock() + + if existed && obs != nil { + obs(ToolsChange{Name: name, Removed: true, Caller: caller}) + } +} + +// SetObserver attaches a callback invoked on every Add/Remove. On attach, +// existing tools are replayed as Add events with caller="tools:init". +func (s *toolSet) SetObserver(fn func(ToolsChange)) { + s.mu.Lock() + s.observer = fn + snapshot := make([]Tool, 0, len(s.tools)) + for _, t := range s.tools { + snapshot = append(snapshot, t) + } + s.mu.Unlock() + + if fn == nil { + return + } + // Stable order so replay is reproducible across processes. + sort.Slice(snapshot, func(i, j int) bool { return snapshot[i].Name() < snapshot[j].Name() }) + for _, t := range snapshot { + fn(ToolsChange{Schema: t.Schema(), Caller: "tools:init"}) + } } func (s *toolSet) Schemas() []ToolSchema { diff --git a/internal/session/message_convert.go b/internal/session/message_convert.go index d862ad8b..2b3dfe1d 100644 --- a/internal/session/message_convert.go +++ b/internal/session/message_convert.go @@ -13,6 +13,47 @@ import ( var inlineImageTokenPattern = regexp.MustCompile(`\[Image #(\d+)\]`) +// systemReminderRe matches a complete ... +// block including tags. Reminders are appended verbatim by reminder.AttachToContent +// and hook UserPromptSubmit additionalContext flows through the same path, so +// recognizing the wrapper suffices to mark harness-injected content. +var systemReminderRe = regexp.MustCompile(`(?s).*?`) + +const SourceReminder = "reminder" + +// splitTextByProvenance returns ContentBlocks that together preserve the input +// byte-for-byte, marking blocks with Source="reminder" so +// traces can distinguish user-typed text from harness-injected reminders / +// hook additionalContext. Empty input returns nil. +// +// The function never trims whitespace — concatenating all returned Text fields +// in order reproduces the original input. This keeps the read path +// (extractUserContent, which joins text blocks) round-trip safe. +func splitTextByProvenance(text string) []ContentBlock { + if text == "" { + return nil + } + matches := systemReminderRe.FindAllStringIndex(text, -1) + if len(matches) == 0 { + return []ContentBlock{{Type: "text", Text: text}} + } + + blocks := make([]ContentBlock, 0, 2*len(matches)+1) + cursor := 0 + for _, m := range matches { + start, end := m[0], m[1] + if start > cursor { + blocks = append(blocks, ContentBlock{Type: "text", Text: text[cursor:start]}) + } + blocks = append(blocks, ContentBlock{Type: "text", Text: text[start:end], Source: SourceReminder}) + cursor = end + } + if cursor < len(text) { + blocks = append(blocks, ContentBlock{Type: "text", Text: text[cursor:]}) + } + return blocks +} + func messagesToEntries(msgs []core.Message) []Entry { entries := make([]Entry, 0, len(msgs)) var prevUUID string @@ -106,9 +147,7 @@ func userContentToBlocks(content, displayContent string, images []core.Image) [] ImageSource: &ImageSource{Type: "base64", MediaType: img.MediaType, Data: img.Data}, }) } - if content != "" { - blocks = append(blocks, ContentBlock{Type: "text", Text: content}) - } + blocks = append(blocks, splitTextByProvenance(content)...) return blocks } @@ -123,9 +162,8 @@ func interleavedUserContentToBlocks(content, displayContent string, images []cor start, end := match[0], match[1] idStart, idEnd := match[2], match[3] - textPart := displayContent[last:start] - if textPart != "" { - blocks = append(blocks, ContentBlock{Type: "text", Text: textPart}) + if textPart := displayContent[last:start]; textPart != "" { + blocks = append(blocks, splitTextByProvenance(textPart)...) } id, err := strconv.Atoi(displayContent[idStart:idEnd]) @@ -143,11 +181,11 @@ func interleavedUserContentToBlocks(content, displayContent string, images []cor } if tail := displayContent[last:]; tail != "" { - blocks = append(blocks, ContentBlock{Type: "text", Text: tail}) + blocks = append(blocks, splitTextByProvenance(tail)...) } if len(blocks) == 0 && content != "" { - blocks = append(blocks, ContentBlock{Type: "text", Text: content}) + blocks = append(blocks, splitTextByProvenance(content)...) } return blocks diff --git a/internal/session/message_convert_test.go b/internal/session/message_convert_test.go index 5d23a61c..7716d715 100644 --- a/internal/session/message_convert_test.go +++ b/internal/session/message_convert_test.go @@ -1,11 +1,61 @@ package session import ( + "strings" "testing" "github.com/genai-io/gen-code/internal/core" ) +// A user message that contains harness-injected blocks must +// be persisted as multiple ContentBlocks: user-typed text with empty Source, +// reminder text with Source="reminder". Concatenating the text fields must +// reproduce the input byte-for-byte (round-trip safety for read path). +func Test_userContentToBlocks_splitsByProvenance(t *testing.T) { + const reminder1 = "\nskills directory\n" + const reminder2 = "\nuser memory\n" + input := "hello\n\n" + reminder1 + "\n\n" + reminder2 + + blocks := userContentToBlocks(input, "", nil) + + var sb strings.Builder + var reminderCount, userCount int + for _, b := range blocks { + if b.Type != "text" { + t.Fatalf("unexpected block type: %q", b.Type) + } + sb.WriteString(b.Text) + switch b.Source { + case SourceReminder: + reminderCount++ + if !strings.HasPrefix(b.Text, "") { + t.Errorf("reminder block missing wrapper: %q", b.Text) + } + case "": + userCount++ + default: + t.Fatalf("unexpected Source: %q", b.Source) + } + } + if sb.String() != input { + t.Fatalf("round-trip mismatch:\n got: %q\nwant: %q", sb.String(), input) + } + if reminderCount != 2 { + t.Fatalf("reminder block count = %d, want 2", reminderCount) + } + if userCount == 0 { + t.Fatalf("expected at least one user block, got %d", userCount) + } +} + +// Plain user content with no reminders produces exactly one user-text block. +func Test_userContentToBlocks_plainTextOneBlock(t *testing.T) { + blocks := userContentToBlocks("just a question", "", nil) + if len(blocks) != 1 || blocks[0].Type != "text" || blocks[0].Source != "" { + t.Fatalf("expected 1 user-text block, got %+v", blocks) + } +} + func Test_messagesToEntries_roundtrip(t *testing.T) { // Test that messagesToEntries -> EntriesToMessages roundtrips correctly. msgs := []core.Message{ diff --git a/internal/session/recorder.go b/internal/session/recorder.go new file mode 100644 index 00000000..37a8fda8 --- /dev/null +++ b/internal/session/recorder.go @@ -0,0 +1,233 @@ +package session + +import ( + "context" + "encoding/json" + "sync" + "sync/atomic" + "time" + + "go.uber.org/zap" + + "github.com/genai-io/gen-code/internal/core" + "github.com/genai-io/gen-code/internal/log" + "github.com/genai-io/gen-code/internal/session/transcript" +) + +// Recorder turns core.Agent lifecycle events into transcript records. +// +// One Recorder is bound to one (sessionID, agentID) pair. The OnAgentEvent +// method is meant to be passed as core.Config.OnEvent — it's called +// synchronously from the agent goroutine, so handlers stay fast. +// +// The recorder is additive: existing code paths (Store.Save) keep persisting +// messages and state. The recorder writes only event-driven records +// (inference.requested / inference.responded) that the snapshot path can't +// observe. +type Recorder struct { + fs *transcript.FileStore + sessionID string + agentID string + provider string + model string + maxTokens int + + turn atomic.Int64 + + mu sync.Mutex + lastRequest *requestState +} + +type requestState struct { + turn int + startedAt time.Time + messageIDs []string +} + +// RecorderOptions configures a Recorder. provider/model/maxTokens are captured +// at construction time and stamped on every inference record. Mid-session +// model swaps will be handled by a future model.changed event, not by mutating +// the recorder. +type RecorderOptions struct { + FileStore *transcript.FileStore + SessionID string + AgentID string + Provider string + Model string + MaxTokens int +} + +func NewRecorder(opts RecorderOptions) *Recorder { + return &Recorder{ + fs: opts.FileStore, + sessionID: opts.SessionID, + agentID: opts.AgentID, + provider: opts.Provider, + model: opts.Model, + maxTokens: opts.MaxTokens, + } +} + +// OnAgentEvent is the core.Config.OnEvent callback. It dispatches by event +// type and writes the corresponding transcript record. Errors are logged +// rather than propagated — failing to record telemetry must not break the +// running session. +func (r *Recorder) OnAgentEvent(ev core.Event) { + if r == nil || r.fs == nil || r.sessionID == "" { + return + } + switch ev.Type { + case core.PreInfer: + r.onPreInfer(ev) + case core.PostInfer: + r.onPostInfer(ev) + case core.OnSystemChange: + r.onSystemChange(ev) + case core.OnToolsChange: + r.onToolsChange(ev) + } +} + +func (r *Recorder) onToolsChange(ev core.Event) { + c, ok := ev.ToolsChange() + if !ok { + return + } + typ := transcript.ToolsAdded + payload := transcript.ToolsRecord{Caller: c.Caller} + if c.Removed { + typ = transcript.ToolsRemoved + payload.Name = c.Name + } else { + payload.Schema = toolSchemaView(c.Schema) + } + err := r.fs.AppendTools(context.Background(), transcript.AppendToolsCommand{ + SessionID: r.sessionID, + AgentID: r.agentID, + Time: time.Now(), + Type: typ, + Record: payload, + }) + if err != nil { + log.Logger().Warn("recorder: append tools change failed", zap.Error(err)) + } +} + +func toolSchemaView(s core.ToolSchema) *transcript.ToolSchemaView { + view := &transcript.ToolSchemaView{ + Name: s.Name, + Description: s.Description, + } + if s.Parameters != nil { + if data, err := json.Marshal(s.Parameters); err == nil { + view.Parameters = data + } + } + return view +} + +func (r *Recorder) onSystemChange(ev core.Event) { + c, ok := ev.SystemChange() + if !ok { + return + } + typ := transcript.SystemSectionAdded + if c.Removed { + typ = transcript.SystemSectionRemoved + } + err := r.fs.AppendSystemSection(context.Background(), transcript.AppendSystemSectionCommand{ + SessionID: r.sessionID, + AgentID: r.agentID, + Time: time.Now(), + Type: typ, + Record: transcript.SystemSectionRecord{ + Name: c.Name, + Slot: c.Slot, + Content: c.Content, + Caller: c.Caller, + }, + }) + if err != nil { + log.Logger().Warn("recorder: append system section failed", zap.Error(err)) + } +} + +func (r *Recorder) onPreInfer(ev core.Event) { + ic, ok := ev.InferenceContext() + if !ok { + return + } + + turn := int(r.turn.Add(1)) + now := time.Now() + + r.mu.Lock() + r.lastRequest = &requestState{ + turn: turn, + startedAt: now, + messageIDs: ic.MessageIDs, + } + r.mu.Unlock() + + err := r.fs.AppendInference(context.Background(), transcript.AppendInferenceCommand{ + SessionID: r.sessionID, + AgentID: r.agentID, + Time: now, + Type: transcript.InferenceRequested, + Record: transcript.InferenceRecord{ + Turn: turn, + Provider: r.provider, + Model: r.model, + MaxTokens: r.maxTokens, + SystemDigest: ic.SystemDigest, + ToolsDigest: ic.ToolsDigest, + MessageIDs: ic.MessageIDs, + }, + }) + if err != nil { + log.Logger().Warn("recorder: append inference.requested failed", zap.Error(err)) + } +} + +func (r *Recorder) onPostInfer(ev core.Event) { + resp, ok := ev.Response() + if !ok || resp == nil { + return + } + + r.mu.Lock() + prev := r.lastRequest + r.lastRequest = nil + r.mu.Unlock() + + now := time.Now() + var turn int + var latencyMs int64 + if prev != nil { + turn = prev.turn + latencyMs = now.Sub(prev.startedAt).Milliseconds() + } + + err := r.fs.AppendInference(context.Background(), transcript.AppendInferenceCommand{ + SessionID: r.sessionID, + AgentID: r.agentID, + Time: now, + Type: transcript.InferenceResponded, + Record: transcript.InferenceRecord{ + Turn: turn, + Provider: r.provider, + Model: r.model, + StopReason: string(resp.StopReason), + LatencyMs: latencyMs, + Usage: &transcript.InferenceUsage{ + InputTokens: resp.TokensIn, + OutputTokens: resp.TokensOut, + CacheCreateTokens: resp.CacheCreateTokens, + CacheReadTokens: resp.CacheReadTokens, + }, + }, + }) + if err != nil { + log.Logger().Warn("recorder: append inference.responded failed", zap.Error(err)) + } +} diff --git a/internal/session/recorder_test.go b/internal/session/recorder_test.go new file mode 100644 index 00000000..2ef6ef6b --- /dev/null +++ b/internal/session/recorder_test.go @@ -0,0 +1,250 @@ +package session + +import ( + "bufio" + "context" + "encoding/json" + "os" + "strings" + "testing" + "time" + + "github.com/genai-io/gen-code/internal/core" + "github.com/genai-io/gen-code/internal/session/transcript" +) + +// One turn through PreInfer + PostInfer must produce one inference.requested +// and one inference.responded record, with the request's MessageIDs/digests +// preserved and the response's usage/stop_reason populated. The turn counter +// links the pair. +func TestRecorderWritesRequestedAndRespondedPerTurn(t *testing.T) { + dir := t.TempDir() + fs, err := transcript.NewFileStore(dir, "proj-1") + if err != nil { + t.Fatalf("NewFileStore: %v", err) + } + if err := fs.Start(context.Background(), transcript.StartCommand{ + SessionID: "sess-1", Cwd: "/tmp", Provider: "anthropic", Model: "claude-x", Time: time.Now(), + }); err != nil { + t.Fatalf("Start: %v", err) + } + + rec := NewRecorder(RecorderOptions{ + FileStore: fs, SessionID: "sess-1", AgentID: "main", + Provider: "anthropic", Model: "claude-x", MaxTokens: 4096, + }) + + rec.OnAgentEvent(core.Event{Type: core.PreInfer, Source: "main", Data: core.InferenceContext{ + SystemDigest: "sha256:sys", ToolsDigest: "sha256:tools", MessageIDs: []string{"m1", "m2"}, + }}) + rec.OnAgentEvent(core.Event{Type: core.PostInfer, Source: "main", Data: &core.InferResponse{ + StopReason: core.StopEndTurn, TokensIn: 42, TokensOut: 8, CacheReadTokens: 10, + }}) + + tx, err := fs.Load(context.Background(), "sess-1") + if err != nil { + t.Fatalf("Load: %v", err) + } + // Project() flattens to messages; inspect raw records via a re-load instead. + // Easiest path: read the file directly via Load and look at the messages — + // but inference records aren't messages. Use the file to read raw records. + _ = tx + + // Read raw records from disk to assert on inference records. + records := readAllRecords(t, dir, "sess-1") + + var reqs, resps []transcript.Record + for _, r := range records { + switch r.Type { + case transcript.InferenceRequested: + reqs = append(reqs, r) + case transcript.InferenceResponded: + resps = append(resps, r) + } + } + if len(reqs) != 1 { + t.Fatalf("inference.requested count = %d, want 1", len(reqs)) + } + if len(resps) != 1 { + t.Fatalf("inference.responded count = %d, want 1", len(resps)) + } + + req := reqs[0].Inference + if req == nil { + t.Fatal("request record missing Inference payload") + } + if req.Turn != 1 { + t.Fatalf("requested.Turn = %d, want 1", req.Turn) + } + if req.SystemDigest != "sha256:sys" || req.ToolsDigest != "sha256:tools" { + t.Fatalf("requested digests = %+v", req) + } + if len(req.MessageIDs) != 2 || req.MessageIDs[0] != "m1" { + t.Fatalf("requested.MessageIDs = %+v", req.MessageIDs) + } + + resp := resps[0].Inference + if resp == nil { + t.Fatal("response record missing Inference payload") + } + if resp.Turn != 1 { + t.Fatalf("responded.Turn = %d, want 1 (must match request)", resp.Turn) + } + if resp.StopReason != string(core.StopEndTurn) { + t.Fatalf("responded.StopReason = %q", resp.StopReason) + } + if resp.Usage == nil || resp.Usage.InputTokens != 42 || resp.Usage.OutputTokens != 8 || resp.Usage.CacheReadTokens != 10 { + t.Fatalf("responded.Usage = %+v", resp.Usage) + } +} + +// System mutations must produce one system.section.added per added/replaced +// section and one system.section.removed per dropped section. Caller info is +// preserved verbatim so trace consumers can answer "who changed this?". +func TestRecorderWritesSystemSectionEvents(t *testing.T) { + dir := t.TempDir() + fs, err := transcript.NewFileStore(dir, "proj-1") + if err != nil { + t.Fatalf("NewFileStore: %v", err) + } + if err := fs.Start(context.Background(), transcript.StartCommand{ + SessionID: "sess-sys", Cwd: "/tmp", Provider: "p", Model: "m", Time: time.Now(), + }); err != nil { + t.Fatalf("Start: %v", err) + } + + rec := NewRecorder(RecorderOptions{ + FileStore: fs, SessionID: "sess-sys", AgentID: "main", + Provider: "p", Model: "m", MaxTokens: 1, + }) + + rec.OnAgentEvent(core.Event{Type: core.OnSystemChange, Data: core.SystemChange{ + Name: "identity", Slot: 0, Content: "You are X", Caller: "system:init", + }}) + rec.OnAgentEvent(core.Event{Type: core.OnSystemChange, Data: core.SystemChange{ + Name: "identity", Slot: 0, Content: "You are Y", Caller: "command:/identity", + }}) + rec.OnAgentEvent(core.Event{Type: core.OnSystemChange, Data: core.SystemChange{ + Name: "policy", Removed: true, Caller: "test:teardown", + }}) + + records := readAllRecords(t, dir, "sess-sys") + var added, removed []transcript.Record + for _, r := range records { + switch r.Type { + case transcript.SystemSectionAdded: + added = append(added, r) + case transcript.SystemSectionRemoved: + removed = append(removed, r) + } + } + if len(added) != 2 { + t.Fatalf("system.section.added count = %d, want 2", len(added)) + } + if len(removed) != 1 { + t.Fatalf("system.section.removed count = %d, want 1", len(removed)) + } + if added[1].System.Caller != "command:/identity" || added[1].System.Content != "You are Y" { + t.Fatalf("replaced section payload = %+v", added[1].System) + } + if removed[0].System.Name != "policy" || removed[0].System.Caller != "test:teardown" { + t.Fatalf("removed section payload = %+v", removed[0].System) + } +} + +// Tool registry changes must produce one tools.added per Add and one +// tools.removed per Remove. Schema/Name and Caller are preserved. +func TestRecorderWritesToolsChangeEvents(t *testing.T) { + dir := t.TempDir() + fs, err := transcript.NewFileStore(dir, "proj-1") + if err != nil { + t.Fatalf("NewFileStore: %v", err) + } + if err := fs.Start(context.Background(), transcript.StartCommand{ + SessionID: "sess-tools", Cwd: "/tmp", Provider: "p", Model: "m", Time: time.Now(), + }); err != nil { + t.Fatalf("Start: %v", err) + } + + rec := NewRecorder(RecorderOptions{ + FileStore: fs, SessionID: "sess-tools", AgentID: "main", + Provider: "p", Model: "m", MaxTokens: 1, + }) + + rec.OnAgentEvent(core.Event{Type: core.OnToolsChange, Data: core.ToolsChange{ + Schema: core.ToolSchema{Name: "Read", Description: "read a file"}, + Caller: "tools:init", + }}) + rec.OnAgentEvent(core.Event{Type: core.OnToolsChange, Data: core.ToolsChange{ + Schema: core.ToolSchema{Name: "Bash", Description: "run shell"}, + Caller: "mcp:Bash", + }}) + rec.OnAgentEvent(core.Event{Type: core.OnToolsChange, Data: core.ToolsChange{ + Name: "Bash", Removed: true, Caller: "mode:plan", + }}) + + records := readAllRecords(t, dir, "sess-tools") + var added, removed []transcript.Record + for _, r := range records { + switch r.Type { + case transcript.ToolsAdded: + added = append(added, r) + case transcript.ToolsRemoved: + removed = append(removed, r) + } + } + if len(added) != 2 { + t.Fatalf("tools.added count = %d, want 2", len(added)) + } + if len(removed) != 1 { + t.Fatalf("tools.removed count = %d, want 1", len(removed)) + } + if added[0].Tools.Schema == nil || added[0].Tools.Schema.Name != "Read" || added[0].Tools.Caller != "tools:init" { + t.Fatalf("first added record = %+v", added[0].Tools) + } + if removed[0].Tools.Name != "Bash" || removed[0].Tools.Caller != "mode:plan" { + t.Fatalf("removed record = %+v", removed[0].Tools) + } +} + +// A nil-safe recorder (no FileStore) must accept events without panicking. +func TestRecorderNilSafe(t *testing.T) { + var rec *Recorder + rec.OnAgentEvent(core.Event{Type: core.PreInfer}) + + empty := NewRecorder(RecorderOptions{}) + empty.OnAgentEvent(core.Event{Type: core.PreInfer}) +} + +func readAllRecords(t *testing.T, baseDir, sessionID string) []transcript.Record { + t.Helper() + fs, err := transcript.NewFileStore(baseDir, "proj-1") + if err != nil { + t.Fatalf("NewFileStore (reread): %v", err) + } + path := fs.TranscriptPath(sessionID) + f, err := os.Open(path) + if err != nil { + t.Fatalf("open transcript: %v", err) + } + defer f.Close() + + var out []transcript.Record + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 1024*1024), 16*1024*1024) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + var r transcript.Record + if err := json.Unmarshal([]byte(line), &r); err != nil { + t.Fatalf("decode record: %v", err) + } + out = append(out, r) + } + if err := scanner.Err(); err != nil { + t.Fatalf("scan transcript: %v", err) + } + return out +} diff --git a/internal/session/service.go b/internal/session/service.go index 64f16256..04d13f95 100644 --- a/internal/session/service.go +++ b/internal/session/service.go @@ -26,6 +26,9 @@ type Service interface { LoadLatest() (*Snapshot, error) List() ([]*SessionMetadata, error) Fork(id string) (*Snapshot, error) + + // tracing + NewRecorder(agentID, provider, model string, maxTokens int) *Recorder } // Compile-time check: *Setup implements Service. diff --git a/internal/session/setup.go b/internal/session/setup.go index 1d2e6214..917bad46 100644 --- a/internal/session/setup.go +++ b/internal/session/setup.go @@ -134,3 +134,25 @@ func (s *Setup) Fork(id string) (*Snapshot, error) { } return st.Fork(id) } + +// NewRecorder binds a Recorder to the current session and transcript store. +// Returns nil if the store is not initialized — callers can pass the nil +// result through to core.Config.OnEvent safely because Recorder.OnAgentEvent +// is nil-safe. +func (s *Setup) NewRecorder(agentID, provider, model string, maxTokens int) *Recorder { + s.mu.RLock() + st := s.Store + sessionID := s.SessionID + s.mu.RUnlock() + if st == nil || st.transcriptStore == nil || sessionID == "" { + return nil + } + return NewRecorder(RecorderOptions{ + FileStore: st.transcriptStore, + SessionID: sessionID, + AgentID: agentID, + Provider: provider, + Model: model, + MaxTokens: maxTokens, + }) +} diff --git a/internal/session/transcript/fs_store.go b/internal/session/transcript/fs_store.go index 7712829d..3d4beb92 100644 --- a/internal/session/transcript/fs_store.go +++ b/internal/session/transcript/fs_store.go @@ -149,6 +149,78 @@ func (s *FileStore) PatchState(ctx context.Context, cmd PatchStateCommand) error return s.refreshIndexLocked(cmd.SessionID) } +func (s *FileStore) AppendTools(ctx context.Context, cmd AppendToolsCommand) error { + s.mu.Lock() + defer s.mu.Unlock() + + if err := ctx.Err(); err != nil { + return err + } + if cmd.Type != ToolsAdded && cmd.Type != ToolsRemoved { + return fmt.Errorf("append tools: unexpected type %q", cmd.Type) + } + + rec := cmd.Record + full := Record{ + ID: fmt.Sprintf("%s:tools:%d", cmd.SessionID, cmd.Time.UnixNano()), + SessionID: cmd.SessionID, + Time: cmd.Time, + Type: cmd.Type, + AgentID: cmd.AgentID, + Tools: &rec, + } + return s.appendRecord(s.transcriptPath(cmd.SessionID), full, false) +} + +func (s *FileStore) AppendSystemSection(ctx context.Context, cmd AppendSystemSectionCommand) error { + s.mu.Lock() + defer s.mu.Unlock() + + if err := ctx.Err(); err != nil { + return err + } + if cmd.Type != SystemSectionAdded && cmd.Type != SystemSectionRemoved { + return fmt.Errorf("append system section: unexpected type %q", cmd.Type) + } + + rec := cmd.Record + full := Record{ + ID: fmt.Sprintf("%s:system:%d", cmd.SessionID, cmd.Time.UnixNano()), + SessionID: cmd.SessionID, + Time: cmd.Time, + Type: cmd.Type, + AgentID: cmd.AgentID, + System: &rec, + } + return s.appendRecord(s.transcriptPath(cmd.SessionID), full, false) +} + +func (s *FileStore) AppendInference(ctx context.Context, cmd AppendInferenceCommand) error { + s.mu.Lock() + defer s.mu.Unlock() + + if err := ctx.Err(); err != nil { + return err + } + if cmd.Type != InferenceRequested && cmd.Type != InferenceResponded { + return fmt.Errorf("append inference: unexpected type %q", cmd.Type) + } + + rec := cmd.Record + full := Record{ + ID: fmt.Sprintf("%s:inference:%d", cmd.SessionID, cmd.Time.UnixNano()), + SessionID: cmd.SessionID, + Time: cmd.Time, + Type: cmd.Type, + AgentID: cmd.AgentID, + Inference: &rec, + } + // inference.responded sync-flushes the file so any preceding telemetry + // (system.section.*, tools.*, inference.requested) from this turn lands + // on disk together; inference.requested itself stays in the page cache. + return s.appendRecord(s.transcriptPath(cmd.SessionID), full, cmd.Type == InferenceResponded) +} + func (s *FileStore) Compact(ctx context.Context, cmd CompactCommand) error { s.mu.Lock() defer s.mu.Unlock() @@ -355,15 +427,19 @@ func (s *FileStore) indexPath() string { } // appendRecord writes one record to the JSONL. fsync is gated by sync so -// hot-path telemetry can be buffered in the OS page cache and rolled up to -// disk at turn boundaries. +// hot-path telemetry (system.section.*, tools.*, inference.requested) can be +// buffered in the OS page cache and rolled up to disk at turn boundaries. // // Durability classes: -// - sync=true: user input (message.appended), lifecycle events -// (session.started, session.compacted), and turn-completion writes. +// - sync=true: user input (message.appended), turn-completion +// (inference.responded), lifecycle events (session.started, session.compacted). // A crash after these must not lose them. -// - sync=false: pure telemetry (state.patched today; more in follow-up -// commits). Worst case on crash: in-flight turn's telemetry is lost. +// - sync=false: pure telemetry. Worst case on crash: the in-flight turn's +// telemetry is lost, but the message/state of preceding turns is intact +// because their write fsynced. +// +// Single-process per file: the append+close pair preserves order regardless +// of fsync; durability is the only thing the flag toggles. func (s *FileStore) appendRecord(path string, rec Record, sync bool) error { if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { return fmt.Errorf("create transcript dir: %w", err) diff --git a/internal/session/transcript/records.go b/internal/session/transcript/records.go index ef489b2e..4ae5c8cb 100644 --- a/internal/session/transcript/records.go +++ b/internal/session/transcript/records.go @@ -8,11 +8,17 @@ import ( // Record type values follow . (past tense), lowercase, // dot-separated. See docs/tracing.md for the full taxonomy. const ( - SessionStarted = "session.started" - SessionForked = "session.forked" - SessionCompacted = "session.compacted" - MessageAppended = "message.appended" - StatePatched = "state.patched" + SessionStarted = "session.started" + SessionForked = "session.forked" + SessionCompacted = "session.compacted" + MessageAppended = "message.appended" + StatePatched = "state.patched" + InferenceRequested = "inference.requested" + InferenceResponded = "inference.responded" + SystemSectionAdded = "system.section.added" + SystemSectionRemoved = "system.section.removed" + ToolsAdded = "tools.added" + ToolsRemoved = "tools.removed" ) const ( @@ -37,9 +43,12 @@ type Record struct { GitBranch string `json:"gitBranch,omitempty"` AgentID string `json:"agentId,omitempty"` - Message *MessageRecord `json:"message,omitempty"` - State *StateRecord `json:"state,omitempty"` - Session *SessionRecord `json:"session,omitempty"` + Message *MessageRecord `json:"message,omitempty"` + State *StateRecord `json:"state,omitempty"` + Session *SessionRecord `json:"session,omitempty"` + Inference *InferenceRecord `json:"inference,omitempty"` + System *SystemSectionRecord `json:"system,omitempty"` + Tools *ToolsRecord `json:"tools,omitempty"` } type MessageRecord struct { @@ -68,6 +77,63 @@ type SessionRecord struct { BoundaryID string `json:"boundaryId,omitempty"` } +// InferenceRecord carries the payload for inference.requested / +// inference.responded. The "requested" side captures the digests of what was +// sent to the LLM (system prompt, tools, active message chain); the "responded" +// side captures stop reason, latency, and token usage. Big fields live in the +// digests — full payloads are reconstructible by replaying preceding records. +type InferenceRecord struct { + Turn int `json:"turn"` + Provider string `json:"provider,omitempty"` + Model string `json:"model,omitempty"` + MaxTokens int `json:"maxTokens,omitempty"` + SystemDigest string `json:"systemDigest,omitempty"` + ToolsDigest string `json:"toolsDigest,omitempty"` + + // MessageIDs is the active chain at request time, in send order. + // Recorded on inference.requested only. + MessageIDs []string `json:"messageIds,omitempty"` + + // Response fields — populated on inference.responded only. + StopReason string `json:"stopReason,omitempty"` + LatencyMs int64 `json:"latencyMs,omitempty"` + Usage *InferenceUsage `json:"usage,omitempty"` +} + +type InferenceUsage struct { + InputTokens int `json:"inputTokens"` + OutputTokens int `json:"outputTokens"` + CacheCreateTokens int `json:"cacheCreateTokens,omitempty"` + CacheReadTokens int `json:"cacheReadTokens,omitempty"` +} + +// SystemSectionRecord carries the payload for system.section.added / +// system.section.removed. On removal, Content is empty. +type SystemSectionRecord struct { + Name string `json:"name"` + Slot int `json:"slot,omitempty"` + Content string `json:"content,omitempty"` + Caller string `json:"caller,omitempty"` +} + +// ToolSchemaView mirrors a tool schema in transcript-local types so the +// transcript package stays free of cross-package imports. Recorder converts +// from the runtime ToolSchema before writing. +type ToolSchemaView struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + Parameters json.RawMessage `json:"parameters,omitempty"` +} + +// ToolsRecord carries the payload for tools.added / tools.removed. +// One tool per record (Add/Remove fire individually); Schema is set on +// "added", Name on "removed". +type ToolsRecord struct { + Schema *ToolSchemaView `json:"schema,omitempty"` + Name string `json:"name,omitempty"` + Caller string `json:"caller,omitempty"` +} + type ContentBlock struct { Type string `json:"type"` diff --git a/internal/session/transcript/store.go b/internal/session/transcript/store.go index d542a9ca..ddc19785 100644 --- a/internal/session/transcript/store.go +++ b/internal/session/transcript/store.go @@ -51,6 +51,35 @@ type ForkCommand struct { Time time.Time } +// AppendInferenceCommand writes either an inference.requested or +// inference.responded record. Type selects which; Record carries the payload. +type AppendInferenceCommand struct { + SessionID string + AgentID string + Time time.Time + Type string // InferenceRequested or InferenceResponded + Record InferenceRecord +} + +// AppendSystemSectionCommand writes system.section.added or +// system.section.removed. Removed sections drop Content from Record. +type AppendSystemSectionCommand struct { + SessionID string + AgentID string + Time time.Time + Type string // SystemSectionAdded or SystemSectionRemoved + Record SystemSectionRecord +} + +// AppendToolsCommand writes tools.added or tools.removed. +type AppendToolsCommand struct { + SessionID string + AgentID string + Time time.Time + Type string // ToolsAdded or ToolsRemoved + Record ToolsRecord +} + type ListOptions struct { Limit int IncludeSidechain bool diff --git a/internal/subagent/executor.go b/internal/subagent/executor.go index fc7543a9..5aa6a496 100644 --- a/internal/subagent/executor.go +++ b/internal/subagent/executor.go @@ -285,7 +285,7 @@ func (e *Executor) buildAgent(ctx context.Context, rc *runConfig, agentCwd strin if e.mcpRegistry != nil { mcpCaller := mcp.NewCaller(e.mcpRegistry) for _, t := range mcp.AsCoreTools(schemas, mcpCaller) { - tools.Add(t) + tools.Add(t, "mcp:"+t.Name()) } } diff --git a/internal/subagent/executor_test.go b/internal/subagent/executor_test.go index eb409774..2da6cbe0 100644 --- a/internal/subagent/executor_test.go +++ b/internal/subagent/executor_test.go @@ -554,7 +554,9 @@ func (s *stubLLM) InputLimit() int { return 0 } // stubSystem is a minimal core.System for tests. type stubSystem struct{} -func (s *stubSystem) Prompt() string { return "" } -func (s *stubSystem) Use(_ core.Section) {} -func (s *stubSystem) Drop(_ string) {} -func (s *stubSystem) Refresh(_ string) {} +func (s *stubSystem) Prompt() string { return "" } +func (s *stubSystem) Use(_ core.Section, _ string) {} +func (s *stubSystem) Drop(_, _ string) {} +func (s *stubSystem) Refresh(_, _ string) {} +func (s *stubSystem) Sections() []core.Section { return nil } +func (s *stubSystem) SetObserver(_ func(core.SystemChange)) {} diff --git a/internal/subagent/progress_tools.go b/internal/subagent/progress_tools.go index 374b1e36..9a852ecc 100644 --- a/internal/subagent/progress_tools.go +++ b/internal/subagent/progress_tools.go @@ -19,10 +19,11 @@ func (p *progressTools) Get(name string) core.Tool { } return &progressTool{inner: t, onExec: p.onExec} } -func (p *progressTools) All() []core.Tool { return p.inner.All() } -func (p *progressTools) Add(t core.Tool) { p.inner.Add(t) } -func (p *progressTools) Remove(name string) { p.inner.Remove(name) } -func (p *progressTools) Schemas() []core.ToolSchema { return p.inner.Schemas() } +func (p *progressTools) All() []core.Tool { return p.inner.All() } +func (p *progressTools) Add(t core.Tool, caller string) { p.inner.Add(t, caller) } +func (p *progressTools) Remove(name, caller string) { p.inner.Remove(name, caller) } +func (p *progressTools) Schemas() []core.ToolSchema { return p.inner.Schemas() } +func (p *progressTools) SetObserver(fn func(core.ToolsChange)) { p.inner.SetObserver(fn) } type progressTool struct { inner core.Tool diff --git a/internal/tool/permission.go b/internal/tool/permission.go index f574837d..a7742578 100644 --- a/internal/tool/permission.go +++ b/internal/tool/permission.go @@ -32,10 +32,11 @@ func (pt *permissionTools) Get(name string) core.Tool { return &permissionTool{inner: t, check: pt.check} } -func (pt *permissionTools) All() []core.Tool { return pt.inner.All() } -func (pt *permissionTools) Add(tool core.Tool) { pt.inner.Add(tool) } -func (pt *permissionTools) Remove(name string) { pt.inner.Remove(name) } -func (pt *permissionTools) Schemas() []core.ToolSchema { return pt.inner.Schemas() } +func (pt *permissionTools) All() []core.Tool { return pt.inner.All() } +func (pt *permissionTools) Add(tool core.Tool, caller string) { pt.inner.Add(tool, caller) } +func (pt *permissionTools) Remove(name, caller string) { pt.inner.Remove(name, caller) } +func (pt *permissionTools) Schemas() []core.ToolSchema { return pt.inner.Schemas() } +func (pt *permissionTools) SetObserver(fn func(core.ToolsChange)) { pt.inner.SetObserver(fn) } // permissionTool wraps a single core.Tool with permission checking. type permissionTool struct { From 606aa6858936c8aa34e725763c706b5a3aec54eb Mon Sep 17 00:00:00 2001 From: Meng Yan Date: Fri, 15 May 2026 23:13:12 +0800 Subject: [PATCH 2/2] fix: write session.started before observer-driven telemetry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit NewRecorder now calls FileStore.Start synchronously before returning, so session.started lands on disk first. Previously the call order 1. app builds Recorder 2. core.NewAgent wires System/Tools.SetObserver 3. SetObserver replays existing members → AppendSystemSection/AppendTools creates the transcript file 4. Store.Save → Start → file exists, no-op left provider/model/parentID metadata absent from every transcript that involved any observer replay (i.e. every main-agent session). Resume and the projects index would show empty Provider/Model. Fix lives in the recorder constructor because that's the latest point before observers can fire. RecorderOptions gains Cwd and ProjectID so Start can be called with the right metadata. Setup.NewRecorder threads them through from Store.cwd/Store.projectID. Test: TestRecorder_WritesSessionStartedBeforeTelemetry asserts that after NewRecorder() returns and an OnSystemChange event is fired, the first record on disk is session.started with provider+model populated. --- internal/session/recorder.go | 25 +++++++ internal/session/recorder_lifecycle_test.go | 72 +++++++++++++++++++++ internal/session/setup.go | 2 + 3 files changed, 99 insertions(+) create mode 100644 internal/session/recorder_lifecycle_test.go diff --git a/internal/session/recorder.go b/internal/session/recorder.go index 37a8fda8..b9ace745 100644 --- a/internal/session/recorder.go +++ b/internal/session/recorder.go @@ -48,6 +48,9 @@ type requestState struct { // at construction time and stamped on every inference record. Mid-session // model swaps will be handled by a future model.changed event, not by mutating // the recorder. +// +// Cwd and ProjectID are needed at construction so the recorder can write +// session.started before any telemetry — see NewRecorder for why. type RecorderOptions struct { FileStore *transcript.FileStore SessionID string @@ -55,9 +58,31 @@ type RecorderOptions struct { Provider string Model string MaxTokens int + Cwd string + ProjectID string } +// NewRecorder constructs a Recorder and ensures session.started lands on +// disk BEFORE the recorder can be invoked. This matters because the caller +// (core.NewAgent → System/Tools.SetObserver) immediately replays existing +// sections/tools as synthetic events; those events create the transcript +// file via AppendSystemSection / AppendTools. If session.started hasn't +// been written first, the subsequent Store.Save → Start call short-circuits +// on fileExists and provider/model/parent metadata is lost. +// +// Start is idempotent — a no-op on existing files — so callers (Store.Save) +// that also invoke it stay correct. func NewRecorder(opts RecorderOptions) *Recorder { + if opts.FileStore != nil && opts.SessionID != "" { + _ = opts.FileStore.Start(context.Background(), transcript.StartCommand{ + SessionID: opts.SessionID, + ProjectID: opts.ProjectID, + Cwd: opts.Cwd, + Provider: opts.Provider, + Model: opts.Model, + Time: time.Now(), + }) + } return &Recorder{ fs: opts.FileStore, sessionID: opts.SessionID, diff --git a/internal/session/recorder_lifecycle_test.go b/internal/session/recorder_lifecycle_test.go new file mode 100644 index 00000000..adc29d7d --- /dev/null +++ b/internal/session/recorder_lifecycle_test.go @@ -0,0 +1,72 @@ +package session + +import ( + "encoding/json" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/genai-io/gen-code/internal/core" + "github.com/genai-io/gen-code/internal/session/transcript" +) + +// Regression: NewRecorder must write session.started before returning, so +// that subsequent telemetry writes from SetObserver replay don't beat it to +// the disk. If session.started were written lazily (or only by Store.Save), +// the transcript file would already exist when Save calls Start — Start +// would short-circuit and provider/model metadata would be lost. +func TestRecorder_WritesSessionStartedBeforeTelemetry(t *testing.T) { + dir := t.TempDir() + fs, err := transcript.NewFileStore(dir, "proj-1") + if err != nil { + t.Fatalf("NewFileStore: %v", err) + } + // Construct the recorder — should write session.started synchronously. + rec := NewRecorder(RecorderOptions{ + FileStore: fs, + SessionID: "sess", + AgentID: "main", + Provider: "anthropic", + Model: "claude-x", + MaxTokens: 4096, + Cwd: "/tmp/project", + ProjectID: "proj-1", + }) + + // Fire a SystemChange right after — mirroring what SetObserver replay + // does inside core.NewAgent. + rec.OnAgentEvent(core.Event{Type: core.OnSystemChange, Data: core.SystemChange{ + Name: "identity", Slot: 0, Content: "You are X", Caller: "system:init", + }}) + + data, err := os.ReadFile(filepath.Join(dir, "transcripts", "sess.jsonl")) + if err != nil { + t.Fatalf("ReadFile: %v", err) + } + var firstType, foundProvider, foundModel string + for i, line := range strings.Split(string(data), "\n") { + if line == "" { + continue + } + var raw map[string]any + if err := json.Unmarshal([]byte(line), &raw); err != nil { + t.Fatalf("line %d not JSON: %v", i, err) + } + if i == 0 { + firstType, _ = raw["type"].(string) + } + if t, _ := raw["type"].(string); t == "session.started" { + if sess, ok := raw["session"].(map[string]any); ok { + foundProvider, _ = sess["provider"].(string) + foundModel, _ = sess["model"].(string) + } + } + } + if firstType != "session.started" { + t.Fatalf("first record type = %q, want session.started", firstType) + } + if foundProvider != "anthropic" || foundModel != "claude-x" { + t.Fatalf("session.started provider=%q model=%q, want anthropic/claude-x", foundProvider, foundModel) + } +} diff --git a/internal/session/setup.go b/internal/session/setup.go index 917bad46..7ce02486 100644 --- a/internal/session/setup.go +++ b/internal/session/setup.go @@ -154,5 +154,7 @@ func (s *Setup) NewRecorder(agentID, provider, model string, maxTokens int) *Rec Provider: provider, Model: model, MaxTokens: maxTokens, + Cwd: st.cwd, + ProjectID: st.projectID, }) }