From 921ab42c34fe335c62828dbe94d8f0919c6c4b41 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Wed, 13 May 2026 19:29:17 -0700 Subject: [PATCH] stream: avoid retrying accepted pipeTo writes PushWriter in block backpressure mode can return false from writeSync() and writevSync() after accepting data. Treat that false return as backpressure and wait for drain instead of retrying the same chunks asynchronously. Fixes: https://github.com/nodejs/node/issues/63296 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 --- lib/internal/streams/iter/pull.js | 32 +++++++++++++++++++ lib/internal/streams/iter/push.js | 11 +++++-- lib/internal/streams/iter/types.js | 3 ++ .../test-stream-iter-pipeto-writev.js | 32 ++++++++++++++++++- 4 files changed, 75 insertions(+), 3 deletions(-) diff --git a/lib/internal/streams/iter/pull.js b/lib/internal/streams/iter/pull.js index b4a7678237f465..5b004c58a5e995 100644 --- a/lib/internal/streams/iter/pull.js +++ b/lib/internal/streams/iter/pull.js @@ -51,6 +51,8 @@ const { } = require('internal/streams/iter/utils'); const { + drainableProtocol, + kSyncWriteAcceptedOnFalse, kValidatedTransform, } = require('internal/streams/iter/types'); @@ -828,6 +830,22 @@ async function pipeTo(source, ...args) { const hasWriteSync = typeof writer.writeSync === 'function'; const hasWritevSync = typeof writer.writevSync === 'function'; const hasEndSync = typeof writer.endSync === 'function'; + const syncFalseCanBeAccepted = writer[kSyncWriteAcceptedOnFalse] === true; + + function syncFalseWasAccepted() { + return syncFalseCanBeAccepted && writer.desiredSize === 0; + } + + function waitForSyncBackpressure() { + const ondrain = writer[drainableProtocol]; + return ondrain?.call(writer); + } + + async function writeBatchAfterAcceptedBackpressure(batch, startIndex) { + await waitForSyncBackpressure(); + await writeBatchAsyncFallback(batch, startIndex); + } + // Async fallback for writeBatch when sync write fails partway through. // Continues writing from batch[startIndex] using async write(). async function writeBatchAsyncFallback(batch, startIndex) { @@ -835,6 +853,10 @@ async function pipeTo(source, ...args) { const chunk = batch[i]; if (hasWriteSync && writer.writeSync(chunk)) { // Sync retry succeeded + } else if (syncFalseWasAccepted()) { + totalBytes += TypedArrayPrototypeGetByteLength(chunk); + await waitForSyncBackpressure(); + continue; } else { const result = writer.write( chunk, signal ? { __proto__: null, signal } : undefined); @@ -852,6 +874,12 @@ async function pipeTo(source, ...args) { function writeBatch(batch) { if (hasWritev && batch.length > 1) { if (!hasWritevSync || !writer.writevSync(batch)) { + if (hasWritevSync && syncFalseWasAccepted()) { + for (let i = 0; i < batch.length; i++) { + totalBytes += TypedArrayPrototypeGetByteLength(batch[i]); + } + return waitForSyncBackpressure(); + } const opts = signal ? { __proto__: null, signal } : undefined; return PromisePrototypeThen(writer.writev(batch, opts), () => { for (let i = 0; i < batch.length; i++) { @@ -867,6 +895,10 @@ async function pipeTo(source, ...args) { for (let i = 0; i < batch.length; i++) { const chunk = batch[i]; if (!hasWriteSync || !writer.writeSync(chunk)) { + if (hasWriteSync && syncFalseWasAccepted()) { + totalBytes += TypedArrayPrototypeGetByteLength(chunk); + return writeBatchAfterAcceptedBackpressure(batch, i + 1); + } // Sync path failed at index i - fall back to async for the rest. // Count bytes for chunks already written synchronously (0..i-1). return writeBatchAsyncFallback(batch, i); diff --git a/lib/internal/streams/iter/push.js b/lib/internal/streams/iter/push.js index 4c0b3240d45fdb..71e009e7cb64c7 100644 --- a/lib/internal/streams/iter/push.js +++ b/lib/internal/streams/iter/push.js @@ -32,6 +32,7 @@ const { const { drainableProtocol, + kSyncWriteAcceptedOnFalse, } = require('internal/streams/iter/types'); const { @@ -556,6 +557,10 @@ class PushWriter { return this.#queue.desiredSize; } + get [kSyncWriteAcceptedOnFalse]() { + return this.#queue.backpressurePolicy === 'block'; + } + write(chunk, options) { if (!options?.signal && this.#queue.canWriteSync()) { const bytes = toUint8Array(chunk); @@ -582,7 +587,8 @@ class PushWriter { writeSync(chunk) { const bytes = toUint8Array(chunk); const result = this.#queue.writeSync([bytes]); - if (!result && this.#queue.backpressurePolicy === 'block') { + if (!result && this.#queue.backpressurePolicy === 'block' && + this.#queue.desiredSize === 0) { // Block policy: force-enqueue and return false as backpressure signal. // Data IS accepted; false tells caller to slow down. this.#queue.forceEnqueue([bytes]); @@ -597,7 +603,8 @@ class PushWriter { } const bytes = convertChunks(chunks); const result = this.#queue.writeSync(bytes); - if (!result && this.#queue.backpressurePolicy === 'block') { + if (!result && this.#queue.backpressurePolicy === 'block' && + this.#queue.desiredSize === 0) { this.#queue.forceEnqueue(bytes); return false; } diff --git a/lib/internal/streams/iter/types.js b/lib/internal/streams/iter/types.js index 99ddc8fd582770..71112b1515c081 100644 --- a/lib/internal/streams/iter/types.js +++ b/lib/internal/streams/iter/types.js @@ -64,9 +64,12 @@ const kValidatedTransform = Symbol('kValidatedTransform'); */ const kValidatedSource = Symbol('kValidatedSource'); +const kSyncWriteAcceptedOnFalse = Symbol('kSyncWriteAcceptedOnFalse'); + module.exports = { broadcastProtocol, drainableProtocol, + kSyncWriteAcceptedOnFalse, kValidatedSource, kValidatedTransform, shareProtocol, diff --git a/test/parallel/test-stream-iter-pipeto-writev.js b/test/parallel/test-stream-iter-pipeto-writev.js index 505bdd6d2b2ced..d45c76a19332d2 100644 --- a/test/parallel/test-stream-iter-pipeto-writev.js +++ b/test/parallel/test-stream-iter-pipeto-writev.js @@ -5,7 +5,7 @@ const common = require('../common'); const assert = require('assert'); -const { pipeTo, pipeToSync } = require('stream/iter'); +const { pipeTo, pipeToSync, push, text } = require('stream/iter'); // Multi-chunk batch with writevSync (sync success path) async function testWritevSyncSuccess() { @@ -104,6 +104,35 @@ async function testWriteSyncAlwaysFails() { assert.strictEqual(total, 2); } +// PushWriter block mode accepts sync writes even when returning false for +// backpressure. pipeTo must wait for drain, not retry the same write. +async function assertPushWriterBlockPipeTo(source, expected, expectedTotal) { + const { writer, readable } = push({ + highWaterMark: 1, + backpressure: 'block', + }); + + const pipe = pipeTo(source, writer); + await new Promise(setImmediate); + const data = await text(readable); + const total = await pipe; + + assert.strictEqual(data, expected); + assert.strictEqual(total, expectedTotal); +} + +async function testPushWriterBlockSyncFalseAccepted() { + await assertPushWriterBlockPipeTo((async function*() { + yield [new Uint8Array([97])]; + yield [new Uint8Array([98])]; + })(), 'ab', 2); + + await assertPushWriterBlockPipeTo((async function*() { + yield [new Uint8Array([97, 98])]; + yield [new Uint8Array([99]), new Uint8Array([100])]; + })(), 'abcd', 4); +} + // pipeToSync with writevSync async function testPipeToSyncWritev() { const batches = []; @@ -142,6 +171,7 @@ Promise.all([ testWritevSyncFails(), testWriteSyncFailsMidBatch(), testWriteSyncAlwaysFails(), + testPushWriterBlockSyncFalseAccepted(), testPipeToSyncWritev(), testPipeToSyncWriteFallback(), ]).then(common.mustCall());