From a1306a6bacd7c8be723e0a17727d4e9353c4ab33 Mon Sep 17 00:00:00 2001 From: Matteo Pessina Date: Tue, 11 Jul 2023 23:31:56 +0200 Subject: [PATCH] rabbitmq direct exchange impl --- .../Bootstrapper.cs | 1 + .../DataTransferObjects.cs | 18 +++++- ...orer.Server.Infrastructure.RabbitMQ.csproj | 1 + .../RabbitMQAdministrationClient.cs | 38 +++++++------ .../RabbitMQQueuesService.cs | 56 +++++++++++++++++++ .../RabbitMQTopicsService.cs | 16 ++++-- 6 files changed, 106 insertions(+), 24 deletions(-) create mode 100644 src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/RabbitMQQueuesService.cs diff --git a/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/Bootstrapper.cs b/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/Bootstrapper.cs index 5d8e024..59589b2 100644 --- a/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/Bootstrapper.cs +++ b/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/Bootstrapper.cs @@ -44,5 +44,6 @@ public static void AddRabbitMQ( }); services.AddScoped(); + services.AddScoped(); } } diff --git a/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/DataTransferObjects.cs b/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/DataTransferObjects.cs index 0259dad..0cbc942 100644 --- a/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/DataTransferObjects.cs +++ b/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/DataTransferObjects.cs @@ -1,9 +1,23 @@ +using System.Runtime.Serialization; using System.Text.Json.Serialization; namespace EventBusExplorer.Server.Infrastructure.RabbitMQ; -internal record ExchangeTopic( +internal record Exchange( string Name, bool Durable, [property: JsonPropertyName("auto_delete")] bool AutoDelete, - string Type = "topic"); \ No newline at end of file + ExchangeType Type); + +[JsonConverter(typeof(JsonStringEnumMemberConverter))] +internal enum ExchangeType +{ + [EnumMember(Value = "topic")] + Topic, + [EnumMember(Value = "direct")] + Direct, + [EnumMember(Value = "fanout")] + Fanout, + [EnumMember(Value = "headers")] + Headers +} diff --git a/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/EventBusExplorer.Server.Infrastructure.RabbitMQ.csproj b/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/EventBusExplorer.Server.Infrastructure.RabbitMQ.csproj index 4076d87..decf5dc 100644 --- a/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/EventBusExplorer.Server.Infrastructure.RabbitMQ.csproj +++ b/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/EventBusExplorer.Server.Infrastructure.RabbitMQ.csproj @@ -11,6 +11,7 @@ + diff --git a/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/RabbitMQAdministrationClient.cs b/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/RabbitMQAdministrationClient.cs index 5b16781..c04df43 100644 --- a/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/RabbitMQAdministrationClient.cs +++ b/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/RabbitMQAdministrationClient.cs @@ -13,8 +13,9 @@ public RabbitMQAdministrationClient(HttpClient? httpClient) _httpClient = httpClient!; } - internal async Task CreateTopicAsync( + internal async Task CreateExchangeAsync( string? name, + ExchangeType type, string virtualHost = "/", CancellationToken cancellationToken = default) { @@ -24,20 +25,22 @@ internal async Task CreateTopicAsync( string path = GetExchangePath(virtualHost) + $"{Uri.EscapeDataString(name)}"; - ExchangeTopic requestTopic = new( + Exchange exchange = new( Name: name ?? string.Empty, Durable: true, - AutoDelete: false); + AutoDelete: false, + Type: type); HttpResponseMessage response = await _httpClient.PutAsJsonAsync( path, - requestTopic, + exchange, cancellationToken: cancellationToken); await Utils.ThrowExceptionIfUnsuccessfulAsync(response, "PUT", path); } - internal async Task> GetTopicsAsync( + internal async Task> GetExchangesAsync( + ExchangeType type, string virtualHost = "/", CancellationToken cancellationToken = default) { @@ -49,14 +52,15 @@ internal async Task> GetTopicsAsync( await Utils.ThrowExceptionIfUnsuccessfulAsync(response, "GET", path); - List? payload = await response.Content.ReadFromJsonAsync>( - cancellationToken: cancellationToken); + List exchanges = await response.Content.ReadFromJsonAsync>( + cancellationToken: cancellationToken) ?? new List(); - return payload!; + return exchanges.Where(x => x.Type == type).ToList(); } - internal async Task GetTopicAsync( + internal async Task GetExchangeAsync( string name, + ExchangeType type, string virtualHost = "/", CancellationToken cancellationToken = default) { @@ -64,20 +68,20 @@ internal async Task GetTopicAsync( $"{Uri.EscapeDataString(name)}"; HttpResponseMessage response = await _httpClient.GetAsync( - path, - cancellationToken: cancellationToken); + path, + cancellationToken: cancellationToken); await Utils.ThrowExceptionIfUnsuccessfulAsync(response, "GET", path); - ExchangeTopic? topic = await response + Exchange? exchange = await response .Content - .ReadFromJsonAsync( - cancellationToken: cancellationToken); + .ReadFromJsonAsync( + cancellationToken: cancellationToken); - return topic!; + return exchange is not null && exchange.Type == type ? exchange : null; } - internal async Task DeleteTopicAsync( + internal async Task DeleteExchangeAsync( string name, string virtualHost = "/", CancellationToken cancellationToken = default) @@ -96,4 +100,4 @@ private static string GetExchangePath(string virtualHost = "/") { return EXCHANGE_PATH + $"{Uri.EscapeDataString(virtualHost)}/"; } -} \ No newline at end of file +} diff --git a/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/RabbitMQQueuesService.cs b/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/RabbitMQQueuesService.cs new file mode 100644 index 0000000..5e61b4a --- /dev/null +++ b/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/RabbitMQQueuesService.cs @@ -0,0 +1,56 @@ +using EventBusExplorer.Server.Application.ServiceBroker.Abstractions; + +namespace EventBusExplorer.Server.Infrastructure.RabbitMQ; + +public class RabbitMQQueuesService : IServiceBrokerQueuesService +{ + private readonly RabbitMQAdministrationClient _adminClient; + + public RabbitMQQueuesService( + RabbitMQAdministrationClient adminClient + ) + { + _adminClient = adminClient; + } + + public async Task CreateAsync(string? name, CancellationToken cancellationToken = default) + { + await _adminClient.CreateExchangeAsync( + name, + type: ExchangeType.Direct, + cancellationToken: cancellationToken); + + // + // Since exchange creation through management API does not return anything, + // It is assumed that if the request is successful then the exchange has been created. + // + return name!; + } + + public async Task DeleteAsync(string name, CancellationToken cancellationToken = default) + { + await _adminClient.DeleteExchangeAsync(name, cancellationToken: cancellationToken); + } + + public async Task> GetAsync(CancellationToken cancellationToken = default) + { + IList directExchanges = await _adminClient.GetExchangesAsync( + type: ExchangeType.Direct, + cancellationToken: cancellationToken); + + return directExchanges.Select(x => x.Name).ToList(); + } + + public async Task GetAsync(string name, CancellationToken cancellationToken = default) + { + Exchange? directExchange = await _adminClient.GetExchangeAsync( + name, + type: ExchangeType.Direct, + cancellationToken: cancellationToken); + + if (directExchange is null) + throw new Exception($"Cannot find exchange with name: {name}"); //TODO: define or find a suitable exc type + + return directExchange.Name; + } +} diff --git a/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/RabbitMQTopicsService.cs b/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/RabbitMQTopicsService.cs index a9a0108..56770c5 100644 --- a/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/RabbitMQTopicsService.cs +++ b/src/Server/EventBusExplorer.Server.Infrastructure.RabbitMQ/RabbitMQTopicsService.cs @@ -15,8 +15,9 @@ RabbitMQAdministrationClient adminClient public async Task CreateTopicAsync(string? name, CancellationToken cancellationToken = default) { - await _adminClient.CreateTopicAsync( + await _adminClient.CreateExchangeAsync( name, + type: ExchangeType.Topic, cancellationToken: cancellationToken); // @@ -28,24 +29,29 @@ await _adminClient.CreateTopicAsync( public async Task> GetTopicsAsync(CancellationToken cancellationToken = default) { - IList topics = await _adminClient.GetTopicsAsync( + IList topicExchanges = await _adminClient.GetExchangesAsync( + type: ExchangeType.Topic, cancellationToken: cancellationToken); - return topics.Select(x => x.Name).ToList(); + return topicExchanges.Select(x => x.Name).ToList(); } public async Task GetTopicAsync(string name, CancellationToken cancellationToken = default) { - ExchangeTopic topic = await _adminClient.GetTopicAsync( + Exchange? topic = await _adminClient.GetExchangeAsync( name, + type: ExchangeType.Topic, cancellationToken: cancellationToken); + if (topic is null) + throw new Exception($"Cannot find exchange with name: {name}"); //TODO: define or find a suitable exc type + return topic.Name; } public async Task DeleteTopicAsync(string name, CancellationToken cancellationToken = default) { - await _adminClient.DeleteTopicAsync(name, cancellationToken: cancellationToken); + await _adminClient.DeleteExchangeAsync(name, cancellationToken: cancellationToken); } public Task> GetSubscriptionsAsync(string topicName, CancellationToken cancellationToken = default)