From 49b4e84d77e7326c2c0aeec98509fb1e80441ebd Mon Sep 17 00:00:00 2001 From: Pritom14 Date: Tue, 2 Jun 2026 20:42:32 +0530 Subject: [PATCH 1/2] feat(backend): wire PR engine into the daemon The PR observation subsystem was fully built and unit tested but never wired into the running daemon: the LCM was constructed with a nil messenger and a noopMessenger, and nothing produced PR observations. This makes the engine live end to end via branch self-discovery, with graceful degradation when no GitHub token is configured. - Real runtime messenger resolves a session's runtime handle and pastes via the zellij runtime, backing both human /send and lifecycle nudges. - FindPRForBranch discovery primitive on the GitHub provider. - PR poller that discovers, observes, persists, and drives nudges. - Daemon wiring: provider + PR service + poller goroutine with lifecycle-managed shutdown; disabled with a warning when no token. Closes #86 Co-Authored-By: Claude Opus 4.7 --- .../internal/adapters/scm/github/discover.go | 49 ++++ .../adapters/scm/github/discover_test.go | 74 ++++++ backend/internal/daemon/daemon.go | 18 +- backend/internal/daemon/lifecycle_wiring.go | 9 +- backend/internal/daemon/pr_wiring.go | 92 ++++++++ backend/internal/daemon/repo_resolver.go | 97 ++++++++ backend/internal/daemon/repo_resolver_test.go | 90 ++++++++ backend/internal/observe/prpoller/prpoller.go | 182 +++++++++++++++ .../observe/prpoller/prpoller_test.go | 211 ++++++++++++++++++ 9 files changed, 813 insertions(+), 9 deletions(-) create mode 100644 backend/internal/adapters/scm/github/discover.go create mode 100644 backend/internal/adapters/scm/github/discover_test.go create mode 100644 backend/internal/daemon/pr_wiring.go create mode 100644 backend/internal/daemon/repo_resolver.go create mode 100644 backend/internal/daemon/repo_resolver_test.go create mode 100644 backend/internal/observe/prpoller/prpoller.go create mode 100644 backend/internal/observe/prpoller/prpoller_test.go diff --git a/backend/internal/adapters/scm/github/discover.go b/backend/internal/adapters/scm/github/discover.go new file mode 100644 index 00000000..41158b69 --- /dev/null +++ b/backend/internal/adapters/scm/github/discover.go @@ -0,0 +1,49 @@ +package github + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + neturl "net/url" +) + +// FindPRForBranch resolves the open pull request whose head is the given +// branch in owner/repo, returning its github.com URL. found=false (with a nil +// error) means the branch has no open PR yet — the normal pre-PR state of a +// fresh session, not a failure. Network/auth/rate-limit errors are returned as +// errors so the poller can back off rather than treat them as "no PR". +// +// The head filter is owner:branch, which matches same-repo branches (the AO +// worktree model). Cross-fork PRs from a different head owner are out of scope +// for v1. +func (p *Provider) FindPRForBranch(ctx context.Context, owner, repo, branch string) (url string, found bool, err error) { + if owner == "" || repo == "" || branch == "" { + return "", false, fmt.Errorf("github scm: owner, repo, and branch are required") + } + q := neturl.Values{} + q.Set("head", owner+":"+branch) + q.Set("state", "open") + q.Set("per_page", "1") + + resp, err := p.client.doREST(ctx, http.MethodGet, repoPath(owner, repo, "pulls"), q, nil) + if err != nil { + return "", false, err + } + var pulls []struct { + HTMLURL string `json:"html_url"` + URL string `json:"url"` + } + if len(resp.Body) > 0 { + if err := json.Unmarshal(resp.Body, &pulls); err != nil { + return "", false, fmt.Errorf("github scm: decode pulls list: %w", err) + } + } + if len(pulls) == 0 { + return "", false, nil + } + if pulls[0].HTMLURL != "" { + return pulls[0].HTMLURL, true, nil + } + return pulls[0].URL, true, nil +} diff --git a/backend/internal/adapters/scm/github/discover_test.go b/backend/internal/adapters/scm/github/discover_test.go new file mode 100644 index 00000000..fd53bdcc --- /dev/null +++ b/backend/internal/adapters/scm/github/discover_test.go @@ -0,0 +1,74 @@ +package github + +import ( + "net/http" + "testing" +) + +func TestFindPRForBranch_Found(t *testing.T) { + f := newFakeGH(t) + f.on(http.MethodGet, "/repos/octocat/hello/pulls", func(w http.ResponseWriter, r *http.Request) { + if got := r.URL.Query().Get("head"); got != "octocat:feature-x" { + t.Errorf("head = %q, want octocat:feature-x", got) + } + if got := r.URL.Query().Get("state"); got != "open" { + t.Errorf("state = %q, want open", got) + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`[{"html_url":"https://github.com/octocat/hello/pull/42","url":"https://api.github.com/repos/octocat/hello/pulls/42"}]`)) + }) + p := newProviderForTest(t, f) + + url, found, err := p.FindPRForBranch(ctx(), "octocat", "hello", "feature-x") + if err != nil { + t.Fatalf("FindPRForBranch: %v", err) + } + if !found { + t.Fatal("found = false, want true") + } + if url != "https://github.com/octocat/hello/pull/42" { + t.Fatalf("url = %q", url) + } +} + +func TestFindPRForBranch_NoOpenPR(t *testing.T) { + f := newFakeGH(t) + f.on(http.MethodGet, "/repos/octocat/hello/pulls", func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`[]`)) + }) + p := newProviderForTest(t, f) + + url, found, err := p.FindPRForBranch(ctx(), "octocat", "hello", "feature-x") + if err != nil { + t.Fatalf("FindPRForBranch: %v", err) + } + if found { + t.Fatalf("found = true (url %q), want false for empty list", url) + } +} + +func TestFindPRForBranch_AuthErrorSurfaces(t *testing.T) { + f := newFakeGH(t) + f.on(http.MethodGet, "/repos/octocat/hello/pulls", func(w http.ResponseWriter, _ *http.Request) { + http.Error(w, `{"message":"Bad credentials"}`, http.StatusUnauthorized) + }) + p := newProviderForTest(t, f) + + if _, _, err := p.FindPRForBranch(ctx(), "octocat", "hello", "feature-x"); err == nil { + t.Fatal("expected error for 401, got nil") + } +} + +func TestFindPRForBranch_RequiresArgs(t *testing.T) { + p := &Provider{} + if _, _, err := p.FindPRForBranch(ctx(), "", "hello", "b"); err == nil { + t.Fatal("expected error for empty owner") + } + if _, _, err := p.FindPRForBranch(ctx(), "o", "", "b"); err == nil { + t.Fatal("expected error for empty repo") + } + if _, _, err := p.FindPRForBranch(ctx(), "o", "hello", ""); err == nil { + t.Fatal("expected error for empty branch") + } +} diff --git a/backend/internal/daemon/daemon.go b/backend/internal/daemon/daemon.go index 97a2ab20..70ed2c66 100644 --- a/backend/internal/daemon/daemon.go +++ b/backend/internal/daemon/daemon.go @@ -76,14 +76,15 @@ func Run() error { termMgr := terminal.NewManager(runtimeAdapter, cdcPipe.Broadcaster, log) defer termMgr.Close() + // The agent messenger resolves a session's live runtime handle from the + // store and sends validated user input into its zellij pane. It backs both + // human /send and the lifecycle PR nudges, so it is built before the LCM. + messenger := newSessionMessenger(store, runtimeAdapter, log) + // Bring up the Lifecycle Manager and the reaper first: it makes the session // lifecycle write path live (reducer write -> store -> DB trigger -> // change_log -> poller -> broadcaster) and gives startSession the shared LCM. - lcStack := startLifecycle(ctx, store, runtimeAdapter, log) - - // The agent messenger sends validated user input to the session's live - // zellij pane. Keep this path small until durable inbox semantics are needed. - messenger := newSessionMessenger(store, runtimeAdapter, log) + lcStack := startLifecycle(ctx, store, runtimeAdapter, messenger, log) // Wire the controller-facing session service over the same store + LCM, the // zellij runtime, a gitworktree workspace, the per-session agent resolver @@ -99,12 +100,18 @@ func Run() error { return fmt.Errorf("wire session service: %w", err) } + // Start the PR observation path: discover each live session's PR from its + // branch, observe it, persist, and drive lifecycle nudges through the shared + // LCM. Degrades to a no-op when no GitHub token is configured. + prStack := startPRPoller(ctx, store, lcStack.LCM, log) + srv, err := httpd.NewWithDeps(cfg, log, termMgr, httpd.APIDeps{ Projects: projectsvc.New(store), Sessions: sessionSvc, }) if err != nil { stop() + prStack.Stop() lcStack.Stop() if cdcErr := cdcPipe.Stop(); cdcErr != nil { log.Error("cdc pipeline shutdown", "err", cdcErr) @@ -119,6 +126,7 @@ func Run() error { // via defer) avoids the LIFO trap where a Stop() that blocks on ctx-cancel // runs before the cancel — which would hang any non-signal exit path. stop() + prStack.Stop() lcStack.Stop() if err := cdcPipe.Stop(); err != nil { log.Error("cdc pipeline shutdown", "err", err) diff --git a/backend/internal/daemon/lifecycle_wiring.go b/backend/internal/daemon/lifecycle_wiring.go index 69aae574..6024f4e4 100644 --- a/backend/internal/daemon/lifecycle_wiring.go +++ b/backend/internal/daemon/lifecycle_wiring.go @@ -32,8 +32,8 @@ type lifecycleStack struct { // startLifecycle constructs the Lifecycle Manager over the store and starts the // reaper. The goroutine stops when ctx is cancelled; Stop waits for it to drain. -func startLifecycle(ctx context.Context, store *sqlite.Store, runtime ports.Runtime, logger *slog.Logger) *lifecycleStack { - lcm := lifecycle.New(store, nil) +func startLifecycle(ctx context.Context, store *sqlite.Store, runtime ports.Runtime, messenger ports.AgentMessenger, logger *slog.Logger) *lifecycleStack { + lcm := lifecycle.New(store, messenger) rp := reaper.New(lcm, store, runtime, reaper.Config{Logger: logger}) return &lifecycleStack{LCM: lcm, reaperDone: rp.Start(ctx)} } @@ -44,8 +44,9 @@ func (l *lifecycleStack) Stop() { <-l.reaperDone } // startSession builds the controller-facing session service: a session manager // over the real zellij runtime, a per-session gitworktree workspace, the shared -// store + LCM, the per-session agent resolver (AO_AGENT default), and the -// agent messenger. The returned service is mounted at httpd APIDeps.Sessions. +// store + LCM, the per-session agent resolver (AO_AGENT default), and the live +// runtime messenger so human /send reaches the agent's pane. The returned +// service is mounted at httpd APIDeps.Sessions. func startSession(cfg config.Config, runtime ports.Runtime, store *sqlite.Store, lcm *lifecycle.Manager, messenger ports.AgentMessenger, log *slog.Logger) (*sessionsvc.Service, error) { agents, err := buildAgentResolver(cfg.Agent, log) if err != nil { diff --git a/backend/internal/daemon/pr_wiring.go b/backend/internal/daemon/pr_wiring.go new file mode 100644 index 00000000..31940dd4 --- /dev/null +++ b/backend/internal/daemon/pr_wiring.go @@ -0,0 +1,92 @@ +package daemon + +import ( + "context" + "log/slog" + + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/scm/github" + "github.com/aoagents/agent-orchestrator/backend/internal/lifecycle" + "github.com/aoagents/agent-orchestrator/backend/internal/observe/prpoller" + prsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/pr" + "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" +) + +// prPollerStack owns the PR poller goroutine. When no GitHub token is +// configured the stack is inert (closed done channel, nothing started): PR +// observation degrades gracefully rather than failing daemon startup. +type prPollerStack struct { + done <-chan struct{} +} + +// Stop waits for the poller goroutine to exit. The caller must cancel the ctx +// passed to startPRPoller first. Safe on an inert stack. +func (s *prPollerStack) Stop() { + if s == nil || s.done == nil { + return + } + <-s.done +} + +// startPRPoller wires the PR observation path: a GitHub provider (token from +// env, falling back to `gh auth token`), the PR service (persist + lifecycle +// nudges over the shared store/LCM), and the poller that self-discovers each +// live session's PR from its branch. If no token resolves, the poller is +// skipped and the daemon runs without PR observation. +func startPRPoller(ctx context.Context, store *sqlite.Store, lcm *lifecycle.Manager, log *slog.Logger) *prPollerStack { + tokens := githubTokenSource() + if _, err := tokens.Token(ctx); err != nil { + log.Warn("PR poller disabled: no GitHub token (set AO_GITHUB_TOKEN/GITHUB_TOKEN or run `gh auth login`)", "err", err) + return &prPollerStack{} + } + + provider, err := github.NewProvider(github.ProviderOptions{Token: tokens}) + if err != nil { + log.Warn("PR poller disabled: could not build GitHub provider", "err", err) + return &prPollerStack{} + } + + manager := prsvc.New(prsvc.Deps{Writer: store, Lifecycle: lcm}) + resolver := gitRepoResolver{store: store} + poller := prpoller.New(store, provider, provider, manager, resolver, prpoller.Config{Logger: log}) + + log.Info("PR poller started", "interval", prpoller.DefaultTickInterval) + return &prPollerStack{done: poller.Start(ctx)} +} + +// githubTokenSource resolves a token from env first (AO_GITHUB_TOKEN, then +// GITHUB_TOKEN via EnvTokenSource's built-in fallback), then `gh auth token`. +func githubTokenSource() github.TokenSource { + return chainTokenSource{ + github.EnvTokenSource{EnvVars: []string{"AO_GITHUB_TOKEN"}}, + &github.GHTokenSource{}, + } +} + +// chainTokenSource returns the first source that yields a token, so a CI env +// var wins but a developer's `gh` login still works with no env set up. +type chainTokenSource []github.TokenSource + +func (c chainTokenSource) Token(ctx context.Context) (string, error) { + var lastErr error + for _, s := range c { + tok, err := s.Token(ctx) + if err == nil && tok != "" { + return tok, nil + } + lastErr = err + } + if lastErr == nil { + lastErr = github.ErrNoToken + } + return "", lastErr +} + +// InvalidateToken forwards to any chained source that supports invalidation, so +// a rotated token is picked up on the next request. +func (c chainTokenSource) InvalidateToken() { + for _, s := range c { + if inv, ok := s.(interface{ InvalidateToken() }); ok { + inv.InvalidateToken() + } + } +} diff --git a/backend/internal/daemon/repo_resolver.go b/backend/internal/daemon/repo_resolver.go new file mode 100644 index 00000000..76dcfd9e --- /dev/null +++ b/backend/internal/daemon/repo_resolver.go @@ -0,0 +1,97 @@ +package daemon + +import ( + "context" + "fmt" + "os/exec" + "strings" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +// gitRepoResolver maps a project to its github owner/repo by reading the +// project's on-disk repo path from the store and parsing its origin remote. It +// backs the PR poller's per-project owner/repo lookup. +type gitRepoResolver struct { + store projectPathStore + // git is the shell-out hook; nil falls back to the real git binary. Tests + // inject a fake to avoid requiring a repo on disk. + git func(ctx context.Context, repoPath string) (string, error) +} + +type projectPathStore interface { + GetProject(ctx context.Context, id string) (domain.ProjectRecord, bool, error) +} + +// RepoIdent resolves owner/repo for projectID. It fails (rather than guessing) +// when the project is unregistered, has no repo path, or its origin remote +// isn't a parseable github URL — the poller treats a failure as "skip", so a +// non-github project simply never gets PR observations. +func (r gitRepoResolver) RepoIdent(ctx context.Context, projectID domain.ProjectID) (string, string, error) { + rec, ok, err := r.store.GetProject(ctx, string(projectID)) + if err != nil { + return "", "", fmt.Errorf("look up project %q: %w", projectID, err) + } + if !ok || rec.Path == "" { + return "", "", fmt.Errorf("project %q has no repo path on record", projectID) + } + run := r.git + if run == nil { + run = gitOriginURL + } + remote, err := run(ctx, rec.Path) + if err != nil { + return "", "", fmt.Errorf("read origin remote for %q: %w", projectID, err) + } + return parseOwnerRepo(remote) +} + +func gitOriginURL(ctx context.Context, repoPath string) (string, error) { + out, err := exec.CommandContext(ctx, "git", "-C", repoPath, "remote", "get-url", "origin").Output() + if err != nil { + return "", err + } + return string(out), nil +} + +// parseOwnerRepo extracts owner/repo from the common github remote URL shapes: +// +// https://github.com/owner/repo(.git) +// git@github.com:owner/repo(.git) +// ssh://git@github.com/owner/repo(.git) +// +// Only github hosts are accepted; anything else returns an error so the poller +// skips the project rather than POSTing to a non-github API. +func parseOwnerRepo(remote string) (string, string, error) { + s := strings.TrimSpace(remote) + if s == "" { + return "", "", fmt.Errorf("empty origin remote") + } + // Normalise scp-style (git@host:owner/repo) to a slash-delimited tail. + if !strings.Contains(s, "://") { + if at := strings.Index(s, "@"); at >= 0 { + s = s[at+1:] + } + s = strings.Replace(s, ":", "/", 1) + } else { + s = s[strings.Index(s, "://")+len("://"):] + if at := strings.Index(s, "@"); at >= 0 { + s = s[at+1:] + } + } + // s is now host/owner/repo(.git)[/...]. Require a github host. + parts := strings.Split(strings.Trim(s, "/"), "/") + if len(parts) < 3 { + return "", "", fmt.Errorf("origin remote %q is not an owner/repo url", remote) + } + host := strings.ToLower(parts[0]) + if host != "github.com" && host != "www.github.com" && !strings.HasSuffix(host, ".github.com") && !strings.HasSuffix(host, ".ghe.io") { + return "", "", fmt.Errorf("origin remote host %q is not github", host) + } + owner := parts[1] + repo := strings.TrimSuffix(parts[2], ".git") + if owner == "" || repo == "" { + return "", "", fmt.Errorf("origin remote %q is missing owner or repo", remote) + } + return owner, repo, nil +} diff --git a/backend/internal/daemon/repo_resolver_test.go b/backend/internal/daemon/repo_resolver_test.go new file mode 100644 index 00000000..7f790a2a --- /dev/null +++ b/backend/internal/daemon/repo_resolver_test.go @@ -0,0 +1,90 @@ +package daemon + +import ( + "context" + "errors" + "testing" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +func TestParseOwnerRepo(t *testing.T) { + cases := []struct { + in string + owner, repo string + wantErr bool + }{ + {"https://github.com/octocat/hello.git", "octocat", "hello", false}, + {"https://github.com/octocat/hello", "octocat", "hello", false}, + {"git@github.com:octocat/hello.git", "octocat", "hello", false}, + {"git@github.com:octocat/hello", "octocat", "hello", false}, + {"ssh://git@github.com/octocat/hello.git", "octocat", "hello", false}, + {"https://github.com/octocat/hello.git\n", "octocat", "hello", false}, + {"https://gitlab.com/octocat/hello.git", "", "", true}, + {"git@bitbucket.org:octocat/hello.git", "", "", true}, + {"", "", "", true}, + {"not a url", "", "", true}, + } + for _, tc := range cases { + owner, repo, err := parseOwnerRepo(tc.in) + if tc.wantErr { + if err == nil { + t.Errorf("parseOwnerRepo(%q): expected error, got %s/%s", tc.in, owner, repo) + } + continue + } + if err != nil { + t.Errorf("parseOwnerRepo(%q): %v", tc.in, err) + continue + } + if owner != tc.owner || repo != tc.repo { + t.Errorf("parseOwnerRepo(%q) = %s/%s, want %s/%s", tc.in, owner, repo, tc.owner, tc.repo) + } + } +} + +type fakeProjectStore struct { + rec domain.ProjectRecord + ok bool + err error +} + +func (f fakeProjectStore) GetProject(context.Context, string) (domain.ProjectRecord, bool, error) { + return f.rec, f.ok, f.err +} + +func TestGitRepoResolver_ResolvesFromOrigin(t *testing.T) { + r := gitRepoResolver{ + store: fakeProjectStore{rec: domain.ProjectRecord{ID: "p1", Path: "/repo"}, ok: true}, + git: func(_ context.Context, repoPath string) (string, error) { + if repoPath != "/repo" { + t.Errorf("git called with %q, want /repo", repoPath) + } + return "git@github.com:octocat/hello.git\n", nil + }, + } + owner, repo, err := r.RepoIdent(context.Background(), "p1") + if err != nil { + t.Fatalf("RepoIdent: %v", err) + } + if owner != "octocat" || repo != "hello" { + t.Fatalf("got %s/%s, want octocat/hello", owner, repo) + } +} + +func TestGitRepoResolver_UnknownProject(t *testing.T) { + r := gitRepoResolver{store: fakeProjectStore{ok: false}} + if _, _, err := r.RepoIdent(context.Background(), "p1"); err == nil { + t.Fatal("expected error for unknown project") + } +} + +func TestGitRepoResolver_GitFailureSurfaces(t *testing.T) { + r := gitRepoResolver{ + store: fakeProjectStore{rec: domain.ProjectRecord{Path: "/repo"}, ok: true}, + git: func(context.Context, string) (string, error) { return "", errors.New("no origin") }, + } + if _, _, err := r.RepoIdent(context.Background(), "p1"); err == nil { + t.Fatal("expected error when git fails") + } +} diff --git a/backend/internal/observe/prpoller/prpoller.go b/backend/internal/observe/prpoller/prpoller.go new file mode 100644 index 00000000..b1e69498 --- /dev/null +++ b/backend/internal/observe/prpoller/prpoller.go @@ -0,0 +1,182 @@ +// Package prpoller implements the OBSERVE-layer polling timer that discovers +// and refreshes pull-request state for live sessions. +// +// Each tick enumerates non-terminated sessions, resolves each session's branch +// to an open PR (self-discovery via the SCM, keyed on the session's branch), +// observes that PR, and feeds the observation into the PR service — which +// persists it and drives lifecycle nudges. The poller reports facts only; it +// never writes session or PR rows directly. +package prpoller + +import ( + "context" + "log/slog" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +// DefaultTickInterval is the cadence used when Config.Tick is zero. PR state +// changes far slower than runtime liveness, so this is coarser than the +// reaper's 5s probe and stays well inside GitHub's REST budget. +const DefaultTickInterval = 30 * time.Second + +type sessionSource interface { + ListAllSessions(ctx context.Context) ([]domain.SessionRecord, error) +} + +type prDiscoverer interface { + // FindPRForBranch resolves the open PR for owner/repo's branch. found=false + // with a nil error is the normal pre-PR state of a fresh session. + FindPRForBranch(ctx context.Context, owner, repo, branch string) (url string, found bool, err error) +} + +type prObserver interface { + Observe(ctx context.Context, prURL string) (ports.PRObservation, error) +} + +type observationSink interface { + ApplyObservation(ctx context.Context, id domain.SessionID, o ports.PRObservation) error +} + +// repoResolver maps a project to its github owner/repo (from the project's +// origin remote). The poller caches results per project within a tick. +type repoResolver interface { + RepoIdent(ctx context.Context, projectID domain.ProjectID) (owner, repo string, err error) +} + +// Config holds the tunable knobs for a Poller. Every field is optional; zero +// values fall back to safe defaults. +type Config struct { + // Tick is the interval between cycles. <=0 means DefaultTickInterval. + Tick time.Duration + // Logger receives operational diagnostics. A failed discovery/observe for + // one session is logged but never propagated — it must not kill the loop. + // nil means slog.Default. + Logger *slog.Logger +} + +// Poller is the PR polling timer. Construct it with New; start the background +// goroutine with Start, or drive a single cycle synchronously with Tick. +type Poller struct { + sessions sessionSource + discoverer prDiscoverer + observer prObserver + sink observationSink + repos repoResolver + tick time.Duration + logger *slog.Logger +} + +// New constructs a Poller. sessions supplies the rows to poll; discoverer finds +// a branch's open PR; observer fetches its state; sink persists+reacts; repos +// resolves a project's github owner/repo. +func New(sessions sessionSource, discoverer prDiscoverer, observer prObserver, sink observationSink, repos repoResolver, cfg Config) *Poller { + p := &Poller{ + sessions: sessions, + discoverer: discoverer, + observer: observer, + sink: sink, + repos: repos, + tick: cfg.Tick, + logger: cfg.Logger, + } + if p.tick <= 0 { + p.tick = DefaultTickInterval + } + if p.logger == nil { + p.logger = slog.Default() + } + return p +} + +// Start launches the background goroutine and returns a channel that closes +// once the loop has exited. The loop exits on ctx cancellation; wait on the +// channel after cancel to confirm a clean stop before tearing down deps. +func (p *Poller) Start(ctx context.Context) <-chan struct{} { + done := make(chan struct{}) + go p.loop(ctx, done) + return done +} + +func (p *Poller) loop(ctx context.Context, done chan<- struct{}) { + defer close(done) + t := time.NewTicker(p.tick) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + if err := p.Tick(ctx); err != nil { + p.logger.Error("prpoller: tick failed", "err", err) + } + } + } +} + +// Tick runs one cycle: enumerate non-terminated sessions with a branch, resolve +// each to an open PR, observe it, and feed the observation to the sink. Only +// the session-listing failure is propagated (it short-circuits the cycle); +// per-session failures are logged so one bad session can't stall the rest. +func (p *Poller) Tick(ctx context.Context) error { + sessions, err := p.sessions.ListAllSessions(ctx) + if err != nil { + return err + } + // Cache owner/repo per project for the duration of the tick: many sessions + // share a project, and remote resolution may shell out to git. + idents := map[domain.ProjectID]repoIdent{} + for _, sess := range sessions { + if sess.IsTerminated || sess.Metadata.Branch == "" { + continue + } + p.pollOne(ctx, sess, idents) + } + return nil +} + +type repoIdent struct { + owner string + repo string + ok bool // resolution succeeded; a failed lookup is cached as !ok to avoid retry-storming within a tick +} + +func (p *Poller) pollOne(ctx context.Context, sess domain.SessionRecord, idents map[domain.ProjectID]repoIdent) { + ident, cached := idents[sess.ProjectID] + if !cached { + owner, repo, err := p.repos.RepoIdent(ctx, sess.ProjectID) + ident = repoIdent{owner: owner, repo: repo, ok: err == nil} + if err != nil { + p.logger.Debug("prpoller: could not resolve project repo, skipping", + "project", sess.ProjectID, "err", err) + } + idents[sess.ProjectID] = ident + } + if !ident.ok { + return + } + + prURL, found, err := p.discoverer.FindPRForBranch(ctx, ident.owner, ident.repo, sess.Metadata.Branch) + if err != nil { + p.logger.Debug("prpoller: PR discovery failed", + "session", sess.ID, "branch", sess.Metadata.Branch, "err", err) + return + } + if !found { + return + } + + obs, err := p.observer.Observe(ctx, prURL) + if err != nil { + p.logger.Debug("prpoller: PR observation failed", + "session", sess.ID, "pr", prURL, "err", err) + return + } + + if err := p.sink.ApplyObservation(ctx, sess.ID, obs); err != nil { + p.logger.Error("prpoller: ApplyObservation failed", + "session", sess.ID, "pr", prURL, "err", err) + } +} diff --git a/backend/internal/observe/prpoller/prpoller_test.go b/backend/internal/observe/prpoller/prpoller_test.go new file mode 100644 index 00000000..bfd34266 --- /dev/null +++ b/backend/internal/observe/prpoller/prpoller_test.go @@ -0,0 +1,211 @@ +package prpoller + +import ( + "context" + "errors" + "io" + "log/slog" + "sync" + "testing" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +type fakeSessions struct { + recs []domain.SessionRecord + err error +} + +func (f fakeSessions) ListAllSessions(context.Context) ([]domain.SessionRecord, error) { + return f.recs, f.err +} + +type discoverCall struct{ owner, repo, branch string } + +type fakeDiscoverer struct { + mu sync.Mutex + calls []discoverCall + url string + found bool + err error +} + +func (f *fakeDiscoverer) FindPRForBranch(_ context.Context, owner, repo, branch string) (string, bool, error) { + f.mu.Lock() + f.calls = append(f.calls, discoverCall{owner, repo, branch}) + f.mu.Unlock() + return f.url, f.found, f.err +} + +type fakeObserver struct { + mu sync.Mutex + urls []string + obs ports.PRObservation + err error +} + +func (f *fakeObserver) Observe(_ context.Context, prURL string) (ports.PRObservation, error) { + f.mu.Lock() + f.urls = append(f.urls, prURL) + f.mu.Unlock() + o := f.obs + o.URL = prURL + return o, f.err +} + +type sinkCall struct { + id domain.SessionID + obs ports.PRObservation +} + +type fakeSink struct { + mu sync.Mutex + calls []sinkCall + err error +} + +func (f *fakeSink) ApplyObservation(_ context.Context, id domain.SessionID, o ports.PRObservation) error { + f.mu.Lock() + f.calls = append(f.calls, sinkCall{id, o}) + f.mu.Unlock() + return f.err +} + +type fakeRepos struct { + mu sync.Mutex + calls int + owner string + repo string + err error +} + +func (f *fakeRepos) RepoIdent(context.Context, domain.ProjectID) (string, string, error) { + f.mu.Lock() + f.calls++ + f.mu.Unlock() + return f.owner, f.repo, f.err +} + +func testLogger() *slog.Logger { return slog.New(slog.NewTextHandler(io.Discard, nil)) } + +func sessionWithBranch(id domain.SessionID, project domain.ProjectID, branch string) domain.SessionRecord { + return domain.SessionRecord{ID: id, ProjectID: project, Metadata: domain.SessionMetadata{Branch: branch}} +} + +func TestTick_DiscoversObservesAndApplies(t *testing.T) { + sessions := fakeSessions{recs: []domain.SessionRecord{ + sessionWithBranch("s1", "p1", "feature-x"), + }} + disc := &fakeDiscoverer{url: "https://github.com/octocat/hello/pull/7", found: true} + obs := &fakeObserver{obs: ports.PRObservation{Fetched: true, Number: 7, CI: domain.CIFailing}} + sink := &fakeSink{} + repos := &fakeRepos{owner: "octocat", repo: "hello"} + + p := New(sessions, disc, obs, sink, repos, Config{Logger: testLogger()}) + if err := p.Tick(context.Background()); err != nil { + t.Fatalf("Tick: %v", err) + } + + if len(disc.calls) != 1 || disc.calls[0] != (discoverCall{"octocat", "hello", "feature-x"}) { + t.Fatalf("discover calls = %+v", disc.calls) + } + if len(obs.urls) != 1 || obs.urls[0] != "https://github.com/octocat/hello/pull/7" { + t.Fatalf("observe urls = %+v", obs.urls) + } + if len(sink.calls) != 1 || sink.calls[0].id != "s1" || sink.calls[0].obs.CI != domain.CIFailing { + t.Fatalf("sink calls = %+v", sink.calls) + } +} + +func TestTick_SkipsTerminatedAndBranchless(t *testing.T) { + term := sessionWithBranch("s1", "p1", "feature-x") + term.IsTerminated = true + noBranch := sessionWithBranch("s2", "p1", "") + sessions := fakeSessions{recs: []domain.SessionRecord{term, noBranch}} + disc := &fakeDiscoverer{found: true, url: "u"} + p := New(sessions, disc, &fakeObserver{}, &fakeSink{}, &fakeRepos{owner: "o", repo: "r"}, Config{Logger: testLogger()}) + + if err := p.Tick(context.Background()); err != nil { + t.Fatalf("Tick: %v", err) + } + if len(disc.calls) != 0 { + t.Fatalf("expected no discovery for terminated/branchless sessions, got %+v", disc.calls) + } +} + +func TestTick_NoOpenPRSkipsObserve(t *testing.T) { + sessions := fakeSessions{recs: []domain.SessionRecord{sessionWithBranch("s1", "p1", "b")}} + disc := &fakeDiscoverer{found: false} + obs := &fakeObserver{} + sink := &fakeSink{} + p := New(sessions, disc, obs, sink, &fakeRepos{owner: "o", repo: "r"}, Config{Logger: testLogger()}) + + if err := p.Tick(context.Background()); err != nil { + t.Fatalf("Tick: %v", err) + } + if len(obs.urls) != 0 { + t.Fatal("must not observe when no open PR") + } + if len(sink.calls) != 0 { + t.Fatal("must not apply when no open PR") + } +} + +func TestTick_RepoResolutionCachedPerProject(t *testing.T) { + sessions := fakeSessions{recs: []domain.SessionRecord{ + sessionWithBranch("s1", "p1", "b1"), + sessionWithBranch("s2", "p1", "b2"), + sessionWithBranch("s3", "p1", "b3"), + }} + repos := &fakeRepos{owner: "o", repo: "r"} + disc := &fakeDiscoverer{found: false} + p := New(sessions, disc, &fakeObserver{}, &fakeSink{}, repos, Config{Logger: testLogger()}) + + if err := p.Tick(context.Background()); err != nil { + t.Fatalf("Tick: %v", err) + } + if repos.calls != 1 { + t.Fatalf("RepoIdent calls = %d, want 1 (cached per project)", repos.calls) + } + if len(disc.calls) != 3 { + t.Fatalf("discover calls = %d, want 3", len(disc.calls)) + } +} + +func TestTick_RepoResolutionFailureSkipsSession(t *testing.T) { + sessions := fakeSessions{recs: []domain.SessionRecord{sessionWithBranch("s1", "p1", "b")}} + disc := &fakeDiscoverer{} + repos := &fakeRepos{err: errors.New("no origin remote")} + p := New(sessions, disc, &fakeObserver{}, &fakeSink{}, repos, Config{Logger: testLogger()}) + + if err := p.Tick(context.Background()); err != nil { + t.Fatalf("Tick: %v", err) + } + if len(disc.calls) != 0 { + t.Fatal("must not discover when repo resolution fails") + } +} + +func TestTick_ObserveFailureDoesNotApply(t *testing.T) { + sessions := fakeSessions{recs: []domain.SessionRecord{sessionWithBranch("s1", "p1", "b")}} + disc := &fakeDiscoverer{found: true, url: "u"} + obs := &fakeObserver{err: errors.New("rate limited")} + sink := &fakeSink{} + p := New(sessions, disc, obs, sink, &fakeRepos{owner: "o", repo: "r"}, Config{Logger: testLogger()}) + + if err := p.Tick(context.Background()); err != nil { + t.Fatalf("Tick: %v", err) + } + if len(sink.calls) != 0 { + t.Fatal("must not apply a failed observation") + } +} + +func TestTick_ListErrorPropagates(t *testing.T) { + sentinel := errors.New("db down") + p := New(fakeSessions{err: sentinel}, &fakeDiscoverer{}, &fakeObserver{}, &fakeSink{}, &fakeRepos{}, Config{Logger: testLogger()}) + if err := p.Tick(context.Background()); !errors.Is(err, sentinel) { + t.Fatalf("Tick err = %v, want sentinel", err) + } +} From 5037b3ccf17a2e70c7214fa4e749ddbed1afbe7d Mon Sep 17 00:00:00 2001 From: Pritom14 Date: Tue, 2 Jun 2026 22:59:16 +0530 Subject: [PATCH 2/2] test(backend): add end-to-end functional tests for the PR engine Drive the wired poller against a fake GitHub (real github.Provider for discovery and observation), a real sqlite store, a real Lifecycle Manager, and the real PR service. Asserts a CI-failing PR flows discovery -> observe -> persist -> derived session status -> lifecycle nudge, and that a branch with no open PR stays quiet. Co-Authored-By: Claude Opus 4.7 --- .../integration/prpoller_functional_test.go | 201 ++++++++++++++++++ 1 file changed, 201 insertions(+) create mode 100644 backend/internal/integration/prpoller_functional_test.go diff --git a/backend/internal/integration/prpoller_functional_test.go b/backend/internal/integration/prpoller_functional_test.go new file mode 100644 index 00000000..1de640f5 --- /dev/null +++ b/backend/internal/integration/prpoller_functional_test.go @@ -0,0 +1,201 @@ +package integration + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/scm/github" + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/observe/prpoller" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +// staticRepos resolves every project to one fixed github owner/repo, standing +// in for the daemon's git-remote resolver so the functional test does not need +// a repo on disk. Repo resolution itself is unit-tested in the daemon package. +type staticRepos struct{ owner, repo string } + +func (s staticRepos) RepoIdent(context.Context, domain.ProjectID) (string, string, error) { + return s.owner, s.repo, nil +} + +// fakeGitHub serves the exact REST + GraphQL routes the PR engine touches for a +// single CI-failing PR (octocat/hello#42 on branch feat/x), so the real +// github.Provider exercises its real network code against it. +func fakeGitHub(t *testing.T) *httptest.Server { + t.Helper() + mux := http.NewServeMux() + + // FindPRForBranch: list open PRs for the branch head. + mux.HandleFunc("/repos/octocat/hello/pulls", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode([]map[string]any{ + {"html_url": "https://github.com/octocat/hello/pull/42", "url": "https://api.github.com/repos/octocat/hello/pulls/42"}, + }) + }) + + // Observe: REST pull detail. + mux.HandleFunc("/repos/octocat/hello/pulls/42", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.Header().Set("ETag", `W/"v1"`) + _ = json.NewEncoder(w).Encode(map[string]any{ + "number": 42, "title": "Found a bug", "state": "open", + "draft": false, "merged": false, "merged_at": nil, + "html_url": "https://github.com/octocat/hello/pull/42", + "head": map[string]any{"ref": "feat/x", "sha": "deadbeef"}, + "base": map[string]any{"ref": "main"}, + "mergeable": true, + "mergeable_state": "blocked", + "merge_state_status": "BLOCKED", + }) + }) + + // Observe: GraphQL rollup with a FAILED check carrying a databaseId so the + // provider fetches the job log tail. + mux.HandleFunc("/graphql", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "data": map[string]any{"repository": map[string]any{"pullRequest": map[string]any{ + "number": 42, "url": "https://github.com/octocat/hello/pull/42", + "state": "OPEN", "isDraft": false, "merged": false, "closed": false, + "mergeable": "MERGEABLE", "mergeStateStatus": "BLOCKED", + "reviewDecision": "REVIEW_REQUIRED", "headRefOid": "deadbeef", + "commits": map[string]any{"nodes": []any{map[string]any{"commit": map[string]any{ + "oid": "deadbeef", + "statusCheckRollup": map[string]any{ + "state": "FAILURE", + "contexts": map[string]any{ + "nodes": []any{map[string]any{ + "__typename": "CheckRun", "name": "build", "status": "COMPLETED", + "conclusion": "FAILURE", + "detailsUrl": "https://github.com/octocat/hello/runs/9001", + "databaseId": float64(9001), + }}, + "pageInfo": map[string]any{"hasNextPage": false}, + }, + }, + }}}}, + "reviewThreads": map[string]any{"nodes": []any{}}, + }}}, + }) + }) + + // Observe: failing job log, for the nudge's LogTail. + mux.HandleFunc("/repos/octocat/hello/actions/jobs/9001/logs", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain") + _, _ = w.Write([]byte(strings.Repeat("setup\n", 5) + "FAILED: build broke\n")) + }) + + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + return srv +} + +// TestPRPoller_FunctionalEndToEnd drives the whole PR engine the daemon wires: +// a real github.Provider (discovery + observation) against a fake GitHub, a real +// sqlite store, a real Lifecycle Manager, and the real PR service. It spawns a +// session with a branch, runs one poller tick, and asserts the CI-failing PR +// flows discovery -> observe -> persist -> session status -> lifecycle nudge. +func TestPRPoller_FunctionalEndToEnd(t *testing.T) { + ctx := context.Background() + st := newStack(t) + gh := fakeGitHub(t) + + provider, err := github.NewProvider(github.ProviderOptions{ + Token: github.StaticTokenSource("tkn-test"), + HTTPClient: gh.Client(), + RESTBase: gh.URL, + GraphQLURL: gh.URL + "/graphql", + UserAgent: "ao-prpoller-functional-test", + }) + if err != nil { + t.Fatalf("NewProvider: %v", err) + } + + sess, err := st.sm.Spawn(ctx, ports.SpawnConfig{ProjectID: "mer", Kind: domain.KindWorker, Branch: "feat/x", Prompt: "do it"}) + if err != nil { + t.Fatalf("Spawn: %v", err) + } + + poller := prpoller.New(st.store, provider, provider, st.prm, staticRepos{owner: "octocat", repo: "hello"}, prpoller.Config{}) + if err := poller.Tick(ctx); err != nil { + t.Fatalf("Tick: %v", err) + } + + // The PR row was persisted from the observation. + pr, ok, err := st.store.GetDisplayPRFactsForSession(ctx, sess.ID) + if err != nil || !ok { + t.Fatalf("GetDisplayPRFactsForSession: ok=%v err=%v", ok, err) + } + if pr.URL != "https://github.com/octocat/hello/pull/42" || pr.Number != 42 { + t.Fatalf("PR not persisted: %+v", pr) + } + if pr.CI != domain.CIFailing { + t.Fatalf("PR CI = %q, want failing", pr.CI) + } + + // The derived session status reflects the failing CI. + got, err := st.sm.Get(ctx, sess.ID) + if err != nil { + t.Fatalf("Get: %v", err) + } + if got.Status != domain.StatusCIFailed { + t.Fatalf("status = %q, want ci_failed", got.Status) + } + + // The lifecycle nudged the agent's pane via the messenger, carrying the log tail. + if len(st.msg.msgs) != 1 { + t.Fatalf("messenger got %d msgs, want 1: %v", len(st.msg.msgs), st.msg.msgs) + } + if !strings.Contains(st.msg.msgs[0], "CI is failing") || !strings.Contains(st.msg.msgs[0], "FAILED: build broke") { + t.Fatalf("nudge missing CI-failure content: %q", st.msg.msgs[0]) + } +} + +// TestPRPoller_NoOpenPRIsQuiet proves the common pre-PR state: a fresh session +// whose branch has no open PR yet produces no persisted PR and no nudge, and the +// tick still succeeds. +func TestPRPoller_NoOpenPRIsQuiet(t *testing.T) { + ctx := context.Background() + st := newStack(t) + + mux := http.NewServeMux() + mux.HandleFunc("/repos/octocat/hello/pulls", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte("[]")) + }) + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + + provider, err := github.NewProvider(github.ProviderOptions{ + Token: github.StaticTokenSource("tkn-test"), + HTTPClient: srv.Client(), + RESTBase: srv.URL, + GraphQLURL: srv.URL + "/graphql", + UserAgent: "ao-prpoller-functional-test", + }) + if err != nil { + t.Fatalf("NewProvider: %v", err) + } + + sess, err := st.sm.Spawn(ctx, ports.SpawnConfig{ProjectID: "mer", Kind: domain.KindWorker, Branch: "feat/x", Prompt: "do it"}) + if err != nil { + t.Fatalf("Spawn: %v", err) + } + + poller := prpoller.New(st.store, provider, provider, st.prm, staticRepos{owner: "octocat", repo: "hello"}, prpoller.Config{}) + if err := poller.Tick(ctx); err != nil { + t.Fatalf("Tick: %v", err) + } + + if _, ok, _ := st.store.GetDisplayPRFactsForSession(ctx, sess.ID); ok { + t.Fatalf("no PR should be persisted for a branch with no open PR") + } + if len(st.msg.msgs) != 0 { + t.Fatalf("no nudge expected, got %v", st.msg.msgs) + } +}