stream: implement streams to webstreams adapters

Experimental adapters for the webstreams API

Signed-off-by: James M Snell <jasnell@gmail.com>

PR-URL: https://github.com/nodejs/node/pull/39134
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
James M Snell 2021-06-23 22:24:19 -07:00
parent 09b57f7909
commit a99c230305
No known key found for this signature in database
GPG key ID: 7341B15C070877AC
13 changed files with 2358 additions and 0 deletions

View file

@ -1892,6 +1892,87 @@ Calling `Readable.from(string)` or `Readable.from(buffer)` will not have
the strings or buffers be iterated to match the other streams semantics the strings or buffers be iterated to match the other streams semantics
for performance reasons. for performance reasons.
### `stream.Readable.fromWeb(readableStream[, options])`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
* `readableStream` {ReadableStream}
* `options` {Object}
* `encoding` {string}
* `highWaterMark` {number}
* `objectModel` {boolean}
* `signal` {AbortSignal}
* Returns: {stream.Readable}
### `stream.Readable.toWeb(streamReadable)`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
* `streamReadable` {stream.Readable}
* Returns: {ReadableStream}
### `stream.Writable.fromWeb(writableStream[, options])`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
* `writableStream` {WritableStream}
* `options` {Object}
* `decodeStrings` {boolean}
* `highWaterMark` {number}
* `objectMode` {boolean}
* `signal` {AbortSignal}
* Returns: {stream.Writable}
### `stream.Writable.toWeb(streamWritable)`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
* `streamWritable` {stream.Writable}
* Returns: {WritableStream}
### `stream.Duplex.fromWeb(pair[, options])`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
* `pair` {Object}
* `readable` {ReadableStream}
* `writable` {WritableStream}
* `options` {Object}
* `allowHalfOpen` {boolean}
* `decodeStrings` {boolean}
* `encoding` {string}
* `highWaterMark` {number}
* `objectMode` {boolean}
* `signal` {AbortSignal}
* Returns: {stream.Duplex}
### `stream.Duplex.toWeb(streamDuplex)`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
* `streamDuplex` {stream.Duplex}
* Returns: {Object}
* `readable` {ReadableStream}
* `writable` {WritableStream}
### `stream.addAbortSignal(signal, stream)` ### `stream.addAbortSignal(signal, stream)`
<!-- YAML <!-- YAML
added: v15.4.0 added: v15.4.0

View file

@ -788,6 +788,8 @@ module.exports = {
appendFile, appendFile,
readFile, readFile,
watch, watch,
kHandle,
}, },
FileHandle, FileHandle,

View file

@ -114,3 +114,22 @@ ObjectDefineProperties(Duplex.prototype, {
} }
} }
}); });
let webStreamsAdapters;
// Lazy to avoid circular references
function lazyWebStreams() {
if (webStreamsAdapters === undefined)
webStreamsAdapters = require('internal/webstreams/adapters');
return webStreamsAdapters;
}
Duplex.fromWeb = function(pair, options) {
return lazyWebStreams().newStreamDuplexFromReadableWritablePair(
pair,
options);
};
Duplex.toWeb = function(duplex) {
return lazyWebStreams().newReadableWritablePairFromDuplex(duplex);
};

View file

@ -1355,3 +1355,22 @@ function endWritableNT(state, stream) {
Readable.from = function(iterable, opts) { Readable.from = function(iterable, opts) {
return from(Readable, iterable, opts); return from(Readable, iterable, opts);
}; };
let webStreamsAdapters;
// Lazy to avoid circular references
function lazyWebStreams() {
if (webStreamsAdapters === undefined)
webStreamsAdapters = require('internal/webstreams/adapters');
return webStreamsAdapters;
}
Readable.fromWeb = function(readableStream, options) {
return lazyWebStreams().newStreamReadableFromReadableStream(
readableStream,
options);
};
Readable.toWeb = function(streamReadable) {
return lazyWebStreams().newStreamReadableFromReadableStream(streamReadable);
};

View file

@ -872,3 +872,22 @@ Writable.prototype._destroy = function(err, cb) {
Writable.prototype[EE.captureRejectionSymbol] = function(err) { Writable.prototype[EE.captureRejectionSymbol] = function(err) {
this.destroy(err); this.destroy(err);
}; };
let webStreamsAdapters;
// Lazy to avoid circular references
function lazyWebStreams() {
if (webStreamsAdapters === undefined)
webStreamsAdapters = require('internal/webstreams/adapters');
return webStreamsAdapters;
}
Writable.fromWeb = function(writableStream, options) {
return lazyWebStreams().newStreamWritableFromWritableStream(
writableStream,
options);
};
Writable.toWeb = function(streamWritable) {
return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable);
};

View file

@ -0,0 +1,921 @@
'use strict';
const {
ArrayPrototypeMap,
PromiseAll,
PromisePrototypeThen,
PromisePrototypeFinally,
PromiseResolve,
Uint8Array,
} = primordials;
const {
ReadableStream,
isReadableStream,
} = require('internal/webstreams/readablestream');
const {
WritableStream,
isWritableStream,
} = require('internal/webstreams/writablestream');
const {
CountQueuingStrategy,
} = require('internal/webstreams/queuingstrategies');
const {
Writable,
Readable,
Duplex,
destroy,
} = require('stream');
const {
isDestroyed,
isReadable,
isReadableEnded,
isWritable,
isWritableEnded,
} = require('internal/streams/utils');
const {
Buffer,
} = require('buffer');
const {
errnoException,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
ERR_INVALID_STATE,
ERR_STREAM_PREMATURE_CLOSE,
},
} = require('internal/errors');
const {
createDeferredPromise,
} = require('internal/util');
const {
validateBoolean,
validateObject,
} = require('internal/validators');
const {
WriteWrap,
ShutdownWrap,
kReadBytesOrError,
kLastWriteWasAsync,
streamBaseState,
} = internalBinding('stream_wrap');
const finished = require('internal/streams/end-of-stream');
const { UV_EOF } = internalBinding('uv');
/**
* @typedef {import('../../stream').Writable} Writable
* @typedef {import('../../stream').Readable} Readable
* @typedef {import('./writablestream').WritableStream} WritableStream
* @typedef {import('./readablestream').ReadableStream} ReadableStream
*
* @typedef {import('../abort_controller').AbortSignal} AbortSignal
*/
/**
* @param {Writable} streamWritable
* @returns {WritableStream}
*/
function newWritableStreamFromStreamWritable(streamWritable) {
// Not using the internal/streams/utils isWritableNodeStream utility
// here because it will return false if streamWritable is a Duplex
// whose writable option is false. For a Duplex that is not writable,
// we want it to pass this check but return a closed WritableStream.
if (typeof streamWritable?._writableState !== 'object') {
throw new ERR_INVALID_ARG_TYPE(
'streamWritable',
'stream.Writable',
streamWritable);
}
if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
const writable = new WritableStream();
writable.close();
return writable;
}
const highWaterMark = streamWritable.writableHighWaterMark;
const strategy =
streamWritable.writableObjectMode ?
new CountQueuingStrategy({ highWaterMark }) :
{ highWaterMark };
let controller;
let backpressurePromise;
let closed;
function onDrain() {
if (backpressurePromise !== undefined)
backpressurePromise.resolve();
}
const cleanup = finished(streamWritable, (error) => {
cleanup();
// This is a protection against non-standard, legacy streams
// that happen to emit an error event again after finished is called.
streamWritable.on('error', () => {});
if (error != null) {
if (backpressurePromise !== undefined)
backpressurePromise.reject(error);
// If closed is not undefined, the error is happening
// after the WritableStream close has already started.
// We need to reject it here.
if (closed !== undefined) {
closed.reject(error);
closed = undefined;
}
controller.error(error);
controller = undefined;
return;
}
if (closed !== undefined) {
closed.resolve();
closed = undefined;
return;
}
controller.error(new ERR_STREAM_PREMATURE_CLOSE());
controller = undefined;
});
streamWritable.on('drain', onDrain);
return new WritableStream({
start(c) { controller = c; },
async write(chunk) {
if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
backpressurePromise = createDeferredPromise();
return PromisePrototypeFinally(
backpressurePromise.promise, () => {
backpressurePromise = undefined;
});
}
},
abort(reason) {
destroy(streamWritable, reason);
},
close() {
if (closed === undefined && !isWritableEnded(streamWritable)) {
closed = createDeferredPromise();
streamWritable.end();
return closed.promise;
}
controller = undefined;
return PromiseResolve();
},
}, strategy);
}
/**
* @param {WritableStream} writableStream
* @param {{
* decodeStrings? : boolean,
* highWaterMark? : number,
* objectMode? : boolean,
* signal? : AbortSignal,
* }} [options]
* @returns {Writable}
*/
function newStreamWritableFromWritableStream(writableStream, options = {}) {
if (!isWritableStream(writableStream)) {
throw new ERR_INVALID_ARG_TYPE(
'writableStream',
'WritableStream',
writableStream);
}
validateObject(options, 'options');
const {
highWaterMark,
decodeStrings = true,
objectMode = false,
signal,
} = options;
validateBoolean(objectMode, 'options.objectMode');
validateBoolean(decodeStrings, 'options.decodeStrings');
const writer = writableStream.getWriter();
let closed = false;
const writable = new Writable({
highWaterMark,
objectMode,
decodeStrings,
signal,
writev(chunks, callback) {
function done(error) {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => destroy(writable, error));
}
}
PromisePrototypeThen(
writer.ready,
() => {
return PromisePrototypeThen(
PromiseAll(
ArrayPrototypeMap(
chunks,
(chunk) => writer.write(chunk))),
done,
done);
},
done);
},
write(chunk, encoding, callback) {
if (typeof chunk === 'string' && decodeStrings && !objectMode) {
chunk = Buffer.from(chunk, encoding);
chunk = new Uint8Array(
chunk.buffer,
chunk.byteOffset,
chunk.byteLength);
}
function done(error) {
try {
callback(error);
} catch (error) {
destroy(writable, error);
}
}
PromisePrototypeThen(
writer.ready,
() => {
return PromisePrototypeThen(
writer.write(chunk),
done,
done);
},
done);
},
destroy(error, callback) {
function done() {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => { throw error; });
}
}
if (!closed) {
if (error != null) {
PromisePrototypeThen(
writer.abort(error),
done,
done);
} else {
PromisePrototypeThen(
writer.close(),
done,
done);
}
return;
}
done();
},
final(callback) {
function done(error) {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => destroy(writable, error));
}
}
if (!closed) {
PromisePrototypeThen(
writer.close(),
done,
done);
}
},
});
PromisePrototypeThen(
writer.closed,
() => {
// If the WritableStream closes before the stream.Writable has been
// ended, we signal an error on the stream.Writable.
closed = true;
if (!isWritableEnded(writable))
destroy(writable, new ERR_STREAM_PREMATURE_CLOSE());
},
(error) => {
// If the WritableStream errors before the stream.Writable has been
// destroyed, signal an error on the stream.Writable.
closed = true;
destroy(writable, error);
});
return writable;
}
/**
* @param {Readable} streamReadable
* @returns {ReadableStream}
*/
function newReadableStreamFromStreamReadable(streamReadable) {
// Not using the internal/streams/utils isReadableNodeStream utility
// here because it will return false if streamReadable is a Duplex
// whose readable option is false. For a Duplex that is not readable,
// we want it to pass this check but return a closed ReadableStream.
if (typeof streamReadable?._readableState !== 'object') {
throw new ERR_INVALID_ARG_TYPE(
'streamReadable',
'stream.Readable',
streamReadable);
}
if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
const readable = new ReadableStream();
readable.cancel();
return readable;
}
const objectMode = streamReadable.readableObjectMode;
const highWaterMark = streamReadable.readableHighWaterMark;
// When not running in objectMode explicitly, we just fall
// back to a minimal strategy that just specifies the highWaterMark
// and no size algorithm. Using a ByteLengthQueuingStrategy here
// is unnecessary.
const strategy =
objectMode ?
new CountQueuingStrategy({ highWaterMark }) :
{ highWaterMark };
let controller;
function onData(chunk) {
// Copy the Buffer to detach it from the pool.
if (Buffer.isBuffer(chunk) && !objectMode)
chunk = new Uint8Array(chunk);
controller.enqueue(chunk);
if (controller.desiredSize <= 0)
streamReadable.pause();
}
streamReadable.pause();
const cleanup = finished(streamReadable, (error) => {
cleanup();
// This is a protection against non-standard, legacy streams
// that happen to emit an error event again after finished is called.
streamReadable.on('error', () => {});
if (error)
return controller.error(error);
controller.close();
});
streamReadable.on('data', onData);
return new ReadableStream({
start(c) { controller = c; },
pull() { streamReadable.resume(); },
cancel(reason) {
destroy(streamReadable, reason);
},
}, strategy);
}
/**
* @param {ReadableStream} readableStream
* @param {{
* highWaterMark? : number,
* encoding? : string,
* objectMode? : boolean,
* signal? : AbortSignal,
* }} [options]
* @returns {Readable}
*/
function newStreamReadableFromReadableStream(readableStream, options = {}) {
if (!isReadableStream(readableStream)) {
throw new ERR_INVALID_ARG_TYPE(
'readableStream',
'ReadableStream',
readableStream);
}
validateObject(options, 'options');
const {
highWaterMark,
encoding,
objectMode = false,
signal,
} = options;
if (encoding !== undefined && !Buffer.isEncoding(encoding))
throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding');
validateBoolean(objectMode, 'options.objectMode');
const reader = readableStream.getReader();
let closed = false;
const readable = new Readable({
objectMode,
highWaterMark,
encoding,
signal,
read() {
PromisePrototypeThen(
reader.read(),
(chunk) => {
if (chunk.done) {
// Value should always be undefined here.
readable.push(null);
} else {
readable.push(chunk.value);
}
},
(error) => destroy(readable, error));
},
destroy(error, callback) {
function done() {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => { throw error; });
}
}
if (!closed) {
PromisePrototypeThen(
reader.cancel(error),
done,
done);
return;
}
done(error);
},
});
PromisePrototypeThen(
reader.closed,
() => {
closed = true;
if (!isReadableEnded(readable))
readable.push(null);
},
(error) => {
closed = true;
destroy(readable, error);
});
return readable;
}
/**
* @typedef {import('./readablestream').ReadableWritablePair
* } ReadableWritablePair
* @typedef {import('../../stream').Duplex} Duplex
*
* @param {Duplex} duplex
* @returns {ReadableWritablePair}
*/
function newReadableWritablePairFromDuplex(duplex) {
// Not using the internal/streams/utils isWritableNodeStream and
// isReadableNodestream utilities here because they will return false
// if the duplex was created with writable or readable options set to
// false. Instead, we'll check the readable and writable state after
// and return closed WritableStream or closed ReadableStream as
// necessary.
if (typeof duplex?._writableState !== 'object' ||
typeof duplex?._readableState !== 'object') {
throw new ERR_INVALID_ARG_TYPE('duplex', 'stream.Duplex', duplex);
}
if (isDestroyed(duplex)) {
const writable = new WritableStream();
const readable = new ReadableStream();
writable.close();
readable.cancel();
return { readable, writable };
}
const writable =
isWritable(duplex) ?
newWritableStreamFromStreamWritable(duplex) :
new WritableStream();
if (!isWritable(duplex))
writable.close();
const readable =
isReadable(duplex) ?
newReadableStreamFromStreamReadable(duplex) :
new ReadableStream();
if (!isReadable(duplex))
readable.cancel();
return { writable, readable };
}
/**
* @param {ReadableWritablePair} pair
* @param {{
* allowHalfOpen? : boolean,
* decodeStrings? : boolean,
* encoding? : string,
* highWaterMark? : number,
* objectMode? : boolean,
* signal? : AbortSignal,
* }} [options]
* @returns
*/
function newStreamDuplexFromReadableWritablePair(pair = {}, options = {}) {
validateObject(pair, 'pair');
const {
readable: readableStream,
writable: writableStream,
} = pair;
if (!isReadableStream(readableStream)) {
throw new ERR_INVALID_ARG_TYPE(
'pair.readable',
'ReadableStream',
readableStream);
}
if (!isWritableStream(writableStream)) {
throw new ERR_INVALID_ARG_TYPE(
'pair.writable',
'WritableStream',
writableStream);
}
validateObject(options, 'options');
const {
allowHalfOpen = false,
objectMode = false,
encoding,
decodeStrings = true,
highWaterMark,
signal,
} = options;
validateBoolean(objectMode, 'options.objectMode');
if (encoding !== undefined && !Buffer.isEncoding(encoding))
throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding');
const writer = writableStream.getWriter();
const reader = readableStream.getReader();
let writableClosed = false;
let readableClosed = false;
const duplex = new Duplex({
allowHalfOpen,
highWaterMark,
objectMode,
encoding,
decodeStrings,
signal,
writev(chunks, callback) {
function done(error) {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => destroy(duplex, error));
}
}
PromisePrototypeThen(
writer.ready,
() => {
return PromisePrototypeThen(
PromiseAll(
ArrayPrototypeMap(
chunks,
(chunk) => writer.write(chunk))),
done,
done);
},
done);
},
write(chunk, encoding, callback) {
if (typeof chunk === 'string' && decodeStrings && !objectMode) {
chunk = Buffer.from(chunk, encoding);
chunk = new Uint8Array(
chunk.buffer,
chunk.byteOffset,
chunk.byteLength);
}
function done(error) {
try {
callback(error);
} catch (error) {
destroy(duplex, error);
}
}
PromisePrototypeThen(
writer.ready,
() => {
return PromisePrototypeThen(
writer.write(chunk),
done,
done);
},
done);
},
final(callback) {
function done(error) {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => destroy(duplex, error));
}
}
if (!writableClosed) {
PromisePrototypeThen(
writer.close(),
done,
done);
}
},
read() {
PromisePrototypeThen(
reader.read(),
(chunk) => {
if (chunk.done) {
duplex.push(null);
} else {
duplex.push(chunk.value);
}
},
(error) => destroy(duplex, error));
},
destroy(error, callback) {
function done() {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => { throw error; });
}
}
async function closeWriter() {
if (!writableClosed)
await writer.abort(error);
}
async function closeReader() {
if (!readableClosed)
await reader.cancel(error);
}
if (!writableClosed || !readableClosed) {
PromisePrototypeThen(
PromiseAll([
closeWriter(),
closeReader(),
]),
done,
done);
return;
}
done();
},
});
PromisePrototypeThen(
writer.closed,
() => {
writableClosed = true;
if (!isWritableEnded(duplex))
destroy(duplex, new ERR_STREAM_PREMATURE_CLOSE());
},
(error) => {
writableClosed = true;
readableClosed = true;
destroy(duplex, error);
});
PromisePrototypeThen(
reader.closed,
() => {
readableClosed = true;
if (!isReadableEnded(duplex))
duplex.push(null);
},
(error) => {
writableClosed = true;
readableClosed = true;
destroy(duplex, error);
});
return duplex;
}
/**
* @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
* @typedef {{}} StreamBase
* @param {StreamBase} streamBase
* @param {QueuingStrategy} strategy
* @returns {WritableStream}
*/
function newWritableStreamFromStreamBase(streamBase, strategy) {
validateObject(streamBase, 'streamBase');
let current;
function createWriteWrap(controller, promise) {
const req = new WriteWrap();
req.handle = streamBase;
req.oncomplete = onWriteComplete;
req.async = false;
req.bytes = 0;
req.buffer = null;
req.controller = controller;
req.promise = promise;
return req;
}
function onWriteComplete(status) {
if (status < 0) {
const error = errnoException(status, 'write', this.error);
this.promise.reject(error);
this.controller.error(error);
return;
}
this.promise.resolve();
}
function doWrite(chunk, controller) {
const promise = createDeferredPromise();
let ret;
let req;
try {
req = createWriteWrap(controller, promise);
ret = streamBase.writeBuffer(req, chunk);
if (streamBaseState[kLastWriteWasAsync])
req.buffer = chunk;
req.async = !!streamBaseState[kLastWriteWasAsync];
} catch (error) {
promise.reject(error);
}
if (ret !== 0)
promise.reject(errnoException(ret, 'write', req));
else if (!req.async)
promise.resolve();
return promise.promise;
}
return new WritableStream({
write(chunk, controller) {
current = current !== undefined ?
PromisePrototypeThen(
current,
() => doWrite(chunk, controller),
(error) => controller.error(error)) :
doWrite(chunk, controller);
return current;
},
close() {
const promise = createDeferredPromise();
const req = new ShutdownWrap();
req.oncomplete = () => promise.resolve();
const err = streamBase.shutdown(req);
if (err === 1)
promise.resolve();
return promise.promise;
},
}, strategy);
}
/**
* @param {StreamBase} streamBase
* @param {QueuingStrategy} strategy
* @returns {ReadableStream}
*/
function newReadableStreamFromStreamBase(streamBase, strategy) {
validateObject(streamBase, 'streamBase');
if (typeof streamBase.onread === 'function')
throw new ERR_INVALID_STATE('StreamBase already has a consumer');
let controller;
streamBase.onread = (arrayBuffer) => {
const nread = streamBaseState[kReadBytesOrError];
if (nread === 0)
return;
try {
if (nread === UV_EOF) {
controller.close();
streamBase.readStop();
return;
}
controller.enqueue(arrayBuffer);
if (controller.desiredSize <= 0)
streamBase.readStop();
} catch (error) {
controller.error(error);
streamBase.readStop();
}
};
return new ReadableStream({
start(c) { controller = c; },
pull() {
streamBase.readStart();
},
cancel() {
const promise = createDeferredPromise();
const req = new ShutdownWrap();
req.oncomplete = () => promise.resolve();
const err = streamBase.shutdown(req);
if (err === 1)
promise.resolve();
return promise.promise;
},
}, strategy);
}
module.exports = {
newWritableStreamFromStreamWritable,
newReadableStreamFromStreamReadable,
newStreamWritableFromWritableStream,
newStreamReadableFromReadableStream,
newReadableWritablePairFromDuplex,
newStreamDuplexFromReadableWritablePair,
newWritableStreamFromStreamBase,
newReadableStreamFromStreamBase,
};

View file

@ -0,0 +1,67 @@
// Flags: --expose-internals --no-warnings
'use strict';
const common = require('../common');
const assert = require('assert');
const {
internalBinding,
} = require('internal/test/binding');
const {
newWritableStreamFromStreamBase,
newReadableStreamFromStreamBase,
} = require('internal/webstreams/adapters');
const {
JSStream
} = internalBinding('js_stream');
{
const stream = new JSStream();
stream.onwrite = common.mustCall((req, buf) => {
assert.deepStrictEqual(buf[0], Buffer.from('hello'));
req.oncomplete();
});
const writable = newWritableStreamFromStreamBase(stream);
const writer = writable.getWriter();
writer.write(Buffer.from('hello')).then(common.mustCall());
}
{
const buf = Buffer.from('hello');
const check = new Uint8Array(buf);
const stream = new JSStream();
const readable = newReadableStreamFromStreamBase(stream);
const reader = readable.getReader();
reader.read().then(common.mustCall(({ done, value }) => {
assert(!done);
assert.deepStrictEqual(new Uint8Array(value), check);
reader.read().then(common.mustCall(({ done, value }) => {
assert(done);
assert.strictEqual(value, undefined);
}));
}));
stream.readBuffer(buf);
stream.emitEOF();
}
{
const stream = new JSStream();
stream.onshutdown = common.mustCall((req) => {
req.oncomplete();
});
const readable = newReadableStreamFromStreamBase(stream);
readable.cancel().then(common.mustCall());
}

View file

@ -0,0 +1,204 @@
// Flags: --no-warnings --expose-internals
'use strict';
const common = require('../common');
const assert = require('assert');
const {
newReadableStreamFromStreamReadable,
} = require('internal/webstreams/adapters');
const {
Duplex,
Readable,
} = require('stream');
const {
kState,
} = require('internal/webstreams/util');
{
// Canceling the readableStream closes the readable.
const readable = new Readable({
read() {
readable.push('hello');
readable.push(null);
}
});
readable.on('close', common.mustCall());
readable.on('end', common.mustNotCall());
readable.on('pause', common.mustCall());
readable.on('resume', common.mustNotCall());
readable.on('error', common.mustCall((error) => {
assert.strictEqual(error.code, 'ABORT_ERR');
}));
const readableStream = newReadableStreamFromStreamReadable(readable);
readableStream.cancel().then(common.mustCall());
}
{
// Prematurely destroying the stream.Readable without an error
// closes the ReadableStream with a premature close error but does
// not error the readable.
const readable = new Readable({
read() {
readable.push('hello');
readable.push(null);
}
});
const readableStream = newReadableStreamFromStreamReadable(readable);
assert(!readableStream.locked);
const reader = readableStream.getReader();
assert.rejects(reader.closed, {
code: 'ERR_STREAM_PREMATURE_CLOSE',
});
readable.on('end', common.mustNotCall());
readable.on('error', common.mustNotCall());
readable.on('close', common.mustCall(() => {
assert.strictEqual(readableStream[kState].state, 'errored');
}));
readable.destroy();
}
{
// Ending the readable without an error just closes the
// readableStream without an error.
const readable = new Readable({
read() {
readable.push('hello');
readable.push(null);
}
});
const readableStream = newReadableStreamFromStreamReadable(readable);
assert(!readableStream.locked);
const reader = readableStream.getReader();
reader.closed.then(common.mustCall());
readable.on('end', common.mustCall());
readable.on('error', common.mustNotCall());
readable.on('close', common.mustCall(() => {
assert.strictEqual(readableStream[kState].state, 'closed');
}));
readable.push(null);
}
{
// Destroying the readable with an error should error the readableStream
const error = new Error('boom');
const readable = new Readable({
read() {
readable.push('hello');
readable.push(null);
}
});
const readableStream = newReadableStreamFromStreamReadable(readable);
assert(!readableStream.locked);
const reader = readableStream.getReader();
assert.rejects(reader.closed, error);
readable.on('end', common.mustNotCall());
readable.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
readable.on('close', common.mustCall(() => {
assert.strictEqual(readableStream[kState].state, 'errored');
}));
readable.destroy(error);
}
{
const readable = new Readable({
encoding: 'utf8',
read() {
readable.push('hello');
readable.push(null);
}
});
const readableStream = newReadableStreamFromStreamReadable(readable);
const reader = readableStream.getReader();
readable.on('data', common.mustCall());
readable.on('end', common.mustCall());
readable.on('close', common.mustCall());
(async () => {
assert.deepStrictEqual(
await reader.read(),
{ value: 'hello', done: false });
assert.deepStrictEqual(
await reader.read(),
{ value: undefined, done: true });
})().then(common.mustCall());
}
{
const data = {};
const readable = new Readable({
objectMode: true,
read() {
readable.push(data);
readable.push(null);
}
});
assert(readable.readableObjectMode);
const readableStream = newReadableStreamFromStreamReadable(readable);
const reader = readableStream.getReader();
readable.on('data', common.mustCall());
readable.on('end', common.mustCall());
readable.on('close', common.mustCall());
(async () => {
assert.deepStrictEqual(
await reader.read(),
{ value: data, done: false });
assert.deepStrictEqual(
await reader.read(),
{ value: undefined, done: true });
})().then(common.mustCall());
}
{
const readable = new Readable();
readable.destroy();
const readableStream = newReadableStreamFromStreamReadable(readable);
const reader = readableStream.getReader();
reader.closed.then(common.mustCall());
}
{
const duplex = new Duplex({ readable: false });
duplex.destroy();
const readableStream = newReadableStreamFromStreamReadable(duplex);
const reader = readableStream.getReader();
reader.closed.then(common.mustCall());
}

View file

@ -0,0 +1,250 @@
// Flags: --no-warnings --expose-internals
'use strict';
const common = require('../common');
const assert = require('assert');
const {
newReadableWritablePairFromDuplex,
} = require('internal/webstreams/adapters');
const {
PassThrough,
} = require('stream');
{
// Destroying the duplex without an error should close
// the readable and error the writable.
const duplex = new PassThrough();
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
const reader = readable.getReader();
const writer = writable.getWriter();
assert.rejects(reader.closed, {
code: 'ERR_STREAM_PREMATURE_CLOSE',
});
assert.rejects(writer.closed, {
code: 'ERR_STREAM_PREMATURE_CLOSE',
});
duplex.destroy();
duplex.on('close', common.mustCall());
}
{
// Destroying the duplex with an error should error
// both the readable and writable
const error = new Error('boom');
const duplex = new PassThrough();
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
duplex.on('close', common.mustCall());
duplex.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
const reader = readable.getReader();
const writer = writable.getWriter();
assert.rejects(reader.closed, error);
assert.rejects(writer.closed, error);
duplex.destroy(error);
}
{
const error = new Error('boom');
const duplex = new PassThrough();
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
duplex.on('close', common.mustCall());
duplex.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
const reader = readable.getReader();
const writer = writable.getWriter();
reader.closed.then(common.mustCall());
assert.rejects(writer.closed, error);
reader.cancel(error).then(common.mustCall());
}
{
const duplex = new PassThrough();
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
duplex.on('close', common.mustCall());
duplex.on('error', common.mustNotCall());
const reader = readable.getReader();
const writer = writable.getWriter();
reader.closed.then(common.mustCall());
writer.closed.then(common.mustCall());
writer.close().then(common.mustCall());
}
{
const error = new Error('boom');
const duplex = new PassThrough();
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
duplex.on('close', common.mustCall());
duplex.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
const reader = readable.getReader();
const writer = writable.getWriter();
assert.rejects(reader.closed, error);
assert.rejects(writer.closed, error);
writer.abort(error).then(common.mustCall());
}
{
const duplex = new PassThrough();
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
duplex.on('close', common.mustCall());
duplex.on('error', common.mustCall((error) => {
assert.strictEqual(error.code, 'ABORT_ERR');
}));
const reader = readable.getReader();
const writer = writable.getWriter();
assert.rejects(writer.closed, {
code: 'ABORT_ERR',
});
reader.cancel();
}
{
const duplex = new PassThrough();
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
duplex.on('close', common.mustCall());
duplex.on('error', common.mustNotCall());
const reader = readable.getReader();
const writer = writable.getWriter();
reader.closed.then(common.mustCall());
assert.rejects(writer.closed, {
code: 'ERR_STREAM_PREMATURE_CLOSE',
});
duplex.end();
}
{
const duplex = new PassThrough();
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
duplex.on('data', common.mustCall(2));
duplex.on('close', common.mustCall());
duplex.on('end', common.mustCall());
duplex.on('finish', common.mustCall());
const writer = writable.getWriter();
const reader = readable.getReader();
const ec = new TextEncoder();
const dc = new TextDecoder();
Promise.all([
writer.write(ec.encode('hello')),
reader.read().then(common.mustCall(({ done, value }) => {
assert(!done);
assert.strictEqual(dc.decode(value), 'hello');
})),
reader.read().then(common.mustCall(({ done, value }) => {
assert(!done);
assert.strictEqual(dc.decode(value), 'there');
})),
writer.write(ec.encode('there')),
writer.close(),
reader.read().then(common.mustCall(({ done, value }) => {
assert(done);
assert.strictEqual(value, undefined);
})),
]).then(common.mustCall());
}
{
const duplex = new PassThrough();
duplex.destroy();
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
const reader = readable.getReader();
const writer = writable.getWriter();
reader.closed.then(common.mustCall());
writer.closed.then(common.mustCall());
}
{
const duplex = new PassThrough({ writable: false });
assert(duplex.readable);
assert(!duplex.writable);
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
const reader = readable.getReader();
const writer = writable.getWriter();
writer.closed.then(common.mustCall());
reader.cancel().then(common.mustCall());
}
{
const duplex = new PassThrough({ readable: false });
assert(!duplex.readable);
assert(duplex.writable);
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
const reader = readable.getReader();
const writer = writable.getWriter();
reader.closed.then(common.mustCall());
writer.close().then(common.mustCall());
}

View file

@ -0,0 +1,149 @@
// Flags: --no-warnings --expose-internals
'use strict';
const common = require('../common');
const assert = require('assert');
const {
TransformStream,
} = require('stream/web');
const {
newStreamDuplexFromReadableWritablePair,
} = require('internal/webstreams/adapters');
const {
finished,
pipeline,
Readable,
Writable,
} = require('stream');
const {
kState,
} = require('internal/webstreams/util');
{
const transform = new TransformStream();
const duplex = newStreamDuplexFromReadableWritablePair(transform);
assert(transform.readable.locked);
assert(transform.writable.locked);
duplex.destroy();
duplex.on('close', common.mustCall(() => {
assert.strictEqual(transform.readable[kState].state, 'closed');
assert.strictEqual(transform.writable[kState].state, 'errored');
}));
}
{
const error = new Error('boom');
const transform = new TransformStream();
const duplex = newStreamDuplexFromReadableWritablePair(transform);
assert(transform.readable.locked);
assert(transform.writable.locked);
duplex.destroy(error);
duplex.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
duplex.on('close', common.mustCall(() => {
assert.strictEqual(transform.readable[kState].state, 'closed');
assert.strictEqual(transform.writable[kState].state, 'errored');
assert.strictEqual(transform.writable[kState].storedError, error);
}));
}
{
const transform = new TransformStream();
const duplex = new newStreamDuplexFromReadableWritablePair(transform);
duplex.end();
duplex.resume();
duplex.on('close', common.mustCall(() => {
assert.strictEqual(transform.readable[kState].state, 'closed');
assert.strictEqual(transform.writable[kState].state, 'closed');
}));
}
{
const ec = new TextEncoder();
const dc = new TextDecoder();
const transform = new TransformStream({
transform(chunk, controller) {
const text = dc.decode(chunk);
controller.enqueue(ec.encode(text.toUpperCase()));
}
});
const duplex = new newStreamDuplexFromReadableWritablePair(transform, {
encoding: 'utf8',
});
duplex.end('hello');
duplex.on('data', common.mustCall((chunk) => {
assert.strictEqual(chunk, 'HELLO');
}));
duplex.on('end', common.mustCall());
duplex.on('close', common.mustCall(() => {
assert.strictEqual(transform.readable[kState].state, 'closed');
assert.strictEqual(transform.writable[kState].state, 'closed');
}));
}
{
const ec = new TextEncoder();
const dc = new TextDecoder();
const transform = new TransformStream({
transform: common.mustCall((chunk, controller) => {
const text = dc.decode(chunk);
controller.enqueue(ec.encode(text.toUpperCase()));
})
});
const duplex = new newStreamDuplexFromReadableWritablePair(transform, {
encoding: 'utf8',
});
finished(duplex, common.mustCall());
duplex.end('hello');
duplex.resume();
}
{
const ec = new TextEncoder();
const dc = new TextDecoder();
const transform = new TransformStream({
transform: common.mustCall((chunk, controller) => {
const text = dc.decode(chunk);
controller.enqueue(ec.encode(text.toUpperCase()));
})
});
const duplex = new newStreamDuplexFromReadableWritablePair(transform, {
encoding: 'utf8',
});
const readable = new Readable({
read() {
readable.push(Buffer.from('hello'));
readable.push(null);
}
});
const writable = new Writable({
write: common.mustCall((chunk, encoding, callback) => {
assert.strictEqual(dc.decode(chunk), 'HELLO');
assert.strictEqual(encoding, 'buffer');
callback();
})
});
finished(duplex, common.mustCall());
pipeline(readable, duplex, writable, common.mustCall());
}

View file

@ -0,0 +1,229 @@
// Flags: --expose-internals --no-warnings
'use strict';
const common = require('../common');
const assert = require('assert');
const {
pipeline,
finished,
Writable,
} = require('stream');
const {
ReadableStream,
WritableStream,
} = require('stream/web');
const {
newStreamReadableFromReadableStream,
} = require('internal/webstreams/adapters');
const {
kState,
} = require('internal/webstreams/util');
class MySource {
constructor(value = new Uint8Array(10)) {
this.value = value;
}
start(c) {
this.started = true;
this.controller = c;
}
pull(controller) {
controller.enqueue(this.value);
controller.close();
}
cancel(reason) {
this.canceled = true;
this.cancelReason = reason;
}
}
{
// Destroying the readable without an error closes
// the readableStream.
const readableStream = new ReadableStream();
const readable = newStreamReadableFromReadableStream(readableStream);
assert(readableStream.locked);
assert.rejects(readableStream.cancel(), {
code: 'ERR_INVALID_STATE',
});
assert.rejects(readableStream.pipeTo(new WritableStream()), {
code: 'ERR_INVALID_STATE',
});
assert.throws(() => readableStream.tee(), {
code: 'ERR_INVALID_STATE',
});
assert.throws(() => readableStream.getReader(), {
code: 'ERR_INVALID_STATE',
});
assert.throws(() => {
readableStream.pipeThrough({
readable: new ReadableStream(),
writable: new WritableStream(),
});
}, {
code: 'ERR_INVALID_STATE',
});
readable.destroy();
readable.on('close', common.mustCall(() => {
assert.strictEqual(readableStream[kState].state, 'closed');
}));
}
{
// Destroying the readable with an error closes the readableStream
// without error but records the cancel reason in the source.
const error = new Error('boom');
const source = new MySource();
const readableStream = new ReadableStream(source);
const readable = newStreamReadableFromReadableStream(readableStream);
assert(readableStream.locked);
readable.destroy(error);
readable.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
readable.on('close', common.mustCall(() => {
assert.strictEqual(readableStream[kState].state, 'closed');
assert.strictEqual(source.cancelReason, error);
}));
}
{
// An error in the source causes the readable to error.
const error = new Error('boom');
const source = new MySource();
const readableStream = new ReadableStream(source);
const readable = newStreamReadableFromReadableStream(readableStream);
assert(readableStream.locked);
source.controller.error(error);
readable.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
readable.on('close', common.mustCall(() => {
assert.strictEqual(readableStream[kState].state, 'errored');
}));
}
{
const readableStream = new ReadableStream(new MySource());
const readable = newStreamReadableFromReadableStream(readableStream);
readable.on('data', common.mustCall((chunk) => {
assert.deepStrictEqual(chunk, Buffer.alloc(10));
}));
readable.on('end', common.mustCall());
readable.on('close', common.mustCall());
readable.on('error', common.mustNotCall());
}
{
const readableStream = new ReadableStream(new MySource('hello'));
const readable = newStreamReadableFromReadableStream(readableStream, {
encoding: 'utf8',
});
readable.on('data', common.mustCall((chunk) => {
assert.deepStrictEqual(chunk, 'hello');
}));
readable.on('end', common.mustCall());
readable.on('close', common.mustCall());
readable.on('error', common.mustNotCall());
}
{
const readableStream = new ReadableStream(new MySource());
const readable = newStreamReadableFromReadableStream(readableStream, {
objectMode: true
});
readable.on('data', common.mustCall((chunk) => {
assert.deepStrictEqual(chunk, new Uint8Array(10));
}));
readable.on('end', common.mustCall());
readable.on('close', common.mustCall());
readable.on('error', common.mustNotCall());
}
{
const ec = new TextEncoder();
const readable = new ReadableStream({
start(controller) {
controller.enqueue(ec.encode('hello'));
setImmediate(() => {
controller.enqueue(ec.encode('there'));
controller.close();
});
}
});
const streamReadable = newStreamReadableFromReadableStream(readable);
finished(streamReadable, common.mustCall());
streamReadable.resume();
}
{
const ec = new TextEncoder();
const readable = new ReadableStream({
start(controller) {
controller.enqueue(ec.encode('hello'));
setImmediate(() => {
controller.enqueue(ec.encode('there'));
controller.close();
});
}
});
const streamReadable = newStreamReadableFromReadableStream(readable);
finished(streamReadable, common.mustCall());
streamReadable.resume();
}
{
const ec = new TextEncoder();
const dc = new TextDecoder();
const check = ['hello', 'there'];
const readable = new ReadableStream({
start(controller) {
controller.enqueue(ec.encode('hello'));
setImmediate(() => {
controller.enqueue(ec.encode('there'));
controller.close();
});
}
});
const writable = new Writable({
write: common.mustCall((chunk, encoding, callback) => {
assert.strictEqual(dc.decode(chunk), check.shift());
assert.strictEqual(encoding, 'buffer');
callback();
}, 2),
});
const streamReadable = newStreamReadableFromReadableStream(readable);
pipeline(streamReadable, writable, common.mustCall());
streamReadable.resume();
}

View file

@ -0,0 +1,231 @@
// Flags: --no-warnings --expose-internals
'use strict';
const common = require('../common');
const assert = require('assert');
const {
WritableStream,
} = require('stream/web');
const {
newStreamWritableFromWritableStream,
} = require('internal/webstreams/adapters');
const {
finished,
pipeline,
Readable,
} = require('stream');
const {
kState,
} = require('internal/webstreams/util');
class TestSource {
constructor() {
this.chunks = [];
}
start(c) {
this.controller = c;
this.started = true;
}
write(chunk) {
this.chunks.push(chunk);
}
close() {
this.closed = true;
}
abort(reason) {
this.abortReason = reason;
}
}
[1, {}, false, []].forEach((arg) => {
assert.throws(() => newStreamWritableFromWritableStream(arg), {
code: 'ERR_INVALID_ARG_TYPE',
});
});
{
// Ending the stream.Writable should close the writableStream
const source = new TestSource();
const writableStream = new WritableStream(source);
const writable = newStreamWritableFromWritableStream(writableStream);
assert(writableStream.locked);
writable.end('chunk');
writable.on('close', common.mustCall(() => {
assert(writableStream.locked);
assert.strictEqual(writableStream[kState].state, 'closed');
assert.strictEqual(source.chunks.length, 1);
assert.deepStrictEqual(source.chunks[0], Buffer.from('chunk'));
}));
}
{
// Destroying the stream.Writable without an error should close
// the writableStream with no error.
const source = new TestSource();
const writableStream = new WritableStream(source);
const writable = newStreamWritableFromWritableStream(writableStream);
assert(writableStream.locked);
writable.destroy();
writable.on('close', common.mustCall(() => {
assert(writableStream.locked);
assert.strictEqual(writableStream[kState].state, 'closed');
assert.strictEqual(source.chunks.length, 0);
}));
}
{
// Destroying the stream.Writable with an error should error
// the writableStream
const error = new Error('boom');
const source = new TestSource();
const writableStream = new WritableStream(source);
const writable = newStreamWritableFromWritableStream(writableStream);
assert(writableStream.locked);
writable.destroy(error);
writable.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
writable.on('close', common.mustCall(() => {
assert(writableStream.locked);
assert.strictEqual(writableStream[kState].state, 'errored');
assert.strictEqual(writableStream[kState].storedError, error);
assert.strictEqual(source.chunks.length, 0);
}));
}
{
// Attempting to close, abort, or getWriter on writableStream
// should fail because it is locked. An internal error in
// writableStream should error the writable.
const error = new Error('boom');
const source = new TestSource();
const writableStream = new WritableStream(source);
const writable = newStreamWritableFromWritableStream(writableStream);
assert(writableStream.locked);
assert.rejects(writableStream.close(), {
code: 'ERR_INVALID_STATE',
});
assert.rejects(writableStream.abort(), {
code: 'ERR_INVALID_STATE',
});
assert.throws(() => writableStream.getWriter(), {
code: 'ERR_INVALID_STATE',
});
writable.on('error', common.mustCall((reason) => {
assert.strictEqual(error, reason);
}));
source.controller.error(error);
}
{
const source = new TestSource();
const writableStream = new WritableStream(source);
const writable = newStreamWritableFromWritableStream(writableStream);
writable.on('error', common.mustNotCall());
writable.on('finish', common.mustCall());
writable.on('close', common.mustCall(() => {
assert.strictEqual(source.chunks.length, 1);
assert.deepStrictEqual(source.chunks[0], Buffer.from('hello'));
}));
writable.write('hello', common.mustCall());
writable.end();
}
{
const source = new TestSource();
const writableStream = new WritableStream(source);
const writable =
newStreamWritableFromWritableStream(writableStream, {
decodeStrings: false,
});
writable.on('error', common.mustNotCall());
writable.on('finish', common.mustCall());
writable.on('close', common.mustCall(() => {
assert.strictEqual(source.chunks.length, 1);
assert.deepStrictEqual(source.chunks[0], 'hello');
}));
writable.write('hello', common.mustCall());
writable.end();
}
{
const source = new TestSource();
const writableStream = new WritableStream(source);
const writable =
newStreamWritableFromWritableStream(
writableStream, {
objectMode: true
});
assert(writable.writableObjectMode);
writable.on('error', common.mustNotCall());
writable.on('finish', common.mustCall());
writable.on('close', common.mustCall(() => {
assert.strictEqual(source.chunks.length, 1);
assert.strictEqual(source.chunks[0], 'hello');
}));
writable.write('hello', common.mustCall());
writable.end();
}
{
const writableStream = new WritableStream({
write: common.mustCall(2),
close: common.mustCall(),
});
const writable = newStreamWritableFromWritableStream(writableStream);
finished(writable, common.mustCall());
writable.write('hello');
writable.write('world');
writable.end();
}
{
const writableStream = new WritableStream({
write: common.mustCall(2),
close: common.mustCall(),
});
const writable = newStreamWritableFromWritableStream(writableStream);
const readable = new Readable({
read() {
readable.push(Buffer.from('hello'));
readable.push(Buffer.from('world'));
readable.push(null);
}
});
pipeline(readable, writable, common.mustCall());
}

View file

@ -0,0 +1,167 @@
// Flags: --no-warnings --expose-internals
'use strict';
const common = require('../common');
const assert = require('assert');
const {
newWritableStreamFromStreamWritable,
} = require('internal/webstreams/adapters');
const {
Duplex,
Writable,
PassThrough,
} = require('stream');
class TestWritable extends Writable {
constructor(asyncWrite = false) {
super();
this.chunks = [];
this.asyncWrite = asyncWrite;
}
_write(chunk, encoding, callback) {
this.chunks.push({ chunk, encoding });
if (this.asyncWrite) {
setImmediate(() => callback());
return;
}
callback();
}
}
[1, {}, false, []].forEach((arg) => {
assert.throws(() => newWritableStreamFromStreamWritable(arg), {
code: 'ERR_INVALID_ARG_TYPE',
});
});
{
// Closing the WritableStream normally closes the stream.Writable
// without errors.
const writable = new TestWritable();
writable.on('error', common.mustNotCall());
writable.on('finish', common.mustCall());
writable.on('close', common.mustCall());
const writableStream = newWritableStreamFromStreamWritable(writable);
writableStream.close().then(common.mustCall(() => {
assert(writable.destroyed);
}));
}
{
// Aborting the WritableStream errors the stream.Writable
const error = new Error('boom');
const writable = new TestWritable();
writable.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
writable.on('finish', common.mustNotCall());
writable.on('close', common.mustCall());
const writableStream = newWritableStreamFromStreamWritable(writable);
writableStream.abort(error).then(common.mustCall(() => {
assert(writable.destroyed);
}));
}
{
// Destroying the stream.Writable prematurely errors the
// WritableStream
const error = new Error('boom');
const writable = new TestWritable();
const writableStream = newWritableStreamFromStreamWritable(writable);
assert.rejects(writableStream.close(), error);
writable.destroy(error);
}
{
// Ending the stream.Writable directly errors the WritableStream
const writable = new TestWritable();
const writableStream = newWritableStreamFromStreamWritable(writable);
assert.rejects(writableStream.close(), {
code: 'ERR_STREAM_PREMATURE_CLOSE'
});
writable.end();
}
{
const writable = new TestWritable();
const writableStream = newWritableStreamFromStreamWritable(writable);
const writer = writableStream.getWriter();
const ec = new TextEncoder();
writer.write(ec.encode('hello')).then(common.mustCall(() => {
assert.strictEqual(writable.chunks.length, 1);
assert.deepStrictEqual(
writable.chunks[0],
{
chunk: Buffer.from('hello'),
encoding: 'buffer'
});
}));
}
{
const writable = new TestWritable(true);
writable.on('error', common.mustNotCall());
writable.on('close', common.mustCall());
writable.on('finish', common.mustCall());
const writableStream = newWritableStreamFromStreamWritable(writable);
const writer = writableStream.getWriter();
const ec = new TextEncoder();
writer.write(ec.encode('hello')).then(common.mustCall(() => {
assert.strictEqual(writable.chunks.length, 1);
assert.deepStrictEqual(
writable.chunks[0],
{
chunk: Buffer.from('hello'),
encoding: 'buffer'
});
writer.close().then(common.mustCall());
}));
}
{
const duplex = new PassThrough();
duplex.setEncoding('utf8');
const writableStream = newWritableStreamFromStreamWritable(duplex);
const ec = new TextEncoder();
writableStream
.getWriter()
.write(ec.encode('hello'))
.then(common.mustCall());
duplex.on('data', common.mustCall((chunk) => {
assert.strictEqual(chunk, 'hello');
}));
}
{
const writable = new Writable();
writable.destroy();
const writableStream = newWritableStreamFromStreamWritable(writable);
const writer = writableStream.getWriter();
writer.closed.then(common.mustCall());
}
{
const duplex = new Duplex({ writable: false });
const writableStream = newWritableStreamFromStreamWritable(duplex);
const writer = writableStream.getWriter();
writer.closed.then(common.mustCall());
}