From 05c4a4f657e2466c2a9642308ebfbc85dcd69c45 Mon Sep 17 00:00:00 2001 From: Ben Date: Sat, 20 Jun 2026 17:57:43 +0100 Subject: [PATCH 1/3] feat: redis Signed-off-by: Ben --- bot/metrics/prometheus/prometheus.go | 4 ++-- cmd/worker/main.go | 16 +++++++++------- config/config.go | 9 ++++----- event/{kafkalisten.go => listener.go} | 12 ++++++------ go.mod | 4 +--- go.sum | 6 ------ 6 files changed, 22 insertions(+), 29 deletions(-) rename event/{kafkalisten.go => listener.go} (75%) diff --git a/bot/metrics/prometheus/prometheus.go b/bot/metrics/prometheus/prometheus.go index 11628667..3d0070b4 100644 --- a/bot/metrics/prometheus/prometheus.go +++ b/bot/metrics/prometheus/prometheus.go @@ -35,8 +35,8 @@ var ( ForwardedDashboardMessages = newCounter("forwarded_dashboard_messages") Events = newCounterVec("events", "event_type") - KafkaBatchSize = newHistogram("kafka_batch_size") - KafkaMessages = newHistogramVec("kafka_messages", "topic") + StreamBatchSize = newHistogram("stream_batch_size") + StreamMessages = newHistogramVec("stream_messages", "stream") CategoryUpdates = newCounter("category_updates") ) diff --git a/cmd/worker/main.go b/cmd/worker/main.go index dd43762a..dfda57ae 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -171,21 +171,23 @@ func main() { var wg sync.WaitGroup + hostname, _ := os.Hostname() + rpcClient, err := rpc.NewClient( logger.With(zap.String("service", "rpc")), rpc.Config{ - Brokers: config.Conf.Kafka.Brokers, + Redis: redis.Client, ConsumerGroup: "worker", - ConsumerConcurrency: config.Conf.Kafka.GoroutineLimit, + ConsumerName: hostname, + ConsumerConcurrency: config.Conf.Streams.GoroutineLimit, + MaxLen: 50000, }, map[string]rpc.Listener{ - // Listen for gateway events over Kafka - config.Conf.Kafka.EventsTopic: event.NewKafkaListener( - logger.With(zap.String("service", "gateway-events-kafka")), + config.Conf.Streams.EventsStream: event.NewEventListener( + logger.With(zap.String("service", "gateway-events")), &pgCache, ), - // TODO: Don't hardcode - "tickets.rpc.categoryupdate": listeners.NewTicketStatusUpdater(&pgCache, logger), + "stream:rpc:categoryupdate": listeners.NewTicketStatusUpdater(&pgCache, logger), }) if err != nil { diff --git a/config/config.go b/config/config.go index 0f6be9a2..a1c832eb 100644 --- a/config/config.go +++ b/config/config.go @@ -94,11 +94,10 @@ type ( Threads int `env:"THREADS"` } `envPrefix:"WORKER_REDIS_"` - Kafka struct { - Brokers []string `env:"BROKERS"` - EventsTopic string `env:"EVENTS_TOPIC"` - GoroutineLimit int `env:"GOROUTINE_LIMIT" envDefault:"1000"` - } `envPrefix:"KAFKA_"` + Streams struct { + EventsStream string `env:"STREAMS_EVENTS_STREAM" envDefault:"stream:gateway-events"` + GoroutineLimit int `env:"STREAMS_GOROUTINE_LIMIT" envDefault:"1000"` + } Prometheus struct { Address string `env:"PROMETHEUS_SERVER_ADDR"` diff --git a/event/kafkalisten.go b/event/listener.go similarity index 75% rename from event/kafkalisten.go rename to event/listener.go index a2639b62..e1a9e1dc 100644 --- a/event/kafkalisten.go +++ b/event/listener.go @@ -10,25 +10,25 @@ import ( "go.uber.org/zap" ) -type KafkaConsumer struct { +type EventListener struct { logger *zap.Logger cache *cache.PgCache } -var _ rpc.Listener = (*KafkaConsumer)(nil) +var _ rpc.Listener = (*EventListener)(nil) -func NewKafkaListener(logger *zap.Logger, cache *cache.PgCache) *KafkaConsumer { - return &KafkaConsumer{ +func NewEventListener(logger *zap.Logger, cache *cache.PgCache) *EventListener { + return &EventListener{ logger: logger, cache: cache, } } -func (k *KafkaConsumer) BuildContext() (context.Context, context.CancelFunc) { +func (k *EventListener) BuildContext() (context.Context, context.CancelFunc) { return context.WithCancel(context.Background()) } -func (k *KafkaConsumer) HandleMessage(ctx context.Context, message []byte) { +func (k *EventListener) HandleMessage(ctx context.Context, message []byte) { var event eventforwarding.Event if err := json.Unmarshal(message, &event); err != nil { k.logger.Error("Failed to unmarshal event", zap.Error(err)) diff --git a/go.mod b/go.mod index 6e25d236..5d98da8e 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.25.0 //replace github.com/TicketsBot-cloud/database => ../database -//replace github.com/TicketsBot-cloud/common => ../common +replace github.com/TicketsBot-cloud/common => ../common //replace github.com/TicketsBot-cloud/gdl => ../gdl @@ -127,8 +127,6 @@ require ( github.com/tatsuworks/czlib v0.0.0-20190916144400-8a51758ea0d9 // indirect github.com/tinylib/msgp v1.4.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect - github.com/twmb/franz-go v1.19.0 // indirect - github.com/twmb/franz-go/pkg/kmsg v1.11.2 // indirect github.com/ugorji/go/codec v1.2.12 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect diff --git a/go.sum b/go.sum index 76ad0ccc..d543affa 100644 --- a/go.sum +++ b/go.sum @@ -33,8 +33,6 @@ github.com/TicketsBot-cloud/analytics-client v0.0.0-20250604180646-6606dfc8fc8c github.com/TicketsBot-cloud/analytics-client v0.0.0-20250604180646-6606dfc8fc8c/go.mod h1:zecIz09jVDSHyhV6NYgTko0NEN0QJGiZbzcxHRjQLzc= github.com/TicketsBot-cloud/archiverclient v0.0.0-20251015181023-f0b66a074704 h1:liLfvCrzoJ89DXFHzsd1iK3cyP8s4i0CnZPRFEj53zg= github.com/TicketsBot-cloud/archiverclient v0.0.0-20251015181023-f0b66a074704/go.mod h1:Mux1bEPpOHwRw1wo6Fa6qJLJH9Erk9qv1yAIfLi1Wmw= -github.com/TicketsBot-cloud/common v0.0.0-20260412182419-83b9a6ea08e7 h1:dFmPLk9KXGRVSfiuKK6kusVhRKG7nGfiGld+WRuiX7w= -github.com/TicketsBot-cloud/common v0.0.0-20260412182419-83b9a6ea08e7/go.mod h1:jXGcmAuRvv92YqITskvClgoCpFVqYw14CKJdYhiLtVU= github.com/TicketsBot-cloud/database v0.0.0-20260423165031-495c2e8a5bc7 h1:35PmSSlrSN+DOLNEnkRw4vSg1qxnnlEKtMlOaOUjMOQ= github.com/TicketsBot-cloud/database v0.0.0-20260423165031-495c2e8a5bc7/go.mod h1:HQXAgmNSm7/FmBYwcsa6qpZqMrDhbLoEl+AyqFQ+RwY= github.com/TicketsBot-cloud/gdl v0.0.0-20260426095953-999472e6e538 h1:ewKw1Wv1x/yi8h1IH7EofcYnPUxWZaVTIPFkT97nhn0= @@ -379,10 +377,6 @@ github.com/tinylib/msgp v1.4.0 h1:SYOeDRiydzOw9kSiwdYp9UcBgPFtLU2WDHaJXyHruf8= github.com/tinylib/msgp v1.4.0/go.mod h1:cvjFkb4RiC8qSBOPMGPSzSAx47nAsfhLVTCZZNuHv5o= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= -github.com/twmb/franz-go v1.19.0 h1:FzBAPUeaip68X9cbLDesgQesa5zxKVaZMk+du98vj3c= -github.com/twmb/franz-go v1.19.0/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM= -github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg= -github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE= github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= From a95c340dbcf2199ea8da3e966d8f024c29b4c926 Mon Sep 17 00:00:00 2001 From: biast12 <53872542+biast12@users.noreply.github.com> Date: Sat, 20 Jun 2026 19:20:16 +0200 Subject: [PATCH 2/3] Hardcode event stream key --- cmd/worker/main.go | 2 +- config/config.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/worker/main.go b/cmd/worker/main.go index dfda57ae..99412d46 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -183,7 +183,7 @@ func main() { MaxLen: 50000, }, map[string]rpc.Listener{ - config.Conf.Streams.EventsStream: event.NewEventListener( + "stream:gateway-events": event.NewEventListener( logger.With(zap.String("service", "gateway-events")), &pgCache, ), diff --git a/config/config.go b/config/config.go index a1c832eb..48c89761 100644 --- a/config/config.go +++ b/config/config.go @@ -95,7 +95,6 @@ type ( } `envPrefix:"WORKER_REDIS_"` Streams struct { - EventsStream string `env:"STREAMS_EVENTS_STREAM" envDefault:"stream:gateway-events"` GoroutineLimit int `env:"STREAMS_GOROUTINE_LIMIT" envDefault:"1000"` } From 0ccb66dad1b1802e6d88adf42ed219dcb1c3dafb Mon Sep 17 00:00:00 2001 From: Ben Date: Sat, 20 Jun 2026 19:29:18 +0100 Subject: [PATCH 3/3] bump common Signed-off-by: Ben --- go.mod | 4 ++-- go.sum | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 5d98da8e..9a44e6b1 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.25.0 //replace github.com/TicketsBot-cloud/database => ../database -replace github.com/TicketsBot-cloud/common => ../common +//replace github.com/TicketsBot-cloud/common => ../common //replace github.com/TicketsBot-cloud/gdl => ../gdl @@ -16,7 +16,7 @@ require ( cloud.google.com/go/profiler v0.4.2 github.com/TicketsBot-cloud/analytics-client v0.0.0-20250604180646-6606dfc8fc8c github.com/TicketsBot-cloud/archiverclient v0.0.0-20251015181023-f0b66a074704 - github.com/TicketsBot-cloud/common v0.0.0-20260412182419-83b9a6ea08e7 + github.com/TicketsBot-cloud/common v0.0.0-20260620182815-55fda9a14c01 github.com/TicketsBot-cloud/database v0.0.0-20260423165031-495c2e8a5bc7 github.com/TicketsBot-cloud/gdl v0.0.0-20260426095953-999472e6e538 github.com/caarlos0/env/v10 v10.0.0 diff --git a/go.sum b/go.sum index d543affa..3fcf4548 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,8 @@ github.com/TicketsBot-cloud/analytics-client v0.0.0-20250604180646-6606dfc8fc8c github.com/TicketsBot-cloud/analytics-client v0.0.0-20250604180646-6606dfc8fc8c/go.mod h1:zecIz09jVDSHyhV6NYgTko0NEN0QJGiZbzcxHRjQLzc= github.com/TicketsBot-cloud/archiverclient v0.0.0-20251015181023-f0b66a074704 h1:liLfvCrzoJ89DXFHzsd1iK3cyP8s4i0CnZPRFEj53zg= github.com/TicketsBot-cloud/archiverclient v0.0.0-20251015181023-f0b66a074704/go.mod h1:Mux1bEPpOHwRw1wo6Fa6qJLJH9Erk9qv1yAIfLi1Wmw= +github.com/TicketsBot-cloud/common v0.0.0-20260620182815-55fda9a14c01 h1:BTBphQxBilbCjiNUYHd67kG5yCdEQ9YRAvzRBxOgjkA= +github.com/TicketsBot-cloud/common v0.0.0-20260620182815-55fda9a14c01/go.mod h1:IA3watwJMLySOt5tsO5GPwAsHAFw6x9aqchxNWUqlog= github.com/TicketsBot-cloud/database v0.0.0-20260423165031-495c2e8a5bc7 h1:35PmSSlrSN+DOLNEnkRw4vSg1qxnnlEKtMlOaOUjMOQ= github.com/TicketsBot-cloud/database v0.0.0-20260423165031-495c2e8a5bc7/go.mod h1:HQXAgmNSm7/FmBYwcsa6qpZqMrDhbLoEl+AyqFQ+RwY= github.com/TicketsBot-cloud/gdl v0.0.0-20260426095953-999472e6e538 h1:ewKw1Wv1x/yi8h1IH7EofcYnPUxWZaVTIPFkT97nhn0=