diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f24da38962..ab4a3157f57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,11 @@ ## master / unreleased -## 1.19.0 in progress +## 1.19.1 2025-09-20 + +* [BUGFIX] Frontend: Fix remote read snappy input due to request string logging when query stats enabled. #7025 + +## 1.19.0 2025-02-27 * [CHANGE] Deprecate `-blocks-storage.tsdb.wal-compression-enabled` flag (use `blocks-storage.tsdb.wal-compression-type` instead). #6529 * [CHANGE] OTLP: Change OTLP handler to be consistent with the Prometheus OTLP handler. #6272 diff --git a/VERSION b/VERSION index 815d5ca06d5..66e2ae6c25c 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.19.0 +1.19.1 diff --git a/docs/getting-started/.env b/docs/getting-started/.env index d2f816e1ac5..07d9b97eae6 100644 --- a/docs/getting-started/.env +++ b/docs/getting-started/.env @@ -1,4 +1,4 @@ -CORTEX_VERSION=v1.19.0 +CORTEX_VERSION=v1.19.1 GRAFANA_VERSION=10.4.2 PROMETHEUS_VERSION=v2.51.2 SEAWEEDFS_VERSION=3.67 diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index c62b520f820..a707499cdf4 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -37,7 +37,6 @@ import ( type queryFrontendTestConfig struct { testMissingMetricName bool querySchedulerEnabled bool - queryStatsEnabled bool remoteReadEnabled bool testSubQueryStepSize bool setup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) @@ -60,7 +59,6 @@ func TestQueryFrontendWithBlocksStorageViaFlags(t *testing.T) { func TestQueryFrontendWithBlocksStorageViaFlagsAndQueryStatsEnabled(t *testing.T) { runQueryFrontendTest(t, queryFrontendTestConfig{ testMissingMetricName: false, - queryStatsEnabled: true, setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { flags = BlocksStorageFlags() @@ -91,7 +89,6 @@ func TestQueryFrontendWithBlocksStorageViaFlagsAndWithQuerySchedulerAndQueryStat 
runQueryFrontendTest(t, queryFrontendTestConfig{ testMissingMetricName: false, querySchedulerEnabled: true, - queryStatsEnabled: true, setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { flags = BlocksStorageFlags() @@ -167,7 +164,6 @@ func TestQueryFrontendWithVerticalSharding(t *testing.T) { runQueryFrontendTest(t, queryFrontendTestConfig{ testMissingMetricName: false, querySchedulerEnabled: false, - queryStatsEnabled: true, setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig))) @@ -187,7 +183,6 @@ func TestQueryFrontendWithVerticalShardingQueryScheduler(t *testing.T) { runQueryFrontendTest(t, queryFrontendTestConfig{ testMissingMetricName: false, querySchedulerEnabled: true, - queryStatsEnabled: true, setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig))) @@ -207,7 +202,6 @@ func TestQueryFrontendProtobufCodec(t *testing.T) { runQueryFrontendTest(t, queryFrontendTestConfig{ testMissingMetricName: false, querySchedulerEnabled: true, - queryStatsEnabled: true, setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig))) @@ -273,7 +267,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { "-querier.split-queries-by-interval": "24h", "-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range "-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), - "-frontend.query-stats-enabled": strconv.FormatBool(cfg.queryStatsEnabled), + "-frontend.query-stats-enabled": "true", // Always enable query stats to capture regressions }) // Start the query-scheduler if 
enabled. @@ -361,7 +355,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { } // No need to repeat the test on Server-Timing header for each user. - if userID == 0 && cfg.queryStatsEnabled { + if userID == 0 { res, _, err := c.QueryRaw("{instance=~\"hello.*\"}", time.Now(), map[string]string{}) require.NoError(t, err) require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*$", res.Header.Values("Server-Timing")[0]) @@ -412,15 +406,11 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { wg.Wait() - extra := float64(2) + extra := float64(3) // Always include query stats test if cfg.testMissingMetricName { extra++ } - if cfg.queryStatsEnabled { - extra++ - } - if cfg.remoteReadEnabled { extra++ } @@ -437,15 +427,11 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount)) require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_querier_request_duration_seconds"}, e2e.WithMetricCount)) - // Ensure query stats metrics are tracked only when enabled. - if cfg.queryStatsEnabled { - require.NoError(t, queryFrontend.WaitSumMetricsWithOptions( - e2e.Greater(0), - []string{"cortex_query_seconds_total"}, - e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-1")))) - } else { - require.NoError(t, queryFrontend.WaitRemovedMetric("cortex_query_seconds_total")) - } + // Ensure query stats metrics are always tracked to capture regressions. + require.NoError(t, queryFrontend.WaitSumMetricsWithOptions( + e2e.Greater(0), + []string{"cortex_query_seconds_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-1")))) // Ensure no service-specific metrics prefix is used by the wrong service. 
assertServiceMetricsPrefixes(t, Distributor, distributor) diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index b703778ca0a..a00a57f0b14 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -224,7 +224,8 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // We parse form here so that we can use buf as body, in order to // prevent https://github.com/cortexproject/cortex/issues/5201. // Exclude remote read here as we don't have to buffer its body. - if !strings.Contains(r.URL.Path, "api/v1/read") { + isRemoteRead := strings.Contains(r.URL.Path, "api/v1/read") + if !isRemoteRead { if err := r.ParseForm(); err != nil { statusCode := http.StatusBadRequest if util.IsRequestBodyTooLarge(err) { @@ -240,8 +241,9 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { r.Body = io.NopCloser(&buf) } - // Log request - if f.cfg.QueryStatsEnabled { + // Log request if the request is not remote read. + // Logging a remote read properly would require parsing its proto, so skip it. 
+ if f.cfg.QueryStatsEnabled && !isRemoteRead { queryString = f.parseRequestQueryString(r, buf) f.logQueryRequest(r, queryString) } diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go index 92e0b59fd48..0bba23e9a7b 100644 --- a/pkg/frontend/transport/handler_test.go +++ b/pkg/frontend/transport/handler_test.go @@ -608,3 +608,41 @@ func Test_TenantFederation_MaxTenant(t *testing.T) { }) } } + +func TestHandler_RemoteReadRequest_DoesNotParseQueryString(t *testing.T) { + // Create a mock round tripper that captures the request + var capturedRequest *http.Request + roundTripper := roundTripperFunc(func(req *http.Request) (*http.Response, error) { + capturedRequest = req + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("{}")), + }, nil + }) + + // Use a larger MaxBodySize to avoid the "request body too large" error + handler := NewHandler(HandlerConfig{QueryStatsEnabled: true, MaxBodySize: 10 * 1024 * 1024}, tenantfederation.Config{}, roundTripper, log.NewNopLogger(), nil) + handlerWithAuth := middleware.Merge(middleware.AuthenticateUser).Wrap(handler) + + // Create a remote read request with a body that would be corrupted by parseRequestQueryString + originalBody := "snappy-compressed-data" + req := httptest.NewRequest("POST", "http://fake/api/v1/read", strings.NewReader(originalBody)) + req.Header.Set("X-Scope-OrgId", "user-1") + req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("Content-Encoding", "snappy") + + resp := httptest.NewRecorder() + handlerWithAuth.ServeHTTP(resp, req) + + // Verify the request was successful + require.Equal(t, http.StatusOK, resp.Code) + + // Verify that the original request body was preserved and not corrupted + require.NotNil(t, capturedRequest) + bodyBytes, err := io.ReadAll(capturedRequest.Body) + require.NoError(t, err) + require.Equal(t, originalBody, string(bodyBytes)) + + // Verify that the request body is still readable 
(not replaced with empty buffer) + require.NotEmpty(t, string(bodyBytes)) +}