Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions PubSub/PubSubInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

namespace BeyondCode\LaravelWebSockets\PubSub;

use React\EventLoop\LoopInterface;

interface PubSubInterface
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We add a new interface, so that we can implement other drivers such as zeroMQ or something similar. I don't need this functionality, but it will be asked for.

{
public function publish(string $appId, array $payload): bool;

public function subscribe(LoopInterface $loop): PubSubInterface;
}
118 changes: 118 additions & 0 deletions PubSub/Redis/RedisClient.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
<?php

namespace BeyondCode\LaravelWebSockets\PubSub\Redis;

use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
use Clue\React\Block;
use Clue\React\Redis\Client;
use Clue\React\Redis\Factory;
use Illuminate\Support\Str;
use React\EventLoop\LoopInterface;
use React\Promise\PromiseInterface;

class RedisClient implements PubSubInterface
{

const REDIS_KEY = ':websockets:replication:';
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could make this configurable. Not sure how to keep this uncluttered in the config however.

protected $apps;
protected $loop;
protected $serverId;
protected $publishClient;
protected $subscribeClient;

public function __construct()
{
$this->apps = collect(config('websockets.apps'));
$this->serverId = Str::uuid()->toString();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set a unique server id, so that we don't get double messages on the current server.

}

public function publish(string $appId, array $payload): bool
{
$payload['appId'] = $appId;
$payload['serverId'] = $this->serverId;
$this->publishClient->publish(self::REDIS_KEY, json_encode($payload));
return true;
}

public function subscribe(LoopInterface $loop): PubSubInterface
{
$this->loop = $loop;
[$this->publishClient, $this->subscribeClient] = Block\awaitAll([$this->publishConnection(), $this->subscribeConnection()], $this->loop);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may wonder why we need both a publish connection and a subscribe connection.
clue/reactphp-redis#39 (comment)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest I'm more wondering why you are blocking here. Is this bit ran inside the websocket server, or somewhere outside it in blocking PHP land?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't await, you will end up returning a promise. Which isn't okay if you want the publish client to end up in the container. So what we do here is block until the promises resolve on the loop, and then push the actual instances on the variables instead of promises. Hopefully that makes sense.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So how are you restarting the loop? Or is this at the point where the loop hasn't been started yet by the server? Since await* functions stop the loop once the promises resolves/rejects.

Another option would be to do something like https://github.com/PHP-DI-Definitions/clue-redis-client/blob/master/src/WaitingClient.php (would pull out that client into another package if you wish to go that road) and fake the connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is going to be slightly confusing, because I don't fully understand Ratchet. But my understanding is that there is an event loop that is always running that you can push processes onto. The loop is started from the console command here:

$this->loop = LoopFactory::create();

You don't need to restart the loop. Just block for a split second to get the redis connection promise to resolve, and then the loop continues.

Since this subscribe command only runs once, it will just connect when you boot up the websocket server, and then continue along in the background.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waiting client stand alone package is up at https://packagist.org/packages/wyrihaximus/react-redis-waiting-client I'll be adding integration tests for subscribing to that over the next few days

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify, because it's clear the blocking approach was taken from my comment here: simonhamp/laravel-echo-ratchet-server#2 (comment), the reason I did it that way initially I think is just to lower the startup time by doing both connections at once. It's not strictly necessary. And like @snellingio said, it only ever happens right before the server is started, so there's no concern of the loop stopping.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waiting client stand alone package is up at packagist.org/packages/wyrihaximus/react-redis-waiting-client I'll be adding integration tests for subscribing to that over the next few days

A better alternative for that package is up: clue/reactphp-redis#82

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lazy client release is out. That should make some thing simpler: https://github.com/clue/reactphp-redis/releases/tag/v2.3.0

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @WyriHaximus

I'll try to pick up this PR in a bit, might make a new PR based on this one to finish it up.

return $this->publishClient;
}

protected function publishConnection(): PromiseInterface
{
$connectionUri = $this->getConnectionUri();
$factory = new Factory($this->loop);
return $factory->createClient($connectionUri)->then(
function (Client $client) {
$this->publishClient = $client;
return $this;
}
);
}


protected function subscribeConnection(): PromiseInterface
{
$connectionUri = $this->getConnectionUri();
$factory = new Factory($this->loop);
return $factory->createClient($connectionUri)->then(
function (Client $client) {
$this->subscribeClient = $client;
$this->onConnected();
return $this;
}
);
}

protected function getConnectionUri()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs a test (and maybe a fallback), but it just pulls from the laravel app's config.

{
$name = config('websockets.replication.connection') ?? 'default';
$config = config('database.redis.' . $name);
$host = $config['host'];
$port = $config['port'] ? (':' . $config['port']) : ':6379';

$query = [];
if ($config['password']) {
$query['password'] = $config['password'];
}
if ($config['database']) {
$query['database'] = $config['database'];
}
$query = http_build_query($query);

return "redis://$host$port" . ($query ? '?' . $query : '');
}

protected function onConnected()
{
$this->subscribeClient->subscribe(self::REDIS_KEY);
$this->subscribeClient->on('message', function ($channel, $payload) {
$this->onMessage($channel, $payload);
});
}

protected function onMessage($channel, $payload)
{
$payload = json_decode($payload);

if ($this->serverId === $payload->serverId) {
return false;
}

/* @var $channelManager ChannelManager */
$channelManager = app(ChannelManager::class);
$channelSearch = $channelManager->find($payload->appId, $payload->channel);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love how we need to pass in an appId, but this is the only way to get the correct channel. We probably need to unset the appId and the channel from the payload before broadcasting. I considered doing a new Message or Envelope class to hold the payload logic, and never got around to doing it.


if ($channelSearch === null) {
return false;
}

$channel->broadcast($payload);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure what the difference between broadcast, broadcastToEveryone, and broadcastToEveryoneExcept. But calling broadcast seems to work?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's to support this feature: https://laravel.com/docs/5.7/broadcasting#broadcasting-events

If a socketId is provided in the payload, then broadcastToEveryoneExcept should be called. Essentially the point of this is for example in a chat client, a user sent out a message, there's no reason to send an event about that message back to the same user who just sent it, since the frontend already has that message in its state. It's just a minor optimization that makes it send N-1 messages instead of N (where N is number of connections in the channel)

return true;
}

}
2 changes: 2 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
"php": "^7.1",
"ext-json": "*",
"cboden/ratchet": "^0.4.1",
"clue/block-react": "^1.3",
"clue/buzz-react": "^2.5",
"clue/redis-react": "^2.2",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, we can't use the default redis driver. We could probably use it for publishing, but not for subscribing. Reason is that it is blocking :(

"guzzlehttp/psr7": "^1.5",
"illuminate/broadcasting": "5.7.*",
"illuminate/console": "5.7.*",
Expand Down
14 changes: 14 additions & 0 deletions config/websockets.php
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,20 @@
'passphrase' => null,
],

/*
* You can enable replication to publish and subscribe to messages across the driver
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried to keep the config light, but this is something we should probably expand on.

*/
'replication' => [
'enabled' => false,

'driver' => 'redis',

'redis' => [
'connection' => 'default',
],
],


/*
* Channel Manager
* This class handles how channel persistence is handled.
Expand Down
22 changes: 22 additions & 0 deletions src/Console/StartWebSocketServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
use React\Dns\Resolver\Factory as DnsFactory;
use React\Dns\Resolver\Resolver as ReactDnsResolver;
use BeyondCode\LaravelWebSockets\Statistics\DnsResolver;
use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface;
use BeyondCode\LaravelWebSockets\PubSub\Redis\RedisClient;
use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger;
use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter;
use BeyondCode\LaravelWebSockets\Server\Logger\HttpLogger;
Expand Down Expand Up @@ -44,6 +46,7 @@ public function handle()
->configureMessageLogger()
->configureConnectionLogger()
->registerEchoRoutes()
->configurePubSubReplication()
->startWebSocketServer();
}

Expand Down Expand Up @@ -127,6 +130,25 @@ protected function startWebSocketServer()
->run();
}


protected function configurePubSubReplication()
{
if (config('websockets.replication.enabled') !== true) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably allow for cli integration here as well.

return $this;
}

if (config('websockets.replication.driver') === 'redis') {
$connection = (new RedisClient())->subscribe($this->loop);
}

app()->singleton(PubSubInterface::class, function () use ($connection) {
return $connection;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can break if the driver isn't set to redis and replication is enabled. Maybe we should throw an exception if the connection isn't set.

});

return $this;
}


protected function getDnsResolver(): ReactDnsResolver
{
if (! config('websockets.statistics.perform_dns_lookup')) {
Expand Down
2 changes: 1 addition & 1 deletion src/HttpApi/Controllers/TriggerEventController.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public function __invoke(Request $request)
'channel' => $channelName,
'event' => $request->json()->get('name'),
'data' => $request->json()->get('data'),
], $request->json()->get('socket_id'));
], $request->json()->get('socket_id'), $request->appId);

DashboardLogger::apiMessage(
$request->appId,
Expand Down
9 changes: 7 additions & 2 deletions src/WebSockets/Channels/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use stdClass;
use Ratchet\ConnectionInterface;
use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface;
use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger;
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature;

Expand Down Expand Up @@ -87,11 +88,15 @@ public function broadcast($payload)

public function broadcastToOthers(ConnectionInterface $connection, $payload)
{
$this->broadcastToEveryoneExcept($payload, $connection->socketId);
$this->broadcastToEveryoneExcept($payload, $connection->socketId, $connection->app->id);
}

public function broadcastToEveryoneExcept($payload, ?string $socketId = null)
public function broadcastToEveryoneExcept($payload, ?string $socketId = null, ?string $appId = null)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, I don't love adding the appId here, but I didn't see another easy way to get it into the class.

{
if (config('websockets.replication.enabled') === true) {
app()->get(PubSubInterface::class)->publish($appId, $payload);
}

if (is_null($socketId)) {
return $this->broadcast($payload);
}
Expand Down