diff --git a/src/WorkflowCore.DSL/Services/DefinitionLoader.cs b/src/WorkflowCore.DSL/Services/DefinitionLoader.cs index c5cc5e083..bff2b1725 100644 --- a/src/WorkflowCore.DSL/Services/DefinitionLoader.cs +++ b/src/WorkflowCore.DSL/Services/DefinitionLoader.cs @@ -201,13 +201,21 @@ private void AttachInputs(StepSourceV1 source, Type dataType, Type stepType, Wor continue; } - if ((input.Value is IDictionary) || (input.Value is IDictionary)) + if (input.Value is IDictionary || input.Value is IDictionary) { var acn = BuildObjectInputAction(input, dataParameter, contextParameter, environmentVarsParameter, stepProperty); step.Inputs.Add(new ActionParameter(acn)); continue; } + if (input.Value is IEnumerable list) + { + var acn = BuildListInputAction(list, dataParameter, contextParameter, environmentVarsParameter, + stepProperty); + step.Inputs.Add(new ActionParameter(acn)); + continue; + } + throw new ArgumentException($"Unknown type for input {input.Key} on {source.Id}"); } } @@ -253,7 +261,7 @@ private void AttachDirectlyOutput(KeyValuePair output, WorkflowS Action acn = (pStep, pData) => { - object resolvedValue = sourceExpr.Compile().DynamicInvoke(pStep); ; + object resolvedValue = sourceExpr.Compile().DynamicInvoke(pStep); propertyInfo.SetValue(pData, resolvedValue, new object[] { output.Key }); }; @@ -306,7 +314,7 @@ private void AttachNestedOutput(KeyValuePair output, WorkflowSte { var targetExpr = Expression.Lambda(memberExpression, dataParameter); object data = targetExpr.Compile().DynamicInvoke(pData); - object resolvedValue = sourceExpr.Compile().DynamicInvoke(pStep); ; + object resolvedValue = sourceExpr.Compile().DynamicInvoke(pStep); propertyInfo.SetValue(data, resolvedValue, new object[] { items[1] }); }; @@ -379,6 +387,80 @@ void acn(IStepBody pStep, object pData, IStepExecutionContext pContext) return acn; } + private static Action BuildListInputAction( + IEnumerable input, + ParameterExpression dataParameter, + ParameterExpression contextParameter, + ParameterExpression environmentVarsParameter, + PropertyInfo stepProperty) + { + void acn(IStepBody pStep, object pData, IStepExecutionContext pContext) + { + if (input == null) + throw new ArgumentNullException(nameof(input)); + + var itemType = stepProperty.PropertyType.IsArray + ? stepProperty.PropertyType.GetElementType() + : stepProperty.PropertyType.GetGenericArguments().FirstOrDefault(); + + if (itemType == null) + throw new InvalidOperationException("Unable to determine the item type for stepProperty."); + + var processedItems = new List(); + + foreach (var item in input) + { + var obj = JObject.FromObject(item); + var stack = new Stack(); + stack.Push(obj); + + while (stack.Count > 0) + { + var current = stack.Pop(); + foreach (var prop in current.Properties().ToList()) + { + if (prop.Name.StartsWith("@")) + { + var expr = DynamicExpressionParser.ParseLambda( + new[] { dataParameter, contextParameter, environmentVarsParameter }, + typeof(object), + prop.Value.ToString()); + + var resolved = expr.Compile().DynamicInvoke(pData, pContext, + Environment.GetEnvironmentVariables()); + current.Remove(prop.Name); + current.Add(prop.Name.TrimStart('@'), JToken.FromObject(resolved)); + } + } + + foreach (var child in current.Children()) + stack.Push(child); + } + + processedItems.Add(obj.ToObject(itemType)); + } + + if (stepProperty.PropertyType.IsArray) + { + var array = Array.CreateInstance(itemType, processedItems.Count); + for (var i = 0; i < processedItems.Count; i++) + array.SetValue(processedItems[i], i); + stepProperty.SetValue(pStep, array); + } + else + { + var listInstance = Activator.CreateInstance(typeof(List<>).MakeGenericType(itemType)); + var addMethod = listInstance.GetType().GetMethod("Add"); + foreach (var item in processedItems) + addMethod?.Invoke(listInstance, new[] { item }); + + stepProperty.SetValue(pStep, listInstance); + } + } + + return acn; + } + private static Action BuildObjectInputAction(KeyValuePair input, ParameterExpression dataParameter, ParameterExpression contextParameter, ParameterExpression environmentVarsParameter, PropertyInfo stepProperty) { void acn(IStepBody pStep, object pData, IStepExecutionContext pContext) @@ -405,7 +487,7 @@ void acn(IStepBody pStep, object pData, IStepExecutionContext pContext) stack.Push(child); } - stepProperty.SetValue(pStep, destObj); + stepProperty.SetValue(pStep, destObj.ToObject(stepProperty.PropertyType)); } return acn; } diff --git a/src/providers/WorkflowCore.QueueProviders.RabbitMQ/ServiceCollectionExtensions.cs b/src/providers/WorkflowCore.QueueProviders.RabbitMQ/ServiceCollectionExtensions.cs index 4474e403a..86a1c5b7c 100644 --- a/src/providers/WorkflowCore.QueueProviders.RabbitMQ/ServiceCollectionExtensions.cs +++ b/src/providers/WorkflowCore.QueueProviders.RabbitMQ/ServiceCollectionExtensions.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection.Extensions; using WorkflowCore.Interface; using WorkflowCore.Models; @@ -19,8 +20,7 @@ public static WorkflowOptions UseRabbitMQ(this WorkflowOptions options, IConnect if (options == null) throw new ArgumentNullException(nameof(options)); if (connectionFactory == null) throw new ArgumentNullException(nameof(connectionFactory)); - return options - .UseRabbitMQ((sp, name) => connectionFactory.CreateConnection(name)); + return options.UseRabbitMQ(async (sp, name) => await connectionFactory.CreateConnectionAsync(name)); } public static WorkflowOptions UseRabbitMQ(this WorkflowOptions options, @@ -31,16 +31,23 @@ public static WorkflowOptions UseRabbitMQ(this WorkflowOptions options, if (connectionFactory == null) throw new ArgumentNullException(nameof(connectionFactory)); if (hostnames == null) throw new ArgumentNullException(nameof(hostnames)); - return options - .UseRabbitMQ((sp, name) => connectionFactory.CreateConnection(hostnames.ToList(), name)); + return options.UseRabbitMQ(async (sp, name) => await connectionFactory.CreateConnectionAsync(hostnames.ToList(), name)); } - - public static WorkflowOptions UseRabbitMQ(this WorkflowOptions options, RabbitMqConnectionFactory rabbitMqConnectionFactory) + + private static WorkflowOptions UseRabbitMQ(this WorkflowOptions options, Func> rabbitMqConnectionFactory) { if (options == null) throw new ArgumentNullException(nameof(options)); if (rabbitMqConnectionFactory == null) throw new ArgumentNullException(nameof(rabbitMqConnectionFactory)); options.Services.AddSingleton(rabbitMqConnectionFactory); + + options.Services.AddSingleton( + sp => (provider, name) => + { + var connection = rabbitMqConnectionFactory(provider, name).GetAwaiter().GetResult(); + return connection; + }); + options.Services.TryAddSingleton(); options.UseQueueProvider(RabbitMqQueueProviderFactory); diff --git a/src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs b/src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs index 8a4ab33bb..199c308d7 100644 --- a/src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs +++ b/src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs @@ -1,5 +1,4 @@ -using Newtonsoft.Json; -using RabbitMQ.Client; +using RabbitMQ.Client; using System; using System.Linq; using System.Text; @@ -17,9 +16,8 @@ public class RabbitMQProvider : IQueueProvider private readonly IRabbitMqQueueNameProvider _queueNameProvider; private readonly RabbitMqConnectionFactory _rabbitMqConnectionFactory; private readonly IServiceProvider _serviceProvider; - - private IConnection _connection = null; - private static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }; + + private IConnection _connection; public bool IsDequeueBlocking => false; @@ -37,11 +35,13 @@ public async Task QueueWork(string id, QueueType queue) if (_connection == null) throw new InvalidOperationException("RabbitMQ provider not running"); - using (var channel = _connection.CreateModel()) + using (var channel = await _connection.CreateChannelAsync()) { - channel.QueueDeclare(queue: _queueNameProvider.GetQueueName(queue), durable: true, exclusive: false, autoDelete: false, arguments: null); + await channel.QueueDeclareAsync(queue: _queueNameProvider.GetQueueName(queue), durable: true, exclusive: false, + autoDelete: false, arguments: null); var body = Encoding.UTF8.GetBytes(id); - channel.BasicPublish(exchange: "", routingKey: _queueNameProvider.GetQueueName(queue), basicProperties: null, body: body); + + await channel.BasicPublishAsync("", _queueNameProvider.GetQueueName(queue), false,body); } } @@ -50,34 +50,35 @@ public async Task DequeueWork(QueueType queue, CancellationToken cancell if (_connection == null) throw new InvalidOperationException("RabbitMQ provider not running"); - using (var channel = _connection.CreateModel()) + using (var channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken)) { - channel.QueueDeclare(queue: _queueNameProvider.GetQueueName(queue), - durable: true, - exclusive: false, - autoDelete: false, - arguments: null); + await channel.QueueDeclareAsync(queue: _queueNameProvider.GetQueueName(queue), + durable: true, + exclusive: false, + autoDelete: false, + arguments: null, cancellationToken: cancellationToken); + + await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false, + cancellationToken: cancellationToken); - channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); + var msg = await channel.BasicGetAsync(_queueNameProvider.GetQueueName(queue), false, cancellationToken); - var msg = channel.BasicGet(_queueNameProvider.GetQueueName(queue), false); - if (msg != null) + if (msg == null) { - var data = Encoding.UTF8.GetString(msg.Body.ToArray()); - channel.BasicAck(msg.DeliveryTag, false); - return data; + return null; } - return null; + + var data = Encoding.UTF8.GetString(msg.Body.ToArray()); + await channel.BasicAckAsync(msg.DeliveryTag, false, cancellationToken); + return data; } } - + public void Dispose() { - if (_connection != null) - { - if (_connection.IsOpen) - _connection.Close(); - } + if (_connection == null) return; + if (_connection.IsOpen) + _connection.CloseAsync(); } public async Task Start() @@ -89,11 +90,10 @@ public async Task Stop() { if (_connection != null) { - _connection.Close(); + await _connection.CloseAsync(); _connection = null; } } - } #pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously -} +} \ No newline at end of file diff --git a/src/providers/WorkflowCore.QueueProviders.RabbitMQ/WorkflowCore.QueueProviders.RabbitMQ.csproj b/src/providers/WorkflowCore.QueueProviders.RabbitMQ/WorkflowCore.QueueProviders.RabbitMQ.csproj index df02daa76..8c7707439 100644 --- a/src/providers/WorkflowCore.QueueProviders.RabbitMQ/WorkflowCore.QueueProviders.RabbitMQ.csproj +++ b/src/providers/WorkflowCore.QueueProviders.RabbitMQ/WorkflowCore.QueueProviders.RabbitMQ.csproj @@ -23,7 +23,7 @@ - + diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/StoredJsonScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/StoredJsonScenario.cs index 383eb39a4..f1e35ee98 100644 --- a/test/WorkflowCore.IntegrationTests/Scenarios/StoredJsonScenario.cs +++ b/test/WorkflowCore.IntegrationTests/Scenarios/StoredJsonScenario.cs @@ -84,5 +84,53 @@ public void should_execute_json_workflow_with_dynamic_data() data["Counter6"].Should().Be(1); data["Counter10"].Should().Be(1); } + + + [Fact] + public void should_execute_json_workflow_with_complex_input_type() + { + var initialData = new FlowData(); + var workflowId = StartWorkflow(TestAssets.Utils.GetTestDefinitionJsonComplexInputProperty(), initialData); + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30)); + + var data = GetData(workflowId); + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + UnhandledStepErrors.Count.Should().Be(0); + data.Assignee.Should().NotBeNull(); + data.Assignee.Name.Should().Be("John Doe"); + data.Assignee.UnitInfo.Should().NotBeNull(); + data.Assignee.UnitInfo.Name.Should().Be("IT Department"); + + } + + [Fact] + public void should_execute_json_workflow_with_list_of_complex_input_type() + { + var initialData = new FlowData(); + var workflowId = StartWorkflow(TestAssets.Utils.GetTestDefinitionJsonListOfComplexInputProperty(), initialData); + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30)); + + var data = GetData(workflowId); + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + UnhandledStepErrors.Count.Should().Be(0); + + data.AssigneeList.Should().NotBeNullOrEmpty(); + data.AssigneeList.Count.Should().Be(2); + + data.Assignee.Should().NotBeNull(); + data.Assignee.Name.Should().Be("John Doe"); + data.Assignee.UnitInfo?.Name.Should().Be("IT Department"); + + data.AssigneeList[0]?.Name.Should().Be(@"Nurlan Mikayilov"); + data.AssigneeList[0]?.UnitInfo?.Name.Should().Be("IT Department"); + + data.AssigneeList[1]?.Name.Should().Be(@"Jala Mammadova"); + data.AssigneeList[1]?.UnitInfo?.Name.Should().Be("General Department"); + + data.AssigneeArray[0]?.Name.Should().Be(@"Amin Nabiyev"); + data.AssigneeArray[0]?.UnitInfo?.Name.Should().Be("IT Department"); + + + } } } diff --git a/test/WorkflowCore.TestAssets/DataTypes/FlowData.cs b/test/WorkflowCore.TestAssets/DataTypes/FlowData.cs new file mode 100644 index 000000000..eaac2981b --- /dev/null +++ b/test/WorkflowCore.TestAssets/DataTypes/FlowData.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using WorkflowCore.TestAssets.Steps; + +namespace WorkflowCore.TestAssets.DataTypes; + +public class FlowData +{ + public AssigneeInfo Assignee { get; set; } = new(); + public List AssigneeList { get; set; } = []; + public AssigneeInfo[] AssigneeArray { get; set; } = []; +} \ No newline at end of file diff --git a/test/WorkflowCore.TestAssets/Steps/AssignTask.cs b/test/WorkflowCore.TestAssets/Steps/AssignTask.cs new file mode 100644 index 000000000..eb2159494 --- /dev/null +++ b/test/WorkflowCore.TestAssets/Steps/AssignTask.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using WorkflowCore.TestAssets.DataTypes; + +namespace WorkflowCore.TestAssets.Steps; + +public class AssignTask : StepBody +{ + public AssigneeInfo Assignee { get; set; } + public List AssigneeList { get; set; } = []; + + public AssigneeInfo[] AssigneeArray { get; set; } = []; + + public override ExecutionResult Run(IStepExecutionContext context) + { + if (context.Workflow.Data is FlowData flowData) + { + if (Assignee != null) + { + flowData.Assignee = new AssigneeInfo + { + Id = Assignee.Id, + Name = Assignee.Name, + MemberType = Assignee.MemberType, + UnitInfo = Assignee.UnitInfo + }; + } + + flowData.AssigneeList.AddRange(AssigneeList); + flowData.AssigneeArray = AssigneeArray.ToArray(); + } + return ExecutionResult.Next(); + } +} \ No newline at end of file diff --git a/test/WorkflowCore.TestAssets/Steps/AssigneeInfo.cs b/test/WorkflowCore.TestAssets/Steps/AssigneeInfo.cs new file mode 100644 index 000000000..42901dbab --- /dev/null +++ b/test/WorkflowCore.TestAssets/Steps/AssigneeInfo.cs @@ -0,0 +1,20 @@ +using System; +using System.Linq; + +namespace WorkflowCore.TestAssets.Steps; + +public class AssigneeInfo +{ + public int Id { get; set; } + public string Name { get; set; } + public int MemberType { get; set; } + + public UnitInfo UnitInfo { get; set; } +} + +public class UnitInfo +{ + public int Id { get; set; } + public string Name { get; set; } + public int UnitType { get; set; } +} \ No newline at end of file diff --git a/test/WorkflowCore.TestAssets/Utils.cs b/test/WorkflowCore.TestAssets/Utils.cs index b7a7919cd..adf6df2c9 100644 --- a/test/WorkflowCore.TestAssets/Utils.cs +++ b/test/WorkflowCore.TestAssets/Utils.cs @@ -39,6 +39,17 @@ public static string GetTestDefinitionJsonMissingInputProperty() { return File.ReadAllText("stored-def-missing-input-property.json"); } + + + public static string GetTestDefinitionJsonComplexInputProperty() + { + return File.ReadAllText("def-complex-input-property.json"); + } + + public static string GetTestDefinitionJsonListOfComplexInputProperty() + { + return File.ReadAllText("def-list-of-complex-input-property.json"); + } } } diff --git a/test/WorkflowCore.TestAssets/WorkflowCore.TestAssets.csproj b/test/WorkflowCore.TestAssets/WorkflowCore.TestAssets.csproj index cbba0dccb..4c7a862a8 100644 --- a/test/WorkflowCore.TestAssets/WorkflowCore.TestAssets.csproj +++ b/test/WorkflowCore.TestAssets/WorkflowCore.TestAssets.csproj @@ -13,9 +13,13 @@ + + + Always + Always @@ -31,6 +35,9 @@ Always + + Always + diff --git a/test/WorkflowCore.TestAssets/def-complex-input-property.json b/test/WorkflowCore.TestAssets/def-complex-input-property.json new file mode 100644 index 000000000..8849f0b27 --- /dev/null +++ b/test/WorkflowCore.TestAssets/def-complex-input-property.json @@ -0,0 +1,23 @@ +{ + "Id": "FlowWithComplexInputType", + "Version": 1, + "DataType": "WorkflowCore.TestAssets.DataTypes.FlowData, WorkflowCore.TestAssets", + "Steps": [ + { + "Id": "AssignTask", + "StepType": "WorkflowCore.TestAssets.Steps.AssignTask, WorkflowCore.TestAssets", + "Inputs": { + "Assignee": { + "id": 1, + "name": "John Doe", + "memberType": 1, + "unitInfo": { + "id": 1, + "name": "IT Department", + "unitType": 1 + } + } + } + } + ] +} \ No newline at end of file diff --git a/test/WorkflowCore.TestAssets/def-list-of-complex-input-property.json b/test/WorkflowCore.TestAssets/def-list-of-complex-input-property.json new file mode 100644 index 000000000..f4508fba7 --- /dev/null +++ b/test/WorkflowCore.TestAssets/def-list-of-complex-input-property.json @@ -0,0 +1,57 @@ +{ + "Id": "FlowWithListOfComplexInputType", + "Version": 1, + "DataType": "WorkflowCore.TestAssets.DataTypes.FlowData, WorkflowCore.TestAssets", + "Steps": [ + { + "Id": "AssignTask", + "StepType": "WorkflowCore.TestAssets.Steps.AssignTask, WorkflowCore.TestAssets", + "Inputs": { + "AssigneeList": [ + { + "id": 1, + "name": "Nurlan Mikayilov", + "memberType": 1, + "unitInfo": { + "id": 2, + "name": "IT Department", + "unitType": 1 + } + }, + { + "id": 2, + "name": "Jala Mammadova", + "memberType": 2, + "unitInfo": { + "id": 1, + "name": "General Department", + "unitType": 1 + } + } + ], + "AssigneeArray": [ + { + "id": 3, + "name": "Amin Nabiyev", + "memberType": 1, + "unitInfo": { + "id": 2, + "name": "IT Department", + "unitType": 1 + } + } + ], + "Assignee": { + "id": 1, + "name": "John Doe", + "memberType": 1, + "unitInfo": { + "id": 1, + "name": "IT Department", + "unitType": 1 + } + } + } + } + ] +} \ No newline at end of file