From a2dd638eea7b73b4ba6fbb633fdaa692cf5fdaed Mon Sep 17 00:00:00 2001 From: Kevin Lin Date: Sat, 16 Aug 2025 12:03:26 -0400 Subject: [PATCH] Support subscriptions against cluster slave nodes --- osscluster.go | 27 ++++++-- osscluster_test.go | 163 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 186 insertions(+), 4 deletions(-) diff --git a/osscluster.go b/osscluster.go index 8d839a0a6..517b61f1a 100644 --- a/osscluster.go +++ b/osscluster.go @@ -1821,14 +1821,33 @@ func (c *ClusterClient) pubSub() *PubSub { } var err error + if len(channels) > 0 { slot := hashtag.Slot(channels[0]) - node, err = c.slotMasterNode(ctx, slot) + + // newConn in PubSub is only used for subscription connections, so it is safe to + // assume that a slave node can always be used when client options specify ReadOnly. + if c.opt.ReadOnly { + state, err := c.state.Get(ctx) + if err != nil { + return nil, err + } + + node, err = c.slotReadOnlyNode(state, slot) + if err != nil { + return nil, err + } + } else { + node, err = c.slotMasterNode(ctx, slot) + if err != nil { + return nil, err + } + } } else { node, err = c.nodes.Random() - } - if err != nil { - return nil, err + if err != nil { + return nil, err + } } cn, err := node.Client.newConn(context.TODO()) diff --git a/osscluster_test.go b/osscluster_test.go index 09c6d362c..3659ec654 100644 --- a/osscluster_test.go +++ b/osscluster_test.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" . "github.com/bsm/ginkgo/v2" @@ -644,6 +645,87 @@ var _ = Describe("ClusterClient", func() { }, 30*time.Second).ShouldNot(HaveOccurred()) }) + It("supports PubSub with ReadOnly option", func() { + opt = redisClusterOptions() + opt.ReadOnly = true + client = cluster.newClusterClient(ctx, opt) + + pubsub := client.Subscribe(ctx, "mychannel") + defer pubsub.Close() + + Eventually(func() error { + var masterPubsubChannels atomic.Int64 + var slavePubsubChannels atomic.Int64 + + err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error { + info := master.InfoMap(ctx, "stats") + if info.Err() != nil { + return info.Err() + } + + pc, err := strconv.Atoi(info.Item("Stats", "pubsub_channels")) + if err != nil { + return err + } + + masterPubsubChannels.Add(int64(pc)) + + return nil + }) + if err != nil { + return err + } + + err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error { + info := slave.InfoMap(ctx, "stats") + if info.Err() != nil { + return info.Err() + } + + pc, err := strconv.Atoi(info.Item("Stats", "pubsub_channels")) + if err != nil { + return err + } + + slavePubsubChannels.Add(int64(pc)) + + return nil + }) + if err != nil { + return err + } + + if c := masterPubsubChannels.Load(); c != int64(0) { + return fmt.Errorf("total master pubsub_channels is %d; expected 0", c) + } + + if c := slavePubsubChannels.Load(); c != int64(1) { + return fmt.Errorf("total slave pubsub_channels is %d; expected 1", c) + } + + return nil + }, 30*time.Second).ShouldNot(HaveOccurred()) + + Eventually(func() error { + _, err := client.Publish(ctx, "mychannel", "hello").Result() + if err != nil { + return err + } + + msg, err := pubsub.ReceiveTimeout(ctx, time.Second) + if err != nil { + return err + } + + _, ok := msg.(*redis.Message) + if !ok { + return fmt.Errorf("got %T, wanted *redis.Message", msg) + } + + return nil + }, 30*time.Second).ShouldNot(HaveOccurred()) + }) + It("supports sharded PubSub", func() { pubsub := client.SSubscribe(ctx, "mychannel") defer pubsub.Close() @@ -668,6 +750,87 @@ var _ = Describe("ClusterClient", func() { }, 30*time.Second).ShouldNot(HaveOccurred()) }) + It("supports sharded PubSub with ReadOnly option", func() { + opt = redisClusterOptions() + opt.ReadOnly = true + client = cluster.newClusterClient(ctx, opt) + + pubsub := client.SSubscribe(ctx, "mychannel") + defer pubsub.Close() + + Eventually(func() error { + var masterPubsubShardChannels atomic.Int64 + var slavePubsubShardChannels atomic.Int64 + + err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error { + info := master.InfoMap(ctx, "stats") + if info.Err() != nil { + return info.Err() + } + + pc, err := strconv.Atoi(info.Item("Stats", "pubsubshard_channels")) + if err != nil { + return err + } + + masterPubsubShardChannels.Add(int64(pc)) + + return nil + }) + if err != nil { + return err + } + + err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error { + info := slave.InfoMap(ctx, "stats") + if info.Err() != nil { + return info.Err() + } + + pc, err := strconv.Atoi(info.Item("Stats", "pubsubshard_channels")) + if err != nil { + return err + } + + slavePubsubShardChannels.Add(int64(pc)) + + return nil + }) + if err != nil { + return err + } + + if c := masterPubsubShardChannels.Load(); c != int64(0) { + return fmt.Errorf("total master pubsubshard_channels is %d; expected 0", c) + } + + if c := slavePubsubShardChannels.Load(); c != int64(1) { + return fmt.Errorf("total slave pubsubshard_channels is %d; expected 1", c) + } + + return nil + }, 30*time.Second).ShouldNot(HaveOccurred()) + + Eventually(func() error { + _, err := client.SPublish(ctx, "mychannel", "hello").Result() + if err != nil { + return err + } + + msg, err := pubsub.ReceiveTimeout(ctx, time.Second) + if err != nil { + return err + } + + _, ok := msg.(*redis.Message) + if !ok { + return fmt.Errorf("got %T, wanted *redis.Message", msg) + } + + return nil + }, 30*time.Second).ShouldNot(HaveOccurred()) + }) + It("supports PubSub.Ping without channels", func() { pubsub := client.Subscribe(ctx) defer pubsub.Close()