From 69f54501d44a11e3ca44f0b3b4242f8695b833d6 Mon Sep 17 00:00:00 2001 From: Mykola Pokhylets Date: Fri, 12 Jan 2024 16:17:49 +0100 Subject: [PATCH 1/4] Added benchmark for adding jobs to default actor when visiting a tree --- benchmark/CMakeLists.txt | 1 + benchmark/single-source/AsyncTree.swift | 77 +++++++++++++++++++++++++ benchmark/utils/main.swift | 2 + 3 files changed, 80 insertions(+) create mode 100644 benchmark/single-source/AsyncTree.swift diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt index cf29da88138c9..71b33cdc2d80a 100644 --- a/benchmark/CMakeLists.txt +++ b/benchmark/CMakeLists.txt @@ -40,6 +40,7 @@ set(SWIFT_BENCH_MODULES single-source/ArrayRemoveAll single-source/ArraySetElement single-source/ArraySubscript + single-source/AsyncTree single-source/BinaryFloatingPointConversionFromBinaryInteger single-source/BinaryFloatingPointProperties single-source/BitCount diff --git a/benchmark/single-source/AsyncTree.swift b/benchmark/single-source/AsyncTree.swift new file mode 100644 index 0000000000000..ed67ef69c30a8 --- /dev/null +++ b/benchmark/single-source/AsyncTree.swift @@ -0,0 +1,77 @@ +//===--- AsyncTree.swift -------------------------------------------------===// +// +// This source file is part of the Swift.org open source project +// +// Copyright (c) 2021 Apple Inc. 
and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +import TestsUtils +import Dispatch + +public var benchmarks: [BenchmarkInfo] { + guard #available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) else { + return [] + } + return [ + BenchmarkInfo( + name: "AsyncTree.100", + runFunction: run_AsyncTree(treeSize: 100), + tags: [.concurrency] + ), + BenchmarkInfo( + name: "AsyncTree.5000", + runFunction: run_AsyncTree(treeSize: 5000), + tags: [.concurrency] + ) + ] +} + +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +private actor MyActor { + let g: DispatchGroup + + init(_ g: DispatchGroup) { + self.g = g + } + + func test(_ n: Int) { + let L = n / 2 + let R = n - 1 - L + + if L > 0 { + Task { + self.test(L) + } + } + + if R > 0 { + Task { + self.test(R) + } + } + + g.leave() + } +} + +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +private func run_AsyncTree(treeSize: Int) -> (Int) -> Void { + return { n in + for _ in 0.. 
Date: Fri, 12 Jan 2024 14:40:36 +0100 Subject: [PATCH 2/4] Fixed quadratic performance of ListMerger when each executed job creates 2+ new jobs of the same priority See https://forums.swift.org/t/quadratic-performance-of-the-listmerger-in-specific-use-case/69393 --- include/swift/Basic/ListMerger.h | 47 +++++++--- stdlib/public/Concurrency/Actor.cpp | 90 +++++++++++-------- .../Concurrency/CooperativeGlobalExecutor.inc | 4 +- utils/test-list-merger/Makefile | 2 +- utils/test-list-merger/TestListMerger.cpp | 4 +- 5 files changed, 89 insertions(+), 58 deletions(-) diff --git a/include/swift/Basic/ListMerger.h b/include/swift/Basic/ListMerger.h index fd056402434bf..a2ee9fb56188c 100644 --- a/include/swift/Basic/ListMerger.h +++ b/include/swift/Basic/ListMerger.h @@ -19,6 +19,7 @@ #define SWIFT_BASIC_LISTMERGER_H #include +#include namespace swift { @@ -62,13 +63,31 @@ namespace swift { /// to the merger and before being released except by the merger. template class ListMerger { +public: + class LastInsertionPoint { + friend class ListMerger; + Node node = Node(); + bool isKnownLastOfEquals = false; + + public: + LastInsertionPoint() {} + + void nodeWasRemoved(Node removedNode) { + if (node == removedNode) { + *this = LastInsertionPoint(); + } + } + }; + +private: Node root; - Node lastInsertionPoint = Node(); - bool lastInsertionPointIsKnownLastOfEquals = false; + LastInsertionPoint lastInsertionPoint; + public: /// Construct a merger with the given sorted list as its current list. - ListMerger(Node initialList = Node()) - : root(initialList) {} + ListMerger(Node initialList = Node(), + LastInsertionPoint insertionPoint = LastInsertionPoint()) + : root(initialList), lastInsertionPoint(insertionPoint) {} /// Add a single node to this merger's current list. /// @@ -86,7 +105,7 @@ class ListMerger { Node stopper = Node(); // If we have a previous insertion point, compare against it. 
- if (Node lastIP = lastInsertionPoint) { + if (Node lastIP = lastInsertionPoint.node) { int comparison = NodeTraits::compare(lastIP, newNode); // If it compares equal, put the new node immediately after the @@ -169,7 +188,7 @@ class ListMerger { // If we have a previous insertion point, check for the presumed-common // case that we're inserting something that should immediately follow it. - if (auto lastIP = lastInsertionPoint) { + if (auto lastIP = lastInsertionPoint.node) { lastIP = findLastOfEqualsFromLastIP(lastIP); // Compare against the next node after lastIP, if it exists. @@ -246,7 +265,7 @@ class ListMerger { // If we have a previous insertion point, compare the new root // against it. - if (Node lastIP = lastInsertionPoint) { + if (Node lastIP = lastInsertionPoint.node) { int comparison = NodeTraits::compare(lastIP, rootOfNewList); // If it compares equal, we've got an insertion point where @@ -341,10 +360,10 @@ class ListMerger { /// Get the current list that's been built up, and clear the internal /// state of this merger. - Node release() { - Node result = root; + std::tuple release() { + auto result = std::make_tuple(root, lastInsertionPoint); root = Node(); - lastInsertionPoint = Node(); + lastInsertionPoint = LastInsertionPoint(); return result; } @@ -352,16 +371,16 @@ class ListMerger { /// Set the last point at which we inserted a node, and specify /// whether we know it was the last in its sequence of equals. void setLastInsertionPoint(Node lastIP, bool knownEndOfEquals) { - lastInsertionPoint = lastIP; - lastInsertionPointIsKnownLastOfEquals = knownEndOfEquals; + lastInsertionPoint.node = lastIP; + lastInsertionPoint.isKnownLastOfEquals = knownEndOfEquals; } /// Given the value of lastInsertionPoint (passed in to avoid /// reloading it), find the last node in the sequence of equals that /// contains it. 
Node findLastOfEqualsFromLastIP(Node lastIP) const { - assert(lastIP == lastInsertionPoint); - if (!lastInsertionPointIsKnownLastOfEquals) + assert(lastIP == lastInsertionPoint.node); + if (!lastInsertionPoint.isKnownLastOfEquals) return findLastOfEquals(lastIP); return lastIP; } diff --git a/stdlib/public/Concurrency/Actor.cpp b/stdlib/public/Concurrency/Actor.cpp index abb8b948be036..99c7e2f4ffa6d 100644 --- a/stdlib/public/Concurrency/Actor.cpp +++ b/stdlib/public/Concurrency/Actor.cpp @@ -981,6 +981,35 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus { } }; +#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS + +/// Given that a job is enqueued normally on a default actor, get/set +/// the next job in the actor's queue. +static JobRef getNextJobInQueue(Job *job) { + return *reinterpret_cast(job->SchedulerPrivate); +} +static void setNextJobInQueue(Job *job, JobRef next) { + *reinterpret_cast(job->SchedulerPrivate) = next; +} + +namespace { + +struct JobQueueTraits { + static Job *getNext(Job *job) { + return getNextJobInQueue(job).getAsPreprocessedJob(); + } + static void setNext(Job *job, Job *next) { + setNextJobInQueue(job, JobRef::getPreprocessed(next)); + } + static int compare(Job *lhs, Job *rhs) { + return descendingPriorityOrder(lhs->getPriority(), rhs->getPriority()); + } +}; + +} // end anonymous namespace + +#endif + #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION && SWIFT_POINTER_IS_4_BYTES #define ACTIVE_ACTOR_STATUS_SIZE (4 * (sizeof(uintptr_t))) #else @@ -1052,10 +1081,13 @@ class DefaultActorImpl : public HeapObject { // enforce alignment. 
This is space that is available for us to use in // the future alignas(sizeof(ActiveActorStatus)) char StatusStorage[sizeof(ActiveActorStatus)]; + + using ListMerger = swift::ListMerger; + ListMerger::LastInsertionPoint lastInsertionPoint = + ListMerger::LastInsertionPoint(); #endif // TODO (rokhinip): Make this a flagset bool isDistributedRemoteActor; - public: /// Properly construct an actor, except for the heap header. void initialize(bool isDistributedRemote = false) { @@ -1128,6 +1160,10 @@ class DefaultActorImpl : public HeapObject { /// It can be done when actor transitions from Idle to Scheduled or /// when actor gets a priority override and we schedule a stealer. void scheduleActorProcessJob(JobPriority priority); + + Job *preprocessQueue(JobRef start); + Job *preprocessQueue(JobRef unprocessedStart, JobRef unprocessedEnd, + Job *existingProcessedJobsToMergeInto); #endif /* !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS */ void deallocateUnconditional(); @@ -1203,31 +1239,6 @@ static NonDefaultDistributedActorImpl *asImpl(NonDefaultDistributedActor *actor) /*****************************************************************************/ #if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS -/// Given that a job is enqueued normally on a default actor, get/set -/// the next job in the actor's queue. 
-static JobRef getNextJobInQueue(Job *job) { - return *reinterpret_cast(job->SchedulerPrivate); -} -static void setNextJobInQueue(Job *job, JobRef next) { - *reinterpret_cast(job->SchedulerPrivate) = next; -} - -namespace { - -struct JobQueueTraits { - static Job *getNext(Job *job) { - return getNextJobInQueue(job).getAsPreprocessedJob(); - } - static void setNext(Job *job, Job *next) { - setNextJobInQueue(job, JobRef::getPreprocessed(next)); - } - static int compare(Job *lhs, Job *rhs) { - return descendingPriorityOrder(lhs->getPriority(), rhs->getPriority()); - } -}; - -} // end anonymous namespace - // Called with the actor drain lock held // @@ -1240,15 +1251,14 @@ struct JobQueueTraits { // and the previous start. We can then process these jobs and merge them into // the already processed list of jobs from the previous iteration of // preprocessQueue -static Job * -preprocessQueue(JobRef unprocessedStart, JobRef unprocessedEnd, Job *existingProcessedJobsToMergeInto) -{ +Job *DefaultActorImpl::preprocessQueue(JobRef unprocessedStart, + JobRef unprocessedEnd, + Job *existingProcessedJobsToMergeInto) { assert(existingProcessedJobsToMergeInto != NULL); assert(unprocessedStart.needsPreprocessing()); assert(unprocessedStart.getAsJob() != unprocessedEnd.getAsJob()); // Build up a list of jobs we need to preprocess - using ListMerger = swift::ListMerger; ListMerger jobsToProcess; // Get just the prefix list of unprocessed jobs @@ -1263,19 +1273,20 @@ preprocessQueue(JobRef unprocessedStart, JobRef unprocessedEnd, Job *existingPro } // Finish processing the unprocessed jobs - Job *newProcessedJobs = jobsToProcess.release(); + Job *newProcessedJobs = std::get<0>(jobsToProcess.release()); assert(newProcessedJobs); - ListMerger mergedList(existingProcessedJobsToMergeInto); + ListMerger mergedList(existingProcessedJobsToMergeInto, lastInsertionPoint); mergedList.merge(newProcessedJobs); - return mergedList.release(); + Job *result; + std::tie(result, lastInsertionPoint) = 
mergedList.release(); + return result; } // Called with the actor drain lock held. // // Preprocess the queue starting from the top -static Job * -preprocessQueue(JobRef start) { +Job *DefaultActorImpl::preprocessQueue(JobRef start) { if (!start) { return NULL; } @@ -1288,7 +1299,6 @@ preprocessQueue(JobRef start) { // There exist some jobs which haven't been preprocessed // Build up a list of jobs we need to preprocess - using ListMerger = swift::ListMerger; ListMerger jobsToProcess; Job *wellFormedListStart = NULL; @@ -1311,18 +1321,19 @@ preprocessQueue(JobRef start) { } // Finish processing the unprocessed jobs - auto processedJobHead = jobsToProcess.release(); + auto processedJobHead = std::get<0>(jobsToProcess.release()); assert(processedJobHead); Job *firstJob = NULL; if (wellFormedListStart) { // Merge it with already known well formed list if we have one. - ListMerger mergedList(wellFormedListStart); + ListMerger mergedList(wellFormedListStart, lastInsertionPoint); mergedList.merge(processedJobHead); - firstJob = mergedList.release(); + std::tie(firstJob, lastInsertionPoint) = mergedList.release(); } else { // Nothing to merge with, just return the head we already have firstJob = processedJobHead; + lastInsertionPoint = ListMerger::LastInsertionPoint(); } return firstJob; @@ -1528,6 +1539,7 @@ Job * DefaultActorImpl::drainOne() { if (_status().compare_exchange_weak(oldState, newState, /* success */ std::memory_order_relaxed, /* failure */ std::memory_order_relaxed)) { + lastInsertionPoint.nodeWasRemoved(firstJob); SWIFT_TASK_DEBUG_LOG("Drained first job %p from actor %p", firstJob, this); traceActorStateTransition(this, oldState, newState, distributedActorIsRemote); concurrency::trace::actor_dequeue(this, firstJob); diff --git a/stdlib/public/Concurrency/CooperativeGlobalExecutor.inc b/stdlib/public/Concurrency/CooperativeGlobalExecutor.inc index d0fde89996384..904b9adebe269 100644 --- a/stdlib/public/Concurrency/CooperativeGlobalExecutor.inc +++ 
b/stdlib/public/Concurrency/CooperativeGlobalExecutor.inc @@ -108,7 +108,7 @@ static void swift_task_enqueueGlobalImpl(Job *job) { JobQueueMerger merger(JobQueue); merger.insert(job); - JobQueue = merger.release(); + JobQueue = std::get<0>(merger.release()); } /// Enqueues a task on the main executor. @@ -202,7 +202,7 @@ static void recognizeReadyDelayedJobs() { nextDelayedJob = next; } - JobQueue = readyJobs.release(); + JobQueue = std::get<0>(readyJobs.release()); DelayedJobQueue = nextDelayedJob; } diff --git a/utils/test-list-merger/Makefile b/utils/test-list-merger/Makefile index dffafbd9f0397..5e5edb024c86a 100644 --- a/utils/test-list-merger/Makefile +++ b/utils/test-list-merger/Makefile @@ -1,7 +1,7 @@ SWIFT_SRCROOT=${CURDIR}/../.. SRCROOT=${SWIFT_SRCROOT}/.. LLVM_SRCROOT=${SRCROOT}/llvm/ -LLVM_OBJROOT=${SRCROOT}/build/Ninja-DebugAssert/llvm-macosx-x86_64 +LLVM_OBJROOT=${SRCROOT}/build/Ninja-DebugAssert/llvm-macosx-$(shell uname -m) HEADERS=${SWIFT_SRCROOT}/include/swift/Basic/ListMerger.h diff --git a/utils/test-list-merger/TestListMerger.cpp b/utils/test-list-merger/TestListMerger.cpp index c05e976ed710a..481a7ea84d216 100644 --- a/utils/test-list-merger/TestListMerger.cpp +++ b/utils/test-list-merger/TestListMerger.cpp @@ -161,7 +161,7 @@ static void runInsertAndMergeTest(llvm::ArrayRef values) { assert(!lastMergeEntry && "ended while still building a merge list"); entries.sort(creationOrder); - entries.checkSameAs(merger.release()); + entries.checkSameAs(std::get<0>(merger.release())); } static void runInsertAtFrontTest(llvm::ArrayRef values) { @@ -171,7 +171,7 @@ static void runInsertAtFrontTest(llvm::ArrayRef values) { merger.insertAtFront(entries.create(value)); } entries.sort(reverseCreationOrder); - entries.checkSameAs(merger.release()); + entries.checkSameAs(std::get<0>(merger.release())); } static void runConcreteTests() { From 40c38f98034735218d1feacbd34a57d7103ee78d Mon Sep 17 00:00:00 2001 From: Mykola Pokhylets Date: Mon, 1 Apr 2024 19:12:31 
+0200 Subject: [PATCH 3/4] Using multiple insertion points to ensure all jobs are always inserted in O(1) Fully separated unprocessed jobs and processed jobs Reverse jobs after updating status to minimise contention --- include/swift/ABI/MetadataValues.h | 26 + include/swift/Basic/HeaderFooterLayout.h | 39 ++ include/swift/Basic/ListMerger.h | 402 ---------------- include/swift/Basic/PriorityQueue.h | 161 +++++++ stdlib/public/Concurrency/Actor.cpp | 444 +++++++----------- .../Concurrency/CooperativeGlobalExecutor.inc | 24 +- stdlib/public/Concurrency/Tracing.h | 3 +- stdlib/public/Concurrency/TracingSignpost.h | 7 +- stdlib/public/Concurrency/TracingStubs.h | 3 +- utils/test-list-merger/Makefile | 13 - utils/test-list-merger/TestListMerger.cpp | 252 ---------- utils/test-priority-queue/Makefile | 11 + .../test-priority-queue/TestPriorityQueue.cpp | 246 ++++++++++ 13 files changed, 660 insertions(+), 971 deletions(-) create mode 100644 include/swift/Basic/HeaderFooterLayout.h delete mode 100644 include/swift/Basic/ListMerger.h create mode 100644 include/swift/Basic/PriorityQueue.h delete mode 100644 utils/test-list-merger/Makefile delete mode 100644 utils/test-list-merger/TestListMerger.cpp create mode 100644 utils/test-priority-queue/Makefile create mode 100644 utils/test-priority-queue/TestPriorityQueue.cpp diff --git a/include/swift/ABI/MetadataValues.h b/include/swift/ABI/MetadataValues.h index 4691e87a4aa7c..93074ae25f66b 100644 --- a/include/swift/ABI/MetadataValues.h +++ b/include/swift/ABI/MetadataValues.h @@ -2511,6 +2511,32 @@ inline int descendingPriorityOrder(JobPriority lhs, return (lhs == rhs ? 0 : lhs > rhs ? -1 : 1); } +enum { PriorityBucketCount = 5 }; + +inline int getPriorityBucketIndex(JobPriority priority) { + // Any unknown priorities will be rounded up to a known one. + // Priorities higher than UserInteractive are clamped to UserInteractive. 
+ // Jobs of unknown priorities will end up in the same bucket as jobs of a + // corresponding known priority. Within the bucket they will be sorted in + // FIFO order. + if (priority > JobPriority::UserInitiated) { + // UserInteractive and higher + return 0; + } else if (priority > JobPriority::Default) { + // UserInitiated + return 1; + } else if (priority > JobPriority::Utility) { + // Default + return 2; + } else if (priority > JobPriority::Background) { + // Utility + return 3; + } else { + // Background and lower + return 4; + } +} + inline JobPriority withUserInteractivePriorityDowngrade(JobPriority priority) { return (priority == JobPriority::UserInteractive) ? JobPriority::UserInitiated : priority; diff --git a/include/swift/Basic/HeaderFooterLayout.h b/include/swift/Basic/HeaderFooterLayout.h new file mode 100644 index 0000000000000..31effb09ac2be --- /dev/null +++ b/include/swift/Basic/HeaderFooterLayout.h @@ -0,0 +1,39 @@ +//===--- HeaderFooterLayout.h -----------------------------------*- C++ -*-===// +// +// This source file is part of the Swift.org open source project +// +// Copyright (c) 2014 - 2024 Apple Inc. 
and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +#ifndef SWIFT_BASIC_HEADER_FOOTER_LAYOUT_H +#define SWIFT_BASIC_HEADER_FOOTER_LAYOUT_H + +namespace swift { + +template +struct HeaderFooterLayoutPadding { +private: + enum : ptrdiff_t { + maxFooterOffset = TotalSize - (ptrdiff_t)sizeof(Footer), + footerAlignment = (ptrdiff_t)alignof(Footer), + footerOffset = maxFooterOffset - (maxFooterOffset % footerAlignment), + size = footerOffset - (ptrdiff_t)sizeof(Header) + }; + char padding[size]; +}; + +template +struct HeaderFooterLayout + : Header, + HeaderFooterLayoutPadding, + Footer {}; + +} // namespace swift + +#endif // SWIFT_BASIC_HEADER_FOOTER_LAYOUT_H + diff --git a/include/swift/Basic/ListMerger.h b/include/swift/Basic/ListMerger.h deleted file mode 100644 index a2ee9fb56188c..0000000000000 --- a/include/swift/Basic/ListMerger.h +++ /dev/null @@ -1,402 +0,0 @@ -//===--- ListMerger.h - Merging sorted linked lists -------------*- C++ -*-===// -// -// This source file is part of the Swift.org open source project -// -// Copyright (c) 2014 - 2017 Apple Inc. and the Swift project authors -// Licensed under Apache License v2.0 with Runtime Library Exception -// -// See https://swift.org/LICENSE.txt for license information -// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors -// -//===----------------------------------------------------------------------===// -// -// This file defines a class that helps with maintaining and merging a -// sorted linked list. 
-// -//===----------------------------------------------------------------------===// - -#ifndef SWIFT_BASIC_LISTMERGER_H -#define SWIFT_BASIC_LISTMERGER_H - -#include -#include - -namespace swift { - -/// A class for building and merging sorted linked lists. -/// -/// The `Node` type parameter represents a reference to a list node. -/// Conceptually, a `Node` value is either null or a reference to an -/// object with an abstract sort value and a `next` reference -/// (another `Node` value). -/// -/// A null reference can be created by explicitly default-constructing -/// the `Node` type, e.g. with `Node()`. Converting a `Node` value -/// contextually to `bool` tests whether the node is a null reference. -/// `Node` values can be compared with the `==` and `!=` operators, -/// and equality with `Node()` is equivalent to a `bool` conversion. -/// These conditions are designed to allow pointer types to be used -/// directly, but they also permit other types. `ListMerger` is not -/// currently written to support smart pointer types efficiently, -/// however. -/// -/// The sort value and `next` reference are not accessed directly; -/// instead, they are accessed with `static` functions on the -/// `NodeTraits` type parameter: -/// -/// ``` -/// /// Return the current value of the next reference. -/// static Node getNext(Node n); -/// -/// /// Set the current value of the next reference. -/// static void setNext(Node n, Node next); -/// -/// /// Compare the sort value of this node with that of another -/// /// node, returning negative (<), zero (==), or positive (>). -/// /// A node must compare equal to itself. A sorted list obeys -/// /// the condition that each node in the list compares <= the next. -/// static int compare(Node lhs, Node rhs); -/// ``` -/// -/// The merger holds a current list of nodes. The sort value and -/// next references of nodes must not be accessed after being added -/// to the merger and before being released except by the merger. 
-template -class ListMerger { -public: - class LastInsertionPoint { - friend class ListMerger; - Node node = Node(); - bool isKnownLastOfEquals = false; - - public: - LastInsertionPoint() {} - - void nodeWasRemoved(Node removedNode) { - if (node == removedNode) { - *this = LastInsertionPoint(); - } - } - }; - -private: - Node root; - LastInsertionPoint lastInsertionPoint; - -public: - /// Construct a merger with the given sorted list as its current list. - ListMerger(Node initialList = Node(), - LastInsertionPoint insertionPoint = LastInsertionPoint()) - : root(initialList), lastInsertionPoint(insertionPoint) {} - - /// Add a single node to this merger's current list. - /// - /// The next reference of the node will be overwritten and does not - /// need to be meaningful. - /// - /// The relative order of nodes in the current list will not change, - /// and if there are nodes in the current list which compare equal - /// to the new node, it will be inserted after them. - void insert(Node newNode) { - assert(newNode && "inserting a null node"); - - Node prev = Node(); - Node cur = root; - Node stopper = Node(); - - // If we have a previous insertion point, compare against it. - if (Node lastIP = lastInsertionPoint.node) { - int comparison = NodeTraits::compare(lastIP, newNode); - - // If it compares equal, put the new node immediately after the - // last in the sequence of equals that contains it. This is a - // common fast path when we're adding many nodes that compare equal. - if (comparison == 0) { - lastIP = findLastOfEqualsFromLastIP(lastIP); - NodeTraits::setNext(newNode, NodeTraits::getNext(lastIP)); - NodeTraits::setNext(lastIP, newNode); - setLastInsertionPoint(newNode, /*known last of equals*/ true); - return; - - // If the new node must follow the last insertion node, we can - // at least start the search there. 
- } else if (comparison < 0) { - lastIP = findLastOfEqualsFromLastIP(lastIP); - prev = lastIP; - cur = NodeTraits::getNext(lastIP); - - // Otherwise, we can at least end the search at the last inserted - // node. - } else { - stopper = lastIP; - } - } - - // Invariants: - // root == [ ..., prev, cur, ... ] - // prev <= newRoot - - // Scan forward looking for either `end` or a node that strictly - // follows the new node. - while (cur != stopper && NodeTraits::compare(cur, newNode) <= 0) { - prev = cur; - cur = NodeTraits::getNext(cur); - } - - NodeTraits::setNext(newNode, cur); - if (prev) { - NodeTraits::setNext(prev, newNode); - } else { - root = newNode; - } - setLastInsertionPoint(newNode, /*known last of equals*/ true); - } - - /// Add a single node to this merger's current list. - /// - /// The next reference of the node will be overwritten and does not - /// need to be meaningful. - /// - /// The relative order of nodes in the current list will not change, - /// and if there are nodes in the current list which compare equal - /// to the new node, it will be inserted *before* them. - /// - /// This is useful for the pattern where nodes are naturally encountered - /// in the opposite of their desired order in the final list and - /// need to be reversed. It generally doesn't make any sense to mix - /// this with calls to insert or merge on the same merger. 
- void insertAtFront(Node newNode) { - assert(newNode && "inserting a null node"); - - auto insertBetween = [newNode, this](Node prev, Node next) { - if (prev) { - assert(NodeTraits::getNext(prev) == next); - assert(NodeTraits::compare(prev, newNode) < 0); - NodeTraits::setNext(prev, newNode); - } else { - assert(root == next); - root = newNode; - } - - assert(!next || NodeTraits::compare(newNode, next) <= 0); - NodeTraits::setNext(newNode, next); - setLastInsertionPoint(prev, /*known last of equals*/ true); - }; - - Node prev = Node(); - Node cur = root; - - // If we have a previous insertion point, check for the presumed-common - // case that we're inserting something that should immediately follow it. - if (auto lastIP = lastInsertionPoint.node) { - lastIP = findLastOfEqualsFromLastIP(lastIP); - - // Compare against the next node after lastIP, if it exists. - if (Node nextAfterLastIP = NodeTraits::getNext(lastIP)) { - int comparison = NodeTraits::compare(nextAfterLastIP, newNode); - - // If the new node compares equal to the next node, insert here. - if (comparison == 0) { - insertBetween(lastIP, nextAfterLastIP); - return; - } - - // If the new node should follow the next node, start scanning - // after it. - if (comparison < 0) { - prev = nextAfterLastIP; - cur = NodeTraits::getNext(nextAfterLastIP); - } - - // Otherwise, we'll need to scan from the beginning. - - // If there is no next node, compare against the previous. - } else { - int comparison = NodeTraits::compare(lastIP, newNode); - - // If the new node should follow the last node, we can - // insert here. - if (comparison < 0) { - insertBetween(lastIP, Node()); - return; - } - - // Otherwise, we'll need to scan from the beginning. - } - } - - assert(!prev || NodeTraits::compare(prev, newNode) < 0); - - // Scan forward, looking for a node which the new node must be - // inserted prior to. - // Invariant: prev < newNode, if prev exists - while (cur) { - // Compare the new node against the current IP. 
- int comparison = NodeTraits::compare(cur, newNode); - - // If the new node isn't strictly greater than cur, insert here. - if (comparison >= 0) break; - - // Otherwise, continue. - prev = cur; - cur = NodeTraits::getNext(prev); - } - - insertBetween(prev, cur); - } - - /// Add a sorted list of nodes to this merger's current list. - /// The list must be well-formed (i.e. appropriately terminated). - /// - /// The relative order of nodes in both the current and the new list - /// will not change. If there are nodes in the current list which - /// compare equal to nodes in the new list, they will appear before - /// the new nodes. - /// - /// For example, if the current list is `[1@A, 1@B, 2@C]`, and the new - /// list is `[0@D, 1@E, 2@F]`, the current list after the merge will - /// be `[0@D, 1@A, 1@B, 1@E, 2@C, 2@F]`. - void merge(Node rootOfNewList) { - if (!rootOfNewList) return; - - Node prev = Node(); - Node cur = root; - Node stopper = Node(); - - // If we have a previous insertion point, compare the new root - // against it. - if (Node lastIP = lastInsertionPoint.node) { - int comparison = NodeTraits::compare(lastIP, rootOfNewList); - - // If it compares equal, we've got an insertion point where - // we can place rootOfNewList: the end of the sequence of - // equals that includes lastIP. This is a common fast path - // when we have many nodes that compare equal. - if (comparison == 0) { - lastIP = findLastOfEqualsFromLastIP(lastIP); - prev = lastIP; - cur = NodeTraits::getNext(lastIP); - goto foundInsertionPoint; // seems to be the best option - - // If the new node must follow the last insertion point, we can - // at least start the search there. - } else if (comparison < 0) { - lastIP = findLastOfEqualsFromLastIP(lastIP); - prev = lastIP; - cur = NodeTraits::getNext(lastIP); - - // Otherwise, we can end the initial search at that position. - } else { - stopper = lastIP; - } - } - - while (rootOfNewList) { - // Invariants: - // root == [ ..., prev, cur, ... 
] - // prev <= rootOfNewList - - // Check if the position between prev and cur is where we should - // insert the root of the new list. - if (cur != stopper && NodeTraits::compare(cur, rootOfNewList) <= 0) { - prev = cur; - cur = NodeTraits::getNext(cur); - continue; - } - - // Place rootOfNewList at this position. Note that this might not be - // a proper splice because there may be nodes following prev that - // are now no longer reflected in the existing list. - if (!prev) { - root = rootOfNewList; - } else { - foundInsertionPoint: - NodeTraits::setNext(prev, rootOfNewList); - } - - // If we've run out of nodes in the existing list, it *is* - // a proper splice, and we're done. - if (!cur) { - assert(!stopper); - setLastInsertionPoint(rootOfNewList, /*known end of equals*/ false); - return; - } - - // If not, scan forward in the new list looking for a node that - // cur should precede. - Node prevInNewList = rootOfNewList; - Node curInNewList = NodeTraits::getNext(rootOfNewList); - while (curInNewList && NodeTraits::compare(cur, curInNewList) > 0) { - prevInNewList = curInNewList; - curInNewList = NodeTraits::getNext(curInNewList); - } - - // prevInNewList < cur <= curInNewList (if it exists) - - // Turn this: - // root == [ ..., prev, cur, ... ] - // rootOfNewList == [ ..., prevInNewList, curInNewList, ... ] - // into: - // root == [ ..., prev, rootOfNewList, ..., prevInNewList, - // cur, ... ] - // rootOfNewList' == [ curInNewList, ... ] - // - // Note that the next insertion point we'll check is *after* cur, - // since we know that cur <= curInNewList. - - NodeTraits::setNext(prevInNewList, cur); - rootOfNewList = curInNewList; - prev = cur; - cur = NodeTraits::getNext(cur); - - setLastInsertionPoint(prevInNewList, /*known end of equals*/ true); - - // Any stopper we have was only known to exceed the original root - // node of the new list, which we've now inserted. From now on, - // we'll need to scan to the end of the list. 
- stopper = Node(); - } - } - - /// Get the current list that's been built up, and clear the internal - /// state of this merger. - std::tuple release() { - auto result = std::make_tuple(root, lastInsertionPoint); - root = Node(); - lastInsertionPoint = LastInsertionPoint(); - return result; - } - -private: - /// Set the last point at which we inserted a node, and specify - /// whether we know it was the last in its sequence of equals. - void setLastInsertionPoint(Node lastIP, bool knownEndOfEquals) { - lastInsertionPoint.node = lastIP; - lastInsertionPoint.isKnownLastOfEquals = knownEndOfEquals; - } - - /// Given the value of lastInsertionPoint (passed in to avoid - /// reloading it), find the last node in the sequence of equals that - /// contains it. - Node findLastOfEqualsFromLastIP(Node lastIP) const { - assert(lastIP == lastInsertionPoint.node); - if (!lastInsertionPoint.isKnownLastOfEquals) - return findLastOfEquals(lastIP); - return lastIP; - } - - /// Find the last node in the sequence of equals that contains `node`. - static Node findLastOfEquals(Node node) { - while (Node next = NodeTraits::getNext(node)) { - int comparison = NodeTraits::compare(node, next); - assert(comparison <= 0 && "list is out of order"); - if (comparison < 0) break; - node = next; - } - return node; - } -}; - -} // end namespace swift - -#endif diff --git a/include/swift/Basic/PriorityQueue.h b/include/swift/Basic/PriorityQueue.h new file mode 100644 index 0000000000000..8f0e55f4fd121 --- /dev/null +++ b/include/swift/Basic/PriorityQueue.h @@ -0,0 +1,161 @@ +//===--- PriorityQueue.h - Merging sorted linked lists ----------*- C++ -*-===// +// +// This source file is part of the Swift.org open source project +// +// Copyright (c) 2014 - 2017 Apple Inc. 
and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// +// +// This file defines a class that helps with maintaining and merging a +// sorted linked list. +// +//===----------------------------------------------------------------------===// + +#ifndef SWIFT_BASIC_PRIORITYQUEUE_H +#define SWIFT_BASIC_PRIORITYQUEUE_H + +#include + +namespace swift { + +/// A simple linked list of nodes. +template +class SimpleQueue { +public: + Node head; + + SimpleQueue() : head() {} + + void prepend(Node newNode) { + assert(newNode && "inserting a null node"); + NodeTraits::setNext(newNode, head); + head = newNode; + } +}; + +/// A class for priority FIFO queue with a fixed number of priorities. +/// +/// The `Node` type parameter represents a reference to a list node. +/// Conceptually, a `Node` value is either null or a reference to an +/// object with an abstract sort value and a `next` reference +/// (another `Node` value). +/// +/// A null reference can be created by explicitly default-constructing +/// the `Node` type, e.g. with `Node()`. Converting a `Node` value +/// contextually to `bool` tests whether the node is a null reference. +/// `Node` values can be compared with the `==` and `!=` operators, +/// and equality with `Node()` is equivalent to a `bool` conversion. +/// These conditions are designed to allow pointer types to be used +/// directly, but they also permit other types. `PriorityQueue` is not +/// currently written to support smart pointer types efficiently, +/// however.
+/// +/// The sort value and `next` reference are not accessed directly; +/// instead, they are accessed with `static` functions on the +/// `NodeTraits` type parameter: +/// +/// ``` +/// /// Return the current value of the next reference. +/// static Node getNext(Node n); +/// +/// /// Set the current value of the next reference. +/// static void setNext(Node n, Node next); +/// +/// /// Total number of priority buckets. +/// enum { prioritiesCount = ... }; +/// +/// /// Returns priority of the Node as value between 0 and `prioritiesCount-1`. +/// /// Smaller indices have higher priority. +/// static int getPriorityIndex(Node); +/// ``` +/// +/// All nodes are stored in a single linked list, sorted by priority. +/// Within the same priority jobs are sorted in the FIFO order. +/// +template +class PriorityQueue { +private: + /// Head of the linked list. + Node head; + /// Last node of the corresponding priority, or null if no nodes of that + /// priority exist. + Node tails[NodeTraits::prioritiesCount]; + +public: + PriorityQueue() : head(), tails{} {} + + /// Add a single node to this queue. + /// + /// The next reference of the node will be overwritten and does not + /// need to be meaningful. + /// + /// The relative order of the existing nodes in the queue will not change, + /// and if there are nodes in the current list which compare equal + /// to the new node, it will be inserted after them. + void enqueue(Node newNode) { + assert(newNode && "inserting a null node"); + int priorityIndex = NodeTraits::getPriorityIndex(newNode); + enqueueRun(priorityIndex, newNode, newNode); + } + + /// Add a chain of nodes of mixed priorities to this queue. 
+ void enqueueContentsOf(Node otherHead) { + Node runHead = otherHead; + while (runHead) { + int priorityIndex = NodeTraits::getPriorityIndex(runHead); + + // Find run of jobs of the same priority + Node runTail = runHead; + Node next = NodeTraits::getNext(runTail); + while (true) { + if (!next) break; + if (NodeTraits::getPriorityIndex(next) != priorityIndex) break; + runTail = next; + next = NodeTraits::getNext(runTail); + } + + enqueueRun(priorityIndex, runHead, runTail); + runHead = next; + } + } + + Node dequeue() { + if (!head) { + return head; + } + auto result = head; + int resultIndex = NodeTraits::getPriorityIndex(result); + head = NodeTraits::getNext(result); + if (!head || resultIndex != NodeTraits::getPriorityIndex(head)) { + tails[resultIndex] = Node(); + } + return result; + } + + Node peek() const { return head; } + bool empty() const { return !head; } +private: + void enqueueRun(int priorityIndex, Node runHead, Node runTail) { + for (int i = priorityIndex;; i--) { + if (i < 0) { + NodeTraits::setNext(runTail, head); + head = runHead; + break; + } + if (tails[i]) { + NodeTraits::setNext(runTail, NodeTraits::getNext(tails[i])); + NodeTraits::setNext(tails[i], runHead); + break; + } + } + tails[priorityIndex] = runTail; + } +}; + +} // end namespace swift + +#endif diff --git a/stdlib/public/Concurrency/Actor.cpp b/stdlib/public/Concurrency/Actor.cpp index 99c7e2f4ffa6d..7d799966a9d75 100644 --- a/stdlib/public/Concurrency/Actor.cpp +++ b/stdlib/public/Concurrency/Actor.cpp @@ -23,7 +23,8 @@ #include "swift/ABI/Actor.h" #include "swift/ABI/Task.h" #include "TaskPrivate.h" -#include "swift/Basic/ListMerger.h" +#include "swift/Basic/HeaderFooterLayout.h" +#include "swift/Basic/PriorityQueue.h" #include "swift/Concurrency/Actor.h" #include "swift/Runtime/AccessibleFunction.h" #include "swift/Runtime/Atomic.h" @@ -616,66 +617,6 @@ class ProcessOutOfLineJob : public Job { } }; -class JobRef { - enum : uintptr_t { - NeedsPreprocessing = 0x1, - JobMask = 
~uintptr_t(NeedsPreprocessing) - }; - - /// A Job* that may have one of the two bits above mangled into it. - uintptr_t Value; - - JobRef(Job *job, unsigned flags) - : Value(reinterpret_cast(job) | flags) {} -public: - constexpr JobRef() : Value(0) {} - - /// Return a reference to a job that's been properly preprocessed. - static JobRef getPreprocessed(Job *job) { - /// We allow null pointers here. - return { job, 0 }; - } - - /// Return a reference to a job that hasn't been preprocessed yet. - static JobRef getUnpreprocessed(Job *job) { - assert(job && "passing a null job"); - return { job, NeedsPreprocessing }; - } - - /// Is this a null reference? - operator bool() const { return Value != 0; } - - /// Does this job need to be pre-processed before we can treat - /// the job queue as a proper queue? - bool needsPreprocessing() const { - return Value & NeedsPreprocessing; - } - - /// Is this an unprocessed message to the actor, rather than a job? - bool isMessage() const { - return false; // For now, we have no messages - } - - Job *getAsJob() const { - assert(!isMessage()); - return reinterpret_cast(Value & JobMask); - } - Job *getAsPreprocessedJob() const { - assert(!isMessage() && !needsPreprocessing()); - return reinterpret_cast(Value); - } - - /// Get the Job pointer with no preconditions on its type, for tracing. 
- Job *getRawJob() const { return reinterpret_cast(Value & JobMask); } - - bool operator==(JobRef other) const { - return Value == other.Value; - } - bool operator!=(JobRef other) const { - return Value != other.Value; - } -}; - /// Similar to the ActiveTaskStatus, this denotes the ActiveActorState for /// tracking the atomic state of the actor /// @@ -688,16 +629,17 @@ class JobRef { /// * Pointer to list of jobs enqueued in actor /// /// It is important for all of this information to be in the same atomic so that -/// when the actor's state changes, the information is visible to all threads that -/// may be modifying the actor, allowing the algorithm to eventually converge. +/// when the actor's state changes, the information is visible to all threads +/// that may be modifying the actor, allowing the algorithm to eventually +/// converge. /// -/// In order to provide priority escalation support with actors, deeper integration is -/// required with the OS in order to have the intended side effects. On Darwin, Swift -/// Concurrency Tasks runs on dispatch's queues. As such, we need to use an -/// encoding of thread identity vended by libdispatch called dispatch_lock_t, -/// and a futex-style dispatch API in order to escalate the priority of a -/// thread. Henceforth, the dispatch_lock_t tracked in the ActiveActorStatus -/// will be called the DrainLock. +/// In order to provide priority escalation support with actors, deeper +/// integration is required with the OS in order to have the intended side +/// effects. On Darwin, Swift Concurrency Tasks runs on dispatch's queues. As +/// such, we need to use an encoding of thread identity vended by libdispatch +/// called dispatch_lock_t, and a futex-style dispatch API in order to escalate +/// the priority of a thread. Henceforth, the dispatch_lock_t tracked in the +/// ActiveActorStatus will be called the DrainLock. 
/// /// When a thread starts running on an actor, it's identity is recorded in the /// ActiveActorStatus. This way, if a higher priority job is enqueued behind the @@ -713,25 +655,25 @@ class JobRef { /// /// 32 bit systems with SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION=1 /// -/// Flags Drain Lock Unused JobRef +/// Flags Drain Lock Unused Job* /// |----------------------|----------------------|----------------------|-------------------| /// 32 bits 32 bits 32 bits 32 bits /// /// 64 bit systems with SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION=1 /// -/// Flags Drain Lock JobRef +/// Flags Drain Lock Job* /// |----------------------|-------------------|----------------------| /// 32 bits 32 bits 64 bits /// /// 32 bit systems with SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION=0 /// -/// Flags JobRef +/// Flags Job* /// |----------------------|----------------------| /// 32 bits 32 bits // /// 64 bit systems with SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION=0 /// -/// Flags Unused JobRef +/// Flags Unused Job* /// |----------------------|----------------------|---------------------| /// 32 bits 32 bits 64 bits /// @@ -769,14 +711,13 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus { uint32_t Flags; LLVM_ATTRIBUTE_UNUSED uint32_t Unused = {}; #endif - JobRef FirstJob; + Job *FirstJob; #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION - ActiveActorStatus(uint32_t flags, dispatch_lock_t drainLockValue, JobRef job) - : Flags(flags), DrainLock(drainLockValue), FirstJob(job) {} + ActiveActorStatus(uint32_t flags, dispatch_lock_t drainLockValue, Job *job) + : Flags(flags), DrainLock(drainLockValue), FirstJob(job) {} #else - ActiveActorStatus(uint32_t flags, JobRef job) - : Flags(flags), FirstJob(job) {} + ActiveActorStatus(uint32_t flags, Job *job) : Flags(flags), FirstJob(job) {} #endif uint32_t getActorState() const { @@ -797,10 +738,9 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus { #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION constexpr 
ActiveActorStatus() - : Flags(), DrainLock(DLOCK_OWNER_NULL), FirstJob(JobRef()) {} + : Flags(), DrainLock(DLOCK_OWNER_NULL), FirstJob(nullptr) {} #else - constexpr ActiveActorStatus() - : Flags(), FirstJob(JobRef()) {} + constexpr ActiveActorStatus() : Flags(), FirstJob(nullptr) {} #endif bool isIdle() const { @@ -927,10 +867,8 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus { #endif } - JobRef getFirstJob() const { - return FirstJob; - } - ActiveActorStatus withFirstJob(JobRef firstJob) const { + Job *getFirstUnprioritisedJob() const { return FirstJob; } + ActiveActorStatus withFirstUnprioritisedJob(Job *firstJob) const { #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION return ActiveActorStatus(Flags, DrainLock, firstJob); #else @@ -975,8 +913,7 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus { break; } concurrency::trace::actor_state_changed( - actor, getFirstJob().getRawJob(), getFirstJob().needsPreprocessing(), - traceState, distributedActorIsRemote, + actor, getFirstUnprioritisedJob(), traceState, distributedActorIsRemote, isMaxPriorityEscalated(), static_cast(getMaxPriority())); } }; @@ -985,29 +922,23 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus { /// Given that a job is enqueued normally on a default actor, get/set /// the next job in the actor's queue. 
-static JobRef getNextJobInQueue(Job *job) { - return *reinterpret_cast(job->SchedulerPrivate); +static Job *getNextJob(Job *job) { + return *reinterpret_cast(job->SchedulerPrivate); } -static void setNextJobInQueue(Job *job, JobRef next) { - *reinterpret_cast(job->SchedulerPrivate) = next; +static void setNextJob(Job *job, Job *next) { + *reinterpret_cast(job->SchedulerPrivate) = next; } -namespace { - struct JobQueueTraits { - static Job *getNext(Job *job) { - return getNextJobInQueue(job).getAsPreprocessedJob(); - } - static void setNext(Job *job, Job *next) { - setNextJobInQueue(job, JobRef::getPreprocessed(next)); - } - static int compare(Job *lhs, Job *rhs) { - return descendingPriorityOrder(lhs->getPriority(), rhs->getPriority()); + static Job *getNext(Job *job) { return getNextJob(job); } + static void setNext(Job *job, Job *next) { setNextJob(job, next); } + + enum { prioritiesCount = PriorityBucketCount }; + static int getPriorityIndex(Job *job) { + return getPriorityBucketIndex(job->getPriority()); } }; -} // end anonymous namespace - #endif #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION && SWIFT_POINTER_IS_4_BYTES @@ -1019,6 +950,46 @@ static_assert(sizeof(ActiveActorStatus) == ACTIVE_ACTOR_STATUS_SIZE, "ActiveActorStatus is of incorrect size"); #endif /* !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS */ +class DefaultActorImplHeader : public HeapObject { +protected: +#if SWIFT_CONCURRENCY_ACTORS_AS_LOCKS + // If actors are locks, we don't need to maintain any extra bookkeeping in the + // ActiveActorStatus since all threads which are contending will block + // synchronously, no job queue is needed and the lock will handle all priority + // escalation logic + Mutex drainLock; +#else + // Note: There is some padding that is added here by the compiler in order to + // enforce alignment. 
This is space that is available for us to use in + // the future + alignas(sizeof(ActiveActorStatus)) char StatusStorage[sizeof(ActiveActorStatus)]; +#endif + // TODO (rokhinip): Make this a flagset + bool isDistributedRemoteActor; +}; + +// All the fields accessed under the actor's lock should be moved +// to the end of the default-actor reservation to minimize false sharing. +// The memory following the DefaultActorImpl object holds the stored properties +// of the actor, which are all accessed only by the current processing thread. +class DefaultActorImplFooter { +protected: +#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS + using SimpleQueue = swift::SimpleQueue; + using PriorityQueue = swift::PriorityQueue; + + // When enqueued, jobs are atomically added to a linked list with the head + // stored inside ActiveActorStatus. This list contains jobs in the LIFO order + // regardless of their priorities. + // + // When the processing thread sees new incoming jobs in + // ActiveActorStatus, it reverses them and inserts them into + // prioritizedJobs in the appropriate priority bucket. + // + PriorityQueue prioritizedJobs; +#endif +}; + /// The default actor implementation. /// /// Ownership of the actor is subtle. Jobs are assumed to keep the actor @@ -1069,25 +1040,10 @@ static_assert(sizeof(ActiveActorStatus) == ACTIVE_ACTOR_STATUS_SIZE, /// processing job for an actor at a given time. Stealers jobs support does not /// exist yet. As a result, the subset of rules that currently apply /// are (1), (3), (5), (6). -class DefaultActorImpl : public HeapObject { -#if SWIFT_CONCURRENCY_ACTORS_AS_LOCKS - // If actors are locks, we don't need to maintain any extra bookkeeping in the - // ActiveActorStatus since all threads which are contending will block - // synchronously, no job queue is needed and the lock will handle all priority - // escalation logic - Mutex drainLock; -#else - // Note: There is some padding that is added here by the compiler in order to - // enforce alignment.
This is space that is available for us to use in - // the future - alignas(sizeof(ActiveActorStatus)) char StatusStorage[sizeof(ActiveActorStatus)]; - - using ListMerger = swift::ListMerger; - ListMerger::LastInsertionPoint lastInsertionPoint = - ListMerger::LastInsertionPoint(); -#endif - // TODO (rokhinip): Make this a flagset - bool isDistributedRemoteActor; +class DefaultActorImpl + : public HeaderFooterLayout { public: /// Properly construct an actor, except for the heap header. void initialize(bool isDistributedRemote = false) { @@ -1096,6 +1052,7 @@ class DefaultActorImpl : public HeapObject { new (&this->drainLock) Mutex(); #else _status().store(ActiveActorStatus(), std::memory_order_relaxed); + new (&this->prioritizedJobs) PriorityQueue(); #endif SWIFT_TASK_DEBUG_LOG("Creating default actor %p", this); concurrency::trace::actor_create(this); @@ -1123,7 +1080,8 @@ class DefaultActorImpl : public HeapObject { /// new priority void enqueueStealer(Job *job, JobPriority priority); - // The calling thread must be holding the actor lock while calling this + /// Dequeues one job from `prioritizedJobs`. + /// The calling thread must be holding the actor lock while calling this Job *drainOne(); #endif @@ -1135,11 +1093,11 @@ class DefaultActorImpl : public HeapObject { #if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS swift::atomic &_status() { - return reinterpret_cast&> (this->StatusStorage); + return reinterpret_cast &>(this->StatusStorage); } const swift::atomic &_status() const { - return reinterpret_cast&> (this->StatusStorage); + return reinterpret_cast &>(this->StatusStorage); } // Only for static assert use below, not for actual use otherwise @@ -1161,9 +1119,14 @@ class DefaultActorImpl : public HeapObject { /// when actor gets a priority override and we schedule a stealer. 
void scheduleActorProcessJob(JobPriority priority); - Job *preprocessQueue(JobRef start); - Job *preprocessQueue(JobRef unprocessedStart, JobRef unprocessedEnd, - Job *existingProcessedJobsToMergeInto); + /// Atomically takes a list of jobs from ActiveActorStatus, reversing them in + /// the process. Returns jobs of mixed priorities in FIFO order. + SimpleQueue collectJobs(); + + /// Check for new jobs in the incoming queue and move them to the + /// processing queue. + /// Called with actor lock held on current thread. + void processJobs(); #endif /* !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS */ void deallocateUnconditional(); @@ -1240,109 +1203,9 @@ static NonDefaultDistributedActorImpl *asImpl(NonDefaultDistributedActor *actor) #if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS -// Called with the actor drain lock held -// -// This function is called when we hit a conflict between preprocessQueue and -// a concurrent enqueuer resulting in unprocessed jobs being queued up in the -// middle. -// -// We need to find the unprocessed jobs enqueued by the enqueuer and process -// them - We know that these unprocessed jobs must exist between the new head -// and the previous start. 
We can then process these jobs and merge them into -// the already processed list of jobs from the previous iteration of -// preprocessQueue -Job *DefaultActorImpl::preprocessQueue(JobRef unprocessedStart, - JobRef unprocessedEnd, - Job *existingProcessedJobsToMergeInto) { - assert(existingProcessedJobsToMergeInto != NULL); - assert(unprocessedStart.needsPreprocessing()); - assert(unprocessedStart.getAsJob() != unprocessedEnd.getAsJob()); - - // Build up a list of jobs we need to preprocess - ListMerger jobsToProcess; - - // Get just the prefix list of unprocessed jobs - auto current = unprocessedStart; - while (current != unprocessedEnd) { - assert(current.needsPreprocessing()); - // Advance current to next pointer and process current unprocessed job - auto job = current.getAsJob(); - current = getNextJobInQueue(job); - - jobsToProcess.insertAtFront(job); - } - - // Finish processing the unprocessed jobs - Job *newProcessedJobs = std::get<0>(jobsToProcess.release()); - assert(newProcessedJobs); - - ListMerger mergedList(existingProcessedJobsToMergeInto, lastInsertionPoint); - mergedList.merge(newProcessedJobs); - Job *result; - std::tie(result, lastInsertionPoint) = mergedList.release(); - return result; -} - -// Called with the actor drain lock held. 
-// -// Preprocess the queue starting from the top -Job *DefaultActorImpl::preprocessQueue(JobRef start) { - if (!start) { - return NULL; - } - - // Entire queue is well formed, no pre-processing needed - if (!start.needsPreprocessing()) { - return start.getAsPreprocessedJob(); - } - - // There exist some jobs which haven't been preprocessed - - // Build up a list of jobs we need to preprocess - ListMerger jobsToProcess; - - Job *wellFormedListStart = NULL; - - auto current = start; - while (current) { - if (!current.needsPreprocessing()) { - // We can assume that everything from here onwards as being well formed - // and sorted - wellFormedListStart = current.getAsPreprocessedJob(); - break; - } - - // Advance current to next pointer and insert current fella to jobsToProcess - // list - auto job = current.getAsJob(); - current = getNextJobInQueue(job); - - jobsToProcess.insertAtFront(job); - } - - // Finish processing the unprocessed jobs - auto processedJobHead = std::get<0>(jobsToProcess.release()); - assert(processedJobHead); - - Job *firstJob = NULL; - if (wellFormedListStart) { - // Merge it with already known well formed list if we have one. 
- ListMerger mergedList(wellFormedListStart, lastInsertionPoint); - mergedList.merge(processedJobHead); - std::tie(firstJob, lastInsertionPoint) = mergedList.release(); - } else { - // Nothing to merge with, just return the head we already have - firstJob = processedJobHead; - lastInsertionPoint = ListMerger::LastInsertionPoint(); - } - - return firstJob; -} - static void traceJobQueue(DefaultActorImpl *actor, Job *first) { - concurrency::trace::actor_note_job_queue(actor, first, [](Job *job) { - return getNextJobInQueue(job).getAsPreprocessedJob(); - }); + concurrency::trace::actor_note_job_queue( + actor, first, [](Job *job) { return getNextJob(job); }); } static SWIFT_ATTRIBUTE_ALWAYS_INLINE void traceActorStateTransition(DefaultActorImpl *actor, @@ -1382,11 +1245,9 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) { auto newState = oldState; // Link this into the queue in the atomic state - JobRef currentHead = oldState.getFirstJob(); - setNextJobInQueue(job, currentHead); - JobRef newHead = JobRef::getUnpreprocessed(job); - - newState = newState.withFirstJob(newHead); + Job *currentHead = oldState.getFirstUnprioritisedJob(); + setNextJob(job, currentHead); + newState = newState.withFirstUnprioritisedJob(job); if (oldState.isIdle()) { // Schedule the actor @@ -1511,57 +1372,70 @@ void DefaultActorImpl::enqueueStealer(Job *job, JobPriority priority) { } -// Called with actor lock held on current thread -Job * DefaultActorImpl::drainOne() { - SWIFT_TASK_DEBUG_LOG("Draining one job from default actor %p", this); - +DefaultActorImpl::SimpleQueue DefaultActorImpl::collectJobs() { // Pairs with the store release in DefaultActorImpl::enqueue bool distributedActorIsRemote = swift_distributed_actor_is_remote(this); auto oldState = _status().load(SWIFT_MEMORY_ORDER_CONSUME); _swift_tsan_consume(this); - auto jobToPreprocessFrom = oldState.getFirstJob(); - Job *firstJob = preprocessQueue(jobToPreprocessFrom); - traceJobQueue(this, firstJob); - + 
SimpleQueue result; + // We must ensure that any jobs not seen by collectJobs() don't have any + // dangling references to the jobs that have been collected. For that we must + // atomically set head pointer to NULL. If it fails because more jobs have + // been added in the meantime, we have to re-read the head pointer. while (true) { - assert(oldState.isAnyRunning()); - - if (!firstJob) { - // Nothing to drain, short circuit - SWIFT_TASK_DEBUG_LOG("No jobs to drain on actor %p", this); - return NULL; + // If there aren't any new jobs in the incoming queue, we can return + // immediately without updating the status. + if (!oldState.getFirstUnprioritisedJob()) { + return result; } + assert(oldState.isAnyRunning()); auto newState = oldState; - // Dequeue the first job and set up a new head - newState = newState.withFirstJob(getNextJobInQueue(firstJob)); - if (_status().compare_exchange_weak(oldState, newState, - /* success */ std::memory_order_relaxed, - /* failure */ std::memory_order_relaxed)) { - lastInsertionPoint.nodeWasRemoved(firstJob); - SWIFT_TASK_DEBUG_LOG("Drained first job %p from actor %p", firstJob, this); - traceActorStateTransition(this, oldState, newState, distributedActorIsRemote); - concurrency::trace::actor_dequeue(this, firstJob); - return firstJob; + newState = newState.withFirstUnprioritisedJob(nullptr); + + if (_status().compare_exchange_weak( + oldState, newState, + /* success */ std::memory_order_relaxed, + /* failure */ std::memory_order_relaxed)) { + SWIFT_TASK_DEBUG_LOG("Collected some jobs from actor %p", this); + traceActorStateTransition(this, oldState, newState, + distributedActorIsRemote); + break; } + } - // We failed the weak cmpxchg spuriously, go through loop again. 
- if (oldState.getFirstJob().getAsJob() == jobToPreprocessFrom.getAsJob()) { - continue; - } + // Collect jobs, reversing them in the process + auto job = oldState.getFirstUnprioritisedJob(); + while (job) { + auto next = getNextJob(job); + result.prepend(job); + job = next; + } - // There were new items concurrently added to the queue. We need to - // preprocess the newly added unprocessed items and merge them to the already - // preprocessed list. - // - // The newly merged items that need to be preprocessed, are between the head - // of the linked list, and the last job we did the previous preprocessQueue - // on - firstJob = preprocessQueue(oldState.getFirstJob(), jobToPreprocessFrom, firstJob); - jobToPreprocessFrom = oldState.getFirstJob(); - traceJobQueue(this, firstJob); + return result; +} + +// Called with actor lock held on current thread +void DefaultActorImpl::processJobs() { + SimpleQueue jobs = collectJobs(); + prioritizedJobs.enqueueContentsOf(jobs.head); +} + +// Called with actor lock held on current thread +Job *DefaultActorImpl::drainOne() { + SWIFT_TASK_DEBUG_LOG("Draining one job from default actor %p", this); + + processJobs(); + traceJobQueue(this, prioritizedJobs.peek()); + auto firstJob = prioritizedJobs.dequeue(); + if (!firstJob) { + SWIFT_TASK_DEBUG_LOG("No jobs to drain on actor %p", this); + } else { + SWIFT_TASK_DEBUG_LOG("Drained first job %p from actor %p", firstJob, this); + concurrency::trace::actor_dequeue(this, firstJob); } + return firstJob; } // Called from processing jobs which are created to drain an actor. 
We need to @@ -1679,16 +1553,18 @@ void DefaultActorImpl::destroy() { #if SWIFT_CONCURRENCY_ACTORS_AS_LOCKS // TODO (rokhinip): Do something to assert that the lock is unowned #else - auto oldState = _status().load(std::memory_order_relaxed); + auto oldState = _status().load(std::memory_order_acquire); // Tasks on an actor are supposed to keep the actor alive until they start // running and we can only get here if ref count of the object = 0 which means // there should be no more tasks enqueued on the actor. - assert(!oldState.getFirstJob() && "actor has queued jobs at destruction"); + assert(!oldState.getFirstUnprioritisedJob() && "actor has queued jobs at destruction"); if (oldState.isIdle()) { - return; + assert(prioritizedJobs.empty() && "actor has queued jobs at destruction"); + return; } assert(oldState.isRunning() && "actor scheduled but not running at destruction"); + // In running state we cannot safely access prioritizedJobs to assert that it is empty. #endif } @@ -1740,7 +1616,7 @@ retry:; bool distributedActorIsRemote = swift_distributed_actor_is_remote(this); auto oldState = _status().load(std::memory_order_relaxed); while (true) { - + bool assertNoJobs = false; if (asDrainer) { #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION if (!oldState.isScheduled()) { @@ -1781,7 +1657,10 @@ retry:; } assert(oldState.getMaxPriority() == JobPriority::Unspecified); - assert(!oldState.getFirstJob()); + assert(!oldState.getFirstUnprioritisedJob()); + // We cannot assert here that prioritizedJobs is empty, + // because lock is not held yet. Raise a flag to assert after getting the lock. 
+ assertNoJobs = true; } // Taking the drain lock clears the max priority escalated bit because we've @@ -1794,6 +1673,9 @@ retry:; std::memory_order_acquire, std::memory_order_relaxed)) { _swift_tsan_acquire(this); + if (assertNoJobs) { + assert(prioritizedJobs.empty()); + } traceActorStateTransition(this, oldState, newState, distributedActorIsRemote); return true; } @@ -1842,7 +1724,8 @@ bool DefaultActorImpl::unlock(bool forceUnlock) } auto newState = oldState; - if (oldState.getFirstJob()) { + // Lock is still held at this point, so it is safe to access prioritizedJobs + if (!prioritizedJobs.empty() || oldState.getFirstUnprioritisedJob()) { // There is work left to do, don't unlock the actor if (!forceUnlock) { SWIFT_TASK_DEBUG_LOG("Unlock-ing actor %p failed", this); @@ -1880,7 +1763,6 @@ bool DefaultActorImpl::unlock(bool forceUnlock) if (newState.isScheduled()) { // See ownership rule (6) in DefaultActorImpl - assert(newState.getFirstJob()); scheduleActorProcessJob(newState.getMaxPriority()); } else { // See ownership rule (5) in DefaultActorImpl diff --git a/stdlib/public/Concurrency/CooperativeGlobalExecutor.inc b/stdlib/public/Concurrency/CooperativeGlobalExecutor.inc index 904b9adebe269..31ef7912c962e 100644 --- a/stdlib/public/Concurrency/CooperativeGlobalExecutor.inc +++ b/stdlib/public/Concurrency/CooperativeGlobalExecutor.inc @@ -28,7 +28,7 @@ # include #endif #include -#include "swift/Basic/ListMerger.h" +#include "swift/Basic/PriorityQueue.h" #if __has_include() # include @@ -50,11 +50,12 @@ struct JobQueueTraits { static void setNext(Job *job, Job *next) { storage(job) = next; } - static int compare(Job *lhs, Job *rhs) { - return descendingPriorityOrder(lhs->getPriority(), rhs->getPriority()); + enum { prioritiesCount = PriorityBucketCount }; + static int getPriorityIndex(Job *job) { + return getPriorityBucketIndex(job->getPriority()); } }; -using JobQueueMerger = ListMerger; +using JobPriorityQueue = PriorityQueue; using JobDeadline = 
std::chrono::time_point; @@ -98,17 +99,14 @@ struct JobDeadlineStorage { } // end anonymous namespace -static Job *JobQueue = nullptr; +static JobPriorityQueue JobQueue; static Job *DelayedJobQueue = nullptr; /// Insert a job into the cooperative global queue. SWIFT_CC(swift) static void swift_task_enqueueGlobalImpl(Job *job) { assert(job && "no job provided"); - - JobQueueMerger merger(JobQueue); - merger.insert(job); - JobQueue = std::get<0>(merger.release()); + JobQueue.enqueue(job); } /// Enqueues a task on the main executor. @@ -188,7 +186,6 @@ static void recognizeReadyDelayedJobs() { if (!nextDelayedJob) return; auto now = std::chrono::steady_clock::now(); - JobQueueMerger readyJobs(JobQueue); // Pull jobs off of the delayed-jobs queue whose deadline has been // reached, and add them to the ready queue. @@ -198,11 +195,9 @@ static void recognizeReadyDelayedJobs() { JobDeadlineStorage<>::destroy(nextDelayedJob); auto next = JobQueueTraits::getNext(nextDelayedJob); - readyJobs.insert(nextDelayedJob); + JobQueue.enqueue(nextDelayedJob); nextDelayedJob = next; } - - JobQueue = std::get<0>(readyJobs.release()); DelayedJobQueue = nextDelayedJob; } @@ -232,8 +227,7 @@ static Job *claimNextFromCooperativeGlobalQueue() { recognizeReadyDelayedJobs(); // If there's a job in the primary queue, run it. 
- if (auto job = JobQueue) { - JobQueue = JobQueueTraits::getNext(job); + if (auto job = JobQueue.dequeue()) { return job; } diff --git a/stdlib/public/Concurrency/Tracing.h b/stdlib/public/Concurrency/Tracing.h index 7b28fa8087caa..0059cf1d494ad 100644 --- a/stdlib/public/Concurrency/Tracing.h +++ b/stdlib/public/Concurrency/Tracing.h @@ -47,8 +47,7 @@ void actor_dequeue(HeapObject *actor, Job *job); // State values are: // Idle = 0, Scheduled = 1, Running = 2, Zombie_ReadyForDeallocation = 3, // invalid/unknown = 255 -void actor_state_changed(HeapObject *actor, Job *firstJob, - bool needsPreprocessing, uint8_t state, +void actor_state_changed(HeapObject *actor, Job *firstJob, uint8_t state, bool isDistributedRemote, bool isPriorityEscalated, uint8_t maxPriority); diff --git a/stdlib/public/Concurrency/TracingSignpost.h b/stdlib/public/Concurrency/TracingSignpost.h index f7595199e54f0..5374bf18bbc2d 100644 --- a/stdlib/public/Concurrency/TracingSignpost.h +++ b/stdlib/public/Concurrency/TracingSignpost.h @@ -140,8 +140,7 @@ inline void actor_dequeue(HeapObject *actor, Job *job) { } } -inline void actor_state_changed(HeapObject *actor, Job *firstJob, - bool needsPreprocessing, uint8_t state, +inline void actor_state_changed(HeapObject *actor, Job *firstJob, uint8_t state, bool isDistributedRemote, bool isPriorityEscalated, uint8_t maxPriority) { ENSURE_LOGS(); @@ -150,8 +149,8 @@ inline void actor_state_changed(HeapObject *actor, Job *firstJob, "actor=%p needsPreprocessing=%d " "state=%u isDistributedRemote=%{bool}d " "isPriorityEscalated=%{bool}d, maxPriority=%u", - actor, needsPreprocessing, state, isDistributedRemote, - isPriorityEscalated, maxPriority); + actor, (firstJob != nullptr), state, + isDistributedRemote, isPriorityEscalated, maxPriority); } inline void actor_note_job_queue(HeapObject *actor, Job *first, diff --git a/stdlib/public/Concurrency/TracingStubs.h b/stdlib/public/Concurrency/TracingStubs.h index 4ab35aeb1745f..4e8399af4dd73 100644 --- 
a/stdlib/public/Concurrency/TracingStubs.h +++ b/stdlib/public/Concurrency/TracingStubs.h @@ -33,8 +33,7 @@ inline void actor_enqueue(HeapObject *actor, Job *job) {} inline void actor_dequeue(HeapObject *actor, Job *job) {} -inline void actor_state_changed(HeapObject *actor, Job *firstJob, - bool needsPreprocessing, uint8_t state, +inline void actor_state_changed(HeapObject *actor, Job *firstJob, uint8_t state, bool isDistributedRemote, bool isPriorityEscalated, uint8_t maxPriority) { } diff --git a/utils/test-list-merger/Makefile b/utils/test-list-merger/Makefile deleted file mode 100644 index 5e5edb024c86a..0000000000000 --- a/utils/test-list-merger/Makefile +++ /dev/null @@ -1,13 +0,0 @@ -SWIFT_SRCROOT=${CURDIR}/../.. -SRCROOT=${SWIFT_SRCROOT}/.. -LLVM_SRCROOT=${SRCROOT}/llvm/ -LLVM_OBJROOT=${SRCROOT}/build/Ninja-DebugAssert/llvm-macosx-$(shell uname -m) - -HEADERS=${SWIFT_SRCROOT}/include/swift/Basic/ListMerger.h - -CXXFLAGS=-Wall -std=c++17 -stdlib=libc++ -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -I${OBJROOT}/include -I${SWIFT_SRCROOT}/include -I${LLVM_SRCROOT}/include -I${LLVM_OBJROOT}/include - -TestListMerger: TestListMerger.o - $(CXX) -L${LLVM_OBJROOT}/lib -lLLVMSupport $< -o $@ - -TestListMerger.o: ${HEADERS} diff --git a/utils/test-list-merger/TestListMerger.cpp b/utils/test-list-merger/TestListMerger.cpp deleted file mode 100644 index 481a7ea84d216..0000000000000 --- a/utils/test-list-merger/TestListMerger.cpp +++ /dev/null @@ -1,252 +0,0 @@ -#include "swift/Basic/ListMerger.h" -#include "llvm/ADT/ArrayRef.h" - -#include -#include -#include - -using namespace swift; - -static int compare_unsigned(unsigned lhs, unsigned rhs) { - return (lhs < rhs ? -1 : lhs > rhs ? 
1 : 0); -} - -namespace { -enum EntryOrder { - creationOrder, - reverseCreationOrder -}; - -struct Entry { - unsigned id; - unsigned value; - Entry *next; -}; - -class EntryFactory { - std::vector> entries; - unsigned nextID = 0; -public: - Entry *create(unsigned value) { - auto entry = new Entry{nextID++, value, nullptr}; - entries.emplace_back(entry); - return entry; - } - - /// Sort the entries in this list. - /// - /// \param reverseCreationOrder - if true, then order equal-value - /// nodes in the reverse of creation order; otherwise - /// creator order of creation order - void sort(EntryOrder order) { - std::sort(entries.begin(), entries.end(), - [=](const std::unique_ptr &lhs, - const std::unique_ptr &rhs) { - if (lhs->value != rhs->value) return lhs->value < rhs->value; - return order == creationOrder - ? lhs->id < rhs->id - : lhs->id > rhs->id; - }); - } - - void checkSameAs(Entry *list) { - for (auto &entry: entries) { - std::cout << " " << list->value << " (" << list->id << ")\n"; - assert(list == entry.get()); - list = list->next; - }; - assert(list == nullptr); - } -}; - -struct EntryListTraits { - static Entry *getNext(Entry *e) { return e->next; } - static void setNext(Entry *e, Entry *next) { e->next = next; } - static int compare(Entry *lhs, Entry *rhs) { - return compare_unsigned(lhs->value, rhs->value); - } -}; - -using EntryListMerger = ListMerger; - -enum Op { - insert, - beginMerge, - endMerge -}; - -/// An instruction to the test harness: either a simple value -/// (an "insert") or one of the special instructions. 
-struct Instruction { - Op op; - unsigned value; - - Instruction(Op op) : op(op), value(0) { assert(op != insert); } - Instruction(unsigned value) : op(insert), value(value) {} - - friend std::ostream &operator<<(std::ostream &str, const Instruction &inst) { - switch (inst.op) { - case insert: str << inst.value; break; - case beginMerge: str << "beginMerge"; break; - case endMerge: str << "endMerge"; break; - } - return str; - } -}; -} // end anonymous namespace - -template -static std::ostream &operator<<(std::ostream &str, llvm::ArrayRef list) { - str << "{"; - for (auto b = list.begin(), i = b, e = list.end(); i != e; ++i) { - if (i != b) str << ", "; - str << *i; - } - str << "}"; - return str; -} - -template -static std::ostream &operator<<(std::ostream &str, const std::vector &list) { - return (str << llvm::ArrayRef(list)); -} - -static void runInsertAndMergeTest(llvm::ArrayRef values) { - EntryFactory entries; - EntryListMerger merger; - - // Between beginMerge and endMerge instructions, values don't get - // inserted immediately: they build up into a separate list of items - // that will be merged at the time of the endMerge. We record this - // mode by making lastMergeEntry non-null. - Entry *firstMergeEntry; - Entry **lastMergeEntry = nullptr; - - for (auto &inst : values) { - switch (inst.op) { - case insert: { - // Create the new entry. - Entry *entry = entries.create(inst.value); - - // If we're building a merge list, append to the end of it. - if (lastMergeEntry) { - *lastMergeEntry = entry; - lastMergeEntry = &entry->next; - - // Otherwise, just do an insertion. - } else { - merger.insert(entry); - } - break; - } - - case beginMerge: - assert(!lastMergeEntry && "already building a merge list"); - lastMergeEntry = &firstMergeEntry; - break; - - case endMerge: - assert(lastMergeEntry && "not building a merge list?"); - // Cap off the merge list we built. - *lastMergeEntry = nullptr; - - // Do the merge. 
- merger.merge(firstMergeEntry); - - // We're no longer building a merge list. - lastMergeEntry = nullptr; - break; - } - } - assert(!lastMergeEntry && "ended while still building a merge list"); - - entries.sort(creationOrder); - entries.checkSameAs(std::get<0>(merger.release())); -} - -static void runInsertAtFrontTest(llvm::ArrayRef values) { - EntryFactory entries; - EntryListMerger merger; - for (auto value: values) { - merger.insertAtFront(entries.create(value)); - } - entries.sort(reverseCreationOrder); - entries.checkSameAs(std::get<0>(merger.release())); -} - -static void runConcreteTests() { - runInsertAndMergeTest({ 5, 0, 3, 0, 1, 0, 7 }); -} - -namespace { -struct TestConfig { - unsigned numTests; - unsigned numEntries; - unsigned maxValue; -}; - -} - -static void runInsertAndMergeTests(const TestConfig &config) { - std::random_device randomDevice; - std::default_random_engine e(randomDevice()); - std::uniform_int_distribution valueDist(0, config.maxValue); - // Chance of entering or exiting a merge. 
- std::uniform_int_distribution mergeDist(0, 20); - - std::vector ins; - for (unsigned testN = 0; testN < config.numTests; ++testN) { - ins.clear(); - - const size_t noMerge = -1; - size_t mergeStart = noMerge; - for (unsigned i = 0; i < config.numEntries || mergeStart != noMerge; ++i) { - if (mergeDist(e) == 0) { - if (mergeStart != noMerge) { - std::sort(ins.begin() + mergeStart, ins.end(), - [](const Instruction &lhs, const Instruction &rhs) { - return lhs.value < rhs.value; - }); - ins.push_back(endMerge); - mergeStart = noMerge; - } else { - ins.push_back(beginMerge); - mergeStart = ins.size(); - } - } else { - ins.push_back(valueDist(e)); - } - } - - std::cout << "runInsertAndMergeTest(" << ins << ");" << std::endl; - runInsertAndMergeTest(ins); - } -} - -static void runInsertAtFrontTests(const TestConfig &config) { - std::random_device randomDevice; - std::default_random_engine e(randomDevice()); - std::uniform_int_distribution valueDist(0, config.maxValue); - - std::vector ins; - for (unsigned testN = 0; testN < config.numTests; ++testN) { - ins.clear(); - for (unsigned i = 0; i < config.numEntries; ++i) { - ins.push_back(valueDist(e)); - } - - std::cout << "runInsertAtFrontTest(" << ins << ");" << std::endl; - runInsertAtFrontTest(ins); - } -} - -int main() { - TestConfig config = { - .numTests = 1000, - .numEntries = 2000, - .maxValue = 3 - }; - runConcreteTests(); - runInsertAndMergeTests(config); - runInsertAtFrontTests(config); -} diff --git a/utils/test-priority-queue/Makefile b/utils/test-priority-queue/Makefile new file mode 100644 index 0000000000000..0e94628b51e9d --- /dev/null +++ b/utils/test-priority-queue/Makefile @@ -0,0 +1,11 @@ +SWIFT_SRCROOT=${CURDIR}/../.. +SRCROOT=${SWIFT_SRCROOT}/.. 
+ +HEADERS=${SWIFT_SRCROOT}/include/swift/Basic/PriorityQueue.h + +CXXFLAGS=-Wall -std=c++17 -stdlib=libc++ -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -I${OBJROOT}/include -I${SWIFT_SRCROOT}/include + +TestPriorityQueue: TestPriorityQueue.o + $(CXX) $< -o $@ + +TestPriorityQueue.o: ${HEADERS} diff --git a/utils/test-priority-queue/TestPriorityQueue.cpp b/utils/test-priority-queue/TestPriorityQueue.cpp new file mode 100644 index 0000000000000..11489e4666c27 --- /dev/null +++ b/utils/test-priority-queue/TestPriorityQueue.cpp @@ -0,0 +1,246 @@ +#include +#include +#include +#include + +#define private public +#include "swift/Basic/PriorityQueue.h" + +using namespace swift; + +namespace { + +enum EntryOrder { priorityOrder, reverseCreationOrder }; + +struct Entry { + unsigned id; + unsigned value; + Entry *next; +}; + +class EntryFactory { + std::vector> entries; + unsigned nextID = 0; + +public: + Entry *create(unsigned value) { + auto entry = new Entry{nextID++, value, nullptr}; + entries.emplace_back(entry); + return entry; + } + + void remove(Entry *e) { + auto it = std::find_if( + entries.begin(), entries.end(), + [=](const std::unique_ptr &ptr) { return ptr.get() == e; }); + assert(it != entries.end()); + entries.erase(it); + } + + /// Sort the entries in this list. 
+ /// + /// \param reverseCreationOrder - if true, then order equal-value + /// nodes in the reverse of creation order; otherwise + /// creator order of creation order + void sort(EntryOrder order) { + if (order == priorityOrder) { + std::sort(entries.begin(), entries.end(), + [=](const std::unique_ptr &lhs, + const std::unique_ptr &rhs) { + if (lhs->value != rhs->value) + return lhs->value < rhs->value; + return lhs->id < rhs->id; + }); + } else { + std::sort( + entries.begin(), entries.end(), + [=](const std::unique_ptr &lhs, + const std::unique_ptr &rhs) { return lhs->id > rhs->id; }); + } + } + + void checkSameAs(Entry *list) { + for (auto &entry : entries) { + std::cout << " " << list->value << " (" << list->id << ")\n"; + assert(list == entry.get()); + list = list->next; + }; + assert(list == nullptr); + } +}; + +struct EntryListTraits { + static Entry *getNext(Entry *e) { return e->next; } + static void setNext(Entry *e, Entry *next) { e->next = next; } + enum { prioritiesCount = 4 }; + static int getPriorityIndex(Entry *e) { + assert(e->value < prioritiesCount); + return (int)e->value; + } +}; + +using EntrySimpleQueue = SimpleQueue; +using EntryPriorityQueue = PriorityQueue; + +static void runPrependTest() { + std::cout << "runPrependTest()" << std::endl; + EntryFactory entries; + EntrySimpleQueue queue; + assert(!queue.head); + assert(!queue.tail); + + auto first = entries.create(3); + queue.prepend(first); + assert(queue.head == first); + assert(queue.tail == first); + + auto second = entries.create(0); + queue.prepend(second); + assert(queue.head == second); + assert(queue.tail == first); + + auto third = entries.create(2); + queue.prepend(third); + assert(queue.head == third); + assert(queue.tail == first); + + entries.sort(reverseCreationOrder); + entries.checkSameAs(queue.head); +} + +static void runEnqueueDequeueTest() { + std::cout << "runEnqueueDequeueTest()" << std::endl; + EntryFactory entries; + EntryPriorityQueue queue; + assert(!queue.head); 
+ assert(!queue.tails[0]); + assert(!queue.tails[1]); + assert(!queue.tails[2]); + assert(!queue.tails[3]); + + auto first = entries.create(3); + queue.enqueue(first); + assert(queue.head == first); + assert(!queue.tails[0]); + assert(!queue.tails[1]); + assert(!queue.tails[2]); + assert(queue.tails[3] == first); + + auto second = entries.create(3); + queue.enqueue(second); + assert(queue.head == first); + assert(!queue.tails[0]); + assert(!queue.tails[1]); + assert(!queue.tails[2]); + assert(queue.tails[3] == second); + + auto third = entries.create(1); + queue.enqueue(third); + assert(queue.head == third); + assert(!queue.tails[0]); + assert(queue.tails[1] == third); + assert(!queue.tails[2]); + assert(queue.tails[3] == second); + + auto fourth = entries.create(2); + queue.enqueue(fourth); + assert(queue.head == third); + assert(!queue.tails[0]); + assert(queue.tails[1] == third); + assert(queue.tails[2] == fourth); + assert(queue.tails[3] == second); + + entries.sort(priorityOrder); + entries.checkSameAs(queue.head); + + auto pop = [&](Entry *expected) { + auto e = queue.dequeue(); + assert(e == expected); + entries.remove(e); + entries.sort(priorityOrder); + entries.checkSameAs(queue.head); + }; + + pop(third); + assert(queue.head == fourth); + assert(!queue.tails[0]); + assert(!queue.tails[1]); + assert(queue.tails[2] == fourth); + assert(queue.tails[3] == second); + + pop(fourth); + assert(queue.head == first); + assert(!queue.tails[0]); + assert(!queue.tails[1]); + assert(!queue.tails[2]); + assert(queue.tails[3] == second); + + pop(first); + assert(queue.head == second); + assert(!queue.tails[0]); + assert(!queue.tails[1]); + assert(!queue.tails[2]); + assert(queue.tails[3] == second); + + pop(second); + assert(!queue.head); + assert(!queue.tails[0]); + assert(!queue.tails[1]); + assert(!queue.tails[2]); + assert(!queue.tails[3]); + + assert(!queue.dequeue()); + assert(!queue.head); + assert(!queue.tails[0]); + assert(!queue.tails[1]); + 
assert(!queue.tails[2]); + assert(!queue.tails[3]); +} + +static void runConcreteTests() { + runPrependTest(); + runEnqueueDequeueTest(); +} + +struct TestConfig { + unsigned numTests; + unsigned maxEntries; +}; + +static void runEnqueueDequeueTests(const TestConfig &config) { + std::random_device randomDevice; + std::default_random_engine e(randomDevice()); + std::uniform_int_distribution valueDist( + 0, EntryListTraits::prioritiesCount - 1); + std::uniform_int_distribution numEntriesDist(0, config.maxEntries); + + for (unsigned testN = 0; testN < config.numTests; ++testN) { + std::cout << "runEnqueueDequeueTests() #" << testN << std::endl; + EntryFactory entries; + EntryPriorityQueue queue; + unsigned numEntries = numEntriesDist(e); + std::cout << "numEntries = " << numEntries << std::endl; + for (unsigned i = 0; i < numEntries; ++i) { + auto value = valueDist(e); + std::cout << "-- " << value << std::endl; + queue.enqueue(entries.create(value)); + } + entries.sort(priorityOrder); + entries.checkSameAs(queue.head); + for (unsigned i = 0; i < numEntries; ++i) { + auto e = queue.dequeue(); + std::cout << "pop " << e->value << std::endl; + entries.remove(e); + entries.sort(priorityOrder); + entries.checkSameAs(queue.head); + } + } +} + +} // namespace + +int main() { + runConcreteTests(); + TestConfig config = {.numTests = 1000, .maxEntries = 20}; + runEnqueueDequeueTests(config); +} From 9ba09ff63d6ea26cf26158f5a32dd009fe65047f Mon Sep 17 00:00:00 2001 From: Mykola Pokhylets Date: Mon, 6 May 2024 12:56:15 +0200 Subject: [PATCH 4/4] Process incoming queue when obtaining drainer lock --- include/swift/Basic/PriorityQueue.h | 36 ++-- stdlib/public/Concurrency/Actor.cpp | 107 ++++----- .../test-priority-queue/TestPriorityQueue.cpp | 203 ++++++++++++------ 3 files changed, 213 insertions(+), 133 deletions(-) diff --git a/include/swift/Basic/PriorityQueue.h b/include/swift/Basic/PriorityQueue.h index 8f0e55f4fd121..bcdc74b246bbd 100644 --- 
a/include/swift/Basic/PriorityQueue.h +++ b/include/swift/Basic/PriorityQueue.h @@ -22,21 +22,6 @@ namespace swift { -/// A simple linked list of nodes. -template -class SimpleQueue { -public: - Node head; - - SimpleQueue() : head() {} - - void prepend(Node newNode) { - assert(newNode && "inserting a null node"); - NodeTraits::setNext(newNode, head); - head = newNode; - } -}; - /// A class for priority FIFO queue with a fixed number of priorities. /// /// The `Node` type parameter represents a reference to a list node. @@ -104,23 +89,25 @@ class PriorityQueue { /// Add a chain of nodes of mixed priorities to this queue. void enqueueContentsOf(Node otherHead) { + if (!otherHead) return; Node runHead = otherHead; - while (runHead) { - int priorityIndex = NodeTraits::getPriorityIndex(runHead); - + int priorityIndex = NodeTraits::getPriorityIndex(runHead); + do { // Find run of jobs of the same priority Node runTail = runHead; Node next = NodeTraits::getNext(runTail); - while (true) { - if (!next) break; - if (NodeTraits::getPriorityIndex(next) != priorityIndex) break; + int nextRunPriorityIndex = badIndex; + while (next) { + nextRunPriorityIndex = NodeTraits::getPriorityIndex(next); + if (nextRunPriorityIndex != priorityIndex) break; runTail = next; next = NodeTraits::getNext(runTail); } enqueueRun(priorityIndex, runHead, runTail); runHead = next; - } + priorityIndex = nextRunPriorityIndex; + } while(runHead); } Node dequeue() { @@ -139,6 +126,11 @@ class PriorityQueue { Node peek() const { return head; } bool empty() const { return !head; } private: + // Use large negative value to increase chance of causing segfault if this + // value ends up being used for indexing. Using -1 would cause accessing + // `head`, which is less noticeable. 
+ static const int badIndex = std::numeric_limits::min(); + void enqueueRun(int priorityIndex, Node runHead, Node runTail) { for (int i = priorityIndex;; i--) { if (i < 0) { diff --git a/stdlib/public/Concurrency/Actor.cpp b/stdlib/public/Concurrency/Actor.cpp index 7d799966a9d75..a63ad82435d1b 100644 --- a/stdlib/public/Concurrency/Actor.cpp +++ b/stdlib/public/Concurrency/Actor.cpp @@ -975,7 +975,6 @@ class DefaultActorImplHeader : public HeapObject { class DefaultActorImplFooter { protected: #if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS - using SimpleQueue = swift::SimpleQueue; using PriorityQueue = swift::PriorityQueue; // When enqueued, jobs are atomically added to a linked list with the head @@ -1080,9 +1079,13 @@ class DefaultActorImpl /// new priority void enqueueStealer(Job *job, JobPriority priority); - /// Dequeues one job from `prioritizedJobs`. + /// Dequeues one job from `prioritisedJobs`. /// The calling thread must be holding the actor lock while calling this Job *drainOne(); + + /// Atomically claims incoming jobs from ActiveActorStatus, and calls `handleUnprioritizedJobs()`. + /// Called with actor lock held on current thread. + void processIncomingQueue(); #endif /// Check if the actor is actually a distributed *remote* actor. @@ -1119,14 +1122,10 @@ class DefaultActorImpl /// when actor gets a priority override and we schedule a stealer. void scheduleActorProcessJob(JobPriority priority); - /// Atomically takes a list of jobs from ActiveActorStatus, reversing them in - /// the process. Returns jobs of mixed priorities in FIFO order. - SimpleQueue collectJobs(); - - /// Check for new jobs in the incoming queue and move them to the - /// processing queue. + /// Processes claimed incoming jobs into `prioritizedJobs`. + /// Incoming jobs are of mixed priorities and in LIFO order. /// Called with actor lock held on current thread. 
- void processJobs(); + void handleUnprioritizedJobs(Job *head); #endif /* !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS */ void deallocateUnconditional(); @@ -1372,13 +1371,12 @@ void DefaultActorImpl::enqueueStealer(Job *job, JobPriority priority) { } -DefaultActorImpl::SimpleQueue DefaultActorImpl::collectJobs() { +void DefaultActorImpl::processIncomingQueue() { // Pairs with the store release in DefaultActorImpl::enqueue bool distributedActorIsRemote = swift_distributed_actor_is_remote(this); auto oldState = _status().load(SWIFT_MEMORY_ORDER_CONSUME); _swift_tsan_consume(this); - SimpleQueue result; // We must ensure that any jobs not seen by collectJobs() don't have any // dangling references to the jobs that have been collected. For that we must // atomically set head pointer to NULL. If it fails because more jobs have @@ -1387,7 +1385,7 @@ DefaultActorImpl::SimpleQueue DefaultActorImpl::collectJobs() { // If there aren't any new jobs in the incoming queue, we can return // immediately without updating the status. 
if (!oldState.getFirstUnprioritisedJob()) { - return result; + return; } assert(oldState.isAnyRunning()); @@ -1405,28 +1403,26 @@ DefaultActorImpl::SimpleQueue DefaultActorImpl::collectJobs() { } } - // Collect jobs, reversing them in the process - auto job = oldState.getFirstUnprioritisedJob(); - while (job) { - auto next = getNextJob(job); - result.prepend(job); - job = next; - } - - return result; + handleUnprioritizedJobs(oldState.getFirstUnprioritisedJob()); } // Called with actor lock held on current thread -void DefaultActorImpl::processJobs() { - SimpleQueue jobs = collectJobs(); - prioritizedJobs.enqueueContentsOf(jobs.head); +void DefaultActorImpl::handleUnprioritizedJobs(Job *head) { + // Reverse jobs from LIFO to FIFO order + Job *reversed = nullptr; + while (head) { + auto next = getNextJob(head); + setNextJob(head, reversed); + reversed = head; + head = next; + } + prioritizedJobs.enqueueContentsOf(reversed); } // Called with actor lock held on current thread Job *DefaultActorImpl::drainOne() { SWIFT_TASK_DEBUG_LOG("Draining one job from default actor %p", this); - processJobs(); traceJobQueue(this, prioritizedJobs.peek()); auto firstJob = prioritizedJobs.dequeue(); if (!firstJob) { @@ -1490,39 +1486,40 @@ static void defaultActorDrain(DefaultActorImpl *actor) { TaskExecutorRef::undefined()); while (true) { - if (shouldYieldThread()) { - currentActor->unlock(true); - break; - } - Job *job = currentActor->drainOne(); if (job == NULL) { // No work left to do, try unlocking the actor. 
This may fail if there is // work concurrently enqueued in which case, we'd try again in the loop - if (!currentActor->unlock(false)) { - continue; + if (currentActor->unlock(false)) { + break; + } + } else { + if (AsyncTask *task = dyn_cast(job)) { + auto taskExecutor = task->getPreferredTaskExecutor(); + trackingInfo.setTaskExecutor(taskExecutor); } - break; - } - if (AsyncTask *task = dyn_cast(job)) { - auto taskExecutor = task->getPreferredTaskExecutor(); - trackingInfo.setTaskExecutor(taskExecutor); + // This thread is now going to follow the task on this actor. It may hop off + // the actor + runJobInEstablishedExecutorContext(job); + + // We could have come back from the job on a generic executor and not as + // part of a default actor. If so, there is no more work left for us to do + // here. + auto currentExecutor = trackingInfo.getActiveExecutor(); + if (!currentExecutor.isDefaultActor()) { + currentActor = nullptr; + break; + } + currentActor = asImpl(currentExecutor.getDefaultActor()); } - // This thread is now going to follow the task on this actor. It may hop off - // the actor - runJobInEstablishedExecutorContext(job); - - // We could have come back from the job on a generic executor and not as - // part of a default actor. If so, there is no more work left for us to do - // here. - auto currentExecutor = trackingInfo.getActiveExecutor(); - if (!currentExecutor.isDefaultActor()) { - currentActor = nullptr; + if (shouldYieldThread()) { + currentActor->unlock(true); break; } - currentActor = asImpl(currentExecutor.getDefaultActor()); + + currentActor->processIncomingQueue(); } // Leave the tracking info. @@ -1668,6 +1665,17 @@ retry:; auto newState = oldState.withRunning(); newState = newState.withoutEscalatedPriority(); + // Claim incoming jobs when obtaining lock as a drainer, to save one + // round of atomic load and compare-exchange. 
+ // This is not useful when obtaining lock for assuming thread during actor + // switching, because arbitrary user code can run between locking and + // draining the next job. So we still need to call processIncomingQueue() to + // check for higher priority jobs that could have been scheduled in the + // meantime. And processing is more efficient when done in larger batches. + if (asDrainer) { + newState = newState.withFirstUnprioritisedJob(nullptr); + } + // This needs an acquire since we are taking a lock if (_status().compare_exchange_weak(oldState, newState, std::memory_order_acquire, @@ -1677,6 +1685,9 @@ retry:; assert(prioritizedJobs.empty()); } traceActorStateTransition(this, oldState, newState, distributedActorIsRemote); + if (asDrainer) { + handleUnprioritizedJobs(oldState.getFirstUnprioritisedJob()); + } return true; } } diff --git a/utils/test-priority-queue/TestPriorityQueue.cpp b/utils/test-priority-queue/TestPriorityQueue.cpp index 11489e4666c27..6f63b18bbfc7c 100644 --- a/utils/test-priority-queue/TestPriorityQueue.cpp +++ b/utils/test-priority-queue/TestPriorityQueue.cpp @@ -10,8 +10,6 @@ using namespace swift; namespace { -enum EntryOrder { priorityOrder, reverseCreationOrder }; - struct Entry { unsigned id; unsigned value; @@ -42,24 +40,18 @@ class EntryFactory { /// \param reverseCreationOrder - if true, then order equal-value /// nodes in the reverse of creation order; otherwise /// creator order of creation order - void sort(EntryOrder order) { - if (order == priorityOrder) { - std::sort(entries.begin(), entries.end(), - [=](const std::unique_ptr &lhs, - const std::unique_ptr &rhs) { - if (lhs->value != rhs->value) - return lhs->value < rhs->value; - return lhs->id < rhs->id; - }); - } else { - std::sort( - entries.begin(), entries.end(), - [=](const std::unique_ptr &lhs, - const std::unique_ptr &rhs) { return lhs->id > rhs->id; }); - } + void sort() { + std::sort(entries.begin(), entries.end(), + [=](const std::unique_ptr &lhs, + const
std::unique_ptr &rhs) { + if (lhs->value != rhs->value) + return lhs->value < rhs->value; + return lhs->id < rhs->id; + }); } - void checkSameAs(Entry *list) { + void checkSameAs(Entry *list, unsigned line) { + std::cout << " <<< check " << line << std::endl; for (auto &entry : entries) { std::cout << " " << list->value << " (" << list->id << ")\n"; assert(list == entry.get()); @@ -79,34 +71,29 @@ struct EntryListTraits { } }; -using EntrySimpleQueue = SimpleQueue; using EntryPriorityQueue = PriorityQueue; -static void runPrependTest() { - std::cout << "runPrependTest()" << std::endl; - EntryFactory entries; - EntrySimpleQueue queue; - assert(!queue.head); - assert(!queue.tail); - - auto first = entries.create(3); - queue.prepend(first); - assert(queue.head == first); - assert(queue.tail == first); +struct ListBuilder { + Entry *head; + Entry **pTail; - auto second = entries.create(0); - queue.prepend(second); - assert(queue.head == second); - assert(queue.tail == first); + ListBuilder(): head(), pTail(&head) {} + ListBuilder(ListBuilder const &) = delete; + ListBuilder(ListBuilder &&) = delete; - auto third = entries.create(2); - queue.prepend(third); - assert(queue.head == third); - assert(queue.tail == first); + void append(Entry *e) { + *pTail = e; + e->next = nullptr; + pTail = &e->next; + } - entries.sort(reverseCreationOrder); - entries.checkSameAs(queue.head); -} + Entry *take() { + Entry* result = head; + head = nullptr; + pTail = &head; + return result; + } +}; static void runEnqueueDequeueTest() { std::cout << "runEnqueueDequeueTest()" << std::endl; @@ -150,15 +137,15 @@ static void runEnqueueDequeueTest() { assert(queue.tails[2] == fourth); assert(queue.tails[3] == second); - entries.sort(priorityOrder); - entries.checkSameAs(queue.head); + entries.sort(); + entries.checkSameAs(queue.head, __LINE__); auto pop = [&](Entry *expected) { auto e = queue.dequeue(); assert(e == expected); entries.remove(e); - entries.sort(priorityOrder); - 
entries.checkSameAs(queue.head); + entries.sort(); + entries.checkSameAs(queue.head, __LINE__); }; pop(third); @@ -197,24 +184,73 @@ static void runEnqueueDequeueTest() { assert(!queue.tails[3]); } -static void runConcreteTests() { - runPrependTest(); - runEnqueueDequeueTest(); -} +static void runEnqueueContentsOfTest() { + std::cout << "runEnqueueContentsOfTest()" << std::endl; + EntryFactory entries; + EntryPriorityQueue queue; + ListBuilder builder; + assert(!queue.head); + assert(!queue.tails[0]); + assert(!queue.tails[1]); + assert(!queue.tails[2]); + assert(!queue.tails[3]); -struct TestConfig { - unsigned numTests; - unsigned maxEntries; -}; + queue.enqueueContentsOf(builder.take()); + assert(!queue.head); + assert(!queue.tails[0]); + assert(!queue.tails[1]); + assert(!queue.tails[2]); + assert(!queue.tails[3]); + + auto first = entries.create(3); + builder.append(first); + auto second = entries.create(3); + builder.append(second); + auto third = entries.create(1); + builder.append(third); + auto fourth = entries.create(2); + builder.append(fourth); + auto fifth = entries.create(2); + builder.append(fifth); + auto sixth = entries.create(1); + builder.append(sixth); + + queue.enqueueContentsOf(builder.take()); + assert(queue.head == third); + assert(!queue.tails[0]); + assert(queue.tails[1] == sixth); + assert(queue.tails[2] == fifth); + assert(queue.tails[3] == second); + entries.sort(); + entries.checkSameAs(queue.head, __LINE__); + + auto seventh = entries.create(0); + builder.append(seventh); + auto eighth = entries.create(0); + builder.append(eighth); + auto ninth = entries.create(0); + builder.append(ninth); + auto tenth = entries.create(3); + builder.append(tenth); + queue.enqueueContentsOf(builder.take()); + + assert(queue.head == seventh); + assert(queue.tails[0] == ninth); + assert(queue.tails[1] == sixth); + assert(queue.tails[2] == fifth); + assert(queue.tails[3] == tenth); + entries.sort(); + entries.checkSameAs(queue.head, __LINE__); +} -static 
void runEnqueueDequeueTests(const TestConfig &config) { +static void runEnqueueDequeueTests(unsigned numTests, unsigned maxEntries) { std::random_device randomDevice; std::default_random_engine e(randomDevice()); std::uniform_int_distribution valueDist( 0, EntryListTraits::prioritiesCount - 1); - std::uniform_int_distribution numEntriesDist(0, config.maxEntries); + std::uniform_int_distribution numEntriesDist(0, maxEntries); - for (unsigned testN = 0; testN < config.numTests; ++testN) { + for (unsigned testN = 0; testN < numTests; ++testN) { std::cout << "runEnqueueDequeueTests() #" << testN << std::endl; EntryFactory entries; EntryPriorityQueue queue; @@ -225,14 +261,54 @@ static void runEnqueueDequeueTests(const TestConfig &config) { std::cout << "-- " << value << std::endl; queue.enqueue(entries.create(value)); } - entries.sort(priorityOrder); - entries.checkSameAs(queue.head); + entries.sort(); + entries.checkSameAs(queue.head, __LINE__); for (unsigned i = 0; i < numEntries; ++i) { auto e = queue.dequeue(); std::cout << "pop " << e->value << std::endl; entries.remove(e); - entries.sort(priorityOrder); - entries.checkSameAs(queue.head); + entries.sort(); + entries.checkSameAs(queue.head, __LINE__); + } + } +} + +static void runEnqueueContentsOfTests(unsigned numTests, unsigned maxChains, unsigned maxEntries) { + std::random_device randomDevice; + std::default_random_engine e(randomDevice()); + std::uniform_int_distribution valueDist( + 0, EntryListTraits::prioritiesCount - 1); + std::uniform_int_distribution numChainsDist(1, maxChains); + std::uniform_int_distribution numEntriesDist(0, maxEntries); + + for (unsigned testN = 0; testN < numTests; ++testN) { + std::cout << "runEnqueueContentsOfTests() #" << testN << std::endl; + EntryFactory entries; + EntryPriorityQueue queue; + unsigned totalEntries = 0; + unsigned numChains = numChainsDist(e); + std::cout << "numChains = " << numChains << std::endl; + for (unsigned i = 0; i < numChains; ++i) { + unsigned 
numEntries = numEntriesDist(e); + std::cout << "numEntries = " << numEntries << std::endl; + totalEntries += numEntries; + + ListBuilder builder; + for (unsigned j = 0; j < numEntries; ++j) { + auto value = valueDist(e); + std::cout << "-- " << value << std::endl; + builder.append(entries.create(value)); + } + queue.enqueueContentsOf(builder.take()); + } + entries.sort(); + entries.checkSameAs(queue.head, __LINE__); + for (unsigned i = 0; i < totalEntries; ++i) { + auto e = queue.dequeue(); + std::cout << "pop " << e->value << std::endl; + entries.remove(e); + entries.sort(); + entries.checkSameAs(queue.head, __LINE__); } } } @@ -240,7 +316,8 @@ static void runEnqueueDequeueTests(const TestConfig &config) { } // namespace int main() { - runConcreteTests(); - TestConfig config = {.numTests = 1000, .maxEntries = 20}; - runEnqueueDequeueTests(config); + runEnqueueDequeueTest(); + runEnqueueContentsOfTest(); + runEnqueueDequeueTests(1000, 20); + runEnqueueContentsOfTests(1000, 10, 20); }