Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,103 @@ public void TestUploadInPartialServer(
Assert.Equal(UploadStatus.Completed, progress.Status);
}
}
#if NET6_0_OR_GREATER
/// <summary>
/// Verifies that a resumable upload completes successfully and validates crc32c checksum across different chunking configurations
/// and stream types of known size.
/// </summary>
[Theory, CombinatorialData]
public void TestChunkedUpload_CompletesAndVerifiesChecksumInPartialServer(
[CombinatorialValues(true, false)] bool knownSize,
[CombinatorialValues(80, 150)] int partialSize,
[CombinatorialValues(100, 200)] int chunkSize)
{
var actualChunkSize = Math.Min(partialSize, chunkSize);
var expectedCallCount = (UploadLength + actualChunkSize - 1) / actualChunkSize + 1;
using (var server = new MultiChunkPartialServer(_server, partialSize))
using (var service = new MockClientService(server.HttpPrefix))
{
var content = knownSize ? new MemoryStream(UploadTestBytes) : new UnknownSizeMemoryStream(UploadTestBytes);
var uploader = new TestResumableUpload(service, "MultiChunk", "POST", content, "text/plain", actualChunkSize);
var progress = uploader.Upload();
Assert.Equal(expectedCallCount, server.Requests.Count);
var request = server.Requests.Last();

if (knownSize)
{
Assert.Contains(ResumableUpload.GoogleHashHeader, request.Headers.AllKeys);
var headerValue = request.Headers[ResumableUpload.GoogleHashHeader];
var expectedBase64Hash = CalculateCrc32c(UploadTestBytes);
Assert.Equal($"crc32c={expectedBase64Hash}", headerValue);
}
else
{
Assert.DoesNotContain(ResumableUpload.GoogleHashHeader, request.Headers.AllKeys);
}

Assert.Equal(UploadTestBytes, server.Bytes);
Assert.Equal(UploadStatus.Completed, progress.Status);
}
}
Comment thread
mahendra-google marked this conversation as resolved.

/// <summary>
/// Verifies that the session initiation succeeds when a matching CRC32C checksum is provided.
/// </summary>
[Fact]
public async Task TestInitiateSessionWithMatchingCrc32c()
{
var content = new MemoryStream(UploadTestBytes);
using (var server = new MultiChunkServer(_server))
using (var service = new MockClientService(server.HttpPrefix))
{
var chunkSize = 1024;
var expectedBase64Hash = CalculateCrc32c(UploadTestBytes);
var body = new TestResumableUploadWithCrc32c { Crc32c = expectedBase64Hash };
var uploader = new TestResumableUpload(service, "MultiChunk", "POST", content, "text/plain", chunkSize)
{
Body = body
};

await uploader.InitiateSessionAsync();
// Assert that the initiation request was sent
Assert.Single(server.Requests);
var initiationRequest = server.Requests.First();

// Assert that the X-Goog-Hash header was sent with the correct CRC32C
Assert.Contains(ResumableUpload.GoogleHashHeader, initiationRequest.Headers.AllKeys);
Assert.Equal($"crc32c={expectedBase64Hash}", initiationRequest.Headers[ResumableUpload.GoogleHashHeader]);
Assert.True(uploader.Crc32cHeaderSentInInitiation);
}
}

/// <summary>
/// Verifies that the session initiation fails when a mismatched CRC32C checksum is provided.
/// </summary>
[Fact]
public async Task TestInitiateSessionWithMismatchingCrc32c()
{
var content = new MemoryStream(UploadTestBytes);
using (var server = new MultiChunkServer(_server))
using (var service = new MockClientService(server.HttpPrefix))
{
var chunkSize = 1024;
var expectedBase64Hash = CalculateCrc32c(UploadTestBytes);
var body = new TestResumableUploadWithCrc32c { Crc32c = "incorrectHash" };

var uploader = new TestResumableUpload(service, "MultiChunk", "POST", content, "text/plain", chunkSize)
{
Body = body
};

var exception = await Assert.ThrowsAsync<InvalidOperationException>(() => uploader.InitiateSessionAsync());
Assert.Equal($"The calculated CRC32C of the stream ({expectedBase64Hash}) does not match the CRC32C provided in the object metadata ({body.Crc32c}).", exception.Message);

// Assert that no request was sent to the server
Assert.Empty(server.Requests);
Assert.False(uploader.Crc32cHeaderSentInInitiation);
}
}
#endif

/// <summary>
/// Server that expects an initial call with path and query parameters.
Expand Down Expand Up @@ -697,5 +794,18 @@ public void TestUploadWithRequestAndResponseBody(
Assert.Equal(1, reponseReceivedCount);
}
}

#if NET6_0_OR_GREATER
/// <summary>
/// Calculates the CRC32C checksum for the specified data.
/// </summary>
private string CalculateCrc32c(byte[] uploadTestBytes)
{
var crc32 = new System.IO.Hashing.Crc32();
crc32.Append(uploadTestBytes);
byte[] hashBytes = crc32.GetCurrentHash();
return Convert.ToBase64String(hashBytes);
}
#endif
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,15 @@ internal TestLogger(ITestOutputHelper outputHelper)
internal void WriteLine(string message, [CallerMemberName] string caller = null) =>
_outputHelper.WriteLine($"Test {_id:0000} at {DateTime.UtcNow:HH:mm:ss.fff}: {caller} - {message}");
}

#if NET6_0_OR_GREATER
/// <summary>
/// Resumable upload class that allows Crc32c to be set for testing.
/// </summary>
private class TestResumableUploadWithCrc32c
{
public string Crc32c { get; set; }
}
#endif
}
}
2 changes: 2 additions & 0 deletions Src/Support/Google.Apis/Google.Apis.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ The library supports service requests, media upload and download, etc.
<ProjectReference Include="..\Google.Apis.Core\Google.Apis.Core.csproj" />

<PackageReference Include="ConfigureAwaitChecker.Analyzer" Version="5.0.0.1" PrivateAssets="All" />

<PackageReference Include="System.IO.Hashing" Version="8.0.0" Condition="'$(TargetFramework)' == 'net6.0'" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)'=='net462'">
Expand Down
118 changes: 118 additions & 0 deletions Src/Support/Google.Apis/Upload/ResumableUpload.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,19 @@ public abstract class ResumableUpload
/// The x-goog-api-client header value used for resumable uploads initiated without any options or an HttpClient.
/// </summary>
private static readonly string DefaultGoogleApiClientHeader = new VersionHeaderBuilder().AppendDotNetEnvironment().AppendAssemblyVersion("gdcl", typeof(ResumableUpload)).ToString();

/// <summary>
/// The header name used to specify the crc32c hash of the stream for validation.
/// </summary>
internal const string GoogleHashHeader = "X-Goog-Hash";

#if NET6_0_OR_GREATER
/// <summary>
/// The size of the buffer (in bytes) used when reading the stream to calculate the CRC32C checksum.
/// Set to 80 KB (81920 bytes) to balance memory usage and I/O performance.
/// </summary>
private const int Crc32cCalculationBufferSize = 81920;
#endif
#endregion // Constants

#region Construction
Expand Down Expand Up @@ -142,6 +155,12 @@ public InitiatedResumableUpload(Uri uploadUri, Stream contentStream, ResumableUp
/// </summary>
internal ConfigurableHttpClient HttpClient { get; }

/// <summary>
/// Gets or sets a value indicating whether the CRC32C checksum header was included
/// during the initiation of the resumable upload session.
/// </summary>
public bool Crc32cHeaderSentInInitiation { get; internal set; }

/// <summary>Gets or sets the stream to upload.</summary>
public Stream ContentStream { get; }

Expand Down Expand Up @@ -641,6 +660,24 @@ protected async Task<bool> SendNextChunkAsync(Stream stream, CancellationToken c
BytesClientSent = BytesServerReceived + contentLength;
Logger.Debug("MediaUpload[{0}] - Sending bytes={1}-{2}", UploadUri, BytesServerReceived,
BytesClientSent - 1);
bool isFinalChunk = BytesClientSent == StreamLength;
if (isFinalChunk && !Crc32cHeaderSentInInitiation)
{
string trailingCrc32c = await CalculateCrc32cAsync(ContentStream, cancellationToken).ConfigureAwait(false);
if (trailingCrc32c != null)
{
request.Headers.TryAddWithoutValidation(GoogleHashHeader, $"crc32c={trailingCrc32c}");
}
}
else
{
// Explicitly ensure the header is NOT present for non-final chunks
// (Just in case it was added by a default modifier elsewhere)
if (request.Headers.Contains(GoogleHashHeader))
{
request.Headers.Remove(GoogleHashHeader);
}
}

// We can't assume that the server actually received all the data. It almost always does,
// but just occasionally it'll return a 308 that makes us resend a chunk. We need to
Expand All @@ -664,6 +701,54 @@ protected async Task<bool> SendNextChunkAsync(Stream stream, CancellationToken c
return completed;
}

#if NET6_0_OR_GREATER
/// <summary>
/// Calculates the CRC32C hash of the entire stream and returns it as a Base64 string.
/// </summary>
internal async Task<string> CalculateCrc32cAsync(Stream stream, CancellationToken cancellationToken)
{
if (stream == null || !stream.CanSeek)
{
return null;
}

long originalPosition = stream.Position;
try
{
stream.Position = 0;

var crc32c = new System.IO.Hashing.Crc32();
byte[] buffer = System.Buffers.ArrayPool<byte>.Shared.Rent(Crc32cCalculationBufferSize);
try
{
int bytesRead;
while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) > 0)
{
crc32c.Append(new ReadOnlySpan<byte>(buffer, 0, bytesRead));
}
}
finally
{
System.Buffers.ArrayPool<byte>.Shared.Return(buffer);
}

byte[] hashBytes = crc32c.GetCurrentHash();
return Convert.ToBase64String(hashBytes);
}
finally
{
// Restore the stream position. Important so the upload continues correctly.
stream.Position = originalPosition;
}
}
#else
/// <summary>
/// Calculates the CRC32C hash of the entire stream and returns it as a Base64 string.
/// </summary>
internal Task<string> CalculateCrc32cAsync(Stream stream, CancellationToken cancellationToken) =>
Task.FromResult<string>(null);
#endif

/// <summary>Handles a media upload HTTP response.</summary>
/// <returns><c>True</c> if the entire media has been completely uploaded.</returns>
private async Task<bool> HandleResponse(HttpResponseMessage response)
Expand Down Expand Up @@ -1063,6 +1148,10 @@ public class ResumableUpload<TRequest> : ResumableUpload
/// <summary>The uploadType parameter value for resumable uploads.</summary>
private const string Resumable = "resumable";

/// <summary>
/// The identifier string for the CRC32C hash.</summary>
private const string Crc32c = "Crc32c";

#endregion // Constants

#region Construction
Expand Down Expand Up @@ -1133,6 +1222,29 @@ protected ResumableUpload(IClientService service, string path, string httpMethod
public override async Task<Uri> InitiateSessionAsync(CancellationToken cancellationToken = default(CancellationToken))
{
HttpRequestMessage request = CreateInitializeRequest();
Crc32cHeaderSentInInitiation = false;
if (Body != null)
{
var metadataHash = GetPropertyFromGenericBody(Body, Crc32c);
if (!string.IsNullOrEmpty(metadataHash))
{
string calculatedHash = await CalculateCrc32cAsync(ContentStream, cancellationToken).ConfigureAwait(false);
if (calculatedHash != null)
{
if (metadataHash != calculatedHash)
{
throw new InvalidOperationException(
$"The calculated CRC32C of the stream ({calculatedHash}) does not match the CRC32C provided in the object metadata ({metadataHash}).");
}
request.Headers.TryAddWithoutValidation(GoogleHashHeader, $"crc32c={calculatedHash}");
Crc32cHeaderSentInInitiation = true;
}
else
{
ApplicationContext.Logger.ForType<ResumableUpload<TRequest>>().Warning("A CRC32C hash was provided in the request body, but the stream is not seekable. The hash will not be validated on the client, and will not be sent to the server.");
}
}
}
Options?.ModifySessionInitiationRequest?.Invoke(request);
var response = await Service.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -1211,6 +1323,12 @@ private void SetAllPropertyValues(RequestBuilder requestBuilder)
}
}

/// <summary>
/// Retrieves the value of a specific property from a generic object using reflection.
/// </summary>
internal string GetPropertyFromGenericBody(TRequest body, string propertyName) =>
body?.GetType().GetProperty(propertyName)?.GetValue(body) as string;

#endregion Upload Implementation
}

Expand Down
Loading