Skip to content
This repository was archived by the owner on Feb 25, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7f8752a
Implementation of two or more threads merging
eggfly Jul 4, 2021
06ab2a9
Merge branch 'flutter:master' into master_multiple_thread_mergers
eggfly Jul 23, 2021
1d56764
Format code
eggfly Jul 23, 2021
49a883d
Format gn code
eggfly Jul 23, 2021
8d50b2c
Remove error log
eggfly Jul 23, 2021
6ff3fb7
Add licenses for new files
eggfly Jul 23, 2021
1f675be
Remove some comments and add EnsureInitializedForCurrentThread
eggfly Jul 23, 2021
686b7be
Remove used code
eggfly Jul 23, 2021
30b21ca
Disable fml_unittests only for test
eggfly Jul 23, 2021
b0a2b4f
Some modifications after first review
eggfly Jul 25, 2021
dfda6a5
Change the owner_of member's doc
eggfly Jul 25, 2021
c0effef
Factor out a wrapper to avoid repeated code for raster_thread_merger_…
eggfly Jul 26, 2021
560af31
Merge branch 'flutter:master' into master_multiple_thread_mergers
eggfly Jul 27, 2021
747cb12
Use optional instead of vector in PeekNextTaskUnlocked method
eggfly Jul 27, 2021
42b0808
Add detail logs for merge error cases
eggfly Jul 27, 2021
7be5911
Update Merge/Unmerge docs, add more tests
eggfly Jul 27, 2021
6883d9d
Use raw pointers, add some comment
eggfly Jul 27, 2021
1ce6a56
Fix a wrong dispose order to avoid objc test failure
eggfly Jul 28, 2021
61eda81
fix raw pointer problem
eggfly Jul 28, 2021
2330090
Fix windows msvc build error
eggfly Jul 28, 2021
dd555db
Fix wrong member order which causes windows fml_unittests failure
eggfly Jul 29, 2021
ec8df0f
Rename some files and change to better doc
eggfly Jul 30, 2021
8263643
Remove the static map, store the parent merger in shell instead.
eggfly Aug 3, 2021
1d48d15
Let every merger has their own lease_term to avoid multi-decrement bug
eggfly Aug 5, 2021
5dba812
Merge branch 'flutter:master' into master_multiple_thread_mergers
eggfly Aug 5, 2021
116a022
Remove unused const declaration
eggfly Aug 5, 2021
9f25cb3
Fix some nits and write better doc after code review
eggfly Aug 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ci/licenses_golden/licenses_flutter
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ FILE: ../../../flutter/fml/posix_wrappers.h
FILE: ../../../flutter/fml/raster_thread_merger.cc
FILE: ../../../flutter/fml/raster_thread_merger.h
FILE: ../../../flutter/fml/raster_thread_merger_unittests.cc
FILE: ../../../flutter/fml/shared_thread_merger.cc
FILE: ../../../flutter/fml/shared_thread_merger.h
FILE: ../../../flutter/fml/size.h
FILE: ../../../flutter/fml/status.h
FILE: ../../../flutter/fml/synchronization/atomic_object.h
Expand Down
2 changes: 2 additions & 0 deletions fml/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ source_set("fml") {
"posix_wrappers.h",
"raster_thread_merger.cc",
"raster_thread_merger.h",
"shared_thread_merger.cc",
"shared_thread_merger.h",
"size.h",
"synchronization/atomic_object.h",
"synchronization/count_down_latch.cc",
Expand Down
13 changes: 10 additions & 3 deletions fml/memory/task_runner_checker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace fml {

TaskRunnerChecker::TaskRunnerChecker()
: initialized_queue_id_(InitTaskQueueId()),
subsumed_queue_id_(
subsumed_queue_ids_(
MessageLoopTaskQueues::GetInstance()->GetSubsumedTaskQueueId(
initialized_queue_id_)){};

Expand All @@ -17,8 +17,15 @@ TaskRunnerChecker::~TaskRunnerChecker() = default;
bool TaskRunnerChecker::RunsOnCreationTaskRunner() const {
FML_CHECK(fml::MessageLoop::IsInitializedForCurrentThread());
const auto current_queue_id = MessageLoop::GetCurrentTaskQueueId();
return RunsOnTheSameThread(current_queue_id, initialized_queue_id_) ||
RunsOnTheSameThread(current_queue_id, subsumed_queue_id_);
if (RunsOnTheSameThread(current_queue_id, initialized_queue_id_)) {
return true;
}
for (auto& subsumed : subsumed_queue_ids_) {
if (RunsOnTheSameThread(current_queue_id, subsumed)) {
return true;
}
}
return false;
};

bool TaskRunnerChecker::RunsOnTheSameThread(TaskQueueId queue_a,
Expand Down
2 changes: 1 addition & 1 deletion fml/memory/task_runner_checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class TaskRunnerChecker final {

private:
TaskQueueId initialized_queue_id_;
TaskQueueId subsumed_queue_id_;
std::set<TaskQueueId> subsumed_queue_ids_;

TaskQueueId InitTaskQueueId();
};
Expand Down
6 changes: 3 additions & 3 deletions fml/memory/task_runner_checker_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,14 @@ TEST(TaskRunnerCheckerTests, MergedTaskRunnersRunsOnTheSameThread) {
fml::TaskQueueId qid2 = loop2->GetTaskRunner()->GetTaskQueueId();
const auto raster_thread_merger_ =
fml::MakeRefCounted<fml::RasterThreadMerger>(qid1, qid2);
const int kNumFramesMerged = 5;
const size_t kNumFramesMerged = 5;

raster_thread_merger_->MergeWithLease(kNumFramesMerged);

// merged, running on the same thread
EXPECT_EQ(TaskRunnerChecker::RunsOnTheSameThread(qid1, qid2), true);

for (int i = 0; i < kNumFramesMerged; i++) {
for (size_t i = 0; i < kNumFramesMerged; i++) {
ASSERT_TRUE(raster_thread_merger_->IsMerged());
raster_thread_merger_->DecrementLease();
}
Expand Down Expand Up @@ -154,7 +154,7 @@ TEST(TaskRunnerCheckerTests,
});
latch3.Wait();

fml::MessageLoopTaskQueues::GetInstance()->Unmerge(qid1);
fml::MessageLoopTaskQueues::GetInstance()->Unmerge(qid1, qid2);

fml::AutoResetWaitableEvent latch4;
loop2->GetTaskRunner()->PostTask([&]() {
Expand Down
4 changes: 2 additions & 2 deletions fml/memory/weak_ptr_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,14 @@ TEST(TaskRunnerAffineWeakPtrTest, ShouldNotCrashIfRunningOnTheSameTaskRunner) {
fml::TaskQueueId qid2 = loop2->GetTaskRunner()->GetTaskQueueId();
const auto raster_thread_merger_ =
fml::MakeRefCounted<fml::RasterThreadMerger>(qid1, qid2);
const int kNumFramesMerged = 5;
const size_t kNumFramesMerged = 5;

raster_thread_merger_->MergeWithLease(kNumFramesMerged);

loop2_task_start_latch.Signal();
loop2_task_finish_latch.Wait();

for (int i = 0; i < kNumFramesMerged; i++) {
for (size_t i = 0; i < kNumFramesMerged; i++) {
ASSERT_TRUE(raster_thread_merger_->IsMerged());
raster_thread_merger_->DecrementLease();
}
Expand Down
163 changes: 103 additions & 60 deletions fml/message_loop_task_queues.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

#include <iostream>
#include <memory>
#include <optional>

#include "flutter/fml/make_copyable.h"
#include "flutter/fml/message_loop_impl.h"
#include "flutter/fml/task_source.h"
#include "flutter/fml/thread_local.h"

Expand All @@ -25,7 +25,7 @@ fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::instance_;

namespace {

// iOS prior to version 9 prevents c++11 thread_local and __thread specefier,
// iOS prior to version 9 prevents c++11 thread_local and __thread specifier,
// having us resort to boxed enum containers.
class TaskSourceGradeHolder {
public:
Expand All @@ -41,9 +41,7 @@ FML_THREAD_LOCAL ThreadLocalUniquePtr<TaskSourceGradeHolder>
tls_task_source_grade;

TaskQueueEntry::TaskQueueEntry(TaskQueueId created_for_arg)
: owner_of(_kUnmerged),
subsumed_by(_kUnmerged),
created_for(created_for_arg) {
: subsumed_by(_kUnmerged), created_for(created_for_arg) {
wakeable = NULL;
task_observers = TaskObservers();
task_source = std::make_unique<TaskSource>(created_for);
Expand Down Expand Up @@ -76,20 +74,21 @@ void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) {
std::lock_guard guard(queue_mutex_);
const auto& queue_entry = queue_entries_.at(queue_id);
FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
TaskQueueId subsumed = queue_entry->owner_of;
queue_entries_.erase(queue_id);
if (subsumed != _kUnmerged) {
auto& subsumed_set = queue_entry->owner_of;
for (auto& subsumed : subsumed_set) {
queue_entries_.erase(subsumed);
}
// Erase owner queue_id at last to avoid &subsumed_set from being invalid
queue_entries_.erase(queue_id);
}

void MessageLoopTaskQueues::DisposeTasks(TaskQueueId queue_id) {
std::lock_guard guard(queue_mutex_);
const auto& queue_entry = queue_entries_.at(queue_id);
FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
TaskQueueId subsumed = queue_entry->owner_of;
auto& subsumed_set = queue_entry->owner_of;
queue_entry->task_source->ShutDown();
if (subsumed != _kUnmerged) {
for (auto& subsumed : subsumed_set) {
queue_entries_.at(subsumed)->task_source->ShutDown();
}
}
Expand Down Expand Up @@ -170,8 +169,8 @@ size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
size_t total_tasks = 0;
total_tasks += queue_entry->task_source->GetNumPendingTasks();

TaskQueueId subsumed = queue_entry->owner_of;
if (subsumed != _kUnmerged) {
auto& subsumed_set = queue_entry->owner_of;
for (auto& subsumed : subsumed_set) {
const auto& subsumed_entry = queue_entries_.at(subsumed);
total_tasks += subsumed_entry->task_source->GetNumPendingTasks();
}
Expand Down Expand Up @@ -205,8 +204,8 @@ std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(
observers.push_back(observer.second);
}

TaskQueueId subsumed = queue_entries_.at(queue_id)->owner_of;
if (subsumed != _kUnmerged) {
auto& subsumed_set = queue_entries_.at(queue_id)->owner_of;
for (auto& subsumed : subsumed_set) {
for (const auto& observer : queue_entries_.at(subsumed)->task_observers) {
observers.push_back(observer.second);
}
Expand All @@ -230,22 +229,41 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
std::lock_guard guard(queue_mutex_);
auto& owner_entry = queue_entries_.at(owner);
auto& subsumed_entry = queue_entries_.at(subsumed);

if (owner_entry->owner_of == subsumed) {
auto& subsumed_set = owner_entry->owner_of;
if (subsumed_set.find(subsumed) != subsumed_set.end()) {
return true;
}

std::vector<TaskQueueId> owner_subsumed_keys = {
owner_entry->owner_of, owner_entry->subsumed_by, subsumed_entry->owner_of,
subsumed_entry->subsumed_by};
// Won't check owner_entry->owner_of, because it may contains items when
// merged with other different queues.

for (auto key : owner_subsumed_keys) {
if (key != _kUnmerged) {
return false;
}
// Ensure owner_entry->subsumed_by being _kUnmerged
if (owner_entry->subsumed_by != _kUnmerged) {
FML_LOG(WARNING) << "Thread merging failed: owner_entry was already "
"subsumed by others, owner="
<< owner << ", subsumed=" << subsumed
<< ", owner->subsumed_by=" << owner_entry->subsumed_by;
return false;
}

owner_entry->owner_of = subsumed;
// Ensure subsumed_entry->owner_of being empty
if (!subsumed_entry->owner_of.empty()) {
FML_LOG(WARNING)
<< "Thread merging failed: subsumed_entry already owns others, owner="
<< owner << ", subsumed=" << subsumed
<< ", subsumed->owner_of.size()=" << subsumed_entry->owner_of.size();
return false;
}
// Ensure subsumed_entry->subsumed_by being _kUnmerged
if (subsumed_entry->subsumed_by != _kUnmerged) {
FML_LOG(WARNING) << "Thread merging failed: subsumed_entry was already "
"subsumed by others, owner="
<< owner << ", subsumed=" << subsumed
<< ", subsumed->subsumed_by="
<< subsumed_entry->subsumed_by;
return false;
}
// All checking is OK, set merged state.
owner_entry->owner_of.insert(subsumed);
subsumed_entry->subsumed_by = owner;

if (HasPendingTasksUnlocked(owner)) {
Expand All @@ -255,16 +273,37 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
return true;
}

bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner, TaskQueueId subsumed) {
std::lock_guard guard(queue_mutex_);
const auto& owner_entry = queue_entries_.at(owner);
const TaskQueueId subsumed = owner_entry->owner_of;
if (subsumed == _kUnmerged) {
if (owner_entry->owner_of.empty()) {
FML_LOG(WARNING)
<< "Thread unmerging failed: owner_entry doesn't own anyone, owner="
<< owner << ", subsumed=" << subsumed;
return false;
}
if (owner_entry->subsumed_by != _kUnmerged) {
FML_LOG(WARNING)
<< "Thread unmerging failed: owner_entry was subsumed by others, owner="
<< owner << ", subsumed=" << subsumed
<< ", owner_entry->subsumed_by=" << owner_entry->subsumed_by;
return false;
}
if (queue_entries_.at(subsumed)->subsumed_by == _kUnmerged) {
FML_LOG(WARNING) << "Thread unmerging failed: subsumed_entry wasn't "
"subsumed by others, owner="
<< owner << ", subsumed=" << subsumed;
return false;
}
if (owner_entry->owner_of.find(subsumed) == owner_entry->owner_of.end()) {
FML_LOG(WARNING) << "Thread unmerging failed: owner_entry didn't own the "
"given subsumed queue id, owner="
<< owner << ", subsumed=" << subsumed;
return false;
}

queue_entries_.at(subsumed)->subsumed_by = _kUnmerged;
owner_entry->owner_of = _kUnmerged;
owner_entry->owner_of.erase(subsumed);

if (HasPendingTasksUnlocked(owner)) {
WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
Expand All @@ -280,11 +319,14 @@ bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
bool MessageLoopTaskQueues::Owns(TaskQueueId owner,
TaskQueueId subsumed) const {
std::lock_guard guard(queue_mutex_);
return owner != _kUnmerged && subsumed != _kUnmerged &&
subsumed == queue_entries_.at(owner)->owner_of;
if (owner == _kUnmerged || subsumed == _kUnmerged) {
return false;
}
auto& subsumed_set = queue_entries_.at(owner)->owner_of;
return subsumed_set.find(subsumed) != subsumed_set.end();
}

TaskQueueId MessageLoopTaskQueues::GetSubsumedTaskQueueId(
std::set<TaskQueueId> MessageLoopTaskQueues::GetSubsumedTaskQueueId(
TaskQueueId owner) const {
std::lock_guard guard(queue_mutex_);
return queue_entries_.at(owner)->owner_of;
Expand Down Expand Up @@ -318,13 +360,11 @@ bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
return true;
}

const TaskQueueId subsumed = entry->owner_of;
if (subsumed == _kUnmerged) {
// this is not an owner and queue is empty.
return false;
} else {
return !queue_entries_.at(subsumed)->task_source->IsEmpty();
}
auto& subsumed_set = entry->owner_of;
return std::any_of(
subsumed_set.begin(), subsumed_set.end(), [&](const auto& subsumed) {
return !queue_entries_.at(subsumed)->task_source->IsEmpty();
});
}

fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked(
Expand All @@ -336,32 +376,35 @@ TaskSource::TopTask MessageLoopTaskQueues::PeekNextTaskUnlocked(
TaskQueueId owner) const {
FML_DCHECK(HasPendingTasksUnlocked(owner));
const auto& entry = queue_entries_.at(owner);
const TaskQueueId subsumed = entry->owner_of;
if (subsumed == _kUnmerged) {
if (entry->owner_of.empty()) {
FML_CHECK(!entry->task_source->IsEmpty());
return entry->task_source->Top();
}

// Use optional for the memory of TopTask object.
std::optional<TaskSource::TopTask> top_task;

std::function<void(const TaskSource*)> top_task_updater =
[&top_task](const TaskSource* source) {
if (source && !source->IsEmpty()) {
TaskSource::TopTask other_task = source->Top();
if (!top_task.has_value() || top_task->task > other_task.task) {
top_task.emplace(other_task);
}
}
};

TaskSource* owner_tasks = entry->task_source.get();
TaskSource* subsumed_tasks = queue_entries_.at(subsumed)->task_source.get();

// we are owning another task queue
const bool subsumed_has_task = !subsumed_tasks->IsEmpty();
const bool owner_has_task = !owner_tasks->IsEmpty();
fml::TaskQueueId top_queue_id = owner;
if (owner_has_task && subsumed_has_task) {
const auto owner_task = owner_tasks->Top();
const auto subsumed_task = subsumed_tasks->Top();
if (owner_task.task > subsumed_task.task) {
top_queue_id = subsumed;
} else {
top_queue_id = owner;
}
} else if (owner_has_task) {
top_queue_id = owner;
} else {
top_queue_id = subsumed;
top_task_updater(owner_tasks);

for (TaskQueueId subsumed : entry->owner_of) {
TaskSource* subsumed_tasks = queue_entries_.at(subsumed)->task_source.get();
top_task_updater(subsumed_tasks);
}
return queue_entries_.at(top_queue_id)->task_source->Top();
// At least one task at the top because PeekNextTaskUnlocked() is called after
// HasPendingTasksUnlocked()
FML_CHECK(top_task.has_value());
return top_task.value();
}

} // namespace fml
Loading