From e033208a46408f60ef3998c7df9150b8e6b45fde Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Sat, 6 Jun 2026 00:23:53 +0530 Subject: [PATCH 1/5] refactor(observe): extract shared observer skeleton Move the observer-pattern-general pieces of the SCM observer into a new backend/internal/observe package so the tracker observer (issue #35) can build on the same primitives: - StartPollLoop: goroutine supervisor with immediate-first-poll + ticker + ctx-done exit. SCM Observer.Start now delegates to it. - CheckCredentialsOnce: lazy first-poll credential gate driven by a CredentialProbe closure. SCM observer keeps credentialsChecked/disabled as Observer fields; the shared helper mutates them via pointer so state ownership stays single-source. - CacheSet[V any] / CacheDelete[V any]: one generic bounded-FIFO helper replaces the three near-identical cacheSet{String,Time,Bool} bodies and the standalone evictStrings. The SCM-side methods are now one-line wrappers that thread o.Cache.max into the shared helper, so existing call sites and tests are untouched. SCM behavior is unchanged. The full 21-test SCM suite (including the end-to-end test added in PR #115) plus 577 backend tests stay green under `go test -race`. Part of #112. Co-Authored-By: Claude Opus 4.7 --- backend/internal/observe/observer.go | 133 ++++++++++++++ backend/internal/observe/observer_test.go | 209 ++++++++++++++++++++++ backend/internal/observe/scm/observer.go | 124 ++++--------- 3 files changed, 372 insertions(+), 94 deletions(-) create mode 100644 backend/internal/observe/observer.go create mode 100644 backend/internal/observe/observer_test.go diff --git a/backend/internal/observe/observer.go b/backend/internal/observe/observer.go new file mode 100644 index 0000000..391e02a --- /dev/null +++ b/backend/internal/observe/observer.go @@ -0,0 +1,133 @@ +// Package observe contains observer-pattern primitives shared across the SCM +// and Tracker observation lanes. The pieces here are deliberately +// provider-agnostic: a polling goroutine supervisor, a lazy credential gate, +// and a bounded FIFO cache helper. Provider-specific normalization, +// persistence, and lifecycle reactions live in the sibling packages +// (observe/scm, future observe/tracker). +package observe + +import ( + "context" + "errors" + "log/slog" + "time" +) + +// StartPollLoop launches a goroutine that calls poll immediately, then on every +// tick interval until ctx is done. The returned channel closes when the +// goroutine exits; callers wait on it during shutdown. +// +// The immediate first poll inside the goroutine (rather than before the ticker +// loop) keeps daemon startup non-blocking: callers see Start return after the +// goroutine is launched, not after the first network call. +// +// poll errors other than context.Canceled are logged via logger with name as a +// prefix, e.g. name="scm observer" -> "scm observer: initial poll failed". +func StartPollLoop(ctx context.Context, tick time.Duration, poll func(context.Context) error, logger *slog.Logger, name string) <-chan struct{} { + if logger == nil { + logger = slog.Default() + } + done := make(chan struct{}) + go func() { + defer close(done) + if err := poll(ctx); err != nil && !errors.Is(err, context.Canceled) { + logger.Error(name+": initial poll failed", "err", err) + } + t := time.NewTicker(tick) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + if err := poll(ctx); err != nil && !errors.Is(err, context.Canceled) { + logger.Error(name+": poll failed", "err", err) + } + } + } + }() + return done +} + +// CredentialProbe checks whether the observer's provider has usable credentials. +// Implementations return (false, nil) for a transient failure the observer +// should retry on the next tick, (false, non-nil) for an error the caller +// should surface, and (true, nil) when credentials are available. +type CredentialProbe func(ctx context.Context) (available bool, err error) + +// CheckCredentialsOnce runs probe at most once. The caller owns checked/disabled +// state via pointer so the observer struct keeps a single source of truth. +// +// State transitions: +// - probe == nil → *checked = true, returns (true, nil). No gate. +// - probe returns err → state unchanged, returns (false, nil). Retried next tick. +// - probe returns false → *checked = true, *disabled = true, returns (false, nil). Observer stays disabled. +// - probe returns true → *checked = true, returns (true, nil). Subsequent calls bypass the probe. +// +// A context-cancellation before the probe returns (false, ctx.Err()). +func CheckCredentialsOnce(ctx context.Context, probe CredentialProbe, checked, disabled *bool, logger *slog.Logger, name string) (bool, error) { + if *checked { + return true, nil + } + if err := ctx.Err(); err != nil { + return false, err + } + if logger == nil { + logger = slog.Default() + } + if probe == nil { + *checked = true + return true, nil + } + available, err := probe(ctx) + if err != nil { + logger.Warn(name+" credentials check failed; will retry", "err", err) + return false, nil + } + *checked = true + if !available { + *disabled = true + logger.Warn(name + " disabled: provider credentials unavailable") + return false, nil + } + return true, nil +} + +// CacheSet writes value to m[key] and tracks insertion order in *order for +// bounded FIFO eviction. If the bucket already had key, order is left +// unchanged; otherwise key is appended. When len(*order) exceeds max, the +// oldest keys are evicted from both order and m. +// +// max <= 0 disables eviction; callers that want bounded behavior must pass a +// positive max. The generic shape lets the same helper serve string-, time-, +// and bool-valued caches without per-type duplication. +func CacheSet[V any](m map[string]V, order *[]string, max int, key string, value V) { + if _, ok := m[key]; !ok { + *order = append(*order, key) + } + m[key] = value + if max <= 0 { + return + } + for len(*order) > max { + evict := (*order)[0] + *order = (*order)[1:] + delete(m, evict) + } +} + +// CacheDelete removes key from m and the matching slot from *order. It is a +// no-op when key is absent. +func CacheDelete[V any](m map[string]V, order *[]string, key string) { + if _, ok := m[key]; !ok { + return + } + delete(m, key) + dst := (*order)[:0] + for _, cachedKey := range *order { + if cachedKey != key { + dst = append(dst, cachedKey) + } + } + *order = dst +} diff --git a/backend/internal/observe/observer_test.go b/backend/internal/observe/observer_test.go new file mode 100644 index 0000000..46cc077 --- /dev/null +++ b/backend/internal/observe/observer_test.go @@ -0,0 +1,209 @@ +package observe + +import ( + "context" + "errors" + "io" + "log/slog" + "sync" + "sync/atomic" + "testing" + "time" +) + +func quietLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} + +func TestCacheSet_InsertsAndOrders(t *testing.T) { + m := map[string]string{} + var order []string + CacheSet(m, &order, 4, "a", "1") + CacheSet(m, &order, 4, "b", "2") + CacheSet(m, &order, 4, "c", "3") + if got := m["b"]; got != "2" { + t.Fatalf("m[b] = %q want %q", got, "2") + } + if len(order) != 3 || order[0] != "a" || order[2] != "c" { + t.Fatalf("order = %v, want [a b c]", order) + } +} + +func TestCacheSet_UpdateDoesNotRepeatOrder(t *testing.T) { + m := map[string]string{} + var order []string + CacheSet(m, &order, 4, "a", "1") + CacheSet(m, &order, 4, "a", "1b") + if got := m["a"]; got != "1b" { + t.Fatalf("m[a] = %q want %q", got, "1b") + } + if len(order) != 1 || order[0] != "a" { + t.Fatalf("order = %v, want [a] (repeat sets must not duplicate the slot)", order) + } +} + +func TestCacheSet_EvictsOldestPastMax(t *testing.T) { + m := map[string]int{} + var order []string + for i, k := range []string{"a", "b", "c", "d"} { + CacheSet(m, &order, 2, k, i) + } + if _, ok := m["a"]; ok { + t.Fatalf("a should have been evicted, got %v", m) + } + if _, ok := m["b"]; ok { + t.Fatalf("b should have been evicted, got %v", m) + } + if len(order) != 2 || order[0] != "c" || order[1] != "d" { + t.Fatalf("order = %v, want [c d]", order) + } +} + +func TestCacheSet_GenericOverTime(t *testing.T) { + m := map[string]time.Time{} + var order []string + now := time.Unix(1700000000, 0) + CacheSet(m, &order, 4, "k", now) + if !m["k"].Equal(now) { + t.Fatalf("time round-trip failed: %v vs %v", m["k"], now) + } +} + +func TestCacheDelete_RemovesKeyAndOrderSlot(t *testing.T) { + m := map[string]bool{} + var order []string + CacheSet(m, &order, 4, "a", true) + CacheSet(m, &order, 4, "b", true) + CacheDelete(m, &order, "a") + if _, ok := m["a"]; ok { + t.Fatalf("a should be removed: %v", m) + } + if len(order) != 1 || order[0] != "b" { + t.Fatalf("order = %v, want [b]", order) + } +} + +func TestCacheDelete_MissingKeyIsNoop(t *testing.T) { + m := map[string]bool{"a": true} + order := []string{"a"} + CacheDelete(m, &order, "z") + if !m["a"] || len(order) != 1 || order[0] != "a" { + t.Fatalf("missing-key delete must not mutate, got m=%v order=%v", m, order) + } +} + +func TestCheckCredentialsOnce_NilProbeMarksChecked(t *testing.T) { + var checked, disabled bool + ok, err := CheckCredentialsOnce(context.Background(), nil, &checked, &disabled, quietLogger(), "test") + if err != nil || !ok { + t.Fatalf("nil probe: ok=%v err=%v", ok, err) + } + if !checked || disabled { + t.Fatalf("nil probe: checked=%v disabled=%v", checked, disabled) + } +} + +func TestCheckCredentialsOnce_ProbeAvailable(t *testing.T) { + var checked, disabled bool + calls := 0 + probe := func(context.Context) (bool, error) { calls++; return true, nil } + if ok, err := CheckCredentialsOnce(context.Background(), probe, &checked, &disabled, quietLogger(), "test"); err != nil || !ok { + t.Fatalf("first call: ok=%v err=%v", ok, err) + } + if !checked || disabled { + t.Fatalf("after success: checked=%v disabled=%v", checked, disabled) + } + // Second call must NOT re-invoke probe. + if ok, err := CheckCredentialsOnce(context.Background(), probe, &checked, &disabled, quietLogger(), "test"); err != nil || !ok { + t.Fatalf("second call: ok=%v err=%v", ok, err) + } + if calls != 1 { + t.Fatalf("probe should run once, ran %d", calls) + } +} + +func TestCheckCredentialsOnce_ProbeUnavailableDisables(t *testing.T) { + var checked, disabled bool + probe := func(context.Context) (bool, error) { return false, nil } + ok, err := CheckCredentialsOnce(context.Background(), probe, &checked, &disabled, quietLogger(), "test") + if err != nil || ok { + t.Fatalf("ok=%v err=%v, want (false, nil)", ok, err) + } + if !checked || !disabled { + t.Fatalf("after unavailable: checked=%v disabled=%v", checked, disabled) + } +} + +func TestCheckCredentialsOnce_TransientErrorRetries(t *testing.T) { + var checked, disabled bool + calls := 0 + probe := func(context.Context) (bool, error) { + calls++ + if calls == 1 { + return false, errors.New("transient") + } + return true, nil + } + if ok, err := CheckCredentialsOnce(context.Background(), probe, &checked, &disabled, quietLogger(), "test"); err != nil || ok { + t.Fatalf("first call: ok=%v err=%v, want (false,nil)", ok, err) + } + if checked || disabled { + t.Fatalf("transient error must leave state untouched: checked=%v disabled=%v", checked, disabled) + } + if ok, err := CheckCredentialsOnce(context.Background(), probe, &checked, &disabled, quietLogger(), "test"); err != nil || !ok { + t.Fatalf("retry call: ok=%v err=%v, want (true,nil)", ok, err) + } +} + +func TestStartPollLoop_FirstPollImmediateThenTicks(t *testing.T) { + var mu sync.Mutex + calls := 0 + poll := func(context.Context) error { + mu.Lock() + defer mu.Unlock() + calls++ + return nil + } + ctx, cancel := context.WithCancel(context.Background()) + done := StartPollLoop(ctx, 10*time.Millisecond, poll, quietLogger(), "test") + // Wait for at least 2 polls (initial + one tick). + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + mu.Lock() + n := calls + mu.Unlock() + if n >= 2 { + break + } + time.Sleep(5 * time.Millisecond) + } + cancel() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("done channel not closed after cancel") + } + mu.Lock() + defer mu.Unlock() + if calls < 2 { + t.Fatalf("expected at least 2 polls, got %d", calls) + } +} + +func TestStartPollLoop_LogsPollErrorWithoutPanic(t *testing.T) { + var ran atomic.Int32 + poll := func(context.Context) error { + ran.Add(1) + return errors.New("boom") + } + ctx, cancel := context.WithCancel(context.Background()) + done := StartPollLoop(ctx, 10*time.Millisecond, poll, quietLogger(), "test") + for time.Now().Before(time.Now().Add(200 * time.Millisecond)) { + if ran.Load() >= 2 { + break + } + time.Sleep(5 * time.Millisecond) + } + cancel() + <-done +} diff --git a/backend/internal/observe/scm/observer.go b/backend/internal/observe/scm/observer.go index 4c425da..afc35d5 100644 --- a/backend/internal/observe/scm/observer.go +++ b/backend/internal/observe/scm/observer.go @@ -14,9 +14,11 @@ import ( "log/slog" "os/exec" "strings" + "sync" "time" "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/observe" "github.com/aoagents/agent-orchestrator/backend/internal/ports" ) @@ -160,39 +162,25 @@ func New(provider Provider, store Store, lifecycle Lifecycle, cfg Config) *Obser // Start launches the observer loop. The first Poll runs immediately inside the // goroutine so daemon startup is not blocked; subsequent polls run on the tick. +// +// The first invocation of poll inside the supervisor also runs checkCredentials +// up front. That way the "scm observer disabled: provider credentials +// unavailable" warning is emitted on a fresh daemon even if discoverSubjects +// has no subjects yet (which would otherwise short-circuit Poll before +// checkCredentials). checkCredentials is guarded by credentialsChecked, so the +// wrap stays once-per-process; a transient error there simply defers the check +// to the next tick. func (o *Observer) Start(ctx context.Context) <-chan struct{} { - done := make(chan struct{}) - go o.loop(ctx, done) - return done -} - -func (o *Observer) loop(ctx context.Context, done chan<- struct{}) { - defer close(done) - // Run the credential gate once before the first poll so the - // "scm observer disabled: provider credentials unavailable" warning is - // emitted on a fresh daemon even if discoverSubjects has no subjects yet - // (which would otherwise short-circuit Poll before checkCredentials). - // checkCredentials is guarded by credentialsChecked, so this remains - // once-per-process; a transient error here simply defers the check to the - // next Poll, matching existing behavior. - if _, err := o.checkCredentials(ctx); err != nil && !errors.Is(err, context.Canceled) { - o.logger.Error("scm observer: initial credential check failed", "err", err) - } - if err := o.Poll(ctx); err != nil && !errors.Is(err, context.Canceled) { - o.logger.Error("scm observer: initial poll failed", "err", err) - } - t := time.NewTicker(o.tick) - defer t.Stop() - for { - select { - case <-ctx.Done(): - return - case <-t.C: - if err := o.Poll(ctx); err != nil && !errors.Is(err, context.Canceled) { - o.logger.Error("scm observer: poll failed", "err", err) + var credentialGate sync.Once + poll := func(ctx context.Context) error { + credentialGate.Do(func() { + if _, err := o.checkCredentials(ctx); err != nil && !errors.Is(err, context.Canceled) { + o.logger.Error("scm observer: initial credential check failed", "err", err) } - } + }) + return o.Poll(ctx) } + return observe.StartPollLoop(ctx, o.tick, poll, o.logger, "scm observer") } type subject struct { @@ -410,29 +398,11 @@ func (o *Observer) Poll(ctx context.Context) error { } func (o *Observer) checkCredentials(ctx context.Context) (bool, error) { - if o.credentialsChecked { - return true, nil - } - if err := ctx.Err(); err != nil { - return false, err + var probe observe.CredentialProbe + if checker, ok := o.provider.(credentialChecker); ok { + probe = checker.SCMCredentialsAvailable } - checker, ok := o.provider.(credentialChecker) - if !ok { - o.credentialsChecked = true - return true, nil - } - available, err := checker.SCMCredentialsAvailable(ctx) - if err != nil { - o.logger.Warn("scm observer credentials check failed; will retry", "err", err) - return false, nil - } - o.credentialsChecked = true - if !available { - o.disabled = true - o.logger.Warn("scm observer disabled: provider credentials unavailable") - return false, nil - } - return true, nil + return observe.CheckCredentialsOnce(ctx, probe, &o.credentialsChecked, &o.disabled, o.logger, "scm observer") } func (o *Observer) discoverSubjects(ctx context.Context) (map[string]*subject, error) { @@ -1162,56 +1132,22 @@ func scrubLine(s string) string { return strings.TrimSpace(s) } +// cacheSetString / cacheSetTime / cacheSetBool are thin wrappers around the +// generic observe.CacheSet helper, kept on Observer so callers don't need to +// thread o.Cache.max through every invocation. The single shared +// implementation lives in the observe package. func (o *Observer) cacheSetString(m map[string]string, order *[]string, key, value string) { - if _, ok := m[key]; !ok { - *order = append(*order, key) - } - m[key] = value - o.evictStrings(m, order) + observe.CacheSet(m, order, o.Cache.max, key, value) } func (o *Observer) cacheSetTime(m map[string]time.Time, order *[]string, key string, value time.Time) { - if _, ok := m[key]; !ok { - *order = append(*order, key) - } - m[key] = value - for len(*order) > o.Cache.max { - evict := (*order)[0] - *order = (*order)[1:] - delete(m, evict) - } + observe.CacheSet(m, order, o.Cache.max, key, value) } func (o *Observer) cacheSetBool(m map[string]bool, order *[]string, key string, value bool) { - if _, ok := m[key]; !ok { - *order = append(*order, key) - } - m[key] = value - for len(*order) > o.Cache.max { - evict := (*order)[0] - *order = (*order)[1:] - delete(m, evict) - } + observe.CacheSet(m, order, o.Cache.max, key, value) } func cacheDelete[V any](m map[string]V, order *[]string, key string) { - if _, ok := m[key]; !ok { - return - } - delete(m, key) - dst := (*order)[:0] - for _, cachedKey := range *order { - if cachedKey != key { - dst = append(dst, cachedKey) - } - } - *order = dst -} - -func (o *Observer) evictStrings(m map[string]string, order *[]string) { - for len(*order) > o.Cache.max { - evict := (*order)[0] - *order = (*order)[1:] - delete(m, evict) - } + observe.CacheDelete(m, order, key) } From a3f1cd8835e6ce791f838b949e1f540da3c66f8f Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Sat, 6 Jun 2026 00:27:01 +0530 Subject: [PATCH 2/5] feat(tracker): ports.TrackerObservation DTO + ApplyTrackerFacts reducer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Land the contract that the future Tracker observer (issue #35) and its provider adapters must satisfy. No observer is wired in this PR — the DTO + reducer are the deliverable, and locking the shape now lets the observer + adapter work happen in small follow-up PRs. DTO (backend/internal/ports/tracker_observations.go): - TrackerObservation mirrors ports.SCMObservation: Fetched bool, ObservedAt time.Time, Provider/Host/Repo, normalized Issue facts, Comments, and a Changed{State, Assignee, Comments} discriminator. - TrackerIssueObservation carries the minimal facts lifecycle needs today (state, assignee, title, body, timestamps); richer per-provider metadata stays inside each adapter. - TrackerCommentObservation carries the comment fields needed for the bot-mention nudge (Author, Body, IsBot, ID for dedup). Reducer (backend/internal/lifecycle/reactions.go): - ApplyTrackerFacts(ctx, sessionID, ports.TrackerObservation) error, mirroring ApplySCMObservation's "Fetched gate → terminal-state → per-bucket reactions" shape. - Three initial reactions: * Issue state == done | cancelled → MarkTerminated (idempotent). * Changed.Assignee → log only via slog.Default(). The "assignee changed away from AO" policy is reserved for #40. * Changed.Comments with bot comments → one-time nudge with strings.Join'd bot bodies, deduped by comment IDs. - The nudge path reuses sendOnce with an empty prURL so the in-memory dedup applies but the PR-row persistence path is skipped. Tracker signature persistence will land with #35 alongside issue-row storage. Tests in backend/internal/lifecycle/manager_test.go cover each branch: terminate (done + cancelled), log-only assignee, nudge fires on new bot comment, nudge suppressed on repeat, new bot comment id refires, not-fetched is no-op, terminated session ignores observations. Part of #112. Co-Authored-By: Claude Opus 4.7 --- backend/internal/lifecycle/manager_test.go | 134 ++++++++++++++++++ backend/internal/lifecycle/reactions.go | 72 ++++++++++ .../internal/ports/tracker_observations.go | 107 ++++++++++++++ 3 files changed, 313 insertions(+) create mode 100644 backend/internal/ports/tracker_observations.go diff --git a/backend/internal/lifecycle/manager_test.go b/backend/internal/lifecycle/manager_test.go index 6d86189..15c2718 100644 --- a/backend/internal/lifecycle/manager_test.go +++ b/backend/internal/lifecycle/manager_test.go @@ -313,6 +313,140 @@ func TestPRObservation_DedupPersistsAcrossPRs(t *testing.T) { } } +func TestApplyTrackerFacts_TerminalStateMarksTerminated(t *testing.T) { + for _, state := range []domain.NormalizedIssueState{domain.IssueDone, domain.IssueCancelled} { + t.Run(string(state), func(t *testing.T) { + m, st, msg := newManager() + st.sessions["mer-1"] = working("mer-1") + o := ports.TrackerObservation{ + Fetched: true, + Issue: ports.TrackerIssueObservation{URL: "https://github.com/o/r/issues/1", State: state}, + } + if err := m.ApplyTrackerFacts(ctx, "mer-1", o); err != nil { + t.Fatalf("ApplyTrackerFacts: %v", err) + } + got := st.sessions["mer-1"] + if !got.IsTerminated || got.Activity.State != domain.ActivityExited { + t.Fatalf("want terminated/exited for state %q, got %+v", state, got) + } + if len(msg.msgs) != 0 { + t.Fatalf("terminal state should not nudge, got %v", msg.msgs) + } + }) + } +} + +func TestApplyTrackerFacts_AssigneeChangedIsLogOnly(t *testing.T) { + m, st, msg := newManager() + st.sessions["mer-1"] = working("mer-1") + before := st.sessions["mer-1"] + o := ports.TrackerObservation{ + Fetched: true, + Issue: ports.TrackerIssueObservation{URL: "https://github.com/o/r/issues/1", State: domain.IssueOpen, Assignee: "someone-else"}, + Changed: ports.TrackerChanged{Assignee: true}, + } + if err := m.ApplyTrackerFacts(ctx, "mer-1", o); err != nil { + t.Fatalf("ApplyTrackerFacts: %v", err) + } + if st.sessions["mer-1"] != before { + t.Fatalf("assignee-only change must not mutate the session row, got %+v", st.sessions["mer-1"]) + } + if len(msg.msgs) != 0 { + t.Fatalf("assignee-only change must not nudge, got %v", msg.msgs) + } +} + +func TestApplyTrackerFacts_NewBotCommentNudges(t *testing.T) { + m, st, msg := newManager() + st.sessions["mer-1"] = working("mer-1") + o := ports.TrackerObservation{ + Fetched: true, + Issue: ports.TrackerIssueObservation{URL: "https://github.com/o/r/issues/1", State: domain.IssueOpen}, + Comments: []ports.TrackerCommentObservation{ + {ID: "human-1", Author: "alice", Body: "human chime-in, must NOT nudge", IsBot: false}, + {ID: "bot-1", Author: "ci-bot[bot]", Body: "please rerun the migration", IsBot: true}, + }, + Changed: ports.TrackerChanged{Comments: true}, + } + if err := m.ApplyTrackerFacts(ctx, "mer-1", o); err != nil { + t.Fatalf("ApplyTrackerFacts: %v", err) + } + if len(msg.msgs) != 1 { + t.Fatalf("want one bot-mention nudge, got %d: %v", len(msg.msgs), msg.msgs) + } + if !strings.Contains(msg.msgs[0], "please rerun the migration") { + t.Fatalf("nudge should include the bot comment body, got %q", msg.msgs[0]) + } + if strings.Contains(msg.msgs[0], "human chime-in") { + t.Fatalf("nudge must not include human comments, got %q", msg.msgs[0]) + } +} + +func TestApplyTrackerFacts_NudgeSuppressedOnRepeat(t *testing.T) { + m, st, msg := newManager() + st.sessions["mer-1"] = working("mer-1") + o := ports.TrackerObservation{ + Fetched: true, + Issue: ports.TrackerIssueObservation{URL: "https://github.com/o/r/issues/1", State: domain.IssueOpen}, + Comments: []ports.TrackerCommentObservation{ + {ID: "bot-1", Author: "ci-bot[bot]", Body: "please rerun the migration", IsBot: true}, + }, + Changed: ports.TrackerChanged{Comments: true}, + } + if err := m.ApplyTrackerFacts(ctx, "mer-1", o); err != nil { + t.Fatalf("first ApplyTrackerFacts: %v", err) + } + if err := m.ApplyTrackerFacts(ctx, "mer-1", o); err != nil { + t.Fatalf("second ApplyTrackerFacts: %v", err) + } + if len(msg.msgs) != 1 { + t.Fatalf("repeat observation must dedup; got %d nudges: %v", len(msg.msgs), msg.msgs) + } + + // A genuinely new bot comment still fires. + o.Comments = append(o.Comments, ports.TrackerCommentObservation{ID: "bot-2", Author: "ci-bot[bot]", Body: "now check the seed", IsBot: true}) + if err := m.ApplyTrackerFacts(ctx, "mer-1", o); err != nil { + t.Fatalf("third ApplyTrackerFacts: %v", err) + } + if len(msg.msgs) != 2 { + t.Fatalf("new bot comment id should re-fire, got %d: %v", len(msg.msgs), msg.msgs) + } +} + +func TestApplyTrackerFacts_NotFetchedIsNoop(t *testing.T) { + m, st, msg := newManager() + st.sessions["mer-1"] = working("mer-1") + before := st.sessions["mer-1"] + if err := m.ApplyTrackerFacts(ctx, "mer-1", ports.TrackerObservation{Fetched: false}); err != nil { + t.Fatalf("ApplyTrackerFacts: %v", err) + } + if st.sessions["mer-1"] != before { + t.Fatalf("not-fetched observation must not mutate state") + } + if len(msg.msgs) != 0 { + t.Fatalf("not-fetched observation must not nudge") + } +} + +func TestApplyTrackerFacts_TerminatedSessionDoesNotRefireOrNudge(t *testing.T) { + m, st, msg := newManager() + st.sessions["mer-1"] = domain.SessionRecord{ID: "mer-1", ProjectID: "mer", IsTerminated: true, Activity: domain.Activity{State: domain.ActivityExited}} + o := ports.TrackerObservation{ + Fetched: true, + Issue: ports.TrackerIssueObservation{URL: "https://github.com/o/r/issues/1", State: domain.IssueOpen}, + Comments: []ports.TrackerCommentObservation{ + {ID: "bot-1", Body: "x", IsBot: true}, + }, + Changed: ports.TrackerChanged{Comments: true}, + } + if err := m.ApplyTrackerFacts(ctx, "mer-1", o); err != nil { + t.Fatalf("ApplyTrackerFacts: %v", err) + } + if len(msg.msgs) != 0 { + t.Fatalf("terminated session must not receive nudges, got %v", msg.msgs) + } +} + func TestPRObservation_RetriesAfterMessengerFailure(t *testing.T) { m, st, msg := newManager() st.sessions["mer-1"] = working("mer-1") diff --git a/backend/internal/lifecycle/reactions.go b/backend/internal/lifecycle/reactions.go index c921a93..71e8e8e 100644 --- a/backend/internal/lifecycle/reactions.go +++ b/backend/internal/lifecycle/reactions.go @@ -3,6 +3,7 @@ package lifecycle import ( "context" "encoding/json" + "log/slog" "strings" "sync" @@ -154,6 +155,77 @@ func scmToPRObservation(o ports.SCMObservation) ports.PRObservation { return pr } +// ApplyTrackerFacts reacts to a fetched Tracker issue observation. It owns the +// issue-driven side of session lifecycle and the initial bot-mention nudge; +// it does NOT persist tracker rows (the future Tracker observer in #35 owns +// the read-side persistence path). +// +// Reactions today: +// - Issue terminal (state == done or cancelled) → MarkTerminated. The +// reducer is idempotent — repeat observations on an already-terminated +// session are no-ops because MarkTerminated skips when IsTerminated. +// - Assignee changed → log only. No session-state reaction yet; the policy +// for "assignee changed away from AO" is reserved for the write-side work +// tracked by #40. +// - New bot comment → one-time nudge using the same sendOnce + dedup +// signature pattern as the SCM lane. Dedup is in-memory only for now; +// cross-restart persistence lands with the Tracker observer (issue #35) +// when issue-row signature storage is on the table. +func (m *Manager) ApplyTrackerFacts(ctx context.Context, id domain.SessionID, o ports.TrackerObservation) error { + if !o.Fetched { + return nil + } + if isTerminalTrackerState(o.Issue.State) { + return m.MarkTerminated(ctx, id) + } + rec, ok, err := m.store.GetSession(ctx, id) + if err != nil || !ok { + return err + } + if rec.IsTerminated || rec.Activity.State == domain.ActivityWaitingInput { + return nil + } + if o.Changed.Assignee { + slog.Default().Info("lifecycle: tracker issue assignee changed", + "session", id, "issue", o.Issue.URL, "assignee", o.Issue.Assignee) + } + if o.Changed.Comments { + bodies, ids := newBotCommentContent(o.Comments) + if len(ids) > 0 { + msg := "A bot left a new comment on your tracker issue. Address it and update the session." + if joined := strings.Join(bodies, "\n\n"); strings.TrimSpace(joined) != "" { + msg += "\n\n" + joined + } + // Empty prURL routes sendOnce through its in-memory-only branch: + // the PR-row signature load/persist is skipped, so the dedup + // survives only for the lifetime of this Manager. Cross-restart + // persistence ships with #35. + return m.sendOnce(ctx, id, "", "tracker-bot:"+o.Issue.URL, strings.Join(ids, ","), msg, 0) + } + } + return nil +} + +func isTerminalTrackerState(state domain.NormalizedIssueState) bool { + return state == domain.IssueDone || state == domain.IssueCancelled +} + +func newBotCommentContent(comments []ports.TrackerCommentObservation) ([]string, []string) { + bodies := make([]string, 0, len(comments)) + ids := make([]string, 0, len(comments)) + for _, c := range comments { + if !c.IsBot { + continue + } + if c.ID == "" && strings.TrimSpace(c.Body) == "" { + continue + } + bodies = append(bodies, c.Body) + ids = append(ids, c.ID) + } + return bodies, ids +} + func firstSCMNonEmpty(a, b string) string { if strings.TrimSpace(a) != "" { return a diff --git a/backend/internal/ports/tracker_observations.go b/backend/internal/ports/tracker_observations.go new file mode 100644 index 0000000..6fabac5 --- /dev/null +++ b/backend/internal/ports/tracker_observations.go @@ -0,0 +1,107 @@ +// This file defines provider-neutral Tracker DTOs used at the boundary between +// the (future) Tracker observer, persistence layer, and lifecycle manager. The +// shape mirrors ports.SCMObservation so the lifecycle reducer in +// lifecycle.Manager has the same "Fetched + ObservedAt + normalized facts + +// Changed discriminator" contract for both lanes. +package ports + +import ( + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +// TrackerObservation is the provider-neutral issue observation emitted by the +// Tracker observer and consumed by lifecycle.Manager.ApplyTrackerFacts. +// Provider adapters normalize their tracker-specific payloads into this DTO +// before the observer persists/notifies. +type TrackerObservation struct { + // Fetched is true only when the provider refresh succeeded and the nested + // facts are authoritative for this poll. + Fetched bool + // ObservedAt is the observer timestamp for this normalized snapshot. + ObservedAt time.Time + + // Provider is the normalized tracker provider name, e.g. "github". + Provider string + // Host is the tracker host that served this observation. + Host string + // Repo is the full repository/project name shown to AO users, usually + // "owner/name" for GitHub-issue trackers. + Repo string + + // Issue contains the normalized issue facts (state, assignee, title, body). + Issue TrackerIssueObservation + // Comments contains the normalized comments observed on the issue. The + // observer is responsible for windowing/dedup; lifecycle treats every + // entry as a fact about the current snapshot. + Comments []TrackerCommentObservation + + // Changed marks which semantic buckets changed compared with the DB snapshot. + Changed TrackerChanged +} + +// TrackerChanged marks which semantic state buckets changed in the successful +// poll. The discriminator lets lifecycle skip work cheaply when only one +// bucket moved; today it also lets the reducer fire reactions on the right +// edges (assignee-only change vs comment-only change). +type TrackerChanged struct { + // State is true when Issue.State changed since the last persisted snapshot. + State bool + // Assignee is true when Issue.Assignee changed since the last persisted snapshot. + Assignee bool + // Comments is true when the comment set changed (new comment, edit, or removal). + Comments bool +} + +// TrackerIssueObservation carries the normalized issue facts. The field set is +// deliberately the minimum that lifecycle reactions need today; provider +// adapters keep richer per-provider metadata behind their own packages. +type TrackerIssueObservation struct { + // URL is the canonical issue browser URL used as the persistence key. + URL string + // Number is the provider's issue number within the repository/project. + Number int + // State is AO's normalized issue state from domain.NormalizedIssueState + // (open, in_progress, review, done, cancelled). + State domain.NormalizedIssueState + // Title is the provider issue title. + Title string + // Body is the issue description as plain text/markdown. + Body string + // Assignee is the login/identifier of the currently primary assignee, or + // "" when the issue is unassigned. Multi-assignee tracking is not part of + // the lifecycle contract today. + Assignee string + // Author is the login/name of the issue author. + Author string + // Labels is the normalized label set on the issue. + Labels []string + + // CreatedAtProvider is the provider's issue creation timestamp. + CreatedAtProvider time.Time + // UpdatedAtProvider is the provider's last issue update timestamp. + UpdatedAtProvider time.Time + // ClosedAtProvider is the provider's close timestamp when the issue is closed. + ClosedAtProvider time.Time +} + +// TrackerCommentObservation is one normalized issue comment. +type TrackerCommentObservation struct { + // ID is the provider's stable comment identifier. + ID string + // Author is the provider login/name of the commenter. + Author string + // Body is the comment text. + Body string + // URL is a provider link to the comment. + URL string + // IsBot is true when the provider identifies the author as a bot. The + // lifecycle reducer treats new bot comments as actionable nudges. + IsBot bool + // CreatedAtProvider is the provider's comment creation timestamp. + CreatedAtProvider time.Time + // UpdatedAtProvider is the provider's last comment update timestamp when + // the provider exposes one. + UpdatedAtProvider time.Time +} From 6da17a1657358e3f8ec0e510e6f32d182dd908b3 Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Sat, 6 Jun 2026 00:30:24 +0530 Subject: [PATCH 3/5] fix(observe): rename CacheSet param to avoid shadowing built-in max golangci-lint revive flagged the CacheSet generic helper's max parameter as shadowing the built-in max() function. Rename to maxEntries; signature change is internal to the observe package and the SCM observer's one-line wrappers pass the value positionally, so no call sites need updating. Co-Authored-By: Claude Opus 4.7 --- backend/internal/observe/observer.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/backend/internal/observe/observer.go b/backend/internal/observe/observer.go index 391e02a..6b82efe 100644 --- a/backend/internal/observe/observer.go +++ b/backend/internal/observe/observer.go @@ -95,21 +95,21 @@ func CheckCredentialsOnce(ctx context.Context, probe CredentialProbe, checked, d // CacheSet writes value to m[key] and tracks insertion order in *order for // bounded FIFO eviction. If the bucket already had key, order is left -// unchanged; otherwise key is appended. When len(*order) exceeds max, the -// oldest keys are evicted from both order and m. +// unchanged; otherwise key is appended. When len(*order) exceeds maxEntries, +// the oldest keys are evicted from both order and m. // -// max <= 0 disables eviction; callers that want bounded behavior must pass a -// positive max. The generic shape lets the same helper serve string-, time-, -// and bool-valued caches without per-type duplication. -func CacheSet[V any](m map[string]V, order *[]string, max int, key string, value V) { +// maxEntries <= 0 disables eviction; callers that want bounded behavior must +// pass a positive value. The generic shape lets the same helper serve +// string-, time-, and bool-valued caches without per-type duplication. +func CacheSet[V any](m map[string]V, order *[]string, maxEntries int, key string, value V) { if _, ok := m[key]; !ok { *order = append(*order, key) } m[key] = value - if max <= 0 { + if maxEntries <= 0 { return } - for len(*order) > max { + for len(*order) > maxEntries { evict := (*order)[0] *order = (*order)[1:] delete(m, evict) From 2321bb8be7880ce42afef8724861c517ff9570d6 Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Sat, 6 Jun 2026 00:37:45 +0530 Subject: [PATCH 4/5] fix: honour disabled state in CheckCredentialsOnce + tighten bot-comment filter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two P1 review findings on #116: 1. observe.CheckCredentialsOnce was returning (true, nil) on every call after the gate ran, even when the probe had marked the observer disabled, because the *checked short-circuit ignored *disabled. The SCM observer didn't surface this in practice — its Poll method has an independent `if o.disabled { return nil }` guard that runs first — but a future Tracker observer that relies on the helper's documented contract ("Observer stays disabled") would silently flip back to "credentials available" after the first poll. Change the short-circuit to `return !*disabled, nil` and lock the behavior with a regression test that issues repeat calls after the probe reported unavailable. 2. lifecycle.newBotCommentContent's "skip uninteresting comments" filter used && where it needed ||. A bot comment with an empty ID but a non-empty body slipped through and appended "" to the ids slice. If every bot comment in the observation had an empty ID, strings.Join produced "" — which matches the zero value of the in-memory dedup map, so sendOnce treated the nudge as already-sent and silently suppressed it forever. Switch to || so any comment missing either an ID or a body is dropped, and add a regression test that an empty-ID bot comment never nudges (and does not pollute the dedup state for a follow-up comment that has a real ID). 586 tests pass with -race. Co-Authored-By: Claude Opus 4.7 --- backend/internal/lifecycle/manager_test.go | 34 ++++++++++++++++++++++ backend/internal/lifecycle/reactions.go | 7 ++++- backend/internal/observe/observer.go | 5 +++- backend/internal/observe/observer_test.go | 15 +++++++++- 4 files changed, 58 insertions(+), 3 deletions(-) diff --git a/backend/internal/lifecycle/manager_test.go b/backend/internal/lifecycle/manager_test.go index 15c2718..5a87298 100644 --- a/backend/internal/lifecycle/manager_test.go +++ b/backend/internal/lifecycle/manager_test.go @@ -413,6 +413,40 @@ func TestApplyTrackerFacts_NudgeSuppressedOnRepeat(t *testing.T) { } } +func TestApplyTrackerFacts_BotCommentWithEmptyIDIsIgnored(t *testing.T) { + m, st, msg := newManager() + st.sessions["mer-1"] = working("mer-1") + // Bot comment lacks an ID — without one we cannot dedup, and the + // zero-value signature collides with m.react.seen's empty default and + // would silently suppress every future nudge for this issue. The + // reducer must skip it entirely. + o := ports.TrackerObservation{ + Fetched: true, + Issue: ports.TrackerIssueObservation{URL: "https://github.com/o/r/issues/1", State: domain.IssueOpen}, + Comments: []ports.TrackerCommentObservation{ + {ID: "", Author: "ci-bot[bot]", Body: "no id, must be skipped", IsBot: true}, + }, + Changed: ports.TrackerChanged{Comments: true}, + } + if err := m.ApplyTrackerFacts(ctx, "mer-1", o); err != nil { + t.Fatalf("ApplyTrackerFacts: %v", err) + } + if len(msg.msgs) != 0 { + t.Fatalf("bot comment with empty ID must not nudge, got %v", msg.msgs) + } + // A subsequent, properly-formed bot comment must still nudge — the + // earlier empty-ID entry must not have polluted the dedup signature. + o.Comments = []ports.TrackerCommentObservation{ + {ID: "bot-1", Author: "ci-bot[bot]", Body: "now with an id", IsBot: true}, + } + if err := m.ApplyTrackerFacts(ctx, "mer-1", o); err != nil { + t.Fatalf("second ApplyTrackerFacts: %v", err) + } + if len(msg.msgs) != 1 { + t.Fatalf("follow-up bot comment with real ID should nudge, got %d: %v", len(msg.msgs), msg.msgs) + } +} + func TestApplyTrackerFacts_NotFetchedIsNoop(t *testing.T) { m, st, msg := newManager() st.sessions["mer-1"] = working("mer-1") diff --git a/backend/internal/lifecycle/reactions.go b/backend/internal/lifecycle/reactions.go index 71e8e8e..614d3db 100644 --- a/backend/internal/lifecycle/reactions.go +++ b/backend/internal/lifecycle/reactions.go @@ -217,7 +217,12 @@ func newBotCommentContent(comments []ports.TrackerCommentObservation) ([]string, if !c.IsBot { continue } - if c.ID == "" && strings.TrimSpace(c.Body) == "" { + // Both an ID and a body are required: ID anchors the dedup + // signature (an empty ID collapses to "" which collides with + // the zero value of m.react.seen[key] and silently suppresses + // the nudge), and a body is what we actually need to surface + // to the agent. + if c.ID == "" || strings.TrimSpace(c.Body) == "" { continue } bodies = append(bodies, c.Body) diff --git a/backend/internal/observe/observer.go b/backend/internal/observe/observer.go index 6b82efe..2b9022c 100644 --- a/backend/internal/observe/observer.go +++ b/backend/internal/observe/observer.go @@ -64,10 +64,13 @@ type CredentialProbe func(ctx context.Context) (available bool, err error) // - probe returns false → *checked = true, *disabled = true, returns (false, nil). Observer stays disabled. // - probe returns true → *checked = true, returns (true, nil). Subsequent calls bypass the probe. // +// Subsequent calls after the probe has run return (!*disabled, nil) so the +// disabled verdict is honoured on every poll, not just the first one. +// // A context-cancellation before the probe returns (false, ctx.Err()). func CheckCredentialsOnce(ctx context.Context, probe CredentialProbe, checked, disabled *bool, logger *slog.Logger, name string) (bool, error) { if *checked { - return true, nil + return !*disabled, nil } if err := ctx.Err(); err != nil { return false, err diff --git a/backend/internal/observe/observer_test.go b/backend/internal/observe/observer_test.go index 46cc077..ff476af 100644 --- a/backend/internal/observe/observer_test.go +++ b/backend/internal/observe/observer_test.go @@ -124,7 +124,8 @@ func TestCheckCredentialsOnce_ProbeAvailable(t *testing.T) { func TestCheckCredentialsOnce_ProbeUnavailableDisables(t *testing.T) { var checked, disabled bool - probe := func(context.Context) (bool, error) { return false, nil } + calls := 0 + probe := func(context.Context) (bool, error) { calls++; return false, nil } ok, err := CheckCredentialsOnce(context.Background(), probe, &checked, &disabled, quietLogger(), "test") if err != nil || ok { t.Fatalf("ok=%v err=%v, want (false, nil)", ok, err) @@ -132,6 +133,18 @@ func TestCheckCredentialsOnce_ProbeUnavailableDisables(t *testing.T) { if !checked || !disabled { t.Fatalf("after unavailable: checked=%v disabled=%v", checked, disabled) } + // Subsequent calls must keep reporting (false, nil) — the short-circuit + // on *checked still has to honour *disabled, otherwise a disabled + // observer's Poll path silently flips back to "credentials available". + for i := 0; i < 3; i++ { + ok, err := CheckCredentialsOnce(context.Background(), probe, &checked, &disabled, quietLogger(), "test") + if err != nil || ok { + t.Fatalf("repeat call %d: ok=%v err=%v, want (false, nil)", i, ok, err) + } + } + if calls != 1 { + t.Fatalf("probe should run exactly once even when disabled, ran %d times", calls) + } } func TestCheckCredentialsOnce_TransientErrorRetries(t *testing.T) { From 8ec9b46b44b82b02d5e9f0f747d61736376e421a Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Sat, 6 Jun 2026 00:44:16 +0530 Subject: [PATCH 5/5] test(observe): capture deadline once in poll-error spin-wait MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `TestStartPollLoop_LogsPollErrorWithoutPanic` spin-wait was computing the loop bound as `time.Now().Before(time.Now().Add(200ms))` on every iteration, which is permanently true — the loop could only exit via the `break`. Under a scheduler delay (heavy CI load or `GOMAXPROCS=1`) where two polls never land in time, the test would hang until the wall-clock kill rather than failing fast. Capture the deadline once before the loop, and tighten the assertion to actually require two polls + done-channel closure within a bounded window, matching `TestStartPollLoop_FirstPollImmediateThenTicks`. Co-Authored-By: Claude Opus 4.7 --- backend/internal/observe/observer_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/backend/internal/observe/observer_test.go b/backend/internal/observe/observer_test.go index ff476af..f961247 100644 --- a/backend/internal/observe/observer_test.go +++ b/backend/internal/observe/observer_test.go @@ -211,12 +211,20 @@ func TestStartPollLoop_LogsPollErrorWithoutPanic(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) done := StartPollLoop(ctx, 10*time.Millisecond, poll, quietLogger(), "test") - for time.Now().Before(time.Now().Add(200 * time.Millisecond)) { + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { if ran.Load() >= 2 { break } time.Sleep(5 * time.Millisecond) } cancel() - <-done + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("done channel not closed after cancel") + } + if ran.Load() < 2 { + t.Fatalf("expected at least 2 polls under error path, got %d", ran.Load()) + } }