Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .jules/bolt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## 2025-10-18 - Optimizing Error Recovery in Rust Pipelines
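**Learning:** Preemptive cloning for error recovery (e.g., `let backup = candidates.clone()`) pays for a full copy on every call even though the backup is only used on the rare failure path, which makes it a major performance bottleneck in hot paths.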
**Action:** Redesign traits to return ownership of the data on failure (e.g., `Result<Success, (Error, Data)>`), allowing zero-cost error recovery without cloning.
79 changes: 76 additions & 3 deletions candidate-pipeline/candidate_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,21 +244,20 @@ where
let request_id = query.request_id().to_string();
let mut all_removed = Vec::new();
for filter in filters.iter().filter(|f| f.enable(query)) {
let backup = candidates.clone();
match filter.filter(query, candidates).await {
Ok(result) => {
candidates = result.kept;
all_removed.extend(result.removed);
}
Err(err) => {
Err((err, original_candidates)) => {
error!(
"request_id={} stage={:?} component={} failed: {}",
request_id,
stage,
filter.name(),
err
);
candidates = backup;
candidates = original_candidates;
}
}
}
Expand Down Expand Up @@ -327,3 +326,77 @@ where
});
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::filter::FilterResult;

    /// Query stub whose request id is the fixed string `"test_req"`.
    #[derive(Clone, Debug)]
    struct TestQuery;

    impl HasRequestId for TestQuery {
        fn request_id(&self) -> &str {
            "test_req"
        }
    }

    /// Candidate stub wrapping an integer so results can be compared with `==`.
    #[derive(Clone, Debug, PartialEq)]
    struct TestCandidate(i32);

    /// Selector stub that hands its input back unchanged.
    struct MockSelector;

    impl Selector<TestQuery, TestCandidate> for MockSelector {
        fn select(
            &self,
            _query: &TestQuery,
            candidates: Vec<TestCandidate>,
        ) -> Vec<TestCandidate> {
            candidates
        }
    }

    /// Filter stub that always fails and returns ownership of its input
    /// inside the error tuple.
    struct FailingFilter;

    #[async_trait]
    impl Filter<TestQuery, TestCandidate> for FailingFilter {
        async fn filter(
            &self,
            _query: &TestQuery,
            candidates: Vec<TestCandidate>,
        ) -> Result<FilterResult<TestCandidate>, (String, Vec<TestCandidate>)> {
            let message = String::from("Failure simulated");
            Err((message, candidates))
        }
    }

    /// Pipeline stub: only `filters` is populated; every other stage is empty.
    struct TestPipeline {
        filters: Vec<Box<dyn Filter<TestQuery, TestCandidate>>>,
    }

    #[async_trait]
    impl CandidatePipeline<TestQuery, TestCandidate> for TestPipeline {
        fn query_hydrators(&self) -> &[Box<dyn QueryHydrator<TestQuery>>] {
            &[]
        }

        fn sources(&self) -> &[Box<dyn Source<TestQuery, TestCandidate>>] {
            &[]
        }

        fn hydrators(&self) -> &[Box<dyn Hydrator<TestQuery, TestCandidate>>] {
            &[]
        }

        fn filters(&self) -> &[Box<dyn Filter<TestQuery, TestCandidate>>] {
            &self.filters
        }

        fn scorers(&self) -> &[Box<dyn Scorer<TestQuery, TestCandidate>>] {
            &[]
        }

        fn selector(&self) -> &dyn Selector<TestQuery, TestCandidate> {
            &MockSelector
        }

        fn post_selection_hydrators(&self) -> &[Box<dyn Hydrator<TestQuery, TestCandidate>>] {
            &[]
        }

        fn post_selection_filters(&self) -> &[Box<dyn Filter<TestQuery, TestCandidate>>] {
            &[]
        }

        fn side_effects(&self) -> Arc<Vec<Box<dyn SideEffect<TestQuery, TestCandidate>>>> {
            Arc::new(vec![])
        }

        fn result_size(&self) -> usize {
            10
        }
    }

    /// A failing filter must not lose candidates: the same items, in the
    /// same order, come back out of `run_filters`.
    #[tokio::test]
    async fn test_filter_error_recovery() {
        let input = vec![TestCandidate(1), TestCandidate(2)];
        let query = TestQuery;
        let pipeline = TestPipeline {
            filters: vec![Box::new(FailingFilter)],
        };

        // Drive the filter stage directly rather than the whole pipeline.
        // `input` is cloned because it is compared against the result below.
        let (survivors, _) = pipeline
            .run_filters(&query, input.clone(), pipeline.filters(), PipelineStage::Filter)
            .await;

        // The only configured filter failed, so the stage should return
        // exactly what it was given: two candidates, unchanged and in order.
        assert_eq!(survivors, input);
    }
}
3 changes: 2 additions & 1 deletion candidate-pipeline/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ where
/// Filter candidates by evaluating each against some criteria.
/// Returns a FilterResult containing kept candidates (which continue to the next stage)
/// and removed candidates (which are excluded from further processing).
async fn filter(&self, query: &Q, candidates: Vec<C>) -> Result<FilterResult<C>, String>;
/// On error, returns `Err((message, candidates))`: the error message together with the
/// input candidates, handed back so the caller can recover without cloning them up front.
async fn filter(&self, query: &Q, candidates: Vec<C>) -> Result<FilterResult<C>, (String, Vec<C>)>;

/// Returns a stable name for logging/metrics.
fn name(&self) -> &'static str {
Expand Down
2 changes: 1 addition & 1 deletion home-mixer/filters/age_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl Filter<ScoredPostsQuery, PostCandidate> for AgeFilter {
&self,
_query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> Result<FilterResult<PostCandidate>, String> {
) -> Result<FilterResult<PostCandidate>, (String, Vec<PostCandidate>)> {
let (kept, removed): (Vec<_>, Vec<_>) = candidates
.into_iter()
.partition(|c| self.is_within_age(c.tweet_id));
Expand Down
2 changes: 1 addition & 1 deletion home-mixer/filters/author_socialgraph_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ impl Filter<ScoredPostsQuery, PostCandidate> for AuthorSocialgraphFilter {
&self,
query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> Result<FilterResult<PostCandidate>, String> {
) -> Result<FilterResult<PostCandidate>, (String, Vec<PostCandidate>)> {
let viewer_blocked_user_ids = query.user_features.blocked_user_ids.clone();
let viewer_muted_user_ids = query.user_features.muted_user_ids.clone();

Expand Down
2 changes: 1 addition & 1 deletion home-mixer/filters/core_data_hydration_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ impl Filter<ScoredPostsQuery, PostCandidate> for CoreDataHydrationFilter {
&self,
_query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> Result<FilterResult<PostCandidate>, String> {
) -> Result<FilterResult<PostCandidate>, (String, Vec<PostCandidate>)> {
let (kept, removed) = candidates
.into_iter()
.partition(|c| c.author_id != 0 && !c.tweet_text.trim().is_empty());
Expand Down
2 changes: 1 addition & 1 deletion home-mixer/filters/dedup_conversation_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ impl Filter<ScoredPostsQuery, PostCandidate> for DedupConversationFilter {
&self,
_query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> Result<FilterResult<PostCandidate>, String> {
) -> Result<FilterResult<PostCandidate>, (String, Vec<PostCandidate>)> {
let mut kept: Vec<PostCandidate> = Vec::new();
let mut removed: Vec<PostCandidate> = Vec::new();
let mut best_per_convo: HashMap<u64, (usize, f64)> = HashMap::new();
Expand Down
2 changes: 1 addition & 1 deletion home-mixer/filters/drop_duplicates_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ impl Filter<ScoredPostsQuery, PostCandidate> for DropDuplicatesFilter {
&self,
_query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> Result<FilterResult<PostCandidate>, String> {
) -> Result<FilterResult<PostCandidate>, (String, Vec<PostCandidate>)> {
let mut seen_ids = HashSet::new();
let mut kept = Vec::new();
let mut removed = Vec::new();
Expand Down
2 changes: 1 addition & 1 deletion home-mixer/filters/ineligible_subscription_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ impl Filter<ScoredPostsQuery, PostCandidate> for IneligibleSubscriptionFilter {
&self,
query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> Result<FilterResult<PostCandidate>, String> {
) -> Result<FilterResult<PostCandidate>, (String, Vec<PostCandidate>)> {
let subscribed_user_ids: HashSet<u64> = query
.user_features
.subscribed_user_ids
Expand Down
2 changes: 1 addition & 1 deletion home-mixer/filters/muted_keyword_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl Filter<ScoredPostsQuery, PostCandidate> for MutedKeywordFilter {
&self,
query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> Result<FilterResult<PostCandidate>, String> {
) -> Result<FilterResult<PostCandidate>, (String, Vec<PostCandidate>)> {
let muted_keywords = query.user_features.muted_keywords.clone();

if muted_keywords.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion home-mixer/filters/previously_seen_posts_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ impl Filter<ScoredPostsQuery, PostCandidate> for PreviouslySeenPostsFilter {
&self,
query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> Result<FilterResult<PostCandidate>, String> {
) -> Result<FilterResult<PostCandidate>, (String, Vec<PostCandidate>)> {
let bloom_filters = query
.bloom_filter_entries
.iter()
Expand Down
2 changes: 1 addition & 1 deletion home-mixer/filters/previously_served_posts_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl Filter<ScoredPostsQuery, PostCandidate> for PreviouslyServedPostsFilter {
&self,
query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> Result<FilterResult<PostCandidate>, String> {
) -> Result<FilterResult<PostCandidate>, (String, Vec<PostCandidate>)> {
let (removed, kept): (Vec<_>, Vec<_>) = candidates.into_iter().partition(|c| {
get_related_post_ids(c)
.iter()
Expand Down
2 changes: 1 addition & 1 deletion home-mixer/filters/retweet_deduplication_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ impl Filter<ScoredPostsQuery, PostCandidate> for RetweetDeduplicationFilter {
&self,
_query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> Result<FilterResult<PostCandidate>, String> {
) -> Result<FilterResult<PostCandidate>, (String, Vec<PostCandidate>)> {
let mut seen_tweet_ids: HashSet<u64> = HashSet::new();
let mut kept = Vec::new();
let mut removed = Vec::new();
Expand Down
2 changes: 1 addition & 1 deletion home-mixer/filters/self_tweet_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ impl Filter<ScoredPostsQuery, PostCandidate> for SelfTweetFilter {
&self,
query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> Result<FilterResult<PostCandidate>, String> {
) -> Result<FilterResult<PostCandidate>, (String, Vec<PostCandidate>)> {
let viewer_id = query.user_id as u64;
let (kept, removed): (Vec<_>, Vec<_>) = candidates
.into_iter()
Expand Down
2 changes: 1 addition & 1 deletion home-mixer/filters/vf_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ impl Filter<ScoredPostsQuery, PostCandidate> for VFFilter {
&self,
_query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> Result<FilterResult<PostCandidate>, String> {
) -> Result<FilterResult<PostCandidate>, (String, Vec<PostCandidate>)> {
let (removed, kept): (Vec<_>, Vec<_>) = candidates
.into_iter()
.partition(|c| should_drop(&c.visibility_reason));
Expand Down