mirror of
https://github.com/nodejs/node.git
synced 2025-08-15 13:48:44 +02:00

As a first step to porting portions of the pino structured logger into the runtime, this commit ports the SonicBoom module to the fs module as Utf8Stream. This is a faithful port of the SonicBoom module with some modern updates, such as converting to a Class and using Symbol.dispose. The bulk of the implementation is unchanged from the original. PR-URL: https://github.com/nodejs/node/pull/58897 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Vinícius Lourenço Claro Cardoso <contact@viniciusl.com.br>
919 lines
24 KiB
JavaScript
919 lines
24 KiB
JavaScript
'use strict';
|
|
|
|
// This file is derived from the original SonicBoom module
|
|
// MIT License
|
|
// Copyright (c) 2017 Matteo Collina
|
|
|
|
const {
|
|
ArrayPrototypePush,
|
|
AtomicsWait,
|
|
Int32Array,
|
|
MathMax,
|
|
SymbolDispose,
|
|
globalThis: {
|
|
Number,
|
|
SharedArrayBuffer,
|
|
},
|
|
} = primordials;
|
|
|
|
const {
|
|
Buffer,
|
|
} = require('buffer');
|
|
|
|
const fs = require('fs');
|
|
const EventEmitter = require('events');
|
|
const path = require('path');
|
|
const {
|
|
clearInterval,
|
|
setInterval,
|
|
setTimeout,
|
|
} = require('timers');
|
|
const {
|
|
codes: {
|
|
ERR_INVALID_ARG_TYPE,
|
|
ERR_INVALID_ARG_VALUE,
|
|
ERR_INVALID_STATE,
|
|
ERR_OPERATION_FAILED,
|
|
},
|
|
} = require('internal/errors');
|
|
|
|
const {
|
|
validateBoolean,
|
|
validateFunction,
|
|
validateObject,
|
|
validateOneOf,
|
|
validateString,
|
|
validateUint32,
|
|
} = require('internal/validators');
|
|
|
|
const BUSY_WRITE_TIMEOUT = 100;
|
|
const kEmptyBuffer = Buffer.allocUnsafe(0);
|
|
|
|
const kNil = new Int32Array(new SharedArrayBuffer(4));
|
|
|
|
/**
 * Synchronously blocks the calling thread for `ms` milliseconds by waiting
 * on the module-private `kNil` cell, which nothing in this file notifies,
 * so the wait ends by timing out.
 * @param {number|bigint} ms - delay; must compare greater than 0 and less than Infinity
 * @throws {ERR_INVALID_ARG_TYPE} when `ms` is out of range and is neither a number nor a bigint
 * @throws {RangeError} when `ms` is a number or bigint outside (0, Infinity), including NaN
 */
function sleep(ms) {
  // The relational checks reject NaN and non-comparable values in one go.
  const inRange = ms > 0 && ms < Infinity;
  if (inRange) {
    AtomicsWait(kNil, 0, 0, Number(ms));
    return;
  }

  const kind = typeof ms;
  if (kind === 'number' || kind === 'bigint') {
    throw new ERR_INVALID_ARG_VALUE.RangeError('ms', ms,
                                               'must be a number greater than 0 and less than Infinity');
  }
  throw new ERR_INVALID_ARG_TYPE('ms', ['number', 'bigint'], ms);
}
|
|
|
|
// 16 KB. Don't write more than docker buffer size.
|
|
// https://github.com/moby/moby/blob/513ec73831269947d38a644c278ce3cac36783b2/daemon/logger/copier.go#L13
|
|
const kMaxWrite = 16 * 1024;
|
|
const kContentModeBuffer = 'buffer';
|
|
const kContentModeUtf8 = 'utf8';
|
|
const kNullPrototype = { __proto__: null };
|
|
|
|
// Utf8Stream is a port of the original SonicBoom module
|
|
// (https://github.com/pinojs/sonic-boom) that provides a fast and efficient
|
|
// way to write UTF-8 encoded data to a file or stream.
|
|
class Utf8Stream extends EventEmitter {
|
|
#len = 0;
|
|
#fd = -1;
|
|
#bufs = [];
|
|
#lens = [];
|
|
#writing = false;
|
|
#ending = false;
|
|
#reopening = false;
|
|
#asyncDrainScheduled = false;
|
|
#flushPending = false;
|
|
#hwm = 16387; // 16 KB
|
|
#file = null;
|
|
#destroyed = false;
|
|
#minLength = 0;
|
|
#maxLength = 0;
|
|
#maxWrite = kMaxWrite;
|
|
#opening = false;
|
|
#periodicFlush = 0;
|
|
#periodicFlushTimer = undefined;
|
|
#sync = false;
|
|
#fsync = false;
|
|
#append = true;
|
|
#mode = 0o666;
|
|
#retryEAGAIN = () => true;
|
|
#mkdir = false;
|
|
#writingBuf = '';
|
|
#write;
|
|
#flush;
|
|
#flushSync;
|
|
#actualWrite;
|
|
#fsWriteSync;
|
|
#fsWrite;
|
|
#fs;
|
|
|
|
/**
|
|
* @typedef {object} Utf8StreamOptions
|
|
* @property {string} [dest] - path to the file to write to
|
|
* @property {number} [fd] - file descriptor to write to
|
|
* @property {number} [minLength] - minimum length of the internal buffer before a write is triggered
|
|
* @property {number} [maxLength] - maximum length of the internal buffer before writes are dropped
|
|
* @property {number} [maxWrite] - maximum size of a single write operation (default 16384)
|
|
* @property {number} [periodicFlush] - interval in ms to flush the stream periodically (default 0, disabled)
|
|
* @property {boolean} [sync] - if true, writes are performed synchronously (default false)
|
|
* @property {boolean} [fsync] - if true, fsync is called after every write (default false)
|
|
* @property {boolean} [append] - if true, data is appended to the file (default true, ignored
|
|
* if fd is provided)
|
|
* @property {string} [contentMode] - 'utf8' or 'buffer'
|
|
* @property {string} [mode] - file mode (permission and sticky bits), default is 0o666
|
|
* @property {boolean} [mkdir] - if true, the directory path will be created if it does not exist (default false)
|
|
* @property {Function} [retryEAGAIN] - function that receives (err, writingBufLength,
|
|
* remainingBufLength) and returns true if EAGAIN/EBUSY should be retried
|
|
* @property {object} [fs] - custom fs implementation, must provide write, writeSync, fsync,
|
|
* fsyncSync, close, open, mkdir, mkdirSync methods. Mostly useful for testing.
|
|
* @param {Utf8StreamOptions} [options]
|
|
*/
|
|
constructor(options = kNullPrototype) {
|
|
validateObject(options, 'options');
|
|
let {
|
|
fd,
|
|
} = options;
|
|
const {
|
|
dest,
|
|
minLength,
|
|
maxLength,
|
|
maxWrite,
|
|
periodicFlush,
|
|
sync,
|
|
append = true,
|
|
mkdir,
|
|
retryEAGAIN,
|
|
fsync,
|
|
contentMode = kContentModeUtf8,
|
|
mode,
|
|
// Provides for a custom fs implementation. Mostly useful for testing.
|
|
fs: overrideFs = {},
|
|
} = options;
|
|
|
|
super();
|
|
|
|
fd ??= dest;
|
|
|
|
validateObject(overrideFs, 'options.fs');
|
|
this.#fs = { ...fs, ...overrideFs };
|
|
validateFunction(this.#fs.write, 'options.fs.write');
|
|
validateFunction(this.#fs.writeSync, 'options.fs.writeSync');
|
|
validateFunction(this.#fs.fsync, 'options.fs.fsync');
|
|
validateFunction(this.#fs.fsyncSync, 'options.fs.fsyncSync');
|
|
validateFunction(this.#fs.close, 'options.fs.close');
|
|
validateFunction(this.#fs.open, 'options.fs.open');
|
|
validateFunction(this.#fs.mkdir, 'options.fs.mkdir');
|
|
validateFunction(this.#fs.mkdirSync, 'options.fs.mkdirSync');
|
|
|
|
this.#hwm = MathMax(minLength || 0, this.#hwm);
|
|
this.#minLength = minLength || 0;
|
|
this.#maxLength = maxLength || 0;
|
|
this.#maxWrite = maxWrite || kMaxWrite;
|
|
this.#periodicFlush = periodicFlush || 0;
|
|
this.#sync = sync || false;
|
|
this.#fsync = fsync || false;
|
|
this.#append = append || false;
|
|
this.#mode = mode;
|
|
this.#retryEAGAIN = retryEAGAIN || (() => true);
|
|
this.#mkdir = mkdir || false;
|
|
|
|
validateUint32(this.#hwm, 'options.hwm');
|
|
validateUint32(this.#minLength, 'options.minLength');
|
|
validateUint32(this.#maxLength, 'options.maxLength');
|
|
validateUint32(this.#maxWrite, 'options.maxWrite');
|
|
validateUint32(this.#periodicFlush, 'options.periodicFlush');
|
|
validateBoolean(this.#sync, 'options.sync');
|
|
validateBoolean(this.#fsync, 'options.fsync');
|
|
validateBoolean(this.#append, 'options.append');
|
|
validateBoolean(this.#mkdir, 'options.mkdir');
|
|
validateFunction(this.#retryEAGAIN, 'options.retryEAGAIN');
|
|
validateOneOf(contentMode, 'options.contentMode', [kContentModeBuffer, kContentModeUtf8]);
|
|
|
|
if (contentMode === kContentModeBuffer) {
|
|
this.#writingBuf = kEmptyBuffer;
|
|
this.#write = (...args) => this.#writeBuffer(...args);
|
|
this.#flush = (...args) => this.#flushBuffer(...args);
|
|
this.#flushSync = (...args) => this.#flushBufferSync(...args);
|
|
this.#actualWrite = (...args) => this.#actualWriteBuffer(...args);
|
|
this.#fsWriteSync = () => this.#fs.writeSync(this.#fd, this.#writingBuf);
|
|
this.#fsWrite = () => this.#fs.write(this.#fd, this.#writingBuf, (...args) => {
|
|
return this.#release(...args);
|
|
});
|
|
} else {
|
|
this.#writingBuf = '';
|
|
this.#write = (...args) => this.#writeUtf8(...args);
|
|
this.#flush = (...args) => this.#flushUtf8(...args);
|
|
this.#flushSync = (...args) => this.#flushSyncUtf8(...args);
|
|
this.#actualWrite = (...args) => this.#actualWriteUtf8(...args);
|
|
this.#fsWriteSync = () => this.#fs.writeSync(this.#fd, this.#writingBuf, 'utf8');
|
|
this.#fsWrite = () => this.#fs.write(this.#fd, this.#writingBuf, 'utf8', (...args) => {
|
|
return this.#release(...args);
|
|
});
|
|
}
|
|
|
|
// TODO(@jasnell): Support passing in an AbortSignal to cancel the stream.
|
|
|
|
// TODO(@jasnell): Support FileHandle here as well?
|
|
|
|
// TODO(@jasnell): The `dest` option is a path string. We may consider
|
|
// also supporting URL and Buffer types here as well like the rest of
|
|
// the fs module APIs do.
|
|
|
|
if (typeof fd === 'number') {
|
|
this.#fd = fd;
|
|
process.nextTick(() => this.emit('ready'));
|
|
} else if (typeof fd === 'string') {
|
|
this.#openFile(fd);
|
|
} else {
|
|
throw new ERR_INVALID_ARG_TYPE('fd', ['number', 'string'], fd);
|
|
}
|
|
if (this.#minLength >= this.#maxWrite) {
|
|
throw new ERR_INVALID_ARG_VALUE.RangeError('minLength', this.#minLength,
|
|
`should be smaller than maxWrite (${this.#maxWrite})`);
|
|
}
|
|
|
|
this.on('newListener', (name) => {
|
|
if (name === 'drain') {
|
|
this._asyncDrainScheduled = false;
|
|
}
|
|
});
|
|
|
|
if (this.#periodicFlush !== 0) {
|
|
this.#periodicFlushTimer = setInterval(() => this.flush(null), this.#periodicFlush);
|
|
this.#periodicFlushTimer.unref();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @param {string|Buffer} data
|
|
* @returns {boolean}
|
|
*/
|
|
write(data) {
|
|
return this.#write(data);
|
|
}
|
|
|
|
/**
|
|
* @callback FlushCallback
|
|
* @param {Error} [err] - Error if any
|
|
* @returns {void}
|
|
*/
|
|
|
|
/**
|
|
* @param {FlushCallback} [cb]
|
|
*/
|
|
flush(cb = function(_err) {}) { this.#flush(cb); }
|
|
|
|
flushSync() { return this.#flushSync(); }
|
|
|
|
/**
|
|
* @param {string} [file]
|
|
*/
|
|
reopen(file) {
|
|
if (this.#destroyed) {
|
|
throw new ERR_INVALID_STATE('Utf8Stream is destroyed');
|
|
}
|
|
|
|
if (this.#opening) {
|
|
this.once('ready', () => this.reopen(file));
|
|
return;
|
|
}
|
|
|
|
if (this.#ending) {
|
|
return;
|
|
}
|
|
|
|
if (!this.#file) {
|
|
throw new ERR_OPERATION_FAILED(
|
|
'Unable to reopen a file descriptor, you must pass a file to SonicBoom');
|
|
}
|
|
|
|
if (file) {
|
|
this.#file = file;
|
|
}
|
|
this.#reopening = true;
|
|
|
|
if (this.#writing) {
|
|
return;
|
|
}
|
|
|
|
const fd = this.#fd;
|
|
this.once('ready', () => {
|
|
if (fd !== this.#fd) {
|
|
this.#fs.close(fd, (err) => {
|
|
if (err) {
|
|
return this.emit('error', err);
|
|
}
|
|
});
|
|
}
|
|
});
|
|
|
|
this.#openFile(this.#file);
|
|
}
|
|
|
|
end() {
|
|
if (this.#destroyed) {
|
|
throw new ERR_INVALID_STATE('Utf8Stream is destroyed');
|
|
}
|
|
|
|
if (this.#opening) {
|
|
this.once('ready', () => {
|
|
this.end();
|
|
});
|
|
return;
|
|
}
|
|
|
|
if (this.#ending) {
|
|
return;
|
|
}
|
|
|
|
this.#ending = true;
|
|
|
|
if (this.#writing) {
|
|
return;
|
|
}
|
|
|
|
if (this.#len > 0 && this.#fd >= 0) {
|
|
this.#actualWrite();
|
|
} else {
|
|
this.#actualClose();
|
|
}
|
|
}
|
|
|
|
destroy() {
|
|
if (this.#destroyed) {
|
|
return;
|
|
}
|
|
this.#actualClose();
|
|
}
|
|
|
|
/** @type {number} */
|
|
get mode() { return this.#mode; }
|
|
|
|
/** @type {string|undefined} */
|
|
get file() { return this.#file; }
|
|
|
|
/** @type {number} */
|
|
get fd() { return this.#fd; }
|
|
|
|
/** @type {number} */
|
|
get minLength() { return this.#minLength; }
|
|
|
|
/** @type {number} */
|
|
get maxLength() { return this.#maxLength; }
|
|
|
|
/** @type {boolean} */
|
|
get writing() { return this.#writing; }
|
|
|
|
/** @type {boolean} */
|
|
get sync() { return this.#sync; }
|
|
|
|
/** @type {boolean} */
|
|
get fsync() { return this.#fsync; }
|
|
|
|
/** @type {boolean} */
|
|
get append() { return this.#append; }
|
|
|
|
/** @type {number} */
|
|
get periodicFlush() { return this.#periodicFlush; }
|
|
|
|
/** @type {'buffer'|'utf8'} */
|
|
get contentMode() {
|
|
return this.#writingBuf instanceof Buffer ? kContentModeBuffer : kContentModeUtf8;
|
|
}
|
|
|
|
/** @type {boolean} */
|
|
get mkdir() { return this.#mkdir; }
|
|
|
|
[SymbolDispose]() { this.destroy(); }
|
|
|
|
#release(err, n) {
|
|
if (err) {
|
|
if ((err.code === 'EAGAIN' || err.code === 'EBUSY') &&
|
|
this.#retryEAGAIN(err, this.#writingBuf.length, this.#len - this.#writingBuf.length)) {
|
|
if (this.#sync) {
|
|
// This error code should not happen in sync mode, because it is
|
|
// not using the underlining operating system asynchronous functions.
|
|
// However it happens, and so we handle it.
|
|
// Ref: https://github.com/pinojs/pino/issues/783
|
|
try {
|
|
sleep(BUSY_WRITE_TIMEOUT);
|
|
this.#release(undefined, 0);
|
|
} catch (err) {
|
|
this.#release(err);
|
|
}
|
|
} else {
|
|
// Let's give the destination some time to process the chunk.
|
|
setTimeout(() => this.#fsWrite(), BUSY_WRITE_TIMEOUT);
|
|
}
|
|
} else {
|
|
this.#writing = false;
|
|
|
|
this.emit('error', err);
|
|
}
|
|
return;
|
|
}
|
|
|
|
this.emit('write', n);
|
|
const releasedBufObj = releaseWritingBuf(this.#writingBuf, this.#len, n);
|
|
this.#len = releasedBufObj.len;
|
|
this.#writingBuf = releasedBufObj.writingBuf;
|
|
|
|
if (this.#writingBuf.length) {
|
|
if (!this.#sync) {
|
|
this.#fsWrite();
|
|
return;
|
|
}
|
|
|
|
try {
|
|
do {
|
|
const n = this.#fsWriteSync();
|
|
const releasedBufObj = releaseWritingBuf(this.#writingBuf, this.#len, n);
|
|
this.#len = releasedBufObj.len;
|
|
this.#writingBuf = releasedBufObj.writingBuf;
|
|
} while (this.#writingBuf.length);
|
|
} catch (err) {
|
|
this.#release(err);
|
|
return;
|
|
}
|
|
}
|
|
|
|
if (this.#fsync) {
|
|
this.#fs.fsyncSync(this.#fd);
|
|
}
|
|
|
|
const len = this.#len;
|
|
if (this.#reopening) {
|
|
this.#writing = false;
|
|
this.#reopening = false;
|
|
this.reopen();
|
|
} else if (len > this.#minLength) {
|
|
this.#actualWrite();
|
|
} else if (this.#ending) {
|
|
if (len > 0) {
|
|
this.#actualWrite();
|
|
} else {
|
|
this.#writing = false;
|
|
this.#actualClose();
|
|
}
|
|
} else {
|
|
this.#writing = false;
|
|
if (this.#sync) {
|
|
if (!this.#asyncDrainScheduled) {
|
|
this.#asyncDrainScheduled = true;
|
|
process.nextTick(() => this.#emitDrain());
|
|
}
|
|
} else {
|
|
this.emit('drain');
|
|
}
|
|
}
|
|
}
|
|
|
|
#openFile(file) {
|
|
this.#opening = true;
|
|
this.#writing = true;
|
|
this.#asyncDrainScheduled = false;
|
|
|
|
// NOTE: 'error' and 'ready' events emitted below only relevant when sonic.sync===false
|
|
// for sync mode, there is no way to add a listener that will receive these
|
|
|
|
const fileOpened = (err, fd) => {
|
|
if (err) {
|
|
this.#reopening = false;
|
|
this.#writing = false;
|
|
this.#opening = false;
|
|
|
|
if (this.#sync) {
|
|
process.nextTick(() => {
|
|
if (this.listenerCount('error') > 0) {
|
|
this.emit('error', err);
|
|
}
|
|
});
|
|
} else {
|
|
this.emit('error', err);
|
|
}
|
|
return;
|
|
}
|
|
|
|
const reopening = this.#reopening;
|
|
|
|
this.#fd = fd;
|
|
this.#file = file;
|
|
this.#reopening = false;
|
|
this.#opening = false;
|
|
this.#writing = false;
|
|
|
|
if (this.#sync) {
|
|
process.nextTick(() => this.emit('ready'));
|
|
} else {
|
|
this.emit('ready');
|
|
}
|
|
|
|
if (this.#destroyed) {
|
|
return;
|
|
}
|
|
|
|
// start
|
|
if ((!this.#writing && this.#len > this.#minLength) || this.#flushPending) {
|
|
this.#actualWrite();
|
|
} else if (reopening) {
|
|
process.nextTick(() => this.emit('drain'));
|
|
}
|
|
};
|
|
|
|
const flags = this.#append ? 'a' : 'w';
|
|
const mode = this.#mode;
|
|
|
|
if (this.#sync) {
|
|
try {
|
|
if (this.#mkdir) this.#fs.mkdirSync(path.dirname(file), { recursive: true });
|
|
const fd = this.#fs.openSync(file, flags, mode);
|
|
fileOpened(null, fd);
|
|
} catch (err) {
|
|
fileOpened(err);
|
|
throw err;
|
|
}
|
|
} else if (this.#mkdir) {
|
|
this.#fs.mkdir(path.dirname(file), { recursive: true }, (err) => {
|
|
if (err) return fileOpened(err);
|
|
this.#fs.open(file, flags, mode, fileOpened);
|
|
});
|
|
} else {
|
|
this.#fs.open(file, flags, mode, fileOpened);
|
|
}
|
|
}
|
|
|
|
#emitDrain() {
|
|
const hasListeners = this.listenerCount('drain') > 0;
|
|
if (!hasListeners) return;
|
|
this.#asyncDrainScheduled = false;
|
|
this.emit('drain');
|
|
}
|
|
|
|
#actualClose() {
|
|
if (this.#fd === -1) {
|
|
this.once('ready', () => this.#actualClose());
|
|
return;
|
|
}
|
|
|
|
if (this.#periodicFlushTimer !== undefined) {
|
|
clearInterval(this.#periodicFlushTimer);
|
|
}
|
|
|
|
this.#destroyed = true;
|
|
this.#bufs = [];
|
|
this.#lens = [];
|
|
|
|
const done = (err) => {
|
|
if (err) {
|
|
this.emit('error', err);
|
|
return;
|
|
}
|
|
|
|
if (this.#ending && !this.#writing) {
|
|
this.emit('finish');
|
|
}
|
|
this.emit('close');
|
|
};
|
|
|
|
const closeWrapped = () => {
|
|
// We skip errors in fsync
|
|
if (this.#fd !== 1 && this.#fd !== 2) {
|
|
this.#fs.close(this.#fd, done);
|
|
} else {
|
|
done();
|
|
}
|
|
};
|
|
|
|
try {
|
|
this.#fs.fsync(this.#fd, closeWrapped);
|
|
} catch {
|
|
// Intentionally empty.
|
|
}
|
|
}
|
|
|
|
#actualWriteBuffer() {
|
|
this.#writing = true;
|
|
this.#writingBuf = this.#writingBuf.length ? this.#writingBuf : mergeBuf(this.#bufs.shift(), this.#lens.shift());
|
|
|
|
if (this.#sync) {
|
|
try {
|
|
const written = this.#fs.writeSync(this.#fd, this.#writingBuf);
|
|
this.#release(null, written);
|
|
} catch (err) {
|
|
this.#release(err);
|
|
}
|
|
} else {
|
|
// fs.write will need to copy string to buffer anyway so
|
|
// we do it here to avoid the overhead of calculating the buffer size
|
|
// in releaseWritingBuf.
|
|
this.#writingBuf = Buffer.from(this.#writingBuf);
|
|
this.#fs.write(this.#fd, this.#writingBuf, (...args) => this.#release(...args));
|
|
}
|
|
}
|
|
|
|
#actualWriteUtf8() {
|
|
this.#writing = true;
|
|
this.#writingBuf ||= this.#bufs.shift() || '';
|
|
|
|
if (this.#sync) {
|
|
try {
|
|
const written = this.#fs.writeSync(this.#fd, this.#writingBuf, 'utf8');
|
|
this.#release(null, written);
|
|
} catch (err) {
|
|
this.#release(err);
|
|
}
|
|
} else {
|
|
this.#fs.write(this.#fd, this.#writingBuf, 'utf8', (...args) => this.#release(...args));
|
|
}
|
|
}
|
|
|
|
#flushBufferSync() {
|
|
if (this.#destroyed) {
|
|
throw new ERR_INVALID_STATE('Utf8Stream is destroyed');
|
|
}
|
|
|
|
if (this.#fd < 0) {
|
|
throw new ERR_INVALID_STATE('Invalid file descriptor');
|
|
}
|
|
|
|
if (!this.#writing && this.#writingBuf.length > 0) {
|
|
this.#bufs.unshift([this.#writingBuf]);
|
|
this.#writingBuf = kEmptyBuffer;
|
|
}
|
|
|
|
let buf = kEmptyBuffer;
|
|
while (this.#bufs.length || buf.length) {
|
|
if (buf.length <= 0) {
|
|
buf = mergeBuf(this.#bufs[0], this.#lens[0]);
|
|
}
|
|
try {
|
|
const n = this.#fs.writeSync(this.#fd, buf);
|
|
buf = buf.subarray(n);
|
|
this.#len = MathMax(this.#len - n, 0);
|
|
if (buf.length <= 0) {
|
|
this.#bufs.shift();
|
|
this.#lens.shift();
|
|
}
|
|
} catch (err) {
|
|
const shouldRetry = err.code === 'EAGAIN' || err.code === 'EBUSY';
|
|
if (shouldRetry && !this.#retryEAGAIN(err, buf.length, this.#len - buf.length)) {
|
|
throw err;
|
|
}
|
|
|
|
sleep(BUSY_WRITE_TIMEOUT);
|
|
}
|
|
}
|
|
}
|
|
|
|
#flushSyncUtf8() {
|
|
if (this.#destroyed) {
|
|
throw new ERR_INVALID_STATE('Utf8Stream is destroyed');
|
|
}
|
|
|
|
if (this.#fd < 0) {
|
|
throw new ERR_INVALID_STATE('Invalid file descriptor');
|
|
}
|
|
|
|
if (!this.#writing && this.#writingBuf.length > 0) {
|
|
this.#bufs.unshift(this.#writingBuf);
|
|
this.#writingBuf = '';
|
|
}
|
|
|
|
let buf = '';
|
|
while (this.#bufs.length || buf) {
|
|
if (buf.length <= 0) {
|
|
buf = this.#bufs[0];
|
|
}
|
|
try {
|
|
const n = this.#fs.writeSync(this.#fd, buf, 'utf8');
|
|
const releasedBufObj = releaseWritingBuf(buf, this.#len, n);
|
|
buf = releasedBufObj.writingBuf;
|
|
this.#len = releasedBufObj.len;
|
|
if (buf.length <= 0) {
|
|
this.#bufs.shift();
|
|
}
|
|
} catch (err) {
|
|
const shouldRetry = err.code === 'EAGAIN' || err.code === 'EBUSY';
|
|
if (shouldRetry && !this.#retryEAGAIN(err, buf.length, this.#len - buf.length)) {
|
|
throw err;
|
|
}
|
|
|
|
sleep(BUSY_WRITE_TIMEOUT);
|
|
}
|
|
}
|
|
|
|
try {
|
|
this.#fs.fsyncSync(this.#fd);
|
|
} catch {
|
|
// Skip the error. The fd might not support fsync.
|
|
}
|
|
}
|
|
|
|
#callFlushCallbackOnDrain(cb) {
|
|
this.#flushPending = true;
|
|
const onDrain = () => {
|
|
// Only if _fsync is false to avoid double fsync
|
|
if (!this.#fsync && !this.#destroyed) {
|
|
try {
|
|
this.#fs.fsync(this.#fd, (err) => {
|
|
this.#flushPending = false;
|
|
// If the fd is closed, we ignore the error.
|
|
if (err?.code === 'EBADF') {
|
|
cb();
|
|
return;
|
|
}
|
|
cb(err);
|
|
});
|
|
} catch (err) {
|
|
this.#flushPending = false;
|
|
cb(err);
|
|
}
|
|
} else {
|
|
this.#flushPending = false;
|
|
cb();
|
|
}
|
|
this.off('error', onError);
|
|
};
|
|
const onError = (err) => {
|
|
this.#flushPending = false;
|
|
cb(err);
|
|
this.off('drain', onDrain);
|
|
};
|
|
|
|
this.once('drain', onDrain);
|
|
this.once('error', onError);
|
|
}
|
|
|
|
#flushBuffer(cb) {
|
|
validateFunction(cb, 'cb');
|
|
|
|
if (this.#destroyed) {
|
|
const error = new ERR_INVALID_STATE('Utf8Stream is destroyed');
|
|
if (cb) {
|
|
cb(error);
|
|
return;
|
|
}
|
|
|
|
throw error;
|
|
}
|
|
|
|
if (this.#minLength <= 0) {
|
|
cb?.();
|
|
return;
|
|
}
|
|
|
|
if (cb) {
|
|
this.#callFlushCallbackOnDrain(cb);
|
|
}
|
|
|
|
if (this.#writing) {
|
|
return;
|
|
}
|
|
|
|
if (this.#bufs.length === 0) {
|
|
ArrayPrototypePush(this.#bufs, []);
|
|
ArrayPrototypePush(this.#lens, 0);
|
|
}
|
|
|
|
this.#actualWrite();
|
|
}
|
|
|
|
#flushUtf8(cb) {
|
|
validateFunction(cb, 'cb');
|
|
|
|
if (this.#destroyed) {
|
|
const error = new ERR_INVALID_STATE('Utf8Stream is destroyed');
|
|
if (cb) {
|
|
cb(error);
|
|
return;
|
|
}
|
|
|
|
throw error;
|
|
}
|
|
|
|
if (this.#minLength <= 0) {
|
|
cb?.();
|
|
return;
|
|
}
|
|
|
|
if (cb) {
|
|
this.#callFlushCallbackOnDrain(cb);
|
|
}
|
|
|
|
if (this.#writing) {
|
|
return;
|
|
}
|
|
|
|
if (this.#bufs.length === 0) {
|
|
ArrayPrototypePush(this.#bufs, '');
|
|
}
|
|
|
|
this.#actualWrite();
|
|
}
|
|
|
|
#writeBuffer(data) {
|
|
if (this.#destroyed) {
|
|
throw new ERR_INVALID_STATE('Utf8Stream is destroyed');
|
|
}
|
|
|
|
// TODO(@jasnell): Support any ArrayBufferView type here, not just Buffer.
|
|
if (!Buffer.isBuffer(data)) {
|
|
throw new ERR_INVALID_ARG_TYPE('data', 'Buffer', data);
|
|
}
|
|
|
|
const len = this.#len + data.length;
|
|
const bufs = this.#bufs;
|
|
const lens = this.#lens;
|
|
|
|
if (this.#maxLength && len > this.#maxLength) {
|
|
this.emit('drop', data);
|
|
return this.#len < this.#hwm;
|
|
}
|
|
|
|
if (
|
|
bufs.length === 0 ||
|
|
lens[lens.length - 1] + data.length > this.#maxWrite
|
|
) {
|
|
ArrayPrototypePush(bufs, []);
|
|
ArrayPrototypePush(lens, data.length);
|
|
} else {
|
|
ArrayPrototypePush(bufs[bufs.length - 1], data);
|
|
lens[lens.length - 1] += data.length;
|
|
}
|
|
|
|
this.#len = len;
|
|
|
|
if (!this.#writing && this.#len >= this.#minLength) {
|
|
this.#actualWrite();
|
|
}
|
|
|
|
return this.#len < this.#hwm;
|
|
}
|
|
|
|
#writeUtf8(data) {
|
|
if (this.#destroyed) {
|
|
throw new ERR_INVALID_STATE('Utf8Stream is destroyed');
|
|
}
|
|
validateString(data, 'data');
|
|
|
|
const len = this.#len + data.length;
|
|
const bufs = this.#bufs;
|
|
|
|
if (this.#maxLength && len > this.#maxLength) {
|
|
this.emit('drop', data);
|
|
return this.#len < this.#hwm;
|
|
}
|
|
|
|
if (
|
|
bufs.length === 0 ||
|
|
bufs[bufs.length - 1].length + data.length > this.#maxWrite
|
|
) {
|
|
ArrayPrototypePush(bufs, '' + data);
|
|
} else {
|
|
bufs[bufs.length - 1] += data;
|
|
}
|
|
|
|
this.#len = len;
|
|
|
|
if (!this.#writing && this.#len >= this.#minLength) {
|
|
this.#actualWrite();
|
|
}
|
|
|
|
return this.#len < this.#hwm;
|
|
}
|
|
}
|
|
|
|
/**
 * Drops the part of `writingBuf` that a write has consumed and lowers the
 * running length counter by the same amount (never below zero).
 * @param {string | Buffer} writingBuf - buffer that was handed to fs.write
 * @param {number} len - current total buffered length
 * @param {number} n - number of bytes fs reported as written
 * @returns {{writingBuf: string | Buffer, len: number}} remaining buffer and updated length
 */
function releaseWritingBuf(writingBuf, len, n) {
  let consumed = n;

  // For strings, `n` counts bytes while slicing and `len` count string
  // units. When the byte length differs from `n`, convert the written byte
  // count back to a string length.
  const needsConversion = typeof writingBuf === 'string' && Buffer.byteLength(writingBuf) !== n;
  if (needsConversion) {
    consumed = Buffer.from(writingBuf).subarray(0, n).toString().length;
  }

  const remainingLen = MathMax(len - consumed, 0);
  const remainder = writingBuf.slice(consumed);
  return { writingBuf: remainder, len: remainingLen };
}
|
|
|
|
/**
 * Collapses a group of buffers into a single buffer.
 * @param {Buffer[]} bufs - buffers of one chunk group
 * @param {number} len - total length passed to Buffer.concat when merging
 * @returns {Buffer} the shared empty buffer for an empty group, the sole
 *   element for a single-buffer group, otherwise a concatenation
 */
function mergeBuf(bufs, len) {
  switch (bufs.length) {
    case 0:
      return kEmptyBuffer;
    case 1:
      // Nothing to merge: reuse the buffer as is.
      return bufs[0];
    default:
      return Buffer.concat(bufs, len);
  }
}
|
|
|
|
module.exports = Utf8Stream;
|