Skip to content

Commit e96d4a6

Browse files
committed
refactor upload directory and fix concurrency bug
1 parent 6a31b25 commit e96d4a6

File tree

1 file changed

+202
-123
lines changed

1 file changed

+202
-123
lines changed

sdk/src/Services/S3/Custom/Transfer/Internal/_bcl+netstandard/UploadDirectoryCommand.cs

Lines changed: 202 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -31,154 +31,233 @@ internal partial class UploadDirectoryCommand : BaseCommand<TransferUtilityUploa
3131

3232
public override async Task<TransferUtilityUploadDirectoryResponse> ExecuteAsync(CancellationToken cancellationToken)
3333
{
34+
// Step 1: Setup paths and discover files
3435
string prefix = GetKeyPrefix();
3536
string basePath = new DirectoryInfo(this._request.Directory).FullName;
3637

37-
_logger.DebugFormat("UploadDirectoryCommand starting. BasePath={0}, Prefix={1}, UploadFilesConcurrently={2}, ConcurrentServiceRequests={3}",
38+
_logger.DebugFormat("UploadDirectoryCommand.ExecuteAsync: Starting - BasePath={0}, Prefix={1}, UploadFilesConcurrently={2}, ConcurrentServiceRequests={3}",
3839
basePath, prefix, UploadFilesConcurrently, this._config.ConcurrentServiceRequests);
3940

40-
string[] filePaths = await GetFiles(basePath, this._request.SearchPattern, this._request.SearchOption, cancellationToken)
41-
.ConfigureAwait(continueOnCapturedContext: false);
41+
// Step 2: Discover files to upload
42+
string[] filePaths = await DiscoverFilesAsync(basePath, cancellationToken)
43+
.ConfigureAwait(false);
44+
4245
this._totalNumberOfFiles = filePaths.Length;
46+
_logger.DebugFormat("UploadDirectoryCommand.ExecuteAsync: Discovered {0} file(s) to upload. TotalBytes={1}",
47+
_totalNumberOfFiles, _totalBytes);
4348

44-
_logger.DebugFormat("Discovered {0} file(s) to upload. TotalBytes={1}", _totalNumberOfFiles, _totalBytes);
45-
46-
// Two-level throttling architecture:
47-
// 1. File-level throttler: Controls how many files are uploaded concurrently
48-
// 2. HTTP-level throttler: Controls total HTTP requests across ALL file uploads
49-
//
50-
// Example with ConcurrentServiceRequests = 10:
51-
// - fileOperationThrottler = 10: Up to 10 files can upload simultaneously
52-
// - sharedHttpRequestThrottler = 10: All 10 files share 10 total HTTP request slots
53-
// - Without HTTP throttler: Would result in 10 files × 10 parts = 100 concurrent HTTP requests
54-
// - With HTTP throttler: Enforces 10 total concurrent HTTP requests across all files
55-
//
56-
// This prevents resource exhaustion when uploading many large files with multipart uploads.
57-
SemaphoreSlim sharedHttpRequestThrottler = null;
58-
SemaphoreSlim fileOperationThrottler = null;
59-
CancellationTokenSource internalCts = null;
60-
try
49+
// Step 3: Setup resources and execute uploads
50+
using (var resources = CreateUploadResources(cancellationToken))
6151
{
62-
var pendingTasks = new List<Task>();
63-
64-
// File-level throttler: Controls concurrent file operations
65-
fileOperationThrottler = UploadFilesConcurrently ?
66-
new SemaphoreSlim(this._config.ConcurrentServiceRequests) :
67-
new SemaphoreSlim(1);
68-
_logger.DebugFormat("Created fileOperationThrottler with initial count={0}", UploadFilesConcurrently ? this._config.ConcurrentServiceRequests : 1);
69-
70-
// HTTP-level throttler: Shared across all uploads to control total HTTP concurrency
71-
sharedHttpRequestThrottler = this._utility.S3Client is Amazon.S3.Internal.IAmazonS3Encryption ?
72-
// If we are using AmazonS3EncryptionClient, don't set the HTTP throttler.
73-
// The fileOperationThrottler will be used to control how many files are uploaded in parallel.
74-
// Each upload (multipart) will upload parts serially.
75-
null :
76-
// Use a throttler which will be shared between simple and multipart uploads
77-
// to control total concurrent HTTP requests across all file operations.
78-
new SemaphoreSlim(this._config.ConcurrentServiceRequests);
79-
if (sharedHttpRequestThrottler == null)
80-
{
81-
_logger.Debug(null, "sharedHttpRequestThrottler disabled due to encryption client. Multipart uploads will be serial per file.");
82-
}
83-
else
84-
{
85-
_logger.DebugFormat("Created sharedHttpRequestThrottler with initial count={0}", this._config.ConcurrentServiceRequests);
86-
}
52+
await ExecuteParallelUploadsAsync(
53+
filePaths,
54+
basePath,
55+
prefix,
56+
resources,
57+
cancellationToken)
58+
.ConfigureAwait(false);
59+
}
60+
61+
// Step 4: Build and return response
62+
_logger.DebugFormat("UploadDirectoryCommand.ExecuteAsync: Completed - FilesSuccessfullyUploaded={0}, FilesFailed={1}",
63+
_numberOfFilesSuccessfullyUploaded, _errors.Count);
64+
65+
return BuildResponse();
66+
}
67+
68+
/// <summary>
69+
/// Encapsulates disposable resources used during directory upload.
70+
/// </summary>
71+
private sealed class UploadResources : IDisposable
72+
{
73+
public SemaphoreSlim HttpRequestThrottler { get; }
74+
public CancellationTokenSource InternalCancellationTokenSource { get; }
75+
76+
public UploadResources(
77+
SemaphoreSlim httpRequestThrottler,
78+
CancellationTokenSource cancellationTokenSource)
79+
{
80+
HttpRequestThrottler = httpRequestThrottler;
81+
InternalCancellationTokenSource = cancellationTokenSource;
82+
}
8783

88-
internalCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
84+
public void Dispose()
85+
{
86+
InternalCancellationTokenSource?.Dispose();
87+
HttpRequestThrottler?.Dispose();
88+
}
89+
}
8990

90-
foreach (string filepath in filePaths)
91+
/// <summary>
92+
/// Discovers files to upload from the local directory and calculates total bytes.
93+
/// </summary>
94+
private async Task<string[]> DiscoverFilesAsync(string basePath, CancellationToken cancellationToken)
95+
{
96+
return await Task.Run(() =>
97+
{
98+
var filePaths = Directory.GetFiles(
99+
basePath,
100+
this._request.SearchPattern,
101+
this._request.SearchOption);
102+
103+
foreach (var filePath in filePaths)
91104
{
92-
_logger.DebugFormat("Waiting for fileOperationThrottler to schedule file.");
93-
await fileOperationThrottler.WaitAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
94-
_logger.DebugFormat("Acquired fileOperationThrottler. Currently scheduled: {0}", pendingTasks.Count + 1);
95-
96-
try
97-
{
98-
cancellationToken.ThrowIfCancellationRequested();
99-
if (internalCts.IsCancellationRequested)
100-
{
101-
// Operation cancelled as one of the upload requests failed with an exception,
102-
// don't schedule any more upload tasks.
103-
// Don't throw an OperationCanceledException here as we want to process the
104-
// responses and throw the original exception.
105-
_logger.Debug(null, "Internal cancellation requested; breaking out of scheduling loop.");
106-
break;
107-
}
108-
109-
var uploadRequest = ConstructRequest(basePath, filepath, prefix);
110-
111-
Action<Exception> onFailure = (ex) =>
112-
{
113-
this._request.OnRaiseObjectUploadFailedEvent(
114-
new ObjectUploadFailedEventArgs(
115-
this._request,
116-
uploadRequest,
117-
ex));
118-
};
119-
120-
var task = _failurePolicy.ExecuteAsync(
121-
async () => {
122-
_logger.DebugFormat("Starting upload command");
123-
var command = _utility.GetUploadCommand(uploadRequest, sharedHttpRequestThrottler);
124-
await command.ExecuteAsync(internalCts.Token)
125-
.ConfigureAwait(false);
126-
var uploaded = Interlocked.Increment(ref _numberOfFilesSuccessfullyUploaded);
127-
_logger.DebugFormat("Completed upload. FilesSuccessfullyUploaded={0}", uploaded);
128-
},
129-
onFailure,
130-
internalCts
131-
);
132-
133-
pendingTasks.Add(task);
134-
_logger.DebugFormat("Scheduled upload task. PendingTasks=01}", pendingTasks.Count);
135-
}
136-
finally
137-
{
138-
fileOperationThrottler.Release();
139-
}
105+
_totalBytes += new FileInfo(filePath).Length;
140106
}
141107

142-
_logger.DebugFormat("Awaiting completion of {0} scheduled task(s)", pendingTasks.Count);
143-
await TaskHelpers.WhenAllOrFirstExceptionAsync(pendingTasks, cancellationToken)
144-
.ConfigureAwait(continueOnCapturedContext: false);
108+
return filePaths;
109+
}, cancellationToken).ConfigureAwait(false);
110+
}
111+
112+
/// <summary>
113+
/// Creates resources needed for parallel uploads with proper throttling.
114+
///
115+
/// Throttling architecture:
116+
/// - Task pool pattern (ForEachWithConcurrencyAsync): Controls concurrent file uploads
117+
/// - HttpRequestThrottler: Controls total HTTP requests across ALL file uploads
118+
///
119+
/// Example with ConcurrentServiceRequests = 10:
120+
/// - Task pool creates max 10 concurrent file upload tasks
121+
/// - HttpRequestThrottler = 10: All files share 10 total HTTP request slots
122+
/// - Without HTTP throttler: 10 multipart files × 10 parts = 100 concurrent HTTP requests
123+
/// - With HTTP throttler: Enforces 10 total concurrent HTTP requests across all files
124+
///
125+
/// Special case: When using AmazonS3EncryptionClient, HTTP throttler is disabled.
126+
/// The task pool concurrency control is sufficient since encryption uploads are serial per file.
127+
/// </summary>
128+
private UploadResources CreateUploadResources(CancellationToken cancellationToken)
129+
{
130+
SemaphoreSlim httpRequestThrottler = null;
131+
132+
// HTTP-level throttler: Shared across all uploads to control total HTTP concurrency
133+
// Disabled for encryption client since each upload processes parts serially
134+
if (this._utility.S3Client is Amazon.S3.Internal.IAmazonS3Encryption)
135+
{
136+
_logger.DebugFormat("UploadDirectoryCommand.CreateUploadResources: HTTP throttler disabled for encryption client. Multipart uploads will be serial per file.");
145137
}
146-
finally
138+
else
147139
{
148-
internalCts.Dispose();
149-
fileOperationThrottler.Dispose();
150-
sharedHttpRequestThrottler?.Dispose();
151-
_logger.DebugFormat("UploadDirectoryCommand finished. FilesSuccessfullyUploaded={0}", _numberOfFilesSuccessfullyUploaded);
140+
httpRequestThrottler = new SemaphoreSlim(this._config.ConcurrentServiceRequests);
141+
_logger.DebugFormat("UploadDirectoryCommand.CreateUploadResources: Created HTTP throttler with MaxConcurrentRequests={0}",
142+
this._config.ConcurrentServiceRequests);
152143
}
153144

145+
var internalCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
146+
147+
return new UploadResources(httpRequestThrottler, internalCts);
148+
}
149+
/// <summary>
/// Uploads every discovered file through TaskHelpers.ForEachWithConcurrencyAsync.
/// The concurrency level handed to the helper is ConcurrentServiceRequests when
/// UploadFilesConcurrently is set, and 1 otherwise.
/// </summary>
/// <param name="filePaths">Local paths of the files to upload.</param>
/// <param name="basePath">Full path of the directory being uploaded.</param>
/// <param name="prefix">Key prefix forwarded to each single-file upload.</param>
/// <param name="resources">Supplies the HTTP throttler and the internal cancellation source passed to each upload.</param>
/// <param name="cancellationToken">Caller's token; passed to the helper, whose per-item token is checked before each file starts.</param>
private async Task ExecuteParallelUploadsAsync(
    string[] filePaths,
    string basePath,
    string prefix,
    UploadResources resources,
    CancellationToken cancellationToken)
{
    // Default to one file at a time; widen only when concurrent uploads are enabled.
    int maxParallelFiles = 1;
    if (UploadFilesConcurrently)
    {
        maxParallelFiles = this._config.ConcurrentServiceRequests;
    }

    _logger.DebugFormat(
        "UploadDirectoryCommand.ExecuteParallelUploadsAsync: Starting task pool with ConcurrencyLevel={0}, TotalFiles={1}",
        maxParallelFiles,
        filePaths.Length);

    await TaskHelpers.ForEachWithConcurrencyAsync(
            filePaths,
            maxParallelFiles,
            async (path, token) =>
            {
                // Stop before starting a new file once the helper's token is cancelled.
                token.ThrowIfCancellationRequested();

                await UploadSingleFileAsync(
                        path,
                        basePath,
                        prefix,
                        resources.HttpRequestThrottler,
                        resources.InternalCancellationTokenSource)
                    .ConfigureAwait(false);
            },
            cancellationToken)
        .ConfigureAwait(false);

    _logger.DebugFormat(
        "UploadDirectoryCommand.ExecuteParallelUploadsAsync: Task pool completed - FilesSuccessfullyUploaded={0}, FilesFailed={1}",
        _numberOfFilesSuccessfullyUploaded,
        _errors.Count);
}
189+
/// <summary>
/// Uploads one file under the configured failure policy. Returns without doing
/// anything when the internal cancellation source has already been signalled.
/// </summary>
/// <param name="filepath">Local path of the file to upload.</param>
/// <param name="basePath">Full path of the directory being uploaded.</param>
/// <param name="prefix">Key prefix used when constructing the upload request.</param>
/// <param name="httpRequestThrottler">Throttler handed to the upload command; may be null.</param>
/// <param name="internalCts">Internal cancellation source; its token drives the upload and it is also passed to the failure policy.</param>
private async Task UploadSingleFileAsync(
    string filepath,
    string basePath,
    string prefix,
    SemaphoreSlim httpRequestThrottler,
    CancellationTokenSource internalCts)
{
    if (internalCts.IsCancellationRequested)
    {
        return;
    }

    var uploadRequest = ConstructRequest(basePath, filepath, prefix);

    // Raised by the failure policy so subscribers learn which request failed and why.
    Action<Exception> onFailure = ex =>
        this._request.OnRaiseObjectUploadFailedEvent(
            new ObjectUploadFailedEventArgs(this._request, uploadRequest, ex));

    await _failurePolicy
        .ExecuteAsync(
            () => ExecuteUploadCommandAsync(uploadRequest, httpRequestThrottler, internalCts.Token),
            onFailure,
            internalCts)
        .ConfigureAwait(false);
}
222+
/// <summary>
/// Obtains the upload command for the request from the transfer utility, runs it,
/// and increments the successful-upload counter once the command completes.
/// </summary>
/// <param name="uploadRequest">Request describing the single file to upload.</param>
/// <param name="httpRequestThrottler">Throttler passed through to the upload command; may be null.</param>
/// <param name="cancellationToken">Token passed to the command's execution.</param>
private async Task ExecuteUploadCommandAsync(
    TransferUtilityUploadRequest uploadRequest,
    SemaphoreSlim httpRequestThrottler,
    CancellationToken cancellationToken)
{
    _logger.DebugFormat("UploadDirectoryCommand.ExecuteUploadCommandAsync: Starting upload command");

    await _utility
        .GetUploadCommand(uploadRequest, httpRequestThrottler)
        .ExecuteAsync(cancellationToken)
        .ConfigureAwait(false);

    // Interlocked because uploads may run concurrently and share this counter.
    var successCount = Interlocked.Increment(ref _numberOfFilesSuccessfullyUploaded);
    _logger.DebugFormat(
        "UploadDirectoryCommand.ExecuteUploadCommandAsync: Completed upload. FilesSuccessfullyUploaded={0}",
        successCount);
}
239+
240+
/// <summary>
241+
/// Builds the response object based on upload results.
242+
/// </summary>
243+
private TransferUtilityUploadDirectoryResponse BuildResponse()
244+
{
154245
var response = new TransferUtilityUploadDirectoryResponse
155246
{
156247
ObjectsUploaded = _numberOfFilesSuccessfullyUploaded,
157248
ObjectsFailed = _errors.Count,
158249
Errors = _errors.ToList(),
159-
Result = _errors.Count == 0 ?
160-
DirectoryResult.Success :
161-
(_numberOfFilesSuccessfullyUploaded > 0 ?
162-
DirectoryResult.PartialSuccess :
163-
DirectoryResult.Failure)
250+
Result = _errors.Count == 0
251+
? DirectoryResult.Success
252+
: (_numberOfFilesSuccessfullyUploaded > 0
253+
? DirectoryResult.PartialSuccess
254+
: DirectoryResult.Failure)
164255
};
165256

166-
_logger.DebugFormat("Response summary: Uploaded={0}, Failed={1}, Result={2}", response.ObjectsUploaded, response.ObjectsFailed, response.Result);
167-
return response;
168-
}
169-
170-
private Task<string[]> GetFiles(string path, string searchPattern, SearchOption searchOption, CancellationToken cancellationToken)
171-
{
172-
return Task.Run(() =>
173-
{
174-
var filePaths = Directory.GetFiles(path, searchPattern, searchOption);
175-
foreach (var filePath in filePaths)
176-
{
177-
_totalBytes += new FileInfo(filePath).Length;
178-
}
179-
return filePaths;
180-
}, cancellationToken);
257+
_logger.DebugFormat("UploadDirectoryCommand.BuildResponse: Uploaded={0}, Failed={1}, Result={2}",
258+
response.ObjectsUploaded, response.ObjectsFailed, response.Result);
181259

260+
return response;
182261
}
183262
}
184263
}

0 commit comments

Comments
 (0)