diff --git a/go.mod b/go.mod index 75c7194..358902e 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/stacklok/toolhive-registry-server go 1.25.3 require ( + github.com/fsnotify/fsnotify v1.9.0 github.com/go-chi/chi/v5 v5.2.3 github.com/go-git/go-billy/v5 v5.6.2 github.com/go-git/go-git/v5 v5.16.3 @@ -38,7 +39,6 @@ require ( github.com/emirpasic/gods v1.18.1 // indirect github.com/evanphx/json-patch/v5 v5.9.11 // indirect github.com/extism/go-sdk v1.7.0 // indirect - github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect github.com/go-logr/zapr v1.3.0 // indirect diff --git a/pkg/config/config.go b/pkg/config/config.go index e2d6c47..3441f97 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -17,9 +17,7 @@ const ( // SourceTypeFile is the type for registry data stored in local files SourceTypeFile = "file" -) -const ( // RegistryFormatToolHive is the native ToolHive registry format SourceFormatToolHive = "toolhive" @@ -27,8 +25,9 @@ const ( SourceFormatUpstream = "upstream" ) -// ConfigLoader defines the interface for loading configuration type ConfigLoader interface { + // LoadConfig reads and parses a configuration file from the given path. + // The file is only read, never modified. LoadConfig(path string) (*Config, error) } @@ -118,18 +117,19 @@ func NewConfigLoader() ConfigLoader { return &configLoader{} } -// LoadConfig loads and parses configuration from a YAML file +// LoadConfig reads and parses a YAML configuration file. +// This is a read-only operation - the file is never modified. func (c *configLoader) LoadConfig(path string) (*Config, error) { - // Read the entire file into memory + // Read-only file access data, err := os.ReadFile(path) if err != nil { return nil, fmt.Errorf("failed to read config file: %w", err) } - // Parse YAML content + // Parse the YAML content var config Config if err := yaml.Unmarshal(data, &config); err != nil { - return nil, fmt.Errorf("failed to parse YAML config: %w", err) + return nil, fmt.Errorf("failed to parse YAML: %w", err) } return &config, nil diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index b53fa63..d2c4082 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -144,7 +144,7 @@ filter: yamlContent: `source: [invalid yaml`, wantConfig: nil, wantErr: true, - errMsg: "failed to parse YAML config", + errMsg: "failed to parse YAML", }, { name: "file_not_found", diff --git a/pkg/config/manager.go b/pkg/config/manager.go new file mode 100644 index 0000000..6efc383 --- /dev/null +++ b/pkg/config/manager.go @@ -0,0 +1,224 @@ +package config + +import ( + "context" + "fmt" + "sync" + + "github.com/fsnotify/fsnotify" + "github.com/stacklok/toolhive/pkg/logger" +) + +// ConfigManager provides thread-safe, read-only configuration management. +// Configuration files are never modified by the application - all updates come from +// external sources (Kubernetes ConfigMaps, Docker volume mounts, orchestration tools). +// +// Design for read-only operation: +// - No file locking needed - we only observe external changes +// - No write coordination required - we never modify files +// - Uses sync.RWMutex optimized for concurrent reads +// - Validates externally-updated configs before applying +// - Preserves last known good configuration on invalid updates +// - Optimized for container environments with atomic file updates +type ConfigManager interface { + // GetConfig safely retrieves the current configuration + GetConfig() *Config + + // ReloadConfig reads the latest configuration from disk and applies it if valid. + // The file is only read, never written. Returns error if the new config is invalid. + ReloadConfig() error + + // WatchConfig observes the configuration file for external changes. + // Automatically reloads when the file is updated by external systems. + // Blocks until context is cancelled. + WatchConfig(ctx context.Context) error + + // Close releases the file watcher resources + Close() error +} + +// ConfigValidator defines the interface for validating configurations +// This allows custom validation logic to be injected beyond the basic validation +type ConfigValidator interface { + Validate(config *Config) error +} + +// defaultValidator uses the Config's built-in validation +type defaultValidator struct{} + +// Validate delegates to the Config's own Validate method +func (v *defaultValidator) Validate(config *Config) error { + return config.Validate() +} + +// configManager is the concrete implementation of ConfigManager +type configManager struct { + mu sync.RWMutex // Protects concurrent access to config + config *Config // Current active configuration + configPath string // Path to configuration file + loader ConfigLoader // Loader for reading config files + validator ConfigValidator // Validator for checking config validity + watcher *fsnotify.Watcher // File system watcher (nil if not watching) + watcherMu sync.Mutex // Protects watcher field +} + +// ConfigManagerOption allows customizing ConfigManager behavior +type ConfigManagerOption func(*configManager) + +// WithValidator sets a custom validator for the config manager +func WithValidator(validator ConfigValidator) ConfigManagerOption { + return func(cm *configManager) { + cm.validator = validator + } +} + +// WithLoader sets a custom config loader for the config manager +func WithLoader(loader ConfigLoader) ConfigManagerOption { + return func(cm *configManager) { + cm.loader = loader + } +} + +// NewConfigManager creates a new ConfigManager with the given configuration file path. +// It loads and validates the initial configuration. +// Returns error if initial load or validation fails. +func NewConfigManager(configPath string, opts ...ConfigManagerOption) (ConfigManager, error) { + cm := &configManager{ + configPath: configPath, + loader: NewConfigLoader(), + validator: &defaultValidator{}, // Uses Config.Validate() by default + } + + // Apply options + for _, opt := range opts { + opt(cm) + } + + // Load initial configuration + if err := cm.ReloadConfig(); err != nil { + return nil, fmt.Errorf("failed to load initial configuration: %w", err) + } + + return cm, nil +} + +// GetConfig safely retrieves the current configuration +// Multiple goroutines can safely call this method concurrently +func (cm *configManager) GetConfig() *Config { + cm.mu.RLock() + defer cm.mu.RUnlock() + + // Return a copy to prevent external modifications + // This is a shallow copy, which is sufficient for our use case + // as the Config struct fields are not modified in place + configCopy := *cm.config + return &configCopy +} + +// ReloadConfig reads the configuration file and applies it if valid. +// The file is treated as read-only - we never modify it. +// If the new configuration is invalid, the previous configuration remains active. +func (cm *configManager) ReloadConfig() error { + // Read the configuration file (read-only operation) + newConfig, err := cm.loader.LoadConfig(cm.configPath) + if err != nil { + return fmt.Errorf("failed to read config file: %w", err) + } + + // Validate before applying + if err := cm.validator.Validate(newConfig); err != nil { + return fmt.Errorf("invalid configuration: %w", err) + } + + // Atomically update the in-memory configuration + cm.mu.Lock() + cm.config = newConfig + cm.mu.Unlock() + + logger.Infof("Configuration reloaded from %s", cm.configPath) + return nil +} + +// WatchConfig observes the configuration file for external changes. +// Since we never write to the file, all changes come from external sources: +// - Kubernetes ConfigMap updates (atomic via symlink swaps) +// - Docker volume mount updates +// - Configuration management tools +// +// This method blocks until the context is cancelled. +func (cm *configManager) WatchConfig(ctx context.Context) error { + cm.watcherMu.Lock() + if cm.watcher != nil { + cm.watcherMu.Unlock() + return fmt.Errorf("config watcher is already running") + } + + watcher, err := fsnotify.NewWatcher() + if err != nil { + cm.watcherMu.Unlock() + return fmt.Errorf("failed to create file watcher: %w", err) + } + cm.watcher = watcher + cm.watcherMu.Unlock() + + // Add config file to watcher + if err := watcher.Add(cm.configPath); err != nil { + return fmt.Errorf("failed to watch config file %s: %w", cm.configPath, err) + } + + logger.Infof("Started watching configuration file: %s", cm.configPath) + + // Watch loop + for { + select { + case <-ctx.Done(): + logger.Info("Stopping config file watcher due to context cancellation") + return ctx.Err() + + case event, ok := <-watcher.Events: + if !ok { + return fmt.Errorf("watcher event channel closed") + } + + // Detect external file modifications + if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) { + logger.Infof("External config update detected, reloading") + + if err := cm.ReloadConfig(); err != nil { + logger.Errorf("Failed to reload config: %v", err) + // Continue observing - previous config remains active + } + } + + // Handle K8s ConfigMap updates (may remove/recreate symlinks) + if event.Has(fsnotify.Remove) { + logger.Debugf("Config file removed (K8s update), re-watching") + _ = watcher.Add(cm.configPath) + } + + case err, ok := <-watcher.Errors: + if !ok { + return fmt.Errorf("watcher error channel closed") + } + // Log error but continue watching + logger.Errorf("File watcher error: %v", err) + } + } +} + +// Close releases resources held by the config manager +// Specifically, it closes the file watcher if active +func (cm *configManager) Close() error { + cm.watcherMu.Lock() + defer cm.watcherMu.Unlock() + + if cm.watcher != nil { + if err := cm.watcher.Close(); err != nil { + return fmt.Errorf("failed to close file watcher: %w", err) + } + cm.watcher = nil + logger.Info("Config watcher closed") + } + + return nil +} diff --git a/pkg/config/manager_test.go b/pkg/config/manager_test.go new file mode 100644 index 0000000..459faed --- /dev/null +++ b/pkg/config/manager_test.go @@ -0,0 +1,772 @@ +package config + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// validConfigYAML returns a valid configuration YAML string +func validConfigYAML() string { + return `source: + type: configmap + configmap: + name: test-registry +syncPolicy: + interval: "30m" +filter: + tags: + include: ["production"] + exclude: ["beta"]` +} + +// invalidConfigYAML returns an invalid configuration YAML string (missing required fields) +func invalidConfigYAML() string { + return `source: + type: "" +syncPolicy: + interval: "30m" +filter: + tags: + include: []` +} + +// TestNewConfigManager tests the creation of a new ConfigManager +func TestNewConfigManager(t *testing.T) { + tests := []struct { + name string + setupConfig func(t *testing.T) string + wantErr bool + errMsg string + }{ + { + name: "valid_config", + setupConfig: func(t *testing.T) string { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.yaml") + err := os.WriteFile(configPath, []byte(validConfigYAML()), 0644) + require.NoError(t, err) + return configPath + }, + wantErr: false, + }, + { + name: "invalid_config", + setupConfig: func(t *testing.T) string { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.yaml") + err := os.WriteFile(configPath, []byte(invalidConfigYAML()), 0644) + require.NoError(t, err) + return configPath + }, + wantErr: true, + errMsg: "invalid configuration", + }, + { + name: "nonexistent_config", + setupConfig: func(t *testing.T) string { + tmpDir := t.TempDir() + return filepath.Join(tmpDir, "nonexistent.yaml") + }, + wantErr: true, + errMsg: "failed to load initial configuration", + }, + { + name: "invalid_yaml_syntax", + setupConfig: func(t *testing.T) string { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.yaml") + err := os.WriteFile(configPath, []byte("invalid: [yaml"), 0644) + require.NoError(t, err) + return configPath + }, + wantErr: true, + errMsg: "failed to load initial configuration", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + configPath := tt.setupConfig(t) + + manager, err := NewConfigManager(configPath) + + if tt.wantErr { + require.Error(t, err) + if tt.errMsg != "" { + assert.Contains(t, err.Error(), tt.errMsg) + } + return + } + + require.NoError(t, err) + require.NotNil(t, manager) + + // Verify we can get the config + config := manager.GetConfig() + assert.NotNil(t, config) + + // Clean up + err = manager.Close() + assert.NoError(t, err) + }) + } +} + +// TestGetConfig tests thread-safe config retrieval +func TestGetConfig(t *testing.T) { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.yaml") + err := os.WriteFile(configPath, []byte(validConfigYAML()), 0644) + require.NoError(t, err) + + manager, err := NewConfigManager(configPath) + require.NoError(t, err) + defer func() { + assert.NoError(t, manager.Close()) + }() + + // Test basic retrieval + config := manager.GetConfig() + require.NotNil(t, config) + assert.Equal(t, "configmap", config.Source.Type) + assert.Equal(t, "test-registry", config.Source.ConfigMap.Name) + assert.Equal(t, "30m", config.SyncPolicy.Interval) + + // Test that returned config is a copy (modifications don't affect stored config) + config.Source.Type = "modified" + config2 := manager.GetConfig() + assert.Equal(t, "configmap", config2.Source.Type, "Config should be a copy") +} + +// TestGetConfigConcurrent tests concurrent access to GetConfig +func TestGetConfigConcurrent(t *testing.T) { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.yaml") + err := os.WriteFile(configPath, []byte(validConfigYAML()), 0644) + require.NoError(t, err) + + manager, err := NewConfigManager(configPath) + require.NoError(t, err) + defer func() { + assert.NoError(t, manager.Close()) + }() + + // Run many concurrent reads + const numReaders = 100 + var wg sync.WaitGroup + wg.Add(numReaders) + + for i := 0; i < numReaders; i++ { + go func() { + defer wg.Done() + config := manager.GetConfig() + assert.NotNil(t, config) + assert.Equal(t, "configmap", config.Source.Type) + }() + } + + wg.Wait() +} + +// TestReloadConfig tests configuration reloading +func TestReloadConfig(t *testing.T) { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.yaml") + + // Write initial config + err := os.WriteFile(configPath, []byte(validConfigYAML()), 0644) + require.NoError(t, err) + + manager, err := NewConfigManager(configPath) + require.NoError(t, err) + defer func() { + assert.NoError(t, manager.Close()) + }() + + // Verify initial config + config := manager.GetConfig() + assert.Equal(t, "test-registry", config.Source.ConfigMap.Name) + + // Update config file + newConfig := `source: + type: configmap + configmap: + name: updated-registry +syncPolicy: + interval: "1h" +filter: + tags: + include: ["development"] + exclude: []` + err = os.WriteFile(configPath, []byte(newConfig), 0644) + require.NoError(t, err) + + // Reload config + err = manager.ReloadConfig() + require.NoError(t, err) + + // Verify updated config + config = manager.GetConfig() + assert.Equal(t, "updated-registry", config.Source.ConfigMap.Name) + assert.Equal(t, "1h", config.SyncPolicy.Interval) + assert.Equal(t, []string{"development"}, config.Filter.Tags.Include) +} + +// TestReloadConfigFailure tests that old config is preserved on reload failure +func TestReloadConfigFailure(t *testing.T) { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.yaml") + + // Write initial valid config + err := os.WriteFile(configPath, []byte(validConfigYAML()), 0644) + require.NoError(t, err) + + manager, err := NewConfigManager(configPath) + require.NoError(t, err) + defer func() { + assert.NoError(t, manager.Close()) + }() + + // Verify initial config + config := manager.GetConfig() + assert.Equal(t, "test-registry", config.Source.ConfigMap.Name) + + // Write invalid config + err = os.WriteFile(configPath, []byte(invalidConfigYAML()), 0644) + require.NoError(t, err) + + // Attempt to reload (should fail) + err = manager.ReloadConfig() + require.Error(t, err) + assert.Contains(t, err.Error(), "invalid configuration") + + // Verify old config is still active + config = manager.GetConfig() + assert.Equal(t, "test-registry", config.Source.ConfigMap.Name, "Old config should be preserved") +} + +// TestReloadConfigConcurrentReads tests reloading while reading +func TestReloadConfigConcurrentReads(t *testing.T) { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.yaml") + + // Write initial config + err := os.WriteFile(configPath, []byte(validConfigYAML()), 0644) + require.NoError(t, err) + + manager, err := NewConfigManager(configPath) + require.NoError(t, err) + defer func() { + assert.NoError(t, manager.Close()) + }() + + // Start many concurrent readers + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + var wg sync.WaitGroup + const numReaders = 50 + + // Continuous readers + wg.Add(numReaders) + for i := 0; i < numReaders; i++ { + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + config := manager.GetConfig() + assert.NotNil(t, config) + // Config should always be valid + assert.NotEmpty(t, config.Source.Type) + } + } + }() + } + + // Perform multiple reloads while readers are active + for i := 0; i < 10; i++ { + newConfig := fmt.Sprintf(`source: + type: configmap + configmap: + name: registry-%d +syncPolicy: + interval: "30m" +filter: + tags: + include: ["test"] + exclude: []`, i) + err = os.WriteFile(configPath, []byte(newConfig), 0644) + require.NoError(t, err) + + err = manager.ReloadConfig() + require.NoError(t, err) + + time.Sleep(50 * time.Millisecond) + } + + cancel() + wg.Wait() +} + +// TestConfigValidator tests the default validator +func TestConfigValidator(t *testing.T) { + validator := &defaultValidator{} + + tests := []struct { + name string + config *Config + wantErr bool + errMsg string + }{ + { + name: "valid_configmap_config", + config: &Config{ + Source: SourceConfig{ + Type: "configmap", + ConfigMap: &ConfigMapConfig{ + Name: "test-registry", + }, + }, + SyncPolicy: SyncPolicyConfig{ + Interval: "30m", + }, + Filter: FilterConfig{}, + }, + wantErr: false, + }, + { + name: "nil_config", + config: nil, + wantErr: true, + errMsg: "config cannot be nil", + }, + { + name: "missing_source_type", + config: &Config{ + Source: SourceConfig{ + Type: "", + }, + SyncPolicy: SyncPolicyConfig{ + Interval: "30m", + }, + }, + wantErr: true, + errMsg: "source.type is required", + }, + { + name: "configmap_missing_config", + config: &Config{ + Source: SourceConfig{ + Type: "configmap", + ConfigMap: nil, + }, + SyncPolicy: SyncPolicyConfig{ + Interval: "30m", + }, + }, + wantErr: true, + errMsg: "source.configmap is required when type is configmap", + }, + { + name: "configmap_missing_name", + config: &Config{ + Source: SourceConfig{ + Type: "configmap", + ConfigMap: &ConfigMapConfig{ + Name: "", + }, + }, + SyncPolicy: SyncPolicyConfig{ + Interval: "30m", + }, + }, + wantErr: true, + errMsg: "source.configmap.name is required", + }, + { + name: "missing_sync_interval", + config: &Config{ + Source: SourceConfig{ + Type: "configmap", + ConfigMap: &ConfigMapConfig{ + Name: "test", + }, + }, + SyncPolicy: SyncPolicyConfig{ + Interval: "", + }, + }, + wantErr: true, + errMsg: "syncPolicy.interval is required", + }, + { + name: "invalid_sync_interval_format", + config: &Config{ + Source: SourceConfig{ + Type: "configmap", + ConfigMap: &ConfigMapConfig{ + Name: "test", + }, + }, + SyncPolicy: SyncPolicyConfig{ + Interval: "invalid", + }, + }, + wantErr: true, + errMsg: "must be a valid duration", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validator.Validate(tt.config) + + if tt.wantErr { + require.Error(t, err) + if tt.errMsg != "" { + assert.Contains(t, err.Error(), tt.errMsg) + } + return + } + + require.NoError(t, err) + }) + } +} + +// TestWatchConfig tests file watching functionality +func TestWatchConfig(t *testing.T) { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.yaml") + + // Write initial config + err := os.WriteFile(configPath, []byte(validConfigYAML()), 0644) + require.NoError(t, err) + + manager, err := NewConfigManager(configPath) + require.NoError(t, err) + defer func() { + assert.NoError(t, manager.Close()) + }() + + // Start watching in background + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + watchErr := make(chan error, 1) + go func() { + watchErr <- manager.WatchConfig(ctx) + }() + + // Wait for watcher to initialize + time.Sleep(100 * time.Millisecond) + + // Verify initial config + config := manager.GetConfig() + assert.Equal(t, "test-registry", config.Source.ConfigMap.Name) + + // Modify the config file + newConfig := `source: + type: configmap + configmap: + name: watched-registry +syncPolicy: + interval: "45m" +filter: + tags: + include: ["watched"] + exclude: []` + err = os.WriteFile(configPath, []byte(newConfig), 0644) + require.NoError(t, err) + + // Wait for debounce and reload (debounce is 500ms + some processing time) + time.Sleep(800 * time.Millisecond) + + // Verify config was reloaded + config = manager.GetConfig() + assert.Equal(t, "watched-registry", config.Source.ConfigMap.Name) + assert.Equal(t, "45m", config.SyncPolicy.Interval) + + // Stop watching + cancel() + + // Wait for watch to stop + select { + case err := <-watchErr: + // Context cancellation is expected + assert.ErrorIs(t, err, context.Canceled) + case <-time.After(2 * time.Second): + t.Fatal("WatchConfig did not stop after context cancellation") + } +} + +// TestWatchConfigImmediate tests that config changes are applied immediately (no debouncing) +// This simulates container environments where updates are atomic (e.g., K8s ConfigMaps) +func TestWatchConfigImmediate(t *testing.T) { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.yaml") + + // Write initial config + err := os.WriteFile(configPath, []byte(validConfigYAML()), 0644) + require.NoError(t, err) + + manager, err := NewConfigManager(configPath) + require.NoError(t, err) + defer func() { + assert.NoError(t, manager.Close()) + }() + + // Start watching in background + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + _ = manager.WatchConfig(ctx) + }() + + // Wait for watcher to initialize + time.Sleep(100 * time.Millisecond) + + // Test immediate application of changes + testCases := []string{"update-1", "update-2", "update-3"} + + for _, name := range testCases { + newConfig := fmt.Sprintf(`source: + type: configmap + configmap: + name: %s +syncPolicy: + interval: "30m" +filter: + tags: + include: [] + exclude: []`, name) + + err = os.WriteFile(configPath, []byte(newConfig), 0644) + require.NoError(t, err) + + // In container environments, changes should be applied immediately + // Small wait for file system events to propagate + time.Sleep(100 * time.Millisecond) + + config := manager.GetConfig() + assert.Equal(t, name, config.Source.ConfigMap.Name, + "Config should be updated immediately to %s", name) + } + + cancel() +} + +// TestWatchConfigInvalidUpdate tests that invalid config updates don't break watching +func TestWatchConfigInvalidUpdate(t *testing.T) { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.yaml") + + // Write initial config + err := os.WriteFile(configPath, []byte(validConfigYAML()), 0644) + require.NoError(t, err) + + manager, err := NewConfigManager(configPath) + require.NoError(t, err) + defer func() { + assert.NoError(t, manager.Close()) + }() + + // Start watching in background + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + _ = manager.WatchConfig(ctx) + }() + + // Wait for watcher to initialize + time.Sleep(100 * time.Millisecond) + + // Get initial config + initialConfig := manager.GetConfig() + assert.Equal(t, "test-registry", initialConfig.Source.ConfigMap.Name) + + // Write invalid config + err = os.WriteFile(configPath, []byte(invalidConfigYAML()), 0644) + require.NoError(t, err) + + // Wait for debounce and reload attempt + time.Sleep(800 * time.Millisecond) + + // Config should still be the old valid one + config := manager.GetConfig() + assert.Equal(t, "test-registry", config.Source.ConfigMap.Name) + + // Write valid config again + validConfig := `source: + type: configmap + configmap: + name: recovered-registry +syncPolicy: + interval: "15m" +filter: + tags: + include: [] + exclude: []` + err = os.WriteFile(configPath, []byte(validConfig), 0644) + require.NoError(t, err) + + // Wait for reload + time.Sleep(800 * time.Millisecond) + + // Should have new valid config + config = manager.GetConfig() + assert.Equal(t, "recovered-registry", config.Source.ConfigMap.Name) + + cancel() +} + +// TestWatchConfigAlreadyWatching tests that we can't start watching twice +func TestWatchConfigAlreadyWatching(t *testing.T) { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.yaml") + + err := os.WriteFile(configPath, []byte(validConfigYAML()), 0644) + require.NoError(t, err) + + manager, err := NewConfigManager(configPath) + require.NoError(t, err) + defer func() { + assert.NoError(t, manager.Close()) + }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start first watcher + go func() { + _ = manager.WatchConfig(ctx) + }() + + // Wait for watcher to initialize + time.Sleep(100 * time.Millisecond) + + // Try to start second watcher + err = manager.WatchConfig(ctx) + require.Error(t, err) + assert.Contains(t, err.Error(), "already running") + + cancel() +} + +// TestConfigManagerWithCustomValidator tests using a custom validator +func TestConfigManagerWithCustomValidator(t *testing.T) { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.yaml") + + // Write config + err := os.WriteFile(configPath, []byte(validConfigYAML()), 0644) + require.NoError(t, err) + + // Custom validator that always fails + customValidator := &mockValidator{ + validateFunc: func(config *Config) error { + return fmt.Errorf("custom validation error") + }, + } + + // Should fail with custom validator + _, err = NewConfigManager(configPath, WithValidator(customValidator)) + require.Error(t, err) + assert.Contains(t, err.Error(), "custom validation error") +} + +// TestConfigManagerWithCustomLoader tests using a custom loader +func TestConfigManagerWithCustomLoader(t *testing.T) { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.yaml") + + // Custom loader that returns a fixed config + customLoader := &mockLoader{ + loadFunc: func(path string) (*Config, error) { + return &Config{ + Source: SourceConfig{ + Type: "configmap", + ConfigMap: &ConfigMapConfig{ + Name: "custom-loader-registry", + }, + }, + SyncPolicy: SyncPolicyConfig{ + Interval: "1h", + }, + Filter: FilterConfig{}, + }, nil + }, + } + + manager, err := NewConfigManager(configPath, WithLoader(customLoader)) + require.NoError(t, err) + defer func() { + assert.NoError(t, manager.Close()) + }() + + config := manager.GetConfig() + assert.Equal(t, "custom-loader-registry", config.Source.ConfigMap.Name) +} + +// TestClose tests proper cleanup of resources +func TestClose(t *testing.T) { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.yaml") + + err := os.WriteFile(configPath, []byte(validConfigYAML()), 0644) + require.NoError(t, err) + + manager, err := NewConfigManager(configPath) + require.NoError(t, err) + + // Start watching + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + _ = manager.WatchConfig(ctx) + }() + + time.Sleep(100 * time.Millisecond) + + // Close should succeed + err = manager.Close() + assert.NoError(t, err) + + // Second close should also succeed (idempotent) + err = manager.Close() + assert.NoError(t, err) + + cancel() +} + +// Mock implementations for testing + +type mockValidator struct { + validateFunc func(*Config) error +} + +func (m *mockValidator) Validate(config *Config) error { + return m.validateFunc(config) +} + +type mockLoader struct { + loadFunc func(string) (*Config, error) +} + +func (m *mockLoader) LoadConfig(path string) (*Config, error) { + return m.loadFunc(path) +}