Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ require (
github.com/jackc/pgx/v4 v4.18.3
github.com/klauspost/compress v1.17.8
github.com/panjf2000/ants/v2 v2.10.0
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.9.0
github.com/stretchr/testify v1.8.2
github.com/twmb/franz-go v1.18.0
go.uber.org/atomic v1.6.0
go.uber.org/zap v1.13.0
golang.org/x/sync v0.9.0
Expand All @@ -44,10 +42,9 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/tatsuworks/czlib v0.0.0-20190916144400-8a51758ea0d9 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect
go.uber.org/multierr v1.5.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f // indirect
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,6 @@ github.com/panjf2000/ants/v2 v2.10.0 h1:zhRg1pQUtkyRiOFo2Sbqwjp0GfBNo9cUY2/Grpx1
github.com/panjf2000/ants/v2 v2.10.0/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I=
github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c h1:Gcce/r5tSQeprxswXXOwQ/RBU1bjQWVd9dB7QKoPXBE=
github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c/go.mod h1:1iCZ0433JJMecYqCa+TdWA9Pax8MGl4ByuNDZ7eSnQY=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -204,10 +202,6 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tatsuworks/czlib v0.0.0-20190916144400-8a51758ea0d9 h1:i2aD44Moa5N5pt/WNwHLvIklzPymtr8vkkBlVdNElUE=
github.com/tatsuworks/czlib v0.0.0-20190916144400-8a51758ea0d9/go.mod h1:6HrfShlf4bKeQEFdWn4JP/yet/mHW2RhxOQf0e3HWA0=
github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw=
github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I=
github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M=
github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
Expand Down
38 changes: 19 additions & 19 deletions rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package rpc

import (
"context"
"fmt"
"os"

"github.com/TicketsBot-cloud/common/utils"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/go-redis/redis/v8"
"go.uber.org/atomic"
"go.uber.org/zap"
)

type Client struct {
config Config
client *kgo.Client
redis *redis.Client
logger *zap.Logger

consumerRunning *atomic.Bool
Expand All @@ -21,39 +22,38 @@ type Client struct {
}

type Config struct {
Brokers []string
Redis *redis.Client
ConsumerGroup string
ConsumerName string
ConsumerConcurrency int
MaxLen int64
}

func NewClient(logger *zap.Logger, config Config, listeners map[string]Listener) (*Client, error) {
kafkaClient, err := connectKafka(config.Brokers, config.ConsumerGroup, utils.Keys(listeners))
if err != nil {
return nil, err
if config.ConsumerName == "" {
hostname, _ := os.Hostname()
config.ConsumerName = hostname
}

ctx := context.Background()
for stream := range listeners {
err := config.Redis.XGroupCreateMkStream(ctx, stream, config.ConsumerGroup, "$").Err()
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
return nil, fmt.Errorf("create consumer group for stream %s: %w", stream, err)
}
}

return &Client{
config: config,
client: kafkaClient,
redis: config.Redis,
logger: logger,
consumerRunning: atomic.NewBool(false),
listeners: listeners,
}, nil
}

func (c *Client) Shutdown() {
c.client.Close()

if c.cancelFunc != nil {
c.cancelFunc()
}
}

func connectKafka(brokers []string, consumerGroup string, topics []string) (*kgo.Client, error) {
return kgo.NewClient(
kgo.SeedBrokers(brokers...),
kgo.ConsumerGroup(consumerGroup),
kgo.ConsumeTopics(topics...),
kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()),
)
}
200 changes: 166 additions & 34 deletions rpc/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,25 @@ package rpc
import (
"context"
"errors"
"time"

"github.com/go-redis/redis/v8"
"github.com/panjf2000/ants/v2"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
)

const maxEventsPerPoll = 100
const (
maxEventsPerPoll = 100
pollBlockDuration = 1 * time.Second
maintenanceInterval = 30 * time.Second
autoClaimMinIdle = 5 * time.Minute
autoClaimBatchSize = 100
defaultMaxLen = int64(50000)
)

func (c *Client) StartConsumer() {
if c.consumerRunning.Swap(true) {
c.logger.Fatal("Kafka client already running")
c.logger.Fatal("Consumer already running")
return
}

Expand All @@ -25,64 +34,187 @@ func (c *Client) StartConsumer() {
return
}

streams := make([]string, 0, len(c.listeners)*2)
for stream := range c.listeners {
streams = append(streams, stream)
}
for range c.listeners {
streams = append(streams, ">")
}

for stream := range c.listeners {
go c.maintenanceLoop(ctx, stream)
}

for {
select {
case <-ctx.Done():
return
default:
records, err := c.poll(ctx)
messages, err := c.poll(ctx, streams)
if err != nil {
if errors.Is(err, kgo.ErrClientClosed) {
c.logger.Info("Kafka client closed, stopping read loop")
return
} else if errors.Is(err, context.Canceled) {
if errors.Is(err, context.Canceled) {
c.logger.Info("Context cancelled, stopping read loop")
return
} else {
c.logger.Error("Failed to poll records", zap.Error(err))
continue
}
c.logger.Error("Failed to poll records", zap.Error(err))
continue
}

for _, record := range records {
listener, ok := c.listeners[record.Topic]
for _, xStream := range messages {
listener, ok := c.listeners[xStream.Stream]
if !ok {
c.logger.Warn("No listener found for topic", zap.String("topic", record.Topic))
c.logger.Warn("No listener found for stream", zap.String("stream", xStream.Stream))
continue
}

value := record.Value
if err := pool.Submit(func() {
ctx, cancel := listener.BuildContext()
defer cancel()
streamName := xStream.Stream
for _, msg := range xStream.Messages {
value, ok := msg.Values["data"]
if !ok {
c.logger.Warn("Message missing data field", zap.String("stream", streamName), zap.String("id", msg.ID))
c.redis.XAck(ctx, streamName, c.config.ConsumerGroup, msg.ID)
continue
}

listener.HandleMessage(ctx, value)
}); err != nil {
c.logger.Error("Failed to submit task to worker pool", zap.Error(err))
continue
data := []byte(value.(string))
msgID := msg.ID

if err := pool.Submit(func() {
listenerCtx, listenerCancel := listener.BuildContext()
defer listenerCancel()

listener.HandleMessage(listenerCtx, data)
c.redis.XAck(ctx, streamName, c.config.ConsumerGroup, msgID)
}); err != nil {
c.logger.Error("Failed to submit task to worker pool", zap.Error(err))
continue
}
}
}
}
}
}

func (c *Client) poll(ctx context.Context) ([]*kgo.Record, error) {
fetches := c.client.PollRecords(ctx, maxEventsPerPoll)
if fetches.IsClientClosed() {
return nil, kgo.ErrClientClosed
}
func (c *Client) poll(ctx context.Context, streams []string) ([]redis.XStream, error) {
result, err := c.redis.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: c.config.ConsumerGroup,
Consumer: c.config.ConsumerName,
Streams: streams,
Count: maxEventsPerPoll,
Block: pollBlockDuration,
}).Result()

if err := fetches.Err(); err != nil {
if err != nil {
if errors.Is(err, redis.Nil) {
return nil, nil
}
return nil, err
}

records := make([]*kgo.Record, 0, fetches.NumRecords())
return result, nil
}

func (c *Client) maintenanceLoop(ctx context.Context, stream string) {
ticker := time.NewTicker(maintenanceInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
c.trimStream(ctx, stream)
c.autoClaimStale(ctx, stream)
}
}
}

func (c *Client) trimStream(ctx context.Context, stream string) {
maxLen := c.config.MaxLen
if maxLen <= 0 {
maxLen = defaultMaxLen
}

trimmed, err := c.redis.XTrimMaxLenApprox(ctx, stream, maxLen, 0).Result()
if err != nil {
if !errors.Is(err, context.Canceled) {
c.logger.Warn("Failed to trim stream",
zap.String("stream", stream),
zap.Error(err))
}
return
}

if trimmed > 0 {
c.logger.Debug("Trimmed stream",
zap.String("stream", stream),
zap.Int64("trimmed", trimmed))
}
}

iter := fetches.RecordIter()
for !iter.Done() {
record := iter.Next()
records = append(records, record)
func (c *Client) autoClaimStale(ctx context.Context, stream string) {
// go-redis/v8's XAutoClaim parser expects 2 response elements, but Redis 7+
// returns 3 (messages, next start ID, deleted entry IDs). Use a raw Do call
// and parse only the messages array ourselves.
result, err := c.redis.Do(ctx,
"XAUTOCLAIM", stream, c.config.ConsumerGroup, c.config.ConsumerName,
int64(autoClaimMinIdle/time.Millisecond), "0-0", "COUNT", autoClaimBatchSize,
).Result()

if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, redis.Nil) {
c.logger.Warn("Failed to auto-claim stale messages",
zap.String("stream", stream),
zap.Error(err))
}
return
}

return records, nil
parts, ok := result.([]interface{})
if !ok || len(parts) < 2 {
return
}

msgs, ok := parts[1].([]interface{})
if !ok || len(msgs) == 0 {
return
}

listener, ok := c.listeners[stream]
if !ok {
return
}

c.logger.Info("Auto-claimed stale messages",
zap.String("stream", stream),
zap.Int("count", len(msgs)))

for _, raw := range msgs {
entry, ok := raw.([]interface{})
if !ok || len(entry) < 2 {
continue
}

msgID, _ := entry[0].(string)
fields, _ := entry[1].([]interface{})

var data string
for i := 0; i+1 < len(fields); i += 2 {
if key, _ := fields[i].(string); key == "data" {
data, _ = fields[i+1].(string)
break
}
}

if data == "" {
c.redis.XAck(ctx, stream, c.config.ConsumerGroup, msgID)
continue
}

listenerCtx, listenerCancel := listener.BuildContext()
listener.HandleMessage(listenerCtx, []byte(data))
listenerCancel()
c.redis.XAck(ctx, stream, c.config.ConsumerGroup, msgID)
}
}
Loading
Loading