Skip to content

Commit ff7a06c

Browse files
authored
Merge pull request #49 from clue-labs/iterable-v4
[4.x] Support iterable type for `parallel()` + `series()` + `waterfall()`
2 parents cfd52ac + 2343d9c commit ff7a06c

File tree

5 files changed

+296
-27
lines changed

5 files changed

+296
-27
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ $promise->then(function (int $bytes) {
396396

397397
### parallel()
398398

399-
The `parallel(array<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
399+
The `parallel(iterable<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
400400
like this:
401401

402402
```php
@@ -438,7 +438,7 @@ React\Async\parallel([
438438

439439
### series()
440440

441-
The `series(array<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
441+
The `series(iterable<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
442442
like this:
443443

444444
```php
@@ -480,7 +480,7 @@ React\Async\series([
480480

481481
### waterfall()
482482

483-
The `waterfall(array<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<mixed,Exception>` function can be used
483+
The `waterfall(iterable<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<mixed,Exception>` function can be used
484484
like this:
485485

486486
```php

src/functions.php

Lines changed: 47 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -533,10 +533,10 @@ function coroutine(callable $function, mixed ...$args): PromiseInterface
533533
}
534534

535535
/**
536-
* @param array<callable():PromiseInterface<mixed,Exception>> $tasks
536+
* @param iterable<callable():PromiseInterface<mixed,Exception>> $tasks
537537
* @return PromiseInterface<array<mixed>,Exception>
538538
*/
539-
function parallel(array $tasks): PromiseInterface
539+
function parallel(iterable $tasks): PromiseInterface
540540
{
541541
$pending = [];
542542
$deferred = new Deferred(function () use (&$pending) {
@@ -548,15 +548,10 @@ function parallel(array $tasks): PromiseInterface
548548
$pending = [];
549549
});
550550
$results = [];
551-
$errored = false;
551+
$continue = true;
552552

553-
$numTasks = count($tasks);
554-
if (0 === $numTasks) {
555-
$deferred->resolve($results);
556-
}
557-
558-
$taskErrback = function ($error) use (&$pending, $deferred, &$errored) {
559-
$errored = true;
553+
$taskErrback = function ($error) use (&$pending, $deferred, &$continue) {
554+
$continue = false;
560555
$deferred->reject($error);
561556

562557
foreach ($pending as $promise) {
@@ -568,33 +563,39 @@ function parallel(array $tasks): PromiseInterface
568563
};
569564

570565
foreach ($tasks as $i => $task) {
571-
$taskCallback = function ($result) use (&$results, &$pending, $numTasks, $i, $deferred) {
566+
$taskCallback = function ($result) use (&$results, &$pending, &$continue, $i, $deferred) {
572567
$results[$i] = $result;
568+
unset($pending[$i]);
573569

574-
if (count($results) === $numTasks) {
570+
if (!$pending && !$continue) {
575571
$deferred->resolve($results);
576572
}
577573
};
578574

579-
$promise = call_user_func($task);
575+
$promise = \call_user_func($task);
580576
assert($promise instanceof PromiseInterface);
581577
$pending[$i] = $promise;
582578

583579
$promise->then($taskCallback, $taskErrback);
584580

585-
if ($errored) {
581+
if (!$continue) {
586582
break;
587583
}
588584
}
589585

586+
$continue = false;
587+
if (!$pending) {
588+
$deferred->resolve($results);
589+
}
590+
590591
return $deferred->promise();
591592
}
592593

593594
/**
594-
* @param array<callable():PromiseInterface<mixed,Exception>> $tasks
595+
* @param iterable<callable():PromiseInterface<mixed,Exception>> $tasks
595596
* @return PromiseInterface<array<mixed>,Exception>
596597
*/
597-
function series(array $tasks): PromiseInterface
598+
function series(iterable $tasks): PromiseInterface
598599
{
599600
$pending = null;
600601
$deferred = new Deferred(function () use (&$pending) {
@@ -605,20 +606,31 @@ function series(array $tasks): PromiseInterface
605606
});
606607
$results = [];
607608

609+
if ($tasks instanceof \IteratorAggregate) {
610+
$tasks = $tasks->getIterator();
611+
assert($tasks instanceof \Iterator);
612+
}
613+
608614
/** @var callable():void $next */
609615
$taskCallback = function ($result) use (&$results, &$next) {
610616
$results[] = $result;
611617
$next();
612618
};
613619

614620
$next = function () use (&$tasks, $taskCallback, $deferred, &$results, &$pending) {
615-
if (0 === count($tasks)) {
621+
if ($tasks instanceof \Iterator ? !$tasks->valid() : !$tasks) {
616622
$deferred->resolve($results);
617623
return;
618624
}
619625

620-
$task = array_shift($tasks);
621-
$promise = call_user_func($task);
626+
if ($tasks instanceof \Iterator) {
627+
$task = $tasks->current();
628+
$tasks->next();
629+
} else {
630+
$task = \array_shift($tasks);
631+
}
632+
633+
$promise = \call_user_func($task);
622634
assert($promise instanceof PromiseInterface);
623635
$pending = $promise;
624636

@@ -631,10 +643,10 @@ function series(array $tasks): PromiseInterface
631643
}
632644

633645
/**
634-
* @param array<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks
646+
* @param iterable<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks
635647
* @return PromiseInterface<mixed,Exception>
636648
*/
637-
function waterfall(array $tasks): PromiseInterface
649+
function waterfall(iterable $tasks): PromiseInterface
638650
{
639651
$pending = null;
640652
$deferred = new Deferred(function () use (&$pending) {
@@ -644,15 +656,26 @@ function waterfall(array $tasks): PromiseInterface
644656
$pending = null;
645657
});
646658

659+
if ($tasks instanceof \IteratorAggregate) {
660+
$tasks = $tasks->getIterator();
661+
assert($tasks instanceof \Iterator);
662+
}
663+
647664
/** @var callable $next */
648665
$next = function ($value = null) use (&$tasks, &$next, $deferred, &$pending) {
649-
if (0 === count($tasks)) {
666+
if ($tasks instanceof \Iterator ? !$tasks->valid() : !$tasks) {
650667
$deferred->resolve($value);
651668
return;
652669
}
653670

654-
$task = array_shift($tasks);
655-
$promise = call_user_func_array($task, func_get_args());
671+
if ($tasks instanceof \Iterator) {
672+
$task = $tasks->current();
673+
$tasks->next();
674+
} else {
675+
$task = \array_shift($tasks);
676+
}
677+
678+
$promise = \call_user_func_array($task, func_get_args());
656679
assert($promise instanceof PromiseInterface);
657680
$pending = $promise;
658681

tests/ParallelTest.php

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use React;
66
use React\EventLoop\Loop;
77
use React\Promise\Promise;
8+
use function React\Promise\reject;
89

910
class ParallelTest extends TestCase
1011
{
@@ -17,6 +18,19 @@ public function testParallelWithoutTasks()
1718
$promise->then($this->expectCallableOnceWith(array()));
1819
}
1920

21+
public function testParallelWithoutTasksFromEmptyGeneratorResolvesWithEmptyArray()
22+
{
23+
$tasks = (function () {
24+
if (false) {
25+
yield;
26+
}
27+
})();
28+
29+
$promise = React\Async\parallel($tasks);
30+
31+
$promise->then($this->expectCallableOnceWith([]));
32+
}
33+
2034
public function testParallelWithTasks()
2135
{
2236
$tasks = array(
@@ -49,6 +63,38 @@ function () {
4963
$timer->assertInRange(0.1, 0.2);
5064
}
5165

66+
public function testParallelWithTasksFromGeneratorResolvesWithArrayOfFulfillmentValues()
67+
{
68+
$tasks = (function () {
69+
yield function () {
70+
return new Promise(function ($resolve) {
71+
Loop::addTimer(0.1, function () use ($resolve) {
72+
$resolve('foo');
73+
});
74+
});
75+
};
76+
yield function () {
77+
return new Promise(function ($resolve) {
78+
Loop::addTimer(0.11, function () use ($resolve) {
79+
$resolve('bar');
80+
});
81+
});
82+
};
83+
})();
84+
85+
$promise = React\Async\parallel($tasks);
86+
87+
$promise->then($this->expectCallableOnceWith(array('foo', 'bar')));
88+
89+
$timer = new Timer($this);
90+
$timer->start();
91+
92+
Loop::run();
93+
94+
$timer->stop();
95+
$timer->assertInRange(0.1, 0.2);
96+
}
97+
5298
public function testParallelWithErrorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
5399
{
54100
$called = 0;
@@ -81,6 +127,25 @@ function () use (&$called) {
81127
$this->assertSame(2, $called);
82128
}
83129

130+
public function testParallelWithErrorFromInfiniteGeneratorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
131+
{
132+
$called = 0;
133+
134+
$tasks = (function () use (&$called) {
135+
while (true) {
136+
yield function () use (&$called) {
137+
return reject(new \RuntimeException('Rejected ' . ++$called));
138+
};
139+
}
140+
})();
141+
142+
$promise = React\Async\parallel($tasks);
143+
144+
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));
145+
146+
$this->assertSame(1, $called);
147+
}
148+
84149
public function testParallelWithErrorWillCancelPendingPromises()
85150
{
86151
$cancelled = 0;

tests/SeriesTest.php

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use React;
66
use React\EventLoop\Loop;
77
use React\Promise\Promise;
8+
use function React\Promise\reject;
89

910
class SeriesTest extends TestCase
1011
{
@@ -17,6 +18,19 @@ public function testSeriesWithoutTasks()
1718
$promise->then($this->expectCallableOnceWith(array()));
1819
}
1920

21+
public function testSeriesWithoutTasksFromEmptyGeneratorResolvesWithEmptyArray()
22+
{
23+
$tasks = (function () {
24+
if (false) {
25+
yield;
26+
}
27+
})();
28+
29+
$promise = React\Async\series($tasks);
30+
31+
$promise->then($this->expectCallableOnceWith([]));
32+
}
33+
2034
public function testSeriesWithTasks()
2135
{
2236
$tasks = array(
@@ -49,6 +63,38 @@ function () {
4963
$timer->assertInRange(0.10, 0.20);
5064
}
5165

66+
public function testSeriesWithTasksFromGeneratorResolvesWithArrayOfFulfillmentValues()
67+
{
68+
$tasks = (function () {
69+
yield function () {
70+
return new Promise(function ($resolve) {
71+
Loop::addTimer(0.051, function () use ($resolve) {
72+
$resolve('foo');
73+
});
74+
});
75+
};
76+
yield function () {
77+
return new Promise(function ($resolve) {
78+
Loop::addTimer(0.051, function () use ($resolve) {
79+
$resolve('bar');
80+
});
81+
});
82+
};
83+
})();
84+
85+
$promise = React\Async\series($tasks);
86+
87+
$promise->then($this->expectCallableOnceWith(array('foo', 'bar')));
88+
89+
$timer = new Timer($this);
90+
$timer->start();
91+
92+
Loop::run();
93+
94+
$timer->stop();
95+
$timer->assertInRange(0.10, 0.20);
96+
}
97+
5298
public function testSeriesWithError()
5399
{
54100
$called = 0;
@@ -80,6 +126,47 @@ function () use (&$called) {
80126
$this->assertSame(1, $called);
81127
}
82128

129+
public function testSeriesWithErrorFromInfiniteGeneratorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
130+
{
131+
$called = 0;
132+
133+
$tasks = (function () use (&$called) {
134+
while (true) {
135+
yield function () use (&$called) {
136+
return reject(new \RuntimeException('Rejected ' . ++$called));
137+
};
138+
}
139+
})();
140+
141+
$promise = React\Async\series($tasks);
142+
143+
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));
144+
145+
$this->assertSame(1, $called);
146+
}
147+
148+
public function testSeriesWithErrorFromInfiniteIteratorAggregateReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
149+
{
150+
$tasks = new class() implements \IteratorAggregate {
151+
public $called = 0;
152+
153+
public function getIterator(): \Iterator
154+
{
155+
while (true) {
156+
yield function () {
157+
return reject(new \RuntimeException('Rejected ' . ++$this->called));
158+
};
159+
}
160+
}
161+
};
162+
163+
$promise = React\Async\series($tasks);
164+
165+
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));
166+
167+
$this->assertSame(1, $tasks->called);
168+
}
169+
83170
public function testSeriesWillCancelFirstPendingPromiseWhenCallingCancelOnResultingPromise()
84171
{
85172
$cancelled = 0;

0 commit comments

Comments
 (0)