Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ function SDK(id, data, awsResourceConfig) {
* @param {Object} config - An object that contains config values that control the flow of events to outQueue
* @return {stream} Stream
*/
load: leoStream.load,
load: leoStream.load.bind(leoStream),

/**
* Process events from a queue.
Expand All @@ -121,7 +121,7 @@ function SDK(id, data, awsResourceConfig) {
* @param {function} callback - A function called when all events have been processed. (payload, metadata, done) => { }
* @return {stream} Stream
*/
offload: leoStream.offload,
offload: leoStream.offload.bind(leoStream),
/**
* Process events from a queue.
* @param {Object} opts
Expand All @@ -145,7 +145,7 @@ function SDK(id, data, awsResourceConfig) {
* @param {function} callback - A function called when all events have been processed. (payload, metadata, done) => { }
* @return {stream} Stream
*/
enrich: leoStream.enrich,
enrich: leoStream.enrich.bind(leoStream),
/**
* Enrich events from one queue to another.
* @param {Object} opts
Expand Down
120 changes: 13 additions & 107 deletions lib/mock-sdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -465,108 +465,9 @@ export function mockRStreamsSdk(sdk: RStreamsSdk, opts?: MockRStreamsSdkOptions)
s3State.fromS3Spy = fromS3Spy;
mockStreams.fromS3 = fromS3Spy;

// ── Reimplemented enrich/offload ──────────────────────────────────
//
// The real enrich/offload in leo-stream.js reference a closed-over `ls`
// variable, so Object.create overrides on mockStreams are invisible to
// them. We reimplement the pipeline assembly here so it uses the mock's
// fromLeo/toLeo/toCheckpoint.
//
// IMPORTANT: If the real enrich/offload pipeline assembly changes in
// leo-stream.js, this reimplementation must be updated to match.
// This is a known trade-off — the alternative (patching the closed-over
// `ls` directly) would require mutating the original SDK's streams.

mockStreams.enrich = function (opts: any, callback: Callback) {
const id = opts.id;
const inQueue = opts.inQueue;
const outQueue = opts.outQueue;
const func = opts.transform || opts.each;
const config = opts.config || {};
config.start = config.start || opts.start;
config.debug = opts.debug;

const args: any[] = [];
args.push(mockStreams.fromLeo(id, inQueue, config));

if (opts.batch) {
args.push(realStreams.batch(opts.batch));
}

args.push(realStreams.process(id, func, outQueue));
args.push(mockStreams.toLeo(id, opts));
args.push(mockStreams.toCheckpoint({
debug: opts.debug,
force: opts.force,
onCheckpoint: opts.onCheckpoint,
}));
args.push(callback);
return realStreams.pipe.apply(realStreams, args);
};

mockStreams.offload = function (opts: any, callback: Callback) {
const id = opts.id;
const inQueue = opts.inQueue || opts.queue;
const func = opts.each || opts.transform;
let batchConfig: any = { size: 1, map: (payload: any, meta: any, done: any) => done(null, payload) };

// Normalize top-level batch shorthand options (matches real offload behavior)
if (typeof opts.size != "object" && (opts.count || opts.records || opts.units || opts.time || opts.bytes)) {
const size = {} as any;
size.count = opts.count || opts.records || opts.units;
size.time = opts.time;
size.bytes = opts.size || opts.bytes;
size.highWaterMark = opts.highWaterMark || 2;
opts.size = size;
}

if (!opts.batch || typeof opts.batch === "number") {
batchConfig.size = opts.batch || batchConfig.size;
} else {
batchConfig.size = opts.batch.size || ((opts.batch.count || opts.batch.bytes || opts.batch.time || opts.batch.highWaterMark) && opts.batch) || batchConfig.size;
batchConfig.map = opts.batch.map || batchConfig.map;
}
if (typeof batchConfig.size !== "object") {
batchConfig.size = { count: batchConfig.size };
}
batchConfig.size.highWaterMark = batchConfig.size.highWaterMark || 2;

const batchSize = typeof batchConfig.size === "number" ? batchConfig.size : (batchConfig.size.count || batchConfig.size.records);

return realStreams.pipe(
mockStreams.fromLeo(id, inQueue, opts),
realStreams.through((obj: any, done: any) => {
batchConfig.map(obj.payload, obj, (err: any, r: any) => {
if (err || !r) {
done(err);
} else {
obj.payload = r;
done(null, obj);
}
});
}),
realStreams.batch(batchConfig.size),
realStreams.through({ highWaterMark: 1 }, (batch: any, done: any) => {
batch.units = batch.payload.length;
const last = batch.payload[batch.units - 1];
if (batchSize == 1) {
done(null, last);
} else {
batch.event_source_timestamp = last.event_source_timestamp;
batch.event = last.event;
batch.eid = last.eid;
done(null, batch);
}
}),
realStreams.process(id, func, null, undefined, { highWaterMark: 1 }),
mockStreams.toCheckpoint({
debug: opts.debug,
force: opts.force,
onCheckpoint: opts.onCheckpoint,
}),
callback
);
};
// enrich/offload/load in leo-stream.js now use `this` instead of the
// closed-over `ls`, so they naturally pick up our overrides on mockStreams
// via the prototype chain (Object.create). No reimplementation needed.

// Override cron on the streams object
const botSpies = createBotSpies(state);
Expand Down Expand Up @@ -597,22 +498,27 @@ export function mockRStreamsSdk(sdk: RStreamsSdk, opts?: MockRStreamsSdkOptions)
// Override bot
mockSdk.bot = botSpies.cron;

// Wrap enrich/offload with disableS3 stripping if enabled
// Bind enrich/offload to mockStreams so `this` resolves to the mock
// (where fromLeo/toLeo/toCheckpoint are overridden) rather than the SDK object.
const boundEnrich = mockStreams.enrich.bind(mockStreams);
const boundOffload = mockStreams.offload.bind(mockStreams);

// Wrap with disableS3 stripping if enabled
const wrappedEnrich = disableS3
? (enrichOpts: any, callback: Callback) => {
delete enrichOpts.useS3;
return mockStreams.enrich(enrichOpts, callback);
return boundEnrich(enrichOpts, callback);
}
: mockStreams.enrich;
: boundEnrich;
mockSdk.enrich = wrappedEnrich;
mockSdk.enrichEvents = promisify(wrappedEnrich) as any;

const wrappedOffload = disableS3
? (offloadOpts: any, callback: Callback) => {
delete offloadOpts.useS3;
return mockStreams.offload(offloadOpts, callback);
return boundOffload(offloadOpts, callback);
}
: mockStreams.offload;
: boundOffload;
mockSdk.offload = wrappedOffload;
mockSdk.offloadEvents = promisify(wrappedOffload) as any;

Expand Down
55 changes: 29 additions & 26 deletions lib/stream/leo-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,10 @@ module.exports = function(configure) {
autoDetectPayload: true,
autoCheckpoint: true
}, opts || {});
var self = this;
var args = [];

args.push(ls.through((obj, done) => {
args.push(self.through((obj, done) => {
var e;
if (opts.autoDetectPayload && (obj.payload || obj.correlation_id)) {
e = obj;
Expand All @@ -413,17 +414,17 @@ module.exports = function(configure) {
done(null, e);
}));
if (opts.useS3) {
args.push(leoS3(ls, outQueue, configure, { prefix: opts.prefix || id, ...opts.s3Opts }));
args.push(leoS3(self, outQueue, configure, { prefix: opts.prefix || id, ...opts.s3Opts }));
} else if (!opts.firehose) {
// TODO: This should be part of auto switch
args.push(ls.throughAsync(async (obj, push) => {
args.push(self.throughAsync(async (obj, push) => {
let size = Buffer.byteLength(JSON.stringify(obj));
if (size > twoHundredK * 3) {
logger.info('Sending event to S3 because it exceeds the max size for DDB. ', size);
await ls.pipeAsync(
await self.pipeAsync(
es.readArray([obj]),
leoS3(ls, outQueue, configure, { prefix: opts.prefix || id }),
ls.write((newobj, done) => {
leoS3(self, outQueue, configure, { prefix: opts.prefix || id }),
self.write((newobj, done) => {
push(newobj);
done();
})
Expand All @@ -434,29 +435,30 @@ module.exports = function(configure) {
}));
}

args.push(ls.toLeo(id, opts));
args.push(self.toLeo(id, opts));
// else {
// args.push(ls.autoSwitch(outQueue, opts))
// args.push(self.autoSwitch(outQueue, opts))
// }
if (opts.autoCheckpoint !== false) {
args.push(ls.toCheckpoint({
args.push(self.toCheckpoint({
debug: opts.debug,
force: opts.force,
onCheckpoint: opts.onCheckpoint
}));
} else {
args.push(ls.write({
args.push(self.write({
hasCommands: false,
commands: {
ignoreCommands: ["checkpoint"]
}
}, (_event, done) => done()));
}

return ls.pipeline.apply(ls, args);
return self.pipeline.apply(self, args);
},
enrich: function(opts, callback) {
configure.validate();
var self = this;
var id = opts.id;
var inQueue = opts.inQueue;
var outQueue = opts.outQueue;
Expand All @@ -467,27 +469,28 @@ module.exports = function(configure) {
config.debug = opts.debug;

var args = [];
args.push(ls.fromLeo(id, inQueue, config));
args.push(self.fromLeo(id, inQueue, config));

if (opts.batch) {
args.push(ls.batch(opts.batch));
args.push(self.batch(opts.batch));
}

args.push(ls.process(id, func, outQueue));
args.push(self.process(id, func, outQueue));
if (opts.useS3) {
args.push(leoS3(ls, outQueue, configure, { prefix: id }));
args.push(leoS3(self, outQueue, configure, { prefix: id }));
}
args.push(ls.toLeo(id, opts));
args.push(ls.toCheckpoint({
args.push(self.toLeo(id, opts));
args.push(self.toCheckpoint({
debug: opts.debug,
force: opts.force,
onCheckpoint: opts.onCheckpoint
}));
args.push(callback);
return ls.pipe.apply(ls, args);
return self.pipe.apply(self, args);
},
offload: function(opts, callback) {
configure.validate();
var self = this;
var id = opts.id;
var inQueue = opts.inQueue || opts.queue;
var func = opts.each || opts.transform;
Expand Down Expand Up @@ -519,9 +522,9 @@ module.exports = function(configure) {
batch.size.highWaterMark = batch.size.highWaterMark || 2;

var batchSize = typeof batch.size === "number" ? batch.size : (batch.size.count || batch.size.records);
return ls.pipe(
ls.fromLeo(id, inQueue, opts),
ls.through((obj, done) => {
return self.pipe(
self.fromLeo(id, inQueue, opts),
self.through((obj, done) => {
batch.map(obj.payload, obj, (err, r, rOpts) => {
rOpts = rOpts || {};
if (err || !r) {
Expand All @@ -532,8 +535,8 @@ module.exports = function(configure) {
}
});
}),
ls.batch(batch.size),
ls.through({
self.batch(batch.size),
self.through({
highWaterMark: 1
}, (batch, done) => {
batch.units = batch.payload.length;
Expand All @@ -547,10 +550,10 @@ module.exports = function(configure) {
done(null, batch);
}
}),
ls.process(id, func, null, undefined, {
self.process(id, func, null, undefined, {
highWaterMark: 1
}),
ls.toCheckpoint({
self.toCheckpoint({
debug: opts.debug,
force: opts.force,
onCheckpoint: opts.onCheckpoint
Expand Down Expand Up @@ -2415,7 +2418,7 @@ module.exports = function(configure) {
opts = {};
}

var stream = ls.load(id, event, opts);
var stream = this.load(id, event, opts);
stream.write(obj);
stream.end(err => {
err && logger.info("Error:", err);
Expand Down
14 changes: 8 additions & 6 deletions test/lib.mock-sdk.utest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -864,11 +864,13 @@ describe("lib/mock-sdk.ts", function () {
});
});

it("can be explicitly disabled to preserve useS3", (done) => {
it("can be explicitly disabled to preserve useS3", () => {
const baseSdk = createMinimalSdk();
const sdkWithS3 = mockRStreamsSdk(baseSdk, { disableS3: false });
sdkWithS3.mock.queues["in-q"] = [{ a: 1 }];

// Verify the wrapper doesn't strip useS3 by checking the opts
// object after calling enrich. We don't actually run the pipeline
// (useS3:true would try real S3), just confirm the flag survives.
const enrichOpts: any = {
id: "bot",
inQueue: "in-q",
Expand All @@ -877,10 +879,10 @@ describe("lib/mock-sdk.ts", function () {
transform(payload: any, event: any, cb: any) { cb(null, payload); },
};

sdkWithS3.enrich(enrichOpts, (err: any) => {
assert.isTrue(enrichOpts.useS3);
done(err);
});
// Call enrich — it will start the pipeline but we don't need to wait
// for completion. The synchronous stripping (or not) happens immediately.
sdkWithS3.enrich(enrichOpts, () => {});
assert.isTrue(enrichOpts.useS3);
});
});

Expand Down
Loading