Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions doc/api/stream_iter.md
Original file line number Diff line number Diff line change
Expand Up @@ -1543,8 +1543,9 @@ the synchronous Writer methods (`writeSync`, `writevSync`, `endSync`) always
return `false` or `-1`, deferring to the async path. The per-write
`options.signal` parameter from the Writer interface is also ignored.

The result is cached per instance -- calling `fromWritable()` twice with the
same stream returns the same Writer.
The result is cached per instance and backpressure policy -- calling
`fromWritable()` twice with the same stream and `backpressure` option returns
the same Writer.

For duck-typed streams that do not expose `writableHighWaterMark`,
`writableLength`, or similar properties, sensible defaults are used.
Expand Down
18 changes: 13 additions & 5 deletions lib/internal/streams/iter/classic.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const {
PromiseReject,
PromiseResolve,
PromiseWithResolvers,
SafeMap,
SafeWeakMap,
SymbolAsyncDispose,
SymbolAsyncIterator,
Expand Down Expand Up @@ -413,10 +414,6 @@ function fromWritable(writable, options = kNullPrototype) {
throw new ERR_INVALID_ARG_TYPE('writable', 'Writable', writable);
}

// Return cached adapter if available.
const cached = fromWritableCache.get(writable);
if (cached !== undefined) return cached;

validateObject(options, 'options');
const {
backpressure = 'strict',
Expand Down Expand Up @@ -445,6 +442,17 @@ function fromWritable(writable, options = kNullPrototype) {
'drop-oldest is not supported for classic stream.Writable');
}

// Return cached adapter if available. Backpressure policy changes writer
// behavior, so cache one adapter per policy.
let cachedByBackpressure = fromWritableCache.get(writable);
if (cachedByBackpressure !== undefined) {
const cached = cachedByBackpressure.get(backpressure);
if (cached !== undefined) return cached;
} else {
cachedByBackpressure = new SafeMap();
fromWritableCache.set(writable, cachedByBackpressure);
}

// Fall back to sensible defaults for duck-typed streams that may not
// expose the full stream.Writable property set.
const hwm = writable.writableHighWaterMark ?? 16384;
Expand Down Expand Up @@ -696,7 +704,7 @@ function fromWritable(writable, options = kNullPrototype) {
return promise;
};

fromWritableCache.set(writable, writer);
cachedByBackpressure.set(backpressure, writer);
return writer;
}

Expand Down
45 changes: 45 additions & 0 deletions test/parallel/test-stream-iter-from-writable-cache-options.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Flags: --experimental-stream-iter
'use strict';

const common = require('../common');
const assert = require('assert');
const { Writable } = require('stream');
const { fromWritable } = require('stream/iter');

{
const writable = new Writable({ write() {} });

fromWritable(writable);

assert.throws(
() => fromWritable(writable, { backpressure: 'invalid' }),
{ code: 'ERR_INVALID_ARG_VALUE' },
);

writable.destroy();
}

async function testCachedWritableUsesLaterBackpressureOptions() {
const chunks = [];
const writable = new Writable({
highWaterMark: 1,
write(chunk, encoding, callback) {
chunks.push(Buffer.from(chunk));
},
});

fromWritable(writable);
const writer = fromWritable(writable, { backpressure: 'drop-newest' });

await writer.write('a');
await writer.write('b');

assert.deepStrictEqual(
chunks.map((chunk) => chunk.toString()),
['a'],
);

writable.destroy();
}

testCachedWritableUsesLaterBackpressureOptions().then(common.mustCall());
Loading