From ab68aad1a01bf2c102057a74b0174cd7cfa7ca93 Mon Sep 17 00:00:00 2001 From: Efe Karasakal Date: Wed, 11 Mar 2026 18:11:02 +0100 Subject: [PATCH 1/4] stream: validate streams in pipeline --- lib/internal/streams/pipeline.js | 86 +++++++++++++++++++++++++++++--- 1 file changed, 79 insertions(+), 7 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index ad3f0796875aae..f243e456ce741a 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -13,7 +13,6 @@ const { const { eos } = require('internal/streams/end-of-stream'); const { once } = require('internal/util'); const destroyImpl = require('internal/streams/destroy'); -const Duplex = require('internal/streams/duplex'); const { AbortError, aggregateTwoErrors, @@ -36,6 +35,8 @@ const { isIterable, isReadable, isReadableNodeStream, + isWritableNodeStream, + isWritableStream, isNodeStream, isTransformStream, isWebStream, @@ -159,7 +160,8 @@ async function pumpToWeb(readable, writable, finish, { end }) { try { for await (const chunk of readable) { await writer.ready; - writer.write(chunk).catch(() => {}); + writer.write(chunk).catch(() => { + }); } await writer.ready; @@ -179,6 +181,76 @@ async function pumpToWeb(readable, writable, finish, { end }) { } } +function getPipelineArgName(i, len) { + if (i === 0) return 'source'; + if (i === len - 1) return 'destination'; + return `transform[${i - 1}]`; +} + +function isValidPipelineSource(stream) { + return ( + isIterable(stream) || + isReadableNodeStream(stream) || + isReadableStream(stream) || + isTransformStream(stream) + ); +} + +function isValidPipelineTransform(stream) { + return ( + isTransformStream(stream) || + (isNodeStream(stream) && + isReadableNodeStream(stream) && + isWritableNodeStream(stream)) + ); +} + +function isValidPipelineDestination(stream) { + return ( + isWritableNodeStream(stream) || + isWritableStream(stream) || + isTransformStream(stream) + ); +} + +function validatePipelineStream(stream, i, len) { + if (typeof stream === 'function') { + return; + } + + const name = getPipelineArgName(i, len); + + if (i === 0) { + if (!isValidPipelineSource(stream)) { + throw new ERR_INVALID_ARG_TYPE( + name, + ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], + stream, + ); + } + return; + } + + if (i === len - 1) { + if (!isValidPipelineDestination(stream)) { + throw new ERR_INVALID_ARG_TYPE( + name, + ['Writable', 'WritableStream', 'TransformStream'], + stream, + ); + } + return; + } + + if (!isValidPipelineTransform(stream)) { + throw new ERR_INVALID_ARG_TYPE( + name, + ['Duplex', 'Transform', 'TransformStream'], + stream, + ); + } +} + function pipeline(...streams) { return pipelineImpl(streams, once(popCallback(streams))); } @@ -192,6 +264,10 @@ function pipelineImpl(streams, callback, opts) { throw new ERR_MISSING_ARGS('streams'); } + for (let i = 0; i < streams.length; i++) { + validatePipelineStream(streams[i], i, streams.length); + } + const ac = new AbortController(); const signal = ac.signal; const outerSignal = opts?.signal; @@ -298,10 +374,8 @@ function pipelineImpl(streams, callback, opts) { throw new ERR_INVALID_RETURN_VALUE( 'Iterable, AsyncIterable or Stream', 'source', ret); } - } else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) { - ret = stream; } else { - ret = Duplex.from(stream); + ret = stream; } } else if (typeof stream === 'function') { if (isTransformStream(ret)) { @@ -402,8 +476,6 @@ function pipelineImpl(streams, callback, opts) { 'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret); } ret = stream; - } else { - ret = Duplex.from(stream); } } From c5da09b641b66ba24f5b19d3a4fa6134234f0b96 Mon Sep 17 00:00:00 2001 From: Efe Karasakal Date: Wed, 11 Mar 2026 20:36:09 +0100 Subject: [PATCH 2/4] stream: inline name arg creation --- lib/internal/streams/pipeline.js | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index f243e456ce741a..4359c9a5d47c2f 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -181,12 +181,6 @@ async function pumpToWeb(readable, writable, finish, { end }) { } } -function getPipelineArgName(i, len) { - if (i === 0) return 'source'; - if (i === len - 1) return 'destination'; - return `transform[${i - 1}]`; -} - function isValidPipelineSource(stream) { return ( isIterable(stream) || @@ -218,12 +212,10 @@ function validatePipelineStream(stream, i, len) { return; } - const name = getPipelineArgName(i, len); - if (i === 0) { if (!isValidPipelineSource(stream)) { throw new ERR_INVALID_ARG_TYPE( - name, + 'source', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], stream, ); @@ -234,7 +226,7 @@ function validatePipelineStream(stream, i, len) { if (i === len - 1) { if (!isValidPipelineDestination(stream)) { throw new ERR_INVALID_ARG_TYPE( - name, + 'destination', ['Writable', 'WritableStream', 'TransformStream'], stream, ); @@ -244,7 +236,7 @@ function validatePipelineStream(stream, i, len) { if (!isValidPipelineTransform(stream)) { throw new ERR_INVALID_ARG_TYPE( - name, + `transform[${i - 1}]`, ['Duplex', 'Transform', 'TransformStream'], stream, ); From cfa4614cf3f524f1e76e6ff7b5d261f7782d07be Mon Sep 17 00:00:00 2001 From: Efe Karasakal Date: Tue, 17 Mar 2026 22:28:07 +0100 Subject: [PATCH 3/4] stream: add tests to validate possible stream arg scenarios --- .../test-stream-pipeline-validation.js | 220 ++++++++++++++++++ 1 file changed, 220 insertions(+) create mode 100644 test/parallel/test-stream-pipeline-validation.js diff --git a/test/parallel/test-stream-pipeline-validation.js b/test/parallel/test-stream-pipeline-validation.js new file mode 100644 index 00000000000000..6aa4e191bd8d35 --- /dev/null +++ b/test/parallel/test-stream-pipeline-validation.js @@ -0,0 +1,220 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { Readable, Writable, PassThrough, pipeline } = require('stream'); +const { ReadableStream, WritableStream, TransformStream } = require('stream/web'); + +class NonReadableNodeStream extends Writable { + _write(chunk, encoding, callback) { + callback(); + } +} + +class NonWritableNodeStream extends Readable { + _read() { + this.push('x'); + this.push(null); + } +} + +function assertInvalidArg(fn, name) { + assert.throws(fn, (err) => { + assert.strictEqual(err.code, 'ERR_INVALID_ARG_TYPE'); + assert.strictEqual(err.message.includes(`The "${name}"`), true); + return true; + }); +} + +// Non-readable Node stream +{ + assertInvalidArg(() => { + pipeline( + new NonReadableNodeStream(), // source + new PassThrough(), + common.mustNotCall(), + ); + }, 'source'); + + assertInvalidArg(() => { + pipeline( + Readable.from(['x']), + new PassThrough(), + new NonReadableNodeStream(), // transform + new PassThrough(), + common.mustNotCall(), + ); + }, 'transform[1]'); + + pipeline( + Readable.from(['x']), + new NonReadableNodeStream(), // destination + common.mustSucceed(), + ); +} + +// Non-writable Node stream +{ + pipeline( + new NonWritableNodeStream(), // source + new Writable({ + write(chunk, encoding, callback) { + callback(); + }, + }), + common.mustSucceed(), + ); + + assertInvalidArg(() => { + pipeline( + Readable.from(['x']), + new NonWritableNodeStream(), // transform + new PassThrough(), + common.mustNotCall(), + ); + }, 'transform[0]'); + + assertInvalidArg(() => { + pipeline( + Readable.from(['x']), + new NonWritableNodeStream(), // destination + common.mustNotCall(), + ); + }, 'destination'); +} + +// ReadableStream +{ + const chunks = []; + pipeline( + new ReadableStream({ // source + start(controller) { + controller.enqueue('x'); + controller.close(); + }, + }), + new Writable({ + write(chunk, encoding, callback) { + chunks.push(chunk.toString()); + callback(); + }, + }), + common.mustSucceed(() => { + assert.deepStrictEqual(chunks, ['x']); + }), + ); + + assertInvalidArg(() => { + pipeline( + Readable.from(['x']), + new ReadableStream({ // transform + start(controller) { + controller.enqueue('x'); + controller.close(); + }, + }), + new PassThrough(), + common.mustNotCall(), + ); + }, 'transform[0]'); + + assertInvalidArg(() => { + pipeline( + Readable.from(['x']), + new ReadableStream({ // destination + start(controller) { + controller.enqueue('x'); + controller.close(); + }, + }), + common.mustNotCall(), + ); + }, 'destination'); +} + +// TransformStream +{ + const source = new TransformStream(); + const writer = source.writable.getWriter(); + const seen = []; + pipeline( + source, // source + new Writable({ + write(chunk, encoding, callback) { + seen.push(chunk.toString()); + callback(); + }, + }), + common.mustSucceed(() => { + assert.deepStrictEqual(seen, ['x']); + }), + ); + + writer.write('x').then(common.mustCall(() => writer.close())); + + const transformed = []; + pipeline( + Readable.from(['x']), + new TransformStream({ // transform + transform(chunk, controller) { + controller.enqueue(chunk.toString().toUpperCase()); + }, + }), + new Writable({ + write(chunk, encoding, callback) { + transformed.push(chunk.toString()); + callback(); + }, + }), + common.mustSucceed(() => { + assert.deepStrictEqual(transformed, ['X']); + }), + ); + + assert.doesNotThrow(() => { + pipeline( + Readable.from(['x']), + new TransformStream({ // destination + transform() {}, + }), + () => {}, + ); + }); +} + +// WritableStream +{ + assertInvalidArg(() => { + pipeline( + new WritableStream({ // source + write() {}, + }), + new PassThrough(), + common.mustNotCall(), + ); + }, 'source'); + + assertInvalidArg(() => { + pipeline( + Readable.from(['x']), + new WritableStream({ // transform + write() {}, + }), + new PassThrough(), + common.mustNotCall(), + ); + }, 'transform[0]'); + + const values = []; + pipeline( + Readable.from(['x']), + new WritableStream({ // destination + write(chunk) { + values.push(chunk.toString()); + }, + }), + common.mustSucceed(() => { + assert.deepStrictEqual(values, ['x']); + }), + ); +} From ca214e20f9e5d819c31f46356d11635ecfd80bea Mon Sep 17 00:00:00 2001 From: Efe Karasakal Date: Tue, 17 Mar 2026 22:38:13 +0100 Subject: [PATCH 4/4] fix: lint issues --- .../test-stream-pipeline-validation.js | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/test/parallel/test-stream-pipeline-validation.js b/test/parallel/test-stream-pipeline-validation.js index 6aa4e191bd8d35..403a89498eac99 100644 --- a/test/parallel/test-stream-pipeline-validation.js +++ b/test/parallel/test-stream-pipeline-validation.js @@ -150,7 +150,9 @@ function assertInvalidArg(fn, name) { }), ); - writer.write('x').then(common.mustCall(() => writer.close())); + writer.write('x') + .then(() => writer.close()) + .then(common.mustCall()); const transformed = []; pipeline( @@ -171,15 +173,14 @@ function assertInvalidArg(fn, name) { }), ); - assert.doesNotThrow(() => { - pipeline( - Readable.from(['x']), - new TransformStream({ // destination - transform() {}, - }), - () => {}, - ); - }); + // Destination TransformStream should not throw + pipeline( + Readable.from(['x']), + new TransformStream({ // destination + transform() {}, + }), + () => {}, + ); } // WritableStream