Skip to content

Commit e9855a3

Browse files
committed
remove the need for a double copy to minio
Signed-off-by: Neil South <neil.south@answerdigital.com>
1 parent d27c8e3 commit e9855a3

File tree

10 files changed

+45
-25
lines changed

10 files changed

+45
-25
lines changed

src/Api/Storage/FileStorageMetadata.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,5 +116,7 @@ public virtual void SetFailed()
116116
{
117117
File.SetFailed();
118118
}
119+
120+
public string PayloadId { get; set; }
119121
}
120122
}

src/InformaticsGateway/Services/Connectors/DataRetrievalService.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ private async Task NotifyNewInstance(InferenceRequest inferenceRequest, Dictiona
198198
{
199199
retrievedFiles[key].SetWorkflows(inferenceRequest.Application.Id);
200200
}
201+
var FileMeta = retrievedFiles[key];
202+
FileMeta.PayloadId = inferenceRequest.TransactionId;
201203
_uploadQueue.Queue(retrievedFiles[key]);
202204
await _payloadAssembler.Queue(inferenceRequest.TransactionId, retrievedFiles[key]).ConfigureAwait(false);
203205
}

src/InformaticsGateway/Services/Connectors/IPayloadAssembler.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17+
using System;
1718
using System.Threading;
1819
using System.Threading.Tasks;
1920
using Monai.Deploy.InformaticsGateway.Api.Storage;
@@ -30,15 +31,15 @@ internal interface IPayloadAssembler
3031
/// </summary>
3132
/// <param name="bucket">The bucket group the file belongs to.</param>
3233
/// <param name="file">Path to the file to be added to the payload bucket.</param>
33-
Task Queue(string bucket, FileStorageMetadata file);
34+
Task<Guid> Queue(string bucket, FileStorageMetadata file);
3435

3536
/// <summary>
3637
/// Queue a new file for the spcified payload bucket.
3738
/// </summary>
3839
/// <param name="bucket">The bucket group the file belongs to.</param>
3940
/// <param name="file">Path to the file to be added to the payload bucket.</param>
4041
/// <param name="timeout">Number of seconds to wait for additional files.</param>
41-
Task Queue(string bucket, FileStorageMetadata file, uint timeout);
42+
Task<Guid> Queue(string bucket, FileStorageMetadata file, uint timeout);
4243

4344
/// <summary>
4445
/// Dequeue a payload from the queue for the message broker to notify subscribers.

src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,15 +87,15 @@ private async Task RemovePendingPayloads()
8787
/// </summary>
8888
/// <param name="bucket">Name of the bucket where the file would be added to</param>
8989
/// <param name="file">Instance to be queued</param>
90-
public async Task Queue(string bucket, FileStorageMetadata file) => await Queue(bucket, file, DEFAULT_TIMEOUT).ConfigureAwait(false);
90+
public async Task<Guid> Queue(string bucket, FileStorageMetadata file) => await Queue(bucket, file, DEFAULT_TIMEOUT).ConfigureAwait(false);
9191

9292
/// <summary>
9393
/// Queues a new instance of <see cref="FileStorageMetadata"/>.
9494
/// </summary>
9595
/// <param name="bucket">Name of the bucket where the file would be added to</param>
9696
/// <param name="file">Instance to be queued</param>
9797
/// <param name="timeout">Number of seconds the bucket shall wait before sending the payload to be processed. Note: timeout cannot be modified once the bucket is created.</param>
98-
public async Task Queue(string bucket, FileStorageMetadata file, uint timeout)
98+
public async Task<Guid> Queue(string bucket, FileStorageMetadata file, uint timeout)
9999
{
100100
Guard.Against.Null(file);
101101

@@ -106,6 +106,7 @@ public async Task Queue(string bucket, FileStorageMetadata file, uint timeout)
106106
var payload = await CreateOrGetPayload(bucket, file.CorrelationId, timeout).ConfigureAwait(false);
107107
payload.Add(file);
108108
_logger.FileAddedToBucket(payload.Key, payload.Count);
109+
return payload.PayloadId;
109110
}
110111

111112
/// <summary>

src/InformaticsGateway/Services/Connectors/PayloadMoveActionHandler.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,12 @@ private async Task MoveFile(Guid payloadId, string identity, StorageObjectMetada
183183

184184
try
185185
{
186-
await _storageService.CopyObjectAsync(
187-
file.TemporaryBucketName,
188-
file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath),
189-
_options.Value.Storage.StorageServiceBucketName,
190-
file.GetPayloadPath(payloadId),
191-
cancellationToken).ConfigureAwait(false);
186+
//await _storageService.CopyObjectAsync(
187+
// file.TemporaryBucketName,
188+
// file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath),
189+
// _options.Value.Storage.StorageServiceBucketName,
190+
// file.GetPayloadPath(payloadId),
191+
// cancellationToken).ConfigureAwait(false);
192192

193193
await VerifyFileExists(payloadId, file, cancellationToken).ConfigureAwait(false);
194194
}
@@ -212,8 +212,8 @@ await _storageService.CopyObjectAsync(
212212

213213
try
214214
{
215-
_logger.DeletingFileFromTemporaryBbucket(file.TemporaryBucketName, identity, file.TemporaryPath);
216-
await _storageService.RemoveObjectAsync(file.TemporaryBucketName, file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath), cancellationToken).ConfigureAwait(false);
215+
//_logger.DeletingFileFromTemporaryBbucket(file.TemporaryBucketName, identity, file.TemporaryPath);
216+
//await _storageService.RemoveObjectAsync(file.TemporaryBucketName, file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath), cancellationToken).ConfigureAwait(false);
217217
}
218218
catch (Exception)
219219
{

src/InformaticsGateway/Services/Scp/ApplicationEntityHandler.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,14 @@ public async Task HandleInstanceAsync(DicomCStoreRequest request, string calledA
113113
}
114114

115115
await dicomInfo.SetDataStreams(request.File, request.File.ToJson(_dicomJsonOptions, _validateDicomValueOnJsonSerialization), _options.Value.Storage.TemporaryDataStorage, _fileSystem, _options.Value.Storage.LocalTemporaryStoragePath).ConfigureAwait(false);
116-
_uploadQueue.Queue(dicomInfo);
117116

118117
var dicomTag = FellowOakDicom.DicomTag.Parse(_configuration.Grouping);
119118
_logger.QueueInstanceUsingDicomTag(dicomTag);
120119
var key = request.Dataset.GetSingleValue<string>(dicomTag);
121-
await _payloadAssembler.Queue(key, dicomInfo, _configuration.Timeout).ConfigureAwait(false);
120+
121+
var payloadid = await _payloadAssembler.Queue(key, dicomInfo, _configuration.Timeout).ConfigureAwait(false);
122+
dicomInfo.PayloadId = payloadid.ToString();
123+
_uploadQueue.Queue(dicomInfo);
122124
}
123125

124126
private bool AcceptsSopClass(string sopClassUid)

src/InformaticsGateway/Services/Storage/ObjectUploadService.cs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -158,12 +158,12 @@ private async Task ProcessObject(int thread, FileStorageMetadata blob)
158158
case DicomFileStorageMetadata dicom:
159159
if (!string.IsNullOrWhiteSpace(dicom.JsonFile.TemporaryPath))
160160
{
161-
await UploadFileAndConfirm(dicom.Id, dicom.JsonFile, dicom.Source, dicom.Workflows, _cancellationTokenSource.Token).ConfigureAwait(false);
161+
await UploadFileAndConfirm(dicom.Id, dicom.JsonFile, dicom.Source, dicom.Workflows, blob.PayloadId, _cancellationTokenSource.Token).ConfigureAwait(false);
162162
}
163163
break;
164164
}
165165

166-
await UploadFileAndConfirm(blob.Id, blob.File, blob.Source, blob.Workflows, _cancellationTokenSource.Token).ConfigureAwait(false);
166+
await UploadFileAndConfirm(blob.Id, blob.File, blob.Source, blob.Workflows, blob.PayloadId, _cancellationTokenSource.Token).ConfigureAwait(false);
167167
}
168168
catch (Exception ex)
169169
{
@@ -177,7 +177,7 @@ private async Task ProcessObject(int thread, FileStorageMetadata blob)
177177
}
178178
}
179179

180-
private async Task UploadFileAndConfirm(string identifier, StorageObjectMetadata storageObjectMetadata, string source, List<string> workflows, CancellationToken cancellationToken)
180+
private async Task UploadFileAndConfirm(string identifier, StorageObjectMetadata storageObjectMetadata, string source, List<string> workflows, string payloadId, CancellationToken cancellationToken)
181181
{
182182
Guard.Against.NullOrWhiteSpace(identifier);
183183
Guard.Against.Null(storageObjectMetadata);
@@ -192,12 +192,15 @@ private async Task UploadFileAndConfirm(string identifier, StorageObjectMetadata
192192
var count = 3;
193193
do
194194
{
195-
await UploadFile(storageObjectMetadata, source, workflows, cancellationToken).ConfigureAwait(false);
195+
await UploadFile(storageObjectMetadata, source, workflows, payloadId, cancellationToken).ConfigureAwait(false);
196196
if (count-- <= 0)
197197
{
198198
throw new FileUploadException($"Failed to upload file after retries {identifier}.");
199199
}
200-
} while (!(await VerifyExists(storageObjectMetadata.GetTempStoragPath(_configuration.Value.Storage.RemoteTemporaryStoragePath), cancellationToken).ConfigureAwait(false)));
200+
} while (!(
201+
//await VerifyExists(storageObjectMetadata.GetTempStoragPath(_configuration.Value.Storage.RemoteTemporaryStoragePath), cancellationToken).ConfigureAwait(false)
202+
await VerifyExists(storageObjectMetadata.GetPayloadPath(Guid.Parse(payloadId)), cancellationToken).ConfigureAwait(false)
203+
));
201204
}
202205

203206
private async Task<bool> VerifyExists(string path, CancellationToken cancellationToken)
@@ -214,14 +217,15 @@ private async Task<bool> VerifyExists(string path, CancellationToken cancellatio
214217
{
215218
var internalCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
216219
internalCancellationTokenSource.CancelAfter(_configuration.Value.Storage.StorageServiceListTimeout);
217-
var exists = await _storageService.VerifyObjectExistsAsync(_configuration.Value.Storage.TemporaryStorageBucket, path).ConfigureAwait(false);
220+
//var exists = await _storageService.VerifyObjectExistsAsync(_configuration.Value.Storage.TemporaryStorageBucket, path).ConfigureAwait(false);
221+
var exists = await _storageService.VerifyObjectExistsAsync(_configuration.Value.Storage.StorageServiceBucketName, path).ConfigureAwait(false);
218222
_logger.VerifyFileExists(path, exists);
219223
return exists;
220224
})
221225
.ConfigureAwait(false);
222226
}
223227

224-
private async Task UploadFile(StorageObjectMetadata storageObjectMetadata, string source, List<string> workflows, CancellationToken cancellationToken)
228+
private async Task UploadFile(StorageObjectMetadata storageObjectMetadata, string source, List<string> workflows, string payloadId, CancellationToken cancellationToken)
225229
{
226230
_logger.UploadingFileToTemporaryStore(storageObjectMetadata.TemporaryPath);
227231
var metadata = new Dictionary<string, string>
@@ -242,10 +246,17 @@ await Policy
242246
{
243247
if (storageObjectMetadata.IsUploaded) { return; }
244248

249+
////////////////////
250+
var bucket = _configuration.Value.Storage.StorageServiceBucketName;
251+
var path = storageObjectMetadata.GetPayloadPath(Guid.Parse(payloadId));
252+
//////////////////
253+
245254
storageObjectMetadata.Data.Seek(0, System.IO.SeekOrigin.Begin);
246255
await _storageService.PutObjectAsync(
247-
_configuration.Value.Storage.TemporaryStorageBucket,
248-
storageObjectMetadata.GetTempStoragPath(_configuration.Value.Storage.RemoteTemporaryStoragePath),
256+
//_configuration.Value.Storage.TemporaryStorageBucket,
257+
bucket,
258+
//storageObjectMetadata.GetTempStoragPath(_configuration.Value.Storage.RemoteTemporaryStoragePath),
259+
path,
249260
storageObjectMetadata.Data,
250261
storageObjectMetadata.Data.Length,
251262
storageObjectMetadata.ContentType,

src/InformaticsGateway/Test/Services/Storage/ObjectUploadServiceTest.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ private async Task<DicomFileStorageMetadata> GenerateDicomFileStorageMetadata()
166166
};
167167
var dicomFile = new DicomFile(dataset);
168168
await file.SetDataStreams(dicomFile, "[]", TemporaryDataStorageLocation.Memory);
169+
file.PayloadId = Guid.NewGuid().ToString();
169170
return file;
170171
}
171172
}

tests/Integration.Test/StepDefinitions/ExportServicesStepDefinitions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ namespace Monai.Deploy.InformaticsGateway.Integration.Test.StepDefinitions
3636
[CollectionDefinition("SpecFlowNonParallelizableFeatures", DisableParallelization = true)]
3737
public class DicomDimseScuServicesStepDefinitions
3838
{
39-
internal static readonly TimeSpan DicomScpWaitTimeSpan = TimeSpan.FromMinutes(3);
39+
internal static readonly TimeSpan DicomScpWaitTimeSpan = TimeSpan.FromMinutes(20);
4040
private readonly InformaticsGatewayConfiguration _informaticsGatewayConfiguration;
4141
private readonly Configurations _configuration;
4242
private readonly DicomScp _dicomServer;

tests/Integration.Test/StepDefinitions/SharedDefinitions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ namespace Monai.Deploy.InformaticsGateway.Integration.Test.StepDefinitions
2626
[CollectionDefinition("SpecFlowNonParallelizableFeatures", DisableParallelization = true)]
2727
public class SharedDefinitions
2828
{
29-
internal static readonly TimeSpan MessageWaitTimeSpan = TimeSpan.FromMinutes(10);
29+
internal static readonly TimeSpan MessageWaitTimeSpan = TimeSpan.FromMinutes(3);
3030
private readonly InformaticsGatewayConfiguration _informaticsGatewayConfiguration;
3131
private readonly RabbitMqConsumer _receivedMessages;
3232
private readonly Assertions _assertions;

0 commit comments

Comments
 (0)