Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
73414e2
SQL-3008 - signed commits
jpowell-mongo Nov 20, 2025
cf965ee
Nest score details in if-statement
jpowell-mongo Nov 20, 2025
51ac057
Rename RankFusionPipeline to RankFusionInput to match docs
jpowell-mongo Nov 24, 2025
bbeb694
Address schema_derivation.rs comments
jpowell-mongo Nov 24, 2025
f437aa7
Replace pipeline functions with macros. Format code
jpowell-mongo Nov 24, 2025
0ac316d
Fix failing test
jpowell-mongo Nov 24, 2025
7510d2e
Make scoreDetails optional. Add test for deduplicating match + sort p…
jpowell-mongo Nov 24, 2025
1de0811
Additional stage tests
jpowell-mongo Nov 24, 2025
b2ee1bf
Use if-let to fix clippy warnings
jpowell-mongo Nov 24, 2025
1b52ac0
Add a starting schema to type narrowing test for validation
jpowell-mongo Nov 24, 2025
b6ccdbd
Remove extraneous test comments
jpowell-mongo Nov 24, 2025
b017dbb
Simplify if let into a single conditional
jpowell-mongo Nov 25, 2025
04be401
Use merge() instead of union() for pipeline and merge schemas
jpowell-mongo Dec 1, 2025
d499f83
Address clippy errors. Convert nested match to if-let
jpowell-mongo Dec 1, 2025
c0d5a7a
Replace fold() with for-loop. Add error handling test case.
jpowell-mongo Dec 1, 2025
eb5718b
Refactor for-loop into try_fold
jpowell-mongo Dec 1, 2025
12b851e
Clean up tests. Move macros into rank_fusion module
jpowell-mongo Dec 1, 2025
5a48efe
Format JSON in tests. destructure kv-pair in try_fold
jpowell-mongo Dec 2, 2025
2ebb942
Update agg-ast/schema_derivation/src/schema_derivation.rs
jpowell-mongo Dec 3, 2025
7b59382
Remove macro-export. Format stage test json
jpowell-mongo Dec 3, 2025
534d281
Merge branch 'jp/SQL-3008_signed' of github.com:mongodb/mongosql into…
jpowell-mongo Dec 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions agg-ast/ast/src/definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ pub enum Stage {
Sample(Sample),
#[serde(rename = "$unionWith")]
UnionWith(UnionWith),
#[serde(rename = "$rankFusion")]
RankFusion(RankFusion),

// Search stages
#[serde(rename = "$graphLookup")]
Expand Down Expand Up @@ -620,6 +622,23 @@ pub struct Bucket {
pub default: Option<Bson>,
pub output: Option<LinkedHashMap<String, Expression>>,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RankFusion {
pub input: RankFusionInput,
pub combination: Option<RankFusionCombination>,
pub score_details: Option<bool>,
Copy link
Collaborator Author

@jpowell-mongo jpowell-mongo Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: the docs for $rankFusion don't list this field as optional; it defaults to false. However, one of the example agg pipelines in the docs omits this field, so I've made it an Option to ensure parsing can handle that case.

}

/// The `input` document of a `$rankFusion` stage.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct RankFusionInput {
/// Named input pipelines: each key is a pipeline name and each value is the list of stages
/// that make up that pipeline.
pub pipelines: LinkedHashMap<String, Vec<Stage>>,
}

/// The `combination` document of a `$rankFusion` stage.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct RankFusionCombination {
/// Floating-point weights keyed by name.
/// NOTE(review): the keys are presumably the names of the input pipelines — confirm against
/// the `$rankFusion` documentation.
pub weights: LinkedHashMap<String, f64>,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -1783,6 +1802,7 @@ impl Stage {
Stage::GeoNear(_) => "$geoNear",
Stage::Sample(_) => "$sample",
Stage::UnionWith(_) => "$unionWith",
Stage::RankFusion(_) => "$rankFusion",
Stage::GraphLookup(_) => "$graphLookup",
Stage::AtlasSearchStage(_) => "<Atlas search stage>",
}
Expand Down
161 changes: 161 additions & 0 deletions agg-ast/ast/src/serde_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1605,6 +1605,167 @@ mod stage_test {
}}"#
);
}
mod rank_fusion {

use crate::definitions::Stage::{AtlasSearchStage, Match, Sort};
use crate::definitions::{
AtlasSearchStage::{Search, VectorSearch},
Expression,
Expression::Literal,
LiteralValue, MatchBinaryOp, MatchExpression, MatchField, MatchStage, RankFusion,
RankFusionCombination, RankFusionInput, Ref, Stage,
Stage::Limit,
};
use crate::map;

// Expands to a one-stage pipeline (`Vec<Stage>`) holding a single `$vectorSearch` Atlas search
// stage. Shared fixture for the expected values of the tests in this module; the field values
// must match the `$vectorSearch` documents written in the tests' input strings.
macro_rules! vector_pipeline {
() => {
vec![Stage::AtlasSearchStage(VectorSearch(Box::new(
Expression::Document(map! {
"index".to_string() => Literal(LiteralValue::String("hybrid-vector-search".to_string())),
"path".to_string() => Literal(LiteralValue::String("plot_embedding_voyage_3_large".to_string())),
"queryVector".to_string() => Expression::Array(vec![Literal(LiteralValue::Double(10.6)), Expression::Literal(LiteralValue::Double(60.5))]),
"numCandidates".to_string() => Literal(LiteralValue::Int32(100)),
}),
)))]
};
}

// Expands to a two-stage pipeline (`Vec<Stage>`): a `$search` Atlas search stage with a
// `phrase` query, followed by `Limit(20)`. Shared fixture for the expected values of the
// tests in this module.
macro_rules! text_search_pipeline {
() => {
vec![Stage::AtlasSearchStage(
Search(Box::new(Expression::Document(
map! {
"index".to_string() => Literal(LiteralValue::String("hybrid-full-text-search".to_string())),
"phrase".to_string() => Expression::Document(map! {
"query".to_string() => Literal(LiteralValue::String("star wars".to_string())),
"path".to_string() => Literal(LiteralValue::String("title".to_string())),
})
},
)))
), Limit(20)]
};
}

// Minimal `$rankFusion`: the input only specifies `input.pipelines` with one pipeline, so the
// expected value has both optional fields (`combination`, `score_details`) set to `None`.
test_serde_stage!(
rank_fusion_single_pipeline,
expected = Stage::RankFusion(RankFusion {
input: RankFusionInput {
pipelines: map! {
"searchOne".to_string() => vector_pipeline!()
},
},
combination: None,
score_details: None
}),
input = r#"stage: {"$rankFusion": {
"input": { "pipelines": { searchOne: [{ "$vectorSearch" : {"index" : "hybrid-vector-search", "path" : "plot_embedding_voyage_3_large", "queryVector": [10.6, 60.5], "numCandidates": 100} }] } },
}}"#
);

// Fully specified `$rankFusion`: two named pipelines, a `combination.weights` entry for each
// pipeline name, and `scoreDetails: true` (expected as `score_details: Some(true)`).
test_serde_stage!(
rank_fusion_multiple_pipelines_with_weights,
expected = Stage::RankFusion(RankFusion {
input: RankFusionInput {
pipelines: map! {
"vectorPipeline".to_string() => vector_pipeline!(),
"fullTextPipeline".to_string() => text_search_pipeline!(),
},
},
combination: Some(RankFusionCombination {
weights: map! {
"vectorPipeline".to_string() => 0.5,
"fullTextPipeline".to_string() => 0.5
}
}),
score_details: Some(true)
}),
input = r#"stage: {
"$rankFusion": {
"input": {
pipelines: {
vectorPipeline: [
{
"$vectorSearch": {
"index": "hybrid-vector-search",
"path": "plot_embedding_voyage_3_large",
"queryVector": [10.6, 60.5],
"numCandidates": 100,
}
}
],
fullTextPipeline: [
{
"$search": {
"index": "hybrid-full-text-search",
"phrase": {
"query": "star wars",
"path": "title"
}
}
},
{ "$limit": 20 }
]
}
},
"combination": {
weights: {
vectorPipeline: 0.5,
fullTextPipeline: 0.5
}
},
"scoreDetails": true
}
}"#
);

// Duplicate pipeline names: the input declares `searchOne` twice with different `$match` and
// `$sort` stages. The expected value contains a single `searchOne` entry whose stages are
// those of the second declaration (`metacritic > 75`, sort by `title`), i.e. the later
// definition is the one that is kept.
test_serde_stage!(
pipelines_are_deduplicated,
expected = Stage::RankFusion(RankFusion {
input: RankFusionInput {
pipelines: map! {
"searchOne".to_string() => vec![AtlasSearchStage(Search(Box::new(
Expression::Document(map! {
"index".to_string() => Literal(LiteralValue::String("hybrid-full-text-search".to_string())),
"phrase".to_string() => Expression::Document(map! {
"query".to_string() => Literal(LiteralValue::String("adventure".to_string())),
"path".to_string() => Literal(LiteralValue::String("plot".to_string()))
}),

}),
))),
Match(MatchStage {
expr: vec![MatchExpression::Field(MatchField {
field: Ref::FieldRef("metacritic".to_string()),
ops: map! { MatchBinaryOp::Gt => bson::Bson::Int32(75) }
})]
}) ,
Sort(map! {"title".to_string() => 1})]
},
},
combination: None,
score_details: Some(false)
}),
input = r#"stage: { "$rankFusion" : {
"input" : {
"pipelines" : {
searchOne: [
{ "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "adventure", "path": "plot"}}},
{ "$match": { "genres": "Western", "year": { "$lt": 1980 }}},
{ "$sort": { "runtime": 1}
}],
searchOne: [
{ "$search": { "index": "hybrid-full-text-search", "phrase": { "query": "adventure","path": "plot"}}},
{ "$match": { "metacritic": { "$gt": 75 }}},
{ "$sort": { "title": 1}
}]
}
},
"scoreDetails": false
}
}"#
);
}

mod densify {
use crate::definitions::{Densify, DensifyRange, DensifyRangeBounds, Stage};
Expand Down
61 changes: 59 additions & 2 deletions agg-ast/schema_derivation/src/schema_derivation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use crate::{
use agg_ast::definitions::{
AtlasSearchStage, Bucket, BucketAuto, ConciseSubqueryLookup, Densify, Documents,
EqualityLookup, Expression, Fill, FillOutput, GraphLookup, Group, LiteralValue, Lookup,
LookupFrom, Namespace, ProjectItem, ProjectStage, Ref, SetWindowFields, Stage, SubqueryLookup,
TaggedOperator, UnionWith, Unset, UntaggedOperator, UntaggedOperatorName, Unwind,
LookupFrom, Namespace, ProjectItem, ProjectStage, RankFusion, Ref, SetWindowFields, Stage,
SubqueryLookup, TaggedOperator, UnionWith, Unset, UntaggedOperator, UntaggedOperatorName,
Unwind,
};
use linked_hash_map::LinkedHashMap;
use mongosql::{
Expand Down Expand Up @@ -433,6 +434,61 @@ impl DeriveSchema for Stage {
}
}

/// rank_fusion_derive_schema derives the schema for a $rankFusion stage. The schema is the
/// union of the schemas derived for every input pipeline; when `scoreDetails` is explicitly
/// `true`, a required `scoreDetails` document field is merged into that schema.
///
/// Each input pipeline is derived against its own clone of `state`, so this function does not
/// itself modify the caller's state and no pipeline observes changes made by another one.
///
/// Returns the first error produced while deriving any of the input pipelines.
fn rank_fusion_derive_schema(
    rank_fusion: &RankFusion,
    state: &mut ResultSetState,
) -> Result<Schema> {
    // 1. Derive the schema for each pipeline and union them together. `Schema::Unsat` is the
    //    starting accumulator, so it is also the result when there are no input pipelines.
    let unioned_schema_pipelines: Schema = rank_fusion
        .input
        .pipelines
        .iter()
        .try_fold(Schema::Unsat, |acc, (_, pipeline)| {
            let derived_pipeline_schema =
                derive_schema_for_pipeline(pipeline.clone(), None, &mut state.clone())?;

            Ok(acc.union(&derived_pipeline_schema))
        })?;

    // 2. scoreDetails is only part of the output when it was explicitly enabled; an omitted
    //    field (`None`) is treated the same as `false`.
    if rank_fusion.score_details != Some(true) {
        return Ok(unioned_schema_pipelines);
    }

    // The fused score (`value`) and the per-pipeline `weight` are floating-point numbers
    // (weights are parsed as `f64` in `RankFusionCombination`), so they are typed as Double
    // rather than Decimal (Decimal128) / Integer.
    // NOTE(review): confirm against the server's scoreDetails output that `rank` is always
    // reported as a 32-bit integer.
    let score_details_document = Document {
        keys: map! {
            "scoreDetails".to_string() => Schema::Document(Document {
                keys: map! {
                    "value".to_string() => Schema::Atomic(Atomic::Double),
                    "description".to_string() => Schema::Atomic(Atomic::String),
                    "details".to_string() => Schema::Array(Box::new(Schema::Document(Document {
                        keys: map! {
                            "inputPipelineName".to_string() => Schema::Atomic(Atomic::String),
                            "rank".to_string() => Schema::Atomic(Atomic::Integer),
                            "weight".to_string() => Schema::Atomic(Atomic::Double),
                            "value".to_string() => Schema::Atomic(Atomic::Double),
                            "details".to_string() => Schema::Array(Box::new(Schema::Any)),
                        },
                        required: set!("inputPipelineName".to_string(), "rank".to_string(),),
                        ..Default::default()
                    })))
                },
                required: set!("value".to_string(), "description".to_string(),),
                ..Default::default()
            })
        },
        required: set!("scoreDetails".to_string()),
        additional_properties: false,
        jaccard_index: None,
    };

    // 3. Merge the pipeline schema and the score details schema together. The merge only
    //    applies when the unioned pipeline schema is a document; any other schema is returned
    //    unchanged. Matching by value lets the document be merged without cloning it.
    Ok(match unioned_schema_pipelines {
        Schema::Document(pipeline_doc) => {
            Schema::Document(pipeline_doc.merge(score_details_document))
        }
        other => other,
    })
}

/// bucket_derive_schema derives the schema for a $bucket stage. The schema is defined by the output field,
/// and contains _id as well.
fn bucket_derive_schema(bucket: &Bucket, state: &mut ResultSetState) -> Result<Schema> {
Expand Down Expand Up @@ -1128,6 +1184,7 @@ impl DeriveSchema for Stage {
Stage::Lookup(l) => lookup_derive_schema(l, state),
Stage::Match(ref m) => m.derive_schema(state),
Stage::Project(p) => project_derive_schema(p, state),
Stage::RankFusion(rf) => rank_fusion_derive_schema(rf, state),
Stage::Redact(_) => Ok(state.result_set_schema.to_owned()),
Stage::ReplaceWith(r) => r
.to_owned()
Expand Down
Loading