diff --git a/index.js b/index.js index ca5fc59..2d1d318 100644 --- a/index.js +++ b/index.js @@ -61,6 +61,7 @@ function through (write, end, opts) { }) function _end () { + if (stream.paused) return stream.on('resume', _end) stream.writable = false end.call(stream) if(!stream.readable && stream.autoDestroy) diff --git a/test/async.js b/test/async.js index 46bdbae..78e8e36 100644 --- a/test/async.js +++ b/test/async.js @@ -1,4 +1,5 @@ var from = require('from') +var Readable = require('stream').Readable var through = require('../') var tape = require('tape') @@ -26,3 +27,25 @@ tape('simple async example', function (t) { }) }) + +tape('does not end when paused with Readable Stream', function (t) { + var rs = new Readable + var expected = ['1','2','3','4','5'], actual = [] + + expected.forEach(data => rs.push(data)) + rs.push(null) + + rs.pipe(through(function(dataBuffer) { + var data = dataBuffer.toString() + this.pause() + setTimeout(function(){ + console.log('pushing data', data) + actual.push(data) + this.resume() + }.bind(this), 300) + }, function() { + t.deepEqual(actual, expected) + t.end() + })) + +})