stream: Return false from push() more properly

There are cases where a push() call would return true, even though
the thing being pushed was in fact way way larger than the high
water mark, simply because the 'needReadable' was already set, and
would not get unset until nextTick.

In some cases, this could lead to an infinite loop of pushing data
into the buffer, never getting to the 'readable' event which would
unset the needReadable flag.

Fix by splitting up the emitReadable function, so that it always
sets the flag on this tick, even if it defers until nextTick to
actually emit the event.

Also, if we're not ending or already in the process of reading, it
now calls read(0) if we're below the high water mark.  Thus, the
highWaterMark value is the intended amount to buffer up to, and it
is smarter about hitting the target.
This commit is contained in:
isaacs 2013-02-21 14:30:36 -08:00
parent 3b2e9d2648
commit a63c28e6eb
2 changed files with 110 additions and 11 deletions

View file

@ -327,19 +327,13 @@ function onread(stream, er, chunk) {
return;
}
// Don't emit readable right away in sync mode, because this can trigger
// another read() call => stack overflow. This way, it might trigger
// a nextTick recursion warning, but that's not so bad.
if (state.needReadable) {
if (!sync)
emitReadable(stream);
else
process.nextTick(function() {
emitReadable(stream);
});
}
if (state.needReadable)
emitReadable(stream);
}
// Don't emit readable right away in sync mode, because this can trigger
// another read() call => stack overflow. This way, it might trigger
// a nextTick recursion warning, but that's not so bad.
function emitReadable(stream) {
var state = stream._readableState;
state.needReadable = false;
@ -347,7 +341,28 @@ function emitReadable(stream) {
return;
state.emittedReadable = true;
if (state.sync)
process.nextTick(function() {
emitReadable_(stream);
});
else
emitReadable_(stream);
}
function emitReadable_(stream) {
var state = stream._readableState;
stream.emit('readable');
// at this point, the user has presumably seen the 'readable' event,
// and called read() to consume some data. that may have triggered
// in turn another _read(n,cb) call, in which case reading = true if
// it's in progress.
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more right now preemptively.
if (!state.reading && !state.ending && !state.ended &&
state.length < state.highWaterMark) {
stream.read(0);
}
}
// abstract method. to be overridden in specific implementation classes.