Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"illuminate/broadcasting": "^6.3|^7.0|^8.0",
"illuminate/console": "^6.3|^7.0|^8.0",
"illuminate/http": "^6.3|^7.0|^8.0",
"illuminate/queue": "^6.3|^7.0|^8.0",
"illuminate/routing": "^6.3|^7.0|^8.0",
"illuminate/support": "^6.3|^7.0|^8.0",
"pusher/pusher-php-server": "^3.0|^4.0",
Expand Down
2 changes: 1 addition & 1 deletion config/websockets.php
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@

'redis' => [

'connection' => 'default',
'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'),

/*
|--------------------------------------------------------------------------
Expand Down
26 changes: 26 additions & 0 deletions docs/horizontal-scaling/redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,29 @@ You can set the connection name to the Redis database under `redis`:
```

The connections can be found in your `config/database.php` file, under the `redis` key.

## Async Redis Queue

The default Redis connection also interacts with the queues. Since you might want to dispatch jobs on Redis from the server, you can encounter an anti-pattern of using a blocking I/O connection (like PhpRedis or PRedis) within the WebSockets server.

To solve this issue, you can configure the built-in queue driver that uses the Async Redis connection when it's possible, like within the WebSockets server. It's highly recommended to switch your queue to it if you are going to use the queues within the server controllers, for example.

Add the `async-redis` queue driver to your list of connections. The configuration parameters are compatible with the default `redis` driver:

```php
'connections' => [
'async-redis' => [
'driver' => 'async-redis',
'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => 90,
'block_for' => null,
],
]
```

Also, make sure that the default queue driver is set to `async-redis`:

```
QUEUE_CONNECTION=async-redis
```
10 changes: 10 additions & 0 deletions src/ChannelManagers/RedisChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,16 @@ public function getPublishClient()
return $this->publishClient;
}

/**
* Get the Redis client used by other classes.
*
* @return Client
*/
public function getRedisClient()
{
return $this->getPublishClient();
}

/**
* Get the unique identifier for the server.
*
Expand Down
24 changes: 24 additions & 0 deletions src/Queue/AsyncRedisConnector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

namespace BeyondCode\LaravelWebSockets\Queue;

use Illuminate\Queue\Connectors\RedisConnector;

class AsyncRedisConnector extends RedisConnector
{
/**
* Establish a queue connection.
*
* @param array $config
* @return \Illuminate\Contracts\Queue\Queue
*/
public function connect(array $config)
{
return new AsyncRedisQueue(
$this->redis, $config['queue'],
$config['connection'] ?? $this->connection,
$config['retry_after'] ?? 60,
$config['block_for'] ?? null
);
}
}
25 changes: 25 additions & 0 deletions src/Queue/AsyncRedisQueue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

namespace BeyondCode\LaravelWebSockets\Queue;

use BeyondCode\LaravelWebSockets\Contracts\ChannelManager;
use Illuminate\Queue\RedisQueue;

class AsyncRedisQueue extends RedisQueue
{
/**
* Get the connection for the queue.
*
* @return \BeyondCode\LaravelWebSockets\Contracts\ChannelManager|\Illuminate\Redis\Connections\Connection
*/
public function getConnection()
{
$channelManager = $this->container->bound(ChannelManager::class)
? $this->container->make(ChannelManager::class)
: null;

return $channelManager && method_exists($channelManager, 'getRedisClient')
? $channelManager->getRedisClient()
: parent::getConnection();
}
}
22 changes: 20 additions & 2 deletions src/WebSocketsServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use BeyondCode\LaravelWebSockets\Dashboard\Http\Middleware\Authorize as AuthorizeDashboard;
use BeyondCode\LaravelWebSockets\Server\Router;
use Illuminate\Support\Facades\Gate;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\Facades\Route;
use Illuminate\Support\ServiceProvider;

Expand All @@ -36,6 +37,12 @@ public function boot()
__DIR__.'/../database/migrations/0000_00_00_000000_rename_statistics_counters.php' => database_path('migrations/0000_00_00_000000_rename_statistics_counters.php'),
], 'migrations');

$this->registerAsyncRedisQueueDriver();

$this->registerRouter();

$this->registerManagers();

$this->registerStatistics();

$this->registerDashboard();
Expand All @@ -50,8 +57,19 @@ public function boot()
*/
public function register()
{
$this->registerRouter();
$this->registerManagers();
//
}

/**
* Register the async, non-blocking Redis queue driver.
*
* @return void
*/
protected function registerAsyncRedisQueueDriver()
{
Queue::extend('async-redis', function () {
return new Queue\AsyncRedisConnector($this->app['redis']);
});
}

/**
Expand Down
213 changes: 213 additions & 0 deletions tests/AsyncRedisQueueTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
<?php

namespace BeyondCode\LaravelWebSockets\Test;

use BeyondCode\LaravelWebSockets\Queue\AsyncRedisConnector;
use Illuminate\Queue\Queue;
use Illuminate\Support\InteractsWithTime;
use Mockery as m;

class AsyncRedisQueueTest extends TestCase
{
use InteractsWithTime;

/**
* The testing queue for Redis.
*
* @var \Illuminate\Queue\RedisQueue
*/
private $queue;

/**
* {@inheritdoc}
*/
public function setUp(): void
{
parent::setUp();

$this->runOnlyOnRedisReplication();

$connector = new AsyncRedisConnector($this->app['redis'], 'default');

$this->queue = $connector->connect([
'queue' => 'default',
'retry_after' => 60,
'block_for' => null,
]);

$this->queue->setContainer($this->app);
}

/**
* {@inheritdoc}
*/
protected function tearDown(): void
{
parent::tearDown();

m::close();
}

public function test_expired_jobs_are_pushed_with_async_and_popped_with_sync()
{
$jobs = [
new RedisQueueIntegrationTestJob(0),
new RedisQueueIntegrationTestJob(1),
new RedisQueueIntegrationTestJob(2),
new RedisQueueIntegrationTestJob(3),
];

$this->queue->later(1000, $jobs[0]);
$this->queue->later(-200, $jobs[1]);
$this->queue->later(-300, $jobs[2]);
$this->queue->later(-100, $jobs[3]);

$this->getPublishClient()
->zcard('queues:default:delayed')
->then(function ($count) {
$this->assertEquals(4, $count);
});

$this->unregisterManagers();

$this->assertEquals($jobs[2], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command));
$this->assertEquals($jobs[1], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command));
$this->assertEquals($jobs[3], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command));
$this->assertNull($this->queue->pop());

$this->assertEquals(1, $this->app['redis']->connection()->zcard('queues:default:delayed'));
$this->assertEquals(3, $this->app['redis']->connection()->zcard('queues:default:reserved'));
}

public function test_jobs_are_pushed_with_async_and_released_with_sync()
{
$this->queue->push(
$job = new RedisQueueIntegrationTestJob(30)
);

$this->unregisterManagers();

$this->getPublishClient()
->assertCalledCount(1, 'eval');

$redisJob = $this->queue->pop();

$before = $this->currentTime();

$redisJob->release(1000);

$after = $this->currentTime();

// check the content of delayed queue
$this->assertEquals(1, $this->app['redis']->connection()->zcard('queues:default:delayed'));

$results = $this->app['redis']->connection()->zrangebyscore('queues:default:delayed', -INF, INF, ['withscores' => true]);

$payload = array_keys($results)[0];

$score = $results[$payload];

$this->assertGreaterThanOrEqual($before + 1000, $score);
$this->assertLessThanOrEqual($after + 1000, $score);

$decoded = json_decode($payload);

$this->assertEquals(1, $decoded->attempts);
$this->assertEquals($job, unserialize($decoded->data->command));

$this->assertNull($this->queue->pop());
}

public function test_jobs_are_pushed_with_async_and_deleted_with_sync()
{
$this->queue->push(
$job = new RedisQueueIntegrationTestJob(30)
);

$this->unregisterManagers();

$this->getPublishClient()
->assertCalledCount(1, 'eval');

$redisJob = $this->queue->pop();

$redisJob->delete();

$this->assertEquals(0, $this->app['redis']->connection()->zcard('queues:default:delayed'));
$this->assertEquals(0, $this->app['redis']->connection()->zcard('queues:default:reserved'));
$this->assertEquals(0, $this->app['redis']->connection()->llen('queues:default'));

$this->assertNull($this->queue->pop());
}

public function test_jobs_are_pushed_with_async_and_cleared_with_sync()
{
if (! method_exists($this->queue, 'clear')) {
$this->markTestSkipped('The Queue has no clear() method to test.');
}

$job1 = new RedisQueueIntegrationTestJob(30);
$job2 = new RedisQueueIntegrationTestJob(40);

$this->queue->push($job1);
$this->queue->push($job2);

$this->getPublishClient()
->assertCalledCount(2, 'eval');

$this->unregisterManagers();

$this->assertEquals(2, $this->queue->clear(null));
$this->assertEquals(0, $this->queue->size());
}

public function test_jobs_are_pushed_with_async_and_size_reflects_in_async_size()
{
$this->queue->size()->then(function ($count) {
$this->assertEquals(0, $count);
});

$this->queue->push(new RedisQueueIntegrationTestJob(1));

$this->queue->size()->then(function ($count) {
$this->assertEquals(1, $count);
});

$this->queue->later(60, new RedisQueueIntegrationTestJob(2));

$this->queue->size()->then(function ($count) {
$this->assertEquals(2, $count);
});

$this->queue->push(new RedisQueueIntegrationTestJob(3));

$this->queue->size()->then(function ($count) {
$this->assertEquals(3, $count);
});

$this->unregisterManagers();

$job = $this->queue->pop();

$this->registerManagers();

$this->queue->size()->then(function ($count) {
$this->assertEquals(3, $count);
});
}
}

class RedisQueueIntegrationTestJob
{
public $i;

public function __construct($i)
{
$this->i = $i;
}

public function handle()
{
//
}
}
Loading