Skip to content

Commit 3db952d

Browse files
committed
backport remote read fix to release 1.19
Signed-off-by: yeya24 <benye@amazon.com>
1 parent babe421 commit 3db952d

File tree

5 files changed

+57
-27
lines changed

5 files changed

+57
-27
lines changed

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22

33
## master / unreleased
44

5-
## 1.19.0 in progress
5+
## 1.19.1 2025-09-20
6+
7+
* [BUGFIX] Frontend: Fix remote read snappy input corruption caused by request string logging when query stats are enabled. #7025
8+
9+
## 1.19.0 2025-02-27
610

711
* [CHANGE] Deprecate `-blocks-storage.tsdb.wal-compression-enabled` flag (use `blocks-storage.tsdb.wal-compression-type` instead). #6529
812
* [CHANGE] OTLP: Change OTLP handler to be consistent with the Prometheus OTLP handler. #6272

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.19.0
1+
1.19.1

integration/query_frontend_test.go

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
type queryFrontendTestConfig struct {
3838
testMissingMetricName bool
3939
querySchedulerEnabled bool
40-
queryStatsEnabled bool
4140
remoteReadEnabled bool
4241
testSubQueryStepSize bool
4342
setup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string)
@@ -60,7 +59,6 @@ func TestQueryFrontendWithBlocksStorageViaFlags(t *testing.T) {
6059
func TestQueryFrontendWithBlocksStorageViaFlagsAndQueryStatsEnabled(t *testing.T) {
6160
runQueryFrontendTest(t, queryFrontendTestConfig{
6261
testMissingMetricName: false,
63-
queryStatsEnabled: true,
6462
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
6563
flags = BlocksStorageFlags()
6664

@@ -91,7 +89,6 @@ func TestQueryFrontendWithBlocksStorageViaFlagsAndWithQuerySchedulerAndQueryStat
9189
runQueryFrontendTest(t, queryFrontendTestConfig{
9290
testMissingMetricName: false,
9391
querySchedulerEnabled: true,
94-
queryStatsEnabled: true,
9592
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
9693
flags = BlocksStorageFlags()
9794

@@ -167,7 +164,6 @@ func TestQueryFrontendWithVerticalSharding(t *testing.T) {
167164
runQueryFrontendTest(t, queryFrontendTestConfig{
168165
testMissingMetricName: false,
169166
querySchedulerEnabled: false,
170-
queryStatsEnabled: true,
171167
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
172168
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))
173169

@@ -187,7 +183,6 @@ func TestQueryFrontendWithVerticalShardingQueryScheduler(t *testing.T) {
187183
runQueryFrontendTest(t, queryFrontendTestConfig{
188184
testMissingMetricName: false,
189185
querySchedulerEnabled: true,
190-
queryStatsEnabled: true,
191186
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
192187
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))
193188

@@ -207,7 +202,6 @@ func TestQueryFrontendProtobufCodec(t *testing.T) {
207202
runQueryFrontendTest(t, queryFrontendTestConfig{
208203
testMissingMetricName: false,
209204
querySchedulerEnabled: true,
210-
queryStatsEnabled: true,
211205
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
212206
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))
213207

@@ -273,7 +267,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
273267
"-querier.split-queries-by-interval": "24h",
274268
"-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range
275269
"-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
276-
"-frontend.query-stats-enabled": strconv.FormatBool(cfg.queryStatsEnabled),
270+
"-frontend.query-stats-enabled": "true", // Always enable query stats to capture regressions
277271
})
278272

279273
// Start the query-scheduler if enabled.
@@ -361,7 +355,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
361355
}
362356

363357
// No need to repeat the test on Server-Timing header for each user.
364-
if userID == 0 && cfg.queryStatsEnabled {
358+
if userID == 0 {
365359
res, _, err := c.QueryRaw("{instance=~\"hello.*\"}", time.Now(), map[string]string{})
366360
require.NoError(t, err)
367361
require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*$", res.Header.Values("Server-Timing")[0])
@@ -412,15 +406,11 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
412406

413407
wg.Wait()
414408

415-
extra := float64(2)
409+
extra := float64(3) // Always include query stats test
416410
if cfg.testMissingMetricName {
417411
extra++
418412
}
419413

420-
if cfg.queryStatsEnabled {
421-
extra++
422-
}
423-
424414
if cfg.remoteReadEnabled {
425415
extra++
426416
}
@@ -437,15 +427,11 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
437427
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))
438428
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_querier_request_duration_seconds"}, e2e.WithMetricCount))
439429

440-
// Ensure query stats metrics are tracked only when enabled.
441-
if cfg.queryStatsEnabled {
442-
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(
443-
e2e.Greater(0),
444-
[]string{"cortex_query_seconds_total"},
445-
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-1"))))
446-
} else {
447-
require.NoError(t, queryFrontend.WaitRemovedMetric("cortex_query_seconds_total"))
448-
}
430+
// Ensure query stats metrics are always tracked to capture regressions.
431+
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(
432+
e2e.Greater(0),
433+
[]string{"cortex_query_seconds_total"},
434+
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-1"))))
449435

450436
// Ensure no service-specific metrics prefix is used by the wrong service.
451437
assertServiceMetricsPrefixes(t, Distributor, distributor)

pkg/frontend/transport/handler.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,8 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
224224
// We parse form here so that we can use buf as body, in order to
225225
// prevent https://github.com/cortexproject/cortex/issues/5201.
226226
// Exclude remote read here as we don't have to buffer its body.
227-
if !strings.Contains(r.URL.Path, "api/v1/read") {
227+
isRemoteRead := strings.Contains(r.URL.Path, "api/v1/read")
228+
if !isRemoteRead {
228229
if err := r.ParseForm(); err != nil {
229230
statusCode := http.StatusBadRequest
230231
if util.IsRequestBodyTooLarge(err) {
@@ -240,8 +241,9 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
240241
r.Body = io.NopCloser(&buf)
241242
}
242243

243-
// Log request
244-
if f.cfg.QueryStatsEnabled {
244+
// Log request if the request is not remote read.
245+
// Logging it properly would require parsing the remote read proto, so skip it.
246+
if f.cfg.QueryStatsEnabled && !isRemoteRead {
245247
queryString = f.parseRequestQueryString(r, buf)
246248
f.logQueryRequest(r, queryString)
247249
}

pkg/frontend/transport/handler_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -608,3 +608,41 @@ func Test_TenantFederation_MaxTenant(t *testing.T) {
608608
})
609609
}
610610
}
611+
612+
func TestHandler_RemoteReadRequest_DoesNotParseQueryString(t *testing.T) {
613+
// Create a mock round tripper that captures the request
614+
var capturedRequest *http.Request
615+
roundTripper := roundTripperFunc(func(req *http.Request) (*http.Response, error) {
616+
capturedRequest = req
617+
return &http.Response{
618+
StatusCode: http.StatusOK,
619+
Body: io.NopCloser(strings.NewReader("{}")),
620+
}, nil
621+
})
622+
623+
// Use a larger MaxBodySize to avoid the "request body too large" error
624+
handler := NewHandler(HandlerConfig{QueryStatsEnabled: true, MaxBodySize: 10 * 1024 * 1024}, tenantfederation.Config{}, roundTripper, log.NewNopLogger(), nil)
625+
handlerWithAuth := middleware.Merge(middleware.AuthenticateUser).Wrap(handler)
626+
627+
// Create a remote read request with a body that would be corrupted by parseRequestQueryString
628+
originalBody := "snappy-compressed-data"
629+
req := httptest.NewRequest("POST", "http://fake/api/v1/read", strings.NewReader(originalBody))
630+
req.Header.Set("X-Scope-OrgId", "user-1")
631+
req.Header.Set("Content-Type", "application/x-protobuf")
632+
req.Header.Set("Content-Encoding", "snappy")
633+
634+
resp := httptest.NewRecorder()
635+
handlerWithAuth.ServeHTTP(resp, req)
636+
637+
// Verify the request was successful
638+
require.Equal(t, http.StatusOK, resp.Code)
639+
640+
// Verify that the original request body was preserved and not corrupted
641+
require.NotNil(t, capturedRequest)
642+
bodyBytes, err := io.ReadAll(capturedRequest.Body)
643+
require.NoError(t, err)
644+
require.Equal(t, originalBody, string(bodyBytes))
645+
646+
// Verify that the request body is still readable (not replaced with an empty buffer)
647+
require.NotEmpty(t, string(bodyBytes))
648+
}

0 commit comments

Comments
 (0)