From 1ef3996e727b6751f8b492d178f342556a40ab70 Mon Sep 17 00:00:00 2001 From: gavinhgchen Date: Fri, 11 Jul 2025 20:22:53 +0800 Subject: [PATCH 1/2] Add SetCheckPartCrc64() for MultiPutObjectReq --- include/op/file_upload_task.h | 6 ++++ include/op/object_op.h | 4 ++- include/request/object_req.h | 11 ++++++ src/op/file_upload_task.cpp | 60 ++++++++++++++++++++++----------- src/op/object_op.cpp | 43 ++++++++++++++--------- unittest/src/object_op_test.cpp | 2 ++ 6 files changed, 90 insertions(+), 36 deletions(-) diff --git a/include/op/file_upload_task.h b/include/op/file_upload_task.h index 7a38eba..e48be53 100644 --- a/include/op/file_upload_task.h +++ b/include/op/file_upload_task.h @@ -82,6 +82,10 @@ class FileUploadTask : public Poco::Runnable { void SetCaLocation(const std::string& ca_location); void SetSslCtxCb(SSLCtxCallback cb, void *data); + void SetPartCrc64(uint64_t crc64) { + m_part_crc64 = crc64; + } + private: std::string m_full_url; std::map m_headers; @@ -104,6 +108,8 @@ class FileUploadTask : public Poco::Runnable { std::string m_ca_location; SSLCtxCallback m_ssl_ctx_cb; void *m_user_data; + + uint64_t m_part_crc64; }; } // namespace qcloud_cos diff --git a/include/op/object_op.h b/include/op/object_op.h index e626d4d..505159e 100644 --- a/include/op/object_op.h +++ b/include/op/object_op.h @@ -430,6 +430,7 @@ class ObjectOp : public BaseOp { const std::vector& already_exist_parts, bool resume_flag, std::vector* etags_ptr, std::vector* part_numbers_ptr, + uint64_t& crc64_file, const SharedTransferHandler& handler = nullptr, bool change_backup_domain = false); @@ -445,7 +446,8 @@ class ObjectOp : public BaseOp { void FillUploadTask(const std::string& upload_id, const std::string& host, const std::string& path, unsigned char* file_content_buf, uint64_t len, uint64_t part_number, - FileUploadTask* task_ptr, bool sign_header_host); + FileUploadTask* task_ptr, bool sign_header_host, + uint64_t crc64); void FillCopyTask(const std::string& upload_id, const std::string& host, const std::string& path, uint64_t part_number, diff --git a/include/request/object_req.h b/include/request/object_req.h index 58f8253..91d3245 100644 --- a/include/request/object_req.h +++ b/include/request/object_req.h @@ -375,6 +375,7 @@ class PutObjectByFileReq : public PutObjectReq { #if defined(_WIN32) mb_is_widechar_path = false; #endif + mb_check_part_crc64 = false; } virtual ~PutObjectByFileReq() {} @@ -392,6 +393,15 @@ class PutObjectByFileReq : public PutObjectReq { m_local_file_path = local_file_path; } + // MultiPutObjectReq: use crc64 instead of md5 for consistency check + void SetCheckPartCrc64(bool part_crc64) { + mb_check_part_crc64 = part_crc64; + } + + bool CheckPartCrc64() const { + return mb_check_part_crc64; + } + std::string GetLocalFilePath() const { return m_local_file_path; } #if defined(_WIN32) @@ -411,6 +421,7 @@ class PutObjectByFileReq : public PutObjectReq { #if defined(_WIN32) bool mb_is_widechar_path; // 标识文件路径是否为宽字符 #endif + bool mb_check_part_crc64; }; class DeleteObjectReq : public ObjectReq { diff --git a/src/op/file_upload_task.cpp b/src/op/file_upload_task.cpp index 13ce018..5b29ea8 100644 --- a/src/op/file_upload_task.cpp +++ b/src/op/file_upload_task.cpp @@ -6,6 +6,7 @@ #include "util/http_sender.h" #include "util/string_util.h" #include "util/codec_util.h" +#include "util/crc64.h" #ifdef USE_OPENSSL_MD5 #include #endif @@ -32,7 +33,8 @@ FileUploadTask::FileUploadTask(const std::string& full_url, m_verify_cert(verify_cert), m_ca_location(ca_location), m_ssl_ctx_cb(ssl_ctx_cb), - m_user_data(user_data) {} + m_user_data(user_data), + m_part_crc64(0) {} FileUploadTask::FileUploadTask( const std::string& full_url, @@ -58,7 +60,8 @@ FileUploadTask::FileUploadTask( m_verify_cert(verify_cert), m_ca_location(ca_location), m_ssl_ctx_cb(ssl_ctx_cb), - m_user_data(user_data) {} + m_user_data(user_data), + m_part_crc64(0) {} FileUploadTask::FileUploadTask( const std::string& full_url, @@ -84,7 +87,8 @@ FileUploadTask::FileUploadTask( m_verify_cert(verify_cert), m_ca_location(ca_location), m_ssl_ctx_cb(ssl_ctx_cb), - m_user_data(user_data) {} + m_user_data(user_data), + m_part_crc64(0) {} void FileUploadTask::run() { m_resp = ""; @@ -154,12 +158,13 @@ void FileUploadTask::SetSslCtxCb(SSLCtxCallback cb, void *data) { void FileUploadTask::UploadTask() { std::string md5_str; -#ifdef USE_OPENSSL_MD5 - unsigned char digest[MD5_DIGEST_LENGTH]; - MD5((const unsigned char *)m_data_buf_ptr, m_data_len, digest); - md5_str = CodecUtil::DigestToHex(digest, MD5_DIGEST_LENGTH); -#else - { + // 没有crc64则默认走md5校验 + if (m_part_crc64 == 0) { + #ifdef USE_OPENSSL_MD5 + unsigned char digest[MD5_DIGEST_LENGTH]; + MD5((const unsigned char *)m_data_buf_ptr, m_data_len, digest); + md5_str = CodecUtil::DigestToHex(digest, MD5_DIGEST_LENGTH); + #else // 计算上传的md5 Poco::MD5Engine md5; std::string body((const char*)m_data_buf_ptr, m_data_len); @@ -168,8 +173,8 @@ void FileUploadTask::UploadTask() { Poco::StreamCopier::copyStream(istr, dos); dos.close(); md5_str = Poco::DigestEngine::digestToHex(md5.digest()); + #endif } -#endif int loop = 0; do { @@ -204,16 +209,31 @@ void FileUploadTask::UploadTask() { continue; } - std::map::const_iterator c_itr = - m_resp_headers.find("ETag"); - if (c_itr == m_resp_headers.end() || - StringUtil::Trim(c_itr->second, "\"") != md5_str) { - SDK_LOG_ERR( - "Response etag is not correct, try again. Expect md5 is %s, but " - "return etag is %s.", - md5_str.c_str(), StringUtil::Trim(c_itr->second, "\"").c_str()); - m_is_task_success = false; - continue; + // crc64一致性校验 + if (m_part_crc64 != 0) { + std::map::const_iterator c_itr = + m_resp_headers.find(kRespHeaderXCosHashCrc64Ecma); + if (c_itr == m_resp_headers.end() || + StringUtil::StringToUint64(c_itr->second) != m_part_crc64) { + SDK_LOG_ERR( + "Response x-cos-hash-crc64ecma is not correct, try again. Expect crc64 is %" PRIu64 ", but " + "return crc64 is %s", + m_part_crc64, c_itr->second.c_str()); + m_is_task_success = false; + continue; + } + } else { + std::map::const_iterator c_itr = + m_resp_headers.find("ETag"); + if (c_itr == m_resp_headers.end() || + StringUtil::Trim(c_itr->second, "\"") != md5_str) { + SDK_LOG_ERR( + "Response etag is not correct, try again. Expect md5 is %s, but " + "return etag is %s.", + md5_str.c_str(), StringUtil::Trim(c_itr->second, "\"").c_str()); + m_is_task_success = false; + continue; + } } m_is_task_success = true; diff --git a/src/op/object_op.cpp b/src/op/object_op.cpp index 9b5aa01..26f4015 100644 --- a/src/op/object_op.cpp +++ b/src/op/object_op.cpp @@ -704,9 +704,11 @@ CosResult ObjectOp::MultiUploadObject(const PutObjectByFileReq& req, } CosResult upload_result; + + uint64_t crc64_origin = 0; upload_result = MultiThreadUpload(req, resume_uploadid, already_exist_parts, resume_flag, - &etags, &part_numbers, handler, change_backup_domain); + &etags, &part_numbers, crc64_origin, handler, change_backup_domain); // Cancel way if (handler && !handler->ShouldContinue()) { SetResultAndLogError(upload_result, "Request canceled by user"); @@ -759,16 +761,6 @@ CosResult ObjectOp::MultiUploadObject(const PutObjectByFileReq& req, // check crc64 if needed if (req.CheckCRC64() && comp_result.IsSucc() && !comp_resp.GetXCosHashCrc64Ecma().empty()) { - uint64_t crc64_origin = 0; -#if defined(_WIN32) - if (req.IsWideCharPath()) { - crc64_origin = FileUtil::GetFileCrc64(req.GetWideCharLocalFilePath()); - } else { - crc64_origin = FileUtil::GetFileCrc64(req.GetLocalFilePath()); - } -#else - crc64_origin = FileUtil::GetFileCrc64(req.GetLocalFilePath()); -#endif uint64_t crc64_server_resp = StringUtil::StringToUint64(comp_resp.GetXCosHashCrc64Ecma()); if (crc64_server_resp != crc64_origin) { @@ -1704,6 +1696,7 @@ CosResult ObjectOp::MultiThreadUpload( const std::vector& already_exist_parts, bool resume_flag, std::vector* etags_ptr, std::vector* part_numbers_ptr, + uint64_t& crc64_file, const SharedTransferHandler& handler, bool change_backup_domain) { CosResult result; @@ -1790,6 +1783,7 @@ CosResult ObjectOp::MultiThreadUpload( Poco::ThreadPool tp(pool_size); + crc64_file = 0; // 3. 多线程upload { uint64_t part_number = 1; @@ -1813,6 +1807,7 @@ CosResult ObjectOp::MultiThreadUpload( ", offset=%" PRIu64 ", len=%" PRIu64, task_index, file_size, offset, read_len); + uint64_t crc64_part = 0; // Check the resume FileUploadTask* ptask = pptaskArr[task_index]; @@ -1828,11 +1823,25 @@ CosResult ObjectOp::MultiThreadUpload( handler->UpdateProgress(read_len); } } else { + // 计算每个part的crc64值 + if (req.CheckPartCrc64()) { + crc64_part = CRC64::CalcCRC(crc64_part, static_cast(file_content_buf[task_index]), read_len); + } FillUploadTask(upload_id, host, path, file_content_buf[task_index], - read_len, part_number, ptask, req.SignHeaderHost()); + read_len, part_number, ptask, req.SignHeaderHost(), crc64_part); tp.start(*ptask); } + // 根据每个part流式计算整个文件的crc64值 + if (req.CheckCRC64()) { + // 如果已经计算了part的crc64值,只需要直接流式合并即可 + if (crc64_part != 0) { + crc64_file = CRC64::CombineCRC(crc64_file, crc64_part, read_len); + } else { + crc64_file = CRC64::CalcCRC(crc64_file, static_cast(file_content_buf[task_index]), read_len); + } + } + offset += read_len; part_numbers_ptr->push_back(part_number); ++part_number; @@ -1987,8 +1996,10 @@ CosResult ObjectOp::SingleThreadUpload( part_number, file_size, offset, read_len); // 提前计算整个文件的crc64,用于整个合并分块完成后做crc64校验 - crc64 = CRC64::CalcCRC(crc64, static_cast(file_content_buf), - static_cast(read_len)); + if (req.CheckCRC64()) { + crc64 = CRC64::CalcCRC(crc64, static_cast(file_content_buf), + static_cast(read_len)); + } // Check the resume @@ -2082,7 +2093,8 @@ uint64_t ObjectOp::GetContent(const std::string& src, void ObjectOp::FillUploadTask(const std::string& upload_id, const std::string& host, const std::string& path, unsigned char* file_content_buf, uint64_t len, - uint64_t part_number, FileUploadTask* task_ptr, bool sign_header_host) { + uint64_t part_number, FileUploadTask* task_ptr, + bool sign_header_host, uint64_t crc64) { std::map req_params; req_params.insert(std::make_pair("uploadId", upload_id)); req_params.insert( @@ -2113,6 +2125,7 @@ void ObjectOp::FillUploadTask(const std::string& upload_id, task_ptr->AddHeaders(req_headers); task_ptr->SetUploadBuf(file_content_buf, len); task_ptr->SetPartNumber(part_number); + task_ptr->SetPartCrc64(crc64); } void ObjectOp::FillCopyTask(const std::string& upload_id, diff --git a/unittest/src/object_op_test.cpp b/unittest/src/object_op_test.cpp index d7cf059..21ce311 100644 --- a/unittest/src/object_op_test.cpp +++ b/unittest/src/object_op_test.cpp @@ -2251,6 +2251,7 @@ TEST_F(ObjectOpTest, MultiPutObjectTest_OneStep) { // 2. 上传 MultiPutObjectReq req(m_bucket_name, object_name, filename); req.SetXCosServerSideEncryption("AES256"); + req.SetCheckPartCrc64(true); MultiPutObjectResp resp; CosResult result = m_client->MultiPutObject(req, &resp); @@ -3430,6 +3431,7 @@ TEST_F(ObjectOpTest, MultiUploadVaryPartSizeAndThreadPoolSize) { MultiPutObjectReq multiupload_req(m_bucket_name, object_name, local_file); MultiPutObjectResp multiupload_resp; ASSERT_TRUE(multiupload_req.CheckCRC64()); + multiupload_req.SetCheckPartCrc64(true); // upload object CosResult multiupload_result = From 4205a578c1c20cb10a771687a6892e3a1d833d82 Mon Sep 17 00:00:00 2001 From: gavinhgchen Date: Sun, 13 Jul 2025 17:38:32 +0800 Subject: [PATCH 2/2] MultiPutObject opt continue --- include/op/file_upload_task.h | 11 ++++++-- include/op/object_op.h | 14 ++++++++- src/op/file_upload_task.cpp | 26 ++++++++++++----- src/op/object_op.cpp | 53 +++++++++++++++++++---------------- 4 files changed, 69 insertions(+), 35 deletions(-) diff --git a/include/op/file_upload_task.h b/include/op/file_upload_task.h index e48be53..38d9fda 100644 --- a/include/op/file_upload_task.h +++ b/include/op/file_upload_task.h @@ -82,8 +82,12 @@ class FileUploadTask : public Poco::Runnable { void SetCaLocation(const std::string& ca_location); void SetSslCtxCb(SSLCtxCallback cb, void *data); - void SetPartCrc64(uint64_t crc64) { - m_part_crc64 = crc64; + void SetCheckCrc64(bool check_crc64) { + mb_check_crc64 = check_crc64; + } + + uint64_t GetCrc64Value() const { + return m_crc64_value; } private: @@ -109,7 +113,8 @@ class FileUploadTask : public Poco::Runnable { SSLCtxCallback m_ssl_ctx_cb; void *m_user_data; - uint64_t m_part_crc64; + bool mb_check_crc64; + uint64_t m_crc64_value; }; } // namespace qcloud_cos diff --git a/include/op/object_op.h b/include/op/object_op.h index 505159e..8a64106 100644 --- a/include/op/object_op.h +++ b/include/op/object_op.h @@ -20,6 +20,18 @@ namespace qcloud_cos { +class PartBufInfo { +public: + unsigned char* buf; + size_t len; + +public: + PartBufInfo() { + buf = nullptr; + len = 0; + } +}; + class FileUploadTask; class FileCopyTask; @@ -447,7 +459,7 @@ class ObjectOp : public BaseOp { const std::string& path, unsigned char* file_content_buf, uint64_t len, uint64_t part_number, FileUploadTask* task_ptr, bool sign_header_host, - uint64_t crc64); + bool check_crc64); void FillCopyTask(const std::string& upload_id, const std::string& host, const std::string& path, uint64_t part_number, diff --git a/src/op/file_upload_task.cpp b/src/op/file_upload_task.cpp index 5b29ea8..642df46 100644 --- a/src/op/file_upload_task.cpp +++ b/src/op/file_upload_task.cpp @@ -34,7 +34,8 @@ FileUploadTask::FileUploadTask(const std::string& full_url, m_ca_location(ca_location), m_ssl_ctx_cb(ssl_ctx_cb), m_user_data(user_data), - m_part_crc64(0) {} + mb_check_crc64(false), + m_crc64_value(0) {} FileUploadTask::FileUploadTask( const std::string& full_url, @@ -61,7 +62,8 @@ FileUploadTask::FileUploadTask( m_ca_location(ca_location), m_ssl_ctx_cb(ssl_ctx_cb), m_user_data(user_data), - m_part_crc64(0) {} + mb_check_crc64(false), + m_crc64_value(0) {} FileUploadTask::FileUploadTask( const std::string& full_url, @@ -88,7 +90,8 @@ FileUploadTask::FileUploadTask( m_ca_location(ca_location), m_ssl_ctx_cb(ssl_ctx_cb), m_user_data(user_data), - m_part_crc64(0) {} + mb_check_crc64(false), + m_crc64_value(0) {} void FileUploadTask::run() { m_resp = ""; @@ -158,8 +161,14 @@ void FileUploadTask::SetSslCtxCb(SSLCtxCallback cb, void *data) { void FileUploadTask::UploadTask() { std::string md5_str; + // 数据一致性校验采用crc64 + if (mb_check_crc64) { + m_crc64_value = 0; + m_crc64_value = CRC64::CalcCRC(m_crc64_value, static_cast(m_data_buf_ptr), m_data_len); + SDK_LOG_DBG("Part Crc64: %" PRIu64, m_crc64_value); + } // 没有crc64则默认走md5校验 - if (m_part_crc64 == 0) { + else { #ifdef USE_OPENSSL_MD5 unsigned char digest[MD5_DIGEST_LENGTH]; MD5((const unsigned char *)m_data_buf_ptr, m_data_len, digest); @@ -174,6 +183,7 @@ void FileUploadTask::UploadTask() { dos.close(); md5_str = Poco::DigestEngine::digestToHex(md5.digest()); #endif + SDK_LOG_DBG("Part Md5: %s", md5_str.c_str()); } int loop = 0; @@ -210,18 +220,19 @@ void FileUploadTask::UploadTask() { } // crc64一致性校验 - if (m_part_crc64 != 0) { + if (mb_check_crc64) { std::map::const_iterator c_itr = m_resp_headers.find(kRespHeaderXCosHashCrc64Ecma); if (c_itr == m_resp_headers.end() || - StringUtil::StringToUint64(c_itr->second) != m_part_crc64) { + StringUtil::StringToUint64(c_itr->second) != m_crc64_value) { SDK_LOG_ERR( "Response x-cos-hash-crc64ecma is not correct, try again. Expect crc64 is %" PRIu64 ", but " "return crc64 is %s", - m_part_crc64, c_itr->second.c_str()); + m_crc64_value, c_itr->second.c_str()); m_is_task_success = false; continue; } + SDK_LOG_DBG("Part Crc64 Check Success."); } else { std::map::const_iterator c_itr = m_resp_headers.find("ETag"); @@ -234,6 +245,7 @@ void FileUploadTask::UploadTask() { m_is_task_success = false; continue; } + SDK_LOG_DBG("Part Md5 Check Success."); } m_is_task_success = true; diff --git a/src/op/object_op.cpp b/src/op/object_op.cpp index 26f4015..76e6b86 100644 --- a/src/op/object_op.cpp +++ b/src/op/object_op.cpp @@ -763,6 +763,7 @@ CosResult ObjectOp::MultiUploadObject(const PutObjectByFileReq& req, !comp_resp.GetXCosHashCrc64Ecma().empty()) { uint64_t crc64_server_resp = StringUtil::StringToUint64(comp_resp.GetXCosHashCrc64Ecma()); + SDK_LOG_DBG("File Crc64: %" PRIu64, crc64_origin); if (crc64_server_resp != crc64_origin) { std::string err_msg = "MultiUploadObject failed, crc64 check failed, crc64_origin: " + @@ -1762,9 +1763,9 @@ CosResult ObjectOp::MultiThreadUpload( return result; } - unsigned char** file_content_buf = new unsigned char*[pool_size]; + PartBufInfo *part_buf_info = new PartBufInfo[pool_size]; for (int i = 0; i < pool_size; ++i) { - file_content_buf[i] = new unsigned char[(size_t)part_size]; + part_buf_info[i].buf = new unsigned char[(size_t)part_size]; } std::string dest_url = GetRealUrl(host, path, req.IsHttps()); @@ -1796,18 +1797,18 @@ CosResult ObjectOp::MultiThreadUpload( } for (; task_index < pool_size; ++task_index) { - fin.read((char*)file_content_buf[task_index], part_size); + fin.read((char *)(part_buf_info[task_index].buf), part_size); std::streamsize read_len = fin.gcount(); if (read_len == 0 && fin.eof()) { SDK_LOG_DBG("read over, task_index: %d", task_index); break; } + part_buf_info[task_index].len = static_cast(read_len); SDK_LOG_DBG("upload data, task_index=%d, file_size=%" PRIu64 ", offset=%" PRIu64 ", len=%" PRIu64, task_index, file_size, offset, read_len); - uint64_t crc64_part = 0; // Check the resume FileUploadTask* ptask = pptaskArr[task_index]; @@ -1823,25 +1824,11 @@ CosResult ObjectOp::MultiThreadUpload( handler->UpdateProgress(read_len); } } else { - // 计算每个part的crc64值 - if (req.CheckPartCrc64()) { - crc64_part = CRC64::CalcCRC(crc64_part, static_cast(file_content_buf[task_index]), read_len); - } - FillUploadTask(upload_id, host, path, file_content_buf[task_index], - read_len, part_number, ptask, req.SignHeaderHost(), crc64_part); + FillUploadTask(upload_id, host, path, part_buf_info[task_index].buf, + read_len, part_number, ptask, req.SignHeaderHost(), req.CheckPartCrc64()); tp.start(*ptask); } - // 根据每个part流式计算整个文件的crc64值 - if (req.CheckCRC64()) { - // 如果已经计算了part的crc64值,只需要直接流式合并即可 - if (crc64_part != 0) { - crc64_file = CRC64::CombineCRC(crc64_file, crc64_part, read_len); - } else { - crc64_file = CRC64::CalcCRC(crc64_file, static_cast(file_content_buf[task_index]), read_len); - } - } - offset += read_len; part_numbers_ptr->push_back(part_number); ++part_number; @@ -1887,6 +1874,24 @@ CosResult ObjectOp::MultiThreadUpload( task_fail_flag = true; break; } + + // 根据每个part流式计算整个文件的crc64值 + if (req.CheckCRC64()) { + // 如果已经计算了part的crc64值,只需要直接流式合并即可 + if (ptask->GetCrc64Value() != 0) { + crc64_file = CRC64::CombineCRC(crc64_file, ptask->GetCrc64Value(), + static_cast(part_buf_info[task_index].len)); + SDK_LOG_DBG("Combine Crc64: %" PRIu64 ", Part Crc64: %" PRIu64, + crc64_file, ptask->GetCrc64Value()); + } else { + // 两种情况都有可能: + // 1、CheckPartCrc64()为false + // 2、此part是断点续传已经上传的part + crc64_file = CRC64::CalcCRC(crc64_file, static_cast(part_buf_info[task_index].buf), + part_buf_info[task_index].len); + SDK_LOG_DBG("Calc Crc64: %" PRIu64, crc64_file) + } + } } if (task_fail_flag) { @@ -1907,9 +1912,9 @@ CosResult ObjectOp::MultiThreadUpload( delete[] pptaskArr; for (int i = 0; i < pool_size; ++i) { - delete[] file_content_buf[i]; + delete[] part_buf_info[i].buf; } - delete[] file_content_buf; + delete[] part_buf_info; return result; } @@ -2094,7 +2099,7 @@ void ObjectOp::FillUploadTask(const std::string& upload_id, const std::string& host, const std::string& path, unsigned char* file_content_buf, uint64_t len, uint64_t part_number, FileUploadTask* task_ptr, - bool sign_header_host, uint64_t crc64) { + bool sign_header_host, bool check_crc64) { std::map req_params; req_params.insert(std::make_pair("uploadId", upload_id)); req_params.insert( @@ -2125,7 +2130,7 @@ void ObjectOp::FillUploadTask(const std::string& upload_id, task_ptr->AddHeaders(req_headers); task_ptr->SetUploadBuf(file_content_buf, len); task_ptr->SetPartNumber(part_number); - task_ptr->SetPartCrc64(crc64); + task_ptr->SetCheckCrc64(check_crc64); } void ObjectOp::FillCopyTask(const std::string& upload_id,