Skip to content

Commit ca21000

Browse files
authored
ref(processing): Allow rate limiting to change the type (#5463)
1 parent a70c0c5 commit ca21000

File tree

7 files changed

+37
-32
lines changed

7 files changed

+37
-32
lines changed

relay-server/src/processing/check_ins/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ impl processing::Processor for CheckInsProcessor {
8787
process::normalize(&mut check_ins);
8888
}
8989

90-
self.limiter.enforce_quotas(&mut check_ins, ctx).await?;
90+
let check_ins = self.limiter.enforce_quotas(check_ins, ctx).await?;
9191

9292
Ok(Output::just(CheckInsOutput(check_ins)))
9393
}

relay-server/src/processing/limits.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ impl QuotaRateLimiter {
4444
/// Enforces quotas for the passed item.
4545
pub async fn enforce_quotas<T>(
4646
&self,
47-
data: &mut Managed<T>,
47+
data: Managed<T>,
4848
ctx: Context<'_>,
49-
) -> Result<(), Rejected<<Managed<T> as RateLimited>::Error>>
49+
) -> Result<<Managed<T> as RateLimited>::Output, Rejected<<Managed<T> as RateLimited>::Error>>
5050
where
5151
T: Counted,
5252
Managed<T>: RateLimited,
@@ -112,20 +112,22 @@ pub trait RateLimiter {
112112
///
113113
/// A [`RateLimiter`] is usually created by the [`QuotaRateLimiter`].
114114
pub trait RateLimited {
115+
/// The new item returned after (partially) accepting the item.
116+
type Output;
115117
/// Error returned when rejecting the entire item.
116118
type Error;
117119

118120
/// Enforce quotas and rate limits using the passed [`RateLimiter`].
119121
///
120122
/// The implementation must check the quotas and already discard the necessary items
121123
/// as well as emit the correct outcomes.
122-
async fn enforce<T>(
123-
&mut self,
124-
rate_limiter: T,
124+
async fn enforce<R>(
125+
self,
126+
rate_limiter: R,
125127
ctx: Context<'_>,
126-
) -> Result<(), Rejected<Self::Error>>
128+
) -> Result<Self::Output, Rejected<Self::Error>>
127129
where
128-
T: RateLimiter;
130+
R: RateLimiter;
129131
}
130132

131133
impl<T> RateLimiter for Option<T>
@@ -153,13 +155,14 @@ where
153155
Managed<T>: CountRateLimited,
154156
T: Counted,
155157
{
158+
type Output = Self;
156159
type Error = <<Managed<T> as CountRateLimited>::Error as OutcomeError>::Error;
157160

158161
async fn enforce<R>(
159-
&mut self,
162+
self,
160163
mut rate_limiter: R,
161164
_ctx: Context<'_>,
162-
) -> Result<(), Rejected<Self::Error>>
165+
) -> Result<Self::Output, Rejected<Self::Error>>
163166
where
164167
R: RateLimiter,
165168
{
@@ -176,7 +179,7 @@ where
176179
}
177180
}
178181

179-
Ok(())
182+
Ok(self)
180183
}
181184
}
182185

relay-server/src/processing/logs/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ impl processing::Processor for LogsProcessor {
156156
process::normalize(&mut logs);
157157
filter::filter(&mut logs, ctx);
158158

159-
self.limiter.enforce_quotas(&mut logs, ctx).await?;
159+
let mut logs = self.limiter.enforce_quotas(logs, ctx).await?;
160160

161161
process::scrub(&mut logs, ctx);
162162

relay-server/src/processing/sessions/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ impl processing::Processor for SessionsProcessor {
101101

102102
process::normalize(&mut sessions, ctx);
103103

104-
self.limiter.enforce_quotas(&mut sessions, ctx).await?;
104+
let sessions = self.limiter.enforce_quotas(sessions, ctx).await?;
105105

106106
let sessions = process::extract(sessions, ctx);
107107
Ok(Output::metrics(sessions))

relay-server/src/processing/spans/mod.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ impl processing::Processor for SpansProcessor {
173173
process::normalize(&mut spans, &self.geo_lookup, ctx);
174174
filter::filter(&mut spans, ctx);
175175

176-
self.limiter.enforce_quotas(&mut spans, ctx).await?;
176+
let mut spans = self.limiter.enforce_quotas(spans, ctx).await?;
177177

178178
process::scrub(&mut spans, ctx);
179179

@@ -536,15 +536,16 @@ impl Counted for ExpandedSpans<Indexed> {
536536
}
537537

538538
impl RateLimited for Managed<ExpandedSpans<TotalAndIndexed>> {
539+
type Output = Self;
539540
type Error = Error;
540541

541-
async fn enforce<T>(
542-
&mut self,
543-
mut rate_limiter: T,
542+
async fn enforce<R>(
543+
mut self,
544+
mut rate_limiter: R,
544545
_: Context<'_>,
545-
) -> std::result::Result<(), Rejected<Self::Error>>
546+
) -> Result<Self, Rejected<Self::Error>>
546547
where
547-
T: processing::RateLimiter,
548+
R: processing::RateLimiter,
548549
{
549550
let scoping = self.scoping();
550551

@@ -592,7 +593,7 @@ impl RateLimited for Managed<ExpandedSpans<TotalAndIndexed>> {
592593
.await;
593594
}
594595

595-
Ok(())
596+
Ok(self)
596597
}
597598
}
598599

relay-server/src/processing/trace_metrics/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ impl processing::Processor for TraceMetricsProcessor {
137137
filter::filter(&mut metrics, ctx);
138138
process::scrub(&mut metrics, ctx);
139139

140-
self.limiter.enforce_quotas(&mut metrics, ctx).await?;
140+
let metrics = self.limiter.enforce_quotas(metrics, ctx).await?;
141141

142142
Ok(Output::just(TraceMetricOutput(metrics)))
143143
}

relay-server/src/processing/transactions/mod.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,11 @@ impl Processor for TransactionProcessor {
167167
process::extract_metrics(work, ctx, SamplingDecision::Drop)?;
168168

169169
let headers = work.headers.clone();
170-
let mut profile = process::drop_after_sampling(work, ctx, outcome);
171-
if let Some(profile) = profile.as_mut() {
172-
self.limiter.enforce_quotas(profile, ctx).await?;
173-
}
170+
let profile = process::drop_after_sampling(work, ctx, outcome);
171+
let profile = match profile {
172+
Some(profile) => self.limiter.enforce_quotas(profile, ctx).await.ok(),
173+
None => None,
174+
};
174175

175176
return Ok(Output {
176177
main: profile.map(TransactionOutput::OnlyProfile),
@@ -189,9 +190,8 @@ impl Processor for TransactionProcessor {
189190
let (indexed, extracted_metrics) =
190191
process::extract_metrics(work, ctx, SamplingDecision::Keep)?;
191192

192-
let mut indexed = process::extract_spans(indexed, ctx, server_sample_rate);
193-
194-
self.limiter.enforce_quotas(&mut indexed, ctx).await?;
193+
let indexed = process::extract_spans(indexed, ctx, server_sample_rate);
194+
let indexed = self.limiter.enforce_quotas(indexed, ctx).await?;
195195

196196
if !indexed.flags.fully_normalized {
197197
relay_log::error!(
@@ -207,7 +207,7 @@ impl Processor for TransactionProcessor {
207207
});
208208
}
209209

210-
self.limiter.enforce_quotas(&mut work, ctx).await?;
210+
let work = self.limiter.enforce_quotas(work, ctx).await?;
211211

212212
Ok(Output {
213213
main: Some(TransactionOutput::Full(work)),
@@ -367,13 +367,14 @@ impl<T: Counted + AsRef<Annotated<Event>>> Counted for ExpandedTransaction<T> {
367367
}
368368

369369
impl<T: Counted + AsRef<Annotated<Event>>> RateLimited for Managed<ExpandedTransaction<T>> {
370+
type Output = Self;
370371
type Error = Error;
371372

372373
async fn enforce<R>(
373-
&mut self,
374+
mut self,
374375
mut rate_limiter: R,
375376
ctx: Context<'_>,
376-
) -> Result<(), Rejected<Self::Error>>
377+
) -> Result<Self, Rejected<Self::Error>>
377378
where
378379
R: RateLimiter,
379380
{
@@ -446,7 +447,7 @@ impl<T: Counted + AsRef<Annotated<Event>>> RateLimited for Managed<ExpandedTrans
446447
}
447448
}
448449

449-
Ok(())
450+
Ok(self)
450451
}
451452
}
452453

0 commit comments

Comments
 (0)