diff --git a/composer.json b/composer.json index 06a9d11e4e..33c4550f44 100644 --- a/composer.json +++ b/composer.json @@ -41,6 +41,7 @@ "illuminate/broadcasting": "^6.3|^7.0|^8.0", "illuminate/console": "^6.3|^7.0|^8.0", "illuminate/http": "^6.3|^7.0|^8.0", + "illuminate/queue": "^6.3|^7.0|^8.0", "illuminate/routing": "^6.3|^7.0|^8.0", "illuminate/support": "^6.3|^7.0|^8.0", "pusher/pusher-php-server": "^3.0|^4.0", diff --git a/config/websockets.php b/config/websockets.php index 9dcd4f6826..9bb34b4d34 100644 --- a/config/websockets.php +++ b/config/websockets.php @@ -137,7 +137,7 @@ 'redis' => [ - 'connection' => 'default', + 'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'), /* |-------------------------------------------------------------------------- diff --git a/docs/horizontal-scaling/redis.md b/docs/horizontal-scaling/redis.md index 4f6383583b..86759db300 100644 --- a/docs/horizontal-scaling/redis.md +++ b/docs/horizontal-scaling/redis.md @@ -40,3 +40,29 @@ You can set the connection name to the Redis database under `redis`: ``` The connections can be found in your `config/database.php` file, under the `redis` key. + +## Async Redis Queue + +The default Redis connection also interacts with the queues. Since you might want to dispatch jobs on Redis from the server, you can encounter an anti-pattern of using a blocking I/O connection (like PhpRedis or PRedis) within the WebSockets server. + +To solve this issue, you can configure the built-in queue driver that uses the Async Redis connection when it's possible, like within the WebSockets server. It's highly recommended to switch your queue to it if you are going to use the queues within the server controllers, for example. + +Add the `async-redis` queue driver to your list of connections. The configuration parameters are compatible with the default `redis` driver: + +```php +'connections' => [ + 'async-redis' => [ + 'driver' => 'async-redis', + 'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'), + 'queue' => env('REDIS_QUEUE', 'default'), + 'retry_after' => 90, + 'block_for' => null, + ], +] +``` + +Also, make sure that the default queue driver is set to `async-redis`: + +``` +QUEUE_CONNECTION=async-redis +``` diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index c099bbfeb2..51a6d59e88 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -519,6 +519,16 @@ public function getPublishClient() return $this->publishClient; } + /** + * Get the Redis client used by other classes. + * + * @return Client + */ + public function getRedisClient() + { + return $this->getPublishClient(); + } + /** * Get the unique identifier for the server. * diff --git a/src/Queue/AsyncRedisConnector.php b/src/Queue/AsyncRedisConnector.php new file mode 100644 index 0000000000..ac730c306c --- /dev/null +++ b/src/Queue/AsyncRedisConnector.php @@ -0,0 +1,24 @@ +redis, $config['queue'], + $config['connection'] ?? $this->connection, + $config['retry_after'] ?? 60, + $config['block_for'] ?? null + ); + } +} diff --git a/src/Queue/AsyncRedisQueue.php b/src/Queue/AsyncRedisQueue.php new file mode 100644 index 0000000000..6f9874da29 --- /dev/null +++ b/src/Queue/AsyncRedisQueue.php @@ -0,0 +1,25 @@ +container->bound(ChannelManager::class) + ? $this->container->make(ChannelManager::class) + : null; + + return $channelManager && method_exists($channelManager, 'getRedisClient') + ? $channelManager->getRedisClient() + : parent::getConnection(); + } +} diff --git a/src/WebSocketsServiceProvider.php b/src/WebSocketsServiceProvider.php index e498c11934..f513caa473 100644 --- a/src/WebSocketsServiceProvider.php +++ b/src/WebSocketsServiceProvider.php @@ -11,6 +11,7 @@ use BeyondCode\LaravelWebSockets\Dashboard\Http\Middleware\Authorize as AuthorizeDashboard; use BeyondCode\LaravelWebSockets\Server\Router; use Illuminate\Support\Facades\Gate; +use Illuminate\Support\Facades\Queue; use Illuminate\Support\Facades\Route; use Illuminate\Support\ServiceProvider; @@ -36,6 +37,12 @@ public function boot() __DIR__.'/../database/migrations/0000_00_00_000000_rename_statistics_counters.php' => database_path('migrations/0000_00_00_000000_rename_statistics_counters.php'), ], 'migrations'); + $this->registerAsyncRedisQueueDriver(); + + $this->registerRouter(); + + $this->registerManagers(); + $this->registerStatistics(); $this->registerDashboard(); @@ -50,8 +57,19 @@ public function boot() */ public function register() { - $this->registerRouter(); - $this->registerManagers(); + // + } + + /** + * Register the async, non-blocking Redis queue driver. + * + * @return void + */ + protected function registerAsyncRedisQueueDriver() + { + Queue::extend('async-redis', function () { + return new Queue\AsyncRedisConnector($this->app['redis']); + }); } /** diff --git a/tests/AsyncRedisQueueTest.php b/tests/AsyncRedisQueueTest.php new file mode 100644 index 0000000000..89db9cd4f5 --- /dev/null +++ b/tests/AsyncRedisQueueTest.php @@ -0,0 +1,213 @@ +runOnlyOnRedisReplication(); + + $connector = new AsyncRedisConnector($this->app['redis'], 'default'); + + $this->queue = $connector->connect([ + 'queue' => 'default', + 'retry_after' => 60, + 'block_for' => null, + ]); + + $this->queue->setContainer($this->app); + } + + /** + * {@inheritdoc} + */ + protected function tearDown(): void + { + parent::tearDown(); + + m::close(); + } + + public function test_expired_jobs_are_pushed_with_async_and_popped_with_sync() + { + $jobs = [ + new RedisQueueIntegrationTestJob(0), + new RedisQueueIntegrationTestJob(1), + new RedisQueueIntegrationTestJob(2), + new RedisQueueIntegrationTestJob(3), + ]; + + $this->queue->later(1000, $jobs[0]); + $this->queue->later(-200, $jobs[1]); + $this->queue->later(-300, $jobs[2]); + $this->queue->later(-100, $jobs[3]); + + $this->getPublishClient() + ->zcard('queues:default:delayed') + ->then(function ($count) { + $this->assertEquals(4, $count); + }); + + $this->unregisterManagers(); + + $this->assertEquals($jobs[2], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command)); + $this->assertEquals($jobs[1], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command)); + $this->assertEquals($jobs[3], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command)); + $this->assertNull($this->queue->pop()); + + $this->assertEquals(1, $this->app['redis']->connection()->zcard('queues:default:delayed')); + $this->assertEquals(3, $this->app['redis']->connection()->zcard('queues:default:reserved')); + } + + public function test_jobs_are_pushed_with_async_and_released_with_sync() + { + $this->queue->push( + $job = new RedisQueueIntegrationTestJob(30) + ); + + $this->unregisterManagers(); + + $this->getPublishClient() + ->assertCalledCount(1, 'eval'); + + $redisJob = $this->queue->pop(); + + $before = $this->currentTime(); + + $redisJob->release(1000); + + $after = $this->currentTime(); + + // check the content of delayed queue + $this->assertEquals(1, $this->app['redis']->connection()->zcard('queues:default:delayed')); + + $results = $this->app['redis']->connection()->zrangebyscore('queues:default:delayed', -INF, INF, ['withscores' => true]); + + $payload = array_keys($results)[0]; + + $score = $results[$payload]; + + $this->assertGreaterThanOrEqual($before + 1000, $score); + $this->assertLessThanOrEqual($after + 1000, $score); + + $decoded = json_decode($payload); + + $this->assertEquals(1, $decoded->attempts); + $this->assertEquals($job, unserialize($decoded->data->command)); + + $this->assertNull($this->queue->pop()); + } + + public function test_jobs_are_pushed_with_async_and_deleted_with_sync() + { + $this->queue->push( + $job = new RedisQueueIntegrationTestJob(30) + ); + + $this->unregisterManagers(); + + $this->getPublishClient() + ->assertCalledCount(1, 'eval'); + + $redisJob = $this->queue->pop(); + + $redisJob->delete(); + + $this->assertEquals(0, $this->app['redis']->connection()->zcard('queues:default:delayed')); + $this->assertEquals(0, $this->app['redis']->connection()->zcard('queues:default:reserved')); + $this->assertEquals(0, $this->app['redis']->connection()->llen('queues:default')); + + $this->assertNull($this->queue->pop()); + } + + public function test_jobs_are_pushed_with_async_and_cleared_with_sync() + { + if (! method_exists($this->queue, 'clear')) { + $this->markTestSkipped('The Queue has no clear() method to test.'); + } + + $job1 = new RedisQueueIntegrationTestJob(30); + $job2 = new RedisQueueIntegrationTestJob(40); + + $this->queue->push($job1); + $this->queue->push($job2); + + $this->getPublishClient() + ->assertCalledCount(2, 'eval'); + + $this->unregisterManagers(); + + $this->assertEquals(2, $this->queue->clear(null)); + $this->assertEquals(0, $this->queue->size()); + } + + public function test_jobs_are_pushed_with_async_and_size_reflects_in_async_size() + { + $this->queue->size()->then(function ($count) { + $this->assertEquals(0, $count); + }); + + $this->queue->push(new RedisQueueIntegrationTestJob(1)); + + $this->queue->size()->then(function ($count) { + $this->assertEquals(1, $count); + }); + + $this->queue->later(60, new RedisQueueIntegrationTestJob(2)); + + $this->queue->size()->then(function ($count) { + $this->assertEquals(2, $count); + }); + + $this->queue->push(new RedisQueueIntegrationTestJob(3)); + + $this->queue->size()->then(function ($count) { + $this->assertEquals(3, $count); + }); + + $this->unregisterManagers(); + + $job = $this->queue->pop(); + + $this->registerManagers(); + + $this->queue->size()->then(function ($count) { + $this->assertEquals(3, $count); + }); + } +} + +class RedisQueueIntegrationTestJob +{ + public $i; + + public function __construct($i) + { + $this->i = $i; + } + + public function handle() + { + // + } +} diff --git a/tests/Mocks/LazyClient.php b/tests/Mocks/LazyClient.php index abd07ced3e..539e7db413 100644 --- a/tests/Mocks/LazyClient.php +++ b/tests/Mocks/LazyClient.php @@ -57,7 +57,11 @@ public function __call($name, $args) $this->calls[] = [$name, $args]; if (! in_array($name, ['subscribe', 'psubscribe', 'unsubscribe', 'punsubscribe', 'onMessage'])) { - $this->redis->__call($name, $args); + if ($name === 'eval') { + $this->redis->{$name}(...$args); + } else { + $this->redis->__call($name, $args); + } } return new PromiseResolver( @@ -98,6 +102,26 @@ public function assertCalled($name) return $this; } + /** + * Check if the method got called. + * + * @param int $times + * @param string $name + * @return $this + */ + public function assertCalledCount(int $times, string $name) + { + $total = collect($this->getCalledFunctions())->filter(function ($function) use ($name) { + [$calledName, ] = $function; + + return $calledName === $name; + }); + + PHPUnit::assertCount($times, $total); + + return $this; + } + /** * Check if the method with args got called. * @@ -105,7 +129,7 @@ public function assertCalled($name) * @param array $args * @return $this */ - public function assertCalledWithArgs($name, array $args) + public function assertCalledWithArgs(string $name, array $args) { foreach ($this->getCalledFunctions() as $function) { [$calledName, $calledArgs] = $function; @@ -125,11 +149,12 @@ public function assertCalledWithArgs($name, array $args) /** * Check if the method with args got called an amount of times. * + * @param int $times * @param string $name * @param array $args * @return $this */ - public function assertCalledWithArgsCount($times = 1, $name, array $args) + public function assertCalledWithArgsCount(int $times, string $name, array $args) { $total = collect($this->getCalledFunctions())->filter(function ($function) use ($name, $args) { [$calledName, $calledArgs] = $function; @@ -148,7 +173,7 @@ public function assertCalledWithArgsCount($times = 1, $name, array $args) * @param string $name * @return $this */ - public function assertNotCalled($name) + public function assertNotCalled(string $name) { foreach ($this->getCalledFunctions() as $function) { [$calledName, ] = $function; @@ -172,7 +197,7 @@ public function assertNotCalled($name) * @param array $args * @return $this */ - public function assertNotCalledWithArgs($name, array $args) + public function assertNotCalledWithArgs(string $name, array $args) { foreach ($this->getCalledFunctions() as $function) { [$calledName, $calledArgs] = $function; @@ -192,11 +217,12 @@ public function assertNotCalledWithArgs($name, array $args) /** * Check if the method with args got called an amount of times. * + * @param int $times * @param string $name * @param array $args * @return $this */ - public function assertNotCalledWithArgsCount($times = 1, $name, array $args) + public function assertNotCalledWithArgsCount(int $times, string $name, array $args) { $total = collect($this->getCalledFunctions())->filter(function ($function) use ($name, $args) { [$calledName, $calledArgs] = $function; diff --git a/tests/TestCase.php b/tests/TestCase.php index 65447315d0..bcf7e287d6 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -124,21 +124,29 @@ public function getEnvironmentSetUp($app) 'prefix' => '', ]); - $app['config']->set( - 'broadcasting.connections.websockets', [ - 'driver' => 'pusher', - 'key' => 'TestKey', - 'secret' => 'TestSecret', - 'app_id' => '1234', - 'options' => [ - 'cluster' => 'mt1', - 'encrypted' => true, - 'host' => '127.0.0.1', - 'port' => 6001, - 'scheme' => 'http', - ], - ] - ); + $app['config']->set('broadcasting.connections.websockets', [ + 'driver' => 'pusher', + 'key' => 'TestKey', + 'secret' => 'TestSecret', + 'app_id' => '1234', + 'options' => [ + 'cluster' => 'mt1', + 'encrypted' => true, + 'host' => '127.0.0.1', + 'port' => 6001, + 'scheme' => 'http', + ], + ]); + + $app['config']->set('queue.default', 'async-redis'); + + $app['config']->set('queue.connections.async-redis', [ + 'driver' => 'async-redis', + 'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'), + 'queue' => env('REDIS_QUEUE', 'default'), + 'retry_after' => 90, + 'block_for' => null, + ]); $app['config']->set('auth.providers.users.model', Models\User::class); @@ -244,6 +252,16 @@ protected function registerManagers() $this->channelManager = $this->app->make(ChannelManager::class); } + /** + * Unregister the managers for testing purposes. + * + * @return void + */ + protected function unregisterManagers() + { + $this->app->offsetUnset(ChannelManager::class); + } + /** * Register the statistics collectors. *