Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { MongoClient } from './mongo_client';
import { type InferIdType, TypedEventEmitter } from './mongo_types';
import type { AggregateOptions } from './operations/aggregate';
import type { OperationParent } from './operations/command';
import { DeprioritizedServers } from './sdam/server_selection';
import type { ServerSessionId } from './sessions';
import { CSOTTimeoutContext, type TimeoutContext } from './timeout';
import { type AnyOptions, getTopology, type MongoDBNamespace, squashError } from './utils';
Expand Down Expand Up @@ -1073,7 +1074,8 @@ export class ChangeStream<
try {
await topology.selectServer(this.cursor.readPreference, {
operationName: 'reconnect topology in change stream',
timeoutContext: this.timeoutContext
timeoutContext: this.timeoutContext,
deprioritizedServers: new DeprioritizedServers()
});
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
} catch {
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ export type {
TagSet,
TopologyVersion
} from './sdam/server_description';
export type { ServerSelector } from './sdam/server_selection';
export type { DeprioritizedServers, ServerSelector } from './sdam/server_selection';
export type { SrvPoller, SrvPollerEvents, SrvPollerOptions } from './sdam/srv_polling';
export type {
ConnectOptions,
Expand Down
4 changes: 2 additions & 2 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_conc
import { ReadPreference, type ReadPreferenceMode } from './read_preference';
import type { ServerMonitoringMode } from './sdam/monitor';
import type { TagSet } from './sdam/server_description';
import { readPreferenceServerSelector } from './sdam/server_selection';
import { DeprioritizedServers, readPreferenceServerSelector } from './sdam/server_selection';
import type { SrvPoller } from './sdam/srv_polling';
import { Topology, type TopologyEvents } from './sdam/topology';
import { ClientSession, type ClientSessionOptions, ServerSessionPool } from './sessions';
Expand Down Expand Up @@ -789,7 +789,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
// to avoid the server selection timeout.
const selector = readPreferenceServerSelector(ReadPreference.primaryPreferred);
const serverDescriptions = Array.from(topologyDescription.servers.values());
const servers = selector(topologyDescription, serverDescriptions);
const servers = selector(topologyDescription, serverDescriptions, new DeprioritizedServers());
if (servers.length !== 0) {
const endSessions = Array.from(client.s.sessionPool.sessions, ({ id }) => id);
if (endSessions.length !== 0) {
Expand Down
11 changes: 6 additions & 5 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import {
} from '../error';
import type { MongoClient } from '../mongo_client';
import { ReadPreference } from '../read_preference';
import type { ServerDescription } from '../sdam/server_description';
import {
DeprioritizedServers,
sameServerSelector,
secondaryWritableServerSelector,
type ServerSelector
Expand Down Expand Up @@ -207,7 +207,8 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
session,
operationName: operation.commandName,
timeoutContext,
signal: operation.options.signal
signal: operation.options.signal,
deprioritizedServers: new DeprioritizedServers()
});

const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
Expand All @@ -234,7 +235,7 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro

const maxTries = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
let previousOperationError: MongoError | undefined;
let previousServer: ServerDescription | undefined;
const deprioritizedServers = new DeprioritizedServers();

for (let tries = 0; tries < maxTries; tries++) {
if (previousOperationError) {
Expand Down Expand Up @@ -270,7 +271,7 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
server = await topology.selectServer(selector, {
session,
operationName: operation.commandName,
previousServer,
deprioritizedServers,
signal: operation.options.signal
});

Expand Down Expand Up @@ -303,7 +304,7 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
) {
throw previousOperationError;
}
previousServer = server.description;
deprioritizedServers.add(server.description);
previousOperationError = operationError;

// Reset timeouts
Expand Down
Loading