From 8356a1021ac3d19e26cff74f3a3228a56bbfda80 Mon Sep 17 00:00:00 2001 From: Dhanraj Date: Mon, 27 Apr 2026 01:21:18 +0530 Subject: [PATCH 1/3] Add PgBouncer metrics collection to agent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Collect connection pool statistics from PgBouncer admin console (port 6432) using SHOW POOLS + SHOW STATS. Uses a try-connect approach — silently marks Up: false when PgBouncer is not running, so the server can track pooler state without requiring any config flag changes. - New pgbouncermetrics package: connects via selfhostadmin credential, aggregates pool/stats counters into PgBouncerMetrics struct - domain/metrics: adds MetricTypePgBouncer constant and PgBouncerMetrics type - metrics service: includes pgbouncer.stats metric set in every push cycle - metrics_test: adds MockPgBouncerCollector and updates all Push tests for the new always-included metric set Co-Authored-By: Claude Sonnet 4.6 --- app/services/metrics/metrics.go | 67 +++++---- app/services/metrics/metrics_test.go | 80 ++++++++--- domain/metrics/metrics.go | 17 +++ internal/pgbouncermetrics/collector.go | 187 +++++++++++++++++++++++++ 4 files changed, 306 insertions(+), 45 deletions(-) create mode 100644 internal/pgbouncermetrics/collector.go diff --git a/app/services/metrics/metrics.go b/app/services/metrics/metrics.go index 6cff199..4e608aa 100644 --- a/app/services/metrics/metrics.go +++ b/app/services/metrics/metrics.go @@ -15,6 +15,7 @@ import ( "hostlink/internal/apiserver" "hostlink/internal/crypto" "hostlink/internal/networkmetrics" + "hostlink/internal/pgbouncermetrics" "hostlink/internal/pgmetrics" "hostlink/internal/storagemetrics" "hostlink/internal/sysmetrics" @@ -29,14 +30,15 @@ type Pusher interface { } type metricspusher struct { - apiserver apiserver.MetricsOperations - agentstate agentstate.Operations - metricscollector pgmetrics.Collector - syscollector sysmetrics.Collector - netcollector 
networkmetrics.Collector - storagecollector storagemetrics.Collector - crypto crypto.Service - privateKeyPath string + apiserver apiserver.MetricsOperations + agentstate agentstate.Operations + metricscollector pgmetrics.Collector + syscollector sysmetrics.Collector + netcollector networkmetrics.Collector + storagecollector storagemetrics.Collector + pgbouncercollector pgbouncermetrics.Collector + crypto crypto.Service + privateKeyPath string } func NewWithConf() (*metricspusher, error) { @@ -50,14 +52,15 @@ func NewWithConf() (*metricspusher, error) { } return &metricspusher{ - apiserver: svr, - agentstate: agentstate, - metricscollector: pgmetrics.New(), - syscollector: sysmetrics.New(), - netcollector: networkmetrics.New(), - storagecollector: storagemetrics.New(), - crypto: crypto.NewService(), - privateKeyPath: appconf.AgentPrivateKeyPath(), + apiserver: svr, + agentstate: agentstate, + metricscollector: pgmetrics.New(), + syscollector: sysmetrics.New(), + netcollector: networkmetrics.New(), + storagecollector: storagemetrics.New(), + pgbouncercollector: pgbouncermetrics.New(), + crypto: crypto.NewService(), + privateKeyPath: appconf.AgentPrivateKeyPath(), }, nil } @@ -73,18 +76,20 @@ func NewWithDependencies( syscollector sysmetrics.Collector, netcollector networkmetrics.Collector, storagecollector storagemetrics.Collector, + pgbouncercollector pgbouncermetrics.Collector, crypto crypto.Service, privateKeyPath string, ) *metricspusher { return &metricspusher{ - apiserver: apiserver, - agentstate: agentstate, - metricscollector: pgcollector, - syscollector: syscollector, - netcollector: netcollector, - storagecollector: storagecollector, - crypto: crypto, - privateKeyPath: privateKeyPath, + apiserver: apiserver, + agentstate: agentstate, + metricscollector: pgcollector, + syscollector: syscollector, + netcollector: netcollector, + storagecollector: storagecollector, + pgbouncercollector: pgbouncercollector, + crypto: crypto, + privateKeyPath: privateKeyPath, } 
} @@ -183,6 +188,20 @@ func (mp *metricspusher) Push(cred credential.Credential) error { } } + // PgBouncer stats — try-connect approach: silently skip when not running. + // The collector returns an error if PgBouncer is unreachable; we mark Up: false + // and still include the metric set so the server can track the pooler state. + pgbouncerMetrics, err := mp.pgbouncercollector.Collect(cred) + if err != nil { + pgbouncerMetrics = domainmetrics.PgBouncerMetrics{Up: false} + } else { + pgbouncerMetrics.Up = true + } + metricSets = append(metricSets, domainmetrics.MetricSet{ + Type: domainmetrics.MetricTypePgBouncer, + Metrics: pgbouncerMetrics, + }) + // If only the postgresql.database metric set exists (with up=false) and // all other collectors failed, we still push so the server knows the agent // is alive and PostgreSQL status is reported. diff --git a/app/services/metrics/metrics_test.go b/app/services/metrics/metrics_test.go index 15c8964..b30ab45 100644 --- a/app/services/metrics/metrics_test.go +++ b/app/services/metrics/metrics_test.go @@ -180,26 +180,37 @@ func (m *MockCrypto) DecryptWithPrivateKey(ciphertextBase64 string, privateKey * return args.String(0), args.Error(1) } +type MockPgBouncerCollector struct { + mock.Mock +} + +func (m *MockPgBouncerCollector) Collect(cred credential.Credential) (domainmetrics.PgBouncerMetrics, error) { + args := m.Called(cred) + return args.Get(0).(domainmetrics.PgBouncerMetrics), args.Error(1) +} + // Test helpers type testMocks struct { - apiserver *MockAPIServer - agentstate *MockAgentState - collector *MockCollector - syscollector *MockSysCollector - netcollector *MockNetCollector - storagecollector *MockStorageCollector - crypto *MockCrypto + apiserver *MockAPIServer + agentstate *MockAgentState + collector *MockCollector + syscollector *MockSysCollector + netcollector *MockNetCollector + storagecollector *MockStorageCollector + pgbouncercollector *MockPgBouncerCollector + crypto *MockCrypto } func 
setupTestMetricsPusher() (*metricspusher, *testMocks) { mocks := &testMocks{ - apiserver: new(MockAPIServer), - agentstate: new(MockAgentState), - collector: new(MockCollector), - syscollector: new(MockSysCollector), - netcollector: new(MockNetCollector), - storagecollector: new(MockStorageCollector), - crypto: new(MockCrypto), + apiserver: new(MockAPIServer), + agentstate: new(MockAgentState), + collector: new(MockCollector), + syscollector: new(MockSysCollector), + netcollector: new(MockNetCollector), + storagecollector: new(MockStorageCollector), + pgbouncercollector: new(MockPgBouncerCollector), + crypto: new(MockCrypto), } mp := NewWithDependencies( @@ -209,6 +220,7 @@ func setupTestMetricsPusher() (*metricspusher, *testMocks) { mocks.syscollector, mocks.netcollector, mocks.storagecollector, + mocks.pgbouncercollector, mocks.crypto, "/test/key/path", ) @@ -458,6 +470,7 @@ func TestPush_SystemMetricsFailure_StillPushesDbMetrics(t *testing.T) { Return(domainmetrics.SystemMetrics{}, errors.New("collection failed")) setupNetCollectorMocks(mocks.netcollector) setupStorageCollectorMocks(mocks.storagecollector) + setupPgBouncerCollectorMocks(mocks.pgbouncercollector) mocks.collector.On("Collect", testCred). Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true, ConnectionsTotal: 5}, nil) mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool { @@ -496,6 +509,7 @@ func TestPush_DatabaseMetricsFailure_StillPushesSystemMetricsAndDbWithUpFalse(t setupSysCollectorMocks(mocks.syscollector) setupNetCollectorMocks(mocks.netcollector) setupStorageCollectorMocks(mocks.storagecollector) + setupPgBouncerCollectorMocks(mocks.pgbouncercollector) mocks.collector.On("Collect", testCred). 
Return(domainmetrics.PostgreSQLDatabaseMetrics{}, collectErr) mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool { @@ -542,16 +556,26 @@ func TestPush_AllCollectionsFail_StillPushesDbWithUpFalse(t *testing.T) { Return(nil, errors.New("storage failed")) mocks.collector.On("Collect", testCred). Return(domainmetrics.PostgreSQLDatabaseMetrics{}, errors.New("connection refused")) + mocks.pgbouncercollector.On("Collect", testCred). + Return(domainmetrics.PgBouncerMetrics{}, errors.New("pgbouncer not running")) mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool { - if len(p.MetricSets) != 1 { + // Expect 2 metric sets: postgresql.database (up=false) + pgbouncer.stats (up=false) + if len(p.MetricSets) != 2 { return false } - ms := p.MetricSets[0] - if ms.Type != domainmetrics.MetricTypePostgreSQLDatabase { - return false + hasDbUpFalse := false + hasPgBouncerUpFalse := false + for _, ms := range p.MetricSets { + if ms.Type == domainmetrics.MetricTypePostgreSQLDatabase { + dbMetrics := ms.Metrics.(domainmetrics.PostgreSQLDatabaseMetrics) + hasDbUpFalse = !dbMetrics.Up + } + if ms.Type == domainmetrics.MetricTypePgBouncer { + pbMetrics := ms.Metrics.(domainmetrics.PgBouncerMetrics) + hasPgBouncerUpFalse = !pbMetrics.Up + } } - dbMetrics := ms.Metrics.(domainmetrics.PostgreSQLDatabaseMetrics) - return !dbMetrics.Up + return hasDbUpFalse && hasPgBouncerUpFalse })).Return(nil) err := mp.Push(testCred) @@ -569,6 +593,7 @@ func TestPush_APIServerPushFailure(t *testing.T) { setupSysCollectorMocks(mocks.syscollector) setupNetCollectorMocks(mocks.netcollector) setupStorageCollectorMocks(mocks.storagecollector) + setupPgBouncerCollectorMocks(mocks.pgbouncercollector) mocks.collector.On("Collect", testCred). Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true, ConnectionsTotal: 5}, nil) mocks.apiserver.On("PushMetrics", mock.Anything, mock.Anything). 
@@ -593,6 +618,7 @@ func TestPush_Success_ValidatesPayloadSchema(t *testing.T) { setupSysCollectorMocks(mocks.syscollector) setupNetCollectorMocks(mocks.netcollector) setupStorageCollectorMocks(mocks.storagecollector) + setupPgBouncerCollectorMocks(mocks.pgbouncercollector) connected := true mocks.collector.On("Collect", testCred). Return(domainmetrics.PostgreSQLDatabaseMetrics{ @@ -620,7 +646,7 @@ func TestPush_Success_ValidatesPayloadSchema(t *testing.T) { if p.Resource.HostName == "" { return false } - if len(p.MetricSets) != 4 { + if len(p.MetricSets) != 5 { return false } @@ -719,6 +745,7 @@ func TestPush_ContextPropagation(t *testing.T) { setupSysCollectorMocks(mocks.syscollector) setupNetCollectorMocks(mocks.netcollector) setupStorageCollectorMocks(mocks.storagecollector) + setupPgBouncerCollectorMocks(mocks.pgbouncercollector) mocks.collector.On("Collect", testCred). Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true}, nil) mocks.apiserver.On("PushMetrics", mock.MatchedBy(func(ctx context.Context) bool { @@ -762,6 +789,7 @@ func TestPush_CredentialPassedCorrectly(t *testing.T) { setupSysCollectorMocks(mocks.syscollector) setupNetCollectorMocks(mocks.netcollector) setupStorageCollectorMocks(mocks.storagecollector) + setupPgBouncerCollectorMocks(mocks.pgbouncercollector) mocks.collector.On("Collect", mock.MatchedBy(func(c credential.Credential) bool { return c.Host == testCred.Host && c.Port == testCred.Port && @@ -797,6 +825,11 @@ func setupNetCollectorMocks(collector *MockNetCollector) { }, nil) } +func setupPgBouncerCollectorMocks(collector *MockPgBouncerCollector) { + collector.On("Collect", mock.Anything). 
+ Return(domainmetrics.PgBouncerMetrics{}, errors.New("connection refused: pgbouncer not running")) +} + func setupStorageCollectorMocks(collector *MockStorageCollector) { collector.On("Collect", mock.Anything).Return([]storagemetrics.StorageMetricSet{ { @@ -827,6 +860,7 @@ func TestPush_DatabaseDown_SendsUpFalseWithZeroMetrics(t *testing.T) { setupSysCollectorMocks(mocks.syscollector) setupNetCollectorMocks(mocks.netcollector) setupStorageCollectorMocks(mocks.storagecollector) + setupPgBouncerCollectorMocks(mocks.pgbouncercollector) mocks.collector.On("Collect", testCred). Return(domainmetrics.PostgreSQLDatabaseMetrics{}, errors.New("connection refused")) mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool { @@ -883,6 +917,7 @@ func TestPush_IncludesStorageMetrics(t *testing.T) { setupSysCollectorMocks(mocks.syscollector) setupNetCollectorMocks(mocks.netcollector) setupStorageCollectorMocks(mocks.storagecollector) + setupPgBouncerCollectorMocks(mocks.pgbouncercollector) mocks.collector.On("Collect", testCred). Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true}, nil) mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool { @@ -912,6 +947,7 @@ func TestPush_StorageMetricsFailure_StillPushesOtherMetrics(t *testing.T) { setupNetCollectorMocks(mocks.netcollector) mocks.storagecollector.On("Collect", mock.Anything). Return(nil, errors.New("storage collection failed")) + setupPgBouncerCollectorMocks(mocks.pgbouncercollector) mocks.collector.On("Collect", testCred). 
Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true}, nil) mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool { @@ -956,6 +992,7 @@ func TestPush_StorageMetricsMultipleMounts(t *testing.T) { Metrics: domainmetrics.StorageMetrics{DiskUsedPercent: 75.0}, }, }, nil) + setupPgBouncerCollectorMocks(mocks.pgbouncercollector) mocks.collector.On("Collect", testCred). Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true}, nil) mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool { @@ -993,6 +1030,7 @@ func TestPush_StorageMetricsWithAttributes(t *testing.T) { Metrics: domainmetrics.StorageMetrics{DiskUsedPercent: 80.0}, }, }, nil) + setupPgBouncerCollectorMocks(mocks.pgbouncercollector) mocks.collector.On("Collect", testCred). Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true}, nil) mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool { diff --git a/domain/metrics/metrics.go b/domain/metrics/metrics.go index dfd23b5..98b3606 100644 --- a/domain/metrics/metrics.go +++ b/domain/metrics/metrics.go @@ -6,6 +6,7 @@ const ( MetricTypeNetwork = "network" MetricTypePostgreSQLDatabase = "postgresql.database" MetricTypeStorage = "storage" + MetricTypePgBouncer = "pgbouncer.stats" ) type MetricPayload struct { @@ -54,6 +55,22 @@ type PostgreSQLDatabaseMetrics struct { ReplicationConnected *bool `json:"replication_connected,omitempty"` } +// PgBouncerMetrics holds aggregated connection pool statistics collected +// via the PgBouncer admin console (SHOW POOLS + SHOW STATS). +// Up is false when PgBouncer is not running or unreachable. 
+type PgBouncerMetrics struct { + Up bool `json:"up"` + ClientsActive int `json:"clients_active"` + ClientsWaiting int `json:"clients_waiting"` + ServersActive int `json:"servers_active"` + ServersIdle int `json:"servers_idle"` + MaxWaitMs float64 `json:"max_wait_ms"` + AvgQueryTimeMs float64 `json:"avg_query_time_ms"` + AvgWaitTimeMs float64 `json:"avg_wait_time_ms"` + TotalQueriesPerSec float64 `json:"total_queries_per_sec"` + PoolCount int `json:"pool_count"` +} + type StorageMetrics struct { DiskUsedBytes float64 `json:"disk_used_bytes"` DiskFreeBytes float64 `json:"disk_free_bytes"` diff --git a/internal/pgbouncermetrics/collector.go b/internal/pgbouncermetrics/collector.go new file mode 100644 index 0000000..445dcbd --- /dev/null +++ b/internal/pgbouncermetrics/collector.go @@ -0,0 +1,187 @@ +// Package pgbouncermetrics collects connection pool statistics from the +// PgBouncer admin console (SHOW POOLS + SHOW STATS). +// Collection is attempted via the admin database on port 6432. +// If PgBouncer is not running, Collect returns an error; callers treat this +// as a soft miss and report Up: false rather than a hard failure. +package pgbouncermetrics + +import ( + "context" + "database/sql" + "fmt" + "strconv" + "time" + + _ "github.com/lib/pq" + + "hostlink/domain/credential" + "hostlink/domain/metrics" +) + +const ( + pgbouncerPort = 6432 + pgbouncerDatabase = "pgbouncer" +) + +// Collector collects PgBouncer pool statistics using the same agent +// credential used for PostgreSQL (selfhostadmin / same password). 
+type Collector interface { + Collect(credential.Credential) (metrics.PgBouncerMetrics, error) +} + +type collector struct { + queryTimeout time.Duration +} + +func New() Collector { + return &collector{queryTimeout: 5 * time.Second} +} + +func (c *collector) Collect(cred credential.Credential) (metrics.PgBouncerMetrics, error) { + password := "" + if cred.Password != nil { + password = *cred.Password + } + + // Always connect to the PgBouncer admin console on port 6432, + // regardless of the credential's port (which may already be 6432). + connStr := fmt.Sprintf( + "host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", + cred.Host, pgbouncerPort, cred.Username, password, pgbouncerDatabase, + ) + + db, err := sql.Open("postgres", connStr) + if err != nil { + return metrics.PgBouncerMetrics{}, fmt.Errorf("open: %w", err) + } + defer db.Close() + + ctx, cancel := context.WithTimeout(context.Background(), c.queryTimeout) + defer cancel() + + if err := db.PingContext(ctx); err != nil { + // PgBouncer not running — caller marks Up: false. + return metrics.PgBouncerMetrics{}, fmt.Errorf("ping: %w", err) + } + + m := metrics.PgBouncerMetrics{} + + if err := c.collectPools(ctx, db, &m); err != nil { + return m, fmt.Errorf("SHOW POOLS: %w", err) + } + + if err := c.collectStats(ctx, db, &m); err != nil { + // A stats failure aborts the collect; the caller marks Up: false. + return m, fmt.Errorf("SHOW STATS: %w", err) + } + + return m, nil +} + +// collectPools aggregates per-pool counters from SHOW POOLS. +// Skips the internal "pgbouncer" database row. 
+func (c *collector) collectPools(ctx context.Context, db *sql.DB, m *metrics.PgBouncerMetrics) error { + rows, err := db.QueryContext(ctx, "SHOW POOLS") + if err != nil { + return err + } + defer rows.Close() + + cols, err := rows.Columns() + if err != nil { + return err + } + + for rows.Next() { + row := scanRowToMap(cols, rows) + if row["database"] == "pgbouncer" { + continue + } + + m.PoolCount++ + m.ClientsActive += parseInt(row["cl_active"]) + m.ClientsWaiting += parseInt(row["cl_waiting"]) + m.ServersActive += parseInt(row["sv_active"]) + m.ServersIdle += parseInt(row["sv_idle"]) + + // maxwait_us (microseconds) is available in PgBouncer ≥ 1.8; + // fall back to maxwait (seconds) for older versions. + if us := parseFloat(row["maxwait_us"]); us > 0 { + waitMs := us / 1000 + if waitMs > m.MaxWaitMs { + m.MaxWaitMs = waitMs + } + } else if s := parseFloat(row["maxwait"]); s > 0 { + waitMs := s * 1000 + if waitMs > m.MaxWaitMs { + m.MaxWaitMs = waitMs + } + } + } + + return rows.Err() +} + +// collectStats reads aggregate throughput and latency from SHOW STATS. +// Averages are computed across all non-meta database rows. 
+func (c *collector) collectStats(ctx context.Context, db *sql.DB, m *metrics.PgBouncerMetrics) error { + rows, err := db.QueryContext(ctx, "SHOW STATS") + if err != nil { + return err + } + defer rows.Close() + + cols, err := rows.Columns() + if err != nil { + return err + } + + var rowCount int + var sumQueryTime, sumWaitTime float64 + + for rows.Next() { + row := scanRowToMap(cols, rows) + if row["database"] == "pgbouncer" { + continue + } + rowCount++ + m.TotalQueriesPerSec += parseFloat(row["avg_query_count"]) + sumQueryTime += parseFloat(row["avg_query_time"]) // microseconds + sumWaitTime += parseFloat(row["avg_wait_time"]) // microseconds + } + + if rowCount > 0 { + m.AvgQueryTimeMs = sumQueryTime / float64(rowCount) / 1000 + m.AvgWaitTimeMs = sumWaitTime / float64(rowCount) / 1000 + } + + return rows.Err() +} + +// scanRowToMap scans a sql.Rows row into a string map keyed by column name. +func scanRowToMap(cols []string, rows *sql.Rows) map[string]string { + vals := make([]interface{}, len(cols)) + ptrs := make([]interface{}, len(cols)) + for i := range vals { + ptrs[i] = &vals[i] + } + _ = rows.Scan(ptrs...) + + result := make(map[string]string, len(cols)) + for i, col := range cols { + if vals[i] != nil { + result[col] = fmt.Sprintf("%v", vals[i]) + } + } + return result +} + +func parseInt(s string) int { + v, _ := strconv.Atoi(s) + return v +} + +func parseFloat(s string) float64 { + v, _ := strconv.ParseFloat(s, 64) + return v +} From 8a8caac90402d575e68ff07a34a64cd672f7c67a Mon Sep 17 00:00:00 2001 From: Dhanraj Date: Mon, 27 Apr 2026 19:26:46 +0530 Subject: [PATCH 2/3] fix: correct PgBouncer max_wait_ms calculation and weight latency averages maxwait_us is the sub-second microsecond remainder of the wait, not the full duration. Combine with maxwait (whole seconds) using the correct formula: maxwait*1000 + maxwait_us/1000. Previously maxwait_us alone was used whenever nonzero, dropping the whole-seconds component entirely. 
Latency averages in collectStats are now weighted by avg_query_count so high-traffic databases dominate the aggregate rather than each database row contributing equally regardless of query volume. Co-Authored-By: Claude Sonnet 4.6 --- internal/pgbouncermetrics/collector.go | 40 ++++++++++++-------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/internal/pgbouncermetrics/collector.go b/internal/pgbouncermetrics/collector.go index 445dcbd..7605c7b 100644 --- a/internal/pgbouncermetrics/collector.go +++ b/internal/pgbouncermetrics/collector.go @@ -104,18 +104,11 @@ func (c *collector) collectPools(ctx context.Context, db *sql.DB, m *metrics.PgB m.ServersActive += parseInt(row["sv_active"]) m.ServersIdle += parseInt(row["sv_idle"]) - // maxwait_us (microseconds) is available in PgBouncer ≥ 1.8; - // fall back to maxwait (seconds) for older versions. - if us := parseFloat(row["maxwait_us"]); us > 0 { - waitMs := us / 1000 - if waitMs > m.MaxWaitMs { - m.MaxWaitMs = waitMs - } - } else if s := parseFloat(row["maxwait"]); s > 0 { - waitMs := s * 1000 - if waitMs > m.MaxWaitMs { - m.MaxWaitMs = waitMs - } + // maxwait is whole seconds; maxwait_us is the sub-second remainder in + // microseconds (PgBouncer ≥ 1.8). Combine both to get the full wait. + waitMs := parseFloat(row["maxwait"])*1000 + parseFloat(row["maxwait_us"])/1000 + if waitMs > m.MaxWaitMs { + m.MaxWaitMs = waitMs } } @@ -123,7 +116,8 @@ func (c *collector) collectPools(ctx context.Context, db *sql.DB, m *metrics.PgB } // collectStats reads aggregate throughput and latency from SHOW STATS. -// Averages are computed across all non-meta database rows. +// Latency averages are weighted by each database's avg_query_count so that +// high-traffic databases dominate the aggregate rather than row count. 
func (c *collector) collectStats(ctx context.Context, db *sql.DB, m *metrics.PgBouncerMetrics) error { rows, err := db.QueryContext(ctx, "SHOW STATS") if err != nil { @@ -136,23 +130,25 @@ func (c *collector) collectStats(ctx context.Context, db *sql.DB, m *metrics.PgB return err } - var rowCount int - var sumQueryTime, sumWaitTime float64 + // Weighted sums: weight each database's avg latency by its query rate so + // high-traffic databases dominate the aggregate, not row count. + var totalWeight, weightedQueryTime, weightedWaitTime float64 for rows.Next() { row := scanRowToMap(cols, rows) if row["database"] == "pgbouncer" { continue } - rowCount++ - m.TotalQueriesPerSec += parseFloat(row["avg_query_count"]) - sumQueryTime += parseFloat(row["avg_query_time"]) // microseconds - sumWaitTime += parseFloat(row["avg_wait_time"]) // microseconds + qps := parseFloat(row["avg_query_count"]) + m.TotalQueriesPerSec += qps + weightedQueryTime += parseFloat(row["avg_query_time"]) * qps // microseconds · qps + weightedWaitTime += parseFloat(row["avg_wait_time"]) * qps // microseconds · qps + totalWeight += qps } - if rowCount > 0 { - m.AvgQueryTimeMs = sumQueryTime / float64(rowCount) / 1000 - m.AvgWaitTimeMs = sumWaitTime / float64(rowCount) / 1000 + if totalWeight > 0 { + m.AvgQueryTimeMs = weightedQueryTime / totalWeight / 1000 + m.AvgWaitTimeMs = weightedWaitTime / totalWeight / 1000 } return rows.Err() From 0808f9800262d44c21ed9f7563f5cd8542380830 Mon Sep 17 00:00:00 2001 From: Dhanraj Date: Mon, 27 Apr 2026 21:50:38 +0530 Subject: [PATCH 3/3] fix: handle []byte NUMERIC values from lib/pq in scanRowToMap PgBouncer emits SHOW STATS/SHOW POOLS NUMERIC columns as []byte via lib/pq. fmt.Sprintf("%v", []byte) produces "[49 50 46 53]" rather than "12.5", causing parseFloat/parseInt to return 0. Add a type switch to call string(v) for []byte values so avg_query_count, avg_query_time, and avg_wait_time parse correctly. 
Co-Authored-By: Claude Sonnet 4.6 --- internal/pgbouncermetrics/collector.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/internal/pgbouncermetrics/collector.go b/internal/pgbouncermetrics/collector.go index 7605c7b..367380b 100644 --- a/internal/pgbouncermetrics/collector.go +++ b/internal/pgbouncermetrics/collector.go @@ -166,7 +166,14 @@ func scanRowToMap(cols []string, rows *sql.Rows) map[string]string { result := make(map[string]string, len(cols)) for i, col := range cols { if vals[i] != nil { - result[col] = fmt.Sprintf("%v", vals[i]) + switch v := vals[i].(type) { + case []byte: + // lib/pq returns NUMERIC columns as []byte; convert directly to + // string so parseFloat/parseInt can read them correctly. + result[col] = string(v) + default: + result[col] = fmt.Sprintf("%v", v) + } } } return result