From 0eb5b2301448a0c927b7fe7eb31b3151dcb1bf6e Mon Sep 17 00:00:00 2001 From: Andreas Bednarz <110360248+abparticular@users.noreply.github.com> Date: Fri, 3 Apr 2026 16:54:23 +1100 Subject: [PATCH 1/2] Fixed outbox test for multiple subscribers handling the same event (#7680) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fixed outbox test for multiple subscribers handling the same event * Fix all issues caused by Mauro's merge 😅 * Only for persisters supporting outbox * Add back tracing * Update When_subscribers_handles_the_same_event.cs * More tracing, syntax error * syntax * Use routing instead of SendOpetions * Refactor handlers and routing in `When_subscribers_handles_the_same_event` acceptance test for clarity and alignment with naming conventions. --------- Co-authored-by: Mauro Servienti Co-authored-by: Daniel Marbach --- ...When_subscribers_handles_the_same_event.cs | 107 +++++++++++------- 1 file changed, 63 insertions(+), 44 deletions(-) diff --git a/src/NServiceBus.AcceptanceTests/Outbox/When_subscribers_handles_the_same_event.cs b/src/NServiceBus.AcceptanceTests/Outbox/When_subscribers_handles_the_same_event.cs index 5d7452792a..352d963813 100644 --- a/src/NServiceBus.AcceptanceTests/Outbox/When_subscribers_handles_the_same_event.cs +++ b/src/NServiceBus.AcceptanceTests/Outbox/When_subscribers_handles_the_same_event.cs @@ -1,6 +1,5 @@ namespace NServiceBus.AcceptanceTests.Outbox; -using System.Threading; using System.Threading.Tasks; using AcceptanceTesting; using AcceptanceTesting.Customization; @@ -9,17 +8,17 @@ namespace NServiceBus.AcceptanceTests.Outbox; using Features; using NUnit.Framework; -public class When_subscribers_handles_the_same_event : NServiceBusAcceptanceTest +public class When_outbox_is_used_by_multiple_subscribers_for_the_same_event : NServiceBusAcceptanceTest { - [Test, CancelAfter(10_000)] - public async Task Should_be_processed_by_all_subscribers(CancellationToken cancellationToken = default) + [Test] + public async Task Each_subscriber_should_dispatch_its_own_transport_operations() { Requires.OutboxPersistence(); var context = await Scenario.Define() - .WithEndpoint(b => - b.When(c => c.Subscriber1Subscribed && c.Subscriber2Subscribed, session => session.Publish(new MyEvent())) - ) + .WithEndpoint(b => b.When( + c => c is { Subscriber1Subscribed: true, Subscriber2Subscribed: true }, + session => session.Publish(new MyEvent()))) .WithEndpoint(b => b.When(async (session, ctx) => { await session.Subscribe(); @@ -46,12 +45,14 @@ public async Task Should_be_processed_by_all_subscribers(CancellationToken cance ctx.AddTrace("Subscriber2 has now asked to be subscribed to MyEvent"); } })) - .Run(cancellationToken); + .WithEndpoint() + .Run(); using (Assert.EnterMultipleScope()) { - Assert.That(context.Subscriber1GotTheEvent, Is.True); - Assert.That(context.Subscriber2GotTheEvent, Is.True); + Assert.That(context.FailedMessages.IsEmpty, Is.True); + Assert.That(context.Subscriber1ProcessedConfirmed, Is.True); + Assert.That(context.Subscriber2ProcessedConfirmed, Is.True); } } @@ -59,31 +60,30 @@ public class Context : ScenarioContext { public bool Subscriber1Subscribed { get; set; } public bool Subscriber2Subscribed { get; set; } + public bool Subscriber1ProcessedConfirmed { get; set; } + public bool Subscriber2ProcessedConfirmed { get; set; } - public bool Subscriber1GotTheEvent { get; set; } - public bool Subscriber2GotTheEvent { get; set; } - - public void MaybeCompleted() => MarkAsCompleted(Subscriber1GotTheEvent, Subscriber2GotTheEvent); + public void MaybeCompleted() => MarkAsCompleted(Subscriber1ProcessedConfirmed, Subscriber2ProcessedConfirmed); } public class Publisher : EndpointConfigurationBuilder { public Publisher() => EndpointSetup(c => { - c.OnEndpointSubscribed((s, context) => + c.OnEndpointSubscribed((s, ctx) => { var subscriber1 = Conventions.EndpointNamingConvention(typeof(Subscriber1)); if (s.SubscriberEndpoint.Contains(subscriber1)) { - context.Subscriber1Subscribed = true; - context.AddTrace($"{subscriber1} is now subscribed"); + ctx.Subscriber1Subscribed = true; + ctx.AddTrace($"{subscriber1} is now subscribed"); } var subscriber2 = Conventions.EndpointNamingConvention(typeof(Subscriber2)); if (s.SubscriberEndpoint.Contains(subscriber2)) { - context.Subscriber2Subscribed = true; - context.AddTrace($"{subscriber2} is now subscribed"); + ctx.Subscriber2Subscribed = true; + ctx.AddTrace($"{subscriber2} is now subscribed"); } }); }, metadata => metadata.RegisterSelfAsPublisherFor(this)); @@ -91,42 +91,59 @@ public Publisher() => EndpointSetup(c => public class Subscriber1 : EndpointConfigurationBuilder { - public Subscriber1() => - EndpointSetup(c => - { - c.DisableFeature(); + public Subscriber1() => EndpointSetup(c => + { + c.DisableFeature(); + c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly; + c.EnableOutbox(); + c.ConfigureRouting().RouteToEndpoint(typeof(ProcessBySubscriber1), typeof(Collector)); + }, metadata => metadata.RegisterPublisherFor(typeof(Publisher))); + + [Handler] + public class MyEventMessageHandler : IHandleMessages + { + public Task Handle(MyEvent message, IMessageHandlerContext context) => context.Send(new ProcessBySubscriber1()); + } + } + + public class Subscriber2 : EndpointConfigurationBuilder + { + public Subscriber2() => EndpointSetup(c => + { + c.DisableFeature(); + c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly; + c.EnableOutbox(); + c.ConfigureRouting().RouteToEndpoint(typeof(ProcessBySubscriber2), typeof(Collector)); + }, metadata => metadata.RegisterPublisherFor(typeof(Publisher))); + + [Handler] + public class MyEventMessageHandler : IHandleMessages + { + public Task Handle(MyEvent message, IMessageHandlerContext context) => context.Send(new ProcessBySubscriber2()); + } + } - c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly; - c.EnableOutbox(); - }, metadata => metadata.RegisterPublisherFor(typeof(Publisher))); + public class Collector : EndpointConfigurationBuilder + { + public Collector() => EndpointSetup(); - public class MyHandler(Context testContext) : IHandleMessages + [Handler] + public class Subscriber1ProcessedHandler(Context testContext) : IHandleMessages { - public Task Handle(MyEvent message, IMessageHandlerContext context) + public Task Handle(ProcessBySubscriber1 message, IMessageHandlerContext context) { - testContext.Subscriber1GotTheEvent = true; + testContext.Subscriber1ProcessedConfirmed = true; testContext.MaybeCompleted(); return Task.CompletedTask; } } - } - - public class Subscriber2 : EndpointConfigurationBuilder - { - public Subscriber2() => - EndpointSetup(c => - { - c.DisableFeature(); - - c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly; - c.EnableOutbox(); - }, metadata => metadata.RegisterPublisherFor(typeof(Publisher))); - public class MyHandler(Context testContext) : IHandleMessages + [Handler] + public class Subscriber2ProcessedHandler(Context testContext) : IHandleMessages { - public Task Handle(MyEvent messageThatIsEnlisted, IMessageHandlerContext context) + public Task Handle(ProcessBySubscriber2 message, IMessageHandlerContext context) { - testContext.Subscriber2GotTheEvent = true; + testContext.Subscriber2ProcessedConfirmed = true; testContext.MaybeCompleted(); return Task.CompletedTask; } @@ -134,4 +151,6 @@ public Task Handle(MyEvent messageThatIsEnlisted, IMessageHandlerContext context } public class MyEvent : IEvent; + public class ProcessBySubscriber1 : IMessage; + public class ProcessBySubscriber2 : IMessage; } \ No newline at end of file From a2b3f622616a8b92d8d39286a40578d98363bf99 Mon Sep 17 00:00:00 2001 From: Mauro Servienti Date: Fri, 3 Apr 2026 14:48:57 +0200 Subject: [PATCH 2/2] Remove the 10.2-only Handler attribute --- .../Outbox/When_subscribers_handles_the_same_event.cs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/NServiceBus.AcceptanceTests/Outbox/When_subscribers_handles_the_same_event.cs b/src/NServiceBus.AcceptanceTests/Outbox/When_subscribers_handles_the_same_event.cs index 352d963813..4d62f67d1b 100644 --- a/src/NServiceBus.AcceptanceTests/Outbox/When_subscribers_handles_the_same_event.cs +++ b/src/NServiceBus.AcceptanceTests/Outbox/When_subscribers_handles_the_same_event.cs @@ -99,7 +99,6 @@ public Subscriber1() => EndpointSetup(c => c.ConfigureRouting().RouteToEndpoint(typeof(ProcessBySubscriber1), typeof(Collector)); }, metadata => metadata.RegisterPublisherFor(typeof(Publisher))); - [Handler] public class MyEventMessageHandler : IHandleMessages { public Task Handle(MyEvent message, IMessageHandlerContext context) => context.Send(new ProcessBySubscriber1()); @@ -116,7 +115,6 @@ public Subscriber2() => EndpointSetup(c => c.ConfigureRouting().RouteToEndpoint(typeof(ProcessBySubscriber2), typeof(Collector)); }, metadata => metadata.RegisterPublisherFor(typeof(Publisher))); - [Handler] public class MyEventMessageHandler : IHandleMessages { public Task Handle(MyEvent message, IMessageHandlerContext context) => context.Send(new ProcessBySubscriber2()); @@ -127,7 +125,6 @@ public class Collector : EndpointConfigurationBuilder { public Collector() => EndpointSetup(); - [Handler] public class Subscriber1ProcessedHandler(Context testContext) : IHandleMessages { public Task Handle(ProcessBySubscriber1 message, IMessageHandlerContext context) @@ -138,7 +135,6 @@ public Task Handle(ProcessBySubscriber1 message, IMessageHandlerContext context) } } - [Handler] public class Subscriber2ProcessedHandler(Context testContext) : IHandleMessages { public Task Handle(ProcessBySubscriber2 message, IMessageHandlerContext context) @@ -153,4 +149,4 @@ public Task Handle(ProcessBySubscriber2 message, IMessageHandlerContext context) public class MyEvent : IEvent; public class ProcessBySubscriber1 : IMessage; public class ProcessBySubscriber2 : IMessage; -} \ No newline at end of file +}