Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bot/metrics/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ var (
ForwardedDashboardMessages = newCounter("forwarded_dashboard_messages")

Events = newCounterVec("events", "event_type")
KafkaBatchSize = newHistogram("kafka_batch_size")
KafkaMessages = newHistogramVec("kafka_messages", "topic")
StreamBatchSize = newHistogram("stream_batch_size")
StreamMessages = newHistogramVec("stream_messages", "stream")

CategoryUpdates = newCounter("category_updates")

Expand Down
16 changes: 9 additions & 7 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,21 +168,23 @@ func main() {

var wg sync.WaitGroup

hostname, _ := os.Hostname()

rpcClient, err := rpc.NewClient(
logger.With(zap.String("service", "rpc")),
rpc.Config{
Brokers: config.Conf.Kafka.Brokers,
Redis: redis.Client,
ConsumerGroup: "worker",
ConsumerConcurrency: config.Conf.Kafka.GoroutineLimit,
ConsumerName: hostname,
ConsumerConcurrency: config.Conf.Streams.GoroutineLimit,
MaxLen: 50000,
},
map[string]rpc.Listener{
// Listen for gateway events over Kafka
config.Conf.Kafka.EventsTopic: event.NewKafkaListener(
logger.With(zap.String("service", "gateway-events-kafka")),
"stream:gateway-events": event.NewEventListener(
logger.With(zap.String("service", "gateway-events")),
&pgCache,
),
// TODO: Don't hardcode
"tickets.rpc.categoryupdate": listeners.NewTicketStatusUpdater(&pgCache, logger),
"stream:rpc:categoryupdate": listeners.NewTicketStatusUpdater(&pgCache, logger),
})

if err != nil {
Expand Down
8 changes: 3 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,9 @@ type (
Threads int `env:"THREADS"`
} `envPrefix:"WORKER_REDIS_"`

Kafka struct {
Brokers []string `env:"BROKERS"`
EventsTopic string `env:"EVENTS_TOPIC"`
GoroutineLimit int `env:"GOROUTINE_LIMIT" envDefault:"1000"`
} `envPrefix:"KAFKA_"`
Streams struct {
GoroutineLimit int `env:"STREAMS_GOROUTINE_LIMIT" envDefault:"1000"`
}

Prometheus struct {
Address string `env:"PROMETHEUS_SERVER_ADDR"`
Expand Down
12 changes: 6 additions & 6 deletions event/kafkalisten.go → event/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,25 @@ import (
"go.uber.org/zap"
)

type KafkaConsumer struct {
type EventListener struct {
logger *zap.Logger
cache *cache.PgCache
}

var _ rpc.Listener = (*KafkaConsumer)(nil)
var _ rpc.Listener = (*EventListener)(nil)

func NewKafkaListener(logger *zap.Logger, cache *cache.PgCache) *KafkaConsumer {
return &KafkaConsumer{
func NewEventListener(logger *zap.Logger, cache *cache.PgCache) *EventListener {
return &EventListener{
logger: logger,
cache: cache,
}
}

func (k *KafkaConsumer) BuildContext() (context.Context, context.CancelFunc) {
func (k *EventListener) BuildContext() (context.Context, context.CancelFunc) {
return context.WithCancel(context.Background())
}

func (k *KafkaConsumer) HandleMessage(ctx context.Context, message []byte) {
func (k *EventListener) HandleMessage(ctx context.Context, message []byte) {
var event eventforwarding.Event
if err := json.Unmarshal(message, &event); err != nil {
k.logger.Error("Failed to unmarshal event", zap.Error(err))
Expand Down
5 changes: 1 addition & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.25.0

replace github.com/TicketsBot-cloud/database => ../database

//replace github.com/TicketsBot-cloud/common => ../common
replace github.com/TicketsBot-cloud/common => ../common

//replace github.com/TicketsBot-cloud/gdl => ../gdl

Expand Down Expand Up @@ -107,7 +107,6 @@ require (
github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c // indirect
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
github.com/philhofer/fwd v1.2.0 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.63.0 // indirect
Expand All @@ -119,8 +118,6 @@ require (
github.com/tatsuworks/czlib v0.0.0-20190916144400-8a51758ea0d9 // indirect
github.com/tinylib/msgp v1.4.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/twmb/franz-go v1.19.0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.11.2 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect
Expand Down
8 changes: 0 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0
github.com/ReneKroon/ttlcache v1.6.0/go.mod h1:DG6nbhXKUQhrExfwwLuZUdH7UnRDDRA1IW+nBuCssvs=
github.com/TicketsBot-cloud/archiverclient v0.0.0-20251015181023-f0b66a074704 h1:liLfvCrzoJ89DXFHzsd1iK3cyP8s4i0CnZPRFEj53zg=
github.com/TicketsBot-cloud/archiverclient v0.0.0-20251015181023-f0b66a074704/go.mod h1:Mux1bEPpOHwRw1wo6Fa6qJLJH9Erk9qv1yAIfLi1Wmw=
github.com/TicketsBot-cloud/common v0.0.0-20260412182419-83b9a6ea08e7 h1:dFmPLk9KXGRVSfiuKK6kusVhRKG7nGfiGld+WRuiX7w=
github.com/TicketsBot-cloud/common v0.0.0-20260412182419-83b9a6ea08e7/go.mod h1:jXGcmAuRvv92YqITskvClgoCpFVqYw14CKJdYhiLtVU=
github.com/TicketsBot-cloud/gdl v0.0.0-20260306134952-cccb0116fef6 h1:ucG0xLPt7xixW7/LvL0hXDBDouDRS1Nf+77qP8iJ/X0=
github.com/TicketsBot-cloud/gdl v0.0.0-20260306134952-cccb0116fef6/go.mod h1:CdwBR2egPtxUXjD2CgC9ZwfuB8dz9HPePM8nuG6dt7Y=
github.com/TicketsBot-cloud/logarchiver v0.0.0-20251018211319-7a7df5cacbdc h1:qTLNpCvIqM7UwZ6MdWQ9EztcDsIJfHh+VJdG+ULLEaA=
Expand Down Expand Up @@ -280,8 +278,6 @@ github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0
github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
github.com/philhofer/fwd v1.2.0 h1:e6DnBTl7vGY+Gz322/ASL4Gyp1FspeMvx1RNDoToZuM=
github.com/philhofer/fwd v1.2.0/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM=
github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -350,10 +346,6 @@ github.com/tinylib/msgp v1.4.0 h1:SYOeDRiydzOw9kSiwdYp9UcBgPFtLU2WDHaJXyHruf8=
github.com/tinylib/msgp v1.4.0/go.mod h1:cvjFkb4RiC8qSBOPMGPSzSAx47nAsfhLVTCZZNuHv5o=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/twmb/franz-go v1.19.0 h1:FzBAPUeaip68X9cbLDesgQesa5zxKVaZMk+du98vj3c=
github.com/twmb/franz-go v1.19.0/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM=
github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg=
github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE=
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
Expand Down
2 changes: 1 addition & 1 deletion locale
Submodule locale updated 47 files
+38 −33 ar-SA.json
+25 −20 az-AZ.json
+27 −22 bg-BG.json
+39 −34 ca-ES.json
+34 −29 cs-CZ.json
+26 −21 cy-GB.json
+40 −35 da-DK.json
+26 −21 de-CH.json
+43 −38 de-DE.json
+29 −24 el-GR.json
+25 −20 en-GB.json
+25 −20 en-Tok.json
+25 −20 eo-UY.json
+26 −21 es-ES.json
+26 −21 et-EE.json
+25 −20 fa-IR.json
+25 −20 fi-FI.json
+34 −29 fr-FR.json
+26 −21 he-IL.json
+25 −20 hi-IN.json
+25 −20 hi-LS.json
+27 −22 hr-HR.json
+26 −21 hu-HU.json
+26 −21 id-ID.json
+26 −21 it-IT.json
+27 −22 ja-JP.json
+27 −22 ko-KR.json
+27 −22 lt-LT.json
+25 −20 lv-LV.json
+25 −20 ne-NP.json
+85 −80 nl-NL.json
+26 −21 no-NO.json
+25 −20 pl-PL.json
+26 −21 pt-BR.json
+26 −21 pt-PT.json
+26 −21 ro-RO.json
+27 −22 ru-RU.json
+26 −21 sk-SK.json
+26 −21 sl-SI.json
+28 −23 sr-SP.json
+26 −21 sv-SE.json
+26 −21 th-TH.json
+29 −24 tr-TR.json
+27 −22 uk-UA.json
+28 −23 vi-VN.json
+28 −23 zh-CN.json
+27 −22 zh-TW.json
Loading