Skip to content

Commit 06043f1

Browse files
authored
内存数据异步上传接口实现&线程池线程数设置 (#117)
1 parent c7da7c0 commit 06043f1

File tree

5 files changed

+39
-5
lines changed

5 files changed

+39
-5
lines changed

include/cos_api.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,8 @@ class CosAPI {
723723
/// \return 返回context
724724
SharedAsyncContext AsyncPutObject(const AsyncPutObjectReq& req);
725725

726+
SharedAsyncContext AsyncPutObject(const AsyncPutObjectByStreamReq& req);
727+
726728
/// \brief
727729
/// 异步上传对象,封装了初始化分块上传、分块上传、完成分块上传三步,支持断点续传
728730
/// \param req MultiPutObjectAsync请求

include/op/object_op.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ class ObjectOp : public BaseOp {
103103
///
104104
/// \return 返回HTTP请求的状态码及错误信息
105105
CosResult PutObject(const PutObjectByStreamReq& req,
106-
PutObjectByStreamResp* resp);
106+
PutObjectByStreamResp* resp, const SharedTransferHandler& handler=nullptr);
107107

108108
/// \brief 删除Object
109109
///

include/request/object_req.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1712,6 +1712,15 @@ class AsyncPutObjectReq : public PutObjectByFileReq {
17121712
virtual ~AsyncPutObjectReq() {}
17131713
};
17141714

1715+
class AsyncPutObjectByStreamReq : public PutObjectByStreamReq {
1716+
public:
1717+
AsyncPutObjectByStreamReq(const std::string& bucket_name, const std::string& object_name,
1718+
std::istream& in_stream)
1719+
: PutObjectByStreamReq(bucket_name, object_name, in_stream) {}
1720+
1721+
virtual ~AsyncPutObjectByStreamReq() {}
1722+
};
1723+
17151724
class AsyncMultiPutObjectReq : public PutObjectByFileReq {
17161725
public:
17171726
AsyncMultiPutObjectReq(const std::string& bucket_name, const std::string& object_name,

src/cos_api.cpp

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ int CosAPI::s_cos_obj_num = 0;
1818
std::mutex g_init_lock;
1919

2020
Poco::TaskManager& GetGlobalTaskManager() {
21-
static Poco::TaskManager task_manager;
21+
static Poco::ThreadPool async_thread_pool("aysnc_pool", 2, CosSysConfig::GetAsynThreadPoolSize());
22+
static Poco::TaskManager task_manager(async_thread_pool);
2223
return task_manager;
2324
}
2425

@@ -501,6 +502,23 @@ SharedAsyncContext CosAPI::AsyncPutObject(const AsyncPutObjectReq& req) {
501502
return context;
502503
}
503504

505+
SharedAsyncContext CosAPI::AsyncPutObject(const AsyncPutObjectByStreamReq& req) {
506+
SharedTransferHandler handler(new TransferHandler());
507+
handler->SetRequest(reinterpret_cast<const void*>(&req));
508+
auto& is = req.GetStream();
509+
is.seekg(0, std::ios::end);
510+
handler->SetTotalSize(is.tellg());
511+
is.seekg(0, std::ios::beg);
512+
TaskFunc fn = [=]() {
513+
PutObjectByStreamResp resp;
514+
m_object_op.PutObject(req, &resp, handler);
515+
};
516+
GetGlobalTaskManager().start(new AsyncTask(std::move(fn)));
517+
SharedAsyncContext context(new AsyncContext(handler));
518+
return context;
519+
}
520+
521+
504522
SharedAsyncContext CosAPI::AsyncMultiPutObject(const AsyncMultiPutObjectReq& req) {
505523
SharedTransferHandler handler(new TransferHandler());
506524
handler->SetRequest(reinterpret_cast<const void*>(&req));

src/op/object_op.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ CosResult ObjectOp::MultiGetObject(const GetObjectByFileReq& req,
377377
}
378378

379379
CosResult ObjectOp::PutObject(const PutObjectByStreamReq& req,
380-
PutObjectByStreamResp* resp) {
380+
PutObjectByStreamResp* resp, const SharedTransferHandler& handler) {
381381
CosResult result;
382382
std::string host = CosSysConfig::GetHost(GetAppId(), m_config->GetRegion(),
383383
req.GetBucketName());
@@ -410,7 +410,7 @@ CosResult ObjectOp::PutObject(const PutObjectByStreamReq& req,
410410
}
411411

412412
result = UploadAction(host, path, req, additional_headers,
413-
additional_params, is, resp);
413+
additional_params, is, resp, handler);
414414

415415
// V4 Etag长度为40字节
416416
if (result.IsSucc() && need_check_etag &&
@@ -423,7 +423,12 @@ CosResult ObjectOp::PutObject(const PutObjectByStreamReq& req,
423423
md5_str.c_str(), resp->GetEtag().c_str(),
424424
resp->GetXCosRequestId().c_str());
425425
}
426-
426+
if(result.IsSucc() && handler) {
427+
handler->UpdateStatus(TransferStatus::COMPLETED, result, resp->GetHeaders(),
428+
resp->GetBody());
429+
} else if(handler) {
430+
handler->UpdateStatus(TransferStatus::FAILED, result);
431+
}
427432
return result;
428433
}
429434

0 commit comments

Comments
 (0)