mirror of
https://github.com/nodejs/node.git
synced 2025-08-15 13:48:44 +02:00
stream: Throw on 'error' if listeners removed
In this situation:
writable.on('error', handler);
readable.pipe(writable);
writable.removeListener('error', handler);
writable.emit('error', new Error('boom'));
there is actually no error handler, but it doesn't throw, because of the
fix for stream.once('error', handler), in 23d92ec
.
Note that simply reverting that change is not valid either, because
otherwise this will emit twice, being handled the first time, and then
throwing the second:
writable.once('error', handler);
readable.pipe(writable);
writable.emit('error', new Error('boom'));
Fix this with a horrible hack to make the stream pipe onerror handler
added before any other userland handlers, so that our handler is not
affected by adding or removing any userland handlers.
Closes #6007.
This commit is contained in:
parent
0c2960ef4a
commit
545807918e
2 changed files with 85 additions and 4 deletions
|
@ -511,14 +511,22 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
|
||||||
|
|
||||||
// if the dest has an error, then stop piping into it.
|
// if the dest has an error, then stop piping into it.
|
||||||
// however, don't suppress the throwing behavior for this.
|
// however, don't suppress the throwing behavior for this.
|
||||||
// check for listeners before emit removes one-time listeners.
|
|
||||||
var errListeners = EE.listenerCount(dest, 'error');
|
|
||||||
function onerror(er) {
|
function onerror(er) {
|
||||||
unpipe();
|
unpipe();
|
||||||
if (errListeners === 0 && EE.listenerCount(dest, 'error') === 0)
|
dest.removeListener('error', onerror);
|
||||||
|
if (EE.listenerCount(dest, 'error') === 0)
|
||||||
dest.emit('error', er);
|
dest.emit('error', er);
|
||||||
}
|
}
|
||||||
dest.once('error', onerror);
|
// This is a brutally ugly hack to make sure that our error handler
|
||||||
|
// is attached before any userland ones. NEVER DO THIS.
|
||||||
|
if (!dest._events.error)
|
||||||
|
dest.on('error', onerror);
|
||||||
|
else if (Array.isArray(dest._events.error))
|
||||||
|
dest._events.error.unshift(onerror);
|
||||||
|
else
|
||||||
|
dest._events.error = [onerror, dest._events.error];
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// Both close and finish should trigger unpipe, but only once.
|
// Both close and finish should trigger unpipe, but only once.
|
||||||
function onclose() {
|
function onclose() {
|
||||||
|
|
|
@ -56,3 +56,76 @@ var Stream = require('stream').Stream;
|
||||||
|
|
||||||
assert.strictEqual(gotErr, err);
|
assert.strictEqual(gotErr, err);
|
||||||
})();
|
})();
|
||||||
|
|
||||||
|
(function testErrorWithRemovedListenerThrows() {
|
||||||
|
var EE = require('events').EventEmitter;
|
||||||
|
var R = Stream.Readable;
|
||||||
|
var W = Stream.Writable;
|
||||||
|
|
||||||
|
var r = new R;
|
||||||
|
var w = new W;
|
||||||
|
var removed = false;
|
||||||
|
var didTest = false;
|
||||||
|
|
||||||
|
process.on('exit', function() {
|
||||||
|
assert(didTest);
|
||||||
|
console.log('ok');
|
||||||
|
});
|
||||||
|
|
||||||
|
r._read = function() {
|
||||||
|
setTimeout(function() {
|
||||||
|
assert(removed);
|
||||||
|
assert.throws(function() {
|
||||||
|
w.emit('error', new Error('fail'));
|
||||||
|
});
|
||||||
|
didTest = true;
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
w.on('error', myOnError);
|
||||||
|
r.pipe(w);
|
||||||
|
w.removeListener('error', myOnError);
|
||||||
|
removed = true;
|
||||||
|
|
||||||
|
function myOnError(er) {
|
||||||
|
throw new Error('this should not happen');
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
(function testErrorWithRemovedListenerThrows() {
|
||||||
|
var EE = require('events').EventEmitter;
|
||||||
|
var R = Stream.Readable;
|
||||||
|
var W = Stream.Writable;
|
||||||
|
|
||||||
|
var r = new R;
|
||||||
|
var w = new W;
|
||||||
|
var removed = false;
|
||||||
|
var didTest = false;
|
||||||
|
var caught = false;
|
||||||
|
|
||||||
|
process.on('exit', function() {
|
||||||
|
assert(didTest);
|
||||||
|
console.log('ok');
|
||||||
|
});
|
||||||
|
|
||||||
|
r._read = function() {
|
||||||
|
setTimeout(function() {
|
||||||
|
assert(removed);
|
||||||
|
w.emit('error', new Error('fail'));
|
||||||
|
didTest = true;
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
w.on('error', myOnError);
|
||||||
|
w._write = function() {};
|
||||||
|
|
||||||
|
r.pipe(w);
|
||||||
|
// Removing some OTHER random listener should not do anything
|
||||||
|
w.removeListener('error', function() {});
|
||||||
|
removed = true;
|
||||||
|
|
||||||
|
function myOnError(er) {
|
||||||
|
assert(!caught);
|
||||||
|
caught = true;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue