Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,11 @@ export abstract class AbstractCursor<
array.push(await this.transformDocument(doc));
}
} else {
array.push(...docs);
// Note: previous versions of this logic used `array.push(...docs)`, which passes every
// element as an individual call argument. For large arrays, this can exceed the maximum
// call stack size.
for (const doc of docs) {
array.push(doc);
}
}
}
return array;
Expand Down Expand Up @@ -889,7 +893,7 @@ export abstract class AbstractCursor<
): Promise<InitialCursorResponse>;

/** @internal */
async getMore(batchSize: number): Promise<CursorResponse> {
async getMore(): Promise<CursorResponse> {
if (this.cursorId == null) {
throw new MongoRuntimeError(
'Unexpected null cursor id. A cursor creating command should have set this'
Expand All @@ -906,11 +910,10 @@ export abstract class AbstractCursor<
'Unexpected null session. A cursor creating command should have set this'
);
}

const getMoreOptions = {
...this.cursorOptions,
session: this.cursorSession,
batchSize
batchSize: this.cursorOptions.batchSize
};

const getMoreOperation = new GetMoreOperation(
Expand Down Expand Up @@ -983,14 +986,11 @@ export abstract class AbstractCursor<
await this.cursorInit();
// If the cursor died or returned documents, return
if ((this.documents?.length ?? 0) !== 0 || this.isDead) return;
// Otherwise, run a getMore
}

// otherwise need to call getMore
const batchSize = this.cursorOptions.batchSize || 1000;

// Otherwise, run a getMore
try {
const response = await this.getMore(batchSize);
const response = await this.getMore();
this.cursorId = response.id;
this.documents = response;
} catch (error) {
Expand Down
4 changes: 2 additions & 2 deletions src/cursor/change_stream_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ export class ChangeStreamCursor<
return { server, session, response };
}

override async getMore(batchSize: number): Promise<CursorResponse> {
const response = await super.getMore(batchSize);
override async getMore(): Promise<CursorResponse> {
const response = await super.getMore();

this.maxWireVersion = maxWireVersion(this.server);
this._processBatch(response);
Expand Down
67 changes: 40 additions & 27 deletions src/cursor/find_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { FindOperation, type FindOptions } from '../operations/find';
import type { Hint } from '../operations/operation';
import type { ClientSession } from '../sessions';
import { formatSort, type Sort, type SortDirection } from '../sort';
import { emitWarningOnce, mergeOptions, type MongoDBNamespace, squashError } from '../utils';
import { emitWarningOnce, mergeOptions, type MongoDBNamespace, noop, squashError } from '../utils';
import { type InitialCursorResponse } from './abstract_cursor';
import { ExplainableCursor } from './explainable_cursor';

Expand Down Expand Up @@ -98,37 +98,50 @@ export class FindCursor<TSchema = any> extends ExplainableCursor<TSchema> {
}

/** @internal */
override async getMore(batchSize: number): Promise<CursorResponse> {
override async getMore(): Promise<CursorResponse> {
const numReturned = this.numReturned;
if (numReturned) {
// TODO(DRIVERS-1448): Remove logic to enforce `limit` in the driver
const limit = this.findOptions.limit;
batchSize =
limit && limit > 0 && numReturned + batchSize > limit ? limit - numReturned : batchSize;

if (batchSize <= 0) {
try {
await this.close();
} catch (error) {
squashError(error);
// this is an optimization for the special case of a limit for a find command to avoid an
// extra getMore when the limit has been reached and the limit is a multiple of the batchSize.
// This is a consequence of the new query engine in 5.0 having no knowledge of the limit as it
// produces results for the find command. Once a batch is filled up, it is returned and only
// on the subsequent getMore will the query framework consider the limit, determine the cursor
// is exhausted and return a cursorId of zero.
// instead, if we determine there are no more documents to request from the server, we preemptively
// close the cursor
}
return CursorResponse.emptyGetMore;
const limit = this.findOptions.limit ?? Infinity;
const remaining = limit - numReturned;

if (numReturned === limit && !this.id?.isZero()) {
// This is an optimization for the special case of a `limit` on a find command: it avoids an
// extra getMore once the limit has been reached and the limit is a multiple of the batchSize.
// This is a consequence of the new query engine in 5.0 having no knowledge of the limit as it
// produces results for the find command. Once a batch is filled up, it is returned and only
// on the subsequent getMore will the query framework consider the limit, determine the cursor
// is exhausted and return a cursorId of zero.
// Instead, if we determine there are no more documents to request from the server, we
// preemptively close the cursor.
try {
await this.close();
} catch (error) {
squashError(error);
}
return CursorResponse.emptyGetMore;
}

// TODO(DRIVERS-1448): Remove logic to enforce `limit` in the driver
let cleanup: () => void = noop;
const { batchSize } = this.cursorOptions;
if (batchSize != null && batchSize > remaining) {
this.cursorOptions.batchSize = remaining;

// After executing the final getMore, re-assign the batchSize back to its original value so that
// if the cursor is rewound and executed, the batchSize is still correct.
cleanup = () => {
this.cursorOptions.batchSize = batchSize;
};
}

const response = await super.getMore(batchSize);
// TODO: wrap this in some logic to prevent it from happening if we don't need this support
this.numReturned = this.numReturned + response.batchSize;
try {
const response = await super.getMore();

return response;
this.numReturned = this.numReturned + response.batchSize;

return response;
} finally {
cleanup?.();
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/cursor/run_command_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ export class RunCommandCursor extends AbstractCursor {
}

/** @internal */
override async getMore(_batchSize: number): Promise<CursorResponse> {
override async getMore(): Promise<CursorResponse> {
if (!this.session) {
throw new MongoRuntimeError(
'Unexpected null session. A cursor creating command should have set this'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,53 +515,49 @@ describe('Command Monitoring', function () {
}
});

it('should correctly decorate the apm result for aggregation with cursorId', {
metadata: { requires: { topology: ['single', 'replicaset'], mongodb: '>=3.0.0' } },

test: function () {
const started = [];
const succeeded = [];

// Generate docs
const docs = [];
for (let i = 0; i < 2500; i++) docs.push({ a: i });

const client = this.configuration.newClient(
{ writeConcern: { w: 1 } },
{ maxPoolSize: 1, monitorCommands: true }
);
it('should correctly decorate the apm result for aggregation with cursorId', async function () {
const started = [];
const succeeded = [];

const desiredEvents = ['aggregate', 'getMore'];
client.on('commandStarted', filterForCommands(desiredEvents, started));
client.on('commandSucceeded', filterForCommands(desiredEvents, succeeded));
// Generate docs
const docs = [];
for (let i = 0; i < 3500; i++) docs.push({ a: i });

const db = client.db(this.configuration.db);
return db
.collection('apm_test_u_4')
.drop()
.catch(ignoreNsNotFound)
.then(() => db.collection('apm_test_u_4').insertMany(docs))
.then(r => {
expect(r).to.exist;
return db
.collection('apm_test_u_4')
.aggregate([{ $match: {} }])
.toArray();
})
.then(r => {
expect(r).to.exist;
expect(started).to.have.length(4);
expect(succeeded).to.have.length(4);
const cursors = succeeded.map(x => x.reply.cursor);
const client = this.configuration.newClient(
{ writeConcern: { w: 1 } },
{ maxPoolSize: 1, monitorCommands: true }
);

// Check we have a cursor
expect(cursors[0].id).to.exist;
expect(cursors[0].id.toString()).to.equal(cursors[1].id.toString());
expect(cursors[3].id.toString()).to.equal('0');
const desiredEvents = ['aggregate', 'getMore'];
client.on('commandStarted', filterForCommands(desiredEvents, started));
client.on('commandSucceeded', filterForCommands(desiredEvents, succeeded));

return client.close();
});
}
const db = client.db(this.configuration.db);
return db
.collection('apm_test_u_4')
.drop()
.catch(ignoreNsNotFound)
.then(() => db.collection('apm_test_u_4').insertMany(docs))
.then(r => {
expect(r).to.exist;
return db
.collection('apm_test_u_4')
.aggregate([{ $match: {} }], { batchSize: 1000 })
.toArray();
})
.then(r => {
expect(r).to.exist;
expect(started).to.have.length(4);
expect(succeeded).to.have.length(4);
const cursors = succeeded.map(x => x.reply.cursor);

// Check we have a cursor
expect(cursors[0].id).to.exist;
expect(cursors[0].id.toString()).to.equal(cursors[1].id.toString());
expect(cursors[3].id.toString()).to.equal('0');

return client.close();
});
});

it('should correctly decorate the apm result for listCollections with cursorId', {
Expand Down
37 changes: 35 additions & 2 deletions test/integration/node-specific/abstract_cursor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,8 @@ describe('class AbstractCursor', function () {

afterEach(async function () {
sinon.restore();
await cursor.close();
await client.close();
await cursor?.close();
await client?.close();
});

it('iterates per batch not per document', async () => {
Expand All @@ -439,6 +439,39 @@ describe('class AbstractCursor', function () {
const numDocuments = numBatches * batchSize;
expect(nextSpy.callCount).to.be.lessThan(numDocuments);
});

// Regression test: collecting a very large result set via toArray() must not overflow the
// call stack (previously `array.push(...docs)` passed every document as a call argument).
it(
'does not exceed stack size for large arrays',
// $documents was added in 6.0
{ requires: { mongodb: '>=6.0' } },
async function () {
// The $reduce runs 20 doubling iterations over initialValue [0], producing an array of
// 2^20 (1,048,576) elements; $unwind turns that into ~1M documents and $limit caps the
// result at 1,000,000. toArray() must buffer all of them without throwing a RangeError.
await client
.db()
.aggregate([
{
$documents: [
{
doc: 'foo'
}
]
},
{
$set: {
field: {
$reduce: {
input: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
initialValue: [0],
in: { $concatArrays: ['$$value', '$$value'] }
}
}
}
},
{ $unwind: '$field' },
{ $limit: 1000000 }
])
.toArray();
}
);
});

describe('externally provided timeout contexts', function () {
Expand Down