mirror of
https://github.com/nodejs/node.git
synced 2025-08-18 23:28:49 +02:00
stream: implement streams to webstreams adapters
Experimental adapters for the webstreams API Signed-off-by: James M Snell <jasnell@gmail.com> PR-URL: https://github.com/nodejs/node/pull/39134 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
parent
09b57f7909
commit
a99c230305
13 changed files with 2358 additions and 0 deletions
|
@ -1892,6 +1892,87 @@ Calling `Readable.from(string)` or `Readable.from(buffer)` will not have
|
|||
the strings or buffers be iterated to match the other streams semantics
|
||||
for performance reasons.
|
||||
|
||||
### `stream.Readable.fromWeb(readableStream[, options])`
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
> Stability: 1 - Experimental
|
||||
|
||||
* `readableStream` {ReadableStream}
|
||||
* `options` {Object}
|
||||
* `encoding` {string}
|
||||
* `highWaterMark` {number}
|
||||
* `objectModel` {boolean}
|
||||
* `signal` {AbortSignal}
|
||||
* Returns: {stream.Readable}
|
||||
|
||||
### `stream.Readable.toWeb(streamReadable)`
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
> Stability: 1 - Experimental
|
||||
|
||||
* `streamReadable` {stream.Readable}
|
||||
* Returns: {ReadableStream}
|
||||
|
||||
### `stream.Writable.fromWeb(writableStream[, options])`
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
> Stability: 1 - Experimental
|
||||
|
||||
* `writableStream` {WritableStream}
|
||||
* `options` {Object}
|
||||
* `decodeStrings` {boolean}
|
||||
* `highWaterMark` {number}
|
||||
* `objectMode` {boolean}
|
||||
* `signal` {AbortSignal}
|
||||
* Returns: {stream.Writable}
|
||||
|
||||
### `stream.Writable.toWeb(streamWritable)`
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
> Stability: 1 - Experimental
|
||||
|
||||
* `streamWritable` {stream.Writable}
|
||||
* Returns: {WritableStream}
|
||||
|
||||
### `stream.Duplex.fromWeb(pair[, options])`
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
> Stability: 1 - Experimental
|
||||
|
||||
* `pair` {Object}
|
||||
* `readable` {ReadableStream}
|
||||
* `writable` {WritableStream}
|
||||
* `options` {Object}
|
||||
* `allowHalfOpen` {boolean}
|
||||
* `decodeStrings` {boolean}
|
||||
* `encoding` {string}
|
||||
* `highWaterMark` {number}
|
||||
* `objectMode` {boolean}
|
||||
* `signal` {AbortSignal}
|
||||
* Returns: {stream.Duplex}
|
||||
|
||||
### `stream.Duplex.toWeb(streamDuplex)`
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
> Stability: 1 - Experimental
|
||||
|
||||
* `streamDuplex` {stream.Duplex}
|
||||
* Returns: {Object}
|
||||
* `readable` {ReadableStream}
|
||||
* `writable` {WritableStream}
|
||||
|
||||
### `stream.addAbortSignal(signal, stream)`
|
||||
<!-- YAML
|
||||
added: v15.4.0
|
||||
|
|
|
@ -788,6 +788,8 @@ module.exports = {
|
|||
appendFile,
|
||||
readFile,
|
||||
watch,
|
||||
|
||||
kHandle,
|
||||
},
|
||||
|
||||
FileHandle,
|
||||
|
|
|
@ -114,3 +114,22 @@ ObjectDefineProperties(Duplex.prototype, {
|
|||
}
|
||||
}
|
||||
});
|
||||
|
||||
let webStreamsAdapters;
|
||||
|
||||
// Lazy to avoid circular references
|
||||
function lazyWebStreams() {
|
||||
if (webStreamsAdapters === undefined)
|
||||
webStreamsAdapters = require('internal/webstreams/adapters');
|
||||
return webStreamsAdapters;
|
||||
}
|
||||
|
||||
Duplex.fromWeb = function(pair, options) {
|
||||
return lazyWebStreams().newStreamDuplexFromReadableWritablePair(
|
||||
pair,
|
||||
options);
|
||||
};
|
||||
|
||||
Duplex.toWeb = function(duplex) {
|
||||
return lazyWebStreams().newReadableWritablePairFromDuplex(duplex);
|
||||
};
|
||||
|
|
|
@ -1355,3 +1355,22 @@ function endWritableNT(state, stream) {
|
|||
Readable.from = function(iterable, opts) {
|
||||
return from(Readable, iterable, opts);
|
||||
};
|
||||
|
||||
let webStreamsAdapters;
|
||||
|
||||
// Lazy to avoid circular references
|
||||
function lazyWebStreams() {
|
||||
if (webStreamsAdapters === undefined)
|
||||
webStreamsAdapters = require('internal/webstreams/adapters');
|
||||
return webStreamsAdapters;
|
||||
}
|
||||
|
||||
Readable.fromWeb = function(readableStream, options) {
|
||||
return lazyWebStreams().newStreamReadableFromReadableStream(
|
||||
readableStream,
|
||||
options);
|
||||
};
|
||||
|
||||
Readable.toWeb = function(streamReadable) {
|
||||
return lazyWebStreams().newStreamReadableFromReadableStream(streamReadable);
|
||||
};
|
||||
|
|
|
@ -872,3 +872,22 @@ Writable.prototype._destroy = function(err, cb) {
|
|||
Writable.prototype[EE.captureRejectionSymbol] = function(err) {
|
||||
this.destroy(err);
|
||||
};
|
||||
|
||||
let webStreamsAdapters;
|
||||
|
||||
// Lazy to avoid circular references
|
||||
function lazyWebStreams() {
|
||||
if (webStreamsAdapters === undefined)
|
||||
webStreamsAdapters = require('internal/webstreams/adapters');
|
||||
return webStreamsAdapters;
|
||||
}
|
||||
|
||||
Writable.fromWeb = function(writableStream, options) {
|
||||
return lazyWebStreams().newStreamWritableFromWritableStream(
|
||||
writableStream,
|
||||
options);
|
||||
};
|
||||
|
||||
Writable.toWeb = function(streamWritable) {
|
||||
return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable);
|
||||
};
|
||||
|
|
921
lib/internal/webstreams/adapters.js
Normal file
921
lib/internal/webstreams/adapters.js
Normal file
|
@ -0,0 +1,921 @@
|
|||
'use strict';
|
||||
|
||||
const {
|
||||
ArrayPrototypeMap,
|
||||
PromiseAll,
|
||||
PromisePrototypeThen,
|
||||
PromisePrototypeFinally,
|
||||
PromiseResolve,
|
||||
Uint8Array,
|
||||
} = primordials;
|
||||
|
||||
const {
|
||||
ReadableStream,
|
||||
isReadableStream,
|
||||
} = require('internal/webstreams/readablestream');
|
||||
|
||||
const {
|
||||
WritableStream,
|
||||
isWritableStream,
|
||||
} = require('internal/webstreams/writablestream');
|
||||
|
||||
const {
|
||||
CountQueuingStrategy,
|
||||
} = require('internal/webstreams/queuingstrategies');
|
||||
|
||||
const {
|
||||
Writable,
|
||||
Readable,
|
||||
Duplex,
|
||||
destroy,
|
||||
} = require('stream');
|
||||
|
||||
const {
|
||||
isDestroyed,
|
||||
isReadable,
|
||||
isReadableEnded,
|
||||
isWritable,
|
||||
isWritableEnded,
|
||||
} = require('internal/streams/utils');
|
||||
|
||||
const {
|
||||
Buffer,
|
||||
} = require('buffer');
|
||||
|
||||
const {
|
||||
errnoException,
|
||||
codes: {
|
||||
ERR_INVALID_ARG_TYPE,
|
||||
ERR_INVALID_ARG_VALUE,
|
||||
ERR_INVALID_STATE,
|
||||
ERR_STREAM_PREMATURE_CLOSE,
|
||||
},
|
||||
} = require('internal/errors');
|
||||
|
||||
const {
|
||||
createDeferredPromise,
|
||||
} = require('internal/util');
|
||||
|
||||
const {
|
||||
validateBoolean,
|
||||
validateObject,
|
||||
} = require('internal/validators');
|
||||
|
||||
const {
|
||||
WriteWrap,
|
||||
ShutdownWrap,
|
||||
kReadBytesOrError,
|
||||
kLastWriteWasAsync,
|
||||
streamBaseState,
|
||||
} = internalBinding('stream_wrap');
|
||||
|
||||
const finished = require('internal/streams/end-of-stream');
|
||||
|
||||
const { UV_EOF } = internalBinding('uv');
|
||||
|
||||
/**
|
||||
* @typedef {import('../../stream').Writable} Writable
|
||||
* @typedef {import('../../stream').Readable} Readable
|
||||
* @typedef {import('./writablestream').WritableStream} WritableStream
|
||||
* @typedef {import('./readablestream').ReadableStream} ReadableStream
|
||||
*
|
||||
* @typedef {import('../abort_controller').AbortSignal} AbortSignal
|
||||
*/
|
||||
|
||||
/**
|
||||
* @param {Writable} streamWritable
|
||||
* @returns {WritableStream}
|
||||
*/
|
||||
function newWritableStreamFromStreamWritable(streamWritable) {
|
||||
// Not using the internal/streams/utils isWritableNodeStream utility
|
||||
// here because it will return false if streamWritable is a Duplex
|
||||
// whose writable option is false. For a Duplex that is not writable,
|
||||
// we want it to pass this check but return a closed WritableStream.
|
||||
if (typeof streamWritable?._writableState !== 'object') {
|
||||
throw new ERR_INVALID_ARG_TYPE(
|
||||
'streamWritable',
|
||||
'stream.Writable',
|
||||
streamWritable);
|
||||
}
|
||||
|
||||
if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
|
||||
const writable = new WritableStream();
|
||||
writable.close();
|
||||
return writable;
|
||||
}
|
||||
|
||||
const highWaterMark = streamWritable.writableHighWaterMark;
|
||||
const strategy =
|
||||
streamWritable.writableObjectMode ?
|
||||
new CountQueuingStrategy({ highWaterMark }) :
|
||||
{ highWaterMark };
|
||||
|
||||
let controller;
|
||||
let backpressurePromise;
|
||||
let closed;
|
||||
|
||||
function onDrain() {
|
||||
if (backpressurePromise !== undefined)
|
||||
backpressurePromise.resolve();
|
||||
}
|
||||
|
||||
const cleanup = finished(streamWritable, (error) => {
|
||||
cleanup();
|
||||
// This is a protection against non-standard, legacy streams
|
||||
// that happen to emit an error event again after finished is called.
|
||||
streamWritable.on('error', () => {});
|
||||
if (error != null) {
|
||||
if (backpressurePromise !== undefined)
|
||||
backpressurePromise.reject(error);
|
||||
// If closed is not undefined, the error is happening
|
||||
// after the WritableStream close has already started.
|
||||
// We need to reject it here.
|
||||
if (closed !== undefined) {
|
||||
closed.reject(error);
|
||||
closed = undefined;
|
||||
}
|
||||
controller.error(error);
|
||||
controller = undefined;
|
||||
return;
|
||||
}
|
||||
|
||||
if (closed !== undefined) {
|
||||
closed.resolve();
|
||||
closed = undefined;
|
||||
return;
|
||||
}
|
||||
controller.error(new ERR_STREAM_PREMATURE_CLOSE());
|
||||
controller = undefined;
|
||||
});
|
||||
|
||||
streamWritable.on('drain', onDrain);
|
||||
|
||||
return new WritableStream({
|
||||
start(c) { controller = c; },
|
||||
|
||||
async write(chunk) {
|
||||
if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
|
||||
backpressurePromise = createDeferredPromise();
|
||||
return PromisePrototypeFinally(
|
||||
backpressurePromise.promise, () => {
|
||||
backpressurePromise = undefined;
|
||||
});
|
||||
}
|
||||
},
|
||||
|
||||
abort(reason) {
|
||||
destroy(streamWritable, reason);
|
||||
},
|
||||
|
||||
close() {
|
||||
if (closed === undefined && !isWritableEnded(streamWritable)) {
|
||||
closed = createDeferredPromise();
|
||||
streamWritable.end();
|
||||
return closed.promise;
|
||||
}
|
||||
|
||||
controller = undefined;
|
||||
return PromiseResolve();
|
||||
},
|
||||
}, strategy);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {WritableStream} writableStream
|
||||
* @param {{
|
||||
* decodeStrings? : boolean,
|
||||
* highWaterMark? : number,
|
||||
* objectMode? : boolean,
|
||||
* signal? : AbortSignal,
|
||||
* }} [options]
|
||||
* @returns {Writable}
|
||||
*/
|
||||
function newStreamWritableFromWritableStream(writableStream, options = {}) {
|
||||
if (!isWritableStream(writableStream)) {
|
||||
throw new ERR_INVALID_ARG_TYPE(
|
||||
'writableStream',
|
||||
'WritableStream',
|
||||
writableStream);
|
||||
}
|
||||
|
||||
validateObject(options, 'options');
|
||||
const {
|
||||
highWaterMark,
|
||||
decodeStrings = true,
|
||||
objectMode = false,
|
||||
signal,
|
||||
} = options;
|
||||
|
||||
validateBoolean(objectMode, 'options.objectMode');
|
||||
validateBoolean(decodeStrings, 'options.decodeStrings');
|
||||
|
||||
const writer = writableStream.getWriter();
|
||||
let closed = false;
|
||||
|
||||
const writable = new Writable({
|
||||
highWaterMark,
|
||||
objectMode,
|
||||
decodeStrings,
|
||||
signal,
|
||||
|
||||
writev(chunks, callback) {
|
||||
function done(error) {
|
||||
try {
|
||||
callback(error);
|
||||
} catch (error) {
|
||||
// In a next tick because this is happening within
|
||||
// a promise context, and if there are any errors
|
||||
// thrown we don't want those to cause an unhandled
|
||||
// rejection. Let's just escape the promise and
|
||||
// handle it separately.
|
||||
process.nextTick(() => destroy(writable, error));
|
||||
}
|
||||
}
|
||||
|
||||
PromisePrototypeThen(
|
||||
writer.ready,
|
||||
() => {
|
||||
return PromisePrototypeThen(
|
||||
PromiseAll(
|
||||
ArrayPrototypeMap(
|
||||
chunks,
|
||||
(chunk) => writer.write(chunk))),
|
||||
done,
|
||||
done);
|
||||
},
|
||||
done);
|
||||
},
|
||||
|
||||
write(chunk, encoding, callback) {
|
||||
if (typeof chunk === 'string' && decodeStrings && !objectMode) {
|
||||
chunk = Buffer.from(chunk, encoding);
|
||||
chunk = new Uint8Array(
|
||||
chunk.buffer,
|
||||
chunk.byteOffset,
|
||||
chunk.byteLength);
|
||||
}
|
||||
|
||||
function done(error) {
|
||||
try {
|
||||
callback(error);
|
||||
} catch (error) {
|
||||
destroy(writable, error);
|
||||
}
|
||||
}
|
||||
|
||||
PromisePrototypeThen(
|
||||
writer.ready,
|
||||
() => {
|
||||
return PromisePrototypeThen(
|
||||
writer.write(chunk),
|
||||
done,
|
||||
done);
|
||||
},
|
||||
done);
|
||||
},
|
||||
|
||||
destroy(error, callback) {
|
||||
function done() {
|
||||
try {
|
||||
callback(error);
|
||||
} catch (error) {
|
||||
// In a next tick because this is happening within
|
||||
// a promise context, and if there are any errors
|
||||
// thrown we don't want those to cause an unhandled
|
||||
// rejection. Let's just escape the promise and
|
||||
// handle it separately.
|
||||
process.nextTick(() => { throw error; });
|
||||
}
|
||||
}
|
||||
|
||||
if (!closed) {
|
||||
if (error != null) {
|
||||
PromisePrototypeThen(
|
||||
writer.abort(error),
|
||||
done,
|
||||
done);
|
||||
} else {
|
||||
PromisePrototypeThen(
|
||||
writer.close(),
|
||||
done,
|
||||
done);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
done();
|
||||
},
|
||||
|
||||
final(callback) {
|
||||
function done(error) {
|
||||
try {
|
||||
callback(error);
|
||||
} catch (error) {
|
||||
// In a next tick because this is happening within
|
||||
// a promise context, and if there are any errors
|
||||
// thrown we don't want those to cause an unhandled
|
||||
// rejection. Let's just escape the promise and
|
||||
// handle it separately.
|
||||
process.nextTick(() => destroy(writable, error));
|
||||
}
|
||||
}
|
||||
|
||||
if (!closed) {
|
||||
PromisePrototypeThen(
|
||||
writer.close(),
|
||||
done,
|
||||
done);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
PromisePrototypeThen(
|
||||
writer.closed,
|
||||
() => {
|
||||
// If the WritableStream closes before the stream.Writable has been
|
||||
// ended, we signal an error on the stream.Writable.
|
||||
closed = true;
|
||||
if (!isWritableEnded(writable))
|
||||
destroy(writable, new ERR_STREAM_PREMATURE_CLOSE());
|
||||
},
|
||||
(error) => {
|
||||
// If the WritableStream errors before the stream.Writable has been
|
||||
// destroyed, signal an error on the stream.Writable.
|
||||
closed = true;
|
||||
destroy(writable, error);
|
||||
});
|
||||
|
||||
return writable;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Readable} streamReadable
|
||||
* @returns {ReadableStream}
|
||||
*/
|
||||
function newReadableStreamFromStreamReadable(streamReadable) {
|
||||
// Not using the internal/streams/utils isReadableNodeStream utility
|
||||
// here because it will return false if streamReadable is a Duplex
|
||||
// whose readable option is false. For a Duplex that is not readable,
|
||||
// we want it to pass this check but return a closed ReadableStream.
|
||||
if (typeof streamReadable?._readableState !== 'object') {
|
||||
throw new ERR_INVALID_ARG_TYPE(
|
||||
'streamReadable',
|
||||
'stream.Readable',
|
||||
streamReadable);
|
||||
}
|
||||
|
||||
if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
|
||||
const readable = new ReadableStream();
|
||||
readable.cancel();
|
||||
return readable;
|
||||
}
|
||||
|
||||
const objectMode = streamReadable.readableObjectMode;
|
||||
const highWaterMark = streamReadable.readableHighWaterMark;
|
||||
// When not running in objectMode explicitly, we just fall
|
||||
// back to a minimal strategy that just specifies the highWaterMark
|
||||
// and no size algorithm. Using a ByteLengthQueuingStrategy here
|
||||
// is unnecessary.
|
||||
const strategy =
|
||||
objectMode ?
|
||||
new CountQueuingStrategy({ highWaterMark }) :
|
||||
{ highWaterMark };
|
||||
|
||||
let controller;
|
||||
|
||||
function onData(chunk) {
|
||||
// Copy the Buffer to detach it from the pool.
|
||||
if (Buffer.isBuffer(chunk) && !objectMode)
|
||||
chunk = new Uint8Array(chunk);
|
||||
controller.enqueue(chunk);
|
||||
if (controller.desiredSize <= 0)
|
||||
streamReadable.pause();
|
||||
}
|
||||
|
||||
streamReadable.pause();
|
||||
|
||||
const cleanup = finished(streamReadable, (error) => {
|
||||
cleanup();
|
||||
// This is a protection against non-standard, legacy streams
|
||||
// that happen to emit an error event again after finished is called.
|
||||
streamReadable.on('error', () => {});
|
||||
if (error)
|
||||
return controller.error(error);
|
||||
controller.close();
|
||||
});
|
||||
|
||||
streamReadable.on('data', onData);
|
||||
|
||||
return new ReadableStream({
|
||||
start(c) { controller = c; },
|
||||
|
||||
pull() { streamReadable.resume(); },
|
||||
|
||||
cancel(reason) {
|
||||
destroy(streamReadable, reason);
|
||||
},
|
||||
}, strategy);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {ReadableStream} readableStream
|
||||
* @param {{
|
||||
* highWaterMark? : number,
|
||||
* encoding? : string,
|
||||
* objectMode? : boolean,
|
||||
* signal? : AbortSignal,
|
||||
* }} [options]
|
||||
* @returns {Readable}
|
||||
*/
|
||||
function newStreamReadableFromReadableStream(readableStream, options = {}) {
|
||||
if (!isReadableStream(readableStream)) {
|
||||
throw new ERR_INVALID_ARG_TYPE(
|
||||
'readableStream',
|
||||
'ReadableStream',
|
||||
readableStream);
|
||||
}
|
||||
|
||||
validateObject(options, 'options');
|
||||
const {
|
||||
highWaterMark,
|
||||
encoding,
|
||||
objectMode = false,
|
||||
signal,
|
||||
} = options;
|
||||
|
||||
if (encoding !== undefined && !Buffer.isEncoding(encoding))
|
||||
throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding');
|
||||
validateBoolean(objectMode, 'options.objectMode');
|
||||
|
||||
const reader = readableStream.getReader();
|
||||
let closed = false;
|
||||
|
||||
const readable = new Readable({
|
||||
objectMode,
|
||||
highWaterMark,
|
||||
encoding,
|
||||
signal,
|
||||
|
||||
read() {
|
||||
PromisePrototypeThen(
|
||||
reader.read(),
|
||||
(chunk) => {
|
||||
if (chunk.done) {
|
||||
// Value should always be undefined here.
|
||||
readable.push(null);
|
||||
} else {
|
||||
readable.push(chunk.value);
|
||||
}
|
||||
},
|
||||
(error) => destroy(readable, error));
|
||||
},
|
||||
|
||||
destroy(error, callback) {
|
||||
function done() {
|
||||
try {
|
||||
callback(error);
|
||||
} catch (error) {
|
||||
// In a next tick because this is happening within
|
||||
// a promise context, and if there are any errors
|
||||
// thrown we don't want those to cause an unhandled
|
||||
// rejection. Let's just escape the promise and
|
||||
// handle it separately.
|
||||
process.nextTick(() => { throw error; });
|
||||
}
|
||||
}
|
||||
|
||||
if (!closed) {
|
||||
PromisePrototypeThen(
|
||||
reader.cancel(error),
|
||||
done,
|
||||
done);
|
||||
return;
|
||||
}
|
||||
done(error);
|
||||
},
|
||||
});
|
||||
|
||||
PromisePrototypeThen(
|
||||
reader.closed,
|
||||
() => {
|
||||
closed = true;
|
||||
if (!isReadableEnded(readable))
|
||||
readable.push(null);
|
||||
},
|
||||
(error) => {
|
||||
closed = true;
|
||||
destroy(readable, error);
|
||||
});
|
||||
|
||||
return readable;
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef {import('./readablestream').ReadableWritablePair
|
||||
* } ReadableWritablePair
|
||||
* @typedef {import('../../stream').Duplex} Duplex
|
||||
*
|
||||
* @param {Duplex} duplex
|
||||
* @returns {ReadableWritablePair}
|
||||
*/
|
||||
function newReadableWritablePairFromDuplex(duplex) {
|
||||
// Not using the internal/streams/utils isWritableNodeStream and
|
||||
// isReadableNodestream utilities here because they will return false
|
||||
// if the duplex was created with writable or readable options set to
|
||||
// false. Instead, we'll check the readable and writable state after
|
||||
// and return closed WritableStream or closed ReadableStream as
|
||||
// necessary.
|
||||
if (typeof duplex?._writableState !== 'object' ||
|
||||
typeof duplex?._readableState !== 'object') {
|
||||
throw new ERR_INVALID_ARG_TYPE('duplex', 'stream.Duplex', duplex);
|
||||
}
|
||||
|
||||
if (isDestroyed(duplex)) {
|
||||
const writable = new WritableStream();
|
||||
const readable = new ReadableStream();
|
||||
writable.close();
|
||||
readable.cancel();
|
||||
return { readable, writable };
|
||||
}
|
||||
|
||||
const writable =
|
||||
isWritable(duplex) ?
|
||||
newWritableStreamFromStreamWritable(duplex) :
|
||||
new WritableStream();
|
||||
|
||||
if (!isWritable(duplex))
|
||||
writable.close();
|
||||
|
||||
const readable =
|
||||
isReadable(duplex) ?
|
||||
newReadableStreamFromStreamReadable(duplex) :
|
||||
new ReadableStream();
|
||||
|
||||
if (!isReadable(duplex))
|
||||
readable.cancel();
|
||||
|
||||
return { writable, readable };
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {ReadableWritablePair} pair
|
||||
* @param {{
|
||||
* allowHalfOpen? : boolean,
|
||||
* decodeStrings? : boolean,
|
||||
* encoding? : string,
|
||||
* highWaterMark? : number,
|
||||
* objectMode? : boolean,
|
||||
* signal? : AbortSignal,
|
||||
* }} [options]
|
||||
* @returns
|
||||
*/
|
||||
function newStreamDuplexFromReadableWritablePair(pair = {}, options = {}) {
|
||||
validateObject(pair, 'pair');
|
||||
const {
|
||||
readable: readableStream,
|
||||
writable: writableStream,
|
||||
} = pair;
|
||||
|
||||
if (!isReadableStream(readableStream)) {
|
||||
throw new ERR_INVALID_ARG_TYPE(
|
||||
'pair.readable',
|
||||
'ReadableStream',
|
||||
readableStream);
|
||||
}
|
||||
if (!isWritableStream(writableStream)) {
|
||||
throw new ERR_INVALID_ARG_TYPE(
|
||||
'pair.writable',
|
||||
'WritableStream',
|
||||
writableStream);
|
||||
}
|
||||
|
||||
validateObject(options, 'options');
|
||||
const {
|
||||
allowHalfOpen = false,
|
||||
objectMode = false,
|
||||
encoding,
|
||||
decodeStrings = true,
|
||||
highWaterMark,
|
||||
signal,
|
||||
} = options;
|
||||
|
||||
validateBoolean(objectMode, 'options.objectMode');
|
||||
if (encoding !== undefined && !Buffer.isEncoding(encoding))
|
||||
throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding');
|
||||
|
||||
const writer = writableStream.getWriter();
|
||||
const reader = readableStream.getReader();
|
||||
let writableClosed = false;
|
||||
let readableClosed = false;
|
||||
|
||||
const duplex = new Duplex({
|
||||
allowHalfOpen,
|
||||
highWaterMark,
|
||||
objectMode,
|
||||
encoding,
|
||||
decodeStrings,
|
||||
signal,
|
||||
|
||||
writev(chunks, callback) {
|
||||
function done(error) {
|
||||
try {
|
||||
callback(error);
|
||||
} catch (error) {
|
||||
// In a next tick because this is happening within
|
||||
// a promise context, and if there are any errors
|
||||
// thrown we don't want those to cause an unhandled
|
||||
// rejection. Let's just escape the promise and
|
||||
// handle it separately.
|
||||
process.nextTick(() => destroy(duplex, error));
|
||||
}
|
||||
}
|
||||
|
||||
PromisePrototypeThen(
|
||||
writer.ready,
|
||||
() => {
|
||||
return PromisePrototypeThen(
|
||||
PromiseAll(
|
||||
ArrayPrototypeMap(
|
||||
chunks,
|
||||
(chunk) => writer.write(chunk))),
|
||||
done,
|
||||
done);
|
||||
},
|
||||
done);
|
||||
},
|
||||
|
||||
write(chunk, encoding, callback) {
|
||||
if (typeof chunk === 'string' && decodeStrings && !objectMode) {
|
||||
chunk = Buffer.from(chunk, encoding);
|
||||
chunk = new Uint8Array(
|
||||
chunk.buffer,
|
||||
chunk.byteOffset,
|
||||
chunk.byteLength);
|
||||
}
|
||||
|
||||
function done(error) {
|
||||
try {
|
||||
callback(error);
|
||||
} catch (error) {
|
||||
destroy(duplex, error);
|
||||
}
|
||||
}
|
||||
|
||||
PromisePrototypeThen(
|
||||
writer.ready,
|
||||
() => {
|
||||
return PromisePrototypeThen(
|
||||
writer.write(chunk),
|
||||
done,
|
||||
done);
|
||||
},
|
||||
done);
|
||||
},
|
||||
|
||||
final(callback) {
|
||||
function done(error) {
|
||||
try {
|
||||
callback(error);
|
||||
} catch (error) {
|
||||
// In a next tick because this is happening within
|
||||
// a promise context, and if there are any errors
|
||||
// thrown we don't want those to cause an unhandled
|
||||
// rejection. Let's just escape the promise and
|
||||
// handle it separately.
|
||||
process.nextTick(() => destroy(duplex, error));
|
||||
}
|
||||
}
|
||||
|
||||
if (!writableClosed) {
|
||||
PromisePrototypeThen(
|
||||
writer.close(),
|
||||
done,
|
||||
done);
|
||||
}
|
||||
},
|
||||
|
||||
read() {
|
||||
PromisePrototypeThen(
|
||||
reader.read(),
|
||||
(chunk) => {
|
||||
if (chunk.done) {
|
||||
duplex.push(null);
|
||||
} else {
|
||||
duplex.push(chunk.value);
|
||||
}
|
||||
},
|
||||
(error) => destroy(duplex, error));
|
||||
},
|
||||
|
||||
destroy(error, callback) {
|
||||
function done() {
|
||||
try {
|
||||
callback(error);
|
||||
} catch (error) {
|
||||
// In a next tick because this is happening within
|
||||
// a promise context, and if there are any errors
|
||||
// thrown we don't want those to cause an unhandled
|
||||
// rejection. Let's just escape the promise and
|
||||
// handle it separately.
|
||||
process.nextTick(() => { throw error; });
|
||||
}
|
||||
}
|
||||
|
||||
async function closeWriter() {
|
||||
if (!writableClosed)
|
||||
await writer.abort(error);
|
||||
}
|
||||
|
||||
async function closeReader() {
|
||||
if (!readableClosed)
|
||||
await reader.cancel(error);
|
||||
}
|
||||
|
||||
if (!writableClosed || !readableClosed) {
|
||||
PromisePrototypeThen(
|
||||
PromiseAll([
|
||||
closeWriter(),
|
||||
closeReader(),
|
||||
]),
|
||||
done,
|
||||
done);
|
||||
return;
|
||||
}
|
||||
|
||||
done();
|
||||
},
|
||||
});
|
||||
|
||||
PromisePrototypeThen(
|
||||
writer.closed,
|
||||
() => {
|
||||
writableClosed = true;
|
||||
if (!isWritableEnded(duplex))
|
||||
destroy(duplex, new ERR_STREAM_PREMATURE_CLOSE());
|
||||
},
|
||||
(error) => {
|
||||
writableClosed = true;
|
||||
readableClosed = true;
|
||||
destroy(duplex, error);
|
||||
});
|
||||
|
||||
PromisePrototypeThen(
|
||||
reader.closed,
|
||||
() => {
|
||||
readableClosed = true;
|
||||
if (!isReadableEnded(duplex))
|
||||
duplex.push(null);
|
||||
},
|
||||
(error) => {
|
||||
writableClosed = true;
|
||||
readableClosed = true;
|
||||
destroy(duplex, error);
|
||||
});
|
||||
|
||||
return duplex;
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
|
||||
* @typedef {{}} StreamBase
|
||||
* @param {StreamBase} streamBase
|
||||
* @param {QueuingStrategy} strategy
|
||||
* @returns {WritableStream}
|
||||
*/
|
||||
function newWritableStreamFromStreamBase(streamBase, strategy) {
|
||||
validateObject(streamBase, 'streamBase');
|
||||
|
||||
let current;
|
||||
|
||||
function createWriteWrap(controller, promise) {
|
||||
const req = new WriteWrap();
|
||||
req.handle = streamBase;
|
||||
req.oncomplete = onWriteComplete;
|
||||
req.async = false;
|
||||
req.bytes = 0;
|
||||
req.buffer = null;
|
||||
req.controller = controller;
|
||||
req.promise = promise;
|
||||
return req;
|
||||
}
|
||||
|
||||
function onWriteComplete(status) {
|
||||
if (status < 0) {
|
||||
const error = errnoException(status, 'write', this.error);
|
||||
this.promise.reject(error);
|
||||
this.controller.error(error);
|
||||
return;
|
||||
}
|
||||
this.promise.resolve();
|
||||
}
|
||||
|
||||
function doWrite(chunk, controller) {
|
||||
const promise = createDeferredPromise();
|
||||
let ret;
|
||||
let req;
|
||||
try {
|
||||
req = createWriteWrap(controller, promise);
|
||||
ret = streamBase.writeBuffer(req, chunk);
|
||||
if (streamBaseState[kLastWriteWasAsync])
|
||||
req.buffer = chunk;
|
||||
req.async = !!streamBaseState[kLastWriteWasAsync];
|
||||
} catch (error) {
|
||||
promise.reject(error);
|
||||
}
|
||||
|
||||
if (ret !== 0)
|
||||
promise.reject(errnoException(ret, 'write', req));
|
||||
else if (!req.async)
|
||||
promise.resolve();
|
||||
|
||||
return promise.promise;
|
||||
}
|
||||
|
||||
return new WritableStream({
|
||||
write(chunk, controller) {
|
||||
current = current !== undefined ?
|
||||
PromisePrototypeThen(
|
||||
current,
|
||||
() => doWrite(chunk, controller),
|
||||
(error) => controller.error(error)) :
|
||||
doWrite(chunk, controller);
|
||||
return current;
|
||||
},
|
||||
|
||||
close() {
|
||||
const promise = createDeferredPromise();
|
||||
const req = new ShutdownWrap();
|
||||
req.oncomplete = () => promise.resolve();
|
||||
const err = streamBase.shutdown(req);
|
||||
if (err === 1)
|
||||
promise.resolve();
|
||||
return promise.promise;
|
||||
},
|
||||
}, strategy);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {StreamBase} streamBase
|
||||
* @param {QueuingStrategy} strategy
|
||||
* @returns {ReadableStream}
|
||||
*/
|
||||
function newReadableStreamFromStreamBase(streamBase, strategy) {
|
||||
validateObject(streamBase, 'streamBase');
|
||||
|
||||
if (typeof streamBase.onread === 'function')
|
||||
throw new ERR_INVALID_STATE('StreamBase already has a consumer');
|
||||
|
||||
let controller;
|
||||
|
||||
streamBase.onread = (arrayBuffer) => {
|
||||
const nread = streamBaseState[kReadBytesOrError];
|
||||
|
||||
if (nread === 0)
|
||||
return;
|
||||
|
||||
try {
|
||||
if (nread === UV_EOF) {
|
||||
controller.close();
|
||||
streamBase.readStop();
|
||||
return;
|
||||
}
|
||||
|
||||
controller.enqueue(arrayBuffer);
|
||||
|
||||
if (controller.desiredSize <= 0)
|
||||
streamBase.readStop();
|
||||
} catch (error) {
|
||||
controller.error(error);
|
||||
streamBase.readStop();
|
||||
}
|
||||
};
|
||||
|
||||
return new ReadableStream({
|
||||
start(c) { controller = c; },
|
||||
|
||||
pull() {
|
||||
streamBase.readStart();
|
||||
},
|
||||
|
||||
cancel() {
|
||||
const promise = createDeferredPromise();
|
||||
const req = new ShutdownWrap();
|
||||
req.oncomplete = () => promise.resolve();
|
||||
const err = streamBase.shutdown(req);
|
||||
if (err === 1)
|
||||
promise.resolve();
|
||||
return promise.promise;
|
||||
},
|
||||
}, strategy);
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
newWritableStreamFromStreamWritable,
|
||||
newReadableStreamFromStreamReadable,
|
||||
newStreamWritableFromWritableStream,
|
||||
newStreamReadableFromReadableStream,
|
||||
newReadableWritablePairFromDuplex,
|
||||
newStreamDuplexFromReadableWritablePair,
|
||||
newWritableStreamFromStreamBase,
|
||||
newReadableStreamFromStreamBase,
|
||||
};
|
67
test/parallel/test-whatwg-webstreams-adapters-streambase.js
Normal file
67
test/parallel/test-whatwg-webstreams-adapters-streambase.js
Normal file
|
@ -0,0 +1,67 @@
|
|||
// Flags: --expose-internals --no-warnings
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
|
||||
const assert = require('assert');
|
||||
|
||||
const {
|
||||
internalBinding,
|
||||
} = require('internal/test/binding');
|
||||
|
||||
const {
|
||||
newWritableStreamFromStreamBase,
|
||||
newReadableStreamFromStreamBase,
|
||||
} = require('internal/webstreams/adapters');
|
||||
|
||||
const {
|
||||
JSStream
|
||||
} = internalBinding('js_stream');
|
||||
|
||||
{
|
||||
const stream = new JSStream();
|
||||
stream.onwrite = common.mustCall((req, buf) => {
|
||||
assert.deepStrictEqual(buf[0], Buffer.from('hello'));
|
||||
req.oncomplete();
|
||||
});
|
||||
|
||||
const writable = newWritableStreamFromStreamBase(stream);
|
||||
|
||||
const writer = writable.getWriter();
|
||||
|
||||
writer.write(Buffer.from('hello')).then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
const buf = Buffer.from('hello');
|
||||
const check = new Uint8Array(buf);
|
||||
|
||||
const stream = new JSStream();
|
||||
|
||||
const readable = newReadableStreamFromStreamBase(stream);
|
||||
|
||||
const reader = readable.getReader();
|
||||
|
||||
reader.read().then(common.mustCall(({ done, value }) => {
|
||||
assert(!done);
|
||||
assert.deepStrictEqual(new Uint8Array(value), check);
|
||||
|
||||
reader.read().then(common.mustCall(({ done, value }) => {
|
||||
assert(done);
|
||||
assert.strictEqual(value, undefined);
|
||||
}));
|
||||
|
||||
}));
|
||||
|
||||
stream.readBuffer(buf);
|
||||
stream.emitEOF();
|
||||
}
|
||||
|
||||
{
|
||||
const stream = new JSStream();
|
||||
stream.onshutdown = common.mustCall((req) => {
|
||||
req.oncomplete();
|
||||
});
|
||||
const readable = newReadableStreamFromStreamBase(stream);
|
||||
readable.cancel().then(common.mustCall());
|
||||
}
|
|
@ -0,0 +1,204 @@
|
|||
// Flags: --no-warnings --expose-internals
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
|
||||
const assert = require('assert');
|
||||
|
||||
const {
|
||||
newReadableStreamFromStreamReadable,
|
||||
} = require('internal/webstreams/adapters');
|
||||
|
||||
const {
|
||||
Duplex,
|
||||
Readable,
|
||||
} = require('stream');
|
||||
|
||||
const {
|
||||
kState,
|
||||
} = require('internal/webstreams/util');
|
||||
|
||||
{
|
||||
// Canceling the readableStream closes the readable.
|
||||
const readable = new Readable({
|
||||
read() {
|
||||
readable.push('hello');
|
||||
readable.push(null);
|
||||
}
|
||||
});
|
||||
|
||||
readable.on('close', common.mustCall());
|
||||
readable.on('end', common.mustNotCall());
|
||||
readable.on('pause', common.mustCall());
|
||||
readable.on('resume', common.mustNotCall());
|
||||
readable.on('error', common.mustCall((error) => {
|
||||
assert.strictEqual(error.code, 'ABORT_ERR');
|
||||
}));
|
||||
|
||||
const readableStream = newReadableStreamFromStreamReadable(readable);
|
||||
|
||||
readableStream.cancel().then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
// Prematurely destroying the stream.Readable without an error
|
||||
// closes the ReadableStream with a premature close error but does
|
||||
// not error the readable.
|
||||
|
||||
const readable = new Readable({
|
||||
read() {
|
||||
readable.push('hello');
|
||||
readable.push(null);
|
||||
}
|
||||
});
|
||||
|
||||
const readableStream = newReadableStreamFromStreamReadable(readable);
|
||||
|
||||
assert(!readableStream.locked);
|
||||
|
||||
const reader = readableStream.getReader();
|
||||
|
||||
assert.rejects(reader.closed, {
|
||||
code: 'ERR_STREAM_PREMATURE_CLOSE',
|
||||
});
|
||||
|
||||
readable.on('end', common.mustNotCall());
|
||||
readable.on('error', common.mustNotCall());
|
||||
|
||||
readable.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(readableStream[kState].state, 'errored');
|
||||
}));
|
||||
|
||||
readable.destroy();
|
||||
}
|
||||
|
||||
{
|
||||
// Ending the readable without an error just closes the
|
||||
// readableStream without an error.
|
||||
const readable = new Readable({
|
||||
read() {
|
||||
readable.push('hello');
|
||||
readable.push(null);
|
||||
}
|
||||
});
|
||||
|
||||
const readableStream = newReadableStreamFromStreamReadable(readable);
|
||||
|
||||
assert(!readableStream.locked);
|
||||
|
||||
const reader = readableStream.getReader();
|
||||
|
||||
reader.closed.then(common.mustCall());
|
||||
|
||||
readable.on('end', common.mustCall());
|
||||
readable.on('error', common.mustNotCall());
|
||||
|
||||
readable.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(readableStream[kState].state, 'closed');
|
||||
}));
|
||||
|
||||
readable.push(null);
|
||||
}
|
||||
|
||||
{
|
||||
// Destroying the readable with an error should error the readableStream
|
||||
const error = new Error('boom');
|
||||
const readable = new Readable({
|
||||
read() {
|
||||
readable.push('hello');
|
||||
readable.push(null);
|
||||
}
|
||||
});
|
||||
|
||||
const readableStream = newReadableStreamFromStreamReadable(readable);
|
||||
|
||||
assert(!readableStream.locked);
|
||||
|
||||
const reader = readableStream.getReader();
|
||||
|
||||
assert.rejects(reader.closed, error);
|
||||
|
||||
readable.on('end', common.mustNotCall());
|
||||
readable.on('error', common.mustCall((reason) => {
|
||||
assert.strictEqual(reason, error);
|
||||
}));
|
||||
|
||||
readable.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(readableStream[kState].state, 'errored');
|
||||
}));
|
||||
|
||||
readable.destroy(error);
|
||||
}
|
||||
|
||||
{
|
||||
const readable = new Readable({
|
||||
encoding: 'utf8',
|
||||
read() {
|
||||
readable.push('hello');
|
||||
readable.push(null);
|
||||
}
|
||||
});
|
||||
|
||||
const readableStream = newReadableStreamFromStreamReadable(readable);
|
||||
const reader = readableStream.getReader();
|
||||
|
||||
readable.on('data', common.mustCall());
|
||||
readable.on('end', common.mustCall());
|
||||
readable.on('close', common.mustCall());
|
||||
|
||||
(async () => {
|
||||
assert.deepStrictEqual(
|
||||
await reader.read(),
|
||||
{ value: 'hello', done: false });
|
||||
assert.deepStrictEqual(
|
||||
await reader.read(),
|
||||
{ value: undefined, done: true });
|
||||
|
||||
})().then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
const data = {};
|
||||
const readable = new Readable({
|
||||
objectMode: true,
|
||||
read() {
|
||||
readable.push(data);
|
||||
readable.push(null);
|
||||
}
|
||||
});
|
||||
|
||||
assert(readable.readableObjectMode);
|
||||
|
||||
const readableStream = newReadableStreamFromStreamReadable(readable);
|
||||
const reader = readableStream.getReader();
|
||||
|
||||
readable.on('data', common.mustCall());
|
||||
readable.on('end', common.mustCall());
|
||||
readable.on('close', common.mustCall());
|
||||
|
||||
(async () => {
|
||||
assert.deepStrictEqual(
|
||||
await reader.read(),
|
||||
{ value: data, done: false });
|
||||
assert.deepStrictEqual(
|
||||
await reader.read(),
|
||||
{ value: undefined, done: true });
|
||||
|
||||
})().then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
const readable = new Readable();
|
||||
readable.destroy();
|
||||
const readableStream = newReadableStreamFromStreamReadable(readable);
|
||||
const reader = readableStream.getReader();
|
||||
reader.closed.then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
const duplex = new Duplex({ readable: false });
|
||||
duplex.destroy();
|
||||
const readableStream = newReadableStreamFromStreamReadable(duplex);
|
||||
const reader = readableStream.getReader();
|
||||
reader.closed.then(common.mustCall());
|
||||
}
|
|
@ -0,0 +1,250 @@
|
|||
// Flags: --no-warnings --expose-internals
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
|
||||
const assert = require('assert');
|
||||
|
||||
const {
|
||||
newReadableWritablePairFromDuplex,
|
||||
} = require('internal/webstreams/adapters');
|
||||
|
||||
const {
|
||||
PassThrough,
|
||||
} = require('stream');
|
||||
|
||||
{
|
||||
// Destroying the duplex without an error should close
|
||||
// the readable and error the writable.
|
||||
|
||||
const duplex = new PassThrough();
|
||||
const {
|
||||
readable,
|
||||
writable,
|
||||
} = newReadableWritablePairFromDuplex(duplex);
|
||||
|
||||
const reader = readable.getReader();
|
||||
const writer = writable.getWriter();
|
||||
|
||||
assert.rejects(reader.closed, {
|
||||
code: 'ERR_STREAM_PREMATURE_CLOSE',
|
||||
});
|
||||
|
||||
assert.rejects(writer.closed, {
|
||||
code: 'ERR_STREAM_PREMATURE_CLOSE',
|
||||
});
|
||||
|
||||
duplex.destroy();
|
||||
|
||||
duplex.on('close', common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
// Destroying the duplex with an error should error
|
||||
// both the readable and writable
|
||||
|
||||
const error = new Error('boom');
|
||||
const duplex = new PassThrough();
|
||||
const {
|
||||
readable,
|
||||
writable,
|
||||
} = newReadableWritablePairFromDuplex(duplex);
|
||||
|
||||
duplex.on('close', common.mustCall());
|
||||
duplex.on('error', common.mustCall((reason) => {
|
||||
assert.strictEqual(reason, error);
|
||||
}));
|
||||
|
||||
const reader = readable.getReader();
|
||||
const writer = writable.getWriter();
|
||||
|
||||
assert.rejects(reader.closed, error);
|
||||
assert.rejects(writer.closed, error);
|
||||
|
||||
duplex.destroy(error);
|
||||
}
|
||||
|
||||
{
|
||||
const error = new Error('boom');
|
||||
const duplex = new PassThrough();
|
||||
const {
|
||||
readable,
|
||||
writable,
|
||||
} = newReadableWritablePairFromDuplex(duplex);
|
||||
|
||||
duplex.on('close', common.mustCall());
|
||||
duplex.on('error', common.mustCall((reason) => {
|
||||
assert.strictEqual(reason, error);
|
||||
}));
|
||||
|
||||
const reader = readable.getReader();
|
||||
const writer = writable.getWriter();
|
||||
|
||||
reader.closed.then(common.mustCall());
|
||||
assert.rejects(writer.closed, error);
|
||||
|
||||
reader.cancel(error).then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
const duplex = new PassThrough();
|
||||
const {
|
||||
readable,
|
||||
writable,
|
||||
} = newReadableWritablePairFromDuplex(duplex);
|
||||
|
||||
duplex.on('close', common.mustCall());
|
||||
duplex.on('error', common.mustNotCall());
|
||||
|
||||
const reader = readable.getReader();
|
||||
const writer = writable.getWriter();
|
||||
|
||||
reader.closed.then(common.mustCall());
|
||||
writer.closed.then(common.mustCall());
|
||||
|
||||
writer.close().then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
const error = new Error('boom');
|
||||
const duplex = new PassThrough();
|
||||
const {
|
||||
readable,
|
||||
writable,
|
||||
} = newReadableWritablePairFromDuplex(duplex);
|
||||
|
||||
duplex.on('close', common.mustCall());
|
||||
duplex.on('error', common.mustCall((reason) => {
|
||||
assert.strictEqual(reason, error);
|
||||
}));
|
||||
|
||||
const reader = readable.getReader();
|
||||
const writer = writable.getWriter();
|
||||
|
||||
assert.rejects(reader.closed, error);
|
||||
assert.rejects(writer.closed, error);
|
||||
|
||||
writer.abort(error).then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
const duplex = new PassThrough();
|
||||
const {
|
||||
readable,
|
||||
writable,
|
||||
} = newReadableWritablePairFromDuplex(duplex);
|
||||
|
||||
duplex.on('close', common.mustCall());
|
||||
|
||||
duplex.on('error', common.mustCall((error) => {
|
||||
assert.strictEqual(error.code, 'ABORT_ERR');
|
||||
}));
|
||||
|
||||
const reader = readable.getReader();
|
||||
const writer = writable.getWriter();
|
||||
|
||||
assert.rejects(writer.closed, {
|
||||
code: 'ABORT_ERR',
|
||||
});
|
||||
|
||||
reader.cancel();
|
||||
}
|
||||
|
||||
{
|
||||
const duplex = new PassThrough();
|
||||
const {
|
||||
readable,
|
||||
writable,
|
||||
} = newReadableWritablePairFromDuplex(duplex);
|
||||
|
||||
duplex.on('close', common.mustCall());
|
||||
duplex.on('error', common.mustNotCall());
|
||||
|
||||
const reader = readable.getReader();
|
||||
const writer = writable.getWriter();
|
||||
|
||||
reader.closed.then(common.mustCall());
|
||||
assert.rejects(writer.closed, {
|
||||
code: 'ERR_STREAM_PREMATURE_CLOSE',
|
||||
});
|
||||
|
||||
duplex.end();
|
||||
}
|
||||
|
||||
{
|
||||
const duplex = new PassThrough();
|
||||
const {
|
||||
readable,
|
||||
writable,
|
||||
} = newReadableWritablePairFromDuplex(duplex);
|
||||
|
||||
duplex.on('data', common.mustCall(2));
|
||||
duplex.on('close', common.mustCall());
|
||||
duplex.on('end', common.mustCall());
|
||||
duplex.on('finish', common.mustCall());
|
||||
|
||||
const writer = writable.getWriter();
|
||||
const reader = readable.getReader();
|
||||
|
||||
const ec = new TextEncoder();
|
||||
const dc = new TextDecoder();
|
||||
|
||||
Promise.all([
|
||||
writer.write(ec.encode('hello')),
|
||||
reader.read().then(common.mustCall(({ done, value }) => {
|
||||
assert(!done);
|
||||
assert.strictEqual(dc.decode(value), 'hello');
|
||||
})),
|
||||
reader.read().then(common.mustCall(({ done, value }) => {
|
||||
assert(!done);
|
||||
assert.strictEqual(dc.decode(value), 'there');
|
||||
})),
|
||||
writer.write(ec.encode('there')),
|
||||
writer.close(),
|
||||
reader.read().then(common.mustCall(({ done, value }) => {
|
||||
assert(done);
|
||||
assert.strictEqual(value, undefined);
|
||||
})),
|
||||
]).then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
const duplex = new PassThrough();
|
||||
duplex.destroy();
|
||||
const {
|
||||
readable,
|
||||
writable,
|
||||
} = newReadableWritablePairFromDuplex(duplex);
|
||||
const reader = readable.getReader();
|
||||
const writer = writable.getWriter();
|
||||
reader.closed.then(common.mustCall());
|
||||
writer.closed.then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
const duplex = new PassThrough({ writable: false });
|
||||
assert(duplex.readable);
|
||||
assert(!duplex.writable);
|
||||
const {
|
||||
readable,
|
||||
writable,
|
||||
} = newReadableWritablePairFromDuplex(duplex);
|
||||
const reader = readable.getReader();
|
||||
const writer = writable.getWriter();
|
||||
writer.closed.then(common.mustCall());
|
||||
reader.cancel().then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
const duplex = new PassThrough({ readable: false });
|
||||
assert(!duplex.readable);
|
||||
assert(duplex.writable);
|
||||
const {
|
||||
readable,
|
||||
writable,
|
||||
} = newReadableWritablePairFromDuplex(duplex);
|
||||
const reader = readable.getReader();
|
||||
const writer = writable.getWriter();
|
||||
reader.closed.then(common.mustCall());
|
||||
writer.close().then(common.mustCall());
|
||||
}
|
149
test/parallel/test-whatwg-webstreams-adapters-to-streamduplex.js
Normal file
149
test/parallel/test-whatwg-webstreams-adapters-to-streamduplex.js
Normal file
|
@ -0,0 +1,149 @@
|
|||
// Flags: --no-warnings --expose-internals
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
|
||||
const assert = require('assert');
|
||||
|
||||
const {
|
||||
TransformStream,
|
||||
} = require('stream/web');
|
||||
|
||||
const {
|
||||
newStreamDuplexFromReadableWritablePair,
|
||||
} = require('internal/webstreams/adapters');
|
||||
|
||||
const {
|
||||
finished,
|
||||
pipeline,
|
||||
Readable,
|
||||
Writable,
|
||||
} = require('stream');
|
||||
|
||||
const {
|
||||
kState,
|
||||
} = require('internal/webstreams/util');
|
||||
|
||||
{
|
||||
const transform = new TransformStream();
|
||||
const duplex = newStreamDuplexFromReadableWritablePair(transform);
|
||||
|
||||
assert(transform.readable.locked);
|
||||
assert(transform.writable.locked);
|
||||
|
||||
duplex.destroy();
|
||||
|
||||
duplex.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(transform.readable[kState].state, 'closed');
|
||||
assert.strictEqual(transform.writable[kState].state, 'errored');
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
const error = new Error('boom');
|
||||
const transform = new TransformStream();
|
||||
const duplex = newStreamDuplexFromReadableWritablePair(transform);
|
||||
|
||||
assert(transform.readable.locked);
|
||||
assert(transform.writable.locked);
|
||||
|
||||
duplex.destroy(error);
|
||||
duplex.on('error', common.mustCall((reason) => {
|
||||
assert.strictEqual(reason, error);
|
||||
}));
|
||||
|
||||
duplex.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(transform.readable[kState].state, 'closed');
|
||||
assert.strictEqual(transform.writable[kState].state, 'errored');
|
||||
assert.strictEqual(transform.writable[kState].storedError, error);
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
const transform = new TransformStream();
|
||||
const duplex = new newStreamDuplexFromReadableWritablePair(transform);
|
||||
|
||||
duplex.end();
|
||||
duplex.resume();
|
||||
|
||||
duplex.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(transform.readable[kState].state, 'closed');
|
||||
assert.strictEqual(transform.writable[kState].state, 'closed');
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
const ec = new TextEncoder();
|
||||
const dc = new TextDecoder();
|
||||
const transform = new TransformStream({
|
||||
transform(chunk, controller) {
|
||||
const text = dc.decode(chunk);
|
||||
controller.enqueue(ec.encode(text.toUpperCase()));
|
||||
}
|
||||
});
|
||||
const duplex = new newStreamDuplexFromReadableWritablePair(transform, {
|
||||
encoding: 'utf8',
|
||||
});
|
||||
|
||||
duplex.end('hello');
|
||||
duplex.on('data', common.mustCall((chunk) => {
|
||||
assert.strictEqual(chunk, 'HELLO');
|
||||
}));
|
||||
duplex.on('end', common.mustCall());
|
||||
|
||||
duplex.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(transform.readable[kState].state, 'closed');
|
||||
assert.strictEqual(transform.writable[kState].state, 'closed');
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
const ec = new TextEncoder();
|
||||
const dc = new TextDecoder();
|
||||
const transform = new TransformStream({
|
||||
transform: common.mustCall((chunk, controller) => {
|
||||
const text = dc.decode(chunk);
|
||||
controller.enqueue(ec.encode(text.toUpperCase()));
|
||||
})
|
||||
});
|
||||
const duplex = new newStreamDuplexFromReadableWritablePair(transform, {
|
||||
encoding: 'utf8',
|
||||
});
|
||||
|
||||
finished(duplex, common.mustCall());
|
||||
|
||||
duplex.end('hello');
|
||||
duplex.resume();
|
||||
}
|
||||
|
||||
{
|
||||
const ec = new TextEncoder();
|
||||
const dc = new TextDecoder();
|
||||
const transform = new TransformStream({
|
||||
transform: common.mustCall((chunk, controller) => {
|
||||
const text = dc.decode(chunk);
|
||||
controller.enqueue(ec.encode(text.toUpperCase()));
|
||||
})
|
||||
});
|
||||
const duplex = new newStreamDuplexFromReadableWritablePair(transform, {
|
||||
encoding: 'utf8',
|
||||
});
|
||||
|
||||
const readable = new Readable({
|
||||
read() {
|
||||
readable.push(Buffer.from('hello'));
|
||||
readable.push(null);
|
||||
}
|
||||
});
|
||||
|
||||
const writable = new Writable({
|
||||
write: common.mustCall((chunk, encoding, callback) => {
|
||||
assert.strictEqual(dc.decode(chunk), 'HELLO');
|
||||
assert.strictEqual(encoding, 'buffer');
|
||||
callback();
|
||||
})
|
||||
});
|
||||
|
||||
finished(duplex, common.mustCall());
|
||||
pipeline(readable, duplex, writable, common.mustCall());
|
||||
}
|
|
@ -0,0 +1,229 @@
|
|||
// Flags: --expose-internals --no-warnings
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
|
||||
const assert = require('assert');
|
||||
|
||||
const {
|
||||
pipeline,
|
||||
finished,
|
||||
Writable,
|
||||
} = require('stream');
|
||||
|
||||
const {
|
||||
ReadableStream,
|
||||
WritableStream,
|
||||
} = require('stream/web');
|
||||
|
||||
const {
|
||||
newStreamReadableFromReadableStream,
|
||||
} = require('internal/webstreams/adapters');
|
||||
|
||||
const {
|
||||
kState,
|
||||
} = require('internal/webstreams/util');
|
||||
|
||||
class MySource {
|
||||
constructor(value = new Uint8Array(10)) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
start(c) {
|
||||
this.started = true;
|
||||
this.controller = c;
|
||||
}
|
||||
|
||||
pull(controller) {
|
||||
controller.enqueue(this.value);
|
||||
controller.close();
|
||||
}
|
||||
|
||||
cancel(reason) {
|
||||
this.canceled = true;
|
||||
this.cancelReason = reason;
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// Destroying the readable without an error closes
|
||||
// the readableStream.
|
||||
|
||||
const readableStream = new ReadableStream();
|
||||
const readable = newStreamReadableFromReadableStream(readableStream);
|
||||
|
||||
assert(readableStream.locked);
|
||||
|
||||
assert.rejects(readableStream.cancel(), {
|
||||
code: 'ERR_INVALID_STATE',
|
||||
});
|
||||
assert.rejects(readableStream.pipeTo(new WritableStream()), {
|
||||
code: 'ERR_INVALID_STATE',
|
||||
});
|
||||
assert.throws(() => readableStream.tee(), {
|
||||
code: 'ERR_INVALID_STATE',
|
||||
});
|
||||
assert.throws(() => readableStream.getReader(), {
|
||||
code: 'ERR_INVALID_STATE',
|
||||
});
|
||||
assert.throws(() => {
|
||||
readableStream.pipeThrough({
|
||||
readable: new ReadableStream(),
|
||||
writable: new WritableStream(),
|
||||
});
|
||||
}, {
|
||||
code: 'ERR_INVALID_STATE',
|
||||
});
|
||||
|
||||
readable.destroy();
|
||||
|
||||
readable.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(readableStream[kState].state, 'closed');
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
// Destroying the readable with an error closes the readableStream
|
||||
// without error but records the cancel reason in the source.
|
||||
const error = new Error('boom');
|
||||
const source = new MySource();
|
||||
const readableStream = new ReadableStream(source);
|
||||
const readable = newStreamReadableFromReadableStream(readableStream);
|
||||
|
||||
assert(readableStream.locked);
|
||||
|
||||
readable.destroy(error);
|
||||
|
||||
readable.on('error', common.mustCall((reason) => {
|
||||
assert.strictEqual(reason, error);
|
||||
}));
|
||||
|
||||
readable.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(readableStream[kState].state, 'closed');
|
||||
assert.strictEqual(source.cancelReason, error);
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
// An error in the source causes the readable to error.
|
||||
const error = new Error('boom');
|
||||
const source = new MySource();
|
||||
const readableStream = new ReadableStream(source);
|
||||
const readable = newStreamReadableFromReadableStream(readableStream);
|
||||
|
||||
assert(readableStream.locked);
|
||||
|
||||
source.controller.error(error);
|
||||
|
||||
readable.on('error', common.mustCall((reason) => {
|
||||
assert.strictEqual(reason, error);
|
||||
}));
|
||||
|
||||
readable.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(readableStream[kState].state, 'errored');
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
const readableStream = new ReadableStream(new MySource());
|
||||
const readable = newStreamReadableFromReadableStream(readableStream);
|
||||
|
||||
readable.on('data', common.mustCall((chunk) => {
|
||||
assert.deepStrictEqual(chunk, Buffer.alloc(10));
|
||||
}));
|
||||
readable.on('end', common.mustCall());
|
||||
readable.on('close', common.mustCall());
|
||||
readable.on('error', common.mustNotCall());
|
||||
}
|
||||
|
||||
{
|
||||
const readableStream = new ReadableStream(new MySource('hello'));
|
||||
const readable = newStreamReadableFromReadableStream(readableStream, {
|
||||
encoding: 'utf8',
|
||||
});
|
||||
|
||||
readable.on('data', common.mustCall((chunk) => {
|
||||
assert.deepStrictEqual(chunk, 'hello');
|
||||
}));
|
||||
readable.on('end', common.mustCall());
|
||||
readable.on('close', common.mustCall());
|
||||
readable.on('error', common.mustNotCall());
|
||||
}
|
||||
|
||||
{
|
||||
const readableStream = new ReadableStream(new MySource());
|
||||
const readable = newStreamReadableFromReadableStream(readableStream, {
|
||||
objectMode: true
|
||||
});
|
||||
|
||||
readable.on('data', common.mustCall((chunk) => {
|
||||
assert.deepStrictEqual(chunk, new Uint8Array(10));
|
||||
}));
|
||||
readable.on('end', common.mustCall());
|
||||
readable.on('close', common.mustCall());
|
||||
readable.on('error', common.mustNotCall());
|
||||
}
|
||||
|
||||
{
|
||||
const ec = new TextEncoder();
|
||||
const readable = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue(ec.encode('hello'));
|
||||
setImmediate(() => {
|
||||
controller.enqueue(ec.encode('there'));
|
||||
controller.close();
|
||||
});
|
||||
}
|
||||
});
|
||||
const streamReadable = newStreamReadableFromReadableStream(readable);
|
||||
|
||||
finished(streamReadable, common.mustCall());
|
||||
|
||||
streamReadable.resume();
|
||||
}
|
||||
|
||||
{
|
||||
const ec = new TextEncoder();
|
||||
const readable = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue(ec.encode('hello'));
|
||||
setImmediate(() => {
|
||||
controller.enqueue(ec.encode('there'));
|
||||
controller.close();
|
||||
});
|
||||
}
|
||||
});
|
||||
const streamReadable = newStreamReadableFromReadableStream(readable);
|
||||
|
||||
finished(streamReadable, common.mustCall());
|
||||
|
||||
streamReadable.resume();
|
||||
}
|
||||
|
||||
{
|
||||
const ec = new TextEncoder();
|
||||
const dc = new TextDecoder();
|
||||
const check = ['hello', 'there'];
|
||||
const readable = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue(ec.encode('hello'));
|
||||
setImmediate(() => {
|
||||
controller.enqueue(ec.encode('there'));
|
||||
controller.close();
|
||||
});
|
||||
}
|
||||
});
|
||||
const writable = new Writable({
|
||||
write: common.mustCall((chunk, encoding, callback) => {
|
||||
assert.strictEqual(dc.decode(chunk), check.shift());
|
||||
assert.strictEqual(encoding, 'buffer');
|
||||
callback();
|
||||
}, 2),
|
||||
});
|
||||
|
||||
const streamReadable = newStreamReadableFromReadableStream(readable);
|
||||
|
||||
pipeline(streamReadable, writable, common.mustCall());
|
||||
|
||||
streamReadable.resume();
|
||||
}
|
|
@ -0,0 +1,231 @@
|
|||
// Flags: --no-warnings --expose-internals
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
|
||||
const assert = require('assert');
|
||||
|
||||
const {
|
||||
WritableStream,
|
||||
} = require('stream/web');
|
||||
|
||||
const {
|
||||
newStreamWritableFromWritableStream,
|
||||
} = require('internal/webstreams/adapters');
|
||||
|
||||
const {
|
||||
finished,
|
||||
pipeline,
|
||||
Readable,
|
||||
} = require('stream');
|
||||
|
||||
const {
|
||||
kState,
|
||||
} = require('internal/webstreams/util');
|
||||
|
||||
class TestSource {
|
||||
constructor() {
|
||||
this.chunks = [];
|
||||
}
|
||||
|
||||
start(c) {
|
||||
this.controller = c;
|
||||
this.started = true;
|
||||
}
|
||||
|
||||
write(chunk) {
|
||||
this.chunks.push(chunk);
|
||||
}
|
||||
|
||||
close() {
|
||||
this.closed = true;
|
||||
}
|
||||
|
||||
abort(reason) {
|
||||
this.abortReason = reason;
|
||||
}
|
||||
}
|
||||
|
||||
[1, {}, false, []].forEach((arg) => {
|
||||
assert.throws(() => newStreamWritableFromWritableStream(arg), {
|
||||
code: 'ERR_INVALID_ARG_TYPE',
|
||||
});
|
||||
});
|
||||
|
||||
{
|
||||
// Ending the stream.Writable should close the writableStream
|
||||
const source = new TestSource();
|
||||
const writableStream = new WritableStream(source);
|
||||
const writable = newStreamWritableFromWritableStream(writableStream);
|
||||
|
||||
assert(writableStream.locked);
|
||||
|
||||
writable.end('chunk');
|
||||
|
||||
writable.on('close', common.mustCall(() => {
|
||||
assert(writableStream.locked);
|
||||
assert.strictEqual(writableStream[kState].state, 'closed');
|
||||
assert.strictEqual(source.chunks.length, 1);
|
||||
assert.deepStrictEqual(source.chunks[0], Buffer.from('chunk'));
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
// Destroying the stream.Writable without an error should close
|
||||
// the writableStream with no error.
|
||||
const source = new TestSource();
|
||||
const writableStream = new WritableStream(source);
|
||||
const writable = newStreamWritableFromWritableStream(writableStream);
|
||||
|
||||
assert(writableStream.locked);
|
||||
|
||||
writable.destroy();
|
||||
|
||||
writable.on('close', common.mustCall(() => {
|
||||
assert(writableStream.locked);
|
||||
assert.strictEqual(writableStream[kState].state, 'closed');
|
||||
assert.strictEqual(source.chunks.length, 0);
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
// Destroying the stream.Writable with an error should error
|
||||
// the writableStream
|
||||
const error = new Error('boom');
|
||||
const source = new TestSource();
|
||||
const writableStream = new WritableStream(source);
|
||||
const writable = newStreamWritableFromWritableStream(writableStream);
|
||||
|
||||
assert(writableStream.locked);
|
||||
|
||||
writable.destroy(error);
|
||||
|
||||
writable.on('error', common.mustCall((reason) => {
|
||||
assert.strictEqual(reason, error);
|
||||
}));
|
||||
|
||||
writable.on('close', common.mustCall(() => {
|
||||
assert(writableStream.locked);
|
||||
assert.strictEqual(writableStream[kState].state, 'errored');
|
||||
assert.strictEqual(writableStream[kState].storedError, error);
|
||||
assert.strictEqual(source.chunks.length, 0);
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
// Attempting to close, abort, or getWriter on writableStream
|
||||
// should fail because it is locked. An internal error in
|
||||
// writableStream should error the writable.
|
||||
const error = new Error('boom');
|
||||
const source = new TestSource();
|
||||
const writableStream = new WritableStream(source);
|
||||
const writable = newStreamWritableFromWritableStream(writableStream);
|
||||
|
||||
assert(writableStream.locked);
|
||||
|
||||
assert.rejects(writableStream.close(), {
|
||||
code: 'ERR_INVALID_STATE',
|
||||
});
|
||||
|
||||
assert.rejects(writableStream.abort(), {
|
||||
code: 'ERR_INVALID_STATE',
|
||||
});
|
||||
|
||||
assert.throws(() => writableStream.getWriter(), {
|
||||
code: 'ERR_INVALID_STATE',
|
||||
});
|
||||
|
||||
writable.on('error', common.mustCall((reason) => {
|
||||
assert.strictEqual(error, reason);
|
||||
}));
|
||||
|
||||
source.controller.error(error);
|
||||
}
|
||||
|
||||
{
|
||||
const source = new TestSource();
|
||||
const writableStream = new WritableStream(source);
|
||||
const writable = newStreamWritableFromWritableStream(writableStream);
|
||||
|
||||
writable.on('error', common.mustNotCall());
|
||||
writable.on('finish', common.mustCall());
|
||||
writable.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(source.chunks.length, 1);
|
||||
assert.deepStrictEqual(source.chunks[0], Buffer.from('hello'));
|
||||
}));
|
||||
|
||||
writable.write('hello', common.mustCall());
|
||||
writable.end();
|
||||
}
|
||||
|
||||
{
|
||||
const source = new TestSource();
|
||||
const writableStream = new WritableStream(source);
|
||||
const writable =
|
||||
newStreamWritableFromWritableStream(writableStream, {
|
||||
decodeStrings: false,
|
||||
});
|
||||
|
||||
writable.on('error', common.mustNotCall());
|
||||
writable.on('finish', common.mustCall());
|
||||
writable.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(source.chunks.length, 1);
|
||||
assert.deepStrictEqual(source.chunks[0], 'hello');
|
||||
}));
|
||||
|
||||
writable.write('hello', common.mustCall());
|
||||
writable.end();
|
||||
}
|
||||
|
||||
{
|
||||
const source = new TestSource();
|
||||
const writableStream = new WritableStream(source);
|
||||
const writable =
|
||||
newStreamWritableFromWritableStream(
|
||||
writableStream, {
|
||||
objectMode: true
|
||||
});
|
||||
assert(writable.writableObjectMode);
|
||||
|
||||
writable.on('error', common.mustNotCall());
|
||||
writable.on('finish', common.mustCall());
|
||||
writable.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(source.chunks.length, 1);
|
||||
assert.strictEqual(source.chunks[0], 'hello');
|
||||
}));
|
||||
|
||||
writable.write('hello', common.mustCall());
|
||||
writable.end();
|
||||
}
|
||||
|
||||
{
|
||||
const writableStream = new WritableStream({
|
||||
write: common.mustCall(2),
|
||||
close: common.mustCall(),
|
||||
});
|
||||
const writable = newStreamWritableFromWritableStream(writableStream);
|
||||
|
||||
finished(writable, common.mustCall());
|
||||
|
||||
writable.write('hello');
|
||||
writable.write('world');
|
||||
writable.end();
|
||||
}
|
||||
|
||||
{
|
||||
const writableStream = new WritableStream({
|
||||
write: common.mustCall(2),
|
||||
close: common.mustCall(),
|
||||
});
|
||||
const writable = newStreamWritableFromWritableStream(writableStream);
|
||||
|
||||
const readable = new Readable({
|
||||
read() {
|
||||
readable.push(Buffer.from('hello'));
|
||||
readable.push(Buffer.from('world'));
|
||||
readable.push(null);
|
||||
}
|
||||
});
|
||||
|
||||
pipeline(readable, writable, common.mustCall());
|
||||
}
|
|
@ -0,0 +1,167 @@
|
|||
// Flags: --no-warnings --expose-internals
|
||||
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
|
||||
const assert = require('assert');
|
||||
|
||||
const {
|
||||
newWritableStreamFromStreamWritable,
|
||||
} = require('internal/webstreams/adapters');
|
||||
|
||||
const {
|
||||
Duplex,
|
||||
Writable,
|
||||
PassThrough,
|
||||
} = require('stream');
|
||||
|
||||
class TestWritable extends Writable {
|
||||
constructor(asyncWrite = false) {
|
||||
super();
|
||||
this.chunks = [];
|
||||
this.asyncWrite = asyncWrite;
|
||||
}
|
||||
|
||||
_write(chunk, encoding, callback) {
|
||||
this.chunks.push({ chunk, encoding });
|
||||
if (this.asyncWrite) {
|
||||
setImmediate(() => callback());
|
||||
return;
|
||||
}
|
||||
callback();
|
||||
}
|
||||
}
|
||||
|
||||
[1, {}, false, []].forEach((arg) => {
|
||||
assert.throws(() => newWritableStreamFromStreamWritable(arg), {
|
||||
code: 'ERR_INVALID_ARG_TYPE',
|
||||
});
|
||||
});
|
||||
|
||||
{
|
||||
// Closing the WritableStream normally closes the stream.Writable
|
||||
// without errors.
|
||||
|
||||
const writable = new TestWritable();
|
||||
writable.on('error', common.mustNotCall());
|
||||
writable.on('finish', common.mustCall());
|
||||
writable.on('close', common.mustCall());
|
||||
|
||||
const writableStream = newWritableStreamFromStreamWritable(writable);
|
||||
|
||||
writableStream.close().then(common.mustCall(() => {
|
||||
assert(writable.destroyed);
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
// Aborting the WritableStream errors the stream.Writable
|
||||
|
||||
const error = new Error('boom');
|
||||
const writable = new TestWritable();
|
||||
writable.on('error', common.mustCall((reason) => {
|
||||
assert.strictEqual(reason, error);
|
||||
}));
|
||||
writable.on('finish', common.mustNotCall());
|
||||
writable.on('close', common.mustCall());
|
||||
|
||||
const writableStream = newWritableStreamFromStreamWritable(writable);
|
||||
|
||||
writableStream.abort(error).then(common.mustCall(() => {
|
||||
assert(writable.destroyed);
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
// Destroying the stream.Writable prematurely errors the
|
||||
// WritableStream
|
||||
|
||||
const error = new Error('boom');
|
||||
const writable = new TestWritable();
|
||||
|
||||
const writableStream = newWritableStreamFromStreamWritable(writable);
|
||||
assert.rejects(writableStream.close(), error);
|
||||
writable.destroy(error);
|
||||
}
|
||||
|
||||
{
|
||||
// Ending the stream.Writable directly errors the WritableStream
|
||||
const writable = new TestWritable();
|
||||
|
||||
const writableStream = newWritableStreamFromStreamWritable(writable);
|
||||
|
||||
assert.rejects(writableStream.close(), {
|
||||
code: 'ERR_STREAM_PREMATURE_CLOSE'
|
||||
});
|
||||
|
||||
writable.end();
|
||||
}
|
||||
|
||||
{
|
||||
const writable = new TestWritable();
|
||||
const writableStream = newWritableStreamFromStreamWritable(writable);
|
||||
const writer = writableStream.getWriter();
|
||||
const ec = new TextEncoder();
|
||||
writer.write(ec.encode('hello')).then(common.mustCall(() => {
|
||||
assert.strictEqual(writable.chunks.length, 1);
|
||||
assert.deepStrictEqual(
|
||||
writable.chunks[0],
|
||||
{
|
||||
chunk: Buffer.from('hello'),
|
||||
encoding: 'buffer'
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
const writable = new TestWritable(true);
|
||||
|
||||
writable.on('error', common.mustNotCall());
|
||||
writable.on('close', common.mustCall());
|
||||
writable.on('finish', common.mustCall());
|
||||
|
||||
const writableStream = newWritableStreamFromStreamWritable(writable);
|
||||
const writer = writableStream.getWriter();
|
||||
const ec = new TextEncoder();
|
||||
writer.write(ec.encode('hello')).then(common.mustCall(() => {
|
||||
assert.strictEqual(writable.chunks.length, 1);
|
||||
assert.deepStrictEqual(
|
||||
writable.chunks[0],
|
||||
{
|
||||
chunk: Buffer.from('hello'),
|
||||
encoding: 'buffer'
|
||||
});
|
||||
writer.close().then(common.mustCall());
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
const duplex = new PassThrough();
|
||||
duplex.setEncoding('utf8');
|
||||
const writableStream = newWritableStreamFromStreamWritable(duplex);
|
||||
const ec = new TextEncoder();
|
||||
writableStream
|
||||
.getWriter()
|
||||
.write(ec.encode('hello'))
|
||||
.then(common.mustCall());
|
||||
|
||||
duplex.on('data', common.mustCall((chunk) => {
|
||||
assert.strictEqual(chunk, 'hello');
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
const writable = new Writable();
|
||||
writable.destroy();
|
||||
const writableStream = newWritableStreamFromStreamWritable(writable);
|
||||
const writer = writableStream.getWriter();
|
||||
writer.closed.then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
const duplex = new Duplex({ writable: false });
|
||||
const writableStream = newWritableStreamFromStreamWritable(duplex);
|
||||
const writer = writableStream.getWriter();
|
||||
writer.closed.then(common.mustCall());
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue