Skip to content

Commit 8be2a9d

Browse files
Orri Erlingfacebook-github-bot
authored andcommitted
Add AsyncSource template for speculative execution (#903)
Summary: AsyncSource encapsulates a background computation. These can be scheduled on a background executor. The difference between this and a future is that when the user requires the result, AsyncSource will perform the async peration on the caller's thread and will turn the background operation into a no-op. Pull Request resolved: #903 Reviewed By: pedroerp Differential Revision: D33687621 Pulled By: oerling fbshipit-source-id: 46f3e7613bd67bc67e7b43336fc64e0d74c9c8ba
1 parent e4aca81 commit 8be2a9d

File tree

3 files changed

+210
-0
lines changed

3 files changed

+210
-0
lines changed

velox/common/base/AsyncSource.h

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <functional>
20+
#include <memory>
21+
22+
#include <folly/executors/QueuedImmediateExecutor.h>
23+
#include <folly/futures/Future.h>
24+
25+
namespace facebook::velox {
26+
27+
// A future-like object that prefabricates Items on an executor and
28+
// allows consumer threads to pick items as they are ready. If the
29+
// consumer needs the item before the executor started making it,
30+
// the consumer will make it instead. If multiple consumers request
31+
// the same item, exactly one gets it.
32+
template <typename Item>
33+
class AsyncSource {
34+
public:
35+
explicit AsyncSource(std::function<std::unique_ptr<Item>()> make)
36+
: make_(make) {}
37+
38+
// Makes an item if it is not already made. To be called on a background
39+
// executor.
40+
void prepare() {
41+
std::function<std::unique_ptr<Item>()> make = nullptr;
42+
{
43+
std::lock_guard<std::mutex> l(mutex_);
44+
if (!make_) {
45+
return;
46+
}
47+
making_ = true;
48+
std::swap(make, make_);
49+
}
50+
item_ = make();
51+
{
52+
std::lock_guard<std::mutex> l(mutex_);
53+
making_ = false;
54+
if (promise_) {
55+
promise_->setValue(true);
56+
promise_ = nullptr;
57+
}
58+
}
59+
}
60+
61+
// Returns the item to the first caller and nullptr to subsequent callers. If
62+
// the item is preparing on the executor, waits for the item and otherwise
63+
// makes it on the caller thread.
64+
std::unique_ptr<Item> move() {
65+
std::function<std::unique_ptr<Item>()> make = nullptr;
66+
folly::SemiFuture<bool> wait(false);
67+
{
68+
std::lock_guard<std::mutex> l(mutex_);
69+
if (item_) {
70+
return std::move(item_);
71+
}
72+
if (promise_) {
73+
// Somebody else is now waiting for the item to be made.
74+
return nullptr;
75+
}
76+
if (making_) {
77+
promise_ = std::make_unique<folly::Promise<bool>>();
78+
wait = promise_->getSemiFuture();
79+
} else {
80+
if (!make_) {
81+
return nullptr;
82+
}
83+
std::swap(make, make_);
84+
}
85+
}
86+
// Outside of mutex_.
87+
if (make) {
88+
return make();
89+
}
90+
auto& exec = folly::QueuedImmediateExecutor::instance();
91+
std::move(wait).via(&exec).wait();
92+
std::lock_guard<std::mutex> l(mutex_);
93+
return std::move(item_);
94+
}
95+
96+
// If true, move() will not block. But there is no guarantee that somebody
97+
// else will not get the item first.
98+
bool hasValue() const {
99+
return item_ != nullptr;
100+
}
101+
102+
private:
103+
std::mutex mutex_;
104+
// True if 'prepare() is making the item.
105+
bool making_{false};
106+
std::unique_ptr<folly::Promise<bool>> promise_;
107+
std::unique_ptr<Item> item_;
108+
std::function<std::unique_ptr<Item>()> make_;
109+
};
110+
} // namespace facebook::velox
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/common/base/AsyncSource.h"
18+
#include <fmt/format.h>
19+
#include <folly/Random.h>
20+
#include <folly/Synchronized.h>
21+
#include <gtest/gtest.h>
22+
#include <thread>
23+
24+
using namespace facebook::velox;
25+
26+
// A sample class to be constructed via AsyncSource.
27+
struct Gizmo {
28+
explicit Gizmo(int32_t _id) : id(_id) {}
29+
30+
const int32_t id;
31+
};
32+
33+
TEST(AsyncSourceTest, basic) {
34+
AsyncSource<Gizmo> gizmo([]() { return std::make_unique<Gizmo>(11); });
35+
EXPECT_FALSE(gizmo.hasValue());
36+
gizmo.prepare();
37+
EXPECT_TRUE(gizmo.hasValue());
38+
auto value = gizmo.move();
39+
EXPECT_FALSE(gizmo.hasValue());
40+
EXPECT_EQ(11, value->id);
41+
}
42+
43+
TEST(AsyncSourceTest, threads) {
44+
constexpr int32_t kNumThreads = 10;
45+
constexpr int32_t kNumGizmos = 2000;
46+
folly::Synchronized<std::unordered_set<int32_t>> results;
47+
std::vector<std::shared_ptr<AsyncSource<Gizmo>>> gizmos;
48+
for (auto i = 0; i < kNumGizmos; ++i) {
49+
gizmos.push_back(std::make_shared<AsyncSource<Gizmo>>([i]() {
50+
std::this_thread::sleep_for(std::chrono::milliseconds(1)); // NOLINT
51+
return std::make_unique<Gizmo>(i);
52+
}));
53+
}
54+
55+
std::vector<std::thread> threads;
56+
threads.reserve(kNumThreads);
57+
for (int32_t threadIndex = 0; threadIndex < kNumThreads; ++threadIndex) {
58+
threads.push_back(std::thread([threadIndex, &gizmos, &results]() {
59+
if (threadIndex < kNumThreads / 2) {
60+
// The first half of the threads prepare Gizmos in the background.
61+
for (auto i = 0; i < kNumGizmos; ++i) {
62+
gizmos[i]->prepare();
63+
}
64+
} else {
65+
// The rest of the threads first get random Gizmos and then do a pass
66+
// over all the Gizmos to make sure all get collected. We assert that
67+
// each Gizmo is obtained once.
68+
folly::Random::DefaultGenerator rng;
69+
for (auto i = 0; i < kNumGizmos / 3; ++i) {
70+
auto gizmo =
71+
gizmos[folly::Random::rand32(rng) % gizmos.size()]->move();
72+
if (gizmo) {
73+
results.withWLock([&](auto& set) {
74+
EXPECT_TRUE(set.find(gizmo->id) == set.end());
75+
set.insert(gizmo->id);
76+
});
77+
}
78+
}
79+
for (auto i = 0; i < gizmos.size(); ++i) {
80+
auto gizmo = gizmos[i]->move();
81+
if (gizmo) {
82+
results.withWLock([&](auto& set) {
83+
EXPECT_TRUE(set.find(gizmo->id) == set.end());
84+
set.insert(gizmo->id);
85+
});
86+
}
87+
}
88+
}
89+
}));
90+
}
91+
for (auto& thread : threads) {
92+
thread.join();
93+
}
94+
results.withRLock([&](auto& set) {
95+
for (auto i = 0; i < kNumGizmos; ++i) {
96+
EXPECT_TRUE(set.find(i) != set.end());
97+
}
98+
});
99+
}

velox/common/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
add_executable(
1616
velox_common_test
17+
AsyncSourceTest.cpp
1718
ExceptionTests.cpp
1819
RangeTest.cpp
1920
BitUtilTest.cpp

0 commit comments

Comments
 (0)