diff --git a/README.md b/README.md index 10e3b7a..a60df90 100644 --- a/README.md +++ b/README.md @@ -23,39 +23,37 @@ event loop, that will work, but it adds additional overhead when you have more t different major contexts. Share this bridge around so that other packages can use them, and only have one instance checking for events. -```php -use React\EventLoop\Factory; -use ReactParallel\EventLoop\EventLoopBridge; - -$loop = Factory::create(); -$eventLoopBridge = new EventLoopBridge($loop); - -$loop->run(); -``` - ## Channels Channels often have a stream of messages going over them, as such the bridge will convert them into an observable. ```php use parallel\Channel; -use React\EventLoop\Factory; +use React\EventLoop\Loop; use ReactParallel\EventLoop\EventLoopBridge; - -$loop = Factory::create(); -$eventLoopBridge = new EventLoopBridge($loop); - -$channel = new Channel(Channel::Infinite); -$eventLoopBridge->observe($channel)->subscribe(function (string $message) { - echo $message, PHP_EOL; -}); - -$loop->futureTick(function () use ($channel): void { - $channel->send('Hello World!'); - $channel->close(); -}); - -$loop->run(); +use function React\Async\async; +use function React\Async\await; +use function React\Promise\Timer\sleep; + +$eventLoopBridge = new EventLoopBridge(); + +Loop::futureTick(async(static function () use ($eventLoopBridge) { + /** @var Channel */ + $channel = new Channel(Channel::Infinite); + + Loop::futureTick(async(function () use ($channel): void { + $channel->send('Hello World!'); + // Don't close the channel right after writing to it, + // as it will be closed on both ends and the other + // thread won't receive your message + await(sleep(1)); + $channel->close(); + })); + + foreach ($eventLoopBridge->observe($channel) as $message) { + echo $message, PHP_EOL; + } +})); ``` ## Futures @@ -64,24 +62,20 @@ Where promises are push, futures are pull, as such the event loop will poll and available. ```php -use parallel\Channel; -use React\EventLoop\Factory; +use React\EventLoop\Loop; use ReactParallel\EventLoop\EventLoopBridge; use function parallel\run; +use function React\Async\async; -$loop = Factory::create(); -$eventLoopBridge = new EventLoopBridge($loop); - -$future = run(function (): string { - return 'Hello World!'; -}); +$eventLoopBridge = new EventLoopBridge(); -$channel = new Channel(Channel::Infinite); -$eventLoopBridge->await($future)->then(function (string $message) { - echo $message, PHP_EOL; -}); +Loop::futureTick(async(static function () use ($eventLoopBridge) { + $future = run(function (): string { + return 'Hello World!'; + }); -$loop->run(); + echo $eventLoopBridge->await($future), PHP_EOL; +})); ``` ## Metrics @@ -105,7 +99,7 @@ Please see [CONTRIBUTING](CONTRIBUTING.md) for details. ## License ## -Copyright 2024 [Cees-Jan Kiewiet](http://wyrihaximus.net/) +Copyright 2025 [Cees-Jan Kiewiet](http://wyrihaximus.net/) Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation diff --git a/examples/channel.php b/examples/channel.php new file mode 100644 index 0000000..0fec80a --- /dev/null +++ b/examples/channel.php @@ -0,0 +1,30 @@ + */ + $channel = new Channel(Channel::Infinite); + + Loop::futureTick(async(function () use ($channel): void { + $channel->send('Hello World!'); + // Don't close the channel right after writing to it, + // as it will be closed on both ends and the other + // thread won't receive your message + await(sleep(1)); + $channel->close(); + })); + + foreach ($eventLoopBridge->observe($channel) as $message) { + echo $message, PHP_EOL; + } +})); diff --git a/examples/future.php b/examples/future.php new file mode 100644 index 0000000..d6123cd --- /dev/null +++ b/examples/future.php @@ -0,0 +1,18 @@ +await($future), PHP_EOL; +})); diff --git a/src/Stream.php b/src/Stream.php index 86d09df..f90cb4a 100644 --- a/src/Stream.php +++ b/src/Stream.php @@ -24,7 +24,7 @@ final class Stream implements StreamInterface public function __construct() { $this->queue = new SplQueue(); - $this->queue->setIteratorMode(SplQueue::IT_MODE_DELETE); + $this->queue->setIteratorMode(SplQueue::IT_MODE_DELETE | SplQueue::IT_MODE_DELETE); /** @psalm-suppress MixedPropertyTypeCoercion */ $this->wait = new Deferred(); }