From 86dd1135d6aaa7f8ac9fb5b82e9680a1a92d3939 Mon Sep 17 00:00:00 2001 From: Yash Zod Date: Sun, 13 Jul 2025 15:37:53 +0530 Subject: [PATCH 1/8] added option for adding redis cluster client to initialise watcher --- README.md | 33 +++++++++++++++++++ options.go | 2 ++ watcher.go | 96 ++++++++++++++++++++++++++++++++++++++++++++---------- 3 files changed, 113 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index d54c3a9..738ee73 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,9 @@ func main() { IgnoreSelf: false, }) + // ====================================== + // Example 2: Redis Cluster (Using addrs) + // ====================================== // Or initialize the watcher in redis cluster. // w, _ := rediswatcher.NewWatcherWithCluster("localhost:6379,localhost:6379,localhost:6379", rediswatcher.WatcherOptions{ // ClusterOptions: redis.ClusterOptions{ @@ -55,6 +58,36 @@ func main() { // IgnoreSelf: false, // }) + + // ====================================== + // Example 3: Standalone Redis with existing Redis client + // ====================================== + // rdb := redis.NewClient(&redis.Options{ + // Addr: "localhost:6379", + // Password: "", + // DB: 0, + // }) + // here we can directly pass existing redis client as SubClient and/or PubClient + // w, _ := rediswatcher.NewWatcher("", rediswatcher.WatcherOptions{ + // SubClient: rdb, + // PubClient: rdb, + // Channel: "/casbin", + // }) + + // ====================================== + // Example 4: Redis Cluster with existing cluster clients + // ====================================== + // clusterClient := redis.NewClusterClient(&redis.ClusterOptions{ + // Addrs: []string{"localhost:6379", "localhost:6380", "localhost:6381"}, + // Password: "", + // }) + // here we can directly pass existing redis client as SubClusterClient and/or PubClusterClient + // w, _ := rediswatcher.NewWatcherWithCluster("", rediswatcher.WatcherOptions{ + // SubClusterClient: clusterClient, + // PubClusterClient: clusterClient, + // Channel: "/casbin", + // }) + // Initialize the enforcer. e, _ := casbin.NewEnforcer("examples/rbac_model.conf", "examples/rbac_policy.csv") diff --git a/options.go b/options.go index 9de2e64..0c8ca48 100644 --- a/options.go +++ b/options.go @@ -10,6 +10,8 @@ type WatcherOptions struct { ClusterOptions rds.ClusterOptions SubClient *rds.Client PubClient *rds.Client + SubClusterClient *rds.ClusterClient + PubClusterClient *rds.ClusterClient Channel string IgnoreSelf bool LocalID string diff --git a/watcher.go b/watcher.go index c858d5e..081a9c8 100644 --- a/watcher.go +++ b/watcher.go @@ -105,12 +105,25 @@ func (m *MSG) UnmarshalBinary(data []byte) error { return nil } -// NewWatcher creates a new Watcher to be used with a Casbin enforcer -// addr is a redis target string in the format "host:port" -// setters allows for inline WatcherOptions +// NewWatcher creates a new Watcher to be used with a Casbin enforcer. +// `addr` is a Redis target string in the format "host:port". +// If you already have Redis clients initialized, you can pass them using WatcherOptions: +// - Use SubClient and PubClient for standalone Redis // -// Example: -// w, err := rediswatcher.NewWatcher("127.0.0.1:6379",WatcherOptions{}, nil) +// If clients are provided, `addr` can be an empty string (""). +// If clients are not provided, a new Redis client will be created using `addr`. +// +// Example with address: +// +// w, err := rediswatcher.NewWatcher("127.0.0.1:6379", WatcherOptions{}) +// +// Example with custom clients: +// +// opts := WatcherOptions{ +// SubClient: myRedisClient, +// PubClient: myRedisClient, +// } +// w, err := rediswatcher.NewWatcher("", opts) func NewWatcher(addr string, option WatcherOptions) (persist.Watcher, error) { option.Options.Addr = addr initConfig(&option) @@ -137,27 +150,52 @@ func NewWatcher(addr string, option WatcherOptions) (persist.Watcher, error) { return w, nil } -// NewWatcherWithCluster creates a new Watcher to be used with a Casbin enforcer -// addrs is a redis-cluster target string in the format "host1:port1,host2:port2,host3:port3" +// NewWatcherWithCluster creates a new Watcher to be used with a Casbin enforcer. +// If SubClusterClient and PubClusterClient are already set in the WatcherOptions, +// the `addrs` string can be empty (""). // -// Example: -// w, err := rediswatcher.NewWatcherWithCluster("127.0.0.1:6379,127.0.0.1:6379,127.0.0.1:6379",WatcherOptions{}) +// addrs should be a comma-separated list of Redis cluster nodes in the format: +// "host1:port1,host2:port2,host3:port3" +// +// Example with address string: +// w, err := rediswatcher.NewWatcherWithCluster("127.0.0.1:6379,127.0.0.1:6380,127.0.0.1:6381", WatcherOptions{}) +// +// Example with pre-initialized clients: +// opts := WatcherOptions{ +// SubClusterClient: existingSubClient, +// PubClusterClient: existingPubClient, +// } +// w, err := rediswatcher.NewWatcherWithCluster("", opts) + func NewWatcherWithCluster(addrs string, option WatcherOptions) (persist.Watcher, error) { addrsStr := strings.Split(addrs, ",") option.ClusterOptions.Addrs = addrsStr initConfig(&option) + var watcherSubClient rds.UniversalClient + var watcherPubClient rds.UniversalClient - w := &Watcher{ - subClient: rds.NewClusterClient(&rds.ClusterOptions{ + watcherSubClient = option.SubClusterClient + watcherPubClient = option.PubClusterClient + + if option.SubClusterClient == nil { + watcherSubClient = rds.NewClusterClient(&rds.ClusterOptions{ Addrs: addrsStr, Password: option.ClusterOptions.Password, - }), - pubClient: rds.NewClusterClient(&rds.ClusterOptions{ + }) + } + + if option.PubClusterClient == nil { + watcherPubClient = rds.NewClusterClient(&rds.ClusterOptions{ Addrs: addrsStr, Password: option.ClusterOptions.Password, - }), - ctx: context.Background(), - close: make(chan struct{}), + }) + } + + w := &Watcher{ + subClient: watcherSubClient, + pubClient: watcherPubClient, + ctx: context.Background(), + close: make(chan struct{}), } err := w.initConfig(option, true) @@ -179,6 +217,16 @@ func NewWatcherWithCluster(addrs string, option WatcherOptions) (persist.Watcher return w, nil } +func (wo *WatcherOptions) validate_wo() error { + if wo.SubClient != nil && wo.SubClusterClient != nil { + return fmt.Errorf("only one of SubClient or SubClusterClient should be set") + } + if wo.PubClient != nil && wo.PubClusterClient != nil { + return fmt.Errorf("only one of SubClient or SubClusterClient should be set") + } + return nil +} + func (w *Watcher) initConfig(option WatcherOptions, cluster ...bool) error { var err error if option.OptionalUpdateCallback != nil { @@ -192,9 +240,17 @@ func (w *Watcher) initConfig(option WatcherOptions, cluster ...bool) error { return err } - if option.SubClient != nil { + err = option.validate_wo() + if err != nil { + return err + } + + if option.SubClusterClient != nil { + w.subClient = option.SubClusterClient + } else if option.SubClient != nil { w.subClient = option.SubClient } else { + // Neither provided, create new client if len(cluster) > 0 && cluster[0] { w.subClient = rds.NewClusterClient(&option.ClusterOptions) } else { @@ -202,15 +258,19 @@ func (w *Watcher) initConfig(option WatcherOptions, cluster ...bool) error { } } - if option.PubClient != nil { + if option.PubClusterClient != nil { + w.pubClient = option.PubClusterClient + } else if option.PubClient != nil { w.pubClient = option.PubClient } else { + // Neither provided, create new client if len(cluster) > 0 && cluster[0] { w.pubClient = rds.NewClusterClient(&option.ClusterOptions) } else { w.pubClient = rds.NewClient(&option.Options) } } + return nil } From f234f1e6438772281bd3d01214720004b0eabef1 Mon Sep 17 00:00:00 2001 From: yash_zod Date: Sun, 13 Jul 2025 16:20:36 +0530 Subject: [PATCH 2/8] made GO111MODULE on --- .github/workflows/ci.yml | 2 +- watcher_test.go | 105 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 104 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ef55684..e8d2be2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,7 +36,7 @@ jobs: - name: Install goveralls env: - GO111MODULE: off + GO111MODULE: on run: go get github.com/mattn/goveralls - name: Send coverage diff --git a/watcher_test.go b/watcher_test.go index 816ea13..b6980a2 100644 --- a/watcher_test.go +++ b/watcher_test.go @@ -1,13 +1,16 @@ package rediswatcher_test import ( + "context" "encoding/json" "fmt" - "github.com/google/uuid" "reflect" "testing" "time" + "github.com/google/uuid" + rds "github.com/redis/go-redis/v9" + "github.com/casbin/casbin/v2" "github.com/casbin/casbin/v2/persist" rediswatcher "github.com/casbin/redis-watcher/v2" @@ -50,7 +53,7 @@ func TestWatcher(t *testing.T) { }) _ = w.Update() w.Close() - time.Sleep(time.Millisecond * 500) + time.Sleep(time.Millisecond * 5000) } func TestWatcherWithIngnoreSelfFalse(t *testing.T) { @@ -298,3 +301,101 @@ func TestUpdateForUpdatePolicies(t *testing.T) { w2.Close() time.Sleep(time.Millisecond * 500) } + +func TestClusteredWatcherSync(t *testing.T) { + wo := rediswatcher.WatcherOptions{ + IgnoreSelf: true, // Ensure only remote updates are received + } + + // Initialize two clustered enforcers/watcher instances + e1, w1 := initWatcherWithOptions(t, wo, true) + e2, w2 := initWatcherWithOptions(t, wo, true) + + // Wait for pub/sub connection setup + time.Sleep(500 * time.Millisecond) + + // Add a policy to e1 and expect e2 to sync via the watcher + _, err := e1.AddPolicy("user1", "data1", "read") + if err != nil { + t.Fatalf("Failed to add policy on e1: %v", err) + } + + time.Sleep(500 * time.Millisecond) + + if !reflect.DeepEqual(e1.GetPolicy(), e2.GetPolicy()) { + t.Log("Method", "AddPolicy (Clustered)") + t.Log("e1 policy", e1.GetPolicy()) + t.Log("e2 policy", e2.GetPolicy()) + t.Error("Policy mismatch: Clustered enforcers did not sync") + } + + // Clean up + w1.Close() + w2.Close() +} + +func TestClusteredWatcherSyncWithPreconfiguredClients(t *testing.T) { + // Manually create Redis cluster clients + subClient := rds.NewClusterClient(&rds.ClusterOptions{ + Addrs: []string{"127.0.0.1:6379", "127.0.0.1:6379", "127.0.0.1:6379"}, + }) + pubClient := rds.NewClusterClient(&rds.ClusterOptions{ + Addrs: []string{"127.0.0.1:6379", "127.0.0.1:6379", "127.0.0.1:6379"}, + }) + + // Ping to ensure clients are connected + if err := subClient.Ping(context.Background()).Err(); err != nil { + t.Fatalf("Failed to ping subClient: %v", err) + } + if err := pubClient.Ping(context.Background()).Err(); err != nil { + t.Fatalf("Failed to ping pubClient: %v", err) + } + + // Prepare watcher options with the created clients + wo := rediswatcher.WatcherOptions{ + IgnoreSelf: true, + SubClusterClient: subClient, + PubClusterClient: pubClient, + } + + // Initialize the watcher using clients instead of address string + w, err := rediswatcher.NewWatcherWithCluster("", wo) + if err != nil { + t.Fatalf("Failed to initialize watcher with custom clients: %v", err) + } + + // Setup enforcer and bind to watcher + e, err := casbin.NewEnforcer("examples/rbac_model.conf", "examples/rbac_policy.csv") + if err != nil { + t.Fatalf("Failed to create enforcer: %v", err) + } + _ = e.SetWatcher(w) + + // Setup another enforcer with a regular client to test sync + e2, w2 := initWatcherWithOptions(t, wo, true) + + // Setup callback + t.Log("Waiting for watcher pub/sub sync...") + time.Sleep(500 * time.Millisecond) + + // Add a policy in e and expect e2 to reflect the change + _, err = e.AddPolicy("userXYZ", "dataXYZ", "read") + if err != nil { + t.Fatalf("Failed to add policy: %v", err) + } + + time.Sleep(500 * time.Millisecond) + + if !reflect.DeepEqual(e.GetPolicy(), e2.GetPolicy()) { + t.Log("Method", "AddPolicy (Custom Clients)") + t.Log("e policy", e.GetPolicy()) + t.Log("e2 policy", e2.GetPolicy()) + t.Error("Policy mismatch: Watcher with custom clients failed to sync") + } + + // Clean up + w.Close() + w2.Close() + _ = subClient.Close() + _ = pubClient.Close() +} From 5b7d2650dbf2327c17bbff1e11a0a9f080ab2ff3 Mon Sep 17 00:00:00 2001 From: yash_zod Date: Sun, 13 Jul 2025 16:25:29 +0530 Subject: [PATCH 3/8] reverted test cases --- watcher_test.go | 105 +----------------------------------------------- 1 file changed, 2 insertions(+), 103 deletions(-) diff --git a/watcher_test.go b/watcher_test.go index b6980a2..816ea13 100644 --- a/watcher_test.go +++ b/watcher_test.go @@ -1,16 +1,13 @@ package rediswatcher_test import ( - "context" "encoding/json" "fmt" + "github.com/google/uuid" "reflect" "testing" "time" - "github.com/google/uuid" - rds "github.com/redis/go-redis/v9" - "github.com/casbin/casbin/v2" "github.com/casbin/casbin/v2/persist" rediswatcher "github.com/casbin/redis-watcher/v2" @@ -53,7 +50,7 @@ func TestWatcher(t *testing.T) { }) _ = w.Update() w.Close() - time.Sleep(time.Millisecond * 5000) + time.Sleep(time.Millisecond * 500) } func TestWatcherWithIngnoreSelfFalse(t *testing.T) { @@ -301,101 +298,3 @@ func TestUpdateForUpdatePolicies(t *testing.T) { w2.Close() time.Sleep(time.Millisecond * 500) } - -func TestClusteredWatcherSync(t *testing.T) { - wo := rediswatcher.WatcherOptions{ - IgnoreSelf: true, // Ensure only remote updates are received - } - - // Initialize two clustered enforcers/watcher instances - e1, w1 := initWatcherWithOptions(t, wo, true) - e2, w2 := initWatcherWithOptions(t, wo, true) - - // Wait for pub/sub connection setup - time.Sleep(500 * time.Millisecond) - - // Add a policy to e1 and expect e2 to sync via the watcher - _, err := e1.AddPolicy("user1", "data1", "read") - if err != nil { - t.Fatalf("Failed to add policy on e1: %v", err) - } - - time.Sleep(500 * time.Millisecond) - - if !reflect.DeepEqual(e1.GetPolicy(), e2.GetPolicy()) { - t.Log("Method", "AddPolicy (Clustered)") - t.Log("e1 policy", e1.GetPolicy()) - t.Log("e2 policy", e2.GetPolicy()) - t.Error("Policy mismatch: Clustered enforcers did not sync") - } - - // Clean up - w1.Close() - w2.Close() -} - -func TestClusteredWatcherSyncWithPreconfiguredClients(t *testing.T) { - // Manually create Redis cluster clients - subClient := rds.NewClusterClient(&rds.ClusterOptions{ - Addrs: []string{"127.0.0.1:6379", "127.0.0.1:6379", "127.0.0.1:6379"}, - }) - pubClient := rds.NewClusterClient(&rds.ClusterOptions{ - Addrs: []string{"127.0.0.1:6379", "127.0.0.1:6379", "127.0.0.1:6379"}, - }) - - // Ping to ensure clients are connected - if err := subClient.Ping(context.Background()).Err(); err != nil { - t.Fatalf("Failed to ping subClient: %v", err) - } - if err := pubClient.Ping(context.Background()).Err(); err != nil { - t.Fatalf("Failed to ping pubClient: %v", err) - } - - // Prepare watcher options with the created clients - wo := rediswatcher.WatcherOptions{ - IgnoreSelf: true, - SubClusterClient: subClient, - PubClusterClient: pubClient, - } - - // Initialize the watcher using clients instead of address string - w, err := rediswatcher.NewWatcherWithCluster("", wo) - if err != nil { - t.Fatalf("Failed to initialize watcher with custom clients: %v", err) - } - - // Setup enforcer and bind to watcher - e, err := casbin.NewEnforcer("examples/rbac_model.conf", "examples/rbac_policy.csv") - if err != nil { - t.Fatalf("Failed to create enforcer: %v", err) - } - _ = e.SetWatcher(w) - - // Setup another enforcer with a regular client to test sync - e2, w2 := initWatcherWithOptions(t, wo, true) - - // Setup callback - t.Log("Waiting for watcher pub/sub sync...") - time.Sleep(500 * time.Millisecond) - - // Add a policy in e and expect e2 to reflect the change - _, err = e.AddPolicy("userXYZ", "dataXYZ", "read") - if err != nil { - t.Fatalf("Failed to add policy: %v", err) - } - - time.Sleep(500 * time.Millisecond) - - if !reflect.DeepEqual(e.GetPolicy(), e2.GetPolicy()) { - t.Log("Method", "AddPolicy (Custom Clients)") - t.Log("e policy", e.GetPolicy()) - t.Log("e2 policy", e2.GetPolicy()) - t.Error("Policy mismatch: Watcher with custom clients failed to sync") - } - - // Clean up - w.Close() - w2.Close() - _ = subClient.Close() - _ = pubClient.Close() -} From 4c3c44c831efe52d2baa4474b004252c8e6196c6 Mon Sep 17 00:00:00 2001 From: yash_zod Date: Mon, 14 Jul 2025 01:32:42 +0530 Subject: [PATCH 4/8] switch to go install for goveralls setup --- .github/workflows/ci.yml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e8d2be2..336705b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,11 +33,9 @@ jobs: - name: Test run: go test -covermode atomic -coverprofile=covprofile ./... - + - name: Install goveralls - env: - GO111MODULE: on - run: go get github.com/mattn/goveralls + run: go install github.com/mattn/goveralls@latest - name: Send coverage env: From a9ec8907fdcaa1837623207454c497107d38df5e Mon Sep 17 00:00:00 2001 From: yash_zod Date: Mon, 14 Jul 2025 01:42:12 +0530 Subject: [PATCH 5/8] added test cases --- watcher.go | 97 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/watcher.go b/watcher.go index 081a9c8..8a4eae4 100644 --- a/watcher.go +++ b/watcher.go @@ -543,3 +543,100 @@ func (w *Watcher) Close() { close(w.close) w.pubClient.Publish(w.ctx, w.options.Channel, "Close") } + +func TestClusteredWatcherSync(t *testing.T) { + wo := rediswatcher.WatcherOptions{ + IgnoreSelf: true, // Ensure only remote updates are received + } + + // Initialize two clustered enforcers/watcher instances + e1, w1 := initWatcherWithOptions(t, wo, true) + e2, w2 := initWatcherWithOptions(t, wo, true) + + // Wait for pub/sub connection setup + time.Sleep(500 * time.Millisecond) + + // Add a policy to e1 and expect e2 to sync via the watcher + _, err := e1.AddPolicy("user1", "data1", "read") + if err != nil { + t.Fatalf("Failed to add policy on e1: %v", err) + } + + time.Sleep(500 * time.Millisecond) + + if !reflect.DeepEqual(e1.GetPolicy(), e2.GetPolicy()) { + t.Log("Method", "AddPolicy (Clustered)") + t.Log("e1 policy", e1.GetPolicy()) + t.Log("e2 policy", e2.GetPolicy()) + t.Error("Policy mismatch: Clustered enforcers did not sync") + } + + // Clean up + w1.Close() + w2.Close() +} + +func TestClusteredWatcherSyncWithPreconfiguredClients(t *testing.T) { + // Manually create Redis cluster clients + subClient := rds.NewClusterClient(&rds.ClusterOptions{ + Addrs: []string{"127.0.0.1:6379", "127.0.0.1:6379", "127.0.0.1:6379"}, + }) + pubClient := rds.NewClusterClient(&rds.ClusterOptions{ + Addrs: []string{"127.0.0.1:6379", "127.0.0.1:6379", "127.0.0.1:6379"}, + }) + + // Ping to ensure clients are connected + if err := subClient.Ping(context.Background()).Err(); err != nil { + t.Fatalf("Failed to ping subClient: %v", err) + } + if err := pubClient.Ping(context.Background()).Err(); err != nil { + t.Fatalf("Failed to ping pubClient: %v", err) + } + + // Prepare watcher options with the created clients + wo := rediswatcher.WatcherOptions{ + IgnoreSelf: true, + SubClusterClient: subClient, + PubClusterClient: pubClient, + } + + // Initialize the watcher using clients instead of address string + w, err := rediswatcher.NewWatcherWithCluster("", wo) + if err != nil { + t.Fatalf("Failed to initialize watcher with custom clients: %v", err) + } + + // Setup enforcer and bind to watcher + e, err := casbin.NewEnforcer("examples/rbac_model.conf", "examples/rbac_policy.csv") + if err != nil { + t.Fatalf("Failed to create enforcer: %v", err) + } + _ = e.SetWatcher(w) + + // Setup another enforcer with a regular client to test sync + e2, w2 := initWatcherWithOptions(t, wo, true) + + // Setup callback + t.Log("Waiting for watcher pub/sub sync...") + time.Sleep(500 * time.Millisecond) + + // Add a policy in e and expect e2 to reflect the change + _, err = e.AddPolicy("userXYZ", "dataXYZ", "read") + if err != nil { + t.Fatalf("Failed to add policy: %v", err) + } + + time.Sleep(500 * time.Millisecond) + + if !reflect.DeepEqual(e.GetPolicy(), e2.GetPolicy()) { + t.Log("Method", "AddPolicy (Custom Clients)") + t.Log("e policy", e.GetPolicy()) + t.Log("e2 policy", e2.GetPolicy()) + t.Error("Policy mismatch: Watcher with custom clients failed to sync") + } + + // Clean up + w.Close() + w2.Close() + _ = subClient.Close() + _ = pubClient.Close() \ No newline at end of file From 0a017e1087eeeed123bbe20e2c09950ad40a1f1d Mon Sep 17 00:00:00 2001 From: yash_zod Date: Mon, 14 Jul 2025 01:47:25 +0530 Subject: [PATCH 6/8] updated test cases --- watcher.go | 97 --------------------------------------------- watcher_test.go | 103 +++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 102 insertions(+), 98 deletions(-) diff --git a/watcher.go b/watcher.go index 8a4eae4..081a9c8 100644 --- a/watcher.go +++ b/watcher.go @@ -543,100 +543,3 @@ func (w *Watcher) Close() { close(w.close) w.pubClient.Publish(w.ctx, w.options.Channel, "Close") } - -func TestClusteredWatcherSync(t *testing.T) { - wo := rediswatcher.WatcherOptions{ - IgnoreSelf: true, // Ensure only remote updates are received - } - - // Initialize two clustered enforcers/watcher instances - e1, w1 := initWatcherWithOptions(t, wo, true) - e2, w2 := initWatcherWithOptions(t, wo, true) - - // Wait for pub/sub connection setup - time.Sleep(500 * time.Millisecond) - - // Add a policy to e1 and expect e2 to sync via the watcher - _, err := e1.AddPolicy("user1", "data1", "read") - if err != nil { - t.Fatalf("Failed to add policy on e1: %v", err) - } - - time.Sleep(500 * time.Millisecond) - - if !reflect.DeepEqual(e1.GetPolicy(), e2.GetPolicy()) { - t.Log("Method", "AddPolicy (Clustered)") - t.Log("e1 policy", e1.GetPolicy()) - t.Log("e2 policy", e2.GetPolicy()) - t.Error("Policy mismatch: Clustered enforcers did not sync") - } - - // Clean up - w1.Close() - w2.Close() -} - -func TestClusteredWatcherSyncWithPreconfiguredClients(t *testing.T) { - // Manually create Redis cluster clients - subClient := rds.NewClusterClient(&rds.ClusterOptions{ - Addrs: []string{"127.0.0.1:6379", "127.0.0.1:6379", "127.0.0.1:6379"}, - }) - pubClient := rds.NewClusterClient(&rds.ClusterOptions{ - Addrs: []string{"127.0.0.1:6379", "127.0.0.1:6379", "127.0.0.1:6379"}, - }) - - // Ping to ensure clients are connected - if err := subClient.Ping(context.Background()).Err(); err != nil { - t.Fatalf("Failed to ping subClient: %v", err) - } - if err := pubClient.Ping(context.Background()).Err(); err != nil { - t.Fatalf("Failed to ping pubClient: %v", err) - } - - // Prepare watcher options with the created clients - wo := rediswatcher.WatcherOptions{ - IgnoreSelf: true, - SubClusterClient: subClient, - PubClusterClient: pubClient, - } - - // Initialize the watcher using clients instead of address string - w, err := rediswatcher.NewWatcherWithCluster("", wo) - if err != nil { - t.Fatalf("Failed to initialize watcher with custom clients: %v", err) - } - - // Setup enforcer and bind to watcher - e, err := casbin.NewEnforcer("examples/rbac_model.conf", "examples/rbac_policy.csv") - if err != nil { - t.Fatalf("Failed to create enforcer: %v", err) - } - _ = e.SetWatcher(w) - - // Setup another enforcer with a regular client to test sync - e2, w2 := initWatcherWithOptions(t, wo, true) - - // Setup callback - t.Log("Waiting for watcher pub/sub sync...") - time.Sleep(500 * time.Millisecond) - - // Add a policy in e and expect e2 to reflect the change - _, err = e.AddPolicy("userXYZ", "dataXYZ", "read") - if err != nil { - t.Fatalf("Failed to add policy: %v", err) - } - - time.Sleep(500 * time.Millisecond) - - if !reflect.DeepEqual(e.GetPolicy(), e2.GetPolicy()) { - t.Log("Method", "AddPolicy (Custom Clients)") - t.Log("e policy", e.GetPolicy()) - t.Log("e2 policy", e2.GetPolicy()) - t.Error("Policy mismatch: Watcher with custom clients failed to sync") - } - - // Clean up - w.Close() - w2.Close() - _ = subClient.Close() - _ = pubClient.Close() \ No newline at end of file diff --git a/watcher_test.go b/watcher_test.go index 816ea13..6196783 100644 --- a/watcher_test.go +++ b/watcher_test.go @@ -1,13 +1,16 @@ package rediswatcher_test import ( + "context" "encoding/json" "fmt" - "github.com/google/uuid" "reflect" "testing" "time" + "github.com/google/uuid" + rds "github.com/redis/go-redis/v9" + "github.com/casbin/casbin/v2" "github.com/casbin/casbin/v2/persist" rediswatcher "github.com/casbin/redis-watcher/v2" @@ -298,3 +301,101 @@ func TestUpdateForUpdatePolicies(t *testing.T) { w2.Close() time.Sleep(time.Millisecond * 500) } + +func TestClusteredWatcherSync(t *testing.T) { + wo := rediswatcher.WatcherOptions{ + IgnoreSelf: true, // Ensure only remote updates are received + } + + // Initialize two clustered enforcers/watcher instances + e1, w1 := initWatcherWithOptions(t, wo, true) + e2, w2 := initWatcherWithOptions(t, wo, true) + + // Wait for pub/sub connection setup + time.Sleep(500 * time.Millisecond) + + // Add a policy to e1 and expect e2 to sync via the watcher + _, err := e1.AddPolicy("user1", "data1", "read") + if err != nil { + t.Fatalf("Failed to add policy on e1: %v", err) + } + + time.Sleep(500 * time.Millisecond) + + if !reflect.DeepEqual(e1.GetPolicy(), e2.GetPolicy()) { + t.Log("Method", "AddPolicy (Clustered)") + t.Log("e1 policy", e1.GetPolicy()) + t.Log("e2 policy", e2.GetPolicy()) + t.Error("Policy mismatch: Clustered enforcers did not sync") + } + + // Clean up + w1.Close() + w2.Close() +} + +func TestClusteredWatcherSyncWithPreconfiguredClients(t *testing.T) { + // Manually create Redis cluster clients + subClient := rds.NewClusterClient(&rds.ClusterOptions{ + Addrs: []string{"127.0.0.1:6379", "127.0.0.1:6379", "127.0.0.1:6379"}, + }) + pubClient := rds.NewClusterClient(&rds.ClusterOptions{ + Addrs: []string{"127.0.0.1:6379", "127.0.0.1:6379", "127.0.0.1:6379"}, + }) + + // Ping to ensure clients are connected + if err := subClient.Ping(context.Background()).Err(); err != nil { + t.Fatalf("Failed to ping subClient: %v", err) + } + if err := pubClient.Ping(context.Background()).Err(); err != nil { + t.Fatalf("Failed to ping pubClient: %v", err) + } + + // Prepare watcher options with the created clients + wo := rediswatcher.WatcherOptions{ + IgnoreSelf: true, + SubClusterClient: subClient, + PubClusterClient: pubClient, + } + + // Initialize the watcher using clients instead of address string + w, err := rediswatcher.NewWatcherWithCluster("", wo) + if err != nil { + t.Fatalf("Failed to initialize watcher with custom clients: %v", err) + } + + // Setup enforcer and bind to watcher + e, err := casbin.NewEnforcer("examples/rbac_model.conf", "examples/rbac_policy.csv") + if err != nil { + t.Fatalf("Failed to create enforcer: %v", err) + } + _ = e.SetWatcher(w) + + // Setup another enforcer with a regular client to test sync + e2, w2 := initWatcherWithOptions(t, wo, true) + + // Setup callback + t.Log("Waiting for watcher pub/sub sync...") + time.Sleep(500 * time.Millisecond) + + // Add a policy in e and expect e2 to reflect the change + _, err = e.AddPolicy("userXYZ", "dataXYZ", "read") + if err != nil { + t.Fatalf("Failed to add policy: %v", err) + } + + time.Sleep(500 * time.Millisecond) + + if !reflect.DeepEqual(e.GetPolicy(), e2.GetPolicy()) { + t.Log("Method", "AddPolicy (Custom Clients)") + t.Log("e policy", e.GetPolicy()) + t.Log("e2 policy", e2.GetPolicy()) + t.Error("Policy mismatch: Watcher with custom clients failed to sync") + } + + // Clean up + w.Close() + w2.Close() + _ = subClient.Close() + _ = pubClient.Close() +} From 01f77395849dbb210eaf0c40e64b54b50fac85b0 Mon Sep 17 00:00:00 2001 From: yash_zod Date: Mon, 14 Jul 2025 01:49:14 +0530 Subject: [PATCH 7/8] removed test case --- watcher_test.go | 68 ------------------------------------------------- 1 file changed, 68 deletions(-) diff --git a/watcher_test.go b/watcher_test.go index 6196783..41c3df8 100644 --- a/watcher_test.go +++ b/watcher_test.go @@ -1,7 +1,6 @@ package rediswatcher_test import ( - "context" "encoding/json" "fmt" "reflect" @@ -9,7 +8,6 @@ import ( "time" "github.com/google/uuid" - rds "github.com/redis/go-redis/v9" "github.com/casbin/casbin/v2" "github.com/casbin/casbin/v2/persist" @@ -333,69 +331,3 @@ func TestClusteredWatcherSync(t *testing.T) { w1.Close() w2.Close() } - -func TestClusteredWatcherSyncWithPreconfiguredClients(t *testing.T) { - // Manually create Redis cluster clients - subClient := rds.NewClusterClient(&rds.ClusterOptions{ - Addrs: []string{"127.0.0.1:6379", "127.0.0.1:6379", "127.0.0.1:6379"}, - }) - pubClient := rds.NewClusterClient(&rds.ClusterOptions{ - Addrs: []string{"127.0.0.1:6379", "127.0.0.1:6379", "127.0.0.1:6379"}, - }) - - // Ping to ensure clients are connected - if err := subClient.Ping(context.Background()).Err(); err != nil { - t.Fatalf("Failed to ping subClient: %v", err) - } - if err := pubClient.Ping(context.Background()).Err(); err != nil { - t.Fatalf("Failed to ping pubClient: %v", err) - } - - // Prepare watcher options with the created clients - wo := rediswatcher.WatcherOptions{ - IgnoreSelf: true, - SubClusterClient: subClient, - PubClusterClient: pubClient, - } - - // Initialize the watcher using clients instead of address string - w, err := rediswatcher.NewWatcherWithCluster("", wo) - if err != nil { - t.Fatalf("Failed to initialize watcher with custom clients: %v", err) - } - - // Setup enforcer and bind to watcher - e, err := casbin.NewEnforcer("examples/rbac_model.conf", "examples/rbac_policy.csv") - if err != nil { - t.Fatalf("Failed to create enforcer: %v", err) - } - _ = e.SetWatcher(w) - - // Setup another enforcer with a regular client to test sync - e2, w2 := initWatcherWithOptions(t, wo, true) - - // Setup callback - t.Log("Waiting for watcher pub/sub sync...") - time.Sleep(500 * time.Millisecond) - - // Add a policy in e and expect e2 to reflect the change - _, err = e.AddPolicy("userXYZ", "dataXYZ", "read") - if err != nil { - t.Fatalf("Failed to add policy: %v", err) - } - - time.Sleep(500 * time.Millisecond) - - if !reflect.DeepEqual(e.GetPolicy(), e2.GetPolicy()) { - t.Log("Method", "AddPolicy (Custom Clients)") - t.Log("e policy", e.GetPolicy()) - t.Log("e2 policy", e2.GetPolicy()) - t.Error("Policy mismatch: Watcher with custom clients failed to sync") - } - - // Clean up - w.Close() - w2.Close() - _ = subClient.Close() - _ = pubClient.Close() -} From 77a9fa190adbd8864c203bdfa61c02fed8ab4156 Mon Sep 17 00:00:00 2001 From: yash_zod Date: Mon, 14 Jul 2025 01:50:26 +0530 Subject: [PATCH 8/8] removed test case --- watcher_test.go | 32 -------------------------------- 1 file changed, 32 deletions(-) diff --git a/watcher_test.go b/watcher_test.go index 41c3df8..f7f8425 100644 --- a/watcher_test.go +++ b/watcher_test.go @@ -299,35 +299,3 @@ func TestUpdateForUpdatePolicies(t *testing.T) { w2.Close() time.Sleep(time.Millisecond * 500) } - -func TestClusteredWatcherSync(t *testing.T) { - wo := rediswatcher.WatcherOptions{ - IgnoreSelf: true, // Ensure only remote updates are received - } - - // Initialize two clustered enforcers/watcher instances - e1, w1 := initWatcherWithOptions(t, wo, true) - e2, w2 := initWatcherWithOptions(t, wo, true) - - // Wait for pub/sub connection setup - time.Sleep(500 * time.Millisecond) - - // Add a policy to e1 and expect e2 to sync via the watcher - _, err := e1.AddPolicy("user1", "data1", "read") - if err != nil { - t.Fatalf("Failed to add policy on e1: %v", err) - } - - time.Sleep(500 * time.Millisecond) - - if !reflect.DeepEqual(e1.GetPolicy(), e2.GetPolicy()) { - t.Log("Method", "AddPolicy (Clustered)") - t.Log("e1 policy", e1.GetPolicy()) - t.Log("e2 policy", e2.GetPolicy()) - t.Error("Policy mismatch: Clustered enforcers did not sync") - } - - // Clean up - w1.Close() - w2.Close() -}