mirror of
https://github.com/nodejs/node.git
synced 2025-08-15 13:48:44 +02:00
stream: support typed arrays
PR-URL: https://github.com/nodejs/node/pull/51866 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Raz Luvaton <rluvaton@gmail.com> Reviewed-By: Paolo Insogna <paolo@cowtech.it> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
This commit is contained in:
parent
d62ab3a1ef
commit
7d258db1d7
8 changed files with 250 additions and 35 deletions
36
benchmark/streams/readable-uint8array.js
Normal file
36
benchmark/streams/readable-uint8array.js
Normal file
|
@ -0,0 +1,36 @@
|
|||
'use strict';
|
||||
const common = require('../common.js');
|
||||
const { Readable } = require('stream');
|
||||
|
||||
const bench = common.createBenchmark(main, {
|
||||
n: [1e6],
|
||||
kind: ['read', 'encoding'],
|
||||
});
|
||||
const ABC = new Uint8Array([0x41, 0x42, 0x43]);
|
||||
|
||||
function main({ n, kind }) {
|
||||
switch (kind) {
|
||||
case 'read': {
|
||||
bench.start();
|
||||
const readable = new Readable({
|
||||
read() {},
|
||||
});
|
||||
for (let i = 0; i < n; ++i) readable.push(ABC);
|
||||
bench.end(n);
|
||||
break;
|
||||
}
|
||||
|
||||
case 'encoding': {
|
||||
bench.start();
|
||||
const readable = new Readable({
|
||||
read() {},
|
||||
});
|
||||
readable.setEncoding('utf8');
|
||||
for (let i = 0; i < n; ++i) readable.push(ABC);
|
||||
bench.end(n);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
throw new Error('Invalid kind');
|
||||
}
|
||||
}
|
51
benchmark/streams/writable-uint8array.js
Normal file
51
benchmark/streams/writable-uint8array.js
Normal file
|
@ -0,0 +1,51 @@
|
|||
'use strict';
|
||||
const common = require('../common.js');
|
||||
const { Writable } = require('stream');
|
||||
|
||||
const bench = common.createBenchmark(main, {
|
||||
n: [50e6],
|
||||
kind: ['write', 'object-mode', 'writev'],
|
||||
});
|
||||
const ABC = new Uint8Array([0x41, 0x42, 0x43]);
|
||||
|
||||
function main({ n, kind }) {
|
||||
switch (kind) {
|
||||
case 'write': {
|
||||
bench.start();
|
||||
const wr = new Writable({
|
||||
write(chunk, encoding, cb) {
|
||||
cb();
|
||||
},
|
||||
});
|
||||
for (let i = 0; i < n; ++i) wr.write(ABC);
|
||||
bench.end(n);
|
||||
break;
|
||||
}
|
||||
|
||||
case 'object-mode': {
|
||||
bench.start();
|
||||
const wr = new Writable({
|
||||
objectMode: true,
|
||||
write(chunk, encoding, cb) {
|
||||
cb();
|
||||
},
|
||||
});
|
||||
for (let i = 0; i < n; ++i) wr.write(ABC);
|
||||
bench.end(n);
|
||||
break;
|
||||
}
|
||||
case 'writev': {
|
||||
bench.start();
|
||||
const wr = new Writable({
|
||||
writev(chunks, cb) {
|
||||
cb();
|
||||
},
|
||||
});
|
||||
for (let i = 0; i < n; ++i) wr.write(ABC);
|
||||
bench.end(n);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
throw new Error('Invalid kind');
|
||||
}
|
||||
}
|
|
@ -282,11 +282,19 @@ The `finished` API also provides a [callback version][stream-finished].
|
|||
|
||||
### Object mode
|
||||
|
||||
All streams created by Node.js APIs operate exclusively on strings and `Buffer`
|
||||
(or `Uint8Array`) objects. It is possible, however, for stream implementations
|
||||
to work with other types of JavaScript values (with the exception of `null`,
|
||||
which serves a special purpose within streams). Such streams are considered to
|
||||
operate in "object mode".
|
||||
All streams created by Node.js APIs operate exclusively on strings, {Buffer},
|
||||
{TypedArray} and {DataView} objects:
|
||||
|
||||
* `Strings` and `Buffers` are the most common types used with streams.
|
||||
* `TypedArray` and `DataView` lets you handle binary data with types like
|
||||
`Int32Array` or `Uint8Array`. When you write a TypedArray or DataView to a
|
||||
stream, Node.js processes
|
||||
the raw bytes.
|
||||
|
||||
It is possible, however, for stream
|
||||
implementations to work with other types of JavaScript values (with the
|
||||
exception of `null`, which serves a special purpose within streams).
|
||||
Such streams are considered to operate in "object mode".
|
||||
|
||||
Stream instances are switched into object mode using the `objectMode` option
|
||||
when the stream is created. Attempting to switch an existing stream into
|
||||
|
@ -712,6 +720,9 @@ console.log(myStream.destroyed); // true
|
|||
<!-- YAML
|
||||
added: v0.9.4
|
||||
changes:
|
||||
- version: REPLACEME
|
||||
pr-url: https://github.com/nodejs/node/pull/51866
|
||||
description: The `chunk` argument can now be a `TypedArray` or `DataView` instance.
|
||||
- version: v15.0.0
|
||||
pr-url: https://github.com/nodejs/node/pull/34101
|
||||
description: The `callback` is invoked before 'finish' or on error.
|
||||
|
@ -726,10 +737,10 @@ changes:
|
|||
description: The `chunk` argument can now be a `Uint8Array` instance.
|
||||
-->
|
||||
|
||||
* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams
|
||||
not operating in object mode, `chunk` must be a string, `Buffer` or
|
||||
`Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
|
||||
other than `null`.
|
||||
* `chunk` {string|Buffer|TypedArray|DataView|any} Optional data to write. For
|
||||
streams not operating in object mode, `chunk` must be a {string}, {Buffer},
|
||||
{TypedArray} or {DataView}. For object mode streams, `chunk` may be any
|
||||
JavaScript value other than `null`.
|
||||
* `encoding` {string} The encoding if `chunk` is a string
|
||||
* `callback` {Function} Callback for when the stream is finished.
|
||||
* Returns: {this}
|
||||
|
@ -926,6 +937,9 @@ Getter for the property `objectMode` of a given `Writable` stream.
|
|||
<!-- YAML
|
||||
added: v0.9.4
|
||||
changes:
|
||||
- version: REPLACEME
|
||||
pr-url: https://github.com/nodejs/node/pull/51866
|
||||
description: The `chunk` argument can now be a `TypedArray` or `DataView` instance.
|
||||
- version: v8.0.0
|
||||
pr-url: https://github.com/nodejs/node/pull/11608
|
||||
description: The `chunk` argument can now be a `Uint8Array` instance.
|
||||
|
@ -935,10 +949,10 @@ changes:
|
|||
considered invalid now, even in object mode.
|
||||
-->
|
||||
|
||||
* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams
|
||||
not operating in object mode, `chunk` must be a string, `Buffer` or
|
||||
`Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
|
||||
other than `null`.
|
||||
* `chunk` {string|Buffer|TypedArray|DataView|any} Optional data to write. For
|
||||
streams not operating in object mode, `chunk` must be a {string}, {Buffer},
|
||||
{TypedArray} or {DataView}. For object mode streams, `chunk` may be any
|
||||
JavaScript value other than `null`.
|
||||
* `encoding` {string|null} The encoding, if `chunk` is a string. **Default:** `'utf8'`
|
||||
* `callback` {Function} Callback for when this chunk of data is flushed.
|
||||
* Returns: {boolean} `false` if the stream wishes for the calling code to
|
||||
|
@ -1763,15 +1777,18 @@ setTimeout(() => {
|
|||
<!-- YAML
|
||||
added: v0.9.11
|
||||
changes:
|
||||
- version: REPLACEME
|
||||
pr-url: https://github.com/nodejs/node/pull/51866
|
||||
description: The `chunk` argument can now be a `TypedArray` or `DataView` instance.
|
||||
- version: v8.0.0
|
||||
pr-url: https://github.com/nodejs/node/pull/11608
|
||||
description: The `chunk` argument can now be a `Uint8Array` instance.
|
||||
-->
|
||||
|
||||
* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to unshift onto the
|
||||
read queue. For streams not operating in object mode, `chunk` must be a
|
||||
string, `Buffer`, `Uint8Array`, or `null`. For object mode streams, `chunk`
|
||||
may be any JavaScript value.
|
||||
* `chunk` {Buffer|TypedArray|DataView|string|null|any} Chunk of data to unshift
|
||||
onto the read queue. For streams not operating in object mode, `chunk` must
|
||||
be a {string}, {Buffer}, {TypedArray}, {DataView} or `null`.
|
||||
For object mode streams, `chunk` may be any JavaScript value.
|
||||
* `encoding` {string} Encoding of string chunks. Must be a valid
|
||||
`Buffer` encoding, such as `'utf8'` or `'ascii'`.
|
||||
|
||||
|
@ -3515,8 +3532,8 @@ changes:
|
|||
**Default:** `'utf8'`.
|
||||
* `objectMode` {boolean} Whether or not the
|
||||
[`stream.write(anyObj)`][stream-write] is a valid operation. When set,
|
||||
it becomes possible to write JavaScript values other than string,
|
||||
`Buffer` or `Uint8Array` if supported by the stream implementation.
|
||||
it becomes possible to write JavaScript values other than string, {Buffer},
|
||||
{TypedArray} or {DataView} if supported by the stream implementation.
|
||||
**Default:** `false`.
|
||||
* `emitClose` {boolean} Whether or not the stream should emit `'close'`
|
||||
after it has been destroyed. **Default:** `true`.
|
||||
|
@ -4068,22 +4085,25 @@ It can be overridden by child classes but it **must not** be called directly.
|
|||
|
||||
<!-- YAML
|
||||
changes:
|
||||
- version: REPLACEME
|
||||
pr-url: https://github.com/nodejs/node/pull/51866
|
||||
description: The `chunk` argument can now be a `TypedArray` or `DataView` instance.
|
||||
- version: v8.0.0
|
||||
pr-url: https://github.com/nodejs/node/pull/11608
|
||||
description: The `chunk` argument can now be a `Uint8Array` instance.
|
||||
-->
|
||||
|
||||
* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to push into the
|
||||
read queue. For streams not operating in object mode, `chunk` must be a
|
||||
string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be
|
||||
any JavaScript value.
|
||||
* `chunk` {Buffer|TypedArray|DataView|string|null|any} Chunk of data to push
|
||||
into the read queue. For streams not operating in object mode, `chunk` must
|
||||
be a {string}, {Buffer}, {TypedArray} or {DataView}. For object mode streams,
|
||||
`chunk` may be any JavaScript value.
|
||||
* `encoding` {string} Encoding of string chunks. Must be a valid
|
||||
`Buffer` encoding, such as `'utf8'` or `'ascii'`.
|
||||
* Returns: {boolean} `true` if additional chunks of data may continue to be
|
||||
pushed; `false` otherwise.
|
||||
|
||||
When `chunk` is a `Buffer`, `Uint8Array`, or `string`, the `chunk` of data will
|
||||
be added to the internal queue for users of the stream to consume.
|
||||
When `chunk` is a {Buffer}, {TypedArray}, {DataView} or {string}, the `chunk`
|
||||
of data will be added to the internal queue for users of the stream to consume.
|
||||
Passing `chunk` as `null` signals the end of the stream (EOF), after which no
|
||||
more data can be written.
|
||||
|
||||
|
@ -4758,8 +4778,9 @@ situations within Node.js where this is done, particularly in the
|
|||
|
||||
Use of `readable.push('')` is not recommended.
|
||||
|
||||
Pushing a zero-byte string, `Buffer`, or `Uint8Array` to a stream that is not in
|
||||
object mode has an interesting side effect. Because it _is_ a call to
|
||||
Pushing a zero-byte {string}, {Buffer}, {TypedArray} or {DataView} to a stream
|
||||
that is not in object mode has an interesting side effect.
|
||||
Because it _is_ a call to
|
||||
[`readable.push()`][stream-push], the call will end the reading process.
|
||||
However, because the argument is an empty string, no data is added to the
|
||||
readable buffer so there is nothing for a user to consume.
|
||||
|
|
|
@ -420,11 +420,11 @@ function readableAddChunkUnshiftByteMode(stream, state, chunk, encoding) {
|
|||
chunk = Buffer.from(chunk, encoding);
|
||||
}
|
||||
}
|
||||
} else if (Stream._isUint8Array(chunk)) {
|
||||
} else if (Stream._isArrayBufferView(chunk)) {
|
||||
chunk = Stream._uint8ArrayToBuffer(chunk);
|
||||
} else if (chunk !== undefined && !(chunk instanceof Buffer)) {
|
||||
errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE(
|
||||
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk));
|
||||
'chunk', ['string', 'Buffer', 'TypedArray', 'DataView'], chunk));
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -473,12 +473,12 @@ function readableAddChunkPushByteMode(stream, state, chunk, encoding) {
|
|||
}
|
||||
} else if (chunk instanceof Buffer) {
|
||||
encoding = '';
|
||||
} else if (Stream._isUint8Array(chunk)) {
|
||||
} else if (Stream._isArrayBufferView(chunk)) {
|
||||
chunk = Stream._uint8ArrayToBuffer(chunk);
|
||||
encoding = '';
|
||||
} else if (chunk !== undefined) {
|
||||
errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE(
|
||||
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk));
|
||||
'chunk', ['string', 'Buffer', 'TypedArray', 'DataView'], chunk));
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -467,12 +467,12 @@ function _write(stream, chunk, encoding, cb) {
|
|||
}
|
||||
} else if (chunk instanceof Buffer) {
|
||||
encoding = 'buffer';
|
||||
} else if (Stream._isUint8Array(chunk)) {
|
||||
} else if (Stream._isArrayBufferView(chunk)) {
|
||||
chunk = Stream._uint8ArrayToBuffer(chunk);
|
||||
encoding = 'buffer';
|
||||
} else {
|
||||
throw new ERR_INVALID_ARG_TYPE(
|
||||
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
|
||||
'chunk', ['string', 'Buffer', 'TypedArray', 'DataView'], chunk);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -50,6 +50,7 @@ const internalBuffer = require('internal/buffer');
|
|||
|
||||
const promises = require('stream/promises');
|
||||
const utils = require('internal/streams/utils');
|
||||
const { isArrayBufferView, isUint8Array } = require('internal/util/types');
|
||||
|
||||
const Stream = module.exports = require('internal/streams/legacy').Stream;
|
||||
|
||||
|
@ -137,7 +138,8 @@ ObjectDefineProperty(eos, customPromisify, {
|
|||
// Backwards-compat with node 0.4.x
|
||||
Stream.Stream = Stream;
|
||||
|
||||
Stream._isUint8Array = require('internal/util/types').isUint8Array;
|
||||
Stream._isArrayBufferView = isArrayBufferView;
|
||||
Stream._isUint8Array = isUint8Array;
|
||||
Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) {
|
||||
return new internalBuffer.FastBuffer(chunk.buffer,
|
||||
chunk.byteOffset,
|
||||
|
|
|
@ -34,6 +34,6 @@ assert.throws(() => {
|
|||
code: 'ERR_INVALID_ARG_TYPE',
|
||||
name: 'TypeError',
|
||||
message: 'The "chunk" argument must be of type string or an instance of ' +
|
||||
`Buffer or Uint8Array.${common.invalidArgTypeHelper(value)}`
|
||||
`Buffer, TypedArray, or DataView.${common.invalidArgTypeHelper(value)}`
|
||||
});
|
||||
});
|
||||
|
|
105
test/parallel/test-stream-typedarray.js
Normal file
105
test/parallel/test-stream-typedarray.js
Normal file
|
@ -0,0 +1,105 @@
|
|||
'use strict';
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
|
||||
const { Readable, Writable } = require('stream');
|
||||
|
||||
const buffer = Buffer.from('ABCD');
|
||||
const views = common.getArrayBufferViews(buffer);
|
||||
|
||||
{
|
||||
// Simple Writable test.
|
||||
let n = 0;
|
||||
const writable = new Writable({
|
||||
write: common.mustCall((chunk, encoding, cb) => {
|
||||
assert(chunk instanceof Buffer);
|
||||
assert(ArrayBuffer.isView(chunk));
|
||||
assert.deepStrictEqual(common.getBufferSources(chunk)[n], views[n]);
|
||||
n++;
|
||||
cb();
|
||||
}, views.length),
|
||||
});
|
||||
|
||||
views.forEach((msg) => writable.write(msg));
|
||||
writable.end();
|
||||
}
|
||||
|
||||
{
|
||||
// Writable test with object mode True.
|
||||
let n = 0;
|
||||
const writable = new Writable({
|
||||
objectMode: true,
|
||||
write: common.mustCall((chunk, encoding, cb) => {
|
||||
assert(!(chunk instanceof Buffer));
|
||||
assert(ArrayBuffer.isView(chunk));
|
||||
assert.deepStrictEqual(common.getBufferSources(chunk)[n], views[n]);
|
||||
n++;
|
||||
cb();
|
||||
}, views.length),
|
||||
});
|
||||
|
||||
views.forEach((msg) => writable.write(msg));
|
||||
writable.end();
|
||||
}
|
||||
|
||||
|
||||
{
|
||||
// Writable test, multiple writes carried out via writev.
|
||||
let n = 0;
|
||||
let callback;
|
||||
const writable = new Writable({
|
||||
write: common.mustCall((chunk, encoding, cb) => {
|
||||
assert(chunk instanceof Buffer);
|
||||
assert(ArrayBuffer.isView(chunk));
|
||||
assert.deepStrictEqual(common.getBufferSources(chunk)[n], views[n]);
|
||||
n++;
|
||||
callback = cb;
|
||||
}),
|
||||
|
||||
writev: common.mustCall((chunks, cb) => {
|
||||
assert.strictEqual(chunks.length, views.length);
|
||||
let res = '';
|
||||
for (const chunk of chunks) {
|
||||
assert.strictEqual(chunk.encoding, 'buffer');
|
||||
res += chunk.chunk;
|
||||
}
|
||||
assert.strictEqual(res, 'ABCD'.repeat(9));
|
||||
}),
|
||||
|
||||
});
|
||||
views.forEach((msg) => writable.write(msg));
|
||||
writable.end(views[0]);
|
||||
callback();
|
||||
}
|
||||
|
||||
|
||||
{
|
||||
// Simple Readable test.
|
||||
const readable = new Readable({
|
||||
read() {}
|
||||
});
|
||||
|
||||
readable.push(views[1]);
|
||||
readable.push(views[2]);
|
||||
readable.unshift(views[0]);
|
||||
|
||||
const buf = readable.read();
|
||||
assert(buf instanceof Buffer);
|
||||
assert.deepStrictEqual([...buf], [...views[0], ...views[1], ...views[2]]);
|
||||
}
|
||||
|
||||
{
|
||||
// Readable test, setEncoding.
|
||||
const readable = new Readable({
|
||||
read() {}
|
||||
});
|
||||
|
||||
readable.setEncoding('utf8');
|
||||
|
||||
readable.push(views[1]);
|
||||
readable.push(views[2]);
|
||||
readable.unshift(views[0]);
|
||||
|
||||
const out = readable.read();
|
||||
assert.strictEqual(out, 'ABCD'.repeat(3));
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue