Skip to content

Commit fd98de2

Browse files
authored
Merge pull request #965 from Project-MONAI/AI-288
adding conditions
2 parents 10c9c54 + 97bfa29 commit fd98de2

File tree

8 files changed

+249
-20
lines changed

8 files changed

+249
-20
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
//
2+
// Copyright 2023 Guy’s and St Thomas’ NHS Foundation Trust
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
17+
using Mongo.Migration.Migrations.Document;
18+
using MongoDB.Bson;
19+
20+
namespace Monai.Deploy.WorkflowManager.Common.Contracts.Migrations
21+
{
22+
public class M002_WorkflowRevision_addVerion : DocumentMigration<WorkflowRevision>
23+
{
24+
public M002_WorkflowRevision_addVerion() : base("1.0.0") { }
25+
26+
public override void Up(BsonDocument document)
27+
{
28+
// empty, but this will make all objects re-saved with a version
29+
}
30+
public override void Down(BsonDocument document)
31+
{
32+
try
33+
{
34+
document.Remove("Version");
35+
}
36+
catch
37+
{ // can ignore we dont want failures stopping startup !
38+
}
39+
}
40+
}
41+
}

src/WorkflowManager/Contracts/Migrations/M004_WorkflowRevision_addDataRetension.cs renamed to src/WorkflowManager/Contracts/Migrations/M003_WorkflowRevision_addDataRetension.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020

2121
namespace Monai.Deploy.WorkflowManager.Common.Contracts.Migrations
2222
{
23-
public class M004_WorkflowRevision_AddDataRetension : DocumentMigration<WorkflowRevision>
23+
public class M003_WorkflowRevision_addDataRetension : DocumentMigration<WorkflowRevision>
2424
{
25-
public M004_WorkflowRevision_AddDataRetension() : base("1.0.1") { }
25+
public M003_WorkflowRevision_addDataRetension() : base("1.0.1") { }
2626

2727
public override void Up(BsonDocument document)
2828
{
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
//
2+
// Copyright 2023 Guy’s and St Thomas’ NHS Foundation Trust
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
17+
using Mongo.Migration.Migrations.Document;
18+
using MongoDB.Bson;
19+
20+
21+
namespace Monai.Deploy.WorkflowManager.Common.Contracts.Migrations
22+
{
23+
public class M004_WorkflowRevision_addConditions : DocumentMigration<WorkflowRevision>
24+
{
25+
public M004_WorkflowRevision_addConditions() : base("1.0.2") { }
26+
27+
public override void Up(BsonDocument document)
28+
{
29+
var workflow = document["Workflow"].AsBsonDocument;
30+
workflow.Add("Conditions", new BsonArray { });
31+
}
32+
33+
public override void Down(BsonDocument document)
34+
{
35+
try
36+
{
37+
var workflow = document["Workflow"].AsBsonDocument;
38+
workflow.Remove("Conditions");
39+
}
40+
catch
41+
{ // can ignore we dont want failures stopping startup !
42+
}
43+
}
44+
}
45+
}

src/WorkflowManager/Contracts/Models/Workflow.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,8 @@ public class Workflow
3939
[JsonProperty(PropertyName = "dataRetentionDays")]
4040
public int? DataRetentionDays { get; set; } = 3;// note. -1 = never delete
4141

42+
[JsonProperty(PropertyName = "conditions")]
43+
public string[] Conditions { get; set; } = [];
44+
4245
}
4346
}

src/WorkflowManager/Contracts/Models/WorkflowRevision.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
namespace Monai.Deploy.WorkflowManager.Common.Contracts.Models
2525
{
26-
[CollectionLocation("Workflows"), RuntimeVersion("1.0.1")]
26+
[CollectionLocation("Workflows"), RuntimeVersion("1.0.2")]
2727
public class WorkflowRevision : ISoftDeleteable, IDocument
2828
{
2929
[BsonId]

src/WorkflowManager/Logging/Log.200000.Workflow.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,5 +114,8 @@ public static partial class Log
114114

115115
[LoggerMessage(EventId = 210019, Level = LogLevel.Error, Message = "Task is missing required input artifacts {taskId} Artifacts {ArtifactsJson}")]
116116
public static partial void TaskIsMissingRequiredInputArtifacts(this ILogger logger, string taskId, string ArtifactsJson);
117+
118+
[LoggerMessage(EventId = 200020, Level = LogLevel.Warning, Message = "no workflow to execute for the given workflow request.")]
119+
public static partial void DidntToCreateWorkflowInstances(this ILogger logger);
117120
}
118121
}

src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,13 @@ public async Task<bool> ProcessPayload(WorkflowRequestEvent message, Payload pay
146146

147147
var tasks = workflows.Select(workflow => CreateWorkflowInstanceAsync(message, workflow));
148148
var newInstances = await Task.WhenAll(tasks).ConfigureAwait(false);
149+
150+
if (newInstances is null || newInstances.Length == 0 || newInstances[0] is null) // if null then it because it didnt meet the conditions needed to create a workflow instance
151+
{
152+
_logger.DidntToCreateWorkflowInstances();
153+
return false;
154+
}
155+
149156
workflowInstances.AddRange(newInstances);
150157

151158
var existingInstances = await _workflowInstanceRepository.GetByWorkflowsIdsAsync(workflowInstances.Select(w => w.WorkflowId).ToList());
@@ -1103,29 +1110,34 @@ private async Task<bool> ClinicalReviewTimeOutEvent(WorkflowInstance workflowIns
11031110
return true;
11041111
}
11051112

1106-
private async Task<WorkflowInstance> CreateWorkflowInstanceAsync(WorkflowRequestEvent message, WorkflowRevision workflow)
1113+
private async Task<WorkflowInstance?> CreateWorkflowInstanceAsync(WorkflowRequestEvent message, WorkflowRevision workflow)
11071114
{
11081115
ArgumentNullException.ThrowIfNull(message, nameof(message));
11091116
ArgumentNullException.ThrowIfNull(workflow, nameof(workflow));
11101117
ArgumentNullException.ThrowIfNull(workflow.Workflow, nameof(workflow.Workflow));
11111118

1112-
var workflowInstanceId = Guid.NewGuid().ToString();
1119+
var workflowInstance = MakeInstance(message, workflow);
11131120

1114-
var workflowInstance = new WorkflowInstance()
1121+
// check if the conditionals allow the workflow to be created
1122+
1123+
if (workflow.Workflow.Conditions.Length != 0)
11151124
{
1116-
Id = workflowInstanceId,
1117-
WorkflowId = workflow.WorkflowId,
1118-
WorkflowName = workflow.Workflow.Name,
1119-
PayloadId = message.PayloadId.ToString(),
1120-
StartTime = DateTime.UtcNow,
1121-
Status = Status.Created,
1122-
AeTitle = workflow.Workflow?.InformaticsGateway?.AeTitle,
1123-
BucketId = message.Bucket,
1124-
InputMetaData = { } //Functionality to be added later
1125-
};
1125+
var conditionalMet = _conditionalParameterParser.TryParse(workflow.Workflow.Conditions, workflowInstance, out var resolvedConditional);
1126+
if (conditionalMet is false)
1127+
{
1128+
return null;
1129+
}
1130+
}
1131+
1132+
await CreateTaskExecutionForFirstTask(message, workflow, workflowInstance);
11261133

1134+
return workflowInstance;
1135+
}
1136+
1137+
private async Task CreateTaskExecutionForFirstTask(WorkflowRequestEvent message, WorkflowRevision workflow, WorkflowInstance workflowInstance)
1138+
{
11271139
var tasks = new List<TaskExecution>();
1128-
// part of this ticket just take the first task
1140+
11291141
if (workflow?.Workflow?.Tasks.Length > 0)
11301142
{
11311143
var firstTask = workflow.Workflow.Tasks.First();
@@ -1141,7 +1153,24 @@ private async Task<WorkflowInstance> CreateWorkflowInstanceAsync(WorkflowRequest
11411153
}
11421154

11431155
workflowInstance.Tasks = tasks;
1156+
}
11441157

1158+
private static WorkflowInstance MakeInstance(WorkflowRequestEvent message, WorkflowRevision workflow)
1159+
{
1160+
var workflowInstanceId = Guid.NewGuid().ToString();
1161+
1162+
var workflowInstance = new WorkflowInstance()
1163+
{
1164+
Id = workflowInstanceId,
1165+
WorkflowId = workflow.WorkflowId,
1166+
WorkflowName = workflow.Workflow.Name,
1167+
PayloadId = message.PayloadId.ToString(),
1168+
StartTime = DateTime.UtcNow,
1169+
Status = Status.Created,
1170+
AeTitle = workflow.Workflow?.InformaticsGateway?.AeTitle,
1171+
BucketId = message.Bucket,
1172+
InputMetaData = { } //Functionality to be added later
1173+
};
11451174
return workflowInstance;
11461175
}
11471176

tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs

Lines changed: 111 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public class WorkflowExecuterServiceTests
6767
private readonly IOptions<WorkflowManagerOptions> _configuration;
6868
private readonly IOptions<StorageServiceConfiguration> _storageConfiguration;
6969
private readonly Mock<ITaskExecutionStatsRepository> _taskExecutionStatsRepository;
70+
private readonly Mock<IDicomService> _dicom = new Mock<IDicomService>();
7071
private readonly int _timeoutForTypeTask = 999;
7172
private readonly int _timeoutForDefault = 966;
7273

@@ -98,11 +99,10 @@ public WorkflowExecuterServiceTests()
9899

99100
_storageConfiguration = Options.Create(new StorageServiceConfiguration() { Settings = new Dictionary<string, string> { { "bucket", "testbucket" }, { "endpoint", "localhost" }, { "securedConnection", "False" } } });
100101

101-
var dicom = new Mock<IDicomService>();
102102
var logger = new Mock<ILogger<ConditionalParameterParser>>();
103103

104104
var conditionalParser = new ConditionalParameterParser(logger.Object,
105-
dicom.Object,
105+
_dicom.Object,
106106
_workflowInstanceService.Object,
107107
_payloadService.Object,
108108
_workflowService.Object);
@@ -3868,7 +3868,115 @@ public async Task ProcessPayload_With_Multiple_Taskdestinations_One_Has_Inputs()
38683868

38693869
#pragma warning restore CS8604 // Possible null reference argument.
38703870
}
3871-
}
38723871

3872+
[Fact]
3873+
public async Task ProcessPayload_With_Failing_Workflow_Conditional_Should_Not_Procced()
3874+
{
3875+
var workflowRequest = new WorkflowRequestEvent
3876+
{
3877+
Bucket = "testbucket",
3878+
DataTrigger = new DataOrigin { Source = "aetitle", Destination = "aetitle" },
3879+
CorrelationId = Guid.NewGuid().ToString(),
3880+
Timestamp = DateTime.UtcNow
3881+
};
3882+
3883+
var workflows = new List<WorkflowRevision>
3884+
{
3885+
new() {
3886+
Id = Guid.NewGuid().ToString(),
3887+
WorkflowId = Guid.NewGuid().ToString(),
3888+
Revision = 1,
3889+
Workflow = new Workflow
3890+
{
3891+
Name = "Workflowname",
3892+
Description = "Workflowdesc",
3893+
Version = "1",
3894+
InformaticsGateway = new InformaticsGateway
3895+
{
3896+
AeTitle = "aetitle"
3897+
},
3898+
Tasks =
3899+
[
3900+
new TaskObject {
3901+
Id = Guid.NewGuid().ToString(),
3902+
Type = "type",
3903+
Description = "taskdesc"
3904+
}
3905+
],
3906+
Conditions = ["{{ context.dicom.series.any('0010','0040') }} == 'lordge'"]
3907+
}
3908+
}
3909+
};
3910+
3911+
_workflowRepository.Setup(w => w.GetWorkflowsByAeTitleAsync(It.IsAny<List<string>>())).ReturnsAsync(workflows);
3912+
_workflowRepository.Setup(w => w.GetWorkflowsForWorkflowRequestAsync(It.IsAny<string>(), It.IsAny<string>())).ReturnsAsync(workflows);
3913+
_workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflows[0].WorkflowId)).ReturnsAsync(workflows[0]);
3914+
_workflowInstanceRepository.Setup(w => w.CreateAsync(It.IsAny<List<WorkflowInstance>>())).ReturnsAsync(true);
3915+
_workflowInstanceRepository.Setup(w => w.GetByWorkflowsIdsAsync(It.IsAny<List<string>>())).ReturnsAsync(new List<WorkflowInstance>());
3916+
_workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<TaskExecutionStatus>())).ReturnsAsync(true);
3917+
3918+
var result = await WorkflowExecuterService.ProcessPayload(workflowRequest, new Payload() { Id = Guid.NewGuid().ToString() });
3919+
3920+
_messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.TaskDispatchRequest, It.IsAny<Message>()), Times.Never());
3921+
3922+
Assert.False(result);
3923+
}
3924+
3925+
[Fact]
3926+
public async Task ProcessPayload_With_Passing_Workflow_Conditional_Should_Procced()
3927+
{
3928+
var workflowRequest = new WorkflowRequestEvent
3929+
{
3930+
Bucket = "testbucket",
3931+
DataTrigger = new DataOrigin { Source = "aetitle", Destination = "aetitle" },
3932+
CorrelationId = Guid.NewGuid().ToString(),
3933+
Timestamp = DateTime.UtcNow
3934+
};
3935+
3936+
var workflows = new List<WorkflowRevision>
3937+
{
3938+
new() {
3939+
Id = Guid.NewGuid().ToString(),
3940+
WorkflowId = Guid.NewGuid().ToString(),
3941+
Revision = 1,
3942+
Workflow = new Workflow
3943+
{
3944+
Name = "Workflowname",
3945+
Description = "Workflowdesc",
3946+
Version = "1",
3947+
InformaticsGateway = new InformaticsGateway
3948+
{
3949+
AeTitle = "aetitle"
3950+
},
3951+
Tasks =
3952+
[
3953+
new TaskObject {
3954+
Id = Guid.NewGuid().ToString(),
3955+
Type = "type",
3956+
Description = "taskdesc"
3957+
}
3958+
],
3959+
Conditions = ["{{ context.dicom.series.any('0010','0040') }} == 'lordge'"]
3960+
}
3961+
}
3962+
};
3963+
3964+
_dicom.Setup(w => w.GetAnyValueAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
3965+
.ReturnsAsync(() => "lordge");
3966+
3967+
_workflowRepository.Setup(w => w.GetWorkflowsByAeTitleAsync(It.IsAny<List<string>>())).ReturnsAsync(workflows);
3968+
_workflowRepository.Setup(w => w.GetWorkflowsForWorkflowRequestAsync(It.IsAny<string>(), It.IsAny<string>())).ReturnsAsync(workflows);
3969+
_workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflows[0].WorkflowId)).ReturnsAsync(workflows[0]);
3970+
_workflowInstanceRepository.Setup(w => w.CreateAsync(It.IsAny<List<WorkflowInstance>>())).ReturnsAsync(true);
3971+
_workflowInstanceRepository.Setup(w => w.GetByWorkflowsIdsAsync(It.IsAny<List<string>>())).ReturnsAsync(new List<WorkflowInstance>());
3972+
_workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<TaskExecutionStatus>())).ReturnsAsync(true);
3973+
3974+
var result = await WorkflowExecuterService.ProcessPayload(workflowRequest, new Payload() { Id = Guid.NewGuid().ToString() });
3975+
3976+
_messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.TaskDispatchRequest, It.IsAny<Message>()), Times.Once());
3977+
3978+
Assert.True(result);
3979+
}
3980+
}
38733981
#pragma warning restore CS8625 // Cannot convert null literal to non-nullable reference type.
38743982
}

0 commit comments

Comments
 (0)