From eb326fb2a9a859a4cc8d5479c2eb544129b1e342 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Wed, 13 May 2026 00:12:16 -0700 Subject: [PATCH] stream: validate fromWritable() options before cache Validate options before returning a cached fromWritable() adapter so invalid later options still throw. Cache adapters by backpressure policy as well as Writable instance, since the policy changes write behavior. Fixes: https://github.com/nodejs/node/issues/63277 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 --- doc/api/stream_iter.md | 5 ++- lib/internal/streams/iter/classic.js | 18 +++++--- ...stream-iter-from-writable-cache-options.js | 45 +++++++++++++++++++ 3 files changed, 61 insertions(+), 7 deletions(-) create mode 100644 test/parallel/test-stream-iter-from-writable-cache-options.js diff --git a/doc/api/stream_iter.md b/doc/api/stream_iter.md index 04d416e5b1b855..611f1bf88c7c02 100644 --- a/doc/api/stream_iter.md +++ b/doc/api/stream_iter.md @@ -1543,8 +1543,9 @@ the synchronous Writer methods (`writeSync`, `writevSync`, `endSync`) always return `false` or `-1`, deferring to the async path. The per-write `options.signal` parameter from the Writer interface is also ignored. -The result is cached per instance -- calling `fromWritable()` twice with the -same stream returns the same Writer. +The result is cached per instance and backpressure policy -- calling +`fromWritable()` twice with the same stream and `backpressure` option returns +the same Writer. For duck-typed streams that do not expose `writableHighWaterMark`, `writableLength`, or similar properties, sensible defaults are used. diff --git a/lib/internal/streams/iter/classic.js b/lib/internal/streams/iter/classic.js index 18d1733d6ad648..45109627f22b3a 100644 --- a/lib/internal/streams/iter/classic.js +++ b/lib/internal/streams/iter/classic.js @@ -21,6 +21,7 @@ const { PromiseReject, PromiseResolve, PromiseWithResolvers, + SafeMap, SafeWeakMap, SymbolAsyncDispose, SymbolAsyncIterator, @@ -413,10 +414,6 @@ function fromWritable(writable, options = kNullPrototype) { throw new ERR_INVALID_ARG_TYPE('writable', 'Writable', writable); } - // Return cached adapter if available. - const cached = fromWritableCache.get(writable); - if (cached !== undefined) return cached; - validateObject(options, 'options'); const { backpressure = 'strict', @@ -445,6 +442,17 @@ function fromWritable(writable, options = kNullPrototype) { 'drop-oldest is not supported for classic stream.Writable'); } + // Return cached adapter if available. Backpressure policy changes writer + // behavior, so cache one adapter per policy. + let cachedByBackpressure = fromWritableCache.get(writable); + if (cachedByBackpressure !== undefined) { + const cached = cachedByBackpressure.get(backpressure); + if (cached !== undefined) return cached; + } else { + cachedByBackpressure = new SafeMap(); + fromWritableCache.set(writable, cachedByBackpressure); + } + // Fall back to sensible defaults for duck-typed streams that may not // expose the full stream.Writable property set. const hwm = writable.writableHighWaterMark ?? 16384; @@ -696,7 +704,7 @@ function fromWritable(writable, options = kNullPrototype) { return promise; }; - fromWritableCache.set(writable, writer); + cachedByBackpressure.set(backpressure, writer); return writer; } diff --git a/test/parallel/test-stream-iter-from-writable-cache-options.js b/test/parallel/test-stream-iter-from-writable-cache-options.js new file mode 100644 index 00000000000000..67e673c36f6f2f --- /dev/null +++ b/test/parallel/test-stream-iter-from-writable-cache-options.js @@ -0,0 +1,45 @@ +// Flags: --experimental-stream-iter +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { Writable } = require('stream'); +const { fromWritable } = require('stream/iter'); + +{ + const writable = new Writable({ write() {} }); + + fromWritable(writable); + + assert.throws( + () => fromWritable(writable, { backpressure: 'invalid' }), + { code: 'ERR_INVALID_ARG_VALUE' }, + ); + + writable.destroy(); +} + +async function testCachedWritableUsesLaterBackpressureOptions() { + const chunks = []; + const writable = new Writable({ + highWaterMark: 1, + write(chunk, encoding, callback) { + chunks.push(Buffer.from(chunk)); + }, + }); + + fromWritable(writable); + const writer = fromWritable(writable, { backpressure: 'drop-newest' }); + + await writer.write('a'); + await writer.write('b'); + + assert.deepStrictEqual( + chunks.map((chunk) => chunk.toString()), + ['a'], + ); + + writable.destroy(); +} + +testCachedWritableUsesLaterBackpressureOptions().then(common.mustCall());