Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ For details of each metric type, see [Prometheus documentation](http://prometheu
- `desc`: description of this metric (required)
- `key`: key name of record for instrumentation (**optional**)
- `initialized`: boolean controlling initilization of metric (**optional**). See [Metric initialization](#metric-initialization)
- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention)
- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention)
- `<labels>`: additional labels for this metric (optional). See [Labels](#labels)
- `<initlabels>`: labels to use for initialization of ReccordAccessors/Placeholder labels (**optional**). See [Metric initialization](#metric-initialization)

Expand All @@ -316,6 +318,8 @@ If key is empty, the metric values is treated as 1, so the counter increments by
- `desc`: description of metric (required)
- `key`: key name of record for instrumentation (required)
- `initialized`: boolean controlling initilization of metric (**optional**). See [Metric initialization](#metric-initialization)
- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention)
- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention)
- `<labels>`: additional labels for this metric (optional). See [Labels](#labels)
- `<initlabels>`: labels to use for initialization of ReccordAccessors/Placeholder labels (**optional**). See [Metric initialization](#metric-initialization)

Expand All @@ -340,6 +344,8 @@ If key is empty, the metric values is treated as 1, so the counter increments by
- `desc`: description of metric (required)
- `key`: key name of record for instrumentation (required)
- `initialized`: boolean controlling initilization of metric (**optional**). See [Metric initialization](#metric-initialization)
- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention)
- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention)
- `<labels>`: additional labels for this metric (optional). See [Labels](#labels)
- `<initlabels>`: labels to use for initialization of ReccordAccessors/Placeholder labels (**optional**). See [Metric initialization](#metric-initialization)

Expand All @@ -366,6 +372,8 @@ If key is empty, the metric values is treated as 1, so the counter increments by
- `key`: key name of record for instrumentation (required)
- `initialized`: boolean controlling initilization of metric (**optional**). See [Metric initialization](#metric-initialization)
- `buckets`: buckets of record for instrumentation (optional)
- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention)
- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention)
- `<labels>`: additional labels for this metric (optional). See [Labels](#labels)
- `<initlabels>`: labels to use for initialization of ReccordAccessors/Placeholder labels (**optional**). See [Metric initialization](#metric-initialization)

Expand Down Expand Up @@ -488,6 +496,33 @@ Prometheus output/filter plugin can have multiple metric section. Top-level labe

In this case, `message_foo_counter` has `tag`, `hostname`, `key` and `data_type` labels.

## Retention

By default metrics with all encountered label combinations are preserved until the next restart of fluentd.
Even if a label combination did not receive any update for a long time.
That behavior is not always desirable e.g. when the contents of of fields change for good and the metric becomes idle.
For these metrics you can set `retention` and `retention_check_interval` like this:

```
<metric>
name message_foo_counter
type counter
desc The total number of foo in message.
key foo
retention 3600 # 1h
retention_check_interval 1800 # 30m
<labels>
bar ${bar}
</labels>
</metric>
```

If `${bar}` was `baz` one time but after that no records with that value were processed, then after one hour the metric
`foo{bar="baz"}` might be removed.
When this actually happens depends on `retention_check_interval` (default 60).
It causes a background thread to check every 30 minutes for expired metrics.
So worst case the metrics are removed 30 minutes after expiration.
You can set this value as low as `1`, but that may put more stress on your CPU.

## Try plugin with nginx

Expand Down
12 changes: 12 additions & 0 deletions lib/fluent/plugin/filter_prometheus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ class PrometheusFilter < Fluent::Plugin::Filter
include Fluent::Plugin::PrometheusLabelParser
include Fluent::Plugin::Prometheus

helpers :timer

def initialize
super
@registry = ::Prometheus::Client.registry
Expand All @@ -22,6 +24,16 @@ def configure(conf)
@metrics = Fluent::Plugin::Prometheus.parse_metrics_elements(conf, @registry, labels)
end

def start
super
Fluent::Plugin::Prometheus.start_retention_checks(
@metrics,
@registry,
@log,
method(:timer_execute)
)
end

def filter(tag, time, record)
instrument_single(tag, time, record, @metrics)
record
Expand Down
12 changes: 12 additions & 0 deletions lib/fluent/plugin/out_prometheus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ class PrometheusOutput < Fluent::Plugin::Output
include Fluent::Plugin::PrometheusLabelParser
include Fluent::Plugin::Prometheus

helpers :timer

def initialize
super
@registry = ::Prometheus::Client.registry
Expand All @@ -22,6 +24,16 @@ def configure(conf)
@metrics = Fluent::Plugin::Prometheus.parse_metrics_elements(conf, @registry, labels)
end

def start
super
Fluent::Plugin::Prometheus.start_retention_checks(
@metrics,
@registry,
@log,
method(:timer_execute)
)
end

def process(tag, es)
instrument(tag, es, @metrics)
end
Expand Down
148 changes: 121 additions & 27 deletions lib/fluent/plugin/prometheus.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require 'prometheus/client'
require 'prometheus/client/formats/text'
require 'fluent/plugin/prometheus/placeholder_expander'
require 'fluent/plugin/prometheus/data_store'

module Fluent
module Plugin
Expand Down Expand Up @@ -144,6 +145,14 @@ def self.parse_metrics_elements(conf, registry, labels = {})
metrics
end

def self.start_retention_checks(metrics, registry, log, timer_execute)
metrics.select { |metric| metric.has_retention? }.each do |metric|
timer_execute.call("prometheus_retention_#{metric.name}".to_sym, metric.retention_check_interval) do
metric.remove_expired_metrics(registry, log)
end
end
end

def self.placeholder_expander(log)
Fluent::Plugin::Prometheus::ExpandBuilder.new(log: log)
end
Expand All @@ -160,6 +169,11 @@ def stringify_keys(hash_to_stringify)
end.to_h
end

def initialize
super
::Prometheus::Client.config.data_store = Fluent::Plugin::Prometheus::DataStore.new
end

def configure(conf)
super
@placeholder_values = {}
Expand Down Expand Up @@ -214,6 +228,8 @@ class Metric
attr_reader :name
attr_reader :key
attr_reader :desc
attr_reader :retention
attr_reader :retention_check_interval

def initialize(element, registry, labels)
['name', 'desc'].each do |key|
Expand All @@ -226,7 +242,12 @@ def initialize(element, registry, labels)
@key = element['key']
@desc = element['desc']
element['initialized'].nil? ? @initialized = false : @initialized = element['initialized'] == 'true'

@retention = element['retention'].to_i
@retention_check_interval = element.fetch('retention_check_interval', 60).to_i
if has_retention?
@last_modified_store = LastModifiedStore.new
end

@base_labels = Fluent::Plugin::Prometheus.parse_labels_elements(element)
@base_labels = labels.merge(@base_labels)

Expand Down Expand Up @@ -273,6 +294,74 @@ def self.get(registry, name, type, docstring)

metric
end

def set_value?(value)
if value
return true
end
false
end

def instrument(record, expander)
value = self.value(record)
if self.set_value?(value)
labels = labels(record, expander)
set_value(value, labels)
if has_retention?
@last_modified_store.set_last_updated(labels)
end
end
end

def has_retention?
@retention > 0
end

def remove_expired_metrics(registry, log)
if has_retention?
metric = registry.get(@name)

expiration_time = Time.now - @retention
expired_label_sets = @last_modified_store.get_labels_not_modified_since(expiration_time)

expired_label_sets.each { |expired_label_set|
log.debug "Metric #{@name} with labels #{expired_label_set} expired. Removing..."
metric.remove(expired_label_set) # this method is supplied by the require at the top of this method
@last_modified_store.remove(expired_label_set)
}
else
log.warn('remove_expired_metrics should not be called when retention is not set for this metric!')
end
end

class LastModifiedStore
def initialize
@internal_store = Hash.new
@lock = Monitor.new
end

def synchronize
@lock.synchronize { yield }
end

def set_last_updated(labels)
synchronize do
@internal_store[labels] = Time.now
end
end

def remove(labels)
synchronize do
@internal_store.delete(labels)
end
end

def get_labels_not_modified_since(time)
synchronize do
@internal_store.select { |k, v| v < time }.keys
end
end
end
end

class Gauge < Metric
Expand All @@ -293,16 +382,17 @@ def initialize(element, registry, labels)
end
end

def instrument(record, expander)
def value(record)
if @key.is_a?(String)
value = record[@key]
record[@key]
else
value = @key.call(record)
end
if value
@gauge.set(value, labels: labels(record, expander))
@key.call(record)
end
end

def set_value(value, labels)
@gauge.set(value, labels: labels)
end
end

class Counter < Metric
Expand All @@ -319,20 +409,22 @@ def initialize(element, registry, labels)
end
end

def instrument(record, expander)
# use record value of the key if key is specified, otherwise just increment
def value(record)
if @key.nil?
value = 1
1
elsif @key.is_a?(String)
value = record[@key]
record[@key]
else
value = @key.call(record)
@key.call(record)
end
end

# ignore if record value is nil
return if value.nil?
def set_value?(value)
!value.nil?
end

@counter.increment(by: value, labels: labels(record, expander))
def set_value(value, labels)
@counter.increment(by: value, labels: labels)
end
end

Expand All @@ -354,16 +446,17 @@ def initialize(element, registry, labels)
end
end

def instrument(record, expander)
def value(record)
if @key.is_a?(String)
value = record[@key]
record[@key]
else
value = @key.call(record)
end
if value
@summary.observe(value, labels: labels(record, expander))
@key.call(record)
end
end

def set_value(value, labels)
@summary.observe(value, labels: labels)
end
end

class Histogram < Metric
Expand Down Expand Up @@ -391,16 +484,17 @@ def initialize(element, registry, labels)
end
end

def instrument(record, expander)
def value(record)
if @key.is_a?(String)
value = record[@key]
record[@key]
else
value = @key.call(record)
end
if value
@histogram.observe(value, labels: labels(record, expander))
@key.call(record)
end
end

def set_value(value, labels)
@histogram.observe(value, labels: labels)
end
end
end
end
Expand Down
Loading