diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index ad3f0796875aae..4359c9a5d47c2f 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -13,7 +13,6 @@ const { const { eos } = require('internal/streams/end-of-stream'); const { once } = require('internal/util'); const destroyImpl = require('internal/streams/destroy'); -const Duplex = require('internal/streams/duplex'); const { AbortError, aggregateTwoErrors, @@ -36,6 +35,8 @@ const { isIterable, isReadable, isReadableNodeStream, + isWritableNodeStream, + isWritableStream, isNodeStream, isTransformStream, isWebStream, @@ -159,7 +160,8 @@ async function pumpToWeb(readable, writable, finish, { end }) { try { for await (const chunk of readable) { await writer.ready; - writer.write(chunk).catch(() => {}); + writer.write(chunk).catch(() => { + }); } await writer.ready; @@ -179,6 +181,68 @@ async function pumpToWeb(readable, writable, finish, { end }) { } } +function isValidPipelineSource(stream) { + return ( + isIterable(stream) || + isReadableNodeStream(stream) || + isReadableStream(stream) || + isTransformStream(stream) + ); +} + +function isValidPipelineTransform(stream) { + return ( + isTransformStream(stream) || + (isNodeStream(stream) && + isReadableNodeStream(stream) && + isWritableNodeStream(stream)) + ); +} + +function isValidPipelineDestination(stream) { + return ( + isWritableNodeStream(stream) || + isWritableStream(stream) || + isTransformStream(stream) + ); +} + +function validatePipelineStream(stream, i, len) { + if (typeof stream === 'function') { + return; + } + + if (i === 0) { + if (!isValidPipelineSource(stream)) { + throw new ERR_INVALID_ARG_TYPE( + 'source', + ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], + stream, + ); + } + return; + } + + if (i === len - 1) { + if (!isValidPipelineDestination(stream)) { + throw new ERR_INVALID_ARG_TYPE( + 'destination', + ['Writable', 'WritableStream', 'TransformStream'], + stream, + ); + } + return; + } + + if (!isValidPipelineTransform(stream)) { + throw new ERR_INVALID_ARG_TYPE( + `transform[${i - 1}]`, + ['Duplex', 'Transform', 'TransformStream'], + stream, + ); + } +} + function pipeline(...streams) { return pipelineImpl(streams, once(popCallback(streams))); } @@ -192,6 +256,10 @@ function pipelineImpl(streams, callback, opts) { throw new ERR_MISSING_ARGS('streams'); } + for (let i = 0; i < streams.length; i++) { + validatePipelineStream(streams[i], i, streams.length); + } + const ac = new AbortController(); const signal = ac.signal; const outerSignal = opts?.signal; @@ -298,10 +366,8 @@ function pipelineImpl(streams, callback, opts) { throw new ERR_INVALID_RETURN_VALUE( 'Iterable, AsyncIterable or Stream', 'source', ret); } - } else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) { - ret = stream; } else { - ret = Duplex.from(stream); + ret = stream; } } else if (typeof stream === 'function') { if (isTransformStream(ret)) { @@ -402,8 +468,6 @@ function pipelineImpl(streams, callback, opts) { 'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret); } ret = stream; - } else { - ret = Duplex.from(stream); } } diff --git a/test/parallel/test-stream-pipeline-validation.js b/test/parallel/test-stream-pipeline-validation.js new file mode 100644 index 00000000000000..403a89498eac99 --- /dev/null +++ b/test/parallel/test-stream-pipeline-validation.js @@ -0,0 +1,221 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { Readable, Writable, PassThrough, pipeline } = require('stream'); +const { ReadableStream, WritableStream, TransformStream } = require('stream/web'); + +class NonReadableNodeStream extends Writable { + _write(chunk, encoding, callback) { + callback(); + } +} + +class NonWritableNodeStream extends Readable { + _read() { + this.push('x'); + this.push(null); + } +} + +function assertInvalidArg(fn, name) { + assert.throws(fn, (err) => { + assert.strictEqual(err.code, 'ERR_INVALID_ARG_TYPE'); + assert.strictEqual(err.message.includes(`The "${name}"`), true); + return true; + }); +} + +// Non-readable Node stream +{ + assertInvalidArg(() => { + pipeline( + new NonReadableNodeStream(), // source + new PassThrough(), + common.mustNotCall(), + ); + }, 'source'); + + assertInvalidArg(() => { + pipeline( + Readable.from(['x']), + new PassThrough(), + new NonReadableNodeStream(), // transform + new PassThrough(), + common.mustNotCall(), + ); + }, 'transform[1]'); + + pipeline( + Readable.from(['x']), + new NonReadableNodeStream(), // destination + common.mustSucceed(), + ); +} + +// Non-writable Node stream +{ + pipeline( + new NonWritableNodeStream(), // source + new Writable({ + write(chunk, encoding, callback) { + callback(); + }, + }), + common.mustSucceed(), + ); + + assertInvalidArg(() => { + pipeline( + Readable.from(['x']), + new NonWritableNodeStream(), // transform + new PassThrough(), + common.mustNotCall(), + ); + }, 'transform[0]'); + + assertInvalidArg(() => { + pipeline( + Readable.from(['x']), + new NonWritableNodeStream(), // destination + common.mustNotCall(), + ); + }, 'destination'); +} + +// ReadableStream +{ + const chunks = []; + pipeline( + new ReadableStream({ // source + start(controller) { + controller.enqueue('x'); + controller.close(); + }, + }), + new Writable({ + write(chunk, encoding, callback) { + chunks.push(chunk.toString()); + callback(); + }, + }), + common.mustSucceed(() => { + assert.deepStrictEqual(chunks, ['x']); + }), + ); + + assertInvalidArg(() => { + pipeline( + Readable.from(['x']), + new ReadableStream({ // transform + start(controller) { + controller.enqueue('x'); + controller.close(); + }, + }), + new PassThrough(), + common.mustNotCall(), + ); + }, 'transform[0]'); + + assertInvalidArg(() => { + pipeline( + Readable.from(['x']), + new ReadableStream({ // destination + start(controller) { + controller.enqueue('x'); + controller.close(); + }, + }), + common.mustNotCall(), + ); + }, 'destination'); +} + +// TransformStream +{ + const source = new TransformStream(); + const writer = source.writable.getWriter(); + const seen = []; + pipeline( + source, // source + new Writable({ + write(chunk, encoding, callback) { + seen.push(chunk.toString()); + callback(); + }, + }), + common.mustSucceed(() => { + assert.deepStrictEqual(seen, ['x']); + }), + ); + + writer.write('x') + .then(() => writer.close()) + .then(common.mustCall()); + + const transformed = []; + pipeline( + Readable.from(['x']), + new TransformStream({ // transform + transform(chunk, controller) { + controller.enqueue(chunk.toString().toUpperCase()); + }, + }), + new Writable({ + write(chunk, encoding, callback) { + transformed.push(chunk.toString()); + callback(); + }, + }), + common.mustSucceed(() => { + assert.deepStrictEqual(transformed, ['X']); + }), + ); + + // Destination TransformStream should not throw + pipeline( + Readable.from(['x']), + new TransformStream({ // destination + transform() {}, + }), + () => {}, + ); +} + +// WritableStream +{ + assertInvalidArg(() => { + pipeline( + new WritableStream({ // source + write() {}, + }), + new PassThrough(), + common.mustNotCall(), + ); + }, 'source'); + + assertInvalidArg(() => { + pipeline( + Readable.from(['x']), + new WritableStream({ // transform + write() {}, + }), + new PassThrough(), + common.mustNotCall(), + ); + }, 'transform[0]'); + + const values = []; + pipeline( + Readable.from(['x']), + new WritableStream({ // destination + write(chunk) { + values.push(chunk.toString()); + }, + }), + common.mustSucceed(() => { + assert.deepStrictEqual(values, ['x']); + }), + ); +}