node/lib/internal/streams/fast-utf8-stream.js
James M Snell 5335c101a9 fs: port SonicBoom module to fs module as Utf8Stream
As a first step to porting portions of the pino structured
logger into the runtime, this commit ports the SonicBoom
module to the fs module as Utf8Stream.

This is a faithful port of the SonicBoom module with some
modern updates, such as converting to a Class and using
Symbol.dispose. The bulk of the implementation is unchanged
from the original.

PR-URL: https://github.com/nodejs/node/pull/58897
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Vinícius Lourenço Claro Cardoso <contact@viniciusl.com.br>
2025-07-28 20:27:07 -07:00

919 lines
24 KiB
JavaScript

'use strict';
// This file is derived from the original SonicBoom module
// MIT License
// Copyright (c) 2017 Matteo Collina
const {
ArrayPrototypePush,
AtomicsWait,
Int32Array,
MathMax,
SymbolDispose,
globalThis: {
Number,
SharedArrayBuffer,
},
} = primordials;
const {
Buffer,
} = require('buffer');
const fs = require('fs');
const EventEmitter = require('events');
const path = require('path');
const {
clearInterval,
setInterval,
setTimeout,
} = require('timers');
const {
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
ERR_INVALID_STATE,
ERR_OPERATION_FAILED,
},
} = require('internal/errors');
const {
validateBoolean,
validateFunction,
validateObject,
validateOneOf,
validateString,
validateUint32,
} = require('internal/validators');
// Delay, in milliseconds, before a write is retried after EAGAIN/EBUSY.
const BUSY_WRITE_TIMEOUT = 100;
// Zero-length Buffer used as the "nothing in flight" value in buffer mode.
const kEmptyBuffer = Buffer.allocUnsafe(0);
// Four-byte shared Int32Array that sleep() blocks on through AtomicsWait.
const kNil = new Int32Array(new SharedArrayBuffer(4));
/**
 * Blocks the calling thread for `ms` milliseconds by waiting on `kNil`.
 * @param {number|bigint} ms - duration, must be > 0 and < Infinity
 * @throws {ERR_INVALID_ARG_TYPE} when `ms` is neither a number nor a bigint
 * @throws {RangeError} when `ms` is a number/bigint outside the accepted range
 */
function sleep(ms) {
  // The comparison is false for NaN and for non-numeric values (including the
  // empty string), while bigints compare against numbers just fine.
  const inRange = ms > 0 && ms < Infinity;
  if (!inRange) {
    const type = typeof ms;
    const isNumeric = type === 'number' || type === 'bigint';
    if (!isNumeric) {
      throw new ERR_INVALID_ARG_TYPE('ms', ['number', 'bigint'], ms);
    }
    throw new ERR_INVALID_ARG_VALUE.RangeError(
      'ms', ms, 'must be a number greater than 0 and less than Infinity');
  }
  AtomicsWait(kNil, 0, 0, Number(ms));
}
// Default upper bound for a single write: 16 KiB, so that one write never
// exceeds the Docker log copier buffer size.
// https://github.com/moby/moby/blob/513ec73831269947d38a644c278ce3cac36783b2/daemon/logger/copier.go#L13
const kMaxWrite = 16 * 1024;
// The two values accepted for the `contentMode` option.
const kContentModeBuffer = 'buffer';
const kContentModeUtf8 = 'utf8';
// Prototype-less empty object used as the default `options` argument.
const kNullPrototype = { __proto__: null };
// Utf8Stream is a port of the original SonicBoom module
// (https://github.com/pinojs/sonic-boom) that provides a fast and efficient
// way to write UTF-8 encoded data to a file or stream.
class Utf8Stream extends EventEmitter {
// Total length of buffered data, including the chunk currently in flight.
#len = 0;
// Destination file descriptor; -1 until one is available.
#fd = -1;
// Queue of pending chunks: strings in utf8 mode, arrays of Buffers in buffer mode.
#bufs = [];
// Buffer mode only: byte length of the matching entry in #bufs.
#lens = [];
// True while a write (or an open) is in progress.
#writing = false;
// Set by end(); the stream closes once the remaining data is written.
#ending = false;
// Set by reopen(); cleared when the new descriptor is opened.
#reopening = false;
// Sync mode: a 'drain' emission has already been queued with process.nextTick.
#asyncDrainScheduled = false;
// A flush() callback is waiting for 'drain'.
#flushPending = false;
// write() returns false once #len reaches this value.
#hwm = 16387; // Slightly above 16 KiB (16384)
// Path of the opened file; stays null when constructed from a descriptor.
#file = null;
#destroyed = false;
#minLength = 0;
#maxLength = 0;
#maxWrite = kMaxWrite;
// True while #openFile() is waiting for the descriptor.
#opening = false;
#periodicFlush = 0;
#periodicFlushTimer = undefined;
#sync = false;
#fsync = false;
#append = true;
#mode = 0o666;
#retryEAGAIN = () => true;
#mkdir = false;
// Chunk currently being written: a string in utf8 mode, a Buffer in buffer mode.
#writingBuf = '';
// Content-mode specific implementations, bound in the constructor.
#write;
#flush;
#flushSync;
#actualWrite;
#fsWriteSync;
#fsWrite;
// The fs implementation in use: the built-in fs overlaid with options.fs.
#fs;
/**
* @typedef {object} Utf8StreamOptions
* @property {string} [dest] - path to the file to write to
* @property {number} [fd] - file descriptor to write to
* @property {number} [minLength] - minimum length of the internal buffer before a write is triggered
* @property {number} [maxLength] - maximum length of the internal buffer before writes are dropped
* @property {number} [maxWrite] - maximum size of a single write operation (default 16384)
* @property {number} [periodicFlush] - interval in ms to flush the stream periodically (default 0, disabled)
* @property {boolean} [sync] - if true, writes are performed synchronously (default false)
* @property {boolean} [fsync] - if true, fsync is called after every write (default false)
* @property {boolean} [append] - if true, data is appended to the file (default true, ignored
* if fd is provided)
* @property {string} [contentMode] - 'utf8' or 'buffer'
* @property {string} [mode] - file mode (permission and sticky bits), default is 0o666
* @property {boolean} [mkdir] - if true, the directory path will be created if it does not exist (default false)
* @property {Function} [retryEAGAIN] - function that receives (err, writingBufLength,
* remainingBufLength) and returns true if EAGAIN/EBUSY should be retried
* @property {object} [fs] - custom fs implementation, must provide write, writeSync, fsync,
* fsyncSync, close, open, mkdir, mkdirSync methods. Mostly useful for testing.
* @param {Utf8StreamOptions} [options]
*/
constructor(options = kNullPrototype) {
validateObject(options, 'options');
let {
fd,
} = options;
const {
dest,
minLength,
maxLength,
maxWrite,
periodicFlush,
sync,
append = true,
mkdir,
retryEAGAIN,
fsync,
contentMode = kContentModeUtf8,
mode,
// Provides for a custom fs implementation. Mostly useful for testing.
fs: overrideFs = {},
} = options;
super();
fd ??= dest;
validateObject(overrideFs, 'options.fs');
this.#fs = { ...fs, ...overrideFs };
validateFunction(this.#fs.write, 'options.fs.write');
validateFunction(this.#fs.writeSync, 'options.fs.writeSync');
validateFunction(this.#fs.fsync, 'options.fs.fsync');
validateFunction(this.#fs.fsyncSync, 'options.fs.fsyncSync');
validateFunction(this.#fs.close, 'options.fs.close');
validateFunction(this.#fs.open, 'options.fs.open');
validateFunction(this.#fs.mkdir, 'options.fs.mkdir');
validateFunction(this.#fs.mkdirSync, 'options.fs.mkdirSync');
this.#hwm = MathMax(minLength || 0, this.#hwm);
this.#minLength = minLength || 0;
this.#maxLength = maxLength || 0;
this.#maxWrite = maxWrite || kMaxWrite;
this.#periodicFlush = periodicFlush || 0;
this.#sync = sync || false;
this.#fsync = fsync || false;
this.#append = append || false;
this.#mode = mode;
this.#retryEAGAIN = retryEAGAIN || (() => true);
this.#mkdir = mkdir || false;
validateUint32(this.#hwm, 'options.hwm');
validateUint32(this.#minLength, 'options.minLength');
validateUint32(this.#maxLength, 'options.maxLength');
validateUint32(this.#maxWrite, 'options.maxWrite');
validateUint32(this.#periodicFlush, 'options.periodicFlush');
validateBoolean(this.#sync, 'options.sync');
validateBoolean(this.#fsync, 'options.fsync');
validateBoolean(this.#append, 'options.append');
validateBoolean(this.#mkdir, 'options.mkdir');
validateFunction(this.#retryEAGAIN, 'options.retryEAGAIN');
validateOneOf(contentMode, 'options.contentMode', [kContentModeBuffer, kContentModeUtf8]);
if (contentMode === kContentModeBuffer) {
this.#writingBuf = kEmptyBuffer;
this.#write = (...args) => this.#writeBuffer(...args);
this.#flush = (...args) => this.#flushBuffer(...args);
this.#flushSync = (...args) => this.#flushBufferSync(...args);
this.#actualWrite = (...args) => this.#actualWriteBuffer(...args);
this.#fsWriteSync = () => this.#fs.writeSync(this.#fd, this.#writingBuf);
this.#fsWrite = () => this.#fs.write(this.#fd, this.#writingBuf, (...args) => {
return this.#release(...args);
});
} else {
this.#writingBuf = '';
this.#write = (...args) => this.#writeUtf8(...args);
this.#flush = (...args) => this.#flushUtf8(...args);
this.#flushSync = (...args) => this.#flushSyncUtf8(...args);
this.#actualWrite = (...args) => this.#actualWriteUtf8(...args);
this.#fsWriteSync = () => this.#fs.writeSync(this.#fd, this.#writingBuf, 'utf8');
this.#fsWrite = () => this.#fs.write(this.#fd, this.#writingBuf, 'utf8', (...args) => {
return this.#release(...args);
});
}
// TODO(@jasnell): Support passing in an AbortSignal to cancel the stream.
// TODO(@jasnell): Support FileHandle here as well?
// TODO(@jasnell): The `dest` option is a path string. We may consider
// also supporting URL and Buffer types here as well like the rest of
// the fs module APIs do.
if (typeof fd === 'number') {
this.#fd = fd;
process.nextTick(() => this.emit('ready'));
} else if (typeof fd === 'string') {
this.#openFile(fd);
} else {
throw new ERR_INVALID_ARG_TYPE('fd', ['number', 'string'], fd);
}
if (this.#minLength >= this.#maxWrite) {
throw new ERR_INVALID_ARG_VALUE.RangeError('minLength', this.#minLength,
`should be smaller than maxWrite (${this.#maxWrite})`);
}
this.on('newListener', (name) => {
if (name === 'drain') {
this._asyncDrainScheduled = false;
}
});
if (this.#periodicFlush !== 0) {
this.#periodicFlushTimer = setInterval(() => this.flush(null), this.#periodicFlush);
this.#periodicFlushTimer.unref();
}
}
/**
 * Queues `data` for writing using the implementation selected by the
 * `contentMode` option. Throws if the stream has been destroyed; emits 'drop'
 * instead of queueing when `maxLength` would be exceeded.
 * @param {string|Buffer} data - a string in 'utf8' mode, a Buffer in 'buffer' mode
 * @returns {boolean} `true` while the buffered length is below the internal
 *   high-water mark, `false` otherwise
 */
write(data) {
return this.#write(data);
}
/**
* @callback FlushCallback
* @param {Error} [err] - Error if any
* @returns {void}
*/
/**
* @param {FlushCallback} [cb]
*/
flush(cb = function(_err) {}) { this.#flush(cb); }
flushSync() { return this.#flushSync(); }
/**
* @param {string} [file]
*/
reopen(file) {
if (this.#destroyed) {
throw new ERR_INVALID_STATE('Utf8Stream is destroyed');
}
if (this.#opening) {
this.once('ready', () => this.reopen(file));
return;
}
if (this.#ending) {
return;
}
if (!this.#file) {
throw new ERR_OPERATION_FAILED(
'Unable to reopen a file descriptor, you must pass a file to SonicBoom');
}
if (file) {
this.#file = file;
}
this.#reopening = true;
if (this.#writing) {
return;
}
const fd = this.#fd;
this.once('ready', () => {
if (fd !== this.#fd) {
this.#fs.close(fd, (err) => {
if (err) {
return this.emit('error', err);
}
});
}
});
this.#openFile(this.#file);
}
end() {
if (this.#destroyed) {
throw new ERR_INVALID_STATE('Utf8Stream is destroyed');
}
if (this.#opening) {
this.once('ready', () => {
this.end();
});
return;
}
if (this.#ending) {
return;
}
this.#ending = true;
if (this.#writing) {
return;
}
if (this.#len > 0 && this.#fd >= 0) {
this.#actualWrite();
} else {
this.#actualClose();
}
}
destroy() {
if (this.#destroyed) {
return;
}
this.#actualClose();
}
/**
 * The `mode` option exactly as it was passed to the constructor
 * (undefined when the option was omitted).
 * @type {number|undefined}
 */
get mode() { return this.#mode; }
/**
 * Path of the opened file; null when the stream was created from a descriptor
 * or the file has not been opened yet.
 * @type {string|null}
 */
get file() { return this.#file; }
/**
 * Current file descriptor, -1 while none is available.
 * @type {number}
 */
get fd() { return this.#fd; }
/** @type {number} */
get minLength() { return this.#minLength; }
/** @type {number} */
get maxLength() { return this.#maxLength; }
/**
 * True while a write or an open is in progress.
 * @type {boolean}
 */
get writing() { return this.#writing; }
/** @type {boolean} */
get sync() { return this.#sync; }
/** @type {boolean} */
get fsync() { return this.#fsync; }
/** @type {boolean} */
get append() { return this.#append; }
/** @type {number} */
get periodicFlush() { return this.#periodicFlush; }
/**
 * Derived from the type of the in-flight buffer rather than stored separately.
 * @type {'buffer'|'utf8'}
 */
get contentMode() {
return this.#writingBuf instanceof Buffer ? kContentModeBuffer : kContentModeUtf8;
}
/** @type {boolean} */
get mkdir() { return this.#mkdir; }
// Explicit resource management support: disposing destroys the stream.
[SymbolDispose]() { this.destroy(); }
/**
 * Completion handler for every write (the fs.write callback as well as the
 * synchronous path). Retries on EAGAIN/EBUSY, trims the written part off the
 * in-flight buffer, keeps writing until that buffer is empty and then decides
 * what happens next: reopen, write the next chunk, close, or signal 'drain'.
 * @param {Error|null|undefined} err - error reported by the write, if any
 * @param {number} [n] - number of bytes reported as written
 */
#release(err, n) {
if (err) {
// Transient errors are retried only when the user-supplied predicate agrees.
if ((err.code === 'EAGAIN' || err.code === 'EBUSY') &&
this.#retryEAGAIN(err, this.#writingBuf.length, this.#len - this.#writingBuf.length)) {
if (this.#sync) {
// This error code should not happen in sync mode, because it is
// not using the underlying operating system asynchronous functions.
// However it happens, and so we handle it.
// Ref: https://github.com/pinojs/pino/issues/783
try {
sleep(BUSY_WRITE_TIMEOUT);
// Re-enter as "0 bytes written" so the sync loop below rewrites the buffer.
this.#release(undefined, 0);
} catch (err) {
this.#release(err);
}
} else {
// Let's give the destination some time to process the chunk.
setTimeout(() => this.#fsWrite(), BUSY_WRITE_TIMEOUT);
}
} else {
// Not retryable: stop writing and report. #writingBuf is left untouched.
this.#writing = false;
this.emit('error', err);
}
return;
}
this.emit('write', n);
// Drop the part that was written from the in-flight buffer and from #len.
const releasedBufObj = releaseWritingBuf(this.#writingBuf, this.#len, n);
this.#len = releasedBufObj.len;
this.#writingBuf = releasedBufObj.writingBuf;
// Partial write: the remainder has to go out before anything else happens.
if (this.#writingBuf.length) {
if (!this.#sync) {
// Async mode: #release() runs again from the next write callback.
this.#fsWrite();
return;
}
try {
do {
const n = this.#fsWriteSync();
const releasedBufObj = releaseWritingBuf(this.#writingBuf, this.#len, n);
this.#len = releasedBufObj.len;
this.#writingBuf = releasedBufObj.writingBuf;
} while (this.#writingBuf.length);
} catch (err) {
this.#release(err);
return;
}
}
// `fsync` option: sync the descriptor once the in-flight buffer is fully written.
if (this.#fsync) {
this.#fs.fsyncSync(this.#fd);
}
const len = this.#len;
if (this.#reopening) {
// reopen() was called while this write was in flight; carry it out now.
this.#writing = false;
this.#reopening = false;
this.reopen();
} else if (len > this.#minLength) {
// More than minLength is still queued: keep writing.
this.#actualWrite();
} else if (this.#ending) {
// end() was requested: write whatever is left, then close.
if (len > 0) {
this.#actualWrite();
} else {
this.#writing = false;
this.#actualClose();
}
} else {
// Idle. In sync mode 'drain' is deferred to the next tick and scheduled
// at most once; in async mode it is emitted right away.
this.#writing = false;
if (this.#sync) {
if (!this.#asyncDrainScheduled) {
this.#asyncDrainScheduled = true;
process.nextTick(() => this.#emitDrain());
}
} else {
this.emit('drain');
}
}
}
/**
 * Opens `file` (creating the parent directory first when the `mkdir` option
 * is set) and installs the resulting descriptor. Runs synchronously in sync
 * mode, through callbacks otherwise. While the open is pending the stream is
 * flagged as both opening and writing.
 * @param {string} file - path to open
 */
#openFile(file) {
this.#opening = true;
this.#writing = true;
this.#asyncDrainScheduled = false;
// NOTE: 'error' and 'ready' events emitted below only relevant when sonic.sync===false
// for sync mode, there is no way to add a listener that will receive these
// Shared completion handler for the synchronous and asynchronous open paths.
const fileOpened = (err, fd) => {
if (err) {
this.#reopening = false;
this.#writing = false;
this.#opening = false;
if (this.#sync) {
// Sync mode: report on the next tick, and only if somebody is listening.
process.nextTick(() => {
if (this.listenerCount('error') > 0) {
this.emit('error', err);
}
});
} else {
this.emit('error', err);
}
return;
}
// Remember whether this open was a reopen before the flag is cleared.
const reopening = this.#reopening;
this.#fd = fd;
this.#file = file;
this.#reopening = false;
this.#opening = false;
this.#writing = false;
if (this.#sync) {
process.nextTick(() => this.emit('ready'));
} else {
this.emit('ready');
}
if (this.#destroyed) {
return;
}
// start
// Write data that was queued while the file was opening, or that a pending
// flush() is waiting for; after a plain reopen just signal 'drain'.
if ((!this.#writing && this.#len > this.#minLength) || this.#flushPending) {
this.#actualWrite();
} else if (reopening) {
process.nextTick(() => this.emit('drain'));
}
};
// `append` option: 'a' keeps existing content, 'w' is used otherwise.
const flags = this.#append ? 'a' : 'w';
const mode = this.#mode;
if (this.#sync) {
try {
if (this.#mkdir) this.#fs.mkdirSync(path.dirname(file), { recursive: true });
const fd = this.#fs.openSync(file, flags, mode);
fileOpened(null, fd);
} catch (err) {
// Reset the state through the error path, then surface the error to the caller.
fileOpened(err);
throw err;
}
} else if (this.#mkdir) {
this.#fs.mkdir(path.dirname(file), { recursive: true }, (err) => {
if (err) return fileOpened(err);
this.#fs.open(file, flags, mode, fileOpened);
});
} else {
this.#fs.open(file, flags, mode, fileOpened);
}
}
#emitDrain() {
const hasListeners = this.listenerCount('drain') > 0;
if (!hasListeners) return;
this.#asyncDrainScheduled = false;
this.emit('drain');
}
/**
 * Tears the stream down: stops the periodic flush timer, marks the stream as
 * destroyed, discards queued chunks, then fsyncs and closes the descriptor.
 * Emits 'finish' (only when end() was requested and no write is in flight)
 * followed by 'close', or 'error' if closing fails.
 */
#actualClose() {
// No descriptor yet: run again once the file has been opened.
if (this.#fd === -1) {
this.once('ready', () => this.#actualClose());
return;
}
if (this.#periodicFlushTimer !== undefined) {
clearInterval(this.#periodicFlushTimer);
}
this.#destroyed = true;
this.#bufs = [];
this.#lens = [];
const done = (err) => {
if (err) {
this.emit('error', err);
return;
}
if (this.#ending && !this.#writing) {
this.emit('finish');
}
this.emit('close');
};
const closeWrapped = () => {
// We skip errors in fsync
// File descriptors 1 and 2 are left open; every other descriptor is closed.
if (this.#fd !== 1 && this.#fd !== 2) {
this.#fs.close(this.#fd, done);
} else {
done();
}
};
try {
this.#fs.fsync(this.#fd, closeWrapped);
} catch {
// Intentionally empty.
// NOTE(review): if fsync throws synchronously, closeWrapped never runs and
// neither 'close' nor 'finish' is emitted - confirm this is intended.
}
}
#actualWriteBuffer() {
this.#writing = true;
this.#writingBuf = this.#writingBuf.length ? this.#writingBuf : mergeBuf(this.#bufs.shift(), this.#lens.shift());
if (this.#sync) {
try {
const written = this.#fs.writeSync(this.#fd, this.#writingBuf);
this.#release(null, written);
} catch (err) {
this.#release(err);
}
} else {
// fs.write will need to copy string to buffer anyway so
// we do it here to avoid the overhead of calculating the buffer size
// in releaseWritingBuf.
this.#writingBuf = Buffer.from(this.#writingBuf);
this.#fs.write(this.#fd, this.#writingBuf, (...args) => this.#release(...args));
}
}
#actualWriteUtf8() {
this.#writing = true;
this.#writingBuf ||= this.#bufs.shift() || '';
if (this.#sync) {
try {
const written = this.#fs.writeSync(this.#fd, this.#writingBuf, 'utf8');
this.#release(null, written);
} catch (err) {
this.#release(err);
}
} else {
this.#fs.write(this.#fd, this.#writingBuf, 'utf8', (...args) => this.#release(...args));
}
}
#flushBufferSync() {
if (this.#destroyed) {
throw new ERR_INVALID_STATE('Utf8Stream is destroyed');
}
if (this.#fd < 0) {
throw new ERR_INVALID_STATE('Invalid file descriptor');
}
if (!this.#writing && this.#writingBuf.length > 0) {
this.#bufs.unshift([this.#writingBuf]);
this.#writingBuf = kEmptyBuffer;
}
let buf = kEmptyBuffer;
while (this.#bufs.length || buf.length) {
if (buf.length <= 0) {
buf = mergeBuf(this.#bufs[0], this.#lens[0]);
}
try {
const n = this.#fs.writeSync(this.#fd, buf);
buf = buf.subarray(n);
this.#len = MathMax(this.#len - n, 0);
if (buf.length <= 0) {
this.#bufs.shift();
this.#lens.shift();
}
} catch (err) {
const shouldRetry = err.code === 'EAGAIN' || err.code === 'EBUSY';
if (shouldRetry && !this.#retryEAGAIN(err, buf.length, this.#len - buf.length)) {
throw err;
}
sleep(BUSY_WRITE_TIMEOUT);
}
}
}
#flushSyncUtf8() {
if (this.#destroyed) {
throw new ERR_INVALID_STATE('Utf8Stream is destroyed');
}
if (this.#fd < 0) {
throw new ERR_INVALID_STATE('Invalid file descriptor');
}
if (!this.#writing && this.#writingBuf.length > 0) {
this.#bufs.unshift(this.#writingBuf);
this.#writingBuf = '';
}
let buf = '';
while (this.#bufs.length || buf) {
if (buf.length <= 0) {
buf = this.#bufs[0];
}
try {
const n = this.#fs.writeSync(this.#fd, buf, 'utf8');
const releasedBufObj = releaseWritingBuf(buf, this.#len, n);
buf = releasedBufObj.writingBuf;
this.#len = releasedBufObj.len;
if (buf.length <= 0) {
this.#bufs.shift();
}
} catch (err) {
const shouldRetry = err.code === 'EAGAIN' || err.code === 'EBUSY';
if (shouldRetry && !this.#retryEAGAIN(err, buf.length, this.#len - buf.length)) {
throw err;
}
sleep(BUSY_WRITE_TIMEOUT);
}
}
try {
this.#fs.fsyncSync(this.#fd);
} catch {
// Skip the error. The fd might not support fsync.
}
}
/**
 * Arranges for `cb` to be called after the next 'drain' (following an fsync
 * unless the `fsync` option already syncs every write) or with the next
 * 'error', whichever comes first. Marks a flush as pending until then.
 * @param {Function} cb - flush callback, receives an optional error
 */
#callFlushCallbackOnDrain(cb) {
this.#flushPending = true;
const onDrain = () => {
// Only if #fsync is false to avoid double fsync
if (!this.#fsync && !this.#destroyed) {
try {
this.#fs.fsync(this.#fd, (err) => {
this.#flushPending = false;
// If the fd is closed, we ignore the error.
if (err?.code === 'EBADF') {
cb();
return;
}
cb(err);
});
} catch (err) {
this.#flushPending = false;
cb(err);
}
} else {
this.#flushPending = false;
cb();
}
// The flush has been handled; drop the companion 'error' listener.
this.off('error', onError);
};
const onError = (err) => {
this.#flushPending = false;
cb(err);
// The flush failed; drop the companion 'drain' listener.
this.off('drain', onDrain);
};
this.once('drain', onDrain);
this.once('error', onError);
}
#flushBuffer(cb) {
validateFunction(cb, 'cb');
if (this.#destroyed) {
const error = new ERR_INVALID_STATE('Utf8Stream is destroyed');
if (cb) {
cb(error);
return;
}
throw error;
}
if (this.#minLength <= 0) {
cb?.();
return;
}
if (cb) {
this.#callFlushCallbackOnDrain(cb);
}
if (this.#writing) {
return;
}
if (this.#bufs.length === 0) {
ArrayPrototypePush(this.#bufs, []);
ArrayPrototypePush(this.#lens, 0);
}
this.#actualWrite();
}
#flushUtf8(cb) {
validateFunction(cb, 'cb');
if (this.#destroyed) {
const error = new ERR_INVALID_STATE('Utf8Stream is destroyed');
if (cb) {
cb(error);
return;
}
throw error;
}
if (this.#minLength <= 0) {
cb?.();
return;
}
if (cb) {
this.#callFlushCallbackOnDrain(cb);
}
if (this.#writing) {
return;
}
if (this.#bufs.length === 0) {
ArrayPrototypePush(this.#bufs, '');
}
this.#actualWrite();
}
#writeBuffer(data) {
if (this.#destroyed) {
throw new ERR_INVALID_STATE('Utf8Stream is destroyed');
}
// TODO(@jasnell): Support any ArrayBufferView type here, not just Buffer.
if (!Buffer.isBuffer(data)) {
throw new ERR_INVALID_ARG_TYPE('data', 'Buffer', data);
}
const len = this.#len + data.length;
const bufs = this.#bufs;
const lens = this.#lens;
if (this.#maxLength && len > this.#maxLength) {
this.emit('drop', data);
return this.#len < this.#hwm;
}
if (
bufs.length === 0 ||
lens[lens.length - 1] + data.length > this.#maxWrite
) {
ArrayPrototypePush(bufs, []);
ArrayPrototypePush(lens, data.length);
} else {
ArrayPrototypePush(bufs[bufs.length - 1], data);
lens[lens.length - 1] += data.length;
}
this.#len = len;
if (!this.#writing && this.#len >= this.#minLength) {
this.#actualWrite();
}
return this.#len < this.#hwm;
}
#writeUtf8(data) {
if (this.#destroyed) {
throw new ERR_INVALID_STATE('Utf8Stream is destroyed');
}
validateString(data, 'data');
const len = this.#len + data.length;
const bufs = this.#bufs;
if (this.#maxLength && len > this.#maxLength) {
this.emit('drop', data);
return this.#len < this.#hwm;
}
if (
bufs.length === 0 ||
bufs[bufs.length - 1].length + data.length > this.#maxWrite
) {
ArrayPrototypePush(bufs, '' + data);
} else {
bufs[bufs.length - 1] += data;
}
this.#len = len;
if (!this.#writing && this.#len >= this.#minLength) {
this.#actualWrite();
}
return this.#len < this.#hwm;
}
}
/**
 * Removes the part of `writingBuf` that a write has consumed.
 * @param {string | Buffer} writingBuf - buffer that was handed to the write
 * @param {number} len - total buffered length before the write
 * @param {number} n - number of bytes the write reported as written
 * @returns {{writingBuf: string | Buffer, len: number}} the unwritten
 *   remainder and the reduced total length (never below 0)
 */
function releaseWritingBuf(writingBuf, len, n) {
  let consumed = n;
  // For strings `n` counts bytes, not characters. When the write covered the
  // whole string the byte count is good enough to cut everything off;
  // otherwise decode the first `n` bytes to learn how many characters they hold.
  if (typeof writingBuf === 'string' && Buffer.byteLength(writingBuf) !== n) {
    consumed = Buffer.from(writingBuf).subarray(0, n).toString().length;
  }
  return {
    writingBuf: writingBuf.slice(consumed),
    len: MathMax(len - consumed, 0),
  };
}
/**
 * Turns a group of queued Buffers into the single Buffer to write.
 * @param {Buffer[]} bufs - the group of chunks
 * @param {number} len - total length handed to Buffer.concat
 * @returns {Buffer} the shared empty Buffer for an empty group, the chunk
 *   itself (no copy) for a single chunk, a concatenation otherwise
 */
function mergeBuf(bufs, len) {
  switch (bufs.length) {
    case 0:
      return kEmptyBuffer;
    case 1:
      return bufs[0];
    default:
      return Buffer.concat(bufs, len);
  }
}
module.exports = Utf8Stream;