src, quic: refine more of the quic implementation

Signed-off-by: James M Snell <jasnell@gmail.com>
PR-URL: https://github.com/nodejs/node/pull/56328
Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
This commit is contained in:
James M Snell 2024-11-24 14:57:24 -08:00
parent 72537f5631
commit 062ae6f3cb
51 changed files with 6723 additions and 3515 deletions

View file

@ -71,8 +71,8 @@ const {
} = require('internal/validators');
const {
CountQueuingStrategy,
} = require('internal/webstreams/queuingstrategies');
setImmediate,
} = require('timers');
const { queueMicrotask } = require('internal/process/task_queues');
@ -315,80 +315,7 @@ class Blob {
stream() {
if (!isBlob(this))
throw new ERR_INVALID_THIS('Blob');
const reader = this[kHandle].getReader();
return new lazyReadableStream({
type: 'bytes',
start(c) {
// There really should only be one read at a time so using an
// array here is purely defensive.
this.pendingPulls = [];
},
pull(c) {
const { promise, resolve, reject } = PromiseWithResolvers();
this.pendingPulls.push({ resolve, reject });
const readNext = () => {
reader.pull((status, buffer) => {
// If pendingPulls is empty here, the stream had to have
// been canceled, and we don't really care about the result.
// We can simply exit.
if (this.pendingPulls.length === 0) {
return;
}
if (status === 0) {
// EOS
c.close();
// This is to signal the end for byob readers
// see https://streams.spec.whatwg.org/#example-rbs-pull
c.byobRequest?.respond(0);
const pending = this.pendingPulls.shift();
pending.resolve();
return;
} else if (status < 0) {
// The read could fail for many different reasons when reading
// from a non-memory resident blob part (e.g. file-backed blob).
// The error details the system error code.
const error = lazyDOMException('The blob could not be read', 'NotReadableError');
const pending = this.pendingPulls.shift();
c.error(error);
pending.reject(error);
return;
}
// ReadableByteStreamController.enqueue errors if we submit a 0-length
// buffer. We need to check for that here.
if (buffer !== undefined && buffer.byteLength !== 0) {
c.enqueue(new Uint8Array(buffer));
}
// We keep reading until we either reach EOS, some error, or we
// hit the flow rate of the stream (c.desiredSize).
queueMicrotask(() => {
if (c.desiredSize < 0) {
// A manual backpressure check.
if (this.pendingPulls.length !== 0) {
// A case of waiting pull finished (= not yet canceled)
const pending = this.pendingPulls.shift();
pending.resolve();
}
return;
}
readNext();
});
});
};
readNext();
return promise;
},
cancel(reason) {
// Reject any currently pending pulls here.
for (const pending of this.pendingPulls) {
pending.reject(reason);
}
this.pendingPulls = [];
},
// We set the highWaterMark to 0 because we do not want the stream to
// start reading immediately on creation. We want it to wait until read
// is called.
}, new CountQueuingStrategy({ highWaterMark: 0 }));
return createBlobReaderStream(this[kHandle].getReader());
}
}
@ -505,6 +432,84 @@ function arrayBuffer(blob) {
return promise;
}
function createBlobReaderStream(reader) {
return new lazyReadableStream({
type: 'bytes',
start(c) {
// There really should only be one read at a time so using an
// array here is purely defensive.
this.pendingPulls = [];
},
pull(c) {
const { promise, resolve, reject } = PromiseWithResolvers();
this.pendingPulls.push({ resolve, reject });
const readNext = () => {
reader.pull((status, buffer) => {
// If pendingPulls is empty here, the stream had to have
// been canceled, and we don't really care about the result.
// We can simply exit.
if (this.pendingPulls.length === 0) {
return;
}
if (status === 0) {
// EOS
c.close();
// This is to signal the end for byob readers
// see https://streams.spec.whatwg.org/#example-rbs-pull
c.byobRequest?.respond(0);
const pending = this.pendingPulls.shift();
pending.resolve();
return;
} else if (status < 0) {
// The read could fail for many different reasons when reading
// from a non-memory resident blob part (e.g. file-backed blob).
// The error details the system error code.
const error = lazyDOMException('The blob could not be read', 'NotReadableError');
const pending = this.pendingPulls.shift();
c.error(error);
pending.reject(error);
return;
}
// ReadableByteStreamController.enqueue errors if we submit a 0-length
// buffer. We need to check for that here.
if (buffer !== undefined && buffer.byteLength !== 0) {
c.enqueue(new Uint8Array(buffer));
}
// We keep reading until we either reach EOS, some error, or we
// hit the flow rate of the stream (c.desiredSize).
// We use set immediate here because we have to allow the event
// loop to turn in order to proecss any pending i/o. Using
// queueMicrotask won't allow the event loop to turn.
setImmediate(() => {
if (c.desiredSize < 0) {
// A manual backpressure check.
if (this.pendingPulls.length !== 0) {
// A case of waiting pull finished (= not yet canceled)
const pending = this.pendingPulls.shift();
pending.resolve();
}
return;
}
readNext();
});
});
};
readNext();
return promise;
},
cancel(reason) {
// Reject any currently pending pulls here.
for (const pending of this.pendingPulls) {
pending.reject(reason);
}
this.pendingPulls = [];
},
// We set the highWaterMark to 0 because we do not want the stream to
// start reading immediately on creation. We want it to wait until read
// is called.
}, { highWaterMark: 0 });
}
module.exports = {
Blob,
createBlob,
@ -513,4 +518,5 @@ module.exports = {
kHandle,
resolveObjectURL,
TransferableBlob,
createBlobReaderStream,
};