diff --git a/src/NServiceBus.AcceptanceTests/Outbox/When_subscribers_handles_the_same_event.cs b/src/NServiceBus.AcceptanceTests/Outbox/When_subscribers_handles_the_same_event.cs index 5d7452792a..4d62f67d1b 100644 --- a/src/NServiceBus.AcceptanceTests/Outbox/When_subscribers_handles_the_same_event.cs +++ b/src/NServiceBus.AcceptanceTests/Outbox/When_subscribers_handles_the_same_event.cs @@ -1,6 +1,5 @@ namespace NServiceBus.AcceptanceTests.Outbox; -using System.Threading; using System.Threading.Tasks; using AcceptanceTesting; using AcceptanceTesting.Customization; @@ -9,17 +8,17 @@ namespace NServiceBus.AcceptanceTests.Outbox; using Features; using NUnit.Framework; -public class When_subscribers_handles_the_same_event : NServiceBusAcceptanceTest +public class When_outbox_is_used_by_multiple_subscribers_for_the_same_event : NServiceBusAcceptanceTest { - [Test, CancelAfter(10_000)] - public async Task Should_be_processed_by_all_subscribers(CancellationToken cancellationToken = default) + [Test] + public async Task Each_subscriber_should_dispatch_its_own_transport_operations() { Requires.OutboxPersistence(); var context = await Scenario.Define() - .WithEndpoint(b => - b.When(c => c.Subscriber1Subscribed && c.Subscriber2Subscribed, session => session.Publish(new MyEvent())) - ) + .WithEndpoint(b => b.When( + c => c is { Subscriber1Subscribed: true, Subscriber2Subscribed: true }, + session => session.Publish(new MyEvent()))) .WithEndpoint(b => b.When(async (session, ctx) => { await session.Subscribe(); @@ -46,12 +45,14 @@ public async Task Should_be_processed_by_all_subscribers(CancellationToken cance ctx.AddTrace("Subscriber2 has now asked to be subscribed to MyEvent"); } })) - .Run(cancellationToken); + .WithEndpoint() + .Run(); using (Assert.EnterMultipleScope()) { - Assert.That(context.Subscriber1GotTheEvent, Is.True); - Assert.That(context.Subscriber2GotTheEvent, Is.True); + Assert.That(context.FailedMessages.IsEmpty, Is.True); + Assert.That(context.Subscriber1ProcessedConfirmed, Is.True); + Assert.That(context.Subscriber2ProcessedConfirmed, Is.True); } } @@ -59,31 +60,30 @@ public class Context : ScenarioContext { public bool Subscriber1Subscribed { get; set; } public bool Subscriber2Subscribed { get; set; } + public bool Subscriber1ProcessedConfirmed { get; set; } + public bool Subscriber2ProcessedConfirmed { get; set; } - public bool Subscriber1GotTheEvent { get; set; } - public bool Subscriber2GotTheEvent { get; set; } - - public void MaybeCompleted() => MarkAsCompleted(Subscriber1GotTheEvent, Subscriber2GotTheEvent); + public void MaybeCompleted() => MarkAsCompleted(Subscriber1ProcessedConfirmed, Subscriber2ProcessedConfirmed); } public class Publisher : EndpointConfigurationBuilder { public Publisher() => EndpointSetup(c => { - c.OnEndpointSubscribed((s, context) => + c.OnEndpointSubscribed((s, ctx) => { var subscriber1 = Conventions.EndpointNamingConvention(typeof(Subscriber1)); if (s.SubscriberEndpoint.Contains(subscriber1)) { - context.Subscriber1Subscribed = true; - context.AddTrace($"{subscriber1} is now subscribed"); + ctx.Subscriber1Subscribed = true; + ctx.AddTrace($"{subscriber1} is now subscribed"); } var subscriber2 = Conventions.EndpointNamingConvention(typeof(Subscriber2)); if (s.SubscriberEndpoint.Contains(subscriber2)) { - context.Subscriber2Subscribed = true; - context.AddTrace($"{subscriber2} is now subscribed"); + ctx.Subscriber2Subscribed = true; + ctx.AddTrace($"{subscriber2} is now subscribed"); } }); }, metadata => metadata.RegisterSelfAsPublisherFor(this)); @@ -91,42 +91,55 @@ public Publisher() => EndpointSetup(c => public class Subscriber1 : EndpointConfigurationBuilder { - public Subscriber1() => - EndpointSetup(c => - { - c.DisableFeature(); - - c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly; - c.EnableOutbox(); - }, metadata => metadata.RegisterPublisherFor(typeof(Publisher))); + public Subscriber1() => EndpointSetup(c => + { + c.DisableFeature(); + c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly; + c.EnableOutbox(); + c.ConfigureRouting().RouteToEndpoint(typeof(ProcessBySubscriber1), typeof(Collector)); + }, metadata => metadata.RegisterPublisherFor(typeof(Publisher))); - public class MyHandler(Context testContext) : IHandleMessages + public class MyEventMessageHandler : IHandleMessages { - public Task Handle(MyEvent message, IMessageHandlerContext context) - { - testContext.Subscriber1GotTheEvent = true; - testContext.MaybeCompleted(); - return Task.CompletedTask; - } + public Task Handle(MyEvent message, IMessageHandlerContext context) => context.Send(new ProcessBySubscriber1()); } } public class Subscriber2 : EndpointConfigurationBuilder { - public Subscriber2() => - EndpointSetup(c => - { - c.DisableFeature(); + public Subscriber2() => EndpointSetup(c => + { + c.DisableFeature(); + c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly; + c.EnableOutbox(); + c.ConfigureRouting().RouteToEndpoint(typeof(ProcessBySubscriber2), typeof(Collector)); + }, metadata => metadata.RegisterPublisherFor(typeof(Publisher))); - c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly; - c.EnableOutbox(); - }, metadata => metadata.RegisterPublisherFor(typeof(Publisher))); + public class MyEventMessageHandler : IHandleMessages + { + public Task Handle(MyEvent message, IMessageHandlerContext context) => context.Send(new ProcessBySubscriber2()); + } + } + + public class Collector : EndpointConfigurationBuilder + { + public Collector() => EndpointSetup(); + + public class Subscriber1ProcessedHandler(Context testContext) : IHandleMessages + { + public Task Handle(ProcessBySubscriber1 message, IMessageHandlerContext context) + { + testContext.Subscriber1ProcessedConfirmed = true; + testContext.MaybeCompleted(); + return Task.CompletedTask; + } + } - public class MyHandler(Context testContext) : IHandleMessages + public class Subscriber2ProcessedHandler(Context testContext) : IHandleMessages { - public Task Handle(MyEvent messageThatIsEnlisted, IMessageHandlerContext context) + public Task Handle(ProcessBySubscriber2 message, IMessageHandlerContext context) { - testContext.Subscriber2GotTheEvent = true; + testContext.Subscriber2ProcessedConfirmed = true; testContext.MaybeCompleted(); return Task.CompletedTask; } @@ -134,4 +147,6 @@ public Task Handle(MyEvent messageThatIsEnlisted, IMessageHandlerContext context } public class MyEvent : IEvent; -} \ No newline at end of file + public class ProcessBySubscriber1 : IMessage; + public class ProcessBySubscriber2 : IMessage; +}