Skip to content

Update credential watcher to allow second credential watcher #1132

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: add-read-only-file-plugin
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/command/command_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (cp *CommandPlugin) handleInvalidRequest(ctx context.Context,
CommandResponse: &mpi.CommandResponse{
Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
Message: message,
Error: "Can not perform write action as auxiliary command server",
Error: "Unable to process request. Management plane is configured as read only.",
},
InstanceId: instanceID,
})
Expand Down
37 changes: 37 additions & 0 deletions internal/config/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,40 @@ func ToCommandProto(cmd *Command) *mpi.CommandServer {

return protoConfig
}

// ToAuxiliaryCommandServerProto maps the AgentConfig Command struct back to the AuxiliaryCommandServer proto message
func ToAuxiliaryCommandServerProto(cmd *Command) *mpi.AuxiliaryCommandServer {
protoConfig := &mpi.AuxiliaryCommandServer{}

// Map ServerConfig to the ServerSettings
if cmd.Server != nil {
protoServerType := mpi.ServerSettings_SERVER_SETTINGS_TYPE_UNDEFINED
if cmd.Server.Type == Grpc {
protoServerType = mpi.ServerSettings_SERVER_SETTINGS_TYPE_GRPC
}

protoConfig.Server = &mpi.ServerSettings{
Host: cmd.Server.Host,
Port: int32(cmd.Server.Port),
Type: protoServerType,
}
}

// Map AuthConfig to AuthSettings
if cmd.Auth != nil {
protoConfig.Auth = &mpi.AuthSettings{}
}

// Map TLSConfig to TLSSettings
if cmd.TLS != nil {
protoConfig.Tls = &mpi.TLSSettings{
Cert: cmd.TLS.Cert,
Key: cmd.TLS.Key,
Ca: cmd.TLS.Ca,
ServerName: cmd.TLS.ServerName,
SkipVerify: cmd.TLS.SkipVerify,
}
}

return protoConfig
}
86 changes: 64 additions & 22 deletions internal/watcher/credentials/credential_watcher_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
"sync/atomic"
"time"

"github.com/nginx/agent/v3/internal/model"

"github.com/nginx/agent/v3/internal/grpc"

"github.com/fsnotify/fsnotify"
"github.com/nginx/agent/v3/internal/config"
"github.com/nginx/agent/v3/internal/logger"
Expand All @@ -28,56 +32,75 @@ var emptyEvent = fsnotify.Event{
}

type CredentialUpdateMessage struct {
CorrelationID slog.Attr
CorrelationID slog.Attr
GrpcConnection *grpc.GrpcConnection
ServerType model.ServerType
}

type CredentialWatcherService struct {
agentConfig *config.Config
watcher *fsnotify.Watcher
filesBeingWatched *sync.Map
filesChanged *atomic.Bool
serverType model.ServerType
watcherMutex sync.Mutex
}

func NewCredentialWatcherService(agentConfig *config.Config) *CredentialWatcherService {
func NewCredentialWatcherService(agentConfig *config.Config, serverType model.ServerType) *CredentialWatcherService {
filesChanged := &atomic.Bool{}
filesChanged.Store(false)

return &CredentialWatcherService{
agentConfig: agentConfig,
filesBeingWatched: &sync.Map{},
filesChanged: filesChanged,
serverType: serverType,
watcherMutex: sync.Mutex{},
}
}

func (cws *CredentialWatcherService) Watch(ctx context.Context, ch chan<- CredentialUpdateMessage) {
slog.DebugContext(ctx, "Starting credential watcher monitoring")
newCtx := context.WithValue(
ctx,
logger.ServerTypeContextKey,
slog.Any(logger.ServerTypeKey, cws.serverType.String()),
)
slog.DebugContext(newCtx, "Starting credential watcher monitoring")

ticker := time.NewTicker(monitoringInterval)
watcher, err := fsnotify.NewWatcher()
if err != nil {
slog.ErrorContext(ctx, "Failed to create credential watcher", "error", err)
slog.ErrorContext(newCtx, "Failed to create credential watcher", "error", err)
return
}

cws.watcher = watcher

cws.watchFiles(ctx, credentialPaths(cws.agentConfig))
cws.watcherMutex.Lock()
commandServer := cws.agentConfig.Command

if cws.serverType == model.Auxiliary {
commandServer = cws.agentConfig.AuxiliaryCommand
}

cws.watchFiles(newCtx, credentialPaths(commandServer))
cws.watcherMutex.Unlock()

for {
select {
case <-ctx.Done():
case <-newCtx.Done():
closeError := cws.watcher.Close()
if closeError != nil {
slog.ErrorContext(ctx, "Unable to close credential watcher", "error", closeError)
slog.ErrorContext(newCtx, "Unable to close credential watcher", "error", closeError)
}

return
case event := <-cws.watcher.Events:
cws.handleEvent(ctx, event)
cws.handleEvent(newCtx, event)
case <-ticker.C:
cws.checkForUpdates(ctx, ch)
cws.checkForUpdates(newCtx, ch)
case watcherError := <-cws.watcher.Errors:
slog.ErrorContext(ctx, "Unexpected error in credential watcher", "error", watcherError)
slog.ErrorContext(newCtx, "Unexpected error in credential watcher", "error", watcherError)
}
}
}
Expand Down Expand Up @@ -146,31 +169,50 @@ func (cws *CredentialWatcherService) checkForUpdates(ctx context.Context, ch cha
slog.Any(logger.CorrelationIDKey, logger.GenerateCorrelationID()),
)

cws.watcherMutex.Lock()
defer cws.watcherMutex.Unlock()

commandServer := cws.agentConfig.Command
if cws.serverType == model.Auxiliary {
commandServer = cws.agentConfig.AuxiliaryCommand
}

conn, err := grpc.NewGrpcConnection(newCtx, cws.agentConfig, commandServer)
if err != nil {
slog.ErrorContext(newCtx, "Unable to create new grpc connection", "error", err)
cws.filesChanged.Store(false)

return
}
slog.DebugContext(ctx, "Credential watcher has detected changes")
ch <- CredentialUpdateMessage{CorrelationID: logger.CorrelationIDAttr(newCtx)}
ch <- CredentialUpdateMessage{
CorrelationID: logger.CorrelationIDAttr(newCtx),
ServerType: cws.serverType,
GrpcConnection: conn,
}
cws.filesChanged.Store(false)
}
}

func credentialPaths(agentConfig *config.Config) []string {
func credentialPaths(agentConfig *config.Command) []string {
var paths []string

if agentConfig.Command.Auth != nil {
if agentConfig.Command.Auth.TokenPath != "" {
paths = append(paths, agentConfig.Command.Auth.TokenPath)
if agentConfig.Auth != nil {
if agentConfig.Auth.TokenPath != "" {
paths = append(paths, agentConfig.Auth.TokenPath)
}
}

// agent's tls certs
if agentConfig.Command.TLS != nil {
if agentConfig.Command.TLS.Ca != "" {
paths = append(paths, agentConfig.Command.TLS.Ca)
if agentConfig.TLS != nil {
if agentConfig.TLS.Ca != "" {
paths = append(paths, agentConfig.TLS.Ca)
}
if agentConfig.Command.TLS.Cert != "" {
paths = append(paths, agentConfig.Command.TLS.Cert)
if agentConfig.TLS.Cert != "" {
paths = append(paths, agentConfig.TLS.Cert)
}
if agentConfig.Command.TLS.Key != "" {
paths = append(paths, agentConfig.Command.TLS.Key)
if agentConfig.TLS.Key != "" {
paths = append(paths, agentConfig.TLS.Key)
}
}

Expand Down
18 changes: 10 additions & 8 deletions internal/watcher/credentials/credential_watcher_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"testing"
"time"

"github.com/nginx/agent/v3/internal/model"

"github.com/nginx/agent/v3/internal/config"

"github.com/fsnotify/fsnotify"
Expand All @@ -22,15 +24,15 @@ import (
)

func TestCredentialWatcherService_TestNewCredentialWatcherService(t *testing.T) {
credentialWatcherService := NewCredentialWatcherService(types.AgentConfig())
credentialWatcherService := NewCredentialWatcherService(types.AgentConfig(), model.Command)

assert.Empty(t, credentialWatcherService.filesBeingWatched)
assert.False(t, credentialWatcherService.filesChanged.Load())
}

func TestCredentialWatcherService_Watch(t *testing.T) {
ctx := context.Background()
cws := NewCredentialWatcherService(types.AgentConfig())
cws := NewCredentialWatcherService(types.AgentConfig(), model.Command)
watcher, err := fsnotify.NewWatcher()
require.NoError(t, err)
cws.watcher = watcher
Expand Down Expand Up @@ -61,7 +63,7 @@ func TestCredentialWatcherService_Watch(t *testing.T) {
}

func TestCredentialWatcherService_isWatching(t *testing.T) {
cws := NewCredentialWatcherService(types.AgentConfig())
cws := NewCredentialWatcherService(types.AgentConfig(), model.Command)
assert.False(t, cws.isWatching("test-file"))
cws.filesBeingWatched.Store("test-file", true)
assert.True(t, cws.isWatching("test-file"))
Expand All @@ -80,7 +82,7 @@ func TestCredentialWatcherService_isEventSkippable(t *testing.T) {

func TestCredentialWatcherService_addWatcher(t *testing.T) {
ctx := context.Background()
cws := NewCredentialWatcherService(types.AgentConfig())
cws := NewCredentialWatcherService(types.AgentConfig(), model.Command)
watcher, err := fsnotify.NewWatcher()
require.NoError(t, err)
cws.watcher = watcher
Expand All @@ -105,7 +107,7 @@ func TestCredentialWatcherService_watchFiles(t *testing.T) {
var files []string

ctx := context.Background()
cws := NewCredentialWatcherService(types.AgentConfig())
cws := NewCredentialWatcherService(types.AgentConfig(), model.Command)
watcher, err := fsnotify.NewWatcher()
require.NoError(t, err)
cws.watcher = watcher
Expand Down Expand Up @@ -137,7 +139,7 @@ func TestCredentialWatcherService_watchFiles(t *testing.T) {

func TestCredentialWatcherService_checkForUpdates(t *testing.T) {
ctx := context.Background()
cws := NewCredentialWatcherService(types.AgentConfig())
cws := NewCredentialWatcherService(types.AgentConfig(), model.Command)
watcher, err := fsnotify.NewWatcher()
require.NoError(t, err)
cws.watcher = watcher
Expand All @@ -164,7 +166,7 @@ func TestCredentialWatcherService_checkForUpdates(t *testing.T) {

func TestCredentialWatcherService_handleEvent(t *testing.T) {
ctx := context.Background()
cws := NewCredentialWatcherService(types.AgentConfig())
cws := NewCredentialWatcherService(types.AgentConfig(), model.Command)
watcher, err := fsnotify.NewWatcher()
require.NoError(t, err)
cws.watcher = watcher
Expand Down Expand Up @@ -232,7 +234,7 @@ func Test_credentialPaths(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, credentialPaths(tt.agentConfig), "credentialPaths(%v)", tt.agentConfig)
assert.Equalf(t, tt.want, credentialPaths(tt.agentConfig.Command), "credentialPaths(%v)", tt.agentConfig)
})
}
}
9 changes: 8 additions & 1 deletion internal/watcher/instance/instance_watcher_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (iw *InstanceWatcherService) agentInstance(ctx context.Context) *mpi.Instan
slog.WarnContext(ctx, "Unable to convert config to labels structure", "error", convertErr)
}

return &mpi.Instance{
instance := &mpi.Instance{
InstanceMeta: &mpi.InstanceMeta{
InstanceId: iw.agentConfig.UUID,
InstanceType: mpi.InstanceMeta_INSTANCE_TYPE_AGENT,
Expand All @@ -333,6 +333,13 @@ func (iw *InstanceWatcherService) agentInstance(ctx context.Context) *mpi.Instan
Details: nil,
},
}

if iw.agentConfig.IsAuxiliaryCommandGrpcClientConfigured() {
instance.GetInstanceConfig().GetAgentConfig().AuxiliaryCommand = config.
ToAuxiliaryCommandServerProto(iw.agentConfig.AuxiliaryCommand)
}

return instance
}

func compareInstances(oldInstancesMap, instancesMap map[string]*mpi.Instance) (
Expand Down
Loading