Skip to content

Commit 3a5ff60

Browse files
committed
fix(spanv2): Indexed rate limits should not drop metric payloads
1 parent ca21000 commit 3a5ff60

File tree

3 files changed

+145
-17
lines changed

3 files changed

+145
-17
lines changed

relay-server/src/processing/spans/dynamic_sampling.rs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,20 @@ pub async fn run(
144144
Err(metrics)
145145
}
146146

147-
/// Type returned by [`create_indexed_metrics`].
147+
/// Rejects the indexed portion of the provided spans and returns the total count as metrics.
148+
///
149+
/// This is used when the indexed payload is designated to be dropped *after* dynamic sampling (decision is keep),
150+
/// but metrics have not yet been extracted.
151+
pub fn reject_indexed_spans(
152+
spans: Managed<ExpandedSpans>,
153+
error: Error,
154+
) -> Managed<ExtractedMetrics> {
155+
let (indexed, total) = split_indexed_and_total(spans);
156+
let _ = indexed.reject_err(error);
157+
total
158+
}
159+
160+
/// Type returned by [`try_split_indexed_and_total`].
148161
///
149162
/// Contains the indexed spans and the metrics extracted from the spans.
150163
type SpansAndMetrics = (Managed<ExpandedSpans<Indexed>>, Managed<ExtractedMetrics>);
@@ -153,16 +166,25 @@ type SpansAndMetrics = (Managed<ExpandedSpans<Indexed>>, Managed<ExtractedMetric
153166
///
154167
/// Indexed metrics can only be extracted from the Relay making the final sampling decision,
155168
/// if the current Relay is not the final Relay, the function returns the original spans unchanged.
156-
pub fn create_indexed_metrics(
169+
pub fn try_split_indexed_and_total(
157170
spans: Managed<ExpandedSpans>,
158171
ctx: Context<'_>,
159172
) -> Either<Managed<ExpandedSpans>, SpansAndMetrics> {
160173
if !ctx.is_processing() {
161174
return Either::Left(spans);
162175
}
163176

177+
Either::Right(split_indexed_and_total(spans))
178+
}
179+
180+
/// Splits spans into indexed spans and metrics representing the total counts.
181+
///
182+
/// Dynamic sampling internal function, outside users should use the safer, use case driven variants
183+
/// [`try_split_indexed_and_total`] and [`reject_indexed_spans`].
184+
fn split_indexed_and_total(spans: Managed<ExpandedSpans>) -> SpansAndMetrics {
164185
let scoping = spans.scoping();
165-
let (indexed, metrics) = spans.split_once(|spans| {
186+
187+
spans.split_once(|spans| {
166188
let metrics = create_metrics(
167189
scoping,
168190
spans.spans.len() as u32,
@@ -171,9 +193,7 @@ pub fn create_indexed_metrics(
171193
);
172194

173195
(spans.into_indexed(), metrics)
174-
});
175-
176-
Either::Right((indexed, metrics))
196+
})
177197
}
178198

179199
async fn compute(spans: &Managed<SerializedSpans>, ctx: Context<'_>) -> SamplingResult {

relay-server/src/processing/spans/mod.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::integrations::Integration;
1818
use crate::managed::{
1919
Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected,
2020
};
21+
use crate::metrics_extraction::transactions::ExtractedMetrics;
2122
use crate::processing::{self, Context, Forward, Output, QuotaRateLimiter, RateLimited};
2223
use crate::services::outcome::{DiscardReason, Outcome};
2324

@@ -173,17 +174,21 @@ impl processing::Processor for SpansProcessor {
173174
process::normalize(&mut spans, &self.geo_lookup, ctx);
174175
filter::filter(&mut spans, ctx);
175176

176-
let mut spans = self.limiter.enforce_quotas(spans, ctx).await?;
177+
let spans = self.limiter.enforce_quotas(spans, ctx).await?;
178+
let mut spans = match spans.transpose() {
179+
Either::Left(spans) => spans,
180+
Either::Right(metrics) => return Ok(Output::metrics(metrics)),
181+
};
177182

178183
process::scrub(&mut spans, ctx);
179184

180-
Ok(match dynamic_sampling::create_indexed_metrics(spans, ctx) {
181-
Either::Left(spans) => Output::just(SpanOutput::TotalAndIndexed(spans)),
182-
Either::Right((spans, metrics)) => Output {
185+
match dynamic_sampling::try_split_indexed_and_total(spans, ctx) {
186+
Either::Left(spans) => Ok(Output::just(SpanOutput::TotalAndIndexed(spans))),
187+
Either::Right((spans, metrics)) => Ok(Output {
183188
main: Some(SpanOutput::Indexed(spans)),
184189
metrics: Some(metrics),
185-
},
186-
})
190+
}),
191+
}
187192
}
188193
}
189194

@@ -536,14 +541,14 @@ impl Counted for ExpandedSpans<Indexed> {
536541
}
537542

538543
impl RateLimited for Managed<ExpandedSpans<TotalAndIndexed>> {
539-
type Output = Self;
544+
type Output = Managed<Either<ExpandedSpans<TotalAndIndexed>, ExtractedMetrics>>;
540545
type Error = Error;
541546

542547
async fn enforce<R>(
543548
mut self,
544549
mut rate_limiter: R,
545550
_: Context<'_>,
546-
) -> Result<Self, Rejected<Self::Error>>
551+
) -> Result<Self::Output, Rejected<Self::Error>>
547552
where
548553
R: processing::RateLimiter,
549554
{
@@ -568,8 +573,10 @@ impl RateLimited for Managed<ExpandedSpans<TotalAndIndexed>> {
568573
.try_consume(scoping.item(DataCategory::SpanIndexed), span)
569574
.await;
570575
if !limits.is_empty() {
571-
// If there is a span quota reject all the spans and the associated attachments.
572-
return Err(self.reject_err(Error::from(limits)));
576+
// If there is an indexed span quota reject all the spans and the associated attachments,
577+
// but keep the total counts.
578+
let total = dynamic_sampling::reject_indexed_spans(self, limits.into());
579+
return Ok(total.map(|total, _| Either::Right(total)));
573580
}
574581
}
575582

@@ -593,7 +600,7 @@ impl RateLimited for Managed<ExpandedSpans<TotalAndIndexed>> {
593600
.await;
594601
}
595602

596-
Ok(self)
603+
Ok(self.map(|s, _| Either::Left(s)))
597604
}
598605
}
599606

tests/integration/test_spansv2.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from .test_dynamic_sampling import _add_sampling_config
1010

11+
import uuid
1112
import json
1213
import pytest
1314

@@ -268,6 +269,106 @@ def test_spansv2_ds_drop(mini_sentry, relay, rule_type):
268269
assert mini_sentry.captured_outcomes.empty()
269270

270271

272+
@pytest.mark.parametrize("rate_limit", [DataCategory.SPAN, DataCategory.SPAN_INDEXED])
273+
def test_spansv2_rate_limits(mini_sentry, relay, rate_limit):
274+
"""
275+
The test asserts that dynamic sampling correctly drops items, based on different rule types
276+
and makes sure the correct outcomes and metrics are emitted.
277+
"""
278+
project_id = 42
279+
project_config = mini_sentry.add_full_project_config(project_id)
280+
project_config["config"]["features"] = [
281+
"organizations:standalone-span-ingestion",
282+
"projects:span-v2-experimental-processing",
283+
]
284+
285+
ts = datetime.now(timezone.utc)
286+
287+
project_config["config"]["quotas"] = [
288+
{
289+
"categories": [rate_limit.name.lower()],
290+
"limit": 0,
291+
"window": int(ts.timestamp()),
292+
"id": uuid.uuid4(),
293+
"reasonCode": "rate_limit_exceeded",
294+
}
295+
]
296+
297+
relay = relay(mini_sentry, options=TEST_CONFIG)
298+
299+
envelope = envelope_with_spans(
300+
{
301+
"start_timestamp": ts.timestamp(),
302+
"end_timestamp": ts.timestamp() + 0.5,
303+
"trace_id": "5b8efff798038103d269b633813fc60c",
304+
"span_id": "eee19b7ec3c1b175",
305+
"is_segment": True,
306+
"name": "some op",
307+
"status": "ok",
308+
},
309+
trace_info={
310+
"trace_id": "5b8efff798038103d269b633813fc60c",
311+
"public_key": project_config["publicKeys"][0]["publicKey"],
312+
},
313+
)
314+
315+
relay.send_envelope(project_id, envelope)
316+
317+
assert mini_sentry.get_aggregated_outcomes() == [
318+
*(
319+
[
320+
{
321+
"category": 12,
322+
"key_id": 123,
323+
"org_id": 1,
324+
"outcome": 2,
325+
"project_id": 42,
326+
"quantity": 1,
327+
"reason": "rate_limit_exceeded",
328+
}
329+
]
330+
if rate_limit == DataCategory.SPAN
331+
else []
332+
),
333+
{
334+
"category": DataCategory.SPAN_INDEXED.value,
335+
"key_id": 123,
336+
"org_id": 1,
337+
"outcome": 2,
338+
"project_id": 42,
339+
"quantity": 1,
340+
"reason": "rate_limit_exceeded",
341+
},
342+
]
343+
344+
if rate_limit == DataCategory.SPAN_INDEXED:
345+
assert mini_sentry.get_metrics() == [
346+
{
347+
"metadata": mock.ANY,
348+
"name": "c:spans/count_per_root_project@none",
349+
"tags": {
350+
"decision": "keep",
351+
"target_project_id": "42",
352+
},
353+
"timestamp": time_within_delta(),
354+
"type": "c",
355+
"value": 1.0,
356+
"width": 1,
357+
},
358+
{
359+
"metadata": mock.ANY,
360+
"name": "c:spans/usage@none",
361+
"timestamp": time_within_delta(),
362+
"type": "c",
363+
"value": 1.0,
364+
"width": 1,
365+
},
366+
]
367+
368+
assert mini_sentry.captured_events.empty()
369+
assert mini_sentry.captured_outcomes.empty()
370+
371+
271372
def test_spansv2_ds_sampled(
272373
mini_sentry,
273374
relay,

0 commit comments

Comments
 (0)