diff --git a/include/cos_api.h b/include/cos_api.h index a19b4f5..ff8d6cf 100644 --- a/include/cos_api.h +++ b/include/cos_api.h @@ -723,6 +723,8 @@ class CosAPI { /// \return 返回context SharedAsyncContext AsyncPutObject(const AsyncPutObjectReq& req); + SharedAsyncContext AsyncPutObject(const AsyncPutObjectByStreamReq& req); + /// \brief /// 异步上传对象,封装了初始化分块上传、分块上传、完成分块上传三步,支持断点续传 /// \param req MultiPutObjectAsync请求 diff --git a/include/op/object_op.h b/include/op/object_op.h index 66853dc..7d5bb56 100644 --- a/include/op/object_op.h +++ b/include/op/object_op.h @@ -103,7 +103,7 @@ class ObjectOp : public BaseOp { /// /// \return 返回HTTP请求的状态码及错误信息 CosResult PutObject(const PutObjectByStreamReq& req, - PutObjectByStreamResp* resp); + PutObjectByStreamResp* resp, const SharedTransferHandler& handler=nullptr); /// \brief 删除Object /// diff --git a/include/request/object_req.h b/include/request/object_req.h index c3d79b4..b0a2681 100644 --- a/include/request/object_req.h +++ b/include/request/object_req.h @@ -1712,6 +1712,15 @@ class AsyncPutObjectReq : public PutObjectByFileReq { virtual ~AsyncPutObjectReq() {} }; +class AsyncPutObjectByStreamReq : public PutObjectByStreamReq { + public: + AsyncPutObjectByStreamReq(const std::string& bucket_name, const std::string& object_name, + std::istream& in_stream) + : PutObjectByStreamReq(bucket_name, object_name, in_stream) {} + + virtual ~AsyncPutObjectByStreamReq() {} +}; + class AsyncMultiPutObjectReq : public PutObjectByFileReq { public: AsyncMultiPutObjectReq(const std::string& bucket_name, const std::string& object_name, diff --git a/src/cos_api.cpp b/src/cos_api.cpp index 4e10cb7..7014dd3 100644 --- a/src/cos_api.cpp +++ b/src/cos_api.cpp @@ -18,7 +18,8 @@ int CosAPI::s_cos_obj_num = 0; std::mutex g_init_lock; Poco::TaskManager& GetGlobalTaskManager() { - static Poco::TaskManager task_manager; + static Poco::ThreadPool async_thread_pool("aysnc_pool", 2, CosSysConfig::GetAsynThreadPoolSize()); + static Poco::TaskManager task_manager(async_thread_pool); return task_manager; } @@ -501,6 +502,23 @@ SharedAsyncContext CosAPI::AsyncPutObject(const AsyncPutObjectReq& req) { return context; } +SharedAsyncContext CosAPI::AsyncPutObject(const AsyncPutObjectByStreamReq& req) { + SharedTransferHandler handler(new TransferHandler()); + handler->SetRequest(reinterpret_cast(&req)); + auto& is = req.GetStream(); + is.seekg(0, std::ios::end); + handler->SetTotalSize(is.tellg()); + is.seekg(0, std::ios::beg); + TaskFunc fn = [=]() { + PutObjectByStreamResp resp; + m_object_op.PutObject(req, &resp, handler); + }; + GetGlobalTaskManager().start(new AsyncTask(std::move(fn))); + SharedAsyncContext context(new AsyncContext(handler)); + return context; +} + + SharedAsyncContext CosAPI::AsyncMultiPutObject(const AsyncMultiPutObjectReq& req) { SharedTransferHandler handler(new TransferHandler()); handler->SetRequest(reinterpret_cast(&req)); diff --git a/src/op/object_op.cpp b/src/op/object_op.cpp index 11a8bd8..e1458af 100644 --- a/src/op/object_op.cpp +++ b/src/op/object_op.cpp @@ -377,7 +377,7 @@ CosResult ObjectOp::MultiGetObject(const GetObjectByFileReq& req, } CosResult ObjectOp::PutObject(const PutObjectByStreamReq& req, - PutObjectByStreamResp* resp) { + PutObjectByStreamResp* resp, const SharedTransferHandler& handler) { CosResult result; std::string host = CosSysConfig::GetHost(GetAppId(), m_config->GetRegion(), req.GetBucketName()); @@ -410,7 +410,7 @@ CosResult ObjectOp::PutObject(const PutObjectByStreamReq& req, } result = UploadAction(host, path, req, additional_headers, - additional_params, is, resp); + additional_params, is, resp, handler); // V4 Etag长度为40字节 if (result.IsSucc() && need_check_etag && @@ -423,7 +423,12 @@ CosResult ObjectOp::PutObject(const PutObjectByStreamReq& req, md5_str.c_str(), resp->GetEtag().c_str(), resp->GetXCosRequestId().c_str()); } - + if(result.IsSucc() && handler) { + handler->UpdateStatus(TransferStatus::COMPLETED, result, resp->GetHeaders(), + resp->GetBody()); + } else if(handler) { + handler->UpdateStatus(TransferStatus::FAILED, result); + } return result; }