Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using System;
using System.IO;
using System.Reflection;
using Hosting.Helpers;
using NServiceBus.Hosting.Helpers;
using NUnit.Framework;
using Particular.Approvals;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
namespace NServiceBus.Core.Tests.AssemblyScanner;

using System.Linq;
using Hosting.Helpers;
using NServiceBus.Hosting.Helpers;
using NUnit.Framework;

[TestFixture]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace NServiceBus.Core.Tests.AssemblyScanner;

using System.IO;
using System.Linq;
using Hosting.Helpers;
using NServiceBus.Hosting.Helpers;
using NUnit.Framework;

[TestFixture]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

using System.IO;
using System.Reflection;
using Hosting.Helpers;
using NServiceBus.Hosting.Helpers;
using NUnit.Framework;

[TestFixture]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

using System.IO;
using System.Linq;
using Hosting.Helpers;
using NServiceBus.Hosting.Helpers;
using NUnit.Framework;

[TestFixture]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using Hosting.Helpers;
using NServiceBus.Hosting.Helpers;
using NUnit.Framework;

[TestFixture]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace NServiceBus.Core.Tests.AssemblyScanner;
using System.IO;
using System.Linq;
using System.Runtime.Loader;
using Hosting.Helpers;
using NServiceBus.Hosting.Helpers;
using NUnit.Framework;

[TestFixture]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ namespace NServiceBus.Core.Tests.AssemblyScanner;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using Hosting.Helpers;
using NServiceBus.Hosting.Helpers;
using NUnit.Framework;

[TestFixture]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

using System.IO;
using System.Linq;
using Hosting.Helpers;
using NServiceBus.Hosting.Helpers;
using NUnit.Framework;

[TestFixture]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace NServiceBus.Core.Tests.AssemblyScanner;

using System.IO;
using System.Linq;
using Hosting.Helpers;
using NServiceBus.Hosting.Helpers;
using NUnit.Framework;

[TestFixture]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace NServiceBus.Core.Tests.AssemblyScanner;

using System.IO;
using System.Linq;
using Hosting.Helpers;
using NServiceBus.Hosting.Helpers;
using NUnit.Framework;

[TestFixture]
Expand Down
129 changes: 129 additions & 0 deletions src/NServiceBus.Core.Tests/Audit/InvokeAuditPipelineBehaviorTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
namespace NServiceBus.Core.Tests.Audit;

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using NServiceBus.Pipeline;
using NUnit.Framework;
using Testing;
using Transport;

[TestFixture]
public class InvokeAuditPipelineBehaviorTests
{
[Test]
public async Task Should_invoke_next_before_forking_audit_pipeline()
{
var behavior = new InvokeAuditPipelineBehavior("audit", TimeSpan.FromMinutes(1));
var context = CreateContext(new RecordingAuditPipeline());
var nextWasCalled = false;

await behavior.Invoke(context, _ =>
{
nextWasCalled = true;
return Task.CompletedTask;
});

Assert.That(nextWasCalled, Is.True);
}

[Test]
public async Task Should_create_audit_message_with_copied_headers()
{
var auditPipeline = new RecordingAuditPipeline();
var behavior = new InvokeAuditPipelineBehavior("audit", TimeSpan.FromMinutes(1));
var context = CreateContext(auditPipeline);
context.Message.Headers["Custom"] = "value";

await behavior.Invoke(context, _ => Task.CompletedTask);

var audited = auditPipeline.LastContext.Message;
using (Assert.EnterMultipleScope())
{
Assert.That(audited.Headers, Is.EquivalentTo(context.Message.Headers));
Assert.That(audited.Headers, Is.Not.SameAs(context.Message.Headers));
}
}

[Test]
public async Task Should_use_incoming_message_id_and_body_for_audit_message()
{
var auditPipeline = new RecordingAuditPipeline();
var behavior = new InvokeAuditPipelineBehavior("audit", TimeSpan.FromMinutes(1));
var context = CreateContext(auditPipeline);

var body = new byte[] { 1, 2, 3, 4 };
context.Message = new IncomingMessage("incoming-id", [], body);

await behavior.Invoke(context, _ => Task.CompletedTask);

var audited = auditPipeline.LastContext.Message;
using (Assert.EnterMultipleScope())
{
Assert.That(audited.MessageId, Is.EqualTo("incoming-id"));
Assert.That(audited.Body.ToArray(), Is.EqualTo(body));
}
}

[Test]
public async Task Should_pass_audit_address_and_ttbr_to_audit_context()
{
var auditPipeline = new RecordingAuditPipeline();
var timeToBeReceived = TimeSpan.FromMinutes(5);
var behavior = new InvokeAuditPipelineBehavior("configured-audit-address", timeToBeReceived);
var context = CreateContext(auditPipeline);

await behavior.Invoke(context, _ => Task.CompletedTask);

using (Assert.EnterMultipleScope())
{
Assert.That(auditPipeline.LastContext.AuditAddress, Is.EqualTo("configured-audit-address"));
Assert.That(auditPipeline.LastContext.TimeToBeReceived, Is.EqualTo(timeToBeReceived));
}
}

[Test]
public void Should_not_fork_audit_pipeline_when_next_throws()
{
var auditPipeline = new RecordingAuditPipeline();
var behavior = new InvokeAuditPipelineBehavior("audit", TimeSpan.FromMinutes(1));
var context = CreateContext(auditPipeline);

var expected = new Exception("next failed");

var thrown = Assert.ThrowsAsync<Exception>(() => behavior.Invoke(context, _ => Task.FromException(expected)));

using (Assert.EnterMultipleScope())
{
Assert.That(thrown, Is.SameAs(expected));
Assert.That(auditPipeline.InvocationCount, Is.EqualTo(0));
}
}

static TestableIncomingPhysicalMessageContext CreateContext(RecordingAuditPipeline pipeline)
{
var context = new TestableIncomingPhysicalMessageContext();
context.Extensions.Set<IPipelineCache>(new FakePipelineCache(pipeline));
return context;
}

sealed class RecordingAuditPipeline : IPipeline<IAuditContext>
{
public int InvocationCount { get; private set; }
public IAuditContext LastContext { get; private set; }

public Task Invoke(IAuditContext context)
{
InvocationCount++;
LastContext = context;
return Task.CompletedTask;
}
}

sealed class FakePipelineCache(IPipeline<IAuditContext> pipeline) : IPipelineCache
{
public IPipeline<TContext> Pipeline<TContext>()
where TContext : IBehaviorContext =>
(IPipeline<TContext>)pipeline;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,88 @@ public async Task Should_set_the_related_to_header_with_the_id_of_the_current_me
Assert.That(context.Headers[Headers.RelatedTo], Is.EqualTo("the message id"));
}

[Test]
public async Task Should_set_start_new_conversation_id_when_explicit_conversation_id_is_provided()
{
var behavior = new AttachCausationHeadersBehavior(ReturnDefaultConversationId);
var options = new SendOptions();
options.StartNewConversation("new conversation");

var context = new TestableOutgoingLogicalMessageContext
{
Extensions = options.Context
};

await behavior.Invoke(context, ctx => Task.CompletedTask);

Assert.That(context.Headers[Headers.ConversationId], Is.EqualTo("new conversation"));
}

[Test]
public async Task Should_set_start_new_conversation_id_using_strategy_when_no_explicit_conversation_id_is_provided()
{
var generatedId = "generated conversation";
var behavior = new AttachCausationHeadersBehavior(_ => generatedId);
var options = new SendOptions();
options.StartNewConversation();

var context = new TestableOutgoingLogicalMessageContext
{
Extensions = options.Context
};

await behavior.Invoke(context, ctx => Task.CompletedTask);

Assert.That(context.Headers[Headers.ConversationId], Is.EqualTo(generatedId));
}

[Test]
public async Task Should_set_previous_conversation_id_when_starting_new_conversation_from_incoming_message()
{
var incomingConversationId = "incoming conversation";
var behavior = new AttachCausationHeadersBehavior(_ => "new conversation");
var options = new SendOptions();
options.StartNewConversation();

var context = new TestableOutgoingLogicalMessageContext
{
Extensions = options.Context
};
context.Extensions.Set(new IncomingMessage("message-id", new Dictionary<string, string>
{
{ Headers.ConversationId, incomingConversationId }
}, Array.Empty<byte>()));

await behavior.Invoke(context, ctx => Task.CompletedTask);

using (Assert.EnterMultipleScope())
{
Assert.That(context.Headers[Headers.ConversationId], Is.EqualTo("new conversation"));
Assert.That(context.Headers[Headers.PreviousConversationId], Is.EqualTo(incomingConversationId));
}
}

[Test]
public void When_user_defined_conversation_id_is_set_and_start_new_conversation_was_requested_should_throw()
{
var behavior = new AttachCausationHeadersBehavior(ReturnDefaultConversationId);
var options = new SendOptions();
options.StartNewConversation();

var context = new TestableOutgoingLogicalMessageContext
{
Extensions = options.Context,
Headers =
{
[Headers.ConversationId] = "user-defined"
}
};

var exception = Assert.ThrowsAsync<Exception>(() => behavior.Invoke(context, ctx => Task.CompletedTask));

Assert.That(exception.Message, Is.EqualTo($"Cannot set the {Headers.ConversationId} header to 'user-defined' as StartNewConversation() was called."));
}

string ReturnDefaultConversationId(IOutgoingLogicalMessageContext context)
{
return ConversationId.Default.Value;
Expand Down
Loading
Loading