stream: improve tee perf by reduce ReflectConstruct usages

also added more webstream creation benchmarks

PR-URL: https://github.com/nodejs/node/pull/49546
Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
Raz Luvaton 2023-09-11 10:45:11 +03:00 committed by GitHub
parent b3fc9173ed
commit a7fe8b042a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 94 additions and 35 deletions

View file

@ -2,6 +2,8 @@
const common = require('../common.js');
const {
ReadableStream,
ReadableStreamDefaultReader,
ReadableStreamBYOBReader,
TransformStream,
WritableStream,
} = require('node:stream/web');
@ -9,40 +11,90 @@ const assert = require('assert');
const bench = common.createBenchmark(main, {
n: [50e3],
kind: ['ReadableStream', 'TransformStream', 'WritableStream'],
kind: [
'ReadableStream',
'TransformStream',
'WritableStream',
'ReadableStreamDefaultReader',
'ReadableStreamBYOBReader',
'ReadableStream.tee',
],
});
let rs, ws, ts;
let readableStream;
let transformStream;
let writableStream;
let readableStreamDefaultReader;
let readableStreamBYOBReader;
let teeResult;
function main({ n, kind }) {
switch (kind) {
case 'ReadableStream':
bench.start();
for (let i = 0; i < n; ++i)
rs = new ReadableStream();
readableStream = new ReadableStream();
bench.end(n);
// Avoid V8 deadcode (elimination)
assert.ok(rs);
assert.ok(readableStream);
break;
case 'WritableStream':
bench.start();
for (let i = 0; i < n; ++i)
ws = new WritableStream();
writableStream = new WritableStream();
bench.end(n);
// Avoid V8 deadcode (elimination)
assert.ok(ws);
assert.ok(writableStream);
break;
case 'TransformStream':
bench.start();
for (let i = 0; i < n; ++i)
ts = new TransformStream();
transformStream = new TransformStream();
bench.end(n);
// Avoid V8 deadcode (elimination)
assert.ok(ts);
assert.ok(transformStream);
break;
case 'ReadableStreamDefaultReader': {
const readers = Array.from({ length: n }, () => new ReadableStream());
bench.start();
for (let i = 0; i < n; ++i)
readableStreamDefaultReader = new ReadableStreamDefaultReader(readers[i]);
bench.end(n);
// Avoid V8 deadcode (elimination)
assert.ok(readableStreamDefaultReader);
break;
}
case 'ReadableStreamBYOBReader': {
const readers = Array.from({ length: n }, () => new ReadableStream({ type: 'bytes' }));
bench.start();
for (let i = 0; i < n; ++i)
readableStreamBYOBReader = new ReadableStreamBYOBReader(readers[i]);
bench.end(n);
// Avoid V8 deadcode (elimination)
assert.ok(readableStreamBYOBReader);
break;
}
case 'ReadableStream.tee': {
const streams = Array.from({ length: n }, () => new ReadableStream());
bench.start();
for (let i = 0; i < n; ++i)
teeResult = streams[i].tee();
bench.end(n);
// Avoid V8 deadcode (elimination)
assert.ok(teeResult);
break;
}
default:
throw new Error('Invalid kind');
}

View file

@ -1199,34 +1199,41 @@ ObjectDefineProperties(ReadableByteStreamController.prototype, {
[SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableByteStreamController.name),
});
function TeeReadableStream(start, pull, cancel) {
markTransferMode(this, false, true);
this[kType] = 'ReadableStream';
this[kState] = {
disturbed: false,
state: 'readable',
storedError: undefined,
stream: undefined,
transfer: {
writable: undefined,
port: undefined,
promise: undefined,
},
};
this[kIsClosedPromise] = createDeferredPromise();
setupReadableStreamDefaultControllerFromSource(
this,
ObjectCreate(null, {
start: { __proto__: null, value: start },
pull: { __proto__: null, value: pull },
cancel: { __proto__: null, value: cancel },
}),
1,
() => 1);
}
ObjectSetPrototypeOf(TeeReadableStream.prototype, ReadableStream.prototype);
ObjectSetPrototypeOf(TeeReadableStream, ReadableStream);
function createTeeReadableStream(start, pull, cancel) {
return ReflectConstruct(
function() {
markTransferMode(this, false, true);
this[kType] = 'ReadableStream';
this[kState] = {
disturbed: false,
state: 'readable',
storedError: undefined,
stream: undefined,
transfer: {
writable: undefined,
port: undefined,
promise: undefined,
},
};
this[kIsClosedPromise] = createDeferredPromise();
setupReadableStreamDefaultControllerFromSource(
this,
ObjectCreate(null, {
start: { __proto__: null, value: start },
pull: { __proto__: null, value: pull },
cancel: { __proto__: null, value: cancel },
}),
1,
() => 1);
}, [], ReadableStream,
);
const tee = new TeeReadableStream(start, pull, cancel);
// For spec compliance the Tee must be a ReadableStream
tee.constructor = ReadableStream;
return tee;
}
const isReadableStream =