10
10
use React \EventLoop \Loop ;
11
11
use React \EventLoop \TimerInterface ;
12
12
use React \Promise \Deferred ;
13
- use Rx \Subject \Subject ;
14
13
use WyriHaximus \Metrics \Label ;
15
14
16
15
use function count ;
17
16
use function React \Async \await ;
18
17
use function spl_object_hash ;
19
18
use function spl_object_id ;
20
- use function WyriHaximus \React \awaitObservable ;
21
19
22
20
use const WyriHaximus \Constants \Numeric \ZERO ;
23
21
@@ -40,7 +38,7 @@ final class EventLoopBridge
40
38
41
39
private TimerInterface |null $ timer = null ;
42
40
43
- /** @var array<int, Subject > */
41
+ /** @var array<int, Stream > */
44
42
private array $ channels = [];
45
43
46
44
/** @var array<int, Deferred> */
@@ -68,8 +66,7 @@ public function withMetrics(Metrics $metrics): self
68
66
/** @return iterable<mixed> */
69
67
public function observe (Channel $ channel ): iterable
70
68
{
71
- $ subject = new Subject ();
72
- $ this ->channels [spl_object_id ($ channel )] = $ subject ;
69
+ $ this ->channels [spl_object_id ($ channel )] = new Stream ();
73
70
$ this ->events ->addChannel ($ channel );
74
71
75
72
if ($ this ->metrics instanceof Metrics) {
@@ -78,7 +75,7 @@ public function observe(Channel $channel): iterable
78
75
79
76
$ this ->startTimer ();
80
77
81
- return awaitObservable ( $ subject );
78
+ yield from $ this -> channels [ spl_object_id ( $ channel )]-> iterable ( );
82
79
}
83
80
84
81
public function await (Future $ future ): mixed
@@ -228,7 +225,7 @@ private function handleFutureReadEvent(Events\Event $event): void
228
225
229
226
private function handleChannelReadEvent (Events \Event $ event ): void
230
227
{
231
- $ this ->channels [spl_object_id ($ event ->object )]->onNext ($ event ->value );
228
+ $ this ->channels [spl_object_id ($ event ->object )]->value ($ event ->value );
232
229
$ this ->events ->addChannel ($ event ->object ); /** @phpstan-ignore-line */
233
230
234
231
if (! ($ this ->metrics instanceof Metrics)) {
@@ -240,7 +237,7 @@ private function handleChannelReadEvent(Events\Event $event): void
240
237
241
238
private function handleCloseEvent (Events \Event $ event ): void
242
239
{
243
- $ this ->channels [spl_object_id ($ event ->object )]->onCompleted ();
240
+ $ this ->channels [spl_object_id ($ event ->object )]->done ();
244
241
unset($ this ->channels [spl_object_id ($ event ->object )]);
245
242
246
243
if (! ($ this ->metrics instanceof Metrics)) {
0 commit comments