diff --git a/lib/internal/streams/iter/pull.js b/lib/internal/streams/iter/pull.js index b4a7678237f465..5b004c58a5e995 100644 --- a/lib/internal/streams/iter/pull.js +++ b/lib/internal/streams/iter/pull.js @@ -51,6 +51,8 @@ const { } = require('internal/streams/iter/utils'); const { + drainableProtocol, + kSyncWriteAcceptedOnFalse, kValidatedTransform, } = require('internal/streams/iter/types'); @@ -828,6 +830,22 @@ async function pipeTo(source, ...args) { const hasWriteSync = typeof writer.writeSync === 'function'; const hasWritevSync = typeof writer.writevSync === 'function'; const hasEndSync = typeof writer.endSync === 'function'; + const syncFalseCanBeAccepted = writer[kSyncWriteAcceptedOnFalse] === true; + + function syncFalseWasAccepted() { + return syncFalseCanBeAccepted && writer.desiredSize === 0; + } + + function waitForSyncBackpressure() { + const ondrain = writer[drainableProtocol]; + return ondrain?.call(writer); + } + + async function writeBatchAfterAcceptedBackpressure(batch, startIndex) { + await waitForSyncBackpressure(); + await writeBatchAsyncFallback(batch, startIndex); + } + // Async fallback for writeBatch when sync write fails partway through. // Continues writing from batch[startIndex] using async write(). async function writeBatchAsyncFallback(batch, startIndex) { @@ -835,6 +853,10 @@ async function pipeTo(source, ...args) { const chunk = batch[i]; if (hasWriteSync && writer.writeSync(chunk)) { // Sync retry succeeded + } else if (syncFalseWasAccepted()) { + totalBytes += TypedArrayPrototypeGetByteLength(chunk); + await waitForSyncBackpressure(); + continue; } else { const result = writer.write( chunk, signal ? { __proto__: null, signal } : undefined); @@ -852,6 +874,12 @@ async function pipeTo(source, ...args) { function writeBatch(batch) { if (hasWritev && batch.length > 1) { if (!hasWritevSync || !writer.writevSync(batch)) { + if (hasWritevSync && syncFalseWasAccepted()) { + for (let i = 0; i < batch.length; i++) { + totalBytes += TypedArrayPrototypeGetByteLength(batch[i]); + } + return waitForSyncBackpressure(); + } const opts = signal ? { __proto__: null, signal } : undefined; return PromisePrototypeThen(writer.writev(batch, opts), () => { for (let i = 0; i < batch.length; i++) { @@ -867,6 +895,10 @@ async function pipeTo(source, ...args) { for (let i = 0; i < batch.length; i++) { const chunk = batch[i]; if (!hasWriteSync || !writer.writeSync(chunk)) { + if (hasWriteSync && syncFalseWasAccepted()) { + totalBytes += TypedArrayPrototypeGetByteLength(chunk); + return writeBatchAfterAcceptedBackpressure(batch, i + 1); + } // Sync path failed at index i - fall back to async for the rest. // Count bytes for chunks already written synchronously (0..i-1). return writeBatchAsyncFallback(batch, i); diff --git a/lib/internal/streams/iter/push.js b/lib/internal/streams/iter/push.js index 4c0b3240d45fdb..71e009e7cb64c7 100644 --- a/lib/internal/streams/iter/push.js +++ b/lib/internal/streams/iter/push.js @@ -32,6 +32,7 @@ const { const { drainableProtocol, + kSyncWriteAcceptedOnFalse, } = require('internal/streams/iter/types'); const { @@ -556,6 +557,10 @@ class PushWriter { return this.#queue.desiredSize; } + get [kSyncWriteAcceptedOnFalse]() { + return this.#queue.backpressurePolicy === 'block'; + } + write(chunk, options) { if (!options?.signal && this.#queue.canWriteSync()) { const bytes = toUint8Array(chunk); @@ -582,7 +587,8 @@ class PushWriter { writeSync(chunk) { const bytes = toUint8Array(chunk); const result = this.#queue.writeSync([bytes]); - if (!result && this.#queue.backpressurePolicy === 'block') { + if (!result && this.#queue.backpressurePolicy === 'block' && + this.#queue.desiredSize === 0) { // Block policy: force-enqueue and return false as backpressure signal. // Data IS accepted; false tells caller to slow down. this.#queue.forceEnqueue([bytes]); @@ -597,7 +603,8 @@ class PushWriter { } const bytes = convertChunks(chunks); const result = this.#queue.writeSync(bytes); - if (!result && this.#queue.backpressurePolicy === 'block') { + if (!result && this.#queue.backpressurePolicy === 'block' && + this.#queue.desiredSize === 0) { this.#queue.forceEnqueue(bytes); return false; } diff --git a/lib/internal/streams/iter/types.js b/lib/internal/streams/iter/types.js index 99ddc8fd582770..71112b1515c081 100644 --- a/lib/internal/streams/iter/types.js +++ b/lib/internal/streams/iter/types.js @@ -64,9 +64,12 @@ const kValidatedTransform = Symbol('kValidatedTransform'); */ const kValidatedSource = Symbol('kValidatedSource'); +const kSyncWriteAcceptedOnFalse = Symbol('kSyncWriteAcceptedOnFalse'); + module.exports = { broadcastProtocol, drainableProtocol, + kSyncWriteAcceptedOnFalse, kValidatedSource, kValidatedTransform, shareProtocol, diff --git a/test/parallel/test-stream-iter-pipeto-writev.js b/test/parallel/test-stream-iter-pipeto-writev.js index 505bdd6d2b2ced..d45c76a19332d2 100644 --- a/test/parallel/test-stream-iter-pipeto-writev.js +++ b/test/parallel/test-stream-iter-pipeto-writev.js @@ -5,7 +5,7 @@ const common = require('../common'); const assert = require('assert'); -const { pipeTo, pipeToSync } = require('stream/iter'); +const { pipeTo, pipeToSync, push, text } = require('stream/iter'); // Multi-chunk batch with writevSync (sync success path) async function testWritevSyncSuccess() { @@ -104,6 +104,35 @@ async function testWriteSyncAlwaysFails() { assert.strictEqual(total, 2); } +// PushWriter block mode accepts sync writes even when returning false for +// backpressure. pipeTo must wait for drain, not retry the same write. +async function assertPushWriterBlockPipeTo(source, expected, expectedTotal) { + const { writer, readable } = push({ + highWaterMark: 1, + backpressure: 'block', + }); + + const pipe = pipeTo(source, writer); + await new Promise(setImmediate); + const data = await text(readable); + const total = await pipe; + + assert.strictEqual(data, expected); + assert.strictEqual(total, expectedTotal); +} + +async function testPushWriterBlockSyncFalseAccepted() { + await assertPushWriterBlockPipeTo((async function*() { + yield [new Uint8Array([97])]; + yield [new Uint8Array([98])]; + })(), 'ab', 2); + + await assertPushWriterBlockPipeTo((async function*() { + yield [new Uint8Array([97, 98])]; + yield [new Uint8Array([99]), new Uint8Array([100])]; + })(), 'abcd', 4); +} + // pipeToSync with writevSync async function testPipeToSyncWritev() { const batches = []; @@ -142,6 +171,7 @@ Promise.all([ testWritevSyncFails(), testWriteSyncFailsMidBatch(), testWriteSyncAlwaysFails(), + testPushWriterBlockSyncFalseAccepted(), testPipeToSyncWritev(), testPipeToSyncWriteFallback(), ]).then(common.mustCall());