diff --git a/go-plugin/pkg/config/dynamic.go b/go-plugin/pkg/config/dynamic.go index 320191d..64f3e30 100644 --- a/go-plugin/pkg/config/dynamic.go +++ b/go-plugin/pkg/config/dynamic.go @@ -5,9 +5,14 @@ import ( "sync" ) -const ExtendConfigKey = "global_extend_config" +func Register(ctx context.Context, key string, handler ConfigHandler) { + globalExtendConfig.Register(ctx, key, handler) +} func GlobalExtendMapByContext(ctx context.Context) (*sync.Map, bool) { + if cfg, ok := globalExtendConfig.SyncMapByConfig(ctx); ok { + return &cfg, ok + } cfg, ok := ctx.Value(ExtendConfigKey).(*sync.Map) return cfg, ok } diff --git a/go-plugin/pkg/config/extend.go b/go-plugin/pkg/config/extend.go new file mode 100644 index 0000000..7715957 --- /dev/null +++ b/go-plugin/pkg/config/extend.go @@ -0,0 +1,112 @@ +package config + +import ( + "context" + "encoding/json" + "errors" + "sync" + + "mosn.io/pkg/log" + "mosn.io/pkg/variable" +) + +const ExtendConfigKey = "global_extend_config" + +var globalExtendConfig = NewGlobalExtendConfig() + +type ConfigHandler func(val string) + +type ExtendConfig struct { + effective bool + once sync.Once + dirtyConfig sync.Map + config sync.Map + handler chan string + sendh map[string]ConfigHandler +} + +func NewGlobalExtendConfig() *ExtendConfig { + return &ExtendConfig{ + handler: make(chan string), + sendh: make(map[string]ConfigHandler), + } +} + +func (gex *ExtendConfig) Register(ctx context.Context, key string, handler ConfigHandler) { + gex.sendh[key] = handler + gex.initConfig(ctx) +} + +func (gex *ExtendConfig) initConfig(ctx context.Context) { + var gerr error + gex.once.Do(func() { + recvl, err := variable.Get(ctx, ExtendConfigKey) + if err != nil { + gerr = errors.New("the dynamic config chan is not exist") + return + } + rec, ok := recvl.(chan chan string) + if !ok { + gerr = errors.New("the dynamic config chan is not exist") + return + } + // 第一次获取数据,阻塞获取 + rec <- gex.handler + gex.effective = true + val := <-gex.handler + gex.parse(val) + go gex.handlerConfig() + }) + if gerr != nil { + log.DefaultLogger.Errorf("init config failed,err:%s", gerr) + } +} + +func (gex *ExtendConfig) handlerConfig() { + for val := range gex.handler { + gex.parse(val) + } +} + +func (gex *ExtendConfig) parse(value string) error { + cc := make(map[string]string) + if err := json.Unmarshal([]byte(value), &cc); err != nil { + return err + } + // 更新&添加事件 + for key, value := range cc { + handler, ok := gex.sendh[key] + if ok { + handler(value) + } + gex.dirtyConfig.Delete(key) + } + // 删除事件 + gex.dirtyConfig.Range(func(key, value interface{}) bool { + handler, ok := gex.sendh[key.(string)] + if ok { + handler("") + } + return true + }) + + var dirtyConfig, config sync.Map + for key, value := range cc { + dirtyConfig.Store(key, value) + config.Store(key, value) + } + gex.config = config + gex.dirtyConfig = dirtyConfig + return nil +} + +func (gex *ExtendConfig) SyncMapByConfig(ctx context.Context) (sync.Map, bool) { + gex.initConfig(ctx) + return gex.config, gex.effective +} + +func (gex *ExtendConfig) GetConfig(ctx context.Context, key string) (string, bool) { + gex.initConfig(ctx) + val, ok := gex.config.Load(key) + return val.(string), ok +} diff --git a/go-plugin/pkg/config/extend_test.go b/go-plugin/pkg/config/extend_test.go new file mode 100644 index 0000000..5147c3b --- /dev/null +++ b/go-plugin/pkg/config/extend_test.go @@ -0,0 +1,51 @@ +package config + +import ( + "context" + "encoding/json" + "github.com/stretchr/testify/assert" + "mosn.io/pkg/variable" + "testing" + "time" +) + +func init() { + variable.Register(variable.NewVariable(ExtendConfigKey, nil, nil, variable.DefaultSetter, 0)) +} + +func MockContext(value string) context.Context { + receiver := make(chan chan string, 1) + ctx := variable.NewVariableContext(context.Background()) + variable.Set(ctx, ExtendConfigKey, receiver) + go func() { + rec := <-receiver + rec <- value + }() + return ctx +} + +func TestExtendConfig_Register(t *testing.T) { + gex := NewGlobalExtendConfig() + val := `{"key":"v1"}` + rval := "v1" + cc := make(map[string]string) + if err := json.Unmarshal([]byte(val), &cc); err != nil { + } + // 更新&添加事件 + handler := func(value string) { + assert.Equal(t, value, rval) + } + // add + gex.Register(MockContext(val), "key", handler) + time.Sleep(time.Millisecond * 5) + // update + val = `{"key":"value"}` + rval = "value" + gex.handler <- val + time.Sleep(time.Millisecond * 5) + // delete + val = `{}` + rval = "" + gex.handler <- val + time.Sleep(time.Millisecond * 5) +} diff --git a/go-plugin/plugins/stream_filters/dynamic_conf/main/filter.go b/go-plugin/plugins/stream_filters/dynamic_conf/main/filter.go index 6b6af34..0a2880f 100644 --- a/go-plugin/plugins/stream_filters/dynamic_conf/main/filter.go +++ b/go-plugin/plugins/stream_filters/dynamic_conf/main/filter.go @@ -28,6 +28,9 @@ type DynamicFilterFactory struct { } func (f *DynamicFilterFactory) CreateFilterChain(ctx context.Context, callbacks api.StreamFilterChainFactoryCallbacks) { + config.Register(ctx, "config", func(val string) { + log.DefaultLogger.Infof("recv config %s", config) + }) filter := NewDynamicFilter(ctx, f.config) // ReceiverFilter, run the filter when receive a request from downstream // The FilterPhase can be BeforeRoute or AfterRoute, we use BeforeRoute in this demo