diff --git a/go.mod b/go.mod index a0442be..c722df5 100644 --- a/go.mod +++ b/go.mod @@ -15,10 +15,8 @@ require ( github.com/jackc/pgx/v4 v4.18.3 github.com/klauspost/compress v1.17.8 github.com/panjf2000/ants/v2 v2.10.0 - github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.9.0 github.com/stretchr/testify v1.8.2 - github.com/twmb/franz-go v1.18.0 go.uber.org/atomic v1.6.0 go.uber.org/zap v1.13.0 golang.org/x/sync v0.9.0 @@ -44,10 +42,9 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c // indirect - github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/tatsuworks/czlib v0.0.0-20190916144400-8a51758ea0d9 // indirect - github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect go.uber.org/multierr v1.5.0 // indirect golang.org/x/crypto v0.28.0 // indirect golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f // indirect diff --git a/go.sum b/go.sum index 0fd308e..a7e6cd6 100644 --- a/go.sum +++ b/go.sum @@ -167,8 +167,6 @@ github.com/panjf2000/ants/v2 v2.10.0 h1:zhRg1pQUtkyRiOFo2Sbqwjp0GfBNo9cUY2/Grpx1 github.com/panjf2000/ants/v2 v2.10.0/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I= github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c h1:Gcce/r5tSQeprxswXXOwQ/RBU1bjQWVd9dB7QKoPXBE= github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c/go.mod h1:1iCZ0433JJMecYqCa+TdWA9Pax8MGl4ByuNDZ7eSnQY= -github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= -github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -204,10 +202,6 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/tatsuworks/czlib v0.0.0-20190916144400-8a51758ea0d9 h1:i2aD44Moa5N5pt/WNwHLvIklzPymtr8vkkBlVdNElUE= github.com/tatsuworks/czlib v0.0.0-20190916144400-8a51758ea0d9/go.mod h1:6HrfShlf4bKeQEFdWn4JP/yet/mHW2RhxOQf0e3HWA0= -github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw= -github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I= -github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= -github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= diff --git a/rpc/client.go b/rpc/client.go index cc73368..ed8d6d6 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -2,16 +2,17 @@ package rpc import ( "context" + "fmt" + "os" - "github.com/TicketsBot-cloud/common/utils" - "github.com/twmb/franz-go/pkg/kgo" + "github.com/go-redis/redis/v8" "go.uber.org/atomic" "go.uber.org/zap" ) type Client struct { config Config - client *kgo.Client + redis *redis.Client logger *zap.Logger consumerRunning *atomic.Bool @@ -21,20 +22,30 @@ type Client struct { } type Config struct { - Brokers []string + Redis *redis.Client ConsumerGroup string + ConsumerName string ConsumerConcurrency int + MaxLen int64 } func NewClient(logger *zap.Logger, config Config, listeners map[string]Listener) (*Client, error) { - kafkaClient, err := connectKafka(config.Brokers, config.ConsumerGroup, utils.Keys(listeners)) - if err != nil { - return nil, err + if config.ConsumerName == "" { + hostname, _ := os.Hostname() + config.ConsumerName = hostname + } + + ctx := context.Background() + for stream := range listeners { + err := config.Redis.XGroupCreateMkStream(ctx, stream, config.ConsumerGroup, "$").Err() + if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { + return nil, fmt.Errorf("create consumer group for stream %s: %w", stream, err) + } } return &Client{ config: config, - client: kafkaClient, + redis: config.Redis, logger: logger, consumerRunning: atomic.NewBool(false), listeners: listeners, @@ -42,18 +53,7 @@ func NewClient(logger *zap.Logger, config Config, listeners map[string]Listener) } func (c *Client) Shutdown() { - c.client.Close() - if c.cancelFunc != nil { c.cancelFunc() } } - -func connectKafka(brokers []string, consumerGroup string, topics []string) (*kgo.Client, error) { - return kgo.NewClient( - kgo.SeedBrokers(brokers...), - kgo.ConsumerGroup(consumerGroup), - kgo.ConsumeTopics(topics...), - kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()), - ) -} diff --git a/rpc/consumer.go b/rpc/consumer.go index 621d4a0..224edd8 100644 --- a/rpc/consumer.go +++ b/rpc/consumer.go @@ -3,16 +3,25 @@ package rpc import ( "context" "errors" + "time" + + "github.com/go-redis/redis/v8" "github.com/panjf2000/ants/v2" - "github.com/twmb/franz-go/pkg/kgo" "go.uber.org/zap" ) -const maxEventsPerPoll = 100 +const ( + maxEventsPerPoll = 100 + pollBlockDuration = 1 * time.Second + maintenanceInterval = 30 * time.Second + autoClaimMinIdle = 5 * time.Minute + autoClaimBatchSize = 100 + defaultMaxLen = int64(50000) +) func (c *Client) StartConsumer() { if c.consumerRunning.Swap(true) { - c.logger.Fatal("Kafka client already running") + c.logger.Fatal("Consumer already running") return } @@ -25,64 +34,187 @@ func (c *Client) StartConsumer() { return } + streams := make([]string, 0, len(c.listeners)*2) + for stream := range c.listeners { + streams = append(streams, stream) + } + for range c.listeners { + streams = append(streams, ">") + } + + for stream := range c.listeners { + go c.maintenanceLoop(ctx, stream) + } + for { select { case <-ctx.Done(): return default: - records, err := c.poll(ctx) + messages, err := c.poll(ctx, streams) if err != nil { - if errors.Is(err, kgo.ErrClientClosed) { - c.logger.Info("Kafka client closed, stopping read loop") - return - } else if errors.Is(err, context.Canceled) { + if errors.Is(err, context.Canceled) { c.logger.Info("Context cancelled, stopping read loop") return - } else { - c.logger.Error("Failed to poll records", zap.Error(err)) - continue } + c.logger.Error("Failed to poll records", zap.Error(err)) + continue } - for _, record := range records { - listener, ok := c.listeners[record.Topic] + for _, xStream := range messages { + listener, ok := c.listeners[xStream.Stream] if !ok { - c.logger.Warn("No listener found for topic", zap.String("topic", record.Topic)) + c.logger.Warn("No listener found for stream", zap.String("stream", xStream.Stream)) continue } - value := record.Value - if err := pool.Submit(func() { - ctx, cancel := listener.BuildContext() - defer cancel() + streamName := xStream.Stream + for _, msg := range xStream.Messages { + value, ok := msg.Values["data"] + if !ok { + c.logger.Warn("Message missing data field", zap.String("stream", streamName), zap.String("id", msg.ID)) + c.redis.XAck(ctx, streamName, c.config.ConsumerGroup, msg.ID) + continue + } - listener.HandleMessage(ctx, value) - }); err != nil { - c.logger.Error("Failed to submit task to worker pool", zap.Error(err)) - continue + data := []byte(value.(string)) + msgID := msg.ID + + if err := pool.Submit(func() { + listenerCtx, listenerCancel := listener.BuildContext() + defer listenerCancel() + + listener.HandleMessage(listenerCtx, data) + c.redis.XAck(ctx, streamName, c.config.ConsumerGroup, msgID) + }); err != nil { + c.logger.Error("Failed to submit task to worker pool", zap.Error(err)) + continue + } } } } } } -func (c *Client) poll(ctx context.Context) ([]*kgo.Record, error) { - fetches := c.client.PollRecords(ctx, maxEventsPerPoll) - if fetches.IsClientClosed() { - return nil, kgo.ErrClientClosed - } +func (c *Client) poll(ctx context.Context, streams []string) ([]redis.XStream, error) { + result, err := c.redis.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: c.config.ConsumerGroup, + Consumer: c.config.ConsumerName, + Streams: streams, + Count: maxEventsPerPoll, + Block: pollBlockDuration, + }).Result() - if err := fetches.Err(); err != nil { + if err != nil { + if errors.Is(err, redis.Nil) { + return nil, nil + } return nil, err } - records := make([]*kgo.Record, 0, fetches.NumRecords()) + return result, nil +} + +func (c *Client) maintenanceLoop(ctx context.Context, stream string) { + ticker := time.NewTicker(maintenanceInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + c.trimStream(ctx, stream) + c.autoClaimStale(ctx, stream) + } + } +} + +func (c *Client) trimStream(ctx context.Context, stream string) { + maxLen := c.config.MaxLen + if maxLen <= 0 { + maxLen = defaultMaxLen + } + + trimmed, err := c.redis.XTrimMaxLenApprox(ctx, stream, maxLen, 0).Result() + if err != nil { + if !errors.Is(err, context.Canceled) { + c.logger.Warn("Failed to trim stream", + zap.String("stream", stream), + zap.Error(err)) + } + return + } + + if trimmed > 0 { + c.logger.Debug("Trimmed stream", + zap.String("stream", stream), + zap.Int64("trimmed", trimmed)) + } +} - iter := fetches.RecordIter() - for !iter.Done() { - record := iter.Next() - records = append(records, record) +func (c *Client) autoClaimStale(ctx context.Context, stream string) { + // go-redis/v8's XAutoClaim parser expects 2 response elements, but Redis 7+ + // returns 3 (messages, next start ID, deleted entry IDs). Use a raw Do call + // and parse only the messages array ourselves. + result, err := c.redis.Do(ctx, + "XAUTOCLAIM", stream, c.config.ConsumerGroup, c.config.ConsumerName, + int64(autoClaimMinIdle/time.Millisecond), "0-0", "COUNT", autoClaimBatchSize, + ).Result() + + if err != nil { + if !errors.Is(err, context.Canceled) && !errors.Is(err, redis.Nil) { + c.logger.Warn("Failed to auto-claim stale messages", + zap.String("stream", stream), + zap.Error(err)) + } + return } - return records, nil + parts, ok := result.([]interface{}) + if !ok || len(parts) < 2 { + return + } + + msgs, ok := parts[1].([]interface{}) + if !ok || len(msgs) == 0 { + return + } + + listener, ok := c.listeners[stream] + if !ok { + return + } + + c.logger.Info("Auto-claimed stale messages", + zap.String("stream", stream), + zap.Int("count", len(msgs))) + + for _, raw := range msgs { + entry, ok := raw.([]interface{}) + if !ok || len(entry) < 2 { + continue + } + + msgID, _ := entry[0].(string) + fields, _ := entry[1].([]interface{}) + + var data string + for i := 0; i+1 < len(fields); i += 2 { + if key, _ := fields[i].(string); key == "data" { + data, _ = fields[i+1].(string) + break + } + } + + if data == "" { + c.redis.XAck(ctx, stream, c.config.ConsumerGroup, msgID) + continue + } + + listenerCtx, listenerCancel := listener.BuildContext() + listener.HandleMessage(listenerCtx, []byte(data)) + listenerCancel() + c.redis.XAck(ctx, stream, c.config.ConsumerGroup, msgID) + } } diff --git a/rpc/producer.go b/rpc/producer.go index ce37289..e13102e 100644 --- a/rpc/producer.go +++ b/rpc/producer.go @@ -3,24 +3,29 @@ package rpc import ( "context" "encoding/json" - "github.com/twmb/franz-go/pkg/kgo" + + "github.com/go-redis/redis/v8" "go.uber.org/zap" ) -func (c *Client) ProduceSync(ctx context.Context, topic string, message []byte) error { - c.logger.Debug("Producing message", zap.String("topic", topic), zap.ByteString("message", message)) +const defaultMaxLenApprox int64 = 50000 + +func (c *Client) ProduceSync(ctx context.Context, stream string, message []byte) error { + c.logger.Debug("Producing message", zap.String("stream", stream), zap.ByteString("message", message)) - return c.client.ProduceSync(ctx, &kgo.Record{ - Topic: topic, - Value: message, - }).FirstErr() + return c.redis.XAdd(ctx, &redis.XAddArgs{ + Stream: stream, + MaxLenApprox: defaultMaxLenApprox, + ID: "*", + Values: map[string]interface{}{"data": string(message)}, + }).Err() } -func (c *Client) ProduceSyncJson(ctx context.Context, topic string, message any) error { +func (c *Client) ProduceSyncJson(ctx context.Context, stream string, message any) error { bytes, err := json.Marshal(message) if err != nil { return err } - return c.ProduceSync(ctx, topic, bytes) + return c.ProduceSync(ctx, stream, bytes) }