Skip to content

Commit 4205a57

Browse files
author
gavinhgchen
committed
MultiPutObject opt continue
1 parent 1ef3996 commit 4205a57

File tree

4 files changed

+69
-35
lines changed

4 files changed

+69
-35
lines changed

include/op/file_upload_task.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,12 @@ class FileUploadTask : public Poco::Runnable {
8282
void SetCaLocation(const std::string& ca_location);
8383
void SetSslCtxCb(SSLCtxCallback cb, void *data);
8484

85-
void SetPartCrc64(uint64_t crc64) {
86-
m_part_crc64 = crc64;
85+
void SetCheckCrc64(bool check_crc64) {
86+
mb_check_crc64 = check_crc64;
87+
}
88+
89+
uint64_t GetCrc64Value() const {
90+
return m_crc64_value;
8791
}
8892

8993
private:
@@ -109,7 +113,8 @@ class FileUploadTask : public Poco::Runnable {
109113
SSLCtxCallback m_ssl_ctx_cb;
110114
void *m_user_data;
111115

112-
uint64_t m_part_crc64;
116+
bool mb_check_crc64;
117+
uint64_t m_crc64_value;
113118
};
114119

115120
} // namespace qcloud_cos

include/op/object_op.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,18 @@
2020

2121
namespace qcloud_cos {
2222

23+
class PartBufInfo {
24+
public:
25+
unsigned char* buf;
26+
size_t len;
27+
28+
public:
29+
PartBufInfo() {
30+
buf = nullptr;
31+
len = 0;
32+
}
33+
};
34+
2335
class FileUploadTask;
2436
class FileCopyTask;
2537

@@ -447,7 +459,7 @@ class ObjectOp : public BaseOp {
447459
const std::string& path, unsigned char* file_content_buf,
448460
uint64_t len, uint64_t part_number,
449461
FileUploadTask* task_ptr, bool sign_header_host,
450-
uint64_t crc64);
462+
bool check_crc64);
451463

452464
void FillCopyTask(const std::string& upload_id, const std::string& host,
453465
const std::string& path, uint64_t part_number,

src/op/file_upload_task.cpp

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ FileUploadTask::FileUploadTask(const std::string& full_url,
3434
m_ca_location(ca_location),
3535
m_ssl_ctx_cb(ssl_ctx_cb),
3636
m_user_data(user_data),
37-
m_part_crc64(0) {}
37+
mb_check_crc64(false),
38+
m_crc64_value(0) {}
3839

3940
FileUploadTask::FileUploadTask(
4041
const std::string& full_url,
@@ -61,7 +62,8 @@ FileUploadTask::FileUploadTask(
6162
m_ca_location(ca_location),
6263
m_ssl_ctx_cb(ssl_ctx_cb),
6364
m_user_data(user_data),
64-
m_part_crc64(0) {}
65+
mb_check_crc64(false),
66+
m_crc64_value(0) {}
6567

6668
FileUploadTask::FileUploadTask(
6769
const std::string& full_url,
@@ -88,7 +90,8 @@ FileUploadTask::FileUploadTask(
8890
m_ca_location(ca_location),
8991
m_ssl_ctx_cb(ssl_ctx_cb),
9092
m_user_data(user_data),
91-
m_part_crc64(0) {}
93+
mb_check_crc64(false),
94+
m_crc64_value(0) {}
9295

9396
void FileUploadTask::run() {
9497
m_resp = "";
@@ -158,8 +161,14 @@ void FileUploadTask::SetSslCtxCb(SSLCtxCallback cb, void *data) {
158161

159162
void FileUploadTask::UploadTask() {
160163
std::string md5_str;
164+
// 数据一致性校验采用crc64
165+
if (mb_check_crc64) {
166+
m_crc64_value = 0;
167+
m_crc64_value = CRC64::CalcCRC(m_crc64_value, static_cast<void*>(m_data_buf_ptr), m_data_len);
168+
SDK_LOG_DBG("Part Crc64: %" PRIu64, m_crc64_value);
169+
}
161170
// 没有crc64则默认走md5校验
162-
if (m_part_crc64 == 0) {
171+
else {
163172
#ifdef USE_OPENSSL_MD5
164173
unsigned char digest[MD5_DIGEST_LENGTH];
165174
MD5((const unsigned char *)m_data_buf_ptr, m_data_len, digest);
@@ -174,6 +183,7 @@ void FileUploadTask::UploadTask() {
174183
dos.close();
175184
md5_str = Poco::DigestEngine::digestToHex(md5.digest());
176185
#endif
186+
SDK_LOG_DBG("Part Md5: %s", md5_str.c_str());
177187
}
178188

179189
int loop = 0;
@@ -210,18 +220,19 @@ void FileUploadTask::UploadTask() {
210220
}
211221

212222
// crc64一致性校验
213-
if (m_part_crc64 != 0) {
223+
if (mb_check_crc64) {
214224
std::map<std::string, std::string>::const_iterator c_itr =
215225
m_resp_headers.find(kRespHeaderXCosHashCrc64Ecma);
216226
if (c_itr == m_resp_headers.end() ||
217-
StringUtil::StringToUint64(c_itr->second) != m_part_crc64) {
227+
StringUtil::StringToUint64(c_itr->second) != m_crc64_value) {
218228
SDK_LOG_ERR(
219229
"Response x-cos-hash-crc64ecma is not correct, try again. Expect crc64 is %" PRIu64 ", but "
220230
"return crc64 is %s",
221-
m_part_crc64, c_itr->second.c_str());
231+
m_crc64_value, c_itr->second.c_str());
222232
m_is_task_success = false;
223233
continue;
224234
}
235+
SDK_LOG_DBG("Part Crc64 Check Success.");
225236
} else {
226237
std::map<std::string, std::string>::const_iterator c_itr =
227238
m_resp_headers.find("ETag");
@@ -234,6 +245,7 @@ void FileUploadTask::UploadTask() {
234245
m_is_task_success = false;
235246
continue;
236247
}
248+
SDK_LOG_DBG("Part Md5 Check Success.");
237249
}
238250

239251
m_is_task_success = true;

src/op/object_op.cpp

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,7 @@ CosResult ObjectOp::MultiUploadObject(const PutObjectByFileReq& req,
763763
!comp_resp.GetXCosHashCrc64Ecma().empty()) {
764764
uint64_t crc64_server_resp =
765765
StringUtil::StringToUint64(comp_resp.GetXCosHashCrc64Ecma());
766+
SDK_LOG_DBG("File Crc64: %" PRIu64, crc64_origin);
766767
if (crc64_server_resp != crc64_origin) {
767768
std::string err_msg =
768769
"MultiUploadObject failed, crc64 check failed, crc64_origin: " +
@@ -1762,9 +1763,9 @@ CosResult ObjectOp::MultiThreadUpload(
17621763
return result;
17631764
}
17641765

1765-
unsigned char** file_content_buf = new unsigned char*[pool_size];
1766+
PartBufInfo *part_buf_info = new PartBufInfo[pool_size];
17661767
for (int i = 0; i < pool_size; ++i) {
1767-
file_content_buf[i] = new unsigned char[(size_t)part_size];
1768+
part_buf_info[i].buf = new unsigned char[(size_t)part_size];
17681769
}
17691770

17701771
std::string dest_url = GetRealUrl(host, path, req.IsHttps());
@@ -1796,18 +1797,18 @@ CosResult ObjectOp::MultiThreadUpload(
17961797
}
17971798

17981799
for (; task_index < pool_size; ++task_index) {
1799-
fin.read((char*)file_content_buf[task_index], part_size);
1800+
fin.read((char *)(part_buf_info[task_index].buf), part_size);
18001801
std::streamsize read_len = fin.gcount();
18011802
if (read_len == 0 && fin.eof()) {
18021803
SDK_LOG_DBG("read over, task_index: %d", task_index);
18031804
break;
18041805
}
1806+
part_buf_info[task_index].len = static_cast<size_t>(read_len);
18051807

18061808
SDK_LOG_DBG("upload data, task_index=%d, file_size=%" PRIu64
18071809
", offset=%" PRIu64 ", len=%" PRIu64,
18081810
task_index, file_size, offset, read_len);
18091811

1810-
uint64_t crc64_part = 0;
18111812
// Check the resume
18121813
FileUploadTask* ptask = pptaskArr[task_index];
18131814

@@ -1823,25 +1824,11 @@ CosResult ObjectOp::MultiThreadUpload(
18231824
handler->UpdateProgress(read_len);
18241825
}
18251826
} else {
1826-
// 计算每个part的crc64值
1827-
if (req.CheckPartCrc64()) {
1828-
crc64_part = CRC64::CalcCRC(crc64_part, static_cast<void*>(file_content_buf[task_index]), read_len);
1829-
}
1830-
FillUploadTask(upload_id, host, path, file_content_buf[task_index],
1831-
read_len, part_number, ptask, req.SignHeaderHost(), crc64_part);
1827+
FillUploadTask(upload_id, host, path, part_buf_info[task_index].buf,
1828+
read_len, part_number, ptask, req.SignHeaderHost(), req.CheckPartCrc64());
18321829
tp.start(*ptask);
18331830
}
18341831

1835-
// 根据每个part流式计算整个文件的crc64值
1836-
if (req.CheckCRC64()) {
1837-
// 如果已经计算了part的crc64值,只需要直接流式合并即可
1838-
if (crc64_part != 0) {
1839-
crc64_file = CRC64::CombineCRC(crc64_file, crc64_part, read_len);
1840-
} else {
1841-
crc64_file = CRC64::CalcCRC(crc64_file, static_cast<void*>(file_content_buf[task_index]), read_len);
1842-
}
1843-
}
1844-
18451832
offset += read_len;
18461833
part_numbers_ptr->push_back(part_number);
18471834
++part_number;
@@ -1887,6 +1874,24 @@ CosResult ObjectOp::MultiThreadUpload(
18871874
task_fail_flag = true;
18881875
break;
18891876
}
1877+
1878+
// 根据每个part流式计算整个文件的crc64值
1879+
if (req.CheckCRC64()) {
1880+
// 如果已经计算了part的crc64值,只需要直接流式合并即可
1881+
if (ptask->GetCrc64Value() != 0) {
1882+
crc64_file = CRC64::CombineCRC(crc64_file, ptask->GetCrc64Value(),
1883+
static_cast<uintmax_t>(part_buf_info[task_index].len));
1884+
SDK_LOG_DBG("Combine Crc64: %" PRIu64 ", Part Crc64: %" PRIu64,
1885+
crc64_file, ptask->GetCrc64Value());
1886+
} else {
1887+
// 两种情况都有可能:
1888+
// 1、CheckPartCrc64()为false
1889+
// 2、此part是断点续传已经上传的part
1890+
crc64_file = CRC64::CalcCRC(crc64_file, static_cast<void *>(part_buf_info[task_index].buf),
1891+
part_buf_info[task_index].len);
1892+
SDK_LOG_DBG("Calc Crc64: %" PRIu64, crc64_file)
1893+
}
1894+
}
18901895
}
18911896

18921897
if (task_fail_flag) {
@@ -1907,9 +1912,9 @@ CosResult ObjectOp::MultiThreadUpload(
19071912
delete[] pptaskArr;
19081913

19091914
for (int i = 0; i < pool_size; ++i) {
1910-
delete[] file_content_buf[i];
1915+
delete[] part_buf_info[i].buf;
19111916
}
1912-
delete[] file_content_buf;
1917+
delete[] part_buf_info;
19131918

19141919
return result;
19151920
}
@@ -2094,7 +2099,7 @@ void ObjectOp::FillUploadTask(const std::string& upload_id,
20942099
const std::string& host, const std::string& path,
20952100
unsigned char* file_content_buf, uint64_t len,
20962101
uint64_t part_number, FileUploadTask* task_ptr,
2097-
bool sign_header_host, uint64_t crc64) {
2102+
bool sign_header_host, bool check_crc64) {
20982103
std::map<std::string, std::string> req_params;
20992104
req_params.insert(std::make_pair("uploadId", upload_id));
21002105
req_params.insert(
@@ -2125,7 +2130,7 @@ void ObjectOp::FillUploadTask(const std::string& upload_id,
21252130
task_ptr->AddHeaders(req_headers);
21262131
task_ptr->SetUploadBuf(file_content_buf, len);
21272132
task_ptr->SetPartNumber(part_number);
2128-
task_ptr->SetPartCrc64(crc64);
2133+
task_ptr->SetCheckCrc64(check_crc64);
21292134
}
21302135

21312136
void ObjectOp::FillCopyTask(const std::string& upload_id,

0 commit comments

Comments
 (0)