@@ -77,7 +77,6 @@ public async Task MoveFilesAsync(Payload payload, ActionBlock<Payload> moveQueue
7777 var stopwatch = Stopwatch . StartNew ( ) ;
7878 try
7979 {
80- await Move ( payload , cancellationToken ) . ConfigureAwait ( false ) ;
8180 await NotifyIfCompleted ( payload , notificationQueue , cancellationToken ) . ConfigureAwait ( false ) ;
8281 }
8382 catch ( Exception ex )
@@ -127,149 +126,6 @@ private async Task NotifyIfCompleted(Payload payload, ActionBlock<Payload> notif
127126 }
128127 }
129128
130- private async Task Move ( Payload payload , CancellationToken cancellationToken )
131- {
132- Guard . Against . Null ( payload ) ;
133-
134- _logger . MovingFIlesInPayload ( payload . PayloadId , _options . Value . Storage . StorageServiceBucketName ) ;
135-
136- var options = new ParallelOptions
137- {
138- CancellationToken = cancellationToken ,
139- MaxDegreeOfParallelism = _options . Value . Storage . ConcurrentUploads
140- } ;
141-
142- var exceptions = new List < Exception > ( ) ;
143- await Parallel . ForEachAsync ( payload . Files , options , async ( file , cancellationToke ) =>
144- {
145- try
146- {
147- switch ( file )
148- {
149- case DicomFileStorageMetadata dicom :
150- if ( ! string . IsNullOrWhiteSpace ( dicom . JsonFile . TemporaryPath ) )
151- {
152- await MoveFile ( payload . PayloadId , dicom . Id , dicom . JsonFile , cancellationToken ) . ConfigureAwait ( false ) ;
153- }
154- break ;
155- }
156-
157- await MoveFile ( payload . PayloadId , file . Id , file . File , cancellationToken ) . ConfigureAwait ( false ) ;
158- }
159- catch ( Exception ex )
160- {
161- exceptions . Add ( ex ) ;
162- }
163- } ) . ConfigureAwait ( false ) ;
164-
165- if ( exceptions . Any ( ) )
166- {
167- throw new AggregateException ( exceptions ) ;
168- }
169- }
170-
171- private async Task MoveFile ( Guid payloadId , string identity , StorageObjectMetadata file , CancellationToken cancellationToken )
172- {
173- Guard . Against . NullOrWhiteSpace ( identity ) ;
174- Guard . Against . Null ( file ) ;
175-
176- if ( file . IsMoveCompleted )
177- {
178- _logger . AlreadyMoved ( payloadId , file . UploadPath ) ;
179- return ;
180- }
181-
182- _logger . MovingFileToPayloadDirectory ( payloadId , file . UploadPath ) ;
183-
184- try
185- {
186- //await _storageService.CopyObjectAsync(
187- // file.TemporaryBucketName,
188- // file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath),
189- // _options.Value.Storage.StorageServiceBucketName,
190- // file.GetPayloadPath(payloadId),
191- // cancellationToken).ConfigureAwait(false);
192-
193- await VerifyFileExists ( payloadId , file , cancellationToken ) . ConfigureAwait ( false ) ;
194- }
195- catch ( StorageObjectNotFoundException ex ) when ( ex . Message . Contains ( "Not found" , StringComparison . OrdinalIgnoreCase ) ) // TODO: StorageLib shall not throw any errors from MINIO
196- {
197- // when file cannot be found on the Storage Service, we assume file has been moved previously by verifying the file exists on destination.
198- _logger . FileMissingInPayload ( payloadId , file . GetTempStoragPath ( _options . Value . Storage . RemoteTemporaryStoragePath ) , ex ) ;
199- await VerifyFileExists ( payloadId , file , cancellationToken ) . ConfigureAwait ( false ) ;
200- }
201- catch ( StorageConnectionException ex )
202- {
203- _logger . StorageServiceConnectionError ( ex ) ;
204- throw new PayloadNotifyException ( PayloadNotifyException . FailureReason . ServiceUnavailable ) ;
205- }
206- catch ( Exception ex )
207- {
208- _logger . PayloadMoveException ( ex ) ;
209- await LogFilesInMinIo ( file . TemporaryBucketName , cancellationToken ) . ConfigureAwait ( false ) ;
210- throw new FileMoveException ( file . GetTempStoragPath ( _options . Value . Storage . RemoteTemporaryStoragePath ) , file . UploadPath , ex ) ;
211- }
212-
213- try
214- {
215- //_logger.DeletingFileFromTemporaryBbucket(file.TemporaryBucketName, identity, file.TemporaryPath);
216- //await _storageService.RemoveObjectAsync(file.TemporaryBucketName, file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath), cancellationToken).ConfigureAwait(false);
217- }
218- catch ( Exception )
219- {
220- _logger . ErrorDeletingFileAfterMoveComplete ( file . TemporaryBucketName , identity , file . TemporaryPath ) ;
221- }
222- finally
223- {
224- file . SetMoved ( _options . Value . Storage . StorageServiceBucketName ) ;
225- }
226- }
227-
228- private async Task VerifyFileExists ( Guid payloadId , StorageObjectMetadata file , CancellationToken cancellationToken )
229- {
230- await Policy
231- . Handle < VerifyObjectsException > ( )
232- . WaitAndRetryAsync (
233- _options . Value . Storage . Retries . RetryDelays ,
234- ( exception , timeSpan , retryCount , context ) =>
235- {
236- _logger . ErrorUploadingFileToTemporaryStore ( timeSpan , retryCount , exception ) ;
237- } )
238- . ExecuteAsync ( async ( ) =>
239- {
240- var internalCancellationTokenSource = CancellationTokenSource . CreateLinkedTokenSource ( cancellationToken ) ;
241- internalCancellationTokenSource . CancelAfter ( _options . Value . Storage . StorageServiceListTimeout ) ;
242- var exists = await _storageService . VerifyObjectExistsAsync ( _options . Value . Storage . StorageServiceBucketName , file . GetPayloadPath ( payloadId ) , cancellationToken ) . ConfigureAwait ( false ) ;
243- if ( ! exists )
244- {
245- _logger . FileMovedVerificationFailure ( payloadId , file . UploadPath ) ;
246- throw new PayloadNotifyException ( PayloadNotifyException . FailureReason . MoveFailure , false ) ;
247- }
248- } )
249- . ConfigureAwait ( false ) ;
250- }
251-
252- private async Task LogFilesInMinIo ( string bucketName , CancellationToken cancellationToken )
253- {
254- try
255- {
256- var internalCancellationTokenSource = CancellationTokenSource . CreateLinkedTokenSource ( cancellationToken ) ;
257- internalCancellationTokenSource . CancelAfter ( _options . Value . Storage . StorageServiceListTimeout ) ;
258- var listingResults = await _storageService . ListObjectsAsync ( bucketName , recursive : true , cancellationToken : internalCancellationTokenSource . Token ) . ConfigureAwait ( false ) ;
259- _logger . FilesFounddOnStorageService ( bucketName , listingResults . Count ) ;
260- var files = new List < string > ( ) ;
261- foreach ( var item in listingResults )
262- {
263- files . Add ( item . FilePath ) ;
264- }
265- _logger . FileFounddOnStorageService ( bucketName , string . Join ( Environment . NewLine , files ) ) ;
266- }
267- catch ( Exception ex )
268- {
269- _logger . ErrorListingFilesOnStorageService ( ex ) ;
270- }
271- }
272-
273129 private async Task < PayloadAction > UpdatePayloadState ( Payload payload , Exception ex , CancellationToken cancellationToken = default )
274130 {
275131 Guard . Against . Null ( payload ) ;
0 commit comments