diff --git a/src/ReactMqttClient.php b/src/ReactMqttClient.php index 9199e25..dbe0182 100644 --- a/src/ReactMqttClient.php +++ b/src/ReactMqttClient.php @@ -19,7 +19,6 @@ use BinSoul\Net\Mqtt\StreamParser; use BinSoul\Net\Mqtt\Subscription; use Evenement\EventEmitter; -use Exception; use LogicException; use React\EventLoop\LoopInterface; use React\EventLoop\TimerInterface; @@ -30,6 +29,7 @@ use React\Socket\ConnectorInterface; use React\Stream\DuplexStreamInterface; use RuntimeException; +use Throwable; /** * Connects to a MQTT broker and subscribes to topics or publishes messages. @@ -114,7 +114,7 @@ public function __construct( $this->parser = new StreamParser(new DefaultPacketFactory()); } - $this->parser->onError(function (Exception $e) { + $this->parser->onError(function (Throwable $e) { $this->emitWarning($e); }); @@ -216,7 +216,7 @@ public function connect(string $host, int $port = 1883, Connection $connection = $this->emit('connect', [$connection, $this]); $deferred->resolve($result ?: $connection); }) - ->otherwise(function (Exception $e) use ($connection, $deferred) { + ->otherwise(function (Throwable $e) use ($connection, $deferred) { $this->isConnecting = false; $this->emitError($e); @@ -229,7 +229,7 @@ public function connect(string $host, int $port = 1883, Connection $connection = $this->emit('close', [$connection, $this]); }); }) - ->otherwise(function (Exception $e) use ($deferred) { + ->otherwise(function (Throwable $e) use ($deferred) { $this->isConnecting = false; $this->emitError($e); @@ -378,7 +378,7 @@ function () use ($message, $generator, $deferred) { static function ($value) use ($deferred) { $deferred->notify($value); }, - static function (Exception $e) use ($deferred) { + static function (Throwable $e) use ($deferred) { $deferred->reject($e); } ); @@ -391,11 +391,11 @@ static function (Exception $e) use ($deferred) { /** * Emits warnings. * - * @param Exception $e + * @param Throwable $e * * @return void */ - private function emitWarning(Exception $e): void + private function emitWarning(Throwable $e): void { $this->emit('warning', [$e, $this]); } @@ -403,11 +403,11 @@ private function emitWarning(Exception $e): void /** * Emits errors. * - * @param Exception $e + * @param Throwable $e * * @return void */ - private function emitError(Exception $e): void + private function emitError(Throwable $e): void { $this->emit('error', [$e, $this]); } @@ -451,13 +451,13 @@ static function () use ($deferred, $timeout, &$future) { $this->handleClose(); }); - $stream->on('error', function (Exception $e) { + $stream->on('error', function (Throwable $e) { $this->handleError($e); }); $deferred->resolve($stream); }) - ->otherwise(static function (Exception $e) use ($deferred) { + ->otherwise(static function (Throwable $e) use ($deferred) { $deferred->reject($e); }); @@ -496,7 +496,7 @@ function () { ); $deferred->resolve($result ?: $connection); - })->otherwise(static function (Exception $e) use ($deferred) { + })->otherwise(static function (Throwable $e) use ($deferred) { $deferred->reject($e); }); @@ -649,11 +649,11 @@ private function handleClose(): void /** * Handles errors of the stream. * - * @param Exception $e + * @param Throwable $e * * @return void */ - private function handleError(Exception $e): void + private function handleError(Throwable $e): void { $this->emitError($e); } @@ -670,7 +670,7 @@ private function startFlow(Flow $flow, bool $isSilent = false): ExtendedPromiseI { try { $packet = $flow->start(); - } catch (Exception $e) { + } catch (Throwable $e) { $this->emitError($e); return new RejectedPromise($e); @@ -708,7 +708,7 @@ private function continueFlow(ReactFlow $flow, Packet $packet): void { try { $response = $flow->next($packet); - } catch (Exception $e) { + } catch (Throwable $e) { $this->emitError($e); return;