diff --git a/index.js b/index.js index 136a3b7..275e70c 100644 --- a/index.js +++ b/index.js @@ -108,7 +108,7 @@ function SDK(id, data, awsResourceConfig) { * @param {Object} config - An object that contains config values that control the flow of events to outQueue * @return {stream} Stream */ - load: leoStream.load, + load: leoStream.load.bind(leoStream), /** * Process events from a queue. @@ -121,7 +121,7 @@ function SDK(id, data, awsResourceConfig) { * @param {function} callback - A function called when all events have been processed. (payload, metadata, done) => { } * @return {stream} Stream */ - offload: leoStream.offload, + offload: leoStream.offload.bind(leoStream), /** * Process events from a queue. * @param {Object} opts @@ -145,7 +145,7 @@ function SDK(id, data, awsResourceConfig) { * @param {function} callback - A function called when all events have been processed. (payload, metadata, done) => { } * @return {stream} Stream */ - enrich: leoStream.enrich, + enrich: leoStream.enrich.bind(leoStream), /** * Enrich events from one queue to another. * @param {Object} opts diff --git a/lib/mock-sdk.ts b/lib/mock-sdk.ts index 6b7c9eb..2b30ccf 100644 --- a/lib/mock-sdk.ts +++ b/lib/mock-sdk.ts @@ -465,108 +465,9 @@ export function mockRStreamsSdk(sdk: RStreamsSdk, opts?: MockRStreamsSdkOptions) s3State.fromS3Spy = fromS3Spy; mockStreams.fromS3 = fromS3Spy; - // ── Reimplemented enrich/offload ────────────────────────────────── - // - // The real enrich/offload in leo-stream.js reference a closed-over `ls` - // variable, so Object.create overrides on mockStreams are invisible to - // them. We reimplement the pipeline assembly here so it uses the mock's - // fromLeo/toLeo/toCheckpoint. - // - // IMPORTANT: If the real enrich/offload pipeline assembly changes in - // leo-stream.js, this reimplementation must be updated to match. - // This is a known trade-off — the alternative (patching the closed-over - // `ls` directly) would require mutating the original SDK's streams. - - mockStreams.enrich = function (opts: any, callback: Callback) { - const id = opts.id; - const inQueue = opts.inQueue; - const outQueue = opts.outQueue; - const func = opts.transform || opts.each; - const config = opts.config || {}; - config.start = config.start || opts.start; - config.debug = opts.debug; - - const args: any[] = []; - args.push(mockStreams.fromLeo(id, inQueue, config)); - - if (opts.batch) { - args.push(realStreams.batch(opts.batch)); - } - - args.push(realStreams.process(id, func, outQueue)); - args.push(mockStreams.toLeo(id, opts)); - args.push(mockStreams.toCheckpoint({ - debug: opts.debug, - force: opts.force, - onCheckpoint: opts.onCheckpoint, - })); - args.push(callback); - return realStreams.pipe.apply(realStreams, args); - }; - - mockStreams.offload = function (opts: any, callback: Callback) { - const id = opts.id; - const inQueue = opts.inQueue || opts.queue; - const func = opts.each || opts.transform; - let batchConfig: any = { size: 1, map: (payload: any, meta: any, done: any) => done(null, payload) }; - - // Normalize top-level batch shorthand options (matches real offload behavior) - if (typeof opts.size != "object" && (opts.count || opts.records || opts.units || opts.time || opts.bytes)) { - const size = {} as any; - size.count = opts.count || opts.records || opts.units; - size.time = opts.time; - size.bytes = opts.size || opts.bytes; - size.highWaterMark = opts.highWaterMark || 2; - opts.size = size; - } - - if (!opts.batch || typeof opts.batch === "number") { - batchConfig.size = opts.batch || batchConfig.size; - } else { - batchConfig.size = opts.batch.size || ((opts.batch.count || opts.batch.bytes || opts.batch.time || opts.batch.highWaterMark) && opts.batch) || batchConfig.size; - batchConfig.map = opts.batch.map || batchConfig.map; - } - if (typeof batchConfig.size !== "object") { - batchConfig.size = { count: batchConfig.size }; - } - batchConfig.size.highWaterMark = batchConfig.size.highWaterMark || 2; - - const batchSize = typeof batchConfig.size === "number" ? batchConfig.size : (batchConfig.size.count || batchConfig.size.records); - - return realStreams.pipe( - mockStreams.fromLeo(id, inQueue, opts), - realStreams.through((obj: any, done: any) => { - batchConfig.map(obj.payload, obj, (err: any, r: any) => { - if (err || !r) { - done(err); - } else { - obj.payload = r; - done(null, obj); - } - }); - }), - realStreams.batch(batchConfig.size), - realStreams.through({ highWaterMark: 1 }, (batch: any, done: any) => { - batch.units = batch.payload.length; - const last = batch.payload[batch.units - 1]; - if (batchSize == 1) { - done(null, last); - } else { - batch.event_source_timestamp = last.event_source_timestamp; - batch.event = last.event; - batch.eid = last.eid; - done(null, batch); - } - }), - realStreams.process(id, func, null, undefined, { highWaterMark: 1 }), - mockStreams.toCheckpoint({ - debug: opts.debug, - force: opts.force, - onCheckpoint: opts.onCheckpoint, - }), - callback - ); - }; + // enrich/offload/load in leo-stream.js now use `this` instead of the + // closed-over `ls`, so they naturally pick up our overrides on mockStreams + // via the prototype chain (Object.create). No reimplementation needed. // Override cron on the streams object const botSpies = createBotSpies(state); @@ -597,22 +498,27 @@ export function mockRStreamsSdk(sdk: RStreamsSdk, opts?: MockRStreamsSdkOptions) // Override bot mockSdk.bot = botSpies.cron; - // Wrap enrich/offload with disableS3 stripping if enabled + // Bind enrich/offload to mockStreams so `this` resolves to the mock + // (where fromLeo/toLeo/toCheckpoint are overridden) rather than the SDK object. + const boundEnrich = mockStreams.enrich.bind(mockStreams); + const boundOffload = mockStreams.offload.bind(mockStreams); + + // Wrap with disableS3 stripping if enabled const wrappedEnrich = disableS3 ? (enrichOpts: any, callback: Callback) => { delete enrichOpts.useS3; - return mockStreams.enrich(enrichOpts, callback); + return boundEnrich(enrichOpts, callback); } - : mockStreams.enrich; + : boundEnrich; mockSdk.enrich = wrappedEnrich; mockSdk.enrichEvents = promisify(wrappedEnrich) as any; const wrappedOffload = disableS3 ? (offloadOpts: any, callback: Callback) => { delete offloadOpts.useS3; - return mockStreams.offload(offloadOpts, callback); + return boundOffload(offloadOpts, callback); } - : mockStreams.offload; + : boundOffload; mockSdk.offload = wrappedOffload; mockSdk.offloadEvents = promisify(wrappedOffload) as any; diff --git a/lib/stream/leo-stream.js b/lib/stream/leo-stream.js index d06e02d..6103c78 100644 --- a/lib/stream/leo-stream.js +++ b/lib/stream/leo-stream.js @@ -391,9 +391,10 @@ module.exports = function(configure) { autoDetectPayload: true, autoCheckpoint: true }, opts || {}); + var self = this; var args = []; - args.push(ls.through((obj, done) => { + args.push(self.through((obj, done) => { var e; if (opts.autoDetectPayload && (obj.payload || obj.correlation_id)) { e = obj; @@ -413,17 +414,17 @@ module.exports = function(configure) { done(null, e); })); if (opts.useS3) { - args.push(leoS3(ls, outQueue, configure, { prefix: opts.prefix || id, ...opts.s3Opts })); + args.push(leoS3(self, outQueue, configure, { prefix: opts.prefix || id, ...opts.s3Opts })); } else if (!opts.firehose) { // TODO: This should be part of auto switch - args.push(ls.throughAsync(async (obj, push) => { + args.push(self.throughAsync(async (obj, push) => { let size = Buffer.byteLength(JSON.stringify(obj)); if (size > twoHundredK * 3) { logger.info('Sending event to S3 because it exceeds the max size for DDB. ', size); - await ls.pipeAsync( + await self.pipeAsync( es.readArray([obj]), - leoS3(ls, outQueue, configure, { prefix: opts.prefix || id }), - ls.write((newobj, done) => { + leoS3(self, outQueue, configure, { prefix: opts.prefix || id }), + self.write((newobj, done) => { push(newobj); done(); }) @@ -434,18 +435,18 @@ module.exports = function(configure) { })); } - args.push(ls.toLeo(id, opts)); + args.push(self.toLeo(id, opts)); // else { - // args.push(ls.autoSwitch(outQueue, opts)) + // args.push(self.autoSwitch(outQueue, opts)) // } if (opts.autoCheckpoint !== false) { - args.push(ls.toCheckpoint({ + args.push(self.toCheckpoint({ debug: opts.debug, force: opts.force, onCheckpoint: opts.onCheckpoint })); } else { - args.push(ls.write({ + args.push(self.write({ hasCommands: false, commands: { ignoreCommands: ["checkpoint"] @@ -453,10 +454,11 @@ module.exports = function(configure) { }, (_event, done) => done())); } - return ls.pipeline.apply(ls, args); + return self.pipeline.apply(self, args); }, enrich: function(opts, callback) { configure.validate(); + var self = this; var id = opts.id; var inQueue = opts.inQueue; var outQueue = opts.outQueue; @@ -467,27 +469,28 @@ module.exports = function(configure) { config.debug = opts.debug; var args = []; - args.push(ls.fromLeo(id, inQueue, config)); + args.push(self.fromLeo(id, inQueue, config)); if (opts.batch) { - args.push(ls.batch(opts.batch)); + args.push(self.batch(opts.batch)); } - args.push(ls.process(id, func, outQueue)); + args.push(self.process(id, func, outQueue)); if (opts.useS3) { - args.push(leoS3(ls, outQueue, configure, { prefix: id })); + args.push(leoS3(self, outQueue, configure, { prefix: id })); } - args.push(ls.toLeo(id, opts)); - args.push(ls.toCheckpoint({ + args.push(self.toLeo(id, opts)); + args.push(self.toCheckpoint({ debug: opts.debug, force: opts.force, onCheckpoint: opts.onCheckpoint })); args.push(callback); - return ls.pipe.apply(ls, args); + return self.pipe.apply(self, args); }, offload: function(opts, callback) { configure.validate(); + var self = this; var id = opts.id; var inQueue = opts.inQueue || opts.queue; var func = opts.each || opts.transform; @@ -519,9 +522,9 @@ module.exports = function(configure) { batch.size.highWaterMark = batch.size.highWaterMark || 2; var batchSize = typeof batch.size === "number" ? batch.size : (batch.size.count || batch.size.records); - return ls.pipe( - ls.fromLeo(id, inQueue, opts), - ls.through((obj, done) => { + return self.pipe( + self.fromLeo(id, inQueue, opts), + self.through((obj, done) => { batch.map(obj.payload, obj, (err, r, rOpts) => { rOpts = rOpts || {}; if (err || !r) { @@ -532,8 +535,8 @@ module.exports = function(configure) { } }); }), - ls.batch(batch.size), - ls.through({ + self.batch(batch.size), + self.through({ highWaterMark: 1 }, (batch, done) => { batch.units = batch.payload.length; @@ -547,10 +550,10 @@ module.exports = function(configure) { done(null, batch); } }), - ls.process(id, func, null, undefined, { + self.process(id, func, null, undefined, { highWaterMark: 1 }), - ls.toCheckpoint({ + self.toCheckpoint({ debug: opts.debug, force: opts.force, onCheckpoint: opts.onCheckpoint @@ -2415,7 +2418,7 @@ module.exports = function(configure) { opts = {}; } - var stream = ls.load(id, event, opts); + var stream = this.load(id, event, opts); stream.write(obj); stream.end(err => { err && logger.info("Error:", err); diff --git a/test/lib.mock-sdk.utest.ts b/test/lib.mock-sdk.utest.ts index fe25708..2ad9f68 100644 --- a/test/lib.mock-sdk.utest.ts +++ b/test/lib.mock-sdk.utest.ts @@ -864,11 +864,13 @@ describe("lib/mock-sdk.ts", function () { }); }); - it("can be explicitly disabled to preserve useS3", (done) => { + it("can be explicitly disabled to preserve useS3", () => { const baseSdk = createMinimalSdk(); const sdkWithS3 = mockRStreamsSdk(baseSdk, { disableS3: false }); - sdkWithS3.mock.queues["in-q"] = [{ a: 1 }]; + // Verify the wrapper doesn't strip useS3 by checking the opts + // object after calling enrich. We don't actually run the pipeline + // (useS3:true would try real S3), just confirm the flag survives. const enrichOpts: any = { id: "bot", inQueue: "in-q", @@ -877,10 +879,10 @@ describe("lib/mock-sdk.ts", function () { transform(payload: any, event: any, cb: any) { cb(null, payload); }, }; - sdkWithS3.enrich(enrichOpts, (err: any) => { - assert.isTrue(enrichOpts.useS3); - done(err); - }); + // Call enrich — it will start the pipeline but we don't need to wait + // for completion. The synchronous stripping (or not) happens immediately. + sdkWithS3.enrich(enrichOpts, () => {}); + assert.isTrue(enrichOpts.useS3); }); });