From 8a328f76bf62ca64525499f07ced2703650a84b4 Mon Sep 17 00:00:00 2001 From: maddoxwang Date: Thu, 4 Aug 2022 17:22:39 +0800 Subject: [PATCH] =?UTF-8?q?=E5=86=85=E5=AD=98=E6=95=B0=E6=8D=AE=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E4=B8=8A=E4=BC=A0=E6=8E=A5=E5=8F=A3=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?&=E7=BA=BF=E7=A8=8B=E6=B1=A0=E7=BA=BF=E7=A8=8B=E6=95=B0?= =?UTF-8?q?=E8=AE=BE=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/cos_api.h | 2 ++ include/op/object_op.h | 2 +- include/request/object_req.h | 9 +++++++++ src/cos_api.cpp | 20 +++++++++++++++++++- src/op/object_op.cpp | 11 ++++++++--- 5 files changed, 39 insertions(+), 5 deletions(-) diff --git a/include/cos_api.h b/include/cos_api.h index a19b4f5..ff8d6cf 100644 --- a/include/cos_api.h +++ b/include/cos_api.h @@ -723,6 +723,8 @@ class CosAPI { /// \return 返回context SharedAsyncContext AsyncPutObject(const AsyncPutObjectReq& req); + SharedAsyncContext AsyncPutObject(const AsyncPutObjectByStreamReq& req); + /// \brief /// 异步上传对象,封装了初始化分块上传、分块上传、完成分块上传三步,支持断点续传 /// \param req MultiPutObjectAsync请求 diff --git a/include/op/object_op.h b/include/op/object_op.h index 66853dc..7d5bb56 100644 --- a/include/op/object_op.h +++ b/include/op/object_op.h @@ -103,7 +103,7 @@ class ObjectOp : public BaseOp { /// /// \return 返回HTTP请求的状态码及错误信息 CosResult PutObject(const PutObjectByStreamReq& req, - PutObjectByStreamResp* resp); + PutObjectByStreamResp* resp, const SharedTransferHandler& handler=nullptr); /// \brief 删除Object /// diff --git a/include/request/object_req.h b/include/request/object_req.h index 69edea8..6b42153 100644 --- a/include/request/object_req.h +++ b/include/request/object_req.h @@ -1710,6 +1710,15 @@ class AsyncPutObjectReq : public PutObjectByFileReq { virtual ~AsyncPutObjectReq() {} }; +class AsyncPutObjectByStreamReq : public PutObjectByStreamReq { + public: + AsyncPutObjectByStreamReq(const std::string& bucket_name, const std::string& object_name, + std::istream& in_stream) + : PutObjectByStreamReq(bucket_name, object_name, in_stream) {} + + virtual ~AsyncPutObjectByStreamReq() {} +}; + class AsyncMultiPutObjectReq : public PutObjectByFileReq { public: AsyncMultiPutObjectReq(const std::string& bucket_name, const std::string& object_name, diff --git a/src/cos_api.cpp b/src/cos_api.cpp index 6e248ee..3fa0f09 100644 --- a/src/cos_api.cpp +++ b/src/cos_api.cpp @@ -18,7 +18,8 @@ int CosAPI::s_cos_obj_num = 0; std::mutex g_init_lock; Poco::TaskManager& GetGlobalTaskManager() { - static Poco::TaskManager task_manager; + static Poco::ThreadPool async_thread_pool("aysnc_pool", 2, CosSysConfig::GetAsynThreadPoolSize()); + static Poco::TaskManager task_manager(async_thread_pool); return task_manager; } @@ -500,6 +501,23 @@ SharedAsyncContext CosAPI::AsyncPutObject(const AsyncPutObjectReq& req) { return context; } +SharedAsyncContext CosAPI::AsyncPutObject(const AsyncPutObjectByStreamReq& req) { + SharedTransferHandler handler(new TransferHandler()); + handler->SetRequest(reinterpret_cast(&req)); + auto& is = req.GetStream(); + is.seekg(0, std::ios::end); + handler->SetTotalSize(is.tellg()); + is.seekg(0, std::ios::beg); + TaskFunc fn = [=]() { + PutObjectByStreamResp resp; + m_object_op.PutObject(req, &resp, handler); + }; + GetGlobalTaskManager().start(new AsyncTask(std::move(fn))); + SharedAsyncContext context(new AsyncContext(handler)); + return context; +} + + SharedAsyncContext CosAPI::AsyncMultiPutObject(const AsyncMultiPutObjectReq& req) { SharedTransferHandler handler(new TransferHandler()); handler->SetRequest(reinterpret_cast(&req)); diff --git a/src/op/object_op.cpp b/src/op/object_op.cpp index 6680bed..9826ac8 100644 --- a/src/op/object_op.cpp +++ b/src/op/object_op.cpp @@ -377,7 +377,7 @@ CosResult ObjectOp::MultiGetObject(const GetObjectByFileReq& req, } CosResult ObjectOp::PutObject(const PutObjectByStreamReq& req, - PutObjectByStreamResp* resp) { + PutObjectByStreamResp* resp, const SharedTransferHandler& handler) { CosResult result; std::string host = CosSysConfig::GetHost(GetAppId(), m_config->GetRegion(), req.GetBucketName()); @@ -410,7 +410,7 @@ CosResult ObjectOp::PutObject(const PutObjectByStreamReq& req, } result = UploadAction(host, path, req, additional_headers, - additional_params, is, resp); + additional_params, is, resp, handler); // V4 Etag长度为40字节 if (result.IsSucc() && need_check_etag && @@ -423,7 +423,12 @@ CosResult ObjectOp::PutObject(const PutObjectByStreamReq& req, md5_str.c_str(), resp->GetEtag().c_str(), resp->GetXCosRequestId().c_str()); } - + if(result.IsSucc() && handler) { + handler->UpdateStatus(TransferStatus::COMPLETED, result, resp->GetHeaders(), + resp->GetBody()); + } else if(handler) { + handler->UpdateStatus(TransferStatus::FAILED, result); + } return result; }