diff --git a/lib/internal/streams/duplexpair.js b/lib/internal/streams/duplexpair.js index a32084c4d4cbdf..daeacc4ac9c42d 100644 --- a/lib/internal/streams/duplexpair.js +++ b/lib/internal/streams/duplexpair.js @@ -50,6 +50,19 @@ class DuplexSide extends Duplex { this.#otherSide.on('end', callback); this.#otherSide.push(null); } + + + _destroy(err, callback) { + if (err) { + // Error case: tell the other side to also destroy with that error. + this.#otherSide.destroy(err); + } else if (this.#otherSide && !this.#otherSide.destroyed) { + // Graceful close case (destroy() without error): + // send an EOF to the other side's readable end if it hasn't already closed. + this.#otherSide.push(null); + } + callback(err); + } } function duplexPair(options) { @@ -57,6 +70,6 @@ function duplexPair(options) { const side1 = new DuplexSide(options); side0[kInitOtherSide](side1); side1[kInitOtherSide](side0); - return [ side0, side1 ]; + return [side0, side1]; } module.exports = duplexPair; diff --git a/test/parallel/test-duplex-error.js b/test/parallel/test-duplex-error.js new file mode 100644 index 00000000000000..bdc2912f38bc0e --- /dev/null +++ b/test/parallel/test-duplex-error.js @@ -0,0 +1,30 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { duplexPair } = require('stream'); + +const [sideA, sideB] = duplexPair(); + +let sideAErrorReceived = false; +let sideBErrorReceived = false; + +// Use common.mustCall inside the listeners to ensure they trigger +sideA.on('error', common.mustCall((err) => { + sideAErrorReceived = true; +})); + +sideB.on('error', common.mustCall((err) => { + sideBErrorReceived = true; +})); + +sideA.resume(); +sideB.resume(); + +sideB.destroy(new Error('Simulated error')); + +// Wrap the callback in common.mustCall() +setImmediate(common.mustCall(() => { + assert.strictEqual(sideAErrorReceived, true); + assert.strictEqual(sideBErrorReceived, true); +}));