diff --git a/docs/api/_index.md b/docs/api/_index.md index 73462ec2585..356a5e82313 100644 --- a/docs/api/_index.md +++ b/docs/api/_index.md @@ -67,6 +67,9 @@ For the sake of clarity, in this document we have grouped API endpoints by servi | [Delete Alertmanager configuration](#delete-alertmanager-configuration) | Alertmanager || `DELETE /api/v1/alerts` | | [Tenant delete request](#tenant-delete-request) | Purger || `POST /purger/delete_tenant` | | [Tenant delete status](#tenant-delete-status) | Purger || `GET /purger/delete_tenant_status` | +| [Get user overrides](#get-user-overrides) | Overrides || `GET /api/v1/user-overrides` | +| [Set user overrides](#set-user-overrides) | Overrides || `POST /api/v1/user-overrides` | +| [Delete user overrides](#delete-user-overrides) | Overrides || `DELETE /api/v1/user-overrides` | | [Store-gateway ring status](#store-gateway-ring-status) | Store-gateway || `GET /store-gateway/ring` | | [Compactor ring status](#compactor-ring-status) | Compactor || `GET /compactor/ring` | | [Get rule files](#get-rule-files) | Configs API (deprecated) || `GET /api/prom/configs/rules` | @@ -888,6 +891,64 @@ Returns status of tenant deletion. Output format to be defined. Experimental. _Requires [authentication](#authentication)._ +## Overrides + +The Overrides service provides an API for managing user overrides. + +### Get user overrides + +``` +GET /api/v1/user-overrides +``` + +Get the current overrides for the authenticated tenant. Returns the overrides in JSON format. + +_Requires [authentication](#authentication)._ + +### Set user overrides + +``` +POST /api/v1/user-overrides +``` + +Set or update overrides for the authenticated tenant. The request body should contain a JSON object with the override values. + +_Requires [authentication](#authentication)._ + +### Delete user overrides + +``` +DELETE /api/v1/user-overrides +``` + +Delete all overrides for the authenticated tenant. This will revert the tenant to using default values. + +_Requires [authentication](#authentication)._ + +#### Example request body for PUT + +```json +{ + "ingestion_rate": 50000, + "max_global_series_per_user": 1000000, + "ruler_max_rules_per_rule_group": 100 +} +``` + +#### Supported limits + +The following limits can be modified via the API: +- `max_global_series_per_user` +- `max_global_series_per_metric` +- `ingestion_rate` +- `ingestion_burst_size` +- `ruler_max_rules_per_rule_group` +- `ruler_max_rule_groups_per_tenant` + +#### Hard limits + +Overrides are validated against hard limits defined in the runtime configuration file. If a requested override exceeds the hard limit for the tenant, the request will be rejected with a 400 status code. + ## Store-gateway ### Store-gateway ring status diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 8825b19e2a9..b456ce430e2 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -35,6 +35,8 @@ Cortex is an actively developed project and we want to encourage the introductio Currently experimental features are: +- Overrides API + - Runtime configuration API for managing tenant limits - Ruler - Evaluate rules to query frontend instead of ingesters (enabled via `-ruler.frontend-address`). - When `-ruler.frontend-address` is specified, the response format can be specified (via `-ruler.query-response-format`). diff --git a/docs/proposals/user-overrides-api.md b/docs/proposals/user-overrides-api.md index 3e89d45ddc2..cff4c098c99 100644 --- a/docs/proposals/user-overrides-api.md +++ b/docs/proposals/user-overrides-api.md @@ -40,7 +40,7 @@ Response format: } ``` -#### 2. PUT /api/v1/user-overrides +#### 2. POST /api/v1/user-overrides Updates overrides for a specific tenant. The request body should contain only the overrides that need to be updated. Request body: diff --git a/integration/overrides_test.go b/integration/overrides_test.go new file mode 100644 index 00000000000..1b2c2239660 --- /dev/null +++ b/integration/overrides_test.go @@ -0,0 +1,376 @@ +//go:build integration +// +build integration + +package integration + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore/providers/s3" + "gopkg.in/yaml.v3" + + "github.com/cortexproject/cortex/integration/e2e" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" +) + +func TestOverridesAPIWithRunningCortex(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + minio := e2edb.NewMinio(9000, "cortex") + require.NoError(t, s.StartAndWaitReady(minio)) + + runtimeConfig := map[string]interface{}{ + "overrides": map[string]interface{}{ + "user1": map[string]interface{}{ + "ingestion_rate": 5000, + }, + }, + "api_allowed_limits": []string{ + "ingestion_rate", + "max_global_series_per_user", + "max_global_series_per_metric", + "ingestion_burst_size", + "ruler_max_rules_per_rule_group", + "ruler_max_rule_groups_per_tenant", + }, + } + runtimeConfigData, err := yaml.Marshal(runtimeConfig) + require.NoError(t, err) + + s3Client, err := s3.NewBucketWithConfig(nil, s3.Config{ + Endpoint: minio.HTTPEndpoint(), + Insecure: true, + Bucket: "cortex", + AccessKey: e2edb.MinioAccessKey, + SecretKey: e2edb.MinioSecretKey, + }, "overrides-test", nil) + require.NoError(t, err) + + require.NoError(t, s3Client.Upload(context.Background(), "runtime.yaml", bytes.NewReader(runtimeConfigData))) + + flags := map[string]string{ + "-target": "overrides", + + "-runtime-config.file": "runtime.yaml", + "-runtime-config.backend": "s3", + "-runtime-config.s3.access-key-id": e2edb.MinioAccessKey, + "-runtime-config.s3.secret-access-key": e2edb.MinioSecretKey, + "-runtime-config.s3.bucket-name": "cortex", + "-runtime-config.s3.endpoint": minio.NetworkHTTPEndpoint(), + "-runtime-config.s3.insecure": "true", + } + + cortexSvc := e2ecortex.NewSingleBinary("cortex-overrides", flags, "") + require.NoError(t, s.StartAndWaitReady(cortexSvc)) + + t.Run("GET overrides for existing user", func(t *testing.T) { + req, err := http.NewRequest("GET", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", nil) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user1") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + + var overrides map[string]interface{} + err = json.NewDecoder(resp.Body).Decode(&overrides) + require.NoError(t, err) + + assert.Equal(t, float64(5000), overrides["ingestion_rate"]) + }) + + t.Run("GET overrides for non-existing user", func(t *testing.T) { + req, err := http.NewRequest("GET", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", nil) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user2") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) + }) + + t.Run("POST overrides for new user", func(t *testing.T) { + newOverrides := map[string]interface{}{ + "ingestion_rate": 6000, + "ingestion_burst_size": 7000, + } + requestBody, err := json.Marshal(newOverrides) + require.NoError(t, err) + + req, err := http.NewRequest("POST", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", bytes.NewReader(requestBody)) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user3") + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + + req, err = http.NewRequest("GET", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", nil) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user3") + + resp, err = http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + + var savedOverrides map[string]interface{} + err = json.NewDecoder(resp.Body).Decode(&savedOverrides) + require.NoError(t, err) + + assert.Equal(t, float64(6000), savedOverrides["ingestion_rate"]) + assert.Equal(t, float64(7000), savedOverrides["ingestion_burst_size"]) + }) + + t.Run("POST overrides with invalid limit", func(t *testing.T) { + invalidOverrides := map[string]interface{}{ + "invalid_limit": 5000, + } + requestBody, err := json.Marshal(invalidOverrides) + require.NoError(t, err) + + req, err := http.NewRequest("POST", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", bytes.NewReader(requestBody)) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user4") + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) + }) + + t.Run("POST overrides with invalid JSON", func(t *testing.T) { + req, err := http.NewRequest("POST", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", bytes.NewReader([]byte("invalid json"))) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user5") + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) + }) + + t.Run("DELETE overrides", func(t *testing.T) { + req, err := http.NewRequest("DELETE", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", nil) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user1") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + + req, err = http.NewRequest("GET", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", nil) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user1") + + resp, err = http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) + }) + + require.NoError(t, s.Stop(cortexSvc)) +} + +func TestOverridesAPIHardLimits(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + minio := e2edb.NewMinio(9001, "cortex") + require.NoError(t, s.StartAndWaitReady(minio)) + + // Runtime config with hard limits + runtimeConfig := map[string]interface{}{ + "overrides": map[string]interface{}{}, + "hard_overrides": map[string]interface{}{ + "user1": map[string]interface{}{ + "ingestion_rate": 10000, + "max_global_series_per_user": 50000, + }, + }, + "api_allowed_limits": []string{ + "ingestion_rate", + "max_global_series_per_user", + }, + } + runtimeConfigData, err := yaml.Marshal(runtimeConfig) + require.NoError(t, err) + + s3Client, err := s3.NewBucketWithConfig(nil, s3.Config{ + Endpoint: minio.HTTPEndpoint(), + Insecure: true, + Bucket: "cortex", + AccessKey: e2edb.MinioAccessKey, + SecretKey: e2edb.MinioSecretKey, + }, "overrides-test-hard-limits", nil) + require.NoError(t, err) + + require.NoError(t, s3Client.Upload(context.Background(), "runtime.yaml", bytes.NewReader(runtimeConfigData))) + + flags := map[string]string{ + "-target": "overrides", + + "-runtime-config.file": "runtime.yaml", + "-runtime-config.backend": "s3", + "-runtime-config.s3.access-key-id": e2edb.MinioAccessKey, + "-runtime-config.s3.secret-access-key": e2edb.MinioSecretKey, + "-runtime-config.s3.bucket-name": "cortex", + "-runtime-config.s3.endpoint": minio.NetworkHTTPEndpoint(), + "-runtime-config.s3.insecure": "true", + } + + cortexSvc := e2ecortex.NewSingleBinary("cortex-overrides-hard-limits", flags, "") + require.NoError(t, s.StartAndWaitReady(cortexSvc)) + + t.Run("POST overrides within hard limits", func(t *testing.T) { + overrides := map[string]interface{}{ + "ingestion_rate": 5000, // Within hard limit of 10000 + "max_global_series_per_user": 25000, // Within hard limit of 50000 + } + requestBody, err := json.Marshal(overrides) + require.NoError(t, err) + + req, err := http.NewRequest("POST", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", bytes.NewReader(requestBody)) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user1") + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + }) + + t.Run("POST overrides exceeding hard limits", func(t *testing.T) { + overrides := map[string]interface{}{ + "ingestion_rate": 15000, // Exceeds hard limit of 10000 + } + requestBody, err := json.Marshal(overrides) + require.NoError(t, err) + + req, err := http.NewRequest("POST", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", bytes.NewReader(requestBody)) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user1") + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) + }) + + t.Run("POST overrides for user without hard limits", func(t *testing.T) { + overrides := map[string]interface{}{ + "ingestion_rate": 20000, // No hard limits for user2 + } + requestBody, err := json.Marshal(overrides) + require.NoError(t, err) + + req, err := http.NewRequest("POST", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", bytes.NewReader(requestBody)) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user2") + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + }) + + require.NoError(t, s.Stop(cortexSvc)) +} + +func TestOverridesAPITenantExtraction(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + minio := e2edb.NewMinio(9010, "cortex") + require.NoError(t, s.StartAndWaitReady(minio)) + + // Upload an empty runtime config file to S3 + runtimeConfig := map[string]interface{}{ + "overrides": map[string]interface{}{}, + } + runtimeConfigData, err := yaml.Marshal(runtimeConfig) + require.NoError(t, err) + + s3Client, err := s3.NewBucketWithConfig(nil, s3.Config{ + Endpoint: minio.HTTPEndpoint(), + Insecure: true, + Bucket: "cortex", + AccessKey: e2edb.MinioAccessKey, + SecretKey: e2edb.MinioSecretKey, + }, "overrides-test-tenant", nil) + require.NoError(t, err) + + require.NoError(t, s3Client.Upload(context.Background(), "runtime.yaml", bytes.NewReader(runtimeConfigData))) + + flags := map[string]string{ + "-target": "overrides", + + "-runtime-config.file": "runtime.yaml", + "-runtime-config.backend": "s3", + "-runtime-config.s3.access-key-id": e2edb.MinioAccessKey, + "-runtime-config.s3.secret-access-key": e2edb.MinioSecretKey, + "-runtime-config.s3.bucket-name": "cortex", + "-runtime-config.s3.endpoint": minio.NetworkHTTPEndpoint(), + "-runtime-config.s3.insecure": "true", + } + + cortexSvc := e2ecortex.NewSingleBinary("cortex-overrides-tenant", flags, "") + require.NoError(t, s.StartAndWaitReady(cortexSvc)) + + t.Run("no tenant header", func(t *testing.T) { + req, err := http.NewRequest("GET", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", nil) + require.NoError(t, err) + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusUnauthorized, resp.StatusCode) + }) + + t.Run("empty tenant header", func(t *testing.T) { + req, err := http.NewRequest("GET", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", nil) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusUnauthorized, resp.StatusCode) + }) + + require.NoError(t, s.Stop(cortexSvc)) +} diff --git a/pkg/api/api.go b/pkg/api/api.go index ebe64440f9c..d3b96eca1c1 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -30,6 +30,7 @@ import ( frontendv2 "github.com/cortexproject/cortex/pkg/frontend/v2" "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/overrides" "github.com/cortexproject/cortex/pkg/purger" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/ring" @@ -385,6 +386,17 @@ func (a *API) RegisterRulerAPI(r *ruler.API) { a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/rules/{namespace}"), http.HandlerFunc(r.DeleteNamespace), true, "DELETE") } +// RegisterOverrides registers routes associated with the Overrides API +func (a *API) RegisterOverrides(o *overrides.API) { + // Register individual overrides API routes with the main API + a.RegisterRoute("/api/v1/user-overrides", http.HandlerFunc(o.GetOverrides), true, "GET") + a.RegisterRoute("/api/v1/user-overrides", http.HandlerFunc(o.SetOverrides), true, "POST") + a.RegisterRoute("/api/v1/user-overrides", http.HandlerFunc(o.DeleteOverrides), true, "DELETE") + + // Add link to the index page + a.indexPage.AddLink(SectionAdminEndpoints, "/api/v1/user-overrides", "User Overrides API") +} + // RegisterRing registers the ring UI page associated with the distributor for writes. func (a *API) RegisterRing(r *ring.Ring) { a.indexPage.AddLink(SectionAdminEndpoints, "/ingester/ring", "Ingester Ring Status") diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index f50fcf26e17..4c7c2f031ce 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -40,6 +40,7 @@ import ( frontendv1 "github.com/cortexproject/cortex/pkg/frontend/v1" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/overrides" "github.com/cortexproject/cortex/pkg/parquetconverter" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/querier/tenantfederation" @@ -313,7 +314,8 @@ type Cortex struct { Server *server.Server Ring *ring.Ring TenantLimits validation.TenantLimits - Overrides *validation.Overrides + OverridesConfig *validation.Overrides + Overrides *overrides.API Distributor *distributor.Distributor Ingester *ingester.Ingester Flusher *flusher.Flusher diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 013dbb90834..c17cefe318d 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -35,6 +35,7 @@ import ( "github.com/cortexproject/cortex/pkg/frontend" "github.com/cortexproject/cortex/pkg/frontend/transport" "github.com/cortexproject/cortex/pkg/ingester" + "github.com/cortexproject/cortex/pkg/overrides" "github.com/cortexproject/cortex/pkg/parquetconverter" "github.com/cortexproject/cortex/pkg/purger" "github.com/cortexproject/cortex/pkg/querier" @@ -66,6 +67,7 @@ const ( API string = "api" Ring string = "ring" RuntimeConfig string = "runtime-config" + OverridesConfig string = "overrides-config" Overrides string = "overrides" OverridesExporter string = "overrides-exporter" Server string = "server" @@ -199,13 +201,26 @@ func (t *Cortex) initRuntimeConfig() (services.Service, error) { return serv, err } -func (t *Cortex) initOverrides() (services.Service, error) { - t.Overrides = validation.NewOverrides(t.Cfg.LimitsConfig, t.TenantLimits) +func (t *Cortex) initOverridesConfig() (services.Service, error) { + t.OverridesConfig = validation.NewOverrides(t.Cfg.LimitsConfig, t.TenantLimits) // overrides don't have operational state, nor do they need to do anything more in starting/stopping phase, // so there is no need to return any service. return nil, nil } +func (t *Cortex) initOverrides() (services.Service, error) { + + overridesAPI, err := overrides.New(t.Cfg.RuntimeConfig, util_log.Logger, prometheus.DefaultRegisterer) + if err != nil { + return nil, fmt.Errorf("failed to create overrides API: %w", err) + } + t.Overrides = overridesAPI + + t.API.RegisterOverrides(overridesAPI) + + return overridesAPI, nil +} + func (t *Cortex) initOverridesExporter() (services.Service, error) { if t.Cfg.isModuleEnabled(OverridesExporter) && t.TenantLimits == nil { // This target isn't enabled by default ("all") and requires per-tenant limits to @@ -232,7 +247,7 @@ func (t *Cortex) initDistributorService() (serv services.Service, err error) { // ruler's dependency) canJoinDistributorsRing := t.Cfg.isModuleEnabled(Distributor) || t.Cfg.isModuleEnabled(All) - t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing, prometheus.DefaultRegisterer, util_log.Logger) + t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.OverridesConfig, t.Ring, canJoinDistributorsRing, prometheus.DefaultRegisterer, util_log.Logger) if err != nil { return } @@ -254,7 +269,7 @@ func (t *Cortex) initGrpcClientServices() (serv services.Service, err error) { } func (t *Cortex) initDistributor() (serv services.Service, err error) { - t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor, t.Overrides) + t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor, t.OverridesConfig) return nil, nil } @@ -265,7 +280,7 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) { querierRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "querier"}, prometheus.DefaultRegisterer) // Create a querier queryable and PromQL engine - t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger, t.Overrides.QueryPartialData) + t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger, t.OverridesConfig.QueryPartialData) // Use distributor as default MetadataQuerier t.MetadataQuerier = t.Distributor @@ -424,12 +439,12 @@ func (t *Cortex) initStoreQueryables() (services.Service, error) { //nolint:revive // I prefer this form over removing 'else', because it allows q to have smaller scope. var queriable prom_storage.Queryable - if q, err := initBlockStoreQueryable(t.Cfg, t.Overrides, prometheus.DefaultRegisterer); err != nil { + if q, err := initBlockStoreQueryable(t.Cfg, t.OverridesConfig, prometheus.DefaultRegisterer); err != nil { return nil, fmt.Errorf("failed to initialize querier: %v", err) } else { queriable = q if t.Cfg.Querier.EnableParquetQueryable { - pq, err := querier.NewParquetQueryable(t.Cfg.Querier, t.Cfg.BlocksStorage, t.Overrides, q, util_log.Logger, prometheus.DefaultRegisterer) + pq, err := querier.NewParquetQueryable(t.Cfg.Querier, t.Cfg.BlocksStorage, t.OverridesConfig, q, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, fmt.Errorf("failed to initialize parquet querier: %v", err) } @@ -479,7 +494,7 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) { t.Cfg.Ingester.QueryIngestersWithin = t.Cfg.Querier.QueryIngestersWithin t.tsdbIngesterConfig() - t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Overrides, prometheus.DefaultRegisterer, util_log.Logger, t.ResourceMonitor) + t.Ingester, err = ingester.New(t.Cfg.Ingester, t.OverridesConfig, prometheus.DefaultRegisterer, util_log.Logger, t.ResourceMonitor) if err != nil { return } @@ -499,7 +514,7 @@ func (t *Cortex) initFlusher() (serv services.Service, err error) { t.Flusher, err = flusher.New( t.Cfg.Flusher, t.Cfg.Ingester, - t.Overrides, + t.OverridesConfig, prometheus.DefaultRegisterer, util_log.Logger, ) @@ -534,7 +549,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro queryRangeMiddlewares, cache, err := queryrange.Middlewares( t.Cfg.QueryRange, util_log.Logger, - t.Overrides, + t.OverridesConfig, queryrange.PrometheusResponseExtractor{}, prometheus.DefaultRegisterer, queryAnalyzer, @@ -551,7 +566,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro instantQueryMiddlewares, err := instantquery.Middlewares( util_log.Logger, - t.Overrides, + t.OverridesConfig, instantQueryCodec, queryAnalyzer, t.Cfg.Querier.LookbackDelta, @@ -569,7 +584,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro instantQueryMiddlewares, prometheusCodec, instantQueryCodec, - t.Overrides, + t.OverridesConfig, queryAnalyzer, t.Cfg.Querier.DefaultEvaluationInterval, t.Cfg.Querier.MaxSubQuerySteps, @@ -587,7 +602,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro func (t *Cortex) initQueryFrontend() (serv services.Service, err error) { retry := transport.NewRetry(t.Cfg.QueryRange.MaxRetries, prometheus.DefaultRegisterer) - roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(t.Cfg.Frontend, t.Overrides, t.Cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer, retry) + roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(t.Cfg.Frontend, t.OverridesConfig, t.Cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer, retry) if err != nil { return nil, err } @@ -622,7 +637,7 @@ func (t *Cortex) initRulerStorage() (serv services.Service, err error) { return } - t.RulerStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, rules.FileLoader{}, util_log.Logger, prometheus.DefaultRegisterer) + t.RulerStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.OverridesConfig, rules.FileLoader{}, util_log.Logger, prometheus.DefaultRegisterer) return } @@ -668,15 +683,15 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { } queryEngine := engine.New(opts, t.Cfg.Ruler.ThanosEngine, rulerRegisterer) - managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer) - manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger) + managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.OverridesConfig, metrics, prometheus.DefaultRegisterer) + manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.OverridesConfig, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger) } else { rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer) // TODO: Consider wrapping logger to differentiate from querier module logger - queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger, t.Overrides.RulesPartialData) + queryable, _, engine := querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger, t.OverridesConfig.RulesPartialData) - managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer) - manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger) + managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.OverridesConfig, metrics, prometheus.DefaultRegisterer) + manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.OverridesConfig, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger) } if err != nil { @@ -689,7 +704,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { prometheus.DefaultRegisterer, util_log.Logger, t.RulerStorage, - t.Overrides, + t.OverridesConfig, ) if err != nil { return @@ -724,12 +739,12 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) { t.Cfg.Alertmanager.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort // Initialise the store. - store, err := alertstore.NewAlertStore(context.Background(), t.Cfg.AlertmanagerStorage, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) + store, err := alertstore.NewAlertStore(context.Background(), t.Cfg.AlertmanagerStorage, t.OverridesConfig, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return } - t.Alertmanager, err = alertmanager.NewMultitenantAlertmanager(&t.Cfg.Alertmanager, store, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) + t.Alertmanager, err = alertmanager.NewMultitenantAlertmanager(&t.Cfg.Alertmanager, store, t.OverridesConfig, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return } @@ -740,14 +755,14 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) { func (t *Cortex) initParquetConverter() (serv services.Service, err error) { t.Cfg.ParquetConverter.Ring.ListenPort = t.Cfg.Server.GRPCListenPort - return parquetconverter.NewConverter(t.Cfg.ParquetConverter, t.Cfg.BlocksStorage, t.Cfg.Compactor.BlockRanges.ToMilliseconds(), util_log.Logger, prometheus.DefaultRegisterer, t.Overrides) + return parquetconverter.NewConverter(t.Cfg.ParquetConverter, t.Cfg.BlocksStorage, t.Cfg.Compactor.BlockRanges.ToMilliseconds(), util_log.Logger, prometheus.DefaultRegisterer, t.OverridesConfig) } func (t *Cortex) initCompactor() (serv services.Service, err error) { t.Cfg.Compactor.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort ingestionReplicationFactor := t.Cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor - t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, util_log.Logger, prometheus.DefaultRegisterer, t.Overrides, ingestionReplicationFactor) + t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, util_log.Logger, prometheus.DefaultRegisterer, t.OverridesConfig, ingestionReplicationFactor) if err != nil { return } @@ -760,7 +775,7 @@ func (t *Cortex) initCompactor() (serv services.Service, err error) { func (t *Cortex) initStoreGateway() (serv services.Service, err error) { t.Cfg.StoreGateway.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort - t.StoreGateway, err = storegateway.NewStoreGateway(t.Cfg.StoreGateway, t.Cfg.BlocksStorage, t.Overrides, t.Cfg.Server.LogLevel, util_log.Logger, prometheus.DefaultRegisterer, t.ResourceMonitor) + t.StoreGateway, err = storegateway.NewStoreGateway(t.Cfg.StoreGateway, t.Cfg.BlocksStorage, t.OverridesConfig, t.Cfg.Server.LogLevel, util_log.Logger, prometheus.DefaultRegisterer, t.ResourceMonitor) if err != nil { return nil, err } @@ -802,7 +817,7 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) { func (t *Cortex) initTenantDeletionAPI() (services.Service, error) { // t.RulerStorage can be nil when running in single-binary mode, and rule storage is not configured. - tenantDeletionAPI, err := purger.NewTenantDeletionAPI(t.Cfg.BlocksStorage, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) + tenantDeletionAPI, err := purger.NewTenantDeletionAPI(t.Cfg.BlocksStorage, t.OverridesConfig, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, err } @@ -817,7 +832,7 @@ func (t *Cortex) initQueryScheduler() (services.Service, error) { tenant.WithDefaultResolver(tenantfederation.NewRegexValidator()) } - s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer, t.Cfg.Querier.DistributedExecEnabled) + s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.OverridesConfig, util_log.Logger, prometheus.DefaultRegisterer, t.Cfg.Querier.DistributedExecEnabled) if err != nil { return nil, errors.Wrap(err, "query-scheduler init") } @@ -861,7 +876,8 @@ func (t *Cortex) setupModuleManager() error { mm.RegisterModule(RuntimeConfig, t.initRuntimeConfig, modules.UserInvisibleModule) mm.RegisterModule(MemberlistKV, t.initMemberlistKV, modules.UserInvisibleModule) mm.RegisterModule(Ring, t.initRing, modules.UserInvisibleModule) - mm.RegisterModule(Overrides, t.initOverrides, modules.UserInvisibleModule) + mm.RegisterModule(OverridesConfig, t.initOverridesConfig, modules.UserInvisibleModule) + mm.RegisterModule(Overrides, t.initOverrides) mm.RegisterModule(OverridesExporter, t.initOverridesExporter) mm.RegisterModule(Distributor, t.initDistributor) mm.RegisterModule(DistributorService, t.initDistributorService, modules.UserInvisibleModule) @@ -893,33 +909,34 @@ func (t *Cortex) setupModuleManager() error { MemberlistKV: {API}, RuntimeConfig: {API}, Ring: {API, RuntimeConfig, MemberlistKV}, - Overrides: {RuntimeConfig}, + OverridesConfig: {RuntimeConfig}, + Overrides: {API, OverridesConfig}, OverridesExporter: {RuntimeConfig}, Distributor: {DistributorService, API, GrpcClientService}, - DistributorService: {Ring, Overrides}, - Ingester: {IngesterService, Overrides, API}, - IngesterService: {Overrides, RuntimeConfig, MemberlistKV, ResourceMonitor}, - Flusher: {Overrides, API}, - Queryable: {Overrides, DistributorService, Overrides, Ring, API, StoreQueryable, MemberlistKV}, + DistributorService: {Ring, OverridesConfig}, + Ingester: {IngesterService, OverridesConfig, API}, + IngesterService: {OverridesConfig, RuntimeConfig, MemberlistKV, ResourceMonitor}, + Flusher: {OverridesConfig, API}, + Queryable: {OverridesConfig, DistributorService, OverridesConfig, Ring, API, StoreQueryable, MemberlistKV}, Querier: {TenantFederation}, - StoreQueryable: {Overrides, Overrides, MemberlistKV, GrpcClientService}, - QueryFrontendTripperware: {API, Overrides}, + StoreQueryable: {OverridesConfig, OverridesConfig, MemberlistKV, GrpcClientService}, + QueryFrontendTripperware: {API, OverridesConfig}, QueryFrontend: {QueryFrontendTripperware}, - QueryScheduler: {API, Overrides}, - Ruler: {DistributorService, Overrides, StoreQueryable, RulerStorage}, - RulerStorage: {Overrides}, + QueryScheduler: {API, OverridesConfig}, + Ruler: {DistributorService, OverridesConfig, StoreQueryable, RulerStorage}, + RulerStorage: {OverridesConfig}, Configs: {API}, - AlertManager: {API, MemberlistKV, Overrides}, - Compactor: {API, MemberlistKV, Overrides}, - ParquetConverter: {API, MemberlistKV, Overrides}, - StoreGateway: {API, Overrides, MemberlistKV, ResourceMonitor}, - TenantDeletion: {API, Overrides}, + AlertManager: {API, MemberlistKV, OverridesConfig}, + Compactor: {API, MemberlistKV, OverridesConfig}, + ParquetConverter: {API, MemberlistKV, OverridesConfig}, + StoreGateway: {API, OverridesConfig, MemberlistKV, ResourceMonitor}, + TenantDeletion: {API, OverridesConfig}, Purger: {TenantDeletion}, TenantFederation: {Queryable}, All: {QueryFrontend, Querier, Ingester, Distributor, Purger, StoreGateway, Ruler, Compactor, AlertManager}, } if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil { - deps[Ruler] = []string{Overrides, RulerStorage} + deps[Ruler] = []string{OverridesConfig, RulerStorage} } for mod, targets := range deps { if err := mm.AddDependency(mod, targets...); err != nil { diff --git a/pkg/cortex/runtime_config.go b/pkg/cortex/runtime_config.go index c2bcc786d91..4b4dd298697 100644 --- a/pkg/cortex/runtime_config.go +++ b/pkg/cortex/runtime_config.go @@ -20,18 +20,8 @@ var ( tenantLimitCheckTargets = []string{All, Distributor, Querier, Ruler} ) -// RuntimeConfigValues are values that can be reloaded from configuration file while Cortex is running. -// Reloading is done by runtime_config.Manager, which also keeps the currently loaded config. -// These values are then pushed to the components that are interested in them. -type RuntimeConfigValues struct { - TenantLimits map[string]*validation.Limits `yaml:"overrides"` - - Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"` - - IngesterChunkStreaming *bool `yaml:"ingester_stream_chunks_when_using_blocks"` - - IngesterLimits *ingester.InstanceLimits `yaml:"ingester_limits"` -} +// avoid circular imports +type RuntimeConfigValues = runtimeconfig.RuntimeConfigValues // runtimeConfigTenantLimits provides per-tenant limit overrides based on a runtimeconfig.Manager // that reads limits from a configuration file on disk and periodically reloads them. diff --git a/pkg/overrides/api.go b/pkg/overrides/api.go new file mode 100644 index 00000000000..c7b4b6656ae --- /dev/null +++ b/pkg/overrides/api.go @@ -0,0 +1,231 @@ +package overrides + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + + "github.com/go-kit/log/level" + "gopkg.in/yaml.v3" + + "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util/runtimeconfig" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +const ( + // Error messages + ErrInvalidJSON = "invalid JSON" + ErrUserNotFound = "user not found" + + // Runtime config errors + ErrRuntimeConfig = "runtime config read error" +) + +// getAllowedLimitsFromBucket reads allowed limits from the runtime config file +func (a *API) getAllowedLimitsFromBucket(ctx context.Context) ([]string, error) { + reader, err := a.bucketClient.Get(ctx, a.runtimeConfigPath) + if err != nil { + return []string{}, nil // No allowed limits if config doesn't exist + } + defer reader.Close() + + var config runtimeconfig.RuntimeConfigValues + if err := yaml.NewDecoder(reader).Decode(&config); err != nil { + level.Error(a.logger).Log("msg", "failed to decode runtime config", "err", err) + return []string{}, fmt.Errorf("failed to decode runtime config") + } + + return config.APIAllowedLimits, nil +} + +// GetOverrides retrieves overrides for a specific tenant +func (a *API) GetOverrides(w http.ResponseWriter, r *http.Request) { + userID, _, err := tenant.ExtractTenantIDFromHTTPRequest(r) + if err != nil { + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + + // Read overrides from bucket storage + overrides, err := a.getOverridesFromBucket(r.Context(), userID) + if err != nil { + if err.Error() == ErrUserNotFound { + http.Error(w, "user not found", http.StatusBadRequest) + } else { + level.Error(a.logger).Log("msg", "failed to get overrides from bucket", "userID", userID, "err", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + } + return + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(overrides); err != nil { + level.Error(a.logger).Log("msg", "failed to encode overrides response", "err", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } +} + +// SetOverrides updates overrides for a specific tenant +func (a *API) SetOverrides(w http.ResponseWriter, r *http.Request) { + userID, _, err := tenant.ExtractTenantIDFromHTTPRequest(r) + if err != nil { + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + + var overrides map[string]any + if err := json.NewDecoder(r.Body).Decode(&overrides); err != nil { + http.Error(w, ErrInvalidJSON, http.StatusBadRequest) + return + } + + // Get allowed limits from runtime config + allowedLimits, err := a.getAllowedLimitsFromBucket(r.Context()) + if err != nil { + level.Error(a.logger).Log("msg", "failed to get allowed limits from bucket", "userID", userID, "err", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + // Validate that only allowed limits are being changed + if err := ValidateOverrides(overrides, allowedLimits); err != nil { + level.Error(a.logger).Log("msg", "invalid overrides validation", "userID", userID, "err", err) + http.Error(w, "Invalid overrides", http.StatusBadRequest) + return + } + + // Validate that values don't exceed hard limits from runtime config + if err := a.validateHardLimits(overrides, userID); err != nil { + level.Error(a.logger).Log("msg", "hard limits validation failed", "userID", userID, "err", err) + http.Error(w, "Invalid overrides", http.StatusBadRequest) + return + } + + // Write overrides to bucket storage + if err := a.setOverridesToBucket(r.Context(), userID, overrides); err != nil { + level.Error(a.logger).Log("msg", "failed to set overrides to bucket", "userID", userID, "err", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + +// DeleteOverrides removes tenant-specific overrides +func (a *API) DeleteOverrides(w http.ResponseWriter, r *http.Request) { + userID, _, err := tenant.ExtractTenantIDFromHTTPRequest(r) + if err != nil { + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + + if err := a.deleteOverridesFromBucket(r.Context(), userID); err != nil { + level.Error(a.logger).Log("msg", "failed to delete overrides from bucket", "userID", userID, "err", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + +// getOverridesFromBucket reads overrides for a specific tenant from the runtime config file +func (a *API) getOverridesFromBucket(ctx context.Context, userID string) (map[string]any, error) { + reader, err := a.bucketClient.Get(ctx, a.runtimeConfigPath) + if err != nil { + return nil, fmt.Errorf("failed to get runtime config: %w", err) + } + defer reader.Close() + + var config runtimeconfig.RuntimeConfigValues + if err := yaml.NewDecoder(reader).Decode(&config); err != nil { + return nil, fmt.Errorf("%s: %w", ErrRuntimeConfig, err) + } + + if config.TenantLimits != nil { + if tenantLimits, exists := config.TenantLimits[userID]; exists { + // Use YAML marshaling to convert validation.Limits to map[string]interface{} + // This follows the same pattern as the existing runtime config handler + yamlData, err := yaml.Marshal(tenantLimits) + if err != nil { + return nil, fmt.Errorf("failed to marshal limits: %w", err) + } + + var result map[string]any + if err := yaml.Unmarshal(yamlData, &result); err != nil { + return nil, fmt.Errorf("failed to unmarshal limits: %w", err) + } + + return result, nil + } + // User does not exist in config - return error + return nil, errors.New(ErrUserNotFound) + } + + // No tenant limits configured - return empty map (no overrides) + return map[string]any{}, nil +} + +// setOverridesToBucket writes overrides for a specific tenant to the runtime config file +func (a *API) setOverridesToBucket(ctx context.Context, userID string, overrides map[string]any) error { + var config runtimeconfig.RuntimeConfigValues + reader, err := a.bucketClient.Get(ctx, a.runtimeConfigPath) + if err == nil { + defer reader.Close() + if err := yaml.NewDecoder(reader).Decode(&config); err != nil { + return fmt.Errorf("%s: %w", ErrRuntimeConfig, err) + } + } + + yamlData, err := yaml.Marshal(overrides) + if err != nil { + return fmt.Errorf("failed to marshal overrides: %w", err) + } + + var limits validation.Limits + if err := yaml.Unmarshal(yamlData, &limits); err != nil { + return fmt.Errorf("invalid overrides format: %w", err) + } + + if config.TenantLimits == nil { + config.TenantLimits = make(map[string]*validation.Limits) + } + + config.TenantLimits[userID] = &limits + + data, err := yaml.Marshal(config) + if err != nil { + return fmt.Errorf("%s: %w", ErrRuntimeConfig, err) + } + + return a.bucketClient.Upload(ctx, a.runtimeConfigPath, bytes.NewReader(data)) +} + +// deleteOverridesFromBucket removes overrides for a specific tenant from the runtime config file +func (a *API) deleteOverridesFromBucket(ctx context.Context, userID string) error { + reader, err := a.bucketClient.Get(ctx, a.runtimeConfigPath) + if err != nil { + return nil + } + defer reader.Close() + + var config runtimeconfig.RuntimeConfigValues + if err := yaml.NewDecoder(reader).Decode(&config); err != nil { + return fmt.Errorf("%s: %w", ErrRuntimeConfig, err) + } + + if config.TenantLimits != nil { + delete(config.TenantLimits, userID) + } + + data, err := yaml.Marshal(config) + if err != nil { + return fmt.Errorf("%s: %w", ErrRuntimeConfig, err) + } + + return a.bucketClient.Upload(ctx, a.runtimeConfigPath, bytes.NewReader(data)) +} diff --git a/pkg/overrides/limits.go b/pkg/overrides/limits.go new file mode 100644 index 00000000000..84d3b41c8e5 --- /dev/null +++ b/pkg/overrides/limits.go @@ -0,0 +1,125 @@ +package overrides + +import ( + "context" + "fmt" + "slices" + "strconv" + "strings" + + "github.com/go-kit/log/level" + "gopkg.in/yaml.v3" + + "github.com/cortexproject/cortex/pkg/util/runtimeconfig" +) + +const ( + // Error messages + ErrInvalidLimits = "the following limits cannot be modified via the overrides API" +) + +// No default allowed limits - these must be configured via runtime config + +// ValidateOverrides checks if the provided overrides only contain allowed limits +func ValidateOverrides(overrides map[string]any, allowedLimits []string) error { + var invalidLimits []string + + for limitName := range overrides { + if !slices.Contains(allowedLimits, limitName) { + invalidLimits = append(invalidLimits, limitName) + } + } + + if len(invalidLimits) > 0 { + return fmt.Errorf("%s: %s", ErrInvalidLimits, strings.Join(invalidLimits, ", ")) + } + + return nil +} + +// validateHardLimits checks if the provided overrides exceed any hard limits from the runtime config +func (a *API) validateHardLimits(overrides map[string]any, userID string) error { + // Read the runtime config to get hard limits + reader, err := a.bucketClient.Get(context.Background(), a.runtimeConfigPath) + if err != nil { + level.Error(a.logger).Log("msg", "failed to read hard limits configuration", "userID", userID, "err", err) + return fmt.Errorf("failed to validate hard limits") + } + defer reader.Close() + + var config runtimeconfig.RuntimeConfigValues + if err := yaml.NewDecoder(reader).Decode(&config); err != nil { + level.Error(a.logger).Log("msg", "failed to decode hard limits configuration", "userID", userID, "err", err) + return fmt.Errorf("failed to validate hard limits") + } + + // If no hard overrides are defined, allow the request + if config.HardTenantLimits == nil { + return nil + } + + // Get hard limits for this specific user + userHardLimits, exists := config.HardTenantLimits[userID] + if !exists { + return nil // No hard limits defined for this user + } + + yamlData, err := yaml.Marshal(userHardLimits) + if err != nil { + level.Error(a.logger).Log("msg", "failed to marshal hard limits", "userID", userID, "err", err) + return fmt.Errorf("failed to validate hard limits") + } + + var hardLimitsMap map[string]any + if err := yaml.Unmarshal(yamlData, &hardLimitsMap); err != nil { + level.Error(a.logger).Log("msg", "failed to unmarshal hard limits", "userID", userID, "err", err) + return fmt.Errorf("failed to validate hard limits") + } + + // Validate each override against the user's hard limits + for limitName, value := range overrides { + if hardLimit, exists := hardLimitsMap[limitName]; exists { + if err := a.validateSingleHardLimit(limitName, value, hardLimit); err != nil { + return err + } + } + } + + return nil +} + +// validateSingleHardLimit validates a single limit against its hard limit +func (a *API) validateSingleHardLimit(limitName string, value, hardLimit any) error { + // Convert both values to float64 for comparison + valueFloat, err := convertToFloat64(value) + if err != nil { + return nil // Skip validation for unparseable values + } + + hardLimitFloat, err := convertToFloat64(hardLimit) + if err != nil { + return nil // Skip validation for unparseable hard limits + } + + if valueFloat > hardLimitFloat { + return fmt.Errorf("limit %s exceeds hard limit: %f > %f", limitName, valueFloat, hardLimitFloat) + } + + return nil +} + +// convertToFloat64 converts any value to float64 +func convertToFloat64(v any) (float64, error) { + switch val := v.(type) { + case float64: + return val, nil + case int: + return float64(val), nil + case int64: + return float64(val), nil + case string: + return strconv.ParseFloat(val, 64) + default: + return 0, fmt.Errorf("unsupported type: %T", v) + } +} diff --git a/pkg/overrides/overrides.go b/pkg/overrides/overrides.go new file mode 100644 index 00000000000..47f38e90e9c --- /dev/null +++ b/pkg/overrides/overrides.go @@ -0,0 +1,79 @@ +package overrides + +import ( + "context" + "fmt" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/objstore" + + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/util/runtimeconfig" + "github.com/cortexproject/cortex/pkg/util/services" +) + +const ( + ErrInvalidOverridesConfiguration = "invalid overrides configuration" + ErrFailedToCreateBucketClient = "failed to create bucket client for overrides" +) + +type API struct { + services.Service + cfg runtimeconfig.Config + logger log.Logger + registerer prometheus.Registerer + bucketClient objstore.Bucket + runtimeConfigPath string +} + +func New(cfg runtimeconfig.Config, logger log.Logger, registerer prometheus.Registerer) (*API, error) { + if err := cfg.StorageConfig.Validate(); err != nil { + return nil, fmt.Errorf("%s: %w", ErrInvalidOverridesConfiguration, err) + } + + api := &API{ + cfg: cfg, + logger: logger, + registerer: registerer, + } + api.Service = services.NewBasicService(api.starting, api.running, api.stopping) + return api, nil +} + +func (a *API) starting(ctx context.Context) error { + level.Info(a.logger).Log("msg", "overrides API starting", "runtime_config_file", a.cfg.LoadPath, "backend", a.cfg.StorageConfig.Backend) + + bucketClient, err := bucket.NewClient(ctx, a.cfg.StorageConfig, nil, "overrides", a.logger, a.registerer) + if err != nil { + level.Error(a.logger).Log("msg", ErrFailedToCreateBucketClient, "err", err) + return fmt.Errorf("%s: %w", ErrFailedToCreateBucketClient, err) + } + a.bucketClient = bucketClient + + a.runtimeConfigPath = a.cfg.LoadPath + + level.Info(a.logger).Log("msg", "overrides API started successfully", "backend", a.cfg.StorageConfig.Backend) + return nil +} + +func (a *API) running(ctx context.Context) error { + level.Info(a.logger).Log("msg", "overrides API is now running and ready to handle requests") + + <-ctx.Done() + + level.Info(a.logger).Log("msg", "overrides API received shutdown signal") + return nil +} + +func (a *API) stopping(err error) error { + if err != nil { + level.Error(a.logger).Log("msg", "overrides API stopping due to error", "err", err) + } else { + level.Info(a.logger).Log("msg", "overrides API stopping gracefully") + } + + level.Info(a.logger).Log("msg", "overrides API stopped") + return nil +} diff --git a/pkg/overrides/overrides_test.go b/pkg/overrides/overrides_test.go new file mode 100644 index 00000000000..223fda1eca6 --- /dev/null +++ b/pkg/overrides/overrides_test.go @@ -0,0 +1,608 @@ +package overrides + +import ( + "bytes" + "encoding/json" + "flag" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/bucket/s3" + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/runtimeconfig" + "github.com/cortexproject/cortex/pkg/util/services" +) + +func TestConfig_Validate(t *testing.T) { + t.Parallel() + tests := map[string]struct { + initConfig func(*runtimeconfig.Config) + expected error + }{ + "default config should pass": { + initConfig: func(cfg *runtimeconfig.Config) { + // Set default values for bucket config + flagext.DefaultValues(&cfg.StorageConfig) + }, + expected: nil, + }, + "s3 config should pass": { + initConfig: func(cfg *runtimeconfig.Config) { + cfg.StorageConfig = bucket.Config{ + Backend: bucket.S3, + S3: s3.Config{ + AccessKeyID: "test-access-key", + SecretAccessKey: flagext.Secret{Value: "test-secret-key"}, + BucketName: "test-bucket", + Endpoint: "localhost:9000", + Insecure: true, + }, + } + // Set default values before validation + flagext.DefaultValues(&cfg.StorageConfig.S3) + }, + expected: nil, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + t.Parallel() + cfg := runtimeconfig.Config{} + + testData.initConfig(&cfg) + + if testData.expected == nil { + assert.NoError(t, cfg.StorageConfig.Validate()) + } else { + assert.ErrorIs(t, cfg.StorageConfig.Validate(), testData.expected) + } + }) + } +} + +func TestConfig_RegisterFlags(t *testing.T) { + cfg := runtimeconfig.Config{} + + // Test that flags are registered without panicking + require.NotPanics(t, func() { + flagSet := flag.NewFlagSet("test", flag.PanicOnError) + cfg.RegisterFlags(flagSet) + }) +} + +func TestNew(t *testing.T) { + tests := map[string]struct { + cfg runtimeconfig.Config + expectError bool + }{ + "valid config should create API": { + cfg: func() runtimeconfig.Config { + cfg := runtimeconfig.Config{ + StorageConfig: bucket.Config{ + Backend: bucket.S3, + S3: s3.Config{ + AccessKeyID: "test-access-key", + SecretAccessKey: flagext.Secret{Value: "test-secret-key"}, + BucketName: "test-bucket", + Endpoint: "localhost:9000", + Insecure: true, + }, + }, + } + // Set default values before validation + flagext.DefaultValues(&cfg.StorageConfig.S3) + return cfg + }(), + expectError: false, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + api, err := New(testData.cfg, log.Logger, prometheus.DefaultRegisterer) + + if testData.expectError { + assert.Error(t, err) + assert.Nil(t, api) + } else { + assert.NoError(t, err) + assert.NotNil(t, api) + } + }) + } +} + +func TestOverridesModuleServiceInterface(t *testing.T) { + // Create the API instance with proper configuration + cfg := runtimeconfig.Config{ + StorageConfig: bucket.Config{ + Backend: bucket.S3, + S3: s3.Config{ + AccessKeyID: "test-access-key", + SecretAccessKey: flagext.Secret{Value: "test-secret-key"}, + BucketName: "test-bucket", + Endpoint: "localhost:9000", + Insecure: true, + }, + }, + } + // Set default values before validation + flagext.DefaultValues(&cfg.StorageConfig.S3) + api, err := New(cfg, log.Logger, prometheus.DefaultRegisterer) + require.NoError(t, err) + require.NotNil(t, api) + + // Verify it implements the Service interface + require.Implements(t, (*services.Service)(nil), api) + + // Verify initial state + assert.Equal(t, services.New, api.State()) + + // Verify the service has the expected methods + // This is a basic check that the service was properly constructed + assert.NotNil(t, api.Service) +} + +// TestAPIEndpoints tests the actual HTTP API endpoints +func TestAPIEndpoints(t *testing.T) { + tests := []struct { + name string + method string + path string + tenantID string + requestBody any + expectedStatus int + setupMock func(*bucket.ClientMock) + validateResponse func(*testing.T, *httptest.ResponseRecorder) + }{ + { + name: "GET overrides - no tenant ID", + method: "GET", + path: "/api/v1/user-overrides", + tenantID: "", + expectedStatus: http.StatusUnauthorized, + }, + { + name: "GET overrides - valid tenant ID, no overrides", + method: "GET", + path: "/api/v1/user-overrides", + tenantID: "user123", + expectedStatus: http.StatusOK, + setupMock: func(mock *bucket.ClientMock) { + // Mock that no overrides exist by passing empty content + mock.MockGet("runtime.yaml", "overrides:\n", nil) + }, + validateResponse: func(t *testing.T, recorder *httptest.ResponseRecorder) { + var response map[string]any + err := json.Unmarshal(recorder.Body.Bytes(), &response) + require.NoError(t, err) + assert.Empty(t, response) + }, + }, + { + name: "GET overrides - valid tenant ID, with overrides", + method: "GET", + path: "/api/v1/user-overrides", + tenantID: "user456", + expectedStatus: http.StatusOK, + setupMock: func(mock *bucket.ClientMock) { + overridesData := `overrides: + user456: + ingestion_rate: 5000 + max_global_series_per_user: 100000` + mock.MockGet("runtime.yaml", overridesData, nil) + }, + validateResponse: func(t *testing.T, recorder *httptest.ResponseRecorder) { + var response map[string]any + err := json.Unmarshal(recorder.Body.Bytes(), &response) + require.NoError(t, err) + assert.Equal(t, float64(5000), response["ingestion_rate"]) + assert.Equal(t, float64(100000), response["max_global_series_per_user"]) + }, + }, + { + name: "GET overrides - valid tenant ID, user does not exist", + method: "GET", + path: "/api/v1/user-overrides", + tenantID: "nonexistent_user", + expectedStatus: http.StatusBadRequest, + setupMock: func(mock *bucket.ClientMock) { + // Mock runtime config with different user + overridesData := `overrides: + other_user: + ingestion_rate: 5000` + mock.MockGet("runtime.yaml", overridesData, nil) + }, + }, + { + name: "POST overrides - no tenant ID", + method: "POST", + path: "/api/v1/user-overrides", + tenantID: "", + requestBody: map[string]any{"ingestion_rate": 5000}, + expectedStatus: http.StatusUnauthorized, + }, + { + name: "POST overrides - valid tenant ID, valid overrides", + method: "POST", + path: "/api/v1/user-overrides", + tenantID: "user789", + requestBody: map[string]any{"ingestion_rate": 5000, "ruler_max_rules_per_rule_group": 10}, + expectedStatus: http.StatusOK, + setupMock: func(mock *bucket.ClientMock) { + // Mock runtime config with allowed limits + runtimeConfig := `overrides: + user789: + ingestion_rate: 5000 + ruler_max_rules_per_rule_group: 10 +api_allowed_limits: + - ingestion_rate + - ruler_max_rules_per_rule_group + - max_global_series_per_user + - max_global_series_per_metric + - ingestion_burst_size + - ruler_max_rule_groups_per_tenant` + // Mock both reads: one for getAllowedLimitsFromBucket, one for setOverridesToBucket + mock.MockGet("runtime.yaml", runtimeConfig, nil) + mock.MockGet("runtime.yaml", runtimeConfig, nil) + mock.MockUpload("runtime.yaml", nil) + }, + }, + { + name: "POST overrides - invalid limit name", + method: "POST", + path: "/api/v1/user-overrides", + tenantID: "user999", + requestBody: map[string]any{"invalid_limit": 5000}, + expectedStatus: http.StatusBadRequest, + setupMock: func(mock *bucket.ClientMock) { + // Mock runtime config with allowed limits (invalid_limit not included) + runtimeConfig := `api_allowed_limits: + - ingestion_rate + - ruler_max_rules_per_rule_group + - max_global_series_per_user + - max_global_series_per_metric + - ingestion_burst_size + - ruler_max_rule_groups_per_tenant` + mock.MockGet("runtime.yaml", runtimeConfig, nil) + }, + }, + + { + name: "POST overrides - invalid JSON", + method: "POST", + path: "/api/v1/user-overrides", + tenantID: "user999", + requestBody: "invalid json", + expectedStatus: http.StatusBadRequest, + }, + { + name: "POST overrides - exceeding hard limit from runtime config", + method: "POST", + path: "/api/v1/user-overrides", + tenantID: "user999", + requestBody: map[string]any{"ingestion_rate": 1500000}, // Exceeds hard limit of 1000000 + expectedStatus: http.StatusBadRequest, + setupMock: func(mock *bucket.ClientMock) { + // Mock runtime config with per-user hard limits and allowed limits + runtimeConfig := `overrides: + user999: + ingestion_rate: 1000 +hard_overrides: + user999: + ingestion_rate: 1000000 + max_global_series_per_user: 5000000 +api_allowed_limits: + - ingestion_rate + - max_global_series_per_user` + // Mock all reads: one for getAllowedLimitsFromBucket, one for validateHardLimits, one for setOverridesToBucket + mock.MockGet("runtime.yaml", runtimeConfig, nil) + mock.MockGet("runtime.yaml", runtimeConfig, nil) + mock.MockGet("runtime.yaml", runtimeConfig, nil) + mock.MockUpload("runtime.yaml", nil) + }, + }, + { + name: "DELETE overrides - no tenant ID", + method: "DELETE", + path: "/api/v1/user-overrides", + tenantID: "", + expectedStatus: http.StatusUnauthorized, + }, + { + name: "DELETE overrides - valid tenant ID", + method: "DELETE", + path: "/api/v1/user-overrides", + tenantID: "user123", + expectedStatus: http.StatusOK, + setupMock: func(mock *bucket.ClientMock) { + // First read succeeds, then upload succeeds + mock.MockGet("runtime.yaml", "overrides:\n user123:\n ingestion_rate: 1000", nil) + mock.MockUpload("runtime.yaml", nil) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a mock bucket client + mockBucket := &bucket.ClientMock{} + if tt.setupMock != nil { + tt.setupMock(mockBucket) + } + + // Create the API instance with proper configuration + cfg := runtimeconfig.Config{ + StorageConfig: bucket.Config{ + Backend: bucket.S3, + S3: s3.Config{ + AccessKeyID: "test-access-key", + SecretAccessKey: flagext.Secret{Value: "test-secret-key"}, + BucketName: "test-bucket", + Endpoint: "localhost:9000", + Insecure: true, + }, + }, + } + // Set default values before validation + flagext.DefaultValues(&cfg.StorageConfig.S3) + api, err := New(cfg, log.Logger, prometheus.DefaultRegisterer) + require.NoError(t, err) + require.NotNil(t, api) + + // Manually set the bucket client and runtime config path for testing + api.bucketClient = mockBucket + api.runtimeConfigPath = "runtime.yaml" + + // Create the request + var req *http.Request + if tt.requestBody != nil { + var body []byte + if str, ok := tt.requestBody.(string); ok { + body = []byte(str) + } else { + body, err = json.Marshal(tt.requestBody) + require.NoError(t, err) + } + req = httptest.NewRequest(tt.method, tt.path, bytes.NewReader(body)) + } else { + req = httptest.NewRequest(tt.method, tt.path, nil) + } + + // Add tenant ID header if provided + if tt.tenantID != "" { + req.Header.Set("X-Scope-OrgID", tt.tenantID) + } + + // Create response recorder + recorder := httptest.NewRecorder() + + // Call the appropriate handler based on method + switch tt.method { + case "GET": + api.GetOverrides(recorder, req) + case "POST": + api.SetOverrides(recorder, req) + case "DELETE": + api.DeleteOverrides(recorder, req) + default: + t.Fatalf("Unsupported method: %s", tt.method) + } + + // Assert status code + assert.Equal(t, tt.expectedStatus, recorder.Code) + + // Validate response if validation function provided + if tt.validateResponse != nil { + tt.validateResponse(t, recorder) + } + }) + } +} + +// TestAPITenantExtraction tests tenant ID extraction from various header formats +func TestAPITenantExtraction(t *testing.T) { + tests := []struct { + name string + headers map[string]string + expectedTenant string + expectError bool + setupMock func(*bucket.ClientMock) + }{ + { + name: "X-Scope-OrgID header", + headers: map[string]string{"X-Scope-OrgID": "tenant1"}, + expectedTenant: "tenant1", + expectError: false, + setupMock: func(mock *bucket.ClientMock) { + // Mock successful get with empty overrides + mock.MockGet("runtime.yaml", "overrides:\n", nil) + }, + }, + { + name: "no tenant header", + headers: map[string]string{}, + expectedTenant: "", + expectError: true, + }, + { + name: "empty tenant header", + headers: map[string]string{"X-Scope-OrgID": ""}, + expectedTenant: "", + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a mock bucket client + mockBucket := &bucket.ClientMock{} + if tt.setupMock != nil { + tt.setupMock(mockBucket) + } + + // Create the API instance with proper configuration + cfg := runtimeconfig.Config{ + StorageConfig: bucket.Config{ + Backend: bucket.S3, + S3: s3.Config{ + AccessKeyID: "test-access-key", + SecretAccessKey: flagext.Secret{Value: "test-secret-key"}, + BucketName: "test-bucket", + Endpoint: "localhost:9000", + Insecure: true, + }, + }, + } + // Set default values before validation + flagext.DefaultValues(&cfg.StorageConfig.S3) + api, err := New(cfg, log.Logger, prometheus.DefaultRegisterer) + require.NoError(t, err) + require.NotNil(t, api) + + // Manually set the bucket client and runtime config path for testing + api.bucketClient = mockBucket + api.runtimeConfigPath = "runtime.yaml" + + // Create the request + req := httptest.NewRequest("GET", "/api/v1/user-overrides", nil) + for key, value := range tt.headers { + req.Header.Set(key, value) + } + + // Create response recorder + recorder := httptest.NewRecorder() + + // Call the handler + api.GetOverrides(recorder, req) + + // Assert based on expected behavior + if tt.expectError { + assert.Equal(t, http.StatusUnauthorized, recorder.Code) + } else { + assert.Equal(t, http.StatusOK, recorder.Code) + } + }) + } +} + +// TestAPIBucketErrors tests how the API handles bucket operation errors +func TestAPIBucketErrors(t *testing.T) { + tests := []struct { + name string + method string + tenantID string + setupMock func(*bucket.ClientMock) + expectedStatus int + }{ + { + name: "GET overrides - bucket error returns internal server error", + method: "GET", + tenantID: "user123", + setupMock: func(mock *bucket.ClientMock) { + mock.MockGet("runtime.yaml", "", fmt.Errorf("bucket error")) + }, + expectedStatus: http.StatusInternalServerError, + }, + { + name: "POST overrides - bucket upload error", + method: "POST", + tenantID: "user456", + setupMock: func(mock *bucket.ClientMock) { + // Mock runtime config with allowed limits + runtimeConfig := `overrides: + user456: + ingestion_rate: 1000 +api_allowed_limits: + - ingestion_rate + - max_global_series_per_user` + // First read succeeds (for allowed limits), then upload fails + mock.MockGet("runtime.yaml", runtimeConfig, nil) + mock.MockGet("runtime.yaml", runtimeConfig, nil) + mock.MockUpload("runtime.yaml", fmt.Errorf("upload error")) + }, + expectedStatus: http.StatusInternalServerError, + }, + { + name: "DELETE overrides - bucket delete error", + method: "DELETE", + tenantID: "user789", + setupMock: func(mock *bucket.ClientMock) { + // First read succeeds, then upload fails + mock.MockGet("runtime.yaml", "overrides:\n user789:\n ingestion_rate: 1000", nil) + mock.MockUpload("runtime.yaml", fmt.Errorf("upload error")) + }, + expectedStatus: http.StatusInternalServerError, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a mock bucket client + mockBucket := &bucket.ClientMock{} + tt.setupMock(mockBucket) + + // Create the API instance with proper configuration + cfg := runtimeconfig.Config{ + StorageConfig: bucket.Config{ + Backend: bucket.S3, + S3: s3.Config{ + AccessKeyID: "test-access-key", + SecretAccessKey: flagext.Secret{Value: "test-secret-key"}, + BucketName: "test-bucket", + Endpoint: "localhost:9000", + Insecure: true, + }, + }, + } + // Set default values before validation + flagext.DefaultValues(&cfg.StorageConfig.S3) + api, err := New(cfg, log.Logger, prometheus.DefaultRegisterer) + require.NoError(t, err) + require.NotNil(t, api) + + // Manually set the bucket client and runtime config path for testing + api.bucketClient = mockBucket + api.runtimeConfigPath = "runtime.yaml" + + // Create the request + var req *http.Request + if tt.method == "POST" { + requestBody := map[string]any{"ingestion_rate": 5000} + body, err := json.Marshal(requestBody) + require.NoError(t, err) + req = httptest.NewRequest(tt.method, "/api/v1/user-overrides", bytes.NewReader(body)) + } else { + req = httptest.NewRequest(tt.method, "/api/v1/user-overrides", nil) + } + + // Add tenant ID header + req.Header.Set("X-Scope-OrgID", tt.tenantID) + + // Create response recorder + recorder := httptest.NewRecorder() + + // Call the appropriate handler + switch tt.method { + case "GET": + api.GetOverrides(recorder, req) + case "POST": + api.SetOverrides(recorder, req) + case "DELETE": + api.DeleteOverrides(recorder, req) + } + + // Assert status code + assert.Equal(t, tt.expectedStatus, recorder.Code) + }) + } +} diff --git a/pkg/util/runtimeconfig/types.go b/pkg/util/runtimeconfig/types.go new file mode 100644 index 00000000000..19285d77c73 --- /dev/null +++ b/pkg/util/runtimeconfig/types.go @@ -0,0 +1,21 @@ +package runtimeconfig + +import ( + "github.com/cortexproject/cortex/pkg/ingester" + "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +type RuntimeConfigValues struct { + TenantLimits map[string]*validation.Limits `yaml:"overrides"` + + Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"` + + IngesterChunkStreaming *bool `yaml:"ingester_stream_chunks_when_using_blocks"` + + IngesterLimits *ingester.InstanceLimits `yaml:"ingester_limits"` + + HardTenantLimits map[string]*validation.Limits `yaml:"hard_overrides,omitempty"` + + APIAllowedLimits []string `yaml:"api_allowed_limits,omitempty"` +}