Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 43 additions & 24 deletions app/services/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"hostlink/internal/apiserver"
"hostlink/internal/crypto"
"hostlink/internal/networkmetrics"
"hostlink/internal/pgbouncermetrics"
"hostlink/internal/pgmetrics"
"hostlink/internal/storagemetrics"
"hostlink/internal/sysmetrics"
Expand All @@ -29,14 +30,15 @@ type Pusher interface {
}

type metricspusher struct {
apiserver apiserver.MetricsOperations
agentstate agentstate.Operations
metricscollector pgmetrics.Collector
syscollector sysmetrics.Collector
netcollector networkmetrics.Collector
storagecollector storagemetrics.Collector
crypto crypto.Service
privateKeyPath string
apiserver apiserver.MetricsOperations
agentstate agentstate.Operations
metricscollector pgmetrics.Collector
syscollector sysmetrics.Collector
netcollector networkmetrics.Collector
storagecollector storagemetrics.Collector
pgbouncercollector pgbouncermetrics.Collector
crypto crypto.Service
privateKeyPath string
}

func NewWithConf() (*metricspusher, error) {
Expand All @@ -50,14 +52,15 @@ func NewWithConf() (*metricspusher, error) {
}

return &metricspusher{
apiserver: svr,
agentstate: agentstate,
metricscollector: pgmetrics.New(),
syscollector: sysmetrics.New(),
netcollector: networkmetrics.New(),
storagecollector: storagemetrics.New(),
crypto: crypto.NewService(),
privateKeyPath: appconf.AgentPrivateKeyPath(),
apiserver: svr,
agentstate: agentstate,
metricscollector: pgmetrics.New(),
syscollector: sysmetrics.New(),
netcollector: networkmetrics.New(),
storagecollector: storagemetrics.New(),
pgbouncercollector: pgbouncermetrics.New(),
crypto: crypto.NewService(),
privateKeyPath: appconf.AgentPrivateKeyPath(),
}, nil
}

Expand All @@ -73,18 +76,20 @@ func NewWithDependencies(
syscollector sysmetrics.Collector,
netcollector networkmetrics.Collector,
storagecollector storagemetrics.Collector,
pgbouncercollector pgbouncermetrics.Collector,
crypto crypto.Service,
privateKeyPath string,
) *metricspusher {
return &metricspusher{
apiserver: apiserver,
agentstate: agentstate,
metricscollector: pgcollector,
syscollector: syscollector,
netcollector: netcollector,
storagecollector: storagecollector,
crypto: crypto,
privateKeyPath: privateKeyPath,
apiserver: apiserver,
agentstate: agentstate,
metricscollector: pgcollector,
syscollector: syscollector,
netcollector: netcollector,
storagecollector: storagecollector,
pgbouncercollector: pgbouncercollector,
crypto: crypto,
privateKeyPath: privateKeyPath,
}
}

Expand Down Expand Up @@ -183,6 +188,20 @@ func (mp *metricspusher) Push(cred credential.Credential) error {
}
}

// PgBouncer stats — try-connect approach: silently skip when not running.
// The collector returns an error if PgBouncer is unreachable; we mark Up: false
// and still include the metric set so the server can track the pooler state.
pgbouncerMetrics, err := mp.pgbouncercollector.Collect(cred)
if err != nil {
pgbouncerMetrics = domainmetrics.PgBouncerMetrics{Up: false}
} else {
pgbouncerMetrics.Up = true
}
metricSets = append(metricSets, domainmetrics.MetricSet{
Type: domainmetrics.MetricTypePgBouncer,
Metrics: pgbouncerMetrics,
})

// If only the postgresql.database metric set exists (with up=false) and
// all other collectors failed, we still push so the server knows the agent
// is alive and PostgreSQL status is reported.
Expand Down
80 changes: 59 additions & 21 deletions app/services/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,26 +180,37 @@ func (m *MockCrypto) DecryptWithPrivateKey(ciphertextBase64 string, privateKey *
return args.String(0), args.Error(1)
}

type MockPgBouncerCollector struct {
mock.Mock
}

func (m *MockPgBouncerCollector) Collect(cred credential.Credential) (domainmetrics.PgBouncerMetrics, error) {
args := m.Called(cred)
return args.Get(0).(domainmetrics.PgBouncerMetrics), args.Error(1)
}

// Test helpers
type testMocks struct {
apiserver *MockAPIServer
agentstate *MockAgentState
collector *MockCollector
syscollector *MockSysCollector
netcollector *MockNetCollector
storagecollector *MockStorageCollector
crypto *MockCrypto
apiserver *MockAPIServer
agentstate *MockAgentState
collector *MockCollector
syscollector *MockSysCollector
netcollector *MockNetCollector
storagecollector *MockStorageCollector
pgbouncercollector *MockPgBouncerCollector
crypto *MockCrypto
}

func setupTestMetricsPusher() (*metricspusher, *testMocks) {
mocks := &testMocks{
apiserver: new(MockAPIServer),
agentstate: new(MockAgentState),
collector: new(MockCollector),
syscollector: new(MockSysCollector),
netcollector: new(MockNetCollector),
storagecollector: new(MockStorageCollector),
crypto: new(MockCrypto),
apiserver: new(MockAPIServer),
agentstate: new(MockAgentState),
collector: new(MockCollector),
syscollector: new(MockSysCollector),
netcollector: new(MockNetCollector),
storagecollector: new(MockStorageCollector),
pgbouncercollector: new(MockPgBouncerCollector),
crypto: new(MockCrypto),
}

mp := NewWithDependencies(
Expand All @@ -209,6 +220,7 @@ func setupTestMetricsPusher() (*metricspusher, *testMocks) {
mocks.syscollector,
mocks.netcollector,
mocks.storagecollector,
mocks.pgbouncercollector,
mocks.crypto,
"/test/key/path",
)
Expand Down Expand Up @@ -458,6 +470,7 @@ func TestPush_SystemMetricsFailure_StillPushesDbMetrics(t *testing.T) {
Return(domainmetrics.SystemMetrics{}, errors.New("collection failed"))
setupNetCollectorMocks(mocks.netcollector)
setupStorageCollectorMocks(mocks.storagecollector)
setupPgBouncerCollectorMocks(mocks.pgbouncercollector)
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true, ConnectionsTotal: 5}, nil)
mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool {
Expand Down Expand Up @@ -496,6 +509,7 @@ func TestPush_DatabaseMetricsFailure_StillPushesSystemMetricsAndDbWithUpFalse(t
setupSysCollectorMocks(mocks.syscollector)
setupNetCollectorMocks(mocks.netcollector)
setupStorageCollectorMocks(mocks.storagecollector)
setupPgBouncerCollectorMocks(mocks.pgbouncercollector)
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{}, collectErr)
mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool {
Expand Down Expand Up @@ -542,16 +556,26 @@ func TestPush_AllCollectionsFail_StillPushesDbWithUpFalse(t *testing.T) {
Return(nil, errors.New("storage failed"))
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{}, errors.New("connection refused"))
mocks.pgbouncercollector.On("Collect", testCred).
Return(domainmetrics.PgBouncerMetrics{}, errors.New("pgbouncer not running"))
mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool {
if len(p.MetricSets) != 1 {
// Expect 2 metric sets: postgresql.database (up=false) + pgbouncer.stats (up=false)
if len(p.MetricSets) != 2 {
return false
}
ms := p.MetricSets[0]
if ms.Type != domainmetrics.MetricTypePostgreSQLDatabase {
return false
hasDbUpFalse := false
hasPgBouncerUpFalse := false
for _, ms := range p.MetricSets {
if ms.Type == domainmetrics.MetricTypePostgreSQLDatabase {
dbMetrics := ms.Metrics.(domainmetrics.PostgreSQLDatabaseMetrics)
hasDbUpFalse = !dbMetrics.Up
}
if ms.Type == domainmetrics.MetricTypePgBouncer {
pbMetrics := ms.Metrics.(domainmetrics.PgBouncerMetrics)
hasPgBouncerUpFalse = !pbMetrics.Up
}
}
dbMetrics := ms.Metrics.(domainmetrics.PostgreSQLDatabaseMetrics)
return !dbMetrics.Up
return hasDbUpFalse && hasPgBouncerUpFalse
})).Return(nil)

err := mp.Push(testCred)
Expand All @@ -569,6 +593,7 @@ func TestPush_APIServerPushFailure(t *testing.T) {
setupSysCollectorMocks(mocks.syscollector)
setupNetCollectorMocks(mocks.netcollector)
setupStorageCollectorMocks(mocks.storagecollector)
setupPgBouncerCollectorMocks(mocks.pgbouncercollector)
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true, ConnectionsTotal: 5}, nil)
mocks.apiserver.On("PushMetrics", mock.Anything, mock.Anything).
Expand All @@ -593,6 +618,7 @@ func TestPush_Success_ValidatesPayloadSchema(t *testing.T) {
setupSysCollectorMocks(mocks.syscollector)
setupNetCollectorMocks(mocks.netcollector)
setupStorageCollectorMocks(mocks.storagecollector)
setupPgBouncerCollectorMocks(mocks.pgbouncercollector)
connected := true
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{
Expand Down Expand Up @@ -620,7 +646,7 @@ func TestPush_Success_ValidatesPayloadSchema(t *testing.T) {
if p.Resource.HostName == "" {
return false
}
if len(p.MetricSets) != 4 {
if len(p.MetricSets) != 5 {
return false
}

Expand Down Expand Up @@ -719,6 +745,7 @@ func TestPush_ContextPropagation(t *testing.T) {
setupSysCollectorMocks(mocks.syscollector)
setupNetCollectorMocks(mocks.netcollector)
setupStorageCollectorMocks(mocks.storagecollector)
setupPgBouncerCollectorMocks(mocks.pgbouncercollector)
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true}, nil)
mocks.apiserver.On("PushMetrics", mock.MatchedBy(func(ctx context.Context) bool {
Expand Down Expand Up @@ -762,6 +789,7 @@ func TestPush_CredentialPassedCorrectly(t *testing.T) {
setupSysCollectorMocks(mocks.syscollector)
setupNetCollectorMocks(mocks.netcollector)
setupStorageCollectorMocks(mocks.storagecollector)
setupPgBouncerCollectorMocks(mocks.pgbouncercollector)
mocks.collector.On("Collect", mock.MatchedBy(func(c credential.Credential) bool {
return c.Host == testCred.Host &&
c.Port == testCred.Port &&
Expand Down Expand Up @@ -797,6 +825,11 @@ func setupNetCollectorMocks(collector *MockNetCollector) {
}, nil)
}

func setupPgBouncerCollectorMocks(collector *MockPgBouncerCollector) {
collector.On("Collect", mock.Anything).
Return(domainmetrics.PgBouncerMetrics{}, errors.New("connection refused: pgbouncer not running"))
}

func setupStorageCollectorMocks(collector *MockStorageCollector) {
collector.On("Collect", mock.Anything).Return([]storagemetrics.StorageMetricSet{
{
Expand Down Expand Up @@ -827,6 +860,7 @@ func TestPush_DatabaseDown_SendsUpFalseWithZeroMetrics(t *testing.T) {
setupSysCollectorMocks(mocks.syscollector)
setupNetCollectorMocks(mocks.netcollector)
setupStorageCollectorMocks(mocks.storagecollector)
setupPgBouncerCollectorMocks(mocks.pgbouncercollector)
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{}, errors.New("connection refused"))
mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool {
Expand Down Expand Up @@ -883,6 +917,7 @@ func TestPush_IncludesStorageMetrics(t *testing.T) {
setupSysCollectorMocks(mocks.syscollector)
setupNetCollectorMocks(mocks.netcollector)
setupStorageCollectorMocks(mocks.storagecollector)
setupPgBouncerCollectorMocks(mocks.pgbouncercollector)
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true}, nil)
mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool {
Expand Down Expand Up @@ -912,6 +947,7 @@ func TestPush_StorageMetricsFailure_StillPushesOtherMetrics(t *testing.T) {
setupNetCollectorMocks(mocks.netcollector)
mocks.storagecollector.On("Collect", mock.Anything).
Return(nil, errors.New("storage collection failed"))
setupPgBouncerCollectorMocks(mocks.pgbouncercollector)
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true}, nil)
mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool {
Expand Down Expand Up @@ -956,6 +992,7 @@ func TestPush_StorageMetricsMultipleMounts(t *testing.T) {
Metrics: domainmetrics.StorageMetrics{DiskUsedPercent: 75.0},
},
}, nil)
setupPgBouncerCollectorMocks(mocks.pgbouncercollector)
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true}, nil)
mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool {
Expand Down Expand Up @@ -993,6 +1030,7 @@ func TestPush_StorageMetricsWithAttributes(t *testing.T) {
Metrics: domainmetrics.StorageMetrics{DiskUsedPercent: 80.0},
},
}, nil)
setupPgBouncerCollectorMocks(mocks.pgbouncercollector)
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true}, nil)
mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool {
Expand Down
17 changes: 17 additions & 0 deletions domain/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const (
MetricTypeNetwork = "network"
MetricTypePostgreSQLDatabase = "postgresql.database"
MetricTypeStorage = "storage"
MetricTypePgBouncer = "pgbouncer.stats"
)

type MetricPayload struct {
Expand Down Expand Up @@ -54,6 +55,22 @@ type PostgreSQLDatabaseMetrics struct {
ReplicationConnected *bool `json:"replication_connected,omitempty"`
}

// PgBouncerMetrics holds aggregated connection pool statistics collected
// via the PgBouncer admin console (SHOW POOLS + SHOW STATS).
// Up is false when PgBouncer is not running or unreachable.
type PgBouncerMetrics struct {
Up bool `json:"up"`
ClientsActive int `json:"clients_active"`
ClientsWaiting int `json:"clients_waiting"`
ServersActive int `json:"servers_active"`
ServersIdle int `json:"servers_idle"`
MaxWaitMs float64 `json:"max_wait_ms"`
AvgQueryTimeMs float64 `json:"avg_query_time_ms"`
AvgWaitTimeMs float64 `json:"avg_wait_time_ms"`
TotalQueriesPerSec float64 `json:"total_queries_per_sec"`
PoolCount int `json:"pool_count"`
}

type StorageMetrics struct {
DiskUsedBytes float64 `json:"disk_used_bytes"`
DiskFreeBytes float64 `json:"disk_free_bytes"`
Expand Down
Loading
Loading