Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,14 +649,16 @@ func SetDefaultValues(data interface{}) error {
var err error
switch vFieldKind {
case reflect.Struct:
err = SetDefaultValues(vField.Addr().Interface())
if err != nil {
return err
if vField.Addr().CanInterface() {
err = SetDefaultValues(vField.Addr().Interface())
if err != nil {
return err
}
}
case reflect.Slice:
for i := 0; i < vField.Len(); i++ {
item := vField.Index(i)
if item.Kind() == reflect.Struct {
if item.Kind() == reflect.Struct && item.Addr().CanInterface() {
err = SetDefaultValues(item.Addr().Interface())
if err != nil {
return err
Expand Down
62 changes: 40 additions & 22 deletions fd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
metricHoldDuration := pipeline.DefaultMetricHoldDuration
metaCacheSize := pipeline.DefaultMetaCacheSize
pool := ""
metricMaxLabelValueLength := pipeline.DefaultMetricMaxLabelValueLength

if settings != nil {
val := settings.Get("capacity").MustInt()
Expand Down Expand Up @@ -104,38 +105,55 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
sourceNameMetaField = settings.Get("source_name_meta_field").MustString()
isStrict = settings.Get("is_strict").MustBool()

if str := settings.Get("pool").MustString(); str != "" {
pool = str
}

if metrics := settings.Get("metrics"); metrics != nil {
str = metrics.Get("hold_duration").MustString()
if str != "" {
i, err := time.ParseDuration(str)
if err != nil {
logger.Fatalf("can't parse pipeline metric hold duration: %s", err.Error())
}
metricHoldDuration = i
}

val = metrics.Get("max_label_value_length").MustInt()
if val != 0 {
metricMaxLabelValueLength = val
}
}

str = settings.Get("metric_hold_duration").MustString()
if str != "" {
if str != "" && metricHoldDuration == pipeline.DefaultMetricHoldDuration {
i, err := time.ParseDuration(str)
if err != nil {
logger.Fatalf("can't parse pipeline metric hold duration: %s", err.Error())
}
metricHoldDuration = i
}

if str := settings.Get("pool").MustString(); str != "" {
pool = str
}
}

return &pipeline.Settings{
Decoder: decoder,
DecoderParams: decoderParams,
Capacity: capacity,
MetaCacheSize: metaCacheSize,
AvgEventSize: avgInputEventSize,
MaxEventSize: maxInputEventSize,
CutOffEventByLimit: cutOffEventByLimit,
CutOffEventByLimitField: cutOffEventByLimitField,
AntispamThreshold: antispamThreshold,
AntispamExceptions: antispamExceptions,
SourceNameMetaField: sourceNameMetaField,
MaintenanceInterval: maintenanceInterval,
EventTimeout: eventTimeout,
StreamField: streamField,
IsStrict: isStrict,
MetricHoldDuration: metricHoldDuration,
Pool: pipeline.PoolType(pool),
Decoder: decoder,
DecoderParams: decoderParams,
Capacity: capacity,
MetaCacheSize: metaCacheSize,
AvgEventSize: avgInputEventSize,
MaxEventSize: maxInputEventSize,
CutOffEventByLimit: cutOffEventByLimit,
CutOffEventByLimitField: cutOffEventByLimitField,
AntispamThreshold: antispamThreshold,
AntispamExceptions: antispamExceptions,
SourceNameMetaField: sourceNameMetaField,
MaintenanceInterval: maintenanceInterval,
EventTimeout: eventTimeout,
StreamField: streamField,
IsStrict: isStrict,
MetricHoldDuration: metricHoldDuration,
Pool: pipeline.PoolType(pool),
MetricMaxLabelValueLength: metricMaxLabelValueLength,
}
}

Expand Down
34 changes: 29 additions & 5 deletions metric/held_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,27 @@ func (h HeldCounter) Add(v float64) {
h.updateUsage()
}

func (h HeldCounter) Get() prometheus.Counter {
return h.metric
}

type HeldCounterVec struct {
store *heldMetricsStore[prometheus.Counter]
vec *prometheus.CounterVec
store *heldMetricsStore[prometheus.Counter]
vec *prometheus.CounterVec
metricMaxLabelValueLength int
}

func NewHeldCounterVec(cv *prometheus.CounterVec) HeldCounterVec {
func NewHeldCounterVec(cv *prometheus.CounterVec, metricMaxLabelValueLength int) HeldCounterVec {
return HeldCounterVec{
vec: cv,
store: newHeldMetricsStore[prometheus.Counter](),
vec: cv,
store: newHeldMetricsStore[prometheus.Counter](),
metricMaxLabelValueLength: metricMaxLabelValueLength,
}
}

func (h HeldCounterVec) WithLabelValues(lvs ...string) HeldCounter {
TruncateLabels(lvs, h.metricMaxLabelValueLength)

return HeldCounter{
heldMetric: h.store.GetOrCreate(lvs, h.vec.WithLabelValues),
}
Expand All @@ -41,3 +49,19 @@ func (h HeldCounterVec) WithLabelValues(lvs ...string) HeldCounter {
func (h HeldCounterVec) DeleteOldMetrics(holdDuration time.Duration) {
h.store.DeleteOldMetrics(holdDuration, h.vec)
}

func (h HeldCounterVec) GetVec() *prometheus.CounterVec {
return h.vec
}

func TruncateLabels(lvs []string, metricMaxLabelValueLength int) {
if metricMaxLabelValueLength == 0 {
return
}

for i, label := range lvs {
if len(label) > metricMaxLabelValueLength {
lvs[i] = label[:metricMaxLabelValueLength]
}
}
}
18 changes: 13 additions & 5 deletions metric/held_gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,22 @@ func (h HeldGauge) Sub(v float64) {
}

type HeldGaugeVec struct {
store *heldMetricsStore[prometheus.Gauge]
vec *prometheus.GaugeVec
store *heldMetricsStore[prometheus.Gauge]
vec *prometheus.GaugeVec
metricMaxLabelValueLength int
}

func NewHeldGaugeVec(gv *prometheus.GaugeVec) HeldGaugeVec {
func NewHeldGaugeVec(gv *prometheus.GaugeVec, metricMaxLabelValueLength int) HeldGaugeVec {
return HeldGaugeVec{
vec: gv,
store: newHeldMetricsStore[prometheus.Gauge](),
vec: gv,
store: newHeldMetricsStore[prometheus.Gauge](),
metricMaxLabelValueLength: metricMaxLabelValueLength,
}
}

func (h HeldGaugeVec) WithLabelValues(lvs ...string) HeldGauge {
TruncateLabels(lvs, h.metricMaxLabelValueLength)

return HeldGauge{
heldMetric: h.store.GetOrCreate(lvs, h.vec.WithLabelValues),
}
Expand All @@ -56,3 +60,7 @@ func (h HeldGaugeVec) WithLabelValues(lvs ...string) HeldGauge {
func (h HeldGaugeVec) DeleteOldMetrics(holdDuration time.Duration) {
h.store.DeleteOldMetrics(holdDuration, h.vec)
}

func (h HeldGaugeVec) DeleteLabelValues(lvs ...string) {
h.vec.DeleteLabelValues(lvs...)
}
14 changes: 9 additions & 5 deletions metric/held_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@ func (h HeldHistogram) Observe(v float64) {
}

type HeldHistogramVec struct {
store *heldMetricsStore[prometheus.Histogram]
vec *prometheus.HistogramVec
store *heldMetricsStore[prometheus.Histogram]
vec *prometheus.HistogramVec
metricMaxLabelValueLength int
}

func NewHeldHistogramVec(hv *prometheus.HistogramVec) HeldHistogramVec {
func NewHeldHistogramVec(hv *prometheus.HistogramVec, metricMaxLabelValueLength int) HeldHistogramVec {
return HeldHistogramVec{
vec: hv,
store: newHeldMetricsStore[prometheus.Histogram](),
vec: hv,
store: newHeldMetricsStore[prometheus.Histogram](),
metricMaxLabelValueLength: metricMaxLabelValueLength,
}
}

func (h HeldHistogramVec) WithLabelValues(lvs ...string) HeldHistogram {
TruncateLabels(lvs, h.metricMaxLabelValueLength)

return HeldHistogram{
heldMetric: h.store.GetOrCreate(lvs, func(s ...string) prometheus.Histogram {
return h.vec.WithLabelValues(s...).(prometheus.Histogram)
Expand Down
4 changes: 2 additions & 2 deletions metric/held_metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestLabelExpiration(t *testing.T) {
ctl := NewCtl("test", prometheus.NewRegistry())
promCounter := ctl.RegisterCounterVec("errors", "", "level")

c := NewHeldCounterVec(promCounter)
c := NewHeldCounterVec(promCounter, 100)

now := time.Now().UnixNano()
xtime.SetNowTime(now)
Expand Down Expand Up @@ -101,7 +101,7 @@ var holderBenchCases = []struct {
func BenchmarkMetricHolder(b *testing.B) {
for _, benchCase := range holderBenchCases {
ctl := NewCtl("test", prometheus.NewRegistry())
holder := NewHolder(time.Minute)
holder := NewHolder(time.Minute, 100)

promCounter := ctl.RegisterCounterVec("test_name", "", benchCase.Labels...)
counter := holder.AddCounterVec(promCounter)
Expand Down
18 changes: 10 additions & 8 deletions metric/holder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,20 @@ type heldMetricVec interface {
}

type Holder struct {
holdDuration time.Duration
heldMetrics []heldMetricVec
holdDuration time.Duration
heldMetrics []heldMetricVec
metricMaxLabelValueLength int
}

// NewHolder returns new metric holder. The holdDuration must be more than 1m.
func NewHolder(holdDuration time.Duration) *Holder {
func NewHolder(holdDuration time.Duration, metricMaxLabelValueLength int) *Holder {
if holdDuration < time.Minute {
panic("hold duration must be greater than 1m")
}
return &Holder{
holdDuration: holdDuration,
heldMetrics: make([]heldMetricVec, 0),
holdDuration: holdDuration,
heldMetrics: make([]heldMetricVec, 0),
metricMaxLabelValueLength: metricMaxLabelValueLength,
}
}

Expand All @@ -31,19 +33,19 @@ func (h *Holder) Maintenance() {
}

func (h *Holder) AddCounterVec(counterVec *prometheus.CounterVec) HeldCounterVec {
hcv := NewHeldCounterVec(counterVec)
hcv := NewHeldCounterVec(counterVec, h.metricMaxLabelValueLength)
h.heldMetrics = append(h.heldMetrics, hcv)
return hcv
}

func (h *Holder) AddGaugeVec(gaugeVec *prometheus.GaugeVec) HeldGaugeVec {
hgv := NewHeldGaugeVec(gaugeVec)
hgv := NewHeldGaugeVec(gaugeVec, h.metricMaxLabelValueLength)
h.heldMetrics = append(h.heldMetrics, hgv)
return hgv
}

func (h *Holder) AddHistogramVec(histogramVec *prometheus.HistogramVec) HeldHistogramVec {
hhv := NewHeldHistogramVec(histogramVec)
hhv := NewHeldHistogramVec(histogramVec, h.metricMaxLabelValueLength)
h.heldMetrics = append(h.heldMetrics, hhv)
return hhv
}
Expand Down
27 changes: 27 additions & 0 deletions pipeline/README.idoc.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ Whether to fatal on decoding error.

The amount of time the metric can be idle until it is deleted. Used for deleting rarely updated metrics to save metrics storage resources. The value must be passed in format of duration (`<number>(ms|s|m|h)`).

> ⚠ DEPRECATED. Use `hold_duration` in `metrics` section instead.

<br>

**`pool`** *`string`* *`options=std|low_memory`*
Expand All @@ -131,6 +133,31 @@ Default pool is `low_memory`.

<br>

## Metrics

Section for metrics in settings. Example:

```yaml
pipelines:
test:
settings:
metrics:
hold_duration: 1h
max_label_value_length: 100
```

**`hold_duration`** *`string`* *`default=30m`*

The amount of time the metric can be idle until it is deleted. Used for deleting rarely updated metrics to save metrics storage resources. The value must be passed in format of duration (`<number>(ms|s|m|h)`).

<br>

**`max_label_value_length`** *`int`* *`default=0`*

Maximum length of custom metric labels in action plugins. If zero, no limit is set.

<br>

## Datetime parse formats

Most of the plugins which work with parsing datetime call `pipeline.ParseTime` function. It accepts datetime layouts the same way as Go [time.Parse](https://pkg.go.dev/time#Parse) (in format of datetime like `2006-01-02T15:04:05.999999999Z07:00`) except unix timestamp formats, they can only be specified via aliases.
Expand Down
27 changes: 27 additions & 0 deletions pipeline/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ Whether to fatal on decoding error.

The amount of time the metric can be idle until it is deleted. Used for deleting rarely updated metrics to save metrics storage resources. The value must be passed in format of duration (`<number>(ms|s|m|h)`).

> ⚠ DEPRECATED. Use `hold_duration` in `metrics` section instead.

<br>

**`pool`** *`string`* *`options=std|low_memory`*
Expand All @@ -131,6 +133,31 @@ Default pool is `low_memory`.

<br>

## Metrics

Section for metrics in settings. Example:

```yaml
pipelines:
test:
settings:
metrics:
hold_duration: 1h
max_label_value_length: 100
```

**`hold_duration`** *`string`* *`default=30m`*

The amount of time the metric can be idle until it is deleted. Used for deleting rarely updated metrics to save metrics storage resources. The value must be passed in format of duration (`<number>(ms|s|m|h)`).

<br>

**`max_label_value_length`** *`int`* *`default=0`*

Maximum length of custom metric labels in action plugins. If zero, no limit is set.

<br>

## Datetime parse formats

Most of the plugins which work with parsing datetime call `pipeline.ParseTime` function. It accepts datetime layouts the same way as Go [time.Parse](https://pkg.go.dev/time#Parse) (in format of datetime like `2006-01-02T15:04:05.999999999Z07:00`) except unix timestamp formats, they can only be specified via aliases.
Expand Down
Loading