diff --git a/impeller/base/BUILD.gn b/impeller/base/BUILD.gn
index aa99334f465b7..a8436cff65327 100644
--- a/impeller/base/BUILD.gn
+++ b/impeller/base/BUILD.gn
@@ -14,6 +14,8 @@ impeller_component("base") {
     "comparable.cc",
     "comparable.h",
     "config.h",
+    "job_queue.cc",
+    "job_queue.h",
     "mask.h",
     "promise.cc",
     "promise.h",
diff --git a/impeller/base/job_queue.cc b/impeller/base/job_queue.cc
new file mode 100644
index 0000000000000..5d7a8df134427
--- /dev/null
+++ b/impeller/base/job_queue.cc
@@ -0,0 +1,104 @@
+// Copyright 2013 The Flutter Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "impeller/base/job_queue.h"
+
+#include "flutter/fml/trace_event.h"
+#include "impeller/base/validation.h"
+
+namespace impeller {
+
+JobQueue::JobQueue(std::shared_ptr<fml::ConcurrentTaskRunner> task_runner)
+    : task_runner_(std::move(task_runner)) {}
+
+JobQueue::~JobQueue() = default;
+
+std::shared_ptr<JobQueue> JobQueue::Make(
+    std::shared_ptr<fml::ConcurrentTaskRunner> task_runner) {
+  return std::shared_ptr<JobQueue>(new JobQueue(std::move(task_runner)));
+}
+
+UniqueID JobQueue::AddJob(fml::closure job) {
+  auto id = UniqueID{};
+  {
+    Lock lock(jobs_mutex_);
+    jobs_[id] = job;
+  }
+  ScheduleAndRunJob(id);
+  return id;
+}
+
+void JobQueue::ScheduleAndRunJob(UniqueID id) {
+  task_runner_->PostTask([weak = weak_from_this(), id]() {
+    if (auto thiz = weak.lock()) {
+      thiz->RunJobNow(id);
+    }
+  });
+}
+
+void JobQueue::PrioritizeJob(UniqueID id) {
+  Lock lock(jobs_mutex_);
+  high_priority_job_ids_.push_back(id);
+}
+
+void JobQueue::RunJobNow(UniqueID id) {
+  while (RunHighPriorityJob()) {
+  }
+  fml::closure job;
+  {
+    Lock lock(jobs_mutex_);
+    job = TakeJob(id);
+  }
+  if (job) {
+    TRACE_EVENT0("impeller", "RegularJob");
+    job();
+  }
+}
+
+bool JobQueue::RunHighPriorityJob() {
+  fml::closure job;
+  {
+    Lock lock(jobs_mutex_);
+    if (high_priority_job_ids_.empty()) {
+      return false;
+    }
+    auto job_id = high_priority_job_ids_.front();
+    high_priority_job_ids_.pop_front();
+    job = TakeJob(job_id);
+  }
+  if (job) {
+    TRACE_EVENT0("impeller", "HighPriorityJob");
+    job();
+  }
+  return true;
+}
+
+void JobQueue::DoJobNow(UniqueID id) {
+  fml::closure job;
+  {
+    Lock lock(jobs_mutex_);
+    job = TakeJob(id);
+  }
+  if (job) {
+    TRACE_EVENT0("impeller", "EagerJob");
+    job();
+  }
+}
+
+fml::closure JobQueue::TakeJob(UniqueID id) {
+  auto found = jobs_.find(id);
+  if (found == jobs_.end()) {
+    return nullptr;
+  }
+  auto job = found->second;
+  jobs_.erase(found);
+  return job;
+}
+
+const std::shared_ptr<fml::ConcurrentTaskRunner>& JobQueue::GetTaskRunner()
+    const {
+  return task_runner_;
+}
+
+}  // namespace impeller
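Note: RunJobNow, DoJobNow, and TakeJob above share a claim-then-run discipline: the closure is removed from jobs_ while jobs_mutex_ is held and invoked only after the lock is released, so whichever thread claims a job first runs it exactly once, and the job itself may safely post more work. The following standalone sketch models that pattern with plain std::mutex and std::function instead of the impeller/fml types; TinyJobMap and RunIfPending are illustrative names, not part of this change.

// Minimal model of the claim-then-run pattern used by the job queue above.
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <mutex>

class TinyJobMap {
 public:
  uint64_t Add(std::function<void()> job) {
    std::lock_guard<std::mutex> lock(mutex_);
    jobs_[next_id_] = std::move(job);
    return next_id_++;
  }

  // Runs the job if it is still pending; any later caller gets a no-op.
  void RunIfPending(uint64_t id) {
    std::function<void()> job;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      auto found = jobs_.find(id);
      if (found == jobs_.end()) {
        return;  // Already claimed by another thread.
      }
      job = std::move(found->second);
      jobs_.erase(found);
    }
    job();  // Invoked outside the lock; may call Add() again.
  }

 private:
  std::mutex mutex_;
  uint64_t next_id_ = 0;
  std::map<uint64_t, std::function<void()>> jobs_;
};

int main() {
  TinyJobMap jobs;
  auto id = jobs.Add([] { std::cout << "ran once" << std::endl; });
  jobs.RunIfPending(id);  // Runs the job.
  jobs.RunIfPending(id);  // No-op: the job has already been claimed.
  return 0;
}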
diff --git a/impeller/base/job_queue.h b/impeller/base/job_queue.h
new file mode 100644
index 0000000000000..dad196795126f
--- /dev/null
+++ b/impeller/base/job_queue.h
@@ -0,0 +1,107 @@
+// Copyright 2013 The Flutter Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef FLUTTER_IMPELLER_BASE_JOB_QUEUE_H_
+#define FLUTTER_IMPELLER_BASE_JOB_QUEUE_H_
+
+#include <deque>
+#include <map>
+#include <memory>
+
+#include "flutter/fml/closure.h"
+#include "flutter/fml/concurrent_message_loop.h"
+#include "impeller/base/comparable.h"
+#include "impeller/base/thread.h"
+
+namespace impeller {
+
+//------------------------------------------------------------------------------
+/// @brief      Manages a queue of jobs that execute on a concurrent task
+///             runner. Jobs execute as soon as resources are available.
+///             Callers have the ability to re-prioritize jobs or perform jobs
+///             eagerly on their own threads if needed.
+///
+///             The job queue and all its methods are thread-safe.
+///
+class JobQueue final : public std::enable_shared_from_this<JobQueue> {
+ public:
+  //----------------------------------------------------------------------------
+  /// @brief      Creates a job queue which schedules tasks on the given
+  ///             concurrent task runner.
+  ///
+  /// @param[in]  task_runner  The task runner
+  ///
+  /// @return     A job queue if one can be created.
+  ///
+  static std::shared_ptr<JobQueue> Make(
+      std::shared_ptr<fml::ConcurrentTaskRunner> task_runner);
+
+  //----------------------------------------------------------------------------
+  /// @brief      Destroys the job queue.
+  ///
+  ~JobQueue();
+
+  JobQueue(const JobQueue&) = delete;
+
+  JobQueue& operator=(const JobQueue&) = delete;
+
+  //----------------------------------------------------------------------------
+  /// @brief      Adds a job to the job queue and schedules it for execution
+  ///             at a later point in time. The job ID obtained may be used
+  ///             to re-prioritize the job within the queue at a later point.
+  ///
+  /// @param[in]  job   The job
+  ///
+  /// @return     The unique ID for the job.
+  ///
+  UniqueID AddJob(fml::closure job);
+
+  //----------------------------------------------------------------------------
+  /// @brief      Prioritizes a previously added job for immediate execution
+  ///             on the concurrent task runner.
+  ///
+  /// @param[in]  id    The job identifier.
+  ///
+  void PrioritizeJob(UniqueID id);
+
+  //----------------------------------------------------------------------------
+  /// @brief      If the job has not already been completed, executes the job
+  ///             immediately on the caller's thread.
+  ///
+  ///             This is useful if the current thread is going to idle anyway
+  ///             and wants to participate in performing the job of the
+  ///             concurrent task runner.
+  ///
+  /// @param[in]  id    The job identifier.
+  ///
+  void DoJobNow(UniqueID id);
+
+  //----------------------------------------------------------------------------
+  /// @brief      Gets the task runner for the queue.
+  ///
+  /// @return     The task runner.
+  ///
+  const std::shared_ptr<fml::ConcurrentTaskRunner>& GetTaskRunner() const;
+
+ private:
+  std::shared_ptr<fml::ConcurrentTaskRunner> task_runner_;
+  Mutex jobs_mutex_;
+  std::map<UniqueID, fml::closure> jobs_ IPLR_GUARDED_BY(jobs_mutex_);
+  std::deque<UniqueID> high_priority_job_ids_ IPLR_GUARDED_BY(jobs_mutex_);
+
+  JobQueue(std::shared_ptr<fml::ConcurrentTaskRunner> task_runner);
+
+  void ScheduleAndRunJob(UniqueID id);
+
+  void RunJobNow(UniqueID id);
+
+  bool RunHighPriorityJob();
+
+  [[nodiscard]]
+  fml::closure TakeJob(UniqueID id) IPLR_REQUIRES(jobs_mutex_);
+};
+
+}  // namespace impeller
+
+#endif  // FLUTTER_IMPELLER_BASE_JOB_QUEUE_H_
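Note: a minimal usage sketch of the API declared above, assuming the task runner comes from an fml::ConcurrentMessageLoop; the Example function and the expensive_work closure are hypothetical and not part of this change.

#include "flutter/fml/concurrent_message_loop.h"
#include "impeller/base/job_queue.h"

void Example() {
  auto loop = fml::ConcurrentMessageLoop::Create();
  auto queue = impeller::JobQueue::Make(loop->GetTaskRunner());

  // Schedule the job on the concurrent task runner.
  auto expensive_work = []() { /* build something expensive */ };
  auto id = queue->AddJob(expensive_work);

  // Later, if the result is needed sooner, either ask the workers to pick it
  // up ahead of other queued jobs...
  queue->PrioritizeJob(id);

  // ...or run it on the current thread if no worker has claimed it yet.
  queue->DoJobNow(id);
}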
diff --git a/impeller/base/timing.h b/impeller/base/timing.h
index 86ac48ceba327..f03740f4d4e84 100644
--- a/impeller/base/timing.h
+++ b/impeller/base/timing.h
@@ -9,6 +9,7 @@
 
 namespace impeller {
 
+using Nanoseconds = std::chrono::nanoseconds;
 using MillisecondsF = std::chrono::duration<float, std::milli>;
 using SecondsF = std::chrono::duration<float>;
 using Clock = std::chrono::high_resolution_clock;
diff --git a/impeller/renderer/backend/vulkan/pipeline_library_vk.cc b/impeller/renderer/backend/vulkan/pipeline_library_vk.cc
index 9d6fc7e21d7f8..e958f2080e8da 100644
--- a/impeller/renderer/backend/vulkan/pipeline_library_vk.cc
+++ b/impeller/renderer/backend/vulkan/pipeline_library_vk.cc
@@ -33,9 +33,8 @@ PipelineLibraryVK::PipelineLibraryVK(
       pso_cache_(std::make_shared<PipelineCacheVK>(std::move(caps),
                                                    device_holder,
                                                    std::move(cache_directory))),
-      worker_task_runner_(std::move(worker_task_runner)) {
-  FML_DCHECK(worker_task_runner_);
-  if (!pso_cache_->IsValid() || !worker_task_runner_) {
+      job_queue_(JobQueue::Make(std::move(worker_task_runner))) {
+  if (!pso_cache_->IsValid()) {
     return;
   }
 
@@ -155,6 +154,19 @@ std::unique_ptr<ComputePipelineVK> PipelineLibraryVK::CreateComputePipeline(
   );
 }
 
+fml::closure PostJobAndMakePrioritizationCallback(
+    const std::shared_ptr<JobQueue>& queue,
+    fml::closure job) {
+  FML_DCHECK(job);
+  FML_DCHECK(queue);
+  auto id = queue->AddJob(std::move(job));
+  return [id, weak = queue->weak_from_this()]() {
+    if (auto queue = weak.lock()) {
+      queue->DoJobNow(id);
+    }
+  };
+}
+
 // |PipelineLibrary|
 PipelineFuture<PipelineDescriptor> PipelineLibraryVK::GetPipeline(
     PipelineDescriptor descriptor,
@@ -196,7 +208,8 @@ PipelineFuture<PipelineDescriptor> PipelineLibraryVK::GetPipeline(
   };
 
   if (async) {
-    worker_task_runner_->PostTask(generation_task);
+    pipeline_future.on_prioritize = PostJobAndMakePrioritizationCallback(
+        job_queue_, std::move(generation_task));
   } else {
     generation_task();
   }
@@ -251,7 +264,7 @@ PipelineFuture<ComputePipelineDescriptor> PipelineLibraryVK::GetPipeline(
   };
 
   if (async) {
-    worker_task_runner_->PostTask(generation_task);
+    GetWorkerTaskRunner()->PostTask(generation_task);
   } else {
     generation_task();
   }
@@ -281,7 +294,7 @@ void PipelineLibraryVK::DidAcquireSurfaceFrame() {
 }
 
 void PipelineLibraryVK::PersistPipelineCacheToDisk() {
-  worker_task_runner_->PostTask(
+  GetWorkerTaskRunner()->PostTask(
       [weak_cache = decltype(pso_cache_)::weak_type(pso_cache_)]() {
         auto cache = weak_cache.lock();
         if (!cache) {
@@ -297,7 +310,7 @@ const std::shared_ptr<PipelineCacheVK>& PipelineLibraryVK::GetPSOCache() const {
 
 const std::shared_ptr<fml::ConcurrentTaskRunner>&
 PipelineLibraryVK::GetWorkerTaskRunner() const {
-  return worker_task_runner_;
+  return job_queue_->GetTaskRunner();
 }
 
 }  // namespace impeller
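Note: a caller-side sketch of how the prioritization callback wired up in GetPipeline above is expected to be exercised. MakePipelineEagerly is a hypothetical helper, and the snippet assumes PipelineLibrary::GetPipeline defaults to asynchronous construction; it is not part of this change.

#include "impeller/renderer/pipeline.h"
#include "impeller/renderer/pipeline_descriptor.h"
#include "impeller/renderer/pipeline_library.h"

namespace impeller {

std::shared_ptr<Pipeline<PipelineDescriptor>> MakePipelineEagerly(
    const std::shared_ptr<PipelineLibrary>& library,
    const PipelineDescriptor& desc) {
  // Kicks off asynchronous construction on the worker task runner and records
  // the prioritization callback in the returned future.
  auto future = library->GetPipeline(desc);
  // If no worker has claimed the job by the time the pipeline is needed,
  // Get() invokes on_prioritize, which runs the construction job on this
  // thread via JobQueue::DoJobNow instead of blocking on the worker.
  return future.Get();
}

}  // namespace impeller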
"impeller/base/backend_cast.h" +#include "impeller/base/job_queue.h" #include "impeller/base/thread.h" #include "impeller/renderer/backend/vulkan/compute_pipeline_vk.h" #include "impeller/renderer/backend/vulkan/pipeline_cache_vk.h" @@ -39,7 +40,7 @@ class PipelineLibraryVK final std::weak_ptr device_holder_; std::shared_ptr pso_cache_; - std::shared_ptr worker_task_runner_; + std::shared_ptr job_queue_; Mutex pipelines_mutex_; PipelineMap pipelines_ IPLR_GUARDED_BY(pipelines_mutex_); Mutex compute_pipelines_mutex_; diff --git a/impeller/renderer/backend/vulkan/pipeline_vk.cc b/impeller/renderer/backend/vulkan/pipeline_vk.cc index 793a158a32e80..727a60b095839 100644 --- a/impeller/renderer/backend/vulkan/pipeline_vk.cc +++ b/impeller/renderer/backend/vulkan/pipeline_vk.cc @@ -451,7 +451,8 @@ std::unique_ptr PipelineVK::Create( const std::shared_ptr& device_holder, const std::weak_ptr& weak_library, std::shared_ptr immutable_sampler) { - TRACE_EVENT0("flutter", "PipelineVK::Create"); + TRACE_EVENT1("flutter", "PipelineVK::Create", "Name", + desc.GetLabel().c_str()); auto library = weak_library.lock(); diff --git a/impeller/renderer/pipeline.h b/impeller/renderer/pipeline.h index b3babc34a8537..ccd7be8010ea4 100644 --- a/impeller/renderer/pipeline.h +++ b/impeller/renderer/pipeline.h @@ -7,7 +7,8 @@ #include -#include "compute_pipeline_descriptor.h" +#include "flutter/fml/closure.h" +#include "impeller/base/timing.h" #include "impeller/renderer/compute_pipeline_builder.h" #include "impeller/renderer/compute_pipeline_descriptor.h" #include "impeller/renderer/context.h" @@ -24,11 +25,27 @@ class Pipeline; template struct PipelineFuture { std::optional descriptor; - std::shared_future>> future; - const std::shared_ptr> Get() const { return future.get(); } + fml::closure on_prioritize; + + PipelineFuture() = default; + + PipelineFuture(std::optional p_descriptor, + std::shared_future>> p_future) + : descriptor(std::move(p_descriptor)), future(std::move(p_future)) {} + + const std::shared_ptr> Get() const { + if (on_prioritize && + future.wait_for(Nanoseconds{0u}) == std::future_status::timeout) { + on_prioritize(); + } + return future.get(); + } bool IsValid() const { return future.valid(); } + + private: + std::shared_future>> future; }; //------------------------------------------------------------------------------