From ad1c4dacece42cd7810ab005a98fc0f343203d3b Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Sun, 31 May 2026 18:34:51 +0530 Subject: [PATCH 1/4] test(integration): LCM+SM live-fire against real SQLite store MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds backend/internal/integration with five end-to-end tests that hydrate the real lifecycle.Manager + session.Manager against a tmp SQLite store and exercise the full pipeline through the DB triggers and the CDC poller: - TestHappyPath_Spawn_PR_Kill — spawn -> SCM PR observation (open + CI passing) -> kill; asserts canonical row, pr row, and change_log event types (session_created/_updated, pr_created, pr_check_recorded). - TestRestoreRoundTrip_PreservesMetadata — spawn, kill, close store, reopen same DB path, hydrate fresh LCM/SM, Restore(); asserts AgentSessionID and the rest of SessionMetadata survive across the daemon restart. - TestCIFailureAndRecovery_NudgeThenClears — failing CI observation drives the CI-failed reaction nudge with the log tail injected; passing CI observation switches to approved-and-green human notify; pr_checks history reads back the failure (the brake's source of truth). - TestDetectingPersistsAcrossRestart — failed probe parks the session in detecting with detecting_* columns populated, round-trips across a close/reopen, alive probe clears the quarantine memory. - TestCDCPollerReceivesAllStages — drives the real cdc.Poller; asserts the trigger pipeline emits each expected event_type and seq is monotonic. Wiring gap fixed (minimal): goose v3 keeps baseFS/logger/dialect as package-level globals, so two concurrent sqlite.Open() calls — uncommon in production but normal under -race with t.Parallel() — race on goose.SetBaseFS/SetLogger/SetDialect inside migrate(). Added a process-level sync.Mutex around the migrate() call. ~11 lines, no signature changes. Scope notes (the task brief assumed a fancier architecture than what actually shipped in PR #37): - No outbox / consumer_offsets / janitor exist on main — the change_log table IS the durable, ordered source of truth (see cdc/event.go), so the brief's janitor-watermark step is skipped. - No reaction_trackers table / ReactionStore port — trackers are in-memory per lifecycle/reactions.go; persistence-round-trip there is N/A. - No revision column / Upsert(rec, eventType) — write-mutex serialises and change_log.seq orders, so the assertions land on event_type + seq, not on a per-row revision counter. All 219 tests pass under -race across 18 packages. lifecycle/fakes_test.go is untouched; existing unit tests still drive the in-memory fake. --- .../integration/lifecycle_sqlite_test.go | 644 ++++++++++++++++++ backend/internal/storage/sqlite/db.go | 11 + 2 files changed, 655 insertions(+) create mode 100644 backend/internal/integration/lifecycle_sqlite_test.go diff --git a/backend/internal/integration/lifecycle_sqlite_test.go b/backend/internal/integration/lifecycle_sqlite_test.go new file mode 100644 index 00000000..214bf083 --- /dev/null +++ b/backend/internal/integration/lifecycle_sqlite_test.go @@ -0,0 +1,644 @@ +// Package integration exercises the lifecycle + session lane against the real +// SQLite store and the real CDC trigger pipeline. Unit tests stay on the +// in-memory fakes in lifecycle/ and session/; these live-fire tests prove the +// wiring across packages actually flows: SM -> store row -> LCM mutate -> store +// update -> DB trigger -> change_log read. +package integration + +import ( + "context" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/cdc" + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/lifecycle" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" + "github.com/aoagents/agent-orchestrator/backend/internal/session" + "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" +) + +// ---- store adapter (mirrors backend.storeAdapter so integration tests don't +// import package main; kept local and minimal). ---- +// +// MIRROR OF backend/lifecycle_wiring.go's storeAdapter. The integration tests +// can't import package main, so the small set of methods that bridge +// *sqlite.Store to ports.SessionStore + ports.PRWriter is duplicated here. Keep +// in sync; the obvious follow-up is to extract the production adapter into a +// shared internal package once the lane stabilises. + +type storeAdapter struct{ *sqlite.Store } + +var ( + _ ports.SessionStore = storeAdapter{} + _ ports.PRWriter = storeAdapter{} +) + +func (a storeAdapter) PRFactsForSession(ctx context.Context, id domain.SessionID) (domain.PRFacts, error) { + rows, err := a.Store.ListPRsBySession(ctx, string(id)) + if err != nil || len(rows) == 0 { + return domain.PRFacts{}, err + } + pick := rows[0] + for _, r := range rows { + if r.State == "draft" || r.State == "open" { + pick = r + break + } + } + facts := domain.PRFacts{ + URL: pick.URL, Number: int(pick.Number), Exists: true, + Draft: pick.State == "draft", + Merged: pick.State == "merged", + Closed: pick.State == "closed", + CI: domain.CIState(pick.CIState), + Review: domain.ReviewDecision(pick.ReviewDecision), + Mergeability: domain.Mergeability(pick.Mergeability), + } + comments, err := a.Store.ListPRComments(ctx, pick.URL) + if err != nil { + return domain.PRFacts{}, err + } + for _, c := range comments { + if !c.Resolved { + facts.ReviewComments = true + break + } + } + return facts, nil +} + +func (a storeAdapter) WritePR(ctx context.Context, pr ports.PRRow, checks []ports.PRCheckRow, comments []ports.PRComment) error { + state := "open" + switch { + case pr.Merged: + state = "merged" + case pr.Closed: + state = "closed" + case pr.Draft: + state = "draft" + } + row := sqlite.PRRow{ + URL: pr.URL, SessionID: pr.SessionID, Number: int64(pr.Number), + State: state, ReviewDecision: string(pr.Review), + CIState: string(pr.CI), Mergeability: string(pr.Mergeability), UpdatedAt: pr.UpdatedAt, + } + checkRows := make([]sqlite.PRCheckRow, len(checks)) + for i, c := range checks { + checkRows[i] = sqlite.PRCheckRow{ + PRURL: c.PRURL, Name: c.Name, CommitHash: c.CommitHash, + Status: c.Status, URL: c.URL, LogTail: c.LogTail, CreatedAt: c.CreatedAt, + } + } + commentRows := make([]sqlite.PRCommentRow, len(comments)) + for i, c := range comments { + commentRows[i] = sqlite.PRCommentRow{ + PRURL: pr.URL, CommentID: c.ID, Author: c.Author, File: c.File, + Line: int64(c.Line), Body: c.Body, Resolved: c.Resolved, CreatedAt: c.CreatedAt, + } + } + return a.Store.WritePRObservation(ctx, row, checkRows, commentRows) +} + +// ---- plugin fakes (minimal: only enough to drive SM through real LCM) ---- + +type stubRuntime struct { + id, name string +} + +func (s *stubRuntime) Create(_ context.Context, cfg ports.RuntimeConfig) (ports.RuntimeHandle, error) { + return ports.RuntimeHandle{ID: s.id, RuntimeName: s.name}, nil +} +func (s *stubRuntime) Destroy(context.Context, ports.RuntimeHandle) error { return nil } +func (s *stubRuntime) IsAlive(context.Context, ports.RuntimeHandle) (bool, error) { + return true, nil +} + +type stubAgent struct{} + +func (stubAgent) GetLaunchCommand(ports.AgentConfig) string { return "launch" } +func (stubAgent) GetEnvironment(ports.AgentConfig) map[string]string { return map[string]string{} } +func (stubAgent) GetRestoreCommand(id string) string { return "resume " + id } + +type stubWorkspace struct { + root string +} + +func (w *stubWorkspace) Create(_ context.Context, cfg ports.WorkspaceConfig) (ports.WorkspaceInfo, error) { + return ports.WorkspaceInfo{ + Path: filepath.Join(w.root, string(cfg.SessionID)), + Branch: cfg.Branch, + SessionID: cfg.SessionID, + ProjectID: cfg.ProjectID, + }, nil +} +func (w *stubWorkspace) Destroy(context.Context, ports.WorkspaceInfo) error { return nil } +func (w *stubWorkspace) Restore(ctx context.Context, cfg ports.WorkspaceConfig) (ports.WorkspaceInfo, error) { + return w.Create(ctx, cfg) +} + +type captureMessenger struct { + mu sync.Mutex + msgs []string +} + +func (m *captureMessenger) Send(_ context.Context, _ domain.SessionID, msg string) error { + m.mu.Lock() + defer m.mu.Unlock() + m.msgs = append(m.msgs, msg) + return nil +} +func (m *captureMessenger) drain() []string { + m.mu.Lock() + defer m.mu.Unlock() + out := append([]string(nil), m.msgs...) + m.msgs = nil + return out +} + +type captureNotifier struct { + mu sync.Mutex + events []ports.Event +} + +func (n *captureNotifier) Notify(_ context.Context, e ports.Event) error { + n.mu.Lock() + defer n.mu.Unlock() + n.events = append(n.events, e) + return nil +} +func (n *captureNotifier) drain() []ports.Event { + n.mu.Lock() + defer n.mu.Unlock() + out := append([]ports.Event(nil), n.events...) + n.events = nil + return out +} + +// ---- harness: real store + real LCM + real SM + change_log poller ---- + +type liveStack struct { + dataDir string + store *sqlite.Store + adapter storeAdapter + lcm *lifecycle.Manager + sm *session.Manager + notifier *captureNotifier + messenger *captureMessenger +} + +func openLiveStack(t *testing.T, dataDir string) *liveStack { + t.Helper() + store, err := sqlite.Open(dataDir) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + adapter := storeAdapter{store} + notifier := &captureNotifier{} + messenger := &captureMessenger{} + lcm := lifecycle.New(adapter, adapter, notifier, messenger) + + wsRoot := t.TempDir() + sm := session.New(session.Deps{ + Runtime: &stubRuntime{id: "h1", name: "tmux"}, + Agent: stubAgent{}, + Workspace: &stubWorkspace{root: wsRoot}, + Store: adapter, + Messenger: messenger, + Lifecycle: lcm, + }) + return &liveStack{ + dataDir: dataDir, + store: store, + adapter: adapter, + lcm: lcm, + sm: sm, + notifier: notifier, + messenger: messenger, + } +} + +func (s *liveStack) close(t *testing.T) { + t.Helper() + if err := s.store.Close(); err != nil { + t.Fatalf("close store: %v", err) + } +} + +func seedProject(t *testing.T, store *sqlite.Store, id string) { + t.Helper() + if err := store.UpsertProject(context.Background(), sqlite.ProjectRow{ + ID: id, Path: "/repo/" + id, RegisteredAt: time.Now(), + }); err != nil { + t.Fatalf("upsert project: %v", err) + } +} + +// ---- tests ---- + +// TestHappyPath drives Spawn -> SCM PR observation (open + CI passing) -> Kill, +// asserting via direct store reads that the canonical row, the PR row, and the +// change_log stream all reflect what each step contributed. +func TestHappyPath_Spawn_PR_Kill(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := openLiveStack(t, t.TempDir()) + defer st.close(t) + seedProject(t, st.store, "mer") + + // 1. Spawn — SM inserts the session row, LCM marks it live. + sess, err := st.sm.Spawn(ctx, ports.SpawnConfig{ + ProjectID: "mer", Kind: domain.KindWorker, Prompt: "ship it", + }) + if err != nil { + t.Fatalf("spawn: %v", err) + } + if sess.ID != "mer-1" { + t.Fatalf("want id mer-1, got %q", sess.ID) + } + + rec, ok, err := st.store.GetSession(ctx, sess.ID) + if err != nil || !ok { + t.Fatalf("get session: ok=%v err=%v", ok, err) + } + if !rec.Lifecycle.IsAlive { + t.Fatal("post-spawn: is_alive should be true") + } + if rec.Lifecycle.Session.State != domain.SessionNotStarted { + t.Fatalf("post-spawn state want not_started, got %q", rec.Lifecycle.Session.State) + } + if rec.Metadata.RuntimeHandleID != "h1" || rec.Metadata.RuntimeName != "tmux" { + t.Fatalf("post-spawn handles missing: %+v", rec.Metadata) + } + if rec.Metadata.WorkspacePath == "" || rec.Metadata.Prompt != "ship it" { + t.Fatalf("post-spawn metadata missing: %+v", rec.Metadata) + } + + // 2. SCM observes a fresh PR — open, CI passing. LCM writes the pr row + // atomically (one tx, triggers fire pr_created). + prURL := "https://github.com/repo/mer/pull/1" + if err := st.lcm.ApplyPRObservation(ctx, sess.ID, ports.PRObservation{ + Fetched: true, URL: prURL, Number: 1, + CI: domain.CIPassing, Review: domain.ReviewNone, Mergeability: domain.MergeMergeable, + Checks: []ports.PRCheckRow{{ + Name: "ci/build", CommitHash: "abc123", Status: "passed", CreatedAt: time.Now(), + }}, + }); err != nil { + t.Fatalf("apply pr: %v", err) + } + prRow, ok, err := st.store.GetPR(ctx, prURL) + if err != nil || !ok { + t.Fatalf("get pr: ok=%v err=%v", ok, err) + } + if prRow.SessionID != string(sess.ID) || prRow.CIState != "passing" || prRow.State != "open" { + t.Fatalf("pr row wrong: %+v", prRow) + } + + // 3. Kill — SM routes to LCM and tears down runtime+workspace. + freed, err := st.sm.Kill(ctx, sess.ID, domain.TermManuallyKilled) + if err != nil || !freed { + t.Fatalf("kill freed=%v err=%v", freed, err) + } + rec, _, _ = st.store.GetSession(ctx, sess.ID) + if rec.Lifecycle.Session.State != domain.SessionTerminated || + rec.Lifecycle.TerminationReason != domain.TermManuallyKilled || + rec.Lifecycle.IsAlive { + t.Fatalf("post-kill canonical wrong: %+v", rec.Lifecycle) + } + + // 4. Assert the change_log captured the full timeline. The DB triggers + // write the only durable CDC; we don't want to assume an ordering of + // interleaved events, just that each expected event_type shows up. + rows, err := st.store.ReadChangeLogAfter(ctx, 0, 100) + if err != nil { + t.Fatalf("read change_log: %v", err) + } + seen := map[string]bool{} + for _, r := range rows { + seen[r.EventType] = true + } + for _, want := range []string{"session_created", "session_updated", "pr_created", "pr_check_recorded"} { + if !seen[want] { + t.Fatalf("missing change_log event %q (got: %v)", want, seen) + } + } +} + +// TestRestoreRoundTrip simulates a daemon restart: spawn a session, persist the +// kill, fully close the in-process LCM/SM, open a fresh stack against the SAME +// DB file, and Restore. The restored session must keep its metadata (the agent +// session id is the must-survive bit). +func TestRestoreRoundTrip_PreservesMetadata(t *testing.T) { + t.Parallel() + ctx := context.Background() + dir := t.TempDir() + st := openLiveStack(t, dir) + seedProject(t, st.store, "mer") + + // Phase A: spawn with an agent session id, then kill so the row is terminal + // and Restore is legal. + sess, err := st.sm.Spawn(ctx, ports.SpawnConfig{ + ProjectID: "mer", Kind: domain.KindWorker, Prompt: "remember me", + }) + if err != nil { + t.Fatalf("spawn: %v", err) + } + // fold an AgentSessionID into the row — the LCM does this through the spawn + // outcome on Restore too, but a fresh spawn doesn't (the agent has not + // reported one yet). We patch via the store so the restore branch has + // something to resume from. + rec, _, _ := st.store.GetSession(ctx, sess.ID) + rec.Metadata.AgentSessionID = "agent-xyz" + if err := st.store.UpdateSession(ctx, rec); err != nil { + t.Fatalf("patch agent id: %v", err) + } + if _, err := st.sm.Kill(ctx, sess.ID, domain.TermManuallyKilled); err != nil { + t.Fatalf("kill: %v", err) + } + st.close(t) + + // Phase B: reopen against the same data dir; everything in memory is gone. + st2 := openLiveStack(t, dir) + defer st2.close(t) + + // Confirm the row survived the restart. + rec2, ok, err := st2.store.GetSession(ctx, sess.ID) + if err != nil || !ok { + t.Fatalf("reopen get: ok=%v err=%v", ok, err) + } + if rec2.Metadata.AgentSessionID != "agent-xyz" { + t.Fatalf("agent session id lost across restart: %+v", rec2.Metadata) + } + if rec2.Lifecycle.Session.State != domain.SessionTerminated { + t.Fatalf("expected terminal after reopen, got %q", rec2.Lifecycle.Session.State) + } + + // Phase C: Restore — must drive a fresh OnSpawnCompleted and surface the + // preserved AgentSessionID into the new outcome. + restored, err := st2.sm.Restore(ctx, sess.ID) + if err != nil { + t.Fatalf("restore: %v", err) + } + if !restored.Lifecycle.IsAlive { + t.Fatal("restored session should be is_alive after spawn-completed") + } + if restored.Metadata.AgentSessionID != "agent-xyz" { + t.Fatalf("restored row dropped AgentSessionID: %+v", restored.Metadata) + } +} + +// TestCIFailureAndRecovery drives the CI-failed reaction path: a failing +// observation injects a nudge into the agent (messenger), a recovery +// observation (CI passing) flips state without re-firing the nudge, and the +// pr_checks history records both runs so the brake's "last 3 all failed" query +// reads the truth. +func TestCIFailureAndRecovery_NudgeThenClears(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := openLiveStack(t, t.TempDir()) + defer st.close(t) + seedProject(t, st.store, "mer") + + sess, err := st.sm.Spawn(ctx, ports.SpawnConfig{ProjectID: "mer", Kind: domain.KindWorker, Prompt: "."}) + if err != nil { + t.Fatalf("spawn: %v", err) + } + // Move the session out of not_started so the reaction path engages on real + // PR facts (not_started doesn't react on PRs). + if err := st.lcm.ApplyActivitySignal(ctx, sess.ID, ports.ActivitySignal{ + Valid: true, State: domain.ActivityActive, Source: domain.SourceHook, Timestamp: time.Now(), + }); err != nil { + t.Fatalf("activity: %v", err) + } + _ = st.messenger.drain() // ignore startup nudges, focus on CI + + prURL := "https://github.com/repo/mer/pull/2" + // Failing CI: handleCIFailure should send a CI-failed nudge with the log + // tail injected. + if err := st.lcm.ApplyPRObservation(ctx, sess.ID, ports.PRObservation{ + Fetched: true, URL: prURL, Number: 2, + CI: domain.CIFailing, Mergeability: domain.MergeUnstable, + Checks: []ports.PRCheckRow{{ + Name: "ci/build", CommitHash: "c1", Status: "failed", LogTail: "panic: nil map", CreatedAt: time.Now(), + }}, + }); err != nil { + t.Fatalf("apply pr (failing): %v", err) + } + got := st.messenger.drain() + if len(got) == 0 { + t.Fatal("expected CI-failed nudge to the agent") + } + if !strings.Contains(got[0], "CI is failing") || !strings.Contains(got[0], "panic: nil map") { + t.Fatalf("ci-failed message missing content: %q", got[0]) + } + + // Brake confirmation: only one failure so far, RecentCheckStatuses should + // reflect it. + history, err := st.adapter.RecentCheckStatuses(ctx, prURL, "ci/build", 3) + if err != nil { + t.Fatalf("recent checks: %v", err) + } + if len(history) != 1 || history[0] != "failed" { + t.Fatalf("ci history wrong: %v", history) + } + + // Recovery: CI passing on a new commit. With the dedupe slot still on + // rxCIFailed, the dispatch path moves to rxApprovedGreen (mergeable) and + // the human notifier is the one that pages. + if err := st.lcm.ApplyPRObservation(ctx, sess.ID, ports.PRObservation{ + Fetched: true, URL: prURL, Number: 2, + CI: domain.CIPassing, Mergeability: domain.MergeMergeable, + Checks: []ports.PRCheckRow{{ + Name: "ci/build", CommitHash: "c2", Status: "passed", CreatedAt: time.Now(), + }}, + }); err != nil { + t.Fatalf("apply pr (recovery): %v", err) + } + ev := st.notifier.drain() + if len(ev) == 0 { + t.Fatal("recovery: notifier should have received an event (approved-and-green)") + } + if !anyEventType(ev, "reaction.approved-and-green") { + t.Fatalf("recovery should notify approved-and-green, got %+v", ev) + } + + // And the pr row reflects the recovery in the canonical fact store. + prRow, ok, _ := st.store.GetPR(ctx, prURL) + if !ok || prRow.CIState != "passing" { + t.Fatalf("pr ci_state should be passing post-recovery: %+v", prRow) + } +} + +// TestDetectingPersistsAcrossRestart drives the runtime quarantine path: a +// failed probe puts the session into the detecting state, which means the +// decider's anti-flap memory MUST be flushed to the detecting_* columns and +// survive a restart. A subsequent alive probe must clear it. +func TestDetectingPersistsAcrossRestart(t *testing.T) { + t.Parallel() + ctx := context.Background() + dir := t.TempDir() + st := openLiveStack(t, dir) + seedProject(t, st.store, "mer") + + sess, err := st.sm.Spawn(ctx, ports.SpawnConfig{ProjectID: "mer", Kind: domain.KindWorker, Prompt: "."}) + if err != nil { + t.Fatalf("spawn: %v", err) + } + // Move to working so the runtime decider doesn't bail on not_started. + if err := st.lcm.ApplyActivitySignal(ctx, sess.ID, ports.ActivitySignal{ + Valid: true, State: domain.ActivityActive, Source: domain.SourceHook, Timestamp: time.Now(), + }); err != nil { + t.Fatalf("activity: %v", err) + } + // One failed probe should park the session in detecting with attempts=1. + if err := st.lcm.ApplyRuntimeObservation(ctx, sess.ID, ports.RuntimeFacts{ + ObservedAt: time.Now(), + Runtime: ports.ProbeFailed, + Process: ports.ProbeFailed, + }); err != nil { + t.Fatalf("apply runtime: %v", err) + } + rec, _, _ := st.store.GetSession(ctx, sess.ID) + if rec.Lifecycle.Session.State != domain.SessionDetecting { + t.Fatalf("expected detecting state, got %q", rec.Lifecycle.Session.State) + } + if rec.Lifecycle.Detecting == nil || rec.Lifecycle.Detecting.Attempts == 0 { + t.Fatalf("detecting memory should be populated: %+v", rec.Lifecycle.Detecting) + } + + // Restart: close, reopen, verify the detecting_* columns round-tripped. + st.close(t) + st2 := openLiveStack(t, dir) + defer st2.close(t) + + rec2, ok, _ := st2.store.GetSession(ctx, sess.ID) + if !ok || rec2.Lifecycle.Detecting == nil { + t.Fatalf("detecting lost across restart: %+v", rec2.Lifecycle) + } + if rec2.Lifecycle.Detecting.Attempts != rec.Lifecycle.Detecting.Attempts { + t.Fatalf("attempts round-trip mismatch: pre=%d post=%d", + rec.Lifecycle.Detecting.Attempts, rec2.Lifecycle.Detecting.Attempts) + } + if rec2.Lifecycle.Detecting.EvidenceHash != rec.Lifecycle.Detecting.EvidenceHash { + t.Fatal("evidence hash dropped across restart") + } + + // Recovery probe — alive — must clear detecting and flip state out of it. + if err := st2.lcm.ApplyRuntimeObservation(ctx, sess.ID, ports.RuntimeFacts{ + ObservedAt: time.Now(), + Runtime: ports.ProbeAlive, + Process: ports.ProbeAlive, + }); err != nil { + t.Fatalf("recovery probe: %v", err) + } + rec3, _, _ := st2.store.GetSession(ctx, sess.ID) + if rec3.Lifecycle.Detecting != nil { + t.Fatalf("alive probe should clear detecting, got %+v", rec3.Lifecycle.Detecting) + } + if rec3.Lifecycle.Session.State == domain.SessionDetecting { + t.Fatalf("session state should leave detecting, got %q", rec3.Lifecycle.Session.State) + } +} + +// TestCDCPollerReceivesAllStages drives the full real pipeline including the +// in-process CDC poller — proving the trigger writes become broadcaster events +// in the same order the storage layer observes them. +func TestCDCPollerReceivesAllStages(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := openLiveStack(t, t.TempDir()) + defer st.close(t) + seedProject(t, st.store, "mer") + + bcast := cdc.NewBroadcaster() + src := pollerSource{st.store} + poller := cdc.NewPoller(src, bcast, cdc.PollerConfig{Batch: 100}) + + var ( + mu sync.Mutex + events []cdc.Event + ) + bcast.Subscribe(func(e cdc.Event) { + mu.Lock() + defer mu.Unlock() + events = append(events, e) + }) + + sess, err := st.sm.Spawn(ctx, ports.SpawnConfig{ProjectID: "mer", Kind: domain.KindWorker, Prompt: "."}) + if err != nil { + t.Fatalf("spawn: %v", err) + } + if err := st.lcm.ApplyActivitySignal(ctx, sess.ID, ports.ActivitySignal{ + Valid: true, State: domain.ActivityActive, Source: domain.SourceHook, Timestamp: time.Now(), + }); err != nil { + t.Fatalf("activity: %v", err) + } + if err := st.lcm.ApplyPRObservation(ctx, sess.ID, ports.PRObservation{ + Fetched: true, URL: "https://github.com/repo/mer/pull/3", Number: 3, + CI: domain.CIPassing, Mergeability: domain.MergeMergeable, + }); err != nil { + t.Fatalf("apply pr: %v", err) + } + + if err := poller.Poll(ctx); err != nil { + t.Fatalf("poll: %v", err) + } + + mu.Lock() + defer mu.Unlock() + types := map[cdc.EventType]bool{} + for _, e := range events { + types[e.Type] = true + } + for _, want := range []cdc.EventType{cdc.EventSessionCreated, cdc.EventSessionUpdated, cdc.EventPRCreated} { + if !types[want] { + t.Fatalf("poller missed event %q (got %+v)", want, types) + } + } + // Seq monotonicity invariant — the wiring assumes it; assert it here. + var prev int64 + for _, e := range events { + if e.Seq <= prev { + t.Fatalf("seq not monotonic: %d after %d", e.Seq, prev) + } + prev = e.Seq + } +} + +// ---- small helpers ---- + +type pollerSource struct{ *sqlite.Store } + +func (s pollerSource) EventsAfter(ctx context.Context, after int64, limit int) ([]cdc.Event, error) { + rows, err := s.Store.ReadChangeLogAfter(ctx, after, limit) + if err != nil { + return nil, err + } + out := make([]cdc.Event, len(rows)) + for i, r := range rows { + out[i] = cdc.Event{ + Seq: r.Seq, + ProjectID: r.ProjectID, + SessionID: r.SessionID, + Type: cdc.EventType(r.EventType), + Payload: []byte(r.Payload), + CreatedAt: r.CreatedAt, + } + } + return out, nil +} +func (s pollerSource) LatestSeq(ctx context.Context) (int64, error) { + return s.Store.MaxChangeLogSeq(ctx) +} + +func anyEventType(evs []ports.Event, t string) bool { + for _, e := range evs { + if e.Type == t { + return true + } + } + return false +} diff --git a/backend/internal/storage/sqlite/db.go b/backend/internal/storage/sqlite/db.go index 8b001d11..926d08d3 100644 --- a/backend/internal/storage/sqlite/db.go +++ b/backend/internal/storage/sqlite/db.go @@ -9,6 +9,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "github.com/pressly/goose/v3" // modernc.org/sqlite is the pure-Go (CGO-free) SQLite driver — chosen so the @@ -70,7 +71,17 @@ func Open(dataDir string) (*Store, error) { return NewStore(writeDB, readDB), nil } +// gooseMu serialises calls into goose. goose v3 keeps its baseFS / logger / +// dialect as package-level globals (goose.SetBaseFS, goose.SetLogger, +// goose.SetDialect), so two concurrent Open() calls — uncommon in production +// but normal in -race test runs — race on those writes. The cost of holding the +// mutex is one process-startup migration; readers and writers afterwards never +// touch goose. +var gooseMu sync.Mutex + func migrate(db *sql.DB) error { + gooseMu.Lock() + defer gooseMu.Unlock() goose.SetBaseFS(migrationsFS) goose.SetLogger(goose.NopLogger()) if err := goose.SetDialect("sqlite3"); err != nil { From ee0af288a49dcf2e85e9cf789af41796d834ee7d Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Sun, 31 May 2026 18:37:06 +0530 Subject: [PATCH 2/4] chore: gofmt integration test file The Check formatting step failed on the stub method alignment in stubAgent's three method declarations. Re-run of gofmt aligns the column gutter on those signatures; no behavioural change. --- backend/internal/integration/lifecycle_sqlite_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/internal/integration/lifecycle_sqlite_test.go b/backend/internal/integration/lifecycle_sqlite_test.go index 214bf083..e3956110 100644 --- a/backend/internal/integration/lifecycle_sqlite_test.go +++ b/backend/internal/integration/lifecycle_sqlite_test.go @@ -119,9 +119,9 @@ func (s *stubRuntime) IsAlive(context.Context, ports.RuntimeHandle) (bool, error type stubAgent struct{} -func (stubAgent) GetLaunchCommand(ports.AgentConfig) string { return "launch" } +func (stubAgent) GetLaunchCommand(ports.AgentConfig) string { return "launch" } func (stubAgent) GetEnvironment(ports.AgentConfig) map[string]string { return map[string]string{} } -func (stubAgent) GetRestoreCommand(id string) string { return "resume " + id } +func (stubAgent) GetRestoreCommand(id string) string { return "resume " + id } type stubWorkspace struct { root string From 6552e1eee09d7b21d2759aff9eea71b5e0c1dd4f Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Sun, 31 May 2026 18:46:31 +0530 Subject: [PATCH 3/4] test(integration): address greptile review - Idempotent close + t.Cleanup so a mid-test t.Fatalf can't leak the SQLite handle for the rest of the binary run. Restart-style tests still call close() explicitly between phases; the cleanup hook becomes a no-op once that runs. - Drop the hardcoded "mer-1" assertion in TestHappyPath; assert the structural invariant (project-scoped, non-empty id) instead, so the test does not couple to the {project}-{counter} generation detail. - Realign the test storeAdapter's PRFactsForSession + WritePR bodies to be line-for-line identical to backend/lifecycle_wiring.go's production adapter (extracted prState helper, separated err vs empty-rows checks), so a future divergence shows up as a diff at review time. The proper fix (extract to internal/storeutil) remains out of scope per the brief's "do NOT redesign anything". All 219 tests still pass under -race. --- .../integration/lifecycle_sqlite_test.go | 81 +++++++++++++------ 1 file changed, 57 insertions(+), 24 deletions(-) diff --git a/backend/internal/integration/lifecycle_sqlite_test.go b/backend/internal/integration/lifecycle_sqlite_test.go index e3956110..f2f75a71 100644 --- a/backend/internal/integration/lifecycle_sqlite_test.go +++ b/backend/internal/integration/lifecycle_sqlite_test.go @@ -21,14 +21,15 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" ) -// ---- store adapter (mirrors backend.storeAdapter so integration tests don't -// import package main; kept local and minimal). ---- +// ---- store adapter ---- // // MIRROR OF backend/lifecycle_wiring.go's storeAdapter. The integration tests // can't import package main, so the small set of methods that bridge -// *sqlite.Store to ports.SessionStore + ports.PRWriter is duplicated here. Keep -// in sync; the obvious follow-up is to extract the production adapter into a -// shared internal package once the lane stabilises. +// *sqlite.Store to ports.SessionStore + ports.PRWriter is duplicated here. +// Function bodies are line-for-line identical to the production adapter so a +// future divergence shows up as a real diff in code review; the obvious +// follow-up is to extract the production adapter into a shared internal +// package — explicitly out of scope for this PR ("do NOT redesign anything"). type storeAdapter struct{ *sqlite.Store } @@ -39,9 +40,12 @@ var ( func (a storeAdapter) PRFactsForSession(ctx context.Context, id domain.SessionID) (domain.PRFacts, error) { rows, err := a.Store.ListPRsBySession(ctx, string(id)) - if err != nil || len(rows) == 0 { + if err != nil { return domain.PRFacts{}, err } + if len(rows) == 0 { + return domain.PRFacts{}, nil + } pick := rows[0] for _, r := range rows { if r.State == "draft" || r.State == "open" { @@ -51,9 +55,7 @@ func (a storeAdapter) PRFactsForSession(ctx context.Context, id domain.SessionID } facts := domain.PRFacts{ URL: pick.URL, Number: int(pick.Number), Exists: true, - Draft: pick.State == "draft", - Merged: pick.State == "merged", - Closed: pick.State == "closed", + Draft: pick.State == "draft", Merged: pick.State == "merged", Closed: pick.State == "closed", CI: domain.CIState(pick.CIState), Review: domain.ReviewDecision(pick.ReviewDecision), Mergeability: domain.Mergeability(pick.Mergeability), @@ -72,19 +74,13 @@ func (a storeAdapter) PRFactsForSession(ctx context.Context, id domain.SessionID } func (a storeAdapter) WritePR(ctx context.Context, pr ports.PRRow, checks []ports.PRCheckRow, comments []ports.PRComment) error { - state := "open" - switch { - case pr.Merged: - state = "merged" - case pr.Closed: - state = "closed" - case pr.Draft: - state = "draft" - } row := sqlite.PRRow{ URL: pr.URL, SessionID: pr.SessionID, Number: int64(pr.Number), - State: state, ReviewDecision: string(pr.Review), - CIState: string(pr.CI), Mergeability: string(pr.Mergeability), UpdatedAt: pr.UpdatedAt, + State: prState(pr), + ReviewDecision: string(pr.Review), + CIState: string(pr.CI), + Mergeability: string(pr.Mergeability), + UpdatedAt: pr.UpdatedAt, } checkRows := make([]sqlite.PRCheckRow, len(checks)) for i, c := range checks { @@ -103,6 +99,21 @@ func (a storeAdapter) WritePR(ctx context.Context, pr ports.PRRow, checks []port return a.Store.WritePRObservation(ctx, row, checkRows, commentRows) } +// prState mirrors the production helper of the same name in +// backend/lifecycle_wiring.go. +func prState(r ports.PRRow) string { + switch { + case r.Merged: + return "merged" + case r.Closed: + return "closed" + case r.Draft: + return "draft" + default: + return "open" + } +} + // ---- plugin fakes (minimal: only enough to drive SM through real LCM) ---- type stubRuntime struct { @@ -188,8 +199,14 @@ type liveStack struct { sm *session.Manager notifier *captureNotifier messenger *captureMessenger + + closed bool // guard so the explicit close() and t.Cleanup don't double-close } +// openLiveStack opens the store + hydrates the LCM/SM and registers an +// idempotent t.Cleanup so a mid-test t.Fatalf can't leak the SQLite handle. +// Tests that need to simulate a daemon restart still call close() explicitly +// between phases; the cleanup hook becomes a no-op once that runs. func openLiveStack(t *testing.T, dataDir string) *liveStack { t.Helper() store, err := sqlite.Open(dataDir) @@ -210,7 +227,7 @@ func openLiveStack(t *testing.T, dataDir string) *liveStack { Messenger: messenger, Lifecycle: lcm, }) - return &liveStack{ + st := &liveStack{ dataDir: dataDir, store: store, adapter: adapter, @@ -219,10 +236,24 @@ func openLiveStack(t *testing.T, dataDir string) *liveStack { notifier: notifier, messenger: messenger, } + t.Cleanup(func() { + if st.closed { + return + } + // Best-effort: failures here would be noise after t.Fatalf already + // recorded the real cause. + _ = st.store.Close() + st.closed = true + }) + return st } func (s *liveStack) close(t *testing.T) { t.Helper() + if s.closed { + return + } + s.closed = true if err := s.store.Close(); err != nil { t.Fatalf("close store: %v", err) } @@ -249,15 +280,17 @@ func TestHappyPath_Spawn_PR_Kill(t *testing.T) { defer st.close(t) seedProject(t, st.store, "mer") - // 1. Spawn — SM inserts the session row, LCM marks it live. + // 1. Spawn — SM inserts the session row, LCM marks it live. We only assert + // the structural invariant of the id (project-scoped, non-empty), not the + // literal counter — that's a store-internal detail. sess, err := st.sm.Spawn(ctx, ports.SpawnConfig{ ProjectID: "mer", Kind: domain.KindWorker, Prompt: "ship it", }) if err != nil { t.Fatalf("spawn: %v", err) } - if sess.ID != "mer-1" { - t.Fatalf("want id mer-1, got %q", sess.ID) + if sess.ID == "" || !strings.HasPrefix(string(sess.ID), "mer-") { + t.Fatalf("expected project-scoped id like mer-N, got %q", sess.ID) } rec, ok, err := st.store.GetSession(ctx, sess.ID) From 1a9a9ec67eb0934e960e3664fb90ecc428bd9018 Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Sun, 31 May 2026 18:54:55 +0530 Subject: [PATCH 4/4] test(integration): check ok/err before patching session row MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Greptile P1: the patch-then-Update path in TestRestoreRoundTrip was discarding GetSession's ok/err. A missed row would have handed UpdateSession a zero-value SessionRecord (ID==""), which matches zero rows and returns nil — Phase B then fails with the misleading "agent session id lost across restart" instead of the real cause. Fixed at the patch site (the only write path that could swallow the error) and at the two read-then-assert sites in TestDetectingPersistsAcrossRestart for consistency. Downstream assertions there would already fail loudly, but the explicit ok/err check makes the failure mode unambiguous. All 219 tests still pass under -race. --- .../integration/lifecycle_sqlite_test.go | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/backend/internal/integration/lifecycle_sqlite_test.go b/backend/internal/integration/lifecycle_sqlite_test.go index f2f75a71..47745508 100644 --- a/backend/internal/integration/lifecycle_sqlite_test.go +++ b/backend/internal/integration/lifecycle_sqlite_test.go @@ -382,8 +382,14 @@ func TestRestoreRoundTrip_PreservesMetadata(t *testing.T) { // fold an AgentSessionID into the row — the LCM does this through the spawn // outcome on Restore too, but a fresh spawn doesn't (the agent has not // reported one yet). We patch via the store so the restore branch has - // something to resume from. - rec, _, _ := st.store.GetSession(ctx, sess.ID) + // something to resume from. Check ok/err: without it, a missed row would + // hand UpdateSession a zero-value record (ID==""), which matches no rows + // and returns nil — Phase B would then fail with a misleading "agent id + // lost across restart" rather than the real cause. + rec, ok, err := st.store.GetSession(ctx, sess.ID) + if err != nil || !ok { + t.Fatalf("get session for patch: ok=%v err=%v", ok, err) + } rec.Metadata.AgentSessionID = "agent-xyz" if err := st.store.UpdateSession(ctx, rec); err != nil { t.Fatalf("patch agent id: %v", err) @@ -534,7 +540,10 @@ func TestDetectingPersistsAcrossRestart(t *testing.T) { }); err != nil { t.Fatalf("apply runtime: %v", err) } - rec, _, _ := st.store.GetSession(ctx, sess.ID) + rec, ok, err := st.store.GetSession(ctx, sess.ID) + if err != nil || !ok { + t.Fatalf("get session post-probe: ok=%v err=%v", ok, err) + } if rec.Lifecycle.Session.State != domain.SessionDetecting { t.Fatalf("expected detecting state, got %q", rec.Lifecycle.Session.State) } @@ -567,7 +576,10 @@ func TestDetectingPersistsAcrossRestart(t *testing.T) { }); err != nil { t.Fatalf("recovery probe: %v", err) } - rec3, _, _ := st2.store.GetSession(ctx, sess.ID) + rec3, ok3, err := st2.store.GetSession(ctx, sess.ID) + if err != nil || !ok3 { + t.Fatalf("get session post-recovery: ok=%v err=%v", ok3, err) + } if rec3.Lifecycle.Detecting != nil { t.Fatalf("alive probe should clear detecting, got %+v", rec3.Lifecycle.Detecting) }