From 02e1ec1c91451315894e4e32ff4fe2c4ea12ac46 Mon Sep 17 00:00:00 2001 From: Christoph Rehbichler Date: Mon, 24 Nov 2025 17:40:57 +0100 Subject: [PATCH 1/4] fix(NODE-7298): ensure commonWireVersion is computed from server maxWireVersion --- src/sdam/server_selection.ts | 2 +- src/sdam/topology.ts | 2 +- src/sdam/topology_description.ts | 4 +- test/unit/sdam/topology_description.test.ts | 63 +++++++++++++++++++++ 4 files changed, 67 insertions(+), 4 deletions(-) create mode 100644 test/unit/sdam/topology_description.test.ts diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts index 03b6e959386..dd19b98005c 100644 --- a/src/sdam/server_selection.ts +++ b/src/sdam/server_selection.ts @@ -56,7 +56,7 @@ export function sameServerSelector(description?: ServerDescription): ServerSelec * server potentially for a write on a secondary. */ export function secondaryWritableServerSelector( - wireVersion?: number, + wireVersion?: number | null, readPreference?: ReadPreference ): ServerSelector { // If server version < 5.0, read preference always primary. diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 05cd8eb2abe..ac51dea6775 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -771,7 +771,7 @@ export class Topology extends TypedEventEmitter { return result; } - get commonWireVersion(): number | undefined { + get commonWireVersion(): number | null { return this.description.commonWireVersion; } diff --git a/src/sdam/topology_description.ts b/src/sdam/topology_description.ts index 55eb48bb35f..1fe4e663027 100644 --- a/src/sdam/topology_description.ts +++ b/src/sdam/topology_description.ts @@ -42,7 +42,7 @@ export class TopologyDescription { logicalSessionTimeoutMinutes: number | null; heartbeatFrequencyMS: number; localThresholdMS: number; - commonWireVersion: number; + commonWireVersion: number | null; /** * Create a TopologyDescription */ @@ -66,7 +66,7 @@ export class TopologyDescription { this.setName = setName ?? null; this.maxElectionId = maxElectionId ?? null; this.maxSetVersion = maxSetVersion ?? null; - this.commonWireVersion = commonWireVersion ?? 0; + this.commonWireVersion = commonWireVersion ?? null; // determine server compatibility for (const serverDescription of this.servers.values()) { diff --git a/test/unit/sdam/topology_description.test.ts b/test/unit/sdam/topology_description.test.ts new file mode 100644 index 00000000000..2ea78f9df43 --- /dev/null +++ b/test/unit/sdam/topology_description.test.ts @@ -0,0 +1,63 @@ +import { expect } from 'chai'; + +import { TopologyType } from '../../../src/sdam/common'; +import { ServerDescription } from '../../../src/sdam/server_description'; +import { TopologyDescription } from '../../../src/sdam/topology_description'; + +describe('TopologyDescription', function () { + describe('#constructor', function () { + it('sets commonWireVersion to null', function () { + const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary); + + expect(initial.commonWireVersion).to.equal(null); + }); + }); + + describe('update()', function () { + it('initializes commonWireVersion from first non-zero maxWireVersion', function () { + const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary); + + const sd1 = new ServerDescription('a:27017', { + maxWireVersion: 25 + }); + + const updated = initial.update(sd1); + + expect(updated.commonWireVersion).to.equal(25); + }); + + it('tracks the minimum non-zero maxWireVersion across updates in commonWireVersion', function () { + const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary); + + const sd1 = new ServerDescription('a:27017', { + maxWireVersion: 25 + }); + + const sd2 = new ServerDescription('b:27017', { + maxWireVersion: 21 + }); + + let updated = initial.update(sd1); + updated = updated.update(sd2); + + expect(updated.commonWireVersion).to.equal(21); + }); + + it('ignores servers with maxWireVersion === 0 when computing commonWireVersion', function () { + const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary); + + const sd1 = new ServerDescription('a:27017', { + maxWireVersion: 25 + }); + + const sdUnknown = new ServerDescription('b:27017', { + maxWireVersion: 0 + }); + + let updated = initial.update(sd1); + updated = updated.update(sdUnknown); + + expect(updated.commonWireVersion).to.equal(25); + }); + }); +}); From 1beb3c69782268049aa2791eee5fc6400f456c42 Mon Sep 17 00:00:00 2001 From: Christoph Rehbichler Date: Thu, 27 Nov 2025 16:58:23 +0100 Subject: [PATCH 2/4] test(NODE-7298): add integration test for aggregations with write stage using provided read preference --- test/integration/crud/aggregation.test.ts | 69 +++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/test/integration/crud/aggregation.test.ts b/test/integration/crud/aggregation.test.ts index 92602f7b875..c8310e8bddc 100644 --- a/test/integration/crud/aggregation.test.ts +++ b/test/integration/crud/aggregation.test.ts @@ -870,4 +870,73 @@ describe('Aggregation', function () { .finally(() => client.close()); } }); + + it( + 'should perform aggregations with a write stage on secondary when readPreference is secondary', + { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + const databaseName = this.configuration.db; + const client = this.configuration.newClient(this.configuration.writeConcernMax(), { + maxPoolSize: 1, + monitorCommands: true + }); + + const events = []; + client.on('commandStarted', filterForCommands(['hello', 'aggregate'], events)); + + // Discover primary to be able to check the actual server address + await client.db('admin').command({ hello: 1 }); + const [helloEvent] = events; + const primaryAddress = helloEvent.address; + + // Clear events + events.length = 0; + + const src = client.db(databaseName).collection('read_pref_src'); + const outMerge = client.db(databaseName).collection('read_pref_merge_out'); + const outOut = client.db(databaseName).collection('read_pref_out_out'); + + await Promise.all([src.deleteMany({}), outMerge.deleteMany({}), outOut.deleteMany({})]); + await src.insertMany([{ a: 1 }, { a: 2 }]); + await Promise.all([ + src + .aggregate( + [ + { + $merge: { + into: 'read_pref_merge_out', + whenMatched: 'replace', + whenNotMatched: 'insert' + } + } + ], + { readPreference: 'secondary' } + ) + .toArray(), + src + .aggregate( + [ + { + $out: 'read_pref_out_out' + } + ], + { readPreference: 'secondary' } + ) + .toArray() + ]); + + expect(events).to.have.length(2); + events.forEach(event => { + expect(event).to.have.property('commandName', 'aggregate'); + expect(event.address).to.not.equal(primaryAddress); + expect(event).to.have.deep.nested.property('command.$readPreference', { + mode: 'secondary' + }); + }); + + await client.close(); + } + } + ); }); From bdc4bb11d9ee997eeb3faf7c3c3a3dcbc2248282 Mon Sep 17 00:00:00 2001 From: Christoph Rehbichler Date: Wed, 3 Dec 2025 13:56:47 +0100 Subject: [PATCH 3/4] fix(NODE-7298): make wireVersion non-optional in secondaryWritableServerSelector --- src/sdam/server_selection.ts | 2 +- test/unit/sdam/server_selection.test.ts | 55 +------------------------ 2 files changed, 3 insertions(+), 54 deletions(-) diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts index dd19b98005c..63ccedf73d9 100644 --- a/src/sdam/server_selection.ts +++ b/src/sdam/server_selection.ts @@ -56,7 +56,7 @@ export function sameServerSelector(description?: ServerDescription): ServerSelec * server potentially for a write on a secondary. */ export function secondaryWritableServerSelector( - wireVersion?: number | null, + wireVersion: number | null, readPreference?: ReadPreference ): ServerSelector { // If server version < 5.0, read preference always primary. diff --git a/test/unit/sdam/server_selection.test.ts b/test/unit/sdam/server_selection.test.ts index 4c0a45985de..1e049ea4e45 100644 --- a/test/unit/sdam/server_selection.test.ts +++ b/test/unit/sdam/server_selection.test.ts @@ -345,23 +345,6 @@ describe('server selection', function () { }); }); }); - - context('when a common wire version is not provided', function () { - const topologyDescription = new TopologyDescription( - TopologyType.Sharded, - serverDescriptions, - 'test', - MIN_SECONDARY_WRITE_WIRE_VERSION, - new ObjectId(), - MIN_SECONDARY_WRITE_WIRE_VERSION - ); - const selector = secondaryWritableServerSelector(); - const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); - - it('selects a mongos', function () { - expect(servers).to.deep.equal([mongos]); - }); - }); }); context('when the topology is load balanced', function () { @@ -431,23 +414,6 @@ describe('server selection', function () { }); }); }); - - context('when a common wire version is not provided', function () { - const topologyDescription = new TopologyDescription( - TopologyType.LoadBalanced, - serverDescriptions, - 'test', - MIN_SECONDARY_WRITE_WIRE_VERSION, - new ObjectId(), - MIN_SECONDARY_WRITE_WIRE_VERSION - ); - const selector = secondaryWritableServerSelector(); - const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); - - it('selects a load balancer', function () { - expect(servers).to.deep.equal([loadBalancer]); - }); - }); }); context('when the topology is single', function () { @@ -517,23 +483,6 @@ describe('server selection', function () { }); }); }); - - context('when a common wire version is not provided', function () { - const topologyDescription = new TopologyDescription( - TopologyType.Single, - serverDescriptions, - 'test', - MIN_SECONDARY_WRITE_WIRE_VERSION, - new ObjectId(), - MIN_SECONDARY_WRITE_WIRE_VERSION - ); - const selector = secondaryWritableServerSelector(); - const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); - - it('selects a standalone', function () { - expect(servers).to.deep.equal([single]); - }); - }); }); context('localThresholdMS is respected as an option', function () { @@ -580,7 +529,7 @@ describe('server selection', function () { new ObjectId(), MIN_SECONDARY_WRITE_WIRE_VERSION ); - const selector = secondaryWritableServerSelector(); + const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION); const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); expect(servers).to.have.lengthOf(2); const selectedAddresses = new Set(servers.map(({ address }) => address)); @@ -599,7 +548,7 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION, { localThresholdMS: 5 } ); - const selector = secondaryWritableServerSelector(); + const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION); const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); expect(servers).to.have.lengthOf(1); const selectedAddresses = new Set(servers.map(({ address }) => address)); From 9837a8bd803a300972aa0c09e28ce47602bb1b91 Mon Sep 17 00:00:00 2001 From: Christoph Rehbichler Date: Wed, 3 Dec 2025 22:18:02 +0100 Subject: [PATCH 4/4] fix(NODE-7298): revert commonWireVersion type change; adapt check in update() accordingly --- src/sdam/server_selection.ts | 2 +- src/sdam/topology.ts | 2 +- src/sdam/topology_description.ts | 6 +++--- test/unit/sdam/server_selection.test.ts | 17 ----------------- test/unit/sdam/topology_description.test.ts | 4 ++-- 5 files changed, 7 insertions(+), 24 deletions(-) diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts index 63ccedf73d9..ed8ad73dbeb 100644 --- a/src/sdam/server_selection.ts +++ b/src/sdam/server_selection.ts @@ -56,7 +56,7 @@ export function sameServerSelector(description?: ServerDescription): ServerSelec * server potentially for a write on a secondary. */ export function secondaryWritableServerSelector( - wireVersion: number | null, + wireVersion: number, readPreference?: ReadPreference ): ServerSelector { // If server version < 5.0, read preference always primary. diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index ac51dea6775..c856e61231f 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -771,7 +771,7 @@ export class Topology extends TypedEventEmitter { return result; } - get commonWireVersion(): number | null { + get commonWireVersion(): number { return this.description.commonWireVersion; } diff --git a/src/sdam/topology_description.ts b/src/sdam/topology_description.ts index 1fe4e663027..a1c301dbc2d 100644 --- a/src/sdam/topology_description.ts +++ b/src/sdam/topology_description.ts @@ -42,7 +42,7 @@ export class TopologyDescription { logicalSessionTimeoutMinutes: number | null; heartbeatFrequencyMS: number; localThresholdMS: number; - commonWireVersion: number | null; + commonWireVersion: number; /** * Create a TopologyDescription */ @@ -66,7 +66,7 @@ export class TopologyDescription { this.setName = setName ?? null; this.maxElectionId = maxElectionId ?? null; this.maxSetVersion = maxSetVersion ?? null; - this.commonWireVersion = commonWireVersion ?? null; + this.commonWireVersion = commonWireVersion ?? 0; // determine server compatibility for (const serverDescription of this.servers.values()) { @@ -192,7 +192,7 @@ export class TopologyDescription { // update common wire version if (serverDescription.maxWireVersion !== 0) { - if (commonWireVersion == null) { + if (commonWireVersion === 0) { commonWireVersion = serverDescription.maxWireVersion; } else { commonWireVersion = Math.min(commonWireVersion, serverDescription.maxWireVersion); diff --git a/test/unit/sdam/server_selection.test.ts b/test/unit/sdam/server_selection.test.ts index 1e049ea4e45..93fc7fbf274 100644 --- a/test/unit/sdam/server_selection.test.ts +++ b/test/unit/sdam/server_selection.test.ts @@ -259,23 +259,6 @@ describe('server selection', function () { }); }); }); - - context('when a common wire version is not provided', function () { - const topologyDescription = new TopologyDescription( - TopologyType.ReplicaSetWithPrimary, - serverDescriptions, - 'test', - MIN_SECONDARY_WRITE_WIRE_VERSION, - new ObjectId(), - MIN_SECONDARY_WRITE_WIRE_VERSION - ); - const selector = secondaryWritableServerSelector(undefined, ReadPreference.secondary); - const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); - - it('selects a primary', function () { - expect(servers).to.deep.equal([primary]); - }); - }); }); context('when the topology is sharded', function () { diff --git a/test/unit/sdam/topology_description.test.ts b/test/unit/sdam/topology_description.test.ts index 2ea78f9df43..01c572cea97 100644 --- a/test/unit/sdam/topology_description.test.ts +++ b/test/unit/sdam/topology_description.test.ts @@ -6,10 +6,10 @@ import { TopologyDescription } from '../../../src/sdam/topology_description'; describe('TopologyDescription', function () { describe('#constructor', function () { - it('sets commonWireVersion to null', function () { + it('sets commonWireVersion to 0', function () { const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary); - expect(initial.commonWireVersion).to.equal(null); + expect(initial.commonWireVersion).to.equal(0); }); });