From 1d2cd49f91f41de71275e6f545f0ff7952cad162 Mon Sep 17 00:00:00 2001
From: Rachel Chen
Date: Sat, 1 Nov 2025 18:49:58 -0700
Subject: [PATCH 1/4] idk

---
 .../v1/test_trace_item_attribute_values_v1.py | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/tests/web/rpc/v1/test_trace_item_attribute_values_v1.py b/tests/web/rpc/v1/test_trace_item_attribute_values_v1.py
index 2e226281feb..f02088b2ac8 100644
--- a/tests/web/rpc/v1/test_trace_item_attribute_values_v1.py
+++ b/tests/web/rpc/v1/test_trace_item_attribute_values_v1.py
@@ -108,6 +108,12 @@ def setup_teardown(clickhouse_db: None, redis_db: None) -> Generator[List[bytes]
                 "sentry.transaction": AnyValue(string_value="*foo"),
             },
         ),
+        gen_item_message(
+            start_timestamp=start_timestamp,
+            attributes={
+                "metric.questions.6._id": AnyValue(string_value="jlfsj"),
+            },
+        ),
     ]
     write_raw_unprocessed_events(items_storage, messages)  # type: ignore
     yield messages
@@ -181,3 +187,13 @@ def test_item_id_substring_match(self, setup_teardown: List[bytes]) -> None:
         )
         res = AttributeValuesRequest().execute(req)
         assert res.values == [item_id]
+
+    def test_attribute_names_with_dots(self, setup_teardown: Any) -> None:
+        message = TraceItemAttributeValuesRequest(
+            meta=COMMON_META,
+            limit=10,
+            key=AttributeKey(name="metric.questions.6._id", type=AttributeKey.TYPE_STRING),
+        )
+        response = AttributeValuesRequest().execute(message)
+        assert len(response.values) == 1
+        assert set(response.values) == {"jlfsj"}

From f85a64457b9e4f60c377cc3aa59253d644595c51 Mon Sep 17 00:00:00 2001
From: Rachel Chen
Date: Sun, 9 Nov 2025 15:32:27 -0800
Subject: [PATCH 2/4] feat(cbrs): load based routing strategy

---
 .../routing_strategies/load_based.py          |  62 ++++++++++
 .../v1/routing_strategies/test_load_based.py  | 111 ++++++++++++++++++
 2 files changed, 173 insertions(+)
 create mode 100644 snuba/web/rpc/storage_routing/routing_strategies/load_based.py
 create mode 100644 tests/web/rpc/v1/routing_strategies/test_load_based.py

diff --git a/snuba/web/rpc/storage_routing/routing_strategies/load_based.py b/snuba/web/rpc/storage_routing/routing_strategies/load_based.py
new file mode 100644
index 00000000000..7a680cde973
--- /dev/null
+++ b/snuba/web/rpc/storage_routing/routing_strategies/load_based.py
@@ -0,0 +1,62 @@
+import sentry_sdk
+
+from snuba.configs.configuration import Configuration
+from snuba.web.rpc.storage_routing.routing_strategies.outcomes_based import (
+    OutcomesBasedRoutingStrategy,
+)
+from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import (
+    RoutingDecision,
+)
+
+
+class LoadBasedRoutingStrategy(OutcomesBasedRoutingStrategy):
+    """
+    If cluster load is under a threshold, ignore recommendations and allow the query to pass through with the tier decided based on outcomes-based routing.
+    """
+
+    def _additional_config_definitions(self) -> list[Configuration]:
+        return [
+            Configuration(
+                name="pass_through_load_percentage",
+                description="If cluster load is below this percentage, allow the query to run regardless of allocation policies",
+                value_type=int,
+                default=20,
+            ),
+            Configuration(
+                name="pass_through_max_threads",
+                description="Max threads to use when allowing the query to pass through under low load",
+                value_type=int,
+                default=10,
+            ),
+        ]
+
+    def _update_routing_decision(
+        self,
+        routing_decision: RoutingDecision,
+    ) -> None:
+        super()._update_routing_decision(routing_decision)
+
+        load_info = routing_decision.routing_context.cluster_load_info
+        if load_info is None:
+            return
+
+        pass_through_threshold = int(self.get_config_value("pass_through_load_percentage"))
+        pass_through_max_threads = int(self.get_config_value("pass_through_max_threads"))
+
+        if load_info.cluster_load < pass_through_threshold:
+            routing_decision.can_run = True
+            routing_decision.is_throttled = False
+            routing_decision.clickhouse_settings["max_threads"] = pass_through_max_threads
+            routing_decision.routing_context.extra_info["load_based_pass_through"] = {
+                "cluster_load": load_info.cluster_load,
+                "threshold": pass_through_threshold,
+                "max_threads": pass_through_max_threads,
+            }
+            sentry_sdk.update_current_span(  # pyright: ignore[reportUndefinedVariable]
+                attributes={
+                    "load_based_pass_through": True,
+                    "cluster_load": load_info.cluster_load,
+                    "pass_through_threshold": pass_through_threshold,
+                    "pass_through_max_threads": pass_through_max_threads,
+                }
+            )
diff --git a/tests/web/rpc/v1/routing_strategies/test_load_based.py b/tests/web/rpc/v1/routing_strategies/test_load_based.py
new file mode 100644
index 00000000000..1c08f90f72a
--- /dev/null
+++ b/tests/web/rpc/v1/routing_strategies/test_load_based.py
@@ -0,0 +1,111 @@
+import uuid
+from datetime import UTC, datetime, timedelta
+from unittest.mock import patch
+
+import pytest
+from google.protobuf.timestamp_pb2 import Timestamp
+from sentry_protos.snuba.v1.downsampled_storage_pb2 import DownsampledStorageConfig
+from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import TraceItemTableRequest
+from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta, TraceItemType
+
+from snuba.configs.configuration import Configuration, ResourceIdentifier
+from snuba.datasets.storages.storage_key import StorageKey
+from snuba.query.allocation_policies import (
+    MAX_THRESHOLD,
+    NO_SUGGESTION,
+    NO_UNITS,
+    AllocationPolicy,
+    QueryResultOrError,
+    QuotaAllowance,
+)
+from snuba.utils.metrics.timer import Timer
+from snuba.web.rpc.storage_routing.load_retriever import LoadInfo
+from snuba.web.rpc.storage_routing.routing_strategies.load_based import (
+    LoadBasedRoutingStrategy,
+)
+from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import (
+    BaseRoutingStrategy,
+    RoutingContext,
+)
+
+BASE_TIME = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0)
+_PROJECT_ID = 1
+_ORG_ID = 1
+
+
+def _get_request_meta(hour_interval: int = 1) -> RequestMeta:
+    start = BASE_TIME - timedelta(hours=hour_interval)
+    end = BASE_TIME
+    return RequestMeta(
+        project_ids=[_PROJECT_ID],
+        organization_id=_ORG_ID,
+        cogs_category="something",
+        referrer="something",
+        start_timestamp=Timestamp(seconds=int(start.timestamp())),
+        end_timestamp=Timestamp(seconds=int(end.timestamp())),
+        trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
+        downsampled_storage_config=DownsampledStorageConfig(
+            mode=DownsampledStorageConfig.MODE_NORMAL
+        ),
+    )
+
+
+@pytest.mark.clickhouse_db
+@pytest.mark.redis_db
+def test_load_based_routing_pass_through_even_if_policies_reject() -> None:
+    # policy that always rejects (can_run=False)
+    class RejectAllPolicy(AllocationPolicy):
+        def _additional_config_definitions(self) -> list[Configuration]:
+            return []
+
+        def _get_quota_allowance(
+            self, tenant_ids: dict[str, str | int], query_id: str
+        ) -> QuotaAllowance:
+            return QuotaAllowance(
+                can_run=False,
+                max_threads=0,
+                explanation={"reason": "reject all"},
+                is_throttled=True,
+                throttle_threshold=MAX_THRESHOLD,
+                rejection_threshold=MAX_THRESHOLD,
+                quota_used=0,
+                quota_unit=NO_UNITS,
+                suggestion=NO_SUGGESTION,
+            )
+
+        def _update_quota_balance(
+            self,
+            tenant_ids: dict[str, str | int],
+            query_id: str,
+            result_or_error: QueryResultOrError,
+        ) -> None:
+            return
+
+    strategy = LoadBasedRoutingStrategy()
+    request = TraceItemTableRequest(meta=_get_request_meta(hour_interval=1))
+    context = RoutingContext(
+        in_msg=request,
+        timer=Timer("test"),
+        query_id=uuid.uuid4().hex,
+    )
+
+    with patch.object(
+        BaseRoutingStrategy,
+        "get_allocation_policies",
+        return_value=[
+            RejectAllPolicy(ResourceIdentifier(StorageKey("doesntmatter")), ["org_id"], {})
+        ],
+    ):
+        with patch(
+            "snuba.web.rpc.storage_routing.routing_strategies.storage_routing.get_cluster_loadinfo",
+            return_value=LoadInfo(cluster_load=5.0, concurrent_queries=1),
+        ):
+            routing_decision = strategy.get_routing_decision(context)
+
+    assert routing_decision.can_run is True
+    assert routing_decision.clickhouse_settings.get("max_threads") == 10
+    assert "load_based_pass_through" in routing_decision.routing_context.extra_info
+    assert (
+        routing_decision.routing_context.extra_info["load_based_pass_through"]["cluster_load"]
+        == 5.0
+    )

From 300ca779e44863a38eb2f6cd80f8feccf48a1693 Mon Sep 17 00:00:00 2001
From: Rachel Chen
Date: Sun, 9 Nov 2025 15:37:57 -0800
Subject: [PATCH 3/4] revert file

---
 .../v1/test_trace_item_attribute_values_v1.py | 16 ----------------
 1 file changed, 16 deletions(-)

diff --git a/tests/web/rpc/v1/test_trace_item_attribute_values_v1.py b/tests/web/rpc/v1/test_trace_item_attribute_values_v1.py
index f02088b2ac8..2e226281feb 100644
--- a/tests/web/rpc/v1/test_trace_item_attribute_values_v1.py
+++ b/tests/web/rpc/v1/test_trace_item_attribute_values_v1.py
@@ -108,12 +108,6 @@ def setup_teardown(clickhouse_db: None, redis_db: None) -> Generator[List[bytes]
                 "sentry.transaction": AnyValue(string_value="*foo"),
             },
         ),
-        gen_item_message(
-            start_timestamp=start_timestamp,
-            attributes={
-                "metric.questions.6._id": AnyValue(string_value="jlfsj"),
-            },
-        ),
     ]
     write_raw_unprocessed_events(items_storage, messages)  # type: ignore
     yield messages
@@ -187,13 +181,3 @@ def test_item_id_substring_match(self, setup_teardown: List[bytes]) -> None:
         )
         res = AttributeValuesRequest().execute(req)
         assert res.values == [item_id]
-
-    def test_attribute_names_with_dots(self, setup_teardown: Any) -> None:
-        message = TraceItemAttributeValuesRequest(
-            meta=COMMON_META,
-            limit=10,
-            key=AttributeKey(name="metric.questions.6._id", type=AttributeKey.TYPE_STRING),
-        )
-        response = AttributeValuesRequest().execute(message)
-        assert len(response.values) == 1
-        assert set(response.values) == {"jlfsj"}

From 0c56783188eac32f4e4b97eef8b7e4d476e8e7f6 Mon Sep 17 00:00:00 2001
From: Rachel Chen
Date: Sun, 9 Nov 2025 22:01:36 -0800
Subject: [PATCH 4/4] fix

---
 .../rpc/storage_routing/routing_strategies/load_based.py | 1 -
 tests/web/rpc/v1/routing_strategies/test_load_based.py   | 6 ++----
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/snuba/web/rpc/storage_routing/routing_strategies/load_based.py b/snuba/web/rpc/storage_routing/routing_strategies/load_based.py
index 7a680cde973..a967604cee8 100644
--- a/snuba/web/rpc/storage_routing/routing_strategies/load_based.py
+++ b/snuba/web/rpc/storage_routing/routing_strategies/load_based.py
@@ -48,7 +48,6 @@ def _update_routing_decision(
             routing_decision.is_throttled = False
             routing_decision.clickhouse_settings["max_threads"] = pass_through_max_threads
             routing_decision.routing_context.extra_info["load_based_pass_through"] = {
-                "cluster_load": load_info.cluster_load,
                 "threshold": pass_through_threshold,
                 "max_threads": pass_through_max_threads,
             }
diff --git a/tests/web/rpc/v1/routing_strategies/test_load_based.py b/tests/web/rpc/v1/routing_strategies/test_load_based.py
index 1c08f90f72a..80bcd6a5c96 100644
--- a/tests/web/rpc/v1/routing_strategies/test_load_based.py
+++ b/tests/web/rpc/v1/routing_strategies/test_load_based.py
@@ -105,7 +105,5 @@ def _update_quota_balance(
     assert routing_decision.can_run is True
     assert routing_decision.clickhouse_settings.get("max_threads") == 10
     assert "load_based_pass_through" in routing_decision.routing_context.extra_info
-    assert (
-        routing_decision.routing_context.extra_info["load_based_pass_through"]["cluster_load"]
-        == 5.0
-    )
+    assert routing_decision.routing_context.cluster_load_info is not None
+    assert routing_decision.routing_context.cluster_load_info.cluster_load == 5.0
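
The pass-through rule introduced in PATCH 2/4 and trimmed in PATCH 4/4 can be read in isolation as a small decision function. Below is a minimal sketch, assuming simplified stand-ins for LoadInfo and RoutingDecision (not the real classes from snuba.web.rpc.storage_routing); the 20% load threshold and 10-thread cap mirror the Configuration defaults in the patch.

# Illustrative sketch only: LoadInfo and RoutingDecision here are simplified
# stand-ins for the Snuba types touched by this series, not the real classes.
from dataclasses import dataclass, field
from typing import Any


@dataclass
class LoadInfo:
    cluster_load: float  # current cluster load as a percentage
    concurrent_queries: int


@dataclass
class RoutingDecision:
    can_run: bool
    is_throttled: bool
    clickhouse_settings: dict[str, Any] = field(default_factory=dict)
    extra_info: dict[str, Any] = field(default_factory=dict)


def apply_load_based_pass_through(
    decision: RoutingDecision,
    load_info: LoadInfo | None,
    pass_through_load_percentage: int = 20,
    pass_through_max_threads: int = 10,
) -> RoutingDecision:
    """Under low cluster load, override rejections/throttles from earlier routing."""
    if load_info is None:
        # No load data available: keep whatever the outcomes-based routing decided.
        return decision
    if load_info.cluster_load < pass_through_load_percentage:
        decision.can_run = True
        decision.is_throttled = False
        decision.clickhouse_settings["max_threads"] = pass_through_max_threads
        decision.extra_info["load_based_pass_through"] = {
            "threshold": pass_through_load_percentage,
            "max_threads": pass_through_max_threads,
        }
    return decision


if __name__ == "__main__":
    # A query rejected by allocation policies still runs when load is at 5%.
    rejected = RoutingDecision(can_run=False, is_throttled=True)
    result = apply_load_based_pass_through(
        rejected, LoadInfo(cluster_load=5.0, concurrent_queries=1)
    )
    assert result.can_run and not result.is_throttled
    assert result.clickhouse_settings["max_threads"] == 10

After PATCH 4/4, the extra_info bookkeeping deliberately omits cluster_load, since the same value is already available on routing_context.cluster_load_info, which is what the updated test asserts against.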