Skip to content

Commit 5314d99

Browse files
authored
Use llvm::unique_function in the async APIs (#166727)
This is needed to allow using these APIs with callable objects that transitively capture move-only constructs. These come up very widely when writing concurrent code such a `std::future`, `std::promise`, `std::unique_lock`, etc.
1 parent f8e9b89 commit 5314d99

File tree

3 files changed

+29
-12
lines changed

3 files changed

+29
-12
lines changed

llvm/include/llvm/Support/ThreadPool.h

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#define LLVM_SUPPORT_THREADPOOL_H
1515

1616
#include "llvm/ADT/DenseMap.h"
17+
#include "llvm/ADT/FunctionExtras.h"
1718
#include "llvm/Config/llvm-config.h"
1819
#include "llvm/Support/Compiler.h"
1920
#include "llvm/Support/Jobserver.h"
@@ -51,7 +52,7 @@ class ThreadPoolTaskGroup;
5152
class LLVM_ABI ThreadPoolInterface {
5253
/// The actual method to enqueue a task to be defined by the concrete
5354
/// implementation.
54-
virtual void asyncEnqueue(std::function<void()> Task,
55+
virtual void asyncEnqueue(llvm::unique_function<void()> Task,
5556
ThreadPoolTaskGroup *Group) = 0;
5657

5758
public:
@@ -95,22 +96,22 @@ class LLVM_ABI ThreadPoolInterface {
9596
/// used to wait for the task to finish and is *non-blocking* on destruction.
9697
template <typename Func>
9798
auto async(Func &&F) -> std::shared_future<decltype(F())> {
98-
return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
99-
nullptr);
99+
return asyncImpl(
100+
llvm::unique_function<decltype(F())()>(std::forward<Func>(F)), nullptr);
100101
}
101102

102103
template <typename Func>
103104
auto async(ThreadPoolTaskGroup &Group, Func &&F)
104105
-> std::shared_future<decltype(F())> {
105-
return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
106-
&Group);
106+
return asyncImpl(
107+
llvm::unique_function<decltype(F())()>(std::forward<Func>(F)), &Group);
107108
}
108109

109110
private:
110111
/// Asynchronous submission of a task to the pool. The returned future can be
111112
/// used to wait for the task to finish and is *non-blocking* on destruction.
112113
template <typename ResTy>
113-
std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task,
114+
std::shared_future<ResTy> asyncImpl(llvm::unique_function<ResTy()> Task,
114115
ThreadPoolTaskGroup *Group) {
115116
auto Future = std::async(std::launch::deferred, std::move(Task)).share();
116117
asyncEnqueue([Future]() { Future.wait(); }, Group);
@@ -160,7 +161,7 @@ class LLVM_ABI StdThreadPool : public ThreadPoolInterface {
160161

161162
/// Asynchronous submission of a task to the pool. The returned future can be
162163
/// used to wait for the task to finish and is *non-blocking* on destruction.
163-
void asyncEnqueue(std::function<void()> Task,
164+
void asyncEnqueue(llvm::unique_function<void()> Task,
164165
ThreadPoolTaskGroup *Group) override {
165166
int requestedThreads;
166167
{
@@ -189,7 +190,8 @@ class LLVM_ABI StdThreadPool : public ThreadPoolInterface {
189190
mutable llvm::sys::RWMutex ThreadsLock;
190191

191192
/// Tasks waiting for execution in the pool.
192-
std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;
193+
std::deque<std::pair<llvm::unique_function<void()>, ThreadPoolTaskGroup *>>
194+
Tasks;
193195

194196
/// Locking and signaling for accessing the Tasks queue.
195197
std::mutex QueueLock;
@@ -239,13 +241,14 @@ class LLVM_ABI SingleThreadExecutor : public ThreadPoolInterface {
239241
private:
240242
/// Asynchronous submission of a task to the pool. The returned future can be
241243
/// used to wait for the task to finish and is *non-blocking* on destruction.
242-
void asyncEnqueue(std::function<void()> Task,
244+
void asyncEnqueue(llvm::unique_function<void()> Task,
243245
ThreadPoolTaskGroup *Group) override {
244246
Tasks.emplace_back(std::make_pair(std::move(Task), Group));
245247
}
246248

247249
/// Tasks waiting for execution in the pool.
248-
std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;
250+
std::deque<std::pair<llvm::unique_function<void()>, ThreadPoolTaskGroup *>>
251+
Tasks;
249252
};
250253

251254
#if LLVM_ENABLE_THREADS

llvm/lib/Support/ThreadPool.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ static LLVM_THREAD_LOCAL std::vector<ThreadPoolTaskGroup *>
7373
// WaitingForGroup == nullptr means all tasks regardless of their group.
7474
void StdThreadPool::processTasks(ThreadPoolTaskGroup *WaitingForGroup) {
7575
while (true) {
76-
std::function<void()> Task;
76+
llvm::unique_function<void()> Task;
7777
ThreadPoolTaskGroup *GroupOfTask;
7878
{
7979
std::unique_lock<std::mutex> LockGuard(QueueLock);
@@ -189,7 +189,7 @@ void StdThreadPool::processTasksWithJobserver() {
189189

190190
// While we hold a job slot, process tasks from the internal queue.
191191
while (true) {
192-
std::function<void()> Task;
192+
llvm::unique_function<void()> Task;
193193
ThreadPoolTaskGroup *GroupOfTask = nullptr;
194194

195195
{

llvm/unittests/Support/ThreadPool.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,20 @@ TYPED_TEST(ThreadPoolTest, Async) {
183183
ASSERT_EQ(2, i.load());
184184
}
185185

186+
TYPED_TEST(ThreadPoolTest, AsyncMoveOnly) {
187+
CHECK_UNSUPPORTED();
188+
DefaultThreadPool Pool;
189+
std::promise<int> p;
190+
std::future<int> f = p.get_future();
191+
Pool.async([this, p = std::move(p)]() mutable {
192+
this->waitForMainThread();
193+
p.set_value(42);
194+
});
195+
this->setMainThreadReady();
196+
Pool.wait();
197+
ASSERT_EQ(42, f.get());
198+
}
199+
186200
TYPED_TEST(ThreadPoolTest, GetFuture) {
187201
CHECK_UNSUPPORTED();
188202
DefaultThreadPool Pool(hardware_concurrency(2));

0 commit comments

Comments
 (0)