Skip to content

Commit 6af787d

Browse files
committed
DownloadDirectoryProgress events
1 parent 22e8716 commit 6af787d

File tree

6 files changed

+724
-113
lines changed

6 files changed

+724
-113
lines changed

sdk/src/Services/S3/Custom/Transfer/Internal/DownloadDirectoryCommand.cs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,39 @@ internal partial class DownloadDirectoryCommand : BaseCommand<TransferUtilityDow
4848
long _transferredBytes;
4949
string _currentFile;
5050

51+
#region Event Firing Methods
52+
53+
private void FireTransferInitiatedEvent()
54+
{
55+
var transferInitiatedEventArgs = new DownloadDirectoryInitiatedEventArgs(_request);
56+
_request.OnRaiseDownloadDirectoryInitiatedEvent(transferInitiatedEventArgs);
57+
}
58+
59+
private void FireTransferCompletedEvent(TransferUtilityDownloadDirectoryResponse response)
60+
{
61+
var transferCompletedEventArgs = new DownloadDirectoryCompletedEventArgs(
62+
_request,
63+
response,
64+
Interlocked.Read(ref _transferredBytes),
65+
_totalBytes,
66+
_numberOfFilesDownloaded,
67+
_totalNumberOfFilesToDownload);
68+
_request.OnRaiseDownloadDirectoryCompletedEvent(transferCompletedEventArgs);
69+
}
70+
71+
private void FireTransferFailedEvent()
72+
{
73+
var eventArgs = new DownloadDirectoryFailedEventArgs(
74+
_request,
75+
Interlocked.Read(ref _transferredBytes),
76+
_totalBytes,
77+
_numberOfFilesDownloaded,
78+
_totalNumberOfFilesToDownload);
79+
_request.OnRaiseDownloadDirectoryFailedEvent(eventArgs);
80+
}
81+
82+
#endregion
83+
5184
internal DownloadDirectoryCommand(IAmazonS3 s3Client, TransferUtilityDownloadDirectoryRequest request)
5285
{
5386
if (s3Client == null)

sdk/src/Services/S3/Custom/Transfer/Internal/_async/DownloadCommand.async.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ internal partial class DownloadCommand : BaseCommand<TransferUtilityDownloadResp
3232
{
3333
public override async Task<TransferUtilityDownloadResponse> ExecuteAsync(CancellationToken cancellationToken)
3434
{
35-
ValidateRequest();
36-
3735
FireTransferInitiatedEvent();
38-
36+
37+
ValidateRequest();
38+
3939
GetObjectRequest getRequest = ConvertToGetObjectRequest(this._request);
4040

4141
var maxRetries = _s3Client.Config.MaxErrorRetry;

sdk/src/Services/S3/Custom/Transfer/Internal/_async/SimpleUploadCommand.async.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,14 @@ public override async Task<TransferUtilityUploadResponse> ExecuteAsync(Cancellat
3232
{
3333
try
3434
{
35+
FireTransferInitiatedEvent();
36+
3537
if (AsyncThrottler != null)
3638
{
3739
await this.AsyncThrottler.WaitAsync(cancellationToken)
3840
.ConfigureAwait(continueOnCapturedContext: false);
3941
}
4042

41-
FireTransferInitiatedEvent();
42-
4343
var putRequest = ConstructRequest();
4444
var response = await _s3Client.PutObjectAsync(putRequest, cancellationToken)
4545
.ConfigureAwait(continueOnCapturedContext: false);

sdk/src/Services/S3/Custom/Transfer/Internal/_bcl+netstandard/DownloadDirectoryCommand.cs

Lines changed: 123 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -43,134 +43,150 @@ internal DownloadDirectoryCommand(IAmazonS3 s3Client, TransferUtilityDownloadDir
4343

4444
public override async Task<TransferUtilityDownloadDirectoryResponse> ExecuteAsync(CancellationToken cancellationToken)
4545
{
46-
ValidateRequest();
47-
EnsureDirectoryExists(new DirectoryInfo(this._request.LocalDirectory));
48-
49-
List<S3Object> objs;
50-
string listRequestPrefix;
46+
// Fire transfer initiated event immediately at the start
47+
FireTransferInitiatedEvent();
48+
5149
try
5250
{
53-
ListObjectsRequest listRequest = ConstructListObjectRequest();
54-
listRequestPrefix = listRequest.Prefix;
55-
objs = await GetS3ObjectsToDownloadAsync(listRequest, cancellationToken).ConfigureAwait(false);
56-
}
57-
catch (AmazonS3Exception ex)
58-
{
59-
if (ex.StatusCode != System.Net.HttpStatusCode.NotImplemented)
60-
throw;
51+
ValidateRequest();
52+
EnsureDirectoryExists(new DirectoryInfo(this._request.LocalDirectory));
6153

62-
ListObjectsV2Request listRequestV2 = ConstructListObjectRequestV2();
63-
listRequestPrefix = listRequestV2.Prefix;
64-
objs = await GetS3ObjectsToDownloadV2Async(listRequestV2, cancellationToken).ConfigureAwait(false);
65-
}
54+
List<S3Object> objs;
55+
string listRequestPrefix;
56+
try
57+
{
58+
ListObjectsRequest listRequest = ConstructListObjectRequest();
59+
listRequestPrefix = listRequest.Prefix;
60+
objs = await GetS3ObjectsToDownloadAsync(listRequest, cancellationToken).ConfigureAwait(false);
61+
}
62+
catch (AmazonS3Exception ex)
63+
{
64+
if (ex.StatusCode != System.Net.HttpStatusCode.NotImplemented)
65+
throw;
6666

67-
this._totalNumberOfFilesToDownload = objs.Count;
67+
ListObjectsV2Request listRequestV2 = ConstructListObjectRequestV2();
68+
listRequestPrefix = listRequestV2.Prefix;
69+
objs = await GetS3ObjectsToDownloadV2Async(listRequestV2, cancellationToken).ConfigureAwait(false);
70+
}
6871

69-
SemaphoreSlim asyncThrottler = null;
70-
CancellationTokenSource internalCts = null;
72+
this._totalNumberOfFilesToDownload = objs.Count;
7173

72-
try
73-
{
74-
asyncThrottler = DownloadFilesConcurrently ?
75-
new SemaphoreSlim(this._config.ConcurrentServiceRequests) :
76-
new SemaphoreSlim(1);
74+
SemaphoreSlim asyncThrottler = null;
75+
CancellationTokenSource internalCts = null;
7776

78-
internalCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
79-
var pendingTasks = new List<Task>();
80-
foreach (S3Object s3o in objs)
77+
try
8178
{
82-
if (s3o.Key.EndsWith("/", StringComparison.Ordinal))
83-
continue;
84-
85-
await asyncThrottler.WaitAsync(cancellationToken)
86-
.ConfigureAwait(continueOnCapturedContext: false);
79+
asyncThrottler = DownloadFilesConcurrently ?
80+
new SemaphoreSlim(this._config.ConcurrentServiceRequests) :
81+
new SemaphoreSlim(1);
8782

88-
try
83+
internalCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
84+
var pendingTasks = new List<Task>();
85+
foreach (S3Object s3o in objs)
8986
{
90-
cancellationToken.ThrowIfCancellationRequested();
91-
if (internalCts.IsCancellationRequested)
92-
{
93-
// Operation cancelled as one of the download requests failed with an exception,
94-
// don't schedule any more download tasks.
95-
// Don't throw an OperationCanceledException here as we want to process the
96-
// responses and throw the original exception.
97-
break;
98-
}
87+
if (s3o.Key.EndsWith("/", StringComparison.Ordinal))
88+
continue;
9989

100-
// Valid for serial uploads when
101-
// TransferUtilityDownloadDirectoryRequest.DownloadFilesConcurrently is set to false.
102-
int prefixLength = listRequestPrefix.Length;
90+
await asyncThrottler.WaitAsync(cancellationToken)
91+
.ConfigureAwait(continueOnCapturedContext: false);
10392

104-
// If DisableSlashCorrection is enabled (i.e. S3Directory is a key prefix) and it doesn't end with '/' then we need the parent directory to properly construct download path.
105-
if (_request.DisableSlashCorrection && !listRequestPrefix.EndsWith("/"))
93+
try
10694
{
107-
prefixLength = listRequestPrefix.LastIndexOf("/") + 1;
95+
cancellationToken.ThrowIfCancellationRequested();
96+
if (internalCts.IsCancellationRequested)
97+
{
98+
// Operation cancelled as one of the download requests failed with an exception,
99+
// don't schedule any more download tasks.
100+
// Don't throw an OperationCanceledException here as we want to process the
101+
// responses and throw the original exception.
102+
break;
103+
}
104+
105+
// Valid for serial uploads when
106+
// TransferUtilityDownloadDirectoryRequest.DownloadFilesConcurrently is set to false.
107+
int prefixLength = listRequestPrefix.Length;
108+
109+
// If DisableSlashCorrection is enabled (i.e. S3Directory is a key prefix) and it doesn't end with '/' then we need the parent directory to properly construct download path.
110+
if (_request.DisableSlashCorrection && !listRequestPrefix.EndsWith("/"))
111+
{
112+
prefixLength = listRequestPrefix.LastIndexOf("/") + 1;
113+
}
114+
115+
this._currentFile = s3o.Key.Substring(prefixLength);
116+
117+
TransferUtilityDownloadRequest downloadRequest = ConstructTransferUtilityDownloadRequest(s3o, prefixLength);
118+
119+
Action<Exception> onFailure = (ex) =>
120+
{
121+
this._request.OnRaiseObjectDownloadFailedEvent(
122+
new ObjectDownloadFailedEventArgs(
123+
this._request,
124+
downloadRequest,
125+
ex));
126+
};
127+
128+
var isValid = await _failurePolicy.ExecuteAsync(
129+
() => {
130+
//Ensure the target file is a rooted within LocalDirectory. Otherwise error.
131+
if(!InternalSDKUtils.IsFilePathRootedWithDirectoryPath(downloadRequest.FilePath, _request.LocalDirectory))
132+
{
133+
throw new AmazonClientException($"The file {downloadRequest.FilePath} is not allowed outside of the target directory {_request.LocalDirectory}.");
134+
}
135+
136+
return Task.CompletedTask;
137+
},
138+
onFailure,
139+
internalCts
140+
).ConfigureAwait(false);
141+
if (!isValid) continue;
142+
143+
var task = _failurePolicy.ExecuteAsync(
144+
async () => {
145+
var command = new DownloadCommand(this._s3Client, downloadRequest);
146+
await command.ExecuteAsync(internalCts.Token)
147+
.ConfigureAwait(false);
148+
},
149+
onFailure,
150+
internalCts
151+
);
152+
153+
pendingTasks.Add(task);
108154
}
109-
110-
this._currentFile = s3o.Key.Substring(prefixLength);
111-
112-
TransferUtilityDownloadRequest downloadRequest = ConstructTransferUtilityDownloadRequest(s3o, prefixLength);
113-
114-
Action<Exception> onFailure = (ex) =>
155+
finally
115156
{
116-
this._request.OnRaiseObjectDownloadFailedEvent(
117-
new ObjectDownloadFailedEventArgs(
118-
this._request,
119-
downloadRequest,
120-
ex));
121-
};
122-
123-
var isValid = await _failurePolicy.ExecuteAsync(
124-
() => {
125-
//Ensure the target file is a rooted within LocalDirectory. Otherwise error.
126-
if(!InternalSDKUtils.IsFilePathRootedWithDirectoryPath(downloadRequest.FilePath, _request.LocalDirectory))
127-
{
128-
throw new AmazonClientException($"The file {downloadRequest.FilePath} is not allowed outside of the target directory {_request.LocalDirectory}.");
129-
}
130-
131-
return Task.CompletedTask;
132-
},
133-
onFailure,
134-
internalCts
135-
).ConfigureAwait(false);
136-
if (!isValid) continue;
137-
138-
var task = _failurePolicy.ExecuteAsync(
139-
async () => {
140-
var command = new DownloadCommand(this._s3Client, downloadRequest);
141-
await command.ExecuteAsync(internalCts.Token)
142-
.ConfigureAwait(false);
143-
},
144-
onFailure,
145-
internalCts
146-
);
147-
148-
pendingTasks.Add(task);
157+
asyncThrottler.Release();
158+
}
149159
}
150-
finally
160+
await TaskHelpers.WhenAllOrFirstExceptionAsync(pendingTasks, cancellationToken)
161+
.ConfigureAwait(continueOnCapturedContext: false);
162+
163+
var response = new TransferUtilityDownloadDirectoryResponse
151164
{
152-
asyncThrottler.Release();
153-
}
165+
ObjectsDownloaded = _numberOfFilesDownloaded,
166+
ObjectsFailed = _errors.Count,
167+
Errors = _errors.ToList(),
168+
Result = _errors.Count == 0 ?
169+
DirectoryResult.Success :
170+
(_numberOfFilesDownloaded > 0 ?
171+
DirectoryResult.PartialSuccess :
172+
DirectoryResult.Failure)
173+
};
174+
175+
// Fire transfer completed event on success
176+
FireTransferCompletedEvent(response);
177+
return response;
154178
}
155-
await TaskHelpers.WhenAllOrFirstExceptionAsync(pendingTasks, cancellationToken)
156-
.ConfigureAwait(continueOnCapturedContext: false);
157-
158-
return new TransferUtilityDownloadDirectoryResponse
179+
finally
159180
{
160-
ObjectsDownloaded = _numberOfFilesDownloaded,
161-
ObjectsFailed = _errors.Count,
162-
Errors = _errors.ToList(),
163-
Result = _errors.Count == 0 ?
164-
DirectoryResult.Success :
165-
(_numberOfFilesDownloaded > 0 ?
166-
DirectoryResult.PartialSuccess :
167-
DirectoryResult.Failure)
168-
};
181+
internalCts?.Dispose();
182+
asyncThrottler?.Dispose();
183+
}
169184
}
170-
finally
185+
catch (Exception)
171186
{
172-
internalCts.Dispose();
173-
asyncThrottler.Dispose();
187+
// Fire transfer failed event on exception
188+
FireTransferFailedEvent();
189+
throw;
174190
}
175191
}
176192

0 commit comments

Comments
 (0)