Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions include/op/file_upload_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ class FileUploadTask : public Poco::Runnable {
void SetCaLocation(const std::string& ca_location);
void SetSslCtxCb(SSLCtxCallback cb, void *data);

// Turns per-part crc64 verification on or off for this task. When the flag is
// set, UploadTask() checks the part with crc64 rather than with the md5/ETag.
void SetCheckCrc64(bool check_crc64) { mb_check_crc64 = check_crc64; }

// Returns the crc64 stored for this task's data buffer. The value starts at 0
// and is only written by UploadTask() when crc64 checking is enabled.
uint64_t GetCrc64Value() const { return m_crc64_value; }

private:
std::string m_full_url;
std::map<std::string, std::string> m_headers;
Expand All @@ -104,6 +112,9 @@ class FileUploadTask : public Poco::Runnable {
std::string m_ca_location;
SSLCtxCallback m_ssl_ctx_cb;
void *m_user_data;

bool mb_check_crc64;
uint64_t m_crc64_value;
};

} // namespace qcloud_cos
16 changes: 15 additions & 1 deletion include/op/object_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@

namespace qcloud_cos {

// Plain descriptor for one part buffer: a raw pointer to the bytes plus a byte
// count. The class declares no destructor, so it never releases `buf`; the
// code that allocates the buffer is responsible for freeing it.
class PartBufInfo {
 public:
  // Starts out empty: no buffer attached and a length of zero.
  PartBufInfo() : buf(nullptr), len(0) {}

  unsigned char* buf;  // start of the buffer, nullptr until one is assigned
  size_t len;          // number of bytes recorded for the buffer
};

class FileUploadTask;
class FileCopyTask;

Expand Down Expand Up @@ -430,6 +442,7 @@ class ObjectOp : public BaseOp {
const std::vector<std::string>& already_exist_parts,
bool resume_flag, std::vector<std::string>* etags_ptr,
std::vector<uint64_t>* part_numbers_ptr,
uint64_t& crc64_file,
const SharedTransferHandler& handler = nullptr,
bool change_backup_domain = false);

Expand All @@ -445,7 +458,8 @@ class ObjectOp : public BaseOp {
void FillUploadTask(const std::string& upload_id, const std::string& host,
const std::string& path, unsigned char* file_content_buf,
uint64_t len, uint64_t part_number,
FileUploadTask* task_ptr, bool sign_header_host);
FileUploadTask* task_ptr, bool sign_header_host,
bool check_crc64);

void FillCopyTask(const std::string& upload_id, const std::string& host,
const std::string& path, uint64_t part_number,
Expand Down
11 changes: 11 additions & 0 deletions include/request/object_req.h
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ class PutObjectByFileReq : public PutObjectReq {
#if defined(_WIN32)
mb_is_widechar_path = false;
#endif
mb_check_part_crc64 = false;
}

virtual ~PutObjectByFileReq() {}
Expand All @@ -392,6 +393,15 @@ class PutObjectByFileReq : public PutObjectReq {
m_local_file_path = local_file_path;
}

// For MultiPutObjectReq: when enabled, each uploaded part is verified with
// crc64 instead of md5.
void SetCheckPartCrc64(bool part_crc64) { mb_check_part_crc64 = part_crc64; }

// Reports whether per-part crc64 verification was requested (false unless
// SetCheckPartCrc64(true) has been called).
bool CheckPartCrc64() const { return mb_check_part_crc64; }

// Returns a copy of the local file path stored in this request.
std::string GetLocalFilePath() const {
  return m_local_file_path;
}

#if defined(_WIN32)
Expand All @@ -411,6 +421,7 @@ class PutObjectByFileReq : public PutObjectReq {
#if defined(_WIN32)
bool mb_is_widechar_path; // 标识文件路径是否为宽字符
#endif
bool mb_check_part_crc64;
};

class DeleteObjectReq : public ObjectReq {
Expand Down
72 changes: 52 additions & 20 deletions src/op/file_upload_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "util/http_sender.h"
#include "util/string_util.h"
#include "util/codec_util.h"
#include "util/crc64.h"
#ifdef USE_OPENSSL_MD5
#include <openssl/md5.h>
#endif
Expand All @@ -32,7 +33,9 @@ FileUploadTask::FileUploadTask(const std::string& full_url,
m_verify_cert(verify_cert),
m_ca_location(ca_location),
m_ssl_ctx_cb(ssl_ctx_cb),
m_user_data(user_data) {}
m_user_data(user_data),
mb_check_crc64(false),
m_crc64_value(0) {}

FileUploadTask::FileUploadTask(
const std::string& full_url,
Expand All @@ -58,7 +61,9 @@ FileUploadTask::FileUploadTask(
m_verify_cert(verify_cert),
m_ca_location(ca_location),
m_ssl_ctx_cb(ssl_ctx_cb),
m_user_data(user_data) {}
m_user_data(user_data),
mb_check_crc64(false),
m_crc64_value(0) {}

FileUploadTask::FileUploadTask(
const std::string& full_url,
Expand All @@ -84,7 +89,9 @@ FileUploadTask::FileUploadTask(
m_verify_cert(verify_cert),
m_ca_location(ca_location),
m_ssl_ctx_cb(ssl_ctx_cb),
m_user_data(user_data) {}
m_user_data(user_data),
mb_check_crc64(false),
m_crc64_value(0) {}

void FileUploadTask::run() {
m_resp = "";
Expand Down Expand Up @@ -154,12 +161,19 @@ void FileUploadTask::SetSslCtxCb(SSLCtxCallback cb, void *data) {

void FileUploadTask::UploadTask() {
std::string md5_str;
#ifdef USE_OPENSSL_MD5
unsigned char digest[MD5_DIGEST_LENGTH];
MD5((const unsigned char *)m_data_buf_ptr, m_data_len, digest);
md5_str = CodecUtil::DigestToHex(digest, MD5_DIGEST_LENGTH);
#else
{
// 数据一致性校验采用crc64
if (mb_check_crc64) {
m_crc64_value = 0;
m_crc64_value = CRC64::CalcCRC(m_crc64_value, static_cast<void*>(m_data_buf_ptr), m_data_len);
SDK_LOG_DBG("Part Crc64: %" PRIu64, m_crc64_value);
}
// 没有crc64则默认走md5校验
else {
#ifdef USE_OPENSSL_MD5
unsigned char digest[MD5_DIGEST_LENGTH];
MD5((const unsigned char *)m_data_buf_ptr, m_data_len, digest);
md5_str = CodecUtil::DigestToHex(digest, MD5_DIGEST_LENGTH);
#else
// 计算上传的md5
Poco::MD5Engine md5;
std::string body((const char*)m_data_buf_ptr, m_data_len);
Expand All @@ -168,8 +182,9 @@ void FileUploadTask::UploadTask() {
Poco::StreamCopier::copyStream(istr, dos);
dos.close();
md5_str = Poco::DigestEngine::digestToHex(md5.digest());
#endif
SDK_LOG_DBG("Part Md5: %s", md5_str.c_str());
}
#endif

int loop = 0;
do {
Expand Down Expand Up @@ -204,16 +219,33 @@ void FileUploadTask::UploadTask() {
continue;
}

std::map<std::string, std::string>::const_iterator c_itr =
m_resp_headers.find("ETag");
if (c_itr == m_resp_headers.end() ||
StringUtil::Trim(c_itr->second, "\"") != md5_str) {
SDK_LOG_ERR(
"Response etag is not correct, try again. Expect md5 is %s, but "
"return etag is %s.",
md5_str.c_str(), StringUtil::Trim(c_itr->second, "\"").c_str());
m_is_task_success = false;
continue;
// Consistency check of the uploaded part against the response headers:
// crc64 when enabled, otherwise the md5 carried in the ETag header.
// A missing or mismatching header marks the task as failed and retries.
if (mb_check_crc64) {
  std::map<std::string, std::string>::const_iterator c_itr =
      m_resp_headers.find(kRespHeaderXCosHashCrc64Ecma);
  const bool has_crc64 = (c_itr != m_resp_headers.end());
  if (!has_crc64 ||
      StringUtil::StringToUint64(c_itr->second) != m_crc64_value) {
    // The header may be absent; never dereference end() just to build the
    // log message. An empty string is logged in that case.
    const std::string resp_crc64 = has_crc64 ? c_itr->second : std::string();
    SDK_LOG_ERR(
        "Response x-cos-hash-crc64ecma is not correct, try again. Expect crc64 is %" PRIu64 ", but "
        "return crc64 is %s",
        m_crc64_value, resp_crc64.c_str());
    m_is_task_success = false;
    continue;
  }
  SDK_LOG_DBG("Part Crc64 Check Success.");
} else {
  std::map<std::string, std::string>::const_iterator c_itr =
      m_resp_headers.find("ETag");
  // Trim the surrounding quotes once; an absent header yields an empty etag.
  const std::string resp_etag = (c_itr != m_resp_headers.end())
                                    ? StringUtil::Trim(c_itr->second, "\"")
                                    : std::string();
  if (c_itr == m_resp_headers.end() || resp_etag != md5_str) {
    SDK_LOG_ERR(
        "Response etag is not correct, try again. Expect md5 is %s, but "
        "return etag is %s.",
        md5_str.c_str(), resp_etag.c_str());
    m_is_task_success = false;
    continue;
  }
  SDK_LOG_DBG("Part Md5 Check Success.");
}

m_is_task_success = true;
Expand Down
60 changes: 39 additions & 21 deletions src/op/object_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -704,9 +704,11 @@ CosResult ObjectOp::MultiUploadObject(const PutObjectByFileReq& req,
}

CosResult upload_result;

uint64_t crc64_origin = 0;
upload_result =
MultiThreadUpload(req, resume_uploadid, already_exist_parts, resume_flag,
&etags, &part_numbers, handler, change_backup_domain);
&etags, &part_numbers, crc64_origin, handler, change_backup_domain);
// Cancel way
if (handler && !handler->ShouldContinue()) {
SetResultAndLogError(upload_result, "Request canceled by user");
Expand Down Expand Up @@ -759,18 +761,9 @@ CosResult ObjectOp::MultiUploadObject(const PutObjectByFileReq& req,
// check crc64 if needed
if (req.CheckCRC64() && comp_result.IsSucc() &&
!comp_resp.GetXCosHashCrc64Ecma().empty()) {
uint64_t crc64_origin = 0;
#if defined(_WIN32)
if (req.IsWideCharPath()) {
crc64_origin = FileUtil::GetFileCrc64(req.GetWideCharLocalFilePath());
} else {
crc64_origin = FileUtil::GetFileCrc64(req.GetLocalFilePath());
}
#else
crc64_origin = FileUtil::GetFileCrc64(req.GetLocalFilePath());
#endif
uint64_t crc64_server_resp =
StringUtil::StringToUint64(comp_resp.GetXCosHashCrc64Ecma());
SDK_LOG_DBG("File Crc64: %" PRIu64, crc64_origin);
if (crc64_server_resp != crc64_origin) {
std::string err_msg =
"MultiUploadObject failed, crc64 check failed, crc64_origin: " +
Expand Down Expand Up @@ -1704,6 +1697,7 @@ CosResult ObjectOp::MultiThreadUpload(
const std::vector<std::string>& already_exist_parts, bool resume_flag,
std::vector<std::string>* etags_ptr,
std::vector<uint64_t>* part_numbers_ptr,
uint64_t& crc64_file,
const SharedTransferHandler& handler,
bool change_backup_domain) {
CosResult result;
Expand Down Expand Up @@ -1769,9 +1763,9 @@ CosResult ObjectOp::MultiThreadUpload(
return result;
}

unsigned char** file_content_buf = new unsigned char*[pool_size];
PartBufInfo *part_buf_info = new PartBufInfo[pool_size];
for (int i = 0; i < pool_size; ++i) {
file_content_buf[i] = new unsigned char[(size_t)part_size];
part_buf_info[i].buf = new unsigned char[(size_t)part_size];
}

std::string dest_url = GetRealUrl(host, path, req.IsHttps());
Expand All @@ -1790,6 +1784,7 @@ CosResult ObjectOp::MultiThreadUpload(

Poco::ThreadPool tp(pool_size);

crc64_file = 0;
// 3. Multi-threaded upload
{
uint64_t part_number = 1;
Expand All @@ -1802,12 +1797,13 @@ CosResult ObjectOp::MultiThreadUpload(
}

for (; task_index < pool_size; ++task_index) {
fin.read((char*)file_content_buf[task_index], part_size);
fin.read((char *)(part_buf_info[task_index].buf), part_size);
std::streamsize read_len = fin.gcount();
if (read_len == 0 && fin.eof()) {
SDK_LOG_DBG("read over, task_index: %d", task_index);
break;
}
part_buf_info[task_index].len = static_cast<size_t>(read_len);

SDK_LOG_DBG("upload data, task_index=%d, file_size=%" PRIu64
", offset=%" PRIu64 ", len=%" PRIu64,
Expand All @@ -1828,8 +1824,8 @@ CosResult ObjectOp::MultiThreadUpload(
handler->UpdateProgress(read_len);
}
} else {
FillUploadTask(upload_id, host, path, file_content_buf[task_index],
read_len, part_number, ptask, req.SignHeaderHost());
FillUploadTask(upload_id, host, path, part_buf_info[task_index].buf,
read_len, part_number, ptask, req.SignHeaderHost(), req.CheckPartCrc64());
tp.start(*ptask);
}

Expand Down Expand Up @@ -1878,6 +1874,24 @@ CosResult ObjectOp::MultiThreadUpload(
task_fail_flag = true;
break;
}

// Fold this part into the running crc64 of the whole file.
if (req.CheckCRC64()) {
  const uint64_t part_crc64 = ptask->GetCrc64Value();
  if (part_crc64 != 0) {
    // The task already computed this part's crc64, so merge it into the
    // running value instead of scanning the buffer again.
    // NOTE(review): a non-zero value is used as the "already computed"
    // marker. If a task object is reused for a later part that does not run
    // UploadTask() (e.g. a part skipped by resume), the value stored for the
    // previous part may still be returned here — confirm it is reset.
    crc64_file = CRC64::CombineCRC(crc64_file, part_crc64,
                                   static_cast<uintmax_t>(part_buf_info[task_index].len));
    SDK_LOG_DBG("Combine Crc64: %" PRIu64 ", Part Crc64: %" PRIu64,
                crc64_file, part_crc64);
  } else {
    // Either situation can lead here:
    // 1. CheckPartCrc64() is false
    // 2. this part was already uploaded earlier (resumed transfer)
    crc64_file = CRC64::CalcCRC(crc64_file,
                                static_cast<void *>(part_buf_info[task_index].buf),
                                part_buf_info[task_index].len);
    SDK_LOG_DBG("Calc Crc64: %" PRIu64, crc64_file);
  }
}
}

if (task_fail_flag) {
Expand All @@ -1898,9 +1912,9 @@ CosResult ObjectOp::MultiThreadUpload(
delete[] pptaskArr;

for (int i = 0; i < pool_size; ++i) {
delete[] file_content_buf[i];
delete[] part_buf_info[i].buf;
}
delete[] file_content_buf;
delete[] part_buf_info;

return result;
}
Expand Down Expand Up @@ -1987,8 +2001,10 @@ CosResult ObjectOp::SingleThreadUpload(
part_number, file_size, offset, read_len);

// Compute the crc64 of the whole file up front; it is used for the crc64 check once all parts have been merged
crc64 = CRC64::CalcCRC(crc64, static_cast<void*>(file_content_buf),
static_cast<size_t>(read_len));
if (req.CheckCRC64()) {
crc64 = CRC64::CalcCRC(crc64, static_cast<void*>(file_content_buf),
static_cast<size_t>(read_len));
}

// Check the resume

Expand Down Expand Up @@ -2082,7 +2098,8 @@ uint64_t ObjectOp::GetContent(const std::string& src,
void ObjectOp::FillUploadTask(const std::string& upload_id,
const std::string& host, const std::string& path,
unsigned char* file_content_buf, uint64_t len,
uint64_t part_number, FileUploadTask* task_ptr, bool sign_header_host) {
uint64_t part_number, FileUploadTask* task_ptr,
bool sign_header_host, bool check_crc64) {
std::map<std::string, std::string> req_params;
req_params.insert(std::make_pair("uploadId", upload_id));
req_params.insert(
Expand Down Expand Up @@ -2113,6 +2130,7 @@ void ObjectOp::FillUploadTask(const std::string& upload_id,
task_ptr->AddHeaders(req_headers);
task_ptr->SetUploadBuf(file_content_buf, len);
task_ptr->SetPartNumber(part_number);
task_ptr->SetCheckCrc64(check_crc64);
}

void ObjectOp::FillCopyTask(const std::string& upload_id,
Expand Down
2 changes: 2 additions & 0 deletions unittest/src/object_op_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2251,6 +2251,7 @@ TEST_F(ObjectOpTest, MultiPutObjectTest_OneStep) {
// 2. Upload
MultiPutObjectReq req(m_bucket_name, object_name, filename);
req.SetXCosServerSideEncryption("AES256");
req.SetCheckPartCrc64(true);
MultiPutObjectResp resp;

CosResult result = m_client->MultiPutObject(req, &resp);
Expand Down Expand Up @@ -3430,6 +3431,7 @@ TEST_F(ObjectOpTest, MultiUploadVaryPartSizeAndThreadPoolSize) {
MultiPutObjectReq multiupload_req(m_bucket_name, object_name, local_file);
MultiPutObjectResp multiupload_resp;
ASSERT_TRUE(multiupload_req.CheckCRC64());
multiupload_req.SetCheckPartCrc64(true);

// upload object
CosResult multiupload_result =
Expand Down