diff --git a/benchmark/streams/iter-throughput-share.js b/benchmark/streams/iter-throughput-share.js new file mode 100644 index 00000000000000..a0383172c04140 --- /dev/null +++ b/benchmark/streams/iter-throughput-share.js @@ -0,0 +1,32 @@ +'use strict'; + +const common = require('../common.js'); + +const bench = common.createBenchmark(main, { + consumers: [2, 8, 32], + batches: [1e4], + backpressure: ['block'], + n: [5], +}, { + flags: ['--experimental-stream-iter'], +}); + +async function main({ consumers, batches, backpressure, n }) { + const { share, array } = require('stream/iter'); + const chunk = Buffer.alloc(1024); + const totalOps = batches * consumers * n; + + async function* source() { + for (let i = 0; i < batches; i++) { + yield [chunk]; + } + } + + bench.start(); + for (let i = 0; i < n; i++) { + const shared = share(source(), { highWaterMark: 64, backpressure }); + const readers = Array.from({ length: consumers }, () => array(shared.pull())); + await Promise.all(readers); + } + bench.end(totalOps); +} diff --git a/lib/internal/streams/iter/broadcast.js b/lib/internal/streams/iter/broadcast.js index 769f93b5404c30..9b3ccebff9ac89 100644 --- a/lib/internal/streams/iter/broadcast.js +++ b/lib/internal/streams/iter/broadcast.js @@ -343,8 +343,9 @@ class BroadcastImpl { // Private methods #recomputeMinCursor() { - this.#cachedMinCursor = getMinCursor( + const { minCursor } = getMinCursor( this.#consumers, this.#bufferStart + this.#buffer.length); + this.#cachedMinCursor = minCursor; this.#minCursorDirty = false; } diff --git a/lib/internal/streams/iter/share.js b/lib/internal/streams/iter/share.js index 752c0bfcbcab8f..0160bc7eace009 100644 --- a/lib/internal/streams/iter/share.js +++ b/lib/internal/streams/iter/share.js @@ -77,6 +77,8 @@ class ShareImpl { #cancelled = false; #pulling = false; #pullWaiters = []; + #cachedMinCursor = 0; + #cachedMinCursorConsumers = 0; constructor(source, options) { this.#source = source; @@ -114,6 +116,14 @@ class ShareImpl { }; this.#consumers.add(state); + if (this.#consumers.size === 1) { + this.#cachedMinCursor = state.cursor; + this.#cachedMinCursorConsumers = 1; + } else if (state.cursor === this.#cachedMinCursor) { + this.#cachedMinCursorConsumers++; + } else { + this.#recomputeMinCursor(); + } const self = this; return { @@ -139,7 +149,7 @@ class ShareImpl { if (self.#cancelled) { state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); return { __proto__: null, done: true, value: undefined }; } @@ -147,14 +157,18 @@ class ShareImpl { const bufferIndex = state.cursor - self.#bufferStart; if (bufferIndex < self.#buffer.length) { const chunk = self.#buffer.get(bufferIndex); + const cursor = state.cursor; state.cursor++; - self.#tryTrimBuffer(); + if (cursor === self.#cachedMinCursor && + --self.#cachedMinCursorConsumers === 0) { + self.#tryTrimBuffer(); + } return { __proto__: null, done: false, value: chunk }; } if (self.#sourceExhausted) { state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); if (self.#sourceError) throw self.#sourceError; return { __proto__: null, done: true, value: undefined }; } @@ -163,7 +177,7 @@ class ShareImpl { const canPull = await self.#waitForBufferSpace(); if (!canPull) { state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); if (self.#sourceError) throw self.#sourceError; return { __proto__: null, done: true, value: undefined }; } @@ -176,8 +190,9 @@ class ShareImpl { state.detached = true; state.resolve = null; state.reject = null; - self.#consumers.delete(state); - self.#tryTrimBuffer(); + if (self.#deleteConsumer(state)) { + self.#tryTrimBuffer(); + } return { __proto__: null, done: true, value: undefined }; }, @@ -185,8 +200,9 @@ class ShareImpl { state.detached = true; state.resolve = null; state.reject = null; - self.#consumers.delete(state); - self.#tryTrimBuffer(); + if (self.#deleteConsumer(state)) { + self.#tryTrimBuffer(); + } return { __proto__: null, done: true, value: undefined }; }, }; @@ -254,9 +270,11 @@ class ShareImpl { this.#bufferStart++; for (const consumer of this.#consumers) { if (consumer.cursor < this.#bufferStart) { + this.#deleteConsumerFromMin(consumer); consumer.cursor = this.#bufferStart; } } + this.#recomputeMinCursor(); return true; case 'drop-newest': return true; @@ -324,18 +342,41 @@ class ShareImpl { } #tryTrimBuffer() { - const minCursor = getMinCursor( - this.#consumers, this.#bufferStart + this.#buffer.length); - const trimCount = minCursor - this.#bufferStart; + if (this.#cachedMinCursorConsumers === 0) { + this.#recomputeMinCursor(); + } + const trimCount = this.#cachedMinCursor - this.#bufferStart; if (trimCount > 0) { this.#buffer.trimFront(trimCount); - this.#bufferStart = minCursor; + this.#bufferStart = this.#cachedMinCursor; for (let i = 0; i < this.#pullWaiters.length; i++) { this.#pullWaiters[i](); } this.#pullWaiters = []; } } + + #recomputeMinCursor() { + const { minCursor, minCursorConsumers } = getMinCursor( + this.#consumers, this.#bufferStart + this.#buffer.length); + this.#cachedMinCursor = minCursor; + this.#cachedMinCursorConsumers = minCursorConsumers; + } + + #deleteConsumerFromMin(consumer) { + if (consumer.cursor === this.#cachedMinCursor) { + this.#cachedMinCursorConsumers--; + return this.#cachedMinCursorConsumers === 0; + } + return false; + } + + #deleteConsumer(consumer) { + if (this.#consumers.delete(consumer)) { + return this.#deleteConsumerFromMin(consumer); + } + return false; + } } // ============================================================================= @@ -352,6 +393,8 @@ class SyncShareImpl { #sourceExhausted = false; #sourceError = null; #cancelled = false; + #cachedMinCursor = 0; + #cachedMinCursorConsumers = 0; constructor(source, options) { this.#source = source; @@ -383,6 +426,14 @@ class SyncShareImpl { }; this.#consumers.add(state); + if (this.#consumers.size === 1) { + this.#cachedMinCursor = state.cursor; + this.#cachedMinCursorConsumers = 1; + } else if (state.cursor === this.#cachedMinCursor) { + this.#cachedMinCursorConsumers++; + } else { + this.#recomputeMinCursor(); + } const self = this; return { @@ -396,26 +447,30 @@ class SyncShareImpl { } if (self.#sourceError) { state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); throw self.#sourceError; } if (self.#cancelled) { state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); return { __proto__: null, done: true, value: undefined }; } const bufferIndex = state.cursor - self.#bufferStart; if (bufferIndex < self.#buffer.length) { const chunk = self.#buffer.get(bufferIndex); + const cursor = state.cursor; state.cursor++; - self.#tryTrimBuffer(); + if (cursor === self.#cachedMinCursor && + --self.#cachedMinCursorConsumers === 0) { + self.#tryTrimBuffer(); + } return { __proto__: null, done: false, value: chunk }; } if (self.#sourceExhausted) { state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); return { __proto__: null, done: true, value: undefined }; } @@ -436,13 +491,15 @@ class SyncShareImpl { self.#bufferStart++; for (const consumer of self.#consumers) { if (consumer.cursor < self.#bufferStart) { + self.#deleteConsumerFromMin(consumer); consumer.cursor = self.#bufferStart; } } + self.#recomputeMinCursor(); break; case 'drop-newest': state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); return { __proto__: null, done: true, value: undefined }; } } @@ -451,21 +508,25 @@ class SyncShareImpl { if (self.#sourceError) { state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); throw self.#sourceError; } const newBufferIndex = state.cursor - self.#bufferStart; if (newBufferIndex < self.#buffer.length) { const chunk = self.#buffer.get(newBufferIndex); + const cursor = state.cursor; state.cursor++; - self.#tryTrimBuffer(); + if (cursor === self.#cachedMinCursor && + --self.#cachedMinCursorConsumers === 0) { + self.#tryTrimBuffer(); + } return { __proto__: null, done: false, value: chunk }; } if (self.#sourceExhausted) { state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); return { __proto__: null, done: true, value: undefined }; } @@ -474,15 +535,17 @@ class SyncShareImpl { return() { state.detached = true; - self.#consumers.delete(state); - self.#tryTrimBuffer(); + if (self.#deleteConsumer(state)) { + self.#tryTrimBuffer(); + } return { __proto__: null, done: true, value: undefined }; }, throw() { state.detached = true; - self.#consumers.delete(state); - self.#tryTrimBuffer(); + if (self.#deleteConsumer(state)) { + self.#tryTrimBuffer(); + } return { __proto__: null, done: true, value: undefined }; }, }; @@ -532,13 +595,36 @@ class SyncShareImpl { } #tryTrimBuffer() { - const minCursor = getMinCursor( - this.#consumers, this.#bufferStart + this.#buffer.length); - const trimCount = minCursor - this.#bufferStart; + if (this.#cachedMinCursorConsumers === 0) { + this.#recomputeMinCursor(); + } + const trimCount = this.#cachedMinCursor - this.#bufferStart; if (trimCount > 0) { this.#buffer.trimFront(trimCount); - this.#bufferStart = minCursor; + this.#bufferStart = this.#cachedMinCursor; + } + } + + #recomputeMinCursor() { + const { minCursor, minCursorConsumers } = getMinCursor( + this.#consumers, this.#bufferStart + this.#buffer.length); + this.#cachedMinCursor = minCursor; + this.#cachedMinCursorConsumers = minCursorConsumers; + } + + #deleteConsumerFromMin(consumer) { + if (consumer.cursor === this.#cachedMinCursor) { + this.#cachedMinCursorConsumers--; + return this.#cachedMinCursorConsumers === 0; + } + return false; + } + + #deleteConsumer(consumer) { + if (this.#consumers.delete(consumer)) { + return this.#deleteConsumerFromMin(consumer); } + return false; } } diff --git a/lib/internal/streams/iter/utils.js b/lib/internal/streams/iter/utils.js index 0520630b09c4b8..7829afaade832f 100644 --- a/lib/internal/streams/iter/utils.js +++ b/lib/internal/streams/iter/utils.js @@ -70,20 +70,24 @@ function onSignalAbort(signal, handler) { } /** - * Compute the minimum cursor across a set of consumers. - * Returns fallback if the set is empty. + * Compute the minimum cursor across a set of consumers and count how many + * consumers are at that cursor. * @param {Set} consumers - Set of objects with a `cursor` property - * @param {number} fallback - Value to return when set is empty - * @returns {number} + * @param {number} fallback - Cursor to return when set is empty + * @returns {{ minCursor: number, minCursorConsumers: number }} */ function getMinCursor(consumers, fallback) { - let min = Infinity; + let minCursor = fallback; + let minCursorConsumers = 0; for (const consumer of consumers) { - if (consumer.cursor < min) { - min = consumer.cursor; + if (consumer.cursor < minCursor) { + minCursor = consumer.cursor; + minCursorConsumers = 1; + } else if (consumer.cursor === minCursor) { + minCursorConsumers++; } } - return min === Infinity ? fallback : min; + return { __proto__: null, minCursor, minCursorConsumers }; } /**