Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions lib/internal/streams/iter/pull.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const {
} = require('internal/streams/iter/utils');

const {
drainableProtocol,
kSyncWriteAcceptedOnFalse,
kValidatedTransform,
} = require('internal/streams/iter/types');

Expand Down Expand Up @@ -828,13 +830,33 @@ async function pipeTo(source, ...args) {
const hasWriteSync = typeof writer.writeSync === 'function';
const hasWritevSync = typeof writer.writevSync === 'function';
const hasEndSync = typeof writer.endSync === 'function';
const syncFalseCanBeAccepted = writer[kSyncWriteAcceptedOnFalse] === true;

function syncFalseWasAccepted() {
return syncFalseCanBeAccepted && writer.desiredSize === 0;
}

function waitForSyncBackpressure() {
const ondrain = writer[drainableProtocol];
return ondrain?.call(writer);
}

async function writeBatchAfterAcceptedBackpressure(batch, startIndex) {
await waitForSyncBackpressure();
await writeBatchAsyncFallback(batch, startIndex);
}

// Async fallback for writeBatch when sync write fails partway through.
// Continues writing from batch[startIndex] using async write().
async function writeBatchAsyncFallback(batch, startIndex) {
for (let i = startIndex; i < batch.length; i++) {
const chunk = batch[i];
if (hasWriteSync && writer.writeSync(chunk)) {
// Sync retry succeeded
} else if (syncFalseWasAccepted()) {
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
await waitForSyncBackpressure();
continue;
} else {
const result = writer.write(
chunk, signal ? { __proto__: null, signal } : undefined);
Expand All @@ -852,6 +874,12 @@ async function pipeTo(source, ...args) {
function writeBatch(batch) {
if (hasWritev && batch.length > 1) {
if (!hasWritevSync || !writer.writevSync(batch)) {
if (hasWritevSync && syncFalseWasAccepted()) {
for (let i = 0; i < batch.length; i++) {
totalBytes += TypedArrayPrototypeGetByteLength(batch[i]);
}
return waitForSyncBackpressure();
}
const opts = signal ? { __proto__: null, signal } : undefined;
return PromisePrototypeThen(writer.writev(batch, opts), () => {
for (let i = 0; i < batch.length; i++) {
Expand All @@ -867,6 +895,10 @@ async function pipeTo(source, ...args) {
for (let i = 0; i < batch.length; i++) {
const chunk = batch[i];
if (!hasWriteSync || !writer.writeSync(chunk)) {
if (hasWriteSync && syncFalseWasAccepted()) {
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
return writeBatchAfterAcceptedBackpressure(batch, i + 1);
}
// Sync path failed at index i - fall back to async for the rest.
// Count bytes for chunks already written synchronously (0..i-1).
return writeBatchAsyncFallback(batch, i);
Expand Down
11 changes: 9 additions & 2 deletions lib/internal/streams/iter/push.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const {

const {
drainableProtocol,
kSyncWriteAcceptedOnFalse,
} = require('internal/streams/iter/types');

const {
Expand Down Expand Up @@ -556,6 +557,10 @@ class PushWriter {
return this.#queue.desiredSize;
}

get [kSyncWriteAcceptedOnFalse]() {
return this.#queue.backpressurePolicy === 'block';
}

write(chunk, options) {
if (!options?.signal && this.#queue.canWriteSync()) {
const bytes = toUint8Array(chunk);
Expand All @@ -582,7 +587,8 @@ class PushWriter {
writeSync(chunk) {
const bytes = toUint8Array(chunk);
const result = this.#queue.writeSync([bytes]);
if (!result && this.#queue.backpressurePolicy === 'block') {
if (!result && this.#queue.backpressurePolicy === 'block' &&
this.#queue.desiredSize === 0) {
// Block policy: force-enqueue and return false as backpressure signal.
// Data IS accepted; false tells caller to slow down.
this.#queue.forceEnqueue([bytes]);
Expand All @@ -597,7 +603,8 @@ class PushWriter {
}
const bytes = convertChunks(chunks);
const result = this.#queue.writeSync(bytes);
if (!result && this.#queue.backpressurePolicy === 'block') {
if (!result && this.#queue.backpressurePolicy === 'block' &&
this.#queue.desiredSize === 0) {
this.#queue.forceEnqueue(bytes);
return false;
}
Expand Down
3 changes: 3 additions & 0 deletions lib/internal/streams/iter/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,12 @@ const kValidatedTransform = Symbol('kValidatedTransform');
*/
const kValidatedSource = Symbol('kValidatedSource');

const kSyncWriteAcceptedOnFalse = Symbol('kSyncWriteAcceptedOnFalse');

module.exports = {
broadcastProtocol,
drainableProtocol,
kSyncWriteAcceptedOnFalse,
kValidatedSource,
kValidatedTransform,
shareProtocol,
Expand Down
32 changes: 31 additions & 1 deletion test/parallel/test-stream-iter-pipeto-writev.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

const common = require('../common');
const assert = require('assert');
const { pipeTo, pipeToSync } = require('stream/iter');
const { pipeTo, pipeToSync, push, text } = require('stream/iter');

// Multi-chunk batch with writevSync (sync success path)
async function testWritevSyncSuccess() {
Expand Down Expand Up @@ -104,6 +104,35 @@ async function testWriteSyncAlwaysFails() {
assert.strictEqual(total, 2);
}

// PushWriter block mode accepts sync writes even when returning false for
// backpressure. pipeTo must wait for drain, not retry the same write.
async function assertPushWriterBlockPipeTo(source, expected, expectedTotal) {
const { writer, readable } = push({
highWaterMark: 1,
backpressure: 'block',
});

const pipe = pipeTo(source, writer);
await new Promise(setImmediate);
const data = await text(readable);
const total = await pipe;

assert.strictEqual(data, expected);
assert.strictEqual(total, expectedTotal);
}

async function testPushWriterBlockSyncFalseAccepted() {
await assertPushWriterBlockPipeTo((async function*() {
yield [new Uint8Array([97])];
yield [new Uint8Array([98])];
})(), 'ab', 2);

await assertPushWriterBlockPipeTo((async function*() {
yield [new Uint8Array([97, 98])];
yield [new Uint8Array([99]), new Uint8Array([100])];
})(), 'abcd', 4);
}

// pipeToSync with writevSync
async function testPipeToSyncWritev() {
const batches = [];
Expand Down Expand Up @@ -142,6 +171,7 @@ Promise.all([
testWritevSyncFails(),
testWriteSyncFailsMidBatch(),
testWriteSyncAlwaysFails(),
testPushWriterBlockSyncFalseAccepted(),
testPipeToSyncWritev(),
testPipeToSyncWriteFallback(),
]).then(common.mustCall());
Loading