stream: implement streams to webstreams adapters

Experimental adapters for the webstreams API

Signed-off-by: James M Snell <jasnell@gmail.com>

PR-URL: https://github.com/nodejs/node/pull/39134
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
James M Snell 2021-06-23 22:24:19 -07:00
parent 09b57f7909
commit a99c230305
GPG key ID: 7341B15C070877AC
13 changed files with 2358 additions and 0 deletions


@@ -1892,6 +1892,87 @@ Calling `Readable.from(string)` or `Readable.from(buffer)` will not have
the strings or buffers be iterated to match the other streams semantics
for performance reasons.
### `stream.Readable.fromWeb(readableStream[, options])`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
* `readableStream` {ReadableStream}
* `options` {Object}
* `encoding` {string}
* `highWaterMark` {number}
* `objectMode` {boolean}
* `signal` {AbortSignal}
* Returns: {stream.Readable}
### `stream.Readable.toWeb(streamReadable)`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
* `streamReadable` {stream.Readable}
* Returns: {ReadableStream}
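As an illustrative aside (not part of the diff), a minimal sketch of the two `Readable` adapters added here, assuming only the public `stream` and `stream/web` modules of this Node.js build:

```js
const { Readable } = require('stream');
const { ReadableStream } = require('stream/web');

// Wrap a web ReadableStream in a stream.Readable.
const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue('hello');
    controller.close();
  },
});
const readable = Readable.fromWeb(readableStream, { encoding: 'utf8' });
readable.on('data', (chunk) => console.log(chunk)); // 'hello'

// And the other direction: expose a stream.Readable as a web ReadableStream.
const webReadable = Readable.toWeb(Readable.from(['a', 'b']));
```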
### `stream.Writable.fromWeb(writableStream[, options])`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
* `writableStream` {WritableStream}
* `options` {Object}
* `decodeStrings` {boolean}
* `highWaterMark` {number}
* `objectMode` {boolean}
* `signal` {AbortSignal}
* Returns: {stream.Writable}
### `stream.Writable.toWeb(streamWritable)`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
* `streamWritable` {stream.Writable}
* Returns: {WritableStream}
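A brief sketch of the `Writable` adapters (illustrative only, not part of the diff); everything other than `Writable.fromWeb()`/`Writable.toWeb()` is an example value:

```js
const { Writable } = require('stream');
const { WritableStream } = require('stream/web');

// Wrap a web WritableStream in a stream.Writable.
const writableStream = new WritableStream({
  write(chunk) {
    console.log('sink received:', chunk);
  },
});
const writable = Writable.fromWeb(writableStream);
writable.write('hello'); // Reaches the sink as a Uint8Array by default.
writable.end();

// And the other direction: expose a stream.Writable as a web WritableStream.
const nodeWritable = new Writable({
  write(chunk, encoding, callback) { callback(); },
});
const webWritable = Writable.toWeb(nodeWritable);
```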
### `stream.Duplex.fromWeb(pair[, options])`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
* `pair` {Object}
* `readable` {ReadableStream}
* `writable` {WritableStream}
* `options` {Object}
* `allowHalfOpen` {boolean}
* `decodeStrings` {boolean}
* `encoding` {string}
* `highWaterMark` {number}
* `objectMode` {boolean}
* `signal` {AbortSignal}
* Returns: {stream.Duplex}
### `stream.Duplex.toWeb(streamDuplex)`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
* `streamDuplex` {stream.Duplex}
* Returns: {Object}
* `readable` {ReadableStream}
* `writable` {WritableStream}
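A hedged usage sketch of the `Duplex` adapters (illustrative, not part of the diff); it mirrors the TransformStream round-trip exercised in the tests below:

```js
const { Duplex, PassThrough } = require('stream');
const { TransformStream } = require('stream/web');

const ec = new TextEncoder();
const dc = new TextDecoder();

// A TransformStream is already a { readable, writable } pair, so it can be
// handed to Duplex.fromWeb() directly.
const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(ec.encode(dc.decode(chunk).toUpperCase()));
  },
});
const duplex = Duplex.fromWeb(transform, { encoding: 'utf8' });
duplex.on('data', (chunk) => console.log(chunk)); // 'HELLO'
duplex.end('hello');

// Duplex.toWeb() goes the other way, returning { readable, writable }.
const { readable, writable } = Duplex.toWeb(new PassThrough());
```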
### `stream.addAbortSignal(signal, stream)`
<!-- YAML
added: v15.4.0


@@ -788,6 +788,8 @@ module.exports = {
appendFile,
readFile,
watch,
kHandle,
},
FileHandle,


@@ -114,3 +114,22 @@ ObjectDefineProperties(Duplex.prototype, {
}
}
});
let webStreamsAdapters;
// Lazy to avoid circular references
function lazyWebStreams() {
if (webStreamsAdapters === undefined)
webStreamsAdapters = require('internal/webstreams/adapters');
return webStreamsAdapters;
}
Duplex.fromWeb = function(pair, options) {
return lazyWebStreams().newStreamDuplexFromReadableWritablePair(
pair,
options);
};
Duplex.toWeb = function(duplex) {
return lazyWebStreams().newReadableWritablePairFromDuplex(duplex);
};


@@ -1355,3 +1355,22 @@ function endWritableNT(state, stream) {
Readable.from = function(iterable, opts) {
return from(Readable, iterable, opts);
};
let webStreamsAdapters;
// Lazy to avoid circular references
function lazyWebStreams() {
if (webStreamsAdapters === undefined)
webStreamsAdapters = require('internal/webstreams/adapters');
return webStreamsAdapters;
}
Readable.fromWeb = function(readableStream, options) {
return lazyWebStreams().newStreamReadableFromReadableStream(
readableStream,
options);
};
Readable.toWeb = function(streamReadable) {
return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable);
};


@@ -872,3 +872,22 @@ Writable.prototype._destroy = function(err, cb) {
Writable.prototype[EE.captureRejectionSymbol] = function(err) {
this.destroy(err);
};
let webStreamsAdapters;
// Lazy to avoid circular references
function lazyWebStreams() {
if (webStreamsAdapters === undefined)
webStreamsAdapters = require('internal/webstreams/adapters');
return webStreamsAdapters;
}
Writable.fromWeb = function(writableStream, options) {
return lazyWebStreams().newStreamWritableFromWritableStream(
writableStream,
options);
};
Writable.toWeb = function(streamWritable) {
return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable);
};


@@ -0,0 +1,921 @@
'use strict';
const {
ArrayPrototypeMap,
PromiseAll,
PromisePrototypeThen,
PromisePrototypeFinally,
PromiseResolve,
Uint8Array,
} = primordials;
const {
ReadableStream,
isReadableStream,
} = require('internal/webstreams/readablestream');
const {
WritableStream,
isWritableStream,
} = require('internal/webstreams/writablestream');
const {
CountQueuingStrategy,
} = require('internal/webstreams/queuingstrategies');
const {
Writable,
Readable,
Duplex,
destroy,
} = require('stream');
const {
isDestroyed,
isReadable,
isReadableEnded,
isWritable,
isWritableEnded,
} = require('internal/streams/utils');
const {
Buffer,
} = require('buffer');
const {
errnoException,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
ERR_INVALID_STATE,
ERR_STREAM_PREMATURE_CLOSE,
},
} = require('internal/errors');
const {
createDeferredPromise,
} = require('internal/util');
const {
validateBoolean,
validateObject,
} = require('internal/validators');
const {
WriteWrap,
ShutdownWrap,
kReadBytesOrError,
kLastWriteWasAsync,
streamBaseState,
} = internalBinding('stream_wrap');
const finished = require('internal/streams/end-of-stream');
const { UV_EOF } = internalBinding('uv');
/**
* @typedef {import('../../stream').Writable} Writable
* @typedef {import('../../stream').Readable} Readable
* @typedef {import('./writablestream').WritableStream} WritableStream
* @typedef {import('./readablestream').ReadableStream} ReadableStream
*
* @typedef {import('../abort_controller').AbortSignal} AbortSignal
*/
/**
* @param {Writable} streamWritable
* @returns {WritableStream}
*/
function newWritableStreamFromStreamWritable(streamWritable) {
// Not using the internal/streams/utils isWritableNodeStream utility
// here because it will return false if streamWritable is a Duplex
// whose writable option is false. For a Duplex that is not writable,
// we want it to pass this check but return a closed WritableStream.
if (typeof streamWritable?._writableState !== 'object') {
throw new ERR_INVALID_ARG_TYPE(
'streamWritable',
'stream.Writable',
streamWritable);
}
if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
const writable = new WritableStream();
writable.close();
return writable;
}
const highWaterMark = streamWritable.writableHighWaterMark;
const strategy =
streamWritable.writableObjectMode ?
new CountQueuingStrategy({ highWaterMark }) :
{ highWaterMark };
let controller;
let backpressurePromise;
let closed;
function onDrain() {
if (backpressurePromise !== undefined)
backpressurePromise.resolve();
}
const cleanup = finished(streamWritable, (error) => {
cleanup();
// This is a protection against non-standard, legacy streams
// that happen to emit an error event again after finished is called.
streamWritable.on('error', () => {});
if (error != null) {
if (backpressurePromise !== undefined)
backpressurePromise.reject(error);
// If closed is not undefined, the error is happening
// after the WritableStream close has already started.
// We need to reject it here.
if (closed !== undefined) {
closed.reject(error);
closed = undefined;
}
controller.error(error);
controller = undefined;
return;
}
if (closed !== undefined) {
closed.resolve();
closed = undefined;
return;
}
controller.error(new ERR_STREAM_PREMATURE_CLOSE());
controller = undefined;
});
streamWritable.on('drain', onDrain);
return new WritableStream({
start(c) { controller = c; },
async write(chunk) {
if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
backpressurePromise = createDeferredPromise();
return PromisePrototypeFinally(
backpressurePromise.promise, () => {
backpressurePromise = undefined;
});
}
},
abort(reason) {
destroy(streamWritable, reason);
},
close() {
if (closed === undefined && !isWritableEnded(streamWritable)) {
closed = createDeferredPromise();
streamWritable.end();
return closed.promise;
}
controller = undefined;
return PromiseResolve();
},
}, strategy);
}
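(Editorial note, not part of the diff: a small sketch of how the backpressurePromise/onDrain handshake above surfaces to callers, using the public `Writable.toWeb()` wrapper this commit adds. The slow destination and its highWaterMark are example values.)

```js
const { Writable } = require('stream');

// A slow Writable with a tiny highWaterMark so write() returns false and
// only the 'drain' event releases the adapter's backpressure promise.
const slow = new Writable({
  highWaterMark: 1,
  write(chunk, encoding, callback) {
    setTimeout(callback, 10);
  },
});

const writableStream = Writable.toWeb(slow);
const writer = writableStream.getWriter();

(async () => {
  // Each write() promise settles only after the destination drains.
  await writer.write(Buffer.from('a'));
  await writer.write(Buffer.from('b'));
  await writer.close();
})();
```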
/**
* @param {WritableStream} writableStream
* @param {{
* decodeStrings? : boolean,
* highWaterMark? : number,
* objectMode? : boolean,
* signal? : AbortSignal,
* }} [options]
* @returns {Writable}
*/
function newStreamWritableFromWritableStream(writableStream, options = {}) {
if (!isWritableStream(writableStream)) {
throw new ERR_INVALID_ARG_TYPE(
'writableStream',
'WritableStream',
writableStream);
}
validateObject(options, 'options');
const {
highWaterMark,
decodeStrings = true,
objectMode = false,
signal,
} = options;
validateBoolean(objectMode, 'options.objectMode');
validateBoolean(decodeStrings, 'options.decodeStrings');
const writer = writableStream.getWriter();
let closed = false;
const writable = new Writable({
highWaterMark,
objectMode,
decodeStrings,
signal,
writev(chunks, callback) {
function done(error) {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => destroy(writable, error));
}
}
PromisePrototypeThen(
writer.ready,
() => {
return PromisePrototypeThen(
PromiseAll(
ArrayPrototypeMap(
chunks,
(chunk) => writer.write(chunk))),
done,
done);
},
done);
},
write(chunk, encoding, callback) {
if (typeof chunk === 'string' && decodeStrings && !objectMode) {
chunk = Buffer.from(chunk, encoding);
chunk = new Uint8Array(
chunk.buffer,
chunk.byteOffset,
chunk.byteLength);
}
function done(error) {
try {
callback(error);
} catch (error) {
destroy(writable, error);
}
}
PromisePrototypeThen(
writer.ready,
() => {
return PromisePrototypeThen(
writer.write(chunk),
done,
done);
},
done);
},
destroy(error, callback) {
function done() {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => { throw error; });
}
}
if (!closed) {
if (error != null) {
PromisePrototypeThen(
writer.abort(error),
done,
done);
} else {
PromisePrototypeThen(
writer.close(),
done,
done);
}
return;
}
done();
},
final(callback) {
function done(error) {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => destroy(writable, error));
}
}
if (!closed) {
PromisePrototypeThen(
writer.close(),
done,
done);
}
},
});
PromisePrototypeThen(
writer.closed,
() => {
// If the WritableStream closes before the stream.Writable has been
// ended, we signal an error on the stream.Writable.
closed = true;
if (!isWritableEnded(writable))
destroy(writable, new ERR_STREAM_PREMATURE_CLOSE());
},
(error) => {
// If the WritableStream errors before the stream.Writable has been
// destroyed, signal an error on the stream.Writable.
closed = true;
destroy(writable, error);
});
return writable;
}
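(Editorial note, not part of the diff: a sketch of the decodeStrings handling above, via the public `Writable.fromWeb()` wrapper. With the default `decodeStrings: true`, string writes reach the WritableStream sink as a Uint8Array; with `decodeStrings: false` they arrive as strings.)

```js
const { Writable } = require('stream');
const { WritableStream } = require('stream/web');

const sink = new WritableStream({
  write(chunk) {
    console.log(chunk instanceof Uint8Array, chunk);
  },
});

const writable = Writable.fromWeb(sink);
writable.write('hello'); // Logged by the sink as a Uint8Array.
writable.end();
```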
/**
* @param {Readable} streamReadable
* @returns {ReadableStream}
*/
function newReadableStreamFromStreamReadable(streamReadable) {
// Not using the internal/streams/utils isReadableNodeStream utility
// here because it will return false if streamReadable is a Duplex
// whose readable option is false. For a Duplex that is not readable,
// we want it to pass this check but return a closed ReadableStream.
if (typeof streamReadable?._readableState !== 'object') {
throw new ERR_INVALID_ARG_TYPE(
'streamReadable',
'stream.Readable',
streamReadable);
}
if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
const readable = new ReadableStream();
readable.cancel();
return readable;
}
const objectMode = streamReadable.readableObjectMode;
const highWaterMark = streamReadable.readableHighWaterMark;
// When not running in objectMode explicitly, we just fall
// back to a minimal strategy that just specifies the highWaterMark
// and no size algorithm. Using a ByteLengthQueuingStrategy here
// is unnecessary.
const strategy =
objectMode ?
new CountQueuingStrategy({ highWaterMark }) :
{ highWaterMark };
let controller;
function onData(chunk) {
// Copy the Buffer to detach it from the pool.
if (Buffer.isBuffer(chunk) && !objectMode)
chunk = new Uint8Array(chunk);
controller.enqueue(chunk);
if (controller.desiredSize <= 0)
streamReadable.pause();
}
streamReadable.pause();
const cleanup = finished(streamReadable, (error) => {
cleanup();
// This is a protection against non-standard, legacy streams
// that happen to emit an error event again after finished is called.
streamReadable.on('error', () => {});
if (error)
return controller.error(error);
controller.close();
});
streamReadable.on('data', onData);
return new ReadableStream({
start(c) { controller = c; },
pull() { streamReadable.resume(); },
cancel(reason) {
destroy(streamReadable, reason);
},
}, strategy);
}
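(Editorial note, not part of the diff: a sketch of the adapter above through the public `Readable.toWeb()` wrapper. As the onData handler shows, Buffer chunks are copied into fresh Uint8Arrays so they are detached from Node's Buffer pool before being enqueued.)

```js
const { Readable } = require('stream');

const readable = new Readable({
  read() {
    readable.push(Buffer.from('hello'));
    readable.push(null);
  },
});

const readableStream = Readable.toWeb(readable);
const reader = readableStream.getReader();

(async () => {
  const { value, done } = await reader.read();
  console.log(done, value instanceof Uint8Array); // false true
  console.log((await reader.read()).done);        // true
})();
```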
/**
* @param {ReadableStream} readableStream
* @param {{
* highWaterMark? : number,
* encoding? : string,
* objectMode? : boolean,
* signal? : AbortSignal,
* }} [options]
* @returns {Readable}
*/
function newStreamReadableFromReadableStream(readableStream, options = {}) {
if (!isReadableStream(readableStream)) {
throw new ERR_INVALID_ARG_TYPE(
'readableStream',
'ReadableStream',
readableStream);
}
validateObject(options, 'options');
const {
highWaterMark,
encoding,
objectMode = false,
signal,
} = options;
if (encoding !== undefined && !Buffer.isEncoding(encoding))
throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding');
validateBoolean(objectMode, 'options.objectMode');
const reader = readableStream.getReader();
let closed = false;
const readable = new Readable({
objectMode,
highWaterMark,
encoding,
signal,
read() {
PromisePrototypeThen(
reader.read(),
(chunk) => {
if (chunk.done) {
// Value should always be undefined here.
readable.push(null);
} else {
readable.push(chunk.value);
}
},
(error) => destroy(readable, error));
},
destroy(error, callback) {
function done() {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => { throw error; });
}
}
if (!closed) {
PromisePrototypeThen(
reader.cancel(error),
done,
done);
return;
}
done(error);
},
});
PromisePrototypeThen(
reader.closed,
() => {
closed = true;
if (!isReadableEnded(readable))
readable.push(null);
},
(error) => {
closed = true;
destroy(readable, error);
});
return readable;
}
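(Editorial note, not part of the diff: a sketch of the objectMode path of the adapter above, via the public `Readable.fromWeb()` wrapper. In objectMode, chunks are passed through as-is rather than treated as bytes.)

```js
const { Readable } = require('stream');
const { ReadableStream } = require('stream/web');

const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue({ hello: 'world' });
    controller.close();
  },
});

const readable = Readable.fromWeb(readableStream, { objectMode: true });
readable.on('data', (chunk) => console.log(chunk)); // { hello: 'world' }
readable.on('end', () => console.log('done'));
```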
/**
* @typedef {import('./readablestream').ReadableWritablePair
* } ReadableWritablePair
* @typedef {import('../../stream').Duplex} Duplex
*
* @param {Duplex} duplex
* @returns {ReadableWritablePair}
*/
function newReadableWritablePairFromDuplex(duplex) {
// Not using the internal/streams/utils isWritableNodeStream and
// isReadableNodeStream utilities here because they will return false
// if the duplex was created with writable or readable options set to
// false. Instead, we'll check the readable and writable state after
// and return closed WritableStream or closed ReadableStream as
// necessary.
if (typeof duplex?._writableState !== 'object' ||
typeof duplex?._readableState !== 'object') {
throw new ERR_INVALID_ARG_TYPE('duplex', 'stream.Duplex', duplex);
}
if (isDestroyed(duplex)) {
const writable = new WritableStream();
const readable = new ReadableStream();
writable.close();
readable.cancel();
return { readable, writable };
}
const writable =
isWritable(duplex) ?
newWritableStreamFromStreamWritable(duplex) :
new WritableStream();
if (!isWritable(duplex))
writable.close();
const readable =
isReadable(duplex) ?
newReadableStreamFromStreamReadable(duplex) :
new ReadableStream();
if (!isReadable(duplex))
readable.cancel();
return { writable, readable };
}
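(Editorial note, not part of the diff: a round-trip sketch of the pair adapter above, via the public `Duplex.toWeb()` wrapper and a PassThrough.)

```js
const { Duplex, PassThrough } = require('stream');

const duplex = new PassThrough();
const { readable, writable } = Duplex.toWeb(duplex);

const writer = writable.getWriter();
const reader = readable.getReader();

(async () => {
  await writer.write(Buffer.from('hello'));
  const { value } = await reader.read();
  console.log(Buffer.from(value).toString()); // 'hello'
  await writer.close();
})();
```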
/**
* @param {ReadableWritablePair} pair
* @param {{
* allowHalfOpen? : boolean,
* decodeStrings? : boolean,
* encoding? : string,
* highWaterMark? : number,
* objectMode? : boolean,
* signal? : AbortSignal,
* }} [options]
* @returns {Duplex}
*/
function newStreamDuplexFromReadableWritablePair(pair = {}, options = {}) {
validateObject(pair, 'pair');
const {
readable: readableStream,
writable: writableStream,
} = pair;
if (!isReadableStream(readableStream)) {
throw new ERR_INVALID_ARG_TYPE(
'pair.readable',
'ReadableStream',
readableStream);
}
if (!isWritableStream(writableStream)) {
throw new ERR_INVALID_ARG_TYPE(
'pair.writable',
'WritableStream',
writableStream);
}
validateObject(options, 'options');
const {
allowHalfOpen = false,
objectMode = false,
encoding,
decodeStrings = true,
highWaterMark,
signal,
} = options;
validateBoolean(objectMode, 'options.objectMode');
if (encoding !== undefined && !Buffer.isEncoding(encoding))
throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding');
const writer = writableStream.getWriter();
const reader = readableStream.getReader();
let writableClosed = false;
let readableClosed = false;
const duplex = new Duplex({
allowHalfOpen,
highWaterMark,
objectMode,
encoding,
decodeStrings,
signal,
writev(chunks, callback) {
function done(error) {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => destroy(duplex, error));
}
}
PromisePrototypeThen(
writer.ready,
() => {
return PromisePrototypeThen(
PromiseAll(
ArrayPrototypeMap(
chunks,
(chunk) => writer.write(chunk))),
done,
done);
},
done);
},
write(chunk, encoding, callback) {
if (typeof chunk === 'string' && decodeStrings && !objectMode) {
chunk = Buffer.from(chunk, encoding);
chunk = new Uint8Array(
chunk.buffer,
chunk.byteOffset,
chunk.byteLength);
}
function done(error) {
try {
callback(error);
} catch (error) {
destroy(duplex, error);
}
}
PromisePrototypeThen(
writer.ready,
() => {
return PromisePrototypeThen(
writer.write(chunk),
done,
done);
},
done);
},
final(callback) {
function done(error) {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => destroy(duplex, error));
}
}
if (!writableClosed) {
PromisePrototypeThen(
writer.close(),
done,
done);
}
},
read() {
PromisePrototypeThen(
reader.read(),
(chunk) => {
if (chunk.done) {
duplex.push(null);
} else {
duplex.push(chunk.value);
}
},
(error) => destroy(duplex, error));
},
destroy(error, callback) {
function done() {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => { throw error; });
}
}
async function closeWriter() {
if (!writableClosed)
await writer.abort(error);
}
async function closeReader() {
if (!readableClosed)
await reader.cancel(error);
}
if (!writableClosed || !readableClosed) {
PromisePrototypeThen(
PromiseAll([
closeWriter(),
closeReader(),
]),
done,
done);
return;
}
done();
},
});
PromisePrototypeThen(
writer.closed,
() => {
writableClosed = true;
if (!isWritableEnded(duplex))
destroy(duplex, new ERR_STREAM_PREMATURE_CLOSE());
},
(error) => {
writableClosed = true;
readableClosed = true;
destroy(duplex, error);
});
PromisePrototypeThen(
reader.closed,
() => {
readableClosed = true;
if (!isReadableEnded(duplex))
duplex.push(null);
},
(error) => {
writableClosed = true;
readableClosed = true;
destroy(duplex, error);
});
return duplex;
}
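(Editorial note, not part of the diff: a sketch showing that the Duplex produced above plugs into regular stream plumbing such as pipeline(), much like the pipeline test added below.)

```js
const { Duplex, Readable, Writable, pipeline } = require('stream');
const { TransformStream } = require('stream/web');

const dc = new TextDecoder();

const transform = new TransformStream({
  transform(chunk, controller) { controller.enqueue(chunk); },
});

pipeline(
  Readable.from([Buffer.from('hello')]),
  Duplex.fromWeb(transform),
  new Writable({
    write(chunk, encoding, callback) {
      console.log(dc.decode(chunk)); // 'hello'
      callback();
    },
  }),
  (err) => { if (err) throw err; },
);
```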
/**
* @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
* @typedef {{}} StreamBase
* @param {StreamBase} streamBase
* @param {QueuingStrategy} strategy
* @returns {WritableStream}
*/
function newWritableStreamFromStreamBase(streamBase, strategy) {
validateObject(streamBase, 'streamBase');
let current;
function createWriteWrap(controller, promise) {
const req = new WriteWrap();
req.handle = streamBase;
req.oncomplete = onWriteComplete;
req.async = false;
req.bytes = 0;
req.buffer = null;
req.controller = controller;
req.promise = promise;
return req;
}
function onWriteComplete(status) {
if (status < 0) {
const error = errnoException(status, 'write', this.error);
this.promise.reject(error);
this.controller.error(error);
return;
}
this.promise.resolve();
}
function doWrite(chunk, controller) {
const promise = createDeferredPromise();
let ret;
let req;
try {
req = createWriteWrap(controller, promise);
ret = streamBase.writeBuffer(req, chunk);
if (streamBaseState[kLastWriteWasAsync])
req.buffer = chunk;
req.async = !!streamBaseState[kLastWriteWasAsync];
} catch (error) {
promise.reject(error);
}
if (ret !== 0)
promise.reject(errnoException(ret, 'write', req));
else if (!req.async)
promise.resolve();
return promise.promise;
}
return new WritableStream({
write(chunk, controller) {
current = current !== undefined ?
PromisePrototypeThen(
current,
() => doWrite(chunk, controller),
(error) => controller.error(error)) :
doWrite(chunk, controller);
return current;
},
close() {
const promise = createDeferredPromise();
const req = new ShutdownWrap();
req.oncomplete = () => promise.resolve();
const err = streamBase.shutdown(req);
if (err === 1)
promise.resolve();
return promise.promise;
},
}, strategy);
}
/**
* @param {StreamBase} streamBase
* @param {QueuingStrategy} strategy
* @returns {ReadableStream}
*/
function newReadableStreamFromStreamBase(streamBase, strategy) {
validateObject(streamBase, 'streamBase');
if (typeof streamBase.onread === 'function')
throw new ERR_INVALID_STATE('StreamBase already has a consumer');
let controller;
streamBase.onread = (arrayBuffer) => {
const nread = streamBaseState[kReadBytesOrError];
if (nread === 0)
return;
try {
if (nread === UV_EOF) {
controller.close();
streamBase.readStop();
return;
}
controller.enqueue(arrayBuffer);
if (controller.desiredSize <= 0)
streamBase.readStop();
} catch (error) {
controller.error(error);
streamBase.readStop();
}
};
return new ReadableStream({
start(c) { controller = c; },
pull() {
streamBase.readStart();
},
cancel() {
const promise = createDeferredPromise();
const req = new ShutdownWrap();
req.oncomplete = () => promise.resolve();
const err = streamBase.shutdown(req);
if (err === 1)
promise.resolve();
return promise.promise;
},
}, strategy);
}
module.exports = {
newWritableStreamFromStreamWritable,
newReadableStreamFromStreamReadable,
newStreamWritableFromWritableStream,
newStreamReadableFromReadableStream,
newReadableWritablePairFromDuplex,
newStreamDuplexFromReadableWritablePair,
newWritableStreamFromStreamBase,
newReadableStreamFromStreamBase,
};


@@ -0,0 +1,67 @@
// Flags: --expose-internals --no-warnings
'use strict';
const common = require('../common');
const assert = require('assert');
const {
internalBinding,
} = require('internal/test/binding');
const {
newWritableStreamFromStreamBase,
newReadableStreamFromStreamBase,
} = require('internal/webstreams/adapters');
const {
JSStream
} = internalBinding('js_stream');
{
const stream = new JSStream();
stream.onwrite = common.mustCall((req, buf) => {
assert.deepStrictEqual(buf[0], Buffer.from('hello'));
req.oncomplete();
});
const writable = newWritableStreamFromStreamBase(stream);
const writer = writable.getWriter();
writer.write(Buffer.from('hello')).then(common.mustCall());
}
{
const buf = Buffer.from('hello');
const check = new Uint8Array(buf);
const stream = new JSStream();
const readable = newReadableStreamFromStreamBase(stream);
const reader = readable.getReader();
reader.read().then(common.mustCall(({ done, value }) => {
assert(!done);
assert.deepStrictEqual(new Uint8Array(value), check);
reader.read().then(common.mustCall(({ done, value }) => {
assert(done);
assert.strictEqual(value, undefined);
}));
}));
stream.readBuffer(buf);
stream.emitEOF();
}
{
const stream = new JSStream();
stream.onshutdown = common.mustCall((req) => {
req.oncomplete();
});
const readable = newReadableStreamFromStreamBase(stream);
readable.cancel().then(common.mustCall());
}


@@ -0,0 +1,204 @@
// Flags: --no-warnings --expose-internals
'use strict';
const common = require('../common');
const assert = require('assert');
const {
newReadableStreamFromStreamReadable,
} = require('internal/webstreams/adapters');
const {
Duplex,
Readable,
} = require('stream');
const {
kState,
} = require('internal/webstreams/util');
{
// Canceling the readableStream closes the readable.
const readable = new Readable({
read() {
readable.push('hello');
readable.push(null);
}
});
readable.on('close', common.mustCall());
readable.on('end', common.mustNotCall());
readable.on('pause', common.mustCall());
readable.on('resume', common.mustNotCall());
readable.on('error', common.mustCall((error) => {
assert.strictEqual(error.code, 'ABORT_ERR');
}));
const readableStream = newReadableStreamFromStreamReadable(readable);
readableStream.cancel().then(common.mustCall());
}
{
// Prematurely destroying the stream.Readable without an error
// closes the ReadableStream with a premature close error but does
// not error the readable.
const readable = new Readable({
read() {
readable.push('hello');
readable.push(null);
}
});
const readableStream = newReadableStreamFromStreamReadable(readable);
assert(!readableStream.locked);
const reader = readableStream.getReader();
assert.rejects(reader.closed, {
code: 'ERR_STREAM_PREMATURE_CLOSE',
});
readable.on('end', common.mustNotCall());
readable.on('error', common.mustNotCall());
readable.on('close', common.mustCall(() => {
assert.strictEqual(readableStream[kState].state, 'errored');
}));
readable.destroy();
}
{
// Ending the readable without an error just closes the
// readableStream without an error.
const readable = new Readable({
read() {
readable.push('hello');
readable.push(null);
}
});
const readableStream = newReadableStreamFromStreamReadable(readable);
assert(!readableStream.locked);
const reader = readableStream.getReader();
reader.closed.then(common.mustCall());
readable.on('end', common.mustCall());
readable.on('error', common.mustNotCall());
readable.on('close', common.mustCall(() => {
assert.strictEqual(readableStream[kState].state, 'closed');
}));
readable.push(null);
}
{
// Destroying the readable with an error should error the readableStream
const error = new Error('boom');
const readable = new Readable({
read() {
readable.push('hello');
readable.push(null);
}
});
const readableStream = newReadableStreamFromStreamReadable(readable);
assert(!readableStream.locked);
const reader = readableStream.getReader();
assert.rejects(reader.closed, error);
readable.on('end', common.mustNotCall());
readable.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
readable.on('close', common.mustCall(() => {
assert.strictEqual(readableStream[kState].state, 'errored');
}));
readable.destroy(error);
}
{
const readable = new Readable({
encoding: 'utf8',
read() {
readable.push('hello');
readable.push(null);
}
});
const readableStream = newReadableStreamFromStreamReadable(readable);
const reader = readableStream.getReader();
readable.on('data', common.mustCall());
readable.on('end', common.mustCall());
readable.on('close', common.mustCall());
(async () => {
assert.deepStrictEqual(
await reader.read(),
{ value: 'hello', done: false });
assert.deepStrictEqual(
await reader.read(),
{ value: undefined, done: true });
})().then(common.mustCall());
}
{
const data = {};
const readable = new Readable({
objectMode: true,
read() {
readable.push(data);
readable.push(null);
}
});
assert(readable.readableObjectMode);
const readableStream = newReadableStreamFromStreamReadable(readable);
const reader = readableStream.getReader();
readable.on('data', common.mustCall());
readable.on('end', common.mustCall());
readable.on('close', common.mustCall());
(async () => {
assert.deepStrictEqual(
await reader.read(),
{ value: data, done: false });
assert.deepStrictEqual(
await reader.read(),
{ value: undefined, done: true });
})().then(common.mustCall());
}
{
const readable = new Readable();
readable.destroy();
const readableStream = newReadableStreamFromStreamReadable(readable);
const reader = readableStream.getReader();
reader.closed.then(common.mustCall());
}
{
const duplex = new Duplex({ readable: false });
duplex.destroy();
const readableStream = newReadableStreamFromStreamReadable(duplex);
const reader = readableStream.getReader();
reader.closed.then(common.mustCall());
}


@@ -0,0 +1,250 @@
// Flags: --no-warnings --expose-internals
'use strict';
const common = require('../common');
const assert = require('assert');
const {
newReadableWritablePairFromDuplex,
} = require('internal/webstreams/adapters');
const {
PassThrough,
} = require('stream');
{
// Destroying the duplex without an error should close
// the readable and error the writable.
const duplex = new PassThrough();
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
const reader = readable.getReader();
const writer = writable.getWriter();
assert.rejects(reader.closed, {
code: 'ERR_STREAM_PREMATURE_CLOSE',
});
assert.rejects(writer.closed, {
code: 'ERR_STREAM_PREMATURE_CLOSE',
});
duplex.destroy();
duplex.on('close', common.mustCall());
}
{
// Destroying the duplex with an error should error
// both the readable and writable
const error = new Error('boom');
const duplex = new PassThrough();
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
duplex.on('close', common.mustCall());
duplex.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
const reader = readable.getReader();
const writer = writable.getWriter();
assert.rejects(reader.closed, error);
assert.rejects(writer.closed, error);
duplex.destroy(error);
}
{
const error = new Error('boom');
const duplex = new PassThrough();
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
duplex.on('close', common.mustCall());
duplex.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
const reader = readable.getReader();
const writer = writable.getWriter();
reader.closed.then(common.mustCall());
assert.rejects(writer.closed, error);
reader.cancel(error).then(common.mustCall());
}
{
const duplex = new PassThrough();
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
duplex.on('close', common.mustCall());
duplex.on('error', common.mustNotCall());
const reader = readable.getReader();
const writer = writable.getWriter();
reader.closed.then(common.mustCall());
writer.closed.then(common.mustCall());
writer.close().then(common.mustCall());
}
{
const error = new Error('boom');
const duplex = new PassThrough();
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
duplex.on('close', common.mustCall());
duplex.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
const reader = readable.getReader();
const writer = writable.getWriter();
assert.rejects(reader.closed, error);
assert.rejects(writer.closed, error);
writer.abort(error).then(common.mustCall());
}
{
const duplex = new PassThrough();
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
duplex.on('close', common.mustCall());
duplex.on('error', common.mustCall((error) => {
assert.strictEqual(error.code, 'ABORT_ERR');
}));
const reader = readable.getReader();
const writer = writable.getWriter();
assert.rejects(writer.closed, {
code: 'ABORT_ERR',
});
reader.cancel();
}
{
const duplex = new PassThrough();
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
duplex.on('close', common.mustCall());
duplex.on('error', common.mustNotCall());
const reader = readable.getReader();
const writer = writable.getWriter();
reader.closed.then(common.mustCall());
assert.rejects(writer.closed, {
code: 'ERR_STREAM_PREMATURE_CLOSE',
});
duplex.end();
}
{
const duplex = new PassThrough();
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
duplex.on('data', common.mustCall(2));
duplex.on('close', common.mustCall());
duplex.on('end', common.mustCall());
duplex.on('finish', common.mustCall());
const writer = writable.getWriter();
const reader = readable.getReader();
const ec = new TextEncoder();
const dc = new TextDecoder();
Promise.all([
writer.write(ec.encode('hello')),
reader.read().then(common.mustCall(({ done, value }) => {
assert(!done);
assert.strictEqual(dc.decode(value), 'hello');
})),
reader.read().then(common.mustCall(({ done, value }) => {
assert(!done);
assert.strictEqual(dc.decode(value), 'there');
})),
writer.write(ec.encode('there')),
writer.close(),
reader.read().then(common.mustCall(({ done, value }) => {
assert(done);
assert.strictEqual(value, undefined);
})),
]).then(common.mustCall());
}
{
const duplex = new PassThrough();
duplex.destroy();
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
const reader = readable.getReader();
const writer = writable.getWriter();
reader.closed.then(common.mustCall());
writer.closed.then(common.mustCall());
}
{
const duplex = new PassThrough({ writable: false });
assert(duplex.readable);
assert(!duplex.writable);
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
const reader = readable.getReader();
const writer = writable.getWriter();
writer.closed.then(common.mustCall());
reader.cancel().then(common.mustCall());
}
{
const duplex = new PassThrough({ readable: false });
assert(!duplex.readable);
assert(duplex.writable);
const {
readable,
writable,
} = newReadableWritablePairFromDuplex(duplex);
const reader = readable.getReader();
const writer = writable.getWriter();
reader.closed.then(common.mustCall());
writer.close().then(common.mustCall());
}


@@ -0,0 +1,149 @@
// Flags: --no-warnings --expose-internals
'use strict';
const common = require('../common');
const assert = require('assert');
const {
TransformStream,
} = require('stream/web');
const {
newStreamDuplexFromReadableWritablePair,
} = require('internal/webstreams/adapters');
const {
finished,
pipeline,
Readable,
Writable,
} = require('stream');
const {
kState,
} = require('internal/webstreams/util');
{
const transform = new TransformStream();
const duplex = newStreamDuplexFromReadableWritablePair(transform);
assert(transform.readable.locked);
assert(transform.writable.locked);
duplex.destroy();
duplex.on('close', common.mustCall(() => {
assert.strictEqual(transform.readable[kState].state, 'closed');
assert.strictEqual(transform.writable[kState].state, 'errored');
}));
}
{
const error = new Error('boom');
const transform = new TransformStream();
const duplex = newStreamDuplexFromReadableWritablePair(transform);
assert(transform.readable.locked);
assert(transform.writable.locked);
duplex.destroy(error);
duplex.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
duplex.on('close', common.mustCall(() => {
assert.strictEqual(transform.readable[kState].state, 'closed');
assert.strictEqual(transform.writable[kState].state, 'errored');
assert.strictEqual(transform.writable[kState].storedError, error);
}));
}
{
const transform = new TransformStream();
const duplex = newStreamDuplexFromReadableWritablePair(transform);
duplex.end();
duplex.resume();
duplex.on('close', common.mustCall(() => {
assert.strictEqual(transform.readable[kState].state, 'closed');
assert.strictEqual(transform.writable[kState].state, 'closed');
}));
}
{
const ec = new TextEncoder();
const dc = new TextDecoder();
const transform = new TransformStream({
transform(chunk, controller) {
const text = dc.decode(chunk);
controller.enqueue(ec.encode(text.toUpperCase()));
}
});
const duplex = newStreamDuplexFromReadableWritablePair(transform, {
encoding: 'utf8',
});
duplex.end('hello');
duplex.on('data', common.mustCall((chunk) => {
assert.strictEqual(chunk, 'HELLO');
}));
duplex.on('end', common.mustCall());
duplex.on('close', common.mustCall(() => {
assert.strictEqual(transform.readable[kState].state, 'closed');
assert.strictEqual(transform.writable[kState].state, 'closed');
}));
}
{
const ec = new TextEncoder();
const dc = new TextDecoder();
const transform = new TransformStream({
transform: common.mustCall((chunk, controller) => {
const text = dc.decode(chunk);
controller.enqueue(ec.encode(text.toUpperCase()));
})
});
const duplex = newStreamDuplexFromReadableWritablePair(transform, {
encoding: 'utf8',
});
finished(duplex, common.mustCall());
duplex.end('hello');
duplex.resume();
}
{
const ec = new TextEncoder();
const dc = new TextDecoder();
const transform = new TransformStream({
transform: common.mustCall((chunk, controller) => {
const text = dc.decode(chunk);
controller.enqueue(ec.encode(text.toUpperCase()));
})
});
const duplex = newStreamDuplexFromReadableWritablePair(transform, {
encoding: 'utf8',
});
const readable = new Readable({
read() {
readable.push(Buffer.from('hello'));
readable.push(null);
}
});
const writable = new Writable({
write: common.mustCall((chunk, encoding, callback) => {
assert.strictEqual(dc.decode(chunk), 'HELLO');
assert.strictEqual(encoding, 'buffer');
callback();
})
});
finished(duplex, common.mustCall());
pipeline(readable, duplex, writable, common.mustCall());
}


@@ -0,0 +1,229 @@
// Flags: --expose-internals --no-warnings
'use strict';
const common = require('../common');
const assert = require('assert');
const {
pipeline,
finished,
Writable,
} = require('stream');
const {
ReadableStream,
WritableStream,
} = require('stream/web');
const {
newStreamReadableFromReadableStream,
} = require('internal/webstreams/adapters');
const {
kState,
} = require('internal/webstreams/util');
class MySource {
constructor(value = new Uint8Array(10)) {
this.value = value;
}
start(c) {
this.started = true;
this.controller = c;
}
pull(controller) {
controller.enqueue(this.value);
controller.close();
}
cancel(reason) {
this.canceled = true;
this.cancelReason = reason;
}
}
{
// Destroying the readable without an error closes
// the readableStream.
const readableStream = new ReadableStream();
const readable = newStreamReadableFromReadableStream(readableStream);
assert(readableStream.locked);
assert.rejects(readableStream.cancel(), {
code: 'ERR_INVALID_STATE',
});
assert.rejects(readableStream.pipeTo(new WritableStream()), {
code: 'ERR_INVALID_STATE',
});
assert.throws(() => readableStream.tee(), {
code: 'ERR_INVALID_STATE',
});
assert.throws(() => readableStream.getReader(), {
code: 'ERR_INVALID_STATE',
});
assert.throws(() => {
readableStream.pipeThrough({
readable: new ReadableStream(),
writable: new WritableStream(),
});
}, {
code: 'ERR_INVALID_STATE',
});
readable.destroy();
readable.on('close', common.mustCall(() => {
assert.strictEqual(readableStream[kState].state, 'closed');
}));
}
{
// Destroying the readable with an error closes the readableStream
// without error but records the cancel reason in the source.
const error = new Error('boom');
const source = new MySource();
const readableStream = new ReadableStream(source);
const readable = newStreamReadableFromReadableStream(readableStream);
assert(readableStream.locked);
readable.destroy(error);
readable.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
readable.on('close', common.mustCall(() => {
assert.strictEqual(readableStream[kState].state, 'closed');
assert.strictEqual(source.cancelReason, error);
}));
}
{
// An error in the source causes the readable to error.
const error = new Error('boom');
const source = new MySource();
const readableStream = new ReadableStream(source);
const readable = newStreamReadableFromReadableStream(readableStream);
assert(readableStream.locked);
source.controller.error(error);
readable.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
readable.on('close', common.mustCall(() => {
assert.strictEqual(readableStream[kState].state, 'errored');
}));
}
{
const readableStream = new ReadableStream(new MySource());
const readable = newStreamReadableFromReadableStream(readableStream);
readable.on('data', common.mustCall((chunk) => {
assert.deepStrictEqual(chunk, Buffer.alloc(10));
}));
readable.on('end', common.mustCall());
readable.on('close', common.mustCall());
readable.on('error', common.mustNotCall());
}
{
const readableStream = new ReadableStream(new MySource('hello'));
const readable = newStreamReadableFromReadableStream(readableStream, {
encoding: 'utf8',
});
readable.on('data', common.mustCall((chunk) => {
assert.deepStrictEqual(chunk, 'hello');
}));
readable.on('end', common.mustCall());
readable.on('close', common.mustCall());
readable.on('error', common.mustNotCall());
}
{
const readableStream = new ReadableStream(new MySource());
const readable = newStreamReadableFromReadableStream(readableStream, {
objectMode: true
});
readable.on('data', common.mustCall((chunk) => {
assert.deepStrictEqual(chunk, new Uint8Array(10));
}));
readable.on('end', common.mustCall());
readable.on('close', common.mustCall());
readable.on('error', common.mustNotCall());
}
{
const ec = new TextEncoder();
const readable = new ReadableStream({
start(controller) {
controller.enqueue(ec.encode('hello'));
setImmediate(() => {
controller.enqueue(ec.encode('there'));
controller.close();
});
}
});
const streamReadable = newStreamReadableFromReadableStream(readable);
finished(streamReadable, common.mustCall());
streamReadable.resume();
}
{
const ec = new TextEncoder();
const dc = new TextDecoder();
const check = ['hello', 'there'];
const readable = new ReadableStream({
start(controller) {
controller.enqueue(ec.encode('hello'));
setImmediate(() => {
controller.enqueue(ec.encode('there'));
controller.close();
});
}
});
const writable = new Writable({
write: common.mustCall((chunk, encoding, callback) => {
assert.strictEqual(dc.decode(chunk), check.shift());
assert.strictEqual(encoding, 'buffer');
callback();
}, 2),
});
const streamReadable = newStreamReadableFromReadableStream(readable);
pipeline(streamReadable, writable, common.mustCall());
streamReadable.resume();
}


@@ -0,0 +1,231 @@
// Flags: --no-warnings --expose-internals
'use strict';
const common = require('../common');
const assert = require('assert');
const {
WritableStream,
} = require('stream/web');
const {
newStreamWritableFromWritableStream,
} = require('internal/webstreams/adapters');
const {
finished,
pipeline,
Readable,
} = require('stream');
const {
kState,
} = require('internal/webstreams/util');
class TestSource {
constructor() {
this.chunks = [];
}
start(c) {
this.controller = c;
this.started = true;
}
write(chunk) {
this.chunks.push(chunk);
}
close() {
this.closed = true;
}
abort(reason) {
this.abortReason = reason;
}
}
[1, {}, false, []].forEach((arg) => {
assert.throws(() => newStreamWritableFromWritableStream(arg), {
code: 'ERR_INVALID_ARG_TYPE',
});
});
{
// Ending the stream.Writable should close the writableStream
const source = new TestSource();
const writableStream = new WritableStream(source);
const writable = newStreamWritableFromWritableStream(writableStream);
assert(writableStream.locked);
writable.end('chunk');
writable.on('close', common.mustCall(() => {
assert(writableStream.locked);
assert.strictEqual(writableStream[kState].state, 'closed');
assert.strictEqual(source.chunks.length, 1);
assert.deepStrictEqual(source.chunks[0], Buffer.from('chunk'));
}));
}
{
// Destroying the stream.Writable without an error should close
// the writableStream with no error.
const source = new TestSource();
const writableStream = new WritableStream(source);
const writable = newStreamWritableFromWritableStream(writableStream);
assert(writableStream.locked);
writable.destroy();
writable.on('close', common.mustCall(() => {
assert(writableStream.locked);
assert.strictEqual(writableStream[kState].state, 'closed');
assert.strictEqual(source.chunks.length, 0);
}));
}
{
// Destroying the stream.Writable with an error should error
// the writableStream
const error = new Error('boom');
const source = new TestSource();
const writableStream = new WritableStream(source);
const writable = newStreamWritableFromWritableStream(writableStream);
assert(writableStream.locked);
writable.destroy(error);
writable.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
writable.on('close', common.mustCall(() => {
assert(writableStream.locked);
assert.strictEqual(writableStream[kState].state, 'errored');
assert.strictEqual(writableStream[kState].storedError, error);
assert.strictEqual(source.chunks.length, 0);
}));
}
{
// Attempting to close, abort, or getWriter on writableStream
// should fail because it is locked. An internal error in
// writableStream should error the writable.
const error = new Error('boom');
const source = new TestSource();
const writableStream = new WritableStream(source);
const writable = newStreamWritableFromWritableStream(writableStream);
assert(writableStream.locked);
assert.rejects(writableStream.close(), {
code: 'ERR_INVALID_STATE',
});
assert.rejects(writableStream.abort(), {
code: 'ERR_INVALID_STATE',
});
assert.throws(() => writableStream.getWriter(), {
code: 'ERR_INVALID_STATE',
});
writable.on('error', common.mustCall((reason) => {
assert.strictEqual(error, reason);
}));
source.controller.error(error);
}
{
const source = new TestSource();
const writableStream = new WritableStream(source);
const writable = newStreamWritableFromWritableStream(writableStream);
writable.on('error', common.mustNotCall());
writable.on('finish', common.mustCall());
writable.on('close', common.mustCall(() => {
assert.strictEqual(source.chunks.length, 1);
assert.deepStrictEqual(source.chunks[0], Buffer.from('hello'));
}));
writable.write('hello', common.mustCall());
writable.end();
}
{
const source = new TestSource();
const writableStream = new WritableStream(source);
const writable =
newStreamWritableFromWritableStream(writableStream, {
decodeStrings: false,
});
writable.on('error', common.mustNotCall());
writable.on('finish', common.mustCall());
writable.on('close', common.mustCall(() => {
assert.strictEqual(source.chunks.length, 1);
assert.deepStrictEqual(source.chunks[0], 'hello');
}));
writable.write('hello', common.mustCall());
writable.end();
}
{
const source = new TestSource();
const writableStream = new WritableStream(source);
const writable =
newStreamWritableFromWritableStream(
writableStream, {
objectMode: true
});
assert(writable.writableObjectMode);
writable.on('error', common.mustNotCall());
writable.on('finish', common.mustCall());
writable.on('close', common.mustCall(() => {
assert.strictEqual(source.chunks.length, 1);
assert.strictEqual(source.chunks[0], 'hello');
}));
writable.write('hello', common.mustCall());
writable.end();
}
{
const writableStream = new WritableStream({
write: common.mustCall(2),
close: common.mustCall(),
});
const writable = newStreamWritableFromWritableStream(writableStream);
finished(writable, common.mustCall());
writable.write('hello');
writable.write('world');
writable.end();
}
{
const writableStream = new WritableStream({
write: common.mustCall(2),
close: common.mustCall(),
});
const writable = newStreamWritableFromWritableStream(writableStream);
const readable = new Readable({
read() {
readable.push(Buffer.from('hello'));
readable.push(Buffer.from('world'));
readable.push(null);
}
});
pipeline(readable, writable, common.mustCall());
}


@@ -0,0 +1,167 @@
// Flags: --no-warnings --expose-internals
'use strict';
const common = require('../common');
const assert = require('assert');
const {
newWritableStreamFromStreamWritable,
} = require('internal/webstreams/adapters');
const {
Duplex,
Writable,
PassThrough,
} = require('stream');
class TestWritable extends Writable {
constructor(asyncWrite = false) {
super();
this.chunks = [];
this.asyncWrite = asyncWrite;
}
_write(chunk, encoding, callback) {
this.chunks.push({ chunk, encoding });
if (this.asyncWrite) {
setImmediate(() => callback());
return;
}
callback();
}
}
[1, {}, false, []].forEach((arg) => {
assert.throws(() => newWritableStreamFromStreamWritable(arg), {
code: 'ERR_INVALID_ARG_TYPE',
});
});
{
// Closing the WritableStream normally closes the stream.Writable
// without errors.
const writable = new TestWritable();
writable.on('error', common.mustNotCall());
writable.on('finish', common.mustCall());
writable.on('close', common.mustCall());
const writableStream = newWritableStreamFromStreamWritable(writable);
writableStream.close().then(common.mustCall(() => {
assert(writable.destroyed);
}));
}
{
// Aborting the WritableStream errors the stream.Writable
const error = new Error('boom');
const writable = new TestWritable();
writable.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
writable.on('finish', common.mustNotCall());
writable.on('close', common.mustCall());
const writableStream = newWritableStreamFromStreamWritable(writable);
writableStream.abort(error).then(common.mustCall(() => {
assert(writable.destroyed);
}));
}
{
// Destroying the stream.Writable prematurely errors the
// WritableStream
const error = new Error('boom');
const writable = new TestWritable();
const writableStream = newWritableStreamFromStreamWritable(writable);
assert.rejects(writableStream.close(), error);
writable.destroy(error);
}
{
// Ending the stream.Writable directly errors the WritableStream
const writable = new TestWritable();
const writableStream = newWritableStreamFromStreamWritable(writable);
assert.rejects(writableStream.close(), {
code: 'ERR_STREAM_PREMATURE_CLOSE'
});
writable.end();
}
{
const writable = new TestWritable();
const writableStream = newWritableStreamFromStreamWritable(writable);
const writer = writableStream.getWriter();
const ec = new TextEncoder();
writer.write(ec.encode('hello')).then(common.mustCall(() => {
assert.strictEqual(writable.chunks.length, 1);
assert.deepStrictEqual(
writable.chunks[0],
{
chunk: Buffer.from('hello'),
encoding: 'buffer'
});
}));
}
{
const writable = new TestWritable(true);
writable.on('error', common.mustNotCall());
writable.on('close', common.mustCall());
writable.on('finish', common.mustCall());
const writableStream = newWritableStreamFromStreamWritable(writable);
const writer = writableStream.getWriter();
const ec = new TextEncoder();
writer.write(ec.encode('hello')).then(common.mustCall(() => {
assert.strictEqual(writable.chunks.length, 1);
assert.deepStrictEqual(
writable.chunks[0],
{
chunk: Buffer.from('hello'),
encoding: 'buffer'
});
writer.close().then(common.mustCall());
}));
}
{
const duplex = new PassThrough();
duplex.setEncoding('utf8');
const writableStream = newWritableStreamFromStreamWritable(duplex);
const ec = new TextEncoder();
writableStream
.getWriter()
.write(ec.encode('hello'))
.then(common.mustCall());
duplex.on('data', common.mustCall((chunk) => {
assert.strictEqual(chunk, 'hello');
}));
}
{
const writable = new Writable();
writable.destroy();
const writableStream = newWritableStreamFromStreamWritable(writable);
const writer = writableStream.getWriter();
writer.closed.then(common.mustCall());
}
{
const duplex = new Duplex({ writable: false });
const writableStream = newWritableStreamFromStreamWritable(duplex);
const writer = writableStream.getWriter();
writer.closed.then(common.mustCall());
}