diff --git a/backend/internal/storage/sqlite/wiring/adapter.go b/backend/internal/storage/sqlite/wiring/adapter.go new file mode 100644 index 00000000..8a8d017d --- /dev/null +++ b/backend/internal/storage/sqlite/wiring/adapter.go @@ -0,0 +1,107 @@ +// Package wiring bridges *sqlite.Store to the engine's outbound ports. It +// embeds the store (so the SessionStore reads/writes and PRWriter.RecentCheckStatuses +// promote directly) and supplies the PR conversions plus the PRFacts read-model +// that drives the derived display status. +// +// The adapter lives in its own package so the daemon's composition root and any +// in-process integration tests (e.g. backend/internal/integration) can share the +// same bridge instead of redefining it. +package wiring + +import ( + "context" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" + "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" +) + +// Adapter wraps *sqlite.Store and implements ports.SessionStore + ports.PRWriter. +// The embedded *sqlite.Store promotes CreateSession / UpdateSession / GetSession +// / ListSessions / ListAllSessions and RecentCheckStatuses verbatim; the two +// methods defined here are the ones that need shape translation between the port +// types and the sqlite row types. +type Adapter struct{ *sqlite.Store } + +var ( + _ ports.SessionStore = Adapter{} + _ ports.PRWriter = Adapter{} +) + +// PRFactsForSession picks the PR that drives display status — the most-recently +// updated non-closed PR, else the most recent — and folds in whether it has +// unresolved review comments. +func (a Adapter) PRFactsForSession(ctx context.Context, id domain.SessionID) (domain.PRFacts, error) { + rows, err := a.Store.ListPRsBySession(ctx, string(id)) // newest first + if err != nil { + return domain.PRFacts{}, err + } + if len(rows) == 0 { + return domain.PRFacts{}, nil + } + pick := rows[0] + for _, r := range rows { + if r.State == "draft" || r.State == "open" { + pick = r + break + } + } + facts := domain.PRFacts{ + URL: pick.URL, Number: int(pick.Number), Exists: true, + Draft: pick.State == "draft", Merged: pick.State == "merged", Closed: pick.State == "closed", + CI: domain.CIState(pick.CIState), + Review: domain.ReviewDecision(pick.ReviewDecision), + Mergeability: domain.Mergeability(pick.Mergeability), + } + comments, err := a.Store.ListPRComments(ctx, pick.URL) + if err != nil { + return domain.PRFacts{}, err + } + for _, c := range comments { + if !c.Resolved { + facts.ReviewComments = true + break + } + } + return facts, nil +} + +func (a Adapter) WritePR(ctx context.Context, pr ports.PRRow, checks []ports.PRCheckRow, comments []ports.PRComment) error { + row := sqlite.PRRow{ + URL: pr.URL, SessionID: pr.SessionID, Number: int64(pr.Number), + State: prState(pr), + ReviewDecision: string(pr.Review), + CIState: string(pr.CI), + Mergeability: string(pr.Mergeability), + UpdatedAt: pr.UpdatedAt, + } + checkRows := make([]sqlite.PRCheckRow, len(checks)) + for i, c := range checks { + checkRows[i] = sqlite.PRCheckRow{ + PRURL: c.PRURL, Name: c.Name, CommitHash: c.CommitHash, + Status: c.Status, URL: c.URL, LogTail: c.LogTail, CreatedAt: c.CreatedAt, + } + } + commentRows := make([]sqlite.PRCommentRow, len(comments)) + for i, c := range comments { + commentRows[i] = sqlite.PRCommentRow{ + PRURL: pr.URL, CommentID: c.ID, Author: c.Author, File: c.File, + Line: int64(c.Line), Body: c.Body, Resolved: c.Resolved, CreatedAt: c.CreatedAt, + } + } + return a.Store.WritePRObservation(ctx, row, checkRows, commentRows) +} + +// prState collapses the PR's bools into the single pr.state column value. +func prState(r ports.PRRow) string { + switch { + case r.Merged: + return "merged" + case r.Closed: + return "closed" + case r.Draft: + return "draft" + default: + return "open" + } +} diff --git a/backend/lifecycle_wiring.go b/backend/lifecycle_wiring.go index d736d653..8aecd470 100644 --- a/backend/lifecycle_wiring.go +++ b/backend/lifecycle_wiring.go @@ -3,19 +3,29 @@ package main import ( "context" "log/slog" + "path/filepath" + "sync" + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/runtime/tmux" + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/workspace/gitworktree" + "github.com/aoagents/agent-orchestrator/backend/internal/config" "github.com/aoagents/agent-orchestrator/backend/internal/domain" "github.com/aoagents/agent-orchestrator/backend/internal/lifecycle" "github.com/aoagents/agent-orchestrator/backend/internal/observe/reaper" "github.com/aoagents/agent-orchestrator/backend/internal/ports" + "github.com/aoagents/agent-orchestrator/backend/internal/session" "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" + "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite/wiring" ) // lifecycleStack owns the running LCM + reaper. The LCM is the sole writer of // canonical transitions; the reaper is the OBSERVE-layer timer that probes live -// runtimes and reports facts back through it. +// runtimes and reports facts back through it. Adapter is exposed so the Session +// Manager construction in startSession can plug the same SessionStore + PRWriter +// instance the LCM already holds. type lifecycleStack struct { LCM *lifecycle.Manager + Adapter wiring.Adapter reaperDone <-chan struct{} } @@ -28,103 +38,60 @@ type lifecycleStack struct { // - reaper.MapRegistry{} — empty runtime registry, so the reaper ticks // escalations but probes nothing until the runtime plugins exist. func startLifecycle(ctx context.Context, store *sqlite.Store, logger *slog.Logger) (*lifecycleStack, error) { - a := storeAdapter{store} + a := wiring.Adapter{Store: store} lcm := lifecycle.New(a, a, noopNotifier{}, noopMessenger{}) rp := reaper.New(lcm, reaper.MapRegistry{}, reaper.Config{Logger: logger}) - return &lifecycleStack{LCM: lcm, reaperDone: rp.Start(ctx)}, nil + return &lifecycleStack{LCM: lcm, Adapter: a, reaperDone: rp.Start(ctx)}, nil } // Stop waits for the reaper goroutine to exit (the caller must have cancelled the // ctx passed to startLifecycle). func (l *lifecycleStack) Stop() { <-l.reaperDone } -// storeAdapter bridges *sqlite.Store to the engine's ports. It embeds the store -// (so CreateSession/UpdateSession/GetSession/ListSessions/ListAllSessions and -// RecentCheckStatuses promote directly) and adds the PR conversions + the -// PRFacts read-model the display status needs. -type storeAdapter struct{ *sqlite.Store } +// sessionStack holds the daemon's live Session Manager. It mirrors +// lifecycleStack's shape so a future teardown hook (worktree drain, runtime +// shutdown) has a place to attach. +type sessionStack struct { + SM *session.Manager +} -var ( - _ ports.SessionStore = storeAdapter{} - _ ports.PRWriter = storeAdapter{} -) +// startSession constructs the Session Manager over the real tmux Runtime and +// gitworktree Workspace, the LCM and adapter created by startLifecycle, and the +// loud-stub Agent / Messenger / Notifier ports that have no production +// implementations yet. It does NOT mount any HTTP routes — those come with the +// daemon lane (#10). Returning the SM here lets main hold the wired-but-quiet +// instance so future route wiring is a one-line plumb-through. +func startSession(ctx context.Context, cfg config.Config, ls *lifecycleStack, log *slog.Logger) (*sessionStack, error) { + _ = ctx // reserved for future ctx-aware plugin construction; today's tmux/gitworktree constructors are synchronous. + runtime := tmux.New(tmux.Options{}) -// PRFactsForSession picks the PR that drives display status — the most-recently -// updated non-closed PR, else the most recent — and folds in whether it has -// unresolved review comments. -func (a storeAdapter) PRFactsForSession(ctx context.Context, id domain.SessionID) (domain.PRFacts, error) { - rows, err := a.Store.ListPRsBySession(ctx, string(id)) // newest first - if err != nil { - return domain.PRFacts{}, err - } - if len(rows) == 0 { - return domain.PRFacts{}, nil - } - pick := rows[0] - for _, r := range rows { - if r.State == "draft" || r.State == "open" { - pick = r - break - } - } - facts := domain.PRFacts{ - URL: pick.URL, Number: int(pick.Number), Exists: true, - Draft: pick.State == "draft", Merged: pick.State == "merged", Closed: pick.State == "closed", - CI: domain.CIState(pick.CIState), - Review: domain.ReviewDecision(pick.ReviewDecision), - Mergeability: domain.Mergeability(pick.Mergeability), - } - comments, err := a.Store.ListPRComments(ctx, pick.URL) + ws, err := gitworktree.New(gitworktree.Options{ + // ManagedRoot is the directory under which per-session worktrees are + // materialised. Co-located with the SQLite DB so a single AO_DATA_DIR + // override moves all durable per-user state together. + ManagedRoot: filepath.Join(cfg.DataDir, "worktrees"), + // An empty resolver fails every project lookup with a clear + // `no repo configured for project %q` error. That's the right loud + // failure until the projects table feeds repo paths into the resolver + // — hard-coding a single repo here would silently misroute spawns. + RepoResolver: gitworktree.StaticRepoResolver{}, + }) if err != nil { - return domain.PRFacts{}, err - } - for _, c := range comments { - if !c.Resolved { - facts.ReviewComments = true - break - } + return nil, err } - return facts, nil -} -func (a storeAdapter) WritePR(ctx context.Context, pr ports.PRRow, checks []ports.PRCheckRow, comments []ports.PRComment) error { - row := sqlite.PRRow{ - URL: pr.URL, SessionID: pr.SessionID, Number: int64(pr.Number), - State: prState(pr), - ReviewDecision: string(pr.Review), - CIState: string(pr.CI), - Mergeability: string(pr.Mergeability), - UpdatedAt: pr.UpdatedAt, - } - checkRows := make([]sqlite.PRCheckRow, len(checks)) - for i, c := range checks { - checkRows[i] = sqlite.PRCheckRow{ - PRURL: c.PRURL, Name: c.Name, CommitHash: c.CommitHash, - Status: c.Status, URL: c.URL, LogTail: c.LogTail, CreatedAt: c.CreatedAt, - } - } - commentRows := make([]sqlite.PRCommentRow, len(comments)) - for i, c := range comments { - commentRows[i] = sqlite.PRCommentRow{ - PRURL: pr.URL, CommentID: c.ID, Author: c.Author, File: c.File, - Line: int64(c.Line), Body: c.Body, Resolved: c.Resolved, CreatedAt: c.CreatedAt, - } - } - return a.Store.WritePRObservation(ctx, row, checkRows, commentRows) -} + agent := newNoopAgent(log) -// prState collapses the PR's bools into the single pr.state column value. -func prState(r ports.PRRow) string { - switch { - case r.Merged: - return "merged" - case r.Closed: - return "closed" - case r.Draft: - return "draft" - default: - return "open" - } + sm := session.New(session.Deps{ + Runtime: runtime, + Agent: agent, + Workspace: ws, + Store: ls.Adapter, + Messenger: noopMessenger{}, + Lifecycle: ls.LCM, + }) + + return &sessionStack{SM: sm}, nil } // noopNotifier / noopMessenger are TEMPORARY stubs (see startLifecycle): the @@ -137,3 +104,50 @@ func (noopNotifier) Notify(context.Context, ports.Event) error { return nil } type noopMessenger struct{} func (noopMessenger) Send(context.Context, domain.SessionID, string) error { return nil } + +// agentNotWiredSentinel is the launch / restore command (and env-var key) +// noopAgent returns. tmux will try to exec a binary named exactly this and fail +// fast, so a Spawn against the loud stub surfaces a clear runtime error rather +// than starting a quiet, broken session. +const agentNotWiredSentinel = "AO_AGENT_HARNESS_NOT_WIRED" + +// noopAgent is a loud stub for ports.Agent. There is no production Agent +// adapter on main yet; rather than panic at construction, this struct lets the +// daemon stand up the Session Manager, then logs a single warning the first +// time any SM call route through it and returns sentinel commands that make +// the runtime layer fail loudly. +type noopAgent struct { + log *slog.Logger + once *sync.Once +} + +var _ ports.Agent = (*noopAgent)(nil) + +func newNoopAgent(log *slog.Logger) *noopAgent { + return &noopAgent{log: log, once: &sync.Once{}} +} + +func (n *noopAgent) warn() { + n.once.Do(func() { + n.log.Warn( + "agent harness not wired: Spawn/Restore will fail at the runtime layer until a ports.Agent adapter is built", + "sentinel", agentNotWiredSentinel, + "next_step", "implement a per-harness ports.Agent adapter and plug it into startSession", + ) + }) +} + +func (n *noopAgent) GetLaunchCommand(ports.AgentConfig) string { + n.warn() + return agentNotWiredSentinel +} + +func (n *noopAgent) GetEnvironment(ports.AgentConfig) map[string]string { + n.warn() + return map[string]string{agentNotWiredSentinel: "1"} +} + +func (n *noopAgent) GetRestoreCommand(string) string { + n.warn() + return agentNotWiredSentinel +} diff --git a/backend/main.go b/backend/main.go index 60d9e26e..9fdb7a5e 100644 --- a/backend/main.go +++ b/backend/main.go @@ -72,23 +72,35 @@ func run() error { // Bring up the Lifecycle Manager (sole store writer) and the reaper (OBSERVE // timer). This makes the write path live end-to-end: LCM write -> store -> DB - // trigger -> change_log -> poller -> broadcaster. The collaborators it needs - // that don't yet have production implementations (Notifier, AgentMessenger, - // runtime registry) are stubbed in lifecycle_wiring.go with TODO markers. - // - // NOT wired here yet — both await collaborators the daemon lane owns: - // - Session Manager: session.New needs Runtime/Agent/Workspace plugins to - // construct. Stubbing them would make Spawn a silent no-op (a footgun), - // so it's deferred rather than faked. The LCM already exposes the read - // surface (RunningSessions) the SM would wrap. - // - HTTP API routes: httpd.New takes no SM/LCM today; surfacing the store - // over HTTP needs a constructor signature change + handlers, tracked with - // the SM work since the routes call into it. + // trigger -> change_log -> poller -> broadcaster. lcStack, err := startLifecycle(ctx, store, log) if err != nil { return err } + // Bring up the Session Manager. Runtime (tmux) and Workspace (gitworktree) + // are real on main; ports.Agent has no production adapter yet, so a loud + // stub returns a sentinel command that makes any Spawn fail at the runtime + // layer rather than start a broken session quietly. Notifier and + // AgentMessenger remain stubbed alongside the LCM until their multiplexers + // land. No HTTP routes wire to this yet — the daemon lane (#10) owns API + // surfacing — so we hold the SM in a local until it does. + sStack, err := startSession(ctx, cfg, lcStack, log) + if err != nil { + // startSession is the first start* call after this point that can + // realistically fail while the cdc poller and the reaper are already + // running. Mirror the bottom-of-run shutdown sequence so both have + // drained before the deferred store.Close() fires. Defers would hit + // the LIFO trap (see comment after srv.Run), hence explicit. + stop() + lcStack.Stop() + if cdcErr := cdcPipe.Stop(); cdcErr != nil { + log.Error("cdc pipeline shutdown", "err", cdcErr) + } + return err + } + _ = sStack + runErr := srv.Run(ctx) // Shut the background goroutines down in order: cancel the context FIRST so diff --git a/backend/wiring_test.go b/backend/wiring_test.go index 74b314b0..14bb3b4c 100644 --- a/backend/wiring_test.go +++ b/backend/wiring_test.go @@ -2,20 +2,27 @@ package main import ( "context" + "io" + "log/slog" + "reflect" "sync" "testing" "time" + "unsafe" "github.com/aoagents/agent-orchestrator/backend/internal/cdc" + "github.com/aoagents/agent-orchestrator/backend/internal/config" "github.com/aoagents/agent-orchestrator/backend/internal/domain" "github.com/aoagents/agent-orchestrator/backend/internal/lifecycle" "github.com/aoagents/agent-orchestrator/backend/internal/ports" + "github.com/aoagents/agent-orchestrator/backend/internal/session" "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" + "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite/wiring" ) // TestWiring_WriteFlowsToBroadcaster exercises the real boot path end to end: // a lifecycle write -> sqlite -> DB trigger -> change_log -> CDC poller -> -// broadcaster, through the production storeAdapter and cdcSource. +// broadcaster, through the production wiring.Adapter and cdcSource. func TestWiring_WriteFlowsToBroadcaster(t *testing.T) { ctx := context.Background() store, err := sqlite.Open(t.TempDir()) @@ -24,7 +31,7 @@ func TestWiring_WriteFlowsToBroadcaster(t *testing.T) { } defer store.Close() - a := storeAdapter{store} + a := wiring.Adapter{Store: store} lcm := lifecycle.New(a, a, noopNotifier{}, noopMessenger{}) bcast := cdc.NewBroadcaster() @@ -69,3 +76,81 @@ func TestWiring_WriteFlowsToBroadcaster(t *testing.T) { t.Fatalf("expected a change_log event for %s to reach the broadcaster, got %d events", rec.ID, len(got)) } } + +// TestWiring_SessionManagerSharesLifecycleStoreAndLCM verifies that startSession +// constructs an SM whose Store and Lifecycle dependencies are the exact same +// values the LCM holds: a single canonical-store + LCM pair, not two parallel +// stacks that would diverge under concurrent writes. The brief constraint +// forbids modifying session/manager.go to add accessors, so the assertion +// reaches into the unexported fields via reflect + unsafe — scoped to the test +// and isolated in inspectSessionDeps. +func TestWiring_SessionManagerSharesLifecycleStoreAndLCM(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + store, err := sqlite.Open(t.TempDir()) + if err != nil { + t.Fatal(err) + } + // Registered first so it runs LAST (after the reaper has drained). + t.Cleanup(func() { _ = store.Close() }) + + log := slog.New(slog.NewTextHandler(io.Discard, nil)) + cfg := config.Config{DataDir: t.TempDir()} + + lcStack, err := startLifecycle(ctx, store, log) + if err != nil { + t.Fatal(err) + } + // lcStack.Stop blocks on the reaper goroutine, which only exits once its + // ctx is cancelled. Production main.go calls stop() before lcStack.Stop() + // for the same reason — same ordering here. + t.Cleanup(func() { + cancel() + lcStack.Stop() + }) + + sStack, err := startSession(ctx, cfg, lcStack, log) + if err != nil { + t.Fatal(err) + } + if sStack == nil || sStack.SM == nil { + t.Fatal("startSession returned nil Session Manager") + } + + gotStore, gotLCM := inspectSessionDeps(t, sStack.SM) + + // Store should be the exact wiring.Adapter the LCM was constructed with. + gotAdapter, ok := gotStore.(wiring.Adapter) + if !ok { + t.Fatalf("SM.store is %T, want wiring.Adapter", gotStore) + } + if gotAdapter.Store != lcStack.Adapter.Store { + t.Fatalf("SM.store wraps a different *sqlite.Store than lcStack.Adapter") + } + + // Lifecycle should be the exact *lifecycle.Manager pointer from startLifecycle. + gotLCMPtr, ok := gotLCM.(*lifecycle.Manager) + if !ok { + t.Fatalf("SM.lcm is %T, want *lifecycle.Manager", gotLCM) + } + if gotLCMPtr != lcStack.LCM { + t.Fatalf("SM.lcm pointer (%p) differs from lcStack.LCM (%p)", gotLCMPtr, lcStack.LCM) + } +} + +// inspectSessionDeps reads session.Manager's unexported store and lcm fields. +// The brief forbids modifying session/manager.go to expose them; we settle for +// reflect + unsafe scoped to this one test helper. If the field names change +// upstream, the type assertion (and this helper) is the only place to touch. +func inspectSessionDeps(t *testing.T, sm *session.Manager) (store any, lcm any) { + t.Helper() + v := reflect.ValueOf(sm).Elem() + storeField := v.FieldByName("store") + lcmField := v.FieldByName("lcm") + if !storeField.IsValid() || !lcmField.IsValid() { + t.Fatalf("session.Manager fields renamed: store.IsValid=%v lcm.IsValid=%v — update inspectSessionDeps", storeField.IsValid(), lcmField.IsValid()) + } + storeVal := reflect.NewAt(storeField.Type(), unsafe.Pointer(storeField.UnsafeAddr())).Elem() + lcmVal := reflect.NewAt(lcmField.Type(), unsafe.Pointer(lcmField.UnsafeAddr())).Elem() + return storeVal.Interface(), lcmVal.Interface() +}