Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
961940c
MM
adam-dot-cohen Dec 2, 2022
a28d04e
1.GlobalSuppression file added to supress irregular naming conventions
adam-dot-cohen Dec 2, 2022
7fa170a
- Removed netcoreapp3.1
adam-dot-cohen Jan 11, 2023
4cc6c5b
Upgrade to .NET 8 and fix async-incompatible ConfigModifier locking
amirburbea Mar 24, 2026
cc4836c
Fix race condition in BinaryPipe.OnCompleted causing lost batches und…
amirburbea Mar 24, 2026
d1d21a3
Upgrade to .NET 10, remove StyleCop from core libraries, fix expressi…
amirburbea Mar 24, 2026
fbdd523
Merge branch 'master' into modernize-net10
amirburbea Mar 24, 2026
b4e281f
Eliminate heap allocations in BinaryDecoder/BinaryEncoder hot paths
amirburbea Mar 24, 2026
e86158a
Fix JoinTestWithException to be deterministic across test orderings
amirburbea Mar 24, 2026
f05cf2d
Fix long standing bug in CountAggregate where Sum was accidentally im…
amirburbea Mar 26, 2026
a07b6d5
Refactor: use method groups & C# 12 collection expressions and span/s…
amirburbea Mar 27, 2026
3f55305
Remove unused references to Newtonsoft.Json
amirburbea Mar 27, 2026
a0c7ae9
Add NuGet packaging for Trill.StreamProcessing 10.0.0
amirburbea Mar 28, 2026
9e1ebf7
- Add this.
amirburbea Apr 12, 2026
1d565b9
Merge modernize-net10 into nuget-packaging for release
amirburbea Apr 12, 2026
962ab60
Bump package version to 10.0.1 for NuGet release
amirburbea Apr 12, 2026
8183dfc
Remove references to typeInfo, use span more, use .net10 bitvector
amirburbea Apr 12, 2026
f5c8b41
Merge branch 'modernize-net10' into nuget-packaging
amirburbea Apr 12, 2026
74e4977
Packaging: ship core only in nupkg (no Provider); bump version to 10.0.2
amirburbea Apr 12, 2026
5a466d7
Make StreamEvent readonly and use System.Threading.Lock in more places
amirburbea Apr 16, 2026
c16eaf1
Merge remote-tracking branch 'origin/modernize-net10' into nuget-pack…
amirburbea Apr 16, 2026
c9fde13
Bump version to 10.0.3
amirburbea Apr 16, 2026
5d1f28d
Add optional support for codegen cache in config
amirburbea Apr 17, 2026
dec6c6b
Bump to 10.0.5
amirburbea Apr 17, 2026
3a28e4f
Add more rigorous checks for skipping the codegen cache
amirburbea Apr 18, 2026
d6f922a
Bump package version to 10.0.6
amirburbea Apr 18, 2026
bf78ab4
Use better mechanism for deciding to use a cached assembly
amirburbea Apr 18, 2026
0623d3b
Further modernizations in c#
amirburbea Apr 18, 2026
5d69d77
fixes for codegen_timing
amirburbea Apr 18, 2026
04acf5e
Use new pattern to prevent failing when the cache is invalid, allowin…
amirburbea Apr 18, 2026
d7ad0f8
Remove obsolete StreamProcessing.nuspec; packaging uses SDK-style dot…
amirburbea Apr 18, 2026
19cd1e9
Use SIMD for stdev, further lock/modernization work
amirburbea May 9, 2026
fafc549
Bump to 10.1.1
amirburbea May 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ bld/
## See more on package restore for Nuget 2.7 here: http://docs.nuget.org/consume/package-restore
# Ignore NuGet Packages
*.nupkg
nupkg/
# Ignore the packages folder
**/packages/*
# except build/, which is used as an MSBuild target.
Expand Down Expand Up @@ -58,3 +59,5 @@ $tf/pendingchanges.tfb
$tf/properties.tf1
project.lock.json
**/.vs/

.claude/
20 changes: 20 additions & 0 deletions PACKAGE_README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Trill.StreamProcessing

Trill is a high-performance one-pass in-memory streaming analytics engine originally developed by Microsoft Research. It can handle both real-time and offline data, and is based on a temporal data and query model. Trill can be used as a streaming engine, a lightweight in-memory relational engine, and as a progressive query processor for early query results on partial data.

This package is a community-maintained fork of [Microsoft's Trill](https://github.com/microsoft/Trill), updated for .NET 10 and actively maintained by [Amir Burbea](https://github.com/amirburbea).

## Installation

```
dotnet add package Trill.StreamProcessing
```

This package contains `Microsoft.StreamProcessing` (the core streaming engine). The experimental `Microsoft.StreamProcessing.Provider` project in the repo is not published as part of this package.

## Learn More

- [Trill on GitHub](https://github.com/amirburbea/trill)
- [Original announcement blog post](https://azure.microsoft.com/en-us/blog/microsoft-open-sources-trill-to-deliver-insights-on-a-trillion-events-a-day/)
- [Trill paper (VLDB 2015)](https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/trill-vldb2015.pdf)
- [Trill samples repository](https://github.com/Microsoft/TrillSamples)
2 changes: 1 addition & 1 deletion Sources/.editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ csharp_new_line_before_catch = true
csharp_new_line_before_finally = true
csharp_indent_case_contents = true
csharp_indent_switch_labels = true
csharp_preserve_single_line_statements = false
csharp_preserve_single_line_statements = false
10 changes: 0 additions & 10 deletions Sources/Core/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,5 @@
<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.props', '$(MSBuildThisFileDirectory)../'))" />

<!-- Code Analysis -->
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0' And '$(Configuration)' == 'Debug'">
<PackageReference Include="StyleCop.Analyzers" Version="1.0.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
</ItemGroup>
<PropertyGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<!-- Do not set RunCodeAnalysis*, as this is incompatible with netstandard2.0. Analyzers run by default whenever included. -->
<CodeAnalysisRuleSet>$(EnlistmentRoot)\Sources\Microsoft.StreamProcessing.ruleset</CodeAnalysisRuleSet>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,29 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0</TargetFrameworks>
<TargetFrameworks>net10.0</TargetFrameworks>
<Platforms>x64;AnyCPU</Platforms>
</PropertyGroup>

<PropertyGroup>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<RootNamespace>Microsoft.StreamProcessing.Provider</RootNamespace>
<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.CSharp" Version="4.5.0" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.CodeAnalysis.Scripting" Version="3.1.0" />
<PackageReference Include="System.Diagnostics.Contracts" Version="4.3.0" />
<PackageReference Include="System.Diagnostics.Process" Version="4.3.0" />
<PackageReference Include="System.Runtime.Serialization.Primitives" Version="4.3.0" />
<PackageReference Include="System.Threading.Thread" Version="4.3.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)'=='netstandard2.0'">
<PackageReference Include="System.Runtime.Loader" Version="4.3.0" />
<PackageReference Include="Microsoft.CodeAnalysis.Scripting" Version="5.3.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public static IAggregate<TInput, TState, TResult> Wrap<TInput, TAggregateInput,
this IAggregate<TAggregateInput, TState, TResult> aggregate,
Expression<Func<TInput, TAggregateInput>> transform)
{
Contract.Requires(aggregate != null);
Contract.Requires(transform != null);
ArgumentNullException.ThrowIfNull(aggregate);
ArgumentNullException.ThrowIfNull(transform);
return aggregate.TransformInput(transform);
}

Expand All @@ -62,17 +62,17 @@ public static IAggregate<TInput, TState, TResult> Wrap<TInput, TAggregateInput,
this IAggregate<TAggregateInput, TState, TResult> aggregate,
Expression<Func<TInput, TAggregateInput?>> transform) where TAggregateInput : struct
{
Contract.Requires(aggregate != null);
Contract.Requires(transform != null);
ArgumentNullException.ThrowIfNull(aggregate);
ArgumentNullException.ThrowIfNull(transform);
return aggregate.MakeInputNullableAndSkipNulls().TransformInput(transform);
}

internal static IAggregate<TInput, TState, TResult> TransformInput<TInput, TAggregateInput, TState, TResult>(
this IAggregate<TAggregateInput, TState, TResult> aggregate,
Expression<Func<TInput, TAggregateInput>> transform)
{
Contract.Requires(aggregate != null);
Contract.Requires(transform != null);
ArgumentNullException.ThrowIfNull(aggregate);
ArgumentNullException.ThrowIfNull(transform);

return GeneratedAggregate.Create(
initialState: aggregate.InitialState(),
Expand All @@ -86,19 +86,19 @@ private static Expression<Func<T1, T2, TInput, TOutput>> TransformInput3<T1, T2,
this Expression<Func<T1, T2, T3, TOutput>> func,
Expression<Func<TInput, T3>> transform)
{
Contract.Requires(func != null);
Contract.Requires(transform != null);
ArgumentNullException.ThrowIfNull(func);
ArgumentNullException.ThrowIfNull(transform);
var result = func.ReplaceParametersInBody(func.Parameters[0], func.Parameters[1], transform.Body);
var transformParam = transform.Parameters[0];
return Expression.Lambda<Func<T1, T2, TInput, TOutput>>(result, new[] { func.Parameters[0], func.Parameters[1], transformParam });
return Expression.Lambda<Func<T1, T2, TInput, TOutput>>(result, [func.Parameters[0], func.Parameters[1], transformParam]);
}

internal static IAggregate<TInput, TState, TResult> TransformOutput<TInput, TState, TAggregateResult, TResult>(
this IAggregate<TInput, TState, TAggregateResult> aggregate,
Expression<Func<TAggregateResult, TResult>> transform)
{
Contract.Requires(aggregate != null);
Contract.Requires(transform != null);
ArgumentNullException.ThrowIfNull(aggregate);
ArgumentNullException.ThrowIfNull(transform);

return GeneratedAggregate.Create(
initialState: aggregate.InitialState(),
Expand All @@ -112,8 +112,8 @@ private static Expression<Func<T1, TOutput>> TransformOutput<T1, TFuncOutput, TO
this Expression<Func<T1, TFuncOutput>> func,
Expression<Func<TFuncOutput, TOutput>> transform)
{
Contract.Requires(func != null);
Contract.Requires(transform != null);
ArgumentNullException.ThrowIfNull(func);
ArgumentNullException.ThrowIfNull(transform);
var result = transform.ReplaceParametersInBody(func.Body);
return Expression.Lambda<Func<T1, TOutput>>(result, func.Parameters);
}
Expand All @@ -127,7 +127,7 @@ public static IAggregate<TInput, TState, TResult> ApplyFilter<TInput, TState, TR
this IAggregate<TInput, TState, TResult> aggregate,
Expression<Func<TInput, bool>> filter)
{
Contract.Requires(aggregate != null);
ArgumentNullException.ThrowIfNull(aggregate);
if (filter == null || filter.Body.ExpressionEquals(Expression.Constant(true))) return aggregate;

Expression<Func<TState, long, TInput, TState>> newAccumulate = (oldState, timestamp, input) =>
Expand All @@ -151,7 +151,7 @@ public static IAggregate<TInput, TState, TResult> ApplyFilter<TInput, TState, TR
public static IAggregate<TInput?, TState, TResult> MakeInputNullableAndSkipNulls<TInput, TState, TResult>(
this IAggregate<TInput, TState, TResult> aggregate) where TInput : struct
{
Contract.Requires(aggregate != null);
ArgumentNullException.ThrowIfNull(aggregate);

Expression<Func<TState, long, TInput?, TState>> newAccumulate = (oldState, timestamp, input) =>
input.HasValue ? CallInliner.Call(aggregate.Accumulate(), oldState, timestamp, input.Value) : oldState;
Expand All @@ -175,9 +175,9 @@ public static IAggregate<TInput, TState, TResult> ApplyFilter<TInput, TState, TR
public static IAggregate<TInput, TState, TResult> SkipNulls<TInput, TState, TResult>(
this IAggregate<TInput, TState, TResult> aggregate)
{
Contract.Requires(aggregate != null);
ArgumentNullException.ThrowIfNull(aggregate);

var inputType = typeof(TInput).GetTypeInfo();
var inputType = typeof(TInput);
return inputType.IsClass
? GeneratedAggregate.Create(
initialState: aggregate.InitialState(),
Expand Down Expand Up @@ -229,7 +229,7 @@ private static Expression<Func<TState, long, TInput, TState>> AddSkipNullValueLo
public static IAggregate<TInput, NullOutputWrapper<TState>, TResult?> MakeOutputNullableAndOutputNullWhenEmpty<TInput, TState, TResult>(
this IAggregate<TInput, TState, TResult> aggregate) where TResult : struct
{
Contract.Requires(aggregate != null);
ArgumentNullException.ThrowIfNull(aggregate);

Expression<Func<NullOutputWrapper<TState>>> newInitialState =
() => new NullOutputWrapper<TState>
Expand Down Expand Up @@ -279,7 +279,7 @@ private static Expression<Func<TState, long, TInput, TState>> AddSkipNullValueLo
public static IAggregate<TInput, NullOutputWrapper<TState>, TResult> OutputDefaultWhenEmpty<TInput, TState, TResult>(
this IAggregate<TInput, TState, TResult> aggregate)
{
Contract.Requires(aggregate != null);
ArgumentNullException.ThrowIfNull(aggregate);

Expression<Func<NullOutputWrapper<TState>>> newInitialState =
() => new NullOutputWrapper<TState>
Expand Down
Loading