diff --git a/.jules/bolt.md b/.jules/bolt.md new file mode 100644 index 0000000..83a33b2 --- /dev/null +++ b/.jules/bolt.md @@ -0,0 +1,3 @@ +## 2025-10-18 - Optimizing Error Recovery in Rust Pipelines +**Learning:** Preemptive cloning for error recovery (e.g., `let backup = candidates.clone()`) is a major performance bottleneck in hot paths. +**Action:** Redesign traits to return ownership of the data on failure (e.g., `Result`), allowing zero-cost error recovery without cloning. diff --git a/candidate-pipeline/candidate_pipeline.rs b/candidate-pipeline/candidate_pipeline.rs index b55bb05..7159377 100644 --- a/candidate-pipeline/candidate_pipeline.rs +++ b/candidate-pipeline/candidate_pipeline.rs @@ -244,13 +244,12 @@ where let request_id = query.request_id().to_string(); let mut all_removed = Vec::new(); for filter in filters.iter().filter(|f| f.enable(query)) { - let backup = candidates.clone(); match filter.filter(query, candidates).await { Ok(result) => { candidates = result.kept; all_removed.extend(result.removed); } - Err(err) => { + Err((err, original_candidates)) => { error!( "request_id={} stage={:?} component={} failed: {}", request_id, @@ -258,7 +257,7 @@ where filter.name(), err ); - candidates = backup; + candidates = original_candidates; } } } @@ -327,3 +326,77 @@ where }); } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::filter::FilterResult; + + #[derive(Clone, Debug)] + struct TestQuery; + impl HasRequestId for TestQuery { + fn request_id(&self) -> &str { + "test_req" + } + } + + #[derive(Clone, Debug, PartialEq)] + struct TestCandidate(i32); + + struct FailingFilter; + #[async_trait] + impl Filter for FailingFilter { + async fn filter( + &self, + _query: &TestQuery, + candidates: Vec, + ) -> Result, (String, Vec)> { + // Simulate failure and return ownership + Err(("Failure simulated".to_string(), candidates)) + } + } + + struct TestPipeline { + filters: Vec>>, + // other fields empty/mocked + } + + // Implement minimal mocks for other traits + struct MockSelector; + impl Selector for MockSelector { + fn select(&self, _q: &TestQuery, c: Vec) -> Vec { c } + } + + #[async_trait] + impl CandidatePipeline for TestPipeline { + fn query_hydrators(&self) -> &[Box>] { &[] } + fn sources(&self) -> &[Box>] { &[] } + fn hydrators(&self) -> &[Box>] { &[] } + fn filters(&self) -> &[Box>] { &self.filters } + fn scorers(&self) -> &[Box>] { &[] } + fn selector(&self) -> &dyn Selector { &MockSelector } + fn post_selection_hydrators(&self) -> &[Box>] { &[] } + fn post_selection_filters(&self) -> &[Box>] { &[] } + fn side_effects(&self) -> Arc>>> { Arc::new(vec![]) } + fn result_size(&self) -> usize { 10 } + } + + #[tokio::test] + async fn test_filter_error_recovery() { + let pipeline = TestPipeline { + filters: vec![Box::new(FailingFilter)], + }; + let query = TestQuery; + let candidates = vec![TestCandidate(1), TestCandidate(2)]; + + // Run filters directly to test the logic + let (kept, _) = pipeline.run_filters(&query, candidates.clone(), pipeline.filters(), PipelineStage::Filter).await; + + // Verify that despite the error, candidates were restored and returned (as if filtered, but here we just want to ensure we didn't crash and got data back) + // In the implementation, if error occurs, candidates = original_candidates. + // And then loop continues. Since there are no more filters, these candidates are returned. + assert_eq!(kept.len(), 2); + assert_eq!(kept[0], TestCandidate(1)); + assert_eq!(kept[1], TestCandidate(2)); + } +} diff --git a/candidate-pipeline/filter.rs b/candidate-pipeline/filter.rs index 516c780..32dd340 100644 --- a/candidate-pipeline/filter.rs +++ b/candidate-pipeline/filter.rs @@ -23,7 +23,8 @@ where /// Filter candidates by evaluating each against some criteria. /// Returns a FilterResult containing kept candidates (which continue to the next stage) /// and removed candidates (which are excluded from further processing). - async fn filter(&self, query: &Q, candidates: Vec) -> Result, String>; + /// On error, returns a tuple of the error message and the original candidates for recovery. + async fn filter(&self, query: &Q, candidates: Vec) -> Result, (String, Vec)>; /// Returns a stable name for logging/metrics. fn name(&self) -> &'static str { diff --git a/home-mixer/filters/age_filter.rs b/home-mixer/filters/age_filter.rs index 81bca79..7da2d3e 100644 --- a/home-mixer/filters/age_filter.rs +++ b/home-mixer/filters/age_filter.rs @@ -28,7 +28,7 @@ impl Filter for AgeFilter { &self, _query: &ScoredPostsQuery, candidates: Vec, - ) -> Result, String> { + ) -> Result, (String, Vec)> { let (kept, removed): (Vec<_>, Vec<_>) = candidates .into_iter() .partition(|c| self.is_within_age(c.tweet_id)); diff --git a/home-mixer/filters/author_socialgraph_filter.rs b/home-mixer/filters/author_socialgraph_filter.rs index 8b34e45..a84b315 100644 --- a/home-mixer/filters/author_socialgraph_filter.rs +++ b/home-mixer/filters/author_socialgraph_filter.rs @@ -12,7 +12,7 @@ impl Filter for AuthorSocialgraphFilter { &self, query: &ScoredPostsQuery, candidates: Vec, - ) -> Result, String> { + ) -> Result, (String, Vec)> { let viewer_blocked_user_ids = query.user_features.blocked_user_ids.clone(); let viewer_muted_user_ids = query.user_features.muted_user_ids.clone(); diff --git a/home-mixer/filters/core_data_hydration_filter.rs b/home-mixer/filters/core_data_hydration_filter.rs index 9e0d253..5b74df1 100644 --- a/home-mixer/filters/core_data_hydration_filter.rs +++ b/home-mixer/filters/core_data_hydration_filter.rs @@ -11,7 +11,7 @@ impl Filter for CoreDataHydrationFilter { &self, _query: &ScoredPostsQuery, candidates: Vec, - ) -> Result, String> { + ) -> Result, (String, Vec)> { let (kept, removed) = candidates .into_iter() .partition(|c| c.author_id != 0 && !c.tweet_text.trim().is_empty()); diff --git a/home-mixer/filters/dedup_conversation_filter.rs b/home-mixer/filters/dedup_conversation_filter.rs index f28c06b..6aac6f6 100644 --- a/home-mixer/filters/dedup_conversation_filter.rs +++ b/home-mixer/filters/dedup_conversation_filter.rs @@ -13,7 +13,7 @@ impl Filter for DedupConversationFilter { &self, _query: &ScoredPostsQuery, candidates: Vec, - ) -> Result, String> { + ) -> Result, (String, Vec)> { let mut kept: Vec = Vec::new(); let mut removed: Vec = Vec::new(); let mut best_per_convo: HashMap = HashMap::new(); diff --git a/home-mixer/filters/drop_duplicates_filter.rs b/home-mixer/filters/drop_duplicates_filter.rs index cfe5990..a455c43 100644 --- a/home-mixer/filters/drop_duplicates_filter.rs +++ b/home-mixer/filters/drop_duplicates_filter.rs @@ -12,7 +12,7 @@ impl Filter for DropDuplicatesFilter { &self, _query: &ScoredPostsQuery, candidates: Vec, - ) -> Result, String> { + ) -> Result, (String, Vec)> { let mut seen_ids = HashSet::new(); let mut kept = Vec::new(); let mut removed = Vec::new(); diff --git a/home-mixer/filters/ineligible_subscription_filter.rs b/home-mixer/filters/ineligible_subscription_filter.rs index f5a84e0..c8338a7 100644 --- a/home-mixer/filters/ineligible_subscription_filter.rs +++ b/home-mixer/filters/ineligible_subscription_filter.rs @@ -13,7 +13,7 @@ impl Filter for IneligibleSubscriptionFilter { &self, query: &ScoredPostsQuery, candidates: Vec, - ) -> Result, String> { + ) -> Result, (String, Vec)> { let subscribed_user_ids: HashSet = query .user_features .subscribed_user_ids diff --git a/home-mixer/filters/muted_keyword_filter.rs b/home-mixer/filters/muted_keyword_filter.rs index 7db116c..07bd17f 100644 --- a/home-mixer/filters/muted_keyword_filter.rs +++ b/home-mixer/filters/muted_keyword_filter.rs @@ -25,7 +25,7 @@ impl Filter for MutedKeywordFilter { &self, query: &ScoredPostsQuery, candidates: Vec, - ) -> Result, String> { + ) -> Result, (String, Vec)> { let muted_keywords = query.user_features.muted_keywords.clone(); if muted_keywords.is_empty() { diff --git a/home-mixer/filters/previously_seen_posts_filter.rs b/home-mixer/filters/previously_seen_posts_filter.rs index 3e62ae9..9505f38 100644 --- a/home-mixer/filters/previously_seen_posts_filter.rs +++ b/home-mixer/filters/previously_seen_posts_filter.rs @@ -15,7 +15,7 @@ impl Filter for PreviouslySeenPostsFilter { &self, query: &ScoredPostsQuery, candidates: Vec, - ) -> Result, String> { + ) -> Result, (String, Vec)> { let bloom_filters = query .bloom_filter_entries .iter() diff --git a/home-mixer/filters/previously_served_posts_filter.rs b/home-mixer/filters/previously_served_posts_filter.rs index 7b88e42..1d97980 100644 --- a/home-mixer/filters/previously_served_posts_filter.rs +++ b/home-mixer/filters/previously_served_posts_filter.rs @@ -16,7 +16,7 @@ impl Filter for PreviouslyServedPostsFilter { &self, query: &ScoredPostsQuery, candidates: Vec, - ) -> Result, String> { + ) -> Result, (String, Vec)> { let (removed, kept): (Vec<_>, Vec<_>) = candidates.into_iter().partition(|c| { get_related_post_ids(c) .iter() diff --git a/home-mixer/filters/retweet_deduplication_filter.rs b/home-mixer/filters/retweet_deduplication_filter.rs index 1216f5a..7c6001b 100644 --- a/home-mixer/filters/retweet_deduplication_filter.rs +++ b/home-mixer/filters/retweet_deduplication_filter.rs @@ -14,7 +14,7 @@ impl Filter for RetweetDeduplicationFilter { &self, _query: &ScoredPostsQuery, candidates: Vec, - ) -> Result, String> { + ) -> Result, (String, Vec)> { let mut seen_tweet_ids: HashSet = HashSet::new(); let mut kept = Vec::new(); let mut removed = Vec::new(); diff --git a/home-mixer/filters/self_tweet_filter.rs b/home-mixer/filters/self_tweet_filter.rs index 0b1877f..693ab35 100644 --- a/home-mixer/filters/self_tweet_filter.rs +++ b/home-mixer/filters/self_tweet_filter.rs @@ -12,7 +12,7 @@ impl Filter for SelfTweetFilter { &self, query: &ScoredPostsQuery, candidates: Vec, - ) -> Result, String> { + ) -> Result, (String, Vec)> { let viewer_id = query.user_id as u64; let (kept, removed): (Vec<_>, Vec<_>) = candidates .into_iter() diff --git a/home-mixer/filters/vf_filter.rs b/home-mixer/filters/vf_filter.rs index 20aca71..d320596 100644 --- a/home-mixer/filters/vf_filter.rs +++ b/home-mixer/filters/vf_filter.rs @@ -13,7 +13,7 @@ impl Filter for VFFilter { &self, _query: &ScoredPostsQuery, candidates: Vec, - ) -> Result, String> { + ) -> Result, (String, Vec)> { let (removed, kept): (Vec<_>, Vec<_>) = candidates .into_iter() .partition(|c| should_drop(&c.visibility_reason));