diff --git a/.gitignore b/.gitignore
index 809f06a76..e80f90d16 100644
--- a/.gitignore
+++ b/.gitignore
@@ -26,6 +26,7 @@ bld/
## See more on package restore for Nuget 2.7 here: http://docs.nuget.org/consume/package-restore
# Ignore NuGet Packages
*.nupkg
+nupkg/
# Ignore the packages folder
**/packages/*
# except build/, which is used as an MSBuild target.
@@ -58,3 +59,5 @@ $tf/pendingchanges.tfb
$tf/properties.tf1
project.lock.json
**/.vs/
+
+.claude/
\ No newline at end of file
diff --git a/PACKAGE_README.md b/PACKAGE_README.md
new file mode 100644
index 000000000..a3e43390d
--- /dev/null
+++ b/PACKAGE_README.md
@@ -0,0 +1,20 @@
+# Trill.StreamProcessing
+
+Trill is a high-performance one-pass in-memory streaming analytics engine originally developed by Microsoft Research. It can handle both real-time and offline data, and is based on a temporal data and query model. Trill can be used as a streaming engine, a lightweight in-memory relational engine, and as a progressive query processor for early query results on partial data.
+
+This package is a community-maintained fork of [Microsoft's Trill](https://github.com/microsoft/Trill), updated for .NET 10 and actively maintained by [Amir Burbea](https://github.com/amirburbea).
+
+## Installation
+
+```
+dotnet add package Trill.StreamProcessing
+```
+
+This package contains `Microsoft.StreamProcessing` (the core streaming engine). The experimental `Microsoft.StreamProcessing.Provider` project in the repo is not published as part of this package.
+
+## Learn More
+
+- [Trill on GitHub](https://github.com/amirburbea/trill)
+- [Original announcement blog post](https://azure.microsoft.com/en-us/blog/microsoft-open-sources-trill-to-deliver-insights-on-a-trillion-events-a-day/)
+- [Trill paper (VLDB 2015)](https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/trill-vldb2015.pdf)
+- [Trill samples repository](https://github.com/Microsoft/TrillSamples)
diff --git a/Sources/.editorconfig b/Sources/.editorconfig
index 1bae2e959..d1110ea7e 100644
--- a/Sources/.editorconfig
+++ b/Sources/.editorconfig
@@ -50,4 +50,4 @@ csharp_new_line_before_catch = true
csharp_new_line_before_finally = true
csharp_indent_case_contents = true
csharp_indent_switch_labels = true
-csharp_preserve_single_line_statements = false
\ No newline at end of file
+csharp_preserve_single_line_statements = false
diff --git a/Sources/Core/Directory.Build.props b/Sources/Core/Directory.Build.props
index 15f391280..fa8e33c0e 100644
--- a/Sources/Core/Directory.Build.props
+++ b/Sources/Core/Directory.Build.props
@@ -3,15 +3,5 @@
-
-
- all
- runtime; build; native; contentfiles; analyzers
-
-
-
-
- $(EnlistmentRoot)\Sources\Microsoft.StreamProcessing.ruleset
-
diff --git a/Sources/Core/Microsoft.StreamProcessing.Provider/Microsoft.StreamProcessing.Provider.csproj b/Sources/Core/Microsoft.StreamProcessing.Provider/Microsoft.StreamProcessing.Provider.csproj
index 1d9de4e69..44ed8b756 100644
--- a/Sources/Core/Microsoft.StreamProcessing.Provider/Microsoft.StreamProcessing.Provider.csproj
+++ b/Sources/Core/Microsoft.StreamProcessing.Provider/Microsoft.StreamProcessing.Provider.csproj
@@ -1,29 +1,18 @@
- netstandard2.0
+ net10.0
x64;AnyCPU
true
Microsoft.StreamProcessing.Provider
+ false
-
-
-
-
-
-
-
-
-
-
-
-
-
+
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctions.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctions.cs
index dcc3e31f0..9073f7692 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctions.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctions.cs
@@ -48,8 +48,8 @@ public static IAggregate Wrap aggregate,
Expression> transform)
{
- Contract.Requires(aggregate != null);
- Contract.Requires(transform != null);
+ ArgumentNullException.ThrowIfNull(aggregate);
+ ArgumentNullException.ThrowIfNull(transform);
return aggregate.TransformInput(transform);
}
@@ -62,8 +62,8 @@ public static IAggregate Wrap aggregate,
Expression> transform) where TAggregateInput : struct
{
- Contract.Requires(aggregate != null);
- Contract.Requires(transform != null);
+ ArgumentNullException.ThrowIfNull(aggregate);
+ ArgumentNullException.ThrowIfNull(transform);
return aggregate.MakeInputNullableAndSkipNulls().TransformInput(transform);
}
@@ -71,8 +71,8 @@ internal static IAggregate TransformInput aggregate,
Expression> transform)
{
- Contract.Requires(aggregate != null);
- Contract.Requires(transform != null);
+ ArgumentNullException.ThrowIfNull(aggregate);
+ ArgumentNullException.ThrowIfNull(transform);
return GeneratedAggregate.Create(
initialState: aggregate.InitialState(),
@@ -86,19 +86,19 @@ private static Expression> TransformInput3> func,
Expression> transform)
{
- Contract.Requires(func != null);
- Contract.Requires(transform != null);
+ ArgumentNullException.ThrowIfNull(func);
+ ArgumentNullException.ThrowIfNull(transform);
var result = func.ReplaceParametersInBody(func.Parameters[0], func.Parameters[1], transform.Body);
var transformParam = transform.Parameters[0];
- return Expression.Lambda>(result, new[] { func.Parameters[0], func.Parameters[1], transformParam });
+ return Expression.Lambda>(result, [func.Parameters[0], func.Parameters[1], transformParam]);
}
internal static IAggregate TransformOutput(
this IAggregate aggregate,
Expression> transform)
{
- Contract.Requires(aggregate != null);
- Contract.Requires(transform != null);
+ ArgumentNullException.ThrowIfNull(aggregate);
+ ArgumentNullException.ThrowIfNull(transform);
return GeneratedAggregate.Create(
initialState: aggregate.InitialState(),
@@ -112,8 +112,8 @@ private static Expression> TransformOutput> func,
Expression> transform)
{
- Contract.Requires(func != null);
- Contract.Requires(transform != null);
+ ArgumentNullException.ThrowIfNull(func);
+ ArgumentNullException.ThrowIfNull(transform);
var result = transform.ReplaceParametersInBody(func.Body);
return Expression.Lambda>(result, func.Parameters);
}
@@ -127,7 +127,7 @@ public static IAggregate ApplyFilter aggregate,
Expression> filter)
{
- Contract.Requires(aggregate != null);
+ ArgumentNullException.ThrowIfNull(aggregate);
if (filter == null || filter.Body.ExpressionEquals(Expression.Constant(true))) return aggregate;
Expression> newAccumulate = (oldState, timestamp, input) =>
@@ -151,7 +151,7 @@ public static IAggregate ApplyFilter MakeInputNullableAndSkipNulls(
this IAggregate aggregate) where TInput : struct
{
- Contract.Requires(aggregate != null);
+ ArgumentNullException.ThrowIfNull(aggregate);
Expression> newAccumulate = (oldState, timestamp, input) =>
input.HasValue ? CallInliner.Call(aggregate.Accumulate(), oldState, timestamp, input.Value) : oldState;
@@ -175,9 +175,9 @@ public static IAggregate ApplyFilter SkipNulls(
this IAggregate aggregate)
{
- Contract.Requires(aggregate != null);
+ ArgumentNullException.ThrowIfNull(aggregate);
- var inputType = typeof(TInput).GetTypeInfo();
+ var inputType = typeof(TInput);
return inputType.IsClass
? GeneratedAggregate.Create(
initialState: aggregate.InitialState(),
@@ -229,7 +229,7 @@ private static Expression> AddSkipNullValueLo
public static IAggregate, TResult?> MakeOutputNullableAndOutputNullWhenEmpty(
this IAggregate aggregate) where TResult : struct
{
- Contract.Requires(aggregate != null);
+ ArgumentNullException.ThrowIfNull(aggregate);
Expression>> newInitialState =
() => new NullOutputWrapper
@@ -279,7 +279,7 @@ private static Expression> AddSkipNullValueLo
public static IAggregate, TResult> OutputDefaultWhenEmpty(
this IAggregate aggregate)
{
- Contract.Requires(aggregate != null);
+ ArgumentNullException.ThrowIfNull(aggregate);
Expression>> newInitialState =
() => new NullOutputWrapper
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctionsTemplate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctionsTemplate.cs
index e6435694f..74a7ee508 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctionsTemplate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctionsTemplate.cs
@@ -30,8 +30,8 @@ public static IAggregate, TResult> Combine aggregate1,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[1];
Expression, TState1>> target1 = state => state.Item1;
@@ -93,9 +93,9 @@ public static IAggregate, TResult> Combine
IAggregate aggregate2,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[2];
Expression, TState1>> target1 = state => state.Item1;
@@ -171,10 +171,10 @@ public static IAggregate, TResult
IAggregate aggregate3,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[3];
Expression, TState1>> target1 = state => state.Item1;
@@ -268,11 +268,11 @@ public static IAggregate
IAggregate aggregate4,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[4];
Expression, TState1>> target1 = state => state.Item1;
@@ -388,12 +388,12 @@ public static IAggregate aggregate5,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[5];
Expression, TState1>> target1 = state => state.Item1;
@@ -535,13 +535,13 @@ public static IAggregate aggregate6,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[6];
Expression, TState1>> target1 = state => state.Item1;
@@ -713,14 +713,14 @@ public static IAggregate aggregate7,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(aggregate7 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(aggregate7);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[7];
Expression, TState1>> target1 = state => state.Item1;
@@ -926,15 +926,15 @@ public static IAggregate aggregate8,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(aggregate7 != null);
- Contract.Requires(aggregate8 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(aggregate7);
+ ArgumentNullException.ThrowIfNull(aggregate8);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[8];
Expression, TState1>> target1 = state => state.Item1;
@@ -1178,16 +1178,16 @@ public static IAggregate aggregate9,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(aggregate7 != null);
- Contract.Requires(aggregate8 != null);
- Contract.Requires(aggregate9 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(aggregate7);
+ ArgumentNullException.ThrowIfNull(aggregate8);
+ ArgumentNullException.ThrowIfNull(aggregate9);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[9];
Expression, TState1>> target1 = state => state.Item1;
@@ -1473,17 +1473,17 @@ public static IAggregate aggregate10,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(aggregate7 != null);
- Contract.Requires(aggregate8 != null);
- Contract.Requires(aggregate9 != null);
- Contract.Requires(aggregate10 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(aggregate7);
+ ArgumentNullException.ThrowIfNull(aggregate8);
+ ArgumentNullException.ThrowIfNull(aggregate9);
+ ArgumentNullException.ThrowIfNull(aggregate10);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[10];
Expression, TState1>> target1 = state => state.Item1;
@@ -1815,18 +1815,18 @@ public static IAggregate aggregate11,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(aggregate7 != null);
- Contract.Requires(aggregate8 != null);
- Contract.Requires(aggregate9 != null);
- Contract.Requires(aggregate10 != null);
- Contract.Requires(aggregate11 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(aggregate7);
+ ArgumentNullException.ThrowIfNull(aggregate8);
+ ArgumentNullException.ThrowIfNull(aggregate9);
+ ArgumentNullException.ThrowIfNull(aggregate10);
+ ArgumentNullException.ThrowIfNull(aggregate11);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[11];
Expression, TState1>> target1 = state => state.Item1;
@@ -2208,19 +2208,19 @@ public static IAggregate aggregate12,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(aggregate7 != null);
- Contract.Requires(aggregate8 != null);
- Contract.Requires(aggregate9 != null);
- Contract.Requires(aggregate10 != null);
- Contract.Requires(aggregate11 != null);
- Contract.Requires(aggregate12 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(aggregate7);
+ ArgumentNullException.ThrowIfNull(aggregate8);
+ ArgumentNullException.ThrowIfNull(aggregate9);
+ ArgumentNullException.ThrowIfNull(aggregate10);
+ ArgumentNullException.ThrowIfNull(aggregate11);
+ ArgumentNullException.ThrowIfNull(aggregate12);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[12];
Expression, TState1>> target1 = state => state.Item1;
@@ -2656,20 +2656,20 @@ public static IAggregate aggregate13,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(aggregate7 != null);
- Contract.Requires(aggregate8 != null);
- Contract.Requires(aggregate9 != null);
- Contract.Requires(aggregate10 != null);
- Contract.Requires(aggregate11 != null);
- Contract.Requires(aggregate12 != null);
- Contract.Requires(aggregate13 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(aggregate7);
+ ArgumentNullException.ThrowIfNull(aggregate8);
+ ArgumentNullException.ThrowIfNull(aggregate9);
+ ArgumentNullException.ThrowIfNull(aggregate10);
+ ArgumentNullException.ThrowIfNull(aggregate11);
+ ArgumentNullException.ThrowIfNull(aggregate12);
+ ArgumentNullException.ThrowIfNull(aggregate13);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[13];
Expression, TState1>> target1 = state => state.Item1;
@@ -3163,21 +3163,21 @@ public static IAggregate aggregate14,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(aggregate7 != null);
- Contract.Requires(aggregate8 != null);
- Contract.Requires(aggregate9 != null);
- Contract.Requires(aggregate10 != null);
- Contract.Requires(aggregate11 != null);
- Contract.Requires(aggregate12 != null);
- Contract.Requires(aggregate13 != null);
- Contract.Requires(aggregate14 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(aggregate7);
+ ArgumentNullException.ThrowIfNull(aggregate8);
+ ArgumentNullException.ThrowIfNull(aggregate9);
+ ArgumentNullException.ThrowIfNull(aggregate10);
+ ArgumentNullException.ThrowIfNull(aggregate11);
+ ArgumentNullException.ThrowIfNull(aggregate12);
+ ArgumentNullException.ThrowIfNull(aggregate13);
+ ArgumentNullException.ThrowIfNull(aggregate14);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[14];
Expression, TState1>> target1 = state => state.Item1;
@@ -3733,22 +3733,22 @@ public static IAggregate aggregate15,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(aggregate7 != null);
- Contract.Requires(aggregate8 != null);
- Contract.Requires(aggregate9 != null);
- Contract.Requires(aggregate10 != null);
- Contract.Requires(aggregate11 != null);
- Contract.Requires(aggregate12 != null);
- Contract.Requires(aggregate13 != null);
- Contract.Requires(aggregate14 != null);
- Contract.Requires(aggregate15 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(aggregate7);
+ ArgumentNullException.ThrowIfNull(aggregate8);
+ ArgumentNullException.ThrowIfNull(aggregate9);
+ ArgumentNullException.ThrowIfNull(aggregate10);
+ ArgumentNullException.ThrowIfNull(aggregate11);
+ ArgumentNullException.ThrowIfNull(aggregate12);
+ ArgumentNullException.ThrowIfNull(aggregate13);
+ ArgumentNullException.ThrowIfNull(aggregate14);
+ ArgumentNullException.ThrowIfNull(aggregate15);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[15];
Expression, TState1>> target1 = state => state.Item1;
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctionsTemplate.tt b/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctionsTemplate.tt
index 4e9d03d82..1770cb83e 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctionsTemplate.tt
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctionsTemplate.tt
@@ -35,8 +35,8 @@ namespace Microsoft.StreamProcessing.Aggregates
<#= IterateCommaLine(" IAggregate aggregate$", count) #>,
Expression, TResult>> merger)
{
-<#= IterateLine(" Contract.Requires(aggregate$ != null);", count) #>
- Contract.Requires(merger != null);
+<#= IterateLine(" ArgumentNullException.ThrowIfNull(aggregate$);", count) #>
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[<#= count #>];
<# for (int i = 1; i <= count; i++) { #>
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/CountAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/CountAggregate.cs
index db3e0ab86..d4ab16d42 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/CountAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/CountAggregate.cs
@@ -21,7 +21,7 @@ internal sealed class CountAggregate : ISummableAggregate> diff = (leftCount, rightCount) => leftCount - rightCount;
public Expression> Difference() => diff;
- private static readonly Expression> sum = (leftCount, rightCount) => leftCount - rightCount;
+ private static readonly Expression> sum = (leftCount, rightCount) => leftCount + rightCount;
public Expression> Sum() => sum;
private static readonly Expression> res = count => count;
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/GeneratedAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/GeneratedAggregate.cs
index 2c85abddc..ad890db45 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/GeneratedAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/GeneratedAggregate.cs
@@ -17,7 +17,7 @@ public static GeneratedAggregate Create> deaccumulate,
Expression> difference,
Expression> computeResult)
- => new GeneratedAggregate(initialState, accumulate, deaccumulate, difference, computeResult);
+ => new(initialState, accumulate, deaccumulate, difference, computeResult);
}
internal class GeneratedAggregate : IAggregate
@@ -39,11 +39,11 @@ public GeneratedAggregate(
Expression> difference,
Expression> computeResult)
{
- Contract.Requires(initialState != null);
- Contract.Requires(accumulate != null);
- Contract.Requires(deaccumulate != null);
- Contract.Requires(difference != null);
- Contract.Requires(computeResult != null);
+ ArgumentNullException.ThrowIfNull(initialState);
+ ArgumentNullException.ThrowIfNull(accumulate);
+ ArgumentNullException.ThrowIfNull(deaccumulate);
+ ArgumentNullException.ThrowIfNull(difference);
+ ArgumentNullException.ThrowIfNull(computeResult);
this.initialState = initialState;
this.accumulate = accumulate;
this.deaccumulate = deaccumulate;
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/ListAggregateBase.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/ListAggregateBase.cs
index 4f9fda76f..c5267e51b 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/ListAggregateBase.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/ListAggregateBase.cs
@@ -10,7 +10,7 @@ namespace Microsoft.StreamProcessing.Aggregates
{
internal abstract class ListAggregateBase : IAggregate, R>
{
- private static readonly Expression>> init = () => new List();
+ private static readonly Expression>> init = () => new();
public Expression>> InitialState() => init;
public Expression, long, T, List>> Accumulate()
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxAggregate.cs
index 0b8a7c037..0177d4a3b 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxAggregate.cs
@@ -16,7 +16,7 @@ protected MinMaxAggregateBase(QueryContainer container) : this(ComparerExpressio
protected MinMaxAggregateBase(IComparerExpression comparer, QueryContainer container)
{
- Contract.Requires(comparer != null);
+ ArgumentNullException.ThrowIfNull(comparer);
var generator = comparer.CreateSortedDictionaryGenerator(container);
Expression>, MinMaxState>> template
@@ -26,7 +26,7 @@ Expression>, MinMaxState>> template
}
private readonly Expression>> initialState;
- public Expression>> InitialState() => initialState;
+ public Expression>> InitialState() => this.initialState;
private static readonly Expression, long, T, MinMaxState>> acc
= (set, timestamp, input) => new MinMaxState { savedValues = set.savedValues.Add(input) };
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/PercentileContinuous.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/PercentileContinuous.cs
index c92063a7f..f4703205d 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/PercentileContinuous.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/PercentileContinuous.cs
@@ -18,12 +18,12 @@ public PercentileContinuousDoubleAggregate(double percentile, QueryContainer con
public PercentileContinuousDoubleAggregate(double percentile, IComparerExpression rankComparer, QueryContainer container)
: base(rankComparer, container)
{
- Contract.Requires(rankComparer != null);
+ ArgumentNullException.ThrowIfNull(rankComparer);
Contract.Requires(percentile >= 0.0 && percentile <= 1.0);
this.percentile = percentile;
}
- public override Expression, double>> ComputeResult() => set => CalculatePercentile(set);
+ public override Expression, double>> ComputeResult() => set => this.CalculatePercentile(set);
public double CalculatePercentile(SortedMultiSet set)
{
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/PercentileDiscrete.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/PercentileDiscrete.cs
index 2cd3d8771..0e99e6a86 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/PercentileDiscrete.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/PercentileDiscrete.cs
@@ -18,12 +18,12 @@ public PercentileDiscreteDoubleAggregate(double percentile, QueryContainer conta
public PercentileDiscreteDoubleAggregate(double percentile, IComparerExpression rankComparer, QueryContainer container)
: base(rankComparer, container)
{
- Contract.Requires(rankComparer != null);
+ ArgumentNullException.ThrowIfNull(rankComparer);
Contract.Requires(percentile >= 0.0 && percentile <= 1.0);
this.percentile = percentile;
}
- public override Expression, double>> ComputeResult() => set => CalculatePercentile(set);
+ public override Expression, double>> ComputeResult() => set => this.CalculatePercentile(set);
public double CalculatePercentile(SortedMultiSet set)
{
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMaxAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMaxAggregate.cs
index fe31f2630..519484698 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMaxAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMaxAggregate.cs
@@ -19,7 +19,7 @@ public SlidingMaxAggregate(QueryContainer container) : this(ComparerExpression comparer, QueryContainer container)
{
- Contract.Requires(comparer != null);
+ ArgumentNullException.ThrowIfNull(comparer);
this.comparer = comparer.GetCompareExpr().Compile();
var generator = comparer.CreateSortedDictionaryGenerator(container);
@@ -30,10 +30,10 @@ Expression>, MinMaxState>> template
}
private readonly Expression>> initialState;
- public Expression>> InitialState() => initialState;
+ public Expression>> InitialState() => this.initialState;
public Expression, long, T, MinMaxState>> Accumulate()
- => (state, timestamp, input) => Accumulate(state, timestamp, input);
+ => (state, timestamp, input) => this.Accumulate(state, timestamp, input);
private MinMaxState Accumulate(MinMaxState state, long timestamp, T input)
{
@@ -75,7 +75,7 @@ private static MinMaxState Difference(MinMaxState leftSet, MinMaxState
return leftSet;
}
- public Expression, T>> ComputeResult() => state => ComputeResult(state);
+ public Expression, T>> ComputeResult() => state => this.ComputeResult(state);
private T ComputeResult(MinMaxState state)
{
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinAggregate.cs
index 9ad193bec..5e6e203bd 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinAggregate.cs
@@ -19,7 +19,7 @@ public SlidingMinAggregate(QueryContainer container) : this(ComparerExpression comparer, QueryContainer container)
{
- Contract.Requires(comparer != null);
+ ArgumentNullException.ThrowIfNull(comparer);
this.comparer = comparer.GetCompareExpr().Compile();
var generator = comparer.CreateSortedDictionaryGenerator(container);
@@ -30,10 +30,10 @@ Expression>, MinMaxState>> template
}
private readonly Expression>> initialState;
- public Expression>> InitialState() => initialState;
+ public Expression>> InitialState() => this.initialState;
public Expression, long, T, MinMaxState>> Accumulate()
- => (state, timestamp, input) => Accumulate(state, timestamp, input);
+ => (state, timestamp, input) => this.Accumulate(state, timestamp, input);
private MinMaxState Accumulate(MinMaxState state, long timestamp, T input)
{
@@ -75,7 +75,7 @@ private static MinMaxState Difference(MinMaxState leftSet, MinMaxState
return leftSet;
}
- public Expression, T>> ComputeResult() => state => ComputeResult(state);
+ public Expression, T>> ComputeResult() => state => this.ComputeResult(state);
private T ComputeResult(MinMaxState state)
{
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SortedMultisetAggregateBase.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SortedMultisetAggregateBase.cs
index 645f9f226..17dc5c772 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SortedMultisetAggregateBase.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SortedMultisetAggregateBase.cs
@@ -17,11 +17,11 @@ protected SortedMultisetAggregateBase(IComparerExpression comparer, QueryCont
Expression>, SortedMultiSet>> template
= (g) => new SortedMultiSet(g);
var replaced = template.ReplaceParametersInBody(generator);
- initialState = Expression.Lambda>>(replaced);
+ this.initialState = Expression.Lambda>>(replaced);
}
private readonly Expression>> initialState;
- public Expression>> InitialState() => initialState;
+ public Expression>> InitialState() => this.initialState;
private static readonly Expression, long, T, SortedMultiSet>> acc
= (set, timestamp, input) => set.Add(input);
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/StatisticalAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/StatisticalAggregate.cs
index a8017fdb7..32a313b96 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/StatisticalAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/StatisticalAggregate.cs
@@ -3,14 +3,19 @@
// Licensed under the MIT License
// *********************************************************************
using System;
+using System.Buffers;
using System.Collections.Generic;
-using System.Linq;
using System.Linq.Expressions;
+using System.Numerics.Tensors;
+using System.Runtime.InteropServices;
namespace Microsoft.StreamProcessing.Aggregates
{
internal abstract class StatisticalAggregate : ListAggregateBase
{
+ /// At or below this size, a simple scalar loop avoids renting a scratch buffer for the second pass.
+ internal const int VarianceScratchThreshold = 64;
+
protected static double? ComputeStdev(List valueList, bool useAsPopulation)
{
var variance = ComputeVariance(valueList, useAsPopulation);
@@ -22,19 +27,43 @@ internal abstract class StatisticalAggregate : ListAggregateBase element / list.Count);
+ int n = list.Count;
+ var divisor = useAsPopulation ? n : n - 1;
+ ReadOnlySpan span = CollectionsMarshal.AsSpan(list);
- // for the population variance the divisor is n, for the sample variance the divisor is n - 1
- var divisor = useAsPopulation ? list.Count : list.Count - 1;
+ // TensorPrimitives.Sum is SIMD-accelerated on supported hardware.
+ // Mean is computed as Sum/n rather than Sum(x/n) per element; this is faster and
+ // typically as accurate; for pathological magnitudes, per-element scaling avoids
+ // intermediate overflow in the sum (trade-off: rare edge case vs. hot-path cost).
+ double mean = TensorPrimitives.Sum(span) / n;
- // compute variance
- // instead of dividing the sum of differences we divide each difference to try to avoid a potential overflow
- double variance = list.Select(element => element - mean).Sum(difference => (difference * difference) / divisor);
+ double variance;
+ if (n <= VarianceScratchThreshold)
+ {
+ variance = 0.0;
+ for (int i = 0; i < n; i++)
+ {
+ double d = span[i] - mean;
+ variance += (d * d) / divisor;
+ }
+ }
+ else
+ {
+ double[] rented = ArrayPool.Shared.Rent(n);
+ try
+ {
+ Span scratch = rented.AsSpan(0, n);
+ TensorPrimitives.Subtract(span, mean, scratch);
+ TensorPrimitives.Multiply(scratch, scratch, scratch);
+ variance = TensorPrimitives.Sum(scratch) / divisor;
+ }
+ finally
+ {
+ ArrayPool.Shared.Return(rented);
+ }
+ }
- // difference or variance can still overflow
- return double.IsInfinity(variance) ? null : (double?)variance;
+ return double.IsInfinity(variance) ? null : variance;
}
}
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs
index 15ebdacff..c9ff82461 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs
@@ -22,8 +22,8 @@ public TopKAggregate(int k, IComparerExpression rankComparer, QueryContainer
public TopKAggregate(int k, IComparerExpression rankComparer, IComparerExpression overallComparer, QueryContainer container)
: base(ThenOrderBy(Reverse(rankComparer), overallComparer), container)
{
- Contract.Requires(rankComparer != null);
- Contract.Requires(overallComparer != null);
+ ArgumentNullException.ThrowIfNull(rankComparer);
+ ArgumentNullException.ThrowIfNull(overallComparer);
Contract.Requires(k > 0);
this.compiledRankComparer = Reverse(rankComparer).GetCompareExpr().Compile();
this.k = k;
@@ -31,7 +31,7 @@ public TopKAggregate(int k, IComparerExpression rankComparer, IComparerExpres
private static IComparerExpression Reverse(IComparerExpression comparer)
{
- Contract.Requires(comparer != null);
+ ArgumentNullException.ThrowIfNull(comparer);
var expression = comparer.GetCompareExpr();
Expression> template = (left, right) => CallInliner.Call(expression, right, left);
var reversedExpression = template.InlineCalls();
@@ -40,8 +40,8 @@ private static IComparerExpression Reverse(IComparerExpression comparer)
private static IComparerExpression ThenOrderBy(IComparerExpression comparer1, IComparerExpression comparer2)
{
- Contract.Requires(comparer1 != null);
- Contract.Requires(comparer2 != null);
+ ArgumentNullException.ThrowIfNull(comparer1);
+ ArgumentNullException.ThrowIfNull(comparer2);
var primary = comparer1.GetCompareExpr();
var secondary = comparer2.GetCompareExpr();
Expression> template =
@@ -53,7 +53,7 @@ private static IComparerExpression ThenOrderBy(IComparerExpression compare
return new ComparerExpression(newExpression);
}
- public override Expression, List>>> ComputeResult() => set => GetTopK(set);
+ public override Expression, List>>> ComputeResult() => set => this.GetTopK(set);
private List> GetTopK(SortedMultiSet set)
{
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TumblingMaxAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TumblingMaxAggregate.cs
index f53686fa5..a76600c71 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TumblingMaxAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TumblingMaxAggregate.cs
@@ -19,7 +19,7 @@ public TumblingMaxAggregate() : this(ComparerExpression.Default) { }
public TumblingMaxAggregate(IComparerExpression comparer)
{
- Contract.Requires(comparer != null);
+ ArgumentNullException.ThrowIfNull(comparer);
var stateExpression = Expression.Parameter(typeof(MinMaxState), "state");
var timestampExpression = Expression.Parameter(typeof(long), "timestamp");
@@ -32,7 +32,7 @@ public TumblingMaxAggregate(IComparerExpression comparer)
var comparerExpression = comparer.GetCompareExpr().ReplaceParametersInBody(
inputExpression, currentValue.ReplaceParametersInBody(stateExpression));
- var typeInfo = typeof(MinMaxState).GetTypeInfo();
+ var minMaxStateType = typeof(MinMaxState);
this.accumulate = Expression.Lambda, long, T, MinMaxState>>(
Expression.Condition(
Expression.OrElse(
@@ -40,8 +40,8 @@ public TumblingMaxAggregate(IComparerExpression comparer)
Expression.GreaterThan(comparerExpression, Expression.Constant(0))),
Expression.MemberInit(
(NewExpression)constructor.Body,
- Expression.Bind(typeInfo.GetField("currentTimestamp"), timestampExpression),
- Expression.Bind(typeInfo.GetField("currentValue"), inputExpression)),
+ Expression.Bind(minMaxStateType.GetField("currentTimestamp"), timestampExpression),
+ Expression.Bind(minMaxStateType.GetField("currentValue"), inputExpression)),
stateExpression),
stateExpression,
timestampExpression,
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TumblingMinAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TumblingMinAggregate.cs
index 2427e3e86..7e83e473d 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TumblingMinAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TumblingMinAggregate.cs
@@ -19,7 +19,7 @@ public TumblingMinAggregate() : this(ComparerExpression.Default) { }
public TumblingMinAggregate(IComparerExpression comparer)
{
- Contract.Requires(comparer != null);
+ ArgumentNullException.ThrowIfNull(comparer);
var stateExpression = Expression.Parameter(typeof(MinMaxState), "state");
var timestampExpression = Expression.Parameter(typeof(long), "timestamp");
@@ -32,7 +32,7 @@ public TumblingMinAggregate(IComparerExpression comparer)
var comparerExpression = comparer.GetCompareExpr().ReplaceParametersInBody(
inputExpression, currentValue.ReplaceParametersInBody(stateExpression));
- var typeInfo = typeof(MinMaxState).GetTypeInfo();
+ var minMaxStateType = typeof(MinMaxState);
this.accumulate = Expression.Lambda, long, T, MinMaxState>>(
Expression.Condition(
Expression.OrElse(
@@ -40,8 +40,8 @@ public TumblingMinAggregate(IComparerExpression comparer)
Expression.LessThan(comparerExpression, Expression.Constant(0))),
Expression.MemberInit(
(NewExpression)constructor.Body,
- Expression.Bind(typeInfo.GetField("currentTimestamp"), timestampExpression),
- Expression.Bind(typeInfo.GetField("currentValue"), inputExpression)),
+ Expression.Bind(minMaxStateType.GetField("currentTimestamp"), timestampExpression),
+ Expression.Bind(minMaxStateType.GetField("currentValue"), inputExpression)),
stateExpression),
stateExpression,
timestampExpression,
diff --git a/Sources/Core/Microsoft.StreamProcessing/CacheUtilities/StreamableIO.cs b/Sources/Core/Microsoft.StreamProcessing/CacheUtilities/StreamableIO.cs
index c30e42081..b3ed00143 100644
--- a/Sources/Core/Microsoft.StreamProcessing/CacheUtilities/StreamableIO.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/CacheUtilities/StreamableIO.cs
@@ -149,27 +149,19 @@ private static void InferProperties(
}
}
- internal sealed class QueuedMessageObservable : IObservable>>
+ internal sealed class QueuedMessageObservable(
+ IStreamable stream
+ ) : IObservable>>
{
- private readonly IStreamable streamable;
-
- public QueuedMessageObservable(IStreamable stream) => this.streamable = stream;
-
public IDisposable Subscribe(IObserver>> observer)
- => this.streamable.Subscribe(new QueuedMessageObserver(observer));
+ => stream.Subscribe(new QueuedMessageObserver(observer));
}
- internal sealed class QueuedMessageObserver : IStreamObserver, IDisposable
+ internal sealed class QueuedMessageObserver(
+ IObserver>> observer
+ ) : IStreamObserver, IDisposable
{
- private readonly IObserver>> observer;
-
- public QueuedMessageObserver(IObserver>> observer)
- {
- this.observer = observer;
- this.ClassId = Guid.NewGuid();
- }
-
- public Guid ClassId { get; }
+ public Guid ClassId { get; } = Guid.NewGuid();
public int CurrentlyBufferedOutputCount => 0;
@@ -177,10 +169,10 @@ public QueuedMessageObserver(IObserver