Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/aws-cpp-sdk-transfer/include/aws/transfer/TransferHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,12 @@ namespace Aws
*/
inline const Aws::String& GetTargetFilePath() const
{
    // Hands back the stored file name by const reference; no copy is made.
    return m_fileName;
}

/**
* (Download only) Path of the temporary file that receives the downloaded data before it is
* renamed to the target path. Code in this change treats an empty value as "no temporary file".
*/
inline const Aws::String& GetTempFilePath() const
{
    return m_tempFilePath;
}
/**
* (Download only) Stores a copy of the temporary file path used while the download is in progress.
*/
inline void SetTempFilePath(const Aws::String& tempPath)
{
    m_tempFilePath = tempPath;
}

/**
* (Download only) version id of the object to retrieve; if not specified in constructor, then latest is used
*/
Expand Down Expand Up @@ -366,6 +372,11 @@ namespace Aws
void WaitUntilFinished() const;

/**
* Returns the callback currently stored for creating the download stream.
* NOTE(review): this read does not take m_downloadStreamLock, while the matching setter writes
* under that lock - confirm callers never race the two.
*/
const CreateDownloadStreamCallback& GetCreateDownloadStreamFunction() const
{
    return m_createDownloadStreamFn;
}
/**
* Replaces the callback used to create the download stream and resets the cached download
* stream pointer to nullptr. Both writes happen while m_downloadStreamLock is held.
* NOTE(review): the previous m_downloadStream value is only overwritten here, not released -
* confirm that no stream has been created yet (or that it is owned elsewhere) when this is called.
*/
void SetCreateDownloadStreamFunction(const CreateDownloadStreamCallback& fn)
{
    const std::lock_guard<std::mutex> streamGuard(m_downloadStreamLock);
    m_createDownloadStreamFn = fn;
    m_downloadStream = nullptr;
}

/**
* Write @partStream to the configured output (f)stream.
Expand Down Expand Up @@ -409,6 +420,7 @@ namespace Aws
Aws::String m_bucket;
Aws::String m_key;
Aws::String m_fileName;
Aws::String m_tempFilePath;
Aws::String m_contentType;
Aws::String m_versionId;
Aws::String m_etag;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,13 @@ namespace Aws
*/
void SetChecksumForAlgorithm(const std::shared_ptr<PartState>& state, Aws::S3::Model::CompletedPart& part);

/**
* Validates downloaded file checksum against expected checksum from HeadObject and, when the
* handle has both a temporary file path and a target file path, moves the temporary file to
* the target path.
*
* Behavior of the implementation in TransferManager.cpp:
*  - If the handle carries no checksum, no hashing is done; the temporary file (if any) is
*    relocated to the target path and the call succeeds.
*  - Otherwise the temporary file (or the target file when no temporary path is set) is re-read
*    from disk in 8192-byte chunks, hashed, Base64-encoded and compared with handle->GetChecksum().
*  - The hash implementation is selected from m_transferConfig.checksumAlgorithm
*    (CRC32, CRC32C, SHA1, SHA256, anything else falls back to CRC64).
*  - On open failure, hash failure, checksum mismatch or rename failure the temporary file is
*    removed (when a temporary path is set) and an INTERNAL_FAILURE error is returned.
*
* NOTE(review): the expected checksum is taken from whichever HeadObject checksum field was
* non-empty, but the hash is chosen from the transfer configuration - confirm the two always
* refer to the same algorithm, otherwise valid downloads will be reported as mismatches.
*
* @param handle The transfer handle containing checksum and file path.
* @return Success outcome if validation passes or no checksum to validate, error outcome if validation fails.
*/
Aws::Utils::Outcome<Aws::NoResult, Aws::Client::AWSError<Aws::S3::S3Errors>> ValidateDownloadChecksum(const std::shared_ptr<TransferHandle>& handle);

static Aws::String DetermineFilePath(const Aws::String& directory, const Aws::String& prefix, const Aws::String& keyName);

Aws::Utils::ExclusiveOwnershipResourceManager<unsigned char*> m_bufferManager;
Expand Down
157 changes: 152 additions & 5 deletions src/aws-cpp-sdk-transfer/source/transfer/TransferManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,21 @@ namespace Aws
return (path.find_last_of('/') == path.size() - 1 || path.find_last_of('\\') == path.size() - 1);
}

/**
 * Builds the path of the temporary download file that lives next to targetPath.
 *
 * The file name is ".s3tmp." followed by 8 lowercase hex characters. When targetPath contains
 * a '/' or '\\' separator, everything up to and including the last separator is kept as the
 * directory prefix; otherwise only the bare temporary file name is returned.
 *
 * NOTE(review): the hex characters come from rand(), which is never seeded in this function.
 * Unless it is seeded elsewhere, every process produces the same sequence of names, so
 * concurrent processes downloading into one directory can collide - consider a seeded
 * generator or a UUID. rand() is also not guaranteed to be thread-safe.
 *
 * @param targetPath Final destination path of the download.
 * @return Path of the temporary file in the same directory as targetPath.
 */
static Aws::String GenerateTempFilePath(const Aws::String& targetPath)
{
    static const char* HEX_CHARS = "0123456789abcdef";
    static const int UNIQUE_ID_LENGTH = 8;

    // Start from the fixed marker and append the random hex identifier one character at a time.
    Aws::String tempFileName(".s3tmp.");
    for (int appended = 0; appended < UNIQUE_ID_LENGTH; ++appended)
    {
        tempFileName.push_back(HEX_CHARS[rand() % 16]);
    }

    const size_t separatorPos = targetPath.find_last_of("/\\");
    if (separatorPos == Aws::String::npos)
    {
        // No directory component in the target: the temp file is a bare name.
        return tempFileName;
    }

    // Keep the directory (including its trailing separator) and swap in the temp file name.
    return targetPath.substr(0, separatorPos + 1) + tempFileName;
}

template <typename RequestT>
static void SetChecksumOnRequest(RequestT& request, S3::Model::ChecksumAlgorithm checksumAlgorithm, const Aws::String& checksum) {
if (checksumAlgorithm == S3::Model::ChecksumAlgorithm::CRC64NVME) {
Expand Down Expand Up @@ -231,15 +246,19 @@ namespace Aws
const DownloadConfiguration& downloadConfig,
const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context)
{
Aws::String tempFilePath = GenerateTempFilePath(writeToFile);

#ifdef _MSC_VER
auto createFileFn = [=]() { return Aws::New<Aws::FStream>(CLASS_TAG, Aws::Utils::StringUtils::ToWString(writeToFile.c_str()).c_str(),
auto createFileFn = [=]() { return Aws::New<Aws::FStream>(CLASS_TAG, Aws::Utils::StringUtils::ToWString(tempFilePath.c_str()).c_str(),
std::ios_base::out | std::ios_base::in | std::ios_base::binary | std::ios_base::trunc);};
#else
auto createFileFn = [=]() { return Aws::New<Aws::FStream>(CLASS_TAG, writeToFile.c_str(),
auto createFileFn = [=]() { return Aws::New<Aws::FStream>(CLASS_TAG, tempFilePath.c_str(),
std::ios_base::out | std::ios_base::in | std::ios_base::binary | std::ios_base::trunc);};
#endif

return DownloadFile(bucketName, keyName, createFileFn, downloadConfig, writeToFile, context);
auto handle = DownloadFile(bucketName, keyName, createFileFn, downloadConfig, writeToFile, context);
handle->SetTempFilePath(tempFilePath);
return handle;
}

std::shared_ptr<TransferHandle> TransferManager::RetryUpload(const Aws::String& fileName, const std::shared_ptr<TransferHandle>& retryHandle)
Expand Down Expand Up @@ -938,6 +957,17 @@ namespace Aws
handle->SetContentType(getObjectOutcome.GetResult().GetContentType());
handle->ChangePartToCompleted(partState, getObjectOutcome.GetResult().GetETag());
getObjectOutcome.GetResult().GetBody().flush();

auto checksumOutcome = ValidateDownloadChecksum(handle);
if (!checksumOutcome.IsSuccess()) {
handle->ChangePartToFailed(partState);
handle->UpdateStatus(TransferStatus::FAILED);
handle->SetError(checksumOutcome.GetError());
TriggerErrorCallback(handle, checksumOutcome.GetError());
TriggerTransferStatusUpdatedCallback(handle);
return;
}

handle->UpdateStatus(TransferStatus::COMPLETED);
}
else
Expand All @@ -946,7 +976,12 @@ namespace Aws
<< "] Failed to download object to Bucket: [" << handle->GetBucketName() << "] with Key: ["
<< handle->GetKey() << "] " << getObjectOutcome.GetError());
handle->ChangePartToFailed(partState);
handle->UpdateStatus(DetermineIfFailedOrCanceled(*handle));
auto finalStatus = DetermineIfFailedOrCanceled(*handle);
handle->UpdateStatus(finalStatus);

if (finalStatus == TransferStatus::FAILED && !handle->GetTempFilePath().empty()) {
Aws::FileSystem::RemoveFileIfExists(handle->GetTempFilePath().c_str());
}
handle->SetError(getObjectOutcome.GetError());

TriggerErrorCallback(handle, getObjectOutcome.GetError());
Expand Down Expand Up @@ -1000,6 +1035,21 @@ namespace Aws
handle->SetContentType(headObjectOutcome.GetResult().GetContentType());
handle->SetMetadata(headObjectOutcome.GetResult().GetMetadata());
handle->SetEtag(headObjectOutcome.GetResult().GetETag());

if (headObjectOutcome.GetResult().GetChecksumType() == S3::Model::ChecksumType::FULL_OBJECT) {
if (!headObjectOutcome.GetResult().GetChecksumCRC32().empty()) {
handle->SetChecksum(headObjectOutcome.GetResult().GetChecksumCRC32());
} else if (!headObjectOutcome.GetResult().GetChecksumCRC32C().empty()) {
handle->SetChecksum(headObjectOutcome.GetResult().GetChecksumCRC32C());
} else if (!headObjectOutcome.GetResult().GetChecksumSHA256().empty()) {
handle->SetChecksum(headObjectOutcome.GetResult().GetChecksumSHA256());
} else if (!headObjectOutcome.GetResult().GetChecksumSHA1().empty()) {
handle->SetChecksum(headObjectOutcome.GetResult().GetChecksumSHA1());
} else if (!headObjectOutcome.GetResult().GetChecksumCRC64NVME().empty()) {
handle->SetChecksum(headObjectOutcome.GetResult().GetChecksumCRC64NVME());
}
}

/* When bucket versioning is suspended, head object will return "null" for unversioned object.
* Send following GetObject with "null" as versionId will result in 403 access denied error if your IAM role or policy
* doesn't have GetObjectVersion permission.
Expand Down Expand Up @@ -1240,11 +1290,27 @@ namespace Aws
if (failedParts.size() == 0 && handle->GetBytesTransferred() == handle->GetBytesTotalSize())
{
outcome.GetResult().GetBody().flush();

auto checksumOutcome = ValidateDownloadChecksum(handle);
if (!checksumOutcome.IsSuccess()) {
handle->UpdateStatus(TransferStatus::FAILED);
handle->SetError(checksumOutcome.GetError());
TriggerErrorCallback(handle, checksumOutcome.GetError());
TriggerTransferStatusUpdatedCallback(handle);
partState->SetDownloadPartStream(nullptr);
return;
}

handle->UpdateStatus(TransferStatus::COMPLETED);
}
else
{
handle->UpdateStatus(DetermineIfFailedOrCanceled(*handle));
auto finalStatus = DetermineIfFailedOrCanceled(*handle);
handle->UpdateStatus(finalStatus);

if (finalStatus == TransferStatus::FAILED && !handle->GetTempFilePath().empty()) {
Aws::FileSystem::RemoveFileIfExists(handle->GetTempFilePath().c_str());
}
}
TriggerTransferStatusUpdatedCallback(handle);
}
Expand Down Expand Up @@ -1594,5 +1660,86 @@ namespace Aws
partFunc->second(part, state->GetChecksum());
}
}

Aws::Utils::Outcome<Aws::NoResult, Aws::Client::AWSError<Aws::S3::S3Errors>> TransferManager::ValidateDownloadChecksum(const std::shared_ptr<TransferHandle>& handle) {
if (handle->GetChecksum().empty()) {
if (!handle->GetTempFilePath().empty() && !handle->GetTargetFilePath().empty()) {
if (!Aws::FileSystem::RelocateFileOrDirectory(handle->GetTempFilePath().c_str(), handle->GetTargetFilePath().c_str())) {
AWS_LOGSTREAM_ERROR(CLASS_TAG, "Failed to rename temp file from " << handle->GetTempFilePath() << " to " << handle->GetTargetFilePath());
Aws::FileSystem::RemoveFileIfExists(handle->GetTempFilePath().c_str());
return Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::INTERNAL_FAILURE, "FileRenameError", "Failed to rename temporary file to target", false);
}
}
return Aws::NoResult();
}

Aws::String fileToValidate = handle->GetTempFilePath().empty() ? handle->GetTargetFilePath() : handle->GetTempFilePath();
if (fileToValidate.empty()) {
return Aws::NoResult();
}

std::shared_ptr<Aws::Utils::Crypto::Hash> hashCalculator;
if (m_transferConfig.checksumAlgorithm == S3::Model::ChecksumAlgorithm::CRC32) {
hashCalculator = Aws::MakeShared<Aws::Utils::Crypto::CRC32>("TransferManager");
} else if (m_transferConfig.checksumAlgorithm == S3::Model::ChecksumAlgorithm::CRC32C) {
hashCalculator = Aws::MakeShared<Aws::Utils::Crypto::CRC32C>("TransferManager");
} else if (m_transferConfig.checksumAlgorithm == S3::Model::ChecksumAlgorithm::SHA1) {
hashCalculator = Aws::MakeShared<Aws::Utils::Crypto::Sha1>("TransferManager");
} else if (m_transferConfig.checksumAlgorithm == S3::Model::ChecksumAlgorithm::SHA256) {
hashCalculator = Aws::MakeShared<Aws::Utils::Crypto::Sha256>("TransferManager");
} else {
hashCalculator = Aws::MakeShared<Aws::Utils::Crypto::CRC64>("TransferManager");
}

auto fileStream = Aws::MakeShared<Aws::FStream>("TransferManager",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we are opening the file, re-reading the entire stream through a buffer, and then failing the download if the checksum is different. This gives me two concerns:

  • A file that is corrupted is written to disk.

Failure handling: The S3 Transfer Manager SHOULD download to a file in the same directory with .s3tmp.{uniqueIdentifier} (uniqueIdentifier <= 8 chars) suffix to indicate the file is in the process of being downloaded, but not yet complete. After all content has been written, rename the temporary file to the destination file using atomic rename operation. If there is an error, delete the temporary file. The S3 Transfer Manager SHOULD register cleanup handlers to remove the temporary file if the process terminates unexpectedly, if supported. The S3 Transfer Manager MUST NOT pre-validate available space because it can introduce race conditions and false failures.

My concern is that this would cause an issue where we leave behind a file that fails data integrity.

  • We are reading the stream twice.

On files that are massive (think 5 TB), this can introduce overhead. I think this is covered in the spec with:

If full object checksum is calculated inline while downloading source, the S3 Transfer Manager MAY buffer full parts in-memory while flushing is blocked on updating the running checksum object.

This really means that while we write out to the file, we want to be updating the checksum.

fileToValidate.c_str(),
std::ios_base::in | std::ios_base::binary);

if (!fileStream->good()) {
AWS_LOGSTREAM_ERROR(CLASS_TAG, "Transfer handle [" << handle->GetId()
<< "] Failed to open downloaded file for checksum validation: " << fileToValidate);
if (!handle->GetTempFilePath().empty()) {
Aws::FileSystem::RemoveFileIfExists(handle->GetTempFilePath().c_str());
}
return Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::INTERNAL_FAILURE, "FileOpenError", "Failed to open downloaded file for checksum validation", false);
}

unsigned char buffer[8192];
while (fileStream->read(reinterpret_cast<char*>(buffer), sizeof(buffer)) || fileStream->gcount() > 0) {
hashCalculator->Update(buffer, static_cast<size_t>(fileStream->gcount()));
}

auto hashResult = hashCalculator->GetHash();
if (!hashResult.IsSuccess()) {
AWS_LOGSTREAM_ERROR(CLASS_TAG, "Transfer handle [" << handle->GetId()
<< "] Failed to calculate checksum for downloaded file");
if (!handle->GetTempFilePath().empty()) {
Aws::FileSystem::RemoveFileIfExists(handle->GetTempFilePath().c_str());
}
return Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::INTERNAL_FAILURE, "ChecksumCalculationError", "Failed to calculate checksum for downloaded file", false);
}

Aws::String calculatedChecksum = Aws::Utils::HashingUtils::Base64Encode(hashResult.GetResult());
if (calculatedChecksum != handle->GetChecksum()) {
AWS_LOGSTREAM_ERROR(CLASS_TAG, "Transfer handle [" << handle->GetId()
<< "] Checksum validation failed. Expected: " << handle->GetChecksum()
<< ", Calculated: " << calculatedChecksum);
if (!handle->GetTempFilePath().empty()) {
Aws::FileSystem::RemoveFileIfExists(handle->GetTempFilePath().c_str());
}
return Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::INTERNAL_FAILURE, "ChecksumMismatch", "Downloaded file checksum does not match expected checksum", false);
}

// Validation succeeded, rename temp to target
if (!handle->GetTempFilePath().empty() && !handle->GetTargetFilePath().empty()) {
if (!Aws::FileSystem::RelocateFileOrDirectory(handle->GetTempFilePath().c_str(), handle->GetTargetFilePath().c_str())) {
AWS_LOGSTREAM_ERROR(CLASS_TAG, "Failed to rename temp file from " << handle->GetTempFilePath() << " to " << handle->GetTargetFilePath());
Aws::FileSystem::RemoveFileIfExists(handle->GetTempFilePath().c_str());
return Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::INTERNAL_FAILURE, "FileRenameError", "Failed to rename temporary file to target", false);
}
}

return Aws::NoResult();
}
}
}
Loading