diff --git a/bot/metrics/prometheus/prometheus.go b/bot/metrics/prometheus/prometheus.go index 11628667..3d0070b4 100644 --- a/bot/metrics/prometheus/prometheus.go +++ b/bot/metrics/prometheus/prometheus.go @@ -35,8 +35,8 @@ var ( ForwardedDashboardMessages = newCounter("forwarded_dashboard_messages") Events = newCounterVec("events", "event_type") - KafkaBatchSize = newHistogram("kafka_batch_size") - KafkaMessages = newHistogramVec("kafka_messages", "topic") + StreamBatchSize = newHistogram("stream_batch_size") + StreamMessages = newHistogramVec("stream_messages", "stream") CategoryUpdates = newCounter("category_updates") ) diff --git a/cmd/worker/main.go b/cmd/worker/main.go index dd43762a..99412d46 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -171,21 +171,23 @@ func main() { var wg sync.WaitGroup + hostname, _ := os.Hostname() + rpcClient, err := rpc.NewClient( logger.With(zap.String("service", "rpc")), rpc.Config{ - Brokers: config.Conf.Kafka.Brokers, + Redis: redis.Client, ConsumerGroup: "worker", - ConsumerConcurrency: config.Conf.Kafka.GoroutineLimit, + ConsumerName: hostname, + ConsumerConcurrency: config.Conf.Streams.GoroutineLimit, + MaxLen: 50000, }, map[string]rpc.Listener{ - // Listen for gateway events over Kafka - config.Conf.Kafka.EventsTopic: event.NewKafkaListener( - logger.With(zap.String("service", "gateway-events-kafka")), + "stream:gateway-events": event.NewEventListener( + logger.With(zap.String("service", "gateway-events")), &pgCache, ), - // TODO: Don't hardcode - "tickets.rpc.categoryupdate": listeners.NewTicketStatusUpdater(&pgCache, logger), + "stream:rpc:categoryupdate": listeners.NewTicketStatusUpdater(&pgCache, logger), }) if err != nil { diff --git a/config/config.go b/config/config.go index 0f6be9a2..48c89761 100644 --- a/config/config.go +++ b/config/config.go @@ -94,11 +94,9 @@ type ( Threads int `env:"THREADS"` } `envPrefix:"WORKER_REDIS_"` - Kafka struct { - Brokers []string `env:"BROKERS"` - EventsTopic string `env:"EVENTS_TOPIC"` - GoroutineLimit int `env:"GOROUTINE_LIMIT" envDefault:"1000"` - } `envPrefix:"KAFKA_"` + Streams struct { + GoroutineLimit int `env:"STREAMS_GOROUTINE_LIMIT" envDefault:"1000"` + } Prometheus struct { Address string `env:"PROMETHEUS_SERVER_ADDR"` diff --git a/event/kafkalisten.go b/event/listener.go similarity index 75% rename from event/kafkalisten.go rename to event/listener.go index a2639b62..e1a9e1dc 100644 --- a/event/kafkalisten.go +++ b/event/listener.go @@ -10,25 +10,25 @@ import ( "go.uber.org/zap" ) -type KafkaConsumer struct { +type EventListener struct { logger *zap.Logger cache *cache.PgCache } -var _ rpc.Listener = (*KafkaConsumer)(nil) +var _ rpc.Listener = (*EventListener)(nil) -func NewKafkaListener(logger *zap.Logger, cache *cache.PgCache) *KafkaConsumer { - return &KafkaConsumer{ +func NewEventListener(logger *zap.Logger, cache *cache.PgCache) *EventListener { + return &EventListener{ logger: logger, cache: cache, } } -func (k *KafkaConsumer) BuildContext() (context.Context, context.CancelFunc) { +func (k *EventListener) BuildContext() (context.Context, context.CancelFunc) { return context.WithCancel(context.Background()) } -func (k *KafkaConsumer) HandleMessage(ctx context.Context, message []byte) { +func (k *EventListener) HandleMessage(ctx context.Context, message []byte) { var event eventforwarding.Event if err := json.Unmarshal(message, &event); err != nil { k.logger.Error("Failed to unmarshal event", zap.Error(err)) diff --git a/go.mod b/go.mod index 6e25d236..9a44e6b1 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( cloud.google.com/go/profiler v0.4.2 github.com/TicketsBot-cloud/analytics-client v0.0.0-20250604180646-6606dfc8fc8c github.com/TicketsBot-cloud/archiverclient v0.0.0-20251015181023-f0b66a074704 - github.com/TicketsBot-cloud/common v0.0.0-20260412182419-83b9a6ea08e7 + github.com/TicketsBot-cloud/common v0.0.0-20260620182815-55fda9a14c01 github.com/TicketsBot-cloud/database v0.0.0-20260423165031-495c2e8a5bc7 github.com/TicketsBot-cloud/gdl v0.0.0-20260426095953-999472e6e538 github.com/caarlos0/env/v10 v10.0.0 @@ -127,8 +127,6 @@ require ( github.com/tatsuworks/czlib v0.0.0-20190916144400-8a51758ea0d9 // indirect github.com/tinylib/msgp v1.4.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect - github.com/twmb/franz-go v1.19.0 // indirect - github.com/twmb/franz-go/pkg/kmsg v1.11.2 // indirect github.com/ugorji/go/codec v1.2.12 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect diff --git a/go.sum b/go.sum index 76ad0ccc..3fcf4548 100644 --- a/go.sum +++ b/go.sum @@ -33,8 +33,8 @@ github.com/TicketsBot-cloud/analytics-client v0.0.0-20250604180646-6606dfc8fc8c github.com/TicketsBot-cloud/analytics-client v0.0.0-20250604180646-6606dfc8fc8c/go.mod h1:zecIz09jVDSHyhV6NYgTko0NEN0QJGiZbzcxHRjQLzc= github.com/TicketsBot-cloud/archiverclient v0.0.0-20251015181023-f0b66a074704 h1:liLfvCrzoJ89DXFHzsd1iK3cyP8s4i0CnZPRFEj53zg= github.com/TicketsBot-cloud/archiverclient v0.0.0-20251015181023-f0b66a074704/go.mod h1:Mux1bEPpOHwRw1wo6Fa6qJLJH9Erk9qv1yAIfLi1Wmw= -github.com/TicketsBot-cloud/common v0.0.0-20260412182419-83b9a6ea08e7 h1:dFmPLk9KXGRVSfiuKK6kusVhRKG7nGfiGld+WRuiX7w= -github.com/TicketsBot-cloud/common v0.0.0-20260412182419-83b9a6ea08e7/go.mod h1:jXGcmAuRvv92YqITskvClgoCpFVqYw14CKJdYhiLtVU= +github.com/TicketsBot-cloud/common v0.0.0-20260620182815-55fda9a14c01 h1:BTBphQxBilbCjiNUYHd67kG5yCdEQ9YRAvzRBxOgjkA= +github.com/TicketsBot-cloud/common v0.0.0-20260620182815-55fda9a14c01/go.mod h1:IA3watwJMLySOt5tsO5GPwAsHAFw6x9aqchxNWUqlog= github.com/TicketsBot-cloud/database v0.0.0-20260423165031-495c2e8a5bc7 h1:35PmSSlrSN+DOLNEnkRw4vSg1qxnnlEKtMlOaOUjMOQ= github.com/TicketsBot-cloud/database v0.0.0-20260423165031-495c2e8a5bc7/go.mod h1:HQXAgmNSm7/FmBYwcsa6qpZqMrDhbLoEl+AyqFQ+RwY= github.com/TicketsBot-cloud/gdl v0.0.0-20260426095953-999472e6e538 h1:ewKw1Wv1x/yi8h1IH7EofcYnPUxWZaVTIPFkT97nhn0= @@ -379,10 +379,6 @@ github.com/tinylib/msgp v1.4.0 h1:SYOeDRiydzOw9kSiwdYp9UcBgPFtLU2WDHaJXyHruf8= github.com/tinylib/msgp v1.4.0/go.mod h1:cvjFkb4RiC8qSBOPMGPSzSAx47nAsfhLVTCZZNuHv5o= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= -github.com/twmb/franz-go v1.19.0 h1:FzBAPUeaip68X9cbLDesgQesa5zxKVaZMk+du98vj3c= -github.com/twmb/franz-go v1.19.0/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM= -github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg= -github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE= github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=