|
1 |
| -use std::any::Any; |
| 1 | +use std::sync::atomic::{AtomicBool, Ordering}; |
2 | 2 |
|
3 | 3 | use crate::job::StackJob;
|
4 | 4 | use crate::latch::SpinLatch;
|
5 |
| -use crate::registry::{self, WorkerThread}; |
6 |
| -use crate::tlv::{self, Tlv}; |
7 |
| -use crate::{FnContext, unwind}; |
| 5 | +use crate::{FnContext, registry, tlv, unwind}; |
8 | 6 |
|
9 | 7 | #[cfg(test)]
|
10 | 8 | mod tests;
|
@@ -134,68 +132,38 @@ where
|
134 | 132 | // Create virtual wrapper for task b; this all has to be
|
135 | 133 | // done here so that the stack frame can keep it all live
|
136 | 134 | // long enough.
|
137 |
| - let job_b = StackJob::new(tlv, call_b(oper_b), SpinLatch::new(worker_thread)); |
| 135 | + let job_b_started = AtomicBool::new(false); |
| 136 | + let job_b = StackJob::new( |
| 137 | + tlv, |
| 138 | + |migrated| { |
| 139 | + job_b_started.store(true, Ordering::Relaxed); |
| 140 | + call_b(oper_b)(migrated) |
| 141 | + }, |
| 142 | + SpinLatch::new(worker_thread), |
| 143 | + ); |
138 | 144 | let job_b_ref = job_b.as_job_ref();
|
139 | 145 | let job_b_id = job_b_ref.id();
|
140 | 146 | worker_thread.push(job_b_ref);
|
141 | 147 |
|
142 | 148 | // Execute task a; hopefully b gets stolen in the meantime.
|
143 | 149 | let status_a = unwind::halt_unwinding(call_a(oper_a, injected));
|
144 |
| - let result_a = match status_a { |
145 |
| - Ok(v) => v, |
146 |
| - Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err, tlv), |
147 |
| - }; |
148 |
| - |
149 |
| - // Now that task A has finished, try to pop job B from the |
150 |
| - // local stack. It may already have been popped by job A; it |
151 |
| - // may also have been stolen. There may also be some tasks |
152 |
| - // pushed on top of it in the stack, and we will have to pop |
153 |
| - // those off to get to it. |
154 |
| - while !job_b.latch.probe() { |
155 |
| - if let Some(job) = worker_thread.take_local_job() { |
156 |
| - if job_b_id == job.id() { |
157 |
| - // Found it! Let's run it. |
158 |
| - // |
159 |
| - // Note that this could panic, but it's ok if we unwind here. |
160 |
| - |
161 |
| - // Restore the TLV since we might have run some jobs overwriting it when waiting for job b. |
162 |
| - tlv::set(tlv); |
163 |
| - |
164 |
| - let result_b = job_b.run_inline(injected); |
165 |
| - return (result_a, result_b); |
166 |
| - } else { |
167 |
| - worker_thread.execute(job); |
168 |
| - } |
169 |
| - } else { |
170 |
| - // Local deque is empty. Time to steal from other |
171 |
| - // threads. |
172 |
| - worker_thread.wait_until(&job_b.latch); |
173 |
| - debug_assert!(job_b.latch.probe()); |
174 |
| - break; |
175 |
| - } |
176 |
| - } |
| 150 | + worker_thread.wait_for_jobs::<_, false>( |
| 151 | + &job_b.latch, |
| 152 | + || job_b_started.load(Ordering::Relaxed), |
| 153 | + |job| job.id() == job_b_id, |
| 154 | + |job| { |
| 155 | + debug_assert_eq!(job.id(), job_b_id); |
| 156 | + job_b.run_inline(injected); |
| 157 | + }, |
| 158 | + ); |
177 | 159 |
|
178 | 160 | // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
|
179 | 161 | tlv::set(tlv);
|
180 | 162 |
|
| 163 | + let result_a = match status_a { |
| 164 | + Ok(v) => v, |
| 165 | + Err(err) => unwind::resume_unwinding(err), |
| 166 | + }; |
181 | 167 | (result_a, job_b.into_result())
|
182 | 168 | })
|
183 | 169 | }
|
184 |
| - |
185 |
| -/// If job A panics, we still cannot return until we are sure that job |
186 |
| -/// B is complete. This is because it may contain references into the |
187 |
| -/// enclosing stack frame(s). |
188 |
| -#[cold] // cold path |
189 |
| -unsafe fn join_recover_from_panic( |
190 |
| - worker_thread: &WorkerThread, |
191 |
| - job_b_latch: &SpinLatch<'_>, |
192 |
| - err: Box<dyn Any + Send>, |
193 |
| - tlv: Tlv, |
194 |
| -) -> ! { |
195 |
| - unsafe { worker_thread.wait_until(job_b_latch) }; |
196 |
| - |
197 |
| - // Restore the TLV since we might have run some jobs overwriting it when waiting for job b. |
198 |
| - tlv::set(tlv); |
199 |
| - |
200 |
| - unwind::resume_unwinding(err) |
201 |
| -} |
0 commit comments