diff --git a/bot/metrics/prometheus/prometheus.go b/bot/metrics/prometheus/prometheus.go index 23ff2cfa..938a45c1 100644 --- a/bot/metrics/prometheus/prometheus.go +++ b/bot/metrics/prometheus/prometheus.go @@ -35,8 +35,8 @@ var ( ForwardedDashboardMessages = newCounter("forwarded_dashboard_messages") Events = newCounterVec("events", "event_type") - KafkaBatchSize = newHistogram("kafka_batch_size") - KafkaMessages = newHistogramVec("kafka_messages", "topic") + StreamBatchSize = newHistogram("stream_batch_size") + StreamMessages = newHistogramVec("stream_messages", "stream") CategoryUpdates = newCounter("category_updates") diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 1c3c2adc..42be4b65 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -168,21 +168,23 @@ func main() { var wg sync.WaitGroup + hostname, _ := os.Hostname() + rpcClient, err := rpc.NewClient( logger.With(zap.String("service", "rpc")), rpc.Config{ - Brokers: config.Conf.Kafka.Brokers, + Redis: redis.Client, ConsumerGroup: "worker", - ConsumerConcurrency: config.Conf.Kafka.GoroutineLimit, + ConsumerName: hostname, + ConsumerConcurrency: config.Conf.Streams.GoroutineLimit, + MaxLen: 50000, }, map[string]rpc.Listener{ - // Listen for gateway events over Kafka - config.Conf.Kafka.EventsTopic: event.NewKafkaListener( - logger.With(zap.String("service", "gateway-events-kafka")), + "stream:gateway-events": event.NewEventListener( + logger.With(zap.String("service", "gateway-events")), &pgCache, ), - // TODO: Don't hardcode - "tickets.rpc.categoryupdate": listeners.NewTicketStatusUpdater(&pgCache, logger), + "stream:rpc:categoryupdate": listeners.NewTicketStatusUpdater(&pgCache, logger), }) if err != nil { diff --git a/config/config.go b/config/config.go index f0dfc571..dfbf9d6e 100644 --- a/config/config.go +++ b/config/config.go @@ -85,11 +85,9 @@ type ( Threads int `env:"THREADS"` } `envPrefix:"WORKER_REDIS_"` - Kafka struct { - Brokers []string `env:"BROKERS"` - EventsTopic string `env:"EVENTS_TOPIC"` - GoroutineLimit int `env:"GOROUTINE_LIMIT" envDefault:"1000"` - } `envPrefix:"KAFKA_"` + Streams struct { + GoroutineLimit int `env:"STREAMS_GOROUTINE_LIMIT" envDefault:"1000"` + } Prometheus struct { Address string `env:"PROMETHEUS_SERVER_ADDR"` diff --git a/event/kafkalisten.go b/event/listener.go similarity index 75% rename from event/kafkalisten.go rename to event/listener.go index a2639b62..e1a9e1dc 100644 --- a/event/kafkalisten.go +++ b/event/listener.go @@ -10,25 +10,25 @@ import ( "go.uber.org/zap" ) -type KafkaConsumer struct { +type EventListener struct { logger *zap.Logger cache *cache.PgCache } -var _ rpc.Listener = (*KafkaConsumer)(nil) +var _ rpc.Listener = (*EventListener)(nil) -func NewKafkaListener(logger *zap.Logger, cache *cache.PgCache) *KafkaConsumer { - return &KafkaConsumer{ +func NewEventListener(logger *zap.Logger, cache *cache.PgCache) *EventListener { + return &EventListener{ logger: logger, cache: cache, } } -func (k *KafkaConsumer) BuildContext() (context.Context, context.CancelFunc) { +func (k *EventListener) BuildContext() (context.Context, context.CancelFunc) { return context.WithCancel(context.Background()) } -func (k *KafkaConsumer) HandleMessage(ctx context.Context, message []byte) { +func (k *EventListener) HandleMessage(ctx context.Context, message []byte) { var event eventforwarding.Event if err := json.Unmarshal(message, &event); err != nil { k.logger.Error("Failed to unmarshal event", zap.Error(err)) diff --git a/go.mod b/go.mod index 64ee3df1..257ff475 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.25.0 replace github.com/TicketsBot-cloud/database => ../database -//replace github.com/TicketsBot-cloud/common => ../common +replace github.com/TicketsBot-cloud/common => ../common //replace github.com/TicketsBot-cloud/gdl => ../gdl @@ -107,7 +107,6 @@ require ( github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c // indirect github.com/pelletier/go-toml/v2 v2.2.4 // indirect github.com/philhofer/fwd v1.2.0 // indirect - github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.63.0 // indirect @@ -119,8 +118,6 @@ require ( github.com/tatsuworks/czlib v0.0.0-20190916144400-8a51758ea0d9 // indirect github.com/tinylib/msgp v1.4.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect - github.com/twmb/franz-go v1.19.0 // indirect - github.com/twmb/franz-go/pkg/kmsg v1.11.2 // indirect github.com/ugorji/go/codec v1.2.12 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect diff --git a/go.sum b/go.sum index 5bc22a9a..d3d572b2 100644 --- a/go.sum +++ b/go.sum @@ -27,8 +27,6 @@ github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0 github.com/ReneKroon/ttlcache v1.6.0/go.mod h1:DG6nbhXKUQhrExfwwLuZUdH7UnRDDRA1IW+nBuCssvs= github.com/TicketsBot-cloud/archiverclient v0.0.0-20251015181023-f0b66a074704 h1:liLfvCrzoJ89DXFHzsd1iK3cyP8s4i0CnZPRFEj53zg= github.com/TicketsBot-cloud/archiverclient v0.0.0-20251015181023-f0b66a074704/go.mod h1:Mux1bEPpOHwRw1wo6Fa6qJLJH9Erk9qv1yAIfLi1Wmw= -github.com/TicketsBot-cloud/common v0.0.0-20260412182419-83b9a6ea08e7 h1:dFmPLk9KXGRVSfiuKK6kusVhRKG7nGfiGld+WRuiX7w= -github.com/TicketsBot-cloud/common v0.0.0-20260412182419-83b9a6ea08e7/go.mod h1:jXGcmAuRvv92YqITskvClgoCpFVqYw14CKJdYhiLtVU= github.com/TicketsBot-cloud/gdl v0.0.0-20260306134952-cccb0116fef6 h1:ucG0xLPt7xixW7/LvL0hXDBDouDRS1Nf+77qP8iJ/X0= github.com/TicketsBot-cloud/gdl v0.0.0-20260306134952-cccb0116fef6/go.mod h1:CdwBR2egPtxUXjD2CgC9ZwfuB8dz9HPePM8nuG6dt7Y= github.com/TicketsBot-cloud/logarchiver v0.0.0-20251018211319-7a7df5cacbdc h1:qTLNpCvIqM7UwZ6MdWQ9EztcDsIJfHh+VJdG+ULLEaA= @@ -280,8 +278,6 @@ github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0 github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= github.com/philhofer/fwd v1.2.0 h1:e6DnBTl7vGY+Gz322/ASL4Gyp1FspeMvx1RNDoToZuM= github.com/philhofer/fwd v1.2.0/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM= -github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= -github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -350,10 +346,6 @@ github.com/tinylib/msgp v1.4.0 h1:SYOeDRiydzOw9kSiwdYp9UcBgPFtLU2WDHaJXyHruf8= github.com/tinylib/msgp v1.4.0/go.mod h1:cvjFkb4RiC8qSBOPMGPSzSAx47nAsfhLVTCZZNuHv5o= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= -github.com/twmb/franz-go v1.19.0 h1:FzBAPUeaip68X9cbLDesgQesa5zxKVaZMk+du98vj3c= -github.com/twmb/franz-go v1.19.0/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM= -github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg= -github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE= github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= diff --git a/locale b/locale index 6d572d7e..087963f3 160000 --- a/locale +++ b/locale @@ -1 +1 @@ -Subproject commit 6d572d7ee1353b6d0f084a4e8574d0b0d8d7fd40 +Subproject commit 087963f3a474f70d655383103cb044881603e071