Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ define assert_clean
fi
endef

.PHONY: build build-all-linux build-runway-orchestrator-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-init-runway-queue-schema local-runway-orchestrator-start local-runway-orchestrator-stop local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-init-submitqueue-schemas local-init-stovepipe-queue-schema local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start local-stop local-stovepipe-gateway-start local-stovepipe-orchestrator-start local-stovepipe-start mocks proto query-deps query-targets run-client-runway-orchestrator run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe-gateway run-client-stovepipe-orchestrator run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help
.PHONY: build build-all-linux build-runway-orchestrator-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-init-runway-queue-schema local-runway-orchestrator-start local-runway-orchestrator-stop local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-init-submitqueue-schemas local-init-stovepipe-queue-schema local-init-stovepipe-schemas local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start local-stop local-stovepipe-gateway-start local-stovepipe-orchestrator-start local-stovepipe-start mocks proto query-deps query-targets run-client-runway-orchestrator run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe-gateway run-client-stovepipe-orchestrator run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help


build: ## Build all services and examples
Expand Down Expand Up @@ -224,13 +224,31 @@ local-init-submitqueue-schemas: ## Manually apply all database schemas
@echo "✅ All schemas applied successfully"

local-init-stovepipe-queue-schema: ## Apply queue schema only (mysql-queue) for Stovepipe compose stacks
@echo "Applying queue schema to mysql-queue (Stovepipe; no app storage/counter schema yet)..."
@echo "Applying queue schema to mysql-queue (Stovepipe; orchestrator example does not use app storage yet)..."
@for file in platform/extension/messagequeue/mysql/schema/*.sql; do \
echo " - Applying $$(basename $$file)..."; \
docker exec -i $(STOVEPIPE_LOCAL_PROJECT)-mysql-queue-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \
done
@echo "✅ Stovepipe queue schema applied successfully"

local-init-stovepipe-schemas: ## Apply all Stovepipe database schemas (storage + counter to mysql-app, queue to mysql-queue)
@echo "Applying storage schema to mysql-app..."
@for file in stovepipe/extension/storage/mysql/schema/*.sql; do \
echo " - Applying $$(basename $$file)..."; \
docker exec -i $(STOVEPIPE_LOCAL_PROJECT)-mysql-app-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \
done
@echo "Applying counter schema to mysql-app..."
@for file in platform/extension/counter/mysql/schema/*.sql; do \
echo " - Applying $$(basename $$file)..."; \
docker exec -i $(STOVEPIPE_LOCAL_PROJECT)-mysql-app-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \
done
@echo "Applying queue schema to mysql-queue..."
@for file in platform/extension/messagequeue/mysql/schema/*.sql; do \
echo " - Applying $$(basename $$file)..."; \
docker exec -i $(STOVEPIPE_LOCAL_PROJECT)-mysql-queue-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \
done
@echo "✅ All Stovepipe schemas applied successfully"

local-init-runway-queue-schema: ## Apply queue schema only (mysql-queue) for Runway compose stacks
@echo "Applying queue schema to mysql-queue (Runway; consumes the merge queues)..."
@for file in platform/extension/messagequeue/mysql/schema/*.sql; do \
Expand Down Expand Up @@ -331,8 +349,8 @@ local-stovepipe-logs: ## View logs from all running Stovepipe services
local-stovepipe-start: build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux ## Start full Stovepipe stack (gateway + orchestrator + MySQL)
@echo "Starting full Stovepipe stack with compose..."
@$(COMPOSE) -f $(STOVEPIPE_STACK_COMPOSE_FILE) -p $(STOVEPIPE_LOCAL_PROJECT) up -d --build --wait
@echo "Applying queue schema to mysql-queue (no Stovepipe app schema yet)..."
@$(MAKE) -s local-init-stovepipe-queue-schema
@echo "Applying database schemas..."
@$(MAKE) -s local-init-stovepipe-schemas
@echo ""
@echo "✅ Full Stovepipe stack is running!"
@echo ""
Expand Down Expand Up @@ -366,8 +384,8 @@ local-stovepipe-orchestrator-start: build-stovepipe-orchestrator-linux ## Start
local-stovepipe-gateway-start: build-stovepipe-gateway-linux ## Start Stovepipe gateway locally (gateway + 2 MySQL databases)
@echo "Starting Stovepipe gateway with compose..."
@$(COMPOSE) -f $(STOVEPIPE_GATEWAY_COMPOSE_FILE) -p $(STOVEPIPE_LOCAL_PROJECT) up -d --build --wait
@echo "Applying queue schema to mysql-queue (no Stovepipe app schema yet)..."
@$(MAKE) -s local-init-stovepipe-queue-schema
@echo "Applying database schemas..."
@$(MAKE) -s local-init-stovepipe-schemas
@echo ""
@echo "✅ Stovepipe gateway is running!"
@echo ""
Expand Down
13 changes: 13 additions & 0 deletions example/stovepipe/gateway/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,23 @@ go_library(
visibility = ["//visibility:private"],
deps = [
"//api/stovepipe/gateway/protopb",
"//platform/consumer",
"//platform/errs",
"//platform/errs/generic",
"//platform/errs/mysql",
"//platform/extension/counter/mysql",
"//platform/extension/messagequeue",
"//platform/extension/messagequeue/mysql",
"//stovepipe/core/topickey",
"//stovepipe/extension/storage/mysql",
"//stovepipe/gateway/controller",
"//stovepipe/gateway/controller/log",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_uber_go_tally//:tally",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//reflection",
"@org_golang_google_grpc//status",
"@org_uber_go_zap//:zap",
],
)
Expand Down
7 changes: 4 additions & 3 deletions example/stovepipe/gateway/server/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Docker Compose for Stovepipe gateway manual testing
#
# Mirrors example/submitqueue/gateway/server/docker-compose.yml: same MySQL pair, healthchecks,
# env wiring, and startup ordering. The Stovepipe gateway binary is Ping-only today
# and does not open MySQL yet; variables are set so future work matches SubmitQueue.
# env wiring, and startup ordering. The gateway opens the app DB for the request log and SPID
# counter, and the queue DB to publish ingest requests and consume the log topic.
#
# IMPORTANT: Before running compose, build the Linux binary:
# make build-stovepipe-gateway-linux
Expand All @@ -12,7 +12,8 @@
# Quick start:
# make local-stovepipe-gateway-start
#
# After `up`, only the queue schema is applied (`local-init-stovepipe-queue-schema`).
# After `up`, all schemas are applied (`local-init-stovepipe-schemas`): storage + counter to
# mysql-app and the queue schema to mysql-queue.

services:
# Application Database - Stores business data (requests, counters, etc.)
Expand Down
186 changes: 174 additions & 12 deletions example/stovepipe/gateway/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"context"
"database/sql"
"errors"
"fmt"
"net"
Expand All @@ -25,25 +26,44 @@ import (
"syscall"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/uber-go/tally"
pb "github.com/uber/submitqueue/api/stovepipe/gateway/protopb"
"github.com/uber/submitqueue/platform/consumer"
"github.com/uber/submitqueue/platform/errs"
genericerrs "github.com/uber/submitqueue/platform/errs/generic"
mysqlerrs "github.com/uber/submitqueue/platform/errs/mysql"
mysqlcounter "github.com/uber/submitqueue/platform/extension/counter/mysql"
extqueue "github.com/uber/submitqueue/platform/extension/messagequeue"
queueMySQL "github.com/uber/submitqueue/platform/extension/messagequeue/mysql"
"github.com/uber/submitqueue/stovepipe/core/topickey"
mysqlstorage "github.com/uber/submitqueue/stovepipe/extension/storage/mysql"
"github.com/uber/submitqueue/stovepipe/gateway/controller"
logctrl "github.com/uber/submitqueue/stovepipe/gateway/controller/log"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
)

// GatewayServer wraps the controller and implements the gRPC service interface.
// GatewayServer wraps the controllers and implements the gRPC service interface.
type GatewayServer struct {
pb.UnimplementedStovepipeGatewayServer
pingController *controller.PingController
pingController *controller.PingController
ingestController *controller.IngestController
}

// Ping delegates to the controller.
func (s *GatewayServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
return s.pingController.Ping(ctx, req)
}

// Ingest delegates to the controller.
func (s *GatewayServer) Ingest(ctx context.Context, req *pb.IngestRequest) (*pb.IngestResponse, error) {
return s.ingestController.Ingest(ctx, req)
}

func main() {
code := 0
if err := run(); err != nil {
Expand Down Expand Up @@ -105,19 +125,142 @@ func run() error {
metricsWgDone.Wait()
}()

// Create gRPC server
grpcServer := grpc.NewServer()
// Open application database connection.
// Docker Compose healthchecks ensure MySQL is ready before service starts.
appDSN := os.Getenv("MYSQL_DSN")
if appDSN == "" {
return fmt.Errorf("MYSQL_DSN environment variable is required")
}
appDB, err := sql.Open("mysql", appDSN)
if err != nil {
return fmt.Errorf("failed to open app database: %w", err)
}
defer appDB.Close()

// Initialize counter from shared app database connection. The ingest controller uses it to
// mint a SPID per ingest request.
cnt := mysqlcounter.NewCounter(appDB, scope.SubScope("counter"))

// Open queue database connection
queueDSN := os.Getenv("QUEUE_MYSQL_DSN")
if queueDSN == "" {
return fmt.Errorf("QUEUE_MYSQL_DSN environment variable is required")
}
queueDB, err := sql.Open("mysql", queueDSN)
if err != nil {
return fmt.Errorf("failed to open queue database: %w", err)
}
defer queueDB.Close()

// Initialize queue
mysqlQueue, err := queueMySQL.NewQueue(queueMySQL.Params{
DB: queueDB,
Logger: logger,
MetricsScope: scope.SubScope("queue"),
})
if err != nil {
return fmt.Errorf("failed to create queue: %w", err)
}
defer mysqlQueue.Close()

logger.Info("initialized dependencies",
zap.String("app_dsn", appDSN),
zap.String("queue_dsn", queueDSN),
)

// Create ping controller and wrap it for gRPC
// Subscriber name for the log-topic consumer. It must be unique per running
// instance: SubscriberName identifies a subscriber for partition leases, so
// two gateway processes on the same host (sharing HOSTNAME) would otherwise
// contend for the same lease. Append the PID to keep co-located instances
// distinct; the PID is stable for the life of the process. Offset tracking
// stays keyed on the shared ConsumerGroup ("gateway-log"), not this name.
// Falls back to a time-seeded name when HOSTNAME is unset (e.g. local runs).
hostname := os.Getenv("HOSTNAME")
if hostname == "" {
hostname = fmt.Sprintf("stovepipe-gateway-%d", time.Now().Unix())
}
subscriberName := fmt.Sprintf("%s-%d", hostname, os.Getpid())

// Build the topic registry. The gateway publishes ingest requests to the start of the
// orchestrator pipeline (TopicKeyStart) — publish-only. It additionally consumes the log topic
// (TopicKeyLog): the gateway is the sole writer of the request log, persisting entries that the
// orchestrator publishes there.
registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{
{Key: topickey.TopicKeyStart, Name: "start", Queue: mysqlQueue},
{
Key: topickey.TopicKeyLog,
Name: "log",
Queue: mysqlQueue,
Subscription: extqueue.DefaultSubscriptionConfig(
subscriberName, "gateway-log",
),
},
})
if err != nil {
return fmt.Errorf("failed to create topic registry: %w", err)
}

// Create gRPC server with a unary interceptor that translates user-input
// validation errors (anything in the chain that matches controller.ErrInvalidRequest)
// into codes.InvalidArgument so gRPC clients can distinguish bad input from
// infrastructure failures. Other errors pass through unchanged.
grpcServer := grpc.NewServer(grpc.UnaryInterceptor(
func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
resp, err := handler(ctx, req)
if err != nil && controller.IsInvalidRequest(err) {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
return resp, err
},
))

// Initialize storage from the shared app database connection. The ingest controller writes the
// accepted entry to the request log directly; the log consumer (registered below) is the sole
// persister of request log entries published by the orchestrator.
store, err := mysqlstorage.NewStorage(appDB, scope.SubScope("storage"))
if err != nil {
return fmt.Errorf("failed to create storage: %w", err)
}
requestLogStore := store.GetRequestLogStore()

// Create controllers and wrap them for gRPC
pingController := controller.NewPingController(logger, scope)
srv := &GatewayServer{
pingController: pingController,
ingestController := controller.NewIngestController(logger.Sugar(), scope, cnt, requestLogStore, registry)
gatewayServer := &GatewayServer{
pingController: pingController,
ingestController: ingestController,
}
pb.RegisterStovepipeGatewayServer(grpcServer, srv)

pb.RegisterStovepipeGatewayServer(grpcServer, gatewayServer)

// Register reflection service for debugging with grpcurl
reflection.Register(grpcServer)

// Create the queue consumer and register the log controller. The gateway is
// the sole persister of the request log: the orchestrator publishes entries
// to the log topic and this consumer writes them to storage.
logConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry,
errs.NewClassifierProcessor(
// Storage (stovepipe/extension/storage/mysql) and queue (platform/extension/messagequeue/mysql)
// both run on the same MySQL driver, so a single classifier covers
// errors surfaced from either backend.
genericerrs.Classifier,
mysqlerrs.Classifier,
),
)

logController := logctrl.NewController(logger.Sugar(), scope, requestLogStore, topickey.TopicKeyLog, "gateway-log")
if err := logConsumer.Register(logController); err != nil {
return fmt.Errorf("failed to register log controller: %w", err)
}

if err := logConsumer.Start(ctx); err != nil {
// The error can also be a result of a context cancellation due to SIGINT or SIGTERM.
// This is expected, just propagate it.
return fmt.Errorf("failed to start log consumer: %w", err)
}
logger.Info("log consumer started")

// Listen on configurable port
port := os.Getenv("PORT")
if port == "" {
Expand All @@ -137,9 +280,11 @@ func run() error {
serverErrCh <- grpcServer.Serve(listener)
}()

// Wait for interrupt signal or server critical error
// If interruption is signaled, gracefully stop the server
// If an error happens during shutdown, return the actual error, not the context cancellation error
// Wait for interrupt signal or server critical error.
// If interruption is signaled, gracefully stop the server.
// If the server exits with an error, cancel the context to signal the consumer.
// After this, stop the consumer.
// If an error happens during shutdown, return the actual error, not the context cancellation error.
var serverErr error
select {
case <-ctx.Done():
Expand All @@ -155,10 +300,27 @@ func run() error {
serverErr = <-serverErrCh
case serverErr = <-serverErrCh:
fmt.Println("Shutting down stovepipe gateway server due to critical GRPC server error...")

// Cancel the context to signal cancellation to the queue consumer
cancel()
}

if serverErr != nil {
err = fmt.Errorf("GRPC server exited with error: %w", serverErr)
serverErr = fmt.Errorf("GRPC server exited with error: %w", serverErr)
}

// Stop the consumer with a 30s timeout; by this time the context should be
// cancelled and the processing threads may already be exiting; recollect them.
errStop := logConsumer.Stop(30000)
if errStop != nil {
errStop = fmt.Errorf("failed to stop consumer: %w", errStop)
}

if errStop != nil || serverErr != nil {
// Override context cancellation error with the shutdown error. The server
// error is the primary/root failure, so it leads; the consumer-stop error
// is secondary cleanup.
err = errors.Join(serverErr, errStop)
}

return err
Expand Down
3 changes: 3 additions & 0 deletions stovepipe/core/topickey/topickey.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,7 @@ const (
// TopicKeyBatch is the pipeline stage where validated commits are aggregated, since the last
// known green, into a contiguous validation batch.
TopicKeyBatch TopicKey = "batch"
// TopicKeyLog is the gateway-owned sink topic for append-only request log events. The
// orchestrator publishes log entries here; the gateway is the sole consumer that persists them.
TopicKeyLog TopicKey = "log"
)
2 changes: 2 additions & 0 deletions stovepipe/gateway/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//platform/metrics",
"//stovepipe/core/topickey",
"//stovepipe/entity",
"//stovepipe/extension/storage",
"@com_github_uber_go_tally//:tally",
"@org_uber_go_zap//:zap",
],
Expand All @@ -39,6 +40,7 @@ go_test(
"//platform/extension/messagequeue/mock",
"//stovepipe/core/topickey",
"//stovepipe/entity",
"//stovepipe/extension/storage/mock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally//:tally",
Expand Down
Loading
Loading