From b584d0cacba0117470da9d2fbcaec543143cbb61 Mon Sep 17 00:00:00 2001 From: Sam Snelling Date: Mon, 17 Dec 2018 06:12:53 -0600 Subject: [PATCH 01/30] Update with pub sub replication and redis driver --- PubSub/PubSubInterface.php | 12 ++ PubSub/Redis/RedisClient.php | 118 ++++++++++++++++++ composer.json | 2 + config/websockets.php | 14 +++ src/Console/StartWebSocketServer.php | 20 +++ .../Controllers/TriggerEventController.php | 2 +- src/WebSockets/Channels/Channel.php | 9 +- 7 files changed, 174 insertions(+), 3 deletions(-) create mode 100644 PubSub/PubSubInterface.php create mode 100644 PubSub/Redis/RedisClient.php diff --git a/PubSub/PubSubInterface.php b/PubSub/PubSubInterface.php new file mode 100644 index 0000000000..3a28ffc81c --- /dev/null +++ b/PubSub/PubSubInterface.php @@ -0,0 +1,12 @@ +apps = collect(config('websockets.apps')); + $this->serverId = Str::uuid()->toString(); + } + + public function publish(string $appId, array $payload): bool + { + $payload['appId'] = $appId; + $payload['serverId'] = $this->serverId; + $this->publishClient->publish(self::REDIS_KEY, json_encode($payload)); + return true; + } + + public function subscribe(LoopInterface $loop): PubSubInterface + { + $this->loop = $loop; + [$this->publishClient, $this->subscribeClient] = Block\awaitAll([$this->publishConnection(), $this->subscribeConnection()], $this->loop); + return $this->publishClient; + } + + protected function publishConnection(): PromiseInterface + { + $connectionUri = $this->getConnectionUri(); + $factory = new Factory($this->loop); + return $factory->createClient($connectionUri)->then( + function (Client $client) { + $this->publishClient = $client; + return $this; + } + ); + } + + + protected function subscribeConnection(): PromiseInterface + { + $connectionUri = $this->getConnectionUri(); + $factory = new Factory($this->loop); + return $factory->createClient($connectionUri)->then( + function (Client $client) { + $this->subscribeClient = $client; + $this->onConnected(); + return $this; + } + ); + } + + protected function getConnectionUri() + { + $name = config('websockets.replication.connection') ?? 'default'; + $config = config('database.redis.' . $name); + $host = $config['host']; + $port = $config['port'] ? (':' . $config['port']) : ':6379'; + + $query = []; + if ($config['password']) { + $query['password'] = $config['password']; + } + if ($config['database']) { + $query['database'] = $config['database']; + } + $query = http_build_query($query); + + return "redis://$host$port" . ($query ? '?' . $query : ''); + } + + protected function onConnected() + { + $this->subscribeClient->subscribe(self::REDIS_KEY); + $this->subscribeClient->on('message', function ($channel, $payload) { + $this->onMessage($channel, $payload); + }); + } + + protected function onMessage($channel, $payload) + { + $payload = json_decode($payload); + + if ($this->serverId === $payload->serverId) { + return false; + } + + /* @var $channelManager ChannelManager */ + $channelManager = app(ChannelManager::class); + $channelSearch = $channelManager->find($payload->appId, $payload->channel); + + if ($channelSearch === null) { + return false; + } + + $channel->broadcast($payload); + return true; + } + +} \ No newline at end of file diff --git a/composer.json b/composer.json index 87aa7ba307..8a2133ef58 100644 --- a/composer.json +++ b/composer.json @@ -25,7 +25,9 @@ "php": "^7.1", "ext-json": "*", "cboden/ratchet": "^0.4.1", + "clue/block-react": "^1.3", "clue/buzz-react": "^2.5", + "clue/redis-react": "^2.2", "facade/ignition-contracts": "^1.0", "guzzlehttp/psr7": "^1.5", "illuminate/broadcasting": "5.7.* || 5.8.* || ^6.0", diff --git a/config/websockets.php b/config/websockets.php index 6a2e7f0379..f5b43e3c95 100644 --- a/config/websockets.php +++ b/config/websockets.php @@ -124,6 +124,20 @@ 'passphrase' => env('LARAVEL_WEBSOCKETS_SSL_PASSPHRASE', null), ], + /* + * You can enable replication to publish and subscribe to messages across the driver + */ + 'replication' => [ + 'enabled' => false, + + 'driver' => 'redis', + + 'redis' => [ + 'connection' => 'default', + ], + ], + + /* * Channel Manager * This class handles how channel persistence is handled. diff --git a/src/Console/StartWebSocketServer.php b/src/Console/StartWebSocketServer.php index a4e8ff219c..e014e29334 100644 --- a/src/Console/StartWebSocketServer.php +++ b/src/Console/StartWebSocketServer.php @@ -10,6 +10,8 @@ use React\EventLoop\Factory as LoopFactory; use React\Dns\Resolver\Factory as DnsFactory; use BeyondCode\LaravelWebSockets\Statistics\DnsResolver; +use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface; +use BeyondCode\LaravelWebSockets\PubSub\Redis\RedisClient; use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger; use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter; use BeyondCode\LaravelWebSockets\Server\Logger\HttpLogger; @@ -45,6 +47,7 @@ public function handle() ->configureConnectionLogger() ->registerEchoRoutes() ->registerCustomRoutes() + ->configurePubSubReplication() ->startWebSocketServer(); } @@ -135,6 +138,23 @@ protected function startWebSocketServer() ->run(); } + protected function configurePubSubReplication() + { + if (config('websockets.replication.enabled') !== true) { + return $this; + } + + if (config('websockets.replication.driver') === 'redis') { + $connection = (new RedisClient())->subscribe($this->loop); + } + + app()->singleton(PubSubInterface::class, function () use ($connection) { + return $connection; + }); + + return $this; + } + protected function getDnsResolver(): ResolverInterface { if (! config('websockets.statistics.perform_dns_lookup')) { diff --git a/src/HttpApi/Controllers/TriggerEventController.php b/src/HttpApi/Controllers/TriggerEventController.php index 076407126c..ee8bcb3e25 100644 --- a/src/HttpApi/Controllers/TriggerEventController.php +++ b/src/HttpApi/Controllers/TriggerEventController.php @@ -19,7 +19,7 @@ public function __invoke(Request $request) 'channel' => $channelName, 'event' => $request->json()->get('name'), 'data' => $request->json()->get('data'), - ], $request->json()->get('socket_id')); + ], $request->json()->get('socket_id'), $request->appId); DashboardLogger::apiMessage( $request->appId, diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index 9415b0bebb..a6136d476a 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -5,6 +5,7 @@ use stdClass; use Illuminate\Support\Str; use Ratchet\ConnectionInterface; +use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface; use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger; use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature; @@ -88,11 +89,15 @@ public function broadcast($payload) public function broadcastToOthers(ConnectionInterface $connection, $payload) { - $this->broadcastToEveryoneExcept($payload, $connection->socketId); + $this->broadcastToEveryoneExcept($payload, $connection->socketId, $connection->app->id); } - public function broadcastToEveryoneExcept($payload, ?string $socketId = null) + public function broadcastToEveryoneExcept($payload, ?string $socketId = null, ?string $appId = null) { + if (config('websockets.replication.enabled') === true) { + app()->get(PubSubInterface::class)->publish($appId, $payload); + } + if (is_null($socketId)) { return $this->broadcast($payload); } From c203d24469a1bf1c3f60431cbec8674bb6482931 Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Sun, 24 Mar 2019 00:56:47 -0400 Subject: [PATCH 02/30] Clean up some typos, add some type hints, StyleCI fixes --- .gitignore | 3 +- config/websockets.php | 1 - src/Apps/ConfigAppProvider.php | 10 ++-- src/Console/StartWebSocketServer.php | 3 +- src/Facades/StatisticsLogger.php | 5 +- src/Facades/WebSocketsRouter.php | 5 +- src/HttpApi/Controllers/Controller.php | 51 +++++++++++-------- src/Server/Router.php | 2 +- src/WebSockets/Channels/Channel.php | 7 ++- .../ChannelManagers/ArrayChannelManager.php | 2 +- 10 files changed, 55 insertions(+), 34 deletions(-) diff --git a/.gitignore b/.gitignore index e45efd8d6f..4071d4e359 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ composer.lock docs vendor coverage -.phpunit.result.cache \ No newline at end of file +.phpunit.result.cache +.idea/ diff --git a/config/websockets.php b/config/websockets.php index f5b43e3c95..3826580c66 100644 --- a/config/websockets.php +++ b/config/websockets.php @@ -137,7 +137,6 @@ ], ], - /* * Channel Manager * This class handles how channel persistence is handled. diff --git a/src/Apps/ConfigAppProvider.php b/src/Apps/ConfigAppProvider.php index 0476abac29..b9b7ab7715 100644 --- a/src/Apps/ConfigAppProvider.php +++ b/src/Apps/ConfigAppProvider.php @@ -19,7 +19,7 @@ public function all(): array { return $this->apps ->map(function (array $appAttributes) { - return $this->instanciate($appAttributes); + return $this->instantiate($appAttributes); }) ->toArray(); } @@ -30,7 +30,7 @@ public function findById($appId): ?App ->apps ->firstWhere('id', $appId); - return $this->instanciate($appAttributes); + return $this->instantiate($appAttributes); } public function findByKey(string $appKey): ?App @@ -39,7 +39,7 @@ public function findByKey(string $appKey): ?App ->apps ->firstWhere('key', $appKey); - return $this->instanciate($appAttributes); + return $this->instantiate($appAttributes); } public function findBySecret(string $appSecret): ?App @@ -48,10 +48,10 @@ public function findBySecret(string $appSecret): ?App ->apps ->firstWhere('secret', $appSecret); - return $this->instanciate($appAttributes); + return $this->instantiate($appAttributes); } - protected function instanciate(?array $appAttributes): ?App + protected function instantiate(?array $appAttributes): ?App { if (! $appAttributes) { return null; diff --git a/src/Console/StartWebSocketServer.php b/src/Console/StartWebSocketServer.php index e014e29334..8a882a59b4 100644 --- a/src/Console/StartWebSocketServer.php +++ b/src/Console/StartWebSocketServer.php @@ -11,9 +11,10 @@ use React\Dns\Resolver\Factory as DnsFactory; use BeyondCode\LaravelWebSockets\Statistics\DnsResolver; use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface; -use BeyondCode\LaravelWebSockets\PubSub\Redis\RedisClient; +use BeyondCode\LaravelWebSockets\Statistics\DnsResolver; use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger; use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter; +use BeyondCode\LaravelWebSockets\PubSub\Redis\RedisClient; use BeyondCode\LaravelWebSockets\Server\Logger\HttpLogger; use BeyondCode\LaravelWebSockets\Server\WebSocketServerFactory; use BeyondCode\LaravelWebSockets\Server\Logger\ConnectionLogger; diff --git a/src/Facades/StatisticsLogger.php b/src/Facades/StatisticsLogger.php index 858d63fe55..095d796258 100644 --- a/src/Facades/StatisticsLogger.php +++ b/src/Facades/StatisticsLogger.php @@ -5,7 +5,10 @@ use Illuminate\Support\Facades\Facade; use BeyondCode\LaravelWebSockets\Statistics\Logger\StatisticsLogger as StatisticsLoggerInterface; -/** @see \BeyondCode\LaravelWebSockets\Statistics\Logger\HttpStatisticsLogger */ +/** + * @see \BeyondCode\LaravelWebSockets\Statistics\Logger\HttpStatisticsLogger + * @mixin \BeyondCode\LaravelWebSockets\Statistics\Logger\HttpStatisticsLogger + */ class StatisticsLogger extends Facade { protected static function getFacadeAccessor() diff --git a/src/Facades/WebSocketsRouter.php b/src/Facades/WebSocketsRouter.php index 2c7b75a423..925f6856e7 100644 --- a/src/Facades/WebSocketsRouter.php +++ b/src/Facades/WebSocketsRouter.php @@ -4,7 +4,10 @@ use Illuminate\Support\Facades\Facade; -/** @see \BeyondCode\LaravelWebSockets\Server\Router */ +/** + * @see \BeyondCode\LaravelWebSockets\Server\Router + * @mixin \BeyondCode\LaravelWebSockets\Server\Router + */ class WebSocketsRouter extends Facade { protected static function getFacadeAccessor() diff --git a/src/HttpApi/Controllers/Controller.php b/src/HttpApi/Controllers/Controller.php index 975e8ef47d..48ecb5d2bc 100644 --- a/src/HttpApi/Controllers/Controller.php +++ b/src/HttpApi/Controllers/Controller.php @@ -46,7 +46,11 @@ public function onOpen(ConnectionInterface $connection, RequestInterface $reques $this->requestBuffer = (string) $request->getBody(); - $this->checkContentLength($connection); + if (! $this->checkContentLength()) { + return; + } + + $this->handleRequest($connection); } protected function findContentLength(array $headers): int @@ -60,31 +64,38 @@ public function onMessage(ConnectionInterface $from, $msg) { $this->requestBuffer .= $msg; - $this->checkContentLength($from); + if (! $this->checkContentLength()) { + return; + } + + $this->handleRequest($from); } - protected function checkContentLength(ConnectionInterface $connection) + protected function checkContentLength() { - if (strlen($this->requestBuffer) === $this->contentLength) { - $serverRequest = (new ServerRequest( - $this->request->getMethod(), - $this->request->getUri(), - $this->request->getHeaders(), - $this->requestBuffer, - $this->request->getProtocolVersion() - ))->withQueryParams(QueryParameters::create($this->request)->all()); + return strlen($this->requestBuffer) !== $this->contentLength; + } - $laravelRequest = Request::createFromBase((new HttpFoundationFactory)->createRequest($serverRequest)); + protected function handleRequest(ConnectionInterface $connection) + { + $serverRequest = (new ServerRequest( + $this->request->getMethod(), + $this->request->getUri(), + $this->request->getHeaders(), + $this->requestBuffer, + $this->request->getProtocolVersion() + ))->withQueryParams(QueryParameters::create($this->request)->all()); - $this - ->ensureValidAppId($laravelRequest->appId) - ->ensureValidSignature($laravelRequest); + $laravelRequest = Request::createFromBase((new HttpFoundationFactory)->createRequest($serverRequest)); - $response = $this($laravelRequest); + $this + ->ensureValidAppId($laravelRequest->appId) + ->ensureValidSignature($laravelRequest); - $connection->send(JsonResponse::create($response)); - $connection->close(); - } + $response = $this($laravelRequest); + + $connection->send(JsonResponse::create($response)); + $connection->close(); } public function onClose(ConnectionInterface $connection) @@ -122,7 +133,7 @@ protected function ensureValidSignature(Request $request) /* * The `auth_signature` & `body_md5` parameters are not included when calculating the `auth_signature` value. * - * The `appId`, `appKey` & `channelName` parameters are actually route paramaters and are never supplied by the client. + * The `appId`, `appKey` & `channelName` parameters are actually route parameters and are never supplied by the client. */ $params = Arr::except($request->query(), ['auth_signature', 'body_md5', 'appId', 'appKey', 'channelName']); diff --git a/src/Server/Router.php b/src/Server/Router.php index 3ce668525e..1950acb8d9 100644 --- a/src/Server/Router.php +++ b/src/Server/Router.php @@ -94,7 +94,7 @@ protected function getRoute(string $method, string $uri, $action): Route * If the given action is a class that handles WebSockets, then it's not a regular * controller but a WebSocketHandler that needs to converted to a WsServer. * - * If the given action is a regular controller we'll just instanciate it. + * If the given action is a regular controller we'll just instantiate it. */ $action = is_subclass_of($action, MessageComponentInterface::class) ? $this->createWebSocketsServer($action) diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index a6136d476a..f050fb24c6 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -95,11 +95,14 @@ public function broadcastToOthers(ConnectionInterface $connection, $payload) public function broadcastToEveryoneExcept($payload, ?string $socketId = null, ?string $appId = null) { if (config('websockets.replication.enabled') === true) { - app()->get(PubSubInterface::class)->publish($appId, $payload); + // Also broadcast via the other websocket instances + app()->get(PubSubInterface::class) + ->publish($appId, $payload); } if (is_null($socketId)) { - return $this->broadcast($payload); + $this->broadcast($payload); + return; } foreach ($this->subscribedConnections as $connection) { diff --git a/src/WebSockets/Channels/ChannelManagers/ArrayChannelManager.php b/src/WebSockets/Channels/ChannelManagers/ArrayChannelManager.php index 9664c65842..34651600d4 100644 --- a/src/WebSockets/Channels/ChannelManagers/ArrayChannelManager.php +++ b/src/WebSockets/Channels/ChannelManagers/ArrayChannelManager.php @@ -15,7 +15,7 @@ class ArrayChannelManager implements ChannelManager /** @var string */ protected $appId; - /** @var array */ + /** @var Channel[][] */ protected $channels = []; public function findOrCreate(string $appId, string $channelName): Channel From e454f53eaaaaaa4f2e42fab65460eb556f626d47 Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Mon, 25 Mar 2019 18:00:54 -0400 Subject: [PATCH 03/30] Initial implementation of Redis as a pub/sub backend, WIP TODO: - Presence channels need the user lists stored in Redis (tricky, requires a lot of changes and async code in HTTP controllers) - Channels in Redis should be scoped by the app ID --- PubSub/PubSubInterface.php | 12 -- PubSub/Redis/RedisClient.php | 118 ----------- composer.json | 3 +- src/Console/StartWebSocketServer.php | 11 +- src/HttpApi/Controllers/Controller.php | 8 +- src/PubSub/Redis/RedisClient.php | 204 ++++++++++++++++++++ src/PubSub/Redis/RedisPusherBroadcaster.php | 150 ++++++++++++++ src/PubSub/ReplicationInterface.php | 43 +++++ src/WebSockets/Channels/Channel.php | 26 ++- src/WebSocketsServiceProvider.php | 24 ++- 10 files changed, 448 insertions(+), 151 deletions(-) delete mode 100644 PubSub/PubSubInterface.php delete mode 100644 PubSub/Redis/RedisClient.php create mode 100644 src/PubSub/Redis/RedisClient.php create mode 100644 src/PubSub/Redis/RedisPusherBroadcaster.php create mode 100644 src/PubSub/ReplicationInterface.php diff --git a/PubSub/PubSubInterface.php b/PubSub/PubSubInterface.php deleted file mode 100644 index 3a28ffc81c..0000000000 --- a/PubSub/PubSubInterface.php +++ /dev/null @@ -1,12 +0,0 @@ -apps = collect(config('websockets.apps')); - $this->serverId = Str::uuid()->toString(); - } - - public function publish(string $appId, array $payload): bool - { - $payload['appId'] = $appId; - $payload['serverId'] = $this->serverId; - $this->publishClient->publish(self::REDIS_KEY, json_encode($payload)); - return true; - } - - public function subscribe(LoopInterface $loop): PubSubInterface - { - $this->loop = $loop; - [$this->publishClient, $this->subscribeClient] = Block\awaitAll([$this->publishConnection(), $this->subscribeConnection()], $this->loop); - return $this->publishClient; - } - - protected function publishConnection(): PromiseInterface - { - $connectionUri = $this->getConnectionUri(); - $factory = new Factory($this->loop); - return $factory->createClient($connectionUri)->then( - function (Client $client) { - $this->publishClient = $client; - return $this; - } - ); - } - - - protected function subscribeConnection(): PromiseInterface - { - $connectionUri = $this->getConnectionUri(); - $factory = new Factory($this->loop); - return $factory->createClient($connectionUri)->then( - function (Client $client) { - $this->subscribeClient = $client; - $this->onConnected(); - return $this; - } - ); - } - - protected function getConnectionUri() - { - $name = config('websockets.replication.connection') ?? 'default'; - $config = config('database.redis.' . $name); - $host = $config['host']; - $port = $config['port'] ? (':' . $config['port']) : ':6379'; - - $query = []; - if ($config['password']) { - $query['password'] = $config['password']; - } - if ($config['database']) { - $query['database'] = $config['database']; - } - $query = http_build_query($query); - - return "redis://$host$port" . ($query ? '?' . $query : ''); - } - - protected function onConnected() - { - $this->subscribeClient->subscribe(self::REDIS_KEY); - $this->subscribeClient->on('message', function ($channel, $payload) { - $this->onMessage($channel, $payload); - }); - } - - protected function onMessage($channel, $payload) - { - $payload = json_decode($payload); - - if ($this->serverId === $payload->serverId) { - return false; - } - - /* @var $channelManager ChannelManager */ - $channelManager = app(ChannelManager::class); - $channelSearch = $channelManager->find($payload->appId, $payload->channel); - - if ($channelSearch === null) { - return false; - } - - $channel->broadcast($payload); - return true; - } - -} \ No newline at end of file diff --git a/composer.json b/composer.json index 8a2133ef58..e21a3fc9ce 100644 --- a/composer.json +++ b/composer.json @@ -25,9 +25,8 @@ "php": "^7.1", "ext-json": "*", "cboden/ratchet": "^0.4.1", - "clue/block-react": "^1.3", "clue/buzz-react": "^2.5", - "clue/redis-react": "^2.2", + "clue/redis-react": "^2.3", "facade/ignition-contracts": "^1.0", "guzzlehttp/psr7": "^1.5", "illuminate/broadcasting": "5.7.* || 5.8.* || ^6.0", diff --git a/src/Console/StartWebSocketServer.php b/src/Console/StartWebSocketServer.php index 8a882a59b4..f92039c3f4 100644 --- a/src/Console/StartWebSocketServer.php +++ b/src/Console/StartWebSocketServer.php @@ -9,8 +9,7 @@ use React\Dns\Resolver\ResolverInterface; use React\EventLoop\Factory as LoopFactory; use React\Dns\Resolver\Factory as DnsFactory; -use BeyondCode\LaravelWebSockets\Statistics\DnsResolver; -use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\Statistics\DnsResolver; use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger; use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter; @@ -146,13 +145,11 @@ protected function configurePubSubReplication() } if (config('websockets.replication.driver') === 'redis') { - $connection = (new RedisClient())->subscribe($this->loop); + app()->singleton(ReplicationInterface::class, function () { + return (new RedisClient())->boot($this->loop); + }); } - app()->singleton(PubSubInterface::class, function () use ($connection) { - return $connection; - }); - return $this; } diff --git a/src/HttpApi/Controllers/Controller.php b/src/HttpApi/Controllers/Controller.php index 48ecb5d2bc..863a5075e8 100644 --- a/src/HttpApi/Controllers/Controller.php +++ b/src/HttpApi/Controllers/Controller.php @@ -46,7 +46,7 @@ public function onOpen(ConnectionInterface $connection, RequestInterface $reques $this->requestBuffer = (string) $request->getBody(); - if (! $this->checkContentLength()) { + if (! $this->verifyContentLength()) { return; } @@ -64,16 +64,16 @@ public function onMessage(ConnectionInterface $from, $msg) { $this->requestBuffer .= $msg; - if (! $this->checkContentLength()) { + if (! $this->verifyContentLength()) { return; } $this->handleRequest($from); } - protected function checkContentLength() + protected function verifyContentLength() { - return strlen($this->requestBuffer) !== $this->contentLength; + return strlen($this->requestBuffer) === $this->contentLength; } protected function handleRequest(ConnectionInterface $connection) diff --git a/src/PubSub/Redis/RedisClient.php b/src/PubSub/Redis/RedisClient.php new file mode 100644 index 0000000000..a393ac10f0 --- /dev/null +++ b/src/PubSub/Redis/RedisClient.php @@ -0,0 +1,204 @@ +serverId = Str::uuid()->toString(); + } + + /** + * Boot the RedisClient, initializing the connections + * + * @param LoopInterface $loop + * @return ReplicationInterface + */ + public function boot(LoopInterface $loop): ReplicationInterface + { + $this->loop = $loop; + + $connectionUri = $this->getConnectionUri(); + $factory = new Factory($this->loop); + + $this->publishClient = $factory->createLazyClient($connectionUri); + $this->subscribeClient = $factory->createLazyClient($connectionUri); + + $this->subscribeClient->on('message', function ($channel, $payload) { + $this->onMessage($channel, $payload); + }); + + return $this; + } + + /** + * Handle a message received from Redis on a specific channel + * + * @param string $redisChannel + * @param string $payload + * @return bool + */ + protected function onMessage(string $redisChannel, string $payload) + { + $payload = json_decode($payload); + + // Ignore messages sent by ourselves + if (isset($payload->serverId) && $this->serverId === $payload->serverId) { + return false; + } + + // We need to put the channel name in the payload + $payload->channel = $redisChannel; + + /* @var $channelManager ChannelManager */ + $channelManager = app(ChannelManager::class); + + // Load the Channel instance, if any + $channel = $channelManager->find($payload->appId, $payload->channel); + if ($channel === null) { + return false; + } + + $socket = $payload->socket; + + // Remove the internal keys from the payload + unset($payload->socket); + unset($payload->serverId); + unset($payload->appId); + + // Push the message out to connected websocket clients + $channel->broadcastToEveryoneExcept($payload, $socket); + + return true; + } + + /** + * Subscribe to a channel on behalf of websocket user + * + * @param string $appId + * @param string $channel + * @return bool + */ + public function subscribe(string $appId, string $channel): bool + { + if (! isset($this->subscribedChannels[$channel])) { + // We're not subscribed to the channel yet, subscribe and set the count to 1 + $this->subscribeClient->__call('subscribe', [$channel]); + $this->subscribedChannels[$channel] = 1; + } else { + // Increment the subscribe count if we've already subscribed + $this->subscribedChannels[$channel]++; + } + + return true; + } + + /** + * Unsubscribe from a channel on behalf of a websocket user + * + * @param string $appId + * @param string $channel + * @return bool + */ + public function unsubscribe(string $appId, string $channel): bool + { + if (! isset($this->subscribedChannels[$channel])) { + return false; + } + + // Decrement the subscription count for this channel + $this->subscribedChannels[$channel]--; + + // If we no longer have subscriptions to that channel, unsubscribe + if ($this->subscribedChannels[$channel] < 1) { + $this->subscribeClient->__call('unsubscribe', [$channel]); + unset($this->subscribedChannels[$channel]); + } + + return true; + } + + /** + * Publish a message to a channel on behalf of a websocket user + * + * @param string $appId + * @param string $channel + * @param stdClass $payload + * @return bool + */ + public function publish(string $appId, string $channel, stdClass $payload): bool + { + $payload->appId = $appId; + $payload->serverId = $this->serverId; + + $this->publishClient->__call('publish', [$channel, json_encode($payload)]); + + return true; + } + + /** + * Build the Redis connection URL from Laravel database config + * + * @return string + */ + protected function getConnectionUri() + { + $name = config('websockets.replication.connection') ?? 'default'; + $config = config("database.redis.$name"); + $host = $config['host']; + $port = $config['port'] ? (':' . $config['port']) : ':6379'; + + $query = []; + if ($config['password']) { + $query['password'] = $config['password']; + } + if ($config['database']) { + $query['database'] = $config['database']; + } + $query = http_build_query($query); + + return "redis://$host$port".($query ? '?'.$query : ''); + } +} diff --git a/src/PubSub/Redis/RedisPusherBroadcaster.php b/src/PubSub/Redis/RedisPusherBroadcaster.php new file mode 100644 index 0000000000..6f88179c7d --- /dev/null +++ b/src/PubSub/Redis/RedisPusherBroadcaster.php @@ -0,0 +1,150 @@ +pusher = $pusher; + $this->appId = $appId; + $this->redis = $redis; + $this->connection = $connection; + } + + /** + * Authenticate the incoming request for a given channel. + * + * @param \Illuminate\Http\Request $request + * @return mixed + * + * @throws \Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException + */ + public function auth($request) + { + $channelName = $this->normalizeChannelName($request->channel_name); + + if ($this->isGuardedChannel($request->channel_name) && + ! $this->retrieveUser($request, $channelName)) { + throw new AccessDeniedHttpException; + } + + return parent::verifyUserCanAccessChannel( + $request, $channelName + ); + } + + /** + * Return the valid authentication response. + * + * @param \Illuminate\Http\Request $request + * @param mixed $result + * @return mixed + * @throws \Pusher\PusherException + */ + public function validAuthenticationResponse($request, $result) + { + if (Str::startsWith($request->channel_name, 'private')) { + return $this->decodePusherResponse( + $request, $this->pusher->socket_auth($request->channel_name, $request->socket_id) + ); + } + + $channelName = $this->normalizeChannelName($request->channel_name); + + return $this->decodePusherResponse( + $request, + $this->pusher->presence_auth( + $request->channel_name, $request->socket_id, + $this->retrieveUser($request, $channelName)->getAuthIdentifier(), $result + ) + ); + } + + /** + * Decode the given Pusher response. + * + * @param \Illuminate\Http\Request $request + * @param mixed $response + * @return array + */ + protected function decodePusherResponse($request, $response) + { + if (! $request->input('callback', false)) { + return json_decode($response, true); + } + + return response()->json(json_decode($response, true)) + ->withCallback($request->callback); + } + + /** + * Broadcast the given event. + * + * @param array $channels + * @param string $event + * @param array $payload + * @return void + */ + public function broadcast(array $channels, $event, array $payload = []) + { + $connection = $this->redis->connection($this->connection); + + $payload = json_encode([ + 'appId' => $this->appId, + 'event' => $event, + 'data' => $payload, + 'socket' => Arr::pull($payload, 'socket'), + ]); + + foreach ($this->formatChannels($channels) as $channel) { + $connection->publish($channel, $payload); + } + } +} diff --git a/src/PubSub/ReplicationInterface.php b/src/PubSub/ReplicationInterface.php new file mode 100644 index 0000000000..f1049c42a0 --- /dev/null +++ b/src/PubSub/ReplicationInterface.php @@ -0,0 +1,43 @@ +saveConnection($connection); + if (config('websockets.replication.enabled') === true) { + // Subscribe for broadcasted messages from the pub/sub backend + app(ReplicationInterface::class) + ->subscribe($connection->app->id, $this->channelName); + } + $connection->send(json_encode([ 'event' => 'pusher_internal:subscription_succeeded', 'channel' => $this->channelName, @@ -62,6 +68,12 @@ public function unsubscribe(ConnectionInterface $connection) { unset($this->subscribedConnections[$connection->socketId]); + if (config('websockets.replication.enabled') === true) { + // Unsubscribe from the pub/sub backend + app(ReplicationInterface::class) + ->unsubscribe($connection->app->id, $this->channelName); + } + if (! $this->hasConnections()) { DashboardLogger::vacated($connection, $this->channelName); } @@ -89,17 +101,17 @@ public function broadcast($payload) public function broadcastToOthers(ConnectionInterface $connection, $payload) { + if (config('websockets.replication.enabled') === true) { + // Also broadcast via the other websocket servers + app(ReplicationInterface::class) + ->publish($connection->app->id, $payload); + } + $this->broadcastToEveryoneExcept($payload, $connection->socketId, $connection->app->id); } public function broadcastToEveryoneExcept($payload, ?string $socketId = null, ?string $appId = null) { - if (config('websockets.replication.enabled') === true) { - // Also broadcast via the other websocket instances - app()->get(PubSubInterface::class) - ->publish($appId, $payload); - } - if (is_null($socketId)) { $this->broadcast($payload); return; diff --git a/src/WebSocketsServiceProvider.php b/src/WebSocketsServiceProvider.php index 9c57842096..9057a484ed 100644 --- a/src/WebSocketsServiceProvider.php +++ b/src/WebSocketsServiceProvider.php @@ -2,12 +2,16 @@ namespace BeyondCode\LaravelWebSockets; +use Pusher\Pusher; +use Psr\Log\LoggerInterface; use Illuminate\Support\Facades\Gate; use Illuminate\Support\Facades\Route; use Illuminate\Support\ServiceProvider; +use Illuminate\Broadcasting\BroadcastManager; use BeyondCode\LaravelWebSockets\Server\Router; use BeyondCode\LaravelWebSockets\Apps\AppProvider; use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager; +use BeyondCode\LaravelWebSockets\PubSub\Redis\RedisPusherBroadcaster; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\SendMessage; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowDashboard; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\AuthenticateDashboard; @@ -19,7 +23,7 @@ class WebSocketsServiceProvider extends ServiceProvider { - public function boot() + public function boot(BroadcastManager $broadcastManager) { $this->publishes([ __DIR__.'/../config/websockets.php' => base_path('config/websockets.php'), @@ -41,6 +45,24 @@ public function boot() Console\StartWebSocketServer::class, Console\CleanStatistics::class, ]); + + $broadcastManager->extend('redis-pusher', function(array $config) { + $pusher = new Pusher( + $config['key'], $config['secret'], + $config['app_id'], $config['options'] ?? [] + ); + + if ($config['log'] ?? false) { + $pusher->setLogger($this->app->make(LoggerInterface::class)); + } + + return new RedisPusherBroadcaster( + $pusher, + $config['app_id'], + $this->app->make('redis'), + $config['connection'] ?? null + ); + }); } public function register() From 668cd29df0b0bdc60f6884033f1433d083d0dfea Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Mon, 25 Mar 2019 18:37:14 -0400 Subject: [PATCH 04/30] Fix style issues reported by StyleCI --- src/Console/StartWebSocketServer.php | 2 +- src/PubSub/Redis/RedisClient.php | 14 +++++++------- src/PubSub/ReplicationInterface.php | 8 ++++---- src/WebSockets/Channels/Channel.php | 3 ++- src/WebSocketsServiceProvider.php | 2 +- 5 files changed, 15 insertions(+), 14 deletions(-) diff --git a/src/Console/StartWebSocketServer.php b/src/Console/StartWebSocketServer.php index f92039c3f4..d00e69f998 100644 --- a/src/Console/StartWebSocketServer.php +++ b/src/Console/StartWebSocketServer.php @@ -9,12 +9,12 @@ use React\Dns\Resolver\ResolverInterface; use React\EventLoop\Factory as LoopFactory; use React\Dns\Resolver\Factory as DnsFactory; -use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\Statistics\DnsResolver; use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger; use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter; use BeyondCode\LaravelWebSockets\PubSub\Redis\RedisClient; use BeyondCode\LaravelWebSockets\Server\Logger\HttpLogger; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\Server\WebSocketServerFactory; use BeyondCode\LaravelWebSockets\Server\Logger\ConnectionLogger; use BeyondCode\LaravelWebSockets\Server\Logger\WebsocketsLogger; diff --git a/src/PubSub/Redis/RedisClient.php b/src/PubSub/Redis/RedisClient.php index a393ac10f0..6634ecd1c4 100644 --- a/src/PubSub/Redis/RedisClient.php +++ b/src/PubSub/Redis/RedisClient.php @@ -51,7 +51,7 @@ public function __construct() } /** - * Boot the RedisClient, initializing the connections + * Boot the RedisClient, initializing the connections. * * @param LoopInterface $loop * @return ReplicationInterface @@ -74,7 +74,7 @@ public function boot(LoopInterface $loop): ReplicationInterface } /** - * Handle a message received from Redis on a specific channel + * Handle a message received from Redis on a specific channel. * * @param string $redisChannel * @param string $payload @@ -115,7 +115,7 @@ protected function onMessage(string $redisChannel, string $payload) } /** - * Subscribe to a channel on behalf of websocket user + * Subscribe to a channel on behalf of websocket user. * * @param string $appId * @param string $channel @@ -136,7 +136,7 @@ public function subscribe(string $appId, string $channel): bool } /** - * Unsubscribe from a channel on behalf of a websocket user + * Unsubscribe from a channel on behalf of a websocket user. * * @param string $appId * @param string $channel @@ -161,7 +161,7 @@ public function unsubscribe(string $appId, string $channel): bool } /** - * Publish a message to a channel on behalf of a websocket user + * Publish a message to a channel on behalf of a websocket user. * * @param string $appId * @param string $channel @@ -179,7 +179,7 @@ public function publish(string $appId, string $channel, stdClass $payload): bool } /** - * Build the Redis connection URL from Laravel database config + * Build the Redis connection URL from Laravel database config. * * @return string */ @@ -188,7 +188,7 @@ protected function getConnectionUri() $name = config('websockets.replication.connection') ?? 'default'; $config = config("database.redis.$name"); $host = $config['host']; - $port = $config['port'] ? (':' . $config['port']) : ':6379'; + $port = $config['port'] ? (':'.$config['port']) : ':6379'; $query = []; if ($config['password']) { diff --git a/src/PubSub/ReplicationInterface.php b/src/PubSub/ReplicationInterface.php index f1049c42a0..5131ea3764 100644 --- a/src/PubSub/ReplicationInterface.php +++ b/src/PubSub/ReplicationInterface.php @@ -8,7 +8,7 @@ interface ReplicationInterface { /** - * Boot the pub/sub provider (open connections, initial subscriptions, etc.) + * Boot the pub/sub provider (open connections, initial subscriptions, etc). * * @param LoopInterface $loop * @return self @@ -16,7 +16,7 @@ interface ReplicationInterface public function boot(LoopInterface $loop): self; /** - * Publish a payload on a specific channel, for a specific app + * Publish a payload on a specific channel, for a specific app. * * @param string $appId * @param string $channel @@ -26,7 +26,7 @@ public function boot(LoopInterface $loop): self; public function publish(string $appId, string $channel, stdClass $payload): bool; /** - * Subscribe to receive messages for a channel + * Subscribe to receive messages for a channel. * * @param string $channel * @return bool @@ -34,7 +34,7 @@ public function publish(string $appId, string $channel, stdClass $payload): bool public function subscribe(string $appId, string $channel): bool; /** - * Unsubscribe from a channel + * Unsubscribe from a channel. * * @param string $channel * @return bool diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index b033b48938..605a7db1b3 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -5,8 +5,8 @@ use stdClass; use Illuminate\Support\Str; use Ratchet\ConnectionInterface; -use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature; class Channel @@ -114,6 +114,7 @@ public function broadcastToEveryoneExcept($payload, ?string $socketId = null, ?s { if (is_null($socketId)) { $this->broadcast($payload); + return; } diff --git a/src/WebSocketsServiceProvider.php b/src/WebSocketsServiceProvider.php index 9057a484ed..558c8ef429 100644 --- a/src/WebSocketsServiceProvider.php +++ b/src/WebSocketsServiceProvider.php @@ -46,7 +46,7 @@ public function boot(BroadcastManager $broadcastManager) Console\CleanStatistics::class, ]); - $broadcastManager->extend('redis-pusher', function(array $config) { + $broadcastManager->extend('redis-pusher', function (array $config) { $pusher = new Pusher( $config['key'], $config['secret'], $config['app_id'], $config['options'] ?? [] From eca8c7b8466a8214a0d80d7a674f293d641af9a2 Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Fri, 29 Mar 2019 10:22:36 -0400 Subject: [PATCH 05/30] Scope pub/sub channels in Redis by appId to avoid crosstalk between apps --- src/PubSub/Redis/RedisClient.php | 47 ++++++++++++--------- src/PubSub/Redis/RedisPusherBroadcaster.php | 2 +- src/WebSockets/Channels/Channel.php | 7 ++- 3 files changed, 32 insertions(+), 24 deletions(-) diff --git a/src/PubSub/Redis/RedisClient.php b/src/PubSub/Redis/RedisClient.php index 6634ecd1c4..9e6048cb39 100644 --- a/src/PubSub/Redis/RedisClient.php +++ b/src/PubSub/Redis/RedisClient.php @@ -78,7 +78,6 @@ public function boot(LoopInterface $loop): ReplicationInterface * * @param string $redisChannel * @param string $payload - * @return bool */ protected function onMessage(string $redisChannel, string $payload) { @@ -86,32 +85,38 @@ protected function onMessage(string $redisChannel, string $payload) // Ignore messages sent by ourselves if (isset($payload->serverId) && $this->serverId === $payload->serverId) { - return false; + return; } - // We need to put the channel name in the payload - $payload->channel = $redisChannel; + // Pull out the app ID. See RedisPusherBroadcaster + $appId = $payload->appId; + + // We need to put the channel name in the payload. + // We strip the app ID from the channel name, websocket clients + // expect the channel name to not include the app ID. + $payload->channel = Str::after($redisChannel, "$appId:"); /* @var $channelManager ChannelManager */ $channelManager = app(ChannelManager::class); // Load the Channel instance, if any - $channel = $channelManager->find($payload->appId, $payload->channel); - if ($channel === null) { - return false; + $channel = $channelManager->find($appId, $payload->channel); + + // If no channel is found, none of our connections want to + // receive this message, so we ignore it. + if (! $channel) { + return; } - $socket = $payload->socket; + $socket = $payload->socket ?? null; - // Remove the internal keys from the payload + // Remove fields intended for internal use from the payload unset($payload->socket); unset($payload->serverId); unset($payload->appId); // Push the message out to connected websocket clients $channel->broadcastToEveryoneExcept($payload, $socket); - - return true; } /** @@ -123,13 +128,13 @@ protected function onMessage(string $redisChannel, string $payload) */ public function subscribe(string $appId, string $channel): bool { - if (! isset($this->subscribedChannels[$channel])) { + if (! isset($this->subscribedChannels["$appId:$channel"])) { // We're not subscribed to the channel yet, subscribe and set the count to 1 - $this->subscribeClient->__call('subscribe', [$channel]); - $this->subscribedChannels[$channel] = 1; + $this->subscribeClient->__call('subscribe', ["$appId:$channel"]); + $this->subscribedChannels["$appId:$channel"] = 1; } else { // Increment the subscribe count if we've already subscribed - $this->subscribedChannels[$channel]++; + $this->subscribedChannels["$appId:$channel"]++; } return true; @@ -144,17 +149,17 @@ public function subscribe(string $appId, string $channel): bool */ public function unsubscribe(string $appId, string $channel): bool { - if (! isset($this->subscribedChannels[$channel])) { + if (! isset($this->subscribedChannels["$appId:$channel"])) { return false; } // Decrement the subscription count for this channel - $this->subscribedChannels[$channel]--; + $this->subscribedChannels["$appId:$channel"]--; // If we no longer have subscriptions to that channel, unsubscribe - if ($this->subscribedChannels[$channel] < 1) { - $this->subscribeClient->__call('unsubscribe', [$channel]); - unset($this->subscribedChannels[$channel]); + if ($this->subscribedChannels["$appId:$channel"] < 1) { + $this->subscribeClient->__call('unsubscribe', ["$appId:$channel"]); + unset($this->subscribedChannels["$appId:$channel"]); } return true; @@ -173,7 +178,7 @@ public function publish(string $appId, string $channel, stdClass $payload): bool $payload->appId = $appId; $payload->serverId = $this->serverId; - $this->publishClient->__call('publish', [$channel, json_encode($payload)]); + $this->publishClient->__call('publish', ["$appId:$channel", json_encode($payload)]); return true; } diff --git a/src/PubSub/Redis/RedisPusherBroadcaster.php b/src/PubSub/Redis/RedisPusherBroadcaster.php index 6f88179c7d..990591414f 100644 --- a/src/PubSub/Redis/RedisPusherBroadcaster.php +++ b/src/PubSub/Redis/RedisPusherBroadcaster.php @@ -144,7 +144,7 @@ public function broadcast(array $channels, $event, array $payload = []) ]); foreach ($this->formatChannels($channels) as $channel) { - $connection->publish($channel, $payload); + $connection->publish("{$this->appId}:$channel", $payload); } } } diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index 605a7db1b3..9db18adc7d 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -107,11 +107,14 @@ public function broadcastToOthers(ConnectionInterface $connection, $payload) ->publish($connection->app->id, $payload); } - $this->broadcastToEveryoneExcept($payload, $connection->socketId, $connection->app->id); + $this->broadcastToEveryoneExcept($payload, $connection->socketId); } - public function broadcastToEveryoneExcept($payload, ?string $socketId = null, ?string $appId = null) + public function broadcastToEveryoneExcept($payload, ?string $socketId = null) { + // Performance optimization, if we don't have a socket ID, + // then we avoid running the if condition in the foreach loop below + // by calling broadcast() instead. if (is_null($socketId)) { $this->broadcast($payload); From 87c00fb3404adb4a8955f1e839047a6845eaf6d6 Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Fri, 29 Mar 2019 10:51:13 -0400 Subject: [PATCH 06/30] app() -> $this->laravel in StartWebSocketServer --- src/Console/StartWebSocketServer.php | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Console/StartWebSocketServer.php b/src/Console/StartWebSocketServer.php index d00e69f998..4b68be33d4 100644 --- a/src/Console/StartWebSocketServer.php +++ b/src/Console/StartWebSocketServer.php @@ -63,8 +63,8 @@ protected function configureStatisticsLogger() $browser = new Browser($this->loop, $connector); - app()->singleton(StatisticsLoggerInterface::class, function () use ($browser) { - return new HttpStatisticsLogger(app(ChannelManager::class), $browser); + $this->laravel->singleton(StatisticsLoggerInterface::class, function () use ($browser) { + return new HttpStatisticsLogger($this->laravel->make(ChannelManager::class), $browser); }); $this->loop->addPeriodicTimer(config('websockets.statistics.interval_in_seconds'), function () { @@ -76,7 +76,7 @@ protected function configureStatisticsLogger() protected function configureHttpLogger() { - app()->singleton(HttpLogger::class, function () { + $this->laravel->singleton(HttpLogger::class, function () { return (new HttpLogger($this->output)) ->enable($this->option('debug') ?: config('app.debug')) ->verbose($this->output->isVerbose()); @@ -87,7 +87,7 @@ protected function configureHttpLogger() protected function configureMessageLogger() { - app()->singleton(WebsocketsLogger::class, function () { + $this->laravel->singleton(WebsocketsLogger::class, function () { return (new WebsocketsLogger($this->output)) ->enable($this->option('debug') ?: config('app.debug')) ->verbose($this->output->isVerbose()); @@ -98,7 +98,7 @@ protected function configureMessageLogger() protected function configureConnectionLogger() { - app()->bind(ConnectionLogger::class, function () { + $this->laravel->bind(ConnectionLogger::class, function () { return (new ConnectionLogger($this->output)) ->enable(config('app.debug')) ->verbose($this->output->isVerbose()); @@ -145,7 +145,7 @@ protected function configurePubSubReplication() } if (config('websockets.replication.driver') === 'redis') { - app()->singleton(ReplicationInterface::class, function () { + $this->laravel->singleton(ReplicationInterface::class, function () { return (new RedisClient())->boot($this->loop); }); } From 4baac7ef00f6e638555c9295590ddd7d6cf9762f Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Fri, 29 Mar 2019 15:33:46 -0400 Subject: [PATCH 07/30] Implement presence channel storage in Redis --- src/HttpApi/Controllers/Controller.php | 18 ++- .../Controllers/FetchChannelsController.php | 38 +++++- .../Controllers/FetchUsersController.php | 16 ++- src/PubSub/Redis/RedisClient.php | 67 ++++++++++ src/PubSub/ReplicationInterface.php | 40 ++++++ src/WebSockets/Channels/Channel.php | 17 ++- src/WebSockets/Channels/PresenceChannel.php | 118 ++++++++++++++---- src/WebSockets/Channels/PrivateChannel.php | 4 + 8 files changed, 287 insertions(+), 31 deletions(-) diff --git a/src/HttpApi/Controllers/Controller.php b/src/HttpApi/Controllers/Controller.php index 863a5075e8..7be3d89ce5 100644 --- a/src/HttpApi/Controllers/Controller.php +++ b/src/HttpApi/Controllers/Controller.php @@ -11,6 +11,7 @@ use Illuminate\Http\JsonResponse; use GuzzleHttp\Psr7\ServerRequest; use Illuminate\Support\Collection; +use React\Promise\PromiseInterface; use Ratchet\Http\HttpServerInterface; use Psr\Http\Message\RequestInterface; use BeyondCode\LaravelWebSockets\Apps\App; @@ -30,7 +31,7 @@ abstract class Controller implements HttpServerInterface /** @var int */ protected $contentLength; - /** @var \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager */ + /** @var ChannelManager */ protected $channelManager; public function __construct(ChannelManager $channelManager) @@ -92,8 +93,23 @@ protected function handleRequest(ConnectionInterface $connection) ->ensureValidAppId($laravelRequest->appId) ->ensureValidSignature($laravelRequest); + // Invoke the controller action $response = $this($laravelRequest); + // Allow for async IO in the controller action + if ($response instanceof PromiseInterface) { + $response->then(function ($response) use ($connection) { + $this->sendAndClose($connection, $response); + }); + + return; + } + + $this->sendAndClose($connection, $response); + } + + protected function sendAndClose(ConnectionInterface $connection, $response) + { $connection->send(JsonResponse::create($response)); $connection->close(); } diff --git a/src/HttpApi/Controllers/FetchChannelsController.php b/src/HttpApi/Controllers/FetchChannelsController.php index c57efe7520..73a82894d0 100644 --- a/src/HttpApi/Controllers/FetchChannelsController.php +++ b/src/HttpApi/Controllers/FetchChannelsController.php @@ -5,6 +5,9 @@ use Illuminate\Support\Str; use Illuminate\Http\Request; use Illuminate\Support\Collection; +use React\Promise\PromiseInterface; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; +use BeyondCode\LaravelWebSockets\WebSockets\Channels\PresenceChannel; use Symfony\Component\HttpKernel\Exception\HttpException; class FetchChannelsController extends Controller @@ -29,13 +32,42 @@ public function __invoke(Request $request) }); } + if (config('websockets.replication.enabled') === true) { + // We want to get the channel user count all in one shot when + // using a replication backend rather than doing individual queries. + // To do so, we first collect the list of channel names. + $channelNames = $channels->map(function (PresenceChannel $channel) use ($request) { + return $channel->getChannelName(); + })->toArray(); + + /** @var PromiseInterface $memberCounts */ + // We ask the replication backend to get us the member count per channel + $memberCounts = app(ReplicationInterface::class) + ->channelMemberCounts($request->appId, $channelNames); + + // We return a promise since the backend runs async. We get $counts back + // as a key-value array of channel names and their member count. + return $memberCounts->then(function (array $counts) use ($channels) { + return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) use ($counts) { + return $counts[$channel->getChannelName()]; + }); + }); + } + + return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) { + return $channel->getUserCount(); + }); + } + + protected function collectUserCounts(Collection $channels, array $attributes, callable $transformer) + { return [ - 'channels' => $channels->map(function ($channel) use ($attributes) { + 'channels' => $channels->map(function (PresenceChannel $channel) use ($transformer, $attributes) { $info = new \stdClass; if (in_array('user_count', $attributes)) { - $info->user_count = count($channel->getUsers()); + $info->user_count = $transformer($channel); } - + return $info; })->toArray() ?: new \stdClass, ]; diff --git a/src/HttpApi/Controllers/FetchUsersController.php b/src/HttpApi/Controllers/FetchUsersController.php index 87960e44f9..3d7ced71ae 100644 --- a/src/HttpApi/Controllers/FetchUsersController.php +++ b/src/HttpApi/Controllers/FetchUsersController.php @@ -4,6 +4,7 @@ use Illuminate\Http\Request; use Illuminate\Support\Collection; +use React\Promise\PromiseInterface; use Symfony\Component\HttpKernel\Exception\HttpException; use BeyondCode\LaravelWebSockets\WebSockets\Channels\PresenceChannel; @@ -21,8 +22,21 @@ public function __invoke(Request $request) throw new HttpException(400, 'Invalid presence channel "'.$request->channelName.'"'); } + $users = $channel->getUsers($request->appId); + + if ($users instanceof PromiseInterface) { + return $users->then(function (array $users) { + return $this->collectUsers($users); + }); + } + + return $this->collectUsers($users); + } + + protected function collectUsers(array $users) + { return [ - 'users' => Collection::make($channel->getUsers())->map(function ($user) { + 'users' => Collection::make($users)->map(function ($user) { return ['id' => $user->user_id]; })->values(), ]; diff --git a/src/PubSub/Redis/RedisClient.php b/src/PubSub/Redis/RedisClient.php index 9e6048cb39..a2ea8dbb95 100644 --- a/src/PubSub/Redis/RedisClient.php +++ b/src/PubSub/Redis/RedisClient.php @@ -7,6 +7,7 @@ use Clue\React\Redis\Client; use Clue\React\Redis\Factory; use React\EventLoop\LoopInterface; +use React\Promise\PromiseInterface; use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager; @@ -183,6 +184,72 @@ public function publish(string $appId, string $channel, stdClass $payload): bool return true; } + /** + * Add a member to a channel. To be called when they have + * subscribed to the channel. + * + * @param string $appId + * @param string $channel + * @param string $socketId + * @param string $data + */ + public function joinChannel(string $appId, string $channel, string $socketId, string $data) + { + $this->publishClient->__call('hset', ["$appId:$channel", $socketId, $data]); + } + + /** + * Remove a member from the channel. To be called when they have + * unsubscribed from the channel. + * + * @param string $appId + * @param string $channel + * @param string $socketId + */ + public function leaveChannel(string $appId, string $channel, string $socketId) + { + $this->publishClient->__call('hdel', ["$appId:$channel", $socketId]); + } + + /** + * Retrieve the full information about the members in a presence channel. + * + * @param string $appId + * @param string $channel + * @return PromiseInterface + */ + public function channelMembers(string $appId, string $channel): PromiseInterface + { + return $this->publishClient->__call('hgetall', ["$appId:$channel"]) + ->then(function ($members) { + // The data is expected as objects, so we need to JSON decode + return array_walk($members, function ($user) { + return json_decode($user); + }); + }); + } + + /** + * Get the amount of users subscribed for each presence channel. + * + * @param string $appId + * @param array $channelNames + * @return PromiseInterface + */ + public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface + { + $this->publishClient->__call('multi', []); + + foreach ($channelNames as $channel) { + $this->publishClient->__call('hlen', ["$appId:$channel"]); + } + + return $this->publishClient->__call('exec', []) + ->then(function ($data) use ($channelNames) { + return array_combine($channelNames, $data); + }); + } + /** * Build the Redis connection URL from Laravel database config. * diff --git a/src/PubSub/ReplicationInterface.php b/src/PubSub/ReplicationInterface.php index 5131ea3764..e515e5c962 100644 --- a/src/PubSub/ReplicationInterface.php +++ b/src/PubSub/ReplicationInterface.php @@ -4,6 +4,7 @@ use stdClass; use React\EventLoop\LoopInterface; +use React\Promise\PromiseInterface; interface ReplicationInterface { @@ -40,4 +41,43 @@ public function subscribe(string $appId, string $channel): bool; * @return bool */ public function unsubscribe(string $appId, string $channel): bool; + + /** + * Add a member to a channel. To be called when they have + * subscribed to the channel. + * + * @param string $appId + * @param string $channel + * @param string $socketId + * @param string $data + */ + public function joinChannel(string $appId, string $channel, string $socketId, string $data); + + /** + * Remove a member from the channel. To be called when they have + * unsubscribed from the channel. + * + * @param string $appId + * @param string $channel + * @param string $socketId + */ + public function leaveChannel(string $appId, string $channel, string $socketId); + + /** + * Retrieve the full information about the members in a presence channel. + * + * @param string $appId + * @param string $channel + * @return PromiseInterface + */ + public function channelMembers(string $appId, string $channel): PromiseInterface; + + /** + * Get the amount of users subscribed for each presence channel. + * + * @param string $appId + * @param array $channelNames + * @return PromiseInterface + */ + public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface; } diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index 9db18adc7d..b5c8413d49 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -22,6 +22,11 @@ public function __construct(string $channelName) $this->channelName = $channelName; } + public function getChannelName(): string + { + return $this->channelName; + } + public function hasConnections(): bool { return count($this->subscribedConnections) > 0; @@ -32,6 +37,9 @@ public function getSubscribedConnections(): array return $this->subscribedConnections; } + /** + * @throws InvalidSignature + */ protected function verifySignature(ConnectionInterface $connection, stdClass $payload) { $signature = "{$connection->socketId}:{$this->channelName}"; @@ -40,12 +48,15 @@ protected function verifySignature(ConnectionInterface $connection, stdClass $pa $signature .= ":{$payload->channel_data}"; } - if (Str::after($payload->auth, ':') !== hash_hmac('sha256', $signature, $connection->app->secret)) { + if (! hash_equals( + hash_hmac('sha256', $signature, $connection->app->secret), + Str::after($payload->auth, ':')) + ) { throw new InvalidSignature(); } } - /* + /** * @link https://pusher.com/docs/pusher_protocol#presence-channel-events */ public function subscribe(ConnectionInterface $connection, stdClass $payload) @@ -128,7 +139,7 @@ public function broadcastToEveryoneExcept($payload, ?string $socketId = null) } } - public function toArray(): array + public function toArray() { return [ 'occupied' => count($this->subscribedConnections) > 0, diff --git a/src/WebSockets/Channels/PresenceChannel.php b/src/WebSockets/Channels/PresenceChannel.php index bb6ec45f3c..21cab8798b 100644 --- a/src/WebSockets/Channels/PresenceChannel.php +++ b/src/WebSockets/Channels/PresenceChannel.php @@ -4,18 +4,43 @@ use stdClass; use Ratchet\ConnectionInterface; +use React\Promise\PromiseInterface; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; +use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature; class PresenceChannel extends Channel { protected $users = []; - public function getUsers(): array + /** + * @param string $appId + * @return array|PromiseInterface + */ + public function getUsers(string $appId) { + if (config('websockets.replication.enabled') === true) { + // Get the members list from the replication backend + return app(ReplicationInterface::class) + ->channelMembers($appId, $this->channelName); + } + return $this->users; } - /* + /** + * @return array + */ + public function getUserCount() + { + return count($this->users); + } + + /** * @link https://pusher.com/docs/pusher_protocol#presence-channel-events + * + * @param ConnectionInterface $connection + * @param stdClass $payload + * @throws InvalidSignature */ public function subscribe(ConnectionInterface $connection, stdClass $payload) { @@ -26,12 +51,36 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload) $channelData = json_decode($payload->channel_data); $this->users[$connection->socketId] = $channelData; - // Send the success event - $connection->send(json_encode([ - 'event' => 'pusher_internal:subscription_succeeded', - 'channel' => $this->channelName, - 'data' => json_encode($this->getChannelData()), - ])); + if (config('websockets.replication.enabled') === true) { + // Add the connection as a member of the channel + app(ReplicationInterface::class) + ->joinChannel( + $connection->app->id, + $this->channelName, + $connection->socketId, + json_encode($channelData) + ); + + // We need to pull the channel data from the replication backend, + // otherwise we won't be sending the full details of the channel + app(ReplicationInterface::class) + ->channelMembers($connection->app->id, $this->channelName) + ->then(function ($users) use ($connection) { + // Send the success event + $connection->send(json_encode([ + 'event' => 'pusher_internal:subscription_succeeded', + 'channel' => $this->channelName, + 'data' => json_encode($this->getChannelData($users)), + ])); + }); + } else { + // Send the success event + $connection->send(json_encode([ + 'event' => 'pusher_internal:subscription_succeeded', + 'channel' => $this->channelName, + 'data' => json_encode($this->getChannelData($this->users)), + ])); + } $this->broadcastToOthers($connection, [ 'event' => 'pusher_internal:member_added', @@ -48,6 +97,16 @@ public function unsubscribe(ConnectionInterface $connection) return; } + if (config('websockets.replication.enabled') === true) { + // Remove the connection as a member of the channel + app(ReplicationInterface::class) + ->leaveChannel( + $connection->app->id, + $this->channelName, + $connection->socketId + ); + } + $this->broadcastToOthers($connection, [ 'event' => 'pusher_internal:member_removed', 'channel' => $this->channelName, @@ -59,38 +118,51 @@ public function unsubscribe(ConnectionInterface $connection) unset($this->users[$connection->socketId]); } - protected function getChannelData(): array + /** + * @return PromiseInterface|array + */ + public function toArray(string $appId = null) { - return [ - 'presence' => [ - 'ids' => $this->getUserIds(), - 'hash' => $this->getHash(), - 'count' => count($this->users), - ], - ]; - } + if (config('websockets.replication.enabled') === true) { + return app(ReplicationInterface::class) + ->channelMembers($appId, $this->channelName) + ->then(function ($users) { + return array_merge(parent::toArray(), [ + 'user_count' => count($users), + ]); + }); + } - public function toArray(): array - { return array_merge(parent::toArray(), [ 'user_count' => count($this->users), ]); } - protected function getUserIds(): array + protected function getChannelData(array $users): array + { + return [ + 'presence' => [ + 'ids' => $this->getUserIds($users), + 'hash' => $this->getHash($users), + 'count' => count($users), + ], + ]; + } + + protected function getUserIds(array $users): array { $userIds = array_map(function ($channelData) { return (string) $channelData->user_id; - }, $this->users); + }, $users); return array_values($userIds); } - protected function getHash(): array + protected function getHash(array $users): array { $hash = []; - foreach ($this->users as $socketId => $channelData) { + foreach ($users as $socketId => $channelData) { $hash[$channelData->user_id] = $channelData->user_info; } diff --git a/src/WebSockets/Channels/PrivateChannel.php b/src/WebSockets/Channels/PrivateChannel.php index 34f3ac0fe7..03d8e428b7 100644 --- a/src/WebSockets/Channels/PrivateChannel.php +++ b/src/WebSockets/Channels/PrivateChannel.php @@ -4,9 +4,13 @@ use stdClass; use Ratchet\ConnectionInterface; +use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature; class PrivateChannel extends Channel { + /** + * @throws InvalidSignature + */ public function subscribe(ConnectionInterface $connection, stdClass $payload) { $this->verifySignature($connection, $payload); From b7ae9bac4a7695f7499a1c50fef5769390ae53c5 Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Fri, 5 Apr 2019 15:30:41 -0400 Subject: [PATCH 08/30] Add tests for replication, fix bugs in the implementation --- .../Controllers/FetchChannelController.php | 2 +- src/PubSub/Fake/FakeReplication.php | 126 ++++++++++++++++++ src/PubSub/Redis/RedisClient.php | 4 +- src/PubSub/ReplicationInterface.php | 2 + src/WebSockets/Channels/Channel.php | 4 +- src/WebSockets/Channels/PresenceChannel.php | 5 +- tests/Channels/ChannelReplicationTest.php | 17 +++ tests/Channels/ChannelTest.php | 2 +- .../PresenceChannelReplicationTest.php | 17 +++ tests/Channels/PresenceChannelTest.php | 71 ++++++++++ tests/ClientProviders/AppTest.php | 6 +- tests/HttpApi/FetchChannelReplicationTest.php | 17 +++ tests/HttpApi/FetchChannelTest.php | 32 +++++ .../HttpApi/FetchChannelsReplicationTest.php | 17 +++ tests/HttpApi/FetchUsersReplicationTest.php | 17 +++ tests/TestCase.php | 1 + tests/TestsReplication.php | 22 +++ 17 files changed, 351 insertions(+), 11 deletions(-) create mode 100644 src/PubSub/Fake/FakeReplication.php create mode 100644 tests/Channels/ChannelReplicationTest.php create mode 100644 tests/Channels/PresenceChannelReplicationTest.php create mode 100644 tests/HttpApi/FetchChannelReplicationTest.php create mode 100644 tests/HttpApi/FetchChannelsReplicationTest.php create mode 100644 tests/HttpApi/FetchUsersReplicationTest.php create mode 100644 tests/TestsReplication.php diff --git a/src/HttpApi/Controllers/FetchChannelController.php b/src/HttpApi/Controllers/FetchChannelController.php index 6a24fd5e2c..188e08cc4e 100644 --- a/src/HttpApi/Controllers/FetchChannelController.php +++ b/src/HttpApi/Controllers/FetchChannelController.php @@ -15,6 +15,6 @@ public function __invoke(Request $request) throw new HttpException(404, "Unknown channel `{$request->channelName}`."); } - return $channel->toArray(); + return $channel->toArray($request->appId); } } diff --git a/src/PubSub/Fake/FakeReplication.php b/src/PubSub/Fake/FakeReplication.php new file mode 100644 index 0000000000..5b3e42930a --- /dev/null +++ b/src/PubSub/Fake/FakeReplication.php @@ -0,0 +1,126 @@ +channels["$appId:$channel"][$socketId] = $data; + } + + /** + * Remove a member from the channel. To be called when they have + * unsubscribed from the channel. + * + * @param string $appId + * @param string $channel + * @param string $socketId + */ + public function leaveChannel(string $appId, string $channel, string $socketId) + { + unset($this->channels["$appId:$channel"][$socketId]); + if (empty($this->channels["$appId:$channel"])) { + unset($this->channels["$appId:$channel"]); + } + } + + /** + * Retrieve the full information about the members in a presence channel. + * + * @param string $appId + * @param string $channel + * @return PromiseInterface + */ + public function channelMembers(string $appId, string $channel) : PromiseInterface + { + $data = array_map(function ($user) { + return json_decode($user); + }, $this->channels["$appId:$channel"]); + + return new FulfilledPromise($data); + } + + /** + * Get the amount of users subscribed for each presence channel. + * + * @param string $appId + * @param array $channelNames + * @return PromiseInterface + */ + public function channelMemberCounts(string $appId, array $channelNames) : PromiseInterface + { + $data = []; + + foreach ($channelNames as $channel) { + $data[$channel] = count($this->channels["$appId:$channel"]); + } + + return new FulfilledPromise($data); + } +} diff --git a/src/PubSub/Redis/RedisClient.php b/src/PubSub/Redis/RedisClient.php index a2ea8dbb95..4cc3e18917 100644 --- a/src/PubSub/Redis/RedisClient.php +++ b/src/PubSub/Redis/RedisClient.php @@ -223,9 +223,9 @@ public function channelMembers(string $appId, string $channel): PromiseInterface return $this->publishClient->__call('hgetall', ["$appId:$channel"]) ->then(function ($members) { // The data is expected as objects, so we need to JSON decode - return array_walk($members, function ($user) { + return array_map(function ($user) { return json_decode($user); - }); + }, $members); }); } diff --git a/src/PubSub/ReplicationInterface.php b/src/PubSub/ReplicationInterface.php index e515e5c962..3e120af528 100644 --- a/src/PubSub/ReplicationInterface.php +++ b/src/PubSub/ReplicationInterface.php @@ -29,6 +29,7 @@ public function publish(string $appId, string $channel, stdClass $payload): bool /** * Subscribe to receive messages for a channel. * + * @param string $appId * @param string $channel * @return bool */ @@ -37,6 +38,7 @@ public function subscribe(string $appId, string $channel): bool; /** * Unsubscribe from a channel. * + * @param string $appId * @param string $channel * @return bool */ diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index b5c8413d49..87e81e095a 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -115,7 +115,7 @@ public function broadcastToOthers(ConnectionInterface $connection, $payload) if (config('websockets.replication.enabled') === true) { // Also broadcast via the other websocket servers app(ReplicationInterface::class) - ->publish($connection->app->id, $payload); + ->publish($connection->app->id, $this->channelName, $payload); } $this->broadcastToEveryoneExcept($payload, $connection->socketId); @@ -139,7 +139,7 @@ public function broadcastToEveryoneExcept($payload, ?string $socketId = null) } } - public function toArray() + public function toArray(string $appId = null) { return [ 'occupied' => count($this->subscribedConnections) > 0, diff --git a/src/WebSockets/Channels/PresenceChannel.php b/src/WebSockets/Channels/PresenceChannel.php index 21cab8798b..b382bb6b11 100644 --- a/src/WebSockets/Channels/PresenceChannel.php +++ b/src/WebSockets/Channels/PresenceChannel.php @@ -82,7 +82,7 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload) ])); } - $this->broadcastToOthers($connection, [ + $this->broadcastToOthers($connection, (object) [ 'event' => 'pusher_internal:member_added', 'channel' => $this->channelName, 'data' => json_encode($channelData), @@ -107,7 +107,7 @@ public function unsubscribe(ConnectionInterface $connection) ); } - $this->broadcastToOthers($connection, [ + $this->broadcastToOthers($connection, (object) [ 'event' => 'pusher_internal:member_removed', 'channel' => $this->channelName, 'data' => json_encode([ @@ -119,6 +119,7 @@ public function unsubscribe(ConnectionInterface $connection) } /** + * @param string|null $appId * @return PromiseInterface|array */ public function toArray(string $appId = null) diff --git a/tests/Channels/ChannelReplicationTest.php b/tests/Channels/ChannelReplicationTest.php new file mode 100644 index 0000000000..f8e08727f2 --- /dev/null +++ b/tests/Channels/ChannelReplicationTest.php @@ -0,0 +1,17 @@ +setupReplication(); + } +} diff --git a/tests/Channels/ChannelTest.php b/tests/Channels/ChannelTest.php index 41272fa194..ebaac7563a 100644 --- a/tests/Channels/ChannelTest.php +++ b/tests/Channels/ChannelTest.php @@ -123,7 +123,7 @@ public function channels_can_broadcast_messages_to_all_connections_except_the_gi $channel = $this->getChannel($connection1, 'test-channel'); - $channel->broadcastToOthers($connection1, [ + $channel->broadcastToOthers($connection1, (object) [ 'event' => 'broadcasted-event', 'channel' => 'test-channel', ]); diff --git a/tests/Channels/PresenceChannelReplicationTest.php b/tests/Channels/PresenceChannelReplicationTest.php new file mode 100644 index 0000000000..70702715b0 --- /dev/null +++ b/tests/Channels/PresenceChannelReplicationTest.php @@ -0,0 +1,17 @@ +setupReplication(); + } +} diff --git a/tests/Channels/PresenceChannelTest.php b/tests/Channels/PresenceChannelTest.php index 8a86560135..6add602202 100644 --- a/tests/Channels/PresenceChannelTest.php +++ b/tests/Channels/PresenceChannelTest.php @@ -59,4 +59,75 @@ public function clients_with_valid_auth_signatures_can_join_presence_channels() 'channel' => 'presence-channel', ]); } + + /** @test */ + public function clients_with_valid_auth_signatures_can_leave_presence_channels() + { + $connection = $this->getWebSocketConnection(); + + $this->pusherServer->onOpen($connection); + + $channelData = [ + 'user_id' => 1, + 'user_info' => [ + 'name' => 'Marcel', + ], + ]; + + $signature = "{$connection->socketId}:presence-channel:".json_encode($channelData); + + $message = new Message(json_encode([ + 'event' => 'pusher:subscribe', + 'data' => [ + 'auth' => $connection->app->key.':'.hash_hmac('sha256', $signature, $connection->app->secret), + 'channel' => 'presence-channel', + 'channel_data' => json_encode($channelData), + ], + ])); + + $this->pusherServer->onMessage($connection, $message); + + $connection->assertSentEvent('pusher_internal:subscription_succeeded', [ + 'channel' => 'presence-channel', + ]); + + $message = new Message(json_encode([ + 'event' => 'pusher:unsubscribe', + 'data' => [ + 'auth' => $connection->app->key.':'.hash_hmac('sha256', $signature, $connection->app->secret), + 'channel' => 'presence-channel', + ], + ])); + + $this->pusherServer->onMessage($connection, $message); + } + + /** @test */ + public function clients_with_valid_auth_signatures_cannot_leave_channels_they_are_not_in() + { + $connection = $this->getWebSocketConnection(); + + $this->pusherServer->onOpen($connection); + + $channelData = [ + 'user_id' => 1, + 'user_info' => [ + 'name' => 'Marcel', + ], + ]; + + $signature = "{$connection->socketId}:presence-channel:".json_encode($channelData); + + $message = new Message(json_encode([ + 'event' => 'pusher:unsubscribe', + 'data' => [ + 'auth' => $connection->app->key.':'.hash_hmac('sha256', $signature, $connection->app->secret), + 'channel' => 'presence-channel', + ], + ])); + + $this->pusherServer->onMessage($connection, $message); + + $this->markTestAsPassed(); + } } diff --git a/tests/ClientProviders/AppTest.php b/tests/ClientProviders/AppTest.php index 71393d7dae..73345ac155 100644 --- a/tests/ClientProviders/AppTest.php +++ b/tests/ClientProviders/AppTest.php @@ -11,7 +11,7 @@ class AppTest extends TestCase /** @test */ public function it_can_create_a_client() { - new App(1, 'appKey', 'appSecret', 'new'); + new App(1, 'appKey', 'appSecret'); $this->markTestAsPassed(); } @@ -21,7 +21,7 @@ public function it_will_not_accept_an_empty_appKey() { $this->expectException(InvalidApp::class); - new App(1, '', 'appSecret', 'new'); + new App(1, '', 'appSecret'); } /** @test */ @@ -29,6 +29,6 @@ public function it_will_not_accept_an_empty_appSecret() { $this->expectException(InvalidApp::class); - new App(1, 'appKey', '', 'new'); + new App(1, 'appKey', ''); } } diff --git a/tests/HttpApi/FetchChannelReplicationTest.php b/tests/HttpApi/FetchChannelReplicationTest.php new file mode 100644 index 0000000000..84f4c51a3a --- /dev/null +++ b/tests/HttpApi/FetchChannelReplicationTest.php @@ -0,0 +1,17 @@ +setupReplication(); + } +} diff --git a/tests/HttpApi/FetchChannelTest.php b/tests/HttpApi/FetchChannelTest.php index 50fcaf1316..dd4abf26b8 100644 --- a/tests/HttpApi/FetchChannelTest.php +++ b/tests/HttpApi/FetchChannelTest.php @@ -66,6 +66,38 @@ public function it_returns_the_channel_information() ], json_decode($response->getContent(), true)); } + /** @test */ + public function it_returns_presence_channel_information() + { + $this->joinPresenceChannel('presence-channel'); + $this->joinPresenceChannel('presence-channel'); + + $connection = new Connection(); + + $requestPath = '/apps/1234/channel/my-channel'; + $routeParams = [ + 'appId' => '1234', + 'channelName' => 'presence-channel', + ]; + + $queryString = Pusher::build_auth_query_string('TestKey', 'TestSecret', 'GET', $requestPath); + + $request = new Request('GET', "{$requestPath}?{$queryString}&".http_build_query($routeParams)); + + $controller = app(FetchChannelController::class); + + $controller->onOpen($connection, $request); + + /** @var JsonResponse $response */ + $response = array_pop($connection->sentRawData); + + $this->assertSame([ + 'occupied' => true, + 'subscription_count' => 2, + 'user_count' => 2, + ], json_decode($response->getContent(), true)); + } + /** @test */ public function it_returns_404_for_invalid_channels() { diff --git a/tests/HttpApi/FetchChannelsReplicationTest.php b/tests/HttpApi/FetchChannelsReplicationTest.php new file mode 100644 index 0000000000..24eb9b419a --- /dev/null +++ b/tests/HttpApi/FetchChannelsReplicationTest.php @@ -0,0 +1,17 @@ +setupReplication(); + } +} diff --git a/tests/HttpApi/FetchUsersReplicationTest.php b/tests/HttpApi/FetchUsersReplicationTest.php new file mode 100644 index 0000000000..2d959a8ceb --- /dev/null +++ b/tests/HttpApi/FetchUsersReplicationTest.php @@ -0,0 +1,17 @@ +setupReplication(); + } +} diff --git a/tests/TestCase.php b/tests/TestCase.php index 14d3655428..7b00aedb64 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -49,6 +49,7 @@ protected function getEnvironmentSetUp($app) 'id' => 1234, 'key' => 'TestKey', 'secret' => 'TestSecret', + 'host' => 'localhost', 'capacity' => null, 'enable_client_messages' => false, 'enable_statistics' => true, diff --git a/tests/TestsReplication.php b/tests/TestsReplication.php new file mode 100644 index 0000000000..c0fa2f0fdf --- /dev/null +++ b/tests/TestsReplication.php @@ -0,0 +1,22 @@ +singleton(ReplicationInterface::class, function () { + return (new FakeReplication())->boot(Factory::create()); + }); + + config([ + 'websockets.replication.enabled' => true, + 'websockets.replication.driver' => 'fake', + ]); + } +} From faf2c75d3d3241f40a4c94902f2028f63f3f7d2d Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Mon, 22 Apr 2019 11:05:28 -0400 Subject: [PATCH 09/30] Fix redis-pusher broadcast driver, wrong params for extend() callable --- src/WebSocketsServiceProvider.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/WebSocketsServiceProvider.php b/src/WebSocketsServiceProvider.php index 558c8ef429..e9ce735ad0 100644 --- a/src/WebSocketsServiceProvider.php +++ b/src/WebSocketsServiceProvider.php @@ -46,7 +46,7 @@ public function boot(BroadcastManager $broadcastManager) Console\CleanStatistics::class, ]); - $broadcastManager->extend('redis-pusher', function (array $config) { + $broadcastManager->extend('redis-pusher', function ($app, array $config) { $pusher = new Pusher( $config['key'], $config['secret'], $config['app_id'], $config['options'] ?? [] From ed5503407e440a7beb32d1bc733f82ed760f7d12 Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Wed, 15 May 2019 17:11:33 -0400 Subject: [PATCH 10/30] Fix mistake during rebase --- src/HttpApi/Controllers/FetchChannelsController.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/HttpApi/Controllers/FetchChannelsController.php b/src/HttpApi/Controllers/FetchChannelsController.php index 73a82894d0..0ea96814de 100644 --- a/src/HttpApi/Controllers/FetchChannelsController.php +++ b/src/HttpApi/Controllers/FetchChannelsController.php @@ -6,9 +6,9 @@ use Illuminate\Http\Request; use Illuminate\Support\Collection; use React\Promise\PromiseInterface; +use Symfony\Component\HttpKernel\Exception\HttpException; use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\WebSockets\Channels\PresenceChannel; -use Symfony\Component\HttpKernel\Exception\HttpException; class FetchChannelsController extends Controller { @@ -47,7 +47,7 @@ public function __invoke(Request $request) // We return a promise since the backend runs async. We get $counts back // as a key-value array of channel names and their member count. - return $memberCounts->then(function (array $counts) use ($channels) { + return $memberCounts->then(function (array $counts) use ($channels, $attributes) { return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) use ($counts) { return $counts[$channel->getChannelName()]; }); @@ -67,7 +67,7 @@ protected function collectUserCounts(Collection $channels, array $attributes, ca if (in_array('user_count', $attributes)) { $info->user_count = $transformer($channel); } - + return $info; })->toArray() ?: new \stdClass, ]; From d7c30f3b0f6105f97000e74ebb5864d8d063fde5 Mon Sep 17 00:00:00 2001 From: anthony Date: Sun, 28 Jul 2019 20:50:10 +0200 Subject: [PATCH 11/30] cleanup & refactor of pubsub code --- composer.json | 3 +- phpunit.xml.dist | 1 + src/Console/StartWebSocketServer.php | 12 +- .../RedisPusherBroadcaster.php | 2 +- src/PubSub/Drivers/EmptyClient.php | 112 ++++++++++++++++++ src/PubSub/{Redis => Drivers}/RedisClient.php | 3 +- src/WebSockets/Channels/Channel.php | 33 +++--- src/WebSocketsServiceProvider.php | 38 ++++-- .../Mocks/FakeReplicationClient.php | 4 +- tests/TestsReplication.php | 9 +- 10 files changed, 170 insertions(+), 47 deletions(-) rename src/PubSub/{Redis => Broadcasters}/RedisPusherBroadcaster.php (98%) create mode 100644 src/PubSub/Drivers/EmptyClient.php rename src/PubSub/{Redis => Drivers}/RedisClient.php (98%) rename src/PubSub/Fake/FakeReplication.php => tests/Mocks/FakeReplicationClient.php (96%) diff --git a/composer.json b/composer.json index e21a3fc9ce..f59061db93 100644 --- a/composer.json +++ b/composer.json @@ -42,7 +42,8 @@ "require-dev": { "mockery/mockery": "^1.2", "orchestra/testbench": "3.7.* || 3.8.* || ^4.0", - "phpunit/phpunit": "^7.0 || ^8.0" + "phpunit/phpunit": "^7.0 || ^8.0", + "predis/predis": "^1.1" }, "autoload": { "psr-4": { diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 5102c747e3..e1226ec7fd 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -27,5 +27,6 @@ + diff --git a/src/Console/StartWebSocketServer.php b/src/Console/StartWebSocketServer.php index 4b68be33d4..b88ec76b4b 100644 --- a/src/Console/StartWebSocketServer.php +++ b/src/Console/StartWebSocketServer.php @@ -12,7 +12,6 @@ use BeyondCode\LaravelWebSockets\Statistics\DnsResolver; use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger; use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter; -use BeyondCode\LaravelWebSockets\PubSub\Redis\RedisClient; use BeyondCode\LaravelWebSockets\Server\Logger\HttpLogger; use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\Server\WebSocketServerFactory; @@ -117,7 +116,6 @@ protected function registerEchoRoutes() protected function registerCustomRoutes() { WebSocketsRouter::customRoutes(); - return $this; } @@ -140,15 +138,7 @@ protected function startWebSocketServer() protected function configurePubSubReplication() { - if (config('websockets.replication.enabled') !== true) { - return $this; - } - - if (config('websockets.replication.driver') === 'redis') { - $this->laravel->singleton(ReplicationInterface::class, function () { - return (new RedisClient())->boot($this->loop); - }); - } + app(ReplicationInterface::class)->boot($this->loop); return $this; } diff --git a/src/PubSub/Redis/RedisPusherBroadcaster.php b/src/PubSub/Broadcasters/RedisPusherBroadcaster.php similarity index 98% rename from src/PubSub/Redis/RedisPusherBroadcaster.php rename to src/PubSub/Broadcasters/RedisPusherBroadcaster.php index 990591414f..f1be3a5ece 100644 --- a/src/PubSub/Redis/RedisPusherBroadcaster.php +++ b/src/PubSub/Broadcasters/RedisPusherBroadcaster.php @@ -1,6 +1,6 @@ publishClient->__call('hset', ["$appId:$channel", 541561516, "qsgdqgsd"]); if (! isset($this->subscribedChannels["$appId:$channel"])) { // We're not subscribed to the channel yet, subscribe and set the count to 1 $this->subscribeClient->__call('subscribe', ["$appId:$channel"]); diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index 87e81e095a..1d4d984e1c 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -14,12 +14,18 @@ class Channel /** @var string */ protected $channelName; + /** + * @var ReplicationInterface + */ + protected $pubSub; + /** @var \Ratchet\ConnectionInterface[] */ protected $subscribedConnections = []; public function __construct(string $channelName) { $this->channelName = $channelName; + $this->pubSub = app(ReplicationInterface::class); } public function getChannelName(): string @@ -48,7 +54,7 @@ protected function verifySignature(ConnectionInterface $connection, stdClass $pa $signature .= ":{$payload->channel_data}"; } - if (! hash_equals( + if (!hash_equals( hash_hmac('sha256', $signature, $connection->app->secret), Str::after($payload->auth, ':')) ) { @@ -63,11 +69,8 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload) { $this->saveConnection($connection); - if (config('websockets.replication.enabled') === true) { - // Subscribe for broadcasted messages from the pub/sub backend - app(ReplicationInterface::class) - ->subscribe($connection->app->id, $this->channelName); - } + // Subscribe to broadcasted messages from the pub/sub backend + $this->pubSub->subscribe($connection->app->id, $this->channelName); $connection->send(json_encode([ 'event' => 'pusher_internal:subscription_succeeded', @@ -79,13 +82,10 @@ public function unsubscribe(ConnectionInterface $connection) { unset($this->subscribedConnections[$connection->socketId]); - if (config('websockets.replication.enabled') === true) { - // Unsubscribe from the pub/sub backend - app(ReplicationInterface::class) - ->unsubscribe($connection->app->id, $this->channelName); - } + // Unsubscribe from the pub/sub backend + $this->pubSub->unsubscribe($connection->app->id, $this->channelName); - if (! $this->hasConnections()) { + if (!$this->hasConnections()) { DashboardLogger::vacated($connection, $this->channelName); } } @@ -96,7 +96,7 @@ protected function saveConnection(ConnectionInterface $connection) $this->subscribedConnections[$connection->socketId] = $connection; - if (! $hadConnectionsPreviously) { + if (!$hadConnectionsPreviously) { DashboardLogger::occupied($connection, $this->channelName); } @@ -112,11 +112,8 @@ public function broadcast($payload) public function broadcastToOthers(ConnectionInterface $connection, $payload) { - if (config('websockets.replication.enabled') === true) { - // Also broadcast via the other websocket servers - app(ReplicationInterface::class) - ->publish($connection->app->id, $this->channelName, $payload); - } + // Also broadcast via the other websocket servers + $this->pubSub->publish($connection->app->id, $this->channelName, $payload); $this->broadcastToEveryoneExcept($payload, $connection->socketId); } diff --git a/src/WebSocketsServiceProvider.php b/src/WebSocketsServiceProvider.php index e9ce735ad0..bca993983b 100644 --- a/src/WebSocketsServiceProvider.php +++ b/src/WebSocketsServiceProvider.php @@ -2,6 +2,10 @@ namespace BeyondCode\LaravelWebSockets; +use BeyondCode\LaravelWebSockets\PubSub\Broadcasters\RedisPusherBroadcaster; +use BeyondCode\LaravelWebSockets\PubSub\Drivers\EmptyClient; +use BeyondCode\LaravelWebSockets\PubSub\Drivers\RedisClient; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use Pusher\Pusher; use Psr\Log\LoggerInterface; use Illuminate\Support\Facades\Gate; @@ -11,7 +15,6 @@ use BeyondCode\LaravelWebSockets\Server\Router; use BeyondCode\LaravelWebSockets\Apps\AppProvider; use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager; -use BeyondCode\LaravelWebSockets\PubSub\Redis\RedisPusherBroadcaster; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\SendMessage; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowDashboard; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\AuthenticateDashboard; @@ -23,15 +26,15 @@ class WebSocketsServiceProvider extends ServiceProvider { - public function boot(BroadcastManager $broadcastManager) + public function boot() { $this->publishes([ - __DIR__.'/../config/websockets.php' => base_path('config/websockets.php'), + __DIR__ . '/../config/websockets.php' => base_path('config/websockets.php'), ], 'config'); - if (! class_exists('CreateWebSocketsStatisticsEntries')) { + if (!class_exists('CreateWebSocketsStatisticsEntries')) { $this->publishes([ - __DIR__.'/../database/migrations/create_websockets_statistics_entries_table.php.stub' => database_path('migrations/'.date('Y_m_d_His', time()).'_create_websockets_statistics_entries_table.php'), + __DIR__ . '/../database/migrations/create_websockets_statistics_entries_table.php.stub' => database_path('migrations/' . date('Y_m_d_His', time()) . '_create_websockets_statistics_entries_table.php'), ], 'migrations'); } @@ -39,14 +42,31 @@ public function boot(BroadcastManager $broadcastManager) ->registerRoutes() ->registerDashboardGate(); - $this->loadViewsFrom(__DIR__.'/../resources/views/', 'websockets'); + $this->loadViewsFrom(__DIR__ . '/../resources/views/', 'websockets'); $this->commands([ Console\StartWebSocketServer::class, Console\CleanStatistics::class, ]); - $broadcastManager->extend('redis-pusher', function ($app, array $config) { + $this->configurePubSub(); + + } + + protected function configurePubSub() + { + if (config('websockets.replication.enabled') !== true || config('websockets.replication.driver') !== 'redis') { + $this->app->singleton(ReplicationInterface::class, function () { + return (new EmptyClient()); + }); + return; + } + + $this->app->singleton(ReplicationInterface::class, function () { + return (new RedisClient())->boot($this->loop); + }); + + app(BroadcastManager::class)->extend('redis-pusher', function ($app, array $config) { $pusher = new Pusher( $config['key'], $config['secret'], $config['app_id'], $config['options'] ?? [] @@ -67,7 +87,7 @@ public function boot(BroadcastManager $broadcastManager) public function register() { - $this->mergeConfigFrom(__DIR__.'/../config/websockets.php', 'websockets'); + $this->mergeConfigFrom(__DIR__ . '/../config/websockets.php', 'websockets'); $this->app->singleton('websockets.router', function () { return new Router(); @@ -88,7 +108,7 @@ protected function registerRoutes() Route::prefix(config('websockets.path'))->group(function () { Route::middleware(config('websockets.middleware', [AuthorizeDashboard::class]))->group(function () { Route::get('/', ShowDashboard::class); - Route::get('/api/{appId}/statistics', [DashboardApiController::class, 'getStatistics']); + Route::get('/api/{appId}/statistics', [DashboardApiController::class, 'getStatistics']); Route::post('auth', AuthenticateDashboard::class); Route::post('event', SendMessage::class); }); diff --git a/src/PubSub/Fake/FakeReplication.php b/tests/Mocks/FakeReplicationClient.php similarity index 96% rename from src/PubSub/Fake/FakeReplication.php rename to tests/Mocks/FakeReplicationClient.php index 5b3e42930a..5ad21b3f4c 100644 --- a/src/PubSub/Fake/FakeReplication.php +++ b/tests/Mocks/FakeReplicationClient.php @@ -1,6 +1,6 @@ singleton(ReplicationInterface::class, function () { - return (new FakeReplication())->boot(Factory::create()); + return (new FakeReplicationClient())->boot(Factory::create()); }); - config([ + Config::set([ 'websockets.replication.enabled' => true, - 'websockets.replication.driver' => 'fake', + 'websockets.replication.driver' => 'redis', ]); } } From 3c909b95c0ac951c1879b0c646ca9aff79a97019 Mon Sep 17 00:00:00 2001 From: anthony Date: Sun, 28 Jul 2019 21:05:00 +0200 Subject: [PATCH 12/30] remove predis from require-dev --- composer.json | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/composer.json b/composer.json index f59061db93..e21a3fc9ce 100644 --- a/composer.json +++ b/composer.json @@ -42,8 +42,7 @@ "require-dev": { "mockery/mockery": "^1.2", "orchestra/testbench": "3.7.* || 3.8.* || ^4.0", - "phpunit/phpunit": "^7.0 || ^8.0", - "predis/predis": "^1.1" + "phpunit/phpunit": "^7.0 || ^8.0" }, "autoload": { "psr-4": { From b5fcc137970665a132e63f4f2cdbe9f8001781d7 Mon Sep 17 00:00:00 2001 From: anthony Date: Sun, 28 Jul 2019 21:05:43 +0200 Subject: [PATCH 13/30] remove redis host --- phpunit.xml.dist | 1 - 1 file changed, 1 deletion(-) diff --git a/phpunit.xml.dist b/phpunit.xml.dist index e1226ec7fd..5102c747e3 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -27,6 +27,5 @@ - From 6e68d3d144294cecf3a644b52e04f325a14494a8 Mon Sep 17 00:00:00 2001 From: anthony Date: Sun, 28 Jul 2019 21:26:07 +0200 Subject: [PATCH 14/30] one line var doc --- src/WebSockets/Channels/Channel.php | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index 1d4d984e1c..5e96ced313 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -14,9 +14,7 @@ class Channel /** @var string */ protected $channelName; - /** - * @var ReplicationInterface - */ + /** @var ReplicationInterface */ protected $pubSub; /** @var \Ratchet\ConnectionInterface[] */ From d43ac821d9f64b6ed4e322849092d0444e821719 Mon Sep 17 00:00:00 2001 From: anthony Date: Sun, 28 Jul 2019 21:28:35 +0200 Subject: [PATCH 15/30] remove test code --- src/PubSub/Drivers/RedisClient.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/PubSub/Drivers/RedisClient.php b/src/PubSub/Drivers/RedisClient.php index 71888314b5..e4abe7ce8f 100644 --- a/src/PubSub/Drivers/RedisClient.php +++ b/src/PubSub/Drivers/RedisClient.php @@ -129,7 +129,6 @@ protected function onMessage(string $redisChannel, string $payload) */ public function subscribe(string $appId, string $channel): bool { - $this->publishClient->__call('hset', ["$appId:$channel", 541561516, "qsgdqgsd"]); if (! isset($this->subscribedChannels["$appId:$channel"])) { // We're not subscribed to the channel yet, subscribe and set the count to 1 $this->subscribeClient->__call('subscribe', ["$appId:$channel"]); From 11e1f89b5ec44a0462a50abb56dc5d2774697856 Mon Sep 17 00:00:00 2001 From: Arthur Vandenberghe Date: Sun, 28 Jul 2019 21:29:16 +0200 Subject: [PATCH 16/30] Merge pull request #1 from deviouspk/analysis-z3nD5L Apply fixes from StyleCI --- src/Console/StartWebSocketServer.php | 1 + src/PubSub/Drivers/EmptyClient.php | 3 --- src/WebSockets/Channels/Channel.php | 6 +++--- src/WebSocketsServiceProvider.php | 22 +++++++++++----------- tests/TestsReplication.php | 4 ++-- 5 files changed, 17 insertions(+), 19 deletions(-) diff --git a/src/Console/StartWebSocketServer.php b/src/Console/StartWebSocketServer.php index b88ec76b4b..f3ee0e8929 100644 --- a/src/Console/StartWebSocketServer.php +++ b/src/Console/StartWebSocketServer.php @@ -116,6 +116,7 @@ protected function registerEchoRoutes() protected function registerCustomRoutes() { WebSocketsRouter::customRoutes(); + return $this; } diff --git a/src/PubSub/Drivers/EmptyClient.php b/src/PubSub/Drivers/EmptyClient.php index 9b24156a5a..84101fc608 100644 --- a/src/PubSub/Drivers/EmptyClient.php +++ b/src/PubSub/Drivers/EmptyClient.php @@ -10,7 +10,6 @@ class EmptyClient implements ReplicationInterface { - /** * Boot the pub/sub provider (open connections, initial subscriptions, etc). * @@ -70,7 +69,6 @@ public function unsubscribe(string $appId, string $channel) : bool */ public function joinChannel(string $appId, string $channel, string $socketId, string $data) { - } /** @@ -83,7 +81,6 @@ public function joinChannel(string $appId, string $channel, string $socketId, st */ public function leaveChannel(string $appId, string $channel, string $socketId) { - } /** diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index 5e96ced313..d072cbd334 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -52,7 +52,7 @@ protected function verifySignature(ConnectionInterface $connection, stdClass $pa $signature .= ":{$payload->channel_data}"; } - if (!hash_equals( + if (! hash_equals( hash_hmac('sha256', $signature, $connection->app->secret), Str::after($payload->auth, ':')) ) { @@ -83,7 +83,7 @@ public function unsubscribe(ConnectionInterface $connection) // Unsubscribe from the pub/sub backend $this->pubSub->unsubscribe($connection->app->id, $this->channelName); - if (!$this->hasConnections()) { + if (! $this->hasConnections()) { DashboardLogger::vacated($connection, $this->channelName); } } @@ -94,7 +94,7 @@ protected function saveConnection(ConnectionInterface $connection) $this->subscribedConnections[$connection->socketId] = $connection; - if (!$hadConnectionsPreviously) { + if (! $hadConnectionsPreviously) { DashboardLogger::occupied($connection, $this->channelName); } diff --git a/src/WebSocketsServiceProvider.php b/src/WebSocketsServiceProvider.php index bca993983b..264ab701bc 100644 --- a/src/WebSocketsServiceProvider.php +++ b/src/WebSocketsServiceProvider.php @@ -2,10 +2,6 @@ namespace BeyondCode\LaravelWebSockets; -use BeyondCode\LaravelWebSockets\PubSub\Broadcasters\RedisPusherBroadcaster; -use BeyondCode\LaravelWebSockets\PubSub\Drivers\EmptyClient; -use BeyondCode\LaravelWebSockets\PubSub\Drivers\RedisClient; -use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use Pusher\Pusher; use Psr\Log\LoggerInterface; use Illuminate\Support\Facades\Gate; @@ -14,9 +10,13 @@ use Illuminate\Broadcasting\BroadcastManager; use BeyondCode\LaravelWebSockets\Server\Router; use BeyondCode\LaravelWebSockets\Apps\AppProvider; +use BeyondCode\LaravelWebSockets\PubSub\Drivers\EmptyClient; +use BeyondCode\LaravelWebSockets\PubSub\Drivers\RedisClient; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\SendMessage; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowDashboard; +use BeyondCode\LaravelWebSockets\PubSub\Broadcasters\RedisPusherBroadcaster; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\AuthenticateDashboard; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\DashboardApiController; use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager; @@ -29,12 +29,12 @@ class WebSocketsServiceProvider extends ServiceProvider public function boot() { $this->publishes([ - __DIR__ . '/../config/websockets.php' => base_path('config/websockets.php'), + __DIR__.'/../config/websockets.php' => base_path('config/websockets.php'), ], 'config'); - if (!class_exists('CreateWebSocketsStatisticsEntries')) { + if (! class_exists('CreateWebSocketsStatisticsEntries')) { $this->publishes([ - __DIR__ . '/../database/migrations/create_websockets_statistics_entries_table.php.stub' => database_path('migrations/' . date('Y_m_d_His', time()) . '_create_websockets_statistics_entries_table.php'), + __DIR__.'/../database/migrations/create_websockets_statistics_entries_table.php.stub' => database_path('migrations/'.date('Y_m_d_His', time()).'_create_websockets_statistics_entries_table.php'), ], 'migrations'); } @@ -42,7 +42,7 @@ public function boot() ->registerRoutes() ->registerDashboardGate(); - $this->loadViewsFrom(__DIR__ . '/../resources/views/', 'websockets'); + $this->loadViewsFrom(__DIR__.'/../resources/views/', 'websockets'); $this->commands([ Console\StartWebSocketServer::class, @@ -50,15 +50,15 @@ public function boot() ]); $this->configurePubSub(); - } protected function configurePubSub() { if (config('websockets.replication.enabled') !== true || config('websockets.replication.driver') !== 'redis') { $this->app->singleton(ReplicationInterface::class, function () { - return (new EmptyClient()); + return new EmptyClient(); }); + return; } @@ -87,7 +87,7 @@ protected function configurePubSub() public function register() { - $this->mergeConfigFrom(__DIR__ . '/../config/websockets.php', 'websockets'); + $this->mergeConfigFrom(__DIR__.'/../config/websockets.php', 'websockets'); $this->app->singleton('websockets.router', function () { return new Router(); diff --git a/tests/TestsReplication.php b/tests/TestsReplication.php index cc41b5a070..437f8b38e1 100644 --- a/tests/TestsReplication.php +++ b/tests/TestsReplication.php @@ -2,10 +2,10 @@ namespace BeyondCode\LaravelWebSockets\Tests; -use BeyondCode\LaravelWebSockets\Tests\Mocks\FakeReplicationClient; -use Illuminate\Support\Facades\Config; use React\EventLoop\Factory; +use Illuminate\Support\Facades\Config; use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; +use BeyondCode\LaravelWebSockets\Tests\Mocks\FakeReplicationClient; trait TestsReplication { From 060b9860589e1ac80172e4684bde42de4f78accf Mon Sep 17 00:00:00 2001 From: anthony Date: Sun, 28 Jul 2019 21:33:30 +0200 Subject: [PATCH 17/30] resolve app from local variables --- src/WebSocketsServiceProvider.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/WebSocketsServiceProvider.php b/src/WebSocketsServiceProvider.php index 264ab701bc..c117330591 100644 --- a/src/WebSocketsServiceProvider.php +++ b/src/WebSocketsServiceProvider.php @@ -66,7 +66,7 @@ protected function configurePubSub() return (new RedisClient())->boot($this->loop); }); - app(BroadcastManager::class)->extend('redis-pusher', function ($app, array $config) { + $this->app->get(BroadcastManager::class)->extend('redis-pusher', function ($app, array $config) { $pusher = new Pusher( $config['key'], $config['secret'], $config['app_id'], $config['options'] ?? [] @@ -95,11 +95,11 @@ public function register() $this->app->singleton(ChannelManager::class, function () { return config('websockets.channel_manager') !== null && class_exists(config('websockets.channel_manager')) - ? app(config('websockets.channel_manager')) : new ArrayChannelManager(); + ? $this->app->get(config('websockets.channel_manager')) : new ArrayChannelManager(); }); $this->app->singleton(AppProvider::class, function () { - return app(config('websockets.app_provider')); + return $this->app->get(config('websockets.app_provider')); }); } @@ -124,7 +124,7 @@ protected function registerRoutes() protected function registerDashboardGate() { Gate::define('viewWebSocketsDashboard', function ($user = null) { - return app()->environment('local'); + return $this->app->environment('local'); }); return $this; From f2b3347f89b65db50acf2517f6fb6aa1a67cb297 Mon Sep 17 00:00:00 2001 From: anthony Date: Sun, 28 Jul 2019 21:37:28 +0200 Subject: [PATCH 18/30] resolve app from local variables in console --- src/Console/StartWebSocketServer.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Console/StartWebSocketServer.php b/src/Console/StartWebSocketServer.php index f3ee0e8929..979cd05558 100644 --- a/src/Console/StartWebSocketServer.php +++ b/src/Console/StartWebSocketServer.php @@ -139,7 +139,7 @@ protected function startWebSocketServer() protected function configurePubSubReplication() { - app(ReplicationInterface::class)->boot($this->loop); + $this->laravel->get(ReplicationInterface::class)->boot($this->loop); return $this; } From 373b993e64c9c24d57d08707e6f8b25ea3ecd1d7 Mon Sep 17 00:00:00 2001 From: anthony Date: Sun, 28 Jul 2019 21:57:24 +0200 Subject: [PATCH 19/30] rename emptyclient to localclient --- src/PubSub/Drivers/{EmptyClient.php => LocalClient.php} | 2 +- src/WebSocketsServiceProvider.php | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) rename src/PubSub/Drivers/{EmptyClient.php => LocalClient.php} (98%) diff --git a/src/PubSub/Drivers/EmptyClient.php b/src/PubSub/Drivers/LocalClient.php similarity index 98% rename from src/PubSub/Drivers/EmptyClient.php rename to src/PubSub/Drivers/LocalClient.php index 84101fc608..f610a0f9a6 100644 --- a/src/PubSub/Drivers/EmptyClient.php +++ b/src/PubSub/Drivers/LocalClient.php @@ -8,7 +8,7 @@ use React\Promise\PromiseInterface; use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; -class EmptyClient implements ReplicationInterface +class LocalClient implements ReplicationInterface { /** * Boot the pub/sub provider (open connections, initial subscriptions, etc). diff --git a/src/WebSocketsServiceProvider.php b/src/WebSocketsServiceProvider.php index c117330591..b34be3affc 100644 --- a/src/WebSocketsServiceProvider.php +++ b/src/WebSocketsServiceProvider.php @@ -10,7 +10,7 @@ use Illuminate\Broadcasting\BroadcastManager; use BeyondCode\LaravelWebSockets\Server\Router; use BeyondCode\LaravelWebSockets\Apps\AppProvider; -use BeyondCode\LaravelWebSockets\PubSub\Drivers\EmptyClient; +use BeyondCode\LaravelWebSockets\PubSub\Drivers\LocalClient; use BeyondCode\LaravelWebSockets\PubSub\Drivers\RedisClient; use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager; @@ -56,7 +56,7 @@ protected function configurePubSub() { if (config('websockets.replication.enabled') !== true || config('websockets.replication.driver') !== 'redis') { $this->app->singleton(ReplicationInterface::class, function () { - return new EmptyClient(); + return new LocalClient(); }); return; From 00e8f3e1a8450900749dcb3a047280d48ee79ad8 Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Mon, 29 Jul 2019 16:20:48 -0400 Subject: [PATCH 20/30] Add channel storage to LocalDriver to simplify PresenceChannel logic --- .../Controllers/FetchChannelsController.php | 36 +++--- src/PubSub/Drivers/LocalClient.php | 33 ++++- src/PubSub/Drivers/RedisClient.php | 2 +- src/WebSockets/Channels/PresenceChannel.php | 115 ++++++++---------- 4 files changed, 95 insertions(+), 91 deletions(-) diff --git a/src/HttpApi/Controllers/FetchChannelsController.php b/src/HttpApi/Controllers/FetchChannelsController.php index 0ea96814de..96f7141f43 100644 --- a/src/HttpApi/Controllers/FetchChannelsController.php +++ b/src/HttpApi/Controllers/FetchChannelsController.php @@ -32,30 +32,24 @@ public function __invoke(Request $request) }); } - if (config('websockets.replication.enabled') === true) { - // We want to get the channel user count all in one shot when - // using a replication backend rather than doing individual queries. - // To do so, we first collect the list of channel names. - $channelNames = $channels->map(function (PresenceChannel $channel) use ($request) { - return $channel->getChannelName(); - })->toArray(); + // We want to get the channel user count all in one shot when + // using a replication backend rather than doing individual queries. + // To do so, we first collect the list of channel names. + $channelNames = $channels->map(function (PresenceChannel $channel) use ($request) { + return $channel->getChannelName(); + })->toArray(); - /** @var PromiseInterface $memberCounts */ - // We ask the replication backend to get us the member count per channel - $memberCounts = app(ReplicationInterface::class) - ->channelMemberCounts($request->appId, $channelNames); + /** @var PromiseInterface $memberCounts */ + // We ask the replication backend to get us the member count per channel + $memberCounts = app(ReplicationInterface::class) + ->channelMemberCounts($request->appId, $channelNames); - // We return a promise since the backend runs async. We get $counts back - // as a key-value array of channel names and their member count. - return $memberCounts->then(function (array $counts) use ($channels, $attributes) { - return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) use ($counts) { - return $counts[$channel->getChannelName()]; - }); + // We return a promise since the backend runs async. We get $counts back + // as a key-value array of channel names and their member count. + return $memberCounts->then(function (array $counts) use ($channels, $attributes) { + return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) use ($counts) { + return $counts[$channel->getChannelName()]; }); - } - - return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) { - return $channel->getUserCount(); }); } diff --git a/src/PubSub/Drivers/LocalClient.php b/src/PubSub/Drivers/LocalClient.php index f610a0f9a6..2dfc1faea0 100644 --- a/src/PubSub/Drivers/LocalClient.php +++ b/src/PubSub/Drivers/LocalClient.php @@ -10,6 +10,13 @@ class LocalClient implements ReplicationInterface { + /** + * Mapping of the presence JSON data for users in each channel + * + * @var string[][] + */ + protected $channelData = []; + /** * Boot the pub/sub provider (open connections, initial subscriptions, etc). * @@ -31,6 +38,7 @@ public function boot(LoopInterface $loop) : ReplicationInterface */ public function publish(string $appId, string $channel, stdClass $payload) : bool { + // Nothing to do, nobody to publish to return true; } @@ -69,6 +77,7 @@ public function unsubscribe(string $appId, string $channel) : bool */ public function joinChannel(string $appId, string $channel, string $socketId, string $data) { + $this->channelData["$appId:$channel"][$socketId] = $data; } /** @@ -81,6 +90,10 @@ public function joinChannel(string $appId, string $channel, string $socketId, st */ public function leaveChannel(string $appId, string $channel, string $socketId) { + unset($this->channelData["$appId:$channel"][$socketId]); + if (empty($this->channelData["$appId:$channel"])) { + unset($this->channelData["$appId:$channel"]); + } } /** @@ -92,7 +105,14 @@ public function leaveChannel(string $appId, string $channel, string $socketId) */ public function channelMembers(string $appId, string $channel) : PromiseInterface { - return new FulfilledPromise(null); + $members = $this->channelData["$appId:$channel"] ?? []; + + // The data is expected as objects, so we need to JSON decode + $members = array_map(function ($user) { + return json_decode($user); + }, $members); + + return new FulfilledPromise($members); } /** @@ -104,6 +124,15 @@ public function channelMembers(string $appId, string $channel) : PromiseInterfac */ public function channelMemberCounts(string $appId, array $channelNames) : PromiseInterface { - return new FulfilledPromise(null); + $results = []; + + // Count the number of users per channel + foreach ($channelNames as $channel) { + $results[$channel] = isset($this->channelData["$appId:$channel"]) + ? count($this->channelData["$appId:$channel"]) + : 0; + } + + return new FulfilledPromise($results); } } diff --git a/src/PubSub/Drivers/RedisClient.php b/src/PubSub/Drivers/RedisClient.php index e4abe7ce8f..ce9c8fb4d3 100644 --- a/src/PubSub/Drivers/RedisClient.php +++ b/src/PubSub/Drivers/RedisClient.php @@ -97,7 +97,7 @@ protected function onMessage(string $redisChannel, string $payload) // expect the channel name to not include the app ID. $payload->channel = Str::after($redisChannel, "$appId:"); - /* @var $channelManager ChannelManager */ + /* @var ChannelManager $channelManager */ $channelManager = app(ChannelManager::class); // Load the Channel instance, if any diff --git a/src/WebSockets/Channels/PresenceChannel.php b/src/WebSockets/Channels/PresenceChannel.php index b382bb6b11..895e96a354 100644 --- a/src/WebSockets/Channels/PresenceChannel.php +++ b/src/WebSockets/Channels/PresenceChannel.php @@ -10,6 +10,16 @@ class PresenceChannel extends Channel { + /** + * Data for the users connected to this channel + * + * Note: If replication is enabled, this will only contain entries + * for the users directly connected to this server instance. Requests + * for data for all users in the channel should be routed through + * ReplicationInterface. + * + * @var string[] + */ protected $users = []; /** @@ -18,21 +28,9 @@ class PresenceChannel extends Channel */ public function getUsers(string $appId) { - if (config('websockets.replication.enabled') === true) { - // Get the members list from the replication backend - return app(ReplicationInterface::class) - ->channelMembers($appId, $this->channelName); - } - - return $this->users; - } - - /** - * @return array - */ - public function getUserCount() - { - return count($this->users); + // Get the members list from the replication backend + return app(ReplicationInterface::class) + ->channelMembers($appId, $this->channelName); } /** @@ -51,36 +49,27 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload) $channelData = json_decode($payload->channel_data); $this->users[$connection->socketId] = $channelData; - if (config('websockets.replication.enabled') === true) { - // Add the connection as a member of the channel - app(ReplicationInterface::class) - ->joinChannel( - $connection->app->id, - $this->channelName, - $connection->socketId, - json_encode($channelData) - ); - - // We need to pull the channel data from the replication backend, - // otherwise we won't be sending the full details of the channel - app(ReplicationInterface::class) - ->channelMembers($connection->app->id, $this->channelName) - ->then(function ($users) use ($connection) { - // Send the success event - $connection->send(json_encode([ - 'event' => 'pusher_internal:subscription_succeeded', - 'channel' => $this->channelName, - 'data' => json_encode($this->getChannelData($users)), - ])); - }); - } else { - // Send the success event - $connection->send(json_encode([ - 'event' => 'pusher_internal:subscription_succeeded', - 'channel' => $this->channelName, - 'data' => json_encode($this->getChannelData($this->users)), - ])); - } + // Add the connection as a member of the channel + app(ReplicationInterface::class) + ->joinChannel( + $connection->app->id, + $this->channelName, + $connection->socketId, + json_encode($channelData) + ); + + // We need to pull the channel data from the replication backend, + // otherwise we won't be sending the full details of the channel + app(ReplicationInterface::class) + ->channelMembers($connection->app->id, $this->channelName) + ->then(function ($users) use ($connection) { + // Send the success event + $connection->send(json_encode([ + 'event' => 'pusher_internal:subscription_succeeded', + 'channel' => $this->channelName, + 'data' => json_encode($this->getChannelData($users)), + ])); + }); $this->broadcastToOthers($connection, (object) [ 'event' => 'pusher_internal:member_added', @@ -97,15 +86,13 @@ public function unsubscribe(ConnectionInterface $connection) return; } - if (config('websockets.replication.enabled') === true) { - // Remove the connection as a member of the channel - app(ReplicationInterface::class) - ->leaveChannel( - $connection->app->id, - $this->channelName, - $connection->socketId - ); - } + // Remove the connection as a member of the channel + app(ReplicationInterface::class) + ->leaveChannel( + $connection->app->id, + $this->channelName, + $connection->socketId + ); $this->broadcastToOthers($connection, (object) [ 'event' => 'pusher_internal:member_removed', @@ -124,19 +111,13 @@ public function unsubscribe(ConnectionInterface $connection) */ public function toArray(string $appId = null) { - if (config('websockets.replication.enabled') === true) { - return app(ReplicationInterface::class) - ->channelMembers($appId, $this->channelName) - ->then(function ($users) { - return array_merge(parent::toArray(), [ - 'user_count' => count($users), - ]); - }); - } - - return array_merge(parent::toArray(), [ - 'user_count' => count($this->users), - ]); + return app(ReplicationInterface::class) + ->channelMembers($appId, $this->channelName) + ->then(function ($users) { + return array_merge(parent::toArray(), [ + 'user_count' => count($users), + ]); + }); } protected function getChannelData(array $users): array From 990a075b201096c7b9b82e320e62a965868e7549 Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Mon, 29 Jul 2019 17:20:22 -0400 Subject: [PATCH 21/30] Avoid calls to app() --- .../Controllers/FetchChannelsController.php | 16 ++++++++++++---- src/WebSockets/Channels/Channel.php | 10 +++++----- src/WebSockets/Channels/PresenceChannel.php | 15 +++++++-------- 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/HttpApi/Controllers/FetchChannelsController.php b/src/HttpApi/Controllers/FetchChannelsController.php index 96f7141f43..ed44872a24 100644 --- a/src/HttpApi/Controllers/FetchChannelsController.php +++ b/src/HttpApi/Controllers/FetchChannelsController.php @@ -5,13 +5,23 @@ use Illuminate\Support\Str; use Illuminate\Http\Request; use Illuminate\Support\Collection; -use React\Promise\PromiseInterface; use Symfony\Component\HttpKernel\Exception\HttpException; use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; +use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager; use BeyondCode\LaravelWebSockets\WebSockets\Channels\PresenceChannel; class FetchChannelsController extends Controller { + /** @var ReplicationInterface */ + protected $replication; + + public function __construct(ChannelManager $channelManager, ReplicationInterface $replication) + { + parent::__construct($channelManager); + + $this->replication = $replication; + } + public function __invoke(Request $request) { $attributes = []; @@ -39,10 +49,8 @@ public function __invoke(Request $request) return $channel->getChannelName(); })->toArray(); - /** @var PromiseInterface $memberCounts */ // We ask the replication backend to get us the member count per channel - $memberCounts = app(ReplicationInterface::class) - ->channelMemberCounts($request->appId, $channelNames); + $memberCounts = $this->replication->channelMemberCounts($request->appId, $channelNames); // We return a promise since the backend runs async. We get $counts back // as a key-value array of channel names and their member count. diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index d072cbd334..8cf146938a 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -15,7 +15,7 @@ class Channel protected $channelName; /** @var ReplicationInterface */ - protected $pubSub; + protected $replication; /** @var \Ratchet\ConnectionInterface[] */ protected $subscribedConnections = []; @@ -23,7 +23,7 @@ class Channel public function __construct(string $channelName) { $this->channelName = $channelName; - $this->pubSub = app(ReplicationInterface::class); + $this->replication = app(ReplicationInterface::class); } public function getChannelName(): string @@ -68,7 +68,7 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload) $this->saveConnection($connection); // Subscribe to broadcasted messages from the pub/sub backend - $this->pubSub->subscribe($connection->app->id, $this->channelName); + $this->replication->subscribe($connection->app->id, $this->channelName); $connection->send(json_encode([ 'event' => 'pusher_internal:subscription_succeeded', @@ -81,7 +81,7 @@ public function unsubscribe(ConnectionInterface $connection) unset($this->subscribedConnections[$connection->socketId]); // Unsubscribe from the pub/sub backend - $this->pubSub->unsubscribe($connection->app->id, $this->channelName); + $this->replication->unsubscribe($connection->app->id, $this->channelName); if (! $this->hasConnections()) { DashboardLogger::vacated($connection, $this->channelName); @@ -111,7 +111,7 @@ public function broadcast($payload) public function broadcastToOthers(ConnectionInterface $connection, $payload) { // Also broadcast via the other websocket servers - $this->pubSub->publish($connection->app->id, $this->channelName, $payload); + $this->replication->publish($connection->app->id, $this->channelName, $payload); $this->broadcastToEveryoneExcept($payload, $connection->socketId); } diff --git a/src/WebSockets/Channels/PresenceChannel.php b/src/WebSockets/Channels/PresenceChannel.php index 895e96a354..2578c70574 100644 --- a/src/WebSockets/Channels/PresenceChannel.php +++ b/src/WebSockets/Channels/PresenceChannel.php @@ -5,7 +5,6 @@ use stdClass; use Ratchet\ConnectionInterface; use React\Promise\PromiseInterface; -use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature; class PresenceChannel extends Channel @@ -24,12 +23,12 @@ class PresenceChannel extends Channel /** * @param string $appId - * @return array|PromiseInterface + * @return PromiseInterface */ public function getUsers(string $appId) { // Get the members list from the replication backend - return app(ReplicationInterface::class) + return $this->replication ->channelMembers($appId, $this->channelName); } @@ -50,7 +49,7 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload) $this->users[$connection->socketId] = $channelData; // Add the connection as a member of the channel - app(ReplicationInterface::class) + $this->replication ->joinChannel( $connection->app->id, $this->channelName, @@ -60,7 +59,7 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload) // We need to pull the channel data from the replication backend, // otherwise we won't be sending the full details of the channel - app(ReplicationInterface::class) + $this->replication ->channelMembers($connection->app->id, $this->channelName) ->then(function ($users) use ($connection) { // Send the success event @@ -87,7 +86,7 @@ public function unsubscribe(ConnectionInterface $connection) } // Remove the connection as a member of the channel - app(ReplicationInterface::class) + $this->replication ->leaveChannel( $connection->app->id, $this->channelName, @@ -107,11 +106,11 @@ public function unsubscribe(ConnectionInterface $connection) /** * @param string|null $appId - * @return PromiseInterface|array + * @return PromiseInterface */ public function toArray(string $appId = null) { - return app(ReplicationInterface::class) + return $this->replication ->channelMembers($appId, $this->channelName) ->then(function ($users) { return array_merge(parent::toArray(), [ From 091f56ea15bb4ee361901e0967cc39d502d837ae Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Mon, 29 Jul 2019 17:33:27 -0400 Subject: [PATCH 22/30] Simplify controller logic due to PresenceChannel logic changes --- .../Controllers/FetchChannelsController.php | 36 ++++++++----------- .../Controllers/FetchUsersController.php | 25 +++++-------- 2 files changed, 22 insertions(+), 39 deletions(-) diff --git a/src/HttpApi/Controllers/FetchChannelsController.php b/src/HttpApi/Controllers/FetchChannelsController.php index ed44872a24..0a81520ead 100644 --- a/src/HttpApi/Controllers/FetchChannelsController.php +++ b/src/HttpApi/Controllers/FetchChannelsController.php @@ -49,29 +49,21 @@ public function __invoke(Request $request) return $channel->getChannelName(); })->toArray(); - // We ask the replication backend to get us the member count per channel - $memberCounts = $this->replication->channelMemberCounts($request->appId, $channelNames); + // We ask the replication backend to get us the member count per channel. + // We get $counts back as a key-value array of channel names and their member count. + return $this->replication + ->channelMemberCounts($request->appId, $channelNames) + ->then(function (array $counts) use ($channels, $attributes) { + return [ + 'channels' => $channels->map(function (PresenceChannel $channel) use ($counts, $attributes) { + $info = new \stdClass; + if (in_array('user_count', $attributes)) { + $info->user_count = $counts[$channel->getChannelName()]; + } - // We return a promise since the backend runs async. We get $counts back - // as a key-value array of channel names and their member count. - return $memberCounts->then(function (array $counts) use ($channels, $attributes) { - return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) use ($counts) { - return $counts[$channel->getChannelName()]; + return $info; + })->toArray() ?: new \stdClass, + ]; }); - }); - } - - protected function collectUserCounts(Collection $channels, array $attributes, callable $transformer) - { - return [ - 'channels' => $channels->map(function (PresenceChannel $channel) use ($transformer, $attributes) { - $info = new \stdClass; - if (in_array('user_count', $attributes)) { - $info->user_count = $transformer($channel); - } - - return $info; - })->toArray() ?: new \stdClass, - ]; } } diff --git a/src/HttpApi/Controllers/FetchUsersController.php b/src/HttpApi/Controllers/FetchUsersController.php index 3d7ced71ae..9bae8c6f36 100644 --- a/src/HttpApi/Controllers/FetchUsersController.php +++ b/src/HttpApi/Controllers/FetchUsersController.php @@ -22,23 +22,14 @@ public function __invoke(Request $request) throw new HttpException(400, 'Invalid presence channel "'.$request->channelName.'"'); } - $users = $channel->getUsers($request->appId); - - if ($users instanceof PromiseInterface) { - return $users->then(function (array $users) { - return $this->collectUsers($users); + return $channel + ->getUsers($request->appId) + ->then(function (array $users) { + return [ + 'users' => Collection::make($users)->map(function ($user) { + return ['id' => $user->user_id]; + })->values(), + ]; }); - } - - return $this->collectUsers($users); - } - - protected function collectUsers(array $users) - { - return [ - 'users' => Collection::make($users)->map(function ($user) { - return ['id' => $user->user_id]; - })->values(), - ]; } } From ef86f866680746b40b9a851d4a67e1fd9ff97774 Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Mon, 29 Jul 2019 17:39:34 -0400 Subject: [PATCH 23/30] Attempt at making TriggerEventController also publish to other servers --- src/PubSub/Drivers/RedisClient.php | 2 +- src/WebSockets/Channels/Channel.php | 15 ++++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/PubSub/Drivers/RedisClient.php b/src/PubSub/Drivers/RedisClient.php index ce9c8fb4d3..2c8d916d9e 100644 --- a/src/PubSub/Drivers/RedisClient.php +++ b/src/PubSub/Drivers/RedisClient.php @@ -117,7 +117,7 @@ protected function onMessage(string $redisChannel, string $payload) unset($payload->appId); // Push the message out to connected websocket clients - $channel->broadcastToEveryoneExcept($payload, $socket); + $channel->broadcastToEveryoneExcept($payload, $socket, $appId, false); } /** diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index 8cf146938a..5d69510fdc 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -110,14 +110,19 @@ public function broadcast($payload) public function broadcastToOthers(ConnectionInterface $connection, $payload) { - // Also broadcast via the other websocket servers - $this->replication->publish($connection->app->id, $this->channelName, $payload); - - $this->broadcastToEveryoneExcept($payload, $connection->socketId); + $this->broadcastToEveryoneExcept($payload, $connection->socketId, $connection->app->id); } - public function broadcastToEveryoneExcept($payload, ?string $socketId = null) + public function broadcastToEveryoneExcept($payload, ?string $socketId, string $appId, bool $publish = true) { + // Also broadcast via the other websocket server instances. + // This is set false in the Redis client because we don't want to cause a loop + // in this case. If this came from TriggerEventController, then we still want + // to publish to get the message out to other server instances. + if ($publish) { + $this->replication->publish($appId, $this->channelName, $payload); + } + // Performance optimization, if we don't have a socket ID, // then we avoid running the if condition in the foreach loop below // by calling broadcast() instead. From e259cac51eec9308774ed503688baba9d0e846ec Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Tue, 3 Sep 2019 11:50:10 -0400 Subject: [PATCH 24/30] Remove duplicate client mock client, simplify test trait --- tests/Mocks/FakeReplicationClient.php | 126 -------------------------- tests/TestsReplication.php | 5 +- 2 files changed, 2 insertions(+), 129 deletions(-) delete mode 100644 tests/Mocks/FakeReplicationClient.php diff --git a/tests/Mocks/FakeReplicationClient.php b/tests/Mocks/FakeReplicationClient.php deleted file mode 100644 index 5ad21b3f4c..0000000000 --- a/tests/Mocks/FakeReplicationClient.php +++ /dev/null @@ -1,126 +0,0 @@ -channels["$appId:$channel"][$socketId] = $data; - } - - /** - * Remove a member from the channel. To be called when they have - * unsubscribed from the channel. - * - * @param string $appId - * @param string $channel - * @param string $socketId - */ - public function leaveChannel(string $appId, string $channel, string $socketId) - { - unset($this->channels["$appId:$channel"][$socketId]); - if (empty($this->channels["$appId:$channel"])) { - unset($this->channels["$appId:$channel"]); - } - } - - /** - * Retrieve the full information about the members in a presence channel. - * - * @param string $appId - * @param string $channel - * @return PromiseInterface - */ - public function channelMembers(string $appId, string $channel) : PromiseInterface - { - $data = array_map(function ($user) { - return json_decode($user); - }, $this->channels["$appId:$channel"]); - - return new FulfilledPromise($data); - } - - /** - * Get the amount of users subscribed for each presence channel. - * - * @param string $appId - * @param array $channelNames - * @return PromiseInterface - */ - public function channelMemberCounts(string $appId, array $channelNames) : PromiseInterface - { - $data = []; - - foreach ($channelNames as $channel) { - $data[$channel] = count($this->channels["$appId:$channel"]); - } - - return new FulfilledPromise($data); - } -} diff --git a/tests/TestsReplication.php b/tests/TestsReplication.php index 437f8b38e1..e179ea0370 100644 --- a/tests/TestsReplication.php +++ b/tests/TestsReplication.php @@ -2,17 +2,16 @@ namespace BeyondCode\LaravelWebSockets\Tests; -use React\EventLoop\Factory; use Illuminate\Support\Facades\Config; +use BeyondCode\LaravelWebSockets\PubSub\Drivers\LocalClient; use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; -use BeyondCode\LaravelWebSockets\Tests\Mocks\FakeReplicationClient; trait TestsReplication { public function setupReplication() { app()->singleton(ReplicationInterface::class, function () { - return (new FakeReplicationClient())->boot(Factory::create()); + return new LocalClient(); }); Config::set([ From 5979f63af697753e7de1168e5f5ed7184c9dd246 Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Tue, 3 Sep 2019 11:52:18 -0400 Subject: [PATCH 25/30] StyleCI fixes --- src/HttpApi/Controllers/FetchUsersController.php | 1 - src/PubSub/Drivers/LocalClient.php | 2 +- src/WebSockets/Channels/PresenceChannel.php | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/HttpApi/Controllers/FetchUsersController.php b/src/HttpApi/Controllers/FetchUsersController.php index 9bae8c6f36..3c404d3b49 100644 --- a/src/HttpApi/Controllers/FetchUsersController.php +++ b/src/HttpApi/Controllers/FetchUsersController.php @@ -4,7 +4,6 @@ use Illuminate\Http\Request; use Illuminate\Support\Collection; -use React\Promise\PromiseInterface; use Symfony\Component\HttpKernel\Exception\HttpException; use BeyondCode\LaravelWebSockets\WebSockets\Channels\PresenceChannel; diff --git a/src/PubSub/Drivers/LocalClient.php b/src/PubSub/Drivers/LocalClient.php index 2dfc1faea0..9d5c5e20f7 100644 --- a/src/PubSub/Drivers/LocalClient.php +++ b/src/PubSub/Drivers/LocalClient.php @@ -11,7 +11,7 @@ class LocalClient implements ReplicationInterface { /** - * Mapping of the presence JSON data for users in each channel + * Mapping of the presence JSON data for users in each channel. * * @var string[][] */ diff --git a/src/WebSockets/Channels/PresenceChannel.php b/src/WebSockets/Channels/PresenceChannel.php index 2578c70574..aec5bc8439 100644 --- a/src/WebSockets/Channels/PresenceChannel.php +++ b/src/WebSockets/Channels/PresenceChannel.php @@ -10,7 +10,7 @@ class PresenceChannel extends Channel { /** - * Data for the users connected to this channel + * Data for the users connected to this channel. * * Note: If replication is enabled, this will only contain entries * for the users directly connected to this server instance. Requests From e3c0cea77cb80e82e22236ff44bba6b491d9cbd7 Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Tue, 3 Sep 2019 12:15:29 -0400 Subject: [PATCH 26/30] Fix tests failing on older versions of Laravel --- src/WebSocketsServiceProvider.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/WebSocketsServiceProvider.php b/src/WebSocketsServiceProvider.php index b34be3affc..7f7fae17f3 100644 --- a/src/WebSocketsServiceProvider.php +++ b/src/WebSocketsServiceProvider.php @@ -95,11 +95,11 @@ public function register() $this->app->singleton(ChannelManager::class, function () { return config('websockets.channel_manager') !== null && class_exists(config('websockets.channel_manager')) - ? $this->app->get(config('websockets.channel_manager')) : new ArrayChannelManager(); + ? $this->app->make(config('websockets.channel_manager')) : new ArrayChannelManager(); }); $this->app->singleton(AppProvider::class, function () { - return $this->app->get(config('websockets.app_provider')); + return $this->app->make(config('websockets.app_provider')); }); } From db5837831bd8536375b86bb892a3f942418fc5f7 Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Mon, 23 Sep 2019 15:51:00 -0400 Subject: [PATCH 27/30] Fix test warnings due to usage of deprecated assertArraySubset() Also changed app_id to strings where appropriate, in real apps they should be strings when read from environment, not ints. --- tests/ConnectionTest.php | 2 +- .../WebSocketsStatisticsControllerTest.php | 12 ++++++++---- tests/TestCase.php | 2 +- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/tests/ConnectionTest.php b/tests/ConnectionTest.php index 0c832ad8b7..3a6a974f6d 100644 --- a/tests/ConnectionTest.php +++ b/tests/ConnectionTest.php @@ -46,7 +46,7 @@ public function successful_connections_have_the_app_attached() $this->pusherServer->onOpen($connection); $this->assertInstanceOf(App::class, $connection->app); - $this->assertSame(1234, $connection->app->id); + $this->assertSame('1234', $connection->app->id); $this->assertSame('TestKey', $connection->app->key); $this->assertSame('TestSecret', $connection->app->secret); $this->assertSame('Test App', $connection->app->name); diff --git a/tests/Statistics/Controllers/WebSocketsStatisticsControllerTest.php b/tests/Statistics/Controllers/WebSocketsStatisticsControllerTest.php index 482f50b894..bfda847097 100644 --- a/tests/Statistics/Controllers/WebSocketsStatisticsControllerTest.php +++ b/tests/Statistics/Controllers/WebSocketsStatisticsControllerTest.php @@ -22,16 +22,20 @@ public function it_can_store_statistics() $this->assertCount(1, $entries); - $this->assertArraySubset($this->payload(), $entries->first()->attributesToArray()); + $actual = $entries->first()->attributesToArray(); + foreach ($this->payload() as $key => $value) { + $this->assertArrayHasKey($key, $actual); + $this->assertSame($value, $actual[$key]); + } } protected function payload(): array { return [ 'app_id' => config('websockets.apps.0.id'), - 'peak_connection_count' => 1, - 'websocket_message_count' => 2, - 'api_message_count' => 3, + 'peak_connection_count' => '1', + 'websocket_message_count' => '2', + 'api_message_count' => '3', ]; } } diff --git a/tests/TestCase.php b/tests/TestCase.php index 7b00aedb64..03896aff3c 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -46,7 +46,7 @@ protected function getEnvironmentSetUp($app) $app['config']->set('websockets.apps', [ [ 'name' => 'Test App', - 'id' => 1234, + 'id' => '1234', 'key' => 'TestKey', 'secret' => 'TestSecret', 'host' => 'localhost', From 6e851971c8b1c62476b95211a7afa112a349d19c Mon Sep 17 00:00:00 2001 From: rennokki Date: Thu, 13 Aug 2020 10:20:31 +0300 Subject: [PATCH 28/30] Update WebSocketsServiceProvider.php --- src/WebSocketsServiceProvider.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/WebSocketsServiceProvider.php b/src/WebSocketsServiceProvider.php index d14de3284e..9877270ba0 100644 --- a/src/WebSocketsServiceProvider.php +++ b/src/WebSocketsServiceProvider.php @@ -19,12 +19,11 @@ use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager; use Pusher\Pusher; use Psr\Log\LoggerInterface; +use Illuminate\Broadcasting\BroadcastManager; use Illuminate\Support\Facades\Gate; use Illuminate\Support\Facades\Route; use Illuminate\Support\ServiceProvider; -use Illuminate\Broadcasting\BroadcastManager; use Illuminate\Support\Facades\Schema; -use Illuminate\Support\ServiceProvider; class WebSocketsServiceProvider extends ServiceProvider { From 3a0bcead1911fe11c056bde4a54c729881c5ce36 Mon Sep 17 00:00:00 2001 From: rennokki Date: Thu, 13 Aug 2020 13:53:08 +0300 Subject: [PATCH 29/30] wip --- config/websockets.php | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/config/websockets.php b/config/websockets.php index 613da45100..b52a631959 100644 --- a/config/websockets.php +++ b/config/websockets.php @@ -141,9 +141,6 @@ ], - /* - * You can enable replication to publish and subscribe to messages across the driver - */ /* |-------------------------------------------------------------------------- | Broadcasting Replication @@ -157,6 +154,7 @@ | WebSocket servers. | */ + 'replication' => [ 'enabled' => false, From ce84e8cc9f3debdc034e56b726ed371102b7a25e Mon Sep 17 00:00:00 2001 From: rennokki Date: Thu, 13 Aug 2020 13:55:28 +0300 Subject: [PATCH 30/30] wip --- config/websockets.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/websockets.php b/config/websockets.php index b52a631959..a2ca845400 100644 --- a/config/websockets.php +++ b/config/websockets.php @@ -140,7 +140,7 @@ 'allow_self_signed' => env('APP_ENV') !== 'production', ], - + /* |-------------------------------------------------------------------------- | Broadcasting Replication