Skip to content
This repository was archived by the owner on Feb 25, 2025. It is now read-only.

Commit c5329ef

Browse files
authored
Allow embedders to schedule a callback on all engine managed threads. (#15980)
`FlutterEnginePostCallbackOnAllNativeThreads` schedule a callback to be run on all engine managed threads. The engine will attempt to service this callback the next time the message loops for each managed thread is idle. Since the engine manages the entire lifecycle of multiple threads, there is no opportunity for the embedders to finely tune the priorities of threads directly, or, perform other thread specific configuration (for example, setting thread names for tracing). This callback gives embedders a chance to affect such tuning. Fixes flutter/flutter#49551 Fixes b/143774406 Fixes b/148278215 Fixes b/148278931
1 parent cbf4536 commit c5329ef

File tree

10 files changed

+345
-17
lines changed

10 files changed

+345
-17
lines changed

fml/concurrent_message_loop.cc

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count)
2626
WorkerMain();
2727
});
2828
}
29+
30+
for (const auto& worker : workers_) {
31+
worker_thread_ids_.emplace_back(worker.get_id());
32+
}
2933
}
3034

3135
ConcurrentMessageLoop::~ConcurrentMessageLoop() {
@@ -73,25 +77,43 @@ void ConcurrentMessageLoop::PostTask(const fml::closure& task) {
7377
void ConcurrentMessageLoop::WorkerMain() {
7478
while (true) {
7579
std::unique_lock lock(tasks_mutex_);
76-
tasks_condition_.wait(lock,
77-
[&]() { return tasks_.size() > 0 || shutdown_; });
80+
tasks_condition_.wait(lock, [&]() {
81+
return tasks_.size() > 0 || shutdown_ || HasThreadTasksLocked();
82+
});
7883

79-
if (tasks_.size() == 0) {
80-
// This can only be caused by shutdown.
81-
FML_DCHECK(shutdown_);
82-
break;
84+
// Shutdown cannot be read with the task mutex unlocked.
85+
bool shutdown_now = shutdown_;
86+
fml::closure task;
87+
std::vector<fml::closure> thread_tasks;
88+
89+
if (tasks_.size() != 0) {
90+
task = tasks_.front();
91+
tasks_.pop();
8392
}
8493

85-
auto task = tasks_.front();
86-
tasks_.pop();
94+
if (HasThreadTasksLocked()) {
95+
thread_tasks = GetThreadTasksLocked();
96+
FML_DCHECK(!HasThreadTasksLocked());
97+
}
8798

88-
// Don't hold onto the mutex while the task is being executed as it could
89-
// itself try to post another tasks to this message loop.
99+
// Don't hold onto the mutex while tasks are being executed as they could
100+
// themselves try to post more tasks to the message loop.
90101
lock.unlock();
91102

92103
TRACE_EVENT0("flutter", "ConcurrentWorkerWake");
93-
// Execute the one tasks we woke up for.
94-
task();
104+
// Execute the primary task we woke up for.
105+
if (task) {
106+
task();
107+
}
108+
109+
// Execute any thread tasks.
110+
for (const auto& thread_task : thread_tasks) {
111+
thread_task();
112+
}
113+
114+
if (shutdown_now) {
115+
break;
116+
}
95117
}
96118
}
97119

@@ -101,6 +123,31 @@ void ConcurrentMessageLoop::Terminate() {
101123
tasks_condition_.notify_all();
102124
}
103125

126+
void ConcurrentMessageLoop::PostTaskToAllWorkers(fml::closure task) {
127+
if (!task) {
128+
return;
129+
}
130+
131+
std::scoped_lock lock(tasks_mutex_);
132+
for (const auto& worker_thread_id : worker_thread_ids_) {
133+
thread_tasks_[worker_thread_id].emplace_back(task);
134+
}
135+
tasks_condition_.notify_all();
136+
}
137+
138+
bool ConcurrentMessageLoop::HasThreadTasksLocked() const {
139+
return thread_tasks_.count(std::this_thread::get_id()) > 0;
140+
}
141+
142+
std::vector<fml::closure> ConcurrentMessageLoop::GetThreadTasksLocked() {
143+
auto found = thread_tasks_.find(std::this_thread::get_id());
144+
FML_DCHECK(found != thread_tasks_.end());
145+
std::vector<fml::closure> pending_tasks;
146+
std::swap(pending_tasks, found->second);
147+
thread_tasks_.erase(found);
148+
return pending_tasks;
149+
}
150+
104151
ConcurrentTaskRunner::ConcurrentTaskRunner(
105152
std::weak_ptr<ConcurrentMessageLoop> weak_loop)
106153
: weak_loop_(std::move(weak_loop)) {}

fml/concurrent_message_loop.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#define FLUTTER_FML_CONCURRENT_MESSAGE_LOOP_H_
77

88
#include <condition_variable>
9+
#include <map>
910
#include <queue>
1011
#include <thread>
1112

@@ -30,6 +31,8 @@ class ConcurrentMessageLoop
3031

3132
void Terminate();
3233

34+
void PostTaskToAllWorkers(fml::closure task);
35+
3336
private:
3437
friend ConcurrentTaskRunner;
3538

@@ -38,6 +41,8 @@ class ConcurrentMessageLoop
3841
std::mutex tasks_mutex_;
3942
std::condition_variable tasks_condition_;
4043
std::queue<fml::closure> tasks_;
44+
std::vector<std::thread::id> worker_thread_ids_;
45+
std::map<std::thread::id, std::vector<fml::closure>> thread_tasks_;
4146
bool shutdown_ = false;
4247

4348
ConcurrentMessageLoop(size_t worker_count);
@@ -46,6 +51,10 @@ class ConcurrentMessageLoop
4651

4752
void PostTask(const fml::closure& task);
4853

54+
bool HasThreadTasksLocked() const;
55+
56+
std::vector<fml::closure> GetThreadTasksLocked();
57+
4958
FML_DISALLOW_COPY_AND_ASSIGN(ConcurrentMessageLoop);
5059
};
5160

runtime/dart_vm.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,4 +504,8 @@ DartVM::GetConcurrentWorkerTaskRunner() const {
504504
return concurrent_message_loop_->GetTaskRunner();
505505
}
506506

507+
std::shared_ptr<fml::ConcurrentMessageLoop> DartVM::GetConcurrentMessageLoop() {
508+
return concurrent_message_loop_;
509+
}
510+
507511
} // namespace flutter

runtime/dart_vm.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,19 @@ class DartVM {
147147
std::shared_ptr<fml::ConcurrentTaskRunner> GetConcurrentWorkerTaskRunner()
148148
const;
149149

150+
//----------------------------------------------------------------------------
151+
/// @brief The concurrent message loop hosts threads that are used by the
152+
/// engine to perform tasks long running background tasks.
153+
/// Typically, to post tasks to this message loop, the
154+
/// `GetConcurrentWorkerTaskRunner` method may be used.
155+
///
156+
/// @see GetConcurrentWorkerTaskRunner
157+
///
158+
/// @return The concurrent message loop used by this running Dart VM
159+
/// instance.
160+
///
161+
std::shared_ptr<fml::ConcurrentMessageLoop> GetConcurrentMessageLoop();
162+
150163
private:
151164
const Settings settings_;
152165
std::shared_ptr<fml::ConcurrentMessageLoop> concurrent_message_loop_;

shell/common/shell.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,14 @@ class Shell final : public PlatformView::Delegate,
352352
/// @brief Accessor for the disable GPU SyncSwitch
353353
std::shared_ptr<fml::SyncSwitch> GetIsGpuDisabledSyncSwitch() const;
354354

355+
//----------------------------------------------------------------------------
356+
/// @brief Get a pointer to the Dart VM used by this running shell
357+
/// instance.
358+
///
359+
/// @return The Dart VM pointer.
360+
///
361+
DartVM* GetDartVM();
362+
355363
private:
356364
using ServiceProtocolHandler =
357365
std::function<bool(const ServiceProtocol::Handler::ServiceProtocolMap&,
@@ -424,8 +432,6 @@ class Shell final : public PlatformView::Delegate,
424432
std::unique_ptr<Rasterizer> rasterizer,
425433
std::unique_ptr<ShellIOManager> io_manager);
426434

427-
DartVM* GetDartVM();
428-
429435
void ReportTimings();
430436

431437
// |PlatformView::Delegate|

shell/platform/embedder/embedder.cc

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1724,7 +1724,6 @@ FlutterEngineResult FlutterEnginePostDartObject(
17241724
return kSuccess;
17251725
}
17261726

1727-
FLUTTER_EXPORT
17281727
FlutterEngineResult FlutterEngineNotifyLowMemoryWarning(
17291728
FLUTTER_API_SYMBOL(FlutterEngine) raw_engine) {
17301729
auto engine = reinterpret_cast<flutter::EmbedderEngine*>(raw_engine);
@@ -1747,3 +1746,27 @@ FlutterEngineResult FlutterEngineNotifyLowMemoryWarning(
17471746
kInternalInconsistency,
17481747
"Could not dispatch the low memory notification message.");
17491748
}
1749+
1750+
FlutterEngineResult FlutterEnginePostCallbackOnAllNativeThreads(
1751+
FLUTTER_API_SYMBOL(FlutterEngine) engine,
1752+
FlutterNativeThreadCallback callback,
1753+
void* user_data) {
1754+
if (engine == nullptr) {
1755+
return LOG_EMBEDDER_ERROR(kInvalidArguments, "Invalid engine handle.");
1756+
}
1757+
1758+
if (callback == nullptr) {
1759+
return LOG_EMBEDDER_ERROR(kInvalidArguments,
1760+
"Invalid native thread callback.");
1761+
}
1762+
1763+
return reinterpret_cast<flutter::EmbedderEngine*>(engine)
1764+
->PostTaskOnEngineManagedNativeThreads(
1765+
[callback, user_data](FlutterNativeThreadType type) {
1766+
callback(type, user_data);
1767+
})
1768+
? kSuccess
1769+
: LOG_EMBEDDER_ERROR(kInvalidArguments,
1770+
"Internal error while attempting to post "
1771+
"tasks to all threads.");
1772+
}

shell/platform/embedder/embedder.h

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -934,6 +934,31 @@ typedef struct {
934934
};
935935
} FlutterEngineDartObject;
936936

937+
/// This enum allows embedders to determine the type of the engine thread in the
938+
/// FlutterNativeThreadCallback. Based on the thread type, the embedder may be
939+
/// able to tweak the thread priorities for optimum performance.
940+
typedef enum {
941+
/// The Flutter Engine considers the thread on which the FlutterEngineRun call
942+
/// is made to be the platform thread. There is only one such thread per
943+
/// engine instance.
944+
kFlutterNativeThreadTypePlatform,
945+
/// This is the thread the Flutter Engine uses to execute rendering commands
946+
/// based on the selected client rendering API. There is only one such thread
947+
/// per engine instance.
948+
kFlutterNativeThreadTypeRender,
949+
/// This is a dedicated thread on which the root Dart isolate is serviced.
950+
/// There is only one such thread per engine instance.
951+
kFlutterNativeThreadTypeUI,
952+
/// Multiple threads are used by the Flutter engine to perform long running
953+
/// background tasks.
954+
kFlutterNativeThreadTypeWorker,
955+
} FlutterNativeThreadType;
956+
957+
/// A callback made by the engine in response to
958+
/// `FlutterEnginePostCallbackOnAllNativeThreads` on all internal thread.
959+
typedef void (*FlutterNativeThreadCallback)(FlutterNativeThreadType type,
960+
void* user_data);
961+
937962
typedef struct {
938963
/// The size of this struct. Must be sizeof(FlutterProjectArgs).
939964
size_t struct_size;
@@ -1667,6 +1692,45 @@ FLUTTER_EXPORT
16671692
FlutterEngineResult FlutterEngineNotifyLowMemoryWarning(
16681693
FLUTTER_API_SYMBOL(FlutterEngine) engine);
16691694

1695+
//------------------------------------------------------------------------------
1696+
/// @brief Schedule a callback to be run on all engine managed threads.
1697+
/// The engine will attempt to service this callback the next time
1698+
/// the message loop for each managed thread is idle. Since the
1699+
/// engine manages the entire lifecycle of multiple threads, there
1700+
/// is no opportunity for the embedders to finely tune the
1701+
/// priorities of threads directly, or, perform other thread
1702+
/// specific configuration (for example, setting thread names for
1703+
/// tracing). This callback gives embedders a chance to affect such
1704+
/// tuning.
1705+
///
1706+
/// @attention This call is expensive and must be made as few times as
1707+
/// possible. The callback must also return immediately as not doing
1708+
/// so may risk performance issues (especially for callbacks of type
1709+
/// kFlutterNativeThreadTypeUI and kFlutterNativeThreadTypeRender).
1710+
///
1711+
/// @attention Some callbacks (especially the ones of type
1712+
/// kFlutterNativeThreadTypeWorker) may be called after the
1713+
/// FlutterEngine instance has shut down. Embedders must be careful
1714+
/// in handling the lifecycle of objects associated with the user
1715+
/// data baton.
1716+
///
1717+
/// @attention In case there are multiple running Flutter engine instances,
1718+
/// their workers are shared.
1719+
///
1720+
/// @param[in] engine A running engine instance.
1721+
/// @param[in] callback The callback that will get called multiple times on
1722+
/// each engine managed thread.
1723+
/// @param[in] user_data A baton passed by the engine to the callback. This
1724+
/// baton is not interpreted by the engine in any way.
1725+
///
1726+
/// @return Returns if the callback was successfully posted to all threads.
1727+
///
1728+
FLUTTER_EXPORT
1729+
FlutterEngineResult FlutterEnginePostCallbackOnAllNativeThreads(
1730+
FLUTTER_API_SYMBOL(FlutterEngine) engine,
1731+
FlutterNativeThreadCallback callback,
1732+
void* user_data);
1733+
16701734
#if defined(__cplusplus)
16711735
} // extern "C"
16721736
#endif

shell/platform/embedder/embedder_engine.cc

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,34 @@ bool EmbedderEngine::RunTask(const FlutterTask* task) {
247247
task->task);
248248
}
249249

250-
const Shell& EmbedderEngine::GetShell() const {
250+
bool EmbedderEngine::PostTaskOnEngineManagedNativeThreads(
251+
std::function<void(FlutterNativeThreadType)> closure) const {
252+
if (!IsValid() || closure == nullptr) {
253+
return false;
254+
}
255+
256+
const auto trampoline = [closure](FlutterNativeThreadType type,
257+
fml::RefPtr<fml::TaskRunner> runner) {
258+
runner->PostTask([closure, type] { closure(type); });
259+
};
260+
261+
// Post the task to all thread host threads.
262+
const auto& task_runners = shell_->GetTaskRunners();
263+
trampoline(kFlutterNativeThreadTypeRender, task_runners.GetGPUTaskRunner());
264+
trampoline(kFlutterNativeThreadTypeWorker, task_runners.GetIOTaskRunner());
265+
trampoline(kFlutterNativeThreadTypeUI, task_runners.GetUITaskRunner());
266+
trampoline(kFlutterNativeThreadTypePlatform,
267+
task_runners.GetPlatformTaskRunner());
268+
269+
// Post the task to all worker threads.
270+
auto vm = shell_->GetDartVM();
271+
vm->GetConcurrentMessageLoop()->PostTaskToAllWorkers(
272+
[closure]() { closure(kFlutterNativeThreadTypeWorker); });
273+
274+
return true;
275+
}
276+
277+
Shell& EmbedderEngine::GetShell() {
251278
FML_DCHECK(shell_);
252279
return *shell_.get();
253280
}

shell/platform/embedder/embedder_engine.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,10 @@ class EmbedderEngine {
8080

8181
bool RunTask(const FlutterTask* task);
8282

83-
const Shell& GetShell() const;
83+
bool PostTaskOnEngineManagedNativeThreads(
84+
std::function<void(FlutterNativeThreadType)> closure) const;
85+
86+
Shell& GetShell();
8487

8588
private:
8689
const std::unique_ptr<EmbedderThreadHost> thread_host_;

0 commit comments

Comments
 (0)