Skip to content

Commit 8b97086

Browse files
authored
Merge pull request #894 from Project-MONAI/AI-230
Ai 230
2 parents 6f94803 + df25fa2 commit 8b97086

34 files changed

+1445
-102
lines changed

src/Common/Configuration/MessageBrokerConfigurationKeys.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,5 +88,8 @@ public class MessageBrokerConfigurationKeys
8888

8989
[ConfigurationKeyName("notificationEmailCancelation")]
9090
public string NotificationEmailCancelation { get; set; } = "aide.notification_email.cancellation";
91+
92+
[ConfigurationKeyName("artifactrecieved")]
93+
public string ArtifactRecieved { get; set; } = "md.workflow.artifactrecieved";
9194
}
9295
}

src/Monai.Deploy.WorkflowManager.sln.DotSettings

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
2+
<s:Boolean x:Key="/Default/CodeInspection/Highlighting/ConvertIfStatementToSwitchStatement/AssumeOpenTypeHierarchy/@EntryValue">True</s:Boolean>
23
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=AR/@EntryIndexedValue">AR</s:String>
34
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=AS/@EntryIndexedValue">AS</s:String>
45
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=ASMT/@EntryIndexedValue">ASMT</s:String>

src/TaskManager/TaskManager/TaskManager.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -241,15 +241,17 @@ private async Task HandleCancellationTask(JsonMessage<TaskCancellationEvent> mes
241241
}
242242

243243
var pluginAssembly = string.Empty;
244-
ITaskPlugin? taskRunner = null;
244+
ITaskPlugin? taskRunner;
245245
try
246246
{
247247
var taskExecution = await _taskDispatchEventService.GetByTaskExecutionIdAsync(message.Body.ExecutionId).ConfigureAwait(false);
248-
pluginAssembly = _options.Value.TaskManager.PluginAssemblyMappings[taskExecution?.Event.TaskPluginType] ?? string.Empty;
249-
var taskExecEvent = taskExecution?.Event;
250-
if (taskExecEvent == null)
248+
249+
var taskExecEvent = taskExecution?.Event ?? throw new InvalidOperationException("Task Event data not found.");
250+
251+
pluginAssembly = string.Empty;
252+
if (_options.Value.TaskManager.PluginAssemblyMappings.ContainsKey(taskExecution?.Event.TaskPluginType))
251253
{
252-
throw new InvalidOperationException("Task Event data not found.");
254+
pluginAssembly = _options.Value.TaskManager.PluginAssemblyMappings[taskExecution?.Event.TaskPluginType];
253255
}
254256

255257
taskRunner = typeof(ITaskPlugin).CreateInstance<ITaskPlugin>(serviceProvider: _scope.ServiceProvider, typeString: pluginAssembly, _serviceScopeFactory, taskExecEvent);
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2022 MONAI Consortium
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
using System.Collections.Generic;
18+
using System.Threading.Tasks;
19+
using Artifact = Monai.Deploy.Messaging.Common.Artifact;
20+
21+
namespace Monai.Deploy.WorkflowManager.Common.Database.Repositories
22+
{
23+
public interface IArtifactsRepository
24+
{
25+
/// <summary>
26+
/// Gets All ArtifactsReceivedItems by workflowInstance and taskId.
27+
/// </summary>
28+
/// <param name="workflowInstance"></param>
29+
/// <param name="taskId"></param>
30+
/// <returns></returns>
31+
Task<List<ArtifactReceivedItems>?> GetAllAsync(string workflowInstance, string taskId);
32+
33+
/// <summary>
34+
/// Adds an item to the ArtifactsReceivedItems collection.
35+
/// </summary>
36+
/// <param name="item"></param>
37+
/// <returns></returns>
38+
Task AddItemAsync(ArtifactReceivedItems item);
39+
40+
/// <summary>
41+
/// Adds an item to the ArtifactsReceivedItems collection.
42+
/// </summary>
43+
/// <param name="workflowInstanceId"></param>
44+
/// <param name="taskId"></param>
45+
/// <param name="artifactsOutputs"></param>
46+
/// <returns></returns>
47+
Task AddItemAsync(string workflowInstanceId, string taskId, List<Artifact> artifactsOutputs);
48+
49+
Task AddOrUpdateItemAsync(string workflowInstanceId, string taskId,
50+
IEnumerable<Artifact> artifactsOutputs);
51+
}
52+
}
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Copyright 2022 MONAI Consortium
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
using System;
18+
using System.Collections.Generic;
19+
using System.Linq;
20+
using System.Threading.Tasks;
21+
using Ardalis.GuardClauses;
22+
using Microsoft.Extensions.Logging;
23+
using Microsoft.Extensions.Options;
24+
using Monai.Deploy.WorkflowManager.Common.Database.Options;
25+
using MongoDB.Bson;
26+
using MongoDB.Driver;
27+
using Artifact = Monai.Deploy.Messaging.Common.Artifact;
28+
29+
namespace Monai.Deploy.WorkflowManager.Common.Database.Repositories
30+
{
31+
public class ArtifactReceivedDetails : Artifact
32+
{
33+
/// <summary>
34+
/// Gets or Sets LastReceived.
35+
/// </summary>
36+
public DateTime? Received { get; set; } = null;
37+
38+
public static ArtifactReceivedDetails FromArtifact(Artifact artifact) =>
39+
new()
40+
{
41+
Received = DateTime.UtcNow,
42+
Type = artifact.Type,
43+
Path = artifact.Path
44+
};
45+
}
46+
47+
public class ArtifactReceivedItems
48+
{
49+
public string Id { get; set; }
50+
51+
/// <summary>
52+
/// Gets or Sets WorkflowInstanceId.
53+
/// </summary>
54+
public string WorkflowInstanceId { get; set; } = string.Empty;
55+
56+
/// <summary>
57+
/// Gets or Sets TaskId.
58+
/// </summary>
59+
public string TaskId { get; set; } = string.Empty;
60+
61+
/// <summary>
62+
/// Gets or Sets Artifacts.
63+
/// </summary>
64+
public List<ArtifactReceivedDetails> Artifacts { get; set; } = new();
65+
66+
/// <summary>
67+
/// The date Time this was received
68+
/// </summary>
69+
public DateTime Received { get; set; } = DateTime.UtcNow;
70+
}
71+
72+
public class ArtifactsRepository : IArtifactsRepository
73+
{
74+
private readonly ILogger<ArtifactsRepository> _logger;
75+
private readonly IMongoCollection<ArtifactReceivedItems> _artifactReceivedItemsCollection;
76+
77+
public ArtifactsRepository(
78+
IMongoClient client,
79+
IOptions<WorkloadManagerDatabaseSettings> bookStoreDatabaseSettings,
80+
ILogger<ArtifactsRepository> logger)
81+
{
82+
if (client == null)
83+
{
84+
throw new ArgumentNullException(nameof(client));
85+
}
86+
87+
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
88+
var mongoDatabase = client.GetDatabase(bookStoreDatabaseSettings.Value.DatabaseName);
89+
_artifactReceivedItemsCollection = mongoDatabase.GetCollection<ArtifactReceivedItems>("ArtifactReceivedItems");
90+
EnsureIndex().GetAwaiter().GetResult();
91+
}
92+
private async Task EnsureIndex()
93+
{
94+
var indexName = "ArtifactReceivedWorkflowInstanceIdTaskIdIndex";
95+
96+
var model = new CreateIndexModel<ArtifactReceivedItems>(
97+
Builders<ArtifactReceivedItems>.IndexKeys.Ascending(s => s.WorkflowInstanceId).Ascending(s => s.TaskId),
98+
new CreateIndexOptions { Name = indexName }
99+
);
100+
101+
await MakeIndex(_artifactReceivedItemsCollection, indexName, model);
102+
103+
indexName = "ReceivedTime";
104+
105+
model = new CreateIndexModel<ArtifactReceivedItems>(
106+
Builders<ArtifactReceivedItems>.IndexKeys.Ascending(s => s.Received),
107+
new CreateIndexOptions { ExpireAfter = TimeSpan.FromDays(7), Name = "ReceivedTime" }
108+
);
109+
110+
await MakeIndex(_artifactReceivedItemsCollection, indexName, model);
111+
}
112+
private static async Task MakeIndex<T>(IMongoCollection<T> collection, string indexName, CreateIndexModel<T> model)
113+
{
114+
Guard.Against.Null(collection, nameof(collection));
115+
116+
var asyncCursor = (await collection.Indexes.ListAsync());
117+
var bsonDocuments = (await asyncCursor.ToListAsync());
118+
var indexes = bsonDocuments.Select(_ => _.GetElement("name").Value.ToString()).ToList();
119+
120+
// If index not present create it else skip.
121+
if (!indexes.Any(i => i is not null && i.Equals(indexName)))
122+
{
123+
await collection.Indexes.CreateOneAsync(model);
124+
}
125+
}
126+
127+
public async Task<List<ArtifactReceivedItems>?> GetAllAsync(string workflowInstance, string taskId)
128+
{
129+
var result = await _artifactReceivedItemsCollection.FindAsync(a => a.WorkflowInstanceId == workflowInstance && a.TaskId == taskId).ConfigureAwait(false);
130+
return await result.ToListAsync().ConfigureAwait(false);
131+
}
132+
133+
public Task AddItemAsync(ArtifactReceivedItems item)
134+
{
135+
return _artifactReceivedItemsCollection.InsertOneAsync(item);
136+
}
137+
138+
public Task AddItemAsync(string workflowInstanceId, string taskId, List<Artifact> artifactsOutputs)
139+
{
140+
var artifacts = artifactsOutputs.Select(a => new ArtifactReceivedDetails()
141+
{
142+
Path = a.Path,
143+
Type = a.Type,
144+
Received = DateTime.UtcNow
145+
});
146+
147+
var item = new ArtifactReceivedItems()
148+
{
149+
WorkflowInstanceId = workflowInstanceId,
150+
TaskId = taskId,
151+
Artifacts = artifacts.ToList()
152+
};
153+
154+
return _artifactReceivedItemsCollection.InsertOneAsync(item);
155+
}
156+
157+
public async Task AddOrUpdateItemAsync(string workflowInstanceId, string taskId,
158+
IEnumerable<Artifact> artifactsOutputs)
159+
{
160+
var artifacts = artifactsOutputs.Select(a => new ArtifactReceivedDetails()
161+
{
162+
Path = a.Path,
163+
Type = a.Type,
164+
Received = DateTime.UtcNow
165+
});
166+
167+
var item = new ArtifactReceivedItems()
168+
{
169+
Id = workflowInstanceId + taskId,
170+
WorkflowInstanceId = workflowInstanceId,
171+
TaskId = taskId,
172+
Artifacts = artifacts.ToList()
173+
};
174+
175+
var result = await _artifactReceivedItemsCollection
176+
.FindAsync(a => a.WorkflowInstanceId == workflowInstanceId && a.TaskId == taskId).ConfigureAwait(false);
177+
var existing = await result.FirstOrDefaultAsync().ConfigureAwait(false);
178+
179+
try
180+
{
181+
if (existing == null)
182+
{
183+
await _artifactReceivedItemsCollection.InsertOneAsync(item).ConfigureAwait(false);
184+
}
185+
else
186+
{
187+
item.Artifacts = item.Artifacts.Concat(existing.Artifacts).ToList();
188+
var update = Builders<ArtifactReceivedItems>.Update.Set(a => a.Artifacts, item.Artifacts);
189+
await _artifactReceivedItemsCollection
190+
.UpdateOneAsync(a => a.WorkflowInstanceId == workflowInstanceId && a.TaskId == taskId, update)
191+
.ConfigureAwait(false);
192+
}
193+
}
194+
catch (Exception ex)
195+
{
196+
197+
throw;
198+
}
199+
200+
201+
}
202+
}
203+
}

src/WorkflowManager/Database/Repositories/RepositoryBase.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ namespace Monai.Deploy.WorkflowManager.Common.Database.Repositories
2424
{
2525
public abstract class RepositoryBase
2626
{
27-
public static async Task<long> CountAsync<T>(IMongoCollection<T> collection, FilterDefinition<T>? filter)
28-
=> await collection.CountDocumentsAsync(filter ?? Builders<T>.Filter.Empty);
27+
public static Task<long> CountAsync<T>(IMongoCollection<T> collection, FilterDefinition<T>? filter)
28+
=> collection.CountDocumentsAsync(filter ?? Builders<T>.Filter.Empty);
2929

3030
/// <summary>
3131
/// Get All T that match filters provided.
@@ -44,7 +44,7 @@ public static async Task<IList<T>> GetAllAsync<T>(IMongoCollection<T> collection
4444
.Skip(skip)
4545
.Limit(limit)
4646
.Sort(sortFunction)
47-
.ToListAsync();
47+
.ToListAsync().ConfigureAwait(false);
4848
}
4949

5050
public static async Task<IList<T>> GetAllAsync<T>(IMongoCollection<T> collection, FilterDefinition<T> filterFunction, SortDefinition<T> sortFunction, int? skip = null, int? limit = null)
@@ -54,7 +54,7 @@ public static async Task<IList<T>> GetAllAsync<T>(IMongoCollection<T> collection
5454
.Skip(skip)
5555
.Limit(limit)
5656
.Sort(sortFunction)
57-
.ToListAsync();
57+
.ToListAsync().ConfigureAwait(false);
5858
}
5959
}
6060
}

src/WorkflowManager/Logging/Log.200000.Workflow.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ public static partial class Log
8181
[LoggerMessage(EventId = 200019, Level = LogLevel.Debug, Message = "Task destination condition for task {taskId} with resolved condition: {resolvedConditional} resolved to false. initial conditional: {conditions}")]
8282
public static partial void TaskDestinationConditionFalse(this ILogger logger, string resolvedConditional, string conditions, string taskId);
8383

84+
[LoggerMessage(EventId = 200020, Level = LogLevel.Warning, Message = "Use new ArtifactReceived Queue for continuation messages.")]
85+
public static partial void DontUseWorkflowReceivedForPayload(this ILogger logger);
86+
8487
// Conditions Resolver
8588
[LoggerMessage(EventId = 210000, Level = LogLevel.Warning, Message = "Failed to parse condition: {condition}. resolvedConditional: {resolvedConditional}")]
8689
public static partial void FailedToParseCondition(this ILogger logger, string resolvedConditional, string condition, Exception ex);

src/WorkflowManager/Logging/Log.500000.Messaging.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,20 @@ public static partial class Log
7171
[LoggerMessage(EventId = 500016, Level = LogLevel.Debug, Message = "Export complete message received.")]
7272
public static partial void ExportCompleteReceived(this ILogger logger);
7373

74-
[LoggerMessage(EventId = 200017, Level = LogLevel.Debug, Message = "Workflow continuation event so not creating payload.")]
74+
[LoggerMessage(EventId = 500017, Level = LogLevel.Debug, Message = "ArtifactReceived message so not creating payload.")]
7575
public static partial void WorkflowContinuation(this ILogger logger);
76+
77+
[LoggerMessage(EventId = 500018, Level = LogLevel.Debug, Message = "ArtifactReceived message received.")]
78+
public static partial void ArtifactReceivedReceived(this ILogger logger);
79+
80+
[LoggerMessage(EventId = 500019, Level = LogLevel.Error, Message = "ArtifactReceived message {messageId} failed unexpectedly (no workflowId or TaskId ?) and has been requeued.")]
81+
public static partial void ArtifactReceivedRequeuePayloadCreateError(this ILogger logger, string messageId);
82+
83+
[LoggerMessage(EventId = 500020, Level = LogLevel.Error, Message = "ArtifactReveived message {messageId} failed unexpectedly and has been requeued.")]
84+
public static partial void ArtifactReceivedRequeueUnknownError(this ILogger logger, string messageId, Exception ex);
85+
86+
[LoggerMessage(EventId = 500021, Level = LogLevel.Error, Message = "ArtifactReveived message {messageId} is invalid and has been rejected without being requeued.")]
87+
public static partial void ArtifactReceivedRejectValidationError(this ILogger logger, string messageId);
88+
7689
}
7790
}

src/WorkflowManager/Logging/Log.700000.Artifact.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,12 @@ public static partial class Log
4444

4545
[LoggerMessage(EventId = 700007, Level = LogLevel.Information, Message = "Task Dispatch resolved successfully output artifacts: PayloadId: {payloadId}, TaskId: {taskId}, WorkflowInstanceId: {workflowInstanceId}, WorkflowRevisionId: {workflowRevisionId}, output artifact object: {pathOutputArtifacts}")]
4646
public static partial void LogGeneralTaskDispatchInformation(this ILogger logger, string payloadId, string taskId, string workflowInstanceId, string workflowRevisionId, string pathOutputArtifacts);
47+
48+
[LoggerMessage(EventId = 700008, Level = LogLevel.Warning, Message = "Unexpected Artifacts output artifacts: TaskId: {taskId}, WorkflowInstanceId: {workflowInstanceId}, output artifact object: {unexpectedArtifacts}")]
49+
public static partial void UnexpectedArtifactsReceived(this ILogger logger, string taskId, string workflowInstanceId, string unexpectedArtifacts);
50+
51+
[LoggerMessage(EventId = 700009, Level = LogLevel.Debug, Message = "Mandatory output artifacts for task {taskId} are missing. waiting for remaining artifacts... {missingArtifacts}")]
52+
public static partial void MandatoryOutputArtifactsMissingForTask(this ILogger logger, string taskId, string missingArtifacts);
53+
4754
}
4855
}

0 commit comments

Comments
 (0)