Skip to content

Commit 33998fc

Browse files
committed
Add RecevObservable
1 parent aa15485 commit 33998fc

File tree

6 files changed

+217
-9
lines changed

6 files changed

+217
-9
lines changed

composer.json

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,24 @@
1414
"ext-parallel": "*",
1515
"react/event-loop": "^1.1",
1616
"react/promise": "^2.7",
17+
"reactivex/rxphp": "^2.0",
1718
"wyrihaximus/constants": "^1.4.3"
1819
},
1920
"require-dev": {
2021
"wyrihaximus/async-test-utilities": "^2.0",
2122
"wyrihaximus/ticking-promise": "^1.6"
2223
},
23-
"extra": {
24-
"unused": [
25-
"ext-parallel"
26-
]
27-
},
2824
"config": {
2925
"platform": {
3026
"php": "7.4"
3127
},
3228
"sort-packages": true
3329
},
30+
"extra": {
31+
"unused": [
32+
"ext-parallel"
33+
]
34+
},
3435
"autoload": {
3536
"psr-4": {
3637
"ReactParallel\\Streams\\": "src/"

composer.lock

Lines changed: 66 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

phpstan.neon

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
parameters:
22
checkMissingIterableValueType: false
3-
ignoreErrors: []
3+
ignoreErrors:
4+
- '#In method \"ReactParallel\\Streams\\RecvObservable::recv\", caught \"Throwable\" must be rethrown. Either catch a more specific exception or add a \"throw\" clause in the "catch" block to propagate the exception. More info: http:\/\/bit.ly\/failloud#'
45

56
includes:
67
- vendor/wyrihaximus/async-test-utilities/rules.neon

src/RecvObservable.php

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
<?php declare(strict_types=1);
2+
3+
namespace ReactParallel\Streams;
4+
5+
use parallel\Events;
6+
use React\EventLoop\LoopInterface;
7+
use React\EventLoop\TimerInterface;
8+
use Rx\Disposable\CallbackDisposable;
9+
use Rx\DisposableInterface;
10+
use Rx\Observable;
11+
use Rx\ObserverInterface;
12+
use Rx\Subject\Subject;
13+
use Throwable;
14+
use function var_export;
15+
16+
final class RecvObservable
17+
{
18+
private LoopInterface $loop;
19+
20+
private Events $events;
21+
22+
public function __construct(LoopInterface $loop, Events $events)
23+
{
24+
$this->loop = $loop;
25+
$this->events = $events;
26+
}
27+
28+
public function recv(): Observable
29+
{
30+
var_export([__FILE__, __LINE__, 'a']);
31+
$subject = new Subject();
32+
33+
// Call 1K times per second
34+
$timer = $this->loop->addPeriodicTimer(0.05, function () use (&$timer, $subject): void {
35+
try {
36+
while ($event = $this->events->poll()) {
37+
var_export([__FILE__, __LINE__, 'e', $event]);
38+
if ($event->type === Events\Event\Type::Read) {
39+
echo 'read', PHP_EOL;
40+
$subject->onNext($event->value);
41+
42+
break;
43+
}
44+
45+
if ($event->type !== Events\Event\Type::Close) {
46+
var_export([__FILE__, __LINE__, 'not:closed']);
47+
break;
48+
}
49+
50+
echo 'close', PHP_EOL;
51+
52+
$subject->onCompleted();
53+
54+
if ($timer instanceof TimerInterface) {
55+
echo 'timer:a', PHP_EOL;
56+
$this->loop->cancelTimer($timer);
57+
}
58+
59+
$subject->onCompleted();
60+
61+
return;
62+
}
63+
} catch (Events\Error\Timeout $timeout) {
64+
var_export([__FILE__, __LINE__, 'to']);
65+
return;
66+
} catch (Throwable $throwable) {
67+
var_export([__FILE__, __LINE__, 't']);
68+
echo $throwable;
69+
if ($timer instanceof TimerInterface) {
70+
echo 'timer:b', PHP_EOL;
71+
$this->loop->cancelTimer($timer);
72+
}
73+
74+
$subject->onError($throwable);
75+
}
76+
});
77+
78+
var_export([__FILE__, __LINE__, 'b']);
79+
return $subject;
80+
}
81+
}

tests/SingleRecvObservableTest.php

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
<?php declare(strict_types=1);
2+
3+
namespace ReactParallel\Tests\Streams;
4+
5+
use parallel\Channel;
6+
use parallel\Events;
7+
use React\EventLoop\Factory;
8+
use React\Promise\ExtendedPromiseInterface;
9+
use ReactParallel\Streams\RecvObservable;
10+
use ReactParallel\Streams\SingleRecv;
11+
use WyriHaximus\AsyncTestUtilities\AsyncTestCase;
12+
use ReactParallel\FutureToPromiseConverter\FutureToPromiseConverter;
13+
use ReactParallel\Runtime\Runtime;
14+
use parallel\Runtime\Error\Closed;
15+
use function sleep;
16+
use function WyriHaximus\React\timedPromise;
17+
use function parallel\run;
18+
19+
/**
20+
* @internal
21+
*/
22+
final class SingleRecvObservableTest extends AsyncTestCase
23+
{
24+
/**
25+
* @test
26+
*/
27+
public function recv(): void
28+
{
29+
$d = bin2hex(random_bytes(13));
30+
31+
$loop = Factory::create();
32+
$channel = Channel::make($d, Channel::Infinite);
33+
$events = new Events();
34+
$events->setTimeout(0);
35+
$events->addChannel($channel);
36+
37+
$recvObservable = new RecvObservable($loop, $events);
38+
39+
run(function () use ($channel, $d): void {
40+
sleep(1);
41+
echo '1:send', PHP_EOL;
42+
$channel->send(1);
43+
echo '1:done', PHP_EOL;
44+
});
45+
run(function () use ($channel, $d): void {
46+
sleep(2);
47+
echo '2:send', PHP_EOL;
48+
$channel->send(2);
49+
echo '2:done', PHP_EOL;
50+
});
51+
run(function () use ($channel): void {
52+
sleep(3);
53+
echo '3:close', PHP_EOL;
54+
$channel->close();
55+
echo '3:done', PHP_EOL;
56+
});
57+
58+
$rd = $this->await($recvObservable->recv()->toArray()->toPromise(), $loop, 3.3);
59+
60+
self::assertSame([$d, $d], $rd);
61+
}
62+
}

tests/SingleRecvTest.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ public function recv(): void
3838
$channel->send($d);
3939
});
4040

41-
$loop->run();
4241
$rd = $this->await($singleRecv->recv(), $loop, 3.3);
4342

4443
self::assertSame($d, $rd);
@@ -63,7 +62,6 @@ public function timedOut(): void
6362
$channel->close();
6463
});
6564

66-
$loop->run();
6765
$rd = $this->await($singleRecv->recv(), $loop, 3.3);
6866

6967
self::assertNull($rd);

0 commit comments

Comments
 (0)