Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions backend/internal/storage/sqlite/wiring/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Package wiring bridges *sqlite.Store to the engine's outbound ports. It
// embeds the store (so the SessionStore reads/writes and PRWriter.RecentCheckStatuses
// promote directly) and supplies the PR conversions plus the PRFacts read-model
// that drives the derived display status.
//
// The adapter lives in its own package so the daemon's composition root and any
// in-process integration tests (e.g. backend/internal/integration) can share the
// same bridge instead of redefining it.
package wiring

import (
"context"

"github.com/aoagents/agent-orchestrator/backend/internal/domain"
"github.com/aoagents/agent-orchestrator/backend/internal/ports"
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite"
)

// Adapter wraps *sqlite.Store and implements ports.SessionStore + ports.PRWriter.
// The embedded *sqlite.Store promotes CreateSession / UpdateSession / GetSession
// / ListSessions / ListAllSessions and RecentCheckStatuses verbatim; the two
// methods defined here are the ones that need shape translation between the port
// types and the sqlite row types.
type Adapter struct{ *sqlite.Store }

var (
_ ports.SessionStore = Adapter{}
_ ports.PRWriter = Adapter{}
)

// PRFactsForSession picks the PR that drives display status — the most-recently
// updated non-closed PR, else the most recent — and folds in whether it has
// unresolved review comments.
func (a Adapter) PRFactsForSession(ctx context.Context, id domain.SessionID) (domain.PRFacts, error) {
rows, err := a.Store.ListPRsBySession(ctx, string(id)) // newest first
if err != nil {
return domain.PRFacts{}, err
}
if len(rows) == 0 {
return domain.PRFacts{}, nil
}
pick := rows[0]
for _, r := range rows {
if r.State == "draft" || r.State == "open" {
pick = r
break
}
}
facts := domain.PRFacts{
URL: pick.URL, Number: int(pick.Number), Exists: true,
Draft: pick.State == "draft", Merged: pick.State == "merged", Closed: pick.State == "closed",
CI: domain.CIState(pick.CIState),
Review: domain.ReviewDecision(pick.ReviewDecision),
Mergeability: domain.Mergeability(pick.Mergeability),
}
comments, err := a.Store.ListPRComments(ctx, pick.URL)
if err != nil {
return domain.PRFacts{}, err
}
for _, c := range comments {
if !c.Resolved {
facts.ReviewComments = true
break
}
}
return facts, nil
}

func (a Adapter) WritePR(ctx context.Context, pr ports.PRRow, checks []ports.PRCheckRow, comments []ports.PRComment) error {
row := sqlite.PRRow{
URL: pr.URL, SessionID: pr.SessionID, Number: int64(pr.Number),
State: prState(pr),
ReviewDecision: string(pr.Review),
CIState: string(pr.CI),
Mergeability: string(pr.Mergeability),
UpdatedAt: pr.UpdatedAt,
}
checkRows := make([]sqlite.PRCheckRow, len(checks))
for i, c := range checks {
checkRows[i] = sqlite.PRCheckRow{
PRURL: c.PRURL, Name: c.Name, CommitHash: c.CommitHash,
Status: c.Status, URL: c.URL, LogTail: c.LogTail, CreatedAt: c.CreatedAt,
}
}
commentRows := make([]sqlite.PRCommentRow, len(comments))
for i, c := range comments {
commentRows[i] = sqlite.PRCommentRow{
PRURL: pr.URL, CommentID: c.ID, Author: c.Author, File: c.File,
Line: int64(c.Line), Body: c.Body, Resolved: c.Resolved, CreatedAt: c.CreatedAt,
}
}
return a.Store.WritePRObservation(ctx, row, checkRows, commentRows)
}

// prState collapses the PR's bools into the single pr.state column value.
func prState(r ports.PRRow) string {
switch {
case r.Merged:
return "merged"
case r.Closed:
return "closed"
case r.Draft:
return "draft"
default:
return "open"
}
}
182 changes: 98 additions & 84 deletions backend/lifecycle_wiring.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,29 @@ package main
import (
"context"
"log/slog"
"path/filepath"
"sync"

"github.com/aoagents/agent-orchestrator/backend/internal/adapters/runtime/tmux"
"github.com/aoagents/agent-orchestrator/backend/internal/adapters/workspace/gitworktree"
"github.com/aoagents/agent-orchestrator/backend/internal/config"
"github.com/aoagents/agent-orchestrator/backend/internal/domain"
"github.com/aoagents/agent-orchestrator/backend/internal/lifecycle"
"github.com/aoagents/agent-orchestrator/backend/internal/observe/reaper"
"github.com/aoagents/agent-orchestrator/backend/internal/ports"
"github.com/aoagents/agent-orchestrator/backend/internal/session"
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite"
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite/wiring"
)

// lifecycleStack owns the running LCM + reaper. The LCM is the sole writer of
// canonical transitions; the reaper is the OBSERVE-layer timer that probes live
// runtimes and reports facts back through it.
// runtimes and reports facts back through it. Adapter is exposed so the Session
// Manager construction in startSession can plug the same SessionStore + PRWriter
// instance the LCM already holds.
type lifecycleStack struct {
LCM *lifecycle.Manager
Adapter wiring.Adapter
reaperDone <-chan struct{}
}

Expand All @@ -28,103 +38,60 @@ type lifecycleStack struct {
// - reaper.MapRegistry{} — empty runtime registry, so the reaper ticks
// escalations but probes nothing until the runtime plugins exist.
func startLifecycle(ctx context.Context, store *sqlite.Store, logger *slog.Logger) (*lifecycleStack, error) {
a := storeAdapter{store}
a := wiring.Adapter{Store: store}
lcm := lifecycle.New(a, a, noopNotifier{}, noopMessenger{})
rp := reaper.New(lcm, reaper.MapRegistry{}, reaper.Config{Logger: logger})
return &lifecycleStack{LCM: lcm, reaperDone: rp.Start(ctx)}, nil
return &lifecycleStack{LCM: lcm, Adapter: a, reaperDone: rp.Start(ctx)}, nil
}

// Stop waits for the reaper goroutine to exit (the caller must have cancelled the
// ctx passed to startLifecycle).
func (l *lifecycleStack) Stop() { <-l.reaperDone }

// storeAdapter bridges *sqlite.Store to the engine's ports. It embeds the store
// (so CreateSession/UpdateSession/GetSession/ListSessions/ListAllSessions and
// RecentCheckStatuses promote directly) and adds the PR conversions + the
// PRFacts read-model the display status needs.
type storeAdapter struct{ *sqlite.Store }
// sessionStack holds the daemon's live Session Manager. It mirrors
// lifecycleStack's shape so a future teardown hook (worktree drain, runtime
// shutdown) has a place to attach.
type sessionStack struct {
SM *session.Manager
}

var (
_ ports.SessionStore = storeAdapter{}
_ ports.PRWriter = storeAdapter{}
)
// startSession constructs the Session Manager over the real tmux Runtime and
// gitworktree Workspace, the LCM and adapter created by startLifecycle, and the
// loud-stub Agent / Messenger / Notifier ports that have no production
// implementations yet. It does NOT mount any HTTP routes — those come with the
// daemon lane (#10). Returning the SM here lets main hold the wired-but-quiet
// instance so future route wiring is a one-line plumb-through.
func startSession(ctx context.Context, cfg config.Config, ls *lifecycleStack, log *slog.Logger) (*sessionStack, error) {
_ = ctx // reserved for future ctx-aware plugin construction; today's tmux/gitworktree constructors are synchronous.
runtime := tmux.New(tmux.Options{})

// PRFactsForSession picks the PR that drives display status — the most-recently
// updated non-closed PR, else the most recent — and folds in whether it has
// unresolved review comments.
func (a storeAdapter) PRFactsForSession(ctx context.Context, id domain.SessionID) (domain.PRFacts, error) {
rows, err := a.Store.ListPRsBySession(ctx, string(id)) // newest first
if err != nil {
return domain.PRFacts{}, err
}
if len(rows) == 0 {
return domain.PRFacts{}, nil
}
pick := rows[0]
for _, r := range rows {
if r.State == "draft" || r.State == "open" {
pick = r
break
}
}
facts := domain.PRFacts{
URL: pick.URL, Number: int(pick.Number), Exists: true,
Draft: pick.State == "draft", Merged: pick.State == "merged", Closed: pick.State == "closed",
CI: domain.CIState(pick.CIState),
Review: domain.ReviewDecision(pick.ReviewDecision),
Mergeability: domain.Mergeability(pick.Mergeability),
}
comments, err := a.Store.ListPRComments(ctx, pick.URL)
ws, err := gitworktree.New(gitworktree.Options{
// ManagedRoot is the directory under which per-session worktrees are
// materialised. Co-located with the SQLite DB so a single AO_DATA_DIR
// override moves all durable per-user state together.
ManagedRoot: filepath.Join(cfg.DataDir, "worktrees"),
// An empty resolver fails every project lookup with a clear
// `no repo configured for project %q` error. That's the right loud
// failure until the projects table feeds repo paths into the resolver
// — hard-coding a single repo here would silently misroute spawns.
RepoResolver: gitworktree.StaticRepoResolver{},
})
if err != nil {
return domain.PRFacts{}, err
}
for _, c := range comments {
if !c.Resolved {
facts.ReviewComments = true
break
}
return nil, err
}
return facts, nil
}

func (a storeAdapter) WritePR(ctx context.Context, pr ports.PRRow, checks []ports.PRCheckRow, comments []ports.PRComment) error {
row := sqlite.PRRow{
URL: pr.URL, SessionID: pr.SessionID, Number: int64(pr.Number),
State: prState(pr),
ReviewDecision: string(pr.Review),
CIState: string(pr.CI),
Mergeability: string(pr.Mergeability),
UpdatedAt: pr.UpdatedAt,
}
checkRows := make([]sqlite.PRCheckRow, len(checks))
for i, c := range checks {
checkRows[i] = sqlite.PRCheckRow{
PRURL: c.PRURL, Name: c.Name, CommitHash: c.CommitHash,
Status: c.Status, URL: c.URL, LogTail: c.LogTail, CreatedAt: c.CreatedAt,
}
}
commentRows := make([]sqlite.PRCommentRow, len(comments))
for i, c := range comments {
commentRows[i] = sqlite.PRCommentRow{
PRURL: pr.URL, CommentID: c.ID, Author: c.Author, File: c.File,
Line: int64(c.Line), Body: c.Body, Resolved: c.Resolved, CreatedAt: c.CreatedAt,
}
}
return a.Store.WritePRObservation(ctx, row, checkRows, commentRows)
}
agent := newNoopAgent(log)

// prState collapses the PR's bools into the single pr.state column value.
func prState(r ports.PRRow) string {
switch {
case r.Merged:
return "merged"
case r.Closed:
return "closed"
case r.Draft:
return "draft"
default:
return "open"
}
sm := session.New(session.Deps{
Runtime: runtime,
Agent: agent,
Workspace: ws,
Store: ls.Adapter,
Messenger: noopMessenger{},
Lifecycle: ls.LCM,
})

return &sessionStack{SM: sm}, nil
}

// noopNotifier / noopMessenger are TEMPORARY stubs (see startLifecycle): the
Expand All @@ -137,3 +104,50 @@ func (noopNotifier) Notify(context.Context, ports.Event) error { return nil }
type noopMessenger struct{}

func (noopMessenger) Send(context.Context, domain.SessionID, string) error { return nil }

// agentNotWiredSentinel is the launch / restore command (and env-var key)
// noopAgent returns. tmux will try to exec a binary named exactly this and fail
// fast, so a Spawn against the loud stub surfaces a clear runtime error rather
// than starting a quiet, broken session.
const agentNotWiredSentinel = "AO_AGENT_HARNESS_NOT_WIRED"

// noopAgent is a loud stub for ports.Agent. There is no production Agent
// adapter on main yet; rather than panic at construction, this struct lets the
// daemon stand up the Session Manager, then logs a single warning the first
// time any SM call route through it and returns sentinel commands that make
// the runtime layer fail loudly.
type noopAgent struct {
log *slog.Logger
once *sync.Once
}

var _ ports.Agent = (*noopAgent)(nil)

func newNoopAgent(log *slog.Logger) *noopAgent {
return &noopAgent{log: log, once: &sync.Once{}}
}

func (n *noopAgent) warn() {
n.once.Do(func() {
n.log.Warn(
"agent harness not wired: Spawn/Restore will fail at the runtime layer until a ports.Agent adapter is built",
"sentinel", agentNotWiredSentinel,
"next_step", "implement a per-harness ports.Agent adapter and plug it into startSession",
)
})
}

func (n *noopAgent) GetLaunchCommand(ports.AgentConfig) string {
n.warn()
return agentNotWiredSentinel
}

func (n *noopAgent) GetEnvironment(ports.AgentConfig) map[string]string {
n.warn()
return map[string]string{agentNotWiredSentinel: "1"}
}

func (n *noopAgent) GetRestoreCommand(string) string {
n.warn()
return agentNotWiredSentinel
}
36 changes: 24 additions & 12 deletions backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,35 @@ func run() error {

// Bring up the Lifecycle Manager (sole store writer) and the reaper (OBSERVE
// timer). This makes the write path live end-to-end: LCM write -> store -> DB
// trigger -> change_log -> poller -> broadcaster. The collaborators it needs
// that don't yet have production implementations (Notifier, AgentMessenger,
// runtime registry) are stubbed in lifecycle_wiring.go with TODO markers.
//
// NOT wired here yet — both await collaborators the daemon lane owns:
// - Session Manager: session.New needs Runtime/Agent/Workspace plugins to
// construct. Stubbing them would make Spawn a silent no-op (a footgun),
// so it's deferred rather than faked. The LCM already exposes the read
// surface (RunningSessions) the SM would wrap.
// - HTTP API routes: httpd.New takes no SM/LCM today; surfacing the store
// over HTTP needs a constructor signature change + handlers, tracked with
// the SM work since the routes call into it.
// trigger -> change_log -> poller -> broadcaster.
lcStack, err := startLifecycle(ctx, store, log)
if err != nil {
return err
}

// Bring up the Session Manager. Runtime (tmux) and Workspace (gitworktree)
// are real on main; ports.Agent has no production adapter yet, so a loud
// stub returns a sentinel command that makes any Spawn fail at the runtime
// layer rather than start a broken session quietly. Notifier and
// AgentMessenger remain stubbed alongside the LCM until their multiplexers
// land. No HTTP routes wire to this yet — the daemon lane (#10) owns API
// surfacing — so we hold the SM in a local until it does.
sStack, err := startSession(ctx, cfg, lcStack, log)
if err != nil {
// startSession is the first start* call after this point that can
// realistically fail while the cdc poller and the reaper are already
// running. Mirror the bottom-of-run shutdown sequence so both have
// drained before the deferred store.Close() fires. Defers would hit
// the LIFO trap (see comment after srv.Run), hence explicit.
stop()
lcStack.Stop()
if cdcErr := cdcPipe.Stop(); cdcErr != nil {
log.Error("cdc pipeline shutdown", "err", cdcErr)
}
return err
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.
_ = sStack

runErr := srv.Run(ctx)

// Shut the background goroutines down in order: cancel the context FIRST so
Expand Down
Loading
Loading