From 73414e2d78cea6729beab17bdfa9beabc7213b14 Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Thu, 20 Nov 2025 15:05:12 -0500 Subject: [PATCH 01/20] SQL-3008 - signed commits --- agg-ast/ast/src/definitions.rs | 20 ++ agg-ast/ast/src/serde_test.rs | 176 ++++++++++++ .../src/schema_derivation.rs | 61 +++- .../src/schema_derivation_tests/stage.rs | 266 ++++++++++++++++++ mongosql/src/air/agg_ast/ast_definitions.rs | 1 + 5 files changed, 522 insertions(+), 2 deletions(-) diff --git a/agg-ast/ast/src/definitions.rs b/agg-ast/ast/src/definitions.rs index 5b929e712..dde3b92fb 100644 --- a/agg-ast/ast/src/definitions.rs +++ b/agg-ast/ast/src/definitions.rs @@ -78,6 +78,8 @@ pub enum Stage { Sample(Sample), #[serde(rename = "$unionWith")] UnionWith(UnionWith), + #[serde(rename = "$rankFusion")] + RankFusion(RankFusion), // Search stages #[serde(rename = "$graphLookup")] @@ -620,6 +622,23 @@ pub struct Bucket { pub default: Option, pub output: Option>, } +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RankFusion { + pub input: RankFusionPipeline, + pub combination: Option, + pub score_details: bool, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct RankFusionPipeline { + pub pipelines: LinkedHashMap>, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct RankFusionCombination { + pub weights: LinkedHashMap, +} #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -1783,6 +1802,7 @@ impl Stage { Stage::GeoNear(_) => "$geoNear", Stage::Sample(_) => "$sample", Stage::UnionWith(_) => "$unionWith", + Stage::RankFusion(_) => "$rankFusion", Stage::GraphLookup(_) => "$graphLookup", Stage::AtlasSearchStage(_) => "", } diff --git a/agg-ast/ast/src/serde_test.rs b/agg-ast/ast/src/serde_test.rs index 384ade8f8..fed9ca52d 100644 --- a/agg-ast/ast/src/serde_test.rs +++ b/agg-ast/ast/src/serde_test.rs @@ -1605,6 +1605,182 @@ mod stage_test { }}"# ); } + mod rank_fusion { + + use crate::definitions::AtlasSearchStage::{Search, VectorSearch}; + use crate::definitions::Expression::Literal; + use crate::definitions::Stage::Limit; + use crate::definitions::{ + AtlasSearchStage, Expression, LiteralValue, RankFusion, RankFusionCombination, + RankFusionPipeline, Stage, + }; + use crate::map; + use linked_hash_map::LinkedHashMap; + + pub fn empty_rankfusion_pipeline() -> LinkedHashMap> { + let mut pipelines: LinkedHashMap> = LinkedHashMap::new(); + + pipelines.insert("searchOne".to_string(), Vec::new()); + pipelines + } + + pub fn single_rankfusion_pipeline() -> LinkedHashMap> { + let mut pipelines: LinkedHashMap> = LinkedHashMap::new(); + + let vector_search_stage: AtlasSearchStage = VectorSearch(Box::new( + Expression::Document(map! { + "index".to_string() => Expression::Literal(LiteralValue::String("movie_collection_index".to_string())), + "path".to_string() => Expression::Literal(LiteralValue::String("title".to_string())), + "queryVector".to_string() => Expression::Array(vec![Expression::Literal(LiteralValue::Double(10.6)), Expression::Literal(LiteralValue::Double(60.5))]), + "numCandidates".to_string() => Expression::Literal(LiteralValue::Int32(500)), + }), + )); + let search_one_pipeline: Vec = + vec![Stage::AtlasSearchStage(vector_search_stage)]; + + pipelines.insert("searchOne".to_string(), search_one_pipeline); + pipelines + } + + pub fn two_rank_fusion_pipelines() -> LinkedHashMap> { + let mut pipelines: LinkedHashMap> = LinkedHashMap::new(); + + let vector_search_stage: AtlasSearchStage = VectorSearch(Box::new( + Expression::Document(map! { + "index".to_string() => Literal(LiteralValue::String("hybrid-vector-search".to_string())), + "path".to_string() => Literal(LiteralValue::String("plot_embedding_voyage_3_large".to_string())), + "queryVector".to_string() => Expression::Array(vec![Literal(LiteralValue::Double(10.6)), Expression::Literal(LiteralValue::Double(60.5))]), + "numCandidates".to_string() => Literal(LiteralValue::Int32(100)), + "limit".to_string() => Literal(LiteralValue::Int32(20)), + }), + )); + let search_one_pipeline: Vec = + vec![Stage::AtlasSearchStage(vector_search_stage)]; + pipelines.insert("vectorPipeline".to_string(), search_one_pipeline); + + let full_text_search_stage: AtlasSearchStage = Search(Box::new(Expression::Document( + map! { + "index".to_string() => Literal(LiteralValue::String("hybrid-full-text-search".to_string())), + "phrase".to_string() => Expression::Document(map! { + "query".to_string() => Literal(LiteralValue::String("star wars".to_string())), + "path".to_string() => Literal(LiteralValue::String("title".to_string())), + }) + }, + ))); + let limit_stage: Stage = Limit(20); + let search_two_pipeline: Vec = + vec![Stage::AtlasSearchStage(full_text_search_stage), limit_stage]; + pipelines.insert("fullTextPipeline".to_string(), search_two_pipeline); + + pipelines + } + + // Test #1: Validate that we can parse a basic version of $rankFusion with 0 pipelines + test_serde_stage!( + rank_fusion_empty_pipelines, + expected = Stage::RankFusion(RankFusion { + input: RankFusionPipeline { + pipelines: empty_rankfusion_pipeline(), + }, + combination: None, + score_details: true + }), + input = r#"stage: {"$rankFusion": { + "input": { "pipelines": { searchOne: []}}, + "scoreDetails": true, + }}"# + ); + + // Test #2: Validate we can parse $rankFusion stage with a single pipeline + test_serde_stage!( + rank_fusion_single_pipeline, + expected = Stage::RankFusion(RankFusion { + input: RankFusionPipeline { + pipelines: single_rankfusion_pipeline(), + }, + combination: None, + score_details: false + }), + input = r#"stage: {"$rankFusion": { + "input": { "pipelines": { searchOne: [{ "$vectorSearch" : {"index" : "movie_collection_index", "path" : "title", "queryVector": [10.6, 60.5], "numCandidates": 500} }] } }, + "scoreDetails": false, + }}"# + ); + + // Test #3: Validate that if a pipeline with the same key appears multiple times, only the last key is respected. + test_serde_stage!( + rank_fusion_uses_latest_key_to_deduplicate_pipelines, + expected = Stage::RankFusion(RankFusion { + input: RankFusionPipeline { + pipelines: single_rankfusion_pipeline(), + }, + combination: None, + score_details: false + }), + input = r#"stage: {"$rankFusion": { + "input": { "pipelines": { + searchOne: [{ "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "star wars", "path": "title"}}}, { "$project": { "title": 1, "released" : 1 } }], + searchOne: [{ "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "star wars", "path": "title"}}}, { "$limit": 20 }], + searchOne: [{ "$vectorSearch" : {"index" : "movie_collection_index", "path" : "title", "queryVector": [10.6, 60.5], "numCandidates": 500} }] } }, + "scoreDetails": false, + }}"# + ); + + // Test #4: Validate we parse multiple input pipelines along with specifying weights for each pipeline + test_serde_stage!( + rank_fusion_multiple_pipelines_with_weights, + expected = Stage::RankFusion(RankFusion { + input: RankFusionPipeline { + pipelines: two_rank_fusion_pipelines(), + }, + combination: Some(RankFusionCombination { + weights: map! { + "vectorPipeline".to_string() => 0.5, + "fullTextPipeline".to_string() => 0.5 + } + }), + score_details: true + }), + input = r#"stage: { + "$rankFusion": { + "input": { + pipelines: { + vectorPipeline: [ + { + "$vectorSearch": { + "index": "hybrid-vector-search", + "path": "plot_embedding_voyage_3_large", + "queryVector": [10.6, 60.5], + "numCandidates": 100, + "limit": 20 + } + } + ], + fullTextPipeline: [ + { + "$search": { + "index": "hybrid-full-text-search", + "phrase": { + "query": "star wars", + "path": "title" + } + } + }, + { "$limit": 20 } + ] + } + }, + "combination": { + weights: { + vectorPipeline: 0.5, + fullTextPipeline: 0.5 + } + }, + "scoreDetails": true + } + }"# + ); + } mod densify { use crate::definitions::{Densify, DensifyRange, DensifyRangeBounds, Stage}; diff --git a/agg-ast/schema_derivation/src/schema_derivation.rs b/agg-ast/schema_derivation/src/schema_derivation.rs index 161ad5a36..e609fd207 100644 --- a/agg-ast/schema_derivation/src/schema_derivation.rs +++ b/agg-ast/schema_derivation/src/schema_derivation.rs @@ -7,8 +7,9 @@ use crate::{ use agg_ast::definitions::{ AtlasSearchStage, Bucket, BucketAuto, ConciseSubqueryLookup, Densify, Documents, EqualityLookup, Expression, Fill, FillOutput, GraphLookup, Group, LiteralValue, Lookup, - LookupFrom, Namespace, ProjectItem, ProjectStage, Ref, SetWindowFields, Stage, SubqueryLookup, - TaggedOperator, UnionWith, Unset, UntaggedOperator, UntaggedOperatorName, Unwind, + LookupFrom, Namespace, ProjectItem, ProjectStage, RankFusion, Ref, SetWindowFields, Stage, + SubqueryLookup, TaggedOperator, UnionWith, Unset, UntaggedOperator, UntaggedOperatorName, + Unwind, }; use linked_hash_map::LinkedHashMap; use mongosql::{ @@ -433,6 +434,61 @@ impl DeriveSchema for Stage { } } + fn rank_fusion_derive_schema( + rank_fusion: &RankFusion, + state: &mut ResultSetState, + ) -> Result { + // 1. Derive the schema for each pipeline and union them together + let unioned_schema_pipelines: Schema = rank_fusion + .input + .pipelines + .values() + .map(|pipeline| { + let derived_schema = + derive_schema_for_pipeline(pipeline.clone(), None, &mut state.clone()); + derived_schema.unwrap_or(EMPTY_DOCUMENT.clone()) + }) + .fold(Schema::Unsat, |acc, derived_schema| { + acc.union(&derived_schema) + }); + + // 2. If score_details is true, add scoreDetails to the schema + let score_details_metadata_schema = Schema::Document(Document { + keys: map! { + "value".to_string() => Schema::Atomic(Atomic::Decimal), + "description".to_string() => Schema::Atomic(Atomic::String), + "details".to_string() => Schema::Array(Box::new(Schema::Document(Document { + keys: map! { + "inputPipelineName".to_string() => Schema::Atomic(Atomic::String), + "rank".to_string() => Schema::Atomic(Atomic::Integer), + "weight".to_string() => Schema::Atomic(Atomic::Integer), + "value".to_string() => Schema::Atomic(Atomic::Decimal), + "details".to_string() => Schema::Array(Box::new(Schema::Any)), + }, + required: set!("inputPipelineName".to_string(), "rank".to_string(),), + ..Default::default() + }))) + }, + required: set!("value".to_string(), "description".to_string(),), + ..Default::default() + }); + + let score_details_scheme = Schema::Document(Document { + keys: map! { + "scoreDetails".to_string() => score_details_metadata_schema + }, + required: map! {}, + additional_properties: false, + jaccard_index: None, + }); + + if rank_fusion.score_details { + Ok(unioned_schema_pipelines.document_union(score_details_scheme)) + } else { + Ok(unioned_schema_pipelines) + } + } + /// bucket_derive_schema derives the schema for a $bucket stage. The schema is defined by the output field, /// and contains _id as well. fn bucket_derive_schema(bucket: &Bucket, state: &mut ResultSetState) -> Result { @@ -1128,6 +1184,7 @@ impl DeriveSchema for Stage { Stage::Lookup(l) => lookup_derive_schema(l, state), Stage::Match(ref m) => m.derive_schema(state), Stage::Project(p) => project_derive_schema(p, state), + Stage::RankFusion(rf) => rank_fusion_derive_schema(rf, state), Stage::Redact(_) => Ok(state.result_set_schema.to_owned()), Stage::ReplaceWith(r) => r .to_owned() diff --git a/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs b/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs index f0362863f..76266a633 100644 --- a/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs +++ b/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs @@ -2450,3 +2450,269 @@ mod unset_fields { }) ); } + +mod rank_fusion { + use super::*; + + // Test #1: Base Test - two pipelines and a starting schema + test_derive_stage_schema!( + two_search_pipelines_no_score_details, + expected = Ok(Schema::Document(Document { + keys: map! { + "title".to_string() => Schema::Atomic(Atomic::String), + "isbn".to_string() => Schema::Atomic(Atomic::String), + "author".to_string() => Schema::AnyOf(set!( + Schema::Document(Document { + keys: map! { + "first".to_string() => Schema::Atomic(Atomic::String), + "last".to_string() => Schema::Atomic(Atomic::String), + }, + required: set!("first".to_string(), "last".to_string()), + ..Default::default() + }), + Schema::Atomic(Atomic::String), + )), + }, + required: set!( + "title".to_string(), + "author".to_string(), + "isbn".to_string(), + ), + ..Default::default() + })), + input = r#"{ + "$rankFusion": { + "input": { + "pipelines": { + "vectorPipeline": [ + { + "$vectorSearch": { + "index": "hybrid-vector-search", + "path": "plot_embedding_voyage_3_large", + "queryVector": [10.6, 60.5], + "numCandidates": 100, + "limit": 20 + } + } + ], + "fullTextPipeline": [ + { + "$search": { + "index": "hybrid-full-text-search", + "phrase": { + "query": "legend", + "path": "title" + } + } + }, + { "$limit": 10 } + ] + } + }, + "combination": { + "weights": { + "vectorPipeline": 0.5, + "fullTextPipeline": 0.5 + } + }, + "scoreDetails": false + } + }"#, + starting_schema = Schema::Document(Document { + keys: map! { + "title".to_string() => Schema::Atomic(Atomic::String), + "isbn".to_string() => Schema::Atomic(Atomic::String), + "author".to_string() => Schema::AnyOf(set!( + Schema::Document(Document { + keys: map! { + "first".to_string() => Schema::Atomic(Atomic::String), + "last".to_string() => Schema::Atomic(Atomic::String), + }, + required: set!("first".to_string(), "last".to_string()), + ..Default::default() + }), + Schema::Atomic(Atomic::String), + )), + }, + required: set!( + "title".to_string(), + "author".to_string(), + "isbn".to_string(), + ), + ..Default::default() + }) + ); + // Test #2: Score Details - Base Case - With Score Details + test_derive_stage_schema!( + pipeline_with_score_details, + expected = Ok(Schema::Document(Document { + keys: map! { + "title".to_string() => Schema::Atomic(Atomic::String), + "isbn".to_string() => Schema::Atomic(Atomic::String), + "author".to_string() => Schema::AnyOf(set!( + Schema::Document(Document { + keys: map! { + "first".to_string() => Schema::Atomic(Atomic::String), + "last".to_string() => Schema::Atomic(Atomic::String), + }, + required: set!("first".to_string(), "last".to_string()), + ..Default::default() + }), + Schema::Atomic(Atomic::String), + )), + "scoreDetails".to_string() => Schema::Document(Document { + keys: map! { + "value".to_string() => Schema::Atomic(Atomic::Decimal), + "description".to_string() => Schema::Atomic(Atomic::String), + "details".to_string() => Schema::Array(Box::new(Schema::Document(Document { + keys: map! { + "inputPipelineName".to_string() => Schema::Atomic(Atomic::String), + "rank".to_string() => Schema::Atomic(Atomic::Integer), + "weight".to_string() => Schema::Atomic(Atomic::Integer), + "value".to_string() => Schema::Atomic(Atomic::Decimal), + "details".to_string() => Schema::Array(Box::new(Schema::Any)), + }, + required: set!("inputPipelineName".to_string(), "rank".to_string(),), + ..Default::default() + }))) + }, + required: set!("value".to_string(), "description".to_string(),), + ..Default::default() + }) + }, + ..Default::default() + })), + input = r#"{ + "$rankFusion": { + "input": { + "pipelines": { + "vectorPipeline": [ + { + "$vectorSearch": { + "index": "hybrid-vector-search", + "path": "plot_embedding_voyage_3_large", + "queryVector": [10.6, 60.5], + "numCandidates": 100, + "limit": 20 + } + } + ] + } + }, + "combination": { + "weights": { + "vectorPipeline": 0.5, + "fullTextPipeline": 0.5 + } + }, + "scoreDetails": true + }}"#, + starting_schema = Schema::Document(Document { + keys: map! { + "title".to_string() => Schema::Atomic(Atomic::String), + "isbn".to_string() => Schema::Atomic(Atomic::String), + "author".to_string() => Schema::AnyOf(set!( + Schema::Document(Document { + keys: map! { + "first".to_string() => Schema::Atomic(Atomic::String), + "last".to_string() => Schema::Atomic(Atomic::String), + }, + required: set!("first".to_string(), "last".to_string()), + ..Default::default() + }), + Schema::Atomic(Atomic::String), + )), + }, + required: set!( + "title".to_string(), + "author".to_string(), + "isbn".to_string(), + ), + ..Default::default() + }) + ); + // Test #3: Test where sub-pipelines have different output schemas + test_derive_stage_schema!( + sub_pipelines_with_different_output_schemas, + expected = Ok(Schema::Document(Document { + keys: map! { + "title".to_string() => Schema::Atomic(Atomic::String), + "author".to_string() => Schema::AnyOf(set!( + Schema::Document(Document { + keys: map! { + "first".to_string() => Schema::Atomic(Atomic::String), + "last".to_string() => Schema::Atomic(Atomic::String), + }, + required: set!("first".to_string(), "last".to_string()), + ..Default::default() + }), + Schema::Atomic(Atomic::String), + )), + }, + ..Default::default() + })), + input = r#"{ + "$rankFusion": { + "input": { + "pipelines": { + "vectorPipeline": [ + { + "$vectorSearch": { + "index": "hybrid-vector-search", + "path": "plot_embedding_voyage_3_large", + "queryVector": [10.6, 60.5], + "numCandidates": 100, + "limit": 20 + } + }, + {"$project" : { "title": 1 }} + + ], + "fullTextPipeline": [ + { + "$search": { + "index": "hybrid-full-text-search", + "phrase": { + "query": "legend", + "path": "title" + } + } + }, + {"$project" : {"author" : 1}} + ] + } + }, + "combination": { + "weights": { + "vectorPipeline": 0.5, + "fullTextPipeline": 0.5 + } + }, + "scoreDetails": false + } + }"#, + starting_schema = Schema::Document(Document { + keys: map! { + "title".to_string() => Schema::Atomic(Atomic::String), + "isbn".to_string() => Schema::Atomic(Atomic::String), + "author".to_string() => Schema::AnyOf(set!( + Schema::Document(Document { + keys: map! { + "first".to_string() => Schema::Atomic(Atomic::String), + "last".to_string() => Schema::Atomic(Atomic::String), + }, + required: set!("first".to_string(), "last".to_string()), + ..Default::default() + }), + Schema::Atomic(Atomic::String), + )), + }, + required: set!( + "title".to_string(), + "author".to_string(), + "isbn".to_string(), + ), + ..Default::default() + }) + ); +} diff --git a/mongosql/src/air/agg_ast/ast_definitions.rs b/mongosql/src/air/agg_ast/ast_definitions.rs index d487f5d2a..ff0ec7c7f 100644 --- a/mongosql/src/air/agg_ast/ast_definitions.rs +++ b/mongosql/src/air/agg_ast/ast_definitions.rs @@ -305,6 +305,7 @@ impl From<(Option, Stage)> for air::Stage { | Stage::Facet(_) | Stage::Fill(_) | Stage::GeoNear(_) + | Stage::RankFusion(_) | Stage::Sample(_) | Stage::SortByCount(_) | Stage::UnionWith(_) From cf965ee7f92dc3b0361b54901e1c2fa46f9988c8 Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Thu, 20 Nov 2025 15:27:42 -0500 Subject: [PATCH 02/20] Nest score details in if-statement --- .../src/schema_derivation.rs | 55 +++++++++---------- 1 file changed, 27 insertions(+), 28 deletions(-) diff --git a/agg-ast/schema_derivation/src/schema_derivation.rs b/agg-ast/schema_derivation/src/schema_derivation.rs index e609fd207..bd395576f 100644 --- a/agg-ast/schema_derivation/src/schema_derivation.rs +++ b/agg-ast/schema_derivation/src/schema_derivation.rs @@ -452,37 +452,36 @@ impl DeriveSchema for Stage { acc.union(&derived_schema) }); - // 2. If score_details is true, add scoreDetails to the schema - let score_details_metadata_schema = Schema::Document(Document { - keys: map! { - "value".to_string() => Schema::Atomic(Atomic::Decimal), - "description".to_string() => Schema::Atomic(Atomic::String), - "details".to_string() => Schema::Array(Box::new(Schema::Document(Document { + if rank_fusion.score_details { + // 2. If score_details is true, add scoreDetails to the schema + let score_details_metadata_schema = Schema::Document(Document { keys: map! { - "inputPipelineName".to_string() => Schema::Atomic(Atomic::String), - "rank".to_string() => Schema::Atomic(Atomic::Integer), - "weight".to_string() => Schema::Atomic(Atomic::Integer), - "value".to_string() => Schema::Atomic(Atomic::Decimal), - "details".to_string() => Schema::Array(Box::new(Schema::Any)), - }, - required: set!("inputPipelineName".to_string(), "rank".to_string(),), + "value".to_string() => Schema::Atomic(Atomic::Decimal), + "description".to_string() => Schema::Atomic(Atomic::String), + "details".to_string() => Schema::Array(Box::new(Schema::Document(Document { + keys: map! { + "inputPipelineName".to_string() => Schema::Atomic(Atomic::String), + "rank".to_string() => Schema::Atomic(Atomic::Integer), + "weight".to_string() => Schema::Atomic(Atomic::Integer), + "value".to_string() => Schema::Atomic(Atomic::Decimal), + "details".to_string() => Schema::Array(Box::new(Schema::Any)), + }, + required: set!("inputPipelineName".to_string(), "rank".to_string(),), + ..Default::default() + }))) + }, + required: set!("value".to_string(), "description".to_string(),), ..Default::default() - }))) - }, - required: set!("value".to_string(), "description".to_string(),), - ..Default::default() - }); - - let score_details_scheme = Schema::Document(Document { - keys: map! { - "scoreDetails".to_string() => score_details_metadata_schema - }, - required: map! {}, - additional_properties: false, - jaccard_index: None, - }); + }); - if rank_fusion.score_details { + let score_details_scheme = Schema::Document(Document { + keys: map! { + "scoreDetails".to_string() => score_details_metadata_schema + }, + required: map! {}, + additional_properties: false, + jaccard_index: None, + }); Ok(unioned_schema_pipelines.document_union(score_details_scheme)) } else { Ok(unioned_schema_pipelines) From 51ac0576249c94dc1a8bb96b5f35e6c3360a8e01 Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Mon, 24 Nov 2025 09:18:00 -0500 Subject: [PATCH 03/20] Rename RankFusionPipeline to RankFusionInput to match docs --- agg-ast/ast/src/definitions.rs | 4 ++-- agg-ast/ast/src/serde_test.rs | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/agg-ast/ast/src/definitions.rs b/agg-ast/ast/src/definitions.rs index dde3b92fb..4e86ce831 100644 --- a/agg-ast/ast/src/definitions.rs +++ b/agg-ast/ast/src/definitions.rs @@ -625,13 +625,13 @@ pub struct Bucket { #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct RankFusion { - pub input: RankFusionPipeline, + pub input: RankFusionInput, pub combination: Option, pub score_details: bool, } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct RankFusionPipeline { +pub struct RankFusionInput { pub pipelines: LinkedHashMap>, } diff --git a/agg-ast/ast/src/serde_test.rs b/agg-ast/ast/src/serde_test.rs index fed9ca52d..79fab5cac 100644 --- a/agg-ast/ast/src/serde_test.rs +++ b/agg-ast/ast/src/serde_test.rs @@ -1612,7 +1612,7 @@ mod stage_test { use crate::definitions::Stage::Limit; use crate::definitions::{ AtlasSearchStage, Expression, LiteralValue, RankFusion, RankFusionCombination, - RankFusionPipeline, Stage, + RankFusionInput, Stage, }; use crate::map; use linked_hash_map::LinkedHashMap; @@ -1679,7 +1679,7 @@ mod stage_test { test_serde_stage!( rank_fusion_empty_pipelines, expected = Stage::RankFusion(RankFusion { - input: RankFusionPipeline { + input: RankFusionInput { pipelines: empty_rankfusion_pipeline(), }, combination: None, @@ -1695,7 +1695,7 @@ mod stage_test { test_serde_stage!( rank_fusion_single_pipeline, expected = Stage::RankFusion(RankFusion { - input: RankFusionPipeline { + input: RankFusionInput { pipelines: single_rankfusion_pipeline(), }, combination: None, @@ -1711,7 +1711,7 @@ mod stage_test { test_serde_stage!( rank_fusion_uses_latest_key_to_deduplicate_pipelines, expected = Stage::RankFusion(RankFusion { - input: RankFusionPipeline { + input: RankFusionInput { pipelines: single_rankfusion_pipeline(), }, combination: None, @@ -1730,7 +1730,7 @@ mod stage_test { test_serde_stage!( rank_fusion_multiple_pipelines_with_weights, expected = Stage::RankFusion(RankFusion { - input: RankFusionPipeline { + input: RankFusionInput { pipelines: two_rank_fusion_pipelines(), }, combination: Some(RankFusionCombination { From bbeb694784ccb11e4d9b893e5f44f20a541c0388 Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Mon, 24 Nov 2025 09:28:06 -0500 Subject: [PATCH 04/20] Address schema_derivation.rs comments --- .../src/schema_derivation.rs | 49 +++++++++---------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/agg-ast/schema_derivation/src/schema_derivation.rs b/agg-ast/schema_derivation/src/schema_derivation.rs index bd395576f..09e43389d 100644 --- a/agg-ast/schema_derivation/src/schema_derivation.rs +++ b/agg-ast/schema_derivation/src/schema_derivation.rs @@ -439,7 +439,7 @@ impl DeriveSchema for Stage { state: &mut ResultSetState, ) -> Result { // 1. Derive the schema for each pipeline and union them together - let unioned_schema_pipelines: Schema = rank_fusion + let mut unioned_schema_pipelines: Schema = rank_fusion .input .pipelines .values() @@ -452,40 +452,39 @@ impl DeriveSchema for Stage { acc.union(&derived_schema) }); + // 2. If score_details is true, add scoreDetails schema to the overall schema if rank_fusion.score_details { - // 2. If score_details is true, add scoreDetails to the schema - let score_details_metadata_schema = Schema::Document(Document { + let score_details_scheme = Schema::Document(Document { keys: map! { - "value".to_string() => Schema::Atomic(Atomic::Decimal), - "description".to_string() => Schema::Atomic(Atomic::String), - "details".to_string() => Schema::Array(Box::new(Schema::Document(Document { + "scoreDetails".to_string() => Schema::Document(Document { keys: map! { - "inputPipelineName".to_string() => Schema::Atomic(Atomic::String), - "rank".to_string() => Schema::Atomic(Atomic::Integer), - "weight".to_string() => Schema::Atomic(Atomic::Integer), - "value".to_string() => Schema::Atomic(Atomic::Decimal), - "details".to_string() => Schema::Array(Box::new(Schema::Any)), - }, - required: set!("inputPipelineName".to_string(), "rank".to_string(),), + "value".to_string() => Schema::Atomic(Atomic::Decimal), + "description".to_string() => Schema::Atomic(Atomic::String), + "details".to_string() => Schema::Array(Box::new(Schema::Document(Document { + keys: map! { + "inputPipelineName".to_string() => Schema::Atomic(Atomic::String), + "rank".to_string() => Schema::Atomic(Atomic::Integer), + "weight".to_string() => Schema::Atomic(Atomic::Integer), + "value".to_string() => Schema::Atomic(Atomic::Decimal), + "details".to_string() => Schema::Array(Box::new(Schema::Any)), + }, + required: set!("inputPipelineName".to_string(), "rank".to_string(),), + ..Default::default() + }))) + }, + required: set!("value".to_string(), "description".to_string(),), ..Default::default() - }))) + }) }, - required: set!("value".to_string(), "description".to_string(),), - ..Default::default() - }); - - let score_details_scheme = Schema::Document(Document { - keys: map! { - "scoreDetails".to_string() => score_details_metadata_schema - }, required: map! {}, additional_properties: false, jaccard_index: None, }); - Ok(unioned_schema_pipelines.document_union(score_details_scheme)) - } else { - Ok(unioned_schema_pipelines) + unioned_schema_pipelines = unioned_schema_pipelines.union(&score_details_scheme); } + + Ok(unioned_schema_pipelines) + } /// bucket_derive_schema derives the schema for a $bucket stage. The schema is defined by the output field, From f437aa750f4771193324460bede155b07184a8db Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Mon, 24 Nov 2025 10:35:57 -0500 Subject: [PATCH 05/20] Replace pipeline functions with macros. Format code --- agg-ast/ast/src/lib.rs | 32 ++++ agg-ast/ast/src/serde_test.rs | 175 ++++++------------ .../src/schema_derivation.rs | 1 - 3 files changed, 84 insertions(+), 124 deletions(-) diff --git a/agg-ast/ast/src/lib.rs b/agg-ast/ast/src/lib.rs index d6b334da9..717c073af 100644 --- a/agg-ast/ast/src/lib.rs +++ b/agg-ast/ast/src/lib.rs @@ -22,3 +22,35 @@ macro_rules! map { ].into_iter()) }; } + +#[macro_export] +macro_rules! vector_pipeline { + () => { + vec![Stage::AtlasSearchStage(VectorSearch(Box::new( + Expression::Document(map! { + "index".to_string() => Literal(LiteralValue::String("movie_collection_index".to_string())), + "path".to_string() => Literal(LiteralValue::String("title".to_string())), + "queryVector".to_string() => Expression::Array(vec![Literal(LiteralValue::Double(10.6)), Expression::Literal(LiteralValue::Double(60.5))]), + "numCandidates".to_string() => Literal(LiteralValue::Int32(500)), + }), + )))] + }; +} + +#[macro_export] +macro_rules! text_search_pipeline { + () => { + + vec![Stage::AtlasSearchStage( + Search(Box::new(Expression::Document( + map! { + "index".to_string() => Literal(LiteralValue::String("hybrid-full-text-search".to_string())), + "phrase".to_string() => Expression::Document(map! { + "query".to_string() => Literal(LiteralValue::String("star wars".to_string())), + "path".to_string() => Literal(LiteralValue::String("title".to_string())), + }) + }, + ))) + ), Limit(20)] + }; +} diff --git a/agg-ast/ast/src/serde_test.rs b/agg-ast/ast/src/serde_test.rs index 79fab5cac..261259690 100644 --- a/agg-ast/ast/src/serde_test.rs +++ b/agg-ast/ast/src/serde_test.rs @@ -1607,96 +1607,22 @@ mod stage_test { } mod rank_fusion { - use crate::definitions::AtlasSearchStage::{Search, VectorSearch}; - use crate::definitions::Expression::Literal; - use crate::definitions::Stage::Limit; use crate::definitions::{ - AtlasSearchStage, Expression, LiteralValue, RankFusion, RankFusionCombination, - RankFusionInput, Stage, + AtlasSearchStage::{Search, VectorSearch}, + Expression, + Expression::Literal, + LiteralValue, RankFusion, RankFusionCombination, RankFusionInput, Stage, + Stage::Limit, }; - use crate::map; - use linked_hash_map::LinkedHashMap; + use crate::{map, text_search_pipeline, vector_pipeline}; - pub fn empty_rankfusion_pipeline() -> LinkedHashMap> { - let mut pipelines: LinkedHashMap> = LinkedHashMap::new(); - - pipelines.insert("searchOne".to_string(), Vec::new()); - pipelines - } - - pub fn single_rankfusion_pipeline() -> LinkedHashMap> { - let mut pipelines: LinkedHashMap> = LinkedHashMap::new(); - - let vector_search_stage: AtlasSearchStage = VectorSearch(Box::new( - Expression::Document(map! { - "index".to_string() => Expression::Literal(LiteralValue::String("movie_collection_index".to_string())), - "path".to_string() => Expression::Literal(LiteralValue::String("title".to_string())), - "queryVector".to_string() => Expression::Array(vec![Expression::Literal(LiteralValue::Double(10.6)), Expression::Literal(LiteralValue::Double(60.5))]), - "numCandidates".to_string() => Expression::Literal(LiteralValue::Int32(500)), - }), - )); - let search_one_pipeline: Vec = - vec![Stage::AtlasSearchStage(vector_search_stage)]; - - pipelines.insert("searchOne".to_string(), search_one_pipeline); - pipelines - } - - pub fn two_rank_fusion_pipelines() -> LinkedHashMap> { - let mut pipelines: LinkedHashMap> = LinkedHashMap::new(); - - let vector_search_stage: AtlasSearchStage = VectorSearch(Box::new( - Expression::Document(map! { - "index".to_string() => Literal(LiteralValue::String("hybrid-vector-search".to_string())), - "path".to_string() => Literal(LiteralValue::String("plot_embedding_voyage_3_large".to_string())), - "queryVector".to_string() => Expression::Array(vec![Literal(LiteralValue::Double(10.6)), Expression::Literal(LiteralValue::Double(60.5))]), - "numCandidates".to_string() => Literal(LiteralValue::Int32(100)), - "limit".to_string() => Literal(LiteralValue::Int32(20)), - }), - )); - let search_one_pipeline: Vec = - vec![Stage::AtlasSearchStage(vector_search_stage)]; - pipelines.insert("vectorPipeline".to_string(), search_one_pipeline); - - let full_text_search_stage: AtlasSearchStage = Search(Box::new(Expression::Document( - map! { - "index".to_string() => Literal(LiteralValue::String("hybrid-full-text-search".to_string())), - "phrase".to_string() => Expression::Document(map! { - "query".to_string() => Literal(LiteralValue::String("star wars".to_string())), - "path".to_string() => Literal(LiteralValue::String("title".to_string())), - }) - }, - ))); - let limit_stage: Stage = Limit(20); - let search_two_pipeline: Vec = - vec![Stage::AtlasSearchStage(full_text_search_stage), limit_stage]; - pipelines.insert("fullTextPipeline".to_string(), search_two_pipeline); - - pipelines - } - - // Test #1: Validate that we can parse a basic version of $rankFusion with 0 pipelines - test_serde_stage!( - rank_fusion_empty_pipelines, - expected = Stage::RankFusion(RankFusion { - input: RankFusionInput { - pipelines: empty_rankfusion_pipeline(), - }, - combination: None, - score_details: true - }), - input = r#"stage: {"$rankFusion": { - "input": { "pipelines": { searchOne: []}}, - "scoreDetails": true, - }}"# - ); - - // Test #2: Validate we can parse $rankFusion stage with a single pipeline test_serde_stage!( rank_fusion_single_pipeline, expected = Stage::RankFusion(RankFusion { input: RankFusionInput { - pipelines: single_rankfusion_pipeline(), + pipelines: map! { + "searchOne".to_string() => vector_pipeline!() + }, }, combination: None, score_details: false @@ -1707,12 +1633,13 @@ mod stage_test { }}"# ); - // Test #3: Validate that if a pipeline with the same key appears multiple times, only the last key is respected. test_serde_stage!( rank_fusion_uses_latest_key_to_deduplicate_pipelines, expected = Stage::RankFusion(RankFusion { input: RankFusionInput { - pipelines: single_rankfusion_pipeline(), + pipelines: map! { + "searchOne".to_string() => vector_pipeline!() + }, }, combination: None, score_details: false @@ -1726,12 +1653,14 @@ mod stage_test { }}"# ); - // Test #4: Validate we parse multiple input pipelines along with specifying weights for each pipeline test_serde_stage!( rank_fusion_multiple_pipelines_with_weights, expected = Stage::RankFusion(RankFusion { input: RankFusionInput { - pipelines: two_rank_fusion_pipelines(), + pipelines: map! { + "vectorPipeline".to_string() => vector_pipeline!(), + "fullTextPipeline".to_string() => text_search_pipeline!(), + }, }, combination: Some(RankFusionCombination { weights: map! { @@ -1742,43 +1671,43 @@ mod stage_test { score_details: true }), input = r#"stage: { - "$rankFusion": { - "input": { - pipelines: { - vectorPipeline: [ - { - "$vectorSearch": { - "index": "hybrid-vector-search", - "path": "plot_embedding_voyage_3_large", - "queryVector": [10.6, 60.5], - "numCandidates": 100, - "limit": 20 - } - } - ], - fullTextPipeline: [ - { - "$search": { - "index": "hybrid-full-text-search", - "phrase": { - "query": "star wars", - "path": "title" + "$rankFusion": { + "input": { + pipelines: { + vectorPipeline: [ + { + "$vectorSearch": { + "index": "hybrid-vector-search", + "path": "plot_embedding_voyage_3_large", + "queryVector": [10.6, 60.5], + "numCandidates": 100, + "limit": 20 + } + } + ], + fullTextPipeline: [ + { + "$search": { + "index": "hybrid-full-text-search", + "phrase": { + "query": "star wars", + "path": "title" + } + } + }, + { "$limit": 20 } + ] + } + }, + "combination": { + weights: { + vectorPipeline: 0.5, + fullTextPipeline: 0.5 + } + }, + "scoreDetails": true } - } - }, - { "$limit": 20 } - ] - } - }, - "combination": { - weights: { - vectorPipeline: 0.5, - fullTextPipeline: 0.5 - } - }, - "scoreDetails": true - } - }"# + }"# ); } diff --git a/agg-ast/schema_derivation/src/schema_derivation.rs b/agg-ast/schema_derivation/src/schema_derivation.rs index 09e43389d..6ebe96bea 100644 --- a/agg-ast/schema_derivation/src/schema_derivation.rs +++ b/agg-ast/schema_derivation/src/schema_derivation.rs @@ -484,7 +484,6 @@ impl DeriveSchema for Stage { } Ok(unioned_schema_pipelines) - } /// bucket_derive_schema derives the schema for a $bucket stage. The schema is defined by the output field, From 0ac316d3d0d5bc751de954daf0b4c7e9e920c670 Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Mon, 24 Nov 2025 11:08:48 -0500 Subject: [PATCH 06/20] Fix failing test --- agg-ast/ast/src/lib.rs | 6 +++--- agg-ast/ast/src/serde_test.rs | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/agg-ast/ast/src/lib.rs b/agg-ast/ast/src/lib.rs index 717c073af..7d37e864b 100644 --- a/agg-ast/ast/src/lib.rs +++ b/agg-ast/ast/src/lib.rs @@ -28,10 +28,10 @@ macro_rules! vector_pipeline { () => { vec![Stage::AtlasSearchStage(VectorSearch(Box::new( Expression::Document(map! { - "index".to_string() => Literal(LiteralValue::String("movie_collection_index".to_string())), - "path".to_string() => Literal(LiteralValue::String("title".to_string())), + "index".to_string() => Literal(LiteralValue::String("hybrid-vector-search".to_string())), + "path".to_string() => Literal(LiteralValue::String("plot_embedding_voyage_3_large".to_string())), "queryVector".to_string() => Expression::Array(vec![Literal(LiteralValue::Double(10.6)), Expression::Literal(LiteralValue::Double(60.5))]), - "numCandidates".to_string() => Literal(LiteralValue::Int32(500)), + "numCandidates".to_string() => Literal(LiteralValue::Int32(100)), }), )))] }; diff --git a/agg-ast/ast/src/serde_test.rs b/agg-ast/ast/src/serde_test.rs index 261259690..59b3318ed 100644 --- a/agg-ast/ast/src/serde_test.rs +++ b/agg-ast/ast/src/serde_test.rs @@ -1628,7 +1628,7 @@ mod stage_test { score_details: false }), input = r#"stage: {"$rankFusion": { - "input": { "pipelines": { searchOne: [{ "$vectorSearch" : {"index" : "movie_collection_index", "path" : "title", "queryVector": [10.6, 60.5], "numCandidates": 500} }] } }, + "input": { "pipelines": { searchOne: [{ "$vectorSearch" : {"index" : "hybrid-vector-search", "path" : "plot_embedding_voyage_3_large", "queryVector": [10.6, 60.5], "numCandidates": 100} }] } }, "scoreDetails": false, }}"# ); @@ -1648,7 +1648,7 @@ mod stage_test { "input": { "pipelines": { searchOne: [{ "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "star wars", "path": "title"}}}, { "$project": { "title": 1, "released" : 1 } }], searchOne: [{ "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "star wars", "path": "title"}}}, { "$limit": 20 }], - searchOne: [{ "$vectorSearch" : {"index" : "movie_collection_index", "path" : "title", "queryVector": [10.6, 60.5], "numCandidates": 500} }] } }, + searchOne: [{ "$vectorSearch" : {"index" : "hybrid-vector-search", "path" : "plot_embedding_voyage_3_large", "queryVector": [10.6, 60.5], "numCandidates": 100} }] } }, "scoreDetails": false, }}"# ); @@ -1681,7 +1681,6 @@ mod stage_test { "path": "plot_embedding_voyage_3_large", "queryVector": [10.6, 60.5], "numCandidates": 100, - "limit": 20 } } ], From 7510d2e5efd4d35fd511bcf00c815c0831748289 Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Mon, 24 Nov 2025 13:58:41 -0500 Subject: [PATCH 07/20] Make scoreDetails optional. Add test for deduplicating match + sort pipelines --- agg-ast/ast/src/definitions.rs | 2 +- agg-ast/ast/src/serde_test.rs | 57 +++++++++++++++-- .../src/schema_derivation.rs | 61 ++++++++++--------- 3 files changed, 87 insertions(+), 33 deletions(-) diff --git a/agg-ast/ast/src/definitions.rs b/agg-ast/ast/src/definitions.rs index 4e86ce831..b5999261f 100644 --- a/agg-ast/ast/src/definitions.rs +++ b/agg-ast/ast/src/definitions.rs @@ -627,7 +627,7 @@ pub struct Bucket { pub struct RankFusion { pub input: RankFusionInput, pub combination: Option, - pub score_details: bool, + pub score_details: Option, } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] diff --git a/agg-ast/ast/src/serde_test.rs b/agg-ast/ast/src/serde_test.rs index 59b3318ed..a8eff8159 100644 --- a/agg-ast/ast/src/serde_test.rs +++ b/agg-ast/ast/src/serde_test.rs @@ -1607,11 +1607,13 @@ mod stage_test { } mod rank_fusion { + use crate::definitions::Stage::{AtlasSearchStage, Match, Sort}; use crate::definitions::{ AtlasSearchStage::{Search, VectorSearch}, Expression, Expression::Literal, - LiteralValue, RankFusion, RankFusionCombination, RankFusionInput, Stage, + LiteralValue, MatchBinaryOp, MatchExpression, MatchField, MatchStage, RankFusion, + RankFusionCombination, RankFusionInput, Ref, Stage, Stage::Limit, }; use crate::{map, text_search_pipeline, vector_pipeline}; @@ -1625,7 +1627,7 @@ mod stage_test { }, }, combination: None, - score_details: false + score_details: Some(false) }), input = r#"stage: {"$rankFusion": { "input": { "pipelines": { searchOne: [{ "$vectorSearch" : {"index" : "hybrid-vector-search", "path" : "plot_embedding_voyage_3_large", "queryVector": [10.6, 60.5], "numCandidates": 100} }] } }, @@ -1642,7 +1644,7 @@ mod stage_test { }, }, combination: None, - score_details: false + score_details: Some(false) }), input = r#"stage: {"$rankFusion": { "input": { "pipelines": { @@ -1668,7 +1670,7 @@ mod stage_test { "fullTextPipeline".to_string() => 0.5 } }), - score_details: true + score_details: Some(true) }), input = r#"stage: { "$rankFusion": { @@ -1708,6 +1710,53 @@ mod stage_test { } }"# ); + + test_serde_stage!( + pipelines_are_deduplicated, + expected = Stage::RankFusion(RankFusion { + input: RankFusionInput { + pipelines: map! { + "searchOne".to_string() => vec![AtlasSearchStage(Search(Box::new( + Expression::Document(map! { + "index".to_string() => Literal(LiteralValue::String("hybrid-full-text-search".to_string())), + "phrase".to_string() => Expression::Document(map! { + "query".to_string() => Literal(LiteralValue::String("adventure".to_string())), + "path".to_string() => Literal(LiteralValue::String("plot".to_string())) + }), + + }), + ))), + Match(MatchStage { + expr: vec![MatchExpression::Field(MatchField { + field: Ref::FieldRef("metacritic".to_string()), + ops: map! { MatchBinaryOp::Gt => bson::Bson::Int32(75) } + })] + }) , + Sort(map! {"title".to_string() => 1})] + }, + }, + combination: None, + score_details: Some(false) + }), + input = r#"stage: { "$rankFusion" : { + "input" : { + "pipelines" : { + searchOne: [ + { "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "adventure", "path": "plot"}}}, + { "$match": { "genres": "Western", "year": { "$lt": 1980 }}}, + { "$sort": { "runtime": 1} + }], + searchOne: [ + { "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "adventure","path": "plot"}}}, + { "$match": { "metacritic": { "$gt": 75 }}}, + { "$sort": { "title": 1} + }] + } + }, + "scoreDetails": false + } +}"# + ); } mod densify { diff --git a/agg-ast/schema_derivation/src/schema_derivation.rs b/agg-ast/schema_derivation/src/schema_derivation.rs index 6ebe96bea..0be1bce5f 100644 --- a/agg-ast/schema_derivation/src/schema_derivation.rs +++ b/agg-ast/schema_derivation/src/schema_derivation.rs @@ -453,36 +453,41 @@ impl DeriveSchema for Stage { }); // 2. If score_details is true, add scoreDetails schema to the overall schema - if rank_fusion.score_details { - let score_details_scheme = Schema::Document(Document { - keys: map! { - "scoreDetails".to_string() => Schema::Document(Document { - keys: map! { - "value".to_string() => Schema::Atomic(Atomic::Decimal), - "description".to_string() => Schema::Atomic(Atomic::String), - "details".to_string() => Schema::Array(Box::new(Schema::Document(Document { + match rank_fusion.score_details { + Some(score_details_enabled) => { + if score_details_enabled { + let score_details_schema: Schema = Schema::Document(Document { keys: map! { - "inputPipelineName".to_string() => Schema::Atomic(Atomic::String), - "rank".to_string() => Schema::Atomic(Atomic::Integer), - "weight".to_string() => Schema::Atomic(Atomic::Integer), - "value".to_string() => Schema::Atomic(Atomic::Decimal), - "details".to_string() => Schema::Array(Box::new(Schema::Any)), - }, - required: set!("inputPipelineName".to_string(), "rank".to_string(),), - ..Default::default() - }))) - }, - required: set!("value".to_string(), "description".to_string(),), - ..Default::default() - }) - }, - required: map! {}, - additional_properties: false, - jaccard_index: None, - }); - unioned_schema_pipelines = unioned_schema_pipelines.union(&score_details_scheme); + "scoreDetails".to_string() => Schema::Document(Document { + keys: map! { + "value".to_string() => Schema::Atomic(Atomic::Decimal), + "description".to_string() => Schema::Atomic(Atomic::String), + "details".to_string() => Schema::Array(Box::new(Schema::Document(Document { + keys: map! { + "inputPipelineName".to_string() => Schema::Atomic(Atomic::String), + "rank".to_string() => Schema::Atomic(Atomic::Integer), + "weight".to_string() => Schema::Atomic(Atomic::Integer), + "value".to_string() => Schema::Atomic(Atomic::Decimal), + "details".to_string() => Schema::Array(Box::new(Schema::Any)), + }, + required: set!("inputPipelineName".to_string(), "rank".to_string(),), + ..Default::default() + }))) + }, + required: set!("value".to_string(), "description".to_string(),), + ..Default::default() + }) + }, + required: map! {}, + additional_properties: false, + jaccard_index: None, + }); + unioned_schema_pipelines = + unioned_schema_pipelines.union(&score_details_schema); + } + } + None => {} } - Ok(unioned_schema_pipelines) } From 1de0811d68217d912512e038d3004a8ff23a85bc Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Mon, 24 Nov 2025 15:21:49 -0500 Subject: [PATCH 08/20] Additional stage tests --- agg-ast/ast/src/serde_test.rs | 34 ++++++------ .../src/schema_derivation_tests/stage.rs | 52 +++++++++++++++++++ 2 files changed, 69 insertions(+), 17 deletions(-) diff --git a/agg-ast/ast/src/serde_test.rs b/agg-ast/ast/src/serde_test.rs index a8eff8159..78968ba2c 100644 --- a/agg-ast/ast/src/serde_test.rs +++ b/agg-ast/ast/src/serde_test.rs @@ -1739,23 +1739,23 @@ mod stage_test { score_details: Some(false) }), input = r#"stage: { "$rankFusion" : { - "input" : { - "pipelines" : { - searchOne: [ - { "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "adventure", "path": "plot"}}}, - { "$match": { "genres": "Western", "year": { "$lt": 1980 }}}, - { "$sort": { "runtime": 1} - }], - searchOne: [ - { "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "adventure","path": "plot"}}}, - { "$match": { "metacritic": { "$gt": 75 }}}, - { "$sort": { "title": 1} - }] - } - }, - "scoreDetails": false - } -}"# + "input" : { + "pipelines" : { + searchOne: [ + { "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "adventure", "path": "plot"}}}, + { "$match": { "genres": "Western", "year": { "$lt": 1980 }}}, + { "$sort": { "runtime": 1} + }], + searchOne: [ + { "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "adventure","path": "plot"}}}, + { "$match": { "metacritic": { "$gt": 75 }}}, + { "$sort": { "title": 1} + }] + } + }, + "scoreDetails": false + } + }"# ); } diff --git a/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs b/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs index 76266a633..9baaa10ff 100644 --- a/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs +++ b/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs @@ -2453,6 +2453,7 @@ mod unset_fields { mod rank_fusion { use super::*; + use mongosql::schema::Schema::AnyOf; // Test #1: Base Test - two pipelines and a starting schema test_derive_stage_schema!( @@ -2715,4 +2716,55 @@ mod rank_fusion { ..Default::default() }) ); + + test_derive_stage_schema!( + rank_fusion_unions_types_together, + expected = Ok(Schema::Document(Document { + keys: map! { + "phoneNumber".to_string() => AnyOf(set![Schema::Atomic(Atomic::Integer), Schema::Atomic(Atomic::String)]), + }, + additional_properties: true, + ..Default::default() + })), + input = r#"{ + "$rankFusion" : { + "input" : { + "pipelines" : { + "searchOne": [{ "$match": { "phoneNumber": { "$type": "int"}}}, {"$sort": { "country": 1}}], + "searchTwo": [{ "$match": { "phoneNumber": { "$type": "string"}}}, {"$sort": { "country": 1}}] + } + } + } + }"# + ); + + test_derive_stage_schema!( + rank_fusion_uses_latest_key_for_duplicate_pipelines, + expected = Ok(Schema::Document(Document { + keys: map! { + "metacritic".to_string() => AnyOf(set![Schema::Atomic(Atomic::Integer), Schema::Atomic(Atomic::Long), Schema::Atomic(Atomic::Double), Schema::Atomic(Atomic::Decimal)]), + }, + required: Default::default(), + additional_properties: true, + jaccard_index: None, + })), + input = r#"{ "$rankFusion" : { + "input" : { + "pipelines" : { + "searchOne": [ + { "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "adventure", "path": "plot"}}}, + { "$match": { "genres": "Western", "year": { "$lt": 1980 }}}, + { "$sort": { "runtime": 1} + }], + "searchOne": [ + { "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "adventure","path": "plot"}}}, + { "$match": { "metacritic": { "$gt": 75 }}}, + { "$sort": { "title": 1} + }] + } + }, + "scoreDetails": false + } + }"# + ); } From b2ee1bf4d02b1f6b5a3028f1f5dc3380794bff20 Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Mon, 24 Nov 2025 15:25:42 -0500 Subject: [PATCH 09/20] Use if-let to fix clippy warnings --- .../src/schema_derivation.rs | 56 +++++++++---------- 1 file changed, 27 insertions(+), 29 deletions(-) diff --git a/agg-ast/schema_derivation/src/schema_derivation.rs b/agg-ast/schema_derivation/src/schema_derivation.rs index 0be1bce5f..4963d014e 100644 --- a/agg-ast/schema_derivation/src/schema_derivation.rs +++ b/agg-ast/schema_derivation/src/schema_derivation.rs @@ -453,41 +453,39 @@ impl DeriveSchema for Stage { }); // 2. If score_details is true, add scoreDetails schema to the overall schema - match rank_fusion.score_details { - Some(score_details_enabled) => { - if score_details_enabled { - let score_details_schema: Schema = Schema::Document(Document { + if let Some(score_details_enabled) = rank_fusion.score_details { + if score_details_enabled { + let score_details_schema: Schema = Schema::Document(Document { + keys: map! { + "scoreDetails".to_string() => Schema::Document(Document { keys: map! { - "scoreDetails".to_string() => Schema::Document(Document { + "value".to_string() => Schema::Atomic(Atomic::Decimal), + "description".to_string() => Schema::Atomic(Atomic::String), + "details".to_string() => Schema::Array(Box::new(Schema::Document(Document { keys: map! { - "value".to_string() => Schema::Atomic(Atomic::Decimal), - "description".to_string() => Schema::Atomic(Atomic::String), - "details".to_string() => Schema::Array(Box::new(Schema::Document(Document { - keys: map! { - "inputPipelineName".to_string() => Schema::Atomic(Atomic::String), - "rank".to_string() => Schema::Atomic(Atomic::Integer), - "weight".to_string() => Schema::Atomic(Atomic::Integer), - "value".to_string() => Schema::Atomic(Atomic::Decimal), - "details".to_string() => Schema::Array(Box::new(Schema::Any)), - }, - required: set!("inputPipelineName".to_string(), "rank".to_string(),), - ..Default::default() - }))) - }, - required: set!("value".to_string(), "description".to_string(),), + "inputPipelineName".to_string() => Schema::Atomic(Atomic::String), + "rank".to_string() => Schema::Atomic(Atomic::Integer), + "weight".to_string() => Schema::Atomic(Atomic::Integer), + "value".to_string() => Schema::Atomic(Atomic::Decimal), + "details".to_string() => Schema::Array(Box::new(Schema::Any)), + }, + required: set!("inputPipelineName".to_string(), "rank".to_string(),), ..Default::default() - }) + }))) }, - required: map! {}, - additional_properties: false, - jaccard_index: None, - }); - unioned_schema_pipelines = - unioned_schema_pipelines.union(&score_details_schema); - } + required: set!("value".to_string(), "description".to_string(),), + ..Default::default() + }) + }, + required: map! {}, + additional_properties: false, + jaccard_index: None, + }); + unioned_schema_pipelines = + unioned_schema_pipelines.union(&score_details_schema); } - None => {} } + Ok(unioned_schema_pipelines) } From 1b52ac05467b968b7e8d1dec41e1f3625d975ef1 Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Mon, 24 Nov 2025 15:52:27 -0500 Subject: [PATCH 10/20] Add a starting schema to type narrowing test for validation --- .../src/schema_derivation_tests/stage.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs b/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs index 9baaa10ff..32611f481 100644 --- a/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs +++ b/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs @@ -2718,12 +2718,13 @@ mod rank_fusion { ); test_derive_stage_schema!( - rank_fusion_unions_types_together, + rank_fusion_narrows_any_of_types, expected = Ok(Schema::Document(Document { keys: map! { "phoneNumber".to_string() => AnyOf(set![Schema::Atomic(Atomic::Integer), Schema::Atomic(Atomic::String)]), }, - additional_properties: true, + required: set!["phoneNumber".to_string()], + additional_properties: false, ..Default::default() })), input = r#"{ @@ -2735,7 +2736,15 @@ mod rank_fusion { } } } - }"# + }"#, + starting_schema = Schema::Document(Document { + keys: map! { + "phoneNumber".to_string() => AnyOf(set![AnyOf(set![Schema::Atomic(Atomic::Timestamp), Schema::Atomic(Atomic::Integer), Schema::Atomic(Atomic::Double),Schema::Atomic(Atomic::String)])]), + }, + required: set!["phoneNumber".to_string()], + additional_properties: false, + jaccard_index: None, + }) ); test_derive_stage_schema!( @@ -2758,7 +2767,7 @@ mod rank_fusion { }], "searchOne": [ { "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "adventure","path": "plot"}}}, - { "$match": { "metacritic": { "$gt": 75 }}}, + { "$match": { "metacritic": { "$gt": 75.0 }}}, { "$sort": { "title": 1} }] } From b6ccdbdb650e925755ccfe94ba03d6d2c396ddeb Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Mon, 24 Nov 2025 16:30:17 -0500 Subject: [PATCH 11/20] Remove extraneous test comments --- .../schema_derivation/src/schema_derivation_tests/stage.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs b/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs index 32611f481..7f9061b44 100644 --- a/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs +++ b/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs @@ -2455,7 +2455,6 @@ mod rank_fusion { use super::*; use mongosql::schema::Schema::AnyOf; - // Test #1: Base Test - two pipelines and a starting schema test_derive_stage_schema!( two_search_pipelines_no_score_details, expected = Ok(Schema::Document(Document { @@ -2543,7 +2542,7 @@ mod rank_fusion { ..Default::default() }) ); - // Test #2: Score Details - Base Case - With Score Details + test_derive_stage_schema!( pipeline_with_score_details, expected = Ok(Schema::Document(Document { @@ -2632,7 +2631,6 @@ mod rank_fusion { ..Default::default() }) ); - // Test #3: Test where sub-pipelines have different output schemas test_derive_stage_schema!( sub_pipelines_with_different_output_schemas, expected = Ok(Schema::Document(Document { @@ -2718,7 +2716,7 @@ mod rank_fusion { ); test_derive_stage_schema!( - rank_fusion_narrows_any_of_types, + rank_fusion_narrows_anyof_types, expected = Ok(Schema::Document(Document { keys: map! { "phoneNumber".to_string() => AnyOf(set![Schema::Atomic(Atomic::Integer), Schema::Atomic(Atomic::String)]), From b017dbb11812f9205ce2548033dcc6d787d69813 Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Tue, 25 Nov 2025 16:03:44 -0500 Subject: [PATCH 12/20] Simplify if let into a single conditional --- agg-ast/schema_derivation/src/schema_derivation.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/agg-ast/schema_derivation/src/schema_derivation.rs b/agg-ast/schema_derivation/src/schema_derivation.rs index 4963d014e..9e4240ec2 100644 --- a/agg-ast/schema_derivation/src/schema_derivation.rs +++ b/agg-ast/schema_derivation/src/schema_derivation.rs @@ -453,8 +453,7 @@ impl DeriveSchema for Stage { }); // 2. If score_details is true, add scoreDetails schema to the overall schema - if let Some(score_details_enabled) = rank_fusion.score_details { - if score_details_enabled { + if let Some(true) = rank_fusion.score_details { let score_details_schema: Schema = Schema::Document(Document { keys: map! { "scoreDetails".to_string() => Schema::Document(Document { @@ -483,7 +482,7 @@ impl DeriveSchema for Stage { }); unioned_schema_pipelines = unioned_schema_pipelines.union(&score_details_schema); - } + } Ok(unioned_schema_pipelines) From 04be401fdfc6a2306d1c5a26e6eb1cd57af55c44 Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Mon, 1 Dec 2025 10:38:55 -0500 Subject: [PATCH 13/20] Use merge() instead of union() for pipeline and merge schemas --- .../src/schema_derivation.rs | 57 +++++++++++-------- .../src/schema_derivation_tests/stage.rs | 5 ++ 2 files changed, 38 insertions(+), 24 deletions(-) diff --git a/agg-ast/schema_derivation/src/schema_derivation.rs b/agg-ast/schema_derivation/src/schema_derivation.rs index 9e4240ec2..1443ccee7 100644 --- a/agg-ast/schema_derivation/src/schema_derivation.rs +++ b/agg-ast/schema_derivation/src/schema_derivation.rs @@ -454,35 +454,44 @@ impl DeriveSchema for Stage { // 2. If score_details is true, add scoreDetails schema to the overall schema if let Some(true) = rank_fusion.score_details { - let score_details_schema: Schema = Schema::Document(Document { + let score_details_schema: Schema = Schema::Document(Document { + keys: map! { + "scoreDetails".to_string() => Schema::Document(Document { keys: map! { - "scoreDetails".to_string() => Schema::Document(Document { + "value".to_string() => Schema::Atomic(Atomic::Decimal), + "description".to_string() => Schema::Atomic(Atomic::String), + "details".to_string() => Schema::Array(Box::new(Schema::Document(Document { keys: map! { - "value".to_string() => Schema::Atomic(Atomic::Decimal), - "description".to_string() => Schema::Atomic(Atomic::String), - "details".to_string() => Schema::Array(Box::new(Schema::Document(Document { - keys: map! { - "inputPipelineName".to_string() => Schema::Atomic(Atomic::String), - "rank".to_string() => Schema::Atomic(Atomic::Integer), - "weight".to_string() => Schema::Atomic(Atomic::Integer), - "value".to_string() => Schema::Atomic(Atomic::Decimal), - "details".to_string() => Schema::Array(Box::new(Schema::Any)), - }, - required: set!("inputPipelineName".to_string(), "rank".to_string(),), - ..Default::default() - }))) - }, - required: set!("value".to_string(), "description".to_string(),), + "inputPipelineName".to_string() => Schema::Atomic(Atomic::String), + "rank".to_string() => Schema::Atomic(Atomic::Integer), + "weight".to_string() => Schema::Atomic(Atomic::Integer), + "value".to_string() => Schema::Atomic(Atomic::Decimal), + "details".to_string() => Schema::Array(Box::new(Schema::Any)), + }, + required: set!("inputPipelineName".to_string(), "rank".to_string(),), ..Default::default() - }) + }))) }, - required: map! {}, - additional_properties: false, - jaccard_index: None, - }); - unioned_schema_pipelines = - unioned_schema_pipelines.union(&score_details_schema); + required: set!("value".to_string(), "description".to_string(),), + ..Default::default() + }) + }, + required: map! {}, + additional_properties: false, + jaccard_index: None, + }); + // Merge the pipeline schemas with the score details document + match unioned_schema_pipelines { + Schema::Document(ref pipeline_doc) => match score_details_schema { + Schema::Document(doc) => { + unioned_schema_pipelines = + Schema::Document(pipeline_doc.clone().merge(doc)); + } + _ => {} + }, + _ => {} + } } Ok(unioned_schema_pipelines) diff --git a/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs b/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs index 7f9061b44..c03ee8682 100644 --- a/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs +++ b/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs @@ -2580,6 +2580,11 @@ mod rank_fusion { ..Default::default() }) }, + required: set!( + "title".to_string(), + "author".to_string(), + "isbn".to_string() + ), ..Default::default() })), input = r#"{ From d499f83890e4b45be1dad2154ba7c57bd966c89c Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Mon, 1 Dec 2025 10:51:14 -0500 Subject: [PATCH 14/20] Address clippy errors. Convert nested match to if-let --- .../schema_derivation/src/schema_derivation.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/agg-ast/schema_derivation/src/schema_derivation.rs b/agg-ast/schema_derivation/src/schema_derivation.rs index 1443ccee7..a8a73760e 100644 --- a/agg-ast/schema_derivation/src/schema_derivation.rs +++ b/agg-ast/schema_derivation/src/schema_derivation.rs @@ -481,16 +481,12 @@ impl DeriveSchema for Stage { jaccard_index: None, }); - // Merge the pipeline schemas with the score details document - match unioned_schema_pipelines { - Schema::Document(ref pipeline_doc) => match score_details_schema { - Schema::Document(doc) => { - unioned_schema_pipelines = - Schema::Document(pipeline_doc.clone().merge(doc)); - } - _ => {} - }, - _ => {} + // Merge the pipeline schema and score details schema together + if let Schema::Document(ref pipeline_doc) = unioned_schema_pipelines { + if let Schema::Document(score_details_doc) = score_details_schema { + unioned_schema_pipelines = + Schema::Document(pipeline_doc.clone().merge(score_details_doc)); + } } } From c0d5a7a9c9a002365ef0303ad5cdde437614f882 Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Mon, 1 Dec 2025 14:12:19 -0500 Subject: [PATCH 15/20] Replace fold() with for-loop. Add error handling test case. --- agg-ast/ast/src/serde_test.rs | 3 +- .../src/schema_derivation.rs | 40 ++++++++++--------- .../src/schema_derivation_tests/stage.rs | 38 +++++++++++++++++- 3 files changed, 59 insertions(+), 22 deletions(-) diff --git a/agg-ast/ast/src/serde_test.rs b/agg-ast/ast/src/serde_test.rs index 78968ba2c..ad5bb6b6c 100644 --- a/agg-ast/ast/src/serde_test.rs +++ b/agg-ast/ast/src/serde_test.rs @@ -1627,11 +1627,10 @@ mod stage_test { }, }, combination: None, - score_details: Some(false) + score_details: None }), input = r#"stage: {"$rankFusion": { "input": { "pipelines": { searchOne: [{ "$vectorSearch" : {"index" : "hybrid-vector-search", "path" : "plot_embedding_voyage_3_large", "queryVector": [10.6, 60.5], "numCandidates": 100} }] } }, - "scoreDetails": false, }}"# ); diff --git a/agg-ast/schema_derivation/src/schema_derivation.rs b/agg-ast/schema_derivation/src/schema_derivation.rs index a8a73760e..818e37441 100644 --- a/agg-ast/schema_derivation/src/schema_derivation.rs +++ b/agg-ast/schema_derivation/src/schema_derivation.rs @@ -439,22 +439,26 @@ impl DeriveSchema for Stage { state: &mut ResultSetState, ) -> Result { // 1. Derive the schema for each pipeline and union them together - let mut unioned_schema_pipelines: Schema = rank_fusion - .input - .pipelines - .values() - .map(|pipeline| { - let derived_schema = - derive_schema_for_pipeline(pipeline.clone(), None, &mut state.clone()); - derived_schema.unwrap_or(EMPTY_DOCUMENT.clone()) - }) - .fold(Schema::Unsat, |acc, derived_schema| { - acc.union(&derived_schema) - }); + let pipelines = rank_fusion.input.pipelines.values(); + + let mut unioned_schema_pipelines = Schema::Unsat; + + for pipeline in pipelines { + let derived_pipeline_schema = + derive_schema_for_pipeline(pipeline.clone(), None, &mut state.clone()); + match derived_pipeline_schema { + Ok(derived_schema) => { + unioned_schema_pipelines = unioned_schema_pipelines.union(&derived_schema); + } + Err(e) => { + return Err(e); + } + } + } // 2. If score_details is true, add scoreDetails schema to the overall schema if let Some(true) = rank_fusion.score_details { - let score_details_schema: Schema = Schema::Document(Document { + let score_details_document: Document = Document { keys: map! { "scoreDetails".to_string() => Schema::Document(Document { keys: map! { @@ -476,17 +480,15 @@ impl DeriveSchema for Stage { ..Default::default() }) }, - required: map! {}, + required: set!("scoreDetails".to_string()), additional_properties: false, jaccard_index: None, - }); + }; // Merge the pipeline schema and score details schema together if let Schema::Document(ref pipeline_doc) = unioned_schema_pipelines { - if let Schema::Document(score_details_doc) = score_details_schema { - unioned_schema_pipelines = - Schema::Document(pipeline_doc.clone().merge(score_details_doc)); - } + unioned_schema_pipelines = + Schema::Document(pipeline_doc.clone().merge(score_details_document)); } } diff --git a/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs b/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs index c03ee8682..23ac028f6 100644 --- a/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs +++ b/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs @@ -2453,6 +2453,9 @@ mod unset_fields { mod rank_fusion { use super::*; + use agg_ast::definitions; + use agg_ast::definitions::{GeoNear, GeoNearPoint}; + use bson::Bson; use mongosql::schema::Schema::AnyOf; test_derive_stage_schema!( @@ -2583,7 +2586,8 @@ mod rank_fusion { required: set!( "title".to_string(), "author".to_string(), - "isbn".to_string() + "isbn".to_string(), + "scoreDetails".to_string() ), ..Default::default() })), @@ -2779,4 +2783,36 @@ mod rank_fusion { } }"# ); + test_derive_stage_schema!( + rank_fusion_errors_when_pipeline_schema_derivation_fails, + expected = Err(crate::Error::InvalidStage(Box::new(Stage::GeoNear( + Box::new(GeoNear { + distance_field: "dist.calculated".to_string(), + distance_multiplier: None, + include_locs: None, + key: None, + max_distance: Some(Bson::Int32(2)), + min_distance: None, + near: GeoNearPoint::GeoJSON(definitions::GeoJSON { + r#type: "Point".to_string(), + coordinates: [Bson::Double(-73.99279), Bson::Double(40.719296)] + }), + query: None, + spherical: None, + }) + )))), + input = r#"{ "$rankFusion" : { + "input" : { + "pipelines" : { + "searchOne": [{"$geoNear": { + "near": { "type": "Point", "coordinates": [ -73.99279 , 40.719296 ] }, + "distanceField": "dist.calculated", + "maxDistance": 2 + }}] + } + }, + "scoreDetails": false + } + }"# + ); } From eb5718b3804335d303a2a49de68227926f8e3ff6 Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Mon, 1 Dec 2025 14:58:05 -0500 Subject: [PATCH 16/20] Refactor for-loop into try_fold --- .../src/schema_derivation.rs | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/agg-ast/schema_derivation/src/schema_derivation.rs b/agg-ast/schema_derivation/src/schema_derivation.rs index 818e37441..e1160eda9 100644 --- a/agg-ast/schema_derivation/src/schema_derivation.rs +++ b/agg-ast/schema_derivation/src/schema_derivation.rs @@ -439,22 +439,17 @@ impl DeriveSchema for Stage { state: &mut ResultSetState, ) -> Result { // 1. Derive the schema for each pipeline and union them together - let pipelines = rank_fusion.input.pipelines.values(); - let mut unioned_schema_pipelines = Schema::Unsat; + let mut unioned_schema_pipelines: Schema = rank_fusion + .input + .pipelines + .iter() + .try_fold(Schema::Unsat, |acc, pair| { + let derived_pipeline_schema = + derive_schema_for_pipeline(pair.1.clone(), None, &mut state.clone())?; - for pipeline in pipelines { - let derived_pipeline_schema = - derive_schema_for_pipeline(pipeline.clone(), None, &mut state.clone()); - match derived_pipeline_schema { - Ok(derived_schema) => { - unioned_schema_pipelines = unioned_schema_pipelines.union(&derived_schema); - } - Err(e) => { - return Err(e); - } - } - } + Ok(acc.union(&derived_pipeline_schema)) + })?; // 2. If score_details is true, add scoreDetails schema to the overall schema if let Some(true) = rank_fusion.score_details { From 12b851e8a8190cf91174aa0f39db2d36c6ca003f Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Mon, 1 Dec 2025 15:09:50 -0500 Subject: [PATCH 17/20] Clean up tests. Move macros into rank_fusion module --- agg-ast/ast/src/lib.rs | 32 ----------- agg-ast/ast/src/serde_test.rs | 53 +++++++++++-------- .../src/schema_derivation_tests/stage.rs | 29 ---------- 3 files changed, 32 insertions(+), 82 deletions(-) diff --git a/agg-ast/ast/src/lib.rs b/agg-ast/ast/src/lib.rs index 7d37e864b..d6b334da9 100644 --- a/agg-ast/ast/src/lib.rs +++ b/agg-ast/ast/src/lib.rs @@ -22,35 +22,3 @@ macro_rules! map { ].into_iter()) }; } - -#[macro_export] -macro_rules! vector_pipeline { - () => { - vec![Stage::AtlasSearchStage(VectorSearch(Box::new( - Expression::Document(map! { - "index".to_string() => Literal(LiteralValue::String("hybrid-vector-search".to_string())), - "path".to_string() => Literal(LiteralValue::String("plot_embedding_voyage_3_large".to_string())), - "queryVector".to_string() => Expression::Array(vec![Literal(LiteralValue::Double(10.6)), Expression::Literal(LiteralValue::Double(60.5))]), - "numCandidates".to_string() => Literal(LiteralValue::Int32(100)), - }), - )))] - }; -} - -#[macro_export] -macro_rules! text_search_pipeline { - () => { - - vec![Stage::AtlasSearchStage( - Search(Box::new(Expression::Document( - map! { - "index".to_string() => Literal(LiteralValue::String("hybrid-full-text-search".to_string())), - "phrase".to_string() => Expression::Document(map! { - "query".to_string() => Literal(LiteralValue::String("star wars".to_string())), - "path".to_string() => Literal(LiteralValue::String("title".to_string())), - }) - }, - ))) - ), Limit(20)] - }; -} diff --git a/agg-ast/ast/src/serde_test.rs b/agg-ast/ast/src/serde_test.rs index ad5bb6b6c..d6505de49 100644 --- a/agg-ast/ast/src/serde_test.rs +++ b/agg-ast/ast/src/serde_test.rs @@ -1616,7 +1616,38 @@ mod stage_test { RankFusionCombination, RankFusionInput, Ref, Stage, Stage::Limit, }; - use crate::{map, text_search_pipeline, vector_pipeline}; + use crate::map; + + #[macro_export] + macro_rules! vector_pipeline { + () => { + vec![Stage::AtlasSearchStage(VectorSearch(Box::new( + Expression::Document(map! { + "index".to_string() => Literal(LiteralValue::String("hybrid-vector-search".to_string())), + "path".to_string() => Literal(LiteralValue::String("plot_embedding_voyage_3_large".to_string())), + "queryVector".to_string() => Expression::Array(vec![Literal(LiteralValue::Double(10.6)), Expression::Literal(LiteralValue::Double(60.5))]), + "numCandidates".to_string() => Literal(LiteralValue::Int32(100)), + }), + )))] + }; + } + + #[macro_export] + macro_rules! text_search_pipeline { + () => { + vec![Stage::AtlasSearchStage( + Search(Box::new(Expression::Document( + map! { + "index".to_string() => Literal(LiteralValue::String("hybrid-full-text-search".to_string())), + "phrase".to_string() => Expression::Document(map! { + "query".to_string() => Literal(LiteralValue::String("star wars".to_string())), + "path".to_string() => Literal(LiteralValue::String("title".to_string())), + }) + }, + ))) + ), Limit(20)] + }; + } test_serde_stage!( rank_fusion_single_pipeline, @@ -1634,26 +1665,6 @@ mod stage_test { }}"# ); - test_serde_stage!( - rank_fusion_uses_latest_key_to_deduplicate_pipelines, - expected = Stage::RankFusion(RankFusion { - input: RankFusionInput { - pipelines: map! { - "searchOne".to_string() => vector_pipeline!() - }, - }, - combination: None, - score_details: Some(false) - }), - input = r#"stage: {"$rankFusion": { - "input": { "pipelines": { - searchOne: [{ "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "star wars", "path": "title"}}}, { "$project": { "title": 1, "released" : 1 } }], - searchOne: [{ "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "star wars", "path": "title"}}}, { "$limit": 20 }], - searchOne: [{ "$vectorSearch" : {"index" : "hybrid-vector-search", "path" : "plot_embedding_voyage_3_large", "queryVector": [10.6, 60.5], "numCandidates": 100} }] } }, - "scoreDetails": false, - }}"# - ); - test_serde_stage!( rank_fusion_multiple_pipelines_with_weights, expected = Stage::RankFusion(RankFusion { diff --git a/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs b/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs index 23ac028f6..429c75ad5 100644 --- a/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs +++ b/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs @@ -2754,35 +2754,6 @@ mod rank_fusion { }) ); - test_derive_stage_schema!( - rank_fusion_uses_latest_key_for_duplicate_pipelines, - expected = Ok(Schema::Document(Document { - keys: map! { - "metacritic".to_string() => AnyOf(set![Schema::Atomic(Atomic::Integer), Schema::Atomic(Atomic::Long), Schema::Atomic(Atomic::Double), Schema::Atomic(Atomic::Decimal)]), - }, - required: Default::default(), - additional_properties: true, - jaccard_index: None, - })), - input = r#"{ "$rankFusion" : { - "input" : { - "pipelines" : { - "searchOne": [ - { "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "adventure", "path": "plot"}}}, - { "$match": { "genres": "Western", "year": { "$lt": 1980 }}}, - { "$sort": { "runtime": 1} - }], - "searchOne": [ - { "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "adventure","path": "plot"}}}, - { "$match": { "metacritic": { "$gt": 75.0 }}}, - { "$sort": { "title": 1} - }] - } - }, - "scoreDetails": false - } - }"# - ); test_derive_stage_schema!( rank_fusion_errors_when_pipeline_schema_derivation_fails, expected = Err(crate::Error::InvalidStage(Box::new(Stage::GeoNear( From 5a48efeb6d1dc4d8bd57565916e150b95288bd07 Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Tue, 2 Dec 2025 12:09:44 -0500 Subject: [PATCH 18/20] Format JSON in tests. destructure kv-pair in try_fold --- .../src/schema_derivation.rs | 4 +- .../src/schema_derivation_tests/stage.rs | 212 +++++++++--------- 2 files changed, 108 insertions(+), 108 deletions(-) diff --git a/agg-ast/schema_derivation/src/schema_derivation.rs b/agg-ast/schema_derivation/src/schema_derivation.rs index e1160eda9..cfde48deb 100644 --- a/agg-ast/schema_derivation/src/schema_derivation.rs +++ b/agg-ast/schema_derivation/src/schema_derivation.rs @@ -444,9 +444,9 @@ impl DeriveSchema for Stage { .input .pipelines .iter() - .try_fold(Schema::Unsat, |acc, pair| { + .try_fold(Schema::Unsat, |acc, (_, pipeline)| { let derived_pipeline_schema = - derive_schema_for_pipeline(pair.1.clone(), None, &mut state.clone())?; + derive_schema_for_pipeline(pipeline.clone(), None, &mut state.clone())?; Ok(acc.union(&derived_pipeline_schema)) })?; diff --git a/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs b/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs index 429c75ad5..e17f7580b 100644 --- a/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs +++ b/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs @@ -2484,43 +2484,43 @@ mod rank_fusion { ..Default::default() })), input = r#"{ - "$rankFusion": { - "input": { - "pipelines": { - "vectorPipeline": [ - { - "$vectorSearch": { - "index": "hybrid-vector-search", - "path": "plot_embedding_voyage_3_large", - "queryVector": [10.6, 60.5], - "numCandidates": 100, - "limit": 20 - } - } - ], - "fullTextPipeline": [ - { - "$search": { - "index": "hybrid-full-text-search", - "phrase": { - "query": "legend", - "path": "title" + "$rankFusion": { + "input": { + "pipelines": { + "vectorPipeline": [ + { + "$vectorSearch": { + "index": "hybrid-vector-search", + "path": "plot_embedding_voyage_3_large", + "queryVector": [10.6, 60.5], + "numCandidates": 100, + "limit": 20 + } + } + ], + "fullTextPipeline": [ + { + "$search": { + "index": "hybrid-full-text-search", + "phrase": { + "query": "legend", + "path": "title" + } + } + }, + { "$limit": 10 } + ] } - } - }, - { "$limit": 10 } - ] - } - }, - "combination": { - "weights": { - "vectorPipeline": 0.5, - "fullTextPipeline": 0.5 - } - }, - "scoreDetails": false - } - }"#, + }, + "combination": { + "weights": { + "vectorPipeline": 0.5, + "fullTextPipeline": 0.5 + } + }, + "scoreDetails": false + } + }"#, starting_schema = Schema::Document(Document { keys: map! { "title".to_string() => Schema::Atomic(Atomic::String), @@ -2593,29 +2593,29 @@ mod rank_fusion { })), input = r#"{ "$rankFusion": { - "input": { - "pipelines": { - "vectorPipeline": [ - { - "$vectorSearch": { - "index": "hybrid-vector-search", - "path": "plot_embedding_voyage_3_large", - "queryVector": [10.6, 60.5], - "numCandidates": 100, - "limit": 20 - } - } - ] - } - }, - "combination": { - "weights": { - "vectorPipeline": 0.5, - "fullTextPipeline": 0.5 - } - }, - "scoreDetails": true - }}"#, + "input": { + "pipelines": { + "vectorPipeline": [ + { + "$vectorSearch": { + "index": "hybrid-vector-search", + "path": "plot_embedding_voyage_3_large", + "queryVector": [10.6, 60.5], + "numCandidates": 100, + "limit": 20 + } + } + ] + } + }, + "combination": { + "weights": { + "vectorPipeline": 0.5, + "fullTextPipeline": 0.5 + } + }, + "scoreDetails": true + }}"#, starting_schema = Schema::Document(Document { keys: map! { "title".to_string() => Schema::Atomic(Atomic::String), @@ -2660,45 +2660,45 @@ mod rank_fusion { ..Default::default() })), input = r#"{ - "$rankFusion": { - "input": { - "pipelines": { - "vectorPipeline": [ - { - "$vectorSearch": { - "index": "hybrid-vector-search", - "path": "plot_embedding_voyage_3_large", - "queryVector": [10.6, 60.5], - "numCandidates": 100, - "limit": 20 - } - }, - {"$project" : { "title": 1 }} - - ], - "fullTextPipeline": [ - { - "$search": { - "index": "hybrid-full-text-search", - "phrase": { - "query": "legend", - "path": "title" + "$rankFusion": { + "input": { + "pipelines": { + "vectorPipeline": [ + { + "$vectorSearch": { + "index": "hybrid-vector-search", + "path": "plot_embedding_voyage_3_large", + "queryVector": [10.6, 60.5], + "numCandidates": 100, + "limit": 20 + } + }, + {"$project" : { "title": 1 }} + + ], + "fullTextPipeline": [ + { + "$search": { + "index": "hybrid-full-text-search", + "phrase": { + "query": "legend", + "path": "title" + } + } + }, + {"$project" : {"author" : 1}} + ] } - } - }, - {"$project" : {"author" : 1}} - ] - } - }, - "combination": { - "weights": { - "vectorPipeline": 0.5, - "fullTextPipeline": 0.5 - } - }, - "scoreDetails": false - } - }"#, + }, + "combination": { + "weights": { + "vectorPipeline": 0.5, + "fullTextPipeline": 0.5 + } + }, + "scoreDetails": false + } + }"#, starting_schema = Schema::Document(Document { keys: map! { "title".to_string() => Schema::Atomic(Atomic::String), @@ -2735,15 +2735,15 @@ mod rank_fusion { ..Default::default() })), input = r#"{ - "$rankFusion" : { - "input" : { - "pipelines" : { - "searchOne": [{ "$match": { "phoneNumber": { "$type": "int"}}}, {"$sort": { "country": 1}}], - "searchTwo": [{ "$match": { "phoneNumber": { "$type": "string"}}}, {"$sort": { "country": 1}}] - } - } - } - }"#, + "$rankFusion" : { + "input" : { + "pipelines" : { + "searchOne": [{ "$match": { "phoneNumber": { "$type": "int"}}}, {"$sort": { "country": 1}}], + "searchTwo": [{ "$match": { "phoneNumber": { "$type": "string"}}}, {"$sort": { "country": 1}}] + } + } + } + }"#, starting_schema = Schema::Document(Document { keys: map! { "phoneNumber".to_string() => AnyOf(set![AnyOf(set![Schema::Atomic(Atomic::Timestamp), Schema::Atomic(Atomic::Integer), Schema::Atomic(Atomic::Double),Schema::Atomic(Atomic::String)])]), From 2ebb942549c3e838bbf6f7780aa5499ea573a027 Mon Sep 17 00:00:00 2001 From: jpowell-mongo Date: Tue, 2 Dec 2025 20:31:37 -0500 Subject: [PATCH 19/20] Update agg-ast/schema_derivation/src/schema_derivation.rs Co-authored-by: Jonathan Chemburkar --- agg-ast/schema_derivation/src/schema_derivation.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/agg-ast/schema_derivation/src/schema_derivation.rs b/agg-ast/schema_derivation/src/schema_derivation.rs index cfde48deb..6ae58ecac 100644 --- a/agg-ast/schema_derivation/src/schema_derivation.rs +++ b/agg-ast/schema_derivation/src/schema_derivation.rs @@ -438,8 +438,7 @@ impl DeriveSchema for Stage { rank_fusion: &RankFusion, state: &mut ResultSetState, ) -> Result { - // 1. Derive the schema for each pipeline and union them together - + // Derive the schema for each pipeline and union them together let mut unioned_schema_pipelines: Schema = rank_fusion .input .pipelines From 7b593828522d30f83060fec7e2bb2b95d32a7360 Mon Sep 17 00:00:00 2001 From: Jonathan Powell Date: Wed, 3 Dec 2025 09:32:15 -0500 Subject: [PATCH 20/20] Remove macro-export. Format stage test json --- agg-ast/ast/src/serde_test.rs | 2 - .../src/schema_derivation_tests/stage.rs | 53 ++++++++++--------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/agg-ast/ast/src/serde_test.rs b/agg-ast/ast/src/serde_test.rs index d6505de49..9ef770a15 100644 --- a/agg-ast/ast/src/serde_test.rs +++ b/agg-ast/ast/src/serde_test.rs @@ -1618,7 +1618,6 @@ mod stage_test { }; use crate::map; - #[macro_export] macro_rules! vector_pipeline { () => { vec![Stage::AtlasSearchStage(VectorSearch(Box::new( @@ -1632,7 +1631,6 @@ mod stage_test { }; } - #[macro_export] macro_rules! text_search_pipeline { () => { vec![Stage::AtlasSearchStage( diff --git a/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs b/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs index e17f7580b..a6ad4c3e5 100644 --- a/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs +++ b/agg-ast/schema_derivation/src/schema_derivation_tests/stage.rs @@ -2483,7 +2483,8 @@ mod rank_fusion { ), ..Default::default() })), - input = r#"{ + input = r#" + { "$rankFusion": { "input": { "pipelines": { @@ -2520,7 +2521,7 @@ mod rank_fusion { }, "scoreDetails": false } - }"#, + }"#, starting_schema = Schema::Document(Document { keys: map! { "title".to_string() => Schema::Atomic(Atomic::String), @@ -2591,31 +2592,33 @@ mod rank_fusion { ), ..Default::default() })), - input = r#"{ - "$rankFusion": { - "input": { - "pipelines": { - "vectorPipeline": [ - { - "$vectorSearch": { - "index": "hybrid-vector-search", - "path": "plot_embedding_voyage_3_large", - "queryVector": [10.6, 60.5], - "numCandidates": 100, - "limit": 20 - } + input = r#" + { + "$rankFusion": { + "input": { + "pipelines": { + "vectorPipeline": [ + { + "$vectorSearch": { + "index": "hybrid-vector-search", + "path": "plot_embedding_voyage_3_large", + "queryVector": [10.6, 60.5], + "numCandidates": 100, + "limit": 20 + } + } + ] } - ] - } - }, - "combination": { - "weights": { - "vectorPipeline": 0.5, - "fullTextPipeline": 0.5 + }, + "combination": { + "weights": { + "vectorPipeline": 0.5, + "fullTextPipeline": 0.5 + } + }, + "scoreDetails": true } - }, - "scoreDetails": true - }}"#, + }"#, starting_schema = Schema::Document(Document { keys: map! { "title".to_string() => Schema::Atomic(Atomic::String),