From 926a01503fc7e430ca86bb7f262d293893465265 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 24 Nov 2025 13:07:48 -0800 Subject: [PATCH 01/16] feat(eap): Allow arrays to be queried from EAP --- snuba/web/rpc/v1/endpoint_get_trace.py | 30 ++++++++++++++ .../configuration/test_storage_loader.py | 12 ++++++ tests/web/rpc/v1/test_endpoint_get_trace.py | 40 ++++++++++++++----- tests/web/rpc/v1/test_utils.py | 35 ++++++++++++---- 4 files changed, 98 insertions(+), 19 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_get_trace.py b/snuba/web/rpc/v1/endpoint_get_trace.py index 5269af2b016..792b1045f22 100644 --- a/snuba/web/rpc/v1/endpoint_get_trace.py +++ b/snuba/web/rpc/v1/endpoint_get_trace.py @@ -1,3 +1,4 @@ +import json import random import uuid from datetime import datetime @@ -58,6 +59,7 @@ "project_id", "trace_id", "sampling_factor", + "attributes_bool", ] APPLY_FINAL_ROLLOUT_PERCENTAGE_CONFIG_KEY = "EndpointGetTrace.apply_final_rollout_percentage" @@ -219,6 +221,14 @@ def _build_query( tuple(column(f"attributes_float_{i}") for i in range(40)), ), ), + SelectedExpression( + name=("attributes_array"), + expression=FunctionCall( + "attributes_array", + "toJSONString", + (column("attributes_array"),), + ), + ), ] selected_columns.extend( map( @@ -418,6 +428,21 @@ def _value_to_attribute(key: str, value: Any) -> tuple[AttributeKey, AttributeVa ) +def _transform_array_value(value: dict[str, str]) -> Any: + for t, v in value.items(): + if t == "Int": + return int(v) + return v + + +def _process_arrays(raw: str) -> dict[str, list[Any]]: + parsed = json.loads(raw) + arrays = {} + for key, values in parsed.items(): + arrays[key] = [_transform_array_value(v) for v in values] + return arrays + + def _process_results( data: Iterable[Dict[str, Any]], ) -> ProcessedResults: @@ -433,6 +458,7 @@ def _process_results( for row in data: id = row.pop("id") ts = row.pop("timestamp") + arrays = row.pop("attributes_array") last_seen_timestamp_precise = float(ts) last_seen_id = id @@ -459,6 +485,10 @@ def add_attribute(key: str, value: Any) -> None: else: add_attribute(key, value) + attributes_array = _process_arrays(arrays) + for key, value in attributes_array.items(): + add_attribute(k, v) + item = GetTraceResponse.Item( id=id, timestamp=timestamp, diff --git a/tests/datasets/configuration/test_storage_loader.py b/tests/datasets/configuration/test_storage_loader.py index d7d88588348..28afffd0f97 100644 --- a/tests/datasets/configuration/test_storage_loader.py +++ b/tests/datasets/configuration/test_storage_loader.py @@ -5,6 +5,7 @@ from typing import Any from snuba.clickhouse.columns import ( + JSON, Array, Bool, Column, @@ -196,6 +197,13 @@ def test_column_parser(self) -> None: "schema_modifiers": ["nullable"], }, }, + { + "name": "json_col", + "type": "JSON", + "args": { + "max_dynamic_paths": 128, + }, + }, ] expected_python_columns = [ @@ -222,6 +230,10 @@ def test_column_parser(self) -> None: SchemaModifiers(nullable=True, readonly=False), ), ), + Column( + "json_col", + JSON(max_dynamic_paths=128), + ), ] assert parse_columns(serialized_columns) == expected_python_columns diff --git a/tests/web/rpc/v1/test_endpoint_get_trace.py b/tests/web/rpc/v1/test_endpoint_get_trace.py index 788b6849abd..4b2af1251af 100644 --- a/tests/web/rpc/v1/test_endpoint_get_trace.py +++ b/tests/web/rpc/v1/test_endpoint_get_trace.py @@ -21,7 +21,11 @@ ResponseMeta, TraceItemType, ) -from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( + Array, + AttributeKey, + AttributeValue, +) from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem from snuba import state @@ -79,14 +83,14 @@ for i in range(10) ] - -_PROTOBUF_TO_SENTRY_PROTOS = { +_PROTOBUF_TO_SENTRY_PROTOS: dict[str, tuple[str, AttributeKey.Type.ValueType]] = { "string_value": ("val_str", AttributeKey.Type.TYPE_STRING), "double_value": ("val_double", AttributeKey.Type.TYPE_DOUBLE), # we store integers as double "int_value": ("val_double", AttributeKey.Type.TYPE_DOUBLE), # we store boolean as double "bool_value": ("val_double", AttributeKey.Type.TYPE_DOUBLE), + "array_value": ("val_array", AttributeKey.Type.TYPE_ARRAY), } @@ -111,18 +115,29 @@ def get_attributes( value=attribute_value, ) ) - for key, value in span.attributes.items(): + + def _convert_to_attribute_value(value: AnyValue) -> AttributeValue: value_type = value.WhichOneof("value") if value_type: - attribute_key = AttributeKey( - name=key, - type=_PROTOBUF_TO_SENTRY_PROTOS[value_type][1], - ) - args = {_PROTOBUF_TO_SENTRY_PROTOS[value_type][0]: getattr(value, value_type)} + arg_name = _PROTOBUF_TO_SENTRY_PROTOS[str(value_type)][0] + arg_value = getattr(value, value_type) + if value_type == "array_value": + arg_value = Array(values=[_convert_to_attribute_value(v) for v in arg_value.values]) + args = {arg_name: arg_value} else: - continue + args = {"is_null": True} + + return AttributeValue(**args) - attribute_value = AttributeValue(**args) + for key, value in span.attributes.items(): + value_type = value.WhichOneof("value") + if not value_type: + continue + attribute_key = AttributeKey( + name=key, + type=_PROTOBUF_TO_SENTRY_PROTOS[str(value_type)][1], + ) + attribute_value = _convert_to_attribute_value(value) attributes.append( GetTraceResponse.Item.Attribute( key=attribute_key, @@ -134,7 +149,10 @@ def get_attributes( @pytest.fixture(autouse=False) def setup_teardown(clickhouse_db: None, redis_db: None) -> None: + state.set_config("eap_items_consumer_insert_arrays", "1") + items_storage = get_storage(StorageKey("eap_items")) + write_raw_unprocessed_events(items_storage, _SPANS) # type: ignore write_raw_unprocessed_events(items_storage, _LOGS) # type: ignore diff --git a/tests/web/rpc/v1/test_utils.py b/tests/web/rpc/v1/test_utils.py index 29b65f372ee..f7cc86c6db9 100644 --- a/tests/web/rpc/v1/test_utils.py +++ b/tests/web/rpc/v1/test_utils.py @@ -11,7 +11,7 @@ OrFilter, TraceItemFilter, ) -from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem +from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, ArrayValue, TraceItem from snuba.datasets.storages.factory import get_storage from snuba.datasets.storages.storage_key import StorageKey @@ -69,6 +69,17 @@ "transaction.method": AnyValue(string_value="POST"), "transaction.op": AnyValue(string_value="http.server"), "user": AnyValue(string_value="ip:127.0.0.1"), + "i_am_an_array": AnyValue( + array_value=ArrayValue( + values=[ + AnyValue(int_value=1), + AnyValue(int_value=2), + AnyValue(int_value=3), + AnyValue(int_value=4), + AnyValue(int_value=5), + ] + ) + ), } @@ -89,16 +100,24 @@ def write_eap_item( count: the number of these spans to write. """ - attributes: dict[str, AnyValue] = {} - for key, value in raw_attributes.items(): + def convert_attribute_value(v: Any) -> AnyValue: if isinstance(value, str): - attributes[key] = AnyValue(string_value=value) + return AnyValue(string_value=value) elif isinstance(value, int): - attributes[key] = AnyValue(int_value=value) + return AnyValue(int_value=value) elif isinstance(value, bool): - attributes[key] = AnyValue(bool_value=value) - else: - attributes[key] = AnyValue(double_value=value) + return AnyValue(bool_value=value) + elif isinstance(value, float): + return AnyValue(double_value=value) + elif isinstance(value, list): + return AnyValue( + array_value=ArrayValue(values=[convert_attribute_value(v) for v in value]) + ) + return AnyValue() + + attributes: dict[str, AnyValue] = {} + for key, value in raw_attributes.items(): + attributes[key] = convert_attribute_value(value) write_raw_unprocessed_events( get_storage(StorageKey("eap_items")), # type: ignore From d23fcabd41fbfdd4d76d2967694c2efbb5852687 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 24 Nov 2025 14:36:12 -0800 Subject: [PATCH 02/16] Rename variables --- snuba/web/rpc/v1/endpoint_get_trace.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_get_trace.py b/snuba/web/rpc/v1/endpoint_get_trace.py index 792b1045f22..11447f071d4 100644 --- a/snuba/web/rpc/v1/endpoint_get_trace.py +++ b/snuba/web/rpc/v1/endpoint_get_trace.py @@ -478,16 +478,16 @@ def add_attribute(key: str, value: Any) -> None: ) ) - for key, value in row.items(): - if isinstance(value, dict): - for k, v in value.items(): - add_attribute(k, v) + for row_key, row_value in row.items(): + if isinstance(row_value, dict): + for column_key, column_value in row_value.items(): + add_attribute(column_key, column_value) else: - add_attribute(key, value) + add_attribute(row_key, row_value) attributes_array = _process_arrays(arrays) - for key, value in attributes_array.items(): - add_attribute(k, v) + for array_key, array_value in attributes_array.items(): + add_attribute(array_key, array_value) item = GetTraceResponse.Item( id=id, From c8ef22d509bc23aa96b7c2f2840bec9c97e4c355 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 24 Nov 2025 14:37:46 -0800 Subject: [PATCH 03/16] Fix parameter name --- tests/web/rpc/v1/test_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/web/rpc/v1/test_utils.py b/tests/web/rpc/v1/test_utils.py index f7cc86c6db9..760a867267c 100644 --- a/tests/web/rpc/v1/test_utils.py +++ b/tests/web/rpc/v1/test_utils.py @@ -100,7 +100,7 @@ def write_eap_item( count: the number of these spans to write. """ - def convert_attribute_value(v: Any) -> AnyValue: + def convert_attribute_value(value: Any) -> AnyValue: if isinstance(value, str): return AnyValue(string_value=value) elif isinstance(value, int): From 30cfed576be6365cae585a9c37d4d7c7eb19944e Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 24 Nov 2025 16:07:31 -0800 Subject: [PATCH 04/16] Use proper types for booleans and integers --- snuba/web/rpc/v1/endpoint_get_trace.py | 91 +++++++++++++++------ tests/web/rpc/v1/test_endpoint_get_trace.py | 6 +- 2 files changed, 70 insertions(+), 27 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_get_trace.py b/snuba/web/rpc/v1/endpoint_get_trace.py index 11447f071d4..2c488c957c5 100644 --- a/snuba/web/rpc/v1/endpoint_get_trace.py +++ b/snuba/web/rpc/v1/endpoint_get_trace.py @@ -13,7 +13,11 @@ GetTraceResponse, ) from sentry_protos.snuba.v1.request_common_pb2 import PageToken, TraceItemType -from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( + Array, + AttributeKey, + AttributeValue, +) from sentry_protos.snuba.v1.trace_item_filter_pb2 import ( AndFilter, ComparisonFilter, @@ -60,6 +64,7 @@ "trace_id", "sampling_factor", "attributes_bool", + "attributes_int", ] APPLY_FINAL_ROLLOUT_PERCENTAGE_CONFIG_KEY = "EndpointGetTrace.apply_final_rollout_percentage" @@ -206,7 +211,7 @@ def _build_query( else: selected_columns += [ SelectedExpression( - name=("attributes_string"), + name="attributes_string", expression=FunctionCall( ("attributes_string"), "mapConcat", @@ -214,7 +219,7 @@ def _build_query( ), ), SelectedExpression( - name=("attributes_float"), + name="attributes_float", expression=FunctionCall( ("attributes_float"), "mapConcat", @@ -222,7 +227,7 @@ def _build_query( ), ), SelectedExpression( - name=("attributes_array"), + name="attributes_array", expression=FunctionCall( "attributes_array", "toJSONString", @@ -373,6 +378,35 @@ def _build_snuba_request( ) +def convert_to_attribute_value(value: Any) -> AttributeValue: + if isinstance(value, int): + return AttributeValue( + val_int=value, + ) + elif isinstance(value, bool): + return AttributeValue( + val_bool=value, + ) + elif isinstance(value, float): + return AttributeValue( + val_double=value, + ) + elif isinstance(value, str): + return AttributeValue( + val_str=value, + ) + elif isinstance(value, list): + return AttributeValue( + val_array=Array(values=[convert_to_attribute_value(v) for v in value]) + ) + elif isinstance(value, datetime): + return AttributeValue( + val_double=value.timestamp(), + ) + else: + raise BadSnubaRPCRequestException(f"data type unknown: {type(value)}") + + def _value_to_attribute(key: str, value: Any) -> tuple[AttributeKey, AttributeValue]: if isinstance(value, int): return ( @@ -380,9 +414,15 @@ def _value_to_attribute(key: str, value: Any) -> tuple[AttributeKey, AttributeVa name=key, type=AttributeKey.Type.TYPE_INT, ), - AttributeValue( - val_int=value, + convert_to_attribute_value(value), + ) + elif isinstance(value, bool): + return ( + AttributeKey( + name=key, + type=AttributeKey.Type.TYPE_BOOLEAN, ), + convert_to_attribute_value(value), ) elif isinstance(value, float): return ( @@ -390,9 +430,7 @@ def _value_to_attribute(key: str, value: Any) -> tuple[AttributeKey, AttributeVa name=key, type=AttributeKey.Type.TYPE_DOUBLE, ), - AttributeValue( - val_double=value, - ), + convert_to_attribute_value(value), ) elif isinstance(value, str): return ( @@ -400,9 +438,12 @@ def _value_to_attribute(key: str, value: Any) -> tuple[AttributeKey, AttributeVa name=key, type=AttributeKey.Type.TYPE_STRING, ), - AttributeValue( - val_str=value, - ), + convert_to_attribute_value(value), + ) + elif isinstance(value, list): + return ( + AttributeKey(name=key, type=AttributeKey.Type.TYPE_ARRAY), + convert_to_attribute_value(value), ) elif isinstance(value, datetime): return ( @@ -410,9 +451,7 @@ def _value_to_attribute(key: str, value: Any) -> tuple[AttributeKey, AttributeVa name=key, type=AttributeKey.Type.TYPE_DOUBLE, ), - AttributeValue( - val_double=value.timestamp(), - ), + convert_to_attribute_value(value), ) else: raise BadSnubaRPCRequestException(f"data type unknown: {type(value)}") @@ -458,7 +497,9 @@ def _process_results( for row in data: id = row.pop("id") ts = row.pop("timestamp") - arrays = row.pop("attributes_array") + arrays = row.pop("attributes_array", "{}") + booleans = row.pop("selected_attributes_bool", {}) + integers = row.pop("selected_attributes_int", {}) last_seen_timestamp_precise = float(ts) last_seen_id = id @@ -467,15 +508,13 @@ def _process_results( # then transform to nanoseconds timestamp.FromNanoseconds(int(ts * 1e6) * 1000) - attributes: list[GetTraceResponse.Item.Attribute] = [] + attributes: dict[str, GetTraceResponse.Item.Attribute] = {} def add_attribute(key: str, value: Any) -> None: attribute_key, attribute_value = _value_to_attribute(key, value) - attributes.append( - GetTraceResponse.Item.Attribute( - key=attribute_key, - value=attribute_value, - ) + attributes[key] = GetTraceResponse.Item.Attribute( + key=attribute_key, + value=attribute_value, ) for row_key, row_value in row.items(): @@ -489,11 +528,17 @@ def add_attribute(key: str, value: Any) -> None: for array_key, array_value in attributes_array.items(): add_attribute(array_key, array_value) + for bool_key, bool_value in booleans.items(): + add_attribute(bool_key, bool_value) + + for int_key, int_value in integers.items(): + add_attribute(int_key, int_value) + item = GetTraceResponse.Item( id=id, timestamp=timestamp, attributes=sorted( - attributes, + attributes.values(), key=attrgetter("key.name"), ), ) diff --git a/tests/web/rpc/v1/test_endpoint_get_trace.py b/tests/web/rpc/v1/test_endpoint_get_trace.py index 4b2af1251af..acefdf94321 100644 --- a/tests/web/rpc/v1/test_endpoint_get_trace.py +++ b/tests/web/rpc/v1/test_endpoint_get_trace.py @@ -86,10 +86,8 @@ _PROTOBUF_TO_SENTRY_PROTOS: dict[str, tuple[str, AttributeKey.Type.ValueType]] = { "string_value": ("val_str", AttributeKey.Type.TYPE_STRING), "double_value": ("val_double", AttributeKey.Type.TYPE_DOUBLE), - # we store integers as double - "int_value": ("val_double", AttributeKey.Type.TYPE_DOUBLE), - # we store boolean as double - "bool_value": ("val_double", AttributeKey.Type.TYPE_DOUBLE), + "int_value": ("val_int", AttributeKey.Type.TYPE_INT), + "bool_value": ("val_bool", AttributeKey.Type.TYPE_BOOLEAN), "array_value": ("val_array", AttributeKey.Type.TYPE_ARRAY), } From 66cc844bfbeb047e348be0aa509bd51e832cc1ec Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 24 Nov 2025 16:29:24 -0800 Subject: [PATCH 05/16] Fix type condition order --- snuba/web/rpc/v1/endpoint_get_trace.py | 23 +++++++++++--------- tests/web/rpc/v1/test_endpoint_get_trace.py | 24 +++++++++------------ 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_get_trace.py b/snuba/web/rpc/v1/endpoint_get_trace.py index 2c488c957c5..7ddafb63a75 100644 --- a/snuba/web/rpc/v1/endpoint_get_trace.py +++ b/snuba/web/rpc/v1/endpoint_get_trace.py @@ -379,13 +379,14 @@ def _build_snuba_request( def convert_to_attribute_value(value: Any) -> AttributeValue: - if isinstance(value, int): + print(value) + if isinstance(value, bool): return AttributeValue( - val_int=value, + val_bool=value, ) - elif isinstance(value, bool): + elif isinstance(value, int): return AttributeValue( - val_bool=value, + val_int=value, ) elif isinstance(value, float): return AttributeValue( @@ -408,19 +409,19 @@ def convert_to_attribute_value(value: Any) -> AttributeValue: def _value_to_attribute(key: str, value: Any) -> tuple[AttributeKey, AttributeValue]: - if isinstance(value, int): + if isinstance(value, bool): return ( AttributeKey( name=key, - type=AttributeKey.Type.TYPE_INT, + type=AttributeKey.Type.TYPE_BOOLEAN, ), convert_to_attribute_value(value), ) - elif isinstance(value, bool): + elif isinstance(value, int): return ( AttributeKey( name=key, - type=AttributeKey.Type.TYPE_BOOLEAN, + type=AttributeKey.Type.TYPE_INT, ), convert_to_attribute_value(value), ) @@ -498,8 +499,10 @@ def _process_results( id = row.pop("id") ts = row.pop("timestamp") arrays = row.pop("attributes_array", "{}") - booleans = row.pop("selected_attributes_bool", {}) - integers = row.pop("selected_attributes_int", {}) + # We want to merge these values after to overwrite potential floats + # with the same name. + booleans = row.pop("attributes_bool", {}) + integers = row.pop("attributes_int", {}) last_seen_timestamp_precise = float(ts) last_seen_id = id diff --git a/tests/web/rpc/v1/test_endpoint_get_trace.py b/tests/web/rpc/v1/test_endpoint_get_trace.py index acefdf94321..530fc688515 100644 --- a/tests/web/rpc/v1/test_endpoint_get_trace.py +++ b/tests/web/rpc/v1/test_endpoint_get_trace.py @@ -95,23 +95,21 @@ def get_attributes( span: TraceItem, ) -> list[GetTraceResponse.Item.Attribute]: - attributes: list[GetTraceResponse.Item.Attribute] = [ - GetTraceResponse.Item.Attribute( + attributes: dict[str, GetTraceResponse.Item.Attribute] = { + "sampling_factor": GetTraceResponse.Item.Attribute( key=AttributeKey( name="sampling_factor", type=AttributeKey.Type.TYPE_DOUBLE, ), value=AttributeValue(val_double=1.0), ), - ] + } for key in {"organization_id", "project_id", "trace_id"}: attribute_key, attribute_value = _value_to_attribute(key, getattr(span, key)) - attributes.append( - GetTraceResponse.Item.Attribute( - key=attribute_key, - value=attribute_value, - ) + attributes[key] = GetTraceResponse.Item.Attribute( + key=attribute_key, + value=attribute_value, ) def _convert_to_attribute_value(value: AnyValue) -> AttributeValue: @@ -136,13 +134,11 @@ def _convert_to_attribute_value(value: AnyValue) -> AttributeValue: type=_PROTOBUF_TO_SENTRY_PROTOS[str(value_type)][1], ) attribute_value = _convert_to_attribute_value(value) - attributes.append( - GetTraceResponse.Item.Attribute( - key=attribute_key, - value=attribute_value, - ) + attributes[key] = GetTraceResponse.Item.Attribute( + key=attribute_key, + value=attribute_value, ) - return attributes + return list(attributes.values()) @pytest.fixture(autouse=False) From 36913e1eed75f12b0aee9e40d070fc019e4cd999 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Wed, 26 Nov 2025 11:40:31 -0800 Subject: [PATCH 06/16] Support all scalar types for arrays --- snuba/web/rpc/v1/endpoint_get_trace.py | 8 +++++++- tests/web/rpc/v1/test_utils.py | 7 +++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_get_trace.py b/snuba/web/rpc/v1/endpoint_get_trace.py index 7ddafb63a75..1e87f3f686b 100644 --- a/snuba/web/rpc/v1/endpoint_get_trace.py +++ b/snuba/web/rpc/v1/endpoint_get_trace.py @@ -472,7 +472,13 @@ def _transform_array_value(value: dict[str, str]) -> Any: for t, v in value.items(): if t == "Int": return int(v) - return v + if t == "Bool": + return bool(v) + if t == "Double": + return float(v) + if t == "String": + return v + raise BadSnubaRPCRequestException(f"array value type unknown: {type(v)}") def _process_arrays(raw: str) -> dict[str, list[Any]]: diff --git a/tests/web/rpc/v1/test_utils.py b/tests/web/rpc/v1/test_utils.py index 760a867267c..0647e795313 100644 --- a/tests/web/rpc/v1/test_utils.py +++ b/tests/web/rpc/v1/test_utils.py @@ -73,10 +73,9 @@ array_value=ArrayValue( values=[ AnyValue(int_value=1), - AnyValue(int_value=2), - AnyValue(int_value=3), - AnyValue(int_value=4), - AnyValue(int_value=5), + AnyValue(bool_value=True), + AnyValue(double_value=3.0), + AnyValue(string_value="blah"), ] ) ), From 4a4deca593491fb7656f0ba1c5bf801436cae0fc Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 28 Nov 2025 17:38:59 -0800 Subject: [PATCH 07/16] Fix array insertion --- rust_snuba/src/processors/eap_items.rs | 20 ++++++++++---------- snuba/web/rpc/v1/endpoint_get_trace.py | 1 - tests/web/rpc/v1/test_endpoint_get_trace.py | 4 ++-- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index 2384bc5173e..d8d5b00b71b 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -106,16 +106,7 @@ impl TryFrom for EAPItem { Some(Value::DoubleValue(double)) => eap_item.attributes.insert_float(key, double), Some(Value::IntValue(int)) => eap_item.attributes.insert_int(key, int), Some(Value::BoolValue(bool)) => eap_item.attributes.insert_bool(key, bool), - Some(Value::ArrayValue(array)) => { - if get_str_config(INSERT_ARRAYS_CONFIG) - .ok() - .flatten() - .unwrap_or("0".to_string()) - == "1" - { - eap_item.attributes.insert_array(key, array) - } - } + Some(Value::ArrayValue(array)) => eap_item.attributes.insert_array(key, array), Some(Value::BytesValue(_)) => (), Some(Value::KvlistValue(_)) => (), None => (), @@ -229,6 +220,15 @@ impl AttributeMap { } pub fn insert_array(&mut self, k: String, v: ArrayValue) { + if get_str_config(INSERT_ARRAYS_CONFIG) + .ok() + .flatten() + .unwrap_or("0".to_string()) + != "1" + { + return; + } + let mut values: Vec = Vec::default(); for value in v.values { match value.value { diff --git a/snuba/web/rpc/v1/endpoint_get_trace.py b/snuba/web/rpc/v1/endpoint_get_trace.py index 1e87f3f686b..a8b52afa342 100644 --- a/snuba/web/rpc/v1/endpoint_get_trace.py +++ b/snuba/web/rpc/v1/endpoint_get_trace.py @@ -379,7 +379,6 @@ def _build_snuba_request( def convert_to_attribute_value(value: Any) -> AttributeValue: - print(value) if isinstance(value, bool): return AttributeValue( val_bool=value, diff --git a/tests/web/rpc/v1/test_endpoint_get_trace.py b/tests/web/rpc/v1/test_endpoint_get_trace.py index 530fc688515..ea7bdfd16a2 100644 --- a/tests/web/rpc/v1/test_endpoint_get_trace.py +++ b/tests/web/rpc/v1/test_endpoint_get_trace.py @@ -143,8 +143,6 @@ def _convert_to_attribute_value(value: AnyValue) -> AttributeValue: @pytest.fixture(autouse=False) def setup_teardown(clickhouse_db: None, redis_db: None) -> None: - state.set_config("eap_items_consumer_insert_arrays", "1") - items_storage = get_storage(StorageKey("eap_items")) write_raw_unprocessed_events(items_storage, _SPANS) # type: ignore @@ -176,6 +174,8 @@ def test_without_data(self) -> None: assert response.status_code == 200, error_proto def test_with_data_all_attributes(self, setup_teardown: Any) -> None: + state.set_config("eap_items_consumer_insert_arrays", "1") + ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) three_hours_later = int((_BASE_TIME + timedelta(hours=3)).timestamp()) message = GetTraceRequest( From 95029734f4ffa2ab33cc7734aae43b39327b6f67 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Sun, 30 Nov 2025 13:23:59 -0800 Subject: [PATCH 08/16] Remove unnecessary type conversion --- snuba/web/rpc/v1/endpoint_get_trace.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_get_trace.py b/snuba/web/rpc/v1/endpoint_get_trace.py index a8b52afa342..506d36bdf7f 100644 --- a/snuba/web/rpc/v1/endpoint_get_trace.py +++ b/snuba/web/rpc/v1/endpoint_get_trace.py @@ -471,11 +471,9 @@ def _transform_array_value(value: dict[str, str]) -> Any: for t, v in value.items(): if t == "Int": return int(v) - if t == "Bool": - return bool(v) if t == "Double": return float(v) - if t == "String": + if t in {"String", "Bool"}: return v raise BadSnubaRPCRequestException(f"array value type unknown: {type(v)}") From 2a02f9115111080de4683097084caeac29dd8fe3 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Sun, 30 Nov 2025 13:24:35 -0800 Subject: [PATCH 09/16] Handle None values better --- snuba/web/rpc/v1/endpoint_get_trace.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_get_trace.py b/snuba/web/rpc/v1/endpoint_get_trace.py index 506d36bdf7f..dac0d910ecf 100644 --- a/snuba/web/rpc/v1/endpoint_get_trace.py +++ b/snuba/web/rpc/v1/endpoint_get_trace.py @@ -501,11 +501,11 @@ def _process_results( for row in data: id = row.pop("id") ts = row.pop("timestamp") - arrays = row.pop("attributes_array", "{}") + arrays = row.pop("attributes_array", "{}") or "{}" # We want to merge these values after to overwrite potential floats # with the same name. - booleans = row.pop("attributes_bool", {}) - integers = row.pop("attributes_int", {}) + booleans = row.pop("attributes_bool", {}) or {} + integers = row.pop("attributes_int", {}) or {} last_seen_timestamp_precise = float(ts) last_seen_id = id From e63a8360fe81b0ef428ba9094c6ed9f5d59038ee Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Sun, 30 Nov 2025 13:24:54 -0800 Subject: [PATCH 10/16] Make sure array insertion is enabled --- tests/web/rpc/v1/test_endpoint_get_trace.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/web/rpc/v1/test_endpoint_get_trace.py b/tests/web/rpc/v1/test_endpoint_get_trace.py index ea7bdfd16a2..20e299e1adc 100644 --- a/tests/web/rpc/v1/test_endpoint_get_trace.py +++ b/tests/web/rpc/v1/test_endpoint_get_trace.py @@ -145,6 +145,8 @@ def _convert_to_attribute_value(value: AnyValue) -> AttributeValue: def setup_teardown(clickhouse_db: None, redis_db: None) -> None: items_storage = get_storage(StorageKey("eap_items")) + state.set_config("eap_items_consumer_insert_arrays", "1") + write_raw_unprocessed_events(items_storage, _SPANS) # type: ignore write_raw_unprocessed_events(items_storage, _LOGS) # type: ignore From 67cd446dc7761ae98b3c857217b4167c3b27d97b Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Sun, 30 Nov 2025 20:00:42 -0800 Subject: [PATCH 11/16] Remove feature flag --- rust_snuba/src/processors/eap_items.rs | 13 ------------- tests/web/rpc/v1/test_endpoint_get_trace.py | 4 ---- 2 files changed, 17 deletions(-) diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index d8d5b00b71b..bda16624860 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -15,10 +15,6 @@ use crate::config::ProcessorConfig; use crate::processors::utils::enforce_retention; use crate::types::{InsertBatch, ItemTypeMetrics, KafkaMessageMetadata}; -use crate::runtime_config::get_str_config; - -const INSERT_ARRAYS_CONFIG: &str = "eap_items_consumer_insert_arrays"; - pub fn process_message( msg: KafkaPayload, _metadata: KafkaMessageMetadata, @@ -220,15 +216,6 @@ impl AttributeMap { } pub fn insert_array(&mut self, k: String, v: ArrayValue) { - if get_str_config(INSERT_ARRAYS_CONFIG) - .ok() - .flatten() - .unwrap_or("0".to_string()) - != "1" - { - return; - } - let mut values: Vec = Vec::default(); for value in v.values { match value.value { diff --git a/tests/web/rpc/v1/test_endpoint_get_trace.py b/tests/web/rpc/v1/test_endpoint_get_trace.py index 20e299e1adc..51036af8174 100644 --- a/tests/web/rpc/v1/test_endpoint_get_trace.py +++ b/tests/web/rpc/v1/test_endpoint_get_trace.py @@ -145,8 +145,6 @@ def _convert_to_attribute_value(value: AnyValue) -> AttributeValue: def setup_teardown(clickhouse_db: None, redis_db: None) -> None: items_storage = get_storage(StorageKey("eap_items")) - state.set_config("eap_items_consumer_insert_arrays", "1") - write_raw_unprocessed_events(items_storage, _SPANS) # type: ignore write_raw_unprocessed_events(items_storage, _LOGS) # type: ignore @@ -176,8 +174,6 @@ def test_without_data(self) -> None: assert response.status_code == 200, error_proto def test_with_data_all_attributes(self, setup_teardown: Any) -> None: - state.set_config("eap_items_consumer_insert_arrays", "1") - ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) three_hours_later = int((_BASE_TIME + timedelta(hours=3)).timestamp()) message = GetTraceRequest( From c6beab6aa3a8c81429a977aef437fe8b1b2286d6 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Sun, 30 Nov 2025 20:02:07 -0800 Subject: [PATCH 12/16] Handle more None cases --- snuba/web/rpc/v1/endpoint_get_trace.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/snuba/web/rpc/v1/endpoint_get_trace.py b/snuba/web/rpc/v1/endpoint_get_trace.py index dac0d910ecf..3ea6f3933ee 100644 --- a/snuba/web/rpc/v1/endpoint_get_trace.py +++ b/snuba/web/rpc/v1/endpoint_get_trace.py @@ -479,7 +479,7 @@ def _transform_array_value(value: dict[str, str]) -> Any: def _process_arrays(raw: str) -> dict[str, list[Any]]: - parsed = json.loads(raw) + parsed = json.loads(raw) or {} arrays = {} for key, values in parsed.items(): arrays[key] = [_transform_array_value(v) for v in values] From 7543ea5c3c4121a28f58bbd30f935d2747e2e7e3 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Sun, 30 Nov 2025 20:41:03 -0800 Subject: [PATCH 13/16] Lint and format Rust code --- rust_snuba/src/processors/eap_items.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index bda16624860..e1ef522c3ca 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -238,7 +238,6 @@ impl AttributeMap { mod tests { use std::time::SystemTime; - use crate::runtime_config::patch_str_config_for_test; use prost_types::Timestamp; use sentry_protos::snuba::v1::any_value::Value; use sentry_protos::snuba::v1::{AnyValue, ArrayValue, TraceItemType}; @@ -440,8 +439,6 @@ mod tests { }, ); - patch_str_config_for_test(INSERT_ARRAYS_CONFIG, Some("1")); - let eap_item = EAPItem::try_from(trace_item); assert!(eap_item.is_ok()); From 11cce906329681e9b1b9986c6c985d409fd45127 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 1 Dec 2025 13:00:17 -0800 Subject: [PATCH 14/16] Add the runtime config again --- rust_snuba/src/processors/eap_items.rs | 25 +++++++++++++++++++-- tests/web/rpc/v1/test_endpoint_get_trace.py | 2 ++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index e1ef522c3ca..f56ec3fd410 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -15,6 +15,10 @@ use crate::config::ProcessorConfig; use crate::processors::utils::enforce_retention; use crate::types::{InsertBatch, ItemTypeMetrics, KafkaMessageMetadata}; +use crate::runtime_config::get_str_config; + +const INSERT_ARRAYS_CONFIG: &str = "eap_items_consumer_insert_arrays"; + pub fn process_message( msg: KafkaPayload, _metadata: KafkaMessageMetadata, @@ -164,8 +168,15 @@ enum EAPValue { seq_attrs! { #[derive(Debug, Default, Serialize)] struct AttributeMap { + #[serde(skip_serializing_if = "HashMap::is_empty")] attributes_bool: HashMap, + + #[serde(skip_serializing_if = "HashMap::is_empty")] attributes_int: HashMap, + + #[serde(skip_serializing_if = "HashMap::is_empty")] + attributes_array: HashMap>, + #( #[serde(skip_serializing_if = "HashMap::is_empty")] attributes_string_~N: HashMap, @@ -173,8 +184,6 @@ struct AttributeMap { #[serde(skip_serializing_if = "HashMap::is_empty")] attributes_float_~N: HashMap, )* - - attributes_array: HashMap>, } } @@ -216,6 +225,15 @@ impl AttributeMap { } pub fn insert_array(&mut self, k: String, v: ArrayValue) { + if get_str_config(INSERT_ARRAYS_CONFIG) + .ok() + .flatten() + .unwrap_or("0".to_string()) + != "1" + { + return; + } + let mut values: Vec = Vec::default(); for value in v.values { match value.value { @@ -238,6 +256,7 @@ impl AttributeMap { mod tests { use std::time::SystemTime; + use crate::runtime_config::patch_str_config_for_test; use prost_types::Timestamp; use sentry_protos::snuba::v1::any_value::Value; use sentry_protos::snuba::v1::{AnyValue, ArrayValue, TraceItemType}; @@ -439,6 +458,8 @@ mod tests { }, ); + patch_str_config_for_test(INSERT_ARRAYS_CONFIG, Some("1")); + let eap_item = EAPItem::try_from(trace_item); assert!(eap_item.is_ok()); diff --git a/tests/web/rpc/v1/test_endpoint_get_trace.py b/tests/web/rpc/v1/test_endpoint_get_trace.py index 51036af8174..36f90a57e43 100644 --- a/tests/web/rpc/v1/test_endpoint_get_trace.py +++ b/tests/web/rpc/v1/test_endpoint_get_trace.py @@ -145,6 +145,8 @@ def _convert_to_attribute_value(value: AnyValue) -> AttributeValue: def setup_teardown(clickhouse_db: None, redis_db: None) -> None: items_storage = get_storage(StorageKey("eap_items")) + state.set_config("eap_items_consumer_insert_arrays", "1") + write_raw_unprocessed_events(items_storage, _SPANS) # type: ignore write_raw_unprocessed_events(items_storage, _LOGS) # type: ignore From 48ebb87563ddd9e4e2c0a7a9e56a351b2e67dacf Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 1 Dec 2025 15:23:46 -0800 Subject: [PATCH 15/16] Remove the runtime config --- rust_snuba/src/processors/eap_items.rs | 17 +---------------- tests/web/rpc/v1/test_endpoint_get_trace.py | 2 -- 2 files changed, 1 insertion(+), 18 deletions(-) diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index f56ec3fd410..fe62c328572 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -15,10 +15,6 @@ use crate::config::ProcessorConfig; use crate::processors::utils::enforce_retention; use crate::types::{InsertBatch, ItemTypeMetrics, KafkaMessageMetadata}; -use crate::runtime_config::get_str_config; - -const INSERT_ARRAYS_CONFIG: &str = "eap_items_consumer_insert_arrays"; - pub fn process_message( msg: KafkaPayload, _metadata: KafkaMessageMetadata, @@ -225,16 +221,8 @@ impl AttributeMap { } pub fn insert_array(&mut self, k: String, v: ArrayValue) { - if get_str_config(INSERT_ARRAYS_CONFIG) - .ok() - .flatten() - .unwrap_or("0".to_string()) - != "1" - { - return; - } - let mut values: Vec = Vec::default(); + for value in v.values { match value.value { Some(Value::StringValue(string)) => values.push(EAPValue::String(string)), @@ -256,7 +244,6 @@ impl AttributeMap { mod tests { use std::time::SystemTime; - use crate::runtime_config::patch_str_config_for_test; use prost_types::Timestamp; use sentry_protos::snuba::v1::any_value::Value; use sentry_protos::snuba::v1::{AnyValue, ArrayValue, TraceItemType}; @@ -458,8 +445,6 @@ mod tests { }, ); - patch_str_config_for_test(INSERT_ARRAYS_CONFIG, Some("1")); - let eap_item = EAPItem::try_from(trace_item); assert!(eap_item.is_ok()); diff --git a/tests/web/rpc/v1/test_endpoint_get_trace.py b/tests/web/rpc/v1/test_endpoint_get_trace.py index 36f90a57e43..51036af8174 100644 --- a/tests/web/rpc/v1/test_endpoint_get_trace.py +++ b/tests/web/rpc/v1/test_endpoint_get_trace.py @@ -145,8 +145,6 @@ def _convert_to_attribute_value(value: AnyValue) -> AttributeValue: def setup_teardown(clickhouse_db: None, redis_db: None) -> None: items_storage = get_storage(StorageKey("eap_items")) - state.set_config("eap_items_consumer_insert_arrays", "1") - write_raw_unprocessed_events(items_storage, _SPANS) # type: ignore write_raw_unprocessed_events(items_storage, _LOGS) # type: ignore From 3267be109a90b50fa4831fe0317eda86752571fb Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 1 Dec 2025 15:42:56 -0800 Subject: [PATCH 16/16] Update snapshots --- ...a-items-EAPItemsProcessor-snuba-items__1__basic.protobuf.snap | 1 - 1 file changed, 1 deletion(-) diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-items-EAPItemsProcessor-snuba-items__1__basic.protobuf.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-items-EAPItemsProcessor-snuba-items__1__basic.protobuf.snap index b5e49f749cc..bf570a6e3c1 100644 --- a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-items-EAPItemsProcessor-snuba-items__1__basic.protobuf.snap +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-items-EAPItemsProcessor-snuba-items__1__basic.protobuf.snap @@ -4,7 +4,6 @@ expression: snapshot_payload --- [ { - "attributes_array": {}, "attributes_bool": { "some_bool": true },