1111#include < optional>
1212
1313#include " flutter/fml/make_copyable.h"
14+ #include " flutter/fml/message_loop.h"
1415#include " flutter/fml/task_source.h"
1516#include " flutter/fml/thread_local.h"
1617
1718namespace fml {
1819
20+ std::mutex gThreadMergingLock ;
21+
1922std::mutex MessageLoopTaskQueues::creation_mutex_;
2023
2124const size_t TaskQueueId::kUnmerged = ULONG_MAX;
@@ -127,29 +130,7 @@ bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) const {
127130fml::closure MessageLoopTaskQueues::GetNextTaskToRun (TaskQueueId queue_id,
128131 fml::TimePoint from_time) {
129132 std::lock_guard guard (queue_mutex_);
130- if (!HasPendingTasksUnlocked (queue_id)) {
131- return nullptr ;
132- }
133- TaskSource::TopTask top = PeekNextTaskUnlocked (queue_id);
134-
135- if (!HasPendingTasksUnlocked (queue_id)) {
136- WakeUpUnlocked (queue_id, fml::TimePoint::Max ());
137- } else {
138- WakeUpUnlocked (queue_id, GetNextWakeTimeUnlocked (queue_id));
139- }
140-
141- if (top.task .GetTargetTime () > from_time) {
142- return nullptr ;
143- }
144- fml::closure invocation = top.task .GetTask ();
145- queue_entries_.at (top.task_queue_id )
146- ->task_source ->PopTask (top.task .GetTaskSourceGrade ());
147- {
148- std::scoped_lock creation (creation_mutex_);
149- const auto task_source_grade = top.task .GetTaskSourceGrade ();
150- tls_task_source_grade.reset (new TaskSourceGradeHolder{task_source_grade});
151- }
152- return invocation;
133+ return GetNextTaskToRunUnlocked (queue_id, from_time);
153134}
154135
155136void MessageLoopTaskQueues::WakeUpUnlocked (TaskQueueId queue_id,
@@ -194,24 +175,7 @@ void MessageLoopTaskQueues::RemoveTaskObserver(TaskQueueId queue_id,
194175std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify (
195176 TaskQueueId queue_id) const {
196177 std::lock_guard guard (queue_mutex_);
197- std::vector<fml::closure> observers;
198-
199- if (queue_entries_.at (queue_id)->subsumed_by != _kUnmerged) {
200- return observers;
201- }
202-
203- for (const auto & observer : queue_entries_.at (queue_id)->task_observers ) {
204- observers.push_back (observer.second );
205- }
206-
207- auto & subsumed_set = queue_entries_.at (queue_id)->owner_of ;
208- for (auto & subsumed : subsumed_set) {
209- for (const auto & observer : queue_entries_.at (subsumed)->task_observers ) {
210- observers.push_back (observer.second );
211- }
212- }
213-
214- return observers;
178+ return GetObserversToNotifyUnlocked (queue_id);
215179}
216180
217181void MessageLoopTaskQueues::SetWakeable (TaskQueueId queue_id,
@@ -262,7 +226,9 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
262226 << subsumed_entry->subsumed_by ;
263227 return false ;
264228 }
229+
265230 // All checking is OK, set merged state.
231+ std::scoped_lock lock (gThreadMergingLock );
266232 owner_entry->owner_of .insert (subsumed);
267233 subsumed_entry->subsumed_by = owner;
268234
@@ -319,11 +285,7 @@ bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner, TaskQueueId subsumed) {
319285bool MessageLoopTaskQueues::Owns (TaskQueueId owner,
320286 TaskQueueId subsumed) const {
321287 std::lock_guard guard (queue_mutex_);
322- if (owner == _kUnmerged || subsumed == _kUnmerged) {
323- return false ;
324- }
325- auto & subsumed_set = queue_entries_.at (owner)->owner_of ;
326- return subsumed_set.find (subsumed) != subsumed_set.end ();
288+ return OwnsUnlocked (owner, subsumed);
327289}
328290
329291std::set<TaskQueueId> MessageLoopTaskQueues::GetSubsumedTaskQueueId (
@@ -346,6 +308,33 @@ void MessageLoopTaskQueues::ResumeSecondarySource(TaskQueueId queue_id) {
346308 }
347309}
348310
311+ void MessageLoopTaskQueues::FlushExpiredTasksNow (TaskQueueId queue_id) {
312+ std::lock_guard guard (queue_mutex_);
313+ if (!MessageLoop::IsInitializedForCurrentThread ()) {
314+ return ;
315+ }
316+
317+ TaskQueueId current_queue_id = MessageLoop::GetCurrentTaskQueueId ();
318+ if (current_queue_id == queue_id ||
319+ OwnsUnlocked (current_queue_id, queue_id) ||
320+ OwnsUnlocked (queue_id, current_queue_id)) {
321+ const auto now = fml::TimePoint::Now ();
322+ fml::closure invocation;
323+ do {
324+ invocation = GetNextTaskToRunUnlocked (queue_id, now);
325+ if (!invocation) {
326+ return ;
327+ }
328+ invocation ();
329+ std::vector<fml::closure> observers =
330+ GetObserversToNotifyUnlocked (queue_id);
331+ for (const auto & observer : observers) {
332+ observer ();
333+ }
334+ } while (invocation);
335+ }
336+ }
337+
349338// Subsumed queues will never have pending tasks.
350339// Owning queues will consider both their and their subsumed tasks.
351340bool MessageLoopTaskQueues::HasPendingTasksUnlocked (
@@ -407,4 +396,64 @@ TaskSource::TopTask MessageLoopTaskQueues::PeekNextTaskUnlocked(
407396 return top_task.value ();
408397}
409398
399+ bool MessageLoopTaskQueues::OwnsUnlocked (TaskQueueId owner,
400+ TaskQueueId subsumed) const {
401+ if (owner == _kUnmerged || subsumed == _kUnmerged) {
402+ return false ;
403+ }
404+ auto & subsumed_set = queue_entries_.at (owner)->owner_of ;
405+ return subsumed_set.find (subsumed) != subsumed_set.end ();
406+ }
407+
408+ fml::closure MessageLoopTaskQueues::GetNextTaskToRunUnlocked (
409+ TaskQueueId queue_id,
410+ fml::TimePoint from_time) {
411+ if (!HasPendingTasksUnlocked (queue_id)) {
412+ return nullptr ;
413+ }
414+
415+ TaskSource::TopTask top = PeekNextTaskUnlocked (queue_id);
416+
417+ if (!HasPendingTasksUnlocked (queue_id)) {
418+ WakeUpUnlocked (queue_id, fml::TimePoint::Max ());
419+ } else {
420+ WakeUpUnlocked (queue_id, GetNextWakeTimeUnlocked (queue_id));
421+ }
422+
423+ if (top.task .GetTargetTime () > from_time) {
424+ return nullptr ;
425+ }
426+
427+ fml::closure invocation = top.task .GetTask ();
428+ queue_entries_.at (top.task_queue_id )
429+ ->task_source ->PopTask (top.task .GetTaskSourceGrade ());
430+ {
431+ std::scoped_lock creation (creation_mutex_);
432+ const auto task_source_grade = top.task .GetTaskSourceGrade ();
433+ tls_task_source_grade.reset (new TaskSourceGradeHolder{task_source_grade});
434+ }
435+ return invocation;
436+ }
437+
438+ std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotifyUnlocked (
439+ TaskQueueId queue_id) const {
440+ std::vector<fml::closure> observers;
441+
442+ if (queue_entries_.at (queue_id)->subsumed_by != _kUnmerged) {
443+ return observers;
444+ }
445+
446+ for (const auto & observer : queue_entries_.at (queue_id)->task_observers ) {
447+ observers.push_back (observer.second );
448+ }
449+
450+ auto & subsumed_set = queue_entries_.at (queue_id)->owner_of ;
451+ for (auto & subsumed : subsumed_set) {
452+ for (const auto & observer : queue_entries_.at (subsumed)->task_observers ) {
453+ observers.push_back (observer.second );
454+ }
455+ }
456+
457+ return observers;
458+ }
410459} // namespace fml
0 commit comments