Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions relay-server/src/processing/spans/dynamic_sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,20 @@ pub async fn run(
Err(metrics)
}

/// Type returned by [`create_indexed_metrics`].
/// Rejects the indexed portion of the provided spans and returns the total count as metrics.
///
/// This is used when the indexed payload is designated to be dropped *after* dynamic sampling (decision is keep),
/// but metrics have not yet been extracted.
pub fn reject_indexed_spans(
spans: Managed<ExpandedSpans>,
error: Error,
) -> Managed<ExtractedMetrics> {
let (indexed, total) = split_indexed_and_total(spans);
let _ = indexed.reject_err(error);
total
}

/// Type returned by [`try_split_indexed_and_total`].
///
/// Contains the indexed spans and the metrics extracted from the spans.
type SpansAndMetrics = (Managed<ExpandedSpans<Indexed>>, Managed<ExtractedMetrics>);
Expand All @@ -150,16 +163,25 @@ type SpansAndMetrics = (Managed<ExpandedSpans<Indexed>>, Managed<ExtractedMetric
///
/// Indexed metrics can only be extracted from the Relay making the final sampling decision,
/// if the current Relay is not the final Relay, the function returns the original spans unchanged.
pub fn create_indexed_metrics(
pub fn try_split_indexed_and_total(
spans: Managed<ExpandedSpans>,
ctx: Context<'_>,
) -> Either<Managed<ExpandedSpans>, SpansAndMetrics> {
if !ctx.is_processing() {
return Either::Left(spans);
}

Either::Right(split_indexed_and_total(spans))
}

/// Splits spans into indexed spans and metrics representing the total counts.
///
/// Dynamic sampling internal function, outside users should use the safer, use case driven variants
/// [`try_split_indexed_and_total`] and [`reject_indexed_spans`].
fn split_indexed_and_total(spans: Managed<ExpandedSpans>) -> SpansAndMetrics {
let scoping = spans.scoping();
let (indexed, metrics) = spans.split_once(|spans| {

spans.split_once(|spans| {
let metrics = create_metrics(
scoping,
spans.spans.len() as u32,
Expand All @@ -168,9 +190,7 @@ pub fn create_indexed_metrics(
);

(spans.into_indexed(), metrics)
});

Either::Right((indexed, metrics))
})
}

async fn compute(spans: &Managed<SerializedSpans>, ctx: Context<'_>) -> SamplingResult {
Expand Down
54 changes: 30 additions & 24 deletions relay-server/src/processing/spans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::integrations::Integration;
use crate::managed::{
Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected,
};
use crate::metrics_extraction::transactions::ExtractedMetrics;
use crate::processing::trace_attachments::forward::attachment_to_item;
use crate::processing::trace_attachments::process::ScrubAttachmentError;
use crate::processing::trace_attachments::types::ExpandedAttachment;
Expand Down Expand Up @@ -181,17 +182,21 @@ impl processing::Processor for SpansProcessor {
process::normalize(&mut spans, &self.geo_lookup, ctx);
filter::filter(&mut spans, ctx);

let mut spans = self.limiter.enforce_quotas(spans, ctx).await?;
let spans = self.limiter.enforce_quotas(spans, ctx).await?;
let mut spans = match spans.transpose() {
Either::Left(spans) => spans,
Either::Right(metrics) => return Ok(Output::metrics(metrics)),
};

process::scrub(&mut spans, ctx);

Ok(match dynamic_sampling::create_indexed_metrics(spans, ctx) {
Either::Left(spans) => Output::just(SpanOutput::TotalAndIndexed(spans)),
Either::Right((spans, metrics)) => Output {
match dynamic_sampling::try_split_indexed_and_total(spans, ctx) {
Either::Left(spans) => Ok(Output::just(SpanOutput::TotalAndIndexed(spans))),
Either::Right((spans, metrics)) => Ok(Output {
main: Some(SpanOutput::Indexed(spans)),
metrics: Some(metrics),
},
})
}),
}
}
}

Expand Down Expand Up @@ -535,14 +540,14 @@ impl Counted for ExpandedSpans<Indexed> {
}

impl RateLimited for Managed<ExpandedSpans<TotalAndIndexed>> {
type Output = Self;
type Output = Managed<Either<ExpandedSpans<TotalAndIndexed>, ExtractedMetrics>>;
type Error = Error;

async fn enforce<R>(
mut self,
mut rate_limiter: R,
_: Context<'_>,
) -> Result<Self, Rejected<Self::Error>>
) -> Result<Self::Output, Rejected<Self::Error>>
where
R: processing::RateLimiter,
{
Expand All @@ -554,22 +559,23 @@ impl RateLimited for Managed<ExpandedSpans<TotalAndIndexed>> {
attachment_item,
} = self.span_quantities();

if span > 0 {
let limits = rate_limiter
.try_consume(scoping.item(DataCategory::Span), span)
.await;
if !limits.is_empty() {
// If there is a span quota reject all the spans and the associated attachments.
return Err(self.reject_err(Error::from(limits)));
}
// Always check span limits, all items depend on spans.
let limits = rate_limiter
.try_consume(scoping.item(DataCategory::Span), span)
.await;
if !limits.is_empty() {
// If there is a span quota reject all the spans and the associated attachments.
return Err(self.reject_err(Error::from(limits)));
}

let limits = rate_limiter
.try_consume(scoping.item(DataCategory::SpanIndexed), span)
.await;
if !limits.is_empty() {
// If there is a span quota reject all the spans and the associated attachments.
return Err(self.reject_err(Error::from(limits)));
}
let limits = rate_limiter
.try_consume(scoping.item(DataCategory::SpanIndexed), span)
.await;
if !limits.is_empty() {
// If there is an indexed span quota reject all the spans and the associated attachments,
// but keep the total counts.
let total = dynamic_sampling::reject_indexed_spans(self, limits.into());
return Ok(total.map(|total, _| Either::Right(total)));
}

if attachment > 0 {
Expand All @@ -592,7 +598,7 @@ impl RateLimited for Managed<ExpandedSpans<TotalAndIndexed>> {
.await;
}

Ok(self)
Ok(self.map(|s, _| Either::Left(s)))
}
}

Expand Down
29 changes: 23 additions & 6 deletions tests/integration/test_attachmentsv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,27 @@ def test_attachment_dropped_with_invalid_spans(mini_sentry, relay):
@pytest.mark.parametrize(
"quota_config,expected_outcomes",
[
# TODO: https://github.com/getsentry/relay/issues/5469
# pytest.param(
# [
# {
# "categories": ["span"],
# "limit": 0,
# "window": 3600,
# "id": "span_limit",
# "reasonCode": "span_quota_exceeded",
# }
# ],
# {
# # Rate limit spans
# (DataCategory.SPAN.value, 2): 1,
# (DataCategory.SPAN_INDEXED.value, 2): 1,
# # Rate limit associated span attachments
# (DataCategory.ATTACHMENT.value, 2): 64,
# (DataCategory.ATTACHMENT_ITEM.value, 2): 2,
# },
# id="span_quota_exceeded",
# ),
pytest.param(
[
{
Expand All @@ -1030,14 +1051,11 @@ def test_attachment_dropped_with_invalid_spans(mini_sentry, relay):
}
],
{
# Rate limit spans
(DataCategory.SPAN.value, 2): 1,
(DataCategory.SPAN_INDEXED.value, 2): 1,
# Rate limit associated span attachments
(DataCategory.ATTACHMENT.value, 2): 64,
(DataCategory.ATTACHMENT_ITEM.value, 2): 2,
},
id="span_quota_exceeded",
id="span_indexed_quota_exceeded",
),
pytest.param(
[
Expand All @@ -1059,7 +1077,7 @@ def test_attachment_dropped_with_invalid_spans(mini_sentry, relay):
pytest.param(
[
{
"categories": ["span_indexed"],
"categories": ["span"],
"limit": 0,
"window": 3600,
"id": "span_limit",
Expand Down Expand Up @@ -1194,7 +1212,6 @@ def test_span_attachment_independent_rate_limiting(
outcome_counter[key] += outcome["quantity"]

assert outcome_counter == expected_outcomes

assert mini_sentry.captured_outcomes.empty()


Expand Down
101 changes: 101 additions & 0 deletions tests/integration/test_spansv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from .test_dynamic_sampling import add_sampling_config

import uuid
import json
import pytest

Expand Down Expand Up @@ -268,6 +269,106 @@ def test_spansv2_ds_drop(mini_sentry, relay, rule_type):
assert mini_sentry.captured_outcomes.empty()


@pytest.mark.parametrize("rate_limit", [DataCategory.SPAN, DataCategory.SPAN_INDEXED])
def test_spansv2_rate_limits(mini_sentry, relay, rate_limit):
"""
The test asserts that dynamic sampling correctly drops items, based on different rule types
and makes sure the correct outcomes and metrics are emitted.
"""
project_id = 42
project_config = mini_sentry.add_full_project_config(project_id)
project_config["config"]["features"] = [
"organizations:standalone-span-ingestion",
"projects:span-v2-experimental-processing",
]

ts = datetime.now(timezone.utc)

project_config["config"]["quotas"] = [
{
"categories": [rate_limit.name.lower()],
"limit": 0,
"window": int(ts.timestamp()),
"id": uuid.uuid4(),
"reasonCode": "rate_limit_exceeded",
}
]

relay = relay(mini_sentry, options=TEST_CONFIG)

envelope = envelope_with_spans(
{
"start_timestamp": ts.timestamp(),
"end_timestamp": ts.timestamp() + 0.5,
"trace_id": "5b8efff798038103d269b633813fc60c",
"span_id": "eee19b7ec3c1b175",
"is_segment": True,
"name": "some op",
"status": "ok",
},
trace_info={
"trace_id": "5b8efff798038103d269b633813fc60c",
"public_key": project_config["publicKeys"][0]["publicKey"],
},
)

relay.send_envelope(project_id, envelope)

assert mini_sentry.get_aggregated_outcomes() == [
*(
[
{
"category": 12,
"key_id": 123,
"org_id": 1,
"outcome": 2,
"project_id": 42,
"quantity": 1,
"reason": "rate_limit_exceeded",
}
]
if rate_limit == DataCategory.SPAN
else []
),
{
"category": DataCategory.SPAN_INDEXED.value,
"key_id": 123,
"org_id": 1,
"outcome": 2,
"project_id": 42,
"quantity": 1,
"reason": "rate_limit_exceeded",
},
]

if rate_limit == DataCategory.SPAN_INDEXED:
assert mini_sentry.get_metrics() == [
{
"metadata": mock.ANY,
"name": "c:spans/count_per_root_project@none",
"tags": {
"decision": "keep",
"target_project_id": "42",
},
"timestamp": time_within_delta(),
"type": "c",
"value": 1.0,
"width": 1,
},
{
"metadata": mock.ANY,
"name": "c:spans/usage@none",
"timestamp": time_within_delta(),
"type": "c",
"value": 1.0,
"width": 1,
},
]

assert mini_sentry.captured_envelopes.empty()
assert mini_sentry.captured_outcomes.empty()


def test_spansv2_ds_sampled(
mini_sentry,
relay,
Expand Down
Loading