Skip to content
This repository was archived by the owner on Feb 25, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions impeller/base/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ impeller_component("base") {
"comparable.cc",
"comparable.h",
"config.h",
"job_queue.cc",
"job_queue.h",
"mask.h",
"promise.cc",
"promise.h",
Expand Down
104 changes: 104 additions & 0 deletions impeller/base/job_queue.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2013 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "impeller/base/job_queue.h"

#include "flutter/fml/trace_event.h"
#include "impeller/base/validation.h"

namespace impeller {

JobQueue::JobQueue(std::shared_ptr<fml::ConcurrentTaskRunner> task_runner)
: task_runner_(std::move(task_runner)) {}

JobQueue::~JobQueue() = default;

std::shared_ptr<JobQueue> JobQueue::Make(
std::shared_ptr<fml::ConcurrentTaskRunner> task_runner) {
return std::shared_ptr<JobQueue>(new JobQueue(std::move(task_runner)));
}

UniqueID JobQueue::AddJob(fml::closure job) {
auto id = UniqueID{};
{
Lock lock(jobs_mutex_);
jobs_[id] = job;
}
ScheduleAndRunJob(id);
return id;
}

void JobQueue::ScheduleAndRunJob(UniqueID id) {
task_runner_->PostTask([weak = weak_from_this(), id]() {
if (auto thiz = weak.lock()) {
thiz->RunJobNow(id);
}
});
}

void JobQueue::PrioritizeJob(UniqueID id) {
Lock lock(jobs_mutex_);
high_priority_job_ids_.push_back(id);
}

void JobQueue::RunJobNow(UniqueID id) {
while (RunHighPriorityJob()) {
}
fml::closure job;
{
Lock lock(jobs_mutex_);
job = TakeJob(id);
}
if (job) {
TRACE_EVENT0("impeller", "RegularJob");
job();
}
}

bool JobQueue::RunHighPriorityJob() {
fml::closure job;
{
Lock lock(jobs_mutex_);
if (high_priority_job_ids_.empty()) {
return false;
}
auto job_id = high_priority_job_ids_.front();
high_priority_job_ids_.pop_front();
job = TakeJob(job_id);
}
if (job) {
TRACE_EVENT0("impeller", "HighPriorityJob");
job();
}
return true;
}

void JobQueue::DoJobNow(UniqueID id) {
fml::closure job;
{
Lock lock(jobs_mutex_);
job = TakeJob(id);
}
if (job) {
TRACE_EVENT0("impeller", "EagerJob");
job();
}
}

fml::closure JobQueue::TakeJob(UniqueID id) {
auto found = jobs_.find(id);
if (found == jobs_.end()) {
return nullptr;
}
auto job = found->second;
jobs_.erase(found);
return job;
}

const std::shared_ptr<fml::ConcurrentTaskRunner>& JobQueue::GetTaskRunner()
const {
return task_runner_;
}

} // namespace impeller
107 changes: 107 additions & 0 deletions impeller/base/job_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2013 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef FLUTTER_IMPELLER_BASE_JOB_QUEUE_H_
#define FLUTTER_IMPELLER_BASE_JOB_QUEUE_H_

#include <deque>
#include <map>
#include <memory>

#include "flutter/fml/closure.h"
#include "flutter/fml/concurrent_message_loop.h"
#include "impeller/base/comparable.h"
#include "impeller/base/thread.h"

namespace impeller {

//------------------------------------------------------------------------------
/// @brief Manages a queue of jobs that execute on a concurrent task
/// runner. Jobs execute as soon as resources are available. Callers
/// have the ability to re-prioritize jobs or perform jobs eagerly
/// on their own threads if needed.
///
/// The job queue and all its methods are thread-safe.
///
class JobQueue final : public std::enable_shared_from_this<JobQueue> {
public:
//----------------------------------------------------------------------------
/// @brief Creates a job queue which schedules tasks on the given
/// concurrent task runner.
///
/// @param[in] task_runner The task runner
///
/// @return A job queue if one can be created.
///
static std::shared_ptr<JobQueue> Make(
std::shared_ptr<fml::ConcurrentTaskRunner> task_runner);

//----------------------------------------------------------------------------
/// @brief Destroys the job queue.
///
~JobQueue();

JobQueue(const JobQueue&) = delete;

JobQueue& operator=(const JobQueue&) = delete;

//----------------------------------------------------------------------------
/// @brief Adds a job to the job queue and schedules it for execution at
/// a later point in time. The job ID obtained may be used to
/// re-prioritize the job within the queue at a later point.
///
/// @param[in] job The job
///
/// @return The unique id for the job.
///
UniqueID AddJob(fml::closure job);

//----------------------------------------------------------------------------
/// @brief Prioritize a previously added job for immediate execution on
/// the concurrent task runner.
///
/// @param[in] id The job identifier.
///
void PrioritizeJob(UniqueID id);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be used somewhere? My understanding is that every pipeline would be scheduled to be created, then there is a mechanism to boost the priority when the pipeline is requested?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! I was going to add this the spot where the glyph atlas pipeline was being setup. Still WIP.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every pipeline will be scheduled eventually in the order in which the jobs were added. Jobs can skip to the front of the queue as necessary (this call). Jobs can be performed eagerly on anything too to avoid a context switch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to re-prioritize the glyph atlas shader? Can we simply move it to the front of the queue in ContentContext?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think thats what I mean? Just call PrioritizeJob on it immediately after?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Roger, will re-arrange the pipeline and remove PrioritizeJob. Keep the jobqueue still or pass?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stall is completely gone (at least in this app) after the rearrange.
Screenshot 2024-08-06 at 11 52 55 AM

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we'll need the job queue

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think so too. Hopefully. Just checking the gallery and if I can suppress the regression on the mokey, that should be it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closing in favor of #54373


//----------------------------------------------------------------------------
/// @brief If the job has not already been completed, executes the job
/// immediately on the callers thread.
///
/// This is useful if the current thread is going to idle anyway
/// and wants to participate in performing the job of the
/// concurrent task runner.
///
/// @param[in] id The job identifier.
///
void DoJobNow(UniqueID id);

//----------------------------------------------------------------------------
/// @brief Gets the task runner for the queue.
///
/// @return The task runner.
///
const std::shared_ptr<fml::ConcurrentTaskRunner>& GetTaskRunner() const;

private:
std::shared_ptr<fml::ConcurrentTaskRunner> task_runner_;
Mutex jobs_mutex_;
std::map<UniqueID, fml::closure> jobs_ IPLR_GUARDED_BY(jobs_mutex_);
std::deque<UniqueID> high_priority_job_ids_ IPLR_GUARDED_BY(jobs_mutex_);

JobQueue(std::shared_ptr<fml::ConcurrentTaskRunner> task_runner);

void ScheduleAndRunJob(UniqueID id);

void RunJobNow(UniqueID id);

bool RunHighPriorityJob();

[[nodiscard]]
fml::closure TakeJob(UniqueID id) IPLR_REQUIRES(jobs_mutex_);
};

} // namespace impeller

#endif // FLUTTER_IMPELLER_BASE_JOB_QUEUE_H_
1 change: 1 addition & 0 deletions impeller/base/timing.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

namespace impeller {

using Nanoseconds = std::chrono::nanoseconds;
using MillisecondsF = std::chrono::duration<float, std::milli>;
using SecondsF = std::chrono::duration<float>;
using Clock = std::chrono::high_resolution_clock;
Expand Down
27 changes: 20 additions & 7 deletions impeller/renderer/backend/vulkan/pipeline_library_vk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ PipelineLibraryVK::PipelineLibraryVK(
pso_cache_(std::make_shared<PipelineCacheVK>(std::move(caps),
device_holder,
std::move(cache_directory))),
worker_task_runner_(std::move(worker_task_runner)) {
FML_DCHECK(worker_task_runner_);
if (!pso_cache_->IsValid() || !worker_task_runner_) {
job_queue_(JobQueue::Make(std::move(worker_task_runner))) {
if (!pso_cache_->IsValid()) {
return;
}

Expand Down Expand Up @@ -155,6 +154,19 @@ std::unique_ptr<ComputePipelineVK> PipelineLibraryVK::CreateComputePipeline(
);
}

fml::closure PostJobAndMakePrioritizationCallback(
const std::shared_ptr<JobQueue>& queue,
fml::closure job) {
FML_DCHECK(job);
FML_DCHECK(queue);
auto id = queue->AddJob(std::move(job));
return [id, weak = queue->weak_from_this()]() {
if (auto queue = weak.lock()) {
queue->DoJobNow(id);
}
};
}

// |PipelineLibrary|
PipelineFuture<PipelineDescriptor> PipelineLibraryVK::GetPipeline(
PipelineDescriptor descriptor,
Expand Down Expand Up @@ -196,7 +208,8 @@ PipelineFuture<PipelineDescriptor> PipelineLibraryVK::GetPipeline(
};

if (async) {
worker_task_runner_->PostTask(generation_task);
pipeline_future.on_prioritize = PostJobAndMakePrioritizationCallback(
job_queue_, std::move(generation_task));
} else {
generation_task();
}
Expand Down Expand Up @@ -251,7 +264,7 @@ PipelineFuture<ComputePipelineDescriptor> PipelineLibraryVK::GetPipeline(
};

if (async) {
worker_task_runner_->PostTask(generation_task);
GetWorkerTaskRunner()->PostTask(generation_task);
} else {
generation_task();
}
Expand Down Expand Up @@ -281,7 +294,7 @@ void PipelineLibraryVK::DidAcquireSurfaceFrame() {
}

void PipelineLibraryVK::PersistPipelineCacheToDisk() {
worker_task_runner_->PostTask(
GetWorkerTaskRunner()->PostTask(
[weak_cache = decltype(pso_cache_)::weak_type(pso_cache_)]() {
auto cache = weak_cache.lock();
if (!cache) {
Expand All @@ -297,7 +310,7 @@ const std::shared_ptr<PipelineCacheVK>& PipelineLibraryVK::GetPSOCache() const {

const std::shared_ptr<fml::ConcurrentTaskRunner>&
PipelineLibraryVK::GetWorkerTaskRunner() const {
return worker_task_runner_;
return job_queue_->GetTaskRunner();
}

} // namespace impeller
3 changes: 2 additions & 1 deletion impeller/renderer/backend/vulkan/pipeline_library_vk.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "flutter/fml/concurrent_message_loop.h"
#include "flutter/fml/unique_fd.h"
#include "impeller/base/backend_cast.h"
#include "impeller/base/job_queue.h"
#include "impeller/base/thread.h"
#include "impeller/renderer/backend/vulkan/compute_pipeline_vk.h"
#include "impeller/renderer/backend/vulkan/pipeline_cache_vk.h"
Expand Down Expand Up @@ -39,7 +40,7 @@ class PipelineLibraryVK final

std::weak_ptr<DeviceHolderVK> device_holder_;
std::shared_ptr<PipelineCacheVK> pso_cache_;
std::shared_ptr<fml::ConcurrentTaskRunner> worker_task_runner_;
std::shared_ptr<JobQueue> job_queue_;
Mutex pipelines_mutex_;
PipelineMap pipelines_ IPLR_GUARDED_BY(pipelines_mutex_);
Mutex compute_pipelines_mutex_;
Expand Down
3 changes: 2 additions & 1 deletion impeller/renderer/backend/vulkan/pipeline_vk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,8 @@ std::unique_ptr<PipelineVK> PipelineVK::Create(
const std::shared_ptr<DeviceHolderVK>& device_holder,
const std::weak_ptr<PipelineLibrary>& weak_library,
std::shared_ptr<SamplerVK> immutable_sampler) {
TRACE_EVENT0("flutter", "PipelineVK::Create");
TRACE_EVENT1("flutter", "PipelineVK::Create", "Name",
desc.GetLabel().c_str());

auto library = weak_library.lock();

Expand Down
23 changes: 20 additions & 3 deletions impeller/renderer/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

#include <future>

#include "compute_pipeline_descriptor.h"
#include "flutter/fml/closure.h"
#include "impeller/base/timing.h"
#include "impeller/renderer/compute_pipeline_builder.h"
#include "impeller/renderer/compute_pipeline_descriptor.h"
#include "impeller/renderer/context.h"
Expand All @@ -24,11 +25,27 @@ class Pipeline;
template <typename T>
struct PipelineFuture {
std::optional<T> descriptor;
std::shared_future<std::shared_ptr<Pipeline<T>>> future;

const std::shared_ptr<Pipeline<T>> Get() const { return future.get(); }
fml::closure on_prioritize;

PipelineFuture() = default;

PipelineFuture(std::optional<T> p_descriptor,
std::shared_future<std::shared_ptr<Pipeline<T>>> p_future)
: descriptor(std::move(p_descriptor)), future(std::move(p_future)) {}

const std::shared_ptr<Pipeline<T>> Get() const {
if (on_prioritize &&
future.wait_for(Nanoseconds{0u}) == std::future_status::timeout) {
on_prioritize();
}
return future.get();
}

bool IsValid() const { return future.valid(); }

private:
std::shared_future<std::shared_ptr<Pipeline<T>>> future;
};

//------------------------------------------------------------------------------
Expand Down