From 7c03c9e5d2d1d00f1be8251316243d7e3a418da8 Mon Sep 17 00:00:00 2001 From: Benedikt Bongartz Date: Tue, 10 Mar 2026 13:53:38 +0100 Subject: [PATCH] feat: support metric otlp http ingestion Signed-off-by: Benedikt Bongartz --- api/metrics/v1/http.go | 44 ++++++++ client/spec.yaml | 107 +++++++++++++++++++ main.go | 25 ++++- test/e2e/configs.go | 61 +++++++++++ test/e2e/helpers.go | 2 + test/e2e/metrics_otlp_test.go | 188 ++++++++++++++++++++++++++++++++++ test/e2e/services.go | 75 +++++++++++--- 7 files changed, 486 insertions(+), 16 deletions(-) create mode 100644 test/e2e/metrics_otlp_test.go diff --git a/api/metrics/v1/http.go b/api/metrics/v1/http.go index 04bbfa949..eed714c7c 100644 --- a/api/metrics/v1/http.go +++ b/api/metrics/v1/http.go @@ -34,6 +34,8 @@ const ( RulesRoute = "/api/v1/rules" RulesRawRoute = "/api/v1/rules/raw" + OTLPRoute = "/otlp/v1/metrics" + AlertmanagerAlertsRoute = "/am/api/v2/alerts" AlertmanagerSilencesRoute = "/am/api/v2/silences" ) @@ -167,6 +169,7 @@ func (n nopInstrumentHandler) NewHandler(_ prometheus.Labels, handler http.Handl type Endpoints struct { ReadEndpoint *url.URL WriteEndpoint *url.URL + OTLPWriteEndpoint *url.URL RulesEndpoint *url.URL AlertmanagerEndpoint *url.URL } @@ -410,6 +413,47 @@ func NewHandler(endpoints Endpoints, tlsOptions *tls.UpstreamOptions, opts ...Ha }) } + if endpoints.OTLPWriteEndpoint != nil { + var proxyOTLPWrite http.Handler + { + middlewares := proxy.Middlewares( + proxy.MiddlewareSetUpstream(endpoints.OTLPWriteEndpoint), + proxy.MiddlewareSetPrefixHeader(), + proxy.MiddlewareLogger(c.logger), + proxy.MiddlewareMetrics(c.registry, prometheus.Labels{"proxy": "metricsv1-otlp-write"}), + ) + + t := &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: dialTimeout, + }).DialContext, + TLSClientConfig: tlsOptions.NewClientConfig(), + } + + proxyOTLPWrite = &httputil.ReverseProxy{ + Director: middlewares, + ErrorLog: proxy.Logger(c.logger), + Transport: otelhttp.NewTransport(t), + } + } + r.Group(func(r chi.Router) { + r.Use(func(handler http.Handler) http.Handler { + return server.InjectLabelsCtx( + prometheus.Labels{"group": "metricsv1", "handler": "otlp"}, + handler, + ) + }) + r.Use(c.writeMiddlewares...) + r.Use(server.StripTenantPrefix("/api/metrics/v1")) + r.Handle(OTLPRoute, + otelhttp.WithRouteTag( + c.spanRoutePrefix+OTLPRoute, + proxyOTLPWrite, + ), + ) + }) + } + if endpoints.RulesEndpoint != nil { client, err := rules.NewClient(endpoints.RulesEndpoint.String()) if err != nil { diff --git a/client/spec.yaml b/client/spec.yaml index 4ddaef3e0..89c2b51b2 100644 --- a/client/spec.yaml +++ b/client/spec.yaml @@ -390,6 +390,45 @@ paths: description: Delete silence response 5XX: description: Server side error + /api/metrics/v1/{tenant}/otlp/v1/metrics: + parameters: + - $ref: '#/components/parameters/tenant' + post: + tags: + - metrics/otlpv1 + summary: Send metrics in OTLP format + operationId: postOTLPMetrics + description: | + Send metrics in OpenTelemetry Protocol (OTLP) format. + The request is proxied to the configured OTLP HTTP backend. + requestBody: + description: OTLP ExportMetricsServiceRequest + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/OTLPExportMetricsServiceRequest' + application/x-protobuf: + schema: + type: string + format: binary + responses: + '200': + description: Successfully exported metrics + content: + application/json: + schema: + $ref: '#/components/schemas/OTLPExportMetricsServiceResponse' + '400': + description: Bad request + '401': + description: Unauthorized - missing or invalid tenant ID + '403': + description: Forbidden - insufficient permissions + '429': + description: Too many requests - rate limit exceeded + 5XX: + description: Server side error /api/logs/v1/{tenant}/loki/api/v1/push: parameters: - $ref: '#/components/parameters/tenant' @@ -1710,6 +1749,73 @@ components: data: type: object $ref: '#/components/schemas/Rules' + OTLPExportMetricsServiceRequest: + type: object + description: OTLP ExportMetricsServiceRequest containing resource metrics + properties: + resourceMetrics: + type: array + items: + type: object + properties: + resource: + type: object + properties: + attributes: + type: array + items: + $ref: '#/components/schemas/OTLPKeyValue' + scopeMetrics: + type: array + items: + type: object + properties: + scope: + type: object + properties: + name: + type: string + version: + type: string + metrics: + type: array + items: + type: object + properties: + name: + type: string + unit: + type: string + description: + type: string + OTLPExportMetricsServiceResponse: + type: object + description: OTLP ExportMetricsServiceResponse + properties: + partialSuccess: + type: object + properties: + rejectedDataPoints: + type: integer + format: int64 + errorMessage: + type: string + OTLPKeyValue: + type: object + properties: + key: + type: string + value: + type: object + properties: + stringValue: + type: string + intValue: + type: string + doubleValue: + type: number + boolValue: + type: boolean x-tagGroups: - name: metrics tags: @@ -1719,6 +1825,7 @@ x-tagGroups: - metrics/labelvaluesv1 - metrics/queryv1 - metrics/seriesv1 + - metrics/otlpv1 - name: logs tags: - logs/pushv1 diff --git a/main.go b/main.go index 5522ee737..a713794da 100644 --- a/main.go +++ b/main.go @@ -145,6 +145,7 @@ type tlsConfig struct { type metricsConfig struct { readEndpoint *url.URL writeEndpoint *url.URL + otlpWriteEndpoint *url.URL rulesEndpoint *url.URL alertmanagerEndpoint *url.URL upstreamWriteTimeout time.Duration @@ -644,6 +645,7 @@ func main() { eps := metricsv1.Endpoints{ ReadEndpoint: cfg.metrics.readEndpoint, WriteEndpoint: cfg.metrics.writeEndpoint, + OTLPWriteEndpoint: cfg.metrics.otlpWriteEndpoint, RulesEndpoint: cfg.metrics.rulesEndpoint, AlertmanagerEndpoint: cfg.metrics.alertmanagerEndpoint, } @@ -1108,10 +1110,11 @@ func (m *multiStringFlag) String() string { func parseFlags() (config, error) { var ( rawTLSCipherSuites string - rawMetricsReadEndpoint string - rawMetricsWriteEndpoint string - rawMetricsRulesEndpoint string - rawMetricsAlertmanagerEndpoint string + rawMetricsReadEndpoint string + rawMetricsWriteEndpoint string + rawMetricsWriteOTLPHTTPEndpoint string + rawMetricsRulesEndpoint string + rawMetricsAlertmanagerEndpoint string rawLogsReadEndpoint string rawLogsRulesEndpoint string rawLogsTailEndpoint string @@ -1192,6 +1195,8 @@ func parseFlags() (config, error) { "The endpoint against which to send read requests for metrics.") flag.StringVar(&rawMetricsWriteEndpoint, "metrics.write.endpoint", "", "The endpoint against which to make write requests for metrics.") + flag.StringVar(&rawMetricsWriteOTLPHTTPEndpoint, "metrics.write.otlphttp.endpoint", "", + "The endpoint against which to make OTLP HTTP write requests for metrics.") flag.StringVar(&rawMetricsRulesEndpoint, "metrics.rules.endpoint", "", "The endpoint against which to make get requests for listing recording/alerting rules and put requests for creating/updating recording/alerting rules.") flag.StringVar(&rawMetricsAlertmanagerEndpoint, "metrics.alertmanager.endpoint", "", @@ -1320,6 +1325,17 @@ func parseFlags() (config, error) { cfg.metrics.writeEndpoint = metricsWriteEndpoint } + if rawMetricsWriteOTLPHTTPEndpoint != "" { + cfg.metrics.enabled = true + + metricsOTLPWriteEndpoint, err := url.ParseRequestURI(rawMetricsWriteOTLPHTTPEndpoint) + if err != nil { + return cfg, fmt.Errorf("--metrics.write.otlphttp.endpoint %q is invalid: %w", rawMetricsWriteOTLPHTTPEndpoint, err) + } + + cfg.metrics.otlpWriteEndpoint = metricsOTLPWriteEndpoint + } + if rawMetricsRulesEndpoint != "" { cfg.metrics.enabled = true @@ -1642,6 +1658,7 @@ var metricsV1Group = []groupHandler{ {"metricsv1", "labels"}, {"metricsv1", "labelvalues"}, {"metricsv1", "receive"}, + {"metricsv1", "otlp"}, {"metricsv1", "rules"}, {"metricsv1", "rules-raw"}, {"metricsv1", "alerts"}, diff --git a/test/e2e/configs.go b/test/e2e/configs.go index 67a4fdbb5..181863a1a 100644 --- a/test/e2e/configs.go +++ b/test/e2e/configs.go @@ -18,6 +18,7 @@ type testType string const ( metrics testType = "metrics" + metricsOTLP testType = "metricsOTLP" rules testType = "rules" alerts testType = "alerts" logs testType = "logs" @@ -32,6 +33,7 @@ const ( configSharedDir = "config" envMetricsName = "metrics" + envMetricsOTLPName = "metrics-otlp" envRulesAPIName = "rules-api" envAlertmanagerName = "alertmanager-api" envLogsName = "logs-tail" @@ -379,6 +381,65 @@ func createOtelForwardingCollectorConfigYAML( testutil.Ok(t, err) } +// OTel collector config for metrics: receives OTLP metrics and exports via Prometheus remote write. +const otelMetricsConfigTpl = ` +receivers: + otlp: + protocols: + http: + endpoint: "0.0.0.0:4318" + +exporters: + debug: + verbosity: detailed + prometheusremotewrite: + endpoint: "http://%[1]s/api/v1/receive" + tls: + insecure: true + headers: + THANOS-TENANT: "%[2]s" + +service: + telemetry: + metrics: + readers: + - pull: + exporter: + prometheus: + host: 0.0.0.0 + port: 8888 + level: detailed + logs: + level: DEBUG + pipelines: + metrics: + receivers: [otlp] + exporters: [debug,prometheusremotewrite] +` + +func createOtelMetricsCollectorConfigYAML( + t *testing.T, + e e2e.Environment, + thanosReceiveEndpoint string, + tenantID string, +) { + if strings.ContainsRune(otelMetricsConfigTpl, '\t') { + t.Errorf("Tab in the YAML") + } + + yamlContent := []byte(fmt.Sprintf( + otelMetricsConfigTpl, + thanosReceiveEndpoint, + tenantID)) + + err := os.WriteFile( + filepath.Join(e.SharedDir(), configSharedDir, "metrics-collector.yaml"), + yamlContent, + os.FileMode(0644), + ) + testutil.Ok(t, err) +} + const lokiYAMLTpl = `auth_enabled: true server: diff --git a/test/e2e/helpers.go b/test/e2e/helpers.go index a678b69a4..a94630886 100644 --- a/test/e2e/helpers.go +++ b/test/e2e/helpers.go @@ -88,6 +88,8 @@ func getContainerName(t *testing.T, tt testType, serviceName string) string { return envLogsName + "-" + serviceName case metrics: return envMetricsName + "-" + serviceName + case metricsOTLP: + return envMetricsOTLPName + "-" + serviceName case rules: return envRulesAPIName + "-" + serviceName case alerts: diff --git a/test/e2e/metrics_otlp_test.go b/test/e2e/metrics_otlp_test.go new file mode 100644 index 000000000..2477af0ce --- /dev/null +++ b/test/e2e/metrics_otlp_test.go @@ -0,0 +1,188 @@ +//go:build integration + +package e2e + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "testing" + "time" + + "github.com/efficientgo/core/testutil" + "github.com/efficientgo/e2e" + promapi "github.com/prometheus/client_golang/api" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" +) + +const otlpMetricsJSON = ` +{ + "resourceMetrics": [ + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "my.service" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": { + "name": "my.library", + "version": "1.0.0" + }, + "metrics": [ + { + "name": "otlp_test_gauge", + "unit": "1", + "gauge": { + "dataPoints": [ + { + "asDouble": 42.0, + "timeUnixNano": "%d", + "attributes": [ + { + "key": "testlabel", + "value": { + "stringValue": "testvalue" + } + } + ] + } + ] + } + } + ] + } + ] + } + ] +}` + +func TestMetricsOTLPWrite(t *testing.T) { + t.Parallel() + + e, err := e2e.New(e2e.WithName(envMetricsOTLPName)) + testutil.Ok(t, err) + t.Cleanup(e.Close) + + prepareConfigsAndCerts(t, metricsOTLP, e) + _, token, rateLimiterAddr := startBaseServices(t, e, metricsOTLP) + readEndpoint, writeEndpoint, readExtEndpoint, otlpHTTPEndpoint := startServicesForMetricsOTLP(t, e) + + api, err := newObservatoriumAPIService( + e, + withMetricsEndpoints("http://"+readEndpoint, "http://"+writeEndpoint), + withOTLPHTTPMetricsEndpoint("http://"+otlpHTTPEndpoint), + withRateLimiter(rateLimiterAddr), + ) + testutil.Ok(t, err) + testutil.Ok(t, e2e.StartAndWaitReady(api)) + + t.Run("write-otlp-http-then-query", func(t *testing.T) { + now := time.Now() + metricsPayload := fmt.Sprintf(otlpMetricsJSON, now.UnixNano()) + + tlsClientConfig := getTLSClientConfig(t, e) + client := &http.Client{Transport: &http.Transport{TLSClientConfig: tlsClientConfig}} + + otlpURL := fmt.Sprintf( + "https://%s/api/metrics/v1/%s/otlp/v1/metrics", + api.Endpoint("https"), + defaultTenantName, + ) + + request, err := http.NewRequest("POST", otlpURL, bytes.NewBuffer([]byte(metricsPayload))) + testutil.Ok(t, err) + request.Header.Set("Content-Type", "application/json") + request.Header.Set("Authorization", fmt.Sprintf("bearer %s", token)) + + response, err := client.Do(request) + testutil.Ok(t, err) + defer response.Body.Close() + + body, err := io.ReadAll(response.Body) + testutil.Ok(t, err) + + t.Logf("OTLP write response status: %d, body: %s", response.StatusCode, string(body)) + testutil.Equals(t, http.StatusOK, response.StatusCode) + + // Query Thanos to verify the metric arrived. + tr := &http.Transport{TLSClientConfig: getTLSClientConfig(t, e)} + apiClient, err := promapi.NewClient(promapi.Config{ + Address: "https://" + api.Endpoint("https") + "/api/metrics/v1/" + defaultTenantName, + RoundTripper: &tokenRoundTripper{rt: tr, token: token}, + }) + testutil.Ok(t, err) + + queryAPI := v1.NewAPI(apiClient) + + // Retry query to allow time for the metric to propagate through the collector and Thanos. + var queryResult model.Value + testutil.Ok(t, retryUntil(30*time.Second, 2*time.Second, func() error { + var err error + queryResult, _, err = queryAPI.Query(context.Background(), `otlp_test_gauge{testlabel="testvalue"}`, time.Now()) + if err != nil { + return err + } + if queryResult.String() == "" { + return fmt.Errorf("no results yet") + } + return nil + })) + + resultStr := queryResult.String() + t.Logf("Query result: %s", resultStr) + assertResponse(t, resultStr, "otlp_test_gauge") + assertResponse(t, resultStr, `testlabel="testvalue"`) + }) + + t.Run("write-otlp-http-unauthenticated", func(t *testing.T) { + now := time.Now() + metricsPayload := fmt.Sprintf(otlpMetricsJSON, now.UnixNano()) + + tlsClientConfig := getTLSClientConfig(t, e) + client := &http.Client{Transport: &http.Transport{TLSClientConfig: tlsClientConfig}} + + otlpURL := fmt.Sprintf( + "https://%s/api/metrics/v1/%s/otlp/v1/metrics", + api.Endpoint("https"), + defaultTenantName, + ) + + request, err := http.NewRequest("POST", otlpURL, bytes.NewBuffer([]byte(metricsPayload))) + testutil.Ok(t, err) + request.Header.Set("Content-Type", "application/json") + // No Authorization header - should fail. + + response, err := client.Do(request) + testutil.Ok(t, err) + defer response.Body.Close() + + // Without auth, we expect a redirect or error (not 200). + testutil.Assert(t, response.StatusCode != http.StatusOK, + fmt.Sprintf("expected non-200 status for unauthenticated request, got %d", response.StatusCode)) + }) + + _ = readExtEndpoint +} + +func retryUntil(timeout, interval time.Duration, f func() error) error { + deadline := time.Now().Add(timeout) + var lastErr error + for time.Now().Before(deadline) { + lastErr = f() + if lastErr == nil { + return nil + } + time.Sleep(interval) + } + return fmt.Errorf("timed out after %s: %w", timeout, lastErr) +} diff --git a/test/e2e/services.go b/test/e2e/services.go index fb041608a..0e42a29ef 100644 --- a/test/e2e/services.go +++ b/test/e2e/services.go @@ -58,6 +58,46 @@ func startServicesForMetrics(t *testing.T, e e2e.Environment) ( thanosQuery.Endpoint("http") } +func startServicesForMetricsOTLP(t *testing.T, e e2e.Environment) ( + metricsReadEndpoint string, + metricsWriteEndpoint string, + metricsExtReadEndpoint string, + otlpHTTPEndpoint string, +) { + thanosReceive := newThanosReceiveService(e) + thanosQuery := e2edb.NewThanosQuerier( + e, + "thanos-query", + []string{thanosReceive.InternalEndpoint("grpc")}, + e2edb.WithImage(thanosImage), + ) + testutil.Ok(t, e2e.StartAndWaitReady(thanosReceive, thanosQuery)) + + createOtelMetricsCollectorConfigYAML(t, e, thanosReceive.InternalEndpoint("remote_write"), defaultTenantID) + + otel := e.Runnable("otel-metrics-collector"). + WithPorts( + map[string]int{ + "http": 4318, + }). + Init(e2e.StartOptions{ + Image: otelCollectorImage, + Volumes: []string{ + fmt.Sprintf("%s:/conf/metrics-collector.yaml", + filepath.Join(filepath.Join(e.SharedDir(), configSharedDir, "metrics-collector.yaml"))), + }, + Command: e2e.Command{ + Args: []string{"--config=/conf/metrics-collector.yaml"}, + }, + }) + testutil.Ok(t, e2e.StartAndWaitReady(otel)) + + return thanosQuery.InternalEndpoint("http"), + thanosReceive.InternalEndpoint("remote_write"), + thanosQuery.Endpoint("http"), + otel.InternalEndpoint("http") +} + func startServicesForRules(t *testing.T, e e2e.Environment) (metricsRulesEndpoint string) { // Create S3 replacement for rules backend const bucket = "obs-rules-test" @@ -363,18 +403,19 @@ func newProbesService(e e2e.Environment) *e2emon.InstrumentedRunnable { } type apiOptions struct { - logsEndpoint string - metricsReadEndpoint string - metricsWriteEndpoint string - metricsRulesEndpoint string - alertmanagerEndpoint string - ratelimiterAddr string - probesEndpoint string - tracesWriteOTLPGRPCEndpoint string - tracesWriteOTLPHTTPEndpoint string - gRPCListenEndpoint string - jaegerQueryEndpoint string - tempoEndpoint string + logsEndpoint string + metricsReadEndpoint string + metricsWriteEndpoint string + metricsWriteOTLPHTTPEndpoint string + metricsRulesEndpoint string + alertmanagerEndpoint string + ratelimiterAddr string + probesEndpoint string + tracesWriteOTLPGRPCEndpoint string + tracesWriteOTLPHTTPEndpoint string + gRPCListenEndpoint string + jaegerQueryEndpoint string + tempoEndpoint string // "experimental.traces.read.endpoint-template" value. tracesExperimentalTemplateReadEndpoint string @@ -401,6 +442,12 @@ func withMetricsEndpoints(readEndpoint string, writeEndpoint string) apiOption { } } +func withOTLPHTTPMetricsEndpoint(endpoint string) apiOption { + return func(o *apiOptions) { + o.metricsWriteOTLPHTTPEndpoint = endpoint + } +} + func withOTLPGRPCTraceEndpoint(exportEndpoint string) apiOption { return func(o *apiOptions) { o.tracesWriteOTLPGRPCEndpoint = exportEndpoint @@ -509,6 +556,10 @@ func newObservatoriumAPIService( args = append(args, "--probes.endpoint="+opts.probesEndpoint) } + if opts.metricsWriteOTLPHTTPEndpoint != "" { + args = append(args, "--metrics.write.otlphttp.endpoint="+opts.metricsWriteOTLPHTTPEndpoint) + } + if opts.tracesWriteOTLPGRPCEndpoint != "" { args = append(args, "--traces.write.otlpgrpc.endpoint="+opts.tracesWriteOTLPGRPCEndpoint) }