diff --git a/Makefile b/Makefile index 00a6655c..8ead26e7 100644 --- a/Makefile +++ b/Makefile @@ -53,7 +53,7 @@ define assert_clean fi endef -.PHONY: build build-all-linux build-runway-orchestrator-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-init-runway-queue-schema local-runway-orchestrator-start local-runway-orchestrator-stop local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-init-submitqueue-schemas local-init-stovepipe-queue-schema local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start local-stop local-stovepipe-gateway-start local-stovepipe-orchestrator-start local-stovepipe-start mocks proto query-deps query-targets run-client-runway-orchestrator run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe-gateway run-client-stovepipe-orchestrator run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help +.PHONY: build build-all-linux build-runway-orchestrator-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-init-runway-queue-schema local-runway-orchestrator-start local-runway-orchestrator-stop local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-init-submitqueue-schemas local-init-stovepipe-queue-schema local-init-stovepipe-schemas local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start local-stop local-stovepipe-gateway-start local-stovepipe-orchestrator-start local-stovepipe-start mocks proto query-deps query-targets run-client-runway-orchestrator run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe-gateway run-client-stovepipe-orchestrator run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help build: ## Build all services and examples @@ -224,13 +224,31 @@ local-init-submitqueue-schemas: ## Manually apply all database schemas @echo "✅ All schemas applied successfully" local-init-stovepipe-queue-schema: ## Apply queue schema only (mysql-queue) for Stovepipe compose stacks - @echo "Applying queue schema to mysql-queue (Stovepipe; no app storage/counter schema yet)..." + @echo "Applying queue schema to mysql-queue (Stovepipe; orchestrator example does not use app storage yet)..." @for file in platform/extension/messagequeue/mysql/schema/*.sql; do \ echo " - Applying $$(basename $$file)..."; \ docker exec -i $(STOVEPIPE_LOCAL_PROJECT)-mysql-queue-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \ done @echo "✅ Stovepipe queue schema applied successfully" +local-init-stovepipe-schemas: ## Apply all Stovepipe database schemas (storage + counter to mysql-app, queue to mysql-queue) + @echo "Applying storage schema to mysql-app..." + @for file in stovepipe/extension/storage/mysql/schema/*.sql; do \ + echo " - Applying $$(basename $$file)..."; \ + docker exec -i $(STOVEPIPE_LOCAL_PROJECT)-mysql-app-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \ + done + @echo "Applying counter schema to mysql-app..." + @for file in platform/extension/counter/mysql/schema/*.sql; do \ + echo " - Applying $$(basename $$file)..."; \ + docker exec -i $(STOVEPIPE_LOCAL_PROJECT)-mysql-app-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \ + done + @echo "Applying queue schema to mysql-queue..." + @for file in platform/extension/messagequeue/mysql/schema/*.sql; do \ + echo " - Applying $$(basename $$file)..."; \ + docker exec -i $(STOVEPIPE_LOCAL_PROJECT)-mysql-queue-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \ + done + @echo "✅ All Stovepipe schemas applied successfully" + local-init-runway-queue-schema: ## Apply queue schema only (mysql-queue) for Runway compose stacks @echo "Applying queue schema to mysql-queue (Runway; consumes the merge queues)..." @for file in platform/extension/messagequeue/mysql/schema/*.sql; do \ @@ -331,8 +349,8 @@ local-stovepipe-logs: ## View logs from all running Stovepipe services local-stovepipe-start: build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux ## Start full Stovepipe stack (gateway + orchestrator + MySQL) @echo "Starting full Stovepipe stack with compose..." @$(COMPOSE) -f $(STOVEPIPE_STACK_COMPOSE_FILE) -p $(STOVEPIPE_LOCAL_PROJECT) up -d --build --wait - @echo "Applying queue schema to mysql-queue (no Stovepipe app schema yet)..." - @$(MAKE) -s local-init-stovepipe-queue-schema + @echo "Applying database schemas..." + @$(MAKE) -s local-init-stovepipe-schemas @echo "" @echo "✅ Full Stovepipe stack is running!" @echo "" @@ -366,8 +384,8 @@ local-stovepipe-orchestrator-start: build-stovepipe-orchestrator-linux ## Start local-stovepipe-gateway-start: build-stovepipe-gateway-linux ## Start Stovepipe gateway locally (gateway + 2 MySQL databases) @echo "Starting Stovepipe gateway with compose..." @$(COMPOSE) -f $(STOVEPIPE_GATEWAY_COMPOSE_FILE) -p $(STOVEPIPE_LOCAL_PROJECT) up -d --build --wait - @echo "Applying queue schema to mysql-queue (no Stovepipe app schema yet)..." - @$(MAKE) -s local-init-stovepipe-queue-schema + @echo "Applying database schemas..." + @$(MAKE) -s local-init-stovepipe-schemas @echo "" @echo "✅ Stovepipe gateway is running!" @echo "" diff --git a/example/stovepipe/gateway/server/BUILD.bazel b/example/stovepipe/gateway/server/BUILD.bazel index ea216c0c..48bf932d 100644 --- a/example/stovepipe/gateway/server/BUILD.bazel +++ b/example/stovepipe/gateway/server/BUILD.bazel @@ -12,10 +12,23 @@ go_library( visibility = ["//visibility:private"], deps = [ "//api/stovepipe/gateway/protopb", + "//platform/consumer", + "//platform/errs", + "//platform/errs/generic", + "//platform/errs/mysql", + "//platform/extension/counter/mysql", + "//platform/extension/messagequeue", + "//platform/extension/messagequeue/mysql", + "//stovepipe/core/topickey", + "//stovepipe/extension/storage/mysql", "//stovepipe/gateway/controller", + "//stovepipe/gateway/controller/log", + "@com_github_go_sql_driver_mysql//:mysql", "@com_github_uber_go_tally//:tally", "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//codes", "@org_golang_google_grpc//reflection", + "@org_golang_google_grpc//status", "@org_uber_go_zap//:zap", ], ) diff --git a/example/stovepipe/gateway/server/docker-compose.yml b/example/stovepipe/gateway/server/docker-compose.yml index 59660f17..b93adbde 100644 --- a/example/stovepipe/gateway/server/docker-compose.yml +++ b/example/stovepipe/gateway/server/docker-compose.yml @@ -1,8 +1,8 @@ # Docker Compose for Stovepipe gateway manual testing # # Mirrors example/submitqueue/gateway/server/docker-compose.yml: same MySQL pair, healthchecks, -# env wiring, and startup ordering. The Stovepipe gateway binary is Ping-only today -# and does not open MySQL yet; variables are set so future work matches SubmitQueue. +# env wiring, and startup ordering. The gateway opens the app DB for the request log and SPID +# counter, and the queue DB to publish ingest requests and consume the log topic. # # IMPORTANT: Before running compose, build the Linux binary: # make build-stovepipe-gateway-linux @@ -12,7 +12,8 @@ # Quick start: # make local-stovepipe-gateway-start # -# After `up`, only the queue schema is applied (`local-init-stovepipe-queue-schema`). +# After `up`, all schemas are applied (`local-init-stovepipe-schemas`): storage + counter to +# mysql-app and the queue schema to mysql-queue. services: # Application Database - Stores business data (requests, counters, etc.) diff --git a/example/stovepipe/gateway/server/main.go b/example/stovepipe/gateway/server/main.go index ce693ebd..1367350d 100644 --- a/example/stovepipe/gateway/server/main.go +++ b/example/stovepipe/gateway/server/main.go @@ -16,6 +16,7 @@ package main import ( "context" + "database/sql" "errors" "fmt" "net" @@ -25,18 +26,32 @@ import ( "syscall" "time" + _ "github.com/go-sql-driver/mysql" "github.com/uber-go/tally" pb "github.com/uber/submitqueue/api/stovepipe/gateway/protopb" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/errs" + genericerrs "github.com/uber/submitqueue/platform/errs/generic" + mysqlerrs "github.com/uber/submitqueue/platform/errs/mysql" + mysqlcounter "github.com/uber/submitqueue/platform/extension/counter/mysql" + extqueue "github.com/uber/submitqueue/platform/extension/messagequeue" + queueMySQL "github.com/uber/submitqueue/platform/extension/messagequeue/mysql" + "github.com/uber/submitqueue/stovepipe/core/topickey" + mysqlstorage "github.com/uber/submitqueue/stovepipe/extension/storage/mysql" "github.com/uber/submitqueue/stovepipe/gateway/controller" + logctrl "github.com/uber/submitqueue/stovepipe/gateway/controller/log" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/reflection" + "google.golang.org/grpc/status" ) -// GatewayServer wraps the controller and implements the gRPC service interface. +// GatewayServer wraps the controllers and implements the gRPC service interface. type GatewayServer struct { pb.UnimplementedStovepipeGatewayServer - pingController *controller.PingController + pingController *controller.PingController + ingestController *controller.IngestController } // Ping delegates to the controller. @@ -44,6 +59,11 @@ func (s *GatewayServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.Ping return s.pingController.Ping(ctx, req) } +// Ingest delegates to the controller. +func (s *GatewayServer) Ingest(ctx context.Context, req *pb.IngestRequest) (*pb.IngestResponse, error) { + return s.ingestController.Ingest(ctx, req) +} + func main() { code := 0 if err := run(); err != nil { @@ -105,19 +125,142 @@ func run() error { metricsWgDone.Wait() }() - // Create gRPC server - grpcServer := grpc.NewServer() + // Open application database connection. + // Docker Compose healthchecks ensure MySQL is ready before service starts. + appDSN := os.Getenv("MYSQL_DSN") + if appDSN == "" { + return fmt.Errorf("MYSQL_DSN environment variable is required") + } + appDB, err := sql.Open("mysql", appDSN) + if err != nil { + return fmt.Errorf("failed to open app database: %w", err) + } + defer appDB.Close() + + // Initialize counter from shared app database connection. The ingest controller uses it to + // mint a SPID per ingest request. + cnt := mysqlcounter.NewCounter(appDB, scope.SubScope("counter")) + + // Open queue database connection + queueDSN := os.Getenv("QUEUE_MYSQL_DSN") + if queueDSN == "" { + return fmt.Errorf("QUEUE_MYSQL_DSN environment variable is required") + } + queueDB, err := sql.Open("mysql", queueDSN) + if err != nil { + return fmt.Errorf("failed to open queue database: %w", err) + } + defer queueDB.Close() + + // Initialize queue + mysqlQueue, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: queueDB, + Logger: logger, + MetricsScope: scope.SubScope("queue"), + }) + if err != nil { + return fmt.Errorf("failed to create queue: %w", err) + } + defer mysqlQueue.Close() + + logger.Info("initialized dependencies", + zap.String("app_dsn", appDSN), + zap.String("queue_dsn", queueDSN), + ) - // Create ping controller and wrap it for gRPC + // Subscriber name for the log-topic consumer. It must be unique per running + // instance: SubscriberName identifies a subscriber for partition leases, so + // two gateway processes on the same host (sharing HOSTNAME) would otherwise + // contend for the same lease. Append the PID to keep co-located instances + // distinct; the PID is stable for the life of the process. Offset tracking + // stays keyed on the shared ConsumerGroup ("gateway-log"), not this name. + // Falls back to a time-seeded name when HOSTNAME is unset (e.g. local runs). + hostname := os.Getenv("HOSTNAME") + if hostname == "" { + hostname = fmt.Sprintf("stovepipe-gateway-%d", time.Now().Unix()) + } + subscriberName := fmt.Sprintf("%s-%d", hostname, os.Getpid()) + + // Build the topic registry. The gateway publishes ingest requests to the start of the + // orchestrator pipeline (TopicKeyStart) — publish-only. It additionally consumes the log topic + // (TopicKeyLog): the gateway is the sole writer of the request log, persisting entries that the + // orchestrator publishes there. + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: topickey.TopicKeyStart, Name: "start", Queue: mysqlQueue}, + { + Key: topickey.TopicKeyLog, + Name: "log", + Queue: mysqlQueue, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "gateway-log", + ), + }, + }) + if err != nil { + return fmt.Errorf("failed to create topic registry: %w", err) + } + + // Create gRPC server with a unary interceptor that translates user-input + // validation errors (anything in the chain that matches controller.ErrInvalidRequest) + // into codes.InvalidArgument so gRPC clients can distinguish bad input from + // infrastructure failures. Other errors pass through unchanged. + grpcServer := grpc.NewServer(grpc.UnaryInterceptor( + func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + resp, err := handler(ctx, req) + if err != nil && controller.IsInvalidRequest(err) { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + return resp, err + }, + )) + + // Initialize storage from the shared app database connection. The ingest controller writes the + // accepted entry to the request log directly; the log consumer (registered below) is the sole + // persister of request log entries published by the orchestrator. + store, err := mysqlstorage.NewStorage(appDB, scope.SubScope("storage")) + if err != nil { + return fmt.Errorf("failed to create storage: %w", err) + } + requestLogStore := store.GetRequestLogStore() + + // Create controllers and wrap them for gRPC pingController := controller.NewPingController(logger, scope) - srv := &GatewayServer{ - pingController: pingController, + ingestController := controller.NewIngestController(logger.Sugar(), scope, cnt, requestLogStore, registry) + gatewayServer := &GatewayServer{ + pingController: pingController, + ingestController: ingestController, } - pb.RegisterStovepipeGatewayServer(grpcServer, srv) + + pb.RegisterStovepipeGatewayServer(grpcServer, gatewayServer) // Register reflection service for debugging with grpcurl reflection.Register(grpcServer) + // Create the queue consumer and register the log controller. The gateway is + // the sole persister of the request log: the orchestrator publishes entries + // to the log topic and this consumer writes them to storage. + logConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry, + errs.NewClassifierProcessor( + // Storage (stovepipe/extension/storage/mysql) and queue (platform/extension/messagequeue/mysql) + // both run on the same MySQL driver, so a single classifier covers + // errors surfaced from either backend. + genericerrs.Classifier, + mysqlerrs.Classifier, + ), + ) + + logController := logctrl.NewController(logger.Sugar(), scope, requestLogStore, topickey.TopicKeyLog, "gateway-log") + if err := logConsumer.Register(logController); err != nil { + return fmt.Errorf("failed to register log controller: %w", err) + } + + if err := logConsumer.Start(ctx); err != nil { + // The error can also be a result of a context cancellation due to SIGINT or SIGTERM. + // This is expected, just propagate it. + return fmt.Errorf("failed to start log consumer: %w", err) + } + logger.Info("log consumer started") + // Listen on configurable port port := os.Getenv("PORT") if port == "" { @@ -137,9 +280,11 @@ func run() error { serverErrCh <- grpcServer.Serve(listener) }() - // Wait for interrupt signal or server critical error - // If interruption is signaled, gracefully stop the server - // If an error happens during shutdown, return the actual error, not the context cancellation error + // Wait for interrupt signal or server critical error. + // If interruption is signaled, gracefully stop the server. + // If the server exits with an error, cancel the context to signal the consumer. + // After this, stop the consumer. + // If an error happens during shutdown, return the actual error, not the context cancellation error. var serverErr error select { case <-ctx.Done(): @@ -155,10 +300,27 @@ func run() error { serverErr = <-serverErrCh case serverErr = <-serverErrCh: fmt.Println("Shutting down stovepipe gateway server due to critical GRPC server error...") + + // Cancel the context to signal cancellation to the queue consumer + cancel() } if serverErr != nil { - err = fmt.Errorf("GRPC server exited with error: %w", serverErr) + serverErr = fmt.Errorf("GRPC server exited with error: %w", serverErr) + } + + // Stop the consumer with a 30s timeout; by this time the context should be + // cancelled and the processing threads may already be exiting; recollect them. + errStop := logConsumer.Stop(30000) + if errStop != nil { + errStop = fmt.Errorf("failed to stop consumer: %w", errStop) + } + + if errStop != nil || serverErr != nil { + // Override context cancellation error with the shutdown error. The server + // error is the primary/root failure, so it leads; the consumer-stop error + // is secondary cleanup. + err = errors.Join(serverErr, errStop) } return err diff --git a/stovepipe/core/topickey/topickey.go b/stovepipe/core/topickey/topickey.go index ccab7f3f..374ef57c 100644 --- a/stovepipe/core/topickey/topickey.go +++ b/stovepipe/core/topickey/topickey.go @@ -28,4 +28,7 @@ const ( // TopicKeyBatch is the pipeline stage where validated commits are aggregated, since the last // known green, into a contiguous validation batch. TopicKeyBatch TopicKey = "batch" + // TopicKeyLog is the gateway-owned sink topic for append-only request log events. The + // orchestrator publishes log entries here; the gateway is the sole consumer that persists them. + TopicKeyLog TopicKey = "log" ) diff --git a/stovepipe/gateway/controller/BUILD.bazel b/stovepipe/gateway/controller/BUILD.bazel index 56a6a494..a40d2377 100644 --- a/stovepipe/gateway/controller/BUILD.bazel +++ b/stovepipe/gateway/controller/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//platform/metrics", "//stovepipe/core/topickey", "//stovepipe/entity", + "//stovepipe/extension/storage", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", ], @@ -39,6 +40,7 @@ go_test( "//platform/extension/messagequeue/mock", "//stovepipe/core/topickey", "//stovepipe/entity", + "//stovepipe/extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally//:tally", diff --git a/stovepipe/gateway/controller/ingest.go b/stovepipe/gateway/controller/ingest.go index 5154d5ae..4bb12b9a 100644 --- a/stovepipe/gateway/controller/ingest.go +++ b/stovepipe/gateway/controller/ingest.go @@ -29,6 +29,7 @@ import ( "github.com/uber/submitqueue/platform/metrics" "github.com/uber/submitqueue/stovepipe/core/topickey" "github.com/uber/submitqueue/stovepipe/entity" + "github.com/uber/submitqueue/stovepipe/extension/storage" "go.uber.org/zap" ) @@ -43,21 +44,23 @@ func IsInvalidRequest(err error) bool { // IngestController handles ingest business logic for the stovepipe gateway. type IngestController struct { - logger *zap.SugaredLogger - metricsScope tally.Scope - counter counter.Counter - registry consumer.TopicRegistry + logger *zap.SugaredLogger + metricsScope tally.Scope + counter counter.Counter + requestLogStore storage.RequestLogStore + registry consumer.TopicRegistry } // NewIngestController creates a new instance of the stovepipe ingest controller. -// The controller publishes ingest requests to the topic registered under -// topickey.TopicKeyStart in the registry. -func NewIngestController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, registry consumer.TopicRegistry) *IngestController { +// The controller records an accepted entry in the request log before publishing +// ingest requests to the topic registered under topickey.TopicKeyStart in the registry. +func NewIngestController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, requestLogStore storage.RequestLogStore, registry consumer.TopicRegistry) *IngestController { return &IngestController{ - logger: logger, - metricsScope: scope.SubScope("ingest_controller"), - counter: counter, - registry: registry, + logger: logger, + metricsScope: scope.SubScope("ingest_controller"), + counter: counter, + requestLogStore: requestLogStore, + registry: registry, } } @@ -95,6 +98,15 @@ func (c *IngestController) Ingest(ctx context.Context, req *pb.IngestRequest) (r "change_uris", ingestRequest.Change.URIs, ) + // Record the accepted status in the request log for reconciliation before publishing to the + // queue for processing. The gateway is the sole owner of the request log and must record the + // status synchronously (written straight to storage, not via the queue) so it stays consistent + // with what callers observe the moment Ingest returns. + logEntry := entity.NewRequestLog(ingestRequest.ID, entity.RequestStatusAccepted, 0, "", nil) + if err := c.requestLogStore.Insert(ctx, logEntry); err != nil { + return nil, fmt.Errorf("IngestController failed to insert request log for spid=%s: %w", ingestRequest.ID, err) + } + if err := c.publishToQueue(ctx, ingestRequest); err != nil { return nil, fmt.Errorf("IngestController failed to publish request to queue: %w", err) } diff --git a/stovepipe/gateway/controller/ingest_test.go b/stovepipe/gateway/controller/ingest_test.go index 19d2c644..3b568595 100644 --- a/stovepipe/gateway/controller/ingest_test.go +++ b/stovepipe/gateway/controller/ingest_test.go @@ -30,10 +30,20 @@ import ( queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" "github.com/uber/submitqueue/stovepipe/core/topickey" "github.com/uber/submitqueue/stovepipe/entity" + storagemock "github.com/uber/submitqueue/stovepipe/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap" ) +// noopLogStore returns a mock RequestLogStore whose Insert silently succeeds. It is used by +// tests that are not exercising the request-log write path (AnyTimes tolerates zero calls for +// requests that fail validation before the log is written). +func noopLogStore(ctrl *gomock.Controller) *storagemock.MockRequestLogStore { + store := storagemock.NewMockRequestLogStore(ctrl) + store.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + return store +} + const ( testGitURI = "git://git.example.com/uber/monorepo/refs%2Fheads%2Fmain/abcdef0123456789abcdef0123456789abcdef01" testQueueName = "stovepipe-monorepo" @@ -66,7 +76,7 @@ func newIngestTestRegistryWithNoopPublisher(t *testing.T, ctrl *gomock.Controlle func TestNewIngestController(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopLogStore(ctrl), newIngestTestRegistryWithNoopPublisher(t, ctrl)) require.NotNil(t, c) } @@ -75,7 +85,7 @@ func TestIngest_ReturnsSPID(t *testing.T) { cnt := countermock.NewMockCounter(ctrl) cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(1), nil) - c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopLogStore(ctrl), newIngestTestRegistryWithNoopPublisher(t, ctrl)) resp, err := c.Ingest(context.Background(), &pb.IngestRequest{ Queue: testQueueName, Change: &changepb.Change{Uris: []string{testGitURI}}, @@ -96,7 +106,7 @@ func TestIngest_CounterDomainIncludesQueue(t *testing.T) { }, ) - c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopLogStore(ctrl), newIngestTestRegistryWithNoopPublisher(t, ctrl)) _, err := c.Ingest(context.Background(), &pb.IngestRequest{ Queue: testQueueName, Change: &changepb.Change{Uris: []string{testGitURI}}, @@ -111,7 +121,7 @@ func TestIngest_ReturnsErrorOnCounterFailure(t *testing.T) { cnt := countermock.NewMockCounter(ctrl) cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), fmt.Errorf("counter unavailable")) - c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopLogStore(ctrl), newIngestTestRegistryWithNoopPublisher(t, ctrl)) _, err := c.Ingest(context.Background(), &pb.IngestRequest{ Queue: testQueueName, Change: &changepb.Change{Uris: []string{testGitURI}}, @@ -124,7 +134,7 @@ func TestIngest_ReturnsErrorOnEmptyQueue(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopLogStore(ctrl), newIngestTestRegistryWithNoopPublisher(t, ctrl)) _, err := c.Ingest(context.Background(), &pb.IngestRequest{ Queue: "", Change: &changepb.Change{Uris: []string{testGitURI}}, @@ -138,7 +148,7 @@ func TestIngest_ReturnsErrorOnNilChange(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopLogStore(ctrl), newIngestTestRegistryWithNoopPublisher(t, ctrl)) _, err := c.Ingest(context.Background(), &pb.IngestRequest{ Queue: testQueueName, Change: nil, @@ -152,7 +162,7 @@ func TestIngest_ReturnsErrorOnEmptyChangeURIs(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopLogStore(ctrl), newIngestTestRegistryWithNoopPublisher(t, ctrl)) _, err := c.Ingest(context.Background(), &pb.IngestRequest{ Queue: testQueueName, Change: &changepb.Change{Uris: []string{}}, @@ -179,7 +189,7 @@ func TestIngest_PublishesToQueue(t *testing.T) { }, ) - c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, registry) + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopLogStore(ctrl), registry) resp, err := c.Ingest(context.Background(), &pb.IngestRequest{ Queue: testQueueName, Change: &changepb.Change{Uris: []string{testGitURI}}, @@ -206,7 +216,56 @@ func TestIngest_ReturnsErrorOnPublishFailure(t *testing.T) { registry, publisher := newIngestTestRegistry(t, ctrl) publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("queue unavailable")) - c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, registry) + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopLogStore(ctrl), registry) + _, err := c.Ingest(context.Background(), &pb.IngestRequest{ + Queue: testQueueName, + Change: &changepb.Change{Uris: []string{testGitURI}}, + }) + + require.Error(t, err) +} + +func TestIngest_RecordsAcceptedLog(t *testing.T) { + var captured entity.RequestLog + + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(7), nil) + + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, log entity.RequestLog) error { + captured = log + return nil + }, + ) + + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, logStore, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + resp, err := c.Ingest(context.Background(), &pb.IngestRequest{ + Queue: testQueueName, + Change: &changepb.Change{Uris: []string{testGitURI}}, + }) + + require.NoError(t, err) + assert.Equal(t, "stovepipe-monorepo/7", resp.Spid) + assert.Equal(t, "stovepipe-monorepo/7", captured.RequestID) + assert.Equal(t, entity.RequestStatusAccepted, captured.Status) +} + +func TestIngest_ReturnsErrorOnLogInsertFailure(t *testing.T) { + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(1), nil) + + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(fmt.Errorf("database unavailable")) + + // The request log write happens before publishing, so a log failure must abort + // before anything is published to the queue. + registry, publisher := newIngestTestRegistry(t, ctrl) + publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Times(0) + + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, logStore, registry) _, err := c.Ingest(context.Background(), &pb.IngestRequest{ Queue: testQueueName, Change: &changepb.Change{Uris: []string{testGitURI}}, diff --git a/stovepipe/gateway/controller/log/BUILD.bazel b/stovepipe/gateway/controller/log/BUILD.bazel new file mode 100644 index 00000000..de8a5c82 --- /dev/null +++ b/stovepipe/gateway/controller/log/BUILD.bazel @@ -0,0 +1,33 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "log", + srcs = ["log.go"], + importpath = "github.com/uber/submitqueue/stovepipe/gateway/controller/log", + visibility = ["//visibility:public"], + deps = [ + "//platform/consumer", + "//platform/metrics", + "//stovepipe/entity", + "//stovepipe/extension/storage", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "log_test", + srcs = ["log_test.go"], + embed = [":log"], + deps = [ + "//platform/base/messagequeue", + "//platform/extension/messagequeue/mock", + "//stovepipe/core/topickey", + "//stovepipe/entity", + "//stovepipe/extension/storage/mock", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/stovepipe/gateway/controller/log/log.go b/stovepipe/gateway/controller/log/log.go new file mode 100644 index 00000000..328f8493 --- /dev/null +++ b/stovepipe/gateway/controller/log/log.go @@ -0,0 +1,113 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package log holds the gateway log-topic consumer that persists request log entries. +package log + +import ( + "context" + "fmt" + + "github.com/uber-go/tally" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/stovepipe/entity" + "github.com/uber/submitqueue/stovepipe/extension/storage" + "go.uber.org/zap" +) + +// Controller handles log queue messages. +// It consumes request log entries and persists them to storage. +// Implements consumer.Controller interface for integration with the consumer. +// +// The request log is written exclusively by the gateway: other services +// (e.g. the orchestrator) only publish log entries to the log topic, and this +// controller is the single consumer that persists them to storage. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + requestLogStore storage.RequestLogStore + topicKey consumer.TopicKey + consumerGroup string +} + +// Verify Controller implements consumer.Controller interface at compile time. +var _ consumer.Controller = (*Controller)(nil) + +// NewController creates a new log controller for the gateway. +func NewController( + logger *zap.SugaredLogger, + scope tally.Scope, + requestLogStore storage.RequestLogStore, + topicKey consumer.TopicKey, + consumerGroup string, +) *Controller { + return &Controller{ + logger: logger.Named("log_controller"), + metricsScope: scope.SubScope("log_controller"), + requestLogStore: requestLogStore, + topicKey: topicKey, + consumerGroup: consumerGroup, + } +} + +// Process processes a log delivery from the queue. +// Deserializes the request log entry and persists it to storage. +// Returns nil to ack (success), or error to nack (retry). +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + // Deserialize request log entry + logEntry, err := entity.RequestLogFromBytes(msg.Payload) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + // Non-retryable: malformed messages will never succeed regardless of retry count + return fmt.Errorf("failed to deserialize request log: %w", err) + } + + c.logger.Debugw("received request log entry", + "request_id", logEntry.RequestID, + "status", string(logEntry.Status), + "request_version", logEntry.RequestVersion, + "attempt", delivery.Attempt(), + ) + + // Persist request log to storage + if err := c.requestLogStore.Insert(ctx, logEntry); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to insert request log: %w", err) + } + + return nil // Success - message will be acked +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "log" +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/stovepipe/gateway/controller/log/log_test.go b/stovepipe/gateway/controller/log/log_test.go new file mode 100644 index 00000000..8145a82b --- /dev/null +++ b/stovepipe/gateway/controller/log/log_test.go @@ -0,0 +1,111 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + "github.com/uber/submitqueue/stovepipe/core/topickey" + "github.com/uber/submitqueue/stovepipe/entity" + storagemock "github.com/uber/submitqueue/stovepipe/extension/storage/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +func TestController_Process(t *testing.T) { + tests := []struct { + name string + logEntry *entity.RequestLog // nil means use rawPayload instead + rawPayload []byte // used when logEntry is nil (e.g. invalid JSON) + setupStore func(*gomock.Controller) *storagemock.MockRequestLogStore + wantErr bool + }{ + { + name: "success", + logEntry: newRequestLog( + "stovepipe-monorepo/1", entity.RequestStatusStarted, 1, "", nil, + ), + setupStore: func(ctrl *gomock.Controller) *storagemock.MockRequestLogStore { + store := storagemock.NewMockRequestLogStore(ctrl) + store.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil) + return store + }, + wantErr: false, + }, + { + name: "invalid JSON", + rawPayload: []byte(`{"invalid": json"}`), + setupStore: func(ctrl *gomock.Controller) *storagemock.MockRequestLogStore { + return storagemock.NewMockRequestLogStore(ctrl) + }, + wantErr: true, + }, + { + name: "storage failure", + logEntry: newRequestLog( + "stovepipe-monorepo/2", entity.RequestStatusError, 3, "build infrastructure unavailable", map[string]string{"step": "build"}, + ), + setupStore: func(ctrl *gomock.Controller) *storagemock.MockRequestLogStore { + store := storagemock.NewMockRequestLogStore(ctrl) + store.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(fmt.Errorf("database connection failed")) + return store + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + + var payload []byte + if tt.logEntry != nil { + var err error + payload, err = tt.logEntry.ToBytes() + require.NoError(t, err) + } else { + payload = tt.rawPayload + } + + store := tt.setupStore(ctrl) + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, topickey.TopicKeyLog, "gateway-log") + + msg := entityqueue.NewMessage("stovepipe-monorepo/1", payload, "stovepipe-monorepo", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err := controller.Process(context.Background(), delivery) + + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +// newRequestLog is a helper that returns a pointer to a RequestLog for use in test tables. +func newRequestLog(requestID string, status entity.RequestStatus, requestVersion int32, lastError string, metadata map[string]string) *entity.RequestLog { + log := entity.NewRequestLog(requestID, status, requestVersion, lastError, metadata) + return &log +}