diff --git a/CHANGELOG.md b/CHANGELOG.md index 61eee8a099e..a7737bac5a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ * [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873 * [FEATURE] Query Frontend: Add support /api/v1/format_query API for formatting queries. #6893 * [FEATURE] Query Frontend: Add support for /api/v1/parse_query API (experimental) to parse a PromQL expression and return it as a JSON-formatted AST (abstract syntax tree). #6978 +* [ENHANCEMENT] OTLP: Add a dedicated Cortex converter to skip the intermediate conversion from Prometheus format to Cortex for performance. #7014 * [ENHANCEMENT] Modernizes the entire codebase by using go modernize tool. #7005 * [ENHANCEMENT] Overrides Exporter: Expose all fields that can be converted to float64. Also, the label value `max_local_series_per_metric` got renamed to `max_series_per_metric`, and `max_local_series_per_user` got renamed to `max_series_per_user`. #6979 * [ENHANCEMENT] Ingester: Add `cortex_ingester_tsdb_wal_replay_unknown_refs_total` and `cortex_ingester_tsdb_wbl_replay_unknown_refs_total` metrics to track unknown series references during wal/wbl replaying. 
#6945 diff --git a/go.mod b/go.mod index 0d01d06bb69..312330e8f40 100644 --- a/go.mod +++ b/go.mod @@ -85,11 +85,14 @@ require ( github.com/oklog/ulid/v2 v2.1.1 github.com/parquet-go/parquet-go v0.25.1 github.com/prometheus-community/parquet-common v0.0.0-20250827225610-65f0b68d35e6 + github.com/prometheus/otlptranslator v0.0.0-20250731173911-a9673827589a github.com/prometheus/procfs v0.16.1 github.com/sercand/kuberesolver/v5 v5.1.1 github.com/tjhop/slog-gokit v0.1.4 go.opentelemetry.io/collector/pdata v1.35.0 + go.opentelemetry.io/collector/semconv v0.128.0 go.uber.org/automaxprocs v1.6.0 + go.uber.org/multierr v1.11.0 google.golang.org/protobuf v1.36.6 ) @@ -226,7 +229,6 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus-community/prom-label-proxy v0.11.1 // indirect github.com/prometheus/exporter-toolkit v0.14.0 // indirect - github.com/prometheus/otlptranslator v0.0.0-20250731173911-a9673827589a // indirect github.com/prometheus/sigv4 v0.2.0 // indirect github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect github.com/rantav/go-grpc-channelz v0.0.4 // indirect @@ -258,7 +260,6 @@ require ( go.opentelemetry.io/collector/internal/telemetry v0.129.0 // indirect go.opentelemetry.io/collector/pipeline v0.129.0 // indirect go.opentelemetry.io/collector/processor v1.35.0 // indirect - go.opentelemetry.io/collector/semconv v0.128.0 // indirect go.opentelemetry.io/contrib/bridges/otelzap v0.11.0 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.35.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 // indirect @@ -273,7 +274,6 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.36.0 // indirect go.opentelemetry.io/proto/otlp v1.7.0 // indirect go.uber.org/goleak v1.3.0 // indirect - go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect go4.org/intern v0.0.0-20230525184215-6c62f75575cb // indirect go4.org/unsafe/assume-no-moving-gc 
v0.0.0-20231121144256-b99613f794b6 // indirect diff --git a/pkg/util/push/cortexotlpconverter/context.go b/pkg/util/push/cortexotlpconverter/context.go new file mode 100644 index 00000000000..d116ed6ab8e --- /dev/null +++ b/pkg/util/push/cortexotlpconverter/context.go @@ -0,0 +1,25 @@ +package cortexotlpconverter + +import "context" + +// everyNTimes supports checking for context error every n times. +type everyNTimes struct { + n int + i int + err error +} + +// checkContext calls ctx.Err() every e.n times and returns an eventual error. +func (e *everyNTimes) checkContext(ctx context.Context) error { + if e.err != nil { + return e.err + } + + e.i++ + if e.i >= e.n { + e.i = 0 + e.err = ctx.Err() + } + + return e.err +} diff --git a/pkg/util/push/cortexotlpconverter/helper.go b/pkg/util/push/cortexotlpconverter/helper.go new file mode 100644 index 00000000000..5356daf08e3 --- /dev/null +++ b/pkg/util/push/cortexotlpconverter/helper.go @@ -0,0 +1,665 @@ +package cortexotlpconverter + +import ( + "context" + "encoding/hex" + "fmt" + "log" + "math" + "slices" + "sort" + "strconv" + "strings" + "time" + "unicode/utf8" + + "github.com/cespare/xxhash/v2" + "github.com/prometheus/common/model" + "github.com/prometheus/otlptranslator" + "github.com/prometheus/prometheus/model/timestamp" + "github.com/prometheus/prometheus/model/value" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + conventions "go.opentelemetry.io/collector/semconv/v1.6.1" + + "github.com/cortexproject/cortex/pkg/cortexpb" +) + +const ( + sumStr = "_sum" + countStr = "_count" + bucketStr = "_bucket" + leStr = "le" + quantileStr = "quantile" + pInfStr = "+Inf" + // maxExemplarRunes is the maximum number of UTF-8 exemplar characters + // according to the prometheus specification + // https://github.com/prometheus/OpenMetrics/blob/v1.0.0/specification/OpenMetrics.md#exemplars + maxExemplarRunes = 128 + // Trace and Span id keys are defined as part of the spec: 
+ // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification%2Fmetrics%2Fdatamodel.md#exemplars-2 + traceIDKey = "trace_id" + spanIDKey = "span_id" + infoType = "info" + targetMetricName = "target_info" + defaultLookbackDelta = 5 * time.Minute +) + +type bucketBoundsData struct { + ts *cortexpb.TimeSeries + bound float64 +} + +// byBucketBoundsData enables the usage of sort.Sort() with a slice of bucket bounds. +type byBucketBoundsData []bucketBoundsData + +func (m byBucketBoundsData) Len() int { return len(m) } +func (m byBucketBoundsData) Less(i, j int) bool { return m[i].bound < m[j].bound } +func (m byBucketBoundsData) Swap(i, j int) { m[i], m[j] = m[j], m[i] } + +// ByLabelName enables the usage of sort.Sort() with a slice of labels. +type ByLabelName []cortexpb.LabelAdapter + +func (a ByLabelName) Len() int { return len(a) } +func (a ByLabelName) Less(i, j int) bool { return a[i].Name < a[j].Name } +func (a ByLabelName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +// timeSeriesSignature returns a hashed label set signature. +// The label slice should not contain duplicate label names; this method sorts the slice by label name before creating +// the signature. +// The algorithm is the same as in Prometheus' labels.StableHash function. +func timeSeriesSignature(labels []cortexpb.LabelAdapter) uint64 { + sort.Sort(ByLabelName(labels)) + + // Use xxhash.Sum64(b) for fast path as it's faster. + b := make([]byte, 0, 1024) + for i, v := range labels { + if len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) { + // If labels entry is 1KB+ do not allocate whole entry. + h := xxhash.New() + _, _ = h.Write(b) + for _, v := range labels[i:] { + _, _ = h.WriteString(v.Name) + _, _ = h.Write(seps) + _, _ = h.WriteString(v.Value) + _, _ = h.Write(seps) + } + return h.Sum64() + } + + b = append(b, v.Name...) + b = append(b, seps[0]) + b = append(b, v.Value...) 
+ b = append(b, seps[0]) + } + return xxhash.Sum64(b) +} + +var seps = []byte{'\xff'} + +// createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values. +// Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and +// if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized. +// If settings.PromoteResourceAttributes is not empty, it's a set of resource attributes that should be promoted to labels. +func createAttributes(resource pcommon.Resource, attributes pcommon.Map, scope scope, settings Settings, + ignoreAttrs []string, logOnOverwrite bool, metadata *cortexpb.MetricMetadata, extras ...string, +) ([]cortexpb.LabelAdapter, error) { + resourceAttrs := resource.Attributes() + serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName) + instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID) + + promotedAttrs := settings.PromoteResourceAttributes.promotedAttributes(resourceAttrs) + + promoteScope := settings.PromoteScopeMetadata && scope.name != "" + scopeLabelCount := 0 + if promoteScope { + // Include name, version and schema URL. + scopeLabelCount = scope.attributes.Len() + 3 + } + + // Calculate the maximum possible number of labels we could return so we can preallocate l. + maxLabelCount := attributes.Len() + len(settings.ExternalLabels) + len(promotedAttrs) + scopeLabelCount + len(extras)/2 + + if haveServiceName { + maxLabelCount++ + } + if haveInstanceID { + maxLabelCount++ + } + if settings.EnableTypeAndUnitLabels { + maxLabelCount += 2 + } + + // Ensure attributes are sorted by key for consistent merging of keys which + // collide when sanitized. + labels := make([]cortexpb.LabelAdapter, 0, maxLabelCount) + // XXX: Should we always drop service namespace/service name/service instance ID from the labels + // (as they get mapped to other Prometheus labels)? 
+ attributes.Range(func(key string, value pcommon.Value) bool { + if !slices.Contains(ignoreAttrs, key) { + labels = append(labels, cortexpb.LabelAdapter{Name: key, Value: value.AsString()}) + } + return true + }) + sort.Stable(ByLabelName(labels)) + + // map ensures no duplicate label names. + l := make(map[string]string, maxLabelCount) + labelNamer := otlptranslator.LabelNamer{UTF8Allowed: settings.AllowUTF8} + for _, label := range labels { + finalKey, err := labelNamer.Build(label.Name) + if err != nil { + return nil, err + } + if existingValue, alreadyExists := l[finalKey]; alreadyExists { + l[finalKey] = existingValue + ";" + label.Value + } else { + l[finalKey] = label.Value + } + } + + for _, lbl := range promotedAttrs { + normalized, err := labelNamer.Build(lbl.Name) + if err != nil { + return nil, err + } + if _, exists := l[normalized]; !exists { + l[normalized] = lbl.Value + } + } + if promoteScope { + var rangeErr error + scope.attributes.Range(func(k string, v pcommon.Value) bool { + name, err := labelNamer.Build("otel_scope_" + k) + if err != nil { + rangeErr = err + return false + } + l[name] = v.AsString() + return true + }) + if rangeErr != nil { + return nil, rangeErr + } + // Scope Name, Version and Schema URL are added after attributes to ensure they are not overwritten by attributes. + l["otel_scope_name"] = scope.name + l["otel_scope_version"] = scope.version + l["otel_scope_schema_url"] = scope.schemaURL + } + + if settings.EnableTypeAndUnitLabels { + unitNamer := otlptranslator.UnitNamer{UTF8Allowed: settings.AllowUTF8} + if metadata.Type != cortexpb.UNKNOWN { + l["__type__"] = strings.ToLower(metadata.Type.String()) + } + if metadata.Unit != "" { + l["__unit__"] = unitNamer.Build(metadata.Unit) + } + } + + // Map service.name + service.namespace to job. 
+ if haveServiceName { + val := serviceName.AsString() + if serviceNamespace, ok := resourceAttrs.Get(conventions.AttributeServiceNamespace); ok { + val = fmt.Sprintf("%s/%s", serviceNamespace.AsString(), val) + } + l[model.JobLabel] = val + } + // Map service.instance.id to instance. + if haveInstanceID { + l[model.InstanceLabel] = instance.AsString() + } + for key, value := range settings.ExternalLabels { + // External labels have already been sanitized. + if _, alreadyExists := l[key]; alreadyExists { + // Skip external labels if they are overridden by metric attributes. + continue + } + l[key] = value + } + + for i := 0; i < len(extras); i += 2 { + if i+1 >= len(extras) { + break + } + + name := extras[i] + _, found := l[name] + if found && logOnOverwrite { + log.Println("label " + name + " is overwritten. Check if Prometheus reserved labels are used.") + } + // internal labels should be maintained. + if len(name) <= 4 || name[:2] != "__" || name[len(name)-2:] != "__" { + var err error + name, err = labelNamer.Build(name) + if err != nil { + return nil, err + } + } + l[name] = extras[i+1] + } + + labels = labels[:0] + for k, v := range l { + labels = append(labels, cortexpb.LabelAdapter{Name: k, Value: v}) + } + + return labels, nil +} + +func aggregationTemporality(metric pmetric.Metric) (pmetric.AggregationTemporality, bool, error) { + //exhaustive:enforce + switch metric.Type() { + case pmetric.MetricTypeGauge, pmetric.MetricTypeSummary: + return 0, false, nil + case pmetric.MetricTypeSum: + return metric.Sum().AggregationTemporality(), true, nil + case pmetric.MetricTypeHistogram: + return metric.Histogram().AggregationTemporality(), true, nil + case pmetric.MetricTypeExponentialHistogram: + return metric.ExponentialHistogram().AggregationTemporality(), true, nil + } + return 0, false, fmt.Errorf("could not get aggregation temporality for %s as it has unsupported metric type %s", metric.Name(), metric.Type()) +} + +// addHistogramDataPoints adds OTel 
histogram data points to the corresponding Prometheus time series +// as classical histogram samples. +// +// Note that we can't convert to native histograms, since these have exponential buckets and don't line up +// with the user defined bucket boundaries of non-exponential OTel histograms. +// However, work is under way to resolve this shortcoming through a feature called native histograms custom buckets: +// https://github.com/prometheus/prometheus/issues/13485. +func (c *CortexConverter) addHistogramDataPoints(ctx context.Context, dataPoints pmetric.HistogramDataPointSlice, + resource pcommon.Resource, settings Settings, metadata *cortexpb.MetricMetadata, scope scope, +) error { + for x := 0; x < dataPoints.Len(); x++ { + if err := c.everyN.checkContext(ctx); err != nil { + return err + } + + pt := dataPoints.At(x) + timestamp := convertTimeStamp(pt.Timestamp()) + baseLabels, err := createAttributes(resource, pt.Attributes(), scope, settings, nil, false, metadata) + if err != nil { + return err + } + + // If the sum is unset, it indicates the _sum metric point should be + // omitted + if pt.HasSum() { + // treat sum as a sample in an individual TimeSeries + sum := &cortexpb.Sample{ + Value: pt.Sum(), + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + sum.Value = math.Float64frombits(value.StaleNaN) + } + + sumlabels := createLabels(metadata.MetricFamilyName+sumStr, baseLabels) + c.addSample(sum, sumlabels) + } + + // treat count as a sample in an individual TimeSeries + count := &cortexpb.Sample{ + Value: float64(pt.Count()), + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + count.Value = math.Float64frombits(value.StaleNaN) + } + + countlabels := createLabels(metadata.MetricFamilyName+countStr, baseLabels) + c.addSample(count, countlabels) + + // cumulative count for conversion to cumulative histogram + var cumulativeCount uint64 + + var bucketBounds []bucketBoundsData + + // process each bound, based on histograms proto 
definition, # of buckets = # of explicit bounds + 1 + for i := 0; i < pt.ExplicitBounds().Len() && i < pt.BucketCounts().Len(); i++ { + if err := c.everyN.checkContext(ctx); err != nil { + return err + } + + bound := pt.ExplicitBounds().At(i) + cumulativeCount += pt.BucketCounts().At(i) + bucket := &cortexpb.Sample{ + Value: float64(cumulativeCount), + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + bucket.Value = math.Float64frombits(value.StaleNaN) + } + boundStr := strconv.FormatFloat(bound, 'f', -1, 64) + labels := createLabels(metadata.MetricFamilyName+bucketStr, baseLabels, leStr, boundStr) + ts := c.addSample(bucket, labels) + + bucketBounds = append(bucketBounds, bucketBoundsData{ts: ts, bound: bound}) + } + // add le=+Inf bucket + infBucket := &cortexpb.Sample{ + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + infBucket.Value = math.Float64frombits(value.StaleNaN) + } else { + infBucket.Value = float64(pt.Count()) + } + infLabels := createLabels(metadata.MetricFamilyName+bucketStr, baseLabels, leStr, pInfStr) + ts := c.addSample(infBucket, infLabels) + + bucketBounds = append(bucketBounds, bucketBoundsData{ts: ts, bound: math.Inf(1)}) + if err := c.addExemplars(ctx, pt, bucketBounds); err != nil { + return err + } + } + + return nil +} + +type exemplarType interface { + pmetric.ExponentialHistogramDataPoint | pmetric.HistogramDataPoint | pmetric.NumberDataPoint + Exemplars() pmetric.ExemplarSlice +} + +func getPromExemplars[T exemplarType](ctx context.Context, everyN *everyNTimes, pt T) ([]cortexpb.Exemplar, error) { + promExemplars := make([]cortexpb.Exemplar, 0, pt.Exemplars().Len()) + for i := 0; i < pt.Exemplars().Len(); i++ { + if err := everyN.checkContext(ctx); err != nil { + return nil, err + } + + exemplar := pt.Exemplars().At(i) + exemplarRunes := 0 + + promExemplar := cortexpb.Exemplar{ + TimestampMs: timestamp.FromTime(exemplar.Timestamp().AsTime()), + } + switch exemplar.ValueType() { + case 
pmetric.ExemplarValueTypeInt: + promExemplar.Value = float64(exemplar.IntValue()) + case pmetric.ExemplarValueTypeDouble: + promExemplar.Value = exemplar.DoubleValue() + default: + return nil, fmt.Errorf("unsupported exemplar value type: %v", exemplar.ValueType()) + } + + if traceID := exemplar.TraceID(); !traceID.IsEmpty() { + val := hex.EncodeToString(traceID[:]) + exemplarRunes += utf8.RuneCountInString(traceIDKey) + utf8.RuneCountInString(val) + promLabel := cortexpb.LabelAdapter{ + Name: traceIDKey, + Value: val, + } + promExemplar.Labels = append(promExemplar.Labels, promLabel) + } + if spanID := exemplar.SpanID(); !spanID.IsEmpty() { + val := hex.EncodeToString(spanID[:]) + exemplarRunes += utf8.RuneCountInString(spanIDKey) + utf8.RuneCountInString(val) + promLabel := cortexpb.LabelAdapter{ + Name: spanIDKey, + Value: val, + } + promExemplar.Labels = append(promExemplar.Labels, promLabel) + } + + attrs := exemplar.FilteredAttributes() + labelsFromAttributes := make([]cortexpb.LabelAdapter, 0, attrs.Len()) + attrs.Range(func(key string, value pcommon.Value) bool { + val := value.AsString() + exemplarRunes += utf8.RuneCountInString(key) + utf8.RuneCountInString(val) + promLabel := cortexpb.LabelAdapter{ + Name: key, + Value: val, + } + + labelsFromAttributes = append(labelsFromAttributes, promLabel) + + return true + }) + if exemplarRunes <= maxExemplarRunes { + // only append filtered attributes if it does not cause exemplar + // labels to exceed the max number of runes + promExemplar.Labels = append(promExemplar.Labels, labelsFromAttributes...) + } + + promExemplars = append(promExemplars, promExemplar) + } + + return promExemplars, nil +} + +// findMinAndMaxTimestamps returns the minimum of minTimestamp and the earliest timestamp in metric and +// the maximum of maxTimestamp and the latest timestamp in metric, respectively. 
+func findMinAndMaxTimestamps(metric pmetric.Metric, minTimestamp, maxTimestamp pcommon.Timestamp) (pcommon.Timestamp, pcommon.Timestamp) { + // handle individual metric based on type + //exhaustive:enforce + switch metric.Type() { + case pmetric.MetricTypeGauge: + dataPoints := metric.Gauge().DataPoints() + for x := 0; x < dataPoints.Len(); x++ { + ts := dataPoints.At(x).Timestamp() + minTimestamp = min(minTimestamp, ts) + maxTimestamp = max(maxTimestamp, ts) + } + case pmetric.MetricTypeSum: + dataPoints := metric.Sum().DataPoints() + for x := 0; x < dataPoints.Len(); x++ { + ts := dataPoints.At(x).Timestamp() + minTimestamp = min(minTimestamp, ts) + maxTimestamp = max(maxTimestamp, ts) + } + case pmetric.MetricTypeHistogram: + dataPoints := metric.Histogram().DataPoints() + for x := 0; x < dataPoints.Len(); x++ { + ts := dataPoints.At(x).Timestamp() + minTimestamp = min(minTimestamp, ts) + maxTimestamp = max(maxTimestamp, ts) + } + case pmetric.MetricTypeExponentialHistogram: + dataPoints := metric.ExponentialHistogram().DataPoints() + for x := 0; x < dataPoints.Len(); x++ { + ts := dataPoints.At(x).Timestamp() + minTimestamp = min(minTimestamp, ts) + maxTimestamp = max(maxTimestamp, ts) + } + case pmetric.MetricTypeSummary: + dataPoints := metric.Summary().DataPoints() + for x := 0; x < dataPoints.Len(); x++ { + ts := dataPoints.At(x).Timestamp() + minTimestamp = min(minTimestamp, ts) + maxTimestamp = max(maxTimestamp, ts) + } + } + return minTimestamp, maxTimestamp +} + +func (c *CortexConverter) addSummaryDataPoints(ctx context.Context, dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource, + settings Settings, metadata *cortexpb.MetricMetadata, scope scope, +) error { + for x := 0; x < dataPoints.Len(); x++ { + if err := c.everyN.checkContext(ctx); err != nil { + return err + } + + pt := dataPoints.At(x) + timestamp := convertTimeStamp(pt.Timestamp()) + baseLabels, err := createAttributes(resource, pt.Attributes(), scope, settings, nil, false, 
metadata) + if err != nil { + return err + } + + // treat sum as a sample in an individual TimeSeries + sum := &cortexpb.Sample{ + Value: pt.Sum(), + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + sum.Value = math.Float64frombits(value.StaleNaN) + } + // sum and count of the summary should append suffix to baseName + sumlabels := createLabels(metadata.MetricFamilyName+sumStr, baseLabels) + c.addSample(sum, sumlabels) + + // treat count as a sample in an individual TimeSeries + count := &cortexpb.Sample{ + Value: float64(pt.Count()), + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + count.Value = math.Float64frombits(value.StaleNaN) + } + countlabels := createLabels(metadata.MetricFamilyName+countStr, baseLabels) + c.addSample(count, countlabels) + + // process each percentile/quantile + for i := 0; i < pt.QuantileValues().Len(); i++ { + qt := pt.QuantileValues().At(i) + quantile := &cortexpb.Sample{ + Value: qt.Value(), + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + quantile.Value = math.Float64frombits(value.StaleNaN) + } + percentileStr := strconv.FormatFloat(qt.Quantile(), 'f', -1, 64) + qtlabels := createLabels(metadata.MetricFamilyName, baseLabels, quantileStr, percentileStr) + c.addSample(quantile, qtlabels) + } + } + + return nil +} + +// createLabels returns a copy of baseLabels, adding to it the pair model.MetricNameLabel=name. +// If extras are provided, corresponding label pairs are also added to the returned slice. +// If extras is uneven length, the last (unpaired) extra will be ignored. 
+func createLabels(name string, baseLabels []cortexpb.LabelAdapter, extras ...string) []cortexpb.LabelAdapter { + extraLabelCount := len(extras) / 2 + labels := make([]cortexpb.LabelAdapter, len(baseLabels), len(baseLabels)+extraLabelCount+1) // +1 for name + copy(labels, baseLabels) + + n := len(extras) + n -= n % 2 + for extrasIdx := 0; extrasIdx < n; extrasIdx += 2 { + labels = append(labels, cortexpb.LabelAdapter{Name: extras[extrasIdx], Value: extras[extrasIdx+1]}) + } + + labels = append(labels, cortexpb.LabelAdapter{Name: model.MetricNameLabel, Value: name}) + return labels +} + +// getOrCreateTimeSeries returns the time series corresponding to the label set if existent, and false. +// Otherwise it creates a new one and returns that, and true. +func (c *CortexConverter) getOrCreateTimeSeries(lbls []cortexpb.LabelAdapter) (*cortexpb.TimeSeries, bool) { + h := timeSeriesSignature(lbls) + ts := c.unique[h] + if ts != nil { + if isSameMetric(ts, lbls) { + // We already have this metric + return ts, false + } + + // Look for a matching conflict + for _, cTS := range c.conflicts[h] { + if isSameMetric(cTS, lbls) { + // We already have this metric + return cTS, false + } + } + + // New conflict + ts = &cortexpb.TimeSeries{ + Labels: lbls, + } + c.conflicts[h] = append(c.conflicts[h], ts) + return ts, true + } + + // This metric is new + ts = &cortexpb.TimeSeries{ + Labels: lbls, + } + c.unique[h] = ts + return ts, true +} + +// addResourceTargetInfo converts the resource to the target info metric. 
+func addResourceTargetInfo(resource pcommon.Resource, settings Settings, earliestTimestamp, latestTimestamp time.Time, converter *CortexConverter) error { + if settings.DisableTargetInfo { + return nil + } + + attributes := resource.Attributes() + identifyingAttrs := []string{ + conventions.AttributeServiceNamespace, + conventions.AttributeServiceName, + conventions.AttributeServiceInstanceID, + } + nonIdentifyingAttrsCount := attributes.Len() + for _, a := range identifyingAttrs { + _, haveAttr := attributes.Get(a) + if haveAttr { + nonIdentifyingAttrsCount-- + } + } + if nonIdentifyingAttrsCount == 0 { + // If we only have job + instance, then target_info isn't useful, so don't add it. + return nil + } + + name := targetMetricName + if len(settings.Namespace) > 0 { + name = settings.Namespace + "_" + name + } + + settings.PromoteResourceAttributes = nil + if settings.KeepIdentifyingResourceAttributes { + // Do not pass identifying attributes as ignoreAttrs below. + identifyingAttrs = nil + } + labels, err := createAttributes(resource, attributes, scope{}, settings, identifyingAttrs, false, &cortexpb.MetricMetadata{}, model.MetricNameLabel, name) + if err != nil { + return err + } + haveIdentifier := false + for _, l := range labels { + if l.Name == model.JobLabel || l.Name == model.InstanceLabel { + haveIdentifier = true + break + } + } + + if !haveIdentifier { + // We need at least one identifying label to generate target_info. + return nil + } + + // Generate target_info samples starting at earliestTimestamp and ending at latestTimestamp, + // with a sample at every interval between them. + // Use an interval corresponding to half of the lookback delta, to ensure that target_info samples are found + // for the entirety of the relevant period. 
+ if settings.LookbackDelta == 0 { + settings.LookbackDelta = defaultLookbackDelta + } + interval := settings.LookbackDelta / 2 + ts, _ := converter.getOrCreateTimeSeries(labels) + for timestamp := earliestTimestamp; timestamp.Before(latestTimestamp); timestamp = timestamp.Add(interval) { + ts.Samples = append(ts.Samples, cortexpb.Sample{ + Value: float64(1), + TimestampMs: timestamp.UnixMilli(), + }) + } + ts.Samples = append(ts.Samples, cortexpb.Sample{ + Value: float64(1), + TimestampMs: latestTimestamp.UnixMilli(), + }) + return nil +} + +// convertTimeStamp converts OTLP timestamp in ns to timestamp in ms. +func convertTimeStamp(timestamp pcommon.Timestamp) int64 { + return int64(timestamp) / 1_000_000 +} diff --git a/pkg/util/push/cortexotlpconverter/histograms.go b/pkg/util/push/cortexotlpconverter/histograms.go new file mode 100644 index 00000000000..2e4ffd80663 --- /dev/null +++ b/pkg/util/push/cortexotlpconverter/histograms.go @@ -0,0 +1,354 @@ +package cortexotlpconverter + +import ( + "context" + "fmt" + "math" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/value" + "github.com/prometheus/prometheus/util/annotations" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/cortexproject/cortex/pkg/cortexpb" +) + +const defaultZeroThreshold = 1e-128 + +// addExponentialHistogramDataPoints adds OTel exponential histogram data points to the corresponding time series +// as native histogram samples. 
+func (c *CortexConverter) addExponentialHistogramDataPoints(ctx context.Context, dataPoints pmetric.ExponentialHistogramDataPointSlice, + resource pcommon.Resource, settings Settings, metadata *cortexpb.MetricMetadata, temporality pmetric.AggregationTemporality, scope scope, +) (annotations.Annotations, error) { + var annots annotations.Annotations + for x := 0; x < dataPoints.Len(); x++ { + if err := c.everyN.checkContext(ctx); err != nil { + return annots, err + } + + pt := dataPoints.At(x) + + histogram, ws, err := exponentialToNativeHistogram(pt, temporality) + annots.Merge(ws) + if err != nil { + return annots, err + } + + lbls, err := createAttributes( + resource, + pt.Attributes(), + scope, + settings, + nil, + true, + metadata, + model.MetricNameLabel, + metadata.MetricFamilyName, + ) + if err != nil { + return nil, err + } + ts, _ := c.getOrCreateTimeSeries(lbls) + ts.Histograms = append(ts.Histograms, histogram) + + exemplars, err := getPromExemplars[pmetric.ExponentialHistogramDataPoint](ctx, &c.everyN, pt) + if err != nil { + return annots, err + } + ts.Exemplars = append(ts.Exemplars, exemplars...) + } + + return annots, nil +} + +// exponentialToNativeHistogram translates an OTel Exponential Histogram data point +// to a Prometheus Native Histogram. 
+func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint, temporality pmetric.AggregationTemporality) (cortexpb.Histogram, annotations.Annotations, error) { + var annots annotations.Annotations + scale := p.Scale() + if scale < -4 { + return cortexpb.Histogram{}, annots, + fmt.Errorf("cannot convert exponential to native histogram."+ + " Scale must be >= -4, was %d", scale) + } + + var scaleDown int32 + if scale > 8 { + scaleDown = scale - 8 + scale = 8 + } + + pSpans, pDeltas := convertBucketsLayout(p.Positive().BucketCounts().AsRaw(), p.Positive().Offset(), scaleDown, true) + nSpans, nDeltas := convertBucketsLayout(p.Negative().BucketCounts().AsRaw(), p.Negative().Offset(), scaleDown, true) + + // The counter reset detection must be compatible with Prometheus to + // safely set ResetHint to NO. This is not ensured currently. + // Sending a sample that triggers counter reset but with ResetHint==NO + // would lead to Prometheus panic as it does not double check the hint. + // Thus we're explicitly saying UNKNOWN here, which is always safe. + // TODO: using created time stamp should be accurate, but we + // need to know here if it was used for the detection. + // Ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/28663#issuecomment-1810577303 + // Counter reset detection in Prometheus: https://github.com/prometheus/prometheus/blob/f997c72f294c0f18ca13fa06d51889af04135195/tsdb/chunkenc/histogram.go#L232 + resetHint := cortexpb.Histogram_UNKNOWN + + if temporality == pmetric.AggregationTemporalityDelta { + // If the histogram has delta temporality, set the reset hint to gauge to avoid unnecessary chunk cutting. + // We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/). + // This might be changed to a different hint name as gauge type might be misleading for samples that should be + // summed over time. 
+ resetHint = cortexpb.Histogram_GAUGE + } + + h := cortexpb.Histogram{ + ResetHint: resetHint, + Schema: scale, + + ZeroCount: &cortexpb.Histogram_ZeroCountInt{ZeroCountInt: p.ZeroCount()}, + // TODO use zero_threshold, if set, see + // https://github.com/open-telemetry/opentelemetry-proto/pull/441 + ZeroThreshold: defaultZeroThreshold, + + PositiveSpans: pSpans, + PositiveDeltas: pDeltas, + NegativeSpans: nSpans, + NegativeDeltas: nDeltas, + + TimestampMs: convertTimeStamp(p.Timestamp()), + } + + if p.Flags().NoRecordedValue() { + h.Sum = math.Float64frombits(value.StaleNaN) + h.Count = &cortexpb.Histogram_CountInt{CountInt: value.StaleNaN} + } else { + if p.HasSum() { + h.Sum = p.Sum() + } + h.Count = &cortexpb.Histogram_CountInt{CountInt: p.Count()} + if p.Count() == 0 && h.Sum != 0 { + annots.Add(fmt.Errorf("exponential histogram data point has zero count, but non-zero sum: %f", h.Sum)) + } + } + return h, annots, nil +} + +// convertBucketsLayout translates OTel Explicit or Exponential Histogram dense buckets +// representation to Prometheus Native Histogram sparse bucket representation. This is used +// for translating Exponential Histograms into Native Histograms, and Explicit Histograms +// into Native Histograms with Custom Buckets. +// +// The translation logic is taken from the client_golang `histogram.go#makeBuckets` +// function, see `makeBuckets` https://github.com/prometheus/client_golang/blob/main/prometheus/histogram.go +// +// scaleDown is the factor by which the buckets are scaled down. In other words 2^scaleDown buckets will be merged into one. +// +// When converting from OTel Exponential Histograms to Native Histograms, the +// bucket indexes conversion is adjusted, since OTel exp. histogram bucket +// index 0 corresponds to the range (1, base] while Prometheus bucket index 0 +// to the range (base^-1, 1]. 
+// +// When converting from OTel Explicit Histograms to Native Histograms with Custom Buckets, +// the bucket indexes are not scaled, and the indices are not adjusted by 1. +func convertBucketsLayout(bucketCounts []uint64, offset, scaleDown int32, adjustOffset bool) ([]cortexpb.BucketSpan, []int64) { + if len(bucketCounts) == 0 { + return nil, nil + } + + var ( + spans []cortexpb.BucketSpan + deltas []int64 + count int64 + prevCount int64 + ) + + appendDelta := func(count int64) { + spans[len(spans)-1].Length++ + deltas = append(deltas, count-prevCount) + prevCount = count + } + + // Let the compiler figure out that this is const during this function by + // moving it into a local variable. + numBuckets := len(bucketCounts) + + bucketIdx := offset>>scaleDown + 1 + + initialOffset := offset + if adjustOffset { + initialOffset = initialOffset>>scaleDown + 1 + } + + spans = append(spans, cortexpb.BucketSpan{ + Offset: initialOffset, + Length: 0, + }) + + for i := range numBuckets { + nextBucketIdx := (int32(i)+offset)>>scaleDown + 1 + if bucketIdx == nextBucketIdx { // We have not collected enough buckets to merge yet. + count += int64(bucketCounts[i]) + continue + } + if count == 0 { + count = int64(bucketCounts[i]) + continue + } + + gap := nextBucketIdx - bucketIdx - 1 + if gap > 2 { + // We have to create a new span, because we have found a gap + // of more than two buckets. The constant 2 is copied from the logic in + // https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296 + spans = append(spans, cortexpb.BucketSpan{ + Offset: gap, + Length: 0, + }) + } else { + // We have found a small gap (or no gap at all). + // Insert empty buckets as needed. + for range gap { + appendDelta(0) + } + } + appendDelta(count) + count = int64(bucketCounts[i]) + bucketIdx = nextBucketIdx + } + + // Need to use the last item's index. The offset is scaled and adjusted by 1 as described above. 
+ gap := (int32(numBuckets)+offset-1)>>scaleDown + 1 - bucketIdx + if gap > 2 { + // We have to create a new span, because we have found a gap + // of more than two buckets. The constant 2 is copied from the logic in + // https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296 + spans = append(spans, cortexpb.BucketSpan{ + Offset: gap, + Length: 0, + }) + } else { + // We have found a small gap (or no gap at all). + // Insert empty buckets as needed. + for range gap { + appendDelta(0) + } + } + appendDelta(count) + + return spans, deltas +} + +func (c *CortexConverter) addCustomBucketsHistogramDataPoints(ctx context.Context, dataPoints pmetric.HistogramDataPointSlice, + resource pcommon.Resource, settings Settings, metadata *cortexpb.MetricMetadata, temporality pmetric.AggregationTemporality, scope scope, +) (annotations.Annotations, error) { + var annots annotations.Annotations + + for x := 0; x < dataPoints.Len(); x++ { + if err := c.everyN.checkContext(ctx); err != nil { + return annots, err + } + + pt := dataPoints.At(x) + + histogram, ws, err := explicitHistogramToCustomBucketsHistogram(pt, temporality) + annots.Merge(ws) + if err != nil { + return annots, err + } + + lbls, err := createAttributes( + resource, + pt.Attributes(), + scope, + settings, + nil, + true, + metadata, + model.MetricNameLabel, + metadata.MetricFamilyName, + ) + if err != nil { + return nil, err + } + + ts, _ := c.getOrCreateTimeSeries(lbls) + ts.Histograms = append(ts.Histograms, histogram) + + exemplars, err := getPromExemplars[pmetric.HistogramDataPoint](ctx, &c.everyN, pt) + if err != nil { + return annots, err + } + ts.Exemplars = append(ts.Exemplars, exemplars...) 
+ } + + return annots, nil +} + +func explicitHistogramToCustomBucketsHistogram(p pmetric.HistogramDataPoint, temporality pmetric.AggregationTemporality) (cortexpb.Histogram, annotations.Annotations, error) { + var annots annotations.Annotations + + buckets := p.BucketCounts().AsRaw() + offset := getBucketOffset(buckets) + bucketCounts := buckets[offset:] + positiveSpans, positiveDeltas := convertBucketsLayout(bucketCounts, int32(offset), 0, false) + + // The counter reset detection must be compatible with Prometheus to + // safely set ResetHint to NO. This is not ensured currently. + // Sending a sample that triggers counter reset but with ResetHint==NO + // would lead to Prometheus panic as it does not double check the hint. + // Thus we're explicitly saying UNKNOWN here, which is always safe. + // TODO: using created time stamp should be accurate, but we + // need to know here if it was used for the detection. + // Ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/28663#issuecomment-1810577303 + // Counter reset detection in Prometheus: https://github.com/prometheus/prometheus/blob/f997c72f294c0f18ca13fa06d51889af04135195/tsdb/chunkenc/histogram.go#L232 + resetHint := cortexpb.Histogram_UNKNOWN + + if temporality == pmetric.AggregationTemporalityDelta { + // If the histogram has delta temporality, set the reset hint to gauge to avoid unnecessary chunk cutting. + // We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/). + // This might be changed to a different hint name as gauge type might be misleading for samples that should be + // summed over time. 
+ resetHint = cortexpb.Histogram_GAUGE + } + + // TODO(carrieedwards): Add setting to limit maximum bucket count + h := cortexpb.Histogram{ + ResetHint: resetHint, + Schema: histogram.CustomBucketsSchema, + + PositiveSpans: positiveSpans, + PositiveDeltas: positiveDeltas, + // Note: OTel explicit histograms have an implicit +Inf bucket, which has a lower bound + // of the last element in the explicit_bounds array. + // This is similar to the custom_values array in native histograms with custom buckets. + // Because of this shared property, the OTel explicit histogram's explicit_bounds array + // can be mapped directly to the custom_values array. + // See: https://github.com/open-telemetry/opentelemetry-proto/blob/d7770822d70c7bd47a6891fc9faacc66fc4af3d3/opentelemetry/proto/metrics/v1/metrics.proto#L469 + //CustomValues: p.ExplicitBounds().AsRaw(), + // TODO(Sungjin1212): Add CustomValues after adding it to histogram + + TimestampMs: convertTimeStamp(p.Timestamp()), + } + + if p.Flags().NoRecordedValue() { + h.Sum = math.Float64frombits(value.StaleNaN) + h.Count = &cortexpb.Histogram_CountInt{CountInt: value.StaleNaN} + } else { + if p.HasSum() { + h.Sum = p.Sum() + } + h.Count = &cortexpb.Histogram_CountInt{CountInt: p.Count()} + if p.Count() == 0 && h.Sum != 0 { + annots.Add(fmt.Errorf("histogram data point has zero count, but non-zero sum: %f", h.Sum)) + } + } + return h, annots, nil +} + +func getBucketOffset(buckets []uint64) (offset int) { + for offset < len(buckets) && buckets[offset] == 0 { + offset++ + } + return offset +} diff --git a/pkg/util/push/cortexotlpconverter/metrics_to_prw.go b/pkg/util/push/cortexotlpconverter/metrics_to_prw.go new file mode 100644 index 00000000000..6e161ff7f67 --- /dev/null +++ b/pkg/util/push/cortexotlpconverter/metrics_to_prw.go @@ -0,0 +1,373 @@ +package cortexotlpconverter + +import ( + "context" + "errors" + "fmt" + "math" + "sort" + "time" + + "github.com/prometheus/otlptranslator" + 
"github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/util/annotations" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/multierr" + + "github.com/cortexproject/cortex/pkg/cortexpb" +) + +type PromoteResourceAttributes struct { + promoteAll bool + attrs map[string]struct{} +} + +type Settings struct { + Namespace string + ExternalLabels map[string]string + DisableTargetInfo bool + AddMetricSuffixes bool + AllowUTF8 bool + PromoteResourceAttributes *PromoteResourceAttributes + KeepIdentifyingResourceAttributes bool + ConvertHistogramsToNHCB bool + AllowDeltaTemporality bool + // LookbackDelta is the PromQL engine lookback delta. + LookbackDelta time.Duration + // PromoteScopeMetadata controls whether to promote OTel scope metadata to metric labels. + PromoteScopeMetadata bool + EnableTypeAndUnitLabels bool +} + +// CortexConverter converts from OTel write format to Cortex remote write format. +type CortexConverter struct { + unique map[uint64]*cortexpb.TimeSeries + conflicts map[uint64][]*cortexpb.TimeSeries + everyN everyNTimes + metadata []*cortexpb.MetricMetadata +} + +func NewCortexConverter() *CortexConverter { + return &CortexConverter{ + unique: map[uint64]*cortexpb.TimeSeries{}, + conflicts: map[uint64][]*cortexpb.TimeSeries{}, + } +} + +func TranslatorMetricFromOtelMetric(metric pmetric.Metric) otlptranslator.Metric { + m := otlptranslator.Metric{ + Name: metric.Name(), + Unit: metric.Unit(), + Type: otlptranslator.MetricTypeUnknown, + } + switch metric.Type() { + case pmetric.MetricTypeGauge: + m.Type = otlptranslator.MetricTypeGauge + case pmetric.MetricTypeSum: + if metric.Sum().IsMonotonic() { + m.Type = otlptranslator.MetricTypeMonotonicCounter + } else { + m.Type = otlptranslator.MetricTypeNonMonotonicCounter + } + case pmetric.MetricTypeSummary: + m.Type = otlptranslator.MetricTypeSummary + case pmetric.MetricTypeHistogram: + m.Type = 
otlptranslator.MetricTypeHistogram + case pmetric.MetricTypeExponentialHistogram: + m.Type = otlptranslator.MetricTypeExponentialHistogram + } + return m +} + +type scope struct { + name string + version string + schemaURL string + attributes pcommon.Map +} + +func newScopeFromScopeMetrics(scopeMetrics pmetric.ScopeMetrics) scope { + s := scopeMetrics.Scope() + return scope{ + name: s.Name(), + version: s.Version(), + schemaURL: scopeMetrics.SchemaUrl(), + attributes: s.Attributes(), + } +} + +// FromMetrics converts pmetric.Metrics to Cortex remote write format. +func (c *CortexConverter) FromMetrics(ctx context.Context, md pmetric.Metrics, settings Settings) (annots annotations.Annotations, errs error) { + namer := otlptranslator.MetricNamer{ + Namespace: settings.Namespace, + WithMetricSuffixes: settings.AddMetricSuffixes, + UTF8Allowed: settings.AllowUTF8, + } + c.everyN = everyNTimes{n: 128} + resourceMetricsSlice := md.ResourceMetrics() + + numMetrics := 0 + for i := 0; i < resourceMetricsSlice.Len(); i++ { + scopeMetricsSlice := resourceMetricsSlice.At(i).ScopeMetrics() + for j := 0; j < scopeMetricsSlice.Len(); j++ { + numMetrics += scopeMetricsSlice.At(j).Metrics().Len() + } + } + c.metadata = make([]*cortexpb.MetricMetadata, 0, numMetrics) + + for i := 0; i < resourceMetricsSlice.Len(); i++ { + resourceMetrics := resourceMetricsSlice.At(i) + resource := resourceMetrics.Resource() + scopeMetricsSlice := resourceMetrics.ScopeMetrics() + // keep track of the earliest and latest timestamp in the ResourceMetrics for + // use with the "target" info metric + earliestTimestamp := pcommon.Timestamp(math.MaxUint64) + latestTimestamp := pcommon.Timestamp(0) + for j := 0; j < scopeMetricsSlice.Len(); j++ { + scopeMetrics := scopeMetricsSlice.At(j) + scope := newScopeFromScopeMetrics(scopeMetrics) + metricSlice := scopeMetrics.Metrics() + + // TODO: decide if instrumentation library information should be exported as labels + for k := 0; k < metricSlice.Len(); k++ { + 
if err := c.everyN.checkContext(ctx); err != nil { + errs = multierr.Append(errs, err) + return + } + + metric := metricSlice.At(k) + earliestTimestamp, latestTimestamp = findMinAndMaxTimestamps(metric, earliestTimestamp, latestTimestamp) + temporality, hasTemporality, err := aggregationTemporality(metric) + if err != nil { + errs = multierr.Append(errs, err) + continue + } + + if hasTemporality && + // Cumulative temporality is always valid. + // Delta temporality is also valid if AllowDeltaTemporality is true. + // All other temporality values are invalid. + (temporality != pmetric.AggregationTemporalityCumulative && + (!settings.AllowDeltaTemporality || temporality != pmetric.AggregationTemporalityDelta)) { + errs = multierr.Append(errs, fmt.Errorf("invalid temporality and type combination for metric %q", metric.Name())) + continue + } + + promName, err := namer.Build(TranslatorMetricFromOtelMetric(metric)) + if err != nil { + errs = multierr.Append(errs, err) + continue + } + metadata := &cortexpb.MetricMetadata{ + Type: otelMetricTypeToPromMetricType(metric), + MetricFamilyName: promName, + Help: metric.Description(), + Unit: metric.Unit(), + } + c.metadata = append(c.metadata, metadata) + + // handle individual metrics based on type + //exhaustive:enforce + switch metric.Type() { + case pmetric.MetricTypeGauge: + dataPoints := metric.Gauge().DataPoints() + if dataPoints.Len() == 0 { + errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + break + } + if err := c.addGaugeNumberDataPoints(ctx, dataPoints, resource, settings, metadata, scope); err != nil { + errs = multierr.Append(errs, err) + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return + } + } + case pmetric.MetricTypeSum: + dataPoints := metric.Sum().DataPoints() + if dataPoints.Len() == 0 { + errs = multierr.Append(errs, fmt.Errorf("empty data points. 
%s is dropped", metric.Name())) + break + } + if err := c.addSumNumberDataPoints(ctx, dataPoints, resource, settings, metadata, scope); err != nil { + errs = multierr.Append(errs, err) + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return + } + } + case pmetric.MetricTypeHistogram: + dataPoints := metric.Histogram().DataPoints() + if dataPoints.Len() == 0 { + errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + break + } + if settings.ConvertHistogramsToNHCB { + ws, err := c.addCustomBucketsHistogramDataPoints( + ctx, dataPoints, resource, settings, metadata, temporality, scope, + ) + annots.Merge(ws) + if err != nil { + errs = multierr.Append(errs, err) + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return + } + } + } else { + if err := c.addHistogramDataPoints(ctx, dataPoints, resource, settings, metadata, scope); err != nil { + errs = multierr.Append(errs, err) + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return + } + } + } + case pmetric.MetricTypeExponentialHistogram: + dataPoints := metric.ExponentialHistogram().DataPoints() + if dataPoints.Len() == 0 { + errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + break + } + ws, err := c.addExponentialHistogramDataPoints( + ctx, + dataPoints, + resource, + settings, + metadata, + temporality, + scope, + ) + annots.Merge(ws) + if err != nil { + errs = multierr.Append(errs, err) + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return + } + } + case pmetric.MetricTypeSummary: + dataPoints := metric.Summary().DataPoints() + if dataPoints.Len() == 0 { + errs = multierr.Append(errs, fmt.Errorf("empty data points. 
%s is dropped", metric.Name())) + break + } + if err := c.addSummaryDataPoints(ctx, dataPoints, resource, settings, metadata, scope); err != nil { + errs = multierr.Append(errs, err) + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return + } + } + default: + errs = multierr.Append(errs, errors.New("unsupported metric type")) + } + } + } + if earliestTimestamp < pcommon.Timestamp(math.MaxUint64) { + // We have at least one metric sample for this resource. + // Generate a corresponding target_info series. + err := addResourceTargetInfo(resource, settings, earliestTimestamp.AsTime(), latestTimestamp.AsTime(), c) + if err != nil { + errs = multierr.Append(errs, err) + } + } + } + + return annots, errs +} + +func isSameMetric(ts *cortexpb.TimeSeries, lbls []cortexpb.LabelAdapter) bool { + if len(ts.Labels) != len(lbls) { + return false + } + for i, l := range ts.Labels { + if l.Name != ts.Labels[i].Name || l.Value != ts.Labels[i].Value { + return false + } + } + return true +} + +// addExemplars adds exemplars for the dataPoint. For each exemplar, if it can find a bucket bound corresponding to its value, +// the exemplar is added to the bucket bound's time series, provided that the time series' has samples. 
+func (c *CortexConverter) addExemplars(ctx context.Context, dataPoint pmetric.HistogramDataPoint, bucketBounds []bucketBoundsData) error { + if len(bucketBounds) == 0 { + return nil + } + + exemplars, err := getPromExemplars(ctx, &c.everyN, dataPoint) + if err != nil { + return err + } + if len(exemplars) == 0 { + return nil + } + + sort.Sort(byBucketBoundsData(bucketBounds)) + for _, exemplar := range exemplars { + for _, bound := range bucketBounds { + if err := c.everyN.checkContext(ctx); err != nil { + return err + } + if len(bound.ts.Samples) > 0 && exemplar.Value <= bound.bound { + bound.ts.Exemplars = append(bound.ts.Exemplars, exemplar) + break + } + } + } + + return nil +} + +// addSample finds a TimeSeries that corresponds to lbls, and adds sample to it. +// If there is no corresponding TimeSeries already, it's created. +// The corresponding TimeSeries is returned. +// If either lbls is nil/empty or sample is nil, nothing is done. +func (c *CortexConverter) addSample(sample *cortexpb.Sample, lbls []cortexpb.LabelAdapter) *cortexpb.TimeSeries { + if sample == nil || len(lbls) == 0 { + // This shouldn't happen + return nil + } + + ts, _ := c.getOrCreateTimeSeries(lbls) + ts.Samples = append(ts.Samples, *sample) + return ts +} + +func NewPromoteResourceAttributes(otlpCfg config.OTLPConfig) *PromoteResourceAttributes { + attrs := otlpCfg.PromoteResourceAttributes + if otlpCfg.PromoteAllResourceAttributes { + attrs = otlpCfg.IgnoreResourceAttributes + } + attrsMap := make(map[string]struct{}, len(attrs)) + for _, s := range attrs { + attrsMap[s] = struct{}{} + } + return &PromoteResourceAttributes{ + promoteAll: otlpCfg.PromoteAllResourceAttributes, + attrs: attrsMap, + } +} + +// promotedAttributes returns labels for promoted resourceAttributes. 
+func (s *PromoteResourceAttributes) promotedAttributes(resourceAttributes pcommon.Map) []cortexpb.LabelAdapter { + if s == nil { + return nil + } + + var promotedAttrs []cortexpb.LabelAdapter + if s.promoteAll { + promotedAttrs = make([]cortexpb.LabelAdapter, 0, resourceAttributes.Len()) + resourceAttributes.Range(func(name string, value pcommon.Value) bool { + if _, exists := s.attrs[name]; !exists { + promotedAttrs = append(promotedAttrs, cortexpb.LabelAdapter{Name: name, Value: value.AsString()}) + } + return true + }) + } else { + promotedAttrs = make([]cortexpb.LabelAdapter, 0, len(s.attrs)) + resourceAttributes.Range(func(name string, value pcommon.Value) bool { + if _, exists := s.attrs[name]; exists { + promotedAttrs = append(promotedAttrs, cortexpb.LabelAdapter{Name: name, Value: value.AsString()}) + } + return true + }) + } + sort.Stable(ByLabelName(promotedAttrs)) + return promotedAttrs +} diff --git a/pkg/util/push/cortexotlpconverter/number_data_points.go b/pkg/util/push/cortexotlpconverter/number_data_points.go new file mode 100644 index 00000000000..547853a5e57 --- /dev/null +++ b/pkg/util/push/cortexotlpconverter/number_data_points.go @@ -0,0 +1,106 @@ +package cortexotlpconverter + +import ( + "context" + "math" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/value" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/cortexproject/cortex/pkg/cortexpb" +) + +func (c *CortexConverter) addGaugeNumberDataPoints(ctx context.Context, dataPoints pmetric.NumberDataPointSlice, + resource pcommon.Resource, settings Settings, metadata *cortexpb.MetricMetadata, scope scope, +) error { + for x := 0; x < dataPoints.Len(); x++ { + if err := c.everyN.checkContext(ctx); err != nil { + return err + } + + pt := dataPoints.At(x) + labels, err := createAttributes( + resource, + pt.Attributes(), + scope, + settings, + nil, + true, + metadata, + model.MetricNameLabel, + 
metadata.MetricFamilyName, + ) + if err != nil { + return err + } + sample := &cortexpb.Sample{ + // convert ns to ms + TimestampMs: convertTimeStamp(pt.Timestamp()), + } + switch pt.ValueType() { + case pmetric.NumberDataPointValueTypeInt: + sample.Value = float64(pt.IntValue()) + case pmetric.NumberDataPointValueTypeDouble: + sample.Value = pt.DoubleValue() + } + if pt.Flags().NoRecordedValue() { + sample.Value = math.Float64frombits(value.StaleNaN) + } + + c.addSample(sample, labels) + } + + return nil +} + +func (c *CortexConverter) addSumNumberDataPoints(ctx context.Context, dataPoints pmetric.NumberDataPointSlice, + resource pcommon.Resource, settings Settings, metadata *cortexpb.MetricMetadata, scope scope, +) error { + for x := 0; x < dataPoints.Len(); x++ { + if err := c.everyN.checkContext(ctx); err != nil { + return err + } + + pt := dataPoints.At(x) + lbls, err := createAttributes( + resource, + pt.Attributes(), + scope, + settings, + nil, + true, + metadata, + model.MetricNameLabel, + metadata.MetricFamilyName, + ) + if err != nil { + return err + } + sample := &cortexpb.Sample{ + // convert ns to ms + TimestampMs: convertTimeStamp(pt.Timestamp()), + } + switch pt.ValueType() { + case pmetric.NumberDataPointValueTypeInt: + sample.Value = float64(pt.IntValue()) + case pmetric.NumberDataPointValueTypeDouble: + sample.Value = pt.DoubleValue() + } + if pt.Flags().NoRecordedValue() { + sample.Value = math.Float64frombits(value.StaleNaN) + } + + ts := c.addSample(sample, lbls) + if ts != nil { + exemplars, err := getPromExemplars[pmetric.NumberDataPoint](ctx, &c.everyN, pt) + if err != nil { + return err + } + ts.Exemplars = append(ts.Exemplars, exemplars...) 
+ } + } + + return nil +} diff --git a/pkg/util/push/cortexotlpconverter/otlp_to_openmetrics_metadata.go b/pkg/util/push/cortexotlpconverter/otlp_to_openmetrics_metadata.go new file mode 100644 index 00000000000..50532f42807 --- /dev/null +++ b/pkg/util/push/cortexotlpconverter/otlp_to_openmetrics_metadata.go @@ -0,0 +1,42 @@ +package cortexotlpconverter + +import ( + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/cortexproject/cortex/pkg/cortexpb" +) + +func otelMetricTypeToPromMetricType(otelMetric pmetric.Metric) cortexpb.MetricMetadata_MetricType { + switch otelMetric.Type() { + case pmetric.MetricTypeGauge: + return cortexpb.GAUGE + case pmetric.MetricTypeSum: + metricType := cortexpb.GAUGE + if otelMetric.Sum().IsMonotonic() { + metricType = cortexpb.COUNTER + } + // We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/) + // We don't have a proper way to flag delta metrics yet, therefore marking the metric type as unknown for now. + if otelMetric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityDelta { + metricType = cortexpb.UNKNOWN + } + return metricType + case pmetric.MetricTypeHistogram: + // We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/) + // We don't have a proper way to flag delta metrics yet, therefore marking the metric type as unknown for now. 
+ if otelMetric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityDelta { + return cortexpb.UNKNOWN + } + return cortexpb.HISTOGRAM + case pmetric.MetricTypeSummary: + return cortexpb.SUMMARY + case pmetric.MetricTypeExponentialHistogram: + if otelMetric.ExponentialHistogram().AggregationTemporality() == pmetric.AggregationTemporalityDelta { + // We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/) + // We don't have a proper way to flag delta metrics yet, therefore marking the metric type as unknown for now. + return cortexpb.UNKNOWN + } + return cortexpb.HISTOGRAM + } + return cortexpb.UNKNOWN +} diff --git a/pkg/util/push/cortexotlpconverter/timeseries.go b/pkg/util/push/cortexotlpconverter/timeseries.go new file mode 100644 index 00000000000..7a24ae07f0c --- /dev/null +++ b/pkg/util/push/cortexotlpconverter/timeseries.go @@ -0,0 +1,30 @@ +package cortexotlpconverter + +import ( + "github.com/cortexproject/cortex/pkg/cortexpb" +) + +// TimeSeries returns a slice of the cortexpb.PreallocTimeseries that were converted from OTel format. +func (c *CortexConverter) TimeSeries() []cortexpb.PreallocTimeseries { + conflicts := 0 + for _, ts := range c.conflicts { + conflicts += len(ts) + } + + allTS := make([]cortexpb.PreallocTimeseries, 0, len(c.unique)+conflicts) + for _, ts := range c.unique { + allTS = append(allTS, cortexpb.PreallocTimeseries{TimeSeries: ts}) + } + for _, cTS := range c.conflicts { + for _, ts := range cTS { + allTS = append(allTS, cortexpb.PreallocTimeseries{TimeSeries: ts}) + } + } + + return allTS +} + +// Metadata returns a slice of the cortexpb.MetricMetadata that were converted from OTel format. 
+func (c *CortexConverter) Metadata() []*cortexpb.MetricMetadata { + return c.metadata +} diff --git a/pkg/util/push/otlp.go b/pkg/util/push/otlp.go index cdf1259d122..dbc933783a6 100644 --- a/pkg/util/push/otlp.go +++ b/pkg/util/push/otlp.go @@ -11,9 +11,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/prompb" - "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" "github.com/prometheus/prometheus/util/annotations" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" @@ -26,6 +23,7 @@ import ( "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/push/cortexotlpconverter" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -65,28 +63,14 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri SkipLabelNameValidation: false, } - // otlp to prompb TimeSeries - promTsList, promMetadata, err := convertToPromTS(r.Context(), req.Metrics(), cfg, overrides, userID, logger) - if err != nil && len(promTsList) == 0 { + // otlp to cortex TimeSeries + tsList, metadataList, err := convertToPromTS(r.Context(), req.Metrics(), cfg, overrides, userID, logger) + if err != nil && len(tsList) == 0 { http.Error(w, err.Error(), http.StatusBadRequest) return } - - // convert prompb to cortexpb TimeSeries - tsList := make([]cortexpb.PreallocTimeseries, 0, len(promTsList)) - for _, v := range promTsList { - tsList = append(tsList, cortexpb.PreallocTimeseries{TimeSeries: &cortexpb.TimeSeries{ - Labels: makeLabels(v.Labels), - Samples: makeSamples(v.Samples), - Exemplars: makeExemplars(v.Exemplars), - Histograms: makeHistograms(v.Histograms), - }}) - } - - metadata := makeMetadata(promMetadata) - prwReq.Timeseries = 
tsList - prwReq.Metadata = metadata + prwReq.Metadata = metadataList if _, err := push(ctx, &prwReq); err != nil { resp, ok := httpgrpc.HTTPResponseFromError(err) @@ -104,19 +88,6 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri }) } -func makeMetadata(promMetadata []prompb.MetricMetadata) []*cortexpb.MetricMetadata { - metadata := make([]*cortexpb.MetricMetadata, 0, len(promMetadata)) - for _, m := range promMetadata { - metadata = append(metadata, &cortexpb.MetricMetadata{ - Type: cortexpb.MetricMetadata_MetricType(m.Type), - MetricFamilyName: m.MetricFamilyName, - Help: m.Help, - Unit: m.Unit, - }) - } - return metadata -} - func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (pmetricotlp.ExportRequest, error) { expectedSize := int(r.ContentLength) if expectedSize > maxSize { @@ -175,9 +146,9 @@ func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) ( return decoderFunc(r.Body) } -func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, overrides *validation.Overrides, userID string, logger log.Logger) ([]prompb.TimeSeries, []prompb.MetricMetadata, error) { - promConverter := prometheusremotewrite.NewPrometheusConverter() - settings := prometheusremotewrite.Settings{ +func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, overrides *validation.Overrides, userID string, logger log.Logger) ([]cortexpb.PreallocTimeseries, []*cortexpb.MetricMetadata, error) { + promConverter := cortexotlpconverter.NewCortexConverter() + settings := cortexotlpconverter.Settings{ AddMetricSuffixes: true, DisableTargetInfo: cfg.DisableTargetInfo, AllowDeltaTemporality: cfg.AllowDeltaTemporality, @@ -190,7 +161,7 @@ func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distribu if cfg.ConvertAllAttributes { annots, err = promConverter.FromMetrics(ctx, convertToMetricsAttributes(pmetrics), settings) } else 
{ - settings.PromoteResourceAttributes = prometheusremotewrite.NewPromoteResourceAttributes(config.OTLPConfig{ + settings.PromoteResourceAttributes = cortexotlpconverter.NewPromoteResourceAttributes(config.OTLPConfig{ PromoteResourceAttributes: overrides.PromoteResourceAttributes(userID), }) annots, err = promConverter.FromMetrics(ctx, pmetrics, settings) @@ -208,45 +179,6 @@ func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distribu return promConverter.TimeSeries(), promConverter.Metadata(), err } -func makeLabels(in []prompb.Label) []cortexpb.LabelAdapter { - builder := labels.NewBuilder(labels.EmptyLabels()) - for _, l := range in { - builder.Set(l.Name, l.Value) - } - return cortexpb.FromLabelsToLabelAdapters(builder.Labels()) -} - -func makeSamples(in []prompb.Sample) []cortexpb.Sample { - out := make([]cortexpb.Sample, 0, len(in)) - for _, s := range in { - out = append(out, cortexpb.Sample{ - Value: s.Value, - TimestampMs: s.Timestamp, - }) - } - return out -} - -func makeExemplars(in []prompb.Exemplar) []cortexpb.Exemplar { - out := make([]cortexpb.Exemplar, 0, len(in)) - for _, e := range in { - out = append(out, cortexpb.Exemplar{ - Labels: makeLabels(e.Labels), - Value: e.Value, - TimestampMs: e.Timestamp, - }) - } - return out -} - -func makeHistograms(in []prompb.Histogram) []cortexpb.Histogram { - out := make([]cortexpb.Histogram, 0, len(in)) - for _, h := range in { - out = append(out, cortexpb.HistogramPromProtoToHistogramProto(h)) - } - return out -} - func convertToMetricsAttributes(md pmetric.Metrics) pmetric.Metrics { cloneMd := pmetric.NewMetrics() md.CopyTo(cloneMd) diff --git a/pkg/util/push/otlp_test.go b/pkg/util/push/otlp_test.go index efcdb40655b..79d61a662c6 100644 --- a/pkg/util/push/otlp_test.go +++ b/pkg/util/push/otlp_test.go @@ -14,7 +14,6 @@ import ( "github.com/go-kit/log" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" 
"github.com/stretchr/testify/require" "github.com/weaveworks/common/user" @@ -39,7 +38,7 @@ func TestOTLP_EnableTypeAndUnitLabels(t *testing.T) { allowDeltaTemporality bool otlpSeries pmetric.Metric expectedLabels labels.Labels - expectedMetadata prompb.MetricMetadata + expectedMetadata *cortexpb.MetricMetadata }{ { description: "[enableTypeAndUnitLabels: true], the '__type__' label should be attached when the type is the gauge", @@ -51,7 +50,7 @@ func TestOTLP_EnableTypeAndUnitLabels(t *testing.T) { "__unit__": "seconds", "test_label": "test_value", }), - expectedMetadata: createPromMetadata("test_seconds", "seconds", prompb.MetricMetadata_GAUGE), + expectedMetadata: createPromMetadata("test_seconds", "seconds", cortexpb.GAUGE), }, { description: "[enableTypeAndUnitLabels: true], the '__type__' label should not be attached when the type is unknown", @@ -63,7 +62,7 @@ func TestOTLP_EnableTypeAndUnitLabels(t *testing.T) { "__unit__": "seconds", "test_label": "test_value", }), - expectedMetadata: createPromMetadata("test_seconds", "seconds", prompb.MetricMetadata_UNKNOWN), + expectedMetadata: createPromMetadata("test_seconds", "seconds", cortexpb.UNKNOWN), }, { description: "[enableTypeAndUnitLabels: false]", @@ -73,7 +72,7 @@ func TestOTLP_EnableTypeAndUnitLabels(t *testing.T) { "__name__": "test_seconds", "test_label": "test_value", }), - expectedMetadata: createPromMetadata("test_seconds", "seconds", prompb.MetricMetadata_GAUGE), + expectedMetadata: createPromMetadata("test_seconds", "seconds", cortexpb.GAUGE), }, } @@ -94,7 +93,7 @@ func TestOTLP_EnableTypeAndUnitLabels(t *testing.T) { promSeries, metadata, err := convertToPromTS(ctx, metrics, cfg, overrides, "user-1", logger) require.NoError(t, err) require.Equal(t, 1, len(promSeries)) - require.Equal(t, prompb.FromLabels(test.expectedLabels, nil), promSeries[0].Labels) + require.Equal(t, test.expectedLabels, cortexpb.FromLabelAdaptersToLabels(promSeries[0].Labels)) require.Equal(t, 1, len(metadata)) 
require.Equal(t, test.expectedMetadata, metadata[0]) @@ -111,8 +110,8 @@ func TestOTLP_AllowDeltaTemporality(t *testing.T) { description string allowDeltaTemporality bool otlpSeries []pmetric.Metric - expectedSeries []prompb.TimeSeries - expectedMetadata []prompb.MetricMetadata + expectedSeries []cortexpb.PreallocTimeseries + expectedMetadata []*cortexpb.MetricMetadata expectedErr string }{ { @@ -122,13 +121,13 @@ func TestOTLP_AllowDeltaTemporality(t *testing.T) { createOtelSum("test_1", "", pmetric.AggregationTemporalityCumulative, ts), createOtelSum("test_2", "", pmetric.AggregationTemporalityCumulative, ts), }, - expectedSeries: []prompb.TimeSeries{ + expectedSeries: []cortexpb.PreallocTimeseries{ createPromFloatSeries("test_1", ts), createPromFloatSeries("test_2", ts), }, - expectedMetadata: []prompb.MetricMetadata{ - createPromMetadata("test_1", "", prompb.MetricMetadata_GAUGE), - createPromMetadata("test_2", "", prompb.MetricMetadata_GAUGE), + expectedMetadata: []*cortexpb.MetricMetadata{ + createPromMetadata("test_1", "", cortexpb.GAUGE), + createPromMetadata("test_2", "", cortexpb.GAUGE), }, }, { @@ -138,8 +137,8 @@ func TestOTLP_AllowDeltaTemporality(t *testing.T) { createOtelSum("test_1", "", pmetric.AggregationTemporalityDelta, ts), createOtelSum("test_2", "", pmetric.AggregationTemporalityDelta, ts), }, - expectedSeries: []prompb.TimeSeries{}, - expectedMetadata: []prompb.MetricMetadata{}, + expectedSeries: []cortexpb.PreallocTimeseries{}, + expectedMetadata: []*cortexpb.MetricMetadata{}, expectedErr: `invalid temporality and type combination for metric "test_1"; invalid temporality and type combination for metric "test_2"`, }, { @@ -149,13 +148,13 @@ func TestOTLP_AllowDeltaTemporality(t *testing.T) { createOtelSum("test_1", "", pmetric.AggregationTemporalityDelta, ts), createOtelSum("test_2", "", pmetric.AggregationTemporalityDelta, ts), }, - expectedSeries: []prompb.TimeSeries{ + expectedSeries: []cortexpb.PreallocTimeseries{ 
createPromFloatSeries("test_1", ts), createPromFloatSeries("test_2", ts), }, - expectedMetadata: []prompb.MetricMetadata{ - createPromMetadata("test_1", "", prompb.MetricMetadata_UNKNOWN), - createPromMetadata("test_2", "", prompb.MetricMetadata_UNKNOWN), + expectedMetadata: []*cortexpb.MetricMetadata{ + createPromMetadata("test_1", "", cortexpb.UNKNOWN), + createPromMetadata("test_2", "", cortexpb.UNKNOWN), }, }, { @@ -165,11 +164,11 @@ func TestOTLP_AllowDeltaTemporality(t *testing.T) { createOtelSum("test_1", "", pmetric.AggregationTemporalityDelta, ts), createOtelSum("test_2", "", pmetric.AggregationTemporalityCumulative, ts), }, - expectedSeries: []prompb.TimeSeries{ + expectedSeries: []cortexpb.PreallocTimeseries{ createPromFloatSeries("test_2", ts), }, - expectedMetadata: []prompb.MetricMetadata{ - createPromMetadata("test_2", "", prompb.MetricMetadata_GAUGE), + expectedMetadata: []*cortexpb.MetricMetadata{ + createPromMetadata("test_2", "", cortexpb.GAUGE), }, expectedErr: `invalid temporality and type combination for metric "test_1"`, }, @@ -180,13 +179,13 @@ func TestOTLP_AllowDeltaTemporality(t *testing.T) { createOtelExponentialHistogram("test_1", pmetric.AggregationTemporalityCumulative, ts), createOtelExponentialHistogram("test_2", pmetric.AggregationTemporalityCumulative, ts), }, - expectedSeries: []prompb.TimeSeries{ - createPromNativeHistogramSeries("test_1", prompb.Histogram_UNKNOWN, ts), - createPromNativeHistogramSeries("test_2", prompb.Histogram_UNKNOWN, ts), + expectedSeries: []cortexpb.PreallocTimeseries{ + createPromNativeHistogramSeries("test_1", cortexpb.Histogram_UNKNOWN, ts), + createPromNativeHistogramSeries("test_2", cortexpb.Histogram_UNKNOWN, ts), }, - expectedMetadata: []prompb.MetricMetadata{ - createPromMetadata("test_1", "", prompb.MetricMetadata_HISTOGRAM), - createPromMetadata("test_2", "", prompb.MetricMetadata_HISTOGRAM), + expectedMetadata: []*cortexpb.MetricMetadata{ + createPromMetadata("test_1", "", cortexpb.HISTOGRAM), + 
createPromMetadata("test_2", "", cortexpb.HISTOGRAM), }, }, { @@ -196,8 +195,8 @@ func TestOTLP_AllowDeltaTemporality(t *testing.T) { createOtelExponentialHistogram("test_1", pmetric.AggregationTemporalityDelta, ts), createOtelExponentialHistogram("test_2", pmetric.AggregationTemporalityDelta, ts), }, - expectedSeries: []prompb.TimeSeries{}, - expectedMetadata: []prompb.MetricMetadata{}, + expectedSeries: []cortexpb.PreallocTimeseries{}, + expectedMetadata: []*cortexpb.MetricMetadata{}, expectedErr: `invalid temporality and type combination for metric "test_1"; invalid temporality and type combination for metric "test_2"`, }, { @@ -207,13 +206,13 @@ func TestOTLP_AllowDeltaTemporality(t *testing.T) { createOtelExponentialHistogram("test_1", pmetric.AggregationTemporalityDelta, ts), createOtelExponentialHistogram("test_2", pmetric.AggregationTemporalityDelta, ts), }, - expectedSeries: []prompb.TimeSeries{ - createPromNativeHistogramSeries("test_1", prompb.Histogram_GAUGE, ts), - createPromNativeHistogramSeries("test_2", prompb.Histogram_GAUGE, ts), + expectedSeries: []cortexpb.PreallocTimeseries{ + createPromNativeHistogramSeries("test_1", cortexpb.Histogram_GAUGE, ts), + createPromNativeHistogramSeries("test_2", cortexpb.Histogram_GAUGE, ts), }, - expectedMetadata: []prompb.MetricMetadata{ - createPromMetadata("test_1", "", prompb.MetricMetadata_UNKNOWN), - createPromMetadata("test_2", "", prompb.MetricMetadata_UNKNOWN), + expectedMetadata: []*cortexpb.MetricMetadata{ + createPromMetadata("test_1", "", cortexpb.UNKNOWN), + createPromMetadata("test_2", "", cortexpb.UNKNOWN), }, }, { @@ -223,11 +222,11 @@ func TestOTLP_AllowDeltaTemporality(t *testing.T) { createOtelExponentialHistogram("test_1", pmetric.AggregationTemporalityDelta, ts), createOtelExponentialHistogram("test_2", pmetric.AggregationTemporalityCumulative, ts), }, - expectedSeries: []prompb.TimeSeries{ - createPromNativeHistogramSeries("test_2", prompb.Histogram_UNKNOWN, ts), + expectedSeries: 
[]cortexpb.PreallocTimeseries{ + createPromNativeHistogramSeries("test_2", cortexpb.Histogram_UNKNOWN, ts), }, - expectedMetadata: []prompb.MetricMetadata{ - createPromMetadata("test_2", "", prompb.MetricMetadata_HISTOGRAM), + expectedMetadata: []*cortexpb.MetricMetadata{ + createPromMetadata("test_2", "", cortexpb.HISTOGRAM), }, expectedErr: `invalid temporality and type combination for metric "test_1"`, }, @@ -259,8 +258,8 @@ func TestOTLP_AllowDeltaTemporality(t *testing.T) { } } -func createPromMetadata(name, unit string, metadataType prompb.MetricMetadata_MetricType) prompb.MetricMetadata { - return prompb.MetricMetadata{ +func createPromMetadata(name, unit string, metadataType cortexpb.MetricMetadata_MetricType) *cortexpb.MetricMetadata { + return &cortexpb.MetricMetadata{ Type: metadataType, Unit: unit, MetricFamilyName: name, @@ -268,7 +267,7 @@ func createPromMetadata(name, unit string, metadataType prompb.MetricMetadata_Me } // copied from: https://github.com/prometheus/prometheus/blob/v3.5.0/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go -func sortTimeSeries(series []prompb.TimeSeries) []prompb.TimeSeries { +func sortTimeSeries(series []cortexpb.PreallocTimeseries) []cortexpb.PreallocTimeseries { for i := range series { sort.Slice(series[i].Labels, func(j, k int) bool { return series[i].Labels[j].Name < series[i].Labels[k].Name @@ -283,16 +282,18 @@ func sortTimeSeries(series []prompb.TimeSeries) []prompb.TimeSeries { } // copied from: https://github.com/prometheus/prometheus/blob/v3.5.0/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go -func createPromFloatSeries(name string, ts time.Time) prompb.TimeSeries { - return prompb.TimeSeries{ - Labels: []prompb.Label{ - {Name: "__name__", Value: name}, - {Name: "test_label", Value: "test_value"}, +func createPromFloatSeries(name string, ts time.Time) cortexpb.PreallocTimeseries { + return cortexpb.PreallocTimeseries{ + TimeSeries: &cortexpb.TimeSeries{ + Labels: 
[]cortexpb.LabelAdapter{ + {Name: "__name__", Value: name}, + {Name: "test_label", Value: "test_value"}, + }, + Samples: []cortexpb.Sample{{ + Value: 5, + TimestampMs: ts.UnixMilli(), + }}, }, - Samples: []prompb.Sample{{ - Value: 5, - Timestamp: ts.UnixMilli(), - }}, } } @@ -327,21 +328,23 @@ func createOtelExponentialHistogram(name string, temporality pmetric.Aggregation } // copied from: https://github.com/prometheus/prometheus/blob/v3.5.0/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go -func createPromNativeHistogramSeries(name string, hint prompb.Histogram_ResetHint, ts time.Time) prompb.TimeSeries { - return prompb.TimeSeries{ - Labels: []prompb.Label{ - {Name: "__name__", Value: name}, - {Name: "test_label", Value: "test_value"}, - }, - Histograms: []prompb.Histogram{ - { - Count: &prompb.Histogram_CountInt{CountInt: 1}, - Sum: 5, - Schema: 0, - ZeroThreshold: 1e-128, - ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: 0}, - Timestamp: ts.UnixMilli(), - ResetHint: hint, +func createPromNativeHistogramSeries(name string, hint cortexpb.Histogram_ResetHint, ts time.Time) cortexpb.PreallocTimeseries { + return cortexpb.PreallocTimeseries{ + TimeSeries: &cortexpb.TimeSeries{ + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: name}, + {Name: "test_label", Value: "test_value"}, + }, + Histograms: []cortexpb.Histogram{ + { + Count: &cortexpb.Histogram_CountInt{CountInt: 1}, + Sum: 5, + Schema: 0, + ZeroThreshold: 1e-128, + ZeroCount: &cortexpb.Histogram_ZeroCountInt{ZeroCountInt: 0}, + TimestampMs: ts.UnixMilli(), + ResetHint: hint, + }, }, }, } @@ -376,7 +379,7 @@ func TestOTLPConvertToPromTS(t *testing.T) { description string PromoteResourceAttributes []string cfg distributor.OTLPConfig - expectedLabels []prompb.Label + expectedLabels []labels.Label }{ { description: "target_info should be generated and an attribute that exist in promote resource attributes should be converted", @@ -385,7 +388,7 @@ func 
TestOTLPConvertToPromTS(t *testing.T) { ConvertAllAttributes: false, DisableTargetInfo: false, }, - expectedLabels: []prompb.Label{ + expectedLabels: []labels.Label{ { Name: "__name__", Value: "test_counter_total", @@ -407,7 +410,7 @@ func TestOTLPConvertToPromTS(t *testing.T) { ConvertAllAttributes: false, DisableTargetInfo: true, }, - expectedLabels: []prompb.Label{ + expectedLabels: []labels.Label{ { Name: "__name__", Value: "test_counter_total", @@ -429,7 +432,7 @@ func TestOTLPConvertToPromTS(t *testing.T) { ConvertAllAttributes: false, DisableTargetInfo: true, }, - expectedLabels: []prompb.Label{ + expectedLabels: []labels.Label{ { Name: "__name__", Value: "test_counter_total", @@ -447,7 +450,7 @@ func TestOTLPConvertToPromTS(t *testing.T) { ConvertAllAttributes: true, DisableTargetInfo: true, }, - expectedLabels: []prompb.Label{ + expectedLabels: []labels.Label{ { Name: "__name__", Value: "test_counter_total", @@ -481,7 +484,7 @@ func TestOTLPConvertToPromTS(t *testing.T) { ConvertAllAttributes: true, DisableTargetInfo: true, }, - expectedLabels: []prompb.Label{ + expectedLabels: []labels.Label{ { Name: "__name__", Value: "test_counter_total", @@ -521,7 +524,7 @@ func TestOTLPConvertToPromTS(t *testing.T) { // test metadata conversion require.Equal(t, 1, len(metadata)) - require.Equal(t, prompb.MetricMetadata_MetricType(1), metadata[0].Type) + require.Equal(t, cortexpb.MetricMetadata_MetricType(1), metadata[0].Type) require.Equal(t, "test_counter_total", metadata[0].MetricFamilyName) require.Equal(t, "test-counter-description", metadata[0].Help) @@ -532,7 +535,7 @@ func TestOTLPConvertToPromTS(t *testing.T) { require.Equal(t, 2, len(tsList)) // test_counter_total + target_info } - var counterTs prompb.TimeSeries + var counterTs cortexpb.PreallocTimeseries for _, ts := range tsList { for _, label := range ts.Labels { if label.Name == "__name__" && label.Value == "test_counter_total" { @@ -542,7 +545,7 @@ func TestOTLPConvertToPromTS(t *testing.T) { } } - 
require.ElementsMatch(t, test.expectedLabels, counterTs.Labels) + require.ElementsMatch(t, test.expectedLabels, cortexpb.FromLabelAdaptersToLabels(counterTs.Labels)) }) } }