diff --git a/changelog/unreleased/fix-natsjskv-registry.md b/changelog/unreleased/fix-natsjskv-registry.md new file mode 100644 index 00000000000..8e93a285688 --- /dev/null +++ b/changelog/unreleased/fix-natsjskv-registry.md @@ -0,0 +1,5 @@ +Bugfix: Repair nats-js-kv registry + +The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it. + +https://github.com/owncloud/ocis/pull/9620 diff --git a/go.mod b/go.mod index d9e3f316934..df2a5637100 100644 --- a/go.mod +++ b/go.mod @@ -365,6 +365,8 @@ replace github.com/egirna/icap-client => github.com/fschade/icap-client v0.0.0-2 replace github.com/unrolled/secure => github.com/DeepDiver1975/secure v0.0.0-20240611112133-abc838fb797c +replace github.com/go-micro/plugins/v4/store/nats-js-kv => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf + // exclude the v2 line of go-sqlite3 which was released accidentally and prevents pulling in newer versions of go-sqlite3 // see https://github.com/mattn/go-sqlite3/issues/965 for more details exclude github.com/mattn/go-sqlite3 v2.0.3+incompatible diff --git a/go.sum b/go.sum index e78eee350db..761c3cea13d 100644 --- a/go.sum +++ b/go.sum @@ -1218,8 +1218,6 @@ github.com/go-micro/plugins/v4/server/http v1.2.2 h1:UK2/09AU0zV3wHELuR72TZzVU2v github.com/go-micro/plugins/v4/server/http v1.2.2/go.mod h1:YuAjaSPxcn3LI8j2FUsqx0Rxunrj4YwDV41Ax76rLl0= github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20231129143103-d72facc652f0 h1:Qa1EBQ9UyCGecFAJQovl/MHGnvbcvDaM3qUoAG5Lnvk= github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20231129143103-d72facc652f0/go.mod h1:aCRl8JQmqIaonOl88nFPY/BOQnHPVHY9ngStzLkXnYk= -github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e h1:hwH0qXT0J3UFYRi0UD+e3ItL92oW+jdPFA+3o/j6ASg= -github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e/go.mod h1:Goi4eJ9SrKkxE6NsAVqBVNxfQFbwb7UbyII6743ldgM= github.com/go-micro/plugins/v4/store/redis v1.2.1 h1:d9kwr9bSpoK9vkHkqcv+isQUbgBCHpfwCV57pcAPS6c= github.com/go-micro/plugins/v4/store/redis v1.2.1/go.mod h1:MbCG0YiyPqETTtm7uHFmxQNCaW1o9hBoYtFwhbVjLUg= github.com/go-micro/plugins/v4/transport/grpc v1.1.0 h1:mXfDYfFQLnVDzjGY3o84oe4prfux9h8txsnA19dKsj8= @@ -1613,6 +1611,8 @@ github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02 github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf h1:X4Hm7mZFAE+vJZ62mcXuH9BywmKiAr9B4V5LQLcTr70= +github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf/go.mod h1:pjcozWijkNPbEtX5SIQaxEW/h8VAVZYTLx+70bmB3LY= github.com/kolo/xmlrpc v0.0.0-20200310150728-e0350524596b/go.mod h1:o03bZfuBwAXHetKXuInt4S7omeXUu62/A845kiycsSQ= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= diff --git a/ocis-pkg/natsjsregistry/registry.go b/ocis-pkg/natsjsregistry/registry.go index 6e216fd9dbe..5a45fe52bab 100644 --- a/ocis-pkg/natsjsregistry/registry.go +++ b/ocis-pkg/natsjsregistry/registry.go @@ -12,6 +12,7 @@ import ( "time" natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv" + "github.com/google/uuid" "github.com/nats-io/nats.go" "go-micro.dev/v4/registry" "go-micro.dev/v4/store" @@ -23,6 +24,8 @@ var ( _registryAddressEnv = "MICRO_REGISTRY_ADDRESS" _registryUsernameEnv = "MICRO_REGISTRY_AUTH_USERNAME" _registryPasswordEnv = "MICRO_REGISTRY_AUTH_PASSWORD" + + _serviceDelimiter = "/" ) func init() { @@ -80,76 +83,93 @@ func (n *storeregistry) Register(s *registry.Service, _ ...registry.RegisterOpti if s == nil { return errors.New("wont store nil service") } + + unique := uuid.New().String() + if s.Metadata == nil { + s.Metadata = make(map[string]string) + } + s.Metadata["uuid"] = unique + b, err := json.Marshal(s) if err != nil { return err } return n.store.Write(&store.Record{ - Key: s.Name, + Key: s.Name + _serviceDelimiter + unique, Value: b, Expiry: n.expiry, }) } -// Deregister removes a service from the registry +// Deregister removes a service from the registry. func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error { n.lock.RLock() defer n.lock.RUnlock() - return n.store.Delete(s.Name) + var unique string + if s.Metadata != nil { + unique = s.Metadata["uuid"] + } + + return n.store.Delete(s.Name + _serviceDelimiter + unique) } // GetService gets a specific service from the registry func (n *storeregistry) GetService(s string, _ ...registry.GetOption) ([]*registry.Service, error) { - n.lock.RLock() - defer n.lock.RUnlock() - - recs, err := n.store.Read(s) - if err != nil { - return nil, err - } - svcs := make([]*registry.Service, 0, len(recs)) - for _, rec := range recs { - var s registry.Service - if err := json.Unmarshal(rec.Value, &s); err != nil { - return nil, err - } - svcs = append(svcs, &s) - } - return svcs, nil + // avoid listing e.g. `webfinger` when requesting `web` by adding the delimiter to the service name + return n.listServices(store.ListPrefix(s + _serviceDelimiter)) } // ListServices lists all registered services func (n *storeregistry) ListServices(...registry.ListOption) ([]*registry.Service, error) { + return n.listServices() +} + +// Watch allowes following the changes in the registry if it would be implemented +func (n *storeregistry) Watch(...registry.WatchOption) (registry.Watcher, error) { + return NewWatcher(n) +} + +// String returns the name of the registry +func (n *storeregistry) String() string { + return n.typ +} + +func (n *storeregistry) listServices(opts ...store.ListOption) ([]*registry.Service, error) { n.lock.RLock() defer n.lock.RUnlock() - keys, err := n.store.List() + keys, err := n.store.List(opts...) if err != nil { return nil, err } - var svcs []*registry.Service + svcs := make([]*registry.Service, 0, len(keys)) for _, k := range keys { - s, err := n.GetService(k) + s, err := n.getService(k) if err != nil { // TODO: continue ? return nil, err } - svcs = append(svcs, s...) + svcs = append(svcs, s) } return svcs, nil } -// Watch allowes following the changes in the registry if it would be implemented -func (n *storeregistry) Watch(...registry.WatchOption) (registry.Watcher, error) { - return nil, errors.New("watcher not implemented") -} - -// String returns the name of the registry -func (n *storeregistry) String() string { - return n.typ +func (n *storeregistry) getService(s string) (*registry.Service, error) { + recs, err := n.store.Read(s) + if err != nil { + return nil, err + } + if len(recs) == 0 { + return nil, registry.ErrNotFound + } + var svc registry.Service + if err := json.Unmarshal(recs[0].Value, &svc); err != nil { + return nil, err + } + return &svc, nil } func (n *storeregistry) storeOptions(opts registry.Options) []store.Option { diff --git a/ocis-pkg/natsjsregistry/watcher.go b/ocis-pkg/natsjsregistry/watcher.go new file mode 100644 index 00000000000..f5187358f30 --- /dev/null +++ b/ocis-pkg/natsjsregistry/watcher.go @@ -0,0 +1,74 @@ +package natsjsregistry + +import ( + "errors" + + "github.com/nats-io/nats.go" + "go-micro.dev/v4/registry" +) + +// NatsWatcher is the watcher of the nats interface +type NatsWatcher interface { + WatchAll(bucket string, opts ...nats.WatchOpt) (nats.KeyWatcher, error) +} + +// Watcher is used to keep track of changes in the registry +type Watcher struct { + watch nats.KeyWatcher + updates <-chan nats.KeyValueEntry + reg *storeregistry +} + +// NewWatcher returns a new watcher +func NewWatcher(s *storeregistry) (*Watcher, error) { + w, ok := s.store.(NatsWatcher) + if !ok { + return nil, errors.New("store does not implement watcher interface") + } + + watcher, err := w.WatchAll("service-registry") + if err != nil { + return nil, err + } + + return &Watcher{ + watch: watcher, + updates: watcher.Updates(), + reg: s, + }, nil +} + +// Next returns the next result. It is a blocking call +func (w *Watcher) Next() (*registry.Result, error) { + kve := <-w.updates + if kve == nil { + return nil, errors.New("watcher stopped") + } + + service, err := w.reg.getService(kve.Key()) + if err != nil { + return nil, err + } + + var action string + switch kve.Operation() { + default: + action = "create" + case nats.KeyValuePut: + action = "create" + case nats.KeyValueDelete: + action = "delete" + case nats.KeyValuePurge: + action = "delete" + } + + return ®istry.Result{ + Service: service, + Action: action, + }, nil +} + +// Stop stops the watcher +func (w *Watcher) Stop() { + _ = w.watch.Stop() +} diff --git a/vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/nats.go b/vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/nats.go index 13ae81d28c1..93a2c9bfe69 100644 --- a/vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/nats.go +++ b/vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/nats.go @@ -335,6 +335,20 @@ func (n *natsStore) String() string { return "NATS JetStream KeyValueStore" } +// WatchAll exposes the watcher interface from the underlying JetStreamContext. +func (n *natsStore) WatchAll(bucket string, opts ...nats.WatchOpt) (nats.KeyWatcher, error) { + if bucket == "" { + return nil, errors.New("multi bucket watching is not supported") + } + + b, err := n.js.KeyValue(bucket) + if err != nil { + return nil, errors.Wrap(err, "Failed to get bucket") + } + + return b.WatchAll(opts...) +} + // thread safe way to initialize the connection. func (n *natsStore) initConn() error { if n.hasConn() { @@ -397,7 +411,7 @@ func (n *natsStore) mustGetBucket(kv *nats.KeyValueConfig) (nats.KeyValue, error func (n *natsStore) getRecord(bucket nats.KeyValue, key string) (*store.Record, bool, error) { obj, err := bucket.Get(key) if errors.Is(err, nats.ErrKeyNotFound) { - return nil, false, nil + return nil, false, store.ErrNotFound } else if err != nil { return nil, false, errors.Wrap(err, "Failed to get object from bucket") } diff --git a/vendor/modules.txt b/vendor/modules.txt index 7fa20163a0e..980d098778c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -966,7 +966,7 @@ github.com/go-micro/plugins/v4/server/http # github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20231129143103-d72facc652f0 ## explicit; go 1.21 github.com/go-micro/plugins/v4/store/nats-js -# github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e +# github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf ## explicit; go 1.21 github.com/go-micro/plugins/v4/store/nats-js-kv # github.com/go-micro/plugins/v4/store/redis v1.2.1 @@ -2435,3 +2435,4 @@ stash.kopano.io/kgol/rndm # github.com/studio-b12/gowebdav => github.com/aduffeck/gowebdav v0.0.0-20231215102054-212d4a4374f6 # github.com/egirna/icap-client => github.com/fschade/icap-client v0.0.0-20240123094924-5af178158eaf # github.com/unrolled/secure => github.com/DeepDiver1975/secure v0.0.0-20240611112133-abc838fb797c +# github.com/go-micro/plugins/v4/store/nats-js-kv => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf