-
Notifications
You must be signed in to change notification settings - Fork 4
Ai 230 #894
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Ai 230 #894
Changes from 21 commits
Commits
Show all changes
26 commits
Select commit
Hold shift + click to select a range
7786e8f
changes to add output artifact and validation
lillie-dae b97b6d5
minor change to using statements
lillie-dae b7d06ad
remove comment
lillie-dae 7c26b74
Merge branch 'develop' into AI-229-add-output-artifact-and-validation
lillie-dae 1584e56
add header files
lillie-dae 6351295
Merge branch 'AI-229-add-output-artifact-and-validation' of https://g…
lillie-dae 7a2e104
import missing references
lillie-dae ca2fa88
updated packages
lillie-dae 84d7db0
bump package
lillie-dae ab793e1
bump package to release version
lillie-dae d5d1abc
adding lisener for artifact received
neildsouth ffe1282
updated ProcessArtifactReceived
lillie-dae 7c9554f
small fixups
neildsouth f7dfdaf
added artifact repo
lillie-dae 541eeef
Merge branch 'AI-230' of https://github.com/Project-MONAI/monai-deplo…
lillie-dae 8f8781d
fix up tests
neildsouth 9c35e22
fix tests
neildsouth 4206b93
merge in dev
lillie-dae 9b5a5a9
added EventPayloadValidatorTests and WorkflowExecuterServiceTests and…
lillie-dae d0b61b2
reduced code duplication
lillie-dae 4d32c70
adding indexs
neildsouth a9898e4
minor fixes
lillie-dae 69d7137
minor fixes
lillie-dae ee61cf9
minor fixes
lillie-dae 5388837
final adjustments
neildsouth df25fa2
merge in
neildsouth File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
52 changes: 52 additions & 0 deletions
52
src/WorkflowManager/Database/Interfaces/IArtifactsRepository.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| /* | ||
| * Copyright 2022 MONAI Consortium | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| using System.Collections.Generic; | ||
| using System.Threading.Tasks; | ||
| using Artifact = Monai.Deploy.Messaging.Common.Artifact; | ||
|
|
||
| namespace Monai.Deploy.WorkflowManager.Common.Database.Repositories | ||
| { | ||
| public interface IArtifactsRepository | ||
| { | ||
| /// <summary> | ||
| /// Gets All ArtifactsReceivedItems by workflowInstance and taskId. | ||
| /// </summary> | ||
| /// <param name="workflowInstance"></param> | ||
| /// <param name="taskId"></param> | ||
| /// <returns></returns> | ||
| Task<List<ArtifactReceivedItems>?> GetAllAsync(string workflowInstance, string taskId); | ||
|
|
||
| /// <summary> | ||
| /// Adds an item to the ArtifactsReceivedItems collection. | ||
| /// </summary> | ||
| /// <param name="item"></param> | ||
| /// <returns></returns> | ||
| Task AddItemAsync(ArtifactReceivedItems item); | ||
|
|
||
| /// <summary> | ||
| /// Adds an item to the ArtifactsReceivedItems collection. | ||
| /// </summary> | ||
| /// <param name="workflowInstanceId"></param> | ||
| /// <param name="taskId"></param> | ||
| /// <param name="artifactsOutputs"></param> | ||
| /// <returns></returns> | ||
| Task AddItemAsync(string workflowInstanceId, string taskId, List<Artifact> artifactsOutputs); | ||
|
|
||
| Task AddOrUpdateItemAsync(string workflowInstanceId, string taskId, | ||
| IEnumerable<Artifact> artifactsOutputs); | ||
| } | ||
| } |
193 changes: 193 additions & 0 deletions
193
src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,193 @@ | ||
| /* | ||
| * Copyright 2022 MONAI Consortium | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Linq; | ||
| using System.Threading.Tasks; | ||
| using Ardalis.GuardClauses; | ||
| using Microsoft.Extensions.Logging; | ||
| using Microsoft.Extensions.Options; | ||
| using Monai.Deploy.WorkflowManager.Common.Database.Options; | ||
| using MongoDB.Driver; | ||
| using Artifact = Monai.Deploy.Messaging.Common.Artifact; | ||
|
|
||
| namespace Monai.Deploy.WorkflowManager.Common.Database.Repositories | ||
| { | ||
| public class ArtifactReceivedDetails : Artifact | ||
| { | ||
| /// <summary> | ||
| /// Gets or Sets LastReceived. | ||
| /// </summary> | ||
| public DateTime? Received { get; set; } = null; | ||
|
|
||
| public static ArtifactReceivedDetails FromArtifact(Artifact artifact) => | ||
| new() | ||
| { | ||
| Received = DateTime.UtcNow, | ||
| Type = artifact.Type, | ||
| Path = artifact.Path | ||
| }; | ||
| } | ||
|
|
||
| public class ArtifactReceivedItems | ||
| { | ||
| /// <summary> | ||
| /// Gets or Sets the Id. | ||
| /// </summary> | ||
| public double Id { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// Gets or Sets WorkflowInstanceId. | ||
| /// </summary> | ||
| public string WorkflowInstanceId { get; set; } = string.Empty; | ||
|
|
||
| /// <summary> | ||
| /// Gets or Sets TaskId. | ||
| /// </summary> | ||
| public string TaskId { get; set; } = string.Empty; | ||
|
|
||
| /// <summary> | ||
| /// Gets or Sets Artifacts. | ||
| /// </summary> | ||
| public List<ArtifactReceivedDetails> Artifacts { get; set; } = new(); | ||
|
|
||
| /// <summary> | ||
| /// The date Time this was received | ||
| /// </summary> | ||
| public DateTime Received { get; set; } = DateTime.UtcNow; | ||
| } | ||
|
|
||
| public class ArtifactsRepository : IArtifactsRepository | ||
| { | ||
| private readonly ILogger<ArtifactsRepository> _logger; | ||
| private readonly IMongoCollection<ArtifactReceivedItems> _artifactReceivedItemsCollection; | ||
|
|
||
| public ArtifactsRepository( | ||
| IMongoClient client, | ||
| IOptions<WorkloadManagerDatabaseSettings> bookStoreDatabaseSettings, | ||
| ILogger<ArtifactsRepository> logger) | ||
| { | ||
| if (client == null) | ||
| { | ||
| throw new ArgumentNullException(nameof(client)); | ||
| } | ||
|
|
||
| _logger = logger ?? throw new ArgumentNullException(nameof(logger)); | ||
| var mongoDatabase = client.GetDatabase(bookStoreDatabaseSettings.Value.DatabaseName); | ||
| _artifactReceivedItemsCollection = mongoDatabase.GetCollection<ArtifactReceivedItems>("ArtifactReceivedItems"); | ||
| EnsureIndex().GetAwaiter().GetResult(); | ||
| } | ||
| private async Task EnsureIndex() | ||
| { | ||
| var indexName = "ArtifactReceivedWorkflowInstanceIdTaskIdIndex"; | ||
|
|
||
| var model = new CreateIndexModel<ArtifactReceivedItems>( | ||
| Builders<ArtifactReceivedItems>.IndexKeys.Ascending(s => s.WorkflowInstanceId).Ascending(s => s.TaskId), | ||
| new CreateIndexOptions { Name = indexName } | ||
| ); | ||
|
|
||
| await MakeIndex(_artifactReceivedItemsCollection, indexName, model); | ||
|
|
||
| indexName = "ReceivedTime"; | ||
|
|
||
| model = new CreateIndexModel<ArtifactReceivedItems>( | ||
| Builders<ArtifactReceivedItems>.IndexKeys.Ascending(s => s.Received), | ||
| new CreateIndexOptions { ExpireAfter = TimeSpan.FromDays(7), Name = "ReceivedTime" } | ||
| ); | ||
|
|
||
| await MakeIndex(_artifactReceivedItemsCollection, indexName, model); | ||
| } | ||
| private static async Task MakeIndex<T>(IMongoCollection<T> collection, string indexName, CreateIndexModel<T> model) | ||
| { | ||
| Guard.Against.Null(collection, nameof(collection)); | ||
|
|
||
| var asyncCursor = (await collection.Indexes.ListAsync()); | ||
| var bsonDocuments = (await asyncCursor.ToListAsync()); | ||
| var indexes = bsonDocuments.Select(_ => _.GetElement("name").Value.ToString()).ToList(); | ||
|
|
||
| // If index not present create it else skip. | ||
| if (!indexes.Any(i => i is not null && i.Equals(indexName))) | ||
| { | ||
| await collection.Indexes.CreateOneAsync(model); | ||
| } | ||
| } | ||
|
|
||
| public async Task<List<ArtifactReceivedItems>?> GetAllAsync(string workflowInstance, string taskId) | ||
| { | ||
| var result = await _artifactReceivedItemsCollection.FindAsync(a => a.WorkflowInstanceId == workflowInstance && a.TaskId == taskId).ConfigureAwait(false); | ||
| return await result.ToListAsync().ConfigureAwait(false); | ||
| } | ||
|
|
||
| public Task AddItemAsync(ArtifactReceivedItems item) | ||
| { | ||
| return _artifactReceivedItemsCollection.InsertOneAsync(item); | ||
| } | ||
|
|
||
| public Task AddItemAsync(string workflowInstanceId, string taskId, List<Artifact> artifactsOutputs) | ||
| { | ||
| var artifacts = artifactsOutputs.Select(a => new ArtifactReceivedDetails() | ||
| { | ||
| Path = a.Path, | ||
| Type = a.Type, | ||
| Received = DateTime.UtcNow | ||
| }); | ||
|
|
||
| var item = new ArtifactReceivedItems() | ||
| { | ||
| WorkflowInstanceId = workflowInstanceId, | ||
| TaskId = taskId, | ||
| Artifacts = artifacts.ToList() | ||
| }; | ||
|
|
||
| return _artifactReceivedItemsCollection.InsertOneAsync(item); | ||
| } | ||
|
|
||
| public async Task AddOrUpdateItemAsync(string workflowInstanceId, string taskId, | ||
| IEnumerable<Artifact> artifactsOutputs) | ||
| { | ||
| var artifacts = artifactsOutputs.Select(a => new ArtifactReceivedDetails() | ||
| { | ||
| Path = a.Path, | ||
| Type = a.Type, | ||
| Received = DateTime.UtcNow | ||
| }); | ||
|
|
||
| var item = new ArtifactReceivedItems() | ||
| { | ||
| WorkflowInstanceId = workflowInstanceId, | ||
| TaskId = taskId, | ||
| Artifacts = artifacts.ToList() | ||
| }; | ||
|
|
||
| var result = await _artifactReceivedItemsCollection | ||
| .FindAsync(a => a.WorkflowInstanceId == workflowInstanceId && a.TaskId == taskId).ConfigureAwait(false); | ||
| var existing = await result.FirstOrDefaultAsync().ConfigureAwait(false); | ||
|
|
||
| if (existing == null) | ||
| { | ||
| await _artifactReceivedItemsCollection.InsertOneAsync(item).ConfigureAwait(false); | ||
| } | ||
| else | ||
| { | ||
| var update = Builders<ArtifactReceivedItems>.Update.Set(a => a.Artifacts, item.Artifacts); | ||
| await _artifactReceivedItemsCollection | ||
| .UpdateOneAsync(a => a.WorkflowInstanceId == workflowInstanceId && a.TaskId == taskId, update) | ||
| .ConfigureAwait(false); | ||
| } | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.