diff --git a/telemetry/DESIGN.md b/telemetry/DESIGN.md
index a9107c7..089bd80 100644
--- a/telemetry/DESIGN.md
+++ b/telemetry/DESIGN.md
@@ -1744,7 +1744,8 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {
 ### Phase 2: Per-Host Management
 
 - [x] Implement `featureflag.go` with caching and reference counting (PECOBLR-1146)
-- [ ] Implement `manager.go` for client management
+- [x] Implement `manager.go` for client management (PECOBLR-1147)
+- [x] Implement `client.go` with minimal telemetryClient stub (PECOBLR-1147)
 - [ ] Implement `circuitbreaker.go` with state machine
 - [ ] Add unit tests for all components
 
diff --git a/telemetry/client.go b/telemetry/client.go
new file mode 100644
index 0000000..f345ab3
--- /dev/null
+++ b/telemetry/client.go
@@ -0,0 +1,38 @@
+package telemetry
+
+import (
+    "net/http"
+)
+
+// telemetryClient represents a client for sending telemetry data to Databricks.
+// This is a minimal stub implementation that will be fully implemented in Phase 4.
+type telemetryClient struct {
+    host       string
+    httpClient *http.Client
+    cfg        *Config
+    started    bool
+    closed     bool
+}
+
+// newTelemetryClient creates a new telemetry client for the given host.
+func newTelemetryClient(host string, httpClient *http.Client, cfg *Config) *telemetryClient {
+    return &telemetryClient{
+        host:       host,
+        httpClient: httpClient,
+        cfg:        cfg,
+    }
+}
+
+// start starts the telemetry client's background operations.
+// This is a stub implementation that will be fully implemented in Phase 4.
+func (c *telemetryClient) start() error {
+    c.started = true
+    return nil
+}
+
+// close stops the telemetry client and flushes any pending data.
+// This is a stub implementation that will be fully implemented in Phase 4.
+func (c *telemetryClient) close() error {
+    c.closed = true
+    return nil
+}
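Note on the stub above: in Phase 2 the client only records lifecycle state; the actual sending logic lands in Phase 4. As a rough, purely illustrative sketch of where that is headed (the manager below already refers to a "background flush goroutine"), the client is expected to buffer pending events and drain them from a background goroutine. Every name in the sketch (event, pendingFlusher, the buffer size) is an assumption for illustration only and is not part of this change:

// Illustrative sketch only -- not part of this diff.
// A buffered channel holds pending telemetry events; a background goroutine
// drains it until close is signalled. Phase 4 would batch the events and send
// them via the shared *http.Client.
type event struct {
    payload []byte // assumed wire format
}

type pendingFlusher struct {
    events chan event
    done   chan struct{}
}

func newPendingFlusher(bufSize int) *pendingFlusher {
    f := &pendingFlusher{
        events: make(chan event, bufSize),
        done:   make(chan struct{}),
    }
    go f.loop()
    return f
}

func (f *pendingFlusher) loop() {
    for {
        select {
        case ev := <-f.events:
            _ = ev // Phase 4: append to a batch and send when full or on a timer
        case <-f.done:
            return
        }
    }
}

func (f *pendingFlusher) close() {
    // Stop the background goroutine; a full implementation would also drain
    // and flush whatever is still buffered before returning.
    close(f.done)
}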
diff --git a/telemetry/manager.go b/telemetry/manager.go
new file mode 100644
index 0000000..a4f8237
--- /dev/null
+++ b/telemetry/manager.go
@@ -0,0 +1,73 @@
+package telemetry
+
+import (
+    "net/http"
+    "sync"
+)
+
+// clientManager manages one telemetry client per host.
+// Prevents rate limiting by sharing clients across connections.
+type clientManager struct {
+    mu      sync.RWMutex
+    clients map[string]*clientHolder
+}
+
+// clientHolder holds a telemetry client and its reference count.
+type clientHolder struct {
+    client   *telemetryClient
+    refCount int
+}
+
+var (
+    managerOnce     sync.Once
+    managerInstance *clientManager
+)
+
+// getClientManager returns the singleton instance.
+func getClientManager() *clientManager {
+    managerOnce.Do(func() {
+        managerInstance = &clientManager{
+            clients: make(map[string]*clientHolder),
+        }
+    })
+    return managerInstance
+}
+
+// getOrCreateClient gets or creates a telemetry client for the host.
+// Increments reference count.
+func (m *clientManager) getOrCreateClient(host string, httpClient *http.Client, cfg *Config) *telemetryClient {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+
+    holder, exists := m.clients[host]
+    if !exists {
+        holder = &clientHolder{
+            client: newTelemetryClient(host, httpClient, cfg),
+        }
+        m.clients[host] = holder
+        _ = holder.client.start() // Start background flush goroutine
+    }
+    holder.refCount++
+    return holder.client
+}
+
+// releaseClient decrements reference count for the host.
+// Closes and removes client when ref count reaches zero.
+func (m *clientManager) releaseClient(host string) error {
+    m.mu.Lock()
+    holder, exists := m.clients[host]
+    if !exists {
+        m.mu.Unlock()
+        return nil
+    }
+
+    holder.refCount--
+    if holder.refCount <= 0 {
+        delete(m.clients, host)
+        m.mu.Unlock()
+        return holder.client.close() // Close and flush
+    }
+
+    m.mu.Unlock()
+    return nil
+}
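To make the reference-counting contract concrete: each connection to a host acquires the shared client when it opens and releases it when it closes, so the first acquire creates and starts the client and the last release closes and flushes it. The connection type and function names below are hypothetical and only illustrate the intended pairing; they are not part of this change:

// Hypothetical caller -- not part of this diff.
type conn struct {
    host   string
    client *telemetryClient
}

func openConn(host string, httpClient *http.Client, cfg *Config) *conn {
    // First connection to a host creates and starts the client;
    // subsequent connections just increment the reference count.
    return &conn{
        host:   host,
        client: getClientManager().getOrCreateClient(host, httpClient, cfg),
    }
}

func (c *conn) close() error {
    // Last connection to the host closes and flushes the shared client.
    return getClientManager().releaseClient(c.host)
}

Note that releaseClient unlocks the mutex before calling close(), so a potentially slow flush never runs while the manager lock is held.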
diff --git a/telemetry/manager_test.go b/telemetry/manager_test.go
new file mode 100644
index 0000000..c3fecc1
--- /dev/null
+++ b/telemetry/manager_test.go
@@ -0,0 +1,322 @@
+package telemetry
+
+import (
+    "net/http"
+    "sync"
+    "testing"
+)
+
+func TestGetClientManager_Singleton(t *testing.T) {
+    // Reset singleton for testing
+    managerInstance = nil
+    managerOnce = sync.Once{}
+
+    manager1 := getClientManager()
+    manager2 := getClientManager()
+
+    if manager1 != manager2 {
+        t.Error("Expected singleton instances to be the same")
+    }
+}
+
+func TestClientManager_GetOrCreateClient(t *testing.T) {
+    manager := &clientManager{
+        clients: make(map[string]*clientHolder),
+    }
+
+    host := "test-host.databricks.com"
+    httpClient := &http.Client{}
+    cfg := DefaultConfig()
+
+    // First call should create client and increment refCount to 1
+    client1 := manager.getOrCreateClient(host, httpClient, cfg)
+    if client1 == nil {
+        t.Fatal("Expected client to be created")
+    }
+
+    holder, exists := manager.clients[host]
+    if !exists {
+        t.Fatal("Expected holder to exist in clients map")
+    }
+    if holder.refCount != 1 {
+        t.Errorf("Expected refCount to be 1, got %d", holder.refCount)
+    }
+    if !client1.started {
+        t.Error("Expected client to be started")
+    }
+
+    // Second call should reuse client and increment refCount to 2
+    client2 := manager.getOrCreateClient(host, httpClient, cfg)
+    if client2 != client1 {
+        t.Error("Expected to get the same client instance")
+    }
+    if holder.refCount != 2 {
+        t.Errorf("Expected refCount to be 2, got %d", holder.refCount)
+    }
+}
+
+func TestClientManager_GetOrCreateClient_DifferentHosts(t *testing.T) {
+    manager := &clientManager{
+        clients: make(map[string]*clientHolder),
+    }
+
+    host1 := "host1.databricks.com"
+    host2 := "host2.databricks.com"
+    httpClient := &http.Client{}
+    cfg := DefaultConfig()
+
+    client1 := manager.getOrCreateClient(host1, httpClient, cfg)
+    client2 := manager.getOrCreateClient(host2, httpClient, cfg)
+
+    if client1 == client2 {
+        t.Error("Expected different clients for different hosts")
+    }
+
+    if len(manager.clients) != 2 {
+        t.Errorf("Expected 2 clients in manager, got %d", len(manager.clients))
+    }
+}
+
+func TestClientManager_ReleaseClient(t *testing.T) {
+    manager := &clientManager{
+        clients: make(map[string]*clientHolder),
+    }
+
+    host := "test-host.databricks.com"
+    httpClient := &http.Client{}
+    cfg := DefaultConfig()
+
+    // Create client with refCount = 2
+    manager.getOrCreateClient(host, httpClient, cfg)
+    manager.getOrCreateClient(host, httpClient, cfg)
+
+    // First release should decrement to 1
+    err := manager.releaseClient(host)
+    if err != nil {
+        t.Errorf("Expected no error, got %v", err)
+    }
+
+    holder, exists := manager.clients[host]
+    if !exists {
+        t.Fatal("Expected holder to still exist")
+    }
+    if holder.refCount != 1 {
+        t.Errorf("Expected refCount to be 1, got %d", holder.refCount)
+    }
+    if holder.client.closed {
+        t.Error("Expected client not to be closed yet")
+    }
+
+    // Second release should remove client
+    err = manager.releaseClient(host)
+    if err != nil {
+        t.Errorf("Expected no error, got %v", err)
+    }
+
+    _, exists = manager.clients[host]
+    if exists {
+        t.Error("Expected holder to be removed when refCount reaches 0")
+    }
+    if !holder.client.closed {
+        t.Error("Expected client to be closed when removed")
+    }
+}
+
+func TestClientManager_ReleaseClient_NonExistent(t *testing.T) {
+    manager := &clientManager{
+        clients: make(map[string]*clientHolder),
+    }
+
+    // Release non-existent host should not panic or error
+    err := manager.releaseClient("non-existent-host.databricks.com")
+    if err != nil {
+        t.Errorf("Expected no error for non-existent host, got %v", err)
+    }
+}
+
+func TestClientManager_ConcurrentAccess(t *testing.T) {
+    manager := &clientManager{
+        clients: make(map[string]*clientHolder),
+    }
+
+    host := "test-host.databricks.com"
+    httpClient := &http.Client{}
+    cfg := DefaultConfig()
+    numGoroutines := 100
+
+    var wg sync.WaitGroup
+    wg.Add(numGoroutines)
+
+    // Concurrent getOrCreateClient
+    for i := 0; i < numGoroutines; i++ {
+        go func() {
+            defer wg.Done()
+            client := manager.getOrCreateClient(host, httpClient, cfg)
+            if client == nil {
+                t.Error("Expected client to be created")
+            }
+        }()
+    }
+    wg.Wait()
+
+    // Verify refCount
+    holder, exists := manager.clients[host]
+    if !exists {
+        t.Fatal("Expected holder to exist")
+    }
+    if holder.refCount != numGoroutines {
+        t.Errorf("Expected refCount to be %d, got %d", numGoroutines, holder.refCount)
+    }
+
+    // Concurrent releaseClient
+    wg.Add(numGoroutines)
+    for i := 0; i < numGoroutines; i++ {
+        go func() {
+            defer wg.Done()
+            _ = manager.releaseClient(host)
+        }()
+    }
+    wg.Wait()
+
+    // Verify client is removed
+    _, exists = manager.clients[host]
+    if exists {
+        t.Error("Expected holder to be removed after all releases")
+    }
+}
+
+func TestClientManager_ConcurrentAccessMultipleHosts(t *testing.T) {
+    manager := &clientManager{
+        clients: make(map[string]*clientHolder),
+    }
+
+    hosts := []string{
+        "host1.databricks.com",
+        "host2.databricks.com",
+        "host3.databricks.com",
+    }
+    httpClient := &http.Client{}
+    cfg := DefaultConfig()
+    numGoroutinesPerHost := 50
+
+    var wg sync.WaitGroup
+
+    // Concurrent access to multiple hosts
+    for _, host := range hosts {
+        for i := 0; i < numGoroutinesPerHost; i++ {
+            wg.Add(1)
+            go func(h string) {
+                defer wg.Done()
+                _ = manager.getOrCreateClient(h, httpClient, cfg)
+            }(host)
+        }
+    }
+    wg.Wait()
+
+    // Verify all hosts have correct refCount
+    if len(manager.clients) != len(hosts) {
+        t.Errorf("Expected %d clients, got %d", len(hosts), len(manager.clients))
+    }
+
+    for _, host := range hosts {
+        holder, exists := manager.clients[host]
+        if !exists {
+            t.Errorf("Expected holder for host %s to exist", host)
+            continue
+        }
+        if holder.refCount != numGoroutinesPerHost {
+            t.Errorf("Expected refCount for host %s to be %d, got %d", host, numGoroutinesPerHost, holder.refCount)
+        }
+    }
+}
+
+func TestClientManager_ReleaseClientPartial(t *testing.T) {
+    manager := &clientManager{
+        clients: make(map[string]*clientHolder),
+    }
+
+    host := "test-host.databricks.com"
+    httpClient := &http.Client{}
+    cfg := DefaultConfig()
+
+    // Create 5 references
+    for i := 0; i < 5; i++ {
+        manager.getOrCreateClient(host, httpClient, cfg)
+    }
+
+    // Release 3 references
+    for i := 0; i < 3; i++ {
+        _ = manager.releaseClient(host)
+    }
+
+    // Should still have 2 references
+    holder, exists := manager.clients[host]
+    if !exists {
+        t.Fatal("Expected holder to still exist")
+    }
+    if holder.refCount != 2 {
+        t.Errorf("Expected refCount to be 2, got %d", holder.refCount)
+    }
+    if holder.client.closed {
+        t.Error("Expected client not to be closed with remaining references")
+    }
+}
+
+func TestClientManager_ClientStartCalled(t *testing.T) {
+    manager := &clientManager{
+        clients: make(map[string]*clientHolder),
+    }
+
+    host := "test-host.databricks.com"
+    httpClient := &http.Client{}
+    cfg := DefaultConfig()
+
+    client := manager.getOrCreateClient(host, httpClient, cfg)
+
+    if !client.started {
+        t.Error("Expected start() to be called on new client")
+    }
+}
+
+func TestClientManager_ClientCloseCalled(t *testing.T) {
+    manager := &clientManager{
+        clients: make(map[string]*clientHolder),
+    }
+
+    host := "test-host.databricks.com"
+    httpClient := &http.Client{}
+    cfg := DefaultConfig()
+
+    client := manager.getOrCreateClient(host, httpClient, cfg)
+    _ = manager.releaseClient(host)
+
+    if !client.closed {
+        t.Error("Expected close() to be called when client is removed")
+    }
+}
+
+func TestClientManager_MultipleGetOrCreateSameClient(t *testing.T) {
+    manager := &clientManager{
+        clients: make(map[string]*clientHolder),
+    }
+
+    host := "test-host.databricks.com"
+    httpClient := &http.Client{}
+    cfg := DefaultConfig()
+
+    // Get same client multiple times
+    client1 := manager.getOrCreateClient(host, httpClient, cfg)
+    client2 := manager.getOrCreateClient(host, httpClient, cfg)
+    client3 := manager.getOrCreateClient(host, httpClient, cfg)
+
+    // All should be same instance
+    if client1 != client2 || client2 != client3 {
+        t.Error("Expected all calls to return the same client instance")
+    }
+
+    // Verify refCount is 3
+    holder := manager.clients[host]
+    if holder.refCount != 3 {
+        t.Errorf("Expected refCount to be 3, got %d", holder.refCount)
+    }
+}
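One caveat on the tests: TestGetClientManager_Singleton resets the package-level managerOnce and managerInstance in place. That is fine while the tests in this package run sequentially, but it would become a data race if any of them adopted t.Parallel(). If more tests end up needing the reset, a small helper along these lines (hypothetical, not part of this diff) would keep it in one place:

// Hypothetical test helper -- not part of this diff.
func resetClientManagerForTest() {
    managerInstance = nil
    managerOnce = sync.Once{}
}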