Skip to content

Conversation

@jpowell-mongo
Copy link
Collaborator

@jpowell-mongo jpowell-mongo commented Nov 20, 2025

Overview

This PR adds $rankFusion support to schema derivation.

Per the $rankFusion docs the operator "first executes all input pipelines independently and then de-duplicates and combines the input pipeline results into a final ranked results set."

$rankFusion is not currently supported in views as of mongo version 8.2.1. As is, these changes cannot be validated on live data in mongosh or Atlas.

Changes Introduced

  • Model the $rankFusion operator in agg-ast based on Command Fields section of the docs
  • Implement schema derivation for the $rankFusion operator by union-ing all the input pipelines together.
  • Unit Tests for JSON parsing of $rankFusion, and schema derivation tests.

Testing

Unit tests for the derived schema for $rankFusion, and unit tests for parsing the JSON of a $rankFusion aggregation stage.

Notes

  • I wasn't sure what the right default value is for an empty schema, notice I used Schema::Unsat in one case and EMPTY_DOCUMENT in another.
  • I may need guidance on how requiredFields works for unions. It seems like it takes an intersection, and I wasn't sure if required fields are meant to be preserved when you union two schemas together.

@jpowell-mongo jpowell-mongo requested a review from a team as a code owner November 20, 2025 20:06
Copy link
Collaborator

@jchemburkar jchemburkar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking really good (and not just for a first PR!)

Have mostly nits about code conventions, and a couple thoughts on additional tests 😄

}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct RankFusionPipeline {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this is a tiny nit, but maybe something like RankFusionInput would make more sense, sense the input is a document of multiple pipelines

Copy link
Collaborator Author

@jpowell-mongo jpowell-mongo Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in:

51ac057

}
mod rank_fusion {

use crate::definitions::AtlasSearchStage::{Search, VectorSearch};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised clippy/rustfmt didn't catch this, but we should compress this into something like:

use crate::definitions::{AtlasSearchStage::{Search, VectorSearch}, Expression::Literal}, etc etc

such that all imports from the same crate are nested together

score_details: true
}),
input = r#"stage: {
"$rankFusion": {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we indent these in a similar format to others (e.g. each level of nesting is one tab space further in than its parent bracket). I think this might just be moving the whole chunk post 1744 over the same amount (a few tabs)

}}"#
);

// Test #4: Validate we parse multiple input pipelines along with specifying weights for each pipeline
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also a tiny nit but I think convention is that the test name should describe the test itself, so these comments are (or should be) redundant

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in:
f437aa7

use crate::map;
use linked_hash_map::LinkedHashMap;

pub fn empty_rankfusion_pipeline() -> LinkedHashMap<String, Vec<Stage>> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tiny nit but might be helpful for future endeavors -- I think we should make these macros instead of functions

the benefit of these being macros > functions is that rather than being evaluated at runtime, macros are evaluated at compile time, and are functionally codegen. This makes them slighly more finicky when writing but then when the code is actually executed, there's less ambiguity. Its more of a rust-ism than something that directly impacts what is being done here though

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

counter nit: this could also be a LazyLock if you want a nice compromise between nicer compiler messages and runtime shenanigans.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I see that AtlasSearchStage implements deserialize, so you might also be able to simplify the construction of these pipeline by passing in a string that you deserialize into the requisite type. The snapshot tests you have below show how the string should look like.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up replacing the functions with macros here:
f437aa7

score_details: true
}),
input = r#"stage: {"$rankFusion": {
"input": { "pipelines": { searchOne: []}},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might be able to remove this test -- iirc this doesn't actually work on the server (eg in mongosh), right?

one assumption of schema derivation is that the input pipelines are valid, so we dont need to error handle/validate the empty pipeline case, but if it isn't valid we can skip it. Ignore this comment if it is valid though 😁

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, the docs say that pipelines must have at least one pipeline:

"""
Contains a map of pipeline names to the aggregation stages that define that pipeline. input.pipelines must contain at least one pipeline. All pipelines must operate on the same collection and must have a unique name.

For more information on input pipeline restrictions, see Input Pipelines and Input Pipeline Names.
"""

Addressed in:
f437aa7

rank_fusion_multiple_pipelines_with_weights,
expected = Stage::RankFusion(RankFusion {
input: RankFusionPipeline {
pipelines: two_rank_fusion_pipelines(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition to the above comment about making these macros, I have a slight preference for only factoring out the pipelines themselves, not this whole document.

E.g. if we have a search_pipeline!() macro and a vector_pipeline!() macro, I think this test would be most readable if it was like:

pipelines: {
    vectorPipeline: vector_pipeline!(),
    fullTextSearchPipeline: search_pipeline!(),
}

I think this would make it easier to debug if the test started failing to still have the keys present

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in:
f437aa7

I factored out the pipelines into dedicated macros.

state: &mut ResultSetState,
) -> Result<Schema> {
// 1. Derive the schema for each pipeline and union them together
let unioned_schema_pipelines: Schema = rank_fusion
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we make unioned_schema_pipelines mutable and then just always return it at the end (e.g. in the if case, just do unioned_schema_pipelines = unioned_schema_pipelines.union(score_details_metadata_schema))?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in:
bbeb694

..Default::default()
});

let score_details_scheme = Schema::Document(Document {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we should just define this combine this with the score_details_metadata_schema variable I think, just have one big constant 🤷

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in:
bbeb694

);
}

mod rank_fusion {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is probably a test case or two that could be added here that would be beneficial. Here's what I have in mind:

  1. one where there are two pipelines, with the same key, where both pipelines are of the form: [{$match ...}, {$sort: ...}]. The match stages will be different , meaning it matters that we only get the schema for the second mention of the key. This is really something that should be solved by deserialization, but would be worth seeing here too
  2. a case where we have two match pipelines (similar format to ^), different keys (ie both output schemas get unioned), and the resulting schema is a subset of the original. Consider something like -- original schema has a field a that is AnyOf(int, double, string). The first pipeline matches where a is an int, and the second matches where a is a string. We would expect the output schema to not have double as a possible type for a. I think this will really verify the other case for union

I tried my best writing this but know it could be confusing -- let me know if you have any questions!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in:
1de0811

I added these two test cases

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to make some small changes so actual prefer this commit if you want to see the tests:
1b52ac0

Copy link
Collaborator

@nicholascioli nicholascioli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am super new to this codebase, but I left a small comment on some possible optimizations to help in legibility of the tests!

use crate::map;
use linked_hash_map::LinkedHashMap;

pub fn empty_rankfusion_pipeline() -> LinkedHashMap<String, Vec<Stage>> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

counter nit: this could also be a LazyLock if you want a nice compromise between nicer compiler messages and runtime shenanigans.

use crate::map;
use linked_hash_map::LinkedHashMap;

pub fn empty_rankfusion_pipeline() -> LinkedHashMap<String, Vec<Stage>> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I see that AtlasSearchStage implements deserialize, so you might also be able to simplify the construction of these pipeline by passing in a string that you deserialize into the requisite type. The snapshot tests you have below show how the string should look like.

}

#[macro_export]
macro_rules! vector_pipeline {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[TODO] I need to revisit these macros to possibly make them more extensible? I wasn't sure if the goal was to return "canned" vector and search pipelines, or if I should consider these more generic to either accepts field values for the VectorSearch and Search AtlasSearchStage types.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to make these generic since really all we care about is that there's a stage present. The details of the stage don't really matter for these tests, so let's just leave these as they are.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After fighting my IDE on types, I ultimately went with a for-loop approach to avoid all the issues I had satifying fold() and try_fold().

Addressed in:
c0d5a7a

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was able to figure out the try_fold with help from Jonathan Chemburkar:
eb5718b

pub struct RankFusion {
pub input: RankFusionInput,
pub combination: Option<RankFusionCombination>,
pub score_details: Option<bool>,
Copy link
Collaborator Author

@jpowell-mongo jpowell-mongo Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: the docs for $rankFusion don't list this field as optional, it defaults to false. However, one of the example agg pipelines in the docs omit this field, so I've made it Option to ensure parsing can handle that case.

@jpowell-mongo jpowell-mongo requested review from mattChiaravalloti and removed request for EthanHardyMongo November 25, 2025 16:11
Copy link
Collaborator

@mattChiaravalloti mattChiaravalloti left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice start! Some things you can definitely work on while Jon is away, but some others I noted he might want to discuss with us before we commit to making any changes.

}

#[macro_export]
macro_rules! vector_pipeline {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to make these generic since really all we care about is that there's a stage present. The details of the stage don't really matter for these tests, so let's just leave these as they are.

);

test_derive_stage_schema!(
rank_fusion_uses_latest_key_for_duplicate_pipelines,
Copy link
Collaborator

@mattChiaravalloti mattChiaravalloti Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know @jchemburkar specifically asked for this one but I'm not sure we should be testing this. A couple reasons:

  1. The docs for $rankFusion specifically say "All pipelines must operate on the same collection and must have a unique name." (emphasis mine).
  2. Documents with the same key present multiple times is technically undefined behavior. I see where Jon is coming from since we have followed the convention in the past that the "last" value is the one used (and I think that's how our parser implementation will always work), but in principle I'm not sure we should be testing undefined behavior.

Given those 2 reasons, and the fact that we operate on the assumption that view pipelines are "valid" when deriving schema, I think we should remove this test. We can wait to hear from Jon next week when he's back in office before removing it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the duplicate key tests here:
12b851e

Copy link
Collaborator

@mattChiaravalloti mattChiaravalloti left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me 🔥

I left one last comment about possibly updating the naming of the try_fold args. Also, it looks like some of the spacing in the test files is off if you want to try to sort that out. No need to retag me if you make either of those changes though.

Comment on lines +447 to +449
.try_fold(Schema::Unsat, |acc, pair| {
let derived_pipeline_schema =
derive_schema_for_pipeline(pair.1.clone(), None, &mut state.clone())?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.try_fold(Schema::Unsat, |acc, pair| {
let derived_pipeline_schema =
derive_schema_for_pipeline(pair.1.clone(), None, &mut state.clone())?;
.try_fold(Schema::Unsat, |acc,(_, pipeline)| {
let derived_pipeline_schema =
derive_schema_for_pipeline(pipeline.clone(), None, &mut state.clone())?;

Could we update this to match and name the (String, Vec<Stage>) pairs directly? pair.1 is a bit unclear

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants