From 63c31c3588515143cd1dc77a3cabb405d95592f8 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 9 Jun 2026 03:05:36 +0000 Subject: [PATCH 1/2] Add non-blocking TryBeginInvoke and clarify threading model Adds the realtime-oriented invoker work from the .NET VST3 effects host plan (upstream roadmap item #4): - TryBeginInvoke(Action): a non-blocking, allocation-free, fire-and-forget path that marshals work onto the owner thread. Backed by a new bounded, lock-free multi-producer queue (Dmitry Vyukov's bounded MPMC algorithm) so producers neither block nor allocate; returns false when the bounded queue is full instead of growing unboundedly. Drained in FIFO order by DoInvokes alongside the existing Task queue. Runs immediately when called on the owner thread. - New Invoker(int beginInvokeCapacity) constructor to size the queue. - XML-doc clarification that Invoker is owner-thread-oriented and that the blocking/async paths (which allocate a Task and may block) must never be called from a hard real-time audio thread; audio->UI telemetry should use a dedicated SPSC ring buffer instead. Includes MSTest coverage: immediate owner-thread execution, cross-thread queueing, FIFO ordering, full-queue rejection, and a concurrent multi-producer delivery test. --- Invoker.Test/TryBeginInvokeTests.cs | 138 ++++++++++++++++++++++++++ Invoker/BoundedMpscQueue.cs | 149 ++++++++++++++++++++++++++++ Invoker/Invoker.cs | 80 +++++++++++++++ 3 files changed, 367 insertions(+) create mode 100644 Invoker.Test/TryBeginInvokeTests.cs create mode 100644 Invoker/BoundedMpscQueue.cs diff --git a/Invoker.Test/TryBeginInvokeTests.cs b/Invoker.Test/TryBeginInvokeTests.cs new file mode 100644 index 0000000..cfb1564 --- /dev/null +++ b/Invoker.Test/TryBeginInvokeTests.cs @@ -0,0 +1,138 @@ +// Copyright (c) ktsu.dev +// All rights reserved. +// Licensed under the MIT license. + +namespace ktsu.Invoker.Test; + +using Microsoft.VisualStudio.TestTools.UnitTesting; + +[TestClass] +public class TryBeginInvokeTests +{ + [TestMethod] + public void TryBeginInvokeNullShouldThrowArgumentNullException() + { + Invoker invoker = new(); + Assert.ThrowsExactly(() => invoker.TryBeginInvoke(null!)); + } + + [TestMethod] + public void TryBeginInvokeSameThreadShouldInvokeImmediately() + { + Invoker invoker = new(); + bool invoked = false; + bool result = invoker.TryBeginInvoke(() => invoked = true); + Assert.IsTrue(result, "Should report success when invoked on the owner thread."); + Assert.IsTrue(invoked, "Action should be invoked immediately on the owner thread."); + } + + [TestMethod] + public void TryBeginInvokeOtherThreadShouldQueueUntilDoInvokes() + { + Invoker invoker = new(); + bool invoked = false; + + Thread thread = new(() => invoker.TryBeginInvoke(() => invoked = true)); + thread.Start(); + thread.Join(); + + Assert.IsFalse(invoked, "Action should not run before DoInvokes is pumped."); + invoker.DoInvokes(); + Assert.IsTrue(invoked, "Action should run when DoInvokes is pumped on the owner thread."); + } + + [TestMethod] + public void TryBeginInvokePreservesFifoOrder() + { + Invoker invoker = new(); + List order = []; + + Thread thread = new(() => + { + for (int i = 0; i < 50; i++) + { + int captured = i; + invoker.TryBeginInvoke(() => order.Add(captured)); + } + }); + thread.Start(); + thread.Join(); + + invoker.DoInvokes(); + + Assert.HasCount(50, order); + for (int i = 0; i < 50; i++) + { + Assert.AreEqual(i, order[i], "Actions should execute in the order they were queued."); + } + } + + [TestMethod] + public void TryBeginInvokeReturnsFalseWhenQueueFull() + { + // Small capacity (rounded up to a power of two) so we can fill it from a non-owner thread. + Invoker invoker = new(2); + bool sawFull = false; + int accepted = 0; + + Thread thread = new(() => + { + for (int i = 0; i < 100; i++) + { + if (invoker.TryBeginInvoke(() => { })) + { + accepted++; + } + else + { + sawFull = true; + } + } + }); + thread.Start(); + thread.Join(); + + Assert.IsTrue(sawFull, "Queue should report full once capacity is exceeded without pumping."); + Assert.IsTrue(accepted > 0, "Some actions should have been accepted before the queue filled."); + } + + [TestMethod] + public void TryBeginInvokeConcurrentProducersDeliverAllActions() + { + const int producers = 4; + const int perProducer = 10_000; + const int total = producers * perProducer; + + Invoker invoker = new(total); + int executed = 0; + + Thread[] threads = new Thread[producers]; + for (int p = 0; p < producers; p++) + { + threads[p] = new Thread(() => + { + for (int i = 0; i < perProducer; i++) + { + while (!invoker.TryBeginInvoke(() => Interlocked.Increment(ref executed))) + { + Thread.SpinWait(1); + } + } + }); + } + + foreach (Thread thread in threads) + { + thread.Start(); + } + + foreach (Thread thread in threads) + { + thread.Join(); + } + + invoker.DoInvokes(); + + Assert.AreEqual(total, executed, "Every queued action from every producer must run exactly once."); + } +} diff --git a/Invoker/BoundedMpscQueue.cs b/Invoker/BoundedMpscQueue.cs new file mode 100644 index 0000000..5438d6b --- /dev/null +++ b/Invoker/BoundedMpscQueue.cs @@ -0,0 +1,149 @@ +// Copyright (c) ktsu.dev +// All rights reserved. +// Licensed under the MIT license. + +namespace ktsu.Invoker; + +using System.Threading; + +/// +/// A bounded, lock-free multi-producer/single-consumer queue based on Dmitry Vyukov's bounded MPMC +/// algorithm (which also remains correct for multiple concurrent consumers). +/// +/// +/// The queue pre-allocates a fixed-size array of cells in its constructor; thereafter +/// and allocate nothing, take no locks, and never +/// block. Each cell carries a sequence number that producers and consumers advance with a single +/// compare-and-swap, which is what makes the structure safe for any number of concurrent producers +/// and consumers. +/// +/// +/// It is used by to back the non-blocking +/// path. Note that this queue is not intended for use from a hard real-time audio thread: while +/// it is allocation-free, the actions it carries (and anything they capture) are not, and audio→UI +/// telemetry should instead flow through a dedicated single-producer/single-consumer ring buffer. +/// +/// +/// The type of elements stored in the queue. +internal sealed class BoundedMpscQueue +{ + private struct Cell + { + public long Sequence; + public T? Item; + } + + private readonly Cell[] buffer; + private readonly int mask; + + // Producer and consumer cursors. Kept on separate cache lines would be ideal, but they are only + // ever advanced via Interlocked, so contention is bounded by the CAS itself. + private long enqueuePos; + private long dequeuePos; + + /// + /// Gets the maximum number of elements the queue can hold at once. + /// + public int Capacity { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The requested capacity; rounded up to the next power of two (minimum 2). + /// Thrown when is less than one. + public BoundedMpscQueue(int capacity) + { + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(capacity); + + int size = NextPower2(Math.Max(capacity, 2)); + buffer = new Cell[size]; + mask = size - 1; + Capacity = size; + + for (int i = 0; i < size; i++) + { + buffer[i].Sequence = i; + } + } + + /// + /// Attempts to add an element to the queue. + /// + /// The element to add. + /// if the element was added; if the queue was full. + public bool TryEnqueue(T item) + { + while (true) + { + long pos = Volatile.Read(ref enqueuePos); + int index = (int)pos & mask; + long seq = Volatile.Read(ref buffer[index].Sequence); + long diff = seq - pos; + + if (diff == 0) + { + // The cell is free and it is our turn to claim it. + if (Interlocked.CompareExchange(ref enqueuePos, pos + 1, pos) == pos) + { + buffer[index].Item = item; + Volatile.Write(ref buffer[index].Sequence, pos + 1); + return true; + } + } + else if (diff < 0) + { + // The cell still holds an unconsumed element: the queue is full. + return false; + } + + // diff > 0: another producer claimed this slot; retry with the refreshed cursor. + } + } + + /// + /// Attempts to remove and return the oldest element in the queue. + /// + /// When this method returns , contains the dequeued element; otherwise the default value. + /// if an element was removed; if the queue was empty. + public bool TryDequeue(out T? item) + { + while (true) + { + long pos = Volatile.Read(ref dequeuePos); + int index = (int)pos & mask; + long seq = Volatile.Read(ref buffer[index].Sequence); + long diff = seq - (pos + 1); + + if (diff == 0) + { + if (Interlocked.CompareExchange(ref dequeuePos, pos + 1, pos) == pos) + { + item = buffer[index].Item; + buffer[index].Item = default; + Volatile.Write(ref buffer[index].Sequence, pos + mask + 1); + return true; + } + } + else if (diff < 0) + { + // The cell has not been published yet: the queue is empty. + item = default; + return false; + } + + // diff > 0: another consumer took this slot; retry with the refreshed cursor. + } + } + + private static int NextPower2(int v) + { + v--; + v |= v >> 1; + v |= v >> 2; + v |= v >> 4; + v |= v >> 8; + v |= v >> 16; + v++; + return v; + } +} diff --git a/Invoker/Invoker.cs b/Invoker/Invoker.cs index 3c06292..d063ed1 100644 --- a/Invoker/Invoker.cs +++ b/Invoker/Invoker.cs @@ -9,8 +9,31 @@ namespace ktsu.Invoker; /// /// Provides methods to invoke actions and functions asynchronously or synchronously on a specific thread. /// +/// +/// +/// Threading model. An belongs to the thread that constructed it (the +/// "owner thread"). Work submitted from another thread is queued and runs only when +/// is pumped on the owner thread — typically once per iteration of a UI or +/// render loop. and block/await the +/// owner thread, so they marshal work onto the owner thread; they do not make arbitrary code +/// thread-safe. +/// +/// +/// Not for real-time audio threads. The blocking and async paths allocate (a +/// per call) and can block the caller until the owner thread pumps, so they must +/// never be called from a hard real-time thread such as an audio callback. For the fire-and-forget, +/// non-blocking case from a non-real-time producer, prefer , +/// which is allocation-free and bounded. Audio→UI telemetry should not go through the invoker at all; +/// publish it through a dedicated single-producer/single-consumer ring buffer instead. +/// +/// public class Invoker { + /// + /// The default capacity of the non-blocking queue. + /// + private const int DefaultBeginInvokeCapacity = 1024; + /// /// Gets the ID of the thread on which this instance was created. /// @@ -21,6 +44,27 @@ public class Invoker /// internal ConcurrentQueue TaskQueue { get; } = new(); + /// + /// Gets the bounded, lock-free queue backing . + /// + private BoundedMpscQueue BeginInvokeQueue { get; } + + /// + /// Initializes a new instance of the class owned by the calling thread, with + /// the default non-blocking queue capacity. + /// + public Invoker() + : this(DefaultBeginInvokeCapacity) + { + } + + /// + /// Initializes a new instance of the class owned by the calling thread. + /// + /// The capacity of the non-blocking queue. Rounded up to the next power of two. + /// Thrown when is less than one. + public Invoker(int beginInvokeCapacity) => BeginInvokeQueue = new BoundedMpscQueue(beginInvokeCapacity); + /// /// Invokes the specified action asynchronously. /// @@ -99,6 +143,37 @@ public TReturn Invoke(Func func) } } + /// + /// Attempts to queue an action for fire-and-forget execution on the owner thread without blocking + /// or allocating. + /// + /// The action to queue. + /// + /// if the action was executed immediately (called on the owner thread) or + /// successfully queued; if the bounded queue was full and the action was + /// dropped. + /// + /// Thrown when is null. + /// + /// Unlike / , this method never blocks + /// and never allocates a : the caller is not notified of completion and cannot + /// await a result. It is intended for non-real-time producers that want to push work to the owner + /// thread cheaply. Queued actions run the next time is pumped, in FIFO order. + /// When called from the owner thread the action runs synchronously and immediately. + /// + public bool TryBeginInvoke(Action func) + { + Ensure.NotNull(func); + + if (ThreadId == Environment.CurrentManagedThreadId) + { + func(); + return true; + } + + return BeginInvokeQueue.TryEnqueue(func); + } + /// /// Executes all queued tasks synchronously on the thread that created the Invoker instance. /// @@ -110,6 +185,11 @@ public void DoInvokes() throw new InvalidOperationException("This method must be called on the thread that created the Invoker instance."); } + while (BeginInvokeQueue.TryDequeue(out Action? action)) + { + action!(); + } + while (TaskQueue.TryDequeue(out Task? task)) { task.RunSynchronously(); From 3c282554ca66b0c4455f257f1603328ad33ddf84 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 9 Jun 2026 03:42:43 +0000 Subject: [PATCH 2/2] Use a primary constructor in Invoker CI's code-style analyzers (which the local sandbox SDK cannot load) flag the explicit capacity constructor as IDE0290. Convert to a primary constructor taking beginInvokeCapacity, with the parameterless constructor delegating to it via the default capacity. Behaviour is unchanged. --- Invoker/Invoker.cs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/Invoker/Invoker.cs b/Invoker/Invoker.cs index d063ed1..f62061f 100644 --- a/Invoker/Invoker.cs +++ b/Invoker/Invoker.cs @@ -27,7 +27,9 @@ namespace ktsu.Invoker; /// publish it through a dedicated single-producer/single-consumer ring buffer instead. /// /// -public class Invoker +/// The capacity of the non-blocking queue. Rounded up to the next power of two. +/// Thrown when is less than one. +public class Invoker(int beginInvokeCapacity) { /// /// The default capacity of the non-blocking queue. @@ -47,7 +49,7 @@ public class Invoker /// /// Gets the bounded, lock-free queue backing . /// - private BoundedMpscQueue BeginInvokeQueue { get; } + private BoundedMpscQueue BeginInvokeQueue { get; } = new(beginInvokeCapacity); /// /// Initializes a new instance of the class owned by the calling thread, with @@ -58,13 +60,6 @@ public Invoker() { } - /// - /// Initializes a new instance of the class owned by the calling thread. - /// - /// The capacity of the non-blocking queue. Rounded up to the next power of two. - /// Thrown when is less than one. - public Invoker(int beginInvokeCapacity) => BeginInvokeQueue = new BoundedMpscQueue(beginInvokeCapacity); - /// /// Invokes the specified action asynchronously. ///