Skip to content

Commit 18e2b98

Browse files
authored
Merge pull request #185 from tencentyun/feature_gavinhgchen_c0c83cc2
Add SetCheckPartCrc64() for MultiPutObjectReq
2 parents 6ba765e + 783aacc commit 18e2b98

File tree

6 files changed

+130
-42
lines changed

6 files changed

+130
-42
lines changed

include/op/file_upload_task.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,14 @@ class FileUploadTask : public Poco::Runnable {
8282
void SetCaLocation(const std::string& ca_location);
8383
void SetSslCtxCb(SSLCtxCallback cb, void *data);
8484

85+
// Chooses how UploadTask() validates the uploaded part: when true it computes
// a CRC64 over the upload buffer and compares it with the CRC64 response
// header (kRespHeaderXCosHashCrc64Ecma); when false it keeps the MD5-vs-ETag
// comparison.
void SetCheckCrc64(bool check_crc64) { mb_check_crc64 = check_crc64; }
88+
89+
uint64_t GetCrc64Value() const {
90+
return m_crc64_value;
91+
}
92+
8593
private:
8694
std::string m_full_url;
8795
std::map<std::string, std::string> m_headers;
@@ -104,6 +112,9 @@ class FileUploadTask : public Poco::Runnable {
104112
std::string m_ca_location;
105113
SSLCtxCallback m_ssl_ctx_cb;
106114
void *m_user_data;
115+
116+
bool mb_check_crc64;
117+
uint64_t m_crc64_value;
107118
};
108119

109120
} // namespace qcloud_cos

include/op/object_op.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,18 @@
2020

2121
namespace qcloud_cos {
2222

23+
// Describes one part buffer used by the multi-threaded upload: a raw pointer
// to the bytes plus the number of valid bytes stored there.
//
// The class has no destructor and never frees `buf`; the code that allocates
// the buffer must release it itself.
class PartBufInfo {
 public:
  // Start of the part data; stays null until a buffer is attached.
  unsigned char* buf = nullptr;
  // Number of bytes currently held in `buf`.
  size_t len = 0;

  // Members are initialised at their declarations, so a default-constructed
  // object (including every element of `new PartBufInfo[n]`) starts empty.
  PartBufInfo() = default;
};
34+
2335
class FileUploadTask;
2436
class FileCopyTask;
2537

@@ -430,6 +442,7 @@ class ObjectOp : public BaseOp {
430442
const std::vector<std::string>& already_exist_parts,
431443
bool resume_flag, std::vector<std::string>* etags_ptr,
432444
std::vector<uint64_t>* part_numbers_ptr,
445+
uint64_t& crc64_file,
433446
const SharedTransferHandler& handler = nullptr,
434447
bool change_backup_domain = false);
435448

@@ -445,7 +458,8 @@ class ObjectOp : public BaseOp {
445458
void FillUploadTask(const std::string& upload_id, const std::string& host,
446459
const std::string& path, unsigned char* file_content_buf,
447460
uint64_t len, uint64_t part_number,
448-
FileUploadTask* task_ptr, bool sign_header_host);
461+
FileUploadTask* task_ptr, bool sign_header_host,
462+
bool check_crc64);
449463

450464
void FillCopyTask(const std::string& upload_id, const std::string& host,
451465
const std::string& path, uint64_t part_number,

include/request/object_req.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,7 @@ class PutObjectByFileReq : public PutObjectReq {
375375
#if defined(_WIN32)
376376
mb_is_widechar_path = false;
377377
#endif
378+
mb_check_part_crc64 = false;
378379
}
379380

380381
virtual ~PutObjectByFileReq() {}
@@ -392,6 +393,15 @@ class PutObjectByFileReq : public PutObjectReq {
392393
m_local_file_path = local_file_path;
393394
}
394395

396+
// MultiPutObjectReq: use crc64 instead of md5 for consistency check
397+
void SetCheckPartCrc64(bool part_crc64) {
398+
mb_check_part_crc64 = part_crc64;
399+
}
400+
401+
bool CheckPartCrc64() const {
402+
return mb_check_part_crc64;
403+
}
404+
395405
std::string GetLocalFilePath() const { return m_local_file_path; }
396406

397407
#if defined(_WIN32)
@@ -411,6 +421,7 @@ class PutObjectByFileReq : public PutObjectReq {
411421
#if defined(_WIN32)
412422
bool mb_is_widechar_path; // 标识文件路径是否为宽字符
413423
#endif
424+
bool mb_check_part_crc64;
414425
};
415426

416427
class DeleteObjectReq : public ObjectReq {

src/op/file_upload_task.cpp

Lines changed: 52 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include "util/http_sender.h"
77
#include "util/string_util.h"
88
#include "util/codec_util.h"
9+
#include "util/crc64.h"
910
#ifdef USE_OPENSSL_MD5
1011
#include <openssl/md5.h>
1112
#endif
@@ -32,7 +33,9 @@ FileUploadTask::FileUploadTask(const std::string& full_url,
3233
m_verify_cert(verify_cert),
3334
m_ca_location(ca_location),
3435
m_ssl_ctx_cb(ssl_ctx_cb),
35-
m_user_data(user_data) {}
36+
m_user_data(user_data),
37+
mb_check_crc64(false),
38+
m_crc64_value(0) {}
3639

3740
FileUploadTask::FileUploadTask(
3841
const std::string& full_url,
@@ -58,7 +61,9 @@ FileUploadTask::FileUploadTask(
5861
m_verify_cert(verify_cert),
5962
m_ca_location(ca_location),
6063
m_ssl_ctx_cb(ssl_ctx_cb),
61-
m_user_data(user_data) {}
64+
m_user_data(user_data),
65+
mb_check_crc64(false),
66+
m_crc64_value(0) {}
6267

6368
FileUploadTask::FileUploadTask(
6469
const std::string& full_url,
@@ -84,7 +89,9 @@ FileUploadTask::FileUploadTask(
8489
m_verify_cert(verify_cert),
8590
m_ca_location(ca_location),
8691
m_ssl_ctx_cb(ssl_ctx_cb),
87-
m_user_data(user_data) {}
92+
m_user_data(user_data),
93+
mb_check_crc64(false),
94+
m_crc64_value(0) {}
8895

8996
void FileUploadTask::run() {
9097
m_resp = "";
@@ -154,12 +161,19 @@ void FileUploadTask::SetSslCtxCb(SSLCtxCallback cb, void *data) {
154161

155162
void FileUploadTask::UploadTask() {
156163
std::string md5_str;
157-
#ifdef USE_OPENSSL_MD5
158-
unsigned char digest[MD5_DIGEST_LENGTH];
159-
MD5((const unsigned char *)m_data_buf_ptr, m_data_len, digest);
160-
md5_str = CodecUtil::DigestToHex(digest, MD5_DIGEST_LENGTH);
161-
#else
162-
{
164+
// 数据一致性校验采用crc64
165+
if (mb_check_crc64) {
166+
m_crc64_value = 0;
167+
m_crc64_value = CRC64::CalcCRC(m_crc64_value, static_cast<void*>(m_data_buf_ptr), m_data_len);
168+
SDK_LOG_DBG("Part Crc64: %" PRIu64, m_crc64_value);
169+
}
170+
// 没有crc64则默认走md5校验
171+
else {
172+
#ifdef USE_OPENSSL_MD5
173+
unsigned char digest[MD5_DIGEST_LENGTH];
174+
MD5((const unsigned char *)m_data_buf_ptr, m_data_len, digest);
175+
md5_str = CodecUtil::DigestToHex(digest, MD5_DIGEST_LENGTH);
176+
#else
163177
// 计算上传的md5
164178
Poco::MD5Engine md5;
165179
std::string body((const char*)m_data_buf_ptr, m_data_len);
@@ -168,8 +182,9 @@ void FileUploadTask::UploadTask() {
168182
Poco::StreamCopier::copyStream(istr, dos);
169183
dos.close();
170184
md5_str = Poco::DigestEngine::digestToHex(md5.digest());
185+
#endif
186+
SDK_LOG_DBG("Part Md5: %s", md5_str.c_str());
171187
}
172-
#endif
173188

174189
int loop = 0;
175190
do {
@@ -204,16 +219,33 @@ void FileUploadTask::UploadTask() {
204219
continue;
205220
}
206221

207-
std::map<std::string, std::string>::const_iterator c_itr =
208-
m_resp_headers.find("ETag");
209-
if (c_itr == m_resp_headers.end() ||
210-
StringUtil::Trim(c_itr->second, "\"") != md5_str) {
211-
SDK_LOG_ERR(
212-
"Response etag is not correct, try again. Expect md5 is %s, but "
213-
"return etag is %s.",
214-
md5_str.c_str(), StringUtil::Trim(c_itr->second, "\"").c_str());
215-
m_is_task_success = false;
216-
continue;
222+
// crc64一致性校验
223+
if (mb_check_crc64) {
224+
std::map<std::string, std::string>::const_iterator c_itr =
225+
m_resp_headers.find(kRespHeaderXCosHashCrc64Ecma);
226+
if (c_itr == m_resp_headers.end() ||
227+
StringUtil::StringToUint64(c_itr->second) != m_crc64_value) {
228+
SDK_LOG_ERR(
229+
"Response x-cos-hash-crc64ecma is not correct, try again. Expect crc64 is %" PRIu64 ", but "
230+
"return crc64 is %s",
231+
m_crc64_value, c_itr->second.c_str());
232+
m_is_task_success = false;
233+
continue;
234+
}
235+
SDK_LOG_DBG("Part Crc64 Check Success.");
236+
} else {
237+
std::map<std::string, std::string>::const_iterator c_itr =
238+
m_resp_headers.find("ETag");
239+
if (c_itr == m_resp_headers.end() ||
240+
StringUtil::Trim(c_itr->second, "\"") != md5_str) {
241+
SDK_LOG_ERR(
242+
"Response etag is not correct, try again. Expect md5 is %s, but "
243+
"return etag is %s.",
244+
md5_str.c_str(), StringUtil::Trim(c_itr->second, "\"").c_str());
245+
m_is_task_success = false;
246+
continue;
247+
}
248+
SDK_LOG_DBG("Part Md5 Check Success.");
217249
}
218250

219251
m_is_task_success = true;

src/op/object_op.cpp

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -704,9 +704,11 @@ CosResult ObjectOp::MultiUploadObject(const PutObjectByFileReq& req,
704704
}
705705

706706
CosResult upload_result;
707+
708+
uint64_t crc64_origin = 0;
707709
upload_result =
708710
MultiThreadUpload(req, resume_uploadid, already_exist_parts, resume_flag,
709-
&etags, &part_numbers, handler, change_backup_domain);
711+
&etags, &part_numbers, crc64_origin, handler, change_backup_domain);
710712
// Cancel way
711713
if (handler && !handler->ShouldContinue()) {
712714
SetResultAndLogError(upload_result, "Request canceled by user");
@@ -759,18 +761,9 @@ CosResult ObjectOp::MultiUploadObject(const PutObjectByFileReq& req,
759761
// check crc64 if needed
760762
if (req.CheckCRC64() && comp_result.IsSucc() &&
761763
!comp_resp.GetXCosHashCrc64Ecma().empty()) {
762-
uint64_t crc64_origin = 0;
763-
#if defined(_WIN32)
764-
if (req.IsWideCharPath()) {
765-
crc64_origin = FileUtil::GetFileCrc64(req.GetWideCharLocalFilePath());
766-
} else {
767-
crc64_origin = FileUtil::GetFileCrc64(req.GetLocalFilePath());
768-
}
769-
#else
770-
crc64_origin = FileUtil::GetFileCrc64(req.GetLocalFilePath());
771-
#endif
772764
uint64_t crc64_server_resp =
773765
StringUtil::StringToUint64(comp_resp.GetXCosHashCrc64Ecma());
766+
SDK_LOG_DBG("File Crc64: %" PRIu64, crc64_origin);
774767
if (crc64_server_resp != crc64_origin) {
775768
std::string err_msg =
776769
"MultiUploadObject failed, crc64 check failed, crc64_origin: " +
@@ -1704,6 +1697,7 @@ CosResult ObjectOp::MultiThreadUpload(
17041697
const std::vector<std::string>& already_exist_parts, bool resume_flag,
17051698
std::vector<std::string>* etags_ptr,
17061699
std::vector<uint64_t>* part_numbers_ptr,
1700+
uint64_t& crc64_file,
17071701
const SharedTransferHandler& handler,
17081702
bool change_backup_domain) {
17091703
CosResult result;
@@ -1769,9 +1763,9 @@ CosResult ObjectOp::MultiThreadUpload(
17691763
return result;
17701764
}
17711765

1772-
unsigned char** file_content_buf = new unsigned char*[pool_size];
1766+
PartBufInfo *part_buf_info = new PartBufInfo[pool_size];
17731767
for (int i = 0; i < pool_size; ++i) {
1774-
file_content_buf[i] = new unsigned char[(size_t)part_size];
1768+
part_buf_info[i].buf = new unsigned char[(size_t)part_size];
17751769
}
17761770

17771771
std::string dest_url = GetRealUrl(host, path, req.IsHttps());
@@ -1790,6 +1784,7 @@ CosResult ObjectOp::MultiThreadUpload(
17901784

17911785
Poco::ThreadPool tp(pool_size);
17921786

1787+
crc64_file = 0;
17931788
// 3. 多线程upload
17941789
{
17951790
uint64_t part_number = 1;
@@ -1802,12 +1797,13 @@ CosResult ObjectOp::MultiThreadUpload(
18021797
}
18031798

18041799
for (; task_index < pool_size; ++task_index) {
1805-
fin.read((char*)file_content_buf[task_index], part_size);
1800+
fin.read((char *)(part_buf_info[task_index].buf), part_size);
18061801
std::streamsize read_len = fin.gcount();
18071802
if (read_len == 0 && fin.eof()) {
18081803
SDK_LOG_DBG("read over, task_index: %d", task_index);
18091804
break;
18101805
}
1806+
part_buf_info[task_index].len = static_cast<size_t>(read_len);
18111807

18121808
SDK_LOG_DBG("upload data, task_index=%d, file_size=%" PRIu64
18131809
", offset=%" PRIu64 ", len=%" PRIu64,
@@ -1828,8 +1824,8 @@ CosResult ObjectOp::MultiThreadUpload(
18281824
handler->UpdateProgress(read_len);
18291825
}
18301826
} else {
1831-
FillUploadTask(upload_id, host, path, file_content_buf[task_index],
1832-
read_len, part_number, ptask, req.SignHeaderHost());
1827+
FillUploadTask(upload_id, host, path, part_buf_info[task_index].buf,
1828+
read_len, part_number, ptask, req.SignHeaderHost(), req.CheckPartCrc64());
18331829
tp.start(*ptask);
18341830
}
18351831

@@ -1878,6 +1874,24 @@ CosResult ObjectOp::MultiThreadUpload(
18781874
task_fail_flag = true;
18791875
break;
18801876
}
1877+
1878+
// 根据每个part流式计算整个文件的crc64值
1879+
if (req.CheckCRC64()) {
1880+
// 如果已经计算了part的crc64值,只需要直接流式合并即可
1881+
if (ptask->GetCrc64Value() != 0) {
1882+
crc64_file = CRC64::CombineCRC(crc64_file, ptask->GetCrc64Value(),
1883+
static_cast<uintmax_t>(part_buf_info[task_index].len));
1884+
SDK_LOG_DBG("Combine Crc64: %" PRIu64 ", Part Crc64: %" PRIu64,
1885+
crc64_file, ptask->GetCrc64Value());
1886+
} else {
1887+
// 两种情况都有可能:
1888+
// 1、CheckPartCrc64()为false
1889+
// 2、此part是断点续传已经上传的part
1890+
crc64_file = CRC64::CalcCRC(crc64_file, static_cast<void *>(part_buf_info[task_index].buf),
1891+
part_buf_info[task_index].len);
1892+
SDK_LOG_DBG("Calc Crc64: %" PRIu64, crc64_file)
1893+
}
1894+
}
18811895
}
18821896

18831897
if (task_fail_flag) {
@@ -1898,9 +1912,9 @@ CosResult ObjectOp::MultiThreadUpload(
18981912
delete[] pptaskArr;
18991913

19001914
for (int i = 0; i < pool_size; ++i) {
1901-
delete[] file_content_buf[i];
1915+
delete[] part_buf_info[i].buf;
19021916
}
1903-
delete[] file_content_buf;
1917+
delete[] part_buf_info;
19041918

19051919
return result;
19061920
}
@@ -1987,8 +2001,10 @@ CosResult ObjectOp::SingleThreadUpload(
19872001
part_number, file_size, offset, read_len);
19882002

19892003
// 提前计算整个文件的crc64,用于整个合并分块完成后做crc64校验
1990-
crc64 = CRC64::CalcCRC(crc64, static_cast<void*>(file_content_buf),
1991-
static_cast<size_t>(read_len));
2004+
if (req.CheckCRC64()) {
2005+
crc64 = CRC64::CalcCRC(crc64, static_cast<void*>(file_content_buf),
2006+
static_cast<size_t>(read_len));
2007+
}
19922008

19932009
// Check the resume
19942010

@@ -2082,7 +2098,8 @@ uint64_t ObjectOp::GetContent(const std::string& src,
20822098
void ObjectOp::FillUploadTask(const std::string& upload_id,
20832099
const std::string& host, const std::string& path,
20842100
unsigned char* file_content_buf, uint64_t len,
2085-
uint64_t part_number, FileUploadTask* task_ptr, bool sign_header_host) {
2101+
uint64_t part_number, FileUploadTask* task_ptr,
2102+
bool sign_header_host, bool check_crc64) {
20862103
std::map<std::string, std::string> req_params;
20872104
req_params.insert(std::make_pair("uploadId", upload_id));
20882105
req_params.insert(
@@ -2113,6 +2130,7 @@ void ObjectOp::FillUploadTask(const std::string& upload_id,
21132130
task_ptr->AddHeaders(req_headers);
21142131
task_ptr->SetUploadBuf(file_content_buf, len);
21152132
task_ptr->SetPartNumber(part_number);
2133+
task_ptr->SetCheckCrc64(check_crc64);
21162134
}
21172135

21182136
void ObjectOp::FillCopyTask(const std::string& upload_id,

unittest/src/object_op_test.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2251,6 +2251,7 @@ TEST_F(ObjectOpTest, MultiPutObjectTest_OneStep) {
22512251
// 2. 上传
22522252
MultiPutObjectReq req(m_bucket_name, object_name, filename);
22532253
req.SetXCosServerSideEncryption("AES256");
2254+
req.SetCheckPartCrc64(true);
22542255
MultiPutObjectResp resp;
22552256

22562257
CosResult result = m_client->MultiPutObject(req, &resp);
@@ -3430,6 +3431,7 @@ TEST_F(ObjectOpTest, MultiUploadVaryPartSizeAndThreadPoolSize) {
34303431
MultiPutObjectReq multiupload_req(m_bucket_name, object_name, local_file);
34313432
MultiPutObjectResp multiupload_resp;
34323433
ASSERT_TRUE(multiupload_req.CheckCRC64());
3434+
multiupload_req.SetCheckPartCrc64(true);
34333435

34343436
// upload object
34353437
CosResult multiupload_result =

0 commit comments

Comments
 (0)