From 65c659d6e31f568d2bb2984f5cd2ed5e1a3bc967 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Tue, 9 Dec 2025 21:22:27 +0100 Subject: [PATCH 1/3] fix(spanv2): Indexed rate limits should not drop metric payloads --- .../src/processing/spans/dynamic_sampling.rs | 32 ++++-- relay-server/src/processing/spans/mod.rs | 29 +++-- tests/integration/test_spansv2.py | 101 ++++++++++++++++++ 3 files changed, 145 insertions(+), 17 deletions(-) diff --git a/relay-server/src/processing/spans/dynamic_sampling.rs b/relay-server/src/processing/spans/dynamic_sampling.rs index 28cf8051f9..9ef013002c 100644 --- a/relay-server/src/processing/spans/dynamic_sampling.rs +++ b/relay-server/src/processing/spans/dynamic_sampling.rs @@ -141,7 +141,20 @@ pub async fn run( Err(metrics) } -/// Type returned by [`create_indexed_metrics`]. +/// Rejects the indexed portion of the provided spans and returns the total count as metrics. +/// +/// This is used when the indexed payload is designated to be dropped *after* dynamic sampling (decision is keep), +/// but metrics have not yet been extracted. +pub fn reject_indexed_spans( + spans: Managed, + error: Error, +) -> Managed { + let (indexed, total) = split_indexed_and_total(spans); + let _ = indexed.reject_err(error); + total +} + +/// Type returned by [`try_split_indexed_and_total`]. /// /// Contains the indexed spans and the metrics extracted from the spans. type SpansAndMetrics = (Managed>, Managed); @@ -150,7 +163,7 @@ type SpansAndMetrics = (Managed>, Managed, ctx: Context<'_>, ) -> Either, SpansAndMetrics> { @@ -158,8 +171,17 @@ pub fn create_indexed_metrics( return Either::Left(spans); } + Either::Right(split_indexed_and_total(spans)) +} + +/// Splits spans into indexed spans and metrics representing the total counts. +/// +/// Dynamic sampling internal function, outside users should use the safer, use case driven variants +/// [`try_split_indexed_and_total`] and [`reject_indexed_spans`]. +fn split_indexed_and_total(spans: Managed) -> SpansAndMetrics { let scoping = spans.scoping(); - let (indexed, metrics) = spans.split_once(|spans| { + + spans.split_once(|spans| { let metrics = create_metrics( scoping, spans.spans.len() as u32, @@ -168,9 +190,7 @@ pub fn create_indexed_metrics( ); (spans.into_indexed(), metrics) - }); - - Either::Right((indexed, metrics)) + }) } async fn compute(spans: &Managed, ctx: Context<'_>) -> SamplingResult { diff --git a/relay-server/src/processing/spans/mod.rs b/relay-server/src/processing/spans/mod.rs index c0c6efb59e..b04b6ca53f 100644 --- a/relay-server/src/processing/spans/mod.rs +++ b/relay-server/src/processing/spans/mod.rs @@ -14,6 +14,7 @@ use crate::integrations::Integration; use crate::managed::{ Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected, }; +use crate::metrics_extraction::transactions::ExtractedMetrics; use crate::processing::trace_attachments::forward::attachment_to_item; use crate::processing::trace_attachments::process::ScrubAttachmentError; use crate::processing::trace_attachments::types::ExpandedAttachment; @@ -181,17 +182,21 @@ impl processing::Processor for SpansProcessor { process::normalize(&mut spans, &self.geo_lookup, ctx); filter::filter(&mut spans, ctx); - let mut spans = self.limiter.enforce_quotas(spans, ctx).await?; + let spans = self.limiter.enforce_quotas(spans, ctx).await?; + let mut spans = match spans.transpose() { + Either::Left(spans) => spans, + Either::Right(metrics) => return Ok(Output::metrics(metrics)), + }; process::scrub(&mut spans, ctx); - Ok(match dynamic_sampling::create_indexed_metrics(spans, ctx) { - Either::Left(spans) => Output::just(SpanOutput::TotalAndIndexed(spans)), - Either::Right((spans, metrics)) => Output { + match dynamic_sampling::try_split_indexed_and_total(spans, ctx) { + Either::Left(spans) => Ok(Output::just(SpanOutput::TotalAndIndexed(spans))), + Either::Right((spans, metrics)) => Ok(Output { main: Some(SpanOutput::Indexed(spans)), metrics: Some(metrics), - }, - }) + }), + } } } @@ -535,14 +540,14 @@ impl Counted for ExpandedSpans { } impl RateLimited for Managed> { - type Output = Self; + type Output = Managed, ExtractedMetrics>>; type Error = Error; async fn enforce( mut self, mut rate_limiter: R, _: Context<'_>, - ) -> Result> + ) -> Result> where R: processing::RateLimiter, { @@ -567,8 +572,10 @@ impl RateLimited for Managed> { .try_consume(scoping.item(DataCategory::SpanIndexed), span) .await; if !limits.is_empty() { - // If there is a span quota reject all the spans and the associated attachments. - return Err(self.reject_err(Error::from(limits))); + // If there is an indexed span quota reject all the spans and the associated attachments, + // but keep the total counts. + let total = dynamic_sampling::reject_indexed_spans(self, limits.into()); + return Ok(total.map(|total, _| Either::Right(total))); } } @@ -592,7 +599,7 @@ impl RateLimited for Managed> { .await; } - Ok(self) + Ok(self.map(|s, _| Either::Left(s))) } } diff --git a/tests/integration/test_spansv2.py b/tests/integration/test_spansv2.py index e882d7c326..092854421c 100644 --- a/tests/integration/test_spansv2.py +++ b/tests/integration/test_spansv2.py @@ -8,6 +8,7 @@ from .test_dynamic_sampling import add_sampling_config +import uuid import json import pytest @@ -268,6 +269,106 @@ def test_spansv2_ds_drop(mini_sentry, relay, rule_type): assert mini_sentry.captured_outcomes.empty() +@pytest.mark.parametrize("rate_limit", [DataCategory.SPAN, DataCategory.SPAN_INDEXED]) +def test_spansv2_rate_limits(mini_sentry, relay, rate_limit): + """ + The test asserts that dynamic sampling correctly drops items, based on different rule types + and makes sure the correct outcomes and metrics are emitted. + """ + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:standalone-span-ingestion", + "projects:span-v2-experimental-processing", + ] + + ts = datetime.now(timezone.utc) + + project_config["config"]["quotas"] = [ + { + "categories": [rate_limit.name.lower()], + "limit": 0, + "window": int(ts.timestamp()), + "id": uuid.uuid4(), + "reasonCode": "rate_limit_exceeded", + } + ] + + relay = relay(mini_sentry, options=TEST_CONFIG) + + envelope = envelope_with_spans( + { + "start_timestamp": ts.timestamp(), + "end_timestamp": ts.timestamp() + 0.5, + "trace_id": "5b8efff798038103d269b633813fc60c", + "span_id": "eee19b7ec3c1b175", + "is_segment": True, + "name": "some op", + "status": "ok", + }, + trace_info={ + "trace_id": "5b8efff798038103d269b633813fc60c", + "public_key": project_config["publicKeys"][0]["publicKey"], + }, + ) + + relay.send_envelope(project_id, envelope) + + assert mini_sentry.get_aggregated_outcomes() == [ + *( + [ + { + "category": 12, + "key_id": 123, + "org_id": 1, + "outcome": 2, + "project_id": 42, + "quantity": 1, + "reason": "rate_limit_exceeded", + } + ] + if rate_limit == DataCategory.SPAN + else [] + ), + { + "category": DataCategory.SPAN_INDEXED.value, + "key_id": 123, + "org_id": 1, + "outcome": 2, + "project_id": 42, + "quantity": 1, + "reason": "rate_limit_exceeded", + }, + ] + + if rate_limit == DataCategory.SPAN_INDEXED: + assert mini_sentry.get_metrics() == [ + { + "metadata": mock.ANY, + "name": "c:spans/count_per_root_project@none", + "tags": { + "decision": "keep", + "target_project_id": "42", + }, + "timestamp": time_within_delta(), + "type": "c", + "value": 1.0, + "width": 1, + }, + { + "metadata": mock.ANY, + "name": "c:spans/usage@none", + "timestamp": time_within_delta(), + "type": "c", + "value": 1.0, + "width": 1, + }, + ] + + assert mini_sentry.captured_events.empty() + assert mini_sentry.captured_outcomes.empty() + + def test_spansv2_ds_sampled( mini_sentry, relay, From 809aa1df6b284df3b9d3963d7b2b95cfd48b0508 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Wed, 10 Dec 2025 15:54:43 +0100 Subject: [PATCH 2/3] fix span rate limiting and attachments test --- relay-server/src/processing/spans/mod.rs | 33 ++++++++++++------------ tests/integration/test_attachmentsv2.py | 29 ++++++++++++++++----- 2 files changed, 39 insertions(+), 23 deletions(-) diff --git a/relay-server/src/processing/spans/mod.rs b/relay-server/src/processing/spans/mod.rs index b04b6ca53f..de909f77d4 100644 --- a/relay-server/src/processing/spans/mod.rs +++ b/relay-server/src/processing/spans/mod.rs @@ -559,24 +559,23 @@ impl RateLimited for Managed> { attachment_item, } = self.span_quantities(); - if span > 0 { - let limits = rate_limiter - .try_consume(scoping.item(DataCategory::Span), span) - .await; - if !limits.is_empty() { - // If there is a span quota reject all the spans and the associated attachments. - return Err(self.reject_err(Error::from(limits))); - } + // Always check span limits, all items depend on spans. + let limits = rate_limiter + .try_consume(scoping.item(DataCategory::Span), span) + .await; + if !limits.is_empty() { + // If there is a span quota reject all the spans and the associated attachments. + return Err(self.reject_err(Error::from(limits))); + } - let limits = rate_limiter - .try_consume(scoping.item(DataCategory::SpanIndexed), span) - .await; - if !limits.is_empty() { - // If there is an indexed span quota reject all the spans and the associated attachments, - // but keep the total counts. - let total = dynamic_sampling::reject_indexed_spans(self, limits.into()); - return Ok(total.map(|total, _| Either::Right(total))); - } + let limits = rate_limiter + .try_consume(scoping.item(DataCategory::SpanIndexed), span) + .await; + if !limits.is_empty() { + // If there is an indexed span quota reject all the spans and the associated attachments, + // but keep the total counts. + let total = dynamic_sampling::reject_indexed_spans(self, limits.into()); + return Ok(total.map(|total, _| Either::Right(total))); } if attachment > 0 { diff --git a/tests/integration/test_attachmentsv2.py b/tests/integration/test_attachmentsv2.py index 1a5f7dacd9..35cae78406 100644 --- a/tests/integration/test_attachmentsv2.py +++ b/tests/integration/test_attachmentsv2.py @@ -1019,6 +1019,27 @@ def test_attachment_dropped_with_invalid_spans(mini_sentry, relay): @pytest.mark.parametrize( "quota_config,expected_outcomes", [ + # TODO: https://github.com/getsentry/relay/issues/5469 + # pytest.param( + # [ + # { + # "categories": ["span"], + # "limit": 0, + # "window": 3600, + # "id": "span_limit", + # "reasonCode": "span_quota_exceeded", + # } + # ], + # { + # # Rate limit spans + # (DataCategory.SPAN.value, 2): 1, + # (DataCategory.SPAN_INDEXED.value, 2): 1, + # # Rate limit associated span attachments + # (DataCategory.ATTACHMENT.value, 2): 64, + # (DataCategory.ATTACHMENT_ITEM.value, 2): 2, + # }, + # id="span_quota_exceeded", + # ), pytest.param( [ { @@ -1030,14 +1051,11 @@ def test_attachment_dropped_with_invalid_spans(mini_sentry, relay): } ], { - # Rate limit spans - (DataCategory.SPAN.value, 2): 1, (DataCategory.SPAN_INDEXED.value, 2): 1, - # Rate limit associated span attachments (DataCategory.ATTACHMENT.value, 2): 64, (DataCategory.ATTACHMENT_ITEM.value, 2): 2, }, - id="span_quota_exceeded", + id="span_indexed_quota_exceeded", ), pytest.param( [ @@ -1059,7 +1077,7 @@ def test_attachment_dropped_with_invalid_spans(mini_sentry, relay): pytest.param( [ { - "categories": ["span_indexed"], + "categories": ["span"], "limit": 0, "window": 3600, "id": "span_limit", @@ -1194,7 +1212,6 @@ def test_span_attachment_independent_rate_limiting( outcome_counter[key] += outcome["quantity"] assert outcome_counter == expected_outcomes - assert mini_sentry.captured_outcomes.empty() From 88f1d5c2d9ad09ceddf8e0cfc7567bd593274030 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Fri, 12 Dec 2025 11:42:18 +0100 Subject: [PATCH 3/3] jajaja change in masta --- tests/integration/test_spansv2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_spansv2.py b/tests/integration/test_spansv2.py index 092854421c..d195c80375 100644 --- a/tests/integration/test_spansv2.py +++ b/tests/integration/test_spansv2.py @@ -365,7 +365,7 @@ def test_spansv2_rate_limits(mini_sentry, relay, rate_limit): }, ] - assert mini_sentry.captured_events.empty() + assert mini_sentry.captured_envelopes.empty() assert mini_sentry.captured_outcomes.empty()