diff --git a/doc/rfc/index.md b/doc/rfc/index.md index aa95dc19..6654b39b 100644 --- a/doc/rfc/index.md +++ b/doc/rfc/index.md @@ -17,3 +17,7 @@ Design documents and technical proposals, grouped by scope. Shared/cross-cutting ## Stovepipe - [Stovepipe Workflow](stovepipe/workflow.md) - Post-merge trunk-validation pipeline: ingest trunk push events (webhook + fallback poll), batch since last green, build to validate, record per-commit health, bisect to the offending commit, hand off to a remediation extension + +## Runway + +- [Runway Workflow](runway/workflow.md) - Landing service: merge-conflict checking and merging on behalf of SubmitQueue diff --git a/doc/rfc/runway/workflow.md b/doc/rfc/runway/workflow.md new file mode 100644 index 00000000..01357eb8 --- /dev/null +++ b/doc/rfc/runway/workflow.md @@ -0,0 +1,74 @@ +# Runway Workflow + +Runway is the landing service: it owns VCS operations — mergeability checking and landing — on behalf of SubmitQueue. The orchestrator subscribes to two inbound topics (`merge-conflict-checker`, `merger`) and publishes results to two outbound topics (`merge-conflict-checker-signal`, `merger-signal`). It is a consumer-only service with no gateway; work arrives via topic queues and results leave via topic queues. + +## Merge-conflict check and merge + +The two queues operate at different granularities: + +- **merge-conflict-check** is request-level. A merge request carries an ordered sequence of steps (changes + merge strategy). Runway performs a read-only trial merge and publishes per-step mergeability results back. + +- **merge** is batch-level. A merge request carries the same payload but Runway commits the result and reports the revisions it produced (per-step output IDs). + +These are independent input-output flows. A merge-conflict check can run without a merge ever running, and a merge does not depend on a prior check. + +## Branch serialization + +The partition key `repo/target` on both inbound topics serializes all VCS operations for a given branch. The message queue delivers messages with the same partition key to the same consumer in order, so at most one merge-conflict check or merge operation is in flight for any given branch at any time. + +The outbound topics partition by SubmitQueue queue name, matching SubmitQueue's fan-out model where state updates for the same queue are serialized. + +## Workflow + +``` + ┌─────────────────────────────────────────────────────┐ + │ submitqueue orchestrator │ + └──────────┬───────────────────────────┬──────────────┘ + │ │ + MergeRequest (dry run) MergeRequest (commit) + │ │ + ▼ ▼ + [merge-conflict-checker] [merger] + │ │ + merge-conflict-check ctrl merge ctrl + (read-only) (apply + commit) + │ │ + MergeResult MergeResult + │ │ + ▼ ▼ + [merge-conflict-checker-signal] [merger-signal] + │ │ + ▼ ▼ + ┌──────────┬───────────────────────────┬──────────────┐ + │ merge-conflict-check- merge-signal ctrl │ + │ signal ctrl (update batch state, │ + │ (update request fan out to conclude) │ + │ mergeability) │ + │ submitqueue orchestrator │ + └─────────────────────────────────────────────────────┘ +``` + +## Per-controller summary + +| Controller | In | Out | One-line role | +|---|---|---|---| +| **merge-conflict-check** | MergeRequest | MergeResult -> merge-conflict-checker-signal | Dry-run merge: check mergeability of ordered steps against the target branch (read-only) | +| **merge** | MergeRequest | MergeResult -> merger-signal | Apply, commit, and report per-step output IDs | + +The merge-conflict-check controller always publishes a result — even when all steps are mergeable — so SubmitQueue receives a definitive answer. On infrastructure error it nacks for retry. + +The merge controller publishes a conflict result (and acks) when the merge detects a conflict; SubmitQueue handles rebatching. On infrastructure error it nacks for retry. On success it publishes per-step outcomes (output IDs of the revisions produced) so SubmitQueue can update its request state. + +## Idempotency + +Runway has no persistent state — no request store, no job store, no database. Idempotency is achieved through the VCS contract: merge detects already-pushed changes (revisions reachable from HEAD) and treats them as already-landed. Merge-conflict check is read-only and naturally idempotent. + +## Ownership by service + +### Orchestrator + +The orchestrator is the only service. It subscribes to two inbound topics (`merge-conflict-checker`, `merger`), performs VCS operations through a pluggable extension, and publishes results to two outbound topics (`merge-conflict-checker-signal`, `merger-signal`). It owns no persistent data. + +### Shared: the messaging queue + +Runway communicates with SubmitQueue only through the messaging queue. The inbound topics are owned by runway; the outbound topics are owned by SubmitQueue. diff --git a/runway/extension/BUILD.bazel b/runway/extension/BUILD.bazel new file mode 100644 index 00000000..3832145f --- /dev/null +++ b/runway/extension/BUILD.bazel @@ -0,0 +1,8 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "extension", + srcs = ["extension.go"], + importpath = "github.com/uber/submitqueue/runway/extension", + visibility = ["//visibility:public"], +) diff --git a/runway/extension/extension.go b/runway/extension/extension.go new file mode 100644 index 00000000..310bab4e --- /dev/null +++ b/runway/extension/extension.go @@ -0,0 +1,16 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package extension holds Runway-specific extension implementations. +package extension diff --git a/runway/extension/vcs/BUILD.bazel b/runway/extension/vcs/BUILD.bazel new file mode 100644 index 00000000..e8e37e16 --- /dev/null +++ b/runway/extension/vcs/BUILD.bazel @@ -0,0 +1,9 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "vcs", + srcs = ["vcs.go"], + importpath = "github.com/uber/submitqueue/runway/extension/vcs", + visibility = ["//visibility:public"], + deps = ["//runway/entity"], +) diff --git a/runway/extension/vcs/mock/BUILD.bazel b/runway/extension/vcs/mock/BUILD.bazel new file mode 100644 index 00000000..e04292b8 --- /dev/null +++ b/runway/extension/vcs/mock/BUILD.bazel @@ -0,0 +1,13 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "mock", + srcs = ["vcs_mock.go"], + importpath = "github.com/uber/submitqueue/runway/extension/vcs/mock", + visibility = ["//visibility:public"], + deps = [ + "//runway/entity", + "//runway/extension/vcs", + "@org_uber_go_mock//gomock", + ], +) diff --git a/runway/extension/vcs/mock/vcs_mock.go b/runway/extension/vcs/mock/vcs_mock.go new file mode 100644 index 00000000..c4e33cc5 --- /dev/null +++ b/runway/extension/vcs/mock/vcs_mock.go @@ -0,0 +1,112 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: vcs.go +// +// Generated by this command: +// +// mockgen -source=vcs.go -destination=mock/vcs_mock.go -package=mock +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + entity "github.com/uber/submitqueue/runway/entity" + vcs "github.com/uber/submitqueue/runway/extension/vcs" + gomock "go.uber.org/mock/gomock" +) + +// MockVCS is a mock of VCS interface. +type MockVCS struct { + ctrl *gomock.Controller + recorder *MockVCSMockRecorder + isgomock struct{} +} + +// MockVCSMockRecorder is the mock recorder for MockVCS. +type MockVCSMockRecorder struct { + mock *MockVCS +} + +// NewMockVCS creates a new mock instance. +func NewMockVCS(ctrl *gomock.Controller) *MockVCS { + mock := &MockVCS{ctrl: ctrl} + mock.recorder = &MockVCSMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockVCS) EXPECT() *MockVCSMockRecorder { + return m.recorder +} + +// CheckMergeability mocks base method. +func (m *MockVCS) CheckMergeability(ctx context.Context, req entity.MergeRequest) (entity.MergeResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckMergeability", ctx, req) + ret0, _ := ret[0].(entity.MergeResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CheckMergeability indicates an expected call of CheckMergeability. +func (mr *MockVCSMockRecorder) CheckMergeability(ctx, req any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckMergeability", reflect.TypeOf((*MockVCS)(nil).CheckMergeability), ctx, req) +} + +// Land mocks base method. +func (m *MockVCS) Land(ctx context.Context, req entity.MergeRequest) (entity.MergeResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Land", ctx, req) + ret0, _ := ret[0].(entity.MergeResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Land indicates an expected call of Land. +func (mr *MockVCSMockRecorder) Land(ctx, req any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Land", reflect.TypeOf((*MockVCS)(nil).Land), ctx, req) +} + +// MockFactory is a mock of Factory interface. +type MockFactory struct { + ctrl *gomock.Controller + recorder *MockFactoryMockRecorder + isgomock struct{} +} + +// MockFactoryMockRecorder is the mock recorder for MockFactory. +type MockFactoryMockRecorder struct { + mock *MockFactory +} + +// NewMockFactory creates a new mock instance. +func NewMockFactory(ctrl *gomock.Controller) *MockFactory { + mock := &MockFactory{ctrl: ctrl} + mock.recorder = &MockFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockFactory) EXPECT() *MockFactoryMockRecorder { + return m.recorder +} + +// For mocks base method. +func (m *MockFactory) For(cfg vcs.Config) (vcs.VCS, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "For", cfg) + ret0, _ := ret[0].(vcs.VCS) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// For indicates an expected call of For. +func (mr *MockFactoryMockRecorder) For(cfg any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "For", reflect.TypeOf((*MockFactory)(nil).For), cfg) +} diff --git a/runway/extension/vcs/noop/BUILD.bazel b/runway/extension/vcs/noop/BUILD.bazel new file mode 100644 index 00000000..b9755204 --- /dev/null +++ b/runway/extension/vcs/noop/BUILD.bazel @@ -0,0 +1,25 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "noop", + srcs = ["noop.go"], + importpath = "github.com/uber/submitqueue/runway/extension/vcs/noop", + visibility = ["//visibility:public"], + deps = [ + "//runway/entity", + "//runway/extension/vcs", + ], +) + +go_test( + name = "noop_test", + srcs = ["noop_test.go"], + embed = [":noop"], + deps = [ + "//platform/base/change", + "//platform/base/mergestrategy", + "//runway/entity", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + ], +) diff --git a/runway/extension/vcs/noop/noop.go b/runway/extension/vcs/noop/noop.go new file mode 100644 index 00000000..cab7dd9b --- /dev/null +++ b/runway/extension/vcs/noop/noop.go @@ -0,0 +1,65 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package noop provides a no-op VCS implementation for local development and +// testing. CheckMergeability always reports success; Land produces synthetic +// output IDs from an atomic counter. +package noop + +import ( + "context" + "fmt" + "sync/atomic" + + "github.com/uber/submitqueue/runway/entity" + "github.com/uber/submitqueue/runway/extension/vcs" +) + +var _ vcs.VCS = (*VCS)(nil) + +// VCS is a no-op implementation that always succeeds. +type VCS struct { + seq atomic.Uint64 +} + +// New returns a new no-op VCS instance. +func New() *VCS { return &VCS{} } + +func (v *VCS) CheckMergeability(_ context.Context, req entity.MergeRequest) (entity.MergeResult, error) { + steps := make([]entity.StepResult, len(req.Steps)) + for i, s := range req.Steps { + steps[i] = entity.StepResult{StepID: s.StepID} + } + return entity.MergeResult{ + ID: req.ID, + Success: true, + Steps: steps, + }, nil +} + +func (v *VCS) Land(_ context.Context, req entity.MergeRequest) (entity.MergeResult, error) { + steps := make([]entity.StepResult, len(req.Steps)) + for i, s := range req.Steps { + n := v.seq.Add(1) + steps[i] = entity.StepResult{ + StepID: s.StepID, + OutputIDs: []string{fmt.Sprintf("%040x", n)}, + } + } + return entity.MergeResult{ + ID: req.ID, + Success: true, + Steps: steps, + }, nil +} diff --git a/runway/extension/vcs/noop/noop_test.go b/runway/extension/vcs/noop/noop_test.go new file mode 100644 index 00000000..0137d82e --- /dev/null +++ b/runway/extension/vcs/noop/noop_test.go @@ -0,0 +1,91 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package noop + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/submitqueue/platform/base/change" + "github.com/uber/submitqueue/platform/base/mergestrategy" + "github.com/uber/submitqueue/runway/entity" +) + +func testRequest() entity.MergeRequest { + return entity.MergeRequest{ + ID: "queue-a/42", + QueueName: "queue-a", + Steps: []entity.MergeStep{ + { + StepID: "queue-a/1", + Changes: []change.Change{{URIs: []string{"github://uber/repo/pull/1/abcdef0123456789abcdef0123456789abcdef01"}}}, + Strategy: mergestrategy.MergeStrategyRebase, + }, + { + StepID: "queue-a/2", + Changes: []change.Change{{URIs: []string{"github://uber/repo/pull/2/89abcdef0123456789abcdef0123456789abcdef"}}}, + Strategy: mergestrategy.MergeStrategyMerge, + }, + }, + } +} + +func TestCheckMergeability(t *testing.T) { + v := New() + req := testRequest() + + res, err := v.CheckMergeability(context.Background(), req) + require.NoError(t, err) + + assert.Equal(t, req.ID, res.ID) + assert.True(t, res.Success) + require.Len(t, res.Steps, 2) + assert.Equal(t, "queue-a/1", res.Steps[0].StepID) + assert.Empty(t, res.Steps[0].OutputIDs) + assert.Equal(t, "queue-a/2", res.Steps[1].StepID) + assert.Empty(t, res.Steps[1].OutputIDs) +} + +func TestLand(t *testing.T) { + v := New() + req := testRequest() + + res, err := v.Land(context.Background(), req) + require.NoError(t, err) + + assert.Equal(t, req.ID, res.ID) + assert.True(t, res.Success) + require.Len(t, res.Steps, 2) + assert.Equal(t, "queue-a/1", res.Steps[0].StepID) + require.Len(t, res.Steps[0].OutputIDs, 1) + assert.NotEmpty(t, res.Steps[0].OutputIDs[0]) + assert.Equal(t, "queue-a/2", res.Steps[1].StepID) + require.Len(t, res.Steps[1].OutputIDs, 1) + assert.NotEmpty(t, res.Steps[1].OutputIDs[0]) +} + +func TestLand_UniqueOutputIDs(t *testing.T) { + v := New() + req := testRequest() + + res1, err := v.Land(context.Background(), req) + require.NoError(t, err) + res2, err := v.Land(context.Background(), req) + require.NoError(t, err) + + assert.NotEqual(t, res1.Steps[0].OutputIDs[0], res2.Steps[0].OutputIDs[0]) +} diff --git a/runway/extension/vcs/vcs.go b/runway/extension/vcs/vcs.go new file mode 100644 index 00000000..061ada28 --- /dev/null +++ b/runway/extension/vcs/vcs.go @@ -0,0 +1,58 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package vcs defines the pluggable interface for version-control operations +// that Runway performs on behalf of its callers. Implementations resolve change +// URIs, apply changes to a target branch, and (for a committing merge) push the +// result and finalize the change lifecycle (e.g. close PRs). +package vcs + +//go:generate mockgen -source=vcs.go -destination=mock/vcs_mock.go -package=mock + +import ( + "context" + "errors" + + "github.com/uber/submitqueue/runway/entity" +) + +// ErrConflict signals that the ordered steps could not be applied cleanly. +// Controllers treat this as an expected outcome (ack + publish a failure +// result), not an infrastructure error. +var ErrConflict = errors.New("merge conflict") + +// VCS performs version-control operations against a single landing target. +// Both methods accept the same MergeRequest payload; the behavioral difference +// is whether the result is committed to the remote. +type VCS interface { + // CheckMergeability performs a dry-run merge without committing. The + // returned MergeResult reports per-step mergeability; OutputIDs are empty. + CheckMergeability(ctx context.Context, req entity.MergeRequest) (entity.MergeResult, error) + // Land applies the ordered steps, commits the result to the remote, and + // reports per-step OutputIDs (the VCS-neutral revision identifiers produced). + Land(ctx context.Context, req entity.MergeRequest) (entity.MergeResult, error) +} + +// Config identifies the landing target a VCS instance operates on. The factory +// resolves deployment-specific details (remote URL, credentials) from this. +type Config struct { + // QueueName is the caller-provided queue name from the MergeRequest. + QueueName string +} + +// Factory creates VCS instances bound to a landing target. +type Factory interface { + // For returns a VCS instance configured for the given landing target. + For(cfg Config) (VCS, error) +} diff --git a/runway/orchestrator/controller/merge/BUILD.bazel b/runway/orchestrator/controller/merge/BUILD.bazel new file mode 100644 index 00000000..964b3812 --- /dev/null +++ b/runway/orchestrator/controller/merge/BUILD.bazel @@ -0,0 +1,40 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "merge", + srcs = ["merge.go"], + importpath = "github.com/uber/submitqueue/runway/orchestrator/controller/merge", + visibility = ["//visibility:public"], + deps = [ + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/metrics", + "//runway/core/topickey", + "//runway/entity", + "//runway/extension/vcs", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "merge_test", + srcs = ["merge_test.go"], + embed = [":merge"], + deps = [ + "//platform/base/change", + "//platform/base/mergestrategy", + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/extension/messagequeue/mock", + "//runway/core/topickey", + "//runway/entity", + "//runway/extension/vcs", + "//runway/extension/vcs/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/runway/orchestrator/controller/merge/merge.go b/runway/orchestrator/controller/merge/merge.go new file mode 100644 index 00000000..521e0e0a --- /dev/null +++ b/runway/orchestrator/controller/merge/merge.go @@ -0,0 +1,161 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package merge + +import ( + "context" + "errors" + "fmt" + + "github.com/uber-go/tally" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/runway/core/topickey" + "github.com/uber/submitqueue/runway/entity" + "github.com/uber/submitqueue/runway/extension/vcs" + "go.uber.org/zap" +) + +var _ consumer.Controller = (*Controller)(nil) + +// Controller handles merge queue messages. It applies the ordered steps, +// commits the result to the remote, and publishes per-step outcomes back +// to the signal queue. Conflicts are expected outcomes (ack + publish a +// failure result); infrastructure errors are nacked for retry. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + registry consumer.TopicRegistry + vcsFactory vcs.Factory + topicKey consumer.TopicKey + consumerGroup string +} + +// Params are the parameters for creating a new merge controller. +type Params struct { + Registry consumer.TopicRegistry + VCSFactory vcs.Factory + TopicKey consumer.TopicKey + ConsumerGroup string + + Scope tally.Scope + Logger *zap.SugaredLogger +} + +// NewController creates a new merge controller. +func NewController(p Params) *Controller { + return &Controller{ + logger: p.Logger.Named("merge_controller"), + metricsScope: p.Scope.SubScope("merge_controller"), + registry: p.Registry, + vcsFactory: p.VCSFactory, + topicKey: p.TopicKey, + consumerGroup: p.ConsumerGroup, + } +} + +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + req, err := entity.MergeRequestFromBytes(msg.Payload) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + return fmt.Errorf("failed to deserialize merge request: %w", err) + } + + c.logger.Infow("received merge request", + "request_id", req.ID, + "queue", req.QueueName, + "step_count", len(req.Steps), + "attempt", delivery.Attempt(), + ) + + v, err := c.vcsFactory.For(vcs.Config{QueueName: req.QueueName}) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "factory_errors", 1) + return fmt.Errorf("failed to build VCS for queue %s: %w", req.QueueName, err) + } + + result, err := v.Land(ctx, req) + switch { + case err == nil: + // Success — publish result with per-step output IDs. + case errors.Is(err, vcs.ErrConflict): + metrics.NamedCounter(c.metricsScope, opName, "conflicts", 1) + c.logger.Infow("merge conflict", + "request_id", req.ID, + ) + conflictResult := entity.MergeResult{ + ID: req.ID, + Success: false, + Reason: err.Error(), + } + if pubErr := c.publishResult(ctx, conflictResult, req.QueueName); pubErr != nil { + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) + return fmt.Errorf("failed to publish conflict result for request %s: %w", req.ID, pubErr) + } + return nil + default: + metrics.NamedCounter(c.metricsScope, opName, "land_errors", 1) + return fmt.Errorf("merge failed for request %s: %w", req.ID, err) + } + + if err := c.publishResult(ctx, result, req.QueueName); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) + return fmt.Errorf("failed to publish merge result for request %s: %w", req.ID, err) + } + + c.logger.Infow("published merge result", + "request_id", req.ID, + "success", result.Success, + "step_count", len(result.Steps), + ) + + return nil +} + +func (c *Controller) publishResult(ctx context.Context, result entity.MergeResult, partitionKey string) error { + payload, err := result.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize merge result: %w", err) + } + + q, ok := c.registry.Queue(topickey.TopicKeyMergeSignal) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", topickey.TopicKeyMergeSignal) + } + + topicName, ok := c.registry.TopicName(topickey.TopicKeyMergeSignal) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", topickey.TopicKeyMergeSignal) + } + + msg := entityqueue.NewMessage(result.ID, payload, partitionKey, nil) + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + +func (c *Controller) Name() string { return "merge" } +func (c *Controller) TopicKey() consumer.TopicKey { return c.topicKey } +func (c *Controller) ConsumerGroup() string { return c.consumerGroup } diff --git a/runway/orchestrator/controller/merge/merge_test.go b/runway/orchestrator/controller/merge/merge_test.go new file mode 100644 index 00000000..9b616ec7 --- /dev/null +++ b/runway/orchestrator/controller/merge/merge_test.go @@ -0,0 +1,199 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package merge + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "github.com/uber/submitqueue/platform/base/change" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/base/mergestrategy" + "github.com/uber/submitqueue/platform/consumer" + queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + "github.com/uber/submitqueue/runway/core/topickey" + "github.com/uber/submitqueue/runway/entity" + "github.com/uber/submitqueue/runway/extension/vcs" + vcsmock "github.com/uber/submitqueue/runway/extension/vcs/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +func testRequest() entity.MergeRequest { + return entity.MergeRequest{ + ID: "queue-a/42", + QueueName: "queue-a", + Steps: []entity.MergeStep{ + { + StepID: "queue-a/1", + Changes: []change.Change{{URIs: []string{"github://uber/repo/pull/1/abcdef0123456789abcdef0123456789abcdef01"}}}, + Strategy: mergestrategy.MergeStrategyRebase, + }, + }, + } +} + +func captureRegistry(t *testing.T, ctrl *gomock.Controller, publishErr error, captured *entityqueue.Message) consumer.TopicRegistry { + t.Helper() + + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ string, msg entityqueue.Message) error { + if captured != nil { + *captured = msg + } + return publishErr + }, + ).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: topickey.TopicKeyMergeSignal, Name: "merger-signal", Queue: mockQ}, + }) + require.NoError(t, err) + return registry +} + +func makeDelivery(t *testing.T, ctrl *gomock.Controller, payload []byte) *queuemock.MockDelivery { + t.Helper() + msg := entityqueue.NewMessage("queue-a/42", payload, "queue-a", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + return delivery +} + +func newTestController(t *testing.T, ctrl *gomock.Controller, mockVCS *vcsmock.MockVCS, publishErr error, captured *entityqueue.Message) *Controller { + t.Helper() + + factory := vcsmock.NewMockFactory(ctrl) + factory.EXPECT().For(gomock.Any()).Return(mockVCS, nil).AnyTimes() + + return NewController(Params{ + Logger: zaptest.NewLogger(t).Sugar(), + Scope: tally.NoopScope, + Registry: captureRegistry(t, ctrl, publishErr, captured), + VCSFactory: factory, + TopicKey: topickey.TopicKeyMerge, + ConsumerGroup: "runway-merge", + }) +} + +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + controller := newTestController(t, ctrl, mockVCS, nil, nil) + + assert.Equal(t, "merge", controller.Name()) + assert.Equal(t, topickey.TopicKeyMerge, controller.TopicKey()) + assert.Equal(t, "runway-merge", controller.ConsumerGroup()) +} + +func TestProcess_Success(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + + expectedResult := entity.MergeResult{ + ID: "queue-a/42", + Success: true, + Steps: []entity.StepResult{{StepID: "queue-a/1", OutputIDs: []string{"abc123"}}}, + } + mockVCS.EXPECT().Land(gomock.Any(), gomock.Any()).Return(expectedResult, nil) + + var captured entityqueue.Message + controller := newTestController(t, ctrl, mockVCS, nil, &captured) + + req := testRequest() + payload, err := req.ToBytes() + require.NoError(t, err) + + delivery := makeDelivery(t, ctrl, payload) + require.NoError(t, controller.Process(context.Background(), delivery)) + + assert.Equal(t, "queue-a/42", captured.ID) + assert.Equal(t, "queue-a", captured.PartitionKey) + + result, err := entity.MergeResultFromBytes(captured.Payload) + require.NoError(t, err) + assert.True(t, result.Success) + require.Len(t, result.Steps, 1) + assert.Equal(t, []string{"abc123"}, result.Steps[0].OutputIDs) +} + +func TestProcess_Conflict(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + mockVCS.EXPECT().Land(gomock.Any(), gomock.Any()).Return(entity.MergeResult{}, fmt.Errorf("conflict in foo.go: %w", vcs.ErrConflict)) + + var captured entityqueue.Message + controller := newTestController(t, ctrl, mockVCS, nil, &captured) + + req := testRequest() + payload, err := req.ToBytes() + require.NoError(t, err) + + delivery := makeDelivery(t, ctrl, payload) + require.NoError(t, controller.Process(context.Background(), delivery)) + + result, err := entity.MergeResultFromBytes(captured.Payload) + require.NoError(t, err) + assert.False(t, result.Success) + assert.Contains(t, result.Reason, "conflict") +} + +func TestProcess_InfraError(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + mockVCS.EXPECT().Land(gomock.Any(), gomock.Any()).Return(entity.MergeResult{}, fmt.Errorf("vcs unavailable")) + + controller := newTestController(t, ctrl, mockVCS, nil, nil) + + req := testRequest() + payload, err := req.ToBytes() + require.NoError(t, err) + + delivery := makeDelivery(t, ctrl, payload) + require.Error(t, controller.Process(context.Background(), delivery)) +} + +func TestProcess_DeserializeError(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + controller := newTestController(t, ctrl, mockVCS, nil, nil) + delivery := makeDelivery(t, ctrl, []byte(`not json`)) + + require.Error(t, controller.Process(context.Background(), delivery)) +} + +func TestProcess_PublishError(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + mockVCS.EXPECT().Land(gomock.Any(), gomock.Any()).Return(entity.MergeResult{ID: "queue-a/42", Success: true}, nil) + + controller := newTestController(t, ctrl, mockVCS, assert.AnError, nil) + + req := testRequest() + payload, err := req.ToBytes() + require.NoError(t, err) + + delivery := makeDelivery(t, ctrl, payload) + require.Error(t, controller.Process(context.Background(), delivery)) +} diff --git a/runway/orchestrator/controller/mergeconflictcheck/BUILD.bazel b/runway/orchestrator/controller/mergeconflictcheck/BUILD.bazel new file mode 100644 index 00000000..9a906fe7 --- /dev/null +++ b/runway/orchestrator/controller/mergeconflictcheck/BUILD.bazel @@ -0,0 +1,39 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "mergeconflictcheck", + srcs = ["mergeconflictcheck.go"], + importpath = "github.com/uber/submitqueue/runway/orchestrator/controller/mergeconflictcheck", + visibility = ["//visibility:public"], + deps = [ + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/metrics", + "//runway/core/topickey", + "//runway/entity", + "//runway/extension/vcs", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "mergeconflictcheck_test", + srcs = ["mergeconflictcheck_test.go"], + embed = [":mergeconflictcheck"], + deps = [ + "//platform/base/change", + "//platform/base/mergestrategy", + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/extension/messagequeue/mock", + "//runway/core/topickey", + "//runway/entity", + "//runway/extension/vcs/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck.go b/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck.go new file mode 100644 index 00000000..b9805b55 --- /dev/null +++ b/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck.go @@ -0,0 +1,139 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mergeconflictcheck + +import ( + "context" + "fmt" + + "github.com/uber-go/tally" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/runway/core/topickey" + "github.com/uber/submitqueue/runway/entity" + "github.com/uber/submitqueue/runway/extension/vcs" + "go.uber.org/zap" +) + +var _ consumer.Controller = (*Controller)(nil) + +// Controller handles merge-conflict-check queue messages. It performs a dry-run +// merge and publishes per-step mergeability results back to the signal queue. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + registry consumer.TopicRegistry + vcsFactory vcs.Factory + topicKey consumer.TopicKey + consumerGroup string +} + +// Params are the parameters for creating a new merge-conflict-check controller. +type Params struct { + Registry consumer.TopicRegistry + VCSFactory vcs.Factory + TopicKey consumer.TopicKey + ConsumerGroup string + + Scope tally.Scope + Logger *zap.SugaredLogger +} + +// NewController creates a new merge-conflict-check controller. +func NewController(p Params) *Controller { + return &Controller{ + logger: p.Logger.Named("merge_conflict_check_controller"), + metricsScope: p.Scope.SubScope("merge_conflict_check_controller"), + registry: p.Registry, + vcsFactory: p.VCSFactory, + topicKey: p.TopicKey, + consumerGroup: p.ConsumerGroup, + } +} + +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + req, err := entity.MergeRequestFromBytes(msg.Payload) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + return fmt.Errorf("failed to deserialize merge request: %w", err) + } + + c.logger.Infow("received merge-conflict-check request", + "request_id", req.ID, + "queue", req.QueueName, + "step_count", len(req.Steps), + "attempt", delivery.Attempt(), + ) + + v, err := c.vcsFactory.For(vcs.Config{QueueName: req.QueueName}) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "factory_errors", 1) + return fmt.Errorf("failed to build VCS for queue %s: %w", req.QueueName, err) + } + + result, err := v.CheckMergeability(ctx, req) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "check_errors", 1) + return fmt.Errorf("merge-conflict check failed for request %s: %w", req.ID, err) + } + + if err := c.publishResult(ctx, result, req.QueueName); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) + return fmt.Errorf("failed to publish check result for request %s: %w", req.ID, err) + } + + c.logger.Infow("published merge-conflict-check result", + "request_id", req.ID, + "success", result.Success, + ) + + return nil +} + +func (c *Controller) publishResult(ctx context.Context, result entity.MergeResult, partitionKey string) error { + payload, err := result.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize merge result: %w", err) + } + + q, ok := c.registry.Queue(topickey.TopicKeyMergeConflictCheckSignal) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", topickey.TopicKeyMergeConflictCheckSignal) + } + + topicName, ok := c.registry.TopicName(topickey.TopicKeyMergeConflictCheckSignal) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", topickey.TopicKeyMergeConflictCheckSignal) + } + + msg := entityqueue.NewMessage(result.ID, payload, partitionKey, nil) + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + +func (c *Controller) Name() string { return "merge-conflict-check" } +func (c *Controller) TopicKey() consumer.TopicKey { return c.topicKey } +func (c *Controller) ConsumerGroup() string { return c.consumerGroup } diff --git a/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck_test.go b/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck_test.go new file mode 100644 index 00000000..921dd954 --- /dev/null +++ b/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck_test.go @@ -0,0 +1,188 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mergeconflictcheck + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "github.com/uber/submitqueue/platform/base/change" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/base/mergestrategy" + "github.com/uber/submitqueue/platform/consumer" + queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + "github.com/uber/submitqueue/runway/core/topickey" + "github.com/uber/submitqueue/runway/entity" + vcsmock "github.com/uber/submitqueue/runway/extension/vcs/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +func testRequest() entity.MergeRequest { + return entity.MergeRequest{ + ID: "queue-a/42", + QueueName: "queue-a", + Steps: []entity.MergeStep{ + { + StepID: "queue-a/1", + Changes: []change.Change{{URIs: []string{"github://uber/repo/pull/1/abcdef0123456789abcdef0123456789abcdef01"}}}, + Strategy: mergestrategy.MergeStrategyRebase, + }, + }, + } +} + +func captureRegistry(t *testing.T, ctrl *gomock.Controller, publishErr error, captured *entityqueue.Message) consumer.TopicRegistry { + t.Helper() + + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ string, msg entityqueue.Message) error { + if captured != nil { + *captured = msg + } + return publishErr + }, + ).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: topickey.TopicKeyMergeConflictCheckSignal, Name: "merge-conflict-checker-signal", Queue: mockQ}, + }) + require.NoError(t, err) + return registry +} + +func makeDelivery(t *testing.T, ctrl *gomock.Controller, payload []byte) *queuemock.MockDelivery { + t.Helper() + msg := entityqueue.NewMessage("queue-a/42", payload, "queue-a", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + return delivery +} + +func newTestController(t *testing.T, ctrl *gomock.Controller, mockVCS *vcsmock.MockVCS, publishErr error, captured *entityqueue.Message) *Controller { + t.Helper() + + factory := vcsmock.NewMockFactory(ctrl) + factory.EXPECT().For(gomock.Any()).Return(mockVCS, nil).AnyTimes() + + return NewController(Params{ + Logger: zaptest.NewLogger(t).Sugar(), + Scope: tally.NoopScope, + Registry: captureRegistry(t, ctrl, publishErr, captured), + VCSFactory: factory, + TopicKey: topickey.TopicKeyMergeConflictCheck, + ConsumerGroup: "runway-merge-conflict-check", + }) +} + +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + controller := newTestController(t, ctrl, mockVCS, nil, nil) + + assert.Equal(t, "merge-conflict-check", controller.Name()) + assert.Equal(t, topickey.TopicKeyMergeConflictCheck, controller.TopicKey()) + assert.Equal(t, "runway-merge-conflict-check", controller.ConsumerGroup()) +} + +func TestProcess_Success(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + + expectedResult := entity.MergeResult{ + ID: "queue-a/42", + Success: true, + Steps: []entity.StepResult{{StepID: "queue-a/1"}}, + } + mockVCS.EXPECT().CheckMergeability(gomock.Any(), gomock.Any()).Return(expectedResult, nil) + + var captured entityqueue.Message + controller := newTestController(t, ctrl, mockVCS, nil, &captured) + + req := testRequest() + payload, err := req.ToBytes() + require.NoError(t, err) + + delivery := makeDelivery(t, ctrl, payload) + require.NoError(t, controller.Process(context.Background(), delivery)) + + assert.Equal(t, "queue-a/42", captured.ID) + assert.Equal(t, "queue-a", captured.PartitionKey) + + var result entity.MergeResult + result, err = entity.MergeResultFromBytes(captured.Payload) + require.NoError(t, err) + assert.True(t, result.Success) + assert.Equal(t, "queue-a/42", result.ID) +} + +func TestProcess_Errors(t *testing.T) { + tests := []struct { + name string + payload []byte + }{ + {name: "invalid json", payload: []byte(`not json`)}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + controller := newTestController(t, ctrl, mockVCS, nil, nil) + delivery := makeDelivery(t, ctrl, tt.payload) + + require.Error(t, controller.Process(context.Background(), delivery)) + }) + } +} + +func TestProcess_VCSError(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + mockVCS.EXPECT().CheckMergeability(gomock.Any(), gomock.Any()).Return(entity.MergeResult{}, fmt.Errorf("vcs unavailable")) + + controller := newTestController(t, ctrl, mockVCS, nil, nil) + + req := testRequest() + payload, err := req.ToBytes() + require.NoError(t, err) + + delivery := makeDelivery(t, ctrl, payload) + require.Error(t, controller.Process(context.Background(), delivery)) +} + +func TestProcess_PublishError(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + mockVCS.EXPECT().CheckMergeability(gomock.Any(), gomock.Any()).Return(entity.MergeResult{ID: "queue-a/42", Success: true}, nil) + + controller := newTestController(t, ctrl, mockVCS, assert.AnError, nil) + + req := testRequest() + payload, err := req.ToBytes() + require.NoError(t, err) + + delivery := makeDelivery(t, ctrl, payload) + require.Error(t, controller.Process(context.Background(), delivery)) +}