diff --git a/README.md b/README.md index 4ae8450..d4d9c30 100644 --- a/README.md +++ b/README.md @@ -290,6 +290,8 @@ For details of each metric type, see [Prometheus documentation](http://prometheu - `desc`: description of this metric (required) - `key`: key name of record for instrumentation (**optional**) - `initialized`: boolean controlling initilization of metric (**optional**). See [Metric initialization](#metric-initialization) +- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention) +- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention) - ``: additional labels for this metric (optional). See [Labels](#labels) - ``: labels to use for initialization of ReccordAccessors/Placeholder labels (**optional**). See [Metric initialization](#metric-initialization) @@ -316,6 +318,8 @@ If key is empty, the metric values is treated as 1, so the counter increments by - `desc`: description of metric (required) - `key`: key name of record for instrumentation (required) - `initialized`: boolean controlling initilization of metric (**optional**). See [Metric initialization](#metric-initialization) +- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention) +- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention) - ``: additional labels for this metric (optional). See [Labels](#labels) - ``: labels to use for initialization of ReccordAccessors/Placeholder labels (**optional**). 
See [Metric initialization](#metric-initialization) @@ -340,6 +344,8 @@ If key is empty, the metric values is treated as 1, so the counter increments by - `desc`: description of metric (required) - `key`: key name of record for instrumentation (required) - `initialized`: boolean controlling initilization of metric (**optional**). See [Metric initialization](#metric-initialization) +- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention) +- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention) - ``: additional labels for this metric (optional). See [Labels](#labels) - ``: labels to use for initialization of ReccordAccessors/Placeholder labels (**optional**). See [Metric initialization](#metric-initialization) @@ -366,6 +372,8 @@ If key is empty, the metric values is treated as 1, so the counter increments by - `key`: key name of record for instrumentation (required) - `initialized`: boolean controlling initilization of metric (**optional**). See [Metric initialization](#metric-initialization) - `buckets`: buckets of record for instrumentation (optional) +- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention) +- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention) - ``: additional labels for this metric (optional). See [Labels](#labels) - ``: labels to use for initialization of ReccordAccessors/Placeholder labels (**optional**). See [Metric initialization](#metric-initialization) @@ -488,6 +496,33 @@ Prometheus output/filter plugin can have multiple metric section. Top-level labe In this case, `message_foo_counter` has `tag`, `hostname`, `key` and `data_type` labels. 
+## Retention + +By default, metrics with all encountered label combinations are preserved until the next restart of fluentd, +even if a label combination did not receive any update for a long time. +That behavior is not always desirable, e.g. when the contents of fields change for good and the metric becomes idle. +For these metrics you can set `retention` and `retention_check_interval` like this: + +``` +<metric> + name message_foo_counter + type counter + desc The total number of foo in message. + key foo + retention 3600 # 1h + retention_check_interval 1800 # 30m + <labels> + bar ${bar} + </labels> +</metric> +``` + +If `${bar}` was `baz` one time but after that no records with that value were processed, then after one hour the metric +`message_foo_counter{bar="baz"}` might be removed. +When this actually happens depends on `retention_check_interval` (default: 60 seconds). +In this example, it causes a background thread to check every 30 minutes for expired metrics. +So in the worst case, the metrics are removed 30 minutes after expiration. +You can set this value as low as `1`, but that may put more stress on your CPU. 
## Try plugin with nginx diff --git a/lib/fluent/plugin/filter_prometheus.rb b/lib/fluent/plugin/filter_prometheus.rb index ccdfe78..31735f7 100644 --- a/lib/fluent/plugin/filter_prometheus.rb +++ b/lib/fluent/plugin/filter_prometheus.rb @@ -7,6 +7,8 @@ class PrometheusFilter < Fluent::Plugin::Filter include Fluent::Plugin::PrometheusLabelParser include Fluent::Plugin::Prometheus + helpers :timer + def initialize super @registry = ::Prometheus::Client.registry @@ -22,6 +24,16 @@ def configure(conf) @metrics = Fluent::Plugin::Prometheus.parse_metrics_elements(conf, @registry, labels) end + def start + super + Fluent::Plugin::Prometheus.start_retention_checks( + @metrics, + @registry, + @log, + method(:timer_execute) + ) + end + def filter(tag, time, record) instrument_single(tag, time, record, @metrics) record diff --git a/lib/fluent/plugin/out_prometheus.rb b/lib/fluent/plugin/out_prometheus.rb index cdaae4d..aa1f02d 100644 --- a/lib/fluent/plugin/out_prometheus.rb +++ b/lib/fluent/plugin/out_prometheus.rb @@ -7,6 +7,8 @@ class PrometheusOutput < Fluent::Plugin::Output include Fluent::Plugin::PrometheusLabelParser include Fluent::Plugin::Prometheus + helpers :timer + def initialize super @registry = ::Prometheus::Client.registry @@ -22,6 +24,16 @@ def configure(conf) @metrics = Fluent::Plugin::Prometheus.parse_metrics_elements(conf, @registry, labels) end + def start + super + Fluent::Plugin::Prometheus.start_retention_checks( + @metrics, + @registry, + @log, + method(:timer_execute) + ) + end + def process(tag, es) instrument(tag, es, @metrics) end diff --git a/lib/fluent/plugin/prometheus.rb b/lib/fluent/plugin/prometheus.rb index 5db615a..3de3d52 100644 --- a/lib/fluent/plugin/prometheus.rb +++ b/lib/fluent/plugin/prometheus.rb @@ -1,6 +1,7 @@ require 'prometheus/client' require 'prometheus/client/formats/text' require 'fluent/plugin/prometheus/placeholder_expander' +require 'fluent/plugin/prometheus/data_store' module Fluent module Plugin @@ -144,6 +145,14 @@ 
def self.parse_metrics_elements(conf, registry, labels = {}) metrics end + def self.start_retention_checks(metrics, registry, log, timer_execute) + metrics.select { |metric| metric.has_retention? }.each do |metric| + timer_execute.call("prometheus_retention_#{metric.name}".to_sym, metric.retention_check_interval) do + metric.remove_expired_metrics(registry, log) + end + end + end + def self.placeholder_expander(log) Fluent::Plugin::Prometheus::ExpandBuilder.new(log: log) end @@ -160,6 +169,11 @@ def stringify_keys(hash_to_stringify) end.to_h end + def initialize + super + ::Prometheus::Client.config.data_store = Fluent::Plugin::Prometheus::DataStore.new + end + def configure(conf) super @placeholder_values = {} @@ -214,6 +228,8 @@ class Metric attr_reader :name attr_reader :key attr_reader :desc + attr_reader :retention + attr_reader :retention_check_interval def initialize(element, registry, labels) ['name', 'desc'].each do |key| @@ -226,7 +242,12 @@ def initialize(element, registry, labels) @key = element['key'] @desc = element['desc'] element['initialized'].nil? ? @initialized = false : @initialized = element['initialized'] == 'true' - + @retention = element['retention'].to_i + @retention_check_interval = element.fetch('retention_check_interval', 60).to_i + if has_retention? + @last_modified_store = LastModifiedStore.new + end + @base_labels = Fluent::Plugin::Prometheus.parse_labels_elements(element) @base_labels = labels.merge(@base_labels) @@ -273,6 +294,74 @@ def self.get(registry, name, type, docstring) metric end + + def set_value?(value) + if value + return true + end + false + end + + def instrument(record, expander) + value = self.value(record) + if self.set_value?(value) + labels = labels(record, expander) + set_value(value, labels) + if has_retention? + @last_modified_store.set_last_updated(labels) + end + end + end + + def has_retention? + @retention > 0 + end + + def remove_expired_metrics(registry, log) + if has_retention? 
+ metric = registry.get(@name) + + expiration_time = Time.now - @retention + expired_label_sets = @last_modified_store.get_labels_not_modified_since(expiration_time) + + expired_label_sets.each { |expired_label_set| + log.debug "Metric #{@name} with labels #{expired_label_set} expired. Removing..." + metric.remove(expired_label_set) # this method is supplied by the require at the top of this method + @last_modified_store.remove(expired_label_set) + } + else + log.warn('remove_expired_metrics should not be called when retention is not set for this metric!') + end + end + + class LastModifiedStore + def initialize + @internal_store = Hash.new + @lock = Monitor.new + end + + def synchronize + @lock.synchronize { yield } + end + + def set_last_updated(labels) + synchronize do + @internal_store[labels] = Time.now + end + end + + def remove(labels) + synchronize do + @internal_store.delete(labels) + end + end + + def get_labels_not_modified_since(time) + synchronize do + @internal_store.select { |k, v| v < time }.keys + end + end + end end class Gauge < Metric @@ -293,16 +382,17 @@ def initialize(element, registry, labels) end end - def instrument(record, expander) + def value(record) if @key.is_a?(String) - value = record[@key] + record[@key] else - value = @key.call(record) - end - if value - @gauge.set(value, labels: labels(record, expander)) + @key.call(record) end end + + def set_value(value, labels) + @gauge.set(value, labels: labels) + end end class Counter < Metric @@ -319,20 +409,22 @@ def initialize(element, registry, labels) end end - def instrument(record, expander) - # use record value of the key if key is specified, otherwise just increment + def value(record) if @key.nil? - value = 1 + 1 elsif @key.is_a?(String) - value = record[@key] + record[@key] else - value = @key.call(record) + @key.call(record) end + end - # ignore if record value is nil - return if value.nil? + def set_value?(value) + !value.nil? 
+ end - @counter.increment(by: value, labels: labels(record, expander)) + def set_value(value, labels) + @counter.increment(by: value, labels: labels) end end @@ -354,16 +446,17 @@ def initialize(element, registry, labels) end end - def instrument(record, expander) + def value(record) if @key.is_a?(String) - value = record[@key] + record[@key] else - value = @key.call(record) - end - if value - @summary.observe(value, labels: labels(record, expander)) + @key.call(record) end end + + def set_value(value, labels) + @summary.observe(value, labels: labels) + end end class Histogram < Metric @@ -391,16 +484,17 @@ def initialize(element, registry, labels) end end - def instrument(record, expander) + def value(record) if @key.is_a?(String) - value = record[@key] + record[@key] else - value = @key.call(record) - end - if value - @histogram.observe(value, labels: labels(record, expander)) + @key.call(record) end end + + def set_value(value, labels) + @histogram.observe(value, labels: labels) + end end end end diff --git a/lib/fluent/plugin/prometheus/data_store.rb b/lib/fluent/plugin/prometheus/data_store.rb new file mode 100644 index 0000000..d89d4d7 --- /dev/null +++ b/lib/fluent/plugin/prometheus/data_store.rb @@ -0,0 +1,82 @@ +# The default Prometheus client data store has no means of removing values. +# For the "retention" feature we need to be able to remove metrics with specific labels after some time of inactivity. +# By patching the Metric class and using our own DataStore we implement that missing feature. +module Prometheus + module Client + class Metric + def remove(labels) + label_set = label_set_for(labels) + @store.remove(labels: label_set) + end + end + end +end + +module Fluent + module Plugin + module Prometheus + # Stores all the data in simple hashes, one per metric. Each of these metrics + # synchronizes access to their hash, but multiple metrics can run observations + # concurrently. 
+ class DataStore + class InvalidStoreSettingsError < StandardError; end + + def for_metric(metric_name, metric_type:, metric_settings: {}) + # We don't need `metric_type` or `metric_settings` for this particular store + validate_metric_settings(metric_settings: metric_settings) + MetricStore.new + end + + private + + def validate_metric_settings(metric_settings:) + unless metric_settings.empty? + raise InvalidStoreSettingsError, + "Synchronized doesn't allow any metric_settings" + end + end + + class MetricStore + def initialize + @internal_store = Hash.new { |hash, key| hash[key] = 0.0 } + @lock = Monitor.new + end + + def synchronize + @lock.synchronize { yield } + end + + def set(labels:, val:) + synchronize do + @internal_store[labels] = val.to_f + end + end + + def increment(labels:, by: 1) + synchronize do + @internal_store[labels] += by + end + end + + def get(labels:) + synchronize do + @internal_store[labels] + end + end + + def remove(labels:) + synchronize do + @internal_store.delete(labels) + end + end + + def all_values + synchronize { @internal_store.dup } + end + end + + private_constant :MetricStore + end + end + end +end diff --git a/spec/fluent/plugin/filter_prometheus_spec.rb b/spec/fluent/plugin/filter_prometheus_spec.rb index f98c8c6..2ae2209 100644 --- a/spec/fluent/plugin/filter_prometheus_spec.rb +++ b/spec/fluent/plugin/filter_prometheus_spec.rb @@ -45,4 +45,38 @@ it_behaves_like 'instruments record' end + + describe '#run with retention' do + let(:message) { { "foo" => 100, "bar" => 100, "baz" => 100, "qux" => 10 } } + + context 'config with retention 1' do + let(:config) { + BASE_CONFIG + %( + + name simple + type counter + desc Something foo. 
+ key foo + + bar ${bar} + baz ${baz} + qux ${qux} + + retention 1 + retention_check_interval 1 + + ) + } + + it 'expires metric after max 2s' do + expect(registry.metrics.map(&:name)).not_to eq([:simple]) + driver.run(default_tag: tag) { + driver.feed(event_time, message) + expect(registry.metrics[0].get(labels: { :bar => 100, :baz => 100, :qux => 10 })).to eq(100) + sleep(2) + expect(registry.metrics[0].get(labels: { :bar => 100, :baz => 100, :qux => 10 })).to eq(0.0) + } + end + end + end end diff --git a/spec/fluent/plugin/out_prometheus_spec.rb b/spec/fluent/plugin/out_prometheus_spec.rb index e58f2e2..11b7c7f 100644 --- a/spec/fluent/plugin/out_prometheus_spec.rb +++ b/spec/fluent/plugin/out_prometheus_spec.rb @@ -69,4 +69,39 @@ it_behaves_like 'instruments record' end + + describe '#run with retention' do + let(:message) { { "foo" => 100, "bar" => 100, "baz" => 100, "qux" => 10 } } + let(:labels) { { :bar => 100, :baz => 100, :qux => 10 } } + + context 'config with retention 1' do + let(:config) { + BASE_CONFIG + %( + + name simple + type counter + desc Something foo. + key foo + + bar ${bar} + baz ${baz} + qux ${qux} + + retention 1 + retention_check_interval 1 + + ) + } + + it 'expires metric after max 2s' do + expect(registry.metrics.map(&:name)).not_to eq([:simple]) + driver.run(default_tag: tag) { + driver.feed(event_time, message) + expect(registry.metrics[0].get(labels: labels)).to eq(100) + sleep(2) + expect(registry.metrics[0].get(labels: labels)).to eq(0.0) + } + end + end + end end