1 change: 1 addition & 0 deletions README.md
@@ -87,6 +87,7 @@ If you are still using the legacy [Access scopes][access-scopes], the `https://w
| `monitoring.metrics-interval` | No | `5m` | Metric's timestamp interval to request from the Google Stackdriver Monitoring Metrics API. Only the most recent data point is used |
| `monitoring.metrics-offset` | No | `0s` | Offset (into the past) for the metric's timestamp interval to request from the Google Stackdriver Monitoring Metrics API, to handle latency in published metrics |
| `monitoring.filters` | No | | Additional filters to be sent on the Monitoring API call. Add multiple filters by providing this parameter multiple times. See [monitoring.filters](#using-filters) for more info. |
| `monitoring.metrics-with-aggregations` | No | | Specify metrics with aggregation options in the format: metric_name:alignment_period:cross_series_reducer:group_by_fields:per_series_aligner. Example: custom.googleapis.com/my_metric:60s:REDUCE_SUM:metric.labels.instance_id,resource.labels.zone:ALIGN_MEAN |
| `monitoring.aggregate-deltas` | No | | If enabled will treat all DELTA metrics as an in-memory counter instead of a gauge. Be sure to read [what to know about aggregating DELTA metrics](#what-to-know-about-aggregating-delta-metrics) |
| `monitoring.aggregate-deltas-ttl` | No | `30m` | How long should a delta metric continue to be exported and stored after GCP stops producing it. Read [slow moving metrics](#slow-moving-metrics) to understand the problem this attempts to solve |
| `monitoring.descriptor-cache-ttl` | No | `0s` | How long the metric descriptors for a prefix should be cached |
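The new flag's colon-separated value maps directly onto the `MetricAggregationConfig` struct introduced in this diff. Below is a minimal sketch of how such a value could be split into that struct; the `parseAggregation` helper is hypothetical and not necessarily the exporter's actual flag-parsing code:

```go
package main

import (
	"fmt"
	"strings"
)

// MetricAggregationConfig mirrors the struct added in collectors/monitoring_collector.go.
type MetricAggregationConfig struct {
	TargetedMetricPrefix string
	AlignmentPeriod      string
	CrossSeriesReducer   string
	GroupByFields        []string
	PerSeriesAligner     string
}

// parseAggregation is a hypothetical helper that splits
// metric_name:alignment_period:cross_series_reducer:group_by_fields:per_series_aligner.
func parseAggregation(v string) (MetricAggregationConfig, error) {
	parts := strings.SplitN(v, ":", 5)
	if len(parts) != 5 {
		return MetricAggregationConfig{}, fmt.Errorf("expected 5 colon-separated fields, got %d", len(parts))
	}
	return MetricAggregationConfig{
		TargetedMetricPrefix: parts[0],
		AlignmentPeriod:      parts[1],
		CrossSeriesReducer:   parts[2],
		GroupByFields:        strings.Split(parts[3], ","), // group-by fields are themselves comma-separated
		PerSeriesAligner:     parts[4],
	}, nil
}

func main() {
	cfg, err := parseAggregation("custom.googleapis.com/my_metric:60s:REDUCE_SUM:metric.labels.instance_id,resource.labels.zone:ALIGN_MEAN")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
```

The resulting prefix is matched with `strings.HasPrefix` against each metric descriptor's type before the aggregation parameters are applied to the time-series list call, as the collector changes below show.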
39 changes: 39 additions & 0 deletions collectors/monitoring_collector.go
@@ -14,6 +14,7 @@
package collectors

import (
"encoding/json"
"errors"
"fmt"
"log/slog"
@@ -36,10 +37,19 @@ type MetricFilter struct {
FilterQuery string
}

type MetricAggregationConfig struct {
TargetedMetricPrefix string
AlignmentPeriod string
CrossSeriesReducer string
GroupByFields []string
PerSeriesAligner string
}

type MonitoringCollector struct {
projectID string
metricsTypePrefixes []string
metricsFilters []MetricFilter
metricsAggregationConfigs []MetricAggregationConfig
metricsInterval time.Duration
metricsOffset time.Duration
metricsIngestDelay bool
@@ -66,6 +76,8 @@ type MonitoringCollectorOptions struct {
// ExtraFilters is a list of criteria to apply to each corresponding metric prefix query. If one or more are
// applicable to a given metric type prefix, they will be 'AND' concatenated.
ExtraFilters []MetricFilter
// MetricAggregationConfigs is a list of metrics with aggregation options in the format: metric_name:alignment_period:cross_series_reducer:group_by_fields:per_series_aligner. Example: custom.googleapis.com/my_metric:60s:REDUCE_SUM:metric.labels.instance_id,resource.labels.zone:ALIGN_MEAN
MetricAggregationConfigs []MetricAggregationConfig
// RequestInterval is the time interval used in each request to get metrics. If there are many data points returned
// during this interval, only the latest will be reported.
RequestInterval time.Duration
@@ -198,6 +210,7 @@ func NewMonitoringCollector(projectID string, monitoringService *monitoring.Serv
projectID: projectID,
metricsTypePrefixes: opts.MetricTypePrefixes,
metricsFilters: opts.ExtraFilters,
metricsAggregationConfigs: opts.MetricAggregationConfigs,
metricsInterval: opts.RequestInterval,
metricsOffset: opts.RequestOffset,
metricsIngestDelay: opts.IngestDelay,
@@ -319,6 +332,16 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri
IntervalStartTime(startTime.Format(time.RFC3339Nano)).
IntervalEndTime(endTime.Format(time.RFC3339Nano))

// Apply the first aggregation config whose prefix matches this metric type; later matches are ignored.
for _, ac := range c.metricsAggregationConfigs {
if strings.HasPrefix(metricDescriptor.Type, ac.TargetedMetricPrefix) {
timeSeriesListCall.AggregationAlignmentPeriod(ac.AlignmentPeriod).
AggregationCrossSeriesReducer(ac.CrossSeriesReducer).
AggregationGroupByFields(ac.GroupByFields...).
AggregationPerSeriesAligner(ac.PerSeriesAligner)
break
}
}

for {
c.apiCallsTotalMetric.Inc()
page, err := timeSeriesListCall.Do()
@@ -452,6 +475,22 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics(
}
}

// Add the monitored system labels
var systemLabels map[string]string
if timeSeries.Metadata != nil && timeSeries.Metadata.SystemLabels != nil {
err := json.Unmarshal(timeSeries.Metadata.SystemLabels, &systemLabels)
if err != nil {
c.logger.Error("failed to decode SystemLabels", "err", err)
} else {
for key, value := range systemLabels {
if !c.keyExists(labelKeys, key) {
labelKeys = append(labelKeys, key)
labelValues = append(labelValues, value)
}
}
}
}

if c.monitoringDropDelegatedProjects {
dropDelegatedProject := false

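The SystemLabels handling added in `reportTimeSeriesMetrics` decodes a raw JSON object into extra label pairs, skipping keys that are already present. A self-contained sketch of that decode-and-dedup path follows; the payload and seed labels are invented for illustration:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Illustrative stand-in for timeSeries.Metadata.SystemLabels: a raw JSON
	// object of string keys and values.
	raw := json.RawMessage(`{"machine_image": "debian-12", "spot_instance": "false"}`)

	var systemLabels map[string]string
	if err := json.Unmarshal(raw, &systemLabels); err != nil {
		// The collector logs the error and carries on without system labels.
		fmt.Println("failed to decode SystemLabels:", err)
		return
	}

	// Seed labels, as if already extracted from the time series.
	labelKeys := []string{"instance_id"}
	labelValues := []string{"1234"}

	// Mirrors the keyExists dedup in the collector: only unseen keys are appended,
	// so system labels never clobber labels that came from the time series itself.
	for key, value := range systemLabels {
		exists := false
		for _, k := range labelKeys {
			if k == key {
				exists = true
				break
			}
		}
		if !exists {
			labelKeys = append(labelKeys, key)
			labelValues = append(labelValues, value)
		}
	}
	fmt.Println(labelKeys, labelValues)
}
```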
257 changes: 256 additions & 1 deletion collectors/monitoring_collector_test.go
@@ -13,16 +13,28 @@

package collectors

import "testing"
import (
"log/slog"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"google.golang.org/api/monitoring/v3"
)

func TestIsGoogleMetric(t *testing.T) {
good := []string{
"pubsub.googleapis.com/some/metric",
"compute.googleapis.com/instance/cpu/utilization",
"cloudsql.googleapis.com/database/cpu/utilization",
"storage.googleapis.com/api/request_count",
"custom.googleapis.com/my_metric",
}

bad := []string{
"my.metric/a/b",
"my.metrics/pubsub.googleapis.com/a",
"mycompany.com/metrics/requests",
}

for _, e := range good {
@@ -37,3 +49,246 @@ func TestIsGoogleMetric(t *testing.T) {
}
}
}

func TestGoogleDescriptorCache(t *testing.T) {
ttl := 1 * time.Second
innerCache := newDescriptorCache(ttl)
cache := &googleDescriptorCache{inner: innerCache}

googleMetric := "pubsub.googleapis.com/topic/num_undelivered_messages"
customMetric := "custom.googleapis.com/my_metric"

googleDescriptors := []*monitoring.MetricDescriptor{
{Type: googleMetric, DisplayName: "Google Metric"},
}
customDescriptors := []*monitoring.MetricDescriptor{
{Type: customMetric, DisplayName: "Custom Metric"},
}

// Test that Google metrics are cached
cache.Store(googleMetric, googleDescriptors)
result := cache.Lookup(googleMetric)
if result == nil {
t.Error("Google metric should be cached")
}
if len(result) != 1 || result[0].Type != googleMetric {
t.Error("Cached Google metric should match stored value")
}

// Test that custom.googleapis.com metrics are also cached (they are Google metrics)
cache.Store(customMetric, customDescriptors)
result = cache.Lookup(customMetric)
if result == nil {
t.Error("Custom Google metric should be cached")
}
if len(result) != 1 || result[0].Type != customMetric {
t.Error("Cached custom Google metric should match stored value")
}

// Test expiration
time.Sleep(2 * ttl)
result = cache.Lookup(googleMetric)
if result != nil {
t.Error("Cached Google metric should have expired")
}
}

func TestNoopDescriptorCache(t *testing.T) {
cache := &noopDescriptorCache{}

descriptors := []*monitoring.MetricDescriptor{
{Type: "test.metric", DisplayName: "Test Metric"},
}

// Test that Lookup always returns nil
result := cache.Lookup("any.prefix")
if result != nil {
t.Error("Noop cache should always return nil on lookup")
}

// Test that Store does nothing (no panic)
cache.Store("any.prefix", descriptors)
result = cache.Lookup("any.prefix")
if result != nil {
t.Error("Noop cache should still return nil after store")
}
}

func TestNewMonitoringCollector(t *testing.T) {
logger := slog.Default()
monitoringService := &monitoring.Service{}

tests := []struct {
name string
projectID string
opts MonitoringCollectorOptions
expectError bool
}{
{
name: "basic collector creation",
projectID: "test-project",
opts: MonitoringCollectorOptions{
MetricTypePrefixes: []string{"pubsub.googleapis.com"},
RequestInterval: 5 * time.Minute,
},
expectError: false,
},
{
name: "collector with descriptor cache",
projectID: "test-project",
opts: MonitoringCollectorOptions{
MetricTypePrefixes: []string{"pubsub.googleapis.com"},
RequestInterval: 5 * time.Minute,
DescriptorCacheTTL: 10 * time.Minute,
DescriptorCacheOnlyGoogle: true,
},
expectError: false,
},
{
name: "collector with delta aggregation",
projectID: "test-project",
opts: MonitoringCollectorOptions{
MetricTypePrefixes: []string{"pubsub.googleapis.com"},
RequestInterval: 5 * time.Minute,
AggregateDeltas: true,
},
expectError: false,
},
{
name: "collector with all options",
projectID: "test-project",
opts: MonitoringCollectorOptions{
MetricTypePrefixes: []string{"pubsub.googleapis.com"},
ExtraFilters: []MetricFilter{{TargetedMetricPrefix: "pubsub.googleapis.com", FilterQuery: "resource.labels.topic_id=\"test\""}},
MetricAggregationConfigs: []MetricAggregationConfig{{TargetedMetricPrefix: "pubsub.googleapis.com", CrossSeriesReducer: "REDUCE_SUM"}},
RequestInterval: 5 * time.Minute,
RequestOffset: 1 * time.Minute,
IngestDelay: true,
FillMissingLabels: true,
DropDelegatedProjects: true,
AggregateDeltas: true,
DescriptorCacheTTL: 10 * time.Minute,
DescriptorCacheOnlyGoogle: false,
},
expectError: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
collector, err := NewMonitoringCollector(tt.projectID, monitoringService, tt.opts, logger, nil, nil)

if tt.expectError && err == nil {
t.Error("Expected error but got none")
}
if !tt.expectError && err != nil {
t.Errorf("Unexpected error: %v", err)
}
if err != nil {
return
}

// Verify basic fields
if collector.projectID != tt.projectID {
t.Errorf("Expected projectID %s, got %s", tt.projectID, collector.projectID)
}

if len(collector.metricsTypePrefixes) != len(tt.opts.MetricTypePrefixes) {
t.Errorf("Expected %d metric prefixes, got %d", len(tt.opts.MetricTypePrefixes), len(collector.metricsTypePrefixes))
}

if collector.metricsInterval != tt.opts.RequestInterval {
t.Errorf("Expected interval %v, got %v", tt.opts.RequestInterval, collector.metricsInterval)
}

if collector.metricsOffset != tt.opts.RequestOffset {
t.Errorf("Expected offset %v, got %v", tt.opts.RequestOffset, collector.metricsOffset)
}

if collector.metricsIngestDelay != tt.opts.IngestDelay {
t.Errorf("Expected ingest delay %v, got %v", tt.opts.IngestDelay, collector.metricsIngestDelay)
}

if collector.collectorFillMissingLabels != tt.opts.FillMissingLabels {
t.Errorf("Expected fill missing labels %v, got %v", tt.opts.FillMissingLabels, collector.collectorFillMissingLabels)
}

if collector.monitoringDropDelegatedProjects != tt.opts.DropDelegatedProjects {
t.Errorf("Expected drop delegated projects %v, got %v", tt.opts.DropDelegatedProjects, collector.monitoringDropDelegatedProjects)
}

if collector.aggregateDeltas != tt.opts.AggregateDeltas {
t.Errorf("Expected aggregate deltas %v, got %v", tt.opts.AggregateDeltas, collector.aggregateDeltas)
}

// Verify descriptor cache type
if tt.opts.DescriptorCacheTTL == 0 {
if _, ok := collector.descriptorCache.(*noopDescriptorCache); !ok {
t.Error("Expected noop descriptor cache when TTL is 0")
}
} else if tt.opts.DescriptorCacheOnlyGoogle {
if _, ok := collector.descriptorCache.(*googleDescriptorCache); !ok {
t.Error("Expected google descriptor cache when only Google is enabled")
}
} else {
if _, ok := collector.descriptorCache.(*descriptorCache); !ok {
t.Error("Expected regular descriptor cache when TTL > 0 and not Google-only")
}
}

// Verify metrics are created
if collector.apiCallsTotalMetric == nil {
t.Error("Expected apiCallsTotalMetric to be created")
}
if collector.scrapesTotalMetric == nil {
t.Error("Expected scrapesTotalMetric to be created")
}
if collector.scrapeErrorsTotalMetric == nil {
t.Error("Expected scrapeErrorsTotalMetric to be created")
}
if collector.lastScrapeErrorMetric == nil {
t.Error("Expected lastScrapeErrorMetric to be created")
}
if collector.lastScrapeTimestampMetric == nil {
t.Error("Expected lastScrapeTimestampMetric to be created")
}
if collector.lastScrapeDurationSecondsMetric == nil {
t.Error("Expected lastScrapeDurationSecondsMetric to be created")
}
})
}
}

func TestMonitoringCollectorDescribe(t *testing.T) {
logger := slog.Default()
monitoringService := &monitoring.Service{}
opts := MonitoringCollectorOptions{
MetricTypePrefixes: []string{"pubsub.googleapis.com"},
RequestInterval: 5 * time.Minute,
}

collector, err := NewMonitoringCollector("test-project", monitoringService, opts, logger, nil, nil)
if err != nil {
t.Fatalf("Failed to create collector: %v", err)
}

// Create a channel to collect descriptions
ch := make(chan *prometheus.Desc, 10)

// Call Describe
collector.Describe(ch)
close(ch)

// Count the descriptions
count := 0
for range ch {
count++
}

// Should have 6 metrics: api_calls_total, scrapes_total, scrape_errors_total,
// last_scrape_error, last_scrape_timestamp, last_scrape_duration_seconds
expectedCount := 6
if count != expectedCount {
t.Errorf("Expected %d metric descriptions, got %d", expectedCount, count)
}
}