2424
2525#include < chrono>
2626#include < thread>
27+ #include " swift/Basic/ListMerger.h"
2728
28- static Job *JobQueue = nullptr ;
29+ namespace {
2930
30- class DelayedJob {
31- public:
32- Job *job;
33- unsigned long long when;
34- DelayedJob *next;
31+ struct JobQueueTraits {
32+ static Job *&storage (Job *cur) {
33+ return reinterpret_cast <Job*&>(cur->SchedulerPrivate [0 ]);
34+ }
3535
36- DelayedJob (Job *job, unsigned long long when) : job(job), when(when), next(nullptr ) {}
36+ static Job *getNext (Job *job) {
37+ return storage (job);
38+ }
39+ static void setNext (Job *job, Job *next) {
40+ storage (job) = next;
41+ }
42+ static int compare (Job *lhs, Job *rhs) {
43+ return descendingPriorityOrder (lhs->getPriority (), rhs->getPriority ());
44+ }
3745};
46+ using JobQueueMerger = ListMerger<Job*, JobQueueTraits>;
3847
39- static DelayedJob *DelayedJobQueue = nullptr ;
48+ using JobDeadline = std::chrono::time_point<std::chrono::steady_clock> ;
4049
41- // / Get the next-in-queue storage slot.
42- static Job *&nextInQueue (Job *cur) {
43- return reinterpret_cast <Job*&>(cur->SchedulerPrivate [Job::NextWaitingTaskIndex]);
44- }
50+ template <bool = (sizeof (JobDeadline) <= sizeof (void *) &&
51+ alignof (JobDeadline) <= alignof (void *))>
52+ struct JobDeadlineStorage ;
53+
54+ // / Specialization for when JobDeadline fits in SchedulerPrivate.
55+ template <>
56+ struct JobDeadlineStorage <true > {
57+ static JobDeadline &storage (Job *job) {
58+ return reinterpret_cast <JobDeadline&>(job->SchedulerPrivate [1 ]);
59+ }
60+ static JobDeadline get (Job *job) {
61+ return storage (job);
62+ }
63+ static void set (Job *job, JobDeadline deadline) {
64+ new (static_cast <void *>(&storage (job))) JobDeadline (deadline);
65+ }
66+ static void destroy (Job *job) {
67+ storage (job).~JobDeadline ();
68+ }
69+ };
70+
71+ // / Specialization for when JobDeadline doesn't fit in SchedulerPrivate.
72+ template <>
73+ struct JobDeadlineStorage <false > {
74+ static JobDeadline *&storage (Job *job) {
75+ return reinterpret_cast <JobDeadline*&>(job->SchedulerPrivate [1 ]);
76+ }
77+ static JobDeadline get (Job *job) {
78+ return *storage (job);
79+ }
80+ static void set (Job *job, JobDeadline deadline) {
81+ storage (job) = new JobDeadline (deadline);
82+ }
83+ static void destroy (Job *job) {
84+ delete storage (job);
85+ }
86+ };
87+
88+ } // end anonymous namespace
89+
90+ static Job *JobQueue = nullptr ;
91+ static Job *DelayedJobQueue = nullptr ;
4592
4693// / Insert a job into the cooperative global queue.
4794SWIFT_CC (swift)
4895static void swift_task_enqueueGlobalImpl(Job *job) {
4996 assert (job && " no job provided" );
5097
51- Job **position = &JobQueue;
52- while (auto cur = *position) {
53- // If we find a job with lower priority, insert here.
54- if (cur->getPriority () < newJob->getPriority ()) {
55- nextInQueue (newJob) = cur;
56- *position = newJob;
57- return ;
58- }
59-
60- // Otherwise, keep advancing through the queue.
61- position = &nextInQueue (cur);
62- }
63- nextInQueue (newJob) = nullptr ;
64- *position = newJob;
98+ JobQueueMerger merger (JobQueue);
99+ merger.insert (job);
100+ JobQueue = merger.release ();
65101}
66102
67103// / Enqueues a task on the main executor.
@@ -72,60 +108,80 @@ static void swift_task_enqueueMainExecutorImpl(Job *job) {
72108 swift_task_enqueueGlobalImpl (job);
73109}
74110
75- static unsigned long long currentNanos () {
76- auto now = std::chrono::steady_clock::now ();
77- auto nowNanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
78- auto value = std::chrono::duration_cast<std::chrono::nanoseconds>(nowNanos.time_since_epoch ());
79- return value.count ();
80- }
81-
82111// / Insert a job into the cooperative global queue with a delay.
83112SWIFT_CC (swift)
84- static void swift_task_enqueueGlobalWithDelayImpl(unsigned long long delay,
85- Job *job ) {
86- assert (job && " no job provided" );
113+ static void swift_task_enqueueGlobalWithDelayImpl(JobDelay delay,
114+ Job *newJob ) {
115+ assert (newJob && " no job provided" );
87116
88- DelayedJob **position = &DelayedJobQueue;
89- DelayedJob *newJob = new DelayedJob (job, currentNanos () + delay);
117+ auto deadline = std::chrono::steady_clock::now ()
118+ + std::chrono::duration_cast<JobDeadline::duration>(
119+ std::chrono::nanoseconds (delay));
120+ JobDeadlineStorage<>::set (newJob, deadline);
90121
122+ Job **position = &DelayedJobQueue;
91123 while (auto cur = *position) {
92- // If we find a job with lower priority, insert here.
93- if (cur->when > newJob->when ) {
94- newJob->next = cur;
124+ // If we find a job with a later deadline, insert here.
125+ // Note that we maintain FIFO order.
126+ if (deadline < JobDeadlineStorage<>::get (cur)) {
127+ JobQueueTraits::setNext (newJob, cur);
95128 *position = newJob;
96129 return ;
97130 }
98131
99132 // Otherwise, keep advancing through the queue.
100- position = &cur-> next ;
133+ position = &JobQueueTraits::storage ( cur) ;
101134 }
135+ JobQueueTraits::setNext (newJob, nullptr );
102136 *position = newJob;
103137}
104138
139+ // / Recognize jobs in the delayed-jobs queue that are ready to execute
140+ // / and move them to the primary queue.
141+ static void recognizeReadyDelayedJobs () {
142+ // Process all the delayed jobs.
143+ auto nextDelayedJob = DelayedJobQueue;
144+ if (!nextDelayedJob) return ;
145+
146+ auto now = std::chrono::steady_clock::now ();
147+ JobQueueMerger readyJobs (JobQueue);
148+
149+ // Pull jobs off of the delayed-jobs queue whose deadline has been
150+ // reached, and add them to the ready queue.
151+ while (nextDelayedJob &&
152+ JobDeadlineStorage<>::get (nextDelayedJob) <= now) {
153+ // Destroy the storage of the deadline in the job.
154+ JobDeadlineStorage<>::destroy (nextDelayedJob);
155+
156+ auto next = JobQueueTraits::getNext (nextDelayedJob);
157+ readyJobs.insert (nextDelayedJob);
158+ nextDelayedJob = next;
159+ }
160+
161+ JobQueue = readyJobs.release ();
162+ DelayedJobQueue = nextDelayedJob;
163+ }
164+
105165// / Claim the next job from the cooperative global queue.
106166static Job *claimNextFromCooperativeGlobalQueue () {
107- // Check delayed jobs first
108167 while (true ) {
109- if (auto delayedJob = DelayedJobQueue) {
110- if (delayedJob->when < currentNanos ()) {
111- DelayedJobQueue = delayedJob->next ;
112- auto job = delayedJob->job ;
113-
114- delete delayedJob;
115-
116- return job;
117- }
118- }
168+ // Move any delayed jobs that are now ready into the primary queue.
169+ recognizeReadyDelayedJobs ();
170+
171+ // If there's a job in the primary queue, run it.
119172 if (auto job = JobQueue) {
120- JobQueue = nextInQueue (job);
173+ JobQueue = JobQueueTraits::getNext (job);
121174 return job;
122175 }
123- // there are only delayed jobs left, but they are not ready,
124- // so we sleep until the first one is
176+
177+ // If there are only delayed jobs left, sleep until the next deadline.
178+ // TODO: should the donator have some say in this?
125179 if (auto delayedJob = DelayedJobQueue) {
126- std::this_thread::sleep_for (std::chrono::nanoseconds (delayedJob->when - currentNanos ()));
180+ auto deadline = JobDeadlineStorage<>::get (delayedJob);
181+ std::this_thread::sleep_until (deadline);
127182 continue ;
128183 }
184+
129185 return nullptr ;
130186 }
131187}
0 commit comments