diff --git a/internal/agent/build.go b/internal/agent/build.go
index c93fecc..e22138a 100644
--- a/internal/agent/build.go
+++ b/internal/agent/build.go
@@ -39,6 +39,10 @@ type BuildParams struct {
PermissionDecider PermDecisionFunc
InteractionFunc tool.InteractionFunc
ToolProgress func(toolCallID string, msg string)
+
+ // OnEvent observes every agent lifecycle event synchronously, alongside
+ // outbox delivery. Used by the trace recorder; nil leaves recording off.
+ OnEvent func(core.Event)
}
func buildAgent(p BuildParams) (core.Agent, *PermissionBridge, error) {
@@ -87,7 +91,7 @@ func buildAgent(p BuildParams) (core.Agent, *PermissionBridge, error) {
}))
tools := tool.AdaptToolRegistry(schemas, cwdFunc, adaptOpts...)
for _, t := range p.MCPTools {
- tools.Add(t)
+ tools.Add(t, "mcp:"+t.Name())
}
compactClient := client
@@ -111,6 +115,7 @@ func buildAgent(p BuildParams) (core.Agent, *PermissionBridge, error) {
Tools: tool.WithPermission(tools, pb.PermissionFunc()),
CompactFunc: compactFunc,
CWD: p.CWD,
+ OnEvent: p.OnEvent,
})
return ag, pb, nil
diff --git a/internal/app/agent.go b/internal/app/agent.go
index a70ed16..9253895 100644
--- a/internal/app/agent.go
+++ b/internal/app/agent.go
@@ -61,11 +61,18 @@ func (m *model) buildAgentParams() agent.BuildParams {
mcpTools = mcp.AsCoreTools(schemas, mcpCaller)
}
+ maxTokens := kit.GetMaxTokens(m.services.LLM.Store(), m.env.CurrentModel, setting.DefaultMaxTokens)
+ var onEvent func(core.Event)
+ if rec := m.services.Session.NewRecorder("main", m.env.LLMProvider.Name(), m.env.GetModelID(), maxTokens); rec != nil {
+ onEvent = rec.OnAgentEvent
+ }
+
return agent.BuildParams{
Provider: m.env.LLMProvider,
ModelID: m.env.GetModelID(),
- MaxTokens: kit.GetMaxTokens(m.services.LLM.Store(), m.env.CurrentModel, setting.DefaultMaxTokens),
+ MaxTokens: maxTokens,
ThinkingEffort: m.env.EffectiveThinkingEffort(),
+ OnEvent: onEvent,
CWD: m.env.CWD,
CWDFunc: func() string { return m.env.CWD },
diff --git a/internal/core/agent.go b/internal/core/agent.go
index 8df891c..650333d 100644
--- a/internal/core/agent.go
+++ b/internal/core/agent.go
@@ -133,7 +133,7 @@ func NewAgent(cfg Config) Agent {
outbox = make(chan Event, cfg.OutboxBuf)
}
- return &agent{
+ a := &agent{
id: cfg.ID,
agentType: cfg.AgentType,
color: cfg.Color,
@@ -148,6 +148,16 @@ func NewAgent(cfg Config) Agent {
outbox: outbox,
onEvent: cfg.OnEvent,
}
+ // Mirror system + tools mutations onto the event bus. Attach after
+ // construction so each registry replays its initial members back to the
+ // observer — the recorder sees a complete event chain from t0.
+ cfg.System.SetObserver(func(c SystemChange) {
+ a.emitTelemetry(SystemChangeEvent(a.id, c))
+ })
+ cfg.Tools.SetObserver(func(c ToolsChange) {
+ a.emitTelemetry(ToolsChangeEvent(a.id, c))
+ })
+ return a
}
// Result represents the outcome of one completed turn (end_turn).
@@ -178,6 +188,15 @@ const (
OnMessage EventType = "Message" // message received on inbox (Message in Data)
OnTurn EventType = "Turn" // think+act cycle completed (Result in Data)
OnCompact EventType = "Compact" // conversation compacted (CompactInfo in Data)
+
+ // OnSystemChange fires when a system-prompt section is added, replaced,
+ // or removed. Data is SystemChange. Non-critical telemetry — never blocks
+ // the outbox on backpressure.
+ OnSystemChange EventType = "SystemChange"
+
+ // OnToolsChange fires when a tool is registered or unregistered. Data is
+ // ToolsChange. Like OnSystemChange, non-blocking telemetry.
+ OnToolsChange EventType = "ToolsChange"
)
// Event carries context for an agent lifecycle point.
@@ -205,6 +224,50 @@ type CompactInfo struct {
OriginalCount int
}
+// InferenceContext is the PreInfer payload — what was about to be sent to the
+// LLM, expressed as content-addressed digests so consumers (trace recorder,
+// debug logger) can reference inputs without copying them on every turn.
+type InferenceContext struct {
+ SystemDigest string // sha256 of rendered system prompt
+ ToolsDigest string // sha256 of canonicalized tool schemas
+ MessageIDs []string // active chain at request time, in send order
+}
+
+func (e Event) InferenceContext() (InferenceContext, bool) {
+ ic, ok := e.Data.(InferenceContext)
+ return ic, ok
+}
+
+// SystemChange describes one mutation to the system prompt's section map.
+// Emitted on Use/Drop. The recorder translates these into
+// system.section.added / system.section.removed records.
+type SystemChange struct {
+ Name string // section name (stable across mutations)
+ Slot int // render slot
+ Content string // rendered content; empty when Removed
+ Removed bool // true on Drop, false on Use
+ Caller string // who triggered the mutation (e.g. "system:init", "command:/identity")
+}
+
+func (e Event) SystemChange() (SystemChange, bool) {
+ c, ok := e.Data.(SystemChange)
+ return c, ok
+}
+
+// ToolsChange describes one mutation to the tool registry. On removal,
+// Schema.Name carries the dropped tool's name and other fields are zero.
+type ToolsChange struct {
+ Schema ToolSchema // populated on Add (zero on Remove)
+ Name string // populated on Remove (empty on Add)
+ Removed bool // true on Remove, false on Add
+ Caller string // who triggered the mutation
+}
+
+func (e Event) ToolsChange() (ToolsChange, bool) {
+ c, ok := e.Data.(ToolsChange)
+ return c, ok
+}
+
// Typed event constructors — enforce correct Data types at construction.
func StartEvent(agentID string) Event { return Event{Type: OnStart, Source: agentID} }
@@ -214,7 +277,9 @@ func StopEvent(agentID string, err error) Event {
func ChunkEvent(agentID string, c Chunk) Event { return Event{Type: OnChunk, Source: agentID, Data: c} }
func MessageEvent(msg Message) Event { return Event{Type: OnMessage, Source: msg.From, Data: msg} }
func TurnEvent(agentID string, r Result) Event { return Event{Type: OnTurn, Source: agentID, Data: r} }
-func PreInferEvent(agentID string) Event { return Event{Type: PreInfer, Source: agentID} }
+func PreInferEvent(agentID string, ctx InferenceContext) Event {
+ return Event{Type: PreInfer, Source: agentID, Data: ctx}
+}
func PostInferEvent(agentID string, r *InferResponse) Event {
return Event{Type: PostInfer, Source: agentID, Data: r}
}
@@ -223,3 +288,11 @@ func PostToolEvent(tr ToolResult) Event { return Event{Type: PostTool, Source: t
func CompactEvent(agentID string, info CompactInfo) Event {
return Event{Type: OnCompact, Source: agentID, Data: info}
}
+
+func SystemChangeEvent(agentID string, c SystemChange) Event {
+ return Event{Type: OnSystemChange, Source: agentID, Data: c}
+}
+
+func ToolsChangeEvent(agentID string, c ToolsChange) Event {
+ return Event{Type: OnToolsChange, Source: agentID, Data: c}
+}
diff --git a/internal/core/agent_impl.go b/internal/core/agent_impl.go
index 725d594..fc4f212 100644
--- a/internal/core/agent_impl.go
+++ b/internal/core/agent_impl.go
@@ -210,8 +210,6 @@ func (a *agent) ThinkAct(ctx context.Context) (*Result, error) {
}
}
- a.emit(ctx, PreInferEvent(a.id))
-
resp, err := a.streamInfer(ctx)
if err != nil {
// Reactive compaction: if prompt too long, compact and retry
@@ -424,11 +422,24 @@ func ToolCallIDFromContext(ctx context.Context) string {
// --- internals ---
// streamInfer calls the LLM, streams chunks to outbox, returns the final response.
+//
+// Emits PreInferEvent (with input digests) before the LLM call so observers
+// can record exactly what was sent without copying the bytes on every turn.
func (a *agent) streamInfer(ctx context.Context) (*InferResponse, error) {
+ sys := a.system.Prompt()
+ msgs := a.snapshot()
+ tools := a.tools.Schemas()
+
+ a.emit(ctx, PreInferEvent(a.id, InferenceContext{
+ SystemDigest: sha256Hex([]byte(sys)),
+ ToolsDigest: toolsDigest(tools),
+ MessageIDs: messageIDs(msgs),
+ }))
+
chunks, err := a.llm.Infer(ctx, InferRequest{
- System: a.system.Prompt(),
- Messages: a.snapshot(),
- Tools: a.tools.Schemas(),
+ System: sys,
+ Messages: msgs,
+ Tools: tools,
})
if err != nil {
return nil, fmt.Errorf("infer: %w", err)
@@ -475,6 +486,23 @@ func (a *agent) emit(ctx context.Context, event Event) {
}
}
+// emitTelemetry delivers a fire-and-forget event: synchronously to onEvent,
+// non-blocking to the outbox (dropped if full). Used for events whose
+// consumers tolerate misses (system changes, hot-path tracing) and which can
+// fire from goroutines without a useful ctx (e.g. system observer callbacks).
+func (a *agent) emitTelemetry(event Event) {
+ if a.onEvent != nil {
+ a.onEvent(event)
+ }
+ if a.outbox == nil || a.closed.Load() {
+ return
+ }
+ select {
+ case a.outbox <- event:
+ default:
+ }
+}
+
// emitFinal sends a critical event that must be delivered even on ctx cancellation.
// Used for StopEvent — consumers rely on it for cleanup/session saving.
// No-op when outbox is nil. Blocks up to 5 seconds; logs a warning if delivery fails.
diff --git a/internal/core/digest.go b/internal/core/digest.go
new file mode 100644
index 0000000..1312b27
--- /dev/null
+++ b/internal/core/digest.go
@@ -0,0 +1,51 @@
+package core
+
+import (
+ "crypto/sha256"
+ "encoding/hex"
+ "encoding/json"
+ "sort"
+)
+
+// sha256Hex returns "sha256:" + lowercase hex of the SHA-256 sum of b.
+// The prefix makes algorithm choice explicit on the wire.
+func sha256Hex(b []byte) string {
+ sum := sha256.Sum256(b)
+ return "sha256:" + hex.EncodeToString(sum[:])
+}
+
+// toolsDigest canonicalizes a tool schema list (sort by Name, marshal each)
+// and returns its sha256. Stable across runs as long as schemas are stable.
+func toolsDigest(schemas []ToolSchema) string {
+ if len(schemas) == 0 {
+ return sha256Hex(nil)
+ }
+ sorted := make([]ToolSchema, len(schemas))
+ copy(sorted, schemas)
+ sort.Slice(sorted, func(i, j int) bool { return sorted[i].Name < sorted[j].Name })
+
+ b, err := json.Marshal(sorted)
+ if err != nil {
+ // Marshal can only fail on unsupported types; ToolSchema fields are
+ // JSON-safe. Fall back to a digest of the names so the value is still
+ // stable rather than empty.
+ names := make([]string, len(sorted))
+ for i, s := range sorted {
+ names[i] = s.Name
+ }
+ b, _ = json.Marshal(names)
+ }
+ return sha256Hex(b)
+}
+
+// messageIDs extracts non-empty message IDs from the conversation snapshot,
+// in send order. Empty IDs (legacy data) are skipped rather than padded.
+func messageIDs(msgs []Message) []string {
+ out := make([]string, 0, len(msgs))
+ for _, m := range msgs {
+ if m.ID != "" {
+ out = append(out, m.ID)
+ }
+ }
+ return out
+}
diff --git a/internal/core/system.go b/internal/core/system.go
index 8267f7a..271e6ff 100644
--- a/internal/core/system.go
+++ b/internal/core/system.go
@@ -19,13 +19,26 @@ type System interface {
// invalidated only when sections change.
Prompt() string
- // Use registers or replaces a section by Name.
- Use(Section)
+ // Use registers or replaces a section by Name. Caller is a short tag
+ // describing what triggered the mutation (e.g. "system:init",
+ // "command:/identity") and surfaces in the trace.
+ Use(sec Section, caller string)
// Drop removes a section by Name. No-op if absent.
- Drop(name string)
+ Drop(name, caller string)
// Refresh marks one section's rendered output stale. Use after the
// section's underlying state changed but the Section value did not.
- Refresh(name string)
+ Refresh(name, caller string)
+
+ // Sections returns a snapshot of currently registered sections in
+ // render order (slot ascending, insertion order ascending). Used by
+ // observers that attach after construction to replay the existing state.
+ Sections() []Section
+
+ // SetObserver installs a callback invoked synchronously on every
+ // subsequent mutation. Attaching also replays existing sections as
+ // "added" events so the observer sees a complete history starting
+ // from the moment of attachment.
+ SetObserver(fn func(SystemChange))
}
diff --git a/internal/core/system/builder_test.go b/internal/core/system/builder_test.go
index 2121653..35e4a05 100644
--- a/internal/core/system/builder_test.go
+++ b/internal/core/system/builder_test.go
@@ -188,13 +188,13 @@ func TestSystemUseDropRefresh(t *testing.T) {
sys.Use(core.Section{
Slot: core.SlotEnvironment, Name: "test-section", Source: core.Dynamic,
Render: func() string { return "TEST_SECTION_BODY" },
- })
+ }, "test")
if !strings.Contains(sys.Prompt(), "TEST_SECTION_BODY") {
t.Error("Use should add a new section's content to Prompt()")
}
// Drop: remove it.
- sys.Drop("test-section")
+ sys.Drop("test-section", "test")
if strings.Contains(sys.Prompt(), "TEST_SECTION_BODY") {
t.Error("Drop should remove the section from Prompt()")
}
diff --git a/internal/core/system/catalog.go b/internal/core/system/catalog.go
index 405b430..22741e9 100644
--- a/internal/core/system/catalog.go
+++ b/internal/core/system/catalog.go
@@ -57,13 +57,14 @@ func loadEmbedOptional(path string) string {
// applyDefaults registers the always-on sections for a Scope.
// Options passed to Build can override identity by Name.
func applyDefaults(sys core.System, scope core.Scope) {
- sys.Use(defaultIdentity())
- sys.Use(policy())
- sys.Use(guidelines("tools", cachedTools))
+ const caller = "system:init"
+ sys.Use(defaultIdentity(), caller)
+ sys.Use(policy(), caller)
+ sys.Use(guidelines("tools", cachedTools), caller)
if scope == core.ScopeMain {
// Task tracking + interactive questions are main-agent behaviors.
- sys.Use(guidelines("tasks", cachedTasks))
- sys.Use(guidelines("questions", cachedQuestions))
+ sys.Use(guidelines("tasks", cachedTasks), caller)
+ sys.Use(guidelines("questions", cachedQuestions), caller)
}
}
@@ -139,7 +140,7 @@ func WithIdentity(text string) Option {
if text == "" {
return
}
- sys.Use(identitySection(text))
+ sys.Use(identitySection(text), "system:init")
}
}
@@ -148,10 +149,10 @@ func WithIdentity(text string) Option {
func SwapIdentity(sys core.System, text string) {
text = strings.TrimSpace(text)
if text == "" {
- sys.Use(defaultIdentity())
+ sys.Use(defaultIdentity(), "command:identity")
return
}
- sys.Use(identitySection(text))
+ sys.Use(identitySection(text), "command:identity")
}
// identitySection builds the slot-0 identity Section for a user-defined persona.
@@ -178,7 +179,7 @@ func WithProvider(name string) Option {
Render: func() string {
return wrap("provider", map[string]string{"name": name}, body)
},
- })
+ }, "system:init")
}
}
@@ -188,7 +189,7 @@ func WithGitGuidelines(isGit bool) Option {
if !isGit {
return
}
- sys.Use(guidelines("git", cachedGit))
+ sys.Use(guidelines("git", cachedGit), "system:init")
}
}
@@ -216,7 +217,7 @@ func WithSubagentIdentity(b SubagentBrief) Option {
sys.Use(core.Section{
Slot: core.SlotIdentity, Name: "identity", Source: core.Injected,
Render: func() string { return renderSubagentIdentity(b) },
- })
+ }, "subagent:init")
}
}
@@ -278,7 +279,7 @@ func WithEnvironment(env Environment) Option {
sys.Use(core.Section{
Slot: core.SlotEnvironment, Name: "environment", Source: core.Dynamic,
Render: func() string { return renderEnvironment(env) },
- })
+ }, "system:init")
}
}
diff --git a/internal/core/system_impl.go b/internal/core/system_impl.go
index 4cda4d6..b41cb27 100644
--- a/internal/core/system_impl.go
+++ b/internal/core/system_impl.go
@@ -21,6 +21,11 @@ type system struct {
counter int // monotonic insertion sequence
cached string
dirty bool
+
+ // observer is invoked synchronously on every Use/Drop after the mutation
+ // applies. Set via SetObserver; nil until then. Reads happen on the same
+ // goroutine that mutated, so no extra locking around the call.
+ observer func(SystemChange)
}
type sectionEntry struct {
@@ -57,9 +62,8 @@ func (s *system) Prompt() string {
return s.cached
}
-func (s *system) Use(sec Section) {
+func (s *system) Use(sec Section, caller string) {
s.mu.Lock()
- defer s.mu.Unlock()
if e, ok := s.sections[sec.Name]; ok {
// Replacing an existing section: preserve its insertion order so
// position in the prompt does not jump around on hot updates.
@@ -70,24 +74,102 @@ func (s *system) Use(sec Section) {
 	s.sections[sec.Name] = &sectionEntry{def: sec, inserted: s.counter}
}
s.dirty = true
+ obs := s.observer
+ s.mu.Unlock()
+
+ if obs != nil {
+ obs(SystemChange{
+ Name: sec.Name,
+ Slot: int(sec.Slot),
+ Content: renderSection(sec),
+ Caller: caller,
+ })
+ }
}
-func (s *system) Drop(name string) {
+func (s *system) Drop(name, caller string) {
s.mu.Lock()
- defer s.mu.Unlock()
- if _, ok := s.sections[name]; ok {
+ _, existed := s.sections[name]
+ if existed {
delete(s.sections, name)
s.dirty = true
}
+ obs := s.observer
+ s.mu.Unlock()
+
+ if existed && obs != nil {
+ obs(SystemChange{Name: name, Removed: true, Caller: caller})
+ }
}
-func (s *system) Refresh(name string) {
+func (s *system) Refresh(name, caller string) {
s.mu.Lock()
- defer s.mu.Unlock()
+ var (
+ obs func(SystemChange)
+ changed bool
+ sec Section
+ )
if e, ok := s.sections[name]; ok {
e.fresh = false
s.dirty = true
+ changed = true
+ sec = e.def
+ obs = s.observer
+ }
+ s.mu.Unlock()
+
+ if changed && obs != nil {
+ obs(SystemChange{
+ Name: sec.Name,
+ Slot: int(sec.Slot),
+ Content: renderSection(sec),
+ Caller: caller,
+ })
+ }
+}
+
+// Sections returns a snapshot of currently registered sections in render order.
+func (s *system) Sections() []Section {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ entries := s.sortedEntries()
+ out := make([]Section, 0, len(entries))
+ for _, e := range entries {
+ out = append(out, e.def)
+ }
+ return out
+}
+
+// SetObserver attaches an observer for section mutations. On attach, the
+// existing sections are replayed as synthetic "added" events so the observer
+// sees a complete history starting from this moment.
+func (s *system) SetObserver(fn func(SystemChange)) {
+ s.mu.Lock()
+ s.observer = fn
+ entries := s.sortedEntries()
+ s.mu.Unlock()
+
+ if fn == nil {
+ return
+ }
+ for _, e := range entries {
+ fn(SystemChange{
+ Name: e.def.Name,
+ Slot: int(e.def.Slot),
+ Content: renderSection(e.def),
+ Caller: "system:init",
+ })
+ }
+}
+
+// renderSection invokes a Section's Render func once, returning "" on nil.
+// Used by observers that need the rendered content without going through the
+// cached-Prompt path (which joins all sections).
+func renderSection(sec Section) string {
+ if sec.Render == nil {
+ return ""
}
+ return sec.Render()
}
// sortedEntries returns entries in render order (Slot, insertion order).
diff --git a/internal/core/tool.go b/internal/core/tool.go
index f353e2d..eb6cacc 100644
--- a/internal/core/tool.go
+++ b/internal/core/tool.go
@@ -34,7 +34,15 @@ type ToolSchema struct {
type Tools interface {
Get(name string) Tool
All() []Tool
- Add(tool Tool)
- Remove(name string)
+ // Add registers (or replaces) a tool. Caller tags the mutation source
+ // (e.g. "mcp:weather", "agent:init") for trace records.
+ Add(tool Tool, caller string)
+ // Remove unregisters a tool by name. No-op if absent.
+ Remove(name, caller string)
Schemas() []ToolSchema
+
+ // SetObserver installs a callback invoked synchronously on every
+ // subsequent Add/Remove. Attaching also replays existing tools as
+ // synthetic Add events so the observer sees the full registry from t0.
+ SetObserver(fn func(ToolsChange))
}
diff --git a/internal/core/tool_impl.go b/internal/core/tool_impl.go
index 4080c44..589b863 100644
--- a/internal/core/tool_impl.go
+++ b/internal/core/tool_impl.go
@@ -13,6 +13,8 @@ type toolSet struct {
tools map[string]Tool
dirty bool // true when schemas cache needs rebuild
cache []ToolSchema // cached schemas
+
+ observer func(ToolsChange) // invoked after Add/Remove; nil until SetObserver
}
// NewTools creates an empty tool set.
@@ -44,20 +46,52 @@ func (s *toolSet) All() []Tool {
return out
}
-func (s *toolSet) Add(tool Tool) {
+func (s *toolSet) Add(tool Tool, caller string) {
s.mu.Lock()
- defer s.mu.Unlock()
s.tools[tool.Name()] = tool
s.dirty = true
+ obs := s.observer
+ s.mu.Unlock()
+
+ if obs != nil {
+ obs(ToolsChange{Schema: tool.Schema(), Caller: caller})
+ }
}
-func (s *toolSet) Remove(name string) {
+func (s *toolSet) Remove(name, caller string) {
s.mu.Lock()
- defer s.mu.Unlock()
- if _, ok := s.tools[name]; ok {
+ _, existed := s.tools[name]
+ if existed {
delete(s.tools, name)
s.dirty = true
}
+ obs := s.observer
+ s.mu.Unlock()
+
+ if existed && obs != nil {
+ obs(ToolsChange{Name: name, Removed: true, Caller: caller})
+ }
+}
+
+// SetObserver attaches a callback invoked on every Add/Remove. On attach,
+// existing tools are replayed as Add events with caller="tools:init".
+func (s *toolSet) SetObserver(fn func(ToolsChange)) {
+ s.mu.Lock()
+ s.observer = fn
+ snapshot := make([]Tool, 0, len(s.tools))
+ for _, t := range s.tools {
+ snapshot = append(snapshot, t)
+ }
+ s.mu.Unlock()
+
+ if fn == nil {
+ return
+ }
+ // Stable order so replay is reproducible across processes.
+ sort.Slice(snapshot, func(i, j int) bool { return snapshot[i].Name() < snapshot[j].Name() })
+ for _, t := range snapshot {
+ fn(ToolsChange{Schema: t.Schema(), Caller: "tools:init"})
+ }
}
func (s *toolSet) Schemas() []ToolSchema {
diff --git a/internal/session/message_convert.go b/internal/session/message_convert.go
index d862ad8..2b3dfe1 100644
--- a/internal/session/message_convert.go
+++ b/internal/session/message_convert.go
@@ -13,6 +13,47 @@ import (
var inlineImageTokenPattern = regexp.MustCompile(`\[Image #(\d+)\]`)
+// systemReminderRe matches a complete <system-reminder>...</system-reminder>
+// block including tags. Reminders are appended verbatim by reminder.AttachToContent
+// and hook UserPromptSubmit additionalContext flows through the same path, so
+// recognizing the wrapper suffices to mark harness-injected content.
+var systemReminderRe = regexp.MustCompile(`(?s)<system-reminder>.*?</system-reminder>`)
+
+const SourceReminder = "reminder"
+
+// splitTextByProvenance returns ContentBlocks that together preserve the input
+// byte-for-byte, marking blocks with Source="reminder" so
+// traces can distinguish user-typed text from harness-injected reminders /
+// hook additionalContext. Empty input returns nil.
+//
+// The function never trims whitespace — concatenating all returned Text fields
+// in order reproduces the original input. This keeps the read path
+// (extractUserContent, which joins text blocks) round-trip safe.
+func splitTextByProvenance(text string) []ContentBlock {
+ if text == "" {
+ return nil
+ }
+ matches := systemReminderRe.FindAllStringIndex(text, -1)
+ if len(matches) == 0 {
+ return []ContentBlock{{Type: "text", Text: text}}
+ }
+
+ blocks := make([]ContentBlock, 0, 2*len(matches)+1)
+ cursor := 0
+ for _, m := range matches {
+ start, end := m[0], m[1]
+ if start > cursor {
+ blocks = append(blocks, ContentBlock{Type: "text", Text: text[cursor:start]})
+ }
+ blocks = append(blocks, ContentBlock{Type: "text", Text: text[start:end], Source: SourceReminder})
+ cursor = end
+ }
+ if cursor < len(text) {
+ blocks = append(blocks, ContentBlock{Type: "text", Text: text[cursor:]})
+ }
+ return blocks
+}
+
func messagesToEntries(msgs []core.Message) []Entry {
entries := make([]Entry, 0, len(msgs))
var prevUUID string
@@ -106,9 +147,7 @@ func userContentToBlocks(content, displayContent string, images []core.Image) []
ImageSource: &ImageSource{Type: "base64", MediaType: img.MediaType, Data: img.Data},
})
}
- if content != "" {
- blocks = append(blocks, ContentBlock{Type: "text", Text: content})
- }
+ blocks = append(blocks, splitTextByProvenance(content)...)
return blocks
}
@@ -123,9 +162,8 @@ func interleavedUserContentToBlocks(content, displayContent string, images []cor
start, end := match[0], match[1]
idStart, idEnd := match[2], match[3]
- textPart := displayContent[last:start]
- if textPart != "" {
- blocks = append(blocks, ContentBlock{Type: "text", Text: textPart})
+ if textPart := displayContent[last:start]; textPart != "" {
+ blocks = append(blocks, splitTextByProvenance(textPart)...)
}
id, err := strconv.Atoi(displayContent[idStart:idEnd])
@@ -143,11 +181,11 @@ func interleavedUserContentToBlocks(content, displayContent string, images []cor
}
if tail := displayContent[last:]; tail != "" {
- blocks = append(blocks, ContentBlock{Type: "text", Text: tail})
+ blocks = append(blocks, splitTextByProvenance(tail)...)
}
if len(blocks) == 0 && content != "" {
- blocks = append(blocks, ContentBlock{Type: "text", Text: content})
+ blocks = append(blocks, splitTextByProvenance(content)...)
}
return blocks
diff --git a/internal/session/message_convert_test.go b/internal/session/message_convert_test.go
index 5d23a61..7716d71 100644
--- a/internal/session/message_convert_test.go
+++ b/internal/session/message_convert_test.go
@@ -1,11 +1,61 @@
package session
import (
+ "strings"
"testing"
"github.com/genai-io/gen-code/internal/core"
)
+// A user message that contains harness-injected <system-reminder> blocks must
+// be persisted as multiple ContentBlocks: user-typed text with empty Source,
+// reminder text with Source="reminder". Concatenating the text fields must
+// reproduce the input byte-for-byte (round-trip safety for read path).
+func Test_userContentToBlocks_splitsByProvenance(t *testing.T) {
+ const reminder1 = "\nskills directory\n"
+ const reminder2 = "\nuser memory\n"
+ input := "hello\n\n" + reminder1 + "\n\n" + reminder2
+
+ blocks := userContentToBlocks(input, "", nil)
+
+ var sb strings.Builder
+ var reminderCount, userCount int
+ for _, b := range blocks {
+ if b.Type != "text" {
+ t.Fatalf("unexpected block type: %q", b.Type)
+ }
+ sb.WriteString(b.Text)
+ switch b.Source {
+ case SourceReminder:
+ reminderCount++
+			if !strings.HasPrefix(b.Text, "<system-reminder>") {
+ t.Errorf("reminder block missing wrapper: %q", b.Text)
+ }
+ case "":
+ userCount++
+ default:
+ t.Fatalf("unexpected Source: %q", b.Source)
+ }
+ }
+ if sb.String() != input {
+ t.Fatalf("round-trip mismatch:\n got: %q\nwant: %q", sb.String(), input)
+ }
+ if reminderCount != 2 {
+ t.Fatalf("reminder block count = %d, want 2", reminderCount)
+ }
+ if userCount == 0 {
+ t.Fatalf("expected at least one user block, got %d", userCount)
+ }
+}
+
+// Plain user content with no reminders produces exactly one user-text block.
+func Test_userContentToBlocks_plainTextOneBlock(t *testing.T) {
+ blocks := userContentToBlocks("just a question", "", nil)
+ if len(blocks) != 1 || blocks[0].Type != "text" || blocks[0].Source != "" {
+ t.Fatalf("expected 1 user-text block, got %+v", blocks)
+ }
+}
+
func Test_messagesToEntries_roundtrip(t *testing.T) {
// Test that messagesToEntries -> EntriesToMessages roundtrips correctly.
msgs := []core.Message{
diff --git a/internal/session/recorder.go b/internal/session/recorder.go
new file mode 100644
index 0000000..b9ace74
--- /dev/null
+++ b/internal/session/recorder.go
@@ -0,0 +1,258 @@
+package session
+
+import (
+ "context"
+ "encoding/json"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "go.uber.org/zap"
+
+ "github.com/genai-io/gen-code/internal/core"
+ "github.com/genai-io/gen-code/internal/log"
+ "github.com/genai-io/gen-code/internal/session/transcript"
+)
+
+// Recorder turns core.Agent lifecycle events into transcript records.
+//
+// One Recorder is bound to one (sessionID, agentID) pair. The OnAgentEvent
+// method is meant to be passed as core.Config.OnEvent — it's called
+// synchronously from the agent goroutine, so handlers stay fast.
+//
+// The recorder is additive: existing code paths (Store.Save) keep persisting
+// messages and state. The recorder writes only event-driven records
+// (inference.requested / inference.responded) that the snapshot path can't
+// observe.
+type Recorder struct {
+ fs *transcript.FileStore
+ sessionID string
+ agentID string
+ provider string
+ model string
+ maxTokens int
+
+ turn atomic.Int64
+
+ mu sync.Mutex
+ lastRequest *requestState
+}
+
+type requestState struct {
+ turn int
+ startedAt time.Time
+ messageIDs []string
+}
+
+// RecorderOptions configures a Recorder. provider/model/maxTokens are captured
+// at construction time and stamped on every inference record. Mid-session
+// model swaps will be handled by a future model.changed event, not by mutating
+// the recorder.
+//
+// Cwd and ProjectID are needed at construction so the recorder can write
+// session.started before any telemetry — see NewRecorder for why.
+type RecorderOptions struct {
+ FileStore *transcript.FileStore
+ SessionID string
+ AgentID string
+ Provider string
+ Model string
+ MaxTokens int
+ Cwd string
+ ProjectID string
+}
+
+// NewRecorder constructs a Recorder and ensures session.started lands on
+// disk BEFORE the recorder can be invoked. This matters because the caller
+// (core.NewAgent → System/Tools.SetObserver) immediately replays existing
+// sections/tools as synthetic events; those events create the transcript
+// file via AppendSystemSection / AppendTools. If session.started hasn't
+// been written first, the subsequent Store.Save → Start call short-circuits
+// on fileExists and provider/model/parent metadata is lost.
+//
+// Start is idempotent — a no-op on existing files — so callers (Store.Save)
+// that also invoke it stay correct.
+func NewRecorder(opts RecorderOptions) *Recorder {
+ if opts.FileStore != nil && opts.SessionID != "" {
+ _ = opts.FileStore.Start(context.Background(), transcript.StartCommand{
+ SessionID: opts.SessionID,
+ ProjectID: opts.ProjectID,
+ Cwd: opts.Cwd,
+ Provider: opts.Provider,
+ Model: opts.Model,
+ Time: time.Now(),
+ })
+ }
+ return &Recorder{
+ fs: opts.FileStore,
+ sessionID: opts.SessionID,
+ agentID: opts.AgentID,
+ provider: opts.Provider,
+ model: opts.Model,
+ maxTokens: opts.MaxTokens,
+ }
+}
+
+// OnAgentEvent is the core.Config.OnEvent callback. It dispatches by event
+// type and writes the corresponding transcript record. Errors are logged
+// rather than propagated — failing to record telemetry must not break the
+// running session.
+func (r *Recorder) OnAgentEvent(ev core.Event) {
+ if r == nil || r.fs == nil || r.sessionID == "" {
+ return
+ }
+ switch ev.Type {
+ case core.PreInfer:
+ r.onPreInfer(ev)
+ case core.PostInfer:
+ r.onPostInfer(ev)
+ case core.OnSystemChange:
+ r.onSystemChange(ev)
+ case core.OnToolsChange:
+ r.onToolsChange(ev)
+ }
+}
+
+func (r *Recorder) onToolsChange(ev core.Event) {
+ c, ok := ev.ToolsChange()
+ if !ok {
+ return
+ }
+ typ := transcript.ToolsAdded
+ payload := transcript.ToolsRecord{Caller: c.Caller}
+ if c.Removed {
+ typ = transcript.ToolsRemoved
+ payload.Name = c.Name
+ } else {
+ payload.Schema = toolSchemaView(c.Schema)
+ }
+ err := r.fs.AppendTools(context.Background(), transcript.AppendToolsCommand{
+ SessionID: r.sessionID,
+ AgentID: r.agentID,
+ Time: time.Now(),
+ Type: typ,
+ Record: payload,
+ })
+ if err != nil {
+ log.Logger().Warn("recorder: append tools change failed", zap.Error(err))
+ }
+}
+
+func toolSchemaView(s core.ToolSchema) *transcript.ToolSchemaView {
+ view := &transcript.ToolSchemaView{
+ Name: s.Name,
+ Description: s.Description,
+ }
+ if s.Parameters != nil {
+ if data, err := json.Marshal(s.Parameters); err == nil {
+ view.Parameters = data
+ }
+ }
+ return view
+}
+
+func (r *Recorder) onSystemChange(ev core.Event) {
+ c, ok := ev.SystemChange()
+ if !ok {
+ return
+ }
+ typ := transcript.SystemSectionAdded
+ if c.Removed {
+ typ = transcript.SystemSectionRemoved
+ }
+ err := r.fs.AppendSystemSection(context.Background(), transcript.AppendSystemSectionCommand{
+ SessionID: r.sessionID,
+ AgentID: r.agentID,
+ Time: time.Now(),
+ Type: typ,
+ Record: transcript.SystemSectionRecord{
+ Name: c.Name,
+ Slot: c.Slot,
+ Content: c.Content,
+ Caller: c.Caller,
+ },
+ })
+ if err != nil {
+ log.Logger().Warn("recorder: append system section failed", zap.Error(err))
+ }
+}
+
+func (r *Recorder) onPreInfer(ev core.Event) {
+ ic, ok := ev.InferenceContext()
+ if !ok {
+ return
+ }
+
+ turn := int(r.turn.Add(1))
+ now := time.Now()
+
+ r.mu.Lock()
+ r.lastRequest = &requestState{
+ turn: turn,
+ startedAt: now,
+ messageIDs: ic.MessageIDs,
+ }
+ r.mu.Unlock()
+
+ err := r.fs.AppendInference(context.Background(), transcript.AppendInferenceCommand{
+ SessionID: r.sessionID,
+ AgentID: r.agentID,
+ Time: now,
+ Type: transcript.InferenceRequested,
+ Record: transcript.InferenceRecord{
+ Turn: turn,
+ Provider: r.provider,
+ Model: r.model,
+ MaxTokens: r.maxTokens,
+ SystemDigest: ic.SystemDigest,
+ ToolsDigest: ic.ToolsDigest,
+ MessageIDs: ic.MessageIDs,
+ },
+ })
+ if err != nil {
+ log.Logger().Warn("recorder: append inference.requested failed", zap.Error(err))
+ }
+}
+
+func (r *Recorder) onPostInfer(ev core.Event) {
+ resp, ok := ev.Response()
+ if !ok || resp == nil {
+ return
+ }
+
+ r.mu.Lock()
+ prev := r.lastRequest
+ r.lastRequest = nil
+ r.mu.Unlock()
+
+ now := time.Now()
+ var turn int
+ var latencyMs int64
+ if prev != nil {
+ turn = prev.turn
+ latencyMs = now.Sub(prev.startedAt).Milliseconds()
+ }
+
+ err := r.fs.AppendInference(context.Background(), transcript.AppendInferenceCommand{
+ SessionID: r.sessionID,
+ AgentID: r.agentID,
+ Time: now,
+ Type: transcript.InferenceResponded,
+ Record: transcript.InferenceRecord{
+ Turn: turn,
+ Provider: r.provider,
+ Model: r.model,
+ StopReason: string(resp.StopReason),
+ LatencyMs: latencyMs,
+ Usage: &transcript.InferenceUsage{
+ InputTokens: resp.TokensIn,
+ OutputTokens: resp.TokensOut,
+ CacheCreateTokens: resp.CacheCreateTokens,
+ CacheReadTokens: resp.CacheReadTokens,
+ },
+ },
+ })
+ if err != nil {
+ log.Logger().Warn("recorder: append inference.responded failed", zap.Error(err))
+ }
+}
diff --git a/internal/session/recorder_lifecycle_test.go b/internal/session/recorder_lifecycle_test.go
new file mode 100644
index 0000000..adc29d7
--- /dev/null
+++ b/internal/session/recorder_lifecycle_test.go
@@ -0,0 +1,72 @@
+package session
+
+import (
+ "encoding/json"
+ "os"
+ "path/filepath"
+ "strings"
+ "testing"
+
+ "github.com/genai-io/gen-code/internal/core"
+ "github.com/genai-io/gen-code/internal/session/transcript"
+)
+
+// Regression: NewRecorder must write session.started before returning, so
+// that subsequent telemetry writes from SetObserver replay don't beat it to
+// the disk. If session.started were written lazily (or only by Store.Save),
+// the transcript file would already exist when Save calls Start — Start
+// would short-circuit and provider/model metadata would be lost.
+func TestRecorder_WritesSessionStartedBeforeTelemetry(t *testing.T) {
+ dir := t.TempDir()
+ fs, err := transcript.NewFileStore(dir, "proj-1")
+ if err != nil {
+ t.Fatalf("NewFileStore: %v", err)
+ }
+ // Construct the recorder — should write session.started synchronously.
+ rec := NewRecorder(RecorderOptions{
+ FileStore: fs,
+ SessionID: "sess",
+ AgentID: "main",
+ Provider: "anthropic",
+ Model: "claude-x",
+ MaxTokens: 4096,
+ Cwd: "/tmp/project",
+ ProjectID: "proj-1",
+ })
+
+ // Fire a SystemChange right after — mirroring what SetObserver replay
+ // does inside core.NewAgent.
+ rec.OnAgentEvent(core.Event{Type: core.OnSystemChange, Data: core.SystemChange{
+ Name: "identity", Slot: 0, Content: "You are X", Caller: "system:init",
+ }})
+
+ data, err := os.ReadFile(filepath.Join(dir, "transcripts", "sess.jsonl"))
+ if err != nil {
+ t.Fatalf("ReadFile: %v", err)
+ }
+ var firstType, foundProvider, foundModel string
+ for i, line := range strings.Split(string(data), "\n") {
+ if line == "" {
+ continue
+ }
+ var raw map[string]any
+ if err := json.Unmarshal([]byte(line), &raw); err != nil {
+ t.Fatalf("line %d not JSON: %v", i, err)
+ }
+ if i == 0 {
+ firstType, _ = raw["type"].(string)
+ }
+		if typ, _ := raw["type"].(string); typ == "session.started" {
+ if sess, ok := raw["session"].(map[string]any); ok {
+ foundProvider, _ = sess["provider"].(string)
+ foundModel, _ = sess["model"].(string)
+ }
+ }
+ }
+ if firstType != "session.started" {
+ t.Fatalf("first record type = %q, want session.started", firstType)
+ }
+ if foundProvider != "anthropic" || foundModel != "claude-x" {
+ t.Fatalf("session.started provider=%q model=%q, want anthropic/claude-x", foundProvider, foundModel)
+ }
+}
diff --git a/internal/session/recorder_test.go b/internal/session/recorder_test.go
new file mode 100644
index 0000000..2ef6ef6
--- /dev/null
+++ b/internal/session/recorder_test.go
@@ -0,0 +1,250 @@
+package session
+
+import (
+ "bufio"
+ "context"
+ "encoding/json"
+ "os"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/genai-io/gen-code/internal/core"
+ "github.com/genai-io/gen-code/internal/session/transcript"
+)
+
+// One turn through PreInfer + PostInfer must produce one inference.requested
+// and one inference.responded record, with the request's MessageIDs/digests
+// preserved and the response's usage/stop_reason populated. The turn counter
+// links the pair.
+func TestRecorderWritesRequestedAndRespondedPerTurn(t *testing.T) {
+ dir := t.TempDir()
+ fs, err := transcript.NewFileStore(dir, "proj-1")
+ if err != nil {
+ t.Fatalf("NewFileStore: %v", err)
+ }
+ if err := fs.Start(context.Background(), transcript.StartCommand{
+ SessionID: "sess-1", Cwd: "/tmp", Provider: "anthropic", Model: "claude-x", Time: time.Now(),
+ }); err != nil {
+ t.Fatalf("Start: %v", err)
+ }
+
+ rec := NewRecorder(RecorderOptions{
+ FileStore: fs, SessionID: "sess-1", AgentID: "main",
+ Provider: "anthropic", Model: "claude-x", MaxTokens: 4096,
+ })
+
+ rec.OnAgentEvent(core.Event{Type: core.PreInfer, Source: "main", Data: core.InferenceContext{
+ SystemDigest: "sha256:sys", ToolsDigest: "sha256:tools", MessageIDs: []string{"m1", "m2"},
+ }})
+ rec.OnAgentEvent(core.Event{Type: core.PostInfer, Source: "main", Data: &core.InferResponse{
+ StopReason: core.StopEndTurn, TokensIn: 42, TokensOut: 8, CacheReadTokens: 10,
+ }})
+
+ tx, err := fs.Load(context.Background(), "sess-1")
+ if err != nil {
+ t.Fatalf("Load: %v", err)
+ }
+	// A successful Load proves the transcript is well-formed JSONL, but its
+	// projection flattens to messages only — inference records are not
+	// messages — so assertions below read raw records straight from disk.
+ _ = tx
+
+ // Read raw records from disk to assert on inference records.
+ records := readAllRecords(t, dir, "sess-1")
+
+ var reqs, resps []transcript.Record
+ for _, r := range records {
+ switch r.Type {
+ case transcript.InferenceRequested:
+ reqs = append(reqs, r)
+ case transcript.InferenceResponded:
+ resps = append(resps, r)
+ }
+ }
+ if len(reqs) != 1 {
+ t.Fatalf("inference.requested count = %d, want 1", len(reqs))
+ }
+ if len(resps) != 1 {
+ t.Fatalf("inference.responded count = %d, want 1", len(resps))
+ }
+
+ req := reqs[0].Inference
+ if req == nil {
+ t.Fatal("request record missing Inference payload")
+ }
+ if req.Turn != 1 {
+ t.Fatalf("requested.Turn = %d, want 1", req.Turn)
+ }
+ if req.SystemDigest != "sha256:sys" || req.ToolsDigest != "sha256:tools" {
+ t.Fatalf("requested digests = %+v", req)
+ }
+ if len(req.MessageIDs) != 2 || req.MessageIDs[0] != "m1" {
+ t.Fatalf("requested.MessageIDs = %+v", req.MessageIDs)
+ }
+
+ resp := resps[0].Inference
+ if resp == nil {
+ t.Fatal("response record missing Inference payload")
+ }
+ if resp.Turn != 1 {
+ t.Fatalf("responded.Turn = %d, want 1 (must match request)", resp.Turn)
+ }
+ if resp.StopReason != string(core.StopEndTurn) {
+ t.Fatalf("responded.StopReason = %q", resp.StopReason)
+ }
+ if resp.Usage == nil || resp.Usage.InputTokens != 42 || resp.Usage.OutputTokens != 8 || resp.Usage.CacheReadTokens != 10 {
+ t.Fatalf("responded.Usage = %+v", resp.Usage)
+ }
+}
+
+// System mutations must produce one system.section.added per added/replaced
+// section and one system.section.removed per dropped section. Caller info is
+// preserved verbatim so trace consumers can answer "who changed this?".
+func TestRecorderWritesSystemSectionEvents(t *testing.T) {
+ dir := t.TempDir()
+ fs, err := transcript.NewFileStore(dir, "proj-1")
+ if err != nil {
+ t.Fatalf("NewFileStore: %v", err)
+ }
+ if err := fs.Start(context.Background(), transcript.StartCommand{
+ SessionID: "sess-sys", Cwd: "/tmp", Provider: "p", Model: "m", Time: time.Now(),
+ }); err != nil {
+ t.Fatalf("Start: %v", err)
+ }
+
+ rec := NewRecorder(RecorderOptions{
+ FileStore: fs, SessionID: "sess-sys", AgentID: "main",
+ Provider: "p", Model: "m", MaxTokens: 1,
+ })
+
+ rec.OnAgentEvent(core.Event{Type: core.OnSystemChange, Data: core.SystemChange{
+ Name: "identity", Slot: 0, Content: "You are X", Caller: "system:init",
+ }})
+ rec.OnAgentEvent(core.Event{Type: core.OnSystemChange, Data: core.SystemChange{
+ Name: "identity", Slot: 0, Content: "You are Y", Caller: "command:/identity",
+ }})
+ rec.OnAgentEvent(core.Event{Type: core.OnSystemChange, Data: core.SystemChange{
+ Name: "policy", Removed: true, Caller: "test:teardown",
+ }})
+
+ records := readAllRecords(t, dir, "sess-sys")
+ var added, removed []transcript.Record
+ for _, r := range records {
+ switch r.Type {
+ case transcript.SystemSectionAdded:
+ added = append(added, r)
+ case transcript.SystemSectionRemoved:
+ removed = append(removed, r)
+ }
+ }
+ if len(added) != 2 {
+ t.Fatalf("system.section.added count = %d, want 2", len(added))
+ }
+ if len(removed) != 1 {
+ t.Fatalf("system.section.removed count = %d, want 1", len(removed))
+ }
+ if added[1].System.Caller != "command:/identity" || added[1].System.Content != "You are Y" {
+ t.Fatalf("replaced section payload = %+v", added[1].System)
+ }
+ if removed[0].System.Name != "policy" || removed[0].System.Caller != "test:teardown" {
+ t.Fatalf("removed section payload = %+v", removed[0].System)
+ }
+}
+
+// Tool registry changes must produce one tools.added per Add and one
+// tools.removed per Remove. Schema/Name and Caller are preserved.
+func TestRecorderWritesToolsChangeEvents(t *testing.T) {
+ dir := t.TempDir()
+ fs, err := transcript.NewFileStore(dir, "proj-1")
+ if err != nil {
+ t.Fatalf("NewFileStore: %v", err)
+ }
+ if err := fs.Start(context.Background(), transcript.StartCommand{
+ SessionID: "sess-tools", Cwd: "/tmp", Provider: "p", Model: "m", Time: time.Now(),
+ }); err != nil {
+ t.Fatalf("Start: %v", err)
+ }
+
+ rec := NewRecorder(RecorderOptions{
+ FileStore: fs, SessionID: "sess-tools", AgentID: "main",
+ Provider: "p", Model: "m", MaxTokens: 1,
+ })
+
+ rec.OnAgentEvent(core.Event{Type: core.OnToolsChange, Data: core.ToolsChange{
+ Schema: core.ToolSchema{Name: "Read", Description: "read a file"},
+ Caller: "tools:init",
+ }})
+ rec.OnAgentEvent(core.Event{Type: core.OnToolsChange, Data: core.ToolsChange{
+ Schema: core.ToolSchema{Name: "Bash", Description: "run shell"},
+ Caller: "mcp:Bash",
+ }})
+ rec.OnAgentEvent(core.Event{Type: core.OnToolsChange, Data: core.ToolsChange{
+ Name: "Bash", Removed: true, Caller: "mode:plan",
+ }})
+
+ records := readAllRecords(t, dir, "sess-tools")
+ var added, removed []transcript.Record
+ for _, r := range records {
+ switch r.Type {
+ case transcript.ToolsAdded:
+ added = append(added, r)
+ case transcript.ToolsRemoved:
+ removed = append(removed, r)
+ }
+ }
+ if len(added) != 2 {
+ t.Fatalf("tools.added count = %d, want 2", len(added))
+ }
+ if len(removed) != 1 {
+ t.Fatalf("tools.removed count = %d, want 1", len(removed))
+ }
+ if added[0].Tools.Schema == nil || added[0].Tools.Schema.Name != "Read" || added[0].Tools.Caller != "tools:init" {
+ t.Fatalf("first added record = %+v", added[0].Tools)
+ }
+ if removed[0].Tools.Name != "Bash" || removed[0].Tools.Caller != "mode:plan" {
+ t.Fatalf("removed record = %+v", removed[0].Tools)
+ }
+}
+
+// A nil-safe recorder (no FileStore) must accept events without panicking.
+func TestRecorderNilSafe(t *testing.T) {
+ var rec *Recorder
+ rec.OnAgentEvent(core.Event{Type: core.PreInfer})
+
+ empty := NewRecorder(RecorderOptions{})
+ empty.OnAgentEvent(core.Event{Type: core.PreInfer})
+}
+
+func readAllRecords(t *testing.T, baseDir, sessionID string) []transcript.Record {
+ t.Helper()
+ fs, err := transcript.NewFileStore(baseDir, "proj-1")
+ if err != nil {
+ t.Fatalf("NewFileStore (reread): %v", err)
+ }
+ path := fs.TranscriptPath(sessionID)
+ f, err := os.Open(path)
+ if err != nil {
+ t.Fatalf("open transcript: %v", err)
+ }
+ defer f.Close()
+
+ var out []transcript.Record
+ scanner := bufio.NewScanner(f)
+ scanner.Buffer(make([]byte, 1024*1024), 16*1024*1024)
+ for scanner.Scan() {
+ line := strings.TrimSpace(scanner.Text())
+ if line == "" {
+ continue
+ }
+ var r transcript.Record
+ if err := json.Unmarshal([]byte(line), &r); err != nil {
+ t.Fatalf("decode record: %v", err)
+ }
+ out = append(out, r)
+ }
+ if err := scanner.Err(); err != nil {
+ t.Fatalf("scan transcript: %v", err)
+ }
+ return out
+}
diff --git a/internal/session/service.go b/internal/session/service.go
index 64f1625..04d13f9 100644
--- a/internal/session/service.go
+++ b/internal/session/service.go
@@ -26,6 +26,9 @@ type Service interface {
LoadLatest() (*Snapshot, error)
List() ([]*SessionMetadata, error)
Fork(id string) (*Snapshot, error)
+
+ // tracing
+ NewRecorder(agentID, provider, model string, maxTokens int) *Recorder
}
// Compile-time check: *Setup implements Service.
diff --git a/internal/session/setup.go b/internal/session/setup.go
index 1d2e621..7ce0248 100644
--- a/internal/session/setup.go
+++ b/internal/session/setup.go
@@ -134,3 +134,27 @@ func (s *Setup) Fork(id string) (*Snapshot, error) {
}
return st.Fork(id)
}
+
+// NewRecorder binds a Recorder to the current session and transcript store.
+// Returns nil if the store is not initialized — callers can pass the nil
+// result through to core.Config.OnEvent safely because Recorder.OnAgentEvent
+// is nil-safe.
+func (s *Setup) NewRecorder(agentID, provider, model string, maxTokens int) *Recorder {
+ s.mu.RLock()
+ st := s.Store
+ sessionID := s.SessionID
+ s.mu.RUnlock()
+ if st == nil || st.transcriptStore == nil || sessionID == "" {
+ return nil
+ }
+ return NewRecorder(RecorderOptions{
+ FileStore: st.transcriptStore,
+ SessionID: sessionID,
+ AgentID: agentID,
+ Provider: provider,
+ Model: model,
+ MaxTokens: maxTokens,
+ Cwd: st.cwd,
+ ProjectID: st.projectID,
+ })
+}
diff --git a/internal/session/transcript/fs_store.go b/internal/session/transcript/fs_store.go
index 7712829..3d4beb9 100644
--- a/internal/session/transcript/fs_store.go
+++ b/internal/session/transcript/fs_store.go
@@ -149,6 +149,78 @@ func (s *FileStore) PatchState(ctx context.Context, cmd PatchStateCommand) error
return s.refreshIndexLocked(cmd.SessionID)
}
+func (s *FileStore) AppendTools(ctx context.Context, cmd AppendToolsCommand) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+ if cmd.Type != ToolsAdded && cmd.Type != ToolsRemoved {
+ return fmt.Errorf("append tools: unexpected type %q", cmd.Type)
+ }
+
+ rec := cmd.Record
+ full := Record{
+ ID: fmt.Sprintf("%s:tools:%d", cmd.SessionID, cmd.Time.UnixNano()),
+ SessionID: cmd.SessionID,
+ Time: cmd.Time,
+ Type: cmd.Type,
+ AgentID: cmd.AgentID,
+ Tools: &rec,
+ }
+ return s.appendRecord(s.transcriptPath(cmd.SessionID), full, false)
+}
+
+func (s *FileStore) AppendSystemSection(ctx context.Context, cmd AppendSystemSectionCommand) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+ if cmd.Type != SystemSectionAdded && cmd.Type != SystemSectionRemoved {
+ return fmt.Errorf("append system section: unexpected type %q", cmd.Type)
+ }
+
+ rec := cmd.Record
+ full := Record{
+ ID: fmt.Sprintf("%s:system:%d", cmd.SessionID, cmd.Time.UnixNano()),
+ SessionID: cmd.SessionID,
+ Time: cmd.Time,
+ Type: cmd.Type,
+ AgentID: cmd.AgentID,
+ System: &rec,
+ }
+ return s.appendRecord(s.transcriptPath(cmd.SessionID), full, false)
+}
+
+func (s *FileStore) AppendInference(ctx context.Context, cmd AppendInferenceCommand) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+ if cmd.Type != InferenceRequested && cmd.Type != InferenceResponded {
+ return fmt.Errorf("append inference: unexpected type %q", cmd.Type)
+ }
+
+ rec := cmd.Record
+ full := Record{
+ ID: fmt.Sprintf("%s:inference:%d", cmd.SessionID, cmd.Time.UnixNano()),
+ SessionID: cmd.SessionID,
+ Time: cmd.Time,
+ Type: cmd.Type,
+ AgentID: cmd.AgentID,
+ Inference: &rec,
+ }
+ // inference.responded sync-flushes the file so any preceding telemetry
+ // (system.section.*, tools.*, inference.requested) from this turn lands
+ // on disk together; inference.requested itself stays in the page cache.
+ return s.appendRecord(s.transcriptPath(cmd.SessionID), full, cmd.Type == InferenceResponded)
+}
+
func (s *FileStore) Compact(ctx context.Context, cmd CompactCommand) error {
s.mu.Lock()
defer s.mu.Unlock()
@@ -355,15 +427,19 @@ func (s *FileStore) indexPath() string {
}
// appendRecord writes one record to the JSONL. fsync is gated by sync so
-// hot-path telemetry can be buffered in the OS page cache and rolled up to
-// disk at turn boundaries.
+// hot-path telemetry (system.section.*, tools.*, inference.requested) can be
+// buffered in the OS page cache and rolled up to disk at turn boundaries.
//
// Durability classes:
-// - sync=true: user input (message.appended), lifecycle events
-// (session.started, session.compacted), and turn-completion writes.
+// - sync=true: user input (message.appended), turn-completion
+// (inference.responded), lifecycle events (session.started, session.compacted).
// A crash after these must not lose them.
-// - sync=false: pure telemetry (state.patched today; more in follow-up
-// commits). Worst case on crash: in-flight turn's telemetry is lost.
+// - sync=false: pure telemetry. Worst case on crash: the in-flight turn's
+// telemetry is lost, but the message/state of preceding turns is intact
+// because their write fsynced.
+//
+// Single-process per file: the append+close pair preserves order regardless
+// of fsync; durability is the only thing the flag toggles.
func (s *FileStore) appendRecord(path string, rec Record, sync bool) error {
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
return fmt.Errorf("create transcript dir: %w", err)
diff --git a/internal/session/transcript/records.go b/internal/session/transcript/records.go
index ef489b2..4ae5c8c 100644
--- a/internal/session/transcript/records.go
+++ b/internal/session/transcript/records.go
@@ -8,11 +8,17 @@ import (
// Record type values follow . (past tense), lowercase,
// dot-separated. See docs/tracing.md for the full taxonomy.
const (
- SessionStarted = "session.started"
- SessionForked = "session.forked"
- SessionCompacted = "session.compacted"
- MessageAppended = "message.appended"
- StatePatched = "state.patched"
+ SessionStarted = "session.started"
+ SessionForked = "session.forked"
+ SessionCompacted = "session.compacted"
+ MessageAppended = "message.appended"
+ StatePatched = "state.patched"
+ InferenceRequested = "inference.requested"
+ InferenceResponded = "inference.responded"
+ SystemSectionAdded = "system.section.added"
+ SystemSectionRemoved = "system.section.removed"
+ ToolsAdded = "tools.added"
+ ToolsRemoved = "tools.removed"
)
const (
@@ -37,9 +43,12 @@ type Record struct {
GitBranch string `json:"gitBranch,omitempty"`
AgentID string `json:"agentId,omitempty"`
- Message *MessageRecord `json:"message,omitempty"`
- State *StateRecord `json:"state,omitempty"`
- Session *SessionRecord `json:"session,omitempty"`
+ Message *MessageRecord `json:"message,omitempty"`
+ State *StateRecord `json:"state,omitempty"`
+ Session *SessionRecord `json:"session,omitempty"`
+ Inference *InferenceRecord `json:"inference,omitempty"`
+ System *SystemSectionRecord `json:"system,omitempty"`
+ Tools *ToolsRecord `json:"tools,omitempty"`
}
type MessageRecord struct {
@@ -68,6 +77,63 @@ type SessionRecord struct {
BoundaryID string `json:"boundaryId,omitempty"`
}
+// InferenceRecord carries the payload for inference.requested /
+// inference.responded. The "requested" side captures the digests of what was
+// sent to the LLM (system prompt, tools, active message chain); the "responded"
+// side captures stop reason, latency, and token usage. Big fields live in the
+// digests — full payloads are reconstructible by replaying preceding records.
+type InferenceRecord struct {
+ Turn int `json:"turn"`
+ Provider string `json:"provider,omitempty"`
+ Model string `json:"model,omitempty"`
+ MaxTokens int `json:"maxTokens,omitempty"`
+ SystemDigest string `json:"systemDigest,omitempty"`
+ ToolsDigest string `json:"toolsDigest,omitempty"`
+
+ // MessageIDs is the active chain at request time, in send order.
+ // Recorded on inference.requested only.
+ MessageIDs []string `json:"messageIds,omitempty"`
+
+ // Response fields — populated on inference.responded only.
+ StopReason string `json:"stopReason,omitempty"`
+ LatencyMs int64 `json:"latencyMs,omitempty"`
+ Usage *InferenceUsage `json:"usage,omitempty"`
+}
+
+type InferenceUsage struct {
+ InputTokens int `json:"inputTokens"`
+ OutputTokens int `json:"outputTokens"`
+ CacheCreateTokens int `json:"cacheCreateTokens,omitempty"`
+ CacheReadTokens int `json:"cacheReadTokens,omitempty"`
+}
+
+// SystemSectionRecord carries the payload for system.section.added /
+// system.section.removed. On removal, Content is empty.
+type SystemSectionRecord struct {
+ Name string `json:"name"`
+ Slot int `json:"slot,omitempty"`
+ Content string `json:"content,omitempty"`
+ Caller string `json:"caller,omitempty"`
+}
+
+// ToolSchemaView mirrors a tool schema in transcript-local types so the
+// transcript package stays free of cross-package imports. Recorder converts
+// from the runtime ToolSchema before writing.
+type ToolSchemaView struct {
+ Name string `json:"name"`
+ Description string `json:"description,omitempty"`
+ Parameters json.RawMessage `json:"parameters,omitempty"`
+}
+
+// ToolsRecord carries the payload for tools.added / tools.removed.
+// One tool per record (Add/Remove fire individually); Schema is set on
+// "added", Name on "removed".
+type ToolsRecord struct {
+ Schema *ToolSchemaView `json:"schema,omitempty"`
+ Name string `json:"name,omitempty"`
+ Caller string `json:"caller,omitempty"`
+}
+
type ContentBlock struct {
Type string `json:"type"`
diff --git a/internal/session/transcript/store.go b/internal/session/transcript/store.go
index d542a9c..ddc1978 100644
--- a/internal/session/transcript/store.go
+++ b/internal/session/transcript/store.go
@@ -51,6 +51,35 @@ type ForkCommand struct {
Time time.Time
}
+// AppendInferenceCommand writes either an inference.requested or
+// inference.responded record. Type selects which; Record carries the payload.
+type AppendInferenceCommand struct {
+ SessionID string
+ AgentID string
+ Time time.Time
+ Type string // InferenceRequested or InferenceResponded
+ Record InferenceRecord
+}
+
+// AppendSystemSectionCommand writes system.section.added or
+// system.section.removed. Removed sections drop Content from Record.
+type AppendSystemSectionCommand struct {
+ SessionID string
+ AgentID string
+ Time time.Time
+ Type string // SystemSectionAdded or SystemSectionRemoved
+ Record SystemSectionRecord
+}
+
+// AppendToolsCommand writes tools.added or tools.removed.
+type AppendToolsCommand struct {
+ SessionID string
+ AgentID string
+ Time time.Time
+ Type string // ToolsAdded or ToolsRemoved
+ Record ToolsRecord
+}
+
type ListOptions struct {
Limit int
IncludeSidechain bool
diff --git a/internal/subagent/executor.go b/internal/subagent/executor.go
index fc7543a..5aa6a49 100644
--- a/internal/subagent/executor.go
+++ b/internal/subagent/executor.go
@@ -285,7 +285,7 @@ func (e *Executor) buildAgent(ctx context.Context, rc *runConfig, agentCwd strin
if e.mcpRegistry != nil {
mcpCaller := mcp.NewCaller(e.mcpRegistry)
for _, t := range mcp.AsCoreTools(schemas, mcpCaller) {
- tools.Add(t)
+ tools.Add(t, "mcp:"+t.Name())
}
}
diff --git a/internal/subagent/executor_test.go b/internal/subagent/executor_test.go
index eb40977..2da6cbe 100644
--- a/internal/subagent/executor_test.go
+++ b/internal/subagent/executor_test.go
@@ -554,7 +554,9 @@ func (s *stubLLM) InputLimit() int { return 0 }
// stubSystem is a minimal core.System for tests.
type stubSystem struct{}
-func (s *stubSystem) Prompt() string { return "" }
-func (s *stubSystem) Use(_ core.Section) {}
-func (s *stubSystem) Drop(_ string) {}
-func (s *stubSystem) Refresh(_ string) {}
+func (s *stubSystem) Prompt() string { return "" }
+func (s *stubSystem) Use(_ core.Section, _ string) {}
+func (s *stubSystem) Drop(_, _ string) {}
+func (s *stubSystem) Refresh(_, _ string) {}
+func (s *stubSystem) Sections() []core.Section { return nil }
+func (s *stubSystem) SetObserver(_ func(core.SystemChange)) {}
diff --git a/internal/subagent/progress_tools.go b/internal/subagent/progress_tools.go
index 374b1e3..9a852ec 100644
--- a/internal/subagent/progress_tools.go
+++ b/internal/subagent/progress_tools.go
@@ -19,10 +19,11 @@ func (p *progressTools) Get(name string) core.Tool {
}
return &progressTool{inner: t, onExec: p.onExec}
}
-func (p *progressTools) All() []core.Tool { return p.inner.All() }
-func (p *progressTools) Add(t core.Tool) { p.inner.Add(t) }
-func (p *progressTools) Remove(name string) { p.inner.Remove(name) }
-func (p *progressTools) Schemas() []core.ToolSchema { return p.inner.Schemas() }
+func (p *progressTools) All() []core.Tool { return p.inner.All() }
+func (p *progressTools) Add(t core.Tool, caller string) { p.inner.Add(t, caller) }
+func (p *progressTools) Remove(name, caller string) { p.inner.Remove(name, caller) }
+func (p *progressTools) Schemas() []core.ToolSchema { return p.inner.Schemas() }
+func (p *progressTools) SetObserver(fn func(core.ToolsChange)) { p.inner.SetObserver(fn) }
type progressTool struct {
inner core.Tool
diff --git a/internal/tool/permission.go b/internal/tool/permission.go
index f574837..a774257 100644
--- a/internal/tool/permission.go
+++ b/internal/tool/permission.go
@@ -32,10 +32,11 @@ func (pt *permissionTools) Get(name string) core.Tool {
return &permissionTool{inner: t, check: pt.check}
}
-func (pt *permissionTools) All() []core.Tool { return pt.inner.All() }
-func (pt *permissionTools) Add(tool core.Tool) { pt.inner.Add(tool) }
-func (pt *permissionTools) Remove(name string) { pt.inner.Remove(name) }
-func (pt *permissionTools) Schemas() []core.ToolSchema { return pt.inner.Schemas() }
+func (pt *permissionTools) All() []core.Tool { return pt.inner.All() }
+func (pt *permissionTools) Add(tool core.Tool, caller string) { pt.inner.Add(tool, caller) }
+func (pt *permissionTools) Remove(name, caller string) { pt.inner.Remove(name, caller) }
+func (pt *permissionTools) Schemas() []core.ToolSchema { return pt.inner.Schemas() }
+func (pt *permissionTools) SetObserver(fn func(core.ToolsChange)) { pt.inner.SetObserver(fn) }
// permissionTool wraps a single core.Tool with permission checking.
type permissionTool struct {