Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 115 additions & 1 deletion internal/collector/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,89 @@ func TestHTTPCollector_MultipleEndpoints(t *testing.T) {
}

// ---------------------------------------------------------------------------
// 9. splitLabels — quoted values containing commas
// 9. HTTPCollector.Name
// ---------------------------------------------------------------------------

// TestHTTPCollector_Name checks that the collector identifies itself as "http".
func TestHTTPCollector_Name(t *testing.T) {
	col := NewHTTPCollector(nil, 5*time.Second)
	got := col.Name()
	if got != "http" {
		t.Errorf("expected Name() == \"http\", got %q", got)
	}
}

// ---------------------------------------------------------------------------
// 10. parseMetrics — all type-switch branches
// ---------------------------------------------------------------------------

// TestParseMetrics_AllTypes exercises the int, int64, int32 and float32
// branches of parseMetrics, which are not reached through the HTTP path
// because JSON always deserialises numbers as float64.
func TestParseMetrics_AllTypes(t *testing.T) {
	col := NewHTTPCollector(nil, 5*time.Second)

	raw := map[string]interface{}{
		"int_val":     int(7),
		"int64_val":   int64(8),
		"int32_val":   int32(9),
		"float32_val": float32(3.14),
		"float64_val": float64(2.71),
		"string_val":  "skip_me", // non-numeric entry, must be ignored
	}

	got := col.parseMetrics("test_ep", raw)

	// 5 numeric keys → 5 metrics (the string should be skipped).
	if len(got) != 5 {
		t.Fatalf("expected 5 metrics, got %d", len(got))
	}

	// Index the result by metric name for direct lookups below.
	byName := make(map[string]Metric, len(got))
	for _, m := range got {
		byName[m.Name] = m
	}

	checks := []struct {
		key  string
		want float64
	}{
		{"int_val", 7},
		{"int64_val", 8},
		{"int32_val", 9},
	}
	for _, c := range checks {
		m, ok := byName["app_"+c.key]
		if !ok {
			t.Errorf("metric app_%s not found", c.key)
			continue
		}
		if m.Value != c.want {
			t.Errorf("app_%s: expected %v, got %v", c.key, c.want, m.Value)
		}
		if ep := m.Labels["endpoint"]; ep != "test_ep" {
			t.Errorf("app_%s: expected endpoint label 'test_ep', got %q", c.key, ep)
		}
	}

	// float32 loses some precision when converted; just check it's close.
	if m, ok := byName["app_float32_val"]; !ok {
		t.Error("metric app_float32_val not found")
	} else if m.Value < 3.0 || m.Value > 4.0 {
		t.Errorf("app_float32_val: unexpected value %v", m.Value)
	}

	// string key must NOT appear.
	if _, ok := byName["app_string_val"]; ok {
		t.Error("app_string_val should have been skipped")
	}
}

// ---------------------------------------------------------------------------
// 11. splitLabels — quoted values containing commas
// ---------------------------------------------------------------------------

func TestSplitLabels(t *testing.T) {
Expand Down Expand Up @@ -408,3 +490,35 @@ func TestSplitLabels(t *testing.T) {
})
}
}

// TestParsePrometheusLine_EdgeCases feeds malformed Prometheus exposition
// lines to parsePrometheusLine and verifies each one is rejected (nil result).
func TestParsePrometheusLine_EdgeCases(t *testing.T) {
	c := &HTTPCollector{}

	cases := []struct {
		name string // subtest name
		line string // malformed input line
		msg  string // failure message if the line is not rejected
	}{
		{"no value field", "metric_name_only", "expected nil for line with no value"},
		{"unparseable value", "metric_name notanumber", "expected nil for non-numeric value"},
		{"malformed labels no closing brace", `metric{label="value" 123`, "expected nil for malformed labels"},
		{"labels with no value after brace", `metric{label="value"}`, "expected nil for labels with no value"},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if c.parsePrometheusLine("test", tc.line) != nil {
				t.Error(tc.msg)
			}
		})
	}
}
8 changes: 4 additions & 4 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (

// Config represents the application configuration
type Config struct {
Server ServerConfig `json:"server"`
Collector CollectorConfig `json:"collector"`
Shipper ShipperConfig `json:"shipper"`
Endpoints []EndpointConfig `json:"endpoints"`
Server ServerConfig `json:"server"`
Collector CollectorConfig `json:"collector"`
Shipper ShipperConfig `json:"shipper"`
Endpoints []EndpointConfig `json:"endpoints"`
}

// ServerConfig contains HTTP server settings
Expand Down
163 changes: 163 additions & 0 deletions internal/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package orchestrator

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -177,3 +178,165 @@ func TestStop(t *testing.T) {
t.Fatal("Start did not return after Stop()")
}
}

// retryShipper is a test double whose Ship method errors for the first
// failUntil invocations and records successfully delivered batches after that.
type retryShipper struct {
	mu        sync.Mutex
	callCount int
	failUntil int // number of leading Ship calls that return an error
	shipped   [][]collector.Metric
}

// Ship counts the invocation; within the failure window it returns an error,
// afterwards it records the batch as delivered and succeeds.
func (r *retryShipper) Ship(_ context.Context, metrics []collector.Metric) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.callCount++
	if r.callCount > r.failUntil {
		r.shipped = append(r.shipped, metrics)
		return nil
	}
	return fmt.Errorf("ship error on call %d", r.callCount)
}

// Close is a no-op; the shipper holds no resources.
func (r *retryShipper) Close() error { return nil }

// calls reports how many times Ship has been invoked.
func (r *retryShipper) calls() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.callCount
}

// successCount reports how many batches were delivered successfully.
func (r *retryShipper) successCount() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.shipped)
}

// TestCollectAndShip_ShipRetry verifies the path where the first Ship call
// fails but the retry succeeds — metrics must still be delivered.
func TestCollectAndShip_ShipRetry(t *testing.T) {
	reg := collector.NewRegistry()
	reg.Register(&mockCollector{
		name:    "test",
		metrics: []collector.Metric{{Name: "cpu", Value: 1.0, Type: "gauge", Labels: map[string]string{}}},
	})

	// Fail only the first Ship call; the retry must succeed.
	ship := &retryShipper{failUntil: 1}

	orch := NewOrchestrator(reg, ship, 10*time.Minute)
	orch.collectAndShip(context.Background())

	// Ship should have been called exactly twice (original + retry).
	if got := ship.calls(); got != 2 {
		t.Errorf("expected 2 Ship calls (1 fail + 1 retry), got %d", got)
	}
	// The retry succeeded, so metrics should have been delivered once.
	if got := ship.successCount(); got != 1 {
		t.Errorf("expected 1 successful delivery, got %d", got)
	}
	// lastShipDuration should be set after a successful retry.
	if orch.lastShipDuration == 0 {
		t.Error("expected lastShipDuration to be non-zero after successful retry")
	}
}

// TestCollectAndShip_ShipRetryFails verifies the path where both the original
// Ship call and the retry fail — no panic, lastShipDuration still set.
func TestCollectAndShip_ShipRetryFails(t *testing.T) {
	reg := collector.NewRegistry()
	reg.Register(&mockCollector{
		name:    "test",
		metrics: []collector.Metric{{Name: "cpu", Value: 2.0, Type: "gauge", Labels: map[string]string{}}},
	})

	// failUntil is far beyond two calls, so every Ship attempt errors.
	ship := &retryShipper{failUntil: 999}

	orch := NewOrchestrator(reg, ship, 10*time.Minute)
	orch.collectAndShip(context.Background())

	// Ship should have been called exactly twice (original + retry).
	if got := ship.calls(); got != 2 {
		t.Errorf("expected 2 Ship calls (original + retry), got %d", got)
	}
	// No metrics should have been successfully delivered.
	if got := ship.successCount(); got != 0 {
		t.Errorf("expected 0 successful deliveries, got %d", got)
	}
	// lastShipDuration is still updated even on full failure.
	if orch.lastShipDuration == 0 {
		t.Error("expected lastShipDuration to be non-zero even after failed retry")
	}
}

// TestCollectAndShip_DeadlineWarning verifies that collectAndShip completes
// without panic when the collection duration exceeds 80 % of the interval.
// We can't assert on the log output but we can ensure the cycle still ships.
func TestCollectAndShip_DeadlineWarning(t *testing.T) {
	reg := collector.NewRegistry()
	reg.Register(&mockCollector{
		name:    "slow",
		metrics: []collector.Metric{{Name: "m", Value: 1, Type: "gauge", Labels: map[string]string{}}},
	})

	ship := &mockShipper{}
	// A one-nanosecond interval guarantees even a trivial collection
	// duration exceeds 80 % of it, so the warning path is exercised.
	orch := NewOrchestrator(reg, ship, 1*time.Nanosecond)

	// collectAndShip must not panic even when the deadline warning fires.
	orch.collectAndShip(context.Background())

	if ship.calls() < 1 {
		t.Error("expected at least one Ship call despite deadline warning")
	}
}

// TestCollectAndShip_LastShipDurationIncluded verifies that on the second call
// to collectAndShip the metricsd_ship_duration_seconds internal metric is
// present in the shipped batch (because lastShipDuration was set by the first).
func TestCollectAndShip_LastShipDurationIncluded(t *testing.T) {
	reg := collector.NewRegistry()
	reg.Register(&mockCollector{
		name:    "test",
		metrics: []collector.Metric{{Name: "cpu", Value: 1.0, Type: "gauge", Labels: map[string]string{}}},
	})

	ship := &mockShipper{}
	orch := NewOrchestrator(reg, ship, 10*time.Minute)

	// hasShipMetric reports whether the internal ship-duration metric
	// appears anywhere in the given batch.
	hasShipMetric := func(batch []collector.Metric) bool {
		for _, m := range batch {
			if m.Name == "metricsd_ship_duration_seconds" {
				return true
			}
		}
		return false
	}

	// First cycle — sets lastShipDuration but does NOT include it in shipped metrics.
	orch.collectAndShip(context.Background())
	if got := ship.calls(); got != 1 {
		t.Fatalf("expected 1 Ship call after first cycle, got %d", got)
	}
	if hasShipMetric(ship.firstBatch()) {
		t.Error("metricsd_ship_duration_seconds should NOT appear in first cycle batch")
	}

	// Second cycle — lastShipDuration > 0, so ship metric must be included.
	orch.collectAndShip(context.Background())
	if got := ship.calls(); got != 2 {
		t.Fatalf("expected 2 Ship calls after second cycle, got %d", got)
	}

	ship.mu.Lock()
	secondBatch := ship.shipped[1]
	ship.mu.Unlock()

	if !hasShipMetric(secondBatch) {
		t.Error("expected metricsd_ship_duration_seconds in second cycle batch")
	}
}
20 changes: 10 additions & 10 deletions internal/plugin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ type PluginConfig struct {
Name string `json:"name"`
Path string `json:"-"` // Set by discovery, not from JSON
Args []string `json:"args,omitempty"`
Timeout int `json:"timeout,omitempty"` // Seconds
Timeout int `json:"timeout,omitempty"` // Seconds
Env []string `json:"env,omitempty"`
WorkingDir string `json:"working_dir,omitempty"`
Enabled *bool `json:"enabled,omitempty"` // Pointer to distinguish unset from false
Enabled *bool `json:"enabled,omitempty"` // Pointer to distinguish unset from false
Interval int `json:"interval_seconds,omitempty"`
}

Expand Down Expand Up @@ -45,14 +45,14 @@ type PluginMetric struct {
// PluginHealth tracks the runtime health state of a single plugin.
// Owned by the Manager, not the plugin itself.
type PluginHealth struct {
Name string
Status string // "ok", "failing", "circuit_open"
ConsecutiveFails int
LastError string
LastSuccess time.Time
LastCollect time.Time
LastMetricCount int
CircuitOpenUntil time.Time // Zero means circuit closed
Name string
Status string // "ok", "failing", "circuit_open"
ConsecutiveFails int
LastError string
LastSuccess time.Time
LastCollect time.Time
LastMetricCount int
CircuitOpenUntil time.Time // Zero means circuit closed
}

// DefaultTimeout is the fallback plugin timeout.
Expand Down
Loading
Loading