|
2 | 2 |
|
3 | 3 | namespace React\Async;
|
4 | 4 |
|
| 5 | +use Fiber; |
5 | 6 | use React\EventLoop\Loop;
|
6 | 7 | use React\Promise\CancellablePromiseInterface;
|
7 | 8 | use React\Promise\Deferred;
|
|
155 | 156 | */
|
156 | 157 | function async(callable $function): callable
|
157 | 158 | {
|
158 |
| - return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args): void { |
159 |
| - $fiber = new \Fiber(function () use ($resolve, $reject, $function, $args): void { |
160 |
| - try { |
161 |
| - $resolve($function(...$args)); |
162 |
| - } catch (\Throwable $exception) { |
163 |
| - $reject($exception); |
| 159 | + return static function (mixed ...$args) use ($function): PromiseInterface { |
| 160 | + $fiber = null; |
| 161 | + $promise = new Promise(function (callable $resolve, callable $reject) use ($function, $args, &$fiber): void { |
| 162 | + $fiber = new \Fiber(function () use ($resolve, $reject, $function, $args, &$fiber): void { |
| 163 | + try { |
| 164 | + $resolve($function(...$args)); |
| 165 | + } catch (\Throwable $exception) { |
| 166 | + $reject($exception); |
| 167 | + } finally { |
| 168 | + fiberMap()->unregister($fiber); |
| 169 | + } |
| 170 | + }); |
| 171 | + |
| 172 | + fiberMap()->register($fiber); |
| 173 | + |
| 174 | + $fiber->start(); |
| 175 | + }, function () use (&$fiber): void { |
| 176 | + if ($fiber instanceof Fiber) { |
| 177 | + fiberMap()->cancel($fiber); |
| 178 | + foreach (fiberMap()->getPromises($fiber) as $promise) { |
| 179 | + if (method_exists($promise, 'cancel')) { |
| 180 | + $promise->cancel(); |
| 181 | + } |
| 182 | + } |
164 | 183 | }
|
165 | 184 | });
|
166 | 185 |
|
167 |
| - $fiber->start(); |
168 |
| - }); |
| 186 | + $lowLevelFiber = \Fiber::getCurrent(); |
| 187 | + if ($lowLevelFiber !== null) { |
| 188 | + fiberMap()->attachPromise($lowLevelFiber, $promise); |
| 189 | + } |
| 190 | + |
| 191 | + return $promise; |
| 192 | + }; |
169 | 193 | }
|
170 | 194 |
|
171 | 195 |
|
@@ -230,6 +254,13 @@ function await(PromiseInterface $promise): mixed
|
230 | 254 | $rejected = false;
|
231 | 255 | $resolvedValue = null;
|
232 | 256 | $rejectedThrowable = null;
|
| 257 | + $lowLevelFiber = \Fiber::getCurrent(); |
| 258 | + |
| 259 | + if ($lowLevelFiber !== null) { |
| 260 | + if (fiberMap()->isCanceled($lowLevelFiber) && $promise instanceof CancellablePromiseInterface) { |
| 261 | + $promise->cancel(); |
| 262 | + } |
| 263 | + } |
233 | 264 |
|
234 | 265 | $promise->then(
|
235 | 266 | function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void {
|
@@ -285,6 +316,10 @@ function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void
|
285 | 316 | throw $rejectedThrowable;
|
286 | 317 | }
|
287 | 318 |
|
| 319 | + if ($lowLevelFiber !== null) { |
| 320 | + fiberMap()->attachPromise($lowLevelFiber, $promise); |
| 321 | + } |
| 322 | + |
288 | 323 | $fiber = FiberFactory::create();
|
289 | 324 |
|
290 | 325 | return $fiber->suspend();
|
@@ -601,3 +636,17 @@ function waterfall(array $tasks): PromiseInterface
|
601 | 636 |
|
602 | 637 | return $deferred->promise();
|
603 | 638 | }
|
| 639 | + |
| 640 | +/** |
| 641 | + * @internal |
| 642 | + */ |
| 643 | +function fiberMap(): FiberMap |
| 644 | +{ |
| 645 | + static $wm = null; |
| 646 | + |
| 647 | + if ($wm === null) { |
| 648 | + $wm = new FiberMap(); |
| 649 | + } |
| 650 | + |
| 651 | + return $wm; |
| 652 | +} |
0 commit comments