From ec09667113ad35f64ae85bae9288e84c7c2a7da5 Mon Sep 17 00:00:00 2001 From: Douglas-Lee Date: Wed, 16 Jul 2025 15:31:15 +0800 Subject: [PATCH 1/3] refactor ingest queue --- pkg/queue/queue.go | 23 ++-- pkg/queue/redis/redis.go | 222 +++++++++++++++++++-------------------- proxy/gateway.go | 89 +++++++--------- 3 files changed, 155 insertions(+), 179 deletions(-) diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index 112b0c59..2a656e35 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -6,22 +6,23 @@ import ( ) type Message struct { - ID string - Data []byte + Value []byte Time time.Time WorkspaceID string } -type Options struct { - Count int64 - Block bool - Timeout time.Duration -} +type HandleFunc func(ctx context.Context, messages []*Message) error type Queue interface { - Enqueue(ctx context.Context, message *Message) error - Dequeue(ctx context.Context, opts *Options) ([]*Message, error) - Delete(ctx context.Context, message []*Message) error - Size(ctx context.Context) (int64, error) + Producer + Consumer Stats() map[string]interface{} } + +type Producer interface { + WriteMessage(ctx context.Context, message *Message) error +} + +type Consumer interface { + StartListen(ctx context.Context, handle HandleFunc) +} diff --git a/pkg/queue/redis/redis.go b/pkg/queue/redis/redis.go index 258121af..266ca3a3 100644 --- a/pkg/queue/redis/redis.go +++ b/pkg/queue/redis/redis.go @@ -4,11 +4,9 @@ import ( "context" "errors" "github.com/redis/go-redis/v9" - "github.com/webhookx-io/webhookx/constants" - "github.com/webhookx-io/webhookx/pkg/metrics" + "github.com/webhookx-io/webhookx/pkg/loglimiter" "github.com/webhookx-io/webhookx/pkg/queue" "github.com/webhookx-io/webhookx/pkg/tracing" - "github.com/webhookx-io/webhookx/utils" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "strconv" @@ -17,66 +15,53 @@ import ( ) type RedisQueue struct { - stream string - group string - consumer string - visibilityTimeout time.Duration - + opts Options c *redis.Client log *zap.SugaredLogger - metrics *metrics.Metrics + limiter *loglimiter.Limiter } -type RedisQueueOptions struct { +type Options struct { StreamName string - GroupName string + ConsumerGroupName string ConsumerName string VisibilityTimeout time.Duration + Listeners int Client *redis.Client } -func NewRedisQueue(opts RedisQueueOptions, logger *zap.SugaredLogger, metrics *metrics.Metrics) (queue.Queue, error) { +func NewRedisQueue(opts Options, logger *zap.SugaredLogger) (queue.Queue, error) { q := &RedisQueue{ - stream: utils.DefaultIfZero(opts.StreamName, constants.QueueRedisQueueName), - group: utils.DefaultIfZero(opts.GroupName, constants.QueueRedisGroupName), - consumer: utils.DefaultIfZero(opts.ConsumerName, constants.QueueRedisConsumerName), - visibilityTimeout: utils.DefaultIfZero(opts.VisibilityTimeout, constants.QueueRedisVisibilityTimeout), - c: opts.Client, - log: logger, - metrics: metrics, - } - - go q.process() - if metrics.Enabled { - go q.monitoring() + opts: opts, + c: opts.Client, + log: logger.Named("queue-redis"), + limiter: loglimiter.NewLimiter(time.Second), } - return q, nil } -func (q *RedisQueue) Enqueue(ctx context.Context, message *queue.Message) error { +func (q *RedisQueue) WriteMessage(ctx context.Context, message *queue.Message) error { ctx, span := tracing.Start(ctx, "redis.queue.enqueue", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() args := &redis.XAddArgs{ - Stream: q.stream, + Stream: q.opts.StreamName, ID: "*", - Values: []interface{}{"data", message.Data, "time", message.Time.UnixMilli(), "ws_id", message.WorkspaceID}, - } - res := q.c.XAdd(ctx, args) - if res.Err() != nil { - return res.Err() + Values: []interface{}{ + "data", message.Value, + "time", message.Time.UnixMilli(), + "ws_id", message.WorkspaceID, + }, } - message.ID = res.Val() - return nil + return q.c.XAdd(ctx, args).Err() } func toMessage(values map[string]interface{}) *queue.Message { message := &queue.Message{} if data, ok := values["data"].(string); ok { - message.Data = []byte(data) + message.Value = []byte(data) } if timestr, ok := values["time"].(string); ok { @@ -91,76 +76,97 @@ func toMessage(values map[string]interface{}) *queue.Message { return message } -func (q *RedisQueue) Dequeue(ctx context.Context, opt *queue.Options) ([]*queue.Message, error) { +func (q *RedisQueue) dequeue(ctx context.Context) ([]redis.XMessage, error) { ctx, span := tracing.Start(ctx, "redis.queue.dequeue", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() - var count int64 = 1 - if opt != nil && opt.Count != 0 { - count = opt.Count - } - var block time.Duration = -1 - if opt != nil && opt.Block { - block = opt.Timeout - } - args := &redis.XReadGroupArgs{ - Group: q.group, - Consumer: q.consumer, - Streams: []string{q.stream, ">"}, - Count: count, - Block: block, + Group: q.opts.ConsumerGroupName, + Consumer: q.opts.ConsumerName, + Streams: []string{q.opts.StreamName, ">"}, + Count: 20, + Block: time.Second, } - res := q.c.XReadGroup(ctx, args) - if res.Err() != nil { - err := res.Err() + streams, err := q.c.XReadGroup(ctx, args).Result() + if err != nil { if errors.Is(err, redis.Nil) { err = nil } else if strings.HasPrefix(err.Error(), "NOGROUP") { - go q.createConsumerGroup() err = nil + go q.createConsumerGroup(q.opts.StreamName, q.opts.ConsumerGroupName) } return nil, err } - messages := make([]*queue.Message, 0) - for _, stream := range res.Val() { - for _, xmessage := range stream.Messages { - message := toMessage(xmessage.Values) - message.ID = xmessage.ID - messages = append(messages, message) - } - } - - return messages, nil + return streams[0].Messages, nil } -func (q *RedisQueue) Delete(ctx context.Context, messages []*queue.Message) error { +func (q *RedisQueue) delete(ctx context.Context, xmessages []redis.XMessage) error { ctx, span := tracing.Start(ctx, "redis.queue.delete", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() - ids := make([]string, 0, len(messages)) - for _, message := range messages { + ids := make([]string, 0, len(xmessages)) + for _, message := range xmessages { ids = append(ids, message.ID) } + pipeline := q.c.Pipeline() - pipeline.XAck(ctx, q.stream, q.group, ids...) - pipeline.XDel(ctx, q.stream, ids...) + pipeline.XAck(ctx, q.opts.StreamName, q.opts.ConsumerGroupName, ids...) + pipeline.XDel(ctx, q.opts.StreamName, ids...) _, err := pipeline.Exec(ctx) - if err != nil { - return err + return err +} + +func (q *RedisQueue) StartListen(ctx context.Context, handle queue.HandleFunc) { + q.log.Infof("starting %d listeners", q.opts.Listeners) + for i := 0; i < q.opts.Listeners; i++ { + go q.listen(ctx, handle) + } + go q.process(ctx) +} + +func (q *RedisQueue) listen(ctx context.Context, handle queue.HandleFunc) { + for { + select { + case <-ctx.Done(): + return + default: + xmessages, err := q.dequeue(ctx) + if err != nil && q.limiter.Allow(err.Error()) { + q.log.Warnf("failed to dequeue: %v", err) + time.Sleep(time.Second) + continue + } + if len(xmessages) == 0 { + continue + } + + messages := make([]*queue.Message, 0, len(xmessages)) + for _, msg := range xmessages { + messages = append(messages, toMessage(msg.Values)) + } + + err = handle(ctx, messages) + if err != nil { + q.log.Warnf("failed to handle message: %v", err) + continue + } + err = q.delete(ctx, xmessages) + if err != nil { + q.log.Warnf("failed to delete message: %v", err) + } + } } - return nil } -func (q *RedisQueue) Size(ctx context.Context) (int64, error) { - return q.c.XLen(ctx, q.stream).Result() +func (q *RedisQueue) size(ctx context.Context) (int64, error) { + return q.c.XLen(ctx, q.opts.StreamName).Result() } func (q *RedisQueue) Stats() map[string]interface{} { stats := make(map[string]interface{}) - size, err := q.Size(context.TODO()) + size, err := q.size(context.TODO()) if err != nil { q.log.Errorf("failed to retrieve status: %v", err) } @@ -169,20 +175,18 @@ func (q *RedisQueue) Stats() map[string]interface{} { return stats } -func (q *RedisQueue) createConsumerGroup() { - res := q.c.XGroupCreateMkStream(context.TODO(), q.stream, q.group, "0") - if res.Err() == nil { - q.log.Debugf("created default consumer group: %s", q.group) +func (q *RedisQueue) createConsumerGroup(stream string, group string) { + err := q.c.XGroupCreateMkStream(context.TODO(), stream, group, "0").Err() + if err != nil { + if err.Error() != "BUSYGROUP Consumer Group name already exists" { + q.log.Errorf("failed to create Consumer Group '%s': %s", group, err.Error()) + } return } - - if res.Err().Error() != "BUSYGROUP Consumer Group name already exists" { - q.log.Errorf("failed to create the default consumer group: %s", res.Err().Error()) - } + q.log.Debugf("Consumer Group '%s' created", group) } -// process re-enqueue invisible messages that reach the visibility timeout -func (q *RedisQueue) process() { +func (q *RedisQueue) process(ctx context.Context) { var reenqueueScript = redis.NewScript(` local entries = redis.call('XPENDING', KEYS[1], KEYS[2], 'IDLE', ARGV[1], '-', '+', 1000) local ids = {} @@ -200,39 +204,25 @@ func (q *RedisQueue) process() { return ids `) - go func() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - keys := []string{q.stream, q.group} - argv := []interface{}{q.visibilityTimeout.Milliseconds()} - res, err := reenqueueScript.Run(context.Background(), q.c, keys, argv...).Result() - if err != nil { - q.log.Errorf("failed to reenqueue: %v", err) - continue - } - - if ids, ok := res.([]interface{}); ok && len(ids) > 0 { - q.log.Debugf("enqueued invisible messages: %d", len(ids)) - } - } - } + ticker := time.NewTicker(time.Second) + defer ticker.Stop() - }() -} + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + keys := []string{q.opts.StreamName, q.opts.ConsumerGroupName} + argv := []interface{}{q.opts.VisibilityTimeout.Milliseconds()} + res, err := reenqueueScript.Run(context.TODO(), q.c, keys, argv...).Result() + if err != nil { + q.log.Errorf("failed to reenqueue: %v", err) + continue + } -func (q *RedisQueue) monitoring() { - ticker := time.NewTicker(q.metrics.Interval) - defer ticker.Stop() - for range ticker.C { - size, err := q.Size(context.TODO()) - if err != nil { - q.log.Errorf("failed to get redis queue size: %v", err) - continue + if ids, ok := res.([]interface{}); ok && len(ids) > 0 { + q.log.Debugf("enqueued invisible messages: %d", len(ids)) + } } - q.metrics.EventPendingGauge.Set(float64(size)) } } diff --git a/proxy/gateway.go b/proxy/gateway.go index eedb7e4b..e8c66e69 100644 --- a/proxy/gateway.go +++ b/proxy/gateway.go @@ -100,9 +100,14 @@ func NewGateway(opts Options) *Gateway { var q queue.Queue switch opts.Cfg.Queue.Type { case "redis": - q, _ = redis.NewRedisQueue(redis.RedisQueueOptions{ - Client: opts.Cfg.Queue.Redis.GetClient(), - }, zap.S(), opts.Metrics) + q, _ = redis.NewRedisQueue(redis.Options{ + StreamName: constants.QueueRedisQueueName, + ConsumerGroupName: constants.QueueRedisGroupName, + ConsumerName: constants.QueueRedisConsumerName, + VisibilityTimeout: constants.QueueRedisVisibilityTimeout, + Listeners: runtime.GOMAXPROCS(0), + Client: opts.Cfg.Queue.Redis.GetClient(), + }, zap.S()) stats.Register(q) } @@ -285,11 +290,11 @@ func (gw *Gateway) ingestEvent(ctx context.Context, async bool, event *entities. } msg := queue.Message{ - Data: bytes, + Value: bytes, Time: time.Now(), WorkspaceID: event.WorkspaceId, } - return gw.queue.Enqueue(ctx, &msg) + return gw.queue.WriteMessage(ctx, &msg) } return gw.dispatch(ctx, []*entities.Event{event}) @@ -304,6 +309,10 @@ func (gw *Gateway) Start() { gw.buildRouter("init") + if gw.queue != nil { + gw.queue.StartListen(gw.ctx, gw.HandleMessages) + } + schedule.Schedule(gw.ctx, func() { version := store.GetDefault("router:version", "init").(string) if gw.routerVersion == version { @@ -312,6 +321,14 @@ func (gw *Gateway) Start() { gw.buildRouter(version) }, time.Second) + if gw.metrics.Enabled && gw.queue != nil { + schedule.Schedule(gw.ctx, func() { + stats := stats.Stats(gw.queue.Stats()) + size := stats.Int64("eventqueue.size") + gw.metrics.EventPendingGauge.Set(float64(size)) + }, gw.metrics.Interval) + } + gw.bus.Subscribe("source.crud", func(data interface{}) { store.Set("router:version", utils.UUID()) }) @@ -350,14 +367,6 @@ func (gw *Gateway) Start() { "tls", gw.cfg.TLS.Enabled(), ) - if gw.queue != nil { - listeners := runtime.GOMAXPROCS(0) - gw.log.Infof(`starting %d listeners`, listeners) - for i := 0; i < listeners; i++ { - go gw.listenQueue() - } - } - } // Stop stops the HTTP server @@ -374,48 +383,24 @@ func (gw *Gateway) Stop() error { return nil } -func (gw *Gateway) listenQueue() { - opts := &queue.Options{ - Count: 20, - Block: true, - Timeout: time.Second, +func (gw *Gateway) HandleMessages(ctx context.Context, messages []*queue.Message) error { + events := make([]*entities.Event, 0, len(messages)) + for _, message := range messages { + var event entities.Event + err := json.Unmarshal(message.Value, &event) + if err != nil { + gw.log.Warnf("faield to unmarshal message: %v", err) + continue + } + event.WorkspaceId = message.WorkspaceID + events = append(events, &event) } - for { - select { - case <-gw.ctx.Done(): - return - default: - ctx := context.TODO() - messages, err := gw.queue.Dequeue(ctx, opts) - if err != nil && gw.limiter.Allow(err.Error()) { - gw.log.Warnf("failed to dequeue: %v", err) - time.Sleep(time.Second) - continue - } - if len(messages) == 0 { - continue - } - - events := make([]*entities.Event, 0, len(messages)) - for _, message := range messages { - var event entities.Event - err = json.Unmarshal(message.Data, &event) - if err != nil { - gw.log.Warnf("faield to unmarshal message: %v", err) - continue - } - event.WorkspaceId = message.WorkspaceID - events = append(events, &event) - } - err = gw.dispatch(ctx, events) - if err != nil { - gw.log.Warnf("failed to dispatch event in batch: %v", err) - continue - } - _ = gw.queue.Delete(ctx, messages) - } + err := gw.dispatch(ctx, events) + if err != nil { + gw.log.Warnf("failed to dispatch event in batch: %v", err) } + return err } func (gw *Gateway) dispatch(ctx context.Context, events []*entities.Event) error { From 71077fb56229043c826fd866c34d7240c02d099f Mon Sep 17 00:00:00 2001 From: Douglas-Lee Date: Wed, 16 Jul 2025 17:05:56 +0800 Subject: [PATCH 2/3] update --- pkg/queue/queue.go | 6 +++--- pkg/queue/redis/redis.go | 10 +++++----- proxy/gateway.go | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index 2a656e35..879e3911 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -11,7 +11,7 @@ type Message struct { WorkspaceID string } -type HandleFunc func(ctx context.Context, messages []*Message) error +type HandlerFunc func(ctx context.Context, messages []*Message) error type Queue interface { Producer @@ -20,9 +20,9 @@ type Queue interface { } type Producer interface { - WriteMessage(ctx context.Context, message *Message) error + Enqueue(ctx context.Context, message *Message) error } type Consumer interface { - StartListen(ctx context.Context, handle HandleFunc) + StartListen(ctx context.Context, handler HandlerFunc) } diff --git a/pkg/queue/redis/redis.go b/pkg/queue/redis/redis.go index 266ca3a3..f51ee9d6 100644 --- a/pkg/queue/redis/redis.go +++ b/pkg/queue/redis/redis.go @@ -41,7 +41,7 @@ func NewRedisQueue(opts Options, logger *zap.SugaredLogger) (queue.Queue, error) return q, nil } -func (q *RedisQueue) WriteMessage(ctx context.Context, message *queue.Message) error { +func (q *RedisQueue) Enqueue(ctx context.Context, message *queue.Message) error { ctx, span := tracing.Start(ctx, "redis.queue.enqueue", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() @@ -117,15 +117,15 @@ func (q *RedisQueue) delete(ctx context.Context, xmessages []redis.XMessage) err return err } -func (q *RedisQueue) StartListen(ctx context.Context, handle queue.HandleFunc) { +func (q *RedisQueue) StartListen(ctx context.Context, handler queue.HandlerFunc) { q.log.Infof("starting %d listeners", q.opts.Listeners) for i := 0; i < q.opts.Listeners; i++ { - go q.listen(ctx, handle) + go q.listen(ctx, handler) } go q.process(ctx) } -func (q *RedisQueue) listen(ctx context.Context, handle queue.HandleFunc) { +func (q *RedisQueue) listen(ctx context.Context, handler queue.HandlerFunc) { for { select { case <-ctx.Done(): @@ -146,7 +146,7 @@ func (q *RedisQueue) listen(ctx context.Context, handle queue.HandleFunc) { messages = append(messages, toMessage(msg.Values)) } - err = handle(ctx, messages) + err = handler(ctx, messages) if err != nil { q.log.Warnf("failed to handle message: %v", err) continue diff --git a/proxy/gateway.go b/proxy/gateway.go index e8c66e69..f702bbaa 100644 --- a/proxy/gateway.go +++ b/proxy/gateway.go @@ -294,7 +294,7 @@ func (gw *Gateway) ingestEvent(ctx context.Context, async bool, event *entities. Time: time.Now(), WorkspaceID: event.WorkspaceId, } - return gw.queue.WriteMessage(ctx, &msg) + return gw.queue.Enqueue(ctx, &msg) } return gw.dispatch(ctx, []*entities.Event{event}) From bd5000c89a835e5742005f0be5ca6b72e7725929 Mon Sep 17 00:00:00 2001 From: Douglas-Lee Date: Wed, 16 Jul 2025 17:11:40 +0800 Subject: [PATCH 3/3] feat(queue): integrate kafka --- config/kafka.go | 6 ++ config/proxy.go | 4 +- go.mod | 6 ++ go.sum | 57 ++++++++++++ pkg/queue/kafka/kafka.go | 191 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 263 insertions(+), 1 deletion(-) create mode 100644 config/kafka.go create mode 100644 pkg/queue/kafka/kafka.go diff --git a/config/kafka.go b/config/kafka.go new file mode 100644 index 00000000..4d81de01 --- /dev/null +++ b/config/kafka.go @@ -0,0 +1,6 @@ +package config + +type KafkaConfig struct { + Topic string `yaml:"topic" json:"topic" default:"webhookx"` + Address []string `yaml:"address" json:"address"` +} diff --git a/config/proxy.go b/config/proxy.go index 82b9ad5c..55210b54 100644 --- a/config/proxy.go +++ b/config/proxy.go @@ -17,15 +17,17 @@ type QueueType string const ( QueueTypeOff QueueType = "off" QueueTypeRedis QueueType = "redis" + QueueTypeKafka QueueType = "kafka" ) type Queue struct { Type QueueType `yaml:"type" json:"type" default:"redis"` Redis RedisConfig `yaml:"redis" json:"redis"` + Kafka KafkaConfig `yaml:"kafka" json:"kafka"` } func (cfg Queue) Validate() error { - if !slices.Contains([]QueueType{QueueTypeRedis, QueueTypeOff}, cfg.Type) { + if !slices.Contains([]QueueType{QueueTypeRedis, QueueTypeOff, QueueTypeKafka}, cfg.Type) { return fmt.Errorf("unknown type: %s", cfg.Type) } if cfg.Type == QueueTypeRedis { diff --git a/go.mod b/go.mod index 13684967..d9286b09 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/redis/go-redis/v9 v9.11.0 github.com/rs/zerolog v1.34.0 github.com/satori/go.uuid v1.2.0 + github.com/segmentio/kafka-go v0.4.48 github.com/segmentio/ksuid v1.0.4 github.com/spf13/cobra v1.9.1 github.com/stretchr/testify v1.10.0 @@ -53,8 +54,13 @@ require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/klauspost/compress v1.15.11 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect + github.com/pierrec/lz4/v4 v4.1.16 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/propagators/aws v1.36.0 // indirect go.opentelemetry.io/contrib/propagators/b3 v1.36.0 // indirect diff --git a/go.sum b/go.sum index 7d2f8519..662c18b3 100644 --- a/go.sum +++ b/go.sum @@ -117,6 +117,9 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= +github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -150,6 +153,9 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.16 h1:kQPfno+wyx6C5572ABwV+Uo3pDFzQ7yhyGchSyRda0c= +github.com/pierrec/lz4/v4 v4.1.16/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -169,6 +175,8 @@ github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/segmentio/kafka-go v0.4.48 h1:9jyu9CWK4W5W+SroCe8EffbrRZVqAOkuaLd/ApID4Vs= +github.com/segmentio/kafka-go v0.4.48/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c= github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE= github.com/spf13/cobra v1.9.1 h1:CXSaggrXdbHK9CF+8ywj8Amf7PBRmPCOJugH954Nnlo= @@ -176,15 +184,25 @@ github.com/spf13/cobra v1.9.1/go.mod h1:nDyEzZ8ogv936Cinf6g1RU9MRY64Ir93oCnqb9wx github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM= github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8= github.com/tetratelabs/wazero v1.9.0 h1:IcZ56OuxrtaEz8UYNRHBrUa9bYeX9oVY93KspZZBf/I= github.com/tetratelabs/wazero v1.9.0/go.mod h1:TSbcXCfFP0L2FGkRPxHphadXPjo1T6W+CseNNY7EkjM= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 h1:sbiXRNDSWJOTobXh5HyQKjq6wUC5tNybqjIqDpAY4CU= @@ -233,23 +251,62 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8= golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.31.0 h1:0EedkvKDbh+qistFTd0Bcwe/YLh4vHwWEkiI0toFIBU= golang.org/x/tools v0.31.0/go.mod h1:naFTU+Cev749tSJRXJlna0T3WxKvb1kWEx15xA4SdmQ= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/api v0.0.0-20250519155744-55703ea1f237 h1:Kog3KlB4xevJlAcbbbzPfRG0+X9fdoGM+UBRKVz6Wr0= google.golang.org/genproto/googleapis/api v0.0.0-20250519155744-55703ea1f237/go.mod h1:ezi0AVyMKDWy5xAncvjLWH7UcLBB5n7y2fQ8MzjJcto= google.golang.org/genproto/googleapis/rpc v0.0.0-20250519155744-55703ea1f237 h1:cJfm9zPbe1e873mHJzmQ1nwVEeRDU/T1wXDK2kUSU34= diff --git a/pkg/queue/kafka/kafka.go b/pkg/queue/kafka/kafka.go new file mode 100644 index 00000000..1875974f --- /dev/null +++ b/pkg/queue/kafka/kafka.go @@ -0,0 +1,191 @@ +package kafka + +import ( + "context" + "fmt" + "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl" + "github.com/segmentio/kafka-go/sasl/scram" + "github.com/webhookx-io/webhookx/pkg/queue" + "go.uber.org/zap" + "time" +) + +type KafkaQueue struct { + opts Options + log *zap.SugaredLogger + w *kafka.Writer + readers []*kafka.Reader + c *kafka.Client +} + +type Options struct { + Topic string + Address []string + Mechanism string + Username string + Password string + + ConsumerNumbers int +} + +func (o Options) MechanismX() sasl.Mechanism { + if true { + return nil + } + mechanism, err := scram.Mechanism(scram.SHA512, "username", "password") + if err != nil { + panic(err) + } + return mechanism +} + +func NewKafkaQueue(opts Options, logger *zap.SugaredLogger) (queue.Queue, error) { + + //mechanism := opts.MechanismX() + + // Transports are responsible for managing connection pools and other resources, + // it's generally best to create a few of these and share them across your + // application. + //sharedTransport := &kafka.Transport{ + // SASL: mechanism, + //} + + //kafka.DefaultTransport + writer := &kafka.Writer{ + Addr: kafka.TCP(opts.Address...), + Topic: opts.Topic, + AllowAutoTopicCreation: true, + //Balancer: &kafka.LeastBytes{}, // TODO + //Transport: sharedTransport, + //RequiredAcks: kafka.RequireOne, + Async: true, + Compression: kafka.Snappy, + } + + readers := make([]*kafka.Reader, 0, opts.ConsumerNumbers) + for i := 0; i < opts.ConsumerNumbers; i++ { + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: opts.Address, + Topic: opts.Topic, + GroupID: "webhookx", + MaxBytes: 10e6, // 10MB + //Dialer: &kafka.Dialer{ + // Timeout: kafka.DefaultDialer.Timeout, + // DualStack: kafka.DefaultDialer.DualStack, + // SASLMechanism: mechanism, + //}, + MaxWait: 10 * time.Second, + //CommitInterval: + }) + readers = append(readers, reader) + } + + client := &kafka.Client{ + Addr: kafka.TCP(opts.Address...), + } + + q := &KafkaQueue{ + opts: opts, + log: logger, + w: writer, + readers: readers, + c: client, + } + + return q, nil +} + +func (q *KafkaQueue) Enqueue(ctx context.Context, message *queue.Message) error { + m := kafka.Message{ + Key: nil, // TODO + Value: message.Value, + Headers: []kafka.Header{ + {Key: "wid", Value: []byte(message.WorkspaceID)}, + }, + } + return q.w.WriteMessages(ctx, m) +} + +func (q *KafkaQueue) Size(ctx context.Context) (int64, error) { + return 0, nil +} + +func (q *KafkaQueue) Stats() map[string]interface{} { + return map[string]interface{}{} +} + +func (q *KafkaQueue) Close() error { + if err := q.w.Close(); err != nil { + return err + } + for i := range q.readers { + if err := q.readers[i].Close(); err != nil { + return err + } + + } + return nil +} + +func fetchMessages(ctx context.Context, r *kafka.Reader) ([]kafka.Message, error) { + list := make([]kafka.Message, 0) + for i := 0; i < 20; i++ { + m, err := r.FetchMessage(ctx) + if err != nil { + fmt.Println("failed to fetch message") + return nil, err + } + list = append(list, m) + } + return list, nil +} + +func (q *KafkaQueue) StartListen(ctx context.Context, handle queue.HandlerFunc) { + for i := 0; i < q.opts.ConsumerNumbers; i++ { + go q.listen(ctx, q.readers[i], handle) + } +} + +func toMessage(message kafka.Message) *queue.Message { + return &queue.Message{ + Value: message.Value, + WorkspaceID: string(message.Headers[0].Value), + } +} + +func (q *KafkaQueue) listen(ctx context.Context, reader *kafka.Reader, handle queue.HandlerFunc) { + go func() { + for { + select { + case <-ctx.Done(): + return + default: + xmessages, err := fetchMessages(ctx, reader) + if err != nil { + fmt.Println("failed to fetch message", err) + continue + } + + if len(xmessages) == 0 { + continue + } + + messages := make([]*queue.Message, 0, len(xmessages)) + for _, msg := range xmessages { + messages = append(messages, toMessage(msg)) + } + + err = handle(ctx, messages) + if err != nil { + q.log.Warnf("failed to handle message: %v", err) + continue + } + err = reader.CommitMessages(ctx, xmessages...) + if err != nil { + q.log.Warnf("failed to delete message: %v", err) + } + } + } + }() +}