Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion internal/agent/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type BuildParams struct {
PermissionDecider PermDecisionFunc
InteractionFunc tool.InteractionFunc
ToolProgress func(toolCallID string, msg string)

// OnEvent observes every agent lifecycle event synchronously, alongside
// outbox delivery. Used by the trace recorder; nil leaves recording off.
OnEvent func(core.Event)
}

func buildAgent(p BuildParams) (core.Agent, *PermissionBridge, error) {
Expand Down Expand Up @@ -87,7 +91,7 @@ func buildAgent(p BuildParams) (core.Agent, *PermissionBridge, error) {
}))
tools := tool.AdaptToolRegistry(schemas, cwdFunc, adaptOpts...)
for _, t := range p.MCPTools {
tools.Add(t)
tools.Add(t, "mcp:"+t.Name())
}

compactClient := client
Expand All @@ -111,6 +115,7 @@ func buildAgent(p BuildParams) (core.Agent, *PermissionBridge, error) {
Tools: tool.WithPermission(tools, pb.PermissionFunc()),
CompactFunc: compactFunc,
CWD: p.CWD,
OnEvent: p.OnEvent,
})

return ag, pb, nil
Expand Down
9 changes: 8 additions & 1 deletion internal/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,18 @@ func (m *model) buildAgentParams() agent.BuildParams {
mcpTools = mcp.AsCoreTools(schemas, mcpCaller)
}

maxTokens := kit.GetMaxTokens(m.services.LLM.Store(), m.env.CurrentModel, setting.DefaultMaxTokens)
var onEvent func(core.Event)
if rec := m.services.Session.NewRecorder("main", m.env.LLMProvider.Name(), m.env.GetModelID(), maxTokens); rec != nil {
onEvent = rec.OnAgentEvent
}

return agent.BuildParams{
Provider: m.env.LLMProvider,
ModelID: m.env.GetModelID(),
MaxTokens: kit.GetMaxTokens(m.services.LLM.Store(), m.env.CurrentModel, setting.DefaultMaxTokens),
MaxTokens: maxTokens,
ThinkingEffort: m.env.EffectiveThinkingEffort(),
OnEvent: onEvent,

CWD: m.env.CWD,
CWDFunc: func() string { return m.env.CWD },
Expand Down
77 changes: 75 additions & 2 deletions internal/core/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func NewAgent(cfg Config) Agent {
outbox = make(chan Event, cfg.OutboxBuf)
}

return &agent{
a := &agent{
id: cfg.ID,
agentType: cfg.AgentType,
color: cfg.Color,
Expand All @@ -148,6 +148,16 @@ func NewAgent(cfg Config) Agent {
outbox: outbox,
onEvent: cfg.OnEvent,
}
// Mirror system + tools mutations onto the event bus. Attach after
// construction so each registry replays its initial members back to the
// observer — the recorder sees a complete event chain from t0.
cfg.System.SetObserver(func(c SystemChange) {
a.emitTelemetry(SystemChangeEvent(a.id, c))
})
cfg.Tools.SetObserver(func(c ToolsChange) {
a.emitTelemetry(ToolsChangeEvent(a.id, c))
})
return a
}

// Result represents the outcome of one completed turn (end_turn).
Expand Down Expand Up @@ -178,6 +188,15 @@ const (
OnMessage EventType = "Message" // message received on inbox (Message in Data)
OnTurn EventType = "Turn" // think+act cycle completed (Result in Data)
OnCompact EventType = "Compact" // conversation compacted (CompactInfo in Data)

// OnSystemChange fires when a system-prompt section is added, replaced,
// or removed. Data is SystemChange. Non-critical telemetry — never blocks
// the outbox on backpressure.
OnSystemChange EventType = "SystemChange"

// OnToolsChange fires when a tool is registered or unregistered. Data is
// ToolsChange. Like OnSystemChange, non-blocking telemetry.
OnToolsChange EventType = "ToolsChange"
)

// Event carries context for an agent lifecycle point.
Expand Down Expand Up @@ -205,6 +224,50 @@ type CompactInfo struct {
OriginalCount int
}

// InferenceContext is the PreInfer payload — what was about to be sent to the
// LLM, expressed as content-addressed digests so consumers (trace recorder,
// debug logger) can reference inputs without copying them on every turn.
type InferenceContext struct {
SystemDigest string // sha256 of rendered system prompt
ToolsDigest string // sha256 of canonicalized tool schemas
MessageIDs []string // active chain at request time, in send order
}

func (e Event) InferenceContext() (InferenceContext, bool) {
ic, ok := e.Data.(InferenceContext)
return ic, ok
}

// SystemChange describes one mutation to the system prompt's section map.
// Emitted on Use/Drop. The recorder translates these into
// system.section.added / system.section.removed records.
type SystemChange struct {
Name string // section name (stable across mutations)
Slot int // render slot
Content string // rendered content; empty when Removed
Removed bool // true on Drop, false on Use
Caller string // who triggered the mutation (e.g. "system:init", "command:/identity")
}

func (e Event) SystemChange() (SystemChange, bool) {
c, ok := e.Data.(SystemChange)
return c, ok
}

// ToolsChange describes one mutation to the tool registry. On removal,
// Schema.Name carries the dropped tool's name and other fields are zero.
type ToolsChange struct {
Schema ToolSchema // populated on Add (zero on Remove)
Name string // populated on Remove (empty on Add)
Removed bool // true on Remove, false on Add
Caller string // who triggered the mutation
}

func (e Event) ToolsChange() (ToolsChange, bool) {
c, ok := e.Data.(ToolsChange)
return c, ok
}

// Typed event constructors — enforce correct Data types at construction.

func StartEvent(agentID string) Event { return Event{Type: OnStart, Source: agentID} }
Expand All @@ -214,7 +277,9 @@ func StopEvent(agentID string, err error) Event {
func ChunkEvent(agentID string, c Chunk) Event { return Event{Type: OnChunk, Source: agentID, Data: c} }
func MessageEvent(msg Message) Event { return Event{Type: OnMessage, Source: msg.From, Data: msg} }
func TurnEvent(agentID string, r Result) Event { return Event{Type: OnTurn, Source: agentID, Data: r} }
func PreInferEvent(agentID string) Event { return Event{Type: PreInfer, Source: agentID} }
func PreInferEvent(agentID string, ctx InferenceContext) Event {
return Event{Type: PreInfer, Source: agentID, Data: ctx}
}
func PostInferEvent(agentID string, r *InferResponse) Event {
return Event{Type: PostInfer, Source: agentID, Data: r}
}
Expand All @@ -223,3 +288,11 @@ func PostToolEvent(tr ToolResult) Event { return Event{Type: PostTool, Source: t
func CompactEvent(agentID string, info CompactInfo) Event {
return Event{Type: OnCompact, Source: agentID, Data: info}
}

func SystemChangeEvent(agentID string, c SystemChange) Event {
return Event{Type: OnSystemChange, Source: agentID, Data: c}
}

func ToolsChangeEvent(agentID string, c ToolsChange) Event {
return Event{Type: OnToolsChange, Source: agentID, Data: c}
}
38 changes: 33 additions & 5 deletions internal/core/agent_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,6 @@ func (a *agent) ThinkAct(ctx context.Context) (*Result, error) {
}
}

a.emit(ctx, PreInferEvent(a.id))

resp, err := a.streamInfer(ctx)
if err != nil {
// Reactive compaction: if prompt too long, compact and retry
Expand Down Expand Up @@ -424,11 +422,24 @@ func ToolCallIDFromContext(ctx context.Context) string {
// --- internals ---

// streamInfer calls the LLM, streams chunks to outbox, returns the final response.
//
// Emits PreInferEvent (with input digests) before the LLM call so observers
// can record exactly what was sent without copying the bytes on every turn.
func (a *agent) streamInfer(ctx context.Context) (*InferResponse, error) {
sys := a.system.Prompt()
msgs := a.snapshot()
tools := a.tools.Schemas()

a.emit(ctx, PreInferEvent(a.id, InferenceContext{
SystemDigest: sha256Hex([]byte(sys)),
ToolsDigest: toolsDigest(tools),
MessageIDs: messageIDs(msgs),
}))

chunks, err := a.llm.Infer(ctx, InferRequest{
System: a.system.Prompt(),
Messages: a.snapshot(),
Tools: a.tools.Schemas(),
System: sys,
Messages: msgs,
Tools: tools,
})
if err != nil {
return nil, fmt.Errorf("infer: %w", err)
Expand Down Expand Up @@ -475,6 +486,23 @@ func (a *agent) emit(ctx context.Context, event Event) {
}
}

// emitTelemetry delivers a fire-and-forget event: synchronously to onEvent,
// non-blocking to the outbox (dropped if full). Used for events whose
// consumers tolerate misses (system changes, hot-path tracing) and which can
// fire from goroutines without a useful ctx (e.g. system observer callbacks).
func (a *agent) emitTelemetry(event Event) {
if a.onEvent != nil {
a.onEvent(event)
}
if a.outbox == nil || a.closed.Load() {
return
}
select {
case a.outbox <- event:
default:
}
}

// emitFinal sends a critical event that must be delivered even on ctx cancellation.
// Used for StopEvent — consumers rely on it for cleanup/session saving.
// No-op when outbox is nil. Blocks up to 5 seconds; logs a warning if delivery fails.
Expand Down
51 changes: 51 additions & 0 deletions internal/core/digest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package core

import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"sort"
)

// sha256Hex returns "sha256:" + lowercase hex of the SHA-256 sum of b.
// The prefix makes algorithm choice explicit on the wire.
func sha256Hex(b []byte) string {
sum := sha256.Sum256(b)
return "sha256:" + hex.EncodeToString(sum[:])
}

// toolsDigest canonicalizes a tool schema list (sort by Name, marshal each)
// and returns its sha256. Stable across runs as long as schemas are stable.
func toolsDigest(schemas []ToolSchema) string {
if len(schemas) == 0 {
return sha256Hex(nil)
}
sorted := make([]ToolSchema, len(schemas))
copy(sorted, schemas)
sort.Slice(sorted, func(i, j int) bool { return sorted[i].Name < sorted[j].Name })

b, err := json.Marshal(sorted)
if err != nil {
// Marshal can only fail on unsupported types; ToolSchema fields are
// JSON-safe. Fall back to a digest of the names so the value is still
// stable rather than empty.
names := make([]string, len(sorted))
for i, s := range sorted {
names[i] = s.Name
}
b, _ = json.Marshal(names)
}
return sha256Hex(b)
}

// messageIDs extracts non-empty message IDs from the conversation snapshot,
// in send order. Empty IDs (legacy data) are skipped rather than padded.
func messageIDs(msgs []Message) []string {
out := make([]string, 0, len(msgs))
for _, m := range msgs {
if m.ID != "" {
out = append(out, m.ID)
}
}
return out
}
21 changes: 17 additions & 4 deletions internal/core/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,26 @@ type System interface {
// invalidated only when sections change.
Prompt() string

// Use registers or replaces a section by Name.
Use(Section)
// Use registers or replaces a section by Name. Caller is a short tag
// describing what triggered the mutation (e.g. "system:init",
// "command:/identity") and surfaces in the trace.
Use(sec Section, caller string)

// Drop removes a section by Name. No-op if absent.
Drop(name string)
Drop(name, caller string)

// Refresh marks one section's rendered output stale. Use after the
// section's underlying state changed but the Section value did not.
Refresh(name string)
Refresh(name, caller string)

// Sections returns a snapshot of currently registered sections in
// render order (slot ascending, insertion order ascending). Used by
// observers that attach after construction to replay the existing state.
Sections() []Section

// SetObserver installs a callback invoked synchronously on every
// subsequent mutation. Attaching also replays existing sections as
// "added" events so the observer sees a complete history starting
// from the moment of attachment.
SetObserver(fn func(SystemChange))
}
4 changes: 2 additions & 2 deletions internal/core/system/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,13 @@ func TestSystemUseDropRefresh(t *testing.T) {
sys.Use(core.Section{
Slot: core.SlotEnvironment, Name: "test-section", Source: core.Dynamic,
Render: func() string { return "TEST_SECTION_BODY" },
})
}, "test")
if !strings.Contains(sys.Prompt(), "TEST_SECTION_BODY") {
t.Error("Use should add a new section's content to Prompt()")
}

// Drop: remove it.
sys.Drop("test-section")
sys.Drop("test-section", "test")
if strings.Contains(sys.Prompt(), "TEST_SECTION_BODY") {
t.Error("Drop should remove the section from Prompt()")
}
Expand Down
Loading