Skip to content
This repository was archived by the owner on Feb 25, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fml/message_loop_task_queues.cc
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
bool MessageLoopTaskQueues::Owns(TaskQueueId owner,
TaskQueueId subsumed) const {
std::lock_guard guard(queue_mutex_);
return subsumed == queue_entries_.at(owner)->owner_of || owner == subsumed;
return subsumed == queue_entries_.at(owner)->owner_of;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Would you mind adding a TODO, and filing an issue about reverting this logic once Android is fixed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually might not need to revert this as I think this make sense. waiting for @iskakaushik to confirm.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, this is good.

}

// Subsumed queues will never have pending tasks.
Expand Down
7 changes: 7 additions & 0 deletions fml/message_loop_task_queues_unittests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,13 @@ TEST(MessageLoopTaskQueue, NotifyObserversWhileCreatingQueues) {
before_second_observer.Signal();
notify_observers.join();
}

TEST(MessageLoopTaskQueue, QueueDoNotOwnItself) {
auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
auto queue_id = task_queue->CreateTaskQueue();
ASSERT_FALSE(task_queue->Owns(queue_id, queue_id));
}

// TODO(chunhtai): This unit-test is flaky and sometimes fails asynchronizely
// after the test has finished.
// https://github.com/flutter/flutter/issues/43858
Expand Down
71 changes: 55 additions & 16 deletions fml/raster_thread_merger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,58 +17,97 @@ RasterThreadMerger::RasterThreadMerger(fml::TaskQueueId platform_queue_id,
gpu_queue_id_(gpu_queue_id),
task_queues_(fml::MessageLoopTaskQueues::GetInstance()),
lease_term_(kLeaseNotSet) {
is_merged_ = task_queues_->Owns(platform_queue_id_, gpu_queue_id_);
FML_CHECK(!task_queues_->Owns(platform_queue_id_, gpu_queue_id_));
}

void RasterThreadMerger::MergeWithLease(size_t lease_term) {
if (TaskQueuesAreSame()) {
return;
}

FML_DCHECK(lease_term > 0) << "lease_term should be positive.";
if (!is_merged_) {
is_merged_ = task_queues_->Merge(platform_queue_id_, gpu_queue_id_);
std::scoped_lock lock(lease_term_mutex_);
if (!IsMergedUnSafe()) {
bool success = task_queues_->Merge(platform_queue_id_, gpu_queue_id_);
FML_CHECK(success) << "Unable to merge the raster and platform threads.";
lease_term_ = lease_term;
}
merged_condition_.notify_one();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

grab lock.

}

void RasterThreadMerger::UnMergeNow() {
if (TaskQueuesAreSame()) {
return;
}

std::scoped_lock lock(lease_term_mutex_);
lease_term_ = 0;
bool success = task_queues_->Unmerge(platform_queue_id_);
FML_CHECK(success) << "Unable to un-merge the raster and platform threads.";
}

bool RasterThreadMerger::IsOnPlatformThread() const {
return MessageLoop::GetCurrentTaskQueueId() == platform_queue_id_;
}

bool RasterThreadMerger::IsOnRasterizingThread() const {
if (is_merged_) {
bool RasterThreadMerger::IsOnRasterizingThread() {
if (IsMergedUnSafe()) {
return IsOnPlatformThread();
} else {
return !IsOnPlatformThread();
}
}

void RasterThreadMerger::ExtendLeaseTo(size_t lease_term) {
FML_DCHECK(lease_term > 0) << "lease_term should be positive.";
if (TaskQueuesAreSame()) {
return;
}
std::scoped_lock lock(lease_term_mutex_);
FML_DCHECK(IsMergedUnSafe()) << "lease_term should be positive.";
if (lease_term_ != kLeaseNotSet &&
static_cast<int>(lease_term) > lease_term_) {
lease_term_ = lease_term;
}
}

bool RasterThreadMerger::IsMerged() const {
return is_merged_;
bool RasterThreadMerger::IsMerged() {
std::scoped_lock lock(lease_term_mutex_);
return IsMergedUnSafe();
}

RasterThreadStatus RasterThreadMerger::DecrementLease() {
if (!is_merged_) {
return RasterThreadStatus::kRemainsUnmerged;
bool RasterThreadMerger::IsMergedUnSafe() {
return lease_term_ > 0 || TaskQueuesAreSame();
}

bool RasterThreadMerger::TaskQueuesAreSame() {
return platform_queue_id_ == gpu_queue_id_;
}

void RasterThreadMerger::WaitUntilMerged() {
if (TaskQueuesAreSame()) {
return;
}
FML_CHECK(IsOnPlatformThread());
std::unique_lock<std::mutex> lock(lease_term_mutex_);
merged_condition_.wait(lock, [&] { return IsMergedUnSafe(); });
}

// we haven't been set to merge.
if (lease_term_ == kLeaseNotSet) {
RasterThreadStatus RasterThreadMerger::DecrementLease() {
if (TaskQueuesAreSame()) {
return RasterThreadStatus::kRemainsMerged;
}
std::unique_lock<std::mutex> lock(lease_term_mutex_);
if (!IsMergedUnSafe()) {
return RasterThreadStatus::kRemainsUnmerged;
}

FML_DCHECK(lease_term_ > 0)
<< "lease_term should always be positive when merged.";
lease_term_--;
if (lease_term_ == 0) {
bool success = task_queues_->Unmerge(platform_queue_id_);
FML_CHECK(success) << "Unable to un-merge the raster and platform threads.";
is_merged_ = false;
// |UnMergeNow| is going to acquire the lock again.
lock.unlock();
UnMergeNow();
return RasterThreadStatus::kUnmergedNow;
}

Expand Down
36 changes: 33 additions & 3 deletions fml/raster_thread_merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#ifndef FML_SHELL_COMMON_TASK_RUNNER_MERGER_H_
#define FML_SHELL_COMMON_TASK_RUNNER_MERGER_H_

#include <condition_variable>
#include <mutex>
#include "flutter/fml/macros.h"
#include "flutter/fml/memory/ref_counted.h"
#include "flutter/fml/message_loop_task_queues.h"
Expand All @@ -28,23 +30,45 @@ class RasterThreadMerger
// When the caller merges with a lease term of say 2. The threads
// are going to remain merged until 2 invocations of |DecreaseLease|,
// unless an |ExtendLeaseTo| gets called.
//
// If the task queues are the same, we consider them statically merged.
// When task queues are statically merged this method becomes no-op.
void MergeWithLease(size_t lease_term);

// Un-merges the threads now, and resets the lease term to 0.
//
// Must be executed on the raster task runner.
//
// If the task queues are the same, we consider them statically merged.
// When task queues are statically merged, we never unmerge them and
// this method becomes no-op.
void UnMergeNow();

// If the task queues are the same, we consider them statically merged.
// When task queues are statically merged this method becomes no-op.
void ExtendLeaseTo(size_t lease_term);

// Returns |RasterThreadStatus::kUnmergedNow| if this call resulted in
// splitting the raster and platform threads. Reduces the lease term by 1.
//
// If the task queues are the same, we consider them statically merged.
// When task queues are statically merged this method becomes no-op.
RasterThreadStatus DecrementLease();

bool IsMerged() const;
bool IsMerged();

// Waits until the threads are merged.
//
// Must run on the platform task runner.
void WaitUntilMerged();

RasterThreadMerger(fml::TaskQueueId platform_queue_id,
fml::TaskQueueId gpu_queue_id);

// Returns true if the current thread owns rasterizing.
// When the threads are merged, platform thread owns rasterizing.
// When un-merged, raster thread owns rasterizing.
bool IsOnRasterizingThread() const;
bool IsOnRasterizingThread();

// Returns true if the current thread is the platform thread.
bool IsOnPlatformThread() const;
Expand All @@ -55,7 +79,13 @@ class RasterThreadMerger
fml::TaskQueueId gpu_queue_id_;
fml::RefPtr<fml::MessageLoopTaskQueues> task_queues_;
std::atomic_int lease_term_;
bool is_merged_;
std::condition_variable merged_condition_;
std::mutex lease_term_mutex_;

bool IsMergedUnSafe();
// The platform_queue_id and gpu_queue_id are exactly the same.
// We consider the threads are always merged and cannot be unmerged.
bool TaskQueuesAreSame();

FML_FRIEND_REF_COUNTED_THREAD_SAFE(RasterThreadMerger);
FML_FRIEND_MAKE_REF_COUNTED(RasterThreadMerger);
Expand Down
93 changes: 93 additions & 0 deletions fml/raster_thread_merger_unittests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,96 @@ TEST(RasterThreadMerger, LeaseExtension) {
thread1.join();
thread2.join();
}

TEST(RasterThreadMerger, WaitUntilMerged) {
fml::RefPtr<fml::RasterThreadMerger> raster_thread_merger;

fml::AutoResetWaitableEvent create_thread_merger_latch;
fml::MessageLoop* loop_platform = nullptr;
fml::AutoResetWaitableEvent latch_platform;
fml::AutoResetWaitableEvent term_platform;
fml::AutoResetWaitableEvent latch_merged;
std::thread thread_platform([&]() {
fml::MessageLoop::EnsureInitializedForCurrentThread();
loop_platform = &fml::MessageLoop::GetCurrent();
latch_platform.Signal();
create_thread_merger_latch.Wait();
raster_thread_merger->WaitUntilMerged();
latch_merged.Signal();
term_platform.Wait();
});

const int kNumFramesMerged = 5;
fml::MessageLoop* loop_raster = nullptr;
fml::AutoResetWaitableEvent term_raster;
std::thread thread_raster([&]() {
fml::MessageLoop::EnsureInitializedForCurrentThread();
loop_raster = &fml::MessageLoop::GetCurrent();
latch_platform.Wait();
fml::TaskQueueId qid_platform =
loop_platform->GetTaskRunner()->GetTaskQueueId();
fml::TaskQueueId qid_raster =
loop_raster->GetTaskRunner()->GetTaskQueueId();
raster_thread_merger =
fml::MakeRefCounted<fml::RasterThreadMerger>(qid_platform, qid_raster);
ASSERT_FALSE(raster_thread_merger->IsMerged());
create_thread_merger_latch.Signal();
raster_thread_merger->MergeWithLease(kNumFramesMerged);
term_raster.Wait();
});

latch_merged.Wait();
ASSERT_TRUE(raster_thread_merger->IsMerged());

for (int i = 0; i < kNumFramesMerged; i++) {
ASSERT_TRUE(raster_thread_merger->IsMerged());
raster_thread_merger->DecrementLease();
}

ASSERT_FALSE(raster_thread_merger->IsMerged());

term_platform.Signal();
term_raster.Signal();
thread_platform.join();
thread_raster.join();
}

TEST(RasterThreadMerger, HandleTaskQueuesAreTheSame) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: missing checks for ASSERT_TRUE(raster_thread_merger_->TaskQueuesAreSame()), and ASSERT_FALSE(raster_thread_merger_->TaskQueuesAreSame()) in a separate TEST.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should make raster_thread_merger_->TaskQueuesAreSame() private. The user of the raster_thread_merger shouldn't need to know if the merging is static or dynamic. So the test only needed to test if the threads are merged.

fml::MessageLoop* loop1 = nullptr;
fml::AutoResetWaitableEvent latch1;
fml::AutoResetWaitableEvent term1;
std::thread thread1([&loop1, &latch1, &term1]() {
fml::MessageLoop::EnsureInitializedForCurrentThread();
loop1 = &fml::MessageLoop::GetCurrent();
latch1.Signal();
term1.Wait();
});

latch1.Wait();

fml::TaskQueueId qid1 = loop1->GetTaskRunner()->GetTaskQueueId();
fml::TaskQueueId qid2 = qid1;
const auto raster_thread_merger_ =
fml::MakeRefCounted<fml::RasterThreadMerger>(qid1, qid2);
// Statically merged.
ASSERT_TRUE(raster_thread_merger_->IsMerged());

// Test decrement lease and unmerge are both no-ops.
// The task queues should be always merged.
const int kNumFramesMerged = 5;
raster_thread_merger_->MergeWithLease(kNumFramesMerged);

for (int i = 0; i < kNumFramesMerged; i++) {
ASSERT_TRUE(raster_thread_merger_->IsMerged());
raster_thread_merger_->DecrementLease();
}

ASSERT_TRUE(raster_thread_merger_->IsMerged());

// Wait until merged should also return immediately.
raster_thread_merger_->WaitUntilMerged();
ASSERT_TRUE(raster_thread_merger_->IsMerged());

term1.Signal();
thread1.join();
}
22 changes: 22 additions & 0 deletions shell/common/rasterizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ void Rasterizer::Teardown() {
compositor_context_->OnGrContextDestroyed();
surface_.reset();
last_layer_tree_.reset();
if (raster_thread_merger_.get() != nullptr &&
raster_thread_merger_.get()->IsMerged()) {
raster_thread_merger_->UnMergeNow();
}
}

void Rasterizer::NotifyLowMemoryWarning() const {
Expand Down Expand Up @@ -659,6 +663,24 @@ std::optional<size_t> Rasterizer::GetResourceCacheMaxBytes() const {
return std::nullopt;
}

bool Rasterizer::EnsureThreadsAreMerged() {
if (surface_ == nullptr || raster_thread_merger_.get() == nullptr) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in what case surface_ = nullptr ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the rasterizer has already been torn down, the surface_ would be null. And in that case we shouldn't try to merge threads or wait the threads to be merged.

return false;
}
fml::TaskRunner::RunNowOrPostTask(
delegate_.GetTaskRunners().GetRasterTaskRunner(),
[weak_this = weak_factory_.GetWeakPtr(),
thread_merger = raster_thread_merger_]() {
if (weak_this->surface_ == nullptr) {
return;
}
thread_merger->MergeWithLease(10);
});
raster_thread_merger_->WaitUntilMerged();
FML_DCHECK(raster_thread_merger_->IsMerged());
return true;
}

Rasterizer::Screenshot::Screenshot() {}

Rasterizer::Screenshot::Screenshot(sk_sp<SkData> p_data, SkISize p_size)
Expand Down
16 changes: 16 additions & 0 deletions shell/common/rasterizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,22 @@ class Rasterizer final : public SnapshotDelegate {
///
std::optional<size_t> GetResourceCacheMaxBytes() const;

//----------------------------------------------------------------------------
/// @brief Makes sure the raster task runner and the platform task runner
/// are merged.
///
/// @attention If raster and platform task runners are not the same or not
/// merged, this method will try to merge the task runners,
/// blocking the current thread until the 2 task runners are
/// merged.
///
/// @return `true` if raster and platform task runners are the same.
/// `true` if/when raster and platform task runners are merged.
/// `false` if the surface or the |RasterThreadMerger| has not
/// been initialized.
///
bool EnsureThreadsAreMerged();

private:
Delegate& delegate_;
std::unique_ptr<Surface> surface_;
Expand Down
Loading