From 122e9a894699673fef239a8a4fc81234ef0e9f90 Mon Sep 17 00:00:00 2001 From: cyruslo <710058301@qq.com> Date: Wed, 18 Dec 2024 21:53:23 +0800 Subject: [PATCH 1/2] feat: add logger context into inner client --- apollo/client.go | 17 ++++++++++++++++- apollo/client_impl.go | 15 --------------- go.mod | 5 ++++- go.sum | 2 ++ 4 files changed, 22 insertions(+), 17 deletions(-) delete mode 100644 apollo/client_impl.go diff --git a/apollo/client.go b/apollo/client.go index e57852e..7069f53 100644 --- a/apollo/client.go +++ b/apollo/client.go @@ -1,20 +1,35 @@ package apollo import ( + "context" "crypto/tls" "github.com/libgox/addr" + "golang.org/x/exp/slog" ) type Client interface { } +type innerClient struct { + ctx context.Context +} + type Config struct { Address addr.Address // TlsConfig configuration information for tls. TlsConfig *tls.Config + // Logger structured logger for logging operations + Logger *slog.Logger } func NewClient(config *Config) (Client, error) { - return newClient(config) + if config.Logger != nil { + config.Logger = slog.Default() + } + ctx := context.Background() + c := &innerClient{ + ctx: ctx, + } + return c, nil } diff --git a/apollo/client_impl.go b/apollo/client_impl.go deleted file mode 100644 index 8c72009..0000000 --- a/apollo/client_impl.go +++ /dev/null @@ -1,15 +0,0 @@ -package apollo - -type client struct { - urlPrefix string -} - -func newClient(config *Config) (*client, error) { - c := &client{} - if config.TlsConfig != nil { - c.urlPrefix = "https://" + config.Address.Addr() - } else { - c.urlPrefix = "http://" + config.Address.Addr() - } - return c, nil -} diff --git a/go.mod b/go.mod index 3549e82..c5b5634 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module github.com/protocol-laboratory/apollo-client-go go 1.20 -require github.com/libgox/addr v0.2.0 +require ( + github.com/libgox/addr v0.2.0 + golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 +) diff --git a/go.sum b/go.sum index 0bf62f6..e528468 100644 --- a/go.sum +++ b/go.sum @@ -3,4 +3,6 @@ github.com/libgox/addr v0.2.0 h1:he4Vv7lzcGy/1qNpjJufEWRql0S23Ki/j0XpTDMDu3s= github.com/libgox/addr v0.2.0/go.mod h1:j0hEfpRqTZy4BsstS9Egsn1U0JmNdj/VkbF2VBmJlFw= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 h1:kx6Ds3MlpiUHKj7syVnbp57++8WpuKPcR5yjLBjvLEA= +golang.org/x/exp v0.0.0-20240823005443-9b4947da3948/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= From ef512155cce4ae82673d19cd74eb9039996b23d9 Mon Sep 17 00:00:00 2001 From: cyruslo <710058301@qq.com> Date: Thu, 19 Dec 2024 23:23:52 +0800 Subject: [PATCH 2/2] feat: fetch config --- apollo/client.go | 122 ++++++++++++++++++++++++++++++++++++----- apollo/client_test.go | 35 ++++++++++++ apollo/config.go | 47 ++++++++++++++++ apollo/event.go | 53 ++++++++++++++++++ apollo/logger.go | 57 +++++++++++++++++++ apollo/notification.go | 49 +++++++++++++++++ apollo/poll.go | 110 +++++++++++++++++++++++++++++++++++++ apollo/storage.go | 45 +++++++++++++++ apollo/utils.go | 23 ++++++++ 9 files changed, 526 insertions(+), 15 deletions(-) create mode 100644 apollo/client_test.go create mode 100644 apollo/config.go create mode 100644 apollo/event.go create mode 100644 apollo/logger.go create mode 100644 apollo/notification.go create mode 100644 apollo/poll.go create mode 100644 apollo/storage.go create mode 100644 apollo/utils.go diff --git a/apollo/client.go b/apollo/client.go index 7069f53..cbbf996 100644 --- a/apollo/client.go +++ b/apollo/client.go @@ -2,34 +2,126 @@ package apollo import ( "context" - "crypto/tls" - - "github.com/libgox/addr" - "golang.org/x/exp/slog" + "encoding/json" + "fmt" + "net/http" ) type Client interface { -} + GetStringValue(namespace, key string) string -type innerClient struct { - ctx context.Context + SubscribeEvent(listener Listener) } -type Config struct { - Address addr.Address - // TlsConfig configuration information for tls. - TlsConfig *tls.Config - // Logger structured logger for logging operations - Logger *slog.Logger +type innerClient struct { + ctx context.Context + config *Config + storage *storage + poller *longPoll + listener Listener } func NewClient(config *Config) (Client, error) { if config.Logger != nil { - config.Logger = slog.Default() + SetLogger(config.Logger) } ctx := context.Background() c := &innerClient{ - ctx: ctx, + ctx: ctx, + config: config, + storage: newStorage(config.NamespaceNames), } + c.poller = newLongPoll(config, c.updateHandle) + + // sync + err := c.poller.fetch(c.ctx) + if err != nil { + return nil, err + } + + // long poll + go c.poller.start(c.ctx) + return c, nil } + +func (i *innerClient) updateHandle(notification *notification) error { + change, err := i.sync(notification) + if err != nil { + return err + } + if change == nil || len(change.Changes) == 0 { + return fmt.Errorf("no changes to sync") + } + if i.listener != nil { + i.listener.OnChange(change) + } + return nil +} + +func (i *innerClient) sync(notification *notification) (*ChangeEvent, error) { + log.Infof("sync namespace %s with remote config server", notification.NamespaceName) + url := i.config.GetSyncURI(notification.NamespaceName) + r := &requester{ + client: &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: i.config.TLSConfig, + }, + }, + retries: 3, + } + result, err := r.do(i.ctx, url, r.retries) + if err != nil || len(result) == 0 { + return nil, err + } + + ac := &apolloConfiguration{} + if err = json.Unmarshal(result, ac); err != nil { + return nil, err + } + return i.updateCache(ac) +} + +func (i *innerClient) updateCache(ac *apolloConfiguration) (*ChangeEvent, error) { + var change = &ChangeEvent{ + Namespace: ac.NamespaceName, + Changes: make(map[string]*Change), + } + c := i.storage.loadCache(ac.NamespaceName) + + c.data.Range(func(k, v interface{}) bool { + key := k.(string) + value := v.(string) + if _, ok := ac.Configurations[key]; !ok { + c.data.Delete(key) + change.Changes[key] = onDelete(key, value) + } + return true + }) + + for k, v := range ac.Configurations { + old, ok := c.data.Load(k) + if !ok { + change.Changes[k] = onAdd(k, v) + c.data.Store(k, v) + continue + } + if old.(string) != v { + change.Changes[k] = onModify(k, old.(string), v) + } + c.data.Store(k, v) + } + return change, nil +} + +func (i *innerClient) SubscribeEvent(listener Listener) { + i.listener = listener +} + +func (i *innerClient) GetStringValue(namespace string, key string) string { + v, ok := i.storage.loadCache(namespace).data.Load(key) + if !ok { + return "" + } + return v.(string) +} diff --git a/apollo/client_test.go b/apollo/client_test.go new file mode 100644 index 0000000..c4b87fa --- /dev/null +++ b/apollo/client_test.go @@ -0,0 +1,35 @@ +package apollo + +import ( + "testing" + "time" + + "github.com/libgox/addr" +) + +func TestNewClient(t *testing.T) { + c, err := NewClient(&Config{ + AppID: "SampleApp", + Cluster: "default", + NamespaceNames: []string{"application", "application2"}, + Address: addr.Address{ + Host: "localhost", + Port: 8080, + }, + Secret: "", + TLSConfig: nil, + Logger: nil, + }) + if err == nil { + value := c.GetStringValue("application", "timeout") + value2 := c.GetStringValue("application2", "timeout") + c.SubscribeEvent(&ClientTest{}) + t.Log(value, ",", value2) + } + time.Sleep(100 * time.Second) +} + +type ClientTest struct{} + +func (c *ClientTest) OnChange(event *ChangeEvent) { +} diff --git a/apollo/config.go b/apollo/config.go new file mode 100644 index 0000000..69dcbb8 --- /dev/null +++ b/apollo/config.go @@ -0,0 +1,47 @@ +package apollo + +import ( + "crypto/tls" + "fmt" + "net/url" + + "github.com/libgox/addr" +) + +type Config struct { + AppID string + Cluster string + NamespaceNames []string + Address addr.Address + Secret string + // TlsConfig configuration information for tls. + TLSConfig *tls.Config + Logger Logger +} + +func (c *Config) GetNotifyURLSuffix(notifications string) string { + return fmt.Sprintf("%s/notifications/v2?appId=%s&cluster=%s¬ifications=%s", + c.GetUrlPrefix(), + url.QueryEscape(c.AppID), + url.QueryEscape(c.Cluster), + url.QueryEscape(notifications)) +} + +func (c *Config) GetSyncURI(namespace string) string { + return fmt.Sprintf("%s/configs/%s/%s/%s?releaseKey=&ip=%s", + c.GetUrlPrefix(), + url.QueryEscape(c.AppID), + url.QueryEscape(c.Cluster), + url.QueryEscape(namespace), + GetLocalIP()) +} + +func (c *Config) GetUrlPrefix() string { + var urlPrefix string + if c.TLSConfig != nil { + urlPrefix = "https://" + c.Address.Addr() + } else { + urlPrefix = "http://" + c.Address.Addr() + } + return urlPrefix +} diff --git a/apollo/event.go b/apollo/event.go new file mode 100644 index 0000000..f5121a0 --- /dev/null +++ b/apollo/event.go @@ -0,0 +1,53 @@ +package apollo + +type ChangeType int + +const ( + ADD ChangeType = iota + + MODIFY + + DELETE +) + +type Listener interface { + OnChange(event *ChangeEvent) +} + +type Change struct { + Key string + OldValue string + NewValue string + ChangeType ChangeType +} + +type ChangeEvent struct { + Namespace string + NotificationID int + Changes map[string]*Change +} + +func onDelete(key, value string) *Change { + return &Change{ + Key: key, + ChangeType: DELETE, + OldValue: value, + } +} + +func onModify(key, oldValue, newValue string) *Change { + return &Change{ + Key: key, + ChangeType: MODIFY, + OldValue: oldValue, + NewValue: newValue, + } +} + +func onAdd(key, value string) *Change { + return &Change{ + Key: key, + ChangeType: ADD, + NewValue: value, + } +} diff --git a/apollo/logger.go b/apollo/logger.go new file mode 100644 index 0000000..72f23c2 --- /dev/null +++ b/apollo/logger.go @@ -0,0 +1,57 @@ +package apollo + +import ( + "fmt" + + "golang.org/x/exp/slog" +) + +type Logger interface { + Info(format string, args ...interface{}) + + Error(format string, args ...interface{}) + + Warn(format string, args ...interface{}) + + Infof(format string, args ...interface{}) + + Errorf(format string, args ...interface{}) + + Warnf(format string, args ...interface{}) +} + +type defaultLogger struct { + Logger *slog.Logger +} + +var log Logger = &defaultLogger{ + Logger: slog.Default(), +} + +func SetLogger(logger Logger) { + log = logger +} + +func (d *defaultLogger) Info(format string, args ...interface{}) { + d.Logger.Info(format, args...) +} + +func (d *defaultLogger) Error(format string, args ...interface{}) { + d.Logger.Error(format, args...) +} + +func (d *defaultLogger) Warn(format string, args ...interface{}) { + d.Logger.Warn(format, args...) +} + +func (d *defaultLogger) Infof(format string, args ...interface{}) { + d.Logger.Info(fmt.Sprintf(format, args...)) +} + +func (d *defaultLogger) Errorf(format string, args ...interface{}) { + d.Logger.Error(fmt.Sprintf(format, args...)) +} + +func (d *defaultLogger) Warnf(format string, args ...interface{}) { + d.Logger.Warn(fmt.Sprintf(format, args...)) +} diff --git a/apollo/notification.go b/apollo/notification.go new file mode 100644 index 0000000..8c9683d --- /dev/null +++ b/apollo/notification.go @@ -0,0 +1,49 @@ +package apollo + +import ( + "encoding/json" + "sync" +) + +const defaultNotificationID int = -1 + +type notificationsMgr struct { + notifications sync.Map +} + +type notification struct { + NamespaceName string `json:"namespaceName"` + NotificationID int `json:"notificationId"` +} + +func newNotificationManager(namespaceNames []string) *notificationsMgr { + n := ¬ificationsMgr{ + notifications: sync.Map{}, + } + for _, namespaceName := range namespaceNames { + n.notifications.Store(namespaceName, defaultNotificationID) + } + return n +} + +func (n *notificationsMgr) String() string { + var notifications []*notification + n.notifications.Range(func(key, value interface{}) bool { + k, _ := key.(string) + v, _ := value.(int) + notifications = append(notifications, ¬ification{ + NamespaceName: k, + NotificationID: v, + }) + return true + }) + res, err := json.Marshal(¬ifications) + if err != nil { + return "" + } + return string(res) +} + +func (n *notificationsMgr) Store(namespaceName string, notificationID int) { + n.notifications.Store(namespaceName, notificationID) +} diff --git a/apollo/poll.go b/apollo/poll.go new file mode 100644 index 0000000..f91afb6 --- /dev/null +++ b/apollo/poll.go @@ -0,0 +1,110 @@ +package apollo + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +const defaultLongPollInterval = time.Second * 2 + +type notifyHandler func(n *notification) error + +type requester struct { + client *http.Client + retries int +} + +func (r *requester) do(ctx context.Context, uri string, retries int) ([]byte, error) { + resp, err := r.client.Get(uri) + if err != nil { + if retries > 0 { + return r.do(ctx, uri, retries-1) + } + return nil, err + } + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + log.Errorf("failed to close response body: %v", err) + return + } + }(resp.Body) + + if resp.StatusCode == http.StatusOK { + return io.ReadAll(resp.Body) + } + + return nil, fmt.Errorf("apollo return http resp code %d", resp.StatusCode) +} + +type longPoll struct { + config *Config + interval time.Duration + handler notifyHandler + requester *requester + notification *notificationsMgr +} + +func newLongPoll(config *Config, handler notifyHandler) *longPoll { + p := &longPoll{ + config: config, + interval: defaultLongPollInterval, + handler: handler, + requester: &requester{ + client: &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: config.TLSConfig, + }, + }, + retries: 3, + }, + notification: newNotificationManager(config.NamespaceNames), + } + return p +} + +func (p *longPoll) start(ctx context.Context) { + child, cancel := context.WithCancel(ctx) + defer cancel() + + timer := time.NewTimer(p.interval) + defer timer.Stop() + + for { + select { + case <-timer.C: + err := p.fetch(child) + log.Errorf("fetch config err: %v", err) + timer.Reset(p.interval) + case <-child.Done(): + return + } + } +} + +func (p *longPoll) fetch(ctx context.Context) error { + url := p.config.GetNotifyURLSuffix(p.notification.String()) + result, err := p.requester.do(ctx, url, p.requester.retries) + if err != nil { + return err + } + if len(result) == 0 { + log.Warn("apollo get notify result empty") + return nil + } + var n []*notification + if err := json.Unmarshal(result, &n); err != nil { + return err + } + for _, v := range n { + if err := p.handler(v); err != nil { + return err + } + p.notification.Store(v.NamespaceName, v.NotificationID) + } + return nil +} diff --git a/apollo/storage.go b/apollo/storage.go new file mode 100644 index 0000000..84fc53a --- /dev/null +++ b/apollo/storage.go @@ -0,0 +1,45 @@ +package apollo + +import ( + "sync" +) + +// storage namespace cache +type storage struct { + caches sync.Map +} + +func newStorage(namespaceNames []string) *storage { + s := &storage{ + caches: sync.Map{}, + } + for _, namespace := range namespaceNames { + s.caches.Store(namespace, &cache{ + data: sync.Map{}, + }) + } + return s +} + +func (s *storage) loadCache(namespace string) *cache { + if value, ok := s.caches.Load(namespace); ok { + return value.(*cache) + } + c := &cache{ + data: sync.Map{}, + } + s.caches.Store(namespace, c) + return c +} + +// apolloConfiguration query config result +type apolloConfiguration struct { + NamespaceName string `json:"namespaceName"` + Configurations map[string]string `json:"configurations"` + ReleaseKey string `json:"releaseKey"` +} + +// cache apollo namespace configuration cache +type cache struct { + data sync.Map +} diff --git a/apollo/utils.go b/apollo/utils.go new file mode 100644 index 0000000..6c2438f --- /dev/null +++ b/apollo/utils.go @@ -0,0 +1,23 @@ +package apollo + +import "net" + +func GetLocalIP() string { + ips, err := net.InterfaceAddrs() + if err != nil { + return "" + } + for _, ip := range ips { + if ip4 := toIP4(ip); ip4 != nil { + return ip4.String() + } + } + return "" +} + +func toIP4(addr net.Addr) net.IP { + if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() { + return ipNet.IP.To4() + } + return nil +}