Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions backend/internal/adapters/scm/github/find_branch_pr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package github

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"time"
)

// FindOpenPRForBranch returns the canonical github.com URL of the most
// recently updated open PR whose head ref is "{owner}:{branch}", or ""
// with a nil error when no open PR matches.
//
// The poller uses this for branch-based discovery: since the session
// record does not (yet) carry a stored PR URL, the only way to find
// "the PR for this session" is by the workspace branch. The endpoint
// hit is GET /repos/{owner}/{repo}/pulls?head={owner}:{branch}&state=open
// per the GitHub REST API.
//
// When multiple open PRs share the same head ref (rare but legal —
// e.g. forks that pushed to the same branch name), we pick the most
// recently updated one rather than failing closed. Failing closed
// would silently stop observing the PR every time a stale duplicate
// shows up.
func (p *Provider) FindOpenPRForBranch(ctx context.Context, owner, repo, branch string) (string, error) {
// Trim first so whitespace-only arguments are rejected by the empty check.
owner = strings.TrimSpace(owner)
repo = strings.TrimSpace(repo)
branch = strings.TrimSpace(branch)
if owner == "" || repo == "" || branch == "" {
return "", fmt.Errorf("github scm: FindOpenPRForBranch requires owner/repo/branch (got %q/%q/%q)", owner, repo, branch)
}

// Query parameters: open PRs only, head filter "{owner}:{branch}", and a
// page size of 100. Only this one request is issued; no further pages are
// fetched.
q := url.Values{}
q.Set("state", "open")
q.Set("head", owner+":"+branch)
q.Set("per_page", "100")

resp, err := p.client.doREST(ctx, http.MethodGet, repoPath(owner, repo, "pulls"), q, nil)
if err != nil {
// Passed through unchanged (no additional wrapping at this layer).
return "", err
}
// An empty response body is treated the same as "no matching PR".
if len(resp.Body) == 0 {
return "", nil
}
var list []listedPR
if err := json.Unmarshal(resp.Body, &list); err != nil {
return "", fmt.Errorf("github scm: decode pulls list: %w", err)
}
if len(list) == 0 {
return "", nil
}

// Select the open entry with the latest updated_at. best stays -1 until the
// first open entry is seen, so that entry is always accepted even when its
// timestamp failed to parse (parsePRTimestamp then yields the zero time).
// Ties keep the earlier list entry because the comparison is strict.
best := -1
var bestTime time.Time
for i, pr := range list {
// Re-check the state (case-insensitively) even though the query already
// asked for state=open.
if !strings.EqualFold(pr.State, "open") {
continue
}
t := parsePRTimestamp(pr.UpdatedAt)
if best < 0 || t.After(bestTime) {
best = i
bestTime = t
}
}
// Every listed entry was non-open: report "no PR" rather than an error.
if best < 0 {
return "", nil
}
chosen := list[best]
if chosen.HTMLURL != "" {
return chosen.HTMLURL, nil
}
// Construct the canonical web URL from owner/repo/number when the
// API response omits html_url (some enterprise responses elide it).
// NOTE(review): the host below is fixed to github.com; for a non-github.com
// deployment this would yield a URL on the wrong host — confirm whether the
// Provider exposes a configurable web base URL that should be used instead.
return "https://github.com/" + owner + "/" + repo + "/pull/" + strconv.Itoa(chosen.Number), nil
Comment on lines +76 to +78
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 The fallback URL synthesis hardcodes https://github.com/ but the comment above it explicitly says this path fires for "some enterprise responses" that omit html_url. On a GitHub Enterprise instance the synthesised URL would carry the wrong host — causing the PR row to be stored with a github.com URL and any subsequent Provider.Observe call on that URL to either hit the wrong host or be rejected by parsePRURL. The Provider already holds its configured base URL (e.g. https://github.mycompany.com) so the fix requires plumbing it through, or at minimum synthesising from the REST base host rather than the hardcoded public domain.

Suggested change
// Construct the canonical web URL from owner/repo/number when the
// API response omits html_url (some enterprise responses elide it).
return "https://github.com/" + owner + "/" + repo + "/pull/" + strconv.Itoa(chosen.Number), nil
// Construct the canonical web URL from owner/repo/number when the
// API response omits html_url (some enterprise responses elide it).
// TODO: plumb the Provider's web base URL here so GHE instances get
// the correct host rather than the hardcoded github.com.
return "https://github.com/" + owner + "/" + repo + "/pull/" + strconv.Itoa(chosen.Number), nil

}

// listedPR is the subset of one element of the pulls-list JSON array that
// FindOpenPRForBranch decodes; all other fields in the payload are ignored
// by encoding/json.
type listedPR struct {
	// HTMLURL is decoded from "html_url"; it is empty when the key is absent.
	HTMLURL string `json:"html_url"`
	// Number is decoded from "number".
	Number int `json:"number"`
	// State is decoded from "state" and compared case-insensitively to "open".
	State string `json:"state"`
	// UpdatedAt is decoded from "updated_at" and kept as the raw string; it is
	// parsed later by parsePRTimestamp.
	UpdatedAt string `json:"updated_at"`
}

// parsePRTimestamp interprets s as an RFC 3339 timestamp. Any string that
// does not parse (including the empty string) yields the zero time.Time
// instead of an error, so callers can compare results without a separate
// error path.
func parsePRTimestamp(s string) time.Time {
	if parsed, err := time.Parse(time.RFC3339, s); err == nil {
		return parsed
	}
	return time.Time{}
}
131 changes: 131 additions & 0 deletions backend/internal/adapters/scm/github/find_branch_pr_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package github

import (
"encoding/json"
"errors"
"net/http"
"strings"
"testing"
)

// TestFindOpenPRForBranchSingleMatch serves a one-element pulls list and
// expects that element's html_url back. The handler also verifies that the
// request carries head=acme:feat/x and state=open.
func TestFindOpenPRForBranchSingleMatch(t *testing.T) {
	gh := newFakeGH(t)
	provider := newProviderForTest(t, gh)

	const wantURL = "https://github.com/acme/repo/pull/7"
	gh.on(http.MethodGet, "/repos/acme/repo/pulls", func(w http.ResponseWriter, r *http.Request) {
		query := r.URL.Query()
		if head := query.Get("head"); head != "acme:feat/x" {
			t.Errorf("head query = %q, want acme:feat/x", head)
		}
		if state := query.Get("state"); state != "open" {
			t.Errorf("state query = %q, want open", state)
		}
		w.Header().Set("Content-Type", "application/json")
		payload := []map[string]any{
			{"number": 7, "state": "open", "html_url": wantURL, "updated_at": "2026-05-01T10:00:00Z"},
		}
		// Encode errors are deliberately ignored: a broken response would
		// surface as a failed assertion below.
		_ = json.NewEncoder(w).Encode(payload)
	})

	got, err := provider.FindOpenPRForBranch(ctx(), "acme", "repo", "feat/x")
	if err != nil {
		t.Fatalf("FindOpenPRForBranch: %v", err)
	}
	if got != wantURL {
		t.Fatalf("url = %q", got)
	}
}

// TestFindOpenPRForBranchNoMatch serves an empty JSON array and expects an
// empty URL together with a nil error.
func TestFindOpenPRForBranchNoMatch(t *testing.T) {
	gh := newFakeGH(t)
	provider := newProviderForTest(t, gh)

	emptyList := func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		_, _ = w.Write([]byte("[]"))
	}
	gh.on(http.MethodGet, "/repos/acme/repo/pulls", emptyList)

	got, err := provider.FindOpenPRForBranch(ctx(), "acme", "repo", "feat/x")
	if err != nil {
		t.Fatalf("FindOpenPRForBranch: %v", err)
	}
	if got != "" {
		t.Fatalf("url = %q, want empty", got)
	}
}

// TestFindOpenPRForBranchMultiplePicksMostRecent serves three open PRs and
// expects the one with the latest updated_at (#9) to be chosen.
func TestFindOpenPRForBranchMultiplePicksMostRecent(t *testing.T) {
	gh := newFakeGH(t)
	provider := newProviderForTest(t, gh)

	// #9 has the newest timestamp and sits in the middle of the list, so
	// neither "first" nor "last" would pick it by accident.
	candidates := []map[string]any{
		{"number": 1, "state": "open", "html_url": "https://github.com/acme/repo/pull/1", "updated_at": "2026-01-01T00:00:00Z"},
		{"number": 9, "state": "open", "html_url": "https://github.com/acme/repo/pull/9", "updated_at": "2026-05-01T00:00:00Z"},
		{"number": 4, "state": "open", "html_url": "https://github.com/acme/repo/pull/4", "updated_at": "2026-03-01T00:00:00Z"},
	}
	gh.on(http.MethodGet, "/repos/acme/repo/pulls", func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(candidates)
	})

	got, err := provider.FindOpenPRForBranch(ctx(), "acme", "repo", "feat/x")
	if err != nil {
		t.Fatalf("FindOpenPRForBranch: %v", err)
	}
	if got != "https://github.com/acme/repo/pull/9" {
		t.Fatalf("url = %q, want pull/9", got)
	}
}

// TestFindOpenPRForBranchEmptyInputsError calls FindOpenPRForBranch with
// each of owner, repo and branch left empty in turn and expects an error
// every time. No handler is registered on the fake for these calls.
func TestFindOpenPRForBranchEmptyInputsError(t *testing.T) {
	gh := newFakeGH(t)
	provider := newProviderForTest(t, gh)

	cases := []struct{ owner, repo, branch string }{
		{"", "repo", "b"},
		{"o", "", "b"},
		{"o", "r", ""},
	}
	for _, tc := range cases {
		if _, err := provider.FindOpenPRForBranch(ctx(), tc.owner, tc.repo, tc.branch); err == nil {
			t.Errorf("expected error for empty input %+v", tc)
		}
	}
}

// TestFindOpenPRForBranchRateLimited serves a 403 with
// X-RateLimit-Remaining: 0 and expects the returned error to match
// ErrRateLimited under errors.Is.
func TestFindOpenPRForBranchRateLimited(t *testing.T) {
	gh := newFakeGH(t)
	provider := newProviderForTest(t, gh)

	gh.on(http.MethodGet, "/repos/acme/repo/pulls", func(w http.ResponseWriter, _ *http.Request) {
		hdr := w.Header()
		hdr.Set("X-RateLimit-Remaining", "0")
		hdr.Set("X-RateLimit-Reset", "1700000000")
		w.WriteHeader(http.StatusForbidden)
		_, _ = w.Write([]byte(`{"message":"API rate limit exceeded"}`))
	})

	if _, err := provider.FindOpenPRForBranch(ctx(), "acme", "repo", "feat/x"); !errors.Is(err, ErrRateLimited) {
		t.Fatalf("err = %v, want ErrRateLimited", err)
	}
}

// TestFindOpenPRForBranchAuthFailed serves a 401 response and expects the
// returned error to match ErrAuthFailed under errors.Is.
func TestFindOpenPRForBranchAuthFailed(t *testing.T) {
	gh := newFakeGH(t)
	provider := newProviderForTest(t, gh)

	unauthorized := func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusUnauthorized)
		_, _ = w.Write([]byte(`{"message":"Bad credentials"}`))
	}
	gh.on(http.MethodGet, "/repos/acme/repo/pulls", unauthorized)

	if _, err := provider.FindOpenPRForBranch(ctx(), "acme", "repo", "feat/x"); !errors.Is(err, ErrAuthFailed) {
		t.Fatalf("err = %v, want ErrAuthFailed", err)
	}
}

// TestFindOpenPRForBranchSynthesizesURLWhenHTMLEmpty serves a PR entry that
// has no html_url key and expects a URL ending in /acme/repo/pull/42. Only
// the suffix is asserted, so the test does not pin the host.
func TestFindOpenPRForBranchSynthesizesURLWhenHTMLEmpty(t *testing.T) {
	gh := newFakeGH(t)
	provider := newProviderForTest(t, gh)

	gh.on(http.MethodGet, "/repos/acme/repo/pulls", func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		entry := map[string]any{"number": 42, "state": "open", "updated_at": "2026-05-01T10:00:00Z"}
		_ = json.NewEncoder(w).Encode([]map[string]any{entry})
	})

	got, err := provider.FindOpenPRForBranch(ctx(), "acme", "repo", "feat/x")
	if err != nil {
		t.Fatalf("err = %v", err)
	}
	const wantSuffix = "/acme/repo/pull/42"
	if !strings.HasSuffix(got, wantSuffix) {
		t.Fatalf("url = %q, want suffix /acme/repo/pull/42", got)
	}
}
7 changes: 7 additions & 0 deletions backend/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,20 @@ func Run() error {
}
_ = ss // sm: HTTP routes land in a follow-up PR (γ)

// SCM observation: polling Provider -> pr.Manager -> lifecycle nudges.
// Constructed after lifecycle so the PR Manager can forward observations
// to ApplyPRObservation; runs alongside the reaper as a sibling background
// loop. Missing GITHUB_TOKEN degrades gracefully (loop is not started).
scmStk := startSCM(ctx, store, projects, lcStack.lcm, log)

runErr := srv.Run(ctx)

// Shut the background goroutines down in order: cancel the context FIRST so
// their loops exit, then wait for them to drain. Doing this explicitly (not
// via defer) avoids the LIFO trap where a Stop() that blocks on ctx-cancel
// runs before the cancel — which would hang any non-signal exit path.
stop()
scmStk.Stop()
lcStack.Stop()
if err := cdcPipe.Stop(); err != nil {
log.Error("cdc pipeline shutdown", "err", err)
Expand Down
61 changes: 61 additions & 0 deletions backend/internal/daemon/scm_wiring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package daemon

import (
"context"
"errors"
"log/slog"

scmgithub "github.com/aoagents/agent-orchestrator/backend/internal/adapters/scm/github"
"github.com/aoagents/agent-orchestrator/backend/internal/lifecycle"
"github.com/aoagents/agent-orchestrator/backend/internal/observe/scm"
"github.com/aoagents/agent-orchestrator/backend/internal/pr"
"github.com/aoagents/agent-orchestrator/backend/internal/project"
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite"
)

// scmStack owns the SCM observation loop: a GitHub Provider, a pr.Manager
// that writes PR rows and forwards observations to lifecycle for nudges,
// and the polling goroutine that drives both. A nil-token environment
// degrades gracefully — the daemon still runs locally without SCM
// observation; PR-driven nudges (CI-failure log tail, review feedback,
// merge-conflict rebase) will not fire until a token is supplied.
type scmStack struct {
// pollerDone is the channel Stop receives from. startSCM sets it to the
// value returned by poller.Start, or to an already-closed channel when the
// provider could not be constructed.
pollerDone <-chan struct{}
}

// startSCM builds the SCM observation stack and launches its poller.
//
// The GitHub provider is configured with an EnvTokenSource naming
// AO_GITHUB_TOKEN first and GITHUB_TOKEN second. If NewProvider fails, the
// failure is logged (Info for ErrNoToken, Warn for anything else), no poller
// is started, and the returned stack holds an already-closed done channel so
// Stop returns immediately. Otherwise a pr.Manager backed by store and lcm is
// created, the poller is wired with the provider serving as both Provider
// and Branches, and the stack holds the channel returned by
// poller.Start(ctx).
func startSCM(ctx context.Context, store *sqlite.Store, projects project.Manager, lcm *lifecycle.Manager, log *slog.Logger) *scmStack {
	provider, err := scmgithub.NewProvider(scmgithub.ProviderOptions{
		Token: scmgithub.EnvTokenSource{EnvVars: []string{"AO_GITHUB_TOKEN", "GITHUB_TOKEN"}},
	})
	switch {
	case err == nil:
		// Provider is usable; continue to the wiring below.
	case errors.Is(err, scmgithub.ErrNoToken):
		log.Info("scm poller: no GITHUB_TOKEN configured, SCM observation disabled")
		return &scmStack{pollerDone: closedDone()}
	default:
		log.Warn("scm poller: provider construction failed, SCM observation disabled", "err", err)
		return &scmStack{pollerDone: closedDone()}
	}

	deps := scm.Deps{
		Provider: provider,
		Branches: provider,
		Sessions: store,
		Projects: projects,
		PR:       pr.New(pr.Deps{Writer: store, Lifecycle: lcm}),
		Logger:   log,
	}
	poller := scm.New(deps)
	return &scmStack{pollerDone: poller.Start(ctx)}
}

// Stop blocks until a receive on pollerDone completes, which is how the
// poller signals that it has exited. The ctx handed to startSCM must be
// cancelled before Stop is called; when startSCM did not start a poller the
// channel is already closed and Stop returns at once.
func (s *scmStack) Stop() {
	<-s.pollerDone
}

// closedDone returns a fresh channel that has already been closed, so any
// receive on it completes immediately. It stands in for a poller's done
// channel when no poller was started.
func closedDone() <-chan struct{} {
	done := make(chan struct{})
	close(done)
	return done
}
Loading
Loading