Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
namespace NServiceBus.AcceptanceTests.Outbox;

using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
Expand All @@ -9,17 +8,17 @@ namespace NServiceBus.AcceptanceTests.Outbox;
using Features;
using NUnit.Framework;

public class When_subscribers_handles_the_same_event : NServiceBusAcceptanceTest
public class When_outbox_is_used_by_multiple_subscribers_for_the_same_event : NServiceBusAcceptanceTest
{
[Test, CancelAfter(10_000)]
public async Task Should_be_processed_by_all_subscribers(CancellationToken cancellationToken = default)
[Test]
public async Task Each_subscriber_should_dispatch_its_own_transport_operations()
{
Requires.OutboxPersistence();

var context = await Scenario.Define<Context>()
.WithEndpoint<Publisher>(b =>
b.When(c => c.Subscriber1Subscribed && c.Subscriber2Subscribed, session => session.Publish(new MyEvent()))
)
.WithEndpoint<Publisher>(b => b.When(
c => c is { Subscriber1Subscribed: true, Subscriber2Subscribed: true },
session => session.Publish(new MyEvent())))
.WithEndpoint<Subscriber1>(b => b.When(async (session, ctx) =>
{
await session.Subscribe<MyEvent>();
Expand All @@ -46,92 +45,108 @@ public async Task Should_be_processed_by_all_subscribers(CancellationToken cance
ctx.AddTrace("Subscriber2 has now asked to be subscribed to MyEvent");
}
}))
.Run(cancellationToken);
.WithEndpoint<Collector>()
.Run();

using (Assert.EnterMultipleScope())
{
Assert.That(context.Subscriber1GotTheEvent, Is.True);
Assert.That(context.Subscriber2GotTheEvent, Is.True);
Assert.That(context.FailedMessages.IsEmpty, Is.True);
Assert.That(context.Subscriber1ProcessedConfirmed, Is.True);
Assert.That(context.Subscriber2ProcessedConfirmed, Is.True);
}
}

public class Context : ScenarioContext
{
public bool Subscriber1Subscribed { get; set; }
public bool Subscriber2Subscribed { get; set; }
public bool Subscriber1ProcessedConfirmed { get; set; }
public bool Subscriber2ProcessedConfirmed { get; set; }

public bool Subscriber1GotTheEvent { get; set; }
public bool Subscriber2GotTheEvent { get; set; }

public void MaybeCompleted() => MarkAsCompleted(Subscriber1GotTheEvent, Subscriber2GotTheEvent);
public void MaybeCompleted() => MarkAsCompleted(Subscriber1ProcessedConfirmed, Subscriber2ProcessedConfirmed);
}

public class Publisher : EndpointConfigurationBuilder
{
public Publisher() => EndpointSetup<DefaultPublisher>(c =>
{
c.OnEndpointSubscribed<Context>((s, context) =>
c.OnEndpointSubscribed<Context>((s, ctx) =>
{
var subscriber1 = Conventions.EndpointNamingConvention(typeof(Subscriber1));
if (s.SubscriberEndpoint.Contains(subscriber1))
{
context.Subscriber1Subscribed = true;
context.AddTrace($"{subscriber1} is now subscribed");
ctx.Subscriber1Subscribed = true;
ctx.AddTrace($"{subscriber1} is now subscribed");
}

var subscriber2 = Conventions.EndpointNamingConvention(typeof(Subscriber2));
if (s.SubscriberEndpoint.Contains(subscriber2))
{
context.Subscriber2Subscribed = true;
context.AddTrace($"{subscriber2} is now subscribed");
ctx.Subscriber2Subscribed = true;
ctx.AddTrace($"{subscriber2} is now subscribed");
}
});
}, metadata => metadata.RegisterSelfAsPublisherFor<MyEvent>(this));
}

public class Subscriber1 : EndpointConfigurationBuilder
{
public Subscriber1() =>
EndpointSetup<DefaultServer>(c =>
{
c.DisableFeature<AutoSubscribe>();

c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
c.EnableOutbox();
}, metadata => metadata.RegisterPublisherFor<MyEvent>(typeof(Publisher)));
public Subscriber1() => EndpointSetup<DefaultServer>(c =>
{
c.DisableFeature<AutoSubscribe>();
c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
c.EnableOutbox();
c.ConfigureRouting().RouteToEndpoint(typeof(ProcessBySubscriber1), typeof(Collector));
}, metadata => metadata.RegisterPublisherFor<MyEvent>(typeof(Publisher)));

public class MyHandler(Context testContext) : IHandleMessages<MyEvent>
public class MyEventMessageHandler : IHandleMessages<MyEvent>
{
public Task Handle(MyEvent message, IMessageHandlerContext context)
{
testContext.Subscriber1GotTheEvent = true;
testContext.MaybeCompleted();
return Task.CompletedTask;
}
public Task Handle(MyEvent message, IMessageHandlerContext context) => context.Send(new ProcessBySubscriber1());
}
}

public class Subscriber2 : EndpointConfigurationBuilder
{
public Subscriber2() =>
EndpointSetup<DefaultServer>(c =>
{
c.DisableFeature<AutoSubscribe>();
public Subscriber2() => EndpointSetup<DefaultServer>(c =>
{
c.DisableFeature<AutoSubscribe>();
c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
c.EnableOutbox();
c.ConfigureRouting().RouteToEndpoint(typeof(ProcessBySubscriber2), typeof(Collector));
}, metadata => metadata.RegisterPublisherFor<MyEvent>(typeof(Publisher)));

c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
c.EnableOutbox();
}, metadata => metadata.RegisterPublisherFor<MyEvent>(typeof(Publisher)));
public class MyEventMessageHandler : IHandleMessages<MyEvent>
{
public Task Handle(MyEvent message, IMessageHandlerContext context) => context.Send(new ProcessBySubscriber2());
}
}

public class Collector : EndpointConfigurationBuilder
{
public Collector() => EndpointSetup<DefaultServer>();

public class Subscriber1ProcessedHandler(Context testContext) : IHandleMessages<ProcessBySubscriber1>
{
public Task Handle(ProcessBySubscriber1 message, IMessageHandlerContext context)
{
testContext.Subscriber1ProcessedConfirmed = true;
testContext.MaybeCompleted();
return Task.CompletedTask;
}
}

public class MyHandler(Context testContext) : IHandleMessages<MyEvent>
public class Subscriber2ProcessedHandler(Context testContext) : IHandleMessages<ProcessBySubscriber2>
{
public Task Handle(MyEvent messageThatIsEnlisted, IMessageHandlerContext context)
public Task Handle(ProcessBySubscriber2 message, IMessageHandlerContext context)
{
testContext.Subscriber2GotTheEvent = true;
testContext.Subscriber2ProcessedConfirmed = true;
testContext.MaybeCompleted();
return Task.CompletedTask;
}
}
}

public class MyEvent : IEvent;
}
public class ProcessBySubscriber1 : IMessage;
public class ProcessBySubscriber2 : IMessage;
}