Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

## master / unreleased

## 1.19.0 in progress
## 1.19.1 2025-09-20

* [BUGFIX] Frontend: Fix remote read snappy input due to request string logging when query stats enabled. #7025

## 1.19.0 2025-02-27

* [CHANGE] Deprecate `-blocks-storage.tsdb.wal-compression-enabled` flag (use `blocks-storage.tsdb.wal-compression-type` instead). #6529
* [CHANGE] OTLP: Change OTLP handler to be consistent with the Prometheus OTLP handler. #6272
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.19.0
1.19.1
2 changes: 1 addition & 1 deletion docs/getting-started/.env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CORTEX_VERSION=v1.19.0
CORTEX_VERSION=v1.19.1
GRAFANA_VERSION=10.4.2
PROMETHEUS_VERSION=v2.51.2
SEAWEEDFS_VERSION=3.67
30 changes: 8 additions & 22 deletions integration/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
type queryFrontendTestConfig struct {
testMissingMetricName bool
querySchedulerEnabled bool
queryStatsEnabled bool
remoteReadEnabled bool
testSubQueryStepSize bool
setup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string)
Expand All @@ -60,7 +59,6 @@ func TestQueryFrontendWithBlocksStorageViaFlags(t *testing.T) {
func TestQueryFrontendWithBlocksStorageViaFlagsAndQueryStatsEnabled(t *testing.T) {
runQueryFrontendTest(t, queryFrontendTestConfig{
testMissingMetricName: false,
queryStatsEnabled: true,
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
flags = BlocksStorageFlags()

Expand Down Expand Up @@ -91,7 +89,6 @@ func TestQueryFrontendWithBlocksStorageViaFlagsAndWithQuerySchedulerAndQueryStat
runQueryFrontendTest(t, queryFrontendTestConfig{
testMissingMetricName: false,
querySchedulerEnabled: true,
queryStatsEnabled: true,
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
flags = BlocksStorageFlags()

Expand Down Expand Up @@ -167,7 +164,6 @@ func TestQueryFrontendWithVerticalSharding(t *testing.T) {
runQueryFrontendTest(t, queryFrontendTestConfig{
testMissingMetricName: false,
querySchedulerEnabled: false,
queryStatsEnabled: true,
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))

Expand All @@ -187,7 +183,6 @@ func TestQueryFrontendWithVerticalShardingQueryScheduler(t *testing.T) {
runQueryFrontendTest(t, queryFrontendTestConfig{
testMissingMetricName: false,
querySchedulerEnabled: true,
queryStatsEnabled: true,
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))

Expand All @@ -207,7 +202,6 @@ func TestQueryFrontendProtobufCodec(t *testing.T) {
runQueryFrontendTest(t, queryFrontendTestConfig{
testMissingMetricName: false,
querySchedulerEnabled: true,
queryStatsEnabled: true,
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))

Expand Down Expand Up @@ -273,7 +267,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
"-querier.split-queries-by-interval": "24h",
"-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range
"-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-frontend.query-stats-enabled": strconv.FormatBool(cfg.queryStatsEnabled),
"-frontend.query-stats-enabled": "true", // Always enable query stats to capture regressions
})

// Start the query-scheduler if enabled.
Expand Down Expand Up @@ -361,7 +355,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
}

// No need to repeat the test on Server-Timing header for each user.
if userID == 0 && cfg.queryStatsEnabled {
if userID == 0 {
res, _, err := c.QueryRaw("{instance=~\"hello.*\"}", time.Now(), map[string]string{})
require.NoError(t, err)
require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*$", res.Header.Values("Server-Timing")[0])
Expand Down Expand Up @@ -412,15 +406,11 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {

wg.Wait()

extra := float64(2)
extra := float64(3) // Always include query stats test
if cfg.testMissingMetricName {
extra++
}

if cfg.queryStatsEnabled {
extra++
}

if cfg.remoteReadEnabled {
extra++
}
Expand All @@ -437,15 +427,11 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_querier_request_duration_seconds"}, e2e.WithMetricCount))

// Ensure query stats metrics are tracked only when enabled.
if cfg.queryStatsEnabled {
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(
e2e.Greater(0),
[]string{"cortex_query_seconds_total"},
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-1"))))
} else {
require.NoError(t, queryFrontend.WaitRemovedMetric("cortex_query_seconds_total"))
}
// Ensure query stats metrics are always tracked to capture regressions.
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(
e2e.Greater(0),
[]string{"cortex_query_seconds_total"},
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-1"))))

// Ensure no service-specific metrics prefix is used by the wrong service.
assertServiceMetricsPrefixes(t, Distributor, distributor)
Expand Down
8 changes: 5 additions & 3 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// We parse form here so that we can use buf as body, in order to
// prevent https://github.com/cortexproject/cortex/issues/5201.
// Exclude remote read here as we don't have to buffer its body.
if !strings.Contains(r.URL.Path, "api/v1/read") {
isRemoteRead := strings.Contains(r.URL.Path, "api/v1/read")
if !isRemoteRead {
if err := r.ParseForm(); err != nil {
statusCode := http.StatusBadRequest
if util.IsRequestBodyTooLarge(err) {
Expand All @@ -240,8 +241,9 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Body = io.NopCloser(&buf)
}

// Log request
if f.cfg.QueryStatsEnabled {
// Log request if the request is not remote read.
// We need to parse remote read proto to be properly log it so skip it.
if f.cfg.QueryStatsEnabled && !isRemoteRead {
queryString = f.parseRequestQueryString(r, buf)
f.logQueryRequest(r, queryString)
}
Expand Down
38 changes: 38 additions & 0 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,3 +608,41 @@ func Test_TenantFederation_MaxTenant(t *testing.T) {
})
}
}

func TestHandler_RemoteReadRequest_DoesNotParseQueryString(t *testing.T) {
// Create a mock round tripper that captures the request
var capturedRequest *http.Request
roundTripper := roundTripperFunc(func(req *http.Request) (*http.Response, error) {
capturedRequest = req
return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader("{}")),
}, nil
})

// Use a larger MaxBodySize to avoid the "request body too large" error
handler := NewHandler(HandlerConfig{QueryStatsEnabled: true, MaxBodySize: 10 * 1024 * 1024}, tenantfederation.Config{}, roundTripper, log.NewNopLogger(), nil)
handlerWithAuth := middleware.Merge(middleware.AuthenticateUser).Wrap(handler)

// Create a remote read request with a body that would be corrupted by parseRequestQueryString
originalBody := "snappy-compressed-data"
req := httptest.NewRequest("POST", "http://fake/api/v1/read", strings.NewReader(originalBody))
req.Header.Set("X-Scope-OrgId", "user-1")
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("Content-Encoding", "snappy")

resp := httptest.NewRecorder()
handlerWithAuth.ServeHTTP(resp, req)

// Verify the request was successful
require.Equal(t, http.StatusOK, resp.Code)

// Verify that the original request body was preserved and not corrupted
require.NotNil(t, capturedRequest)
bodyBytes, err := io.ReadAll(capturedRequest.Body)
require.NoError(t, err)
require.Equal(t, originalBody, string(bodyBytes))

// Verify that the request body is still readable (not replaced with empty buffer)
require.NotEmpty(t, string(bodyBytes))
}
Loading