diff --git a/Invoker.Test/TryBeginInvokeTests.cs b/Invoker.Test/TryBeginInvokeTests.cs new file mode 100644 index 0000000..cfb1564 --- /dev/null +++ b/Invoker.Test/TryBeginInvokeTests.cs @@ -0,0 +1,138 @@ +// Copyright (c) ktsu.dev +// All rights reserved. +// Licensed under the MIT license. + +namespace ktsu.Invoker.Test; + +using Microsoft.VisualStudio.TestTools.UnitTesting; + +[TestClass] +public class TryBeginInvokeTests +{ + [TestMethod] + public void TryBeginInvokeNullShouldThrowArgumentNullException() + { + Invoker invoker = new(); + Assert.ThrowsExactly(() => invoker.TryBeginInvoke(null!)); + } + + [TestMethod] + public void TryBeginInvokeSameThreadShouldInvokeImmediately() + { + Invoker invoker = new(); + bool invoked = false; + bool result = invoker.TryBeginInvoke(() => invoked = true); + Assert.IsTrue(result, "Should report success when invoked on the owner thread."); + Assert.IsTrue(invoked, "Action should be invoked immediately on the owner thread."); + } + + [TestMethod] + public void TryBeginInvokeOtherThreadShouldQueueUntilDoInvokes() + { + Invoker invoker = new(); + bool invoked = false; + + Thread thread = new(() => invoker.TryBeginInvoke(() => invoked = true)); + thread.Start(); + thread.Join(); + + Assert.IsFalse(invoked, "Action should not run before DoInvokes is pumped."); + invoker.DoInvokes(); + Assert.IsTrue(invoked, "Action should run when DoInvokes is pumped on the owner thread."); + } + + [TestMethod] + public void TryBeginInvokePreservesFifoOrder() + { + Invoker invoker = new(); + List order = []; + + Thread thread = new(() => + { + for (int i = 0; i < 50; i++) + { + int captured = i; + invoker.TryBeginInvoke(() => order.Add(captured)); + } + }); + thread.Start(); + thread.Join(); + + invoker.DoInvokes(); + + Assert.HasCount(50, order); + for (int i = 0; i < 50; i++) + { + Assert.AreEqual(i, order[i], "Actions should execute in the order they were queued."); + } + } + + [TestMethod] + public void TryBeginInvokeReturnsFalseWhenQueueFull() + { + // Small capacity (rounded up to a power of two) so we can fill it from a non-owner thread. + Invoker invoker = new(2); + bool sawFull = false; + int accepted = 0; + + Thread thread = new(() => + { + for (int i = 0; i < 100; i++) + { + if (invoker.TryBeginInvoke(() => { })) + { + accepted++; + } + else + { + sawFull = true; + } + } + }); + thread.Start(); + thread.Join(); + + Assert.IsTrue(sawFull, "Queue should report full once capacity is exceeded without pumping."); + Assert.IsTrue(accepted > 0, "Some actions should have been accepted before the queue filled."); + } + + [TestMethod] + public void TryBeginInvokeConcurrentProducersDeliverAllActions() + { + const int producers = 4; + const int perProducer = 10_000; + const int total = producers * perProducer; + + Invoker invoker = new(total); + int executed = 0; + + Thread[] threads = new Thread[producers]; + for (int p = 0; p < producers; p++) + { + threads[p] = new Thread(() => + { + for (int i = 0; i < perProducer; i++) + { + while (!invoker.TryBeginInvoke(() => Interlocked.Increment(ref executed))) + { + Thread.SpinWait(1); + } + } + }); + } + + foreach (Thread thread in threads) + { + thread.Start(); + } + + foreach (Thread thread in threads) + { + thread.Join(); + } + + invoker.DoInvokes(); + + Assert.AreEqual(total, executed, "Every queued action from every producer must run exactly once."); + } +} diff --git a/Invoker/BoundedMpscQueue.cs b/Invoker/BoundedMpscQueue.cs new file mode 100644 index 0000000..5438d6b --- /dev/null +++ b/Invoker/BoundedMpscQueue.cs @@ -0,0 +1,149 @@ +// Copyright (c) ktsu.dev +// All rights reserved. +// Licensed under the MIT license. + +namespace ktsu.Invoker; + +using System.Threading; + +/// +/// A bounded, lock-free multi-producer/single-consumer queue based on Dmitry Vyukov's bounded MPMC +/// algorithm (which also remains correct for multiple concurrent consumers). +/// +/// +/// The queue pre-allocates a fixed-size array of cells in its constructor; thereafter +/// and allocate nothing, take no locks, and never +/// block. Each cell carries a sequence number that producers and consumers advance with a single +/// compare-and-swap, which is what makes the structure safe for any number of concurrent producers +/// and consumers. +/// +/// +/// It is used by to back the non-blocking +/// path. Note that this queue is not intended for use from a hard real-time audio thread: while +/// it is allocation-free, the actions it carries (and anything they capture) are not, and audio→UI +/// telemetry should instead flow through a dedicated single-producer/single-consumer ring buffer. +/// +/// +/// The type of elements stored in the queue. +internal sealed class BoundedMpscQueue +{ + private struct Cell + { + public long Sequence; + public T? Item; + } + + private readonly Cell[] buffer; + private readonly int mask; + + // Producer and consumer cursors. Kept on separate cache lines would be ideal, but they are only + // ever advanced via Interlocked, so contention is bounded by the CAS itself. + private long enqueuePos; + private long dequeuePos; + + /// + /// Gets the maximum number of elements the queue can hold at once. + /// + public int Capacity { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The requested capacity; rounded up to the next power of two (minimum 2). + /// Thrown when is less than one. + public BoundedMpscQueue(int capacity) + { + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(capacity); + + int size = NextPower2(Math.Max(capacity, 2)); + buffer = new Cell[size]; + mask = size - 1; + Capacity = size; + + for (int i = 0; i < size; i++) + { + buffer[i].Sequence = i; + } + } + + /// + /// Attempts to add an element to the queue. + /// + /// The element to add. + /// if the element was added; if the queue was full. + public bool TryEnqueue(T item) + { + while (true) + { + long pos = Volatile.Read(ref enqueuePos); + int index = (int)pos & mask; + long seq = Volatile.Read(ref buffer[index].Sequence); + long diff = seq - pos; + + if (diff == 0) + { + // The cell is free and it is our turn to claim it. + if (Interlocked.CompareExchange(ref enqueuePos, pos + 1, pos) == pos) + { + buffer[index].Item = item; + Volatile.Write(ref buffer[index].Sequence, pos + 1); + return true; + } + } + else if (diff < 0) + { + // The cell still holds an unconsumed element: the queue is full. + return false; + } + + // diff > 0: another producer claimed this slot; retry with the refreshed cursor. + } + } + + /// + /// Attempts to remove and return the oldest element in the queue. + /// + /// When this method returns , contains the dequeued element; otherwise the default value. + /// if an element was removed; if the queue was empty. + public bool TryDequeue(out T? item) + { + while (true) + { + long pos = Volatile.Read(ref dequeuePos); + int index = (int)pos & mask; + long seq = Volatile.Read(ref buffer[index].Sequence); + long diff = seq - (pos + 1); + + if (diff == 0) + { + if (Interlocked.CompareExchange(ref dequeuePos, pos + 1, pos) == pos) + { + item = buffer[index].Item; + buffer[index].Item = default; + Volatile.Write(ref buffer[index].Sequence, pos + mask + 1); + return true; + } + } + else if (diff < 0) + { + // The cell has not been published yet: the queue is empty. + item = default; + return false; + } + + // diff > 0: another consumer took this slot; retry with the refreshed cursor. + } + } + + private static int NextPower2(int v) + { + v--; + v |= v >> 1; + v |= v >> 2; + v |= v >> 4; + v |= v >> 8; + v |= v >> 16; + v++; + return v; + } +} diff --git a/Invoker/Invoker.cs b/Invoker/Invoker.cs index 3c06292..f62061f 100644 --- a/Invoker/Invoker.cs +++ b/Invoker/Invoker.cs @@ -9,8 +9,33 @@ namespace ktsu.Invoker; /// /// Provides methods to invoke actions and functions asynchronously or synchronously on a specific thread. /// -public class Invoker +/// +/// +/// Threading model. An belongs to the thread that constructed it (the +/// "owner thread"). Work submitted from another thread is queued and runs only when +/// is pumped on the owner thread — typically once per iteration of a UI or +/// render loop. and block/await the +/// owner thread, so they marshal work onto the owner thread; they do not make arbitrary code +/// thread-safe. +/// +/// +/// Not for real-time audio threads. The blocking and async paths allocate (a +/// per call) and can block the caller until the owner thread pumps, so they must +/// never be called from a hard real-time thread such as an audio callback. For the fire-and-forget, +/// non-blocking case from a non-real-time producer, prefer , +/// which is allocation-free and bounded. Audio→UI telemetry should not go through the invoker at all; +/// publish it through a dedicated single-producer/single-consumer ring buffer instead. +/// +/// +/// The capacity of the non-blocking queue. Rounded up to the next power of two. +/// Thrown when is less than one. +public class Invoker(int beginInvokeCapacity) { + /// + /// The default capacity of the non-blocking queue. + /// + private const int DefaultBeginInvokeCapacity = 1024; + /// /// Gets the ID of the thread on which this instance was created. /// @@ -21,6 +46,20 @@ public class Invoker /// internal ConcurrentQueue TaskQueue { get; } = new(); + /// + /// Gets the bounded, lock-free queue backing . + /// + private BoundedMpscQueue BeginInvokeQueue { get; } = new(beginInvokeCapacity); + + /// + /// Initializes a new instance of the class owned by the calling thread, with + /// the default non-blocking queue capacity. + /// + public Invoker() + : this(DefaultBeginInvokeCapacity) + { + } + /// /// Invokes the specified action asynchronously. /// @@ -99,6 +138,37 @@ public TReturn Invoke(Func func) } } + /// + /// Attempts to queue an action for fire-and-forget execution on the owner thread without blocking + /// or allocating. + /// + /// The action to queue. + /// + /// if the action was executed immediately (called on the owner thread) or + /// successfully queued; if the bounded queue was full and the action was + /// dropped. + /// + /// Thrown when is null. + /// + /// Unlike / , this method never blocks + /// and never allocates a : the caller is not notified of completion and cannot + /// await a result. It is intended for non-real-time producers that want to push work to the owner + /// thread cheaply. Queued actions run the next time is pumped, in FIFO order. + /// When called from the owner thread the action runs synchronously and immediately. + /// + public bool TryBeginInvoke(Action func) + { + Ensure.NotNull(func); + + if (ThreadId == Environment.CurrentManagedThreadId) + { + func(); + return true; + } + + return BeginInvokeQueue.TryEnqueue(func); + } + /// /// Executes all queued tasks synchronously on the thread that created the Invoker instance. /// @@ -110,6 +180,11 @@ public void DoInvokes() throw new InvalidOperationException("This method must be called on the thread that created the Invoker instance."); } + while (BeginInvokeQueue.TryDequeue(out Action? action)) + { + action!(); + } + while (TaskQueue.TryDequeue(out Task? task)) { task.RunSynchronously();