stream: Simplify flowing, passive data listening

Closes #5860

In streams2, there is an "old mode" for compatibility.  Once switched
into this mode, there is no going back.

With this change, there is a "flowing mode" and a "paused mode".  If you
add a data listener, then this will start the flow of data.  However,
hitting the `pause()` method will switch *back* into a non-flowing mode,
where the `read()` method will pull data out.

Every time `read()` returns a data chunk, it also emits a `data` event.
In this way, a passive data listener can be added, and the stream passed
off to some other reader, for use with progress bars and the like.

There is no API change beyond this added flexibility.
This commit is contained in:
isaacs 2013-07-17 18:24:02 -07:00
parent 5fcd6e4038
commit 0f8de5e1f9
11 changed files with 433 additions and 201 deletions

View file

@ -104,11 +104,35 @@ Readable stream.
A Readable stream will not start emitting data until you indicate that A Readable stream will not start emitting data until you indicate that
you are ready to receive it. you are ready to receive it.
Readable streams have two "modes": a **flowing mode** and a **non-flowing Readable streams have two "modes": a **flowing mode** and a **paused
mode**. When in flowing mode, data is read from the underlying system mode**. When in flowing mode, data is read from the underlying system
and provided to your program as fast as possible. In non-flowing and provided to your program as fast as possible. In paused mode, you
mode, you must explicitly call `stream.read()` to get chunks of data must explicitly call `stream.read()` to get chunks of data out.
out. Streams start out in paused mode.
**Note**: If no data event handlers are attached, and there are no
[`pipe()`][] destinations, and the stream is switched into flowing
mode, then data will be lost.
You can switch to flowing mode by doing any of the following:
* Adding a [`'data'` event][] handler to listen for data.
* Calling the [`resume()`][] method to explicitly open the flow.
* Calling the [`pipe()`][] method to send the data to a [Writable][].
You can switch back to paused mode by doing either of the following:
* If there are no pipe destinations, by calling the [`pause()`][]
method.
* If there are pipe destinations, by removing any [`'data'` event][]
handlers, and removing all pipe destinations by calling the
[`unpipe()`][] method.
Note that, for backwards compatibility reasons, removing `'data'`
event handlers will **not** automatically pause the stream. Also, if
there are piped destinations, then calling `pause()` will not
guarantee that the stream will *remain* paused once those
destinations drain and ask for more data.
Examples of readable streams include: Examples of readable streams include:
@ -144,9 +168,9 @@ again when more data is available.
* `chunk` {Buffer | String} The chunk of data. * `chunk` {Buffer | String} The chunk of data.
If you attach a `data` event listener, then it will switch the stream Attaching a `data` event listener to a stream that has not been
into flowing mode, and data will be passed to your handler as soon as explicitly paused will switch the stream into flowing mode. Data will
it is available. then be passed as soon as it is available.
If you just want to get all the data out of the stream as fast as If you just want to get all the data out of the stream as fast as
possible, this is the best way to do so. possible, this is the best way to do so.
@ -200,9 +224,9 @@ bytes. If `size` bytes are not available, then it will return `null`.
If you do not specify a `size` argument, then it will return all the If you do not specify a `size` argument, then it will return all the
data in the internal buffer. data in the internal buffer.
This method should only be called in non-flowing mode. In This method should only be called in paused mode. In flowing mode,
flowing-mode, this method is called automatically until the internal this method is called automatically until the internal buffer is
buffer is drained. drained.
```javascript ```javascript
var readable = getReadableStreamSomehow(); var readable = getReadableStreamSomehow();
@ -214,6 +238,9 @@ readable.on('readable', function() {
}); });
``` ```
If this method returns a data chunk, then it will also trigger the
emission of a [`'data'` event][].
#### readable.setEncoding(encoding) #### readable.setEncoding(encoding)
* `encoding` {String} The encoding to use. * `encoding` {String} The encoding to use.
@ -244,9 +271,9 @@ readable.on('data', function(chunk) {
This method will cause the readable stream to resume emitting `data` This method will cause the readable stream to resume emitting `data`
events. events.
This method will switch the stream into flowing-mode. If you do *not* This method will switch the stream into flowing mode. If you do *not*
want to consume the data from a stream, but you *do* want to get to want to consume the data from a stream, but you *do* want to get to
its `end` event, you can call `readable.resume()` to open the flow of its `end` event, you can call [`readable.resume()`][] to open the flow of
data. data.
```javascript ```javascript
@ -259,13 +286,9 @@ readable.on('end', function(chunk) {
#### readable.pause() #### readable.pause()
This method will cause a stream in flowing-mode to stop emitting This method will cause a stream in flowing mode to stop emitting
`data` events. Any data that becomes available will remain in the `data` events, switching out of flowing mode. Any data that becomes
internal buffer. available will remain in the internal buffer.
This method is only relevant in flowing mode. When called on a
non-flowing stream, it will switch into flowing mode, but remain
paused.
```javascript ```javascript
var readable = getReadableStreamSomehow(); var readable = getReadableStreamSomehow();
@ -414,7 +437,7 @@ entire Streams API as it is today. (See "Compatibility" below for
more information.) more information.)
If you are using an older Node library that emits `'data'` events and If you are using an older Node library that emits `'data'` events and
has a `pause()` method that is advisory only, then you can use the has a [`pause()`][] method that is advisory only, then you can use the
`wrap()` method to create a [Readable][] stream that uses the old stream `wrap()` method to create a [Readable][] stream that uses the old stream
as its data source. as its data source.
@ -1298,23 +1321,23 @@ simpler, but also less powerful and less useful.
events would start emitting immediately. If you needed to do some events would start emitting immediately. If you needed to do some
I/O to decide how to handle data, then you had to store the chunks I/O to decide how to handle data, then you had to store the chunks
in some kind of buffer so that they would not be lost. in some kind of buffer so that they would not be lost.
* The `pause()` method was advisory, rather than guaranteed. This * The [`pause()`][] method was advisory, rather than guaranteed. This
meant that you still had to be prepared to receive `'data'` events meant that you still had to be prepared to receive `'data'` events
even when the stream was in a paused state. even when the stream was in a paused state.
In Node v0.10, the Readable class described below was added. For In Node v0.10, the Readable class described below was added. For
backwards compatibility with older Node programs, Readable streams backwards compatibility with older Node programs, Readable streams
switch into "flowing mode" when a `'data'` event handler is added, or switch into "flowing mode" when a `'data'` event handler is added, or
when the `pause()` or `resume()` methods are called. The effect is when the [`resume()`][] method is called. The effect is that, even if
that, even if you are not using the new `read()` method and you are not using the new `read()` method and `'readable'` event, you
`'readable'` event, you no longer have to worry about losing `'data'` no longer have to worry about losing `'data'` chunks.
chunks.
Most programs will continue to function normally. However, this Most programs will continue to function normally. However, this
introduces an edge case in the following conditions: introduces an edge case in the following conditions:
* No `'data'` event handler is added. * No [`'data'` event][] handler is added.
* The `pause()` and `resume()` methods are never called. * The [`resume()`][] method is never called.
* The stream is not piped to any writable destination.
For example, consider the following code: For example, consider the following code:
@ -1336,7 +1359,7 @@ simply discarded. However, in Node v0.10 and beyond, the socket will
remain paused forever. remain paused forever.
The workaround in this situation is to call the `resume()` method to The workaround in this situation is to call the `resume()` method to
trigger "old mode" behavior: start the flow of data:
```javascript ```javascript
// Workaround // Workaround
@ -1352,9 +1375,9 @@ net.createServer(function(socket) {
}).listen(1337); }).listen(1337);
``` ```
In addition to new Readable streams switching into flowing-mode, pre-v0.10 In addition to new Readable streams switching into flowing mode,
style streams can be wrapped in a Readable class using the `wrap()` pre-v0.10 style streams can be wrapped in a Readable class using the
method. `wrap()` method.
### Object Mode ### Object Mode
@ -1494,3 +1517,9 @@ modify them.
[_write]: #stream_writable_write_chunk_encoding_callback_1 [_write]: #stream_writable_write_chunk_encoding_callback_1
[`util.inherits`]: util.html#util_util_inherits_constructor_superconstructor [`util.inherits`]: util.html#util_util_inherits_constructor_superconstructor
[`end()`]: #stream_writable_end_chunk_encoding_callback [`end()`]: #stream_writable_end_chunk_encoding_callback
[`'data'` event]: #stream_event_data
[`resume()`]: #stream_readable_resume
[`readable.resume()`]: #stream_readable_resume
[`pause()`]: #stream_readable_pause
[`unpipe()`]: #stream_readable_unpipe_destination
[`pipe()`]: #stream_readable_pipe_destination_options

View file

@ -26,6 +26,7 @@ var EE = require('events').EventEmitter;
var Stream = require('stream'); var Stream = require('stream');
var util = require('util'); var util = require('util');
var StringDecoder; var StringDecoder;
var debug = util.debuglog('stream');
util.inherits(Readable, Stream); util.inherits(Readable, Stream);
@ -44,7 +45,7 @@ function ReadableState(options, stream) {
this.length = 0; this.length = 0;
this.pipes = null; this.pipes = null;
this.pipesCount = 0; this.pipesCount = 0;
this.flowing = false; this.flowing = null;
this.ended = false; this.ended = false;
this.endEmitted = false; this.endEmitted = false;
this.reading = false; this.reading = false;
@ -250,6 +251,7 @@ function howMuchToRead(n, state) {
// you can override either this method, or the async _read(n) below. // you can override either this method, or the async _read(n) below.
Readable.prototype.read = function(n) { Readable.prototype.read = function(n) {
debug('read', n);
var state = this._readableState; var state = this._readableState;
state.calledRead = true; state.calledRead = true;
var nOrig = n; var nOrig = n;
@ -263,6 +265,10 @@ Readable.prototype.read = function(n) {
if (n === 0 && if (n === 0 &&
state.needReadable && state.needReadable &&
(state.length >= state.highWaterMark || state.ended)) { (state.length >= state.highWaterMark || state.ended)) {
debug('read: emitReadable');
if (state.length === 0 && state.ended)
endReadable(this);
else
emitReadable(this); emitReadable(this);
return null; return null;
} }
@ -300,17 +306,23 @@ Readable.prototype.read = function(n) {
// if we need a readable event, then we need to do some reading. // if we need a readable event, then we need to do some reading.
var doRead = state.needReadable; var doRead = state.needReadable;
debug('need readable', doRead);
// if we currently have less than the highWaterMark, then also read some // if we currently have less than the highWaterMark, then also read some
if (state.length - n <= state.highWaterMark) if (state.length === 0 || state.length - n < state.highWaterMark) {
doRead = true; doRead = true;
debug('length less than watermark', doRead);
}
// however, if we've ended, then there's no point, and if we're already // however, if we've ended, then there's no point, and if we're already
// reading, then it's unnecessary. // reading, then it's unnecessary.
if (state.ended || state.reading) if (state.ended || state.reading) {
doRead = false; doRead = false;
debug('reading or ended', doRead);
}
if (doRead) { if (doRead) {
debug('do read');
state.reading = true; state.reading = true;
state.sync = true; state.sync = true;
// if the length is currently zero, then we *need* a readable event. // if the length is currently zero, then we *need* a readable event.
@ -351,6 +363,8 @@ Readable.prototype.read = function(n) {
if (state.ended && !state.endEmitted && state.length === 0) if (state.ended && !state.endEmitted && state.length === 0)
endReadable(this); endReadable(this);
if (ret !== null)
this.emit('data', ret);
return ret; return ret;
}; };
@ -392,9 +406,8 @@ function onEofChunk(stream, state) {
function emitReadable(stream) { function emitReadable(stream) {
var state = stream._readableState; var state = stream._readableState;
state.needReadable = false; state.needReadable = false;
if (state.emittedReadable) if (!state.emittedReadable) {
return; debug('emitReadable', state.flowing);
state.emittedReadable = true; state.emittedReadable = true;
if (state.sync) if (state.sync)
process.nextTick(function() { process.nextTick(function() {
@ -402,10 +415,13 @@ function emitReadable(stream) {
}); });
else else
emitReadable_(stream); emitReadable_(stream);
}
} }
function emitReadable_(stream) { function emitReadable_(stream) {
debug('emit readable');
stream.emit('readable'); stream.emit('readable');
flow(stream);
} }
@ -428,6 +444,7 @@ function maybeReadMore_(stream, state) {
var len = state.length; var len = state.length;
while (!state.reading && !state.flowing && !state.ended && while (!state.reading && !state.flowing && !state.ended &&
state.length < state.highWaterMark) { state.length < state.highWaterMark) {
debug('maybeReadMore read 0');
stream.read(0); stream.read(0);
if (len === state.length) if (len === state.length)
// didn't get any data, stop spinning. // didn't get any data, stop spinning.
@ -462,6 +479,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
break; break;
} }
state.pipesCount += 1; state.pipesCount += 1;
debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
var doEnd = (!pipeOpts || pipeOpts.end !== false) && var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
dest !== process.stdout && dest !== process.stdout &&
@ -475,11 +493,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest.on('unpipe', onunpipe); dest.on('unpipe', onunpipe);
function onunpipe(readable) { function onunpipe(readable) {
if (readable !== src) return; debug('onunpipe');
if (readable === src) {
cleanup(); cleanup();
} }
}
function onend() { function onend() {
debug('onend');
dest.end(); dest.end();
} }
@ -491,6 +512,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest.on('drain', ondrain); dest.on('drain', ondrain);
function cleanup() { function cleanup() {
debug('cleanup');
// cleanup event handlers once the pipe is broken // cleanup event handlers once the pipe is broken
dest.removeListener('close', onclose); dest.removeListener('close', onclose);
dest.removeListener('finish', onfinish); dest.removeListener('finish', onfinish);
@ -499,19 +521,34 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest.removeListener('unpipe', onunpipe); dest.removeListener('unpipe', onunpipe);
src.removeListener('end', onend); src.removeListener('end', onend);
src.removeListener('end', cleanup); src.removeListener('end', cleanup);
src.removeListener('data', ondata);
// if the reader is waiting for a drain event from this // if the reader is waiting for a drain event from this
// specific writer, then it would cause it to never start // specific writer, then it would cause it to never start
// flowing again. // flowing again.
// So, if this is awaiting a drain, then we just call it now. // So, if this is awaiting a drain, then we just call it now.
// If we don't know, then assume that we are waiting for one. // If we don't know, then assume that we are waiting for one.
if (!dest._writableState || dest._writableState.needDrain) if (state.awaitDrain &&
(!dest._writableState || dest._writableState.needDrain))
ondrain(); ondrain();
} }
src.on('data', ondata);
function ondata(chunk) {
debug('ondata');
var ret = dest.write(chunk);
if (false === ret) {
debug('false write response, pause',
src._readableState.awaitDrain);
src._readableState.awaitDrain++;
src.pause();
}
}
// if the dest has an error, then stop piping into it. // if the dest has an error, then stop piping into it.
// however, don't suppress the throwing behavior for this. // however, don't suppress the throwing behavior for this.
function onerror(er) { function onerror(er) {
debug('onerror', er);
unpipe(); unpipe();
if (EE.listenerCount(dest, 'error') === 0) if (EE.listenerCount(dest, 'error') === 0)
dest.emit('error', er); dest.emit('error', er);
@ -525,12 +562,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
} }
dest.once('close', onclose); dest.once('close', onclose);
function onfinish() { function onfinish() {
debug('onfinish');
dest.removeListener('close', onclose); dest.removeListener('close', onclose);
unpipe(); unpipe();
} }
dest.once('finish', onfinish); dest.once('finish', onfinish);
function unpipe() { function unpipe() {
debug('unpipe');
src.unpipe(dest); src.unpipe(dest);
} }
@ -539,16 +578,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// start the flow if it hasn't been started already. // start the flow if it hasn't been started already.
if (!state.flowing) { if (!state.flowing) {
// the handler that waits for readable events after all debug('pipe resume');
// the data gets sucked out in flow. src.resume();
// This would be easier to follow with a .once() handler
// in flow(), but that is too slow.
this.on('readable', pipeOnReadable);
state.flowing = true;
process.nextTick(function() {
flow(src);
});
} }
return dest; return dest;
@ -558,63 +589,16 @@ function pipeOnDrain(src) {
return function() { return function() {
var dest = this; var dest = this;
var state = src._readableState; var state = src._readableState;
debug('pipeOnDrain', state.awaitDrain);
if (state.awaitDrain)
state.awaitDrain--; state.awaitDrain--;
if (state.awaitDrain === 0) if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
state.flowing = true;
flow(src); flow(src);
}
}; };
} }
function flow(src) {
var state = src._readableState;
var chunk;
state.awaitDrain = 0;
function write(dest, i, list) {
var written = dest.write(chunk);
if (false === written) {
state.awaitDrain++;
}
}
while (state.pipesCount && null !== (chunk = src.read())) {
if (state.pipesCount === 1)
write(state.pipes, 0, null);
else
state.pipes.forEach(write);
src.emit('data', chunk);
// if anyone needs a drain, then we have to wait for that.
if (state.awaitDrain > 0)
return;
}
// if every destination was unpiped, either before entering this
// function, or in the while loop, then stop flowing.
//
// NB: This is a pretty rare edge case.
if (state.pipesCount === 0) {
state.flowing = false;
// if there were data event listeners added, then switch to old mode.
if (EE.listenerCount(src, 'data') > 0)
emitDataEvents(src);
return;
}
// at this point, no one needed a drain, so we just ran out of data
// on the next readable event, start it over again.
state.ranOut = true;
}
function pipeOnReadable() {
if (this._readableState.ranOut) {
this._readableState.ranOut = false;
flow(this);
}
}
Readable.prototype.unpipe = function(dest) { Readable.prototype.unpipe = function(dest) {
var state = this._readableState; var state = this._readableState;
@ -635,7 +619,6 @@ Readable.prototype.unpipe = function(dest) {
// got a match. // got a match.
state.pipes = null; state.pipes = null;
state.pipesCount = 0; state.pipesCount = 0;
this.removeListener('readable', pipeOnReadable);
state.flowing = false; state.flowing = false;
if (dest) if (dest)
dest.emit('unpipe', this); dest.emit('unpipe', this);
@ -650,7 +633,6 @@ Readable.prototype.unpipe = function(dest) {
var len = state.pipesCount; var len = state.pipesCount;
state.pipes = null; state.pipes = null;
state.pipesCount = 0; state.pipesCount = 0;
this.removeListener('readable', pipeOnReadable);
state.flowing = false; state.flowing = false;
for (var i = 0; i < len; i++) for (var i = 0; i < len; i++)
@ -678,8 +660,11 @@ Readable.prototype.unpipe = function(dest) {
Readable.prototype.on = function(ev, fn) { Readable.prototype.on = function(ev, fn) {
var res = Stream.prototype.on.call(this, ev, fn); var res = Stream.prototype.on.call(this, ev, fn);
if (ev === 'data' && !this._readableState.flowing) // If listening to data, and it has not explicitly been paused,
emitDataEvents(this); // then call resume to start the flow of data on the next tick.
if (ev === 'data' && false !== this._readableState.flowing) {
this.resume();
}
if (ev === 'readable' && this.readable) { if (ev === 'readable' && this.readable) {
var state = this._readableState; var state = this._readableState;
@ -688,7 +673,11 @@ Readable.prototype.on = function(ev, fn) {
state.emittedReadable = false; state.emittedReadable = false;
state.needReadable = true; state.needReadable = true;
if (!state.reading) { if (!state.reading) {
this.read(0); var self = this;
process.nextTick(function() {
debug('readable nexttick read 0');
self.read(0);
});
} else if (state.length) { } else if (state.length) {
emitReadable(this, state); emitReadable(this, state);
} }
@ -702,63 +691,52 @@ Readable.prototype.addListener = Readable.prototype.on;
// pause() and resume() are remnants of the legacy readable stream API // pause() and resume() are remnants of the legacy readable stream API
// If the user uses them, then switch into old mode. // If the user uses them, then switch into old mode.
Readable.prototype.resume = function() { Readable.prototype.resume = function() {
emitDataEvents(this); var state = this._readableState;
if (!state.flowing) {
debug('resume');
state.flowing = true;
if (!state.reading) {
debug('resume read 0');
this.read(0); this.read(0);
this.emit('resume'); }
resume(this, state);
}
}; };
function resume(stream, state) {
if (!state.resumeScheduled) {
state.resumeScheduled = true;
process.nextTick(function() {
resume_(stream, state);
});
}
}
function resume_(stream, state) {
state.resumeScheduled = false;
stream.emit('resume');
flow(stream);
if (state.flowing && !state.reading)
stream.read(0);
}
Readable.prototype.pause = function() { Readable.prototype.pause = function() {
emitDataEvents(this, true); debug('call pause flowing=%j', this._readableState.flowing);
if (false !== this._readableState.flowing) {
debug('pause');
this._readableState.flowing = false;
this.emit('pause'); this.emit('pause');
}
}; };
function emitDataEvents(stream, startPaused) { function flow(stream) {
var state = stream._readableState; var state = stream._readableState;
debug('flow', state.flowing);
if (state.flowing) { if (state.flowing) {
// https://github.com/isaacs/readable-stream/issues/16 do {
throw new Error('Cannot switch to old mode now.'); var chunk = stream.read();
} while (null !== chunk && state.flowing);
} }
var paused = startPaused || false;
var readable = false;
// convert to an old-style stream.
stream.readable = true;
stream.pipe = Stream.prototype.pipe;
stream.on = stream.addListener = Stream.prototype.on;
stream.on('readable', function() {
readable = true;
var c;
while (!paused && (null !== (c = stream.read())))
stream.emit('data', c);
if (c === null) {
readable = false;
stream._readableState.needReadable = true;
}
});
stream.pause = function() {
paused = true;
this.emit('pause');
};
stream.resume = function() {
paused = false;
if (readable)
process.nextTick(function() {
stream.emit('readable');
});
else
this.read(0);
this.emit('resume');
};
// now make it start, just in case it hadn't already.
stream.emit('readable');
} }
// wrap an old-style stream as the async data source. // wrap an old-style stream as the async data source.
@ -770,6 +748,7 @@ Readable.prototype.wrap = function(stream) {
var self = this; var self = this;
stream.on('end', function() { stream.on('end', function() {
debug('wrapped end');
if (state.decoder && !state.ended) { if (state.decoder && !state.ended) {
var chunk = state.decoder.end(); var chunk = state.decoder.end();
if (chunk && chunk.length) if (chunk && chunk.length)
@ -780,6 +759,7 @@ Readable.prototype.wrap = function(stream) {
}); });
stream.on('data', function(chunk) { stream.on('data', function(chunk) {
debug('wrapped data');
if (state.decoder) if (state.decoder)
chunk = state.decoder.write(chunk); chunk = state.decoder.write(chunk);
if (!chunk || !state.objectMode && !chunk.length) if (!chunk || !state.objectMode && !chunk.length)
@ -812,6 +792,7 @@ Readable.prototype.wrap = function(stream) {
// when we try to consume some more bytes, simply unpause the // when we try to consume some more bytes, simply unpause the
// underlying stream. // underlying stream.
self._read = function(n) { self._read = function(n) {
debug('wrapped _read', n);
if (paused) { if (paused) {
paused = false; paused = false;
stream.resume(); stream.resume();

View file

@ -0,0 +1,52 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var common = require('../common');
var assert = require('assert');
var spawn = require('child_process').spawn;
var child = spawn(process.execPath, [], {
env: {
NODE_DEBUG: process.argv[2]
}
});
var wanted = child.pid + '\n';
var found = '';
child.stdout.setEncoding('utf8');
child.stdout.on('data', function(c) {
found += c;
});
child.stderr.setEncoding('utf8');
child.stderr.on('data', function(c) {
console.error('> ' + c.trim().split(/\n/).join('\n> '));
});
child.on('close', function(c) {
assert(!c);
assert.equal(found, wanted);
console.log('ok');
});
setTimeout(function() {
child.stdin.end('console.log(process.pid)');
});

View file

@ -37,10 +37,8 @@ function TestReader(n) {
util.inherits(TestReader, R); util.inherits(TestReader, R);
TestReader.prototype.read = function(n) { TestReader.prototype._read = function(n) {
if (n === 0) return null;
var max = this._buffer.length - this._pos; var max = this._buffer.length - this._pos;
n = n || max;
n = Math.max(n, 0); n = Math.max(n, 0);
var toRead = Math.min(n, max); var toRead = Math.min(n, max);
if (toRead === 0) { if (toRead === 0) {
@ -51,20 +49,21 @@ TestReader.prototype.read = function(n) {
this._bufs -= 1; this._bufs -= 1;
if (this._bufs <= 0) { if (this._bufs <= 0) {
// read them all! // read them all!
if (!this.ended) { if (!this.ended)
this.emit('end'); this.push(null);
this.ended = true;
}
} else { } else {
this.emit('readable'); // now we have more.
// kinda cheating by calling _read, but whatever,
// it's just fake anyway.
this._read(n);
} }
}.bind(this), 10); }.bind(this), 10);
return null; return;
} }
var ret = this._buffer.slice(this._pos, this._pos + toRead); var ret = this._buffer.slice(this._pos, this._pos + toRead);
this._pos += toRead; this._pos += toRead;
return ret; this.push(ret);
}; };
///// /////
@ -135,21 +134,17 @@ test('a most basic test', function(t) {
'xxx', 'xxx',
'xxxx', 'xxxx',
'xxxxx', 'xxxxx',
'xxxxx',
'xxxxxxxx',
'xxxxxxxxx', 'xxxxxxxxx',
'xxx', 'xxxxxxxxxx',
'xxxxxxxxxxxx', 'xxxxxxxxxxxx',
'xxxxxxxx', 'xxxxxxxxxxxxx',
'xxxxxxxxxxxxxxx', 'xxxxxxxxxxxxxxx',
'xxxxx', 'xxxxxxxxxxxxxxxxx',
'xxxxxxxxxxxxxxxxxx', 'xxxxxxxxxxxxxxxxxxx',
'xx', 'xxxxxxxxxxxxxxxxxxxxx',
'xxxxxxxxxxxxxxxxxxxx', 'xxxxxxxxxxxxxxxxxxxxxxx',
'xxxxxxxxxxxxxxxxxxxx', 'xxxxxxxxxxxxxxxxxxxxxxxxx',
'xxxxxxxxxxxxxxxxxxxx', 'xxxxxxxxxxxxxxxxxxxxx' ];
'xxxxxxxxxxxxxxxxxxxx',
'xxxxxxxxxxxxxxxxxxxx' ];
r.on('end', function() { r.on('end', function() {
t.same(reads, expect); t.same(reads, expect);
@ -342,6 +337,7 @@ test('back pressure respected', function (t) {
var w1 = new R(); var w1 = new R();
w1.write = function (chunk) { w1.write = function (chunk) {
console.error('w1.emit("close")');
assert.equal(chunk[0], "one"); assert.equal(chunk[0], "one");
w1.emit("close"); w1.emit("close");
process.nextTick(function () { process.nextTick(function () {
@ -357,6 +353,7 @@ test('back pressure respected', function (t) {
var w2 = new R(); var w2 = new R();
w2.write = function (chunk) { w2.write = function (chunk) {
console.error('w2 write', chunk, counter);
assert.equal(chunk[0], expected.shift()); assert.equal(chunk[0], expected.shift());
assert.equal(counter, 0); assert.equal(counter, 0);
@ -368,6 +365,7 @@ test('back pressure respected', function (t) {
setTimeout(function () { setTimeout(function () {
counter--; counter--;
console.error("w2 drain");
w2.emit("drain"); w2.emit("drain");
}, 10); }, 10);
@ -377,6 +375,7 @@ test('back pressure respected', function (t) {
var w3 = new R(); var w3 = new R();
w3.write = function (chunk) { w3.write = function (chunk) {
console.error('w3 write', chunk, counter);
assert.equal(chunk[0], expected.shift()); assert.equal(chunk[0], expected.shift());
assert.equal(counter, 1); assert.equal(counter, 1);
@ -388,6 +387,7 @@ test('back pressure respected', function (t) {
setTimeout(function () { setTimeout(function () {
counter--; counter--;
console.error("w3 drain");
w3.emit("drain"); w3.emit("drain");
}, 50); }, 50);

View file

@ -47,4 +47,7 @@ TestReader.prototype._read = function(n) {
}; };
var reader = new TestReader(); var reader = new TestReader();
assert.equal(ondataCalled, 1); setImmediate(function() {
assert.equal(ondataCalled, 1);
console.log('ok');
});

View file

@ -237,9 +237,12 @@ test('high watermark _read', function(t) {
assert.equal(v, '1'); assert.equal(v, '1');
var v2 = r.read(); var v2 = r.read();
assert.equal(v2, '2');
var v3 = r.read();
assert.equal(v3, '3');
assert.equal(calls, 1); assert.equal(calls, 1);
assert.equal(v2, '2');
t.end(); t.end();
}); });

View file

@ -39,17 +39,14 @@ function runTest(highWaterMark, objectMode, produce) {
ended = true; ended = true;
}); });
var pauses = 0;
var resumes = 0;
old.pause = function() { old.pause = function() {
pauses++; console.error('old.pause()');
old.emit('pause'); old.emit('pause');
flowing = false; flowing = false;
}; };
old.resume = function() { old.resume = function() {
resumes++; console.error('old.resume()');
old.emit('resume'); old.emit('resume');
flow(); flow();
}; };
@ -63,8 +60,9 @@ function runTest(highWaterMark, objectMode, produce) {
while (flowing && chunks-- > 0) { while (flowing && chunks-- > 0) {
var item = produce(); var item = produce();
expected.push(item); expected.push(item);
console.log('emit', chunks); console.log('old.emit', chunks, flowing);
old.emit('data', item); old.emit('data', item);
console.log('after emit', chunks, flowing);
} }
if (chunks <= 0) { if (chunks <= 0) {
oldEnded = true; oldEnded = true;
@ -76,7 +74,7 @@ function runTest(highWaterMark, objectMode, produce) {
var w = new Writable({ highWaterMark: highWaterMark * 2, objectMode: objectMode }); var w = new Writable({ highWaterMark: highWaterMark * 2, objectMode: objectMode });
var written = []; var written = [];
w._write = function(chunk, encoding, cb) { w._write = function(chunk, encoding, cb) {
console.log(chunk); console.log('_write', chunk);
written.push(chunk); written.push(chunk);
setTimeout(cb); setTimeout(cb);
}; };
@ -94,11 +92,10 @@ function runTest(highWaterMark, objectMode, produce) {
assert(ended); assert(ended);
assert(oldEnded); assert(oldEnded);
assert.deepEqual(written, expected); assert.deepEqual(written, expected);
assert.equal(pauses, 10);
assert.equal(resumes, 9);
} }
} }
runTest(100, false, function(){ return new Buffer(100); });
runTest(10, false, function(){ return new Buffer('xxxxxxxxxx'); }); runTest(10, false, function(){ return new Buffer('xxxxxxxxxx'); });
runTest(1, true, function(){ return { foo: 'bar' }; }); runTest(1, true, function(){ return { foo: 'bar' }; });

View file

@ -310,8 +310,7 @@ test('complex transform', function(t) {
pt.end(); pt.end();
t.end(); t.end();
}); });
t.equal(pt.read().toString(), 'abc'); t.equal(pt.read().toString(), 'abcdef');
t.equal(pt.read().toString(), 'def');
t.equal(pt.read(), null); t.equal(pt.read(), null);
}); });
}); });

View file

@ -68,7 +68,8 @@ assert.equal(dest.listeners('finish').length, 0);
console.error(src._readableState); console.error(src._readableState);
process.on('exit', function() { process.on('exit', function() {
assert(src._readableState.length >= src._readableState.highWaterMark);
src._readableState.buffer.length = 0; src._readableState.buffer.length = 0;
console.error(src._readableState); console.error(src._readableState);
assert(src._readableState.length >= src._readableState.highWaterMark);
console.log('ok');
}); });

View file

@ -0,0 +1,167 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var common = require('../common');
var assert = require('assert');
var stream = require('stream');
var Readable = stream.Readable;
var Writable = stream.Writable;
var totalChunks = 100;
var chunkSize = 99;
var expectTotalData = totalChunks * chunkSize;
var expectEndingData = expectTotalData;
var r = new Readable({ highWaterMark: 1000 });
var chunks = totalChunks;
r._read = function(n) {
if (!(chunks % 2))
setImmediate(push);
else if (!(chunks % 3))
process.nextTick(push);
else
push();
};
var totalPushed = 0;
function push() {
var chunk = chunks-- > 0 ? new Buffer(chunkSize) : null;
if (chunk) {
totalPushed += chunk.length;
chunk.fill('x');
}
r.push(chunk);
}
read100();
// first we read 100 bytes
function read100() {
readn(100, onData);
}
function readn(n, then) {
console.error('read %d', n);
expectEndingData -= n;
;(function read() {
var c = r.read(n);
if (!c)
r.once('readable', read);
else {
assert.equal(c.length, n);
assert(!r._readableState.flowing);
then();
}
})();
}
// then we listen to some data events
function onData() {
expectEndingData -= 100;
console.error('onData');
var seen = 0;
r.on('data', function od(c) {
seen += c.length;
if (seen >= 100) {
// seen enough
r.removeListener('data', od);
r.pause();
if (seen > 100) {
// oh no, seen too much!
// put the extra back.
var diff = seen - 100;
r.unshift(c.slice(c.length - diff));
console.error('seen too much', seen, diff);
}
// Nothing should be lost in between
setImmediate(pipeLittle);
}
});
}
// Just pipe 200 bytes, then unshift the extra and unpipe
function pipeLittle() {
expectEndingData -= 200;
console.error('pipe a little');
var w = new Writable();
var written = 0;
w.on('finish', function() {
assert.equal(written, 200);
setImmediate(read1234);
});
w._write = function(chunk, encoding, cb) {
written += chunk.length;
if (written >= 200) {
r.unpipe(w);
w.end();
cb();
if (written > 200) {
var diff = written - 200;
written -= diff;
r.unshift(chunk.slice(chunk.length - diff));
}
} else {
setImmediate(cb);
}
};
r.pipe(w);
}
// now read 1234 more bytes
function read1234() {
readn(1234, resumePause);
}
function resumePause() {
console.error('resumePause');
// don't read anything, just resume and re-pause a whole bunch
r.resume();
r.pause();
r.resume();
r.pause();
r.resume();
r.pause();
r.resume();
r.pause();
r.resume();
r.pause();
setImmediate(pipe);
}
function pipe() {
console.error('pipe the rest');
var w = new Writable();
var written = 0;
w._write = function(chunk, encoding, cb) {
written += chunk.length;
cb();
};
w.on('finish', function() {
console.error('written', written, totalPushed);
assert.equal(written, expectEndingData);
assert.equal(totalPushed, expectTotalData);
console.log('ok');
});
r.pipe(w);
}