88
99#include < iostream>
1010#include < memory>
11+ #include < optional>
1112
1213#include " flutter/fml/make_copyable.h"
13- #include " flutter/fml/message_loop_impl.h"
1414#include " flutter/fml/task_source.h"
1515#include " flutter/fml/thread_local.h"
1616
@@ -25,7 +25,7 @@ fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::instance_;
2525
2626namespace {
2727
28- // iOS prior to version 9 prevents c++11 thread_local and __thread specefier ,
28+ // iOS prior to version 9 prevents c++11 thread_local and __thread specifier ,
2929// having us resort to boxed enum containers.
3030class TaskSourceGradeHolder {
3131 public:
@@ -41,9 +41,7 @@ FML_THREAD_LOCAL ThreadLocalUniquePtr<TaskSourceGradeHolder>
4141 tls_task_source_grade;
4242
4343TaskQueueEntry::TaskQueueEntry (TaskQueueId created_for_arg)
44- : owner_of(_kUnmerged),
45- subsumed_by (_kUnmerged),
46- created_for(created_for_arg) {
44+ : subsumed_by(_kUnmerged), created_for(created_for_arg) {
4745 wakeable = NULL ;
4846 task_observers = TaskObservers ();
4947 task_source = std::make_unique<TaskSource>(created_for);
@@ -76,20 +74,21 @@ void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) {
7674 std::lock_guard guard (queue_mutex_);
7775 const auto & queue_entry = queue_entries_.at (queue_id);
7876 FML_DCHECK (queue_entry->subsumed_by == _kUnmerged);
79- TaskQueueId subsumed = queue_entry->owner_of ;
80- queue_entries_.erase (queue_id);
81- if (subsumed != _kUnmerged) {
77+ auto & subsumed_set = queue_entry->owner_of ;
78+ for (auto & subsumed : subsumed_set) {
8279 queue_entries_.erase (subsumed);
8380 }
81+ // Erase owner queue_id at last to avoid &subsumed_set from being invalid
82+ queue_entries_.erase (queue_id);
8483}
8584
8685void MessageLoopTaskQueues::DisposeTasks (TaskQueueId queue_id) {
8786 std::lock_guard guard (queue_mutex_);
8887 const auto & queue_entry = queue_entries_.at (queue_id);
8988 FML_DCHECK (queue_entry->subsumed_by == _kUnmerged);
90- TaskQueueId subsumed = queue_entry->owner_of ;
89+ auto & subsumed_set = queue_entry->owner_of ;
9190 queue_entry->task_source ->ShutDown ();
92- if ( subsumed != _kUnmerged ) {
91+ for ( auto & subsumed : subsumed_set ) {
9392 queue_entries_.at (subsumed)->task_source ->ShutDown ();
9493 }
9594}
@@ -170,8 +169,8 @@ size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
170169 size_t total_tasks = 0 ;
171170 total_tasks += queue_entry->task_source ->GetNumPendingTasks ();
172171
173- TaskQueueId subsumed = queue_entry->owner_of ;
174- if ( subsumed != _kUnmerged ) {
172+ auto & subsumed_set = queue_entry->owner_of ;
173+ for ( auto & subsumed : subsumed_set ) {
175174 const auto & subsumed_entry = queue_entries_.at (subsumed);
176175 total_tasks += subsumed_entry->task_source ->GetNumPendingTasks ();
177176 }
@@ -205,8 +204,8 @@ std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(
205204 observers.push_back (observer.second );
206205 }
207206
208- TaskQueueId subsumed = queue_entries_.at (queue_id)->owner_of ;
209- if ( subsumed != _kUnmerged ) {
207+ auto & subsumed_set = queue_entries_.at (queue_id)->owner_of ;
208+ for ( auto & subsumed : subsumed_set ) {
210209 for (const auto & observer : queue_entries_.at (subsumed)->task_observers ) {
211210 observers.push_back (observer.second );
212211 }
@@ -230,22 +229,41 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
230229 std::lock_guard guard (queue_mutex_);
231230 auto & owner_entry = queue_entries_.at (owner);
232231 auto & subsumed_entry = queue_entries_.at (subsumed);
233-
234- if (owner_entry-> owner_of == subsumed ) {
232+ auto & subsumed_set = owner_entry-> owner_of ;
233+ if (subsumed_set. find (subsumed) != subsumed_set. end () ) {
235234 return true ;
236235 }
237236
238- std::vector<TaskQueueId> owner_subsumed_keys = {
239- owner_entry->owner_of , owner_entry->subsumed_by , subsumed_entry->owner_of ,
240- subsumed_entry->subsumed_by };
237+ // Won't check owner_entry->owner_of, because it may contains items when
238+ // merged with other different queues.
241239
242- for (auto key : owner_subsumed_keys) {
243- if (key != _kUnmerged) {
244- return false ;
245- }
240+ // Ensure owner_entry->subsumed_by being _kUnmerged
241+ if (owner_entry->subsumed_by != _kUnmerged) {
242+ FML_LOG (WARNING) << " Thread merging failed: owner_entry was already "
243+ " subsumed by others, owner="
244+ << owner << " , subsumed=" << subsumed
245+ << " , owner->subsumed_by=" << owner_entry->subsumed_by ;
246+ return false ;
246247 }
247-
248- owner_entry->owner_of = subsumed;
248+ // Ensure subsumed_entry->owner_of being empty
249+ if (!subsumed_entry->owner_of .empty ()) {
250+ FML_LOG (WARNING)
251+ << " Thread merging failed: subsumed_entry already owns others, owner="
252+ << owner << " , subsumed=" << subsumed
253+ << " , subsumed->owner_of.size()=" << subsumed_entry->owner_of .size ();
254+ return false ;
255+ }
256+ // Ensure subsumed_entry->subsumed_by being _kUnmerged
257+ if (subsumed_entry->subsumed_by != _kUnmerged) {
258+ FML_LOG (WARNING) << " Thread merging failed: subsumed_entry was already "
259+ " subsumed by others, owner="
260+ << owner << " , subsumed=" << subsumed
261+ << " , subsumed->subsumed_by="
262+ << subsumed_entry->subsumed_by ;
263+ return false ;
264+ }
265+ // All checking is OK, set merged state.
266+ owner_entry->owner_of .insert (subsumed);
249267 subsumed_entry->subsumed_by = owner;
250268
251269 if (HasPendingTasksUnlocked (owner)) {
@@ -255,16 +273,37 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
255273 return true ;
256274}
257275
258- bool MessageLoopTaskQueues::Unmerge (TaskQueueId owner) {
276+ bool MessageLoopTaskQueues::Unmerge (TaskQueueId owner, TaskQueueId subsumed ) {
259277 std::lock_guard guard (queue_mutex_);
260278 const auto & owner_entry = queue_entries_.at (owner);
261- const TaskQueueId subsumed = owner_entry->owner_of ;
262- if (subsumed == _kUnmerged) {
279+ if (owner_entry->owner_of .empty ()) {
280+ FML_LOG (WARNING)
281+ << " Thread unmerging failed: owner_entry doesn't own anyone, owner="
282+ << owner << " , subsumed=" << subsumed;
283+ return false ;
284+ }
285+ if (owner_entry->subsumed_by != _kUnmerged) {
286+ FML_LOG (WARNING)
287+ << " Thread unmerging failed: owner_entry was subsumed by others, owner="
288+ << owner << " , subsumed=" << subsumed
289+ << " , owner_entry->subsumed_by=" << owner_entry->subsumed_by ;
290+ return false ;
291+ }
292+ if (queue_entries_.at (subsumed)->subsumed_by == _kUnmerged) {
293+ FML_LOG (WARNING) << " Thread unmerging failed: subsumed_entry wasn't "
294+ " subsumed by others, owner="
295+ << owner << " , subsumed=" << subsumed;
296+ return false ;
297+ }
298+ if (owner_entry->owner_of .find (subsumed) == owner_entry->owner_of .end ()) {
299+ FML_LOG (WARNING) << " Thread unmerging failed: owner_entry didn't own the "
300+ " given subsumed queue id, owner="
301+ << owner << " , subsumed=" << subsumed;
263302 return false ;
264303 }
265304
266305 queue_entries_.at (subsumed)->subsumed_by = _kUnmerged;
267- owner_entry->owner_of = _kUnmerged ;
306+ owner_entry->owner_of . erase (subsumed) ;
268307
269308 if (HasPendingTasksUnlocked (owner)) {
270309 WakeUpUnlocked (owner, GetNextWakeTimeUnlocked (owner));
@@ -280,11 +319,14 @@ bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
280319bool MessageLoopTaskQueues::Owns (TaskQueueId owner,
281320 TaskQueueId subsumed) const {
282321 std::lock_guard guard (queue_mutex_);
283- return owner != _kUnmerged && subsumed != _kUnmerged &&
284- subsumed == queue_entries_.at (owner)->owner_of ;
322+ if (owner == _kUnmerged || subsumed == _kUnmerged) {
323+ return false ;
324+ }
325+ auto & subsumed_set = queue_entries_.at (owner)->owner_of ;
326+ return subsumed_set.find (subsumed) != subsumed_set.end ();
285327}
286328
287- TaskQueueId MessageLoopTaskQueues::GetSubsumedTaskQueueId (
329+ std::set< TaskQueueId> MessageLoopTaskQueues::GetSubsumedTaskQueueId (
288330 TaskQueueId owner) const {
289331 std::lock_guard guard (queue_mutex_);
290332 return queue_entries_.at (owner)->owner_of ;
@@ -318,13 +360,11 @@ bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
318360 return true ;
319361 }
320362
321- const TaskQueueId subsumed = entry->owner_of ;
322- if (subsumed == _kUnmerged) {
323- // this is not an owner and queue is empty.
324- return false ;
325- } else {
326- return !queue_entries_.at (subsumed)->task_source ->IsEmpty ();
327- }
363+ auto & subsumed_set = entry->owner_of ;
364+ return std::any_of (
365+ subsumed_set.begin (), subsumed_set.end (), [&](const auto & subsumed) {
366+ return !queue_entries_.at (subsumed)->task_source ->IsEmpty ();
367+ });
328368}
329369
330370fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked (
@@ -336,32 +376,35 @@ TaskSource::TopTask MessageLoopTaskQueues::PeekNextTaskUnlocked(
336376 TaskQueueId owner) const {
337377 FML_DCHECK (HasPendingTasksUnlocked (owner));
338378 const auto & entry = queue_entries_.at (owner);
339- const TaskQueueId subsumed = entry->owner_of ;
340- if (subsumed == _kUnmerged) {
379+ if ( entry->owner_of . empty ()) {
380+ FML_CHECK (!entry-> task_source -> IsEmpty ());
341381 return entry->task_source ->Top ();
342382 }
343383
384+ // Use optional for the memory of TopTask object.
385+ std::optional<TaskSource::TopTask> top_task;
386+
387+ std::function<void (const TaskSource*)> top_task_updater =
388+ [&top_task](const TaskSource* source) {
389+ if (source && !source->IsEmpty ()) {
390+ TaskSource::TopTask other_task = source->Top ();
391+ if (!top_task.has_value () || top_task->task > other_task.task ) {
392+ top_task.emplace (other_task);
393+ }
394+ }
395+ };
396+
344397 TaskSource* owner_tasks = entry->task_source .get ();
345- TaskSource* subsumed_tasks = queue_entries_.at (subsumed)->task_source .get ();
346-
347- // we are owning another task queue
348- const bool subsumed_has_task = !subsumed_tasks->IsEmpty ();
349- const bool owner_has_task = !owner_tasks->IsEmpty ();
350- fml::TaskQueueId top_queue_id = owner;
351- if (owner_has_task && subsumed_has_task) {
352- const auto owner_task = owner_tasks->Top ();
353- const auto subsumed_task = subsumed_tasks->Top ();
354- if (owner_task.task > subsumed_task.task ) {
355- top_queue_id = subsumed;
356- } else {
357- top_queue_id = owner;
358- }
359- } else if (owner_has_task) {
360- top_queue_id = owner;
361- } else {
362- top_queue_id = subsumed;
398+ top_task_updater (owner_tasks);
399+
400+ for (TaskQueueId subsumed : entry->owner_of ) {
401+ TaskSource* subsumed_tasks = queue_entries_.at (subsumed)->task_source .get ();
402+ top_task_updater (subsumed_tasks);
363403 }
364- return queue_entries_.at (top_queue_id)->task_source ->Top ();
404+ // At least one task at the top because PeekNextTaskUnlocked() is called after
405+ // HasPendingTasksUnlocked()
406+ FML_CHECK (top_task.has_value ());
407+ return top_task.value ();
365408}
366409
367410} // namespace fml
0 commit comments