From 60d4305bab8db99c01914408b3775d00dce990bc Mon Sep 17 00:00:00 2001 From: jkoberg Date: Tue, 16 Jul 2024 12:19:48 +0200 Subject: [PATCH 1/2] fix(natsjsregistry): spread load evenly among instances Signed-off-by: jkoberg --- changelog/unreleased/fix-natsjskv-registry.md | 5 ++ ocis-pkg/natsjsregistry/registry.go | 82 ++++++++++++------- ocis-pkg/natsjsregistry/watcher.go | 74 +++++++++++++++++ 3 files changed, 130 insertions(+), 31 deletions(-) create mode 100644 changelog/unreleased/fix-natsjskv-registry.md create mode 100644 ocis-pkg/natsjsregistry/watcher.go diff --git a/changelog/unreleased/fix-natsjskv-registry.md b/changelog/unreleased/fix-natsjskv-registry.md new file mode 100644 index 00000000000..934f8f3b0e9 --- /dev/null +++ b/changelog/unreleased/fix-natsjskv-registry.md @@ -0,0 +1,5 @@ +Fix: Repair nats-js-kv registry + +The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it. + +https://github.com/owncloud/ocis/pull/9618 diff --git a/ocis-pkg/natsjsregistry/registry.go b/ocis-pkg/natsjsregistry/registry.go index 6e216fd9dbe..5a45fe52bab 100644 --- a/ocis-pkg/natsjsregistry/registry.go +++ b/ocis-pkg/natsjsregistry/registry.go @@ -12,6 +12,7 @@ import ( "time" natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv" + "github.com/google/uuid" "github.com/nats-io/nats.go" "go-micro.dev/v4/registry" "go-micro.dev/v4/store" @@ -23,6 +24,8 @@ var ( _registryAddressEnv = "MICRO_REGISTRY_ADDRESS" _registryUsernameEnv = "MICRO_REGISTRY_AUTH_USERNAME" _registryPasswordEnv = "MICRO_REGISTRY_AUTH_PASSWORD" + + _serviceDelimiter = "/" ) func init() { @@ -80,76 +83,93 @@ func (n *storeregistry) Register(s *registry.Service, _ ...registry.RegisterOpti if s == nil { return errors.New("wont store nil service") } + + unique := uuid.New().String() + if s.Metadata == nil { + s.Metadata = make(map[string]string) + } + s.Metadata["uuid"] = unique + b, err := json.Marshal(s) if err != nil { return err } return n.store.Write(&store.Record{ - Key: s.Name, + Key: s.Name + _serviceDelimiter + unique, Value: b, Expiry: n.expiry, }) } -// Deregister removes a service from the registry +// Deregister removes a service from the registry. func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error { n.lock.RLock() defer n.lock.RUnlock() - return n.store.Delete(s.Name) + var unique string + if s.Metadata != nil { + unique = s.Metadata["uuid"] + } + + return n.store.Delete(s.Name + _serviceDelimiter + unique) } // GetService gets a specific service from the registry func (n *storeregistry) GetService(s string, _ ...registry.GetOption) ([]*registry.Service, error) { - n.lock.RLock() - defer n.lock.RUnlock() - - recs, err := n.store.Read(s) - if err != nil { - return nil, err - } - svcs := make([]*registry.Service, 0, len(recs)) - for _, rec := range recs { - var s registry.Service - if err := json.Unmarshal(rec.Value, &s); err != nil { - return nil, err - } - svcs = append(svcs, &s) - } - return svcs, nil + // avoid listing e.g. `webfinger` when requesting `web` by adding the delimiter to the service name + return n.listServices(store.ListPrefix(s + _serviceDelimiter)) } // ListServices lists all registered services func (n *storeregistry) ListServices(...registry.ListOption) ([]*registry.Service, error) { + return n.listServices() +} + +// Watch allowes following the changes in the registry if it would be implemented +func (n *storeregistry) Watch(...registry.WatchOption) (registry.Watcher, error) { + return NewWatcher(n) +} + +// String returns the name of the registry +func (n *storeregistry) String() string { + return n.typ +} + +func (n *storeregistry) listServices(opts ...store.ListOption) ([]*registry.Service, error) { n.lock.RLock() defer n.lock.RUnlock() - keys, err := n.store.List() + keys, err := n.store.List(opts...) if err != nil { return nil, err } - var svcs []*registry.Service + svcs := make([]*registry.Service, 0, len(keys)) for _, k := range keys { - s, err := n.GetService(k) + s, err := n.getService(k) if err != nil { // TODO: continue ? return nil, err } - svcs = append(svcs, s...) + svcs = append(svcs, s) } return svcs, nil } -// Watch allowes following the changes in the registry if it would be implemented -func (n *storeregistry) Watch(...registry.WatchOption) (registry.Watcher, error) { - return nil, errors.New("watcher not implemented") -} - -// String returns the name of the registry -func (n *storeregistry) String() string { - return n.typ +func (n *storeregistry) getService(s string) (*registry.Service, error) { + recs, err := n.store.Read(s) + if err != nil { + return nil, err + } + if len(recs) == 0 { + return nil, registry.ErrNotFound + } + var svc registry.Service + if err := json.Unmarshal(recs[0].Value, &svc); err != nil { + return nil, err + } + return &svc, nil } func (n *storeregistry) storeOptions(opts registry.Options) []store.Option { diff --git a/ocis-pkg/natsjsregistry/watcher.go b/ocis-pkg/natsjsregistry/watcher.go new file mode 100644 index 00000000000..83d094bca46 --- /dev/null +++ b/ocis-pkg/natsjsregistry/watcher.go @@ -0,0 +1,74 @@ +package natsjsregistry + +import ( + "errors" + + "github.com/nats-io/nats.go" + "go-micro.dev/v4/registry" +) + +// NatsWatcher is the watcher of the nats interface +type NatsWatcher interface { + Watch(bucket string) (nats.KeyWatcher, error) +} + +// Watcher is used to keep track of changes in the registry +type Watcher struct { + watch nats.KeyWatcher + updates <-chan nats.KeyValueEntry + reg *storeregistry +} + +// NewWatcher returns a new watcher +func NewWatcher(s *storeregistry) (*Watcher, error) { + w, ok := s.store.(NatsWatcher) + if !ok { + return nil, errors.New("store does not implement watcher interface") + } + + watcher, err := w.Watch("service-registry") + if err != nil { + return nil, err + } + + return &Watcher{ + watch: watcher, + updates: watcher.Updates(), + reg: s, + }, nil +} + +// Next returns the next result. It is a blocking call +func (w *Watcher) Next() (*registry.Result, error) { + kve := <-w.updates + if kve == nil { + return nil, errors.New("watcher stopped") + } + + service, err := w.reg.getService(kve.Key()) + if err != nil { + return nil, err + } + + var action string + switch kve.Operation() { + default: + action = "create" + case nats.KeyValuePut: + action = "create" + case nats.KeyValueDelete: + action = "delete" + case nats.KeyValuePurge: + action = "delete" + } + + return ®istry.Result{ + Service: service, + Action: action, + }, nil +} + +// Stop stops the watcher +func (w *Watcher) Stop() { + _ = w.watch.Stop() +} From 6d7d18adceaa73bc3532ce7fb4c998f717645593 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Tue, 16 Jul 2024 12:23:28 +0200 Subject: [PATCH 2/2] feat(deps): bump plugins/store/nats-js-kv Signed-off-by: jkoberg --- changelog/unreleased/fix-natsjskv-registry.md | 4 ++-- go.mod | 2 ++ go.sum | 4 ++-- ocis-pkg/natsjsregistry/watcher.go | 4 ++-- .../go-micro/plugins/v4/store/nats-js-kv/nats.go | 16 +++++++++++++++- vendor/modules.txt | 3 ++- 6 files changed, 25 insertions(+), 8 deletions(-) diff --git a/changelog/unreleased/fix-natsjskv-registry.md b/changelog/unreleased/fix-natsjskv-registry.md index 934f8f3b0e9..8e93a285688 100644 --- a/changelog/unreleased/fix-natsjskv-registry.md +++ b/changelog/unreleased/fix-natsjskv-registry.md @@ -1,5 +1,5 @@ -Fix: Repair nats-js-kv registry +Bugfix: Repair nats-js-kv registry The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it. -https://github.com/owncloud/ocis/pull/9618 +https://github.com/owncloud/ocis/pull/9620 diff --git a/go.mod b/go.mod index d9e3f316934..df2a5637100 100644 --- a/go.mod +++ b/go.mod @@ -365,6 +365,8 @@ replace github.com/egirna/icap-client => github.com/fschade/icap-client v0.0.0-2 replace github.com/unrolled/secure => github.com/DeepDiver1975/secure v0.0.0-20240611112133-abc838fb797c +replace github.com/go-micro/plugins/v4/store/nats-js-kv => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf + // exclude the v2 line of go-sqlite3 which was released accidentally and prevents pulling in newer versions of go-sqlite3 // see https://github.com/mattn/go-sqlite3/issues/965 for more details exclude github.com/mattn/go-sqlite3 v2.0.3+incompatible diff --git a/go.sum b/go.sum index e78eee350db..761c3cea13d 100644 --- a/go.sum +++ b/go.sum @@ -1218,8 +1218,6 @@ github.com/go-micro/plugins/v4/server/http v1.2.2 h1:UK2/09AU0zV3wHELuR72TZzVU2v github.com/go-micro/plugins/v4/server/http v1.2.2/go.mod h1:YuAjaSPxcn3LI8j2FUsqx0Rxunrj4YwDV41Ax76rLl0= github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20231129143103-d72facc652f0 h1:Qa1EBQ9UyCGecFAJQovl/MHGnvbcvDaM3qUoAG5Lnvk= github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20231129143103-d72facc652f0/go.mod h1:aCRl8JQmqIaonOl88nFPY/BOQnHPVHY9ngStzLkXnYk= -github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e h1:hwH0qXT0J3UFYRi0UD+e3ItL92oW+jdPFA+3o/j6ASg= -github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e/go.mod h1:Goi4eJ9SrKkxE6NsAVqBVNxfQFbwb7UbyII6743ldgM= github.com/go-micro/plugins/v4/store/redis v1.2.1 h1:d9kwr9bSpoK9vkHkqcv+isQUbgBCHpfwCV57pcAPS6c= github.com/go-micro/plugins/v4/store/redis v1.2.1/go.mod h1:MbCG0YiyPqETTtm7uHFmxQNCaW1o9hBoYtFwhbVjLUg= github.com/go-micro/plugins/v4/transport/grpc v1.1.0 h1:mXfDYfFQLnVDzjGY3o84oe4prfux9h8txsnA19dKsj8= @@ -1613,6 +1611,8 @@ github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02 github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf h1:X4Hm7mZFAE+vJZ62mcXuH9BywmKiAr9B4V5LQLcTr70= +github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf/go.mod h1:pjcozWijkNPbEtX5SIQaxEW/h8VAVZYTLx+70bmB3LY= github.com/kolo/xmlrpc v0.0.0-20200310150728-e0350524596b/go.mod h1:o03bZfuBwAXHetKXuInt4S7omeXUu62/A845kiycsSQ= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= diff --git a/ocis-pkg/natsjsregistry/watcher.go b/ocis-pkg/natsjsregistry/watcher.go index 83d094bca46..f5187358f30 100644 --- a/ocis-pkg/natsjsregistry/watcher.go +++ b/ocis-pkg/natsjsregistry/watcher.go @@ -9,7 +9,7 @@ import ( // NatsWatcher is the watcher of the nats interface type NatsWatcher interface { - Watch(bucket string) (nats.KeyWatcher, error) + WatchAll(bucket string, opts ...nats.WatchOpt) (nats.KeyWatcher, error) } // Watcher is used to keep track of changes in the registry @@ -26,7 +26,7 @@ func NewWatcher(s *storeregistry) (*Watcher, error) { return nil, errors.New("store does not implement watcher interface") } - watcher, err := w.Watch("service-registry") + watcher, err := w.WatchAll("service-registry") if err != nil { return nil, err } diff --git a/vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/nats.go b/vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/nats.go index 13ae81d28c1..93a2c9bfe69 100644 --- a/vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/nats.go +++ b/vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/nats.go @@ -335,6 +335,20 @@ func (n *natsStore) String() string { return "NATS JetStream KeyValueStore" } +// WatchAll exposes the watcher interface from the underlying JetStreamContext. +func (n *natsStore) WatchAll(bucket string, opts ...nats.WatchOpt) (nats.KeyWatcher, error) { + if bucket == "" { + return nil, errors.New("multi bucket watching is not supported") + } + + b, err := n.js.KeyValue(bucket) + if err != nil { + return nil, errors.Wrap(err, "Failed to get bucket") + } + + return b.WatchAll(opts...) +} + // thread safe way to initialize the connection. func (n *natsStore) initConn() error { if n.hasConn() { @@ -397,7 +411,7 @@ func (n *natsStore) mustGetBucket(kv *nats.KeyValueConfig) (nats.KeyValue, error func (n *natsStore) getRecord(bucket nats.KeyValue, key string) (*store.Record, bool, error) { obj, err := bucket.Get(key) if errors.Is(err, nats.ErrKeyNotFound) { - return nil, false, nil + return nil, false, store.ErrNotFound } else if err != nil { return nil, false, errors.Wrap(err, "Failed to get object from bucket") } diff --git a/vendor/modules.txt b/vendor/modules.txt index 7fa20163a0e..980d098778c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -966,7 +966,7 @@ github.com/go-micro/plugins/v4/server/http # github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20231129143103-d72facc652f0 ## explicit; go 1.21 github.com/go-micro/plugins/v4/store/nats-js -# github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e +# github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf ## explicit; go 1.21 github.com/go-micro/plugins/v4/store/nats-js-kv # github.com/go-micro/plugins/v4/store/redis v1.2.1 @@ -2435,3 +2435,4 @@ stash.kopano.io/kgol/rndm # github.com/studio-b12/gowebdav => github.com/aduffeck/gowebdav v0.0.0-20231215102054-212d4a4374f6 # github.com/egirna/icap-client => github.com/fschade/icap-client v0.0.0-20240123094924-5af178158eaf # github.com/unrolled/secure => github.com/DeepDiver1975/secure v0.0.0-20240611112133-abc838fb797c +# github.com/go-micro/plugins/v4/store/nats-js-kv => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf