-
Notifications
You must be signed in to change notification settings - Fork 1
feat(observe): SCM poller — Observe → pr.Manager → lifecycle nudges #72
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| package github | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "fmt" | ||
| "net/http" | ||
| "net/url" | ||
| "strconv" | ||
| "strings" | ||
| "time" | ||
| ) | ||
|
|
||
| // FindOpenPRForBranch returns the canonical github.com URL of the most | ||
| // recently updated open PR whose head ref is "{owner}:{branch}", or "" | ||
| // with a nil error when no open PR matches. | ||
| // | ||
| // The poller uses this for branch-based discovery: since the session | ||
| // record does not (yet) carry a stored PR URL, the only way to find | ||
| // "the PR for this session" is by the workspace branch. The endpoint | ||
| // hit is GET /repos/{owner}/{repo}/pulls?head={owner}:{branch}&state=open | ||
| // per the GitHub REST API. | ||
| // | ||
| // When multiple open PRs share the same head ref (rare but legal — | ||
| // e.g. forks that pushed to the same branch name), we pick the most | ||
| // recently updated one rather than failing closed. Failing closed | ||
| // would silently stop observing the PR every time a stale duplicate | ||
| // shows up. | ||
| func (p *Provider) FindOpenPRForBranch(ctx context.Context, owner, repo, branch string) (string, error) { | ||
| owner = strings.TrimSpace(owner) | ||
| repo = strings.TrimSpace(repo) | ||
| branch = strings.TrimSpace(branch) | ||
| if owner == "" || repo == "" || branch == "" { | ||
| return "", fmt.Errorf("github scm: FindOpenPRForBranch requires owner/repo/branch (got %q/%q/%q)", owner, repo, branch) | ||
| } | ||
|
|
||
| q := url.Values{} | ||
| q.Set("state", "open") | ||
| q.Set("head", owner+":"+branch) | ||
| q.Set("per_page", "100") | ||
|
|
||
| resp, err := p.client.doREST(ctx, http.MethodGet, repoPath(owner, repo, "pulls"), q, nil) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
| if len(resp.Body) == 0 { | ||
| return "", nil | ||
| } | ||
| var list []listedPR | ||
| if err := json.Unmarshal(resp.Body, &list); err != nil { | ||
| return "", fmt.Errorf("github scm: decode pulls list: %w", err) | ||
| } | ||
| if len(list) == 0 { | ||
| return "", nil | ||
| } | ||
|
|
||
| best := -1 | ||
| var bestTime time.Time | ||
| for i, pr := range list { | ||
| if !strings.EqualFold(pr.State, "open") { | ||
| continue | ||
| } | ||
| t := parsePRTimestamp(pr.UpdatedAt) | ||
| if best < 0 || t.After(bestTime) { | ||
| best = i | ||
| bestTime = t | ||
| } | ||
| } | ||
| if best < 0 { | ||
| return "", nil | ||
| } | ||
| chosen := list[best] | ||
| if chosen.HTMLURL != "" { | ||
| return chosen.HTMLURL, nil | ||
| } | ||
| // Construct the canonical web URL from owner/repo/number when the | ||
| // API response omits html_url (some enterprise responses elide it). | ||
| return "https://github.com/" + owner + "/" + repo + "/pull/" + strconv.Itoa(chosen.Number), nil | ||
| } | ||
|
|
||
| type listedPR struct { | ||
| Number int `json:"number"` | ||
| State string `json:"state"` | ||
| HTMLURL string `json:"html_url"` | ||
| UpdatedAt string `json:"updated_at"` | ||
| } | ||
|
|
||
| func parsePRTimestamp(s string) time.Time { | ||
| t, err := time.Parse(time.RFC3339, s) | ||
| if err != nil { | ||
| return time.Time{} | ||
| } | ||
| return t | ||
| } | ||
131 changes: 131 additions & 0 deletions
131
backend/internal/adapters/scm/github/find_branch_pr_test.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,131 @@ | ||
| package github | ||
|
|
||
| import ( | ||
| "encoding/json" | ||
| "errors" | ||
| "net/http" | ||
| "strings" | ||
| "testing" | ||
| ) | ||
|
|
||
| func TestFindOpenPRForBranchSingleMatch(t *testing.T) { | ||
| fake := newFakeGH(t) | ||
| p := newProviderForTest(t, fake) | ||
| fake.on(http.MethodGet, "/repos/acme/repo/pulls", func(w http.ResponseWriter, r *http.Request) { | ||
| if got := r.URL.Query().Get("head"); got != "acme:feat/x" { | ||
| t.Errorf("head query = %q, want acme:feat/x", got) | ||
| } | ||
| if got := r.URL.Query().Get("state"); got != "open" { | ||
| t.Errorf("state query = %q, want open", got) | ||
| } | ||
| w.Header().Set("Content-Type", "application/json") | ||
| _ = json.NewEncoder(w).Encode([]map[string]any{ | ||
| {"number": 7, "state": "open", "html_url": "https://github.com/acme/repo/pull/7", "updated_at": "2026-05-01T10:00:00Z"}, | ||
| }) | ||
| }) | ||
|
|
||
| url, err := p.FindOpenPRForBranch(ctx(), "acme", "repo", "feat/x") | ||
| if err != nil { | ||
| t.Fatalf("FindOpenPRForBranch: %v", err) | ||
| } | ||
| if url != "https://github.com/acme/repo/pull/7" { | ||
| t.Fatalf("url = %q", url) | ||
| } | ||
| } | ||
|
|
||
| func TestFindOpenPRForBranchNoMatch(t *testing.T) { | ||
| fake := newFakeGH(t) | ||
| p := newProviderForTest(t, fake) | ||
| fake.on(http.MethodGet, "/repos/acme/repo/pulls", func(w http.ResponseWriter, r *http.Request) { | ||
| w.Header().Set("Content-Type", "application/json") | ||
| _, _ = w.Write([]byte("[]")) | ||
| }) | ||
| url, err := p.FindOpenPRForBranch(ctx(), "acme", "repo", "feat/x") | ||
| if err != nil { | ||
| t.Fatalf("FindOpenPRForBranch: %v", err) | ||
| } | ||
| if url != "" { | ||
| t.Fatalf("url = %q, want empty", url) | ||
| } | ||
| } | ||
|
|
||
| func TestFindOpenPRForBranchMultiplePicksMostRecent(t *testing.T) { | ||
| fake := newFakeGH(t) | ||
| p := newProviderForTest(t, fake) | ||
| fake.on(http.MethodGet, "/repos/acme/repo/pulls", func(w http.ResponseWriter, r *http.Request) { | ||
| w.Header().Set("Content-Type", "application/json") | ||
| _ = json.NewEncoder(w).Encode([]map[string]any{ | ||
| {"number": 1, "state": "open", "html_url": "https://github.com/acme/repo/pull/1", "updated_at": "2026-01-01T00:00:00Z"}, | ||
| {"number": 9, "state": "open", "html_url": "https://github.com/acme/repo/pull/9", "updated_at": "2026-05-01T00:00:00Z"}, | ||
| {"number": 4, "state": "open", "html_url": "https://github.com/acme/repo/pull/4", "updated_at": "2026-03-01T00:00:00Z"}, | ||
| }) | ||
| }) | ||
| url, err := p.FindOpenPRForBranch(ctx(), "acme", "repo", "feat/x") | ||
| if err != nil { | ||
| t.Fatalf("FindOpenPRForBranch: %v", err) | ||
| } | ||
| if url != "https://github.com/acme/repo/pull/9" { | ||
| t.Fatalf("url = %q, want pull/9", url) | ||
| } | ||
| } | ||
|
|
||
| func TestFindOpenPRForBranchEmptyInputsError(t *testing.T) { | ||
| fake := newFakeGH(t) | ||
| p := newProviderForTest(t, fake) | ||
| for _, tc := range []struct{ owner, repo, branch string }{ | ||
| {"", "repo", "b"}, | ||
| {"o", "", "b"}, | ||
| {"o", "r", ""}, | ||
| } { | ||
| _, err := p.FindOpenPRForBranch(ctx(), tc.owner, tc.repo, tc.branch) | ||
| if err == nil { | ||
| t.Errorf("expected error for empty input %+v", tc) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func TestFindOpenPRForBranchRateLimited(t *testing.T) { | ||
| fake := newFakeGH(t) | ||
| p := newProviderForTest(t, fake) | ||
| fake.on(http.MethodGet, "/repos/acme/repo/pulls", func(w http.ResponseWriter, r *http.Request) { | ||
| w.Header().Set("X-RateLimit-Remaining", "0") | ||
| w.Header().Set("X-RateLimit-Reset", "1700000000") | ||
| w.WriteHeader(http.StatusForbidden) | ||
| _, _ = w.Write([]byte(`{"message":"API rate limit exceeded"}`)) | ||
| }) | ||
| _, err := p.FindOpenPRForBranch(ctx(), "acme", "repo", "feat/x") | ||
| if !errors.Is(err, ErrRateLimited) { | ||
| t.Fatalf("err = %v, want ErrRateLimited", err) | ||
| } | ||
| } | ||
|
|
||
| func TestFindOpenPRForBranchAuthFailed(t *testing.T) { | ||
| fake := newFakeGH(t) | ||
| p := newProviderForTest(t, fake) | ||
| fake.on(http.MethodGet, "/repos/acme/repo/pulls", func(w http.ResponseWriter, r *http.Request) { | ||
| w.WriteHeader(http.StatusUnauthorized) | ||
| _, _ = w.Write([]byte(`{"message":"Bad credentials"}`)) | ||
| }) | ||
| _, err := p.FindOpenPRForBranch(ctx(), "acme", "repo", "feat/x") | ||
| if !errors.Is(err, ErrAuthFailed) { | ||
| t.Fatalf("err = %v, want ErrAuthFailed", err) | ||
| } | ||
| } | ||
|
|
||
| func TestFindOpenPRForBranchSynthesizesURLWhenHTMLEmpty(t *testing.T) { | ||
| fake := newFakeGH(t) | ||
| p := newProviderForTest(t, fake) | ||
| fake.on(http.MethodGet, "/repos/acme/repo/pulls", func(w http.ResponseWriter, r *http.Request) { | ||
| w.Header().Set("Content-Type", "application/json") | ||
| _ = json.NewEncoder(w).Encode([]map[string]any{ | ||
| {"number": 42, "state": "open", "updated_at": "2026-05-01T10:00:00Z"}, | ||
| }) | ||
| }) | ||
| url, err := p.FindOpenPRForBranch(ctx(), "acme", "repo", "feat/x") | ||
| if err != nil { | ||
| t.Fatalf("err = %v", err) | ||
| } | ||
| if !strings.HasSuffix(url, "/acme/repo/pull/42") { | ||
| t.Fatalf("url = %q, want suffix /acme/repo/pull/42", url) | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| package daemon | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "log/slog" | ||
|
|
||
| scmgithub "github.com/aoagents/agent-orchestrator/backend/internal/adapters/scm/github" | ||
| "github.com/aoagents/agent-orchestrator/backend/internal/lifecycle" | ||
| "github.com/aoagents/agent-orchestrator/backend/internal/observe/scm" | ||
| "github.com/aoagents/agent-orchestrator/backend/internal/pr" | ||
| "github.com/aoagents/agent-orchestrator/backend/internal/project" | ||
| "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" | ||
| ) | ||
|
|
||
| // scmStack owns the SCM observation loop: a GitHub Provider, a pr.Manager | ||
| // that writes PR rows and forwards observations to lifecycle for nudges, | ||
| // and the polling goroutine that drives both. A nil-token environment | ||
| // degrades gracefully — the daemon still runs locally without SCM | ||
| // observation; PR-driven nudges (CI-failure log tail, review feedback, | ||
| // merge-conflict rebase) will not fire until a token is supplied. | ||
| type scmStack struct { | ||
| pollerDone <-chan struct{} | ||
| } | ||
|
|
||
| // startSCM constructs and starts the SCM observation stack. The Provider | ||
| // reads its token from AO_GITHUB_TOKEN (preferred) or GITHUB_TOKEN, both | ||
| // via os.Getenv. Without a token, the poller is not started and a no-op | ||
| // done channel is returned — Stop is a free call in that case. | ||
| func startSCM(ctx context.Context, store *sqlite.Store, projects project.Manager, lcm *lifecycle.Manager, log *slog.Logger) *scmStack { | ||
| tokenSource := scmgithub.EnvTokenSource{EnvVars: []string{"AO_GITHUB_TOKEN", "GITHUB_TOKEN"}} | ||
| provider, err := scmgithub.NewProvider(scmgithub.ProviderOptions{Token: tokenSource}) | ||
| if err != nil { | ||
| if errors.Is(err, scmgithub.ErrNoToken) { | ||
| log.Info("scm poller: no GITHUB_TOKEN configured, SCM observation disabled") | ||
| } else { | ||
| log.Warn("scm poller: provider construction failed, SCM observation disabled", "err", err) | ||
| } | ||
| return &scmStack{pollerDone: closedDone()} | ||
| } | ||
| prMgr := pr.New(pr.Deps{Writer: store, Lifecycle: lcm}) | ||
| poller := scm.New(scm.Deps{ | ||
| Provider: provider, | ||
| Branches: provider, | ||
| Sessions: store, | ||
| Projects: projects, | ||
| PR: prMgr, | ||
| Logger: log, | ||
| }) | ||
| return &scmStack{pollerDone: poller.Start(ctx)} | ||
| } | ||
|
|
||
| // Stop waits for the poller goroutine to exit. The caller must cancel the | ||
| // ctx passed to startSCM before calling Stop. | ||
| func (s *scmStack) Stop() { <-s.pollerDone } | ||
|
|
||
| func closedDone() <-chan struct{} { | ||
| ch := make(chan struct{}) | ||
| close(ch) | ||
| return ch | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://github.com/but the comment above it explicitly says this path fires for "some enterprise responses" that omithtml_url. On a GitHub Enterprise instance the synthesised URL would carry the wrong host — causing the PR row to be stored with agithub.comURL and any subsequentProvider.Observecall on that URL to either hit the wrong host or be rejected byparsePRURL. TheProvideralready holds its configured base URL (e.g.https://github.mycompany.com) so the fix requires plumbing it through, or at minimum synthesising from the REST base host rather than the hardcoded public domain.