From b81fb374a93632501591c764ad07d5a9ebb14c89 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Tue, 17 Jun 2025 10:22:47 +0100 Subject: [PATCH 01/11] allow second credential watcher --- .../credentials/credential_watcher_service.go | 82 ++++++++++++++----- .../credential_watcher_service_test.go | 18 ++-- internal/watcher/watcher_plugin.go | 64 +++++++-------- internal/watcher/watcher_plugin_test.go | 10 ++- 4 files changed, 110 insertions(+), 64 deletions(-) diff --git a/internal/watcher/credentials/credential_watcher_service.go b/internal/watcher/credentials/credential_watcher_service.go index 0a1bcfaa8..677938e78 100644 --- a/internal/watcher/credentials/credential_watcher_service.go +++ b/internal/watcher/credentials/credential_watcher_service.go @@ -13,6 +13,9 @@ import ( "sync/atomic" "time" + "github.com/nginx/agent/v3/internal/command" + "github.com/nginx/agent/v3/internal/grpc" + "github.com/fsnotify/fsnotify" "github.com/nginx/agent/v3/internal/config" "github.com/nginx/agent/v3/internal/logger" @@ -29,6 +32,8 @@ var emptyEvent = fsnotify.Event{ type CredentialUpdateMessage struct { CorrelationID slog.Attr + Conn *grpc.GrpcConnection + SeverType command.ServerType } type CredentialWatcherService struct { @@ -36,9 +41,11 @@ type CredentialWatcherService struct { watcher *fsnotify.Watcher filesBeingWatched *sync.Map filesChanged *atomic.Bool + serverType command.ServerType + watcherMutex sync.Mutex } -func NewCredentialWatcherService(agentConfig *config.Config) *CredentialWatcherService { +func NewCredentialWatcherService(agentConfig *config.Config, serverType command.ServerType) *CredentialWatcherService { filesChanged := &atomic.Bool{} filesChanged.Store(false) @@ -46,38 +53,53 @@ func NewCredentialWatcherService(agentConfig *config.Config) *CredentialWatcherS agentConfig: agentConfig, filesBeingWatched: &sync.Map{}, filesChanged: filesChanged, + serverType: serverType, + watcherMutex: sync.Mutex{}, } } func (cws *CredentialWatcherService) Watch(ctx context.Context, ch chan<- CredentialUpdateMessage) { - slog.DebugContext(ctx, "Starting credential watcher monitoring") + newCtx := context.WithValue( + ctx, + logger.ServerTypeContextKey, + slog.Any(logger.ServerTypeKey, cws.serverType.String()), + ) + slog.DebugContext(newCtx, "Starting credential watcher monitoring") ticker := time.NewTicker(monitoringInterval) watcher, err := fsnotify.NewWatcher() if err != nil { - slog.ErrorContext(ctx, "Failed to create credential watcher", "error", err) + slog.ErrorContext(newCtx, "Failed to create credential watcher", "error", err) return } cws.watcher = watcher - cws.watchFiles(ctx, credentialPaths(cws.agentConfig)) + cws.watcherMutex.Lock() + commandSever := cws.agentConfig.Command + + if cws.serverType == command.Auxiliary { + commandSever = cws.agentConfig.AuxiliaryCommand + } + + cws.watchFiles(newCtx, credentialPaths(commandSever)) + cws.watcherMutex.Unlock() for { select { - case <-ctx.Done(): + case <-newCtx.Done(): closeError := cws.watcher.Close() if closeError != nil { - slog.ErrorContext(ctx, "Unable to close credential watcher", "error", closeError) + slog.ErrorContext(newCtx, "Unable to close credential watcher", "error", closeError) } return case event := <-cws.watcher.Events: - cws.handleEvent(ctx, event) + cws.handleEvent(newCtx, event) case <-ticker.C: - cws.checkForUpdates(ctx, ch) + cws.checkForUpdates(newCtx, ch) case watcherError := <-cws.watcher.Errors: - slog.ErrorContext(ctx, "Unexpected error in credential watcher", "error", watcherError) + slog.ErrorContext(newCtx, "Unexpected error in credential watcher", "error", watcherError) } } } @@ -146,31 +168,49 @@ func (cws *CredentialWatcherService) checkForUpdates(ctx context.Context, ch cha slog.Any(logger.CorrelationIDKey, logger.GenerateCorrelationID()), ) + cws.watcherMutex.Lock() + defer cws.watcherMutex.Unlock() + + commandSever := cws.agentConfig.Command + if cws.serverType == command.Auxiliary { + commandSever = cws.agentConfig.AuxiliaryCommand + } + + conn, err := grpc.NewGrpcConnection(newCtx, cws.agentConfig, commandSever) + if err != nil { + slog.ErrorContext(newCtx, "Unable to create new grpc connection", "error", err) + cws.filesChanged.Store(false) + + return + } slog.DebugContext(ctx, "Credential watcher has detected changes") - ch <- CredentialUpdateMessage{CorrelationID: logger.CorrelationIDAttr(newCtx)} + ch <- CredentialUpdateMessage{ + CorrelationID: logger.CorrelationIDAttr(newCtx), + SeverType: cws.serverType, Conn: conn, + } cws.filesChanged.Store(false) } } -func credentialPaths(agentConfig *config.Config) []string { +func credentialPaths(agentConfig *config.Command) []string { var paths []string - if agentConfig.Command.Auth != nil { - if agentConfig.Command.Auth.TokenPath != "" { - paths = append(paths, agentConfig.Command.Auth.TokenPath) + if agentConfig.Auth != nil { + if agentConfig.Auth.TokenPath != "" { + paths = append(paths, agentConfig.Auth.TokenPath) } } // agent's tls certs - if agentConfig.Command.TLS != nil { - if agentConfig.Command.TLS.Ca != "" { - paths = append(paths, agentConfig.Command.TLS.Ca) + if agentConfig.TLS != nil { + if agentConfig.TLS.Ca != "" { + paths = append(paths, agentConfig.TLS.Ca) } - if agentConfig.Command.TLS.Cert != "" { - paths = append(paths, agentConfig.Command.TLS.Cert) + if agentConfig.TLS.Cert != "" { + paths = append(paths, agentConfig.TLS.Cert) } - if agentConfig.Command.TLS.Key != "" { - paths = append(paths, agentConfig.Command.TLS.Key) + if agentConfig.TLS.Key != "" { + paths = append(paths, agentConfig.TLS.Key) } } diff --git a/internal/watcher/credentials/credential_watcher_service_test.go b/internal/watcher/credentials/credential_watcher_service_test.go index dba4450f3..7abed1767 100644 --- a/internal/watcher/credentials/credential_watcher_service_test.go +++ b/internal/watcher/credentials/credential_watcher_service_test.go @@ -13,6 +13,8 @@ import ( "testing" "time" + "github.com/nginx/agent/v3/internal/command" + "github.com/nginx/agent/v3/internal/config" "github.com/fsnotify/fsnotify" @@ -22,7 +24,7 @@ import ( ) func TestCredentialWatcherService_TestNewCredentialWatcherService(t *testing.T) { - credentialWatcherService := NewCredentialWatcherService(types.AgentConfig()) + credentialWatcherService := NewCredentialWatcherService(types.AgentConfig(), command.Command) assert.Empty(t, credentialWatcherService.filesBeingWatched) assert.False(t, credentialWatcherService.filesChanged.Load()) @@ -30,7 +32,7 @@ func TestCredentialWatcherService_TestNewCredentialWatcherService(t *testing.T) func TestCredentialWatcherService_Watch(t *testing.T) { ctx := context.Background() - cws := NewCredentialWatcherService(types.AgentConfig()) + cws := NewCredentialWatcherService(types.AgentConfig(), command.Command) watcher, err := fsnotify.NewWatcher() require.NoError(t, err) cws.watcher = watcher @@ -61,7 +63,7 @@ func TestCredentialWatcherService_Watch(t *testing.T) { } func TestCredentialWatcherService_isWatching(t *testing.T) { - cws := NewCredentialWatcherService(types.AgentConfig()) + cws := NewCredentialWatcherService(types.AgentConfig(), command.Command) assert.False(t, cws.isWatching("test-file")) cws.filesBeingWatched.Store("test-file", true) assert.True(t, cws.isWatching("test-file")) @@ -80,7 +82,7 @@ func TestCredentialWatcherService_isEventSkippable(t *testing.T) { func TestCredentialWatcherService_addWatcher(t *testing.T) { ctx := context.Background() - cws := NewCredentialWatcherService(types.AgentConfig()) + cws := NewCredentialWatcherService(types.AgentConfig(), command.Command) watcher, err := fsnotify.NewWatcher() require.NoError(t, err) cws.watcher = watcher @@ -105,7 +107,7 @@ func TestCredentialWatcherService_watchFiles(t *testing.T) { var files []string ctx := context.Background() - cws := NewCredentialWatcherService(types.AgentConfig()) + cws := NewCredentialWatcherService(types.AgentConfig(), command.Command) watcher, err := fsnotify.NewWatcher() require.NoError(t, err) cws.watcher = watcher @@ -137,7 +139,7 @@ func TestCredentialWatcherService_watchFiles(t *testing.T) { func TestCredentialWatcherService_checkForUpdates(t *testing.T) { ctx := context.Background() - cws := NewCredentialWatcherService(types.AgentConfig()) + cws := NewCredentialWatcherService(types.AgentConfig(), command.Command) watcher, err := fsnotify.NewWatcher() require.NoError(t, err) cws.watcher = watcher @@ -164,7 +166,7 @@ func TestCredentialWatcherService_checkForUpdates(t *testing.T) { func TestCredentialWatcherService_handleEvent(t *testing.T) { ctx := context.Background() - cws := NewCredentialWatcherService(types.AgentConfig()) + cws := NewCredentialWatcherService(types.AgentConfig(), command.Command) watcher, err := fsnotify.NewWatcher() require.NoError(t, err) cws.watcher = watcher @@ -232,7 +234,7 @@ func Test_credentialPaths(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.Equalf(t, tt.want, credentialPaths(tt.agentConfig), "credentialPaths(%v)", tt.agentConfig) + assert.Equalf(t, tt.want, credentialPaths(tt.agentConfig.Command), "credentialPaths(%v)", tt.agentConfig) }) } } diff --git a/internal/watcher/watcher_plugin.go b/internal/watcher/watcher_plugin.go index bbb288edf..1b31f84ab 100644 --- a/internal/watcher/watcher_plugin.go +++ b/internal/watcher/watcher_plugin.go @@ -11,9 +11,9 @@ import ( "slices" "sync" - "github.com/nginx/agent/v3/internal/model" + "github.com/nginx/agent/v3/internal/command" - "github.com/nginx/agent/v3/internal/grpc" + "github.com/nginx/agent/v3/internal/model" "github.com/nginx/agent/v3/internal/watcher/credentials" @@ -40,12 +40,14 @@ type ( instanceWatcherService instanceWatcherServiceInterface healthWatcherService *health.HealthWatcherService fileWatcherService *file.FileWatcherService - credentialWatcherService credentialWatcherServiceInterface + commandCredentialWatcherService credentialWatcherServiceInterface + auxiliaryCredentialWatcherService credentialWatcherServiceInterface instanceUpdatesChannel chan instance.InstanceUpdatesMessage nginxConfigContextChannel chan instance.NginxConfigContextMessage instanceHealthChannel chan health.InstanceHealthMessage fileUpdatesChannel chan file.FileUpdateMessage - credentialUpdatesChannel chan credentials.CredentialUpdateMessage + commandCredentialUpdatesChannel chan credentials.CredentialUpdateMessage + auxiliaryCredentialUpdatesChannel chan credentials.CredentialUpdateMessage cancel context.CancelFunc instancesWithConfigApplyInProgress []string watcherMutex sync.Mutex @@ -78,12 +80,14 @@ func NewWatcher(agentConfig *config.Config) *Watcher { instanceWatcherService: instance.NewInstanceWatcherService(agentConfig), healthWatcherService: health.NewHealthWatcherService(agentConfig), fileWatcherService: file.NewFileWatcherService(agentConfig), - credentialWatcherService: credentials.NewCredentialWatcherService(agentConfig), + commandCredentialWatcherService: credentials.NewCredentialWatcherService(agentConfig, command.Command), + auxiliaryCredentialWatcherService: credentials.NewCredentialWatcherService(agentConfig, command.Auxiliary), instanceUpdatesChannel: make(chan instance.InstanceUpdatesMessage), nginxConfigContextChannel: make(chan instance.NginxConfigContextMessage), instanceHealthChannel: make(chan health.InstanceHealthMessage), fileUpdatesChannel: make(chan file.FileUpdateMessage), - credentialUpdatesChannel: make(chan credentials.CredentialUpdateMessage), + commandCredentialUpdatesChannel: make(chan credentials.CredentialUpdateMessage), + auxiliaryCredentialUpdatesChannel: make(chan credentials.CredentialUpdateMessage), instancesWithConfigApplyInProgress: []string{}, watcherMutex: sync.Mutex{}, } @@ -100,7 +104,11 @@ func (w *Watcher) Init(ctx context.Context, messagePipe bus.MessagePipeInterface go w.instanceWatcherService.Watch(watcherContext, w.instanceUpdatesChannel, w.nginxConfigContextChannel) go w.healthWatcherService.Watch(watcherContext, w.instanceHealthChannel) - go w.credentialWatcherService.Watch(watcherContext, w.credentialUpdatesChannel) + go w.commandCredentialWatcherService.Watch(watcherContext, w.commandCredentialUpdatesChannel) + + if w.agentConfig.AuxiliaryCommand != nil { + go w.auxiliaryCredentialWatcherService.Watch(watcherContext, w.auxiliaryCredentialUpdatesChannel) + } if w.agentConfig.IsFeatureEnabled(pkgConfig.FeatureFileWatcher) { go w.fileWatcherService.Watch(watcherContext, w.fileUpdatesChannel) @@ -132,8 +140,6 @@ func (*Watcher) Info() *bus.Info { func (w *Watcher) Process(ctx context.Context, msg *bus.Message) { switch msg.Topic { - case bus.CredentialUpdatedTopic: - w.handleCredentialUpdate(ctx) case bus.ConfigApplyRequestTopic: w.handleConfigApplyRequest(ctx, msg) case bus.ConfigApplySuccessfulTopic: @@ -149,7 +155,6 @@ func (w *Watcher) Process(ctx context.Context, msg *bus.Message) { func (*Watcher) Subscriptions() []string { return []string{ - bus.CredentialUpdatedTopic, bus.ConfigApplyRequestTopic, bus.ConfigApplySuccessfulTopic, bus.ConfigApplyCompleteTopic, @@ -248,34 +253,29 @@ func (w *Watcher) handleConfigApplyComplete(ctx context.Context, msg *bus.Messag w.fileWatcherService.SetEnabled(true) } -func (w *Watcher) handleCredentialUpdate(ctx context.Context) { - slog.DebugContext(ctx, "Watcher plugin received credential update message") - - w.watcherMutex.Lock() - // This will be changed/moved during the credential watcher PR - conn, err := grpc.NewGrpcConnection(ctx, w.agentConfig, w.agentConfig.Command) - if err != nil { - slog.ErrorContext(ctx, "Unable to create new grpc connection", "error", err) - w.watcherMutex.Unlock() - - return - } - w.watcherMutex.Unlock() - w.messagePipe.Process(ctx, &bus.Message{ - Topic: bus.ConnectionResetTopic, Data: conn, - }) -} - func (w *Watcher) monitorWatchers(ctx context.Context) { for { select { case <-ctx.Done(): return - case message := <-w.credentialUpdatesChannel: - slog.DebugContext(ctx, "Received credential update event") - newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID) + case message := <-w.commandCredentialUpdatesChannel: + slog.DebugContext(ctx, "Received credential update event for command server") + newCtx := context.WithValue(context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID), + logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, + message.SeverType.String())) + + w.messagePipe.Process(newCtx, &bus.Message{ + Topic: bus.ConnectionResetTopic, Data: message.Conn, + }) + + case message := <-w.auxiliaryCredentialUpdatesChannel: + slog.DebugContext(ctx, "Received credential update event for auxiliary command server") + newCtx := context.WithValue(context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID), + logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, + message.SeverType.String())) + w.messagePipe.Process(newCtx, &bus.Message{ - Topic: bus.CredentialUpdatedTopic, Data: nil, + Topic: bus.ConnectionResetTopic, Data: message.Conn, }) case message := <-w.instanceUpdatesChannel: newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID) diff --git a/internal/watcher/watcher_plugin_test.go b/internal/watcher/watcher_plugin_test.go index 7e893a343..7a17413ca 100644 --- a/internal/watcher/watcher_plugin_test.go +++ b/internal/watcher/watcher_plugin_test.go @@ -10,6 +10,9 @@ import ( "testing" "time" + "github.com/nginx/agent/v3/internal/command" + "github.com/nginx/agent/v3/internal/grpc" + model2 "github.com/nginx/agent/v3/internal/model" "github.com/nginx/agent/v3/internal/watcher/credentials" @@ -77,12 +80,14 @@ func TestWatcher_Init(t *testing.T) { credentialUpdateMessage := credentials.CredentialUpdateMessage{ CorrelationID: logger.GenerateCorrelationID(), + SeverType: command.Command, + Conn: &grpc.GrpcConnection{}, } watcherPlugin.instanceUpdatesChannel <- instanceUpdatesMessage watcherPlugin.nginxConfigContextChannel <- nginxConfigContextMessage watcherPlugin.instanceHealthChannel <- instanceHealthMessage - watcherPlugin.credentialUpdatesChannel <- credentialUpdateMessage + watcherPlugin.commandCredentialUpdatesChannel <- credentialUpdateMessage assert.Eventually(t, func() bool { return len(messagePipe.Messages()) == 6 }, 2*time.Second, 10*time.Millisecond) messages = messagePipe.Messages() @@ -113,7 +118,7 @@ func TestWatcher_Init(t *testing.T) { messages[4], ) assert.Equal(t, - &bus.Message{Topic: bus.CredentialUpdatedTopic, Data: nil}, + &bus.Message{Topic: bus.ConnectionResetTopic, Data: &grpc.GrpcConnection{}}, messages[5]) } @@ -236,7 +241,6 @@ func TestWatcher_Subscriptions(t *testing.T) { assert.Equal( t, []string{ - bus.CredentialUpdatedTopic, bus.ConfigApplyRequestTopic, bus.ConfigApplySuccessfulTopic, bus.ConfigApplyCompleteTopic, From 753ee945ba64cf12cbc88b25ce37df49071f32f4 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Mon, 23 Jun 2025 14:59:01 +0100 Subject: [PATCH 02/11] feedback --- .../credentials/credential_watcher_service.go | 22 ++++++++++--------- .../credential_watcher_service_test.go | 16 +++++++------- internal/watcher/watcher_plugin.go | 14 +++++------- internal/watcher/watcher_plugin_test.go | 17 +++++++------- 4 files changed, 34 insertions(+), 35 deletions(-) diff --git a/internal/watcher/credentials/credential_watcher_service.go b/internal/watcher/credentials/credential_watcher_service.go index 677938e78..291cbd71b 100644 --- a/internal/watcher/credentials/credential_watcher_service.go +++ b/internal/watcher/credentials/credential_watcher_service.go @@ -13,7 +13,8 @@ import ( "sync/atomic" "time" - "github.com/nginx/agent/v3/internal/command" + "github.com/nginx/agent/v3/internal/model" + "github.com/nginx/agent/v3/internal/grpc" "github.com/fsnotify/fsnotify" @@ -31,9 +32,9 @@ var emptyEvent = fsnotify.Event{ } type CredentialUpdateMessage struct { - CorrelationID slog.Attr - Conn *grpc.GrpcConnection - SeverType command.ServerType + CorrelationID slog.Attr + GrpcConnection *grpc.GrpcConnection + ServerType model.ServerType } type CredentialWatcherService struct { @@ -41,11 +42,11 @@ type CredentialWatcherService struct { watcher *fsnotify.Watcher filesBeingWatched *sync.Map filesChanged *atomic.Bool - serverType command.ServerType + serverType model.ServerType watcherMutex sync.Mutex } -func NewCredentialWatcherService(agentConfig *config.Config, serverType command.ServerType) *CredentialWatcherService { +func NewCredentialWatcherService(agentConfig *config.Config, serverType model.ServerType) *CredentialWatcherService { filesChanged := &atomic.Bool{} filesChanged.Store(false) @@ -78,7 +79,7 @@ func (cws *CredentialWatcherService) Watch(ctx context.Context, ch chan<- Creden cws.watcherMutex.Lock() commandSever := cws.agentConfig.Command - if cws.serverType == command.Auxiliary { + if cws.serverType == model.Auxiliary { commandSever = cws.agentConfig.AuxiliaryCommand } @@ -172,7 +173,7 @@ func (cws *CredentialWatcherService) checkForUpdates(ctx context.Context, ch cha defer cws.watcherMutex.Unlock() commandSever := cws.agentConfig.Command - if cws.serverType == command.Auxiliary { + if cws.serverType == model.Auxiliary { commandSever = cws.agentConfig.AuxiliaryCommand } @@ -185,8 +186,9 @@ func (cws *CredentialWatcherService) checkForUpdates(ctx context.Context, ch cha } slog.DebugContext(ctx, "Credential watcher has detected changes") ch <- CredentialUpdateMessage{ - CorrelationID: logger.CorrelationIDAttr(newCtx), - SeverType: cws.serverType, Conn: conn, + CorrelationID: logger.CorrelationIDAttr(newCtx), + ServerType: cws.serverType, + GrpcConnection: conn, } cws.filesChanged.Store(false) } diff --git a/internal/watcher/credentials/credential_watcher_service_test.go b/internal/watcher/credentials/credential_watcher_service_test.go index 7abed1767..31ce8ae98 100644 --- a/internal/watcher/credentials/credential_watcher_service_test.go +++ b/internal/watcher/credentials/credential_watcher_service_test.go @@ -13,7 +13,7 @@ import ( "testing" "time" - "github.com/nginx/agent/v3/internal/command" + "github.com/nginx/agent/v3/internal/model" "github.com/nginx/agent/v3/internal/config" @@ -24,7 +24,7 @@ import ( ) func TestCredentialWatcherService_TestNewCredentialWatcherService(t *testing.T) { - credentialWatcherService := NewCredentialWatcherService(types.AgentConfig(), command.Command) + credentialWatcherService := NewCredentialWatcherService(types.AgentConfig(), model.Command) assert.Empty(t, credentialWatcherService.filesBeingWatched) assert.False(t, credentialWatcherService.filesChanged.Load()) @@ -32,7 +32,7 @@ func TestCredentialWatcherService_TestNewCredentialWatcherService(t *testing.T) func TestCredentialWatcherService_Watch(t *testing.T) { ctx := context.Background() - cws := NewCredentialWatcherService(types.AgentConfig(), command.Command) + cws := NewCredentialWatcherService(types.AgentConfig(), model.Command) watcher, err := fsnotify.NewWatcher() require.NoError(t, err) cws.watcher = watcher @@ -63,7 +63,7 @@ func TestCredentialWatcherService_Watch(t *testing.T) { } func TestCredentialWatcherService_isWatching(t *testing.T) { - cws := NewCredentialWatcherService(types.AgentConfig(), command.Command) + cws := NewCredentialWatcherService(types.AgentConfig(), model.Command) assert.False(t, cws.isWatching("test-file")) cws.filesBeingWatched.Store("test-file", true) assert.True(t, cws.isWatching("test-file")) @@ -82,7 +82,7 @@ func TestCredentialWatcherService_isEventSkippable(t *testing.T) { func TestCredentialWatcherService_addWatcher(t *testing.T) { ctx := context.Background() - cws := NewCredentialWatcherService(types.AgentConfig(), command.Command) + cws := NewCredentialWatcherService(types.AgentConfig(), model.Command) watcher, err := fsnotify.NewWatcher() require.NoError(t, err) cws.watcher = watcher @@ -107,7 +107,7 @@ func TestCredentialWatcherService_watchFiles(t *testing.T) { var files []string ctx := context.Background() - cws := NewCredentialWatcherService(types.AgentConfig(), command.Command) + cws := NewCredentialWatcherService(types.AgentConfig(), model.Command) watcher, err := fsnotify.NewWatcher() require.NoError(t, err) cws.watcher = watcher @@ -139,7 +139,7 @@ func TestCredentialWatcherService_watchFiles(t *testing.T) { func TestCredentialWatcherService_checkForUpdates(t *testing.T) { ctx := context.Background() - cws := NewCredentialWatcherService(types.AgentConfig(), command.Command) + cws := NewCredentialWatcherService(types.AgentConfig(), model.Command) watcher, err := fsnotify.NewWatcher() require.NoError(t, err) cws.watcher = watcher @@ -166,7 +166,7 @@ func TestCredentialWatcherService_checkForUpdates(t *testing.T) { func TestCredentialWatcherService_handleEvent(t *testing.T) { ctx := context.Background() - cws := NewCredentialWatcherService(types.AgentConfig(), command.Command) + cws := NewCredentialWatcherService(types.AgentConfig(), model.Command) watcher, err := fsnotify.NewWatcher() require.NoError(t, err) cws.watcher = watcher diff --git a/internal/watcher/watcher_plugin.go b/internal/watcher/watcher_plugin.go index 1b31f84ab..c6a1bebf2 100644 --- a/internal/watcher/watcher_plugin.go +++ b/internal/watcher/watcher_plugin.go @@ -11,8 +11,6 @@ import ( "slices" "sync" - "github.com/nginx/agent/v3/internal/command" - "github.com/nginx/agent/v3/internal/model" "github.com/nginx/agent/v3/internal/watcher/credentials" @@ -80,8 +78,8 @@ func NewWatcher(agentConfig *config.Config) *Watcher { instanceWatcherService: instance.NewInstanceWatcherService(agentConfig), healthWatcherService: health.NewHealthWatcherService(agentConfig), fileWatcherService: file.NewFileWatcherService(agentConfig), - commandCredentialWatcherService: credentials.NewCredentialWatcherService(agentConfig, command.Command), - auxiliaryCredentialWatcherService: credentials.NewCredentialWatcherService(agentConfig, command.Auxiliary), + commandCredentialWatcherService: credentials.NewCredentialWatcherService(agentConfig, model.Command), + auxiliaryCredentialWatcherService: credentials.NewCredentialWatcherService(agentConfig, model.Auxiliary), instanceUpdatesChannel: make(chan instance.InstanceUpdatesMessage), nginxConfigContextChannel: make(chan instance.NginxConfigContextMessage), instanceHealthChannel: make(chan health.InstanceHealthMessage), @@ -262,20 +260,20 @@ func (w *Watcher) monitorWatchers(ctx context.Context) { slog.DebugContext(ctx, "Received credential update event for command server") newCtx := context.WithValue(context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID), logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, - message.SeverType.String())) + message.ServerType.String())) w.messagePipe.Process(newCtx, &bus.Message{ - Topic: bus.ConnectionResetTopic, Data: message.Conn, + Topic: bus.ConnectionResetTopic, Data: message.GrpcConnection, }) case message := <-w.auxiliaryCredentialUpdatesChannel: slog.DebugContext(ctx, "Received credential update event for auxiliary command server") newCtx := context.WithValue(context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID), logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, - message.SeverType.String())) + message.ServerType.String())) w.messagePipe.Process(newCtx, &bus.Message{ - Topic: bus.ConnectionResetTopic, Data: message.Conn, + Topic: bus.ConnectionResetTopic, Data: message.GrpcConnection, }) case message := <-w.instanceUpdatesChannel: newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID) diff --git a/internal/watcher/watcher_plugin_test.go b/internal/watcher/watcher_plugin_test.go index 7a17413ca..20cd99957 100644 --- a/internal/watcher/watcher_plugin_test.go +++ b/internal/watcher/watcher_plugin_test.go @@ -10,10 +10,9 @@ import ( "testing" "time" - "github.com/nginx/agent/v3/internal/command" "github.com/nginx/agent/v3/internal/grpc" - model2 "github.com/nginx/agent/v3/internal/model" + "github.com/nginx/agent/v3/internal/model" "github.com/nginx/agent/v3/internal/watcher/credentials" @@ -28,7 +27,7 @@ import ( "github.com/nginx/agent/v3/internal/bus" "github.com/nginx/agent/v3/internal/logger" "github.com/nginx/agent/v3/pkg/id" - "github.com/nginx/agent/v3/test/model" + testModel "github.com/nginx/agent/v3/test/model" "github.com/nginx/agent/v3/test/protos" "github.com/nginx/agent/v3/test/types" "github.com/stretchr/testify/assert" @@ -70,7 +69,7 @@ func TestWatcher_Init(t *testing.T) { nginxConfigContextMessage := instance.NginxConfigContextMessage{ CorrelationID: logger.GenerateCorrelationID(), - NginxConfigContext: model.ConfigContext(), + NginxConfigContext: testModel.ConfigContext(), } instanceHealthMessage := health.InstanceHealthMessage{ @@ -79,9 +78,9 @@ func TestWatcher_Init(t *testing.T) { } credentialUpdateMessage := credentials.CredentialUpdateMessage{ - CorrelationID: logger.GenerateCorrelationID(), - SeverType: command.Command, - Conn: &grpc.GrpcConnection{}, + CorrelationID: logger.GenerateCorrelationID(), + ServerType: model.Command, + GrpcConnection: &grpc.GrpcConnection{}, } watcherPlugin.instanceUpdatesChannel <- instanceUpdatesMessage @@ -170,8 +169,8 @@ func TestWatcher_Process_ConfigApplySuccessfulTopic(t *testing.T) { ctx := context.Background() data := protos.NginxOssInstance([]string{}) - response := &model2.ConfigApplySuccess{ - ConfigContext: &model2.NginxConfigContext{ + response := &model.ConfigApplySuccess{ + ConfigContext: &model.NginxConfigContext{ InstanceID: data.GetInstanceMeta().GetInstanceId(), }, DataPlaneResponse: &mpi.DataPlaneResponse{ From d58625933c0ec7a02bc245722c1b44d5eecc00af Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Mon, 23 Jun 2025 17:03:39 +0100 Subject: [PATCH 03/11] fix race condition --- pkg/files/file_helpers.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/files/file_helpers.go b/pkg/files/file_helpers.go index ff286485f..1ebad824a 100644 --- a/pkg/files/file_helpers.go +++ b/pkg/files/file_helpers.go @@ -122,11 +122,13 @@ func FileMode(mode string) os.FileMode { func GenerateConfigVersion(fileSlice []*mpi.File) string { var hashes string - slices.SortFunc(fileSlice, func(a, b *mpi.File) int { + files := make([]*mpi.File, len(fileSlice)) + copy(files, fileSlice) + slices.SortFunc(files, func(a, b *mpi.File) int { return cmp.Compare(a.GetFileMeta().GetName(), b.GetFileMeta().GetName()) }) - for _, file := range fileSlice { + for _, file := range files { hashes += file.GetFileMeta().GetHash() } From a8d5b2977e5d1054740f75504289c85847253201 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Thu, 26 Jun 2025 12:24:53 +0100 Subject: [PATCH 04/11] Pr feedback --- .../credentials/credential_watcher_service.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/watcher/credentials/credential_watcher_service.go b/internal/watcher/credentials/credential_watcher_service.go index 291cbd71b..868acd3ae 100644 --- a/internal/watcher/credentials/credential_watcher_service.go +++ b/internal/watcher/credentials/credential_watcher_service.go @@ -77,13 +77,13 @@ func (cws *CredentialWatcherService) Watch(ctx context.Context, ch chan<- Creden cws.watcher = watcher cws.watcherMutex.Lock() - commandSever := cws.agentConfig.Command + commandServer := cws.agentConfig.Command if cws.serverType == model.Auxiliary { - commandSever = cws.agentConfig.AuxiliaryCommand + commandServer = cws.agentConfig.AuxiliaryCommand } - cws.watchFiles(newCtx, credentialPaths(commandSever)) + cws.watchFiles(newCtx, credentialPaths(commandServer)) cws.watcherMutex.Unlock() for { @@ -172,12 +172,12 @@ func (cws *CredentialWatcherService) checkForUpdates(ctx context.Context, ch cha cws.watcherMutex.Lock() defer cws.watcherMutex.Unlock() - commandSever := cws.agentConfig.Command + commandServer := cws.agentConfig.Command if cws.serverType == model.Auxiliary { - commandSever = cws.agentConfig.AuxiliaryCommand + commandServer = cws.agentConfig.AuxiliaryCommand } - conn, err := grpc.NewGrpcConnection(newCtx, cws.agentConfig, commandSever) + conn, err := grpc.NewGrpcConnection(newCtx, cws.agentConfig, commandServer) if err != nil { slog.ErrorContext(newCtx, "Unable to create new grpc connection", "error", err) cws.filesChanged.Store(false) From d08ba6ffcbff508ffe1286691fd2eab5139afb69 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Fri, 27 Jun 2025 14:51:09 +0100 Subject: [PATCH 05/11] PR feedback --- internal/watcher/watcher_plugin.go | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/internal/watcher/watcher_plugin.go b/internal/watcher/watcher_plugin.go index d425291db..42ca8dd5c 100644 --- a/internal/watcher/watcher_plugin.go +++ b/internal/watcher/watcher_plugin.go @@ -260,24 +260,9 @@ func (w *Watcher) monitorWatchers(ctx context.Context) { case <-ctx.Done(): return case message := <-w.commandCredentialUpdatesChannel: - slog.DebugContext(ctx, "Received credential update event for command server") - newCtx := context.WithValue(context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID), - logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, - message.ServerType.String())) - - w.messagePipe.Process(newCtx, &bus.Message{ - Topic: bus.ConnectionResetTopic, Data: message.GrpcConnection, - }) - + w.handleCredentialUpdate(ctx, message) case message := <-w.auxiliaryCredentialUpdatesChannel: - slog.DebugContext(ctx, "Received credential update event for auxiliary command server") - newCtx := context.WithValue(context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID), - logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, - message.ServerType.String())) - - w.messagePipe.Process(newCtx, &bus.Message{ - Topic: bus.ConnectionResetTopic, Data: message.GrpcConnection, - }) + w.handleCredentialUpdate(ctx, message) case message := <-w.instanceUpdatesChannel: newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID) w.handleInstanceUpdates(newCtx, message) @@ -317,6 +302,17 @@ func (w *Watcher) monitorWatchers(ctx context.Context) { } } +func (w *Watcher) handleCredentialUpdate(ctx context.Context, message credentials.CredentialUpdateMessage) { + newCtx := context.WithValue(context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID), + logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, + message.ServerType.String())) + + slog.DebugContext(newCtx, "Received credential update event for command server") + w.messagePipe.Process(newCtx, &bus.Message{ + Topic: bus.ConnectionResetTopic, Data: message.GrpcConnection, + }) +} + func (w *Watcher) handleInstanceUpdates(newCtx context.Context, message instance.InstanceUpdatesMessage) { if len(message.InstanceUpdates.NewInstances) > 0 { slog.DebugContext(newCtx, "New instances found", "instances", message.InstanceUpdates.NewInstances) From 9c61e4c5aebe1a3a620b991f48f39b335cc86921 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Wed, 2 Jul 2025 14:37:52 +0100 Subject: [PATCH 06/11] update message --- internal/command/command_plugin.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/command/command_plugin.go b/internal/command/command_plugin.go index 0a36efc0a..32797cf47 100644 --- a/internal/command/command_plugin.go +++ b/internal/command/command_plugin.go @@ -380,7 +380,7 @@ func (cp *CommandPlugin) handleInvalidRequest(ctx context.Context, CommandResponse: &mpi.CommandResponse{ Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE, Message: message, - Error: "Can not perform write action as auxiliary command server", + Error: "Unable to process request. Management plane is configured as read only.", }, InstanceId: instanceID, }) From 8f350436d6bc10cb28f6c856c50641191de6e96c Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Thu, 3 Jul 2025 11:40:51 +0100 Subject: [PATCH 07/11] update agent config --- internal/config/mapper.go | 37 +++++++++++++++++++ .../instance/instance_watcher_service.go | 1 + 2 files changed, 38 insertions(+) diff --git a/internal/config/mapper.go b/internal/config/mapper.go index beda9352a..c51b1a810 100644 --- a/internal/config/mapper.go +++ b/internal/config/mapper.go @@ -85,3 +85,40 @@ func ToCommandProto(cmd *Command) *mpi.CommandServer { return protoConfig } + +// ToAuxiliaryCommandServerProto maps the AgentConfig Command struct back to the AuxiliaryCommandServer proto message +func ToAuxiliaryCommandServerProto(cmd *Command) *mpi.AuxiliaryCommandServer { + protoConfig := &mpi.AuxiliaryCommandServer{} + + // Map ServerConfig to the ServerSettings + if cmd.Server != nil { + protoServerType := mpi.ServerSettings_SERVER_SETTINGS_TYPE_UNDEFINED + if cmd.Server.Type == Grpc { + protoServerType = mpi.ServerSettings_SERVER_SETTINGS_TYPE_GRPC + } + + protoConfig.Server = &mpi.ServerSettings{ + Host: cmd.Server.Host, + Port: int32(cmd.Server.Port), + Type: protoServerType, + } + } + + // Map AuthConfig to AuthSettings + if cmd.Auth != nil { + protoConfig.Auth = &mpi.AuthSettings{} + } + + // Map TLSConfig to TLSSettings + if cmd.TLS != nil { + protoConfig.Tls = &mpi.TLSSettings{ + Cert: cmd.TLS.Cert, + Key: cmd.TLS.Key, + Ca: cmd.TLS.Ca, + ServerName: cmd.TLS.ServerName, + SkipVerify: cmd.TLS.SkipVerify, + } + } + + return protoConfig +} diff --git a/internal/watcher/instance/instance_watcher_service.go b/internal/watcher/instance/instance_watcher_service.go index b88f62cba..96ef8b348 100644 --- a/internal/watcher/instance/instance_watcher_service.go +++ b/internal/watcher/instance/instance_watcher_service.go @@ -318,6 +318,7 @@ func (iw *InstanceWatcherService) agentInstance(ctx context.Context) *mpi.Instan Config: &mpi.InstanceConfig_AgentConfig{ AgentConfig: &mpi.AgentConfig{ Command: config.ToCommandProto(iw.agentConfig.Command), + AuxiliaryCommand: config.ToAuxiliaryCommandServerProto(iw.agentConfig.AuxiliaryCommand), Metrics: &mpi.MetricsServer{}, File: &mpi.FileServer{}, Labels: labels, From 70da7a51a1b525cc6be833cccf957a05b3563460 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Thu, 3 Jul 2025 11:59:01 +0100 Subject: [PATCH 08/11] update agent config --- internal/watcher/instance/instance_watcher_service.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/internal/watcher/instance/instance_watcher_service.go b/internal/watcher/instance/instance_watcher_service.go index 96ef8b348..116dea8e2 100644 --- a/internal/watcher/instance/instance_watcher_service.go +++ b/internal/watcher/instance/instance_watcher_service.go @@ -307,7 +307,7 @@ func (iw *InstanceWatcherService) agentInstance(ctx context.Context) *mpi.Instan slog.WarnContext(ctx, "Unable to convert config to labels structure", "error", convertErr) } - return &mpi.Instance{ + instance := &mpi.Instance{ InstanceMeta: &mpi.InstanceMeta{ InstanceId: iw.agentConfig.UUID, InstanceType: mpi.InstanceMeta_INSTANCE_TYPE_AGENT, @@ -318,7 +318,6 @@ func (iw *InstanceWatcherService) agentInstance(ctx context.Context) *mpi.Instan Config: &mpi.InstanceConfig_AgentConfig{ AgentConfig: &mpi.AgentConfig{ Command: config.ToCommandProto(iw.agentConfig.Command), - AuxiliaryCommand: config.ToAuxiliaryCommandServerProto(iw.agentConfig.AuxiliaryCommand), Metrics: &mpi.MetricsServer{}, File: &mpi.FileServer{}, Labels: labels, @@ -334,6 +333,13 @@ func (iw *InstanceWatcherService) agentInstance(ctx context.Context) *mpi.Instan Details: nil, }, } + + if iw.agentConfig.AuxiliaryCommand != nil { + instance.GetInstanceConfig().GetAgentConfig().AuxiliaryCommand = + config.ToAuxiliaryCommandServerProto(iw.agentConfig.AuxiliaryCommand) + } + + return instance } func compareInstances(oldInstancesMap, instancesMap map[string]*mpi.Instance) ( From cb0e64a6693d91f26087e6345b2a0f567389214f Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Thu, 3 Jul 2025 12:05:15 +0100 Subject: [PATCH 09/11] update agent config --- internal/watcher/instance/instance_watcher_service.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/watcher/instance/instance_watcher_service.go b/internal/watcher/instance/instance_watcher_service.go index 116dea8e2..eedc3adf2 100644 --- a/internal/watcher/instance/instance_watcher_service.go +++ b/internal/watcher/instance/instance_watcher_service.go @@ -335,8 +335,8 @@ func (iw *InstanceWatcherService) agentInstance(ctx context.Context) *mpi.Instan } if iw.agentConfig.AuxiliaryCommand != nil { - instance.GetInstanceConfig().GetAgentConfig().AuxiliaryCommand = - config.ToAuxiliaryCommandServerProto(iw.agentConfig.AuxiliaryCommand) + instance.GetInstanceConfig().GetAgentConfig().AuxiliaryCommand = config. + ToAuxiliaryCommandServerProto(iw.agentConfig.AuxiliaryCommand) } return instance From f85b4c7a0c904aacf74ace3a6bde7efefbcac011 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Thu, 3 Jul 2025 15:37:04 +0100 Subject: [PATCH 10/11] try fix test --- internal/watcher/instance/instance_watcher_service.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/watcher/instance/instance_watcher_service.go b/internal/watcher/instance/instance_watcher_service.go index eedc3adf2..193ac4c72 100644 --- a/internal/watcher/instance/instance_watcher_service.go +++ b/internal/watcher/instance/instance_watcher_service.go @@ -337,6 +337,8 @@ func (iw *InstanceWatcherService) agentInstance(ctx context.Context) *mpi.Instan if iw.agentConfig.AuxiliaryCommand != nil { instance.GetInstanceConfig().GetAgentConfig().AuxiliaryCommand = config. ToAuxiliaryCommandServerProto(iw.agentConfig.AuxiliaryCommand) + } else { + instance.GetInstanceConfig().GetAgentConfig().AuxiliaryCommand = &mpi.AuxiliaryCommandServer{} } return instance From 5c55151f2fd70ad158628aef52b29a5e5eb2119f Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Thu, 3 Jul 2025 16:23:02 +0100 Subject: [PATCH 11/11] try fix test --- internal/watcher/instance/instance_watcher_service.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/internal/watcher/instance/instance_watcher_service.go b/internal/watcher/instance/instance_watcher_service.go index 193ac4c72..2b07bad21 100644 --- a/internal/watcher/instance/instance_watcher_service.go +++ b/internal/watcher/instance/instance_watcher_service.go @@ -334,11 +334,9 @@ func (iw *InstanceWatcherService) agentInstance(ctx context.Context) *mpi.Instan }, } - if iw.agentConfig.AuxiliaryCommand != nil { + if iw.agentConfig.IsAuxiliaryCommandGrpcClientConfigured() { instance.GetInstanceConfig().GetAgentConfig().AuxiliaryCommand = config. ToAuxiliaryCommandServerProto(iw.agentConfig.AuxiliaryCommand) - } else { - instance.GetInstanceConfig().GetAgentConfig().AuxiliaryCommand = &mpi.AuxiliaryCommandServer{} } return instance