-
Notifications
You must be signed in to change notification settings - Fork 874
add failure policy to upload directory #4181
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add failure policy to upload directory #4181
Conversation
| { | ||
| try { Directory.Delete(localDir, true); } catch { } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add tests for uploaddirectory concurrency similar to
using Amazon.S3;
using Amazon.S3.Model;
using Amazon.S3.Transfer;
using Amazon.S3.Transfer.Internal;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using AWSSDK_DotNet.IntegrationTests.Utils;
namespace AWSSDK.UnitTests
{
[TestClass]
public class DownloadDirectoryCommandTestsTwo
{
private string _testDirectory;
private Mock<IAmazonS3> _mockS3Client;
private TransferUtilityConfig _config;
[TestInitialize]
public void Setup()
{
_testDirectory = Path.Combine(Path.GetTempPath(), "DownloadDirectoryCommandTests_" + Guid.NewGuid().ToString("N").Substring(0, 8));
Directory.CreateDirectory(_testDirectory);
_mockS3Client = new Mock<IAmazonS3>();
_config = new TransferUtilityConfig
{
ConcurrentServiceRequests = 4
};
// Setup default S3 client config
var s3Config = new AmazonS3Config
{
BufferSize = 8192,
};
_mockS3Client.Setup(c => c.Config).Returns(s3Config);
}
[TestCleanup]
public void Cleanup()
{
if (Directory.Exists(_testDirectory))
{
try
{
Directory.Delete(_testDirectory, true);
}
catch
{
// Ignore cleanup errors in tests
}
}
}
#region Concurrency Control Tests
/// <summary>
/// Tests that ConcurrentServiceRequests setting actually limits concurrent file downloads.
/// This test will FAIL on the current broken implementation, demonstrating that
/// ConcurrentServiceRequests is not being respected.
///
/// Expected: Max 2 concurrent downloads (ConcurrentServiceRequests = 2)
/// Actual (broken): 5 concurrent downloads (all files download simultaneously)
/// </summary>
[TestMethod]
public async Task ExecuteAsync_ConcurrentServiceRequests_RespectsLimit()
{
// Arrange
var request = CreateDownloadDirectoryRequest();
request.DownloadFilesConcurrently = true;
// Use a low limit to make violation obvious
var config = new TransferUtilityConfig
{
ConcurrentServiceRequests = 2 // Only 2 files should download simultaneously
};
// Track concurrent downloads using thread-safe counter
var currentConcurrentDownloads = 0;
var maxObservedConcurrency = 0;
var concurrencyLock = new object();
var files = new Dictionary<string, long>
{
{ "file1.dat", 5 * 1024 * 1024 }, // 5MB files
{ "file2.dat", 5 * 1024 * 1024 },
{ "file3.dat", 5 * 1024 * 1024 },
{ "file4.dat", 5 * 1024 * 1024 },
{ "file5.dat", 5 * 1024 * 1024 } // 5 files total
};
// Setup directory listing
var listResponse = CreateListObjectsResponse(files);
_mockS3Client.Setup(c => c.ListObjectsAsync(
It.IsAny<ListObjectsRequest>(),
It.IsAny<CancellationToken>()))
.ReturnsAsync(listResponse);
// Override GetObjectAsync to track concurrency
_mockS3Client.Setup(c => c.GetObjectAsync(
It.IsAny<GetObjectRequest>(),
It.IsAny<CancellationToken>()))
.Returns(async (GetObjectRequest req, CancellationToken ct) =>
{
// Increment counter when download starts
lock (concurrencyLock)
{
currentConcurrentDownloads++;
maxObservedConcurrency = Math.Max(maxObservedConcurrency, currentConcurrentDownloads);
Console.WriteLine($"Download started for {req.Key}. Current concurrent: {currentConcurrentDownloads}, Max observed: {maxObservedConcurrency}");
}
try
{
// Simulate some download time to ensure overlap
await Task.Delay(100, ct);
// Return mock response
var fileName = req.Key.Split('/').Last();
var fileSize = files[fileName];
var data = GenerateTestData((int)fileSize);
return new GetObjectResponse
{
BucketName = req.BucketName,
Key = req.Key,
ContentLength = fileSize,
ResponseStream = new MemoryStream(data),
ETag = "\"test-etag\""
};
}
finally
{
// Decrement counter when download completes
lock (concurrencyLock)
{
currentConcurrentDownloads--;
Console.WriteLine($"Download completed for {req.Key}. Current concurrent: {currentConcurrentDownloads}");
}
}
});
var command = new DownloadDirectoryCommand(_mockS3Client.Object, request, config);
// Act
await command.ExecuteAsync(CancellationToken.None);
// Assert
Console.WriteLine($"Test Results: Expected max concurrency ≤ {config.ConcurrentServiceRequests}, Observed: {maxObservedConcurrency}");
Assert.AreEqual(2, config.ConcurrentServiceRequests, "Test setup verification");
Assert.IsTrue(maxObservedConcurrency <= config.ConcurrentServiceRequests,
$"Max concurrent downloads ({maxObservedConcurrency}) should not exceed ConcurrentServiceRequests ({config.ConcurrentServiceRequests})");
}
/// <summary>
/// Tests that sequential mode (DownloadFilesConcurrently = false) downloads only one file at a time.
/// This test will FAIL on the current broken implementation, demonstrating that
/// sequential mode is not working correctly.
///
/// Expected: Max 1 concurrent download (sequential mode)
/// Actual (broken): 3 concurrent downloads (all files download simultaneously despite sequential setting)
/// </summary>
[TestMethod]
public async Task ExecuteAsync_SequentialMode_DownloadsOneAtATime()
{
// Arrange
var request = CreateDownloadDirectoryRequest();
request.DownloadFilesConcurrently = false; // Sequential mode
var config = new TransferUtilityConfig
{
ConcurrentServiceRequests = 10 // High limit, but sequential should still be 1
};
// Track concurrent downloads
var currentConcurrentDownloads = 0;
var maxObservedConcurrency = 0;
var concurrencyLock = new object();
var files = new Dictionary<string, long>
{
{ "file1.dat", 1024 },
{ "file2.dat", 1024 },
{ "file3.dat", 1024 }
};
// Setup directory listing
var listResponse = CreateListObjectsResponse(files);
_mockS3Client.Setup(c => c.ListObjectsAsync(
It.IsAny<ListObjectsRequest>(),
It.IsAny<CancellationToken>()))
.ReturnsAsync(listResponse);
// Override GetObjectAsync to track concurrency
_mockS3Client.Setup(c => c.GetObjectAsync(
It.IsAny<GetObjectRequest>(),
It.IsAny<CancellationToken>()))
.Returns(async (GetObjectRequest req, CancellationToken ct) =>
{
lock (concurrencyLock)
{
currentConcurrentDownloads++;
maxObservedConcurrency = Math.Max(maxObservedConcurrency, currentConcurrentDownloads);
Console.WriteLine($"Sequential download started for {req.Key}. Current concurrent: {currentConcurrentDownloads}, Max observed: {maxObservedConcurrency}");
}
try
{
await Task.Delay(50, ct); // Brief delay
var fileName = req.Key.Split('/').Last();
var fileSize = files[fileName];
var data = GenerateTestData((int)fileSize);
return new GetObjectResponse
{
BucketName = req.BucketName,
Key = req.Key,
ContentLength = fileSize,
ResponseStream = new MemoryStream(data),
ETag = "\"test-etag\""
};
}
finally
{
lock (concurrencyLock)
{
currentConcurrentDownloads--;
Console.WriteLine($"Sequential download completed for {req.Key}. Current concurrent: {currentConcurrentDownloads}");
}
}
});
var command = new DownloadDirectoryCommand(_mockS3Client.Object, request, config);
// Act
await command.ExecuteAsync(CancellationToken.None);
// Assert
Console.WriteLine($"Sequential Test Results: Expected max concurrency = 1, Observed: {maxObservedConcurrency}");
Assert.AreEqual(1, maxObservedConcurrency,
$"Sequential mode should only download 1 file at a time, but observed {maxObservedConcurrency}");
}
#endregion
#region Helper Methods
private TransferUtilityDownloadDirectoryRequest CreateDownloadDirectoryRequest(
string bucketName = "test-bucket",
string s3Directory = "prefix",
string localDirectory = null)
{
localDirectory = localDirectory ?? _testDirectory;
return new TransferUtilityDownloadDirectoryRequest
{
BucketName = bucketName,
S3Directory = s3Directory,
LocalDirectory = localDirectory
};
}
private ListObjectsResponse CreateListObjectsResponse(Dictionary<string, long> files)
{
var s3Objects = files.Select(f => new S3Object
{
Key = $"prefix/{f.Key}",
Size = f.Value
}).ToList();
return new ListObjectsResponse
{
S3Objects = s3Objects
};
}
private byte[] GenerateTestData(int size)
{
var data = new byte[size];
var random = new Random(42); // Fixed seed for reproducible tests
random.NextBytes(data);
return data;
}
#endregion
}
}
just modify to be upload. i want to ensure we dont break anything with that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
|
||
| [assembly: InternalsVisibleTo("AWSSDK.UnitTests.S3.NetFramework, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db5f59f098d27276c7833875a6263a3cc74ab17ba9a9df0b52aedbe7252745db7274d5271fd79c1f08f668ecfa8eaab5626fa76adc811d3c8fc55859b0d09d3bc0a84eecd0ba891f2b8a2fc55141cdcc37c2053d53491e650a479967c3622762977900eddbf1252ed08a2413f00a28f3a0752a81203f03ccb7f684db373518b4")] | ||
| [assembly: InternalsVisibleTo("AWSSDK.UnitTests.NetFramework, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db5f59f098d27276c7833875a6263a3cc74ab17ba9a9df0b52aedbe7252745db7274d5271fd79c1f08f668ecfa8eaab5626fa76adc811d3c8fc55859b0d09d3bc0a84eecd0ba891f2b8a2fc55141cdcc37c2053d53491e650a479967c3622762977900eddbf1252ed08a2413f00a28f3a0752a81203f03ccb7f684db373518b4")] | ||
| [assembly: InternalsVisibleTo("AWSSDK.IntegrationTests.S3.NetFramework, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db5f59f098d27276c7833875a6263a3cc74ab17ba9a9df0b52aedbe7252745db7274d5271fd79c1f08f668ecfa8eaab5626fa76adc811d3c8fc55859b0d09d3bc0a84eecd0ba891f2b8a2fc55141cdcc37c2053d53491e650a479967c3622762977900eddbf1252ed08a2413f00a28f3a0752a81203f03ccb7f684db373518b4")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this file and the csproj files get auto generated. there is a AssemblyInfo.tt file that generates these. so you need to modify that and then rerun the generator as well to recreate the csproj files
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Making something internal visible to an integration tests seems like an anti-pattern... Is it really needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I get where you're coming from but in our current implementation we don't return a response to the end user so i cant use the exact same public interface. We plan on adding one that returns the response in the future, at which point we can remove this anti-pattern.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated AssemblyInfo.tt
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry if I'm not following, but are we trying to reference from the test project then? (I thought it was this interface but from your comment it doesn't exist yet?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ive made the new api here #4187 probably we can remove this after we get it all merged in
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds failure policy configuration and event notification capabilities to the TransferUtility's upload directory operation, extending the existing failure handling pattern from download directory to upload directory. The feature allows users to configure whether directory upload operations should abort or continue when individual file uploads fail, and provides event notifications when failures occur.
Key Changes
- Added
FailurePolicyproperty toTransferUtilityUploadDirectoryRequestwith default value ofAbortOnFailureto maintain backward compatibility - Added
ObjectUploadFailedEventevent to notify users when individual file uploads fail during directory upload operations - Extended
TransferUtilityUploadDirectoryResponsewith properties to track upload results:ObjectsUploaded,ObjectsFailed,Errors, andResult
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| sdk/src/Services/S3/Custom/Transfer/TransferUtilityUploadDirectoryRequest.cs | Added FailurePolicy property and ObjectUploadFailedEvent with comprehensive XML documentation |
| sdk/src/Services/S3/Custom/Transfer/TransferUtilityUploadDirectoryResponse.cs | Added properties to track upload results (success/failure counts, errors, overall result) |
| sdk/src/Services/S3/Custom/Transfer/Internal/UploadDirectoryCommand.cs | Integrated failure policy pattern and error tracking for upload operations |
| sdk/src/Services/S3/Custom/Transfer/Internal/_bcl+netstandard/UploadDirectoryCommand.cs | Modified ExecuteAsync to use failure policy, raise events, and return detailed response |
| sdk/src/Services/S3/Custom/Transfer/Internal/_async/BaseCommand.async.cs | Removed ExecuteCommandAsync helper method as failure policy pattern now handles this responsibility |
| sdk/test/Services/S3/UnitTests/Custom/FailurePolicyTests.cs | Added comprehensive unit tests for upload failure scenarios (continue/abort, partial/all failures, event handling) |
| sdk/test/Services/S3/IntegrationTests/TransferUtilityTests.cs | Added integration tests validating failure policy behavior against real S3 scenarios |
| sdk/src/Services/S3/Properties/AssemblyInfo.cs | Added InternalsVisibleTo attribute for integration test assembly to enable internal class testing |
| sdk/test/Services/S3/IntegrationTests/AWSSDK.IntegrationTests.S3.NetFramework.csproj | Added assembly signing configuration and standardized indentation to tabs |
| sdk/test/IntegrationTests/AWSSDK.IntegrationTestUtilities.NetFramework.csproj | Added assembly signing configuration |
| generator/.DevConfigs/c49077d9-90b3-437f-b316-6d8d8833ae77.json | Dev config with appropriate minor version bump and descriptive changelog entries |
| var uploadCommand = _utility.GetUploadCommand(uploadRequest, sharedHttpRequestThrottler); | ||
|
|
||
| var task = ExecuteCommandAsync(uploadCommand, internalCts); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add debug logging to uploaddirectorycommand and download directory command related to concurrency and everything?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
|
||
| var uploadRequest = ConstructRequest(basePath, filepath, prefix); | ||
| var uploadCommand = _utility.GetUploadCommand(uploadRequest, sharedHttpRequestThrottler); | ||
| _logger.DebugFormat("Constructed upload request for Key={0}, FilePath={1}", uploadRequest.Key, uploadRequest.FilePath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure if we are allowed to log key/file path?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the verbose logs to be safe
|
|
||
| Action<Exception> onFailure = (ex) => | ||
| { | ||
| _logger.Debug(ex, "Upload failed for Key={0}, FilePath={1}.", uploadRequest.Key, uploadRequest.FilePath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here and other places
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the verbose logs to be safe
| [TestInitialize] | ||
| public void Setup() | ||
| { | ||
| _testDirectory = Path.Combine(Path.GetTempPath(), "UploadDirectoryCommandTests_" + Guid.NewGuid().ToString("N").Substring(0, 8)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just noting for others. these tests currently fail, but pass on development branch. we are working on another fix to fix the concurrency issues
|
approving because we have this PR #4186 to refactor things after |
|
i just saw one other thing. FailurePolicy.cs and the DirectoryResult.cs should not be in a model folder. it should be in line with the other Request/Response objects and also use the |
GarrettBeatty
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changing to request changes to make sure class paths are updated
GarrettBeatty
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
going to approve assuming make the namespace file change updates
Description
Motivation and Context
DOTNET-8391
Testing
Ran dry run
Tested locally
Screenshots (if appropriate)
Types of changes
Checklist
License