Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@ public static void AddRabbitMQ(
});

services.AddScoped<IServiceBrokerTopicsService, RabbitMQTopicsService>();
services.AddScoped<IServiceBrokerQueuesService, RabbitMQQueuesService>();
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
using System.Runtime.Serialization;
using System.Text.Json.Serialization;

namespace EventBusExplorer.Server.Infrastructure.RabbitMQ;

internal record ExchangeTopic(
internal record Exchange(
string Name,
bool Durable,
[property: JsonPropertyName("auto_delete")] bool AutoDelete,
string Type = "topic");
ExchangeType Type);

[JsonConverter(typeof(JsonStringEnumMemberConverter))]
internal enum ExchangeType
{
[EnumMember(Value = "topic")]
Topic,
[EnumMember(Value = "direct")]
Direct,
[EnumMember(Value = "fanout")]
Fanout,
[EnumMember(Value = "headers")]
Headers
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Macross.Json.Extensions" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="7.0.4" />
<PackageReference Include="RabbitMQ.Client" Version="6.5.0" />
<PackageReference Include="System.Net.Http.Json" Version="7.0.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ public RabbitMQAdministrationClient(HttpClient? httpClient)
_httpClient = httpClient!;
}

internal async Task CreateTopicAsync(
internal async Task CreateExchangeAsync(
string? name,
ExchangeType type,
string virtualHost = "/",
CancellationToken cancellationToken = default)
{
Expand All @@ -24,20 +25,22 @@ internal async Task CreateTopicAsync(
string path = GetExchangePath(virtualHost) +
$"{Uri.EscapeDataString(name)}";

ExchangeTopic requestTopic = new(
Exchange exchange = new(
Name: name ?? string.Empty,
Durable: true,
AutoDelete: false);
AutoDelete: false,
Type: type);

HttpResponseMessage response = await _httpClient.PutAsJsonAsync(
path,
requestTopic,
exchange,
cancellationToken: cancellationToken);

await Utils.ThrowExceptionIfUnsuccessfulAsync(response, "PUT", path);
}

internal async Task<IList<ExchangeTopic>> GetTopicsAsync(
internal async Task<IList<Exchange>> GetExchangesAsync(
ExchangeType type,
string virtualHost = "/",
CancellationToken cancellationToken = default)
{
Expand All @@ -49,35 +52,36 @@ internal async Task<IList<ExchangeTopic>> GetTopicsAsync(

await Utils.ThrowExceptionIfUnsuccessfulAsync(response, "GET", path);

List<ExchangeTopic>? payload = await response.Content.ReadFromJsonAsync<List<ExchangeTopic>>(
cancellationToken: cancellationToken);
List<Exchange> exchanges = await response.Content.ReadFromJsonAsync<List<Exchange>>(
cancellationToken: cancellationToken) ?? new List<Exchange>();

return payload!;
return exchanges.Where(x => x.Type == type).ToList();
}

internal async Task<ExchangeTopic> GetTopicAsync(
internal async Task<Exchange?> GetExchangeAsync(
string name,
ExchangeType type,
string virtualHost = "/",
CancellationToken cancellationToken = default)
{
string path = GetExchangePath(virtualHost) +
$"{Uri.EscapeDataString(name)}";

HttpResponseMessage response = await _httpClient.GetAsync(
path,
cancellationToken: cancellationToken);
path,
cancellationToken: cancellationToken);

await Utils.ThrowExceptionIfUnsuccessfulAsync(response, "GET", path);

ExchangeTopic? topic = await response
Exchange? exchange = await response
.Content
.ReadFromJsonAsync<ExchangeTopic>(
cancellationToken: cancellationToken);
.ReadFromJsonAsync<Exchange>(
cancellationToken: cancellationToken);

return topic!;
return exchange is not null && exchange.Type == type ? exchange : null;
}

internal async Task DeleteTopicAsync(
internal async Task DeleteExchangeAsync(
string name,
string virtualHost = "/",
CancellationToken cancellationToken = default)
Expand All @@ -96,4 +100,4 @@ private static string GetExchangePath(string virtualHost = "/")
{
return EXCHANGE_PATH + $"{Uri.EscapeDataString(virtualHost)}/";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using EventBusExplorer.Server.Application.ServiceBroker.Abstractions;

namespace EventBusExplorer.Server.Infrastructure.RabbitMQ;

public class RabbitMQQueuesService : IServiceBrokerQueuesService
{
private readonly RabbitMQAdministrationClient _adminClient;

public RabbitMQQueuesService(
RabbitMQAdministrationClient adminClient
)
{
_adminClient = adminClient;
}

public async Task<string> CreateAsync(string? name, CancellationToken cancellationToken = default)
{
await _adminClient.CreateExchangeAsync(
name,
type: ExchangeType.Direct,
cancellationToken: cancellationToken);

//
// Since exchange creation through management API does not return anything,
// It is assumed that if the request is successful then the exchange has been created.
//
return name!;
}

public async Task DeleteAsync(string name, CancellationToken cancellationToken = default)
{
await _adminClient.DeleteExchangeAsync(name, cancellationToken: cancellationToken);
}

public async Task<IList<string>> GetAsync(CancellationToken cancellationToken = default)
{
IList<Exchange> directExchanges = await _adminClient.GetExchangesAsync(
type: ExchangeType.Direct,
cancellationToken: cancellationToken);

return directExchanges.Select(x => x.Name).ToList();
}

public async Task<string> GetAsync(string name, CancellationToken cancellationToken = default)
{
Exchange? directExchange = await _adminClient.GetExchangeAsync(
name,
type: ExchangeType.Direct,
cancellationToken: cancellationToken);

if (directExchange is null)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw new Exception($"Cannot find exchange with name: {name}"); //TODO: define or find a suitable exc type

return directExchange.Name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ RabbitMQAdministrationClient adminClient

public async Task<string> CreateTopicAsync(string? name, CancellationToken cancellationToken = default)
{
await _adminClient.CreateTopicAsync(
await _adminClient.CreateExchangeAsync(
name,
type: ExchangeType.Topic,
cancellationToken: cancellationToken);

//
Expand All @@ -28,24 +29,29 @@ await _adminClient.CreateTopicAsync(

public async Task<IList<string>> GetTopicsAsync(CancellationToken cancellationToken = default)
{
IList<ExchangeTopic> topics = await _adminClient.GetTopicsAsync(
IList<Exchange> topicExchanges = await _adminClient.GetExchangesAsync(
type: ExchangeType.Topic,
cancellationToken: cancellationToken);

return topics.Select(x => x.Name).ToList();
return topicExchanges.Select(x => x.Name).ToList();
}

public async Task<string> GetTopicAsync(string name, CancellationToken cancellationToken = default)
{
ExchangeTopic topic = await _adminClient.GetTopicAsync(
Exchange? topic = await _adminClient.GetExchangeAsync(
name,
type: ExchangeType.Topic,
cancellationToken: cancellationToken);

if (topic is null)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Krusty93 probably in this case we want to return at API level something like 204 or 404.
What do you think about?

throw new Exception($"Cannot find exchange with name: {name}"); //TODO: define or find a suitable exc type

return topic.Name;
}

public async Task DeleteTopicAsync(string name, CancellationToken cancellationToken = default)
{
await _adminClient.DeleteTopicAsync(name, cancellationToken: cancellationToken);
await _adminClient.DeleteExchangeAsync(name, cancellationToken: cancellationToken);
}

public Task<IList<string>> GetSubscriptionsAsync(string topicName, CancellationToken cancellationToken = default)
Expand Down