-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
RabbitMQ client version upgrade #1367
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
nurlanmikayilov
wants to merge
5
commits into
danielgerlag:master
Choose a base branch
from
nurlanmikayilov:feature/rabbitmq-client-version-upgrade
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
4433bce
fix: #1344 convert destObj to the correct property type before settin…
nurlanmmikayilov 8508e54
test: add integration test for handling complex input property in wor…
nurlanmmikayilov 4be6bdd
feat: add support for list and array of complex input types in workflow
nurlanmmikayilov 176a4b4
fix: initialize AssigneeList and AssigneeArray to empty collections
nurlanmmikayilov 9caaf03
feat: upgrade RabbitMQ client to version 7.1.2 and refactor connectio…
nurlanmmikayilov File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,5 +1,4 @@ | ||||||||||||||||||||||||||||
using Newtonsoft.Json; | ||||||||||||||||||||||||||||
using RabbitMQ.Client; | ||||||||||||||||||||||||||||
using RabbitMQ.Client; | ||||||||||||||||||||||||||||
using System; | ||||||||||||||||||||||||||||
using System.Linq; | ||||||||||||||||||||||||||||
using System.Text; | ||||||||||||||||||||||||||||
|
@@ -17,9 +16,8 @@ public class RabbitMQProvider : IQueueProvider | |||||||||||||||||||||||||||
private readonly IRabbitMqQueueNameProvider _queueNameProvider; | ||||||||||||||||||||||||||||
private readonly RabbitMqConnectionFactory _rabbitMqConnectionFactory; | ||||||||||||||||||||||||||||
private readonly IServiceProvider _serviceProvider; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
private IConnection _connection = null; | ||||||||||||||||||||||||||||
private static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
private IConnection _connection; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
public bool IsDequeueBlocking => false; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
|
@@ -37,11 +35,13 @@ public async Task QueueWork(string id, QueueType queue) | |||||||||||||||||||||||||||
if (_connection == null) | ||||||||||||||||||||||||||||
throw new InvalidOperationException("RabbitMQ provider not running"); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
using (var channel = _connection.CreateModel()) | ||||||||||||||||||||||||||||
using (var channel = await _connection.CreateChannelAsync()) | ||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||
channel.QueueDeclare(queue: _queueNameProvider.GetQueueName(queue), durable: true, exclusive: false, autoDelete: false, arguments: null); | ||||||||||||||||||||||||||||
await channel.QueueDeclareAsync(queue: _queueNameProvider.GetQueueName(queue), durable: true, exclusive: false, | ||||||||||||||||||||||||||||
autoDelete: false, arguments: null); | ||||||||||||||||||||||||||||
var body = Encoding.UTF8.GetBytes(id); | ||||||||||||||||||||||||||||
channel.BasicPublish(exchange: "", routingKey: _queueNameProvider.GetQueueName(queue), basicProperties: null, body: body); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
await channel.BasicPublishAsync("", _queueNameProvider.GetQueueName(queue), false,body); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
|
@@ -50,34 +50,35 @@ public async Task<string> DequeueWork(QueueType queue, CancellationToken cancell | |||||||||||||||||||||||||||
if (_connection == null) | ||||||||||||||||||||||||||||
throw new InvalidOperationException("RabbitMQ provider not running"); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
using (var channel = _connection.CreateModel()) | ||||||||||||||||||||||||||||
using (var channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken)) | ||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||
channel.QueueDeclare(queue: _queueNameProvider.GetQueueName(queue), | ||||||||||||||||||||||||||||
durable: true, | ||||||||||||||||||||||||||||
exclusive: false, | ||||||||||||||||||||||||||||
autoDelete: false, | ||||||||||||||||||||||||||||
arguments: null); | ||||||||||||||||||||||||||||
await channel.QueueDeclareAsync(queue: _queueNameProvider.GetQueueName(queue), | ||||||||||||||||||||||||||||
durable: true, | ||||||||||||||||||||||||||||
exclusive: false, | ||||||||||||||||||||||||||||
autoDelete: false, | ||||||||||||||||||||||||||||
arguments: null, cancellationToken: cancellationToken); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false, | ||||||||||||||||||||||||||||
cancellationToken: cancellationToken); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); | ||||||||||||||||||||||||||||
var msg = await channel.BasicGetAsync(_queueNameProvider.GetQueueName(queue), false, cancellationToken); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
var msg = channel.BasicGet(_queueNameProvider.GetQueueName(queue), false); | ||||||||||||||||||||||||||||
if (msg != null) | ||||||||||||||||||||||||||||
if (msg == null) | ||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||
var data = Encoding.UTF8.GetString(msg.Body.ToArray()); | ||||||||||||||||||||||||||||
channel.BasicAck(msg.DeliveryTag, false); | ||||||||||||||||||||||||||||
return data; | ||||||||||||||||||||||||||||
return null; | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
return null; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
var data = Encoding.UTF8.GetString(msg.Body.ToArray()); | ||||||||||||||||||||||||||||
await channel.BasicAckAsync(msg.DeliveryTag, false, cancellationToken); | ||||||||||||||||||||||||||||
return data; | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
public void Dispose() | ||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||
if (_connection != null) | ||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||
if (_connection.IsOpen) | ||||||||||||||||||||||||||||
_connection.Close(); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
if (_connection == null) return; | ||||||||||||||||||||||||||||
if (_connection.IsOpen) | ||||||||||||||||||||||||||||
_connection.CloseAsync(); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
Comment on lines
+81
to
83
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The CloseAsync() method call is not awaited. This should be 'await _connection.CloseAsync();' or the method should be synchronous if async is not intended in Dispose().
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||||||||||||||||||||||||
public async Task Start() | ||||||||||||||||||||||||||||
|
@@ -89,11 +90,10 @@ public async Task Stop() | |||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||
if (_connection != null) | ||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||
_connection.Close(); | ||||||||||||||||||||||||||||
await _connection.CloseAsync(); | ||||||||||||||||||||||||||||
_connection = null; | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using WorkflowCore.TestAssets.Steps; | ||
|
||
namespace WorkflowCore.TestAssets.DataTypes; | ||
|
||
public class FlowData | ||
{ | ||
public AssigneeInfo Assignee { get; set; } = new(); | ||
public List<AssigneeInfo> AssigneeList { get; set; } = []; | ||
public AssigneeInfo[] AssigneeArray { get; set; } = []; | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using WorkflowCore.Interface; | ||
using WorkflowCore.Models; | ||
using WorkflowCore.TestAssets.DataTypes; | ||
|
||
namespace WorkflowCore.TestAssets.Steps; | ||
|
||
public class AssignTask : StepBody | ||
{ | ||
public AssigneeInfo Assignee { get; set; } | ||
public List<AssigneeInfo> AssigneeList { get; set; } = []; | ||
|
||
public AssigneeInfo[] AssigneeArray { get; set; } = []; | ||
|
||
public override ExecutionResult Run(IStepExecutionContext context) | ||
{ | ||
if (context.Workflow.Data is FlowData flowData) | ||
{ | ||
if (Assignee != null) | ||
{ | ||
flowData.Assignee = new AssigneeInfo | ||
{ | ||
Id = Assignee.Id, | ||
Name = Assignee.Name, | ||
MemberType = Assignee.MemberType, | ||
UnitInfo = Assignee.UnitInfo | ||
}; | ||
} | ||
|
||
flowData.AssigneeList.AddRange(AssigneeList); | ||
flowData.AssigneeArray = AssigneeArray.ToArray(); | ||
} | ||
return ExecutionResult.Next(); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using GetAwaiter().GetResult() can lead to deadlocks in certain contexts. Consider using async/await pattern or ConfigureAwait(false) to avoid potential deadlock scenarios.
Copilot uses AI. Check for mistakes.