diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts
index f60ae2a44af..80b0690d9c5 100644
--- a/src/cursor/abstract_cursor.ts
+++ b/src/cursor/abstract_cursor.ts
@@ -697,7 +697,11 @@ export abstract class AbstractCursor<
           array.push(await this.transformDocument(doc));
         }
       } else {
-        array.push(...docs);
+        // Note: previous versions of this logic used `array.push(...)`, which adds each item
+        // to the callstack. For large arrays, this can exceed the maximum call stack size.
+        for (const doc of docs) {
+          array.push(doc);
+        }
       }
     }
     return array;
@@ -889,7 +893,7 @@ export abstract class AbstractCursor<
   ): Promise<InitialCursorResponse>;
 
   /** @internal */
-  async getMore(batchSize: number): Promise<CursorResponse> {
+  async getMore(): Promise<CursorResponse> {
     if (this.cursorId == null) {
       throw new MongoRuntimeError(
         'Unexpected null cursor id. A cursor creating command should have set this'
@@ -906,11 +910,10 @@ export abstract class AbstractCursor<
         'Unexpected null session. A cursor creating command should have set this'
       );
     }
-
     const getMoreOptions = {
       ...this.cursorOptions,
       session: this.cursorSession,
-      batchSize
+      batchSize: this.cursorOptions.batchSize
     };
 
     const getMoreOperation = new GetMoreOperation(
@@ -983,14 +986,11 @@ export abstract class AbstractCursor<
       await this.cursorInit();
       // If the cursor died or returned documents, return
      if ((this.documents?.length ?? 0) !== 0 || this.isDead) return;
-      // Otherwise, run a getMore
     }
 
-    // otherwise need to call getMore
-    const batchSize = this.cursorOptions.batchSize || 1000;
-
+    // Otherwise, run a getMore
     try {
-      const response = await this.getMore(batchSize);
+      const response = await this.getMore();
       this.cursorId = response.id;
       this.documents = response;
     } catch (error) {
diff --git a/src/cursor/change_stream_cursor.ts b/src/cursor/change_stream_cursor.ts
index 8b56e2d355a..7b4db301069 100644
--- a/src/cursor/change_stream_cursor.ts
+++ b/src/cursor/change_stream_cursor.ts
@@ -158,8 +158,8 @@ export class ChangeStreamCursor<
     return { server, session, response };
   }
 
-  override async getMore(batchSize: number): Promise<CursorResponse> {
-    const response = await super.getMore(batchSize);
+  override async getMore(): Promise<CursorResponse> {
+    const response = await super.getMore();
     this.maxWireVersion = maxWireVersion(this.server);
     this._processBatch(response);
 
diff --git a/src/cursor/find_cursor.ts b/src/cursor/find_cursor.ts
index 83082f00243..bb6d6108591 100644
--- a/src/cursor/find_cursor.ts
+++ b/src/cursor/find_cursor.ts
@@ -16,7 +16,7 @@ import { FindOperation, type FindOptions } from '../operations/find';
 import type { Hint } from '../operations/operation';
 import type { ClientSession } from '../sessions';
 import { formatSort, type Sort, type SortDirection } from '../sort';
-import { emitWarningOnce, mergeOptions, type MongoDBNamespace, squashError } from '../utils';
+import { emitWarningOnce, mergeOptions, type MongoDBNamespace, noop, squashError } from '../utils';
 import { type InitialCursorResponse } from './abstract_cursor';
 import { ExplainableCursor } from './explainable_cursor';
 
@@ -98,37 +98,50 @@ export class FindCursor<TSchema = any> extends ExplainableCursor<TSchema> {
   }
 
   /** @internal */
-  override async getMore(batchSize: number): Promise<CursorResponse> {
+  override async getMore(): Promise<CursorResponse> {
     const numReturned = this.numReturned;
-    if (numReturned) {
-      // TODO(DRIVERS-1448): Remove logic to enforce `limit` in the driver
-      const limit = this.findOptions.limit;
-      batchSize =
-        limit && limit > 0 && numReturned + batchSize > limit ? limit - numReturned : batchSize;
-
-      if (batchSize <= 0) {
-        try {
-          await this.close();
-        } catch (error) {
-          squashError(error);
-          // this is an optimization for the special case of a limit for a find command to avoid an
-          // extra getMore when the limit has been reached and the limit is a multiple of the batchSize.
-          // This is a consequence of the new query engine in 5.0 having no knowledge of the limit as it
-          // produces results for the find command. Once a batch is filled up, it is returned and only
-          // on the subsequent getMore will the query framework consider the limit, determine the cursor
-          // is exhausted and return a cursorId of zero.
-          // instead, if we determine there are no more documents to request from the server, we preemptively
-          // close the cursor
-        }
-        return CursorResponse.emptyGetMore;
+    const limit = this.findOptions.limit ?? Infinity;
+    const remaining = limit - numReturned;
+
+    if (numReturned === limit && !this.id?.isZero()) {
+      // this is an optimization for the special case of a limit for a find command to avoid an
+      // extra getMore when the limit has been reached and the limit is a multiple of the batchSize.
+      // This is a consequence of the new query engine in 5.0 having no knowledge of the limit as it
+      // produces results for the find command. Once a batch is filled up, it is returned and only
+      // on the subsequent getMore will the query framework consider the limit, determine the cursor
+      // is exhausted and return a cursorId of zero.
+      // instead, if we determine there are no more documents to request from the server, we preemptively
+      // close the cursor
+      try {
+        await this.close();
+      } catch (error) {
+        squashError(error);
       }
+      return CursorResponse.emptyGetMore;
+    }
+
+    // TODO(DRIVERS-1448): Remove logic to enforce `limit` in the driver
+    let cleanup: () => void = noop;
+    const { batchSize } = this.cursorOptions;
+    if (batchSize != null && batchSize > remaining) {
+      this.cursorOptions.batchSize = remaining;
+
+      // After executing the final getMore, re-assign the batchSize back to its original value so that
+      // if the cursor is rewound and executed, the batchSize is still correct.
+      cleanup = () => {
+        this.cursorOptions.batchSize = batchSize;
+      };
     }
 
-    const response = await super.getMore(batchSize);
-    // TODO: wrap this in some logic to prevent it from happening if we don't need this support
-    this.numReturned = this.numReturned + response.batchSize;
+    try {
+      const response = await super.getMore();
 
-    return response;
+      this.numReturned = this.numReturned + response.batchSize;
+
+      return response;
+    } finally {
+      cleanup?.();
+    }
   }
 
   /**
diff --git a/src/cursor/run_command_cursor.ts b/src/cursor/run_command_cursor.ts
index e6ceee1060c..997fe1ffebc 100644
--- a/src/cursor/run_command_cursor.ts
+++ b/src/cursor/run_command_cursor.ts
@@ -159,7 +159,7 @@ export class RunCommandCursor extends AbstractCursor {
   }
 
   /** @internal */
-  override async getMore(_batchSize: number): Promise<CursorResponse> {
+  override async getMore(): Promise<CursorResponse> {
     if (!this.session) {
       throw new MongoRuntimeError(
         'Unexpected null session. A cursor creating command should have set this'
diff --git a/test/integration/command-logging-and-monitoring/command_monitoring.test.ts b/test/integration/command-logging-and-monitoring/command_monitoring.test.ts
index 90ee80a81fb..240a9363064 100644
--- a/test/integration/command-logging-and-monitoring/command_monitoring.test.ts
+++ b/test/integration/command-logging-and-monitoring/command_monitoring.test.ts
@@ -515,53 +515,49 @@ describe('Command Monitoring', function () {
     }
   });
 
-  it('should correctly decorate the apm result for aggregation with cursorId', {
-    metadata: { requires: { topology: ['single', 'replicaset'], mongodb: '>=3.0.0' } },
-
-    test: function () {
-      const started = [];
-      const succeeded = [];
-
-      // Generate docs
-      const docs = [];
-      for (let i = 0; i < 2500; i++) docs.push({ a: i });
-
-      const client = this.configuration.newClient(
-        { writeConcern: { w: 1 } },
-        { maxPoolSize: 1, monitorCommands: true }
-      );
+  it('should correctly decorate the apm result for aggregation with cursorId', async function () {
+    const started = [];
+    const succeeded = [];
 
-      const desiredEvents = ['aggregate', 'getMore'];
-      client.on('commandStarted', filterForCommands(desiredEvents, started));
-      client.on('commandSucceeded', filterForCommands(desiredEvents, succeeded));
+    // Generate docs
+    const docs = [];
+    for (let i = 0; i < 3500; i++) docs.push({ a: i });
 
-      const db = client.db(this.configuration.db);
-      return db
-        .collection('apm_test_u_4')
-        .drop()
-        .catch(ignoreNsNotFound)
-        .then(() => db.collection('apm_test_u_4').insertMany(docs))
-        .then(r => {
-          expect(r).to.exist;
-          return db
-            .collection('apm_test_u_4')
-            .aggregate([{ $match: {} }])
-            .toArray();
-        })
-        .then(r => {
-          expect(r).to.exist;
-          expect(started).to.have.length(4);
-          expect(succeeded).to.have.length(4);
-          const cursors = succeeded.map(x => x.reply.cursor);
+    const client = this.configuration.newClient(
+      { writeConcern: { w: 1 } },
+      { maxPoolSize: 1, monitorCommands: true }
+    );
 
-          // Check we have a cursor
-          expect(cursors[0].id).to.exist;
-          expect(cursors[0].id.toString()).to.equal(cursors[1].id.toString());
-          expect(cursors[3].id.toString()).to.equal('0');
+    const desiredEvents = ['aggregate', 'getMore'];
+    client.on('commandStarted', filterForCommands(desiredEvents, started));
+    client.on('commandSucceeded', filterForCommands(desiredEvents, succeeded));
 
-          return client.close();
-        });
-    }
+    const db = client.db(this.configuration.db);
+    return db
+      .collection('apm_test_u_4')
+      .drop()
+      .catch(ignoreNsNotFound)
+      .then(() => db.collection('apm_test_u_4').insertMany(docs))
+      .then(r => {
+        expect(r).to.exist;
+        return db
+          .collection('apm_test_u_4')
+          .aggregate([{ $match: {} }], { batchSize: 1000 })
+          .toArray();
+      })
+      .then(r => {
+        expect(r).to.exist;
+        expect(started).to.have.length(4);
+        expect(succeeded).to.have.length(4);
+        const cursors = succeeded.map(x => x.reply.cursor);
+
+        // Check we have a cursor
+        expect(cursors[0].id).to.exist;
+        expect(cursors[0].id.toString()).to.equal(cursors[1].id.toString());
+        expect(cursors[3].id.toString()).to.equal('0');
+
+        return client.close();
+      });
   });
 
   it('should correctly decorate the apm result for listCollections with cursorId', {
diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts
index c3049de8cb1..92ac2d9125a 100644
--- a/test/integration/node-specific/abstract_cursor.test.ts
+++ b/test/integration/node-specific/abstract_cursor.test.ts
@@ -428,8 +428,8 @@ describe('class AbstractCursor', function () {
 
     afterEach(async function () {
       sinon.restore();
-      await cursor.close();
-      await client.close();
+      await cursor?.close();
+      await client?.close();
     });
 
     it('iterates per batch not per document', async () => {
@@ -439,6 +439,39 @@ describe('class AbstractCursor', function () {
       const numDocuments = numBatches * batchSize;
       expect(nextSpy.callCount).to.be.lessThan(numDocuments);
     });
+
+    it(
+      'does not exceed stack size for large arrays',
+      // $documents was added in 6.0
+      { requires: { mongodb: '>=6.0' } },
+      async function () {
+        await client
+          .db()
+          .aggregate([
+            {
+              $documents: [
+                {
+                  doc: 'foo'
+                }
+              ]
+            },
+            {
+              $set: {
+                field: {
+                  $reduce: {
+                    input: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
+                    initialValue: [0],
+                    in: { $concatArrays: ['$$value', '$$value'] }
+                  }
+                }
+              }
+            },
+            { $unwind: '$field' },
+            { $limit: 1000000 }
+          ])
+          .toArray();
+      }
+    );
   });
 
   describe('externally provided timeout contexts', function () {
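
Reviewer note on the `toArray()` change above: the sketch below is a standalone illustration (not part of this patch; the array size and function names are invented) of why spreading a large batch into `Array.prototype.push` can fail while copying element by element cannot. Spread passes every element as a separate call argument, and V8 enforces an engine-dependent cap on argument counts, so a sufficiently large batch throws a `RangeError`.

```ts
// Standalone sketch: spread vs. loop when copying a large batch into an array.
// 1_000_000 elements is an arbitrary count chosen to exceed V8's argument limit
// on current Node.js versions; the exact threshold is engine-dependent.
const source: number[] = new Array(1_000_000).fill(0);

function copyWithSpread(target: number[]): void {
  // Every element of `source` becomes a separate function argument here.
  target.push(...source); // may throw RangeError: Maximum call stack size exceeded
}

function copyWithLoop(target: number[]): void {
  // One element per push call, so the argument limit never applies.
  for (const item of source) {
    target.push(item);
  }
}

try {
  copyWithSpread([]);
  console.log('spread copy succeeded (engine limit not reached)');
} catch (error) {
  console.log('spread copy failed:', (error as Error).message);
}

const target: number[] = [];
copyWithLoop(target);
console.log('loop copy succeeded, length:', target.length);
```

The new integration test in `abstract_cursor.test.ts` exercises exactly this path by materializing roughly a million documents (doubling an array twenty times via `$reduce`, then `$unwind` and `$limit: 1000000`) and calling `toArray()`.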
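On the `FindCursor.getMore()` change: the helper below is an illustrative model of the clamping rule, not driver API (the function name and return convention are invented for this note). Once a `limit` is set, the next getMore should request at most `limit - numReturned` documents, and when the limit has already been reached the driver closes the cursor instead of issuing another getMore.

```ts
// Illustrative model of the batch-size clamping applied before a getMore.
// 'done' means the limit is already satisfied and no getMore should be sent.
function nextBatchSize(
  limit: number | undefined,
  numReturned: number,
  batchSize: number | undefined
): number | undefined | 'done' {
  const effectiveLimit = limit ?? Infinity;
  const remaining = effectiveLimit - numReturned;

  if (remaining <= 0) return 'done'; // limit reached: close the cursor preemptively
  if (batchSize == null) return undefined; // no batchSize configured: let the server choose
  return Math.min(batchSize, remaining); // clamp the final batch to the documents still owed
}

// With limit: 250 and batchSize: 100, the getMore issued after 200 returned documents
// asks for only 50, and no further getMore is sent once the limit is reached.
console.log(nextBatchSize(250, 200, 100)); // 50
console.log(nextBatchSize(250, 250, 100)); // 'done'
console.log(nextBatchSize(undefined, 500, 100)); // 100
```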