diff --git a/internal/command/command_plugin.go b/internal/command/command_plugin.go index 0a36efc0a..32797cf47 100644 --- a/internal/command/command_plugin.go +++ b/internal/command/command_plugin.go @@ -380,7 +380,7 @@ func (cp *CommandPlugin) handleInvalidRequest(ctx context.Context, CommandResponse: &mpi.CommandResponse{ Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE, Message: message, - Error: "Can not perform write action as auxiliary command server", + Error: "Unable to process request. Management plane is configured as read only.", }, InstanceId: instanceID, }) diff --git a/internal/config/mapper.go b/internal/config/mapper.go index beda9352a..c51b1a810 100644 --- a/internal/config/mapper.go +++ b/internal/config/mapper.go @@ -85,3 +85,40 @@ func ToCommandProto(cmd *Command) *mpi.CommandServer { return protoConfig } + +// ToAuxiliaryCommandServerProto maps the AgentConfig Command struct back to the AuxiliaryCommandServer proto message +func ToAuxiliaryCommandServerProto(cmd *Command) *mpi.AuxiliaryCommandServer { + protoConfig := &mpi.AuxiliaryCommandServer{} + + // Map ServerConfig to the ServerSettings + if cmd.Server != nil { + protoServerType := mpi.ServerSettings_SERVER_SETTINGS_TYPE_UNDEFINED + if cmd.Server.Type == Grpc { + protoServerType = mpi.ServerSettings_SERVER_SETTINGS_TYPE_GRPC + } + + protoConfig.Server = &mpi.ServerSettings{ + Host: cmd.Server.Host, + Port: int32(cmd.Server.Port), + Type: protoServerType, + } + } + + // Map AuthConfig to AuthSettings + if cmd.Auth != nil { + protoConfig.Auth = &mpi.AuthSettings{} + } + + // Map TLSConfig to TLSSettings + if cmd.TLS != nil { + protoConfig.Tls = &mpi.TLSSettings{ + Cert: cmd.TLS.Cert, + Key: cmd.TLS.Key, + Ca: cmd.TLS.Ca, + ServerName: cmd.TLS.ServerName, + SkipVerify: cmd.TLS.SkipVerify, + } + } + + return protoConfig +} diff --git a/internal/watcher/credentials/credential_watcher_service.go b/internal/watcher/credentials/credential_watcher_service.go index 0a1bcfaa8..868acd3ae 100644 --- a/internal/watcher/credentials/credential_watcher_service.go +++ b/internal/watcher/credentials/credential_watcher_service.go @@ -13,6 +13,10 @@ import ( "sync/atomic" "time" + "github.com/nginx/agent/v3/internal/model" + + "github.com/nginx/agent/v3/internal/grpc" + "github.com/fsnotify/fsnotify" "github.com/nginx/agent/v3/internal/config" "github.com/nginx/agent/v3/internal/logger" @@ -28,7 +32,9 @@ var emptyEvent = fsnotify.Event{ } type CredentialUpdateMessage struct { - CorrelationID slog.Attr + CorrelationID slog.Attr + GrpcConnection *grpc.GrpcConnection + ServerType model.ServerType } type CredentialWatcherService struct { @@ -36,9 +42,11 @@ type CredentialWatcherService struct { watcher *fsnotify.Watcher filesBeingWatched *sync.Map filesChanged *atomic.Bool + serverType model.ServerType + watcherMutex sync.Mutex } -func NewCredentialWatcherService(agentConfig *config.Config) *CredentialWatcherService { +func NewCredentialWatcherService(agentConfig *config.Config, serverType model.ServerType) *CredentialWatcherService { filesChanged := &atomic.Bool{} filesChanged.Store(false) @@ -46,38 +54,53 @@ func NewCredentialWatcherService(agentConfig *config.Config) *CredentialWatcherS agentConfig: agentConfig, filesBeingWatched: &sync.Map{}, filesChanged: filesChanged, + serverType: serverType, + watcherMutex: sync.Mutex{}, } } func (cws *CredentialWatcherService) Watch(ctx context.Context, ch chan<- CredentialUpdateMessage) { - slog.DebugContext(ctx, "Starting credential watcher monitoring") + newCtx := context.WithValue( + ctx, + logger.ServerTypeContextKey, + slog.Any(logger.ServerTypeKey, cws.serverType.String()), + ) + slog.DebugContext(newCtx, "Starting credential watcher monitoring") ticker := time.NewTicker(monitoringInterval) watcher, err := fsnotify.NewWatcher() if err != nil { - slog.ErrorContext(ctx, "Failed to create credential watcher", "error", err) + slog.ErrorContext(newCtx, "Failed to create credential watcher", "error", err) return } cws.watcher = watcher - cws.watchFiles(ctx, credentialPaths(cws.agentConfig)) + cws.watcherMutex.Lock() + commandServer := cws.agentConfig.Command + + if cws.serverType == model.Auxiliary { + commandServer = cws.agentConfig.AuxiliaryCommand + } + + cws.watchFiles(newCtx, credentialPaths(commandServer)) + cws.watcherMutex.Unlock() for { select { - case <-ctx.Done(): + case <-newCtx.Done(): closeError := cws.watcher.Close() if closeError != nil { - slog.ErrorContext(ctx, "Unable to close credential watcher", "error", closeError) + slog.ErrorContext(newCtx, "Unable to close credential watcher", "error", closeError) } return case event := <-cws.watcher.Events: - cws.handleEvent(ctx, event) + cws.handleEvent(newCtx, event) case <-ticker.C: - cws.checkForUpdates(ctx, ch) + cws.checkForUpdates(newCtx, ch) case watcherError := <-cws.watcher.Errors: - slog.ErrorContext(ctx, "Unexpected error in credential watcher", "error", watcherError) + slog.ErrorContext(newCtx, "Unexpected error in credential watcher", "error", watcherError) } } } @@ -146,31 +169,50 @@ func (cws *CredentialWatcherService) checkForUpdates(ctx context.Context, ch cha slog.Any(logger.CorrelationIDKey, logger.GenerateCorrelationID()), ) + cws.watcherMutex.Lock() + defer cws.watcherMutex.Unlock() + + commandServer := cws.agentConfig.Command + if cws.serverType == model.Auxiliary { + commandServer = cws.agentConfig.AuxiliaryCommand + } + + conn, err := grpc.NewGrpcConnection(newCtx, cws.agentConfig, commandServer) + if err != nil { + slog.ErrorContext(newCtx, "Unable to create new grpc connection", "error", err) + cws.filesChanged.Store(false) + + return + } slog.DebugContext(ctx, "Credential watcher has detected changes") - ch <- CredentialUpdateMessage{CorrelationID: logger.CorrelationIDAttr(newCtx)} + ch <- CredentialUpdateMessage{ + CorrelationID: logger.CorrelationIDAttr(newCtx), + ServerType: cws.serverType, + GrpcConnection: conn, + } cws.filesChanged.Store(false) } } -func credentialPaths(agentConfig *config.Config) []string { +func credentialPaths(agentConfig *config.Command) []string { var paths []string - if agentConfig.Command.Auth != nil { - if agentConfig.Command.Auth.TokenPath != "" { - paths = append(paths, agentConfig.Command.Auth.TokenPath) + if agentConfig.Auth != nil { + if agentConfig.Auth.TokenPath != "" { + paths = append(paths, agentConfig.Auth.TokenPath) } } // agent's tls certs - if agentConfig.Command.TLS != nil { - if agentConfig.Command.TLS.Ca != "" { - paths = append(paths, agentConfig.Command.TLS.Ca) + if agentConfig.TLS != nil { + if agentConfig.TLS.Ca != "" { + paths = append(paths, agentConfig.TLS.Ca) } - if agentConfig.Command.TLS.Cert != "" { - paths = append(paths, agentConfig.Command.TLS.Cert) + if agentConfig.TLS.Cert != "" { + paths = append(paths, agentConfig.TLS.Cert) } - if agentConfig.Command.TLS.Key != "" { - paths = append(paths, agentConfig.Command.TLS.Key) + if agentConfig.TLS.Key != "" { + paths = append(paths, agentConfig.TLS.Key) } } diff --git a/internal/watcher/credentials/credential_watcher_service_test.go b/internal/watcher/credentials/credential_watcher_service_test.go index dba4450f3..31ce8ae98 100644 --- a/internal/watcher/credentials/credential_watcher_service_test.go +++ b/internal/watcher/credentials/credential_watcher_service_test.go @@ -13,6 +13,8 @@ import ( "testing" "time" + "github.com/nginx/agent/v3/internal/model" + "github.com/nginx/agent/v3/internal/config" "github.com/fsnotify/fsnotify" @@ -22,7 +24,7 @@ import ( ) func TestCredentialWatcherService_TestNewCredentialWatcherService(t *testing.T) { - credentialWatcherService := NewCredentialWatcherService(types.AgentConfig()) + credentialWatcherService := NewCredentialWatcherService(types.AgentConfig(), model.Command) assert.Empty(t, credentialWatcherService.filesBeingWatched) assert.False(t, credentialWatcherService.filesChanged.Load()) @@ -30,7 +32,7 @@ func TestCredentialWatcherService_TestNewCredentialWatcherService(t *testing.T) func TestCredentialWatcherService_Watch(t *testing.T) { ctx := context.Background() - cws := NewCredentialWatcherService(types.AgentConfig()) + cws := NewCredentialWatcherService(types.AgentConfig(), model.Command) watcher, err := fsnotify.NewWatcher() require.NoError(t, err) cws.watcher = watcher @@ -61,7 +63,7 @@ func TestCredentialWatcherService_Watch(t *testing.T) { } func TestCredentialWatcherService_isWatching(t *testing.T) { - cws := NewCredentialWatcherService(types.AgentConfig()) + cws := NewCredentialWatcherService(types.AgentConfig(), model.Command) assert.False(t, cws.isWatching("test-file")) cws.filesBeingWatched.Store("test-file", true) assert.True(t, cws.isWatching("test-file")) @@ -80,7 +82,7 @@ func TestCredentialWatcherService_isEventSkippable(t *testing.T) { func TestCredentialWatcherService_addWatcher(t *testing.T) { ctx := context.Background() - cws := NewCredentialWatcherService(types.AgentConfig()) + cws := NewCredentialWatcherService(types.AgentConfig(), model.Command) watcher, err := fsnotify.NewWatcher() require.NoError(t, err) cws.watcher = watcher @@ -105,7 +107,7 @@ func TestCredentialWatcherService_watchFiles(t *testing.T) { var files []string ctx := context.Background() - cws := NewCredentialWatcherService(types.AgentConfig()) + cws := NewCredentialWatcherService(types.AgentConfig(), model.Command) watcher, err := fsnotify.NewWatcher() require.NoError(t, err) cws.watcher = watcher @@ -137,7 +139,7 @@ func TestCredentialWatcherService_watchFiles(t *testing.T) { func TestCredentialWatcherService_checkForUpdates(t *testing.T) { ctx := context.Background() - cws := NewCredentialWatcherService(types.AgentConfig()) + cws := NewCredentialWatcherService(types.AgentConfig(), model.Command) watcher, err := fsnotify.NewWatcher() require.NoError(t, err) cws.watcher = watcher @@ -164,7 +166,7 @@ func TestCredentialWatcherService_checkForUpdates(t *testing.T) { func TestCredentialWatcherService_handleEvent(t *testing.T) { ctx := context.Background() - cws := NewCredentialWatcherService(types.AgentConfig()) + cws := NewCredentialWatcherService(types.AgentConfig(), model.Command) watcher, err := fsnotify.NewWatcher() require.NoError(t, err) cws.watcher = watcher @@ -232,7 +234,7 @@ func Test_credentialPaths(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.Equalf(t, tt.want, credentialPaths(tt.agentConfig), "credentialPaths(%v)", tt.agentConfig) + assert.Equalf(t, tt.want, credentialPaths(tt.agentConfig.Command), "credentialPaths(%v)", tt.agentConfig) }) } } diff --git a/internal/watcher/instance/instance_watcher_service.go b/internal/watcher/instance/instance_watcher_service.go index b88f62cba..2b07bad21 100644 --- a/internal/watcher/instance/instance_watcher_service.go +++ b/internal/watcher/instance/instance_watcher_service.go @@ -307,7 +307,7 @@ func (iw *InstanceWatcherService) agentInstance(ctx context.Context) *mpi.Instan slog.WarnContext(ctx, "Unable to convert config to labels structure", "error", convertErr) } - return &mpi.Instance{ + instance := &mpi.Instance{ InstanceMeta: &mpi.InstanceMeta{ InstanceId: iw.agentConfig.UUID, InstanceType: mpi.InstanceMeta_INSTANCE_TYPE_AGENT, @@ -333,6 +333,13 @@ func (iw *InstanceWatcherService) agentInstance(ctx context.Context) *mpi.Instan Details: nil, }, } + + if iw.agentConfig.IsAuxiliaryCommandGrpcClientConfigured() { + instance.GetInstanceConfig().GetAgentConfig().AuxiliaryCommand = config. + ToAuxiliaryCommandServerProto(iw.agentConfig.AuxiliaryCommand) + } + + return instance } func compareInstances(oldInstancesMap, instancesMap map[string]*mpi.Instance) ( diff --git a/internal/watcher/watcher_plugin.go b/internal/watcher/watcher_plugin.go index 5a3e12a6e..42ca8dd5c 100644 --- a/internal/watcher/watcher_plugin.go +++ b/internal/watcher/watcher_plugin.go @@ -13,8 +13,6 @@ import ( "github.com/nginx/agent/v3/internal/model" - "github.com/nginx/agent/v3/internal/grpc" - "github.com/nginx/agent/v3/internal/watcher/credentials" mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" @@ -41,12 +39,14 @@ type ( nginxAppProtectInstanceWatcher *instance.NginxAppProtectInstanceWatcher healthWatcherService *health.HealthWatcherService fileWatcherService *file.FileWatcherService - credentialWatcherService credentialWatcherServiceInterface + commandCredentialWatcherService credentialWatcherServiceInterface + auxiliaryCredentialWatcherService credentialWatcherServiceInterface instanceUpdatesChannel chan instance.InstanceUpdatesMessage nginxConfigContextChannel chan instance.NginxConfigContextMessage instanceHealthChannel chan health.InstanceHealthMessage fileUpdatesChannel chan file.FileUpdateMessage - credentialUpdatesChannel chan credentials.CredentialUpdateMessage + commandCredentialUpdatesChannel chan credentials.CredentialUpdateMessage + auxiliaryCredentialUpdatesChannel chan credentials.CredentialUpdateMessage cancel context.CancelFunc instancesWithConfigApplyInProgress []string watcherMutex sync.Mutex @@ -80,12 +80,14 @@ func NewWatcher(agentConfig *config.Config) *Watcher { nginxAppProtectInstanceWatcher: instance.NewNginxAppProtectInstanceWatcher(agentConfig), healthWatcherService: health.NewHealthWatcherService(agentConfig), fileWatcherService: file.NewFileWatcherService(agentConfig), - credentialWatcherService: credentials.NewCredentialWatcherService(agentConfig), + commandCredentialWatcherService: credentials.NewCredentialWatcherService(agentConfig, model.Command), + auxiliaryCredentialWatcherService: credentials.NewCredentialWatcherService(agentConfig, model.Auxiliary), instanceUpdatesChannel: make(chan instance.InstanceUpdatesMessage), nginxConfigContextChannel: make(chan instance.NginxConfigContextMessage), instanceHealthChannel: make(chan health.InstanceHealthMessage), fileUpdatesChannel: make(chan file.FileUpdateMessage), - credentialUpdatesChannel: make(chan credentials.CredentialUpdateMessage), + commandCredentialUpdatesChannel: make(chan credentials.CredentialUpdateMessage), + auxiliaryCredentialUpdatesChannel: make(chan credentials.CredentialUpdateMessage), instancesWithConfigApplyInProgress: []string{}, watcherMutex: sync.Mutex{}, } @@ -103,7 +105,11 @@ func (w *Watcher) Init(ctx context.Context, messagePipe bus.MessagePipeInterface go w.nginxAppProtectInstanceWatcher.Watch(watcherContext, w.instanceUpdatesChannel) go w.instanceWatcherService.Watch(watcherContext, w.instanceUpdatesChannel, w.nginxConfigContextChannel) go w.healthWatcherService.Watch(watcherContext, w.instanceHealthChannel) - go w.credentialWatcherService.Watch(watcherContext, w.credentialUpdatesChannel) + go w.commandCredentialWatcherService.Watch(watcherContext, w.commandCredentialUpdatesChannel) + + if w.agentConfig.AuxiliaryCommand != nil { + go w.auxiliaryCredentialWatcherService.Watch(watcherContext, w.auxiliaryCredentialUpdatesChannel) + } if w.agentConfig.IsFeatureEnabled(pkgConfig.FeatureFileWatcher) { go w.fileWatcherService.Watch(watcherContext, w.fileUpdatesChannel) @@ -135,8 +141,6 @@ func (*Watcher) Info() *bus.Info { func (w *Watcher) Process(ctx context.Context, msg *bus.Message) { switch msg.Topic { - case bus.CredentialUpdatedTopic: - w.handleCredentialUpdate(ctx) case bus.ConfigApplyRequestTopic: w.handleConfigApplyRequest(ctx, msg) case bus.ConfigApplySuccessfulTopic: @@ -152,7 +156,6 @@ func (w *Watcher) Process(ctx context.Context, msg *bus.Message) { func (*Watcher) Subscriptions() []string { return []string{ - bus.CredentialUpdatedTopic, bus.ConfigApplyRequestTopic, bus.ConfigApplySuccessfulTopic, bus.ConfigApplyCompleteTopic, @@ -251,35 +254,15 @@ func (w *Watcher) handleConfigApplyComplete(ctx context.Context, msg *bus.Messag w.fileWatcherService.SetEnabled(true) } -func (w *Watcher) handleCredentialUpdate(ctx context.Context) { - slog.DebugContext(ctx, "Watcher plugin received credential update message") - - w.watcherMutex.Lock() - // This will be changed/moved during the credential watcher PR - conn, err := grpc.NewGrpcConnection(ctx, w.agentConfig, w.agentConfig.Command) - if err != nil { - slog.ErrorContext(ctx, "Unable to create new grpc connection", "error", err) - w.watcherMutex.Unlock() - - return - } - w.watcherMutex.Unlock() - w.messagePipe.Process(ctx, &bus.Message{ - Topic: bus.ConnectionResetTopic, Data: conn, - }) -} - func (w *Watcher) monitorWatchers(ctx context.Context) { for { select { case <-ctx.Done(): return - case message := <-w.credentialUpdatesChannel: - slog.DebugContext(ctx, "Received credential update event") - newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID) - w.messagePipe.Process(newCtx, &bus.Message{ - Topic: bus.CredentialUpdatedTopic, Data: nil, - }) + case message := <-w.commandCredentialUpdatesChannel: + w.handleCredentialUpdate(ctx, message) + case message := <-w.auxiliaryCredentialUpdatesChannel: + w.handleCredentialUpdate(ctx, message) case message := <-w.instanceUpdatesChannel: newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID) w.handleInstanceUpdates(newCtx, message) @@ -319,6 +302,17 @@ func (w *Watcher) monitorWatchers(ctx context.Context) { } } +func (w *Watcher) handleCredentialUpdate(ctx context.Context, message credentials.CredentialUpdateMessage) { + newCtx := context.WithValue(context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID), + logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, + message.ServerType.String())) + + slog.DebugContext(newCtx, "Received credential update event for command server") + w.messagePipe.Process(newCtx, &bus.Message{ + Topic: bus.ConnectionResetTopic, Data: message.GrpcConnection, + }) +} + func (w *Watcher) handleInstanceUpdates(newCtx context.Context, message instance.InstanceUpdatesMessage) { if len(message.InstanceUpdates.NewInstances) > 0 { slog.DebugContext(newCtx, "New instances found", "instances", message.InstanceUpdates.NewInstances) diff --git a/internal/watcher/watcher_plugin_test.go b/internal/watcher/watcher_plugin_test.go index 7e893a343..20cd99957 100644 --- a/internal/watcher/watcher_plugin_test.go +++ b/internal/watcher/watcher_plugin_test.go @@ -10,7 +10,9 @@ import ( "testing" "time" - model2 "github.com/nginx/agent/v3/internal/model" + "github.com/nginx/agent/v3/internal/grpc" + + "github.com/nginx/agent/v3/internal/model" "github.com/nginx/agent/v3/internal/watcher/credentials" @@ -25,7 +27,7 @@ import ( "github.com/nginx/agent/v3/internal/bus" "github.com/nginx/agent/v3/internal/logger" "github.com/nginx/agent/v3/pkg/id" - "github.com/nginx/agent/v3/test/model" + testModel "github.com/nginx/agent/v3/test/model" "github.com/nginx/agent/v3/test/protos" "github.com/nginx/agent/v3/test/types" "github.com/stretchr/testify/assert" @@ -67,7 +69,7 @@ func TestWatcher_Init(t *testing.T) { nginxConfigContextMessage := instance.NginxConfigContextMessage{ CorrelationID: logger.GenerateCorrelationID(), - NginxConfigContext: model.ConfigContext(), + NginxConfigContext: testModel.ConfigContext(), } instanceHealthMessage := health.InstanceHealthMessage{ @@ -76,13 +78,15 @@ func TestWatcher_Init(t *testing.T) { } credentialUpdateMessage := credentials.CredentialUpdateMessage{ - CorrelationID: logger.GenerateCorrelationID(), + CorrelationID: logger.GenerateCorrelationID(), + ServerType: model.Command, + GrpcConnection: &grpc.GrpcConnection{}, } watcherPlugin.instanceUpdatesChannel <- instanceUpdatesMessage watcherPlugin.nginxConfigContextChannel <- nginxConfigContextMessage watcherPlugin.instanceHealthChannel <- instanceHealthMessage - watcherPlugin.credentialUpdatesChannel <- credentialUpdateMessage + watcherPlugin.commandCredentialUpdatesChannel <- credentialUpdateMessage assert.Eventually(t, func() bool { return len(messagePipe.Messages()) == 6 }, 2*time.Second, 10*time.Millisecond) messages = messagePipe.Messages() @@ -113,7 +117,7 @@ func TestWatcher_Init(t *testing.T) { messages[4], ) assert.Equal(t, - &bus.Message{Topic: bus.CredentialUpdatedTopic, Data: nil}, + &bus.Message{Topic: bus.ConnectionResetTopic, Data: &grpc.GrpcConnection{}}, messages[5]) } @@ -165,8 +169,8 @@ func TestWatcher_Process_ConfigApplySuccessfulTopic(t *testing.T) { ctx := context.Background() data := protos.NginxOssInstance([]string{}) - response := &model2.ConfigApplySuccess{ - ConfigContext: &model2.NginxConfigContext{ + response := &model.ConfigApplySuccess{ + ConfigContext: &model.NginxConfigContext{ InstanceID: data.GetInstanceMeta().GetInstanceId(), }, DataPlaneResponse: &mpi.DataPlaneResponse{ @@ -236,7 +240,6 @@ func TestWatcher_Subscriptions(t *testing.T) { assert.Equal( t, []string{ - bus.CredentialUpdatedTopic, bus.ConfigApplyRequestTopic, bus.ConfigApplySuccessfulTopic, bus.ConfigApplyCompleteTopic, diff --git a/pkg/files/file_helpers.go b/pkg/files/file_helpers.go index ff286485f..1ebad824a 100644 --- a/pkg/files/file_helpers.go +++ b/pkg/files/file_helpers.go @@ -122,11 +122,13 @@ func FileMode(mode string) os.FileMode { func GenerateConfigVersion(fileSlice []*mpi.File) string { var hashes string - slices.SortFunc(fileSlice, func(a, b *mpi.File) int { + files := make([]*mpi.File, len(fileSlice)) + copy(files, fileSlice) + slices.SortFunc(files, func(a, b *mpi.File) int { return cmp.Compare(a.GetFileMeta().GetName(), b.GetFileMeta().GetName()) }) - for _, file := range fileSlice { + for _, file := range files { hashes += file.GetFileMeta().GetHash() }