Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions pkg/datasource/apollo/apollo.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,24 @@ import (
"github.com/apolloconfig/agollo/v4"
"github.com/apolloconfig/agollo/v4/component/log"
"github.com/apolloconfig/agollo/v4/env/config"
"github.com/apolloconfig/agollo/v4/storage"
"github.com/pkg/errors"
)

var (
ErrEmptyKey = errors.New("property key is empty")
ErrMissConfig = errors.New("miss config")
ErrConfigNil = errors.New("cant find a config by the special namespace")
)

const defaultNamespace = "application"

type Option func(o *options)

type options struct {
handlers []datasource.PropertyHandler
logger log.LoggerInterface
client *agollo.Client
handlers []datasource.PropertyHandler
logger log.LoggerInterface
namespace string
}

// WithPropertyHandlers set property handlers
Expand All @@ -35,11 +39,25 @@ func WithLogger(logger log.LoggerInterface) Option {
}
}

// WithNamespace set apollo namespace to supports fetching rules from specified namespace.
func WithNamespace(namespace string) Option {
return func(o *options) {
o.namespace = namespace
}
}

// apolloDatasource implements datasource.Datasource
type apolloDatasource struct {
datasource.Base
client *agollo.Client
client apolloClient
propertyKey string
namespace string
}

// apolloClient a simple apollo client to support sentinel datasource
type apolloClient interface {
GetConfig(namespace string) *storage.Config
AddChangeListener(listener storage.ChangeListener)
}

// NewDatasource create apollo datasource
Expand All @@ -56,6 +74,9 @@ func NewDatasource(conf *config.AppConfig, propertyKey string, opts ...Option) (
for _, opt := range opts {
opt(option)
}
if option.namespace == "" {
option.namespace = defaultNamespace
}
agollo.SetLogger(option.logger)
apolloClient, err := agollo.StartWithConfig(func() (*config.AppConfig, error) {
return conf, nil
Expand All @@ -66,6 +87,7 @@ func NewDatasource(conf *config.AppConfig, propertyKey string, opts ...Option) (
ds := &apolloDatasource{
client: apolloClient,
propertyKey: propertyKey,
namespace: option.namespace,
}
for _, handler := range option.handlers {
ds.AddPropertyHandler(handler)
Expand All @@ -74,7 +96,11 @@ func NewDatasource(conf *config.AppConfig, propertyKey string, opts ...Option) (
}

func (a *apolloDatasource) ReadSource() ([]byte, error) {
value := a.client.GetValue(a.propertyKey)
cfg := a.client.GetConfig(a.namespace)
if cfg == nil {
return nil, ErrConfigNil
}
value := cfg.GetValue(a.propertyKey)
return []byte(value), nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/datasource/apollo/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ type customChangeListener struct {

func (c *customChangeListener) OnChange(event *storage.ChangeEvent) {
for key, value := range event.Changes {
if c.ds.propertyKey == key {
if c.ds.namespace == event.Namespace && c.ds.propertyKey == key {
c.ds.handle([]byte(value.NewValue.(string)))
}
}
}

func (c *customChangeListener) OnNewestChange(event *storage.FullChangeEvent) {
for key, value := range event.Changes {
if c.ds.propertyKey == key {
if c.ds.namespace == event.Namespace && c.ds.propertyKey == key {
c.ds.handle([]byte(value.(string)))
}
}
Expand Down
Loading