From b79bd7fcdfe8cd753b59233090b019c1829a8434 Mon Sep 17 00:00:00 2001 From: sevavietl Date: Wed, 23 May 2018 12:19:19 +0300 Subject: [PATCH] Add sync/async sockets differentiation for broker --- src/Broker.php | 68 +++++++++++++++++++++++------------- src/Producer/SyncProcess.php | 4 +-- src/SocketFactory.php | 25 +++++++++++++ tests/Base/BrokerTest.php | 33 +++++++++++++++-- 4 files changed, 102 insertions(+), 28 deletions(-) create mode 100644 src/SocketFactory.php diff --git a/src/Broker.php b/src/Broker.php index b0553d43..fc6c365c 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -7,6 +7,7 @@ use Kafka\Sasl\Plain; use Kafka\Sasl\Scram; use function array_keys; +use function array_walk_recursive; use function explode; use function in_array; use function serialize; @@ -18,6 +19,9 @@ class Broker { use SingletonTrait; + public const SOCKET_MODE_ASYNC = 0; + public const SOCKET_MODE_SYNC = 1; + /** * @var int */ @@ -43,6 +47,9 @@ class Broker */ private $dataSockets = []; + /** @var SocketFactory */ + private $socketFactory; + /** * @var callable|null */ @@ -53,6 +60,11 @@ class Broker */ private $config; + public function setSocketFactory(SocketFactory $socketFactory): void + { + $this->socketFactory = $socketFactory; + } + public function setProcess(callable $process): void { $this->process = $process; @@ -134,12 +146,12 @@ public function getBrokers(): array return $this->brokers; } - public function getMetaConnect(string $key, bool $modeSync = false): ?CommonSocket + public function getMetaConnect(string $key, int $mode = self::SOCKET_MODE_ASYNC): ?CommonSocket { - return $this->getConnect($key, 'metaSockets', $modeSync); + return $this->getConnect($key, 'metaSockets', $mode); } - public function getRandConnect(bool $modeSync = false): ?CommonSocket + public function getRandConnect(int $mode = self::SOCKET_MODE_ASYNC): ?CommonSocket { $nodeIds = array_keys($this->brokers); shuffle($nodeIds); @@ -148,24 +160,24 @@ public function getRandConnect(bool $modeSync = false): ?CommonSocket return null; } - return $this->getMetaConnect((string) $nodeIds[0], $modeSync); + return $this->getMetaConnect((string) $nodeIds[0], $mode); } - public function getDataConnect(string $key, bool $modeSync = false): ?CommonSocket + public function getDataConnect(string $key, int $mode = self::SOCKET_MODE_ASYNC): ?CommonSocket { - return $this->getConnect($key, 'dataSockets', $modeSync); + return $this->getConnect($key, 'dataSockets', $mode); } - public function getConnect(string $key, string $type, bool $modeSync = false): ?CommonSocket + public function getConnect(string $key, string $type, int $mode = self::SOCKET_MODE_ASYNC): ?CommonSocket { - if (isset($this->{$type}[$key])) { - return $this->{$type}[$key]; + if (isset($this->{$type}[$key][$mode])) { + return $this->{$type}[$key][$mode]; } if (isset($this->brokers[$key])) { $hostname = $this->brokers[$key]; - if (isset($this->{$type}[$hostname])) { - return $this->{$type}[$hostname]; + if (isset($this->{$type}[$hostname][$mode])) { + return $this->{$type}[$hostname][$mode]; } } @@ -182,19 +194,19 @@ public function getConnect(string $key, string $type, bool $modeSync = false): ? [$host, $port] = explode(':', $key); } - if ($host === null || $port === null || (! $modeSync && $this->process === null)) { + if ($host === null || $port === null || ($mode === self::SOCKET_MODE_ASYNC && $this->process === null)) { return null; } try { - $socket = $this->getSocket((string) $host, (int) $port, $modeSync); + $socket = $this->getSocket((string) $host, (int) $port, $mode); if ($socket instanceof Socket && $this->process !== null) { $socket->setOnReadable($this->process); } $socket->connect(); - $this->{$type}[$key] = $socket; + $this->{$type}[$key][$mode] = $socket; return $socket; } catch (\Throwable $e) { @@ -205,30 +217,29 @@ public function getConnect(string $key, string $type, bool $modeSync = false): ? public function clear(): void { - foreach ($this->metaSockets as $key => $socket) { - $socket->close(); - } - foreach ($this->dataSockets as $key => $socket) { + $sockets = [$this->metaSockets, $this->dataSockets]; + + array_walk_recursive($sockets, function (CommonSocket $socket): void { $socket->close(); - } + }); + $this->brokers = []; } /** * @throws \Kafka\Exception */ - public function getSocket(string $host, int $port, bool $modeSync): CommonSocket + public function getSocket(string $host, int $port, int $mode): CommonSocket { $saslProvider = $this->judgeConnectionConfig(); - if ($modeSync) { - return new SocketSync($host, $port, $this->config, $saslProvider); + if ($mode === self::SOCKET_MODE_SYNC) { + return $this->getSocketFactory()->createSocketSync($host, $port, $this->config, $saslProvider); } - return new Socket($host, $port, $this->config, $saslProvider); + return $this->getSocketFactory()->createSocket($host, $port, $this->config, $saslProvider); } - /** * @throws \Kafka\Exception */ @@ -281,4 +292,13 @@ private function getSaslMechanismProvider(Config $config): SaslMechanism throw new Exception(sprintf('"%s" is an invalid SASL mechanism', $mechanism)); } + + private function getSocketFactory(): SocketFactory + { + if ($this->socketFactory === null) { + $this->socketFactory = new SocketFactory(); + } + + return $this->socketFactory; + } } diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index 71b1cb06..206a8045 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -67,7 +67,7 @@ public function send(array $recordSet): array $sendData = $this->convertRecordSet($recordSet); $result = []; foreach ($sendData as $brokerId => $topicList) { - $connect = $broker->getDataConnect((string) $brokerId, true); + $connect = $broker->getDataConnect((string) $brokerId, Broker::SOCKET_MODE_SYNC); if ($connect === null) { return []; @@ -118,7 +118,7 @@ public function syncMeta(): void $broker = $this->getBroker(); foreach ($brokerHost as $host) { - $socket = $broker->getMetaConnect($host, true); + $socket = $broker->getMetaConnect($host, Broker::SOCKET_MODE_SYNC); if ($socket === null) { continue; diff --git a/src/SocketFactory.php b/src/SocketFactory.php new file mode 100644 index 00000000..4633800a --- /dev/null +++ b/src/SocketFactory.php @@ -0,0 +1,25 @@ +assertEquals($topics, $broker->getTopics()); } - public function getConnect(): void + public function testGetConnect(): void { $broker = $this->getBroker(); $data = [ @@ -128,6 +129,34 @@ public function getConnect(): void $this->assertNull($result); } + public function testGetConnectSyncAndAsyncForTheSameBroker(): void + { + $socket = $this->getMockBuilder(Socket::class) + ->disableOriginalConstructor() + ->getMock(); + + $socketSync = $this->getMockBuilder(SocketSync::class) + ->disableOriginalConstructor() + ->getMock(); + + $socketFactory = $this->getMockBuilder(SocketFactory::class) + ->setMethods(['createSocket', 'createSocketSync']) + ->getMock(); + + $socketFactory->method('createSocket') + ->willReturn($socket); + $socketFactory->method('createSocketSync') + ->willReturn($socketSync); + + $broker = $this->getBroker(); + $broker->setSocketFactory($socketFactory); + $broker->setProcess(function (): void { + }); + + $this->assertSame($socket, $broker->getConnect('kafka:9092', 'metaSockets')); + $this->assertSame($socketSync, $broker->getConnect('kafka:9092', 'metaSockets', Broker::SOCKET_MODE_SYNC)); + } + public function testConnectRandFalse(): void { $broker = $this->getBroker(); @@ -141,7 +170,7 @@ public function testGetSocketNotSetConfig(): void $broker = $this->getBroker(); $hostname = '127.0.0.1'; $port = 9092; - $socket = $broker->getSocket($hostname, $port, true); + $socket = $broker->getSocket($hostname, $port, Broker::SOCKET_MODE_SYNC); $this->assertInstanceOf(SocketSync::class, $socket); }