Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions Invoker.Test/TryBeginInvokeTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright (c) ktsu.dev
// All rights reserved.
// Licensed under the MIT license.

namespace ktsu.Invoker.Test;

using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public class TryBeginInvokeTests
{
[TestMethod]
public void TryBeginInvokeNullShouldThrowArgumentNullException()
{
Invoker invoker = new();
Assert.ThrowsExactly<ArgumentNullException>(() => invoker.TryBeginInvoke(null!));
}

[TestMethod]
public void TryBeginInvokeSameThreadShouldInvokeImmediately()
{
Invoker invoker = new();
bool invoked = false;
bool result = invoker.TryBeginInvoke(() => invoked = true);
Assert.IsTrue(result, "Should report success when invoked on the owner thread.");
Assert.IsTrue(invoked, "Action should be invoked immediately on the owner thread.");
}

[TestMethod]
public void TryBeginInvokeOtherThreadShouldQueueUntilDoInvokes()
{
Invoker invoker = new();
bool invoked = false;

Thread thread = new(() => invoker.TryBeginInvoke(() => invoked = true));
thread.Start();
thread.Join();

Assert.IsFalse(invoked, "Action should not run before DoInvokes is pumped.");
invoker.DoInvokes();
Assert.IsTrue(invoked, "Action should run when DoInvokes is pumped on the owner thread.");
}

[TestMethod]
public void TryBeginInvokePreservesFifoOrder()
{
Invoker invoker = new();
List<int> order = [];

Thread thread = new(() =>
{
for (int i = 0; i < 50; i++)
{
int captured = i;
invoker.TryBeginInvoke(() => order.Add(captured));
}
});
thread.Start();
thread.Join();

invoker.DoInvokes();

Assert.HasCount(50, order);
for (int i = 0; i < 50; i++)
{
Assert.AreEqual(i, order[i], "Actions should execute in the order they were queued.");
}
}

[TestMethod]
public void TryBeginInvokeReturnsFalseWhenQueueFull()
{
// Small capacity (rounded up to a power of two) so we can fill it from a non-owner thread.
Invoker invoker = new(2);
bool sawFull = false;
int accepted = 0;

Thread thread = new(() =>
{
for (int i = 0; i < 100; i++)
{
if (invoker.TryBeginInvoke(() => { }))
{
accepted++;
}
else
{
sawFull = true;
}
}
});
thread.Start();
thread.Join();

Assert.IsTrue(sawFull, "Queue should report full once capacity is exceeded without pumping.");
Assert.IsTrue(accepted > 0, "Some actions should have been accepted before the queue filled.");
}

[TestMethod]
public void TryBeginInvokeConcurrentProducersDeliverAllActions()
{
const int producers = 4;
const int perProducer = 10_000;
const int total = producers * perProducer;

Invoker invoker = new(total);
int executed = 0;

Thread[] threads = new Thread[producers];
for (int p = 0; p < producers; p++)
{
threads[p] = new Thread(() =>
{
for (int i = 0; i < perProducer; i++)
{
while (!invoker.TryBeginInvoke(() => Interlocked.Increment(ref executed)))
{
Thread.SpinWait(1);
}
}
});
}

foreach (Thread thread in threads)
{
thread.Start();
}

foreach (Thread thread in threads)
{
thread.Join();
}

invoker.DoInvokes();

Assert.AreEqual(total, executed, "Every queued action from every producer must run exactly once.");
}
}
149 changes: 149 additions & 0 deletions Invoker/BoundedMpscQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright (c) ktsu.dev
// All rights reserved.
// Licensed under the MIT license.

namespace ktsu.Invoker;

using System.Threading;

/// <summary>
/// A bounded, lock-free multi-producer/single-consumer queue based on Dmitry Vyukov's bounded MPMC
/// algorithm (which also remains correct for multiple concurrent consumers).
/// </summary>
/// <remarks>
/// The queue pre-allocates a fixed-size array of cells in its constructor; thereafter
/// <see cref="TryEnqueue"/> and <see cref="TryDequeue"/> allocate nothing, take no locks, and never
/// block. Each cell carries a sequence number that producers and consumers advance with a single
/// compare-and-swap, which is what makes the structure safe for any number of concurrent producers
/// and consumers.
///
/// <para>
/// It is used by <see cref="Invoker"/> to back the non-blocking <see cref="Invoker.TryBeginInvoke"/>
/// path. Note that this queue is <b>not</b> intended for use from a hard real-time audio thread: while
/// it is allocation-free, the actions it carries (and anything they capture) are not, and audio→UI
/// telemetry should instead flow through a dedicated single-producer/single-consumer ring buffer.
/// </para>
/// </remarks>
/// <typeparam name="T">The type of elements stored in the queue.</typeparam>
internal sealed class BoundedMpscQueue<T>
{
private struct Cell
{
public long Sequence;
public T? Item;
}

private readonly Cell[] buffer;
private readonly int mask;

// Producer and consumer cursors. Kept on separate cache lines would be ideal, but they are only
// ever advanced via Interlocked, so contention is bounded by the CAS itself.
private long enqueuePos;
private long dequeuePos;

/// <summary>
/// Gets the maximum number of elements the queue can hold at once.
/// </summary>
public int Capacity { get; }

/// <summary>
/// Initializes a new instance of the <see cref="BoundedMpscQueue{T}"/> class.
/// </summary>
/// <param name="capacity">The requested capacity; rounded up to the next power of two (minimum 2).</param>
/// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="capacity"/> is less than one.</exception>
public BoundedMpscQueue(int capacity)
{
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(capacity);

int size = NextPower2(Math.Max(capacity, 2));
buffer = new Cell[size];
mask = size - 1;
Capacity = size;

for (int i = 0; i < size; i++)
{
buffer[i].Sequence = i;
}
}

/// <summary>
/// Attempts to add an element to the queue.
/// </summary>
/// <param name="item">The element to add.</param>
/// <returns><see langword="true"/> if the element was added; <see langword="false"/> if the queue was full.</returns>
public bool TryEnqueue(T item)
{
while (true)
{
long pos = Volatile.Read(ref enqueuePos);
int index = (int)pos & mask;
long seq = Volatile.Read(ref buffer[index].Sequence);
long diff = seq - pos;

if (diff == 0)
{
// The cell is free and it is our turn to claim it.
if (Interlocked.CompareExchange(ref enqueuePos, pos + 1, pos) == pos)
{
buffer[index].Item = item;
Volatile.Write(ref buffer[index].Sequence, pos + 1);
return true;
}
}
else if (diff < 0)
{
// The cell still holds an unconsumed element: the queue is full.
return false;
}

// diff > 0: another producer claimed this slot; retry with the refreshed cursor.
}
}

/// <summary>
/// Attempts to remove and return the oldest element in the queue.
/// </summary>
/// <param name="item">When this method returns <see langword="true"/>, contains the dequeued element; otherwise the default value.</param>
/// <returns><see langword="true"/> if an element was removed; <see langword="false"/> if the queue was empty.</returns>
public bool TryDequeue(out T? item)
{
while (true)
{
long pos = Volatile.Read(ref dequeuePos);
int index = (int)pos & mask;
long seq = Volatile.Read(ref buffer[index].Sequence);
long diff = seq - (pos + 1);

if (diff == 0)
{
if (Interlocked.CompareExchange(ref dequeuePos, pos + 1, pos) == pos)
{
item = buffer[index].Item;
buffer[index].Item = default;
Volatile.Write(ref buffer[index].Sequence, pos + mask + 1);
return true;
}
}
else if (diff < 0)
{
// The cell has not been published yet: the queue is empty.
item = default;
return false;
}

// diff > 0: another consumer took this slot; retry with the refreshed cursor.
}
}

private static int NextPower2(int v)
{
v--;
v |= v >> 1;
v |= v >> 2;
v |= v >> 4;
v |= v >> 8;
v |= v >> 16;
v++;
return v;
}
}
77 changes: 76 additions & 1 deletion Invoker/Invoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,33 @@ namespace ktsu.Invoker;
/// <summary>
/// Provides methods to invoke actions and functions asynchronously or synchronously on a specific thread.
/// </summary>
public class Invoker
/// <remarks>
/// <para>
/// <b>Threading model.</b> An <see cref="Invoker"/> belongs to the thread that constructed it (the
/// "owner thread"). Work submitted from another thread is queued and runs only when
/// <see cref="DoInvokes"/> is pumped on the owner thread — typically once per iteration of a UI or
/// render loop. <see cref="Invoke(Action)"/> and <see cref="InvokeAsync(Action)"/> block/await the
/// owner thread, so they marshal work <i>onto</i> the owner thread; they do not make arbitrary code
/// thread-safe.
/// </para>
/// <para>
/// <b>Not for real-time audio threads.</b> The blocking and async paths allocate (a
/// <see cref="Task"/> per call) and can block the caller until the owner thread pumps, so they must
/// never be called from a hard real-time thread such as an audio callback. For the fire-and-forget,
/// non-blocking case from a non-real-time producer, prefer <see cref="TryBeginInvoke(Action)"/>,
/// which is allocation-free and bounded. Audio→UI telemetry should not go through the invoker at all;
/// publish it through a dedicated single-producer/single-consumer ring buffer instead.
/// </para>
/// </remarks>
/// <param name="beginInvokeCapacity">The capacity of the non-blocking <see cref="TryBeginInvoke(Action)"/> queue. Rounded up to the next power of two.</param>
/// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="beginInvokeCapacity"/> is less than one.</exception>
public class Invoker(int beginInvokeCapacity)
{
/// <summary>
/// The default capacity of the non-blocking <see cref="TryBeginInvoke(Action)"/> queue.
/// </summary>
private const int DefaultBeginInvokeCapacity = 1024;

/// <summary>
/// Gets the ID of the thread on which this instance was created.
/// </summary>
Expand All @@ -21,6 +46,20 @@ public class Invoker
/// </summary>
internal ConcurrentQueue<Task> TaskQueue { get; } = new();

/// <summary>
/// Gets the bounded, lock-free queue backing <see cref="TryBeginInvoke(Action)"/>.
/// </summary>
private BoundedMpscQueue<Action> BeginInvokeQueue { get; } = new(beginInvokeCapacity);

/// <summary>
/// Initializes a new instance of the <see cref="Invoker"/> class owned by the calling thread, with
/// the default non-blocking queue capacity.
/// </summary>
public Invoker()
: this(DefaultBeginInvokeCapacity)
{
}

/// <summary>
/// Invokes the specified action asynchronously.
/// </summary>
Expand Down Expand Up @@ -99,6 +138,37 @@ public TReturn Invoke<TReturn>(Func<TReturn> func)
}
}

/// <summary>
/// Attempts to queue an action for fire-and-forget execution on the owner thread without blocking
/// or allocating.
/// </summary>
/// <param name="func">The action to queue.</param>
/// <returns>
/// <see langword="true"/> if the action was executed immediately (called on the owner thread) or
/// successfully queued; <see langword="false"/> if the bounded queue was full and the action was
/// dropped.
/// </returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="func"/> is null.</exception>
/// <remarks>
/// Unlike <see cref="Invoke(Action)"/> / <see cref="InvokeAsync(Action)"/>, this method never blocks
/// and never allocates a <see cref="Task"/>: the caller is not notified of completion and cannot
/// await a result. It is intended for non-real-time producers that want to push work to the owner
/// thread cheaply. Queued actions run the next time <see cref="DoInvokes"/> is pumped, in FIFO order.
/// When called from the owner thread the action runs synchronously and immediately.
/// </remarks>
public bool TryBeginInvoke(Action func)
{
Ensure.NotNull(func);

if (ThreadId == Environment.CurrentManagedThreadId)
{
func();
return true;
}

return BeginInvokeQueue.TryEnqueue(func);
}

/// <summary>
/// Executes all queued tasks synchronously on the thread that created the Invoker instance.
/// </summary>
Expand All @@ -110,6 +180,11 @@ public void DoInvokes()
throw new InvalidOperationException("This method must be called on the thread that created the Invoker instance.");
}

while (BeginInvokeQueue.TryDequeue(out Action? action))
{
action!();
}

while (TaskQueue.TryDequeue(out Task? task))
{
task.RunSynchronously();
Expand Down