Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bot/metrics/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ var (
ForwardedDashboardMessages = newCounter("forwarded_dashboard_messages")

Events = newCounterVec("events", "event_type")
KafkaBatchSize = newHistogram("kafka_batch_size")
KafkaMessages = newHistogramVec("kafka_messages", "topic")
StreamBatchSize = newHistogram("stream_batch_size")
StreamMessages = newHistogramVec("stream_messages", "stream")

CategoryUpdates = newCounter("category_updates")
)
Expand Down
16 changes: 9 additions & 7 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,21 +171,23 @@ func main() {

var wg sync.WaitGroup

hostname, _ := os.Hostname()

rpcClient, err := rpc.NewClient(
logger.With(zap.String("service", "rpc")),
rpc.Config{
Brokers: config.Conf.Kafka.Brokers,
Redis: redis.Client,
ConsumerGroup: "worker",
ConsumerConcurrency: config.Conf.Kafka.GoroutineLimit,
ConsumerName: hostname,
ConsumerConcurrency: config.Conf.Streams.GoroutineLimit,
MaxLen: 50000,
},
map[string]rpc.Listener{
// Listen for gateway events over Kafka
config.Conf.Kafka.EventsTopic: event.NewKafkaListener(
logger.With(zap.String("service", "gateway-events-kafka")),
"stream:gateway-events": event.NewEventListener(
logger.With(zap.String("service", "gateway-events")),
&pgCache,
),
// TODO: Don't hardcode
"tickets.rpc.categoryupdate": listeners.NewTicketStatusUpdater(&pgCache, logger),
"stream:rpc:categoryupdate": listeners.NewTicketStatusUpdater(&pgCache, logger),
})

if err != nil {
Expand Down
8 changes: 3 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,9 @@ type (
Threads int `env:"THREADS"`
} `envPrefix:"WORKER_REDIS_"`

Kafka struct {
Brokers []string `env:"BROKERS"`
EventsTopic string `env:"EVENTS_TOPIC"`
GoroutineLimit int `env:"GOROUTINE_LIMIT" envDefault:"1000"`
} `envPrefix:"KAFKA_"`
Streams struct {
GoroutineLimit int `env:"STREAMS_GOROUTINE_LIMIT" envDefault:"1000"`
}

Prometheus struct {
Address string `env:"PROMETHEUS_SERVER_ADDR"`
Expand Down
12 changes: 6 additions & 6 deletions event/kafkalisten.go → event/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,25 @@ import (
"go.uber.org/zap"
)

type KafkaConsumer struct {
type EventListener struct {
logger *zap.Logger
cache *cache.PgCache
}

var _ rpc.Listener = (*KafkaConsumer)(nil)
var _ rpc.Listener = (*EventListener)(nil)

func NewKafkaListener(logger *zap.Logger, cache *cache.PgCache) *KafkaConsumer {
return &KafkaConsumer{
func NewEventListener(logger *zap.Logger, cache *cache.PgCache) *EventListener {
return &EventListener{
logger: logger,
cache: cache,
}
}

func (k *KafkaConsumer) BuildContext() (context.Context, context.CancelFunc) {
func (k *EventListener) BuildContext() (context.Context, context.CancelFunc) {
return context.WithCancel(context.Background())
}

func (k *KafkaConsumer) HandleMessage(ctx context.Context, message []byte) {
func (k *EventListener) HandleMessage(ctx context.Context, message []byte) {
var event eventforwarding.Event
if err := json.Unmarshal(message, &event); err != nil {
k.logger.Error("Failed to unmarshal event", zap.Error(err))
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
cloud.google.com/go/profiler v0.4.2
github.com/TicketsBot-cloud/analytics-client v0.0.0-20250604180646-6606dfc8fc8c
github.com/TicketsBot-cloud/archiverclient v0.0.0-20251015181023-f0b66a074704
github.com/TicketsBot-cloud/common v0.0.0-20260412182419-83b9a6ea08e7
github.com/TicketsBot-cloud/common v0.0.0-20260620182815-55fda9a14c01
github.com/TicketsBot-cloud/database v0.0.0-20260423165031-495c2e8a5bc7
github.com/TicketsBot-cloud/gdl v0.0.0-20260426095953-999472e6e538
github.com/caarlos0/env/v10 v10.0.0
Expand Down Expand Up @@ -127,8 +127,6 @@ require (
github.com/tatsuworks/czlib v0.0.0-20190916144400-8a51758ea0d9 // indirect
github.com/tinylib/msgp v1.4.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/twmb/franz-go v1.19.0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.11.2 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect
Expand Down
8 changes: 2 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ github.com/TicketsBot-cloud/analytics-client v0.0.0-20250604180646-6606dfc8fc8c
github.com/TicketsBot-cloud/analytics-client v0.0.0-20250604180646-6606dfc8fc8c/go.mod h1:zecIz09jVDSHyhV6NYgTko0NEN0QJGiZbzcxHRjQLzc=
github.com/TicketsBot-cloud/archiverclient v0.0.0-20251015181023-f0b66a074704 h1:liLfvCrzoJ89DXFHzsd1iK3cyP8s4i0CnZPRFEj53zg=
github.com/TicketsBot-cloud/archiverclient v0.0.0-20251015181023-f0b66a074704/go.mod h1:Mux1bEPpOHwRw1wo6Fa6qJLJH9Erk9qv1yAIfLi1Wmw=
github.com/TicketsBot-cloud/common v0.0.0-20260412182419-83b9a6ea08e7 h1:dFmPLk9KXGRVSfiuKK6kusVhRKG7nGfiGld+WRuiX7w=
github.com/TicketsBot-cloud/common v0.0.0-20260412182419-83b9a6ea08e7/go.mod h1:jXGcmAuRvv92YqITskvClgoCpFVqYw14CKJdYhiLtVU=
github.com/TicketsBot-cloud/common v0.0.0-20260620182815-55fda9a14c01 h1:BTBphQxBilbCjiNUYHd67kG5yCdEQ9YRAvzRBxOgjkA=
github.com/TicketsBot-cloud/common v0.0.0-20260620182815-55fda9a14c01/go.mod h1:IA3watwJMLySOt5tsO5GPwAsHAFw6x9aqchxNWUqlog=
github.com/TicketsBot-cloud/database v0.0.0-20260423165031-495c2e8a5bc7 h1:35PmSSlrSN+DOLNEnkRw4vSg1qxnnlEKtMlOaOUjMOQ=
github.com/TicketsBot-cloud/database v0.0.0-20260423165031-495c2e8a5bc7/go.mod h1:HQXAgmNSm7/FmBYwcsa6qpZqMrDhbLoEl+AyqFQ+RwY=
github.com/TicketsBot-cloud/gdl v0.0.0-20260426095953-999472e6e538 h1:ewKw1Wv1x/yi8h1IH7EofcYnPUxWZaVTIPFkT97nhn0=
Expand Down Expand Up @@ -379,10 +379,6 @@ github.com/tinylib/msgp v1.4.0 h1:SYOeDRiydzOw9kSiwdYp9UcBgPFtLU2WDHaJXyHruf8=
github.com/tinylib/msgp v1.4.0/go.mod h1:cvjFkb4RiC8qSBOPMGPSzSAx47nAsfhLVTCZZNuHv5o=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/twmb/franz-go v1.19.0 h1:FzBAPUeaip68X9cbLDesgQesa5zxKVaZMk+du98vj3c=
github.com/twmb/franz-go v1.19.0/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM=
github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg=
github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE=
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
Expand Down
Loading