Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
fef0c4e
stream: prototype for new stream implementation
jasnell Mar 1, 2026
3da923b
stream: updates to stream/new impl
jasnell Mar 3, 2026
d8a7743
stream: clarify backpressure details in stream_new
jasnell Mar 3, 2026
9803d11
stream: fixup sync pull batching in stream/new
jasnell Mar 3, 2026
e428158
stream: fixup stream_new.md linting
jasnell Mar 3, 2026
104074a
stream: fixup some perf bugs in stream/new
jasnell Mar 3, 2026
d393c19
stream: apply more perf improvements to stream/new
jasnell Mar 3, 2026
13cafd6
stream: apply more perf improvements to stream/new
jasnell Mar 3, 2026
97ee8a3
stream: apply another minor perf improvement in stream/new
jasnell Mar 3, 2026
36bbc94
stream: use proper # private fields in stream/new
jasnell Mar 3, 2026
8669467
stream: fixup from memory issue / batch yielding
jasnell Mar 3, 2026
63c887d
stream: update stream/new default highWaterMark to 4
jasnell Mar 3, 2026
42da7ad
stream: rename stream/new to stream/iter
jasnell Mar 18, 2026
6af362e
stream: gate stream/iter behind --experimental-stream-iter
jasnell Mar 18, 2026
5bfafbc
stream: replace instanceof with cross-realm-safe type checks
jasnell Mar 18, 2026
18844d7
stream: use primordials for prototype method access in stream/iter
jasnell Mar 18, 2026
8811c6b
stream: fix Broadcast.from() silently dropping non-array chunks
jasnell Mar 18, 2026
a6a37e5
stream: fix share consumer premature termination on concurrent pull
jasnell Mar 18, 2026
0c9827a
stream: fix Writer sync method return values for try-fallback pattern
jasnell Mar 18, 2026
32eaae7
stream: use Writer try-fallback pattern in pipeTo and Broadcast.from
jasnell Mar 18, 2026
99eb712
stream: tolerate non-array batches in stream/iter consumers
jasnell Mar 18, 2026
c6c39d4
stream: treat Writer end/fail methods as optional in pipeTo
jasnell Mar 18, 2026
6b93fa8
stream: add bounds checking to RingBuffer.get()
jasnell Mar 18, 2026
4e30386
stream: deduplicate consumer sync/async iteration logic
jasnell Mar 18, 2026
eec6f0a
stream: validate all elements in isUint8ArrayBatch
jasnell Mar 18, 2026
88964ab
stream: handle sync iterables in merge() multi-source path
jasnell Mar 18, 2026
9b21cbd
stream: consolidate TextEncoder to single instance in utils
jasnell Mar 18, 2026
f2ada96
stream: prevent unhandled rejection in Broadcast.from() pump
jasnell Mar 18, 2026
2b05573
stream: validate backpressure option at construction time
jasnell Mar 18, 2026
604168c
stream: make a number of cleanups in broadcast.js
jasnell Mar 18, 2026
23a822a
stream: update and improve the stream_iter doc
jasnell Mar 18, 2026
888fced
stream: add pull/writer methods to FileHandle conditionally
jasnell Mar 18, 2026
f177ef3
stream: make multiple updates, cleanups, and fixes
jasnell Mar 18, 2026
dadd875
stream: improve validation of options in stream/iter
jasnell Mar 18, 2026
577e78f
stream: make additional fixups to stream/iter arg validation
jasnell Mar 18, 2026
70de86d
stream: expand and split stream/iter tests
jasnell Mar 18, 2026
371dbe9
stream: add stream/iter benchmarks
jasnell Mar 18, 2026
1af5e80
stream: performance optimizations for stream/iter pipeline and broadcast
jasnell Mar 18, 2026
b198a49
src: fixup linting issue
jasnell Mar 18, 2026
4e44ece
stream: fixup stream/iter benchmarks and test assertion
jasnell Mar 18, 2026
84df861
stream: fixup stream/iter tests
jasnell Mar 19, 2026
ecf1053
stream: cleanup a few minor details in stream/iter
jasnell Mar 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 197 additions & 0 deletions benchmark/fs/bench-filehandle-pull-vs-webstream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Flags: --experimental-stream-iter
// Compare FileHandle.createReadStream() vs readableWebStream() vs pull()
// reading a large file through two transforms: uppercase then gzip compress.
'use strict';

const common = require('../common.js');
const fs = require('fs');
const zlib = require('zlib');
const { Transform, Writable, pipeline } = require('stream');

// Scratch fixture file: written by main(), removed by cleanup().
const tmpdir = require('../../test/common/tmpdir');
tmpdir.refresh();
const filename = tmpdir.resolve(`.removeme-benchmark-garbage-${process.pid}`);

const bench = common.createBenchmark(main, {
  // Which streaming API to exercise for the same read+transform workload.
  api: ['classic', 'webstream', 'pull'],
  // Fixture sizes: 1 MiB, 16 MiB, 64 MiB.
  filesize: [1024 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024],
  // Timed iterations per configuration.
  n: [5],
});

// Entry point invoked by the benchmark harness with one {api, filesize, n}
// configuration. Writes the fixture file, then dispatches to the selected
// API's benchmark routine.
function main({ api, filesize, n }) {
  // Create the fixture file with repeating lowercase ASCII, written in
  // slices of at most 64 KiB.
  const chunk = Buffer.alloc(Math.min(filesize, 64 * 1024), 'abcdefghij');
  const fd = fs.openSync(filename, 'w');
  let remaining = filesize;
  while (remaining > 0) {
    const toWrite = Math.min(remaining, chunk.length);
    fs.writeSync(fd, chunk, 0, toWrite);
    remaining -= toWrite;
  }
  fs.closeSync(fd);

  let running;
  if (api === 'classic') {
    running = benchClassic(n, filesize);
  } else if (api === 'webstream') {
    running = benchWebStream(n, filesize);
  } else {
    running = benchPull(n, filesize);
  }
  // Remove the fixture whether the run succeeds or fails; a rejection still
  // propagates as an unhandled rejection so failures are not silently lost.
  // (Previously cleanup() only ran on success, leaking the fixture on error.)
  running.finally(cleanup);
}

// Best-effort removal of the benchmark fixture file.
function cleanup() {
  try {
    fs.unlinkSync(filename);
  } catch {
    /* ignore: the file may already be gone */
  }
}

// ---------------------------------------------------------------------------
// Classic streams path: createReadStream -> Transform (upper) -> createGzip
// ---------------------------------------------------------------------------
// ---------------------------------------------------------------------------
// Classic streams path: createReadStream -> Transform (upper) -> createGzip
// ---------------------------------------------------------------------------
// Runs one untimed warmup pass, then n timed passes, reporting the total
// compressed output in MiB.
async function benchClassic(n, filesize) {
  // Warm up caches and the JIT before timing.
  await runClassic();

  bench.start();
  let compressed = 0;
  for (let iter = 0; iter < n; iter++) {
    compressed += await runClassic();
  }
  bench.end(compressed / (1024 * 1024));
}

// One full pass of the fixture through the classic-stream pipeline.
// Resolves with the number of compressed bytes produced.
function runClassic() {
  return new Promise((resolve, reject) => {
    const source = fs.createReadStream(filename);

    // Stage 1: byte-wise ASCII uppercasing (a-z -> A-Z).
    const uppercase = new Transform({
      transform(chunk, encoding, callback) {
        const out = Buffer.allocUnsafe(chunk.length);
        for (let idx = 0; idx < chunk.length; idx++) {
          const byte = chunk[idx];
          out[idx] = (byte >= 0x61 && byte <= 0x7a) ? byte - 0x20 : byte;
        }
        callback(null, out);
      },
    });

    // Stage 2: gzip compression.
    const gzip = zlib.createGzip();

    // Terminal sink: tally compressed output size.
    let compressedBytes = 0;
    const counter = new Writable({
      write(chunk, encoding, callback) {
        compressedBytes += chunk.length;
        callback();
      },
    });

    pipeline(source, uppercase, gzip, counter, (err) => {
      if (err) {
        reject(err);
      } else {
        resolve(compressedBytes);
      }
    });
  });
}

// ---------------------------------------------------------------------------
// WebStream path: readableWebStream -> TransformStream (upper) -> CompressionStream
// ---------------------------------------------------------------------------
// ---------------------------------------------------------------------------
// WebStream path: readableWebStream -> TransformStream (upper) -> CompressionStream
// ---------------------------------------------------------------------------
// Runs one untimed warmup pass, then n timed passes, reporting the total
// compressed output in MiB.
async function benchWebStream(n, filesize) {
  // Warm up caches and the JIT before timing.
  await runWebStream();

  bench.start();
  let compressed = 0;
  for (let iter = 0; iter < n; iter++) {
    compressed += await runWebStream();
  }
  bench.end(compressed / (1024 * 1024));
}

// One full pass of the fixture through the WHATWG-stream pipeline.
// Resolves with the number of compressed bytes produced; the FileHandle is
// closed even if the pipeline fails.
async function runWebStream() {
  const fh = await fs.promises.open(filename, 'r');
  try {
    const source = fh.readableWebStream();

    // Stage 1: byte-wise ASCII uppercasing.
    const uppercase = new TransformStream({
      transform(chunk, controller) {
        const out = new Uint8Array(chunk.length);
        for (let idx = 0; idx < chunk.length; idx++) {
          const byte = chunk[idx];
          // a-z (0x61-0x7a) -> A-Z (0x41-0x5a)
          out[idx] = (byte >= 0x61 && byte <= 0x7a) ? byte - 0x20 : byte;
        }
        controller.enqueue(out);
      },
    });

    // Stage 2: gzip via the WHATWG CompressionStream.
    const gzip = new CompressionStream('gzip');

    const reader =
      source.pipeThrough(uppercase).pipeThrough(gzip).getReader();

    // Drain the pipeline, tallying compressed output size.
    let compressedBytes = 0;
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;
      compressedBytes += value.byteLength;
    }
    return compressedBytes;
  } finally {
    await fh.close();
  }
}

// ---------------------------------------------------------------------------
// New streams path: pull() with uppercase transform + gzip transform
// ---------------------------------------------------------------------------
// ---------------------------------------------------------------------------
// New streams path: pull() with uppercase transform + gzip transform
// ---------------------------------------------------------------------------
// Runs one untimed warmup pass, then n timed passes, reporting the total
// compressed output in MiB.
async function benchPull(n, filesize) {
  const { pull, compressGzip } = require('stream/iter');

  // Warm up caches and the JIT before timing.
  await runPull(pull, compressGzip);

  bench.start();
  let compressed = 0;
  for (let iter = 0; iter < n; iter++) {
    compressed += await runPull(pull, compressGzip);
  }
  bench.end(compressed / (1024 * 1024));
}

// One full pass of the fixture through the stream/iter pull() pipeline.
// Resolves with the number of compressed bytes produced; the FileHandle is
// closed even if the pipeline fails.
async function runPull(pull, compressGzip) {
  const fh = await fs.promises.open(filename, 'r');
  try {
    // Stateless batch transform: uppercase every chunk in the batch.
    // A null batch (end-of-stream) is passed through unchanged.
    const uppercase = (batch) => {
      if (batch === null) return null;
      const result = new Array(batch.length);
      for (let b = 0; b < batch.length; b++) {
        const src = batch[b];
        const out = new Uint8Array(src.length);
        for (let idx = 0; idx < src.length; idx++) {
          const byte = src[idx];
          out[idx] = (byte >= 0x61 && byte <= 0x7a) ? byte - 0x20 : byte;
        }
        result[b] = out;
      }
      return result;
    };

    const readable = fh.pull(uppercase, compressGzip());

    // Count bytes symmetrically with the classic path (no final
    // concatenation into a single buffer).
    let compressedBytes = 0;
    for await (const batch of readable) {
      for (let idx = 0; idx < batch.length; idx++) {
        compressedBytes += batch[idx].byteLength;
      }
    }
    return compressedBytes;
  } finally {
    await fh.close();
  }
}
92 changes: 92 additions & 0 deletions benchmark/streams/iter-creation.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Object creation overhead benchmark.
// Measures the cost of constructing stream infrastructure (no data flow).
'use strict';

const common = require('../common.js');
const { Readable, Writable, Transform, PassThrough } = require('stream');

const bench = common.createBenchmark(main, {
  // Which streaming API to exercise.
  api: ['classic', 'webstream', 'iter'],
  // Which kind of stream object to construct.
  type: ['readable', 'writable', 'transform', 'pair'],
  // Constructions per timed run.
  n: [1e5],
}, {
  flags: ['--experimental-stream-iter'],
  // Iter has no standalone Transform class; transforms are plain functions.
  combinationFilter: ({ api, type }) =>
    !(api === 'iter' && type === 'transform'),
});

// Dispatch one {api, type, n} configuration to the matching bench routine.
function main({ api, type, n }) {
  if (api === 'classic') return benchClassic(type, n);
  if (api === 'webstream') return benchWebStream(type, n);
  if (api === 'iter') return benchIter(type, n);
}

// Construct n classic-stream objects of the requested type, timing only the
// construction loop.
function benchClassic(type, n) {
  bench.start();
  if (type === 'readable') {
    for (let i = 0; i < n; i++) new Readable({ read() {} });
  } else if (type === 'writable') {
    for (let i = 0; i < n; i++) new Writable({ write(c, e, cb) { cb(); } });
  } else if (type === 'transform') {
    for (let i = 0; i < n; i++) {
      new Transform({ transform(c, e, cb) { cb(null, c); } });
    }
  } else if (type === 'pair') {
    for (let i = 0; i < n; i++) new PassThrough();
  }
  bench.end(n);
}

// Construct n WHATWG-stream objects of the requested type, timing only the
// construction loop.
function benchWebStream(type, n) {
  bench.start();
  if (type === 'readable') {
    for (let i = 0; i < n; i++) new ReadableStream({ pull() {} });
  } else if (type === 'writable') {
    for (let i = 0; i < n; i++) new WritableStream({ write() {} });
  } else if (type === 'transform') {
    for (let i = 0; i < n; i++) {
      new TransformStream({
        transform(c, controller) { controller.enqueue(c); },
      });
    }
  } else if (type === 'pair') {
    // TransformStream hands back a readable+writable pair.
    for (let i = 0; i < n; i++) new TransformStream();
  }
  bench.end(n);
}

// Construct n stream/iter objects of the requested type, timing only the
// construction loop. (The 'transform' type is filtered out for this API.)
function benchIter(type, n) {
  const { push, from, duplex } = require('stream/iter');

  bench.start();
  if (type === 'readable') {
    for (let i = 0; i < n; i++) from('x');
  } else if (type === 'writable') {
    // push() hands back a writer+readable pair.
    for (let i = 0; i < n; i++) push();
  } else if (type === 'pair') {
    // duplex() hands back a bidirectional channel pair.
    for (let i = 0; i < n; i++) duplex();
  }
  bench.end(n);
}
107 changes: 107 additions & 0 deletions benchmark/streams/iter-file-read.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// File reading throughput benchmark.
// Reads a real file through the three stream APIs.
'use strict';

const common = require('../common.js');
const fs = require('fs');
const { Writable, pipeline } = require('stream');
const tmpdir = require('../../test/common/tmpdir');

// Scratch fixture file: written by main(), removed after each timed run.
tmpdir.refresh();
const filename = tmpdir.resolve(`.removeme-bench-file-read-${process.pid}`);

const bench = common.createBenchmark(main, {
  // Which streaming API to exercise.
  api: ['classic', 'webstream', 'iter'],
  // Fixture sizes: 1 MiB, 16 MiB, 64 MiB.
  filesize: [1024 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024],
  // Timed iterations per configuration.
  n: [5],
}, {
  flags: ['--experimental-stream-iter'],
});

// Entry point invoked by the benchmark harness with one {api, filesize, n}
// configuration. Writes the fixture file, then dispatches to the selected
// API's benchmark routine.
function main({ api, filesize, n }) {
  // Create the fixture file, written in slices of at most 64 KiB.
  const block = Buffer.alloc(Math.min(filesize, 64 * 1024), 'abcdefghij');
  const fd = fs.openSync(filename, 'w');
  let left = filesize;
  while (left > 0) {
    const wrote = Math.min(left, block.length);
    fs.writeSync(fd, block, 0, wrote);
    left -= wrote;
  }
  fs.closeSync(fd);

  // Throughput unit: MiB processed across all n iterations.
  const totalOps = (filesize * n) / (1024 * 1024);

  if (api === 'classic') return benchClassic(filesize, n, totalOps);
  if (api === 'webstream') return benchWebStream(filesize, n, totalOps);
  if (api === 'iter') return benchIter(filesize, n, totalOps);
}

// Classic-stream path: createReadStream piped into a no-op Writable.
// One untimed warmup, then n timed iterations.
function benchClassic(filesize, n, totalOps) {
  // One full read of the fixture; invokes cb(err) when the pipeline settles.
  function run(cb) {
    const readable = fs.createReadStream(filename);
    const sink = new Writable({ write(data, enc, done) { done(); } });
    pipeline(readable, sink, cb);
  }

  // Warmup (untimed). Propagate errors instead of silently dropping them —
  // previously a failed pipeline still produced (bogus) timings.
  run((warmupErr) => {
    if (warmupErr) throw warmupErr;
    let i = 0;
    bench.start();
    (function next(err) {
      if (err) throw err;
      if (i++ >= n) {
        fs.unlinkSync(filename);
        return bench.end(totalOps);
      }
      run(next);
    })();
  });
}

// WHATWG-stream path: readableWebStream piped into a no-op WritableStream.
// One untimed warmup, then n timed iterations.
function benchWebStream(filesize, n, totalOps) {
  const fsp = require('fs/promises');

  // One full read of the fixture. The FileHandle is closed in a finally
  // block so a pipeTo failure cannot leak the handle (previously close()
  // was skipped on error).
  async function run() {
    const fh = await fsp.open(filename, 'r');
    try {
      const rs = fh.readableWebStream();
      const ws = new WritableStream({ write() {} });
      await rs.pipeTo(ws);
    } finally {
      await fh.close();
    }
  }

  (async () => {
    // Warmup (untimed)
    await run();

    bench.start();
    for (let i = 0; i < n; i++) await run();
    fs.unlinkSync(filename);
    bench.end(totalOps);
  })();
}

// stream/iter path: fh.pull() piped into a no-op writer via pipeTo().
// One untimed warmup, then n timed iterations.
function benchIter(filesize, n, totalOps) {
  const fsp = require('fs/promises');
  const { pipeTo } = require('stream/iter');

  // One full read of the fixture. The FileHandle is closed in a finally
  // block so a pipeTo failure cannot leak the handle (previously close()
  // was skipped on error).
  async function run() {
    const fh = await fsp.open(filename, 'r');
    try {
      await pipeTo(fh.pull(), { write() {} });
    } finally {
      await fh.close();
    }
  }

  (async () => {
    // Warmup (untimed)
    await run();

    bench.start();
    for (let i = 0; i < n; i++) await run();
    fs.unlinkSync(filename);
    bench.end(totalOps);
  })();
}
Loading
Loading