Skip to content

Commit a35317d

Browse files
committed
Add new internal callAsync() API
1 parent 41e54b7 commit a35317d

File tree

6 files changed

+52
-52
lines changed

6 files changed

+52
-52
lines changed

phpstan.neon.dist

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,3 @@ parameters:
99
ignoreErrors:
1010
# ignore undefined methods due to magic `__call()` method
1111
- '/^Call to an undefined method Clue\\React\\Redis\\RedisClient::.+\(\)\.$/'
12-
- '/^Call to an undefined method Clue\\React\\Redis\\Io\\StreamingClient::.+\(\)\.$/'

src/Io/Factory.php

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,13 @@ public function createClient(string $uri): PromiseInterface
9696
// use `?password=secret` query or `user:secret@host` password form URL
9797
if (isset($args['password']) || isset($parts['pass'])) {
9898
$pass = $args['password'] ?? rawurldecode($parts['pass']); // @phpstan-ignore-line
99+
\assert(\is_string($pass));
99100
$promise = $promise->then(function (StreamingClient $redis) use ($pass, $uri) {
100-
return $redis->auth($pass)->then(
101+
return $redis->callAsync('auth', $pass)->then(
101102
function () use ($redis) {
102103
return $redis;
103104
},
104-
function (\Exception $e) use ($redis, $uri) {
105+
function (\Throwable $e) use ($redis, $uri) {
105106
$redis->close();
106107

107108
$const = '';
@@ -124,12 +125,13 @@ function (\Exception $e) use ($redis, $uri) {
124125
// use `?db=1` query or `/1` path (skip first slash)
125126
if (isset($args['db']) || (isset($parts['path']) && $parts['path'] !== '/')) {
126127
$db = $args['db'] ?? substr($parts['path'], 1); // @phpstan-ignore-line
128+
\assert(\is_string($db) && \preg_match('/^\d+$/', $db));
127129
$promise = $promise->then(function (StreamingClient $redis) use ($db, $uri) {
128-
return $redis->select($db)->then(
130+
return $redis->callAsync('select', $db)->then(
129131
function () use ($redis) {
130132
return $redis;
131133
},
132-
function (\Exception $e) use ($redis, $uri) {
134+
function (\Throwable $e) use ($redis, $uri) {
133135
$redis->close();
134136

135137
$const = '';

src/Io/StreamingClient.php

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,15 +72,14 @@ public function __construct(DuplexStreamInterface $stream, ParserInterface $pars
7272
}
7373

7474
/**
75-
* @param string[] $args
7675
* @return PromiseInterface<mixed>
7776
*/
78-
public function __call(string $name, array $args): PromiseInterface
77+
public function callAsync(string $command, string ...$args): PromiseInterface
7978
{
8079
$request = new Deferred();
8180
$promise = $request->promise();
8281

83-
$name = strtolower($name);
82+
$command = strtolower($command);
8483

8584
// special (p)(un)subscribe commands only accept a single parameter and have custom response logic applied
8685
static $pubsubs = ['subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe'];
@@ -90,22 +89,22 @@ public function __call(string $name, array $args): PromiseInterface
9089
'Connection ' . ($this->closed ? 'closed' : 'closing'). ' (ENOTCONN)',
9190
defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107
9291
));
93-
} elseif (count($args) !== 1 && in_array($name, $pubsubs)) {
92+
} elseif (count($args) !== 1 && in_array($command, $pubsubs)) {
9493
$request->reject(new \InvalidArgumentException(
9594
'PubSub commands limited to single argument (EINVAL)',
9695
defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22
9796
));
98-
} elseif ($name === 'monitor') {
97+
} elseif ($command === 'monitor') {
9998
$request->reject(new \BadMethodCallException(
10099
'MONITOR command explicitly not supported (ENOTSUP)',
101100
defined('SOCKET_ENOTSUP') ? SOCKET_ENOTSUP : (defined('SOCKET_EOPNOTSUPP') ? SOCKET_EOPNOTSUPP : 95)
102101
));
103102
} else {
104-
$this->stream->write($this->serializer->getRequestMessage($name, $args));
103+
$this->stream->write($this->serializer->getRequestMessage($command, $args));
105104
$this->requests []= $request;
106105
}
107106

108-
if (in_array($name, $pubsubs)) {
107+
if (in_array($command, $pubsubs)) {
109108
$promise->then(function (array $array) {
110109
$first = array_shift($array);
111110

src/RedisClient.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -178,17 +178,17 @@ public function __call(string $name, array $args): PromiseInterface
178178
));
179179
}
180180

181-
return $this->client()->then(function (StreamingClient $redis) use ($name, $args) {
181+
return $this->client()->then(function (StreamingClient $redis) use ($name, $args): PromiseInterface {
182182
$this->awake();
183-
assert(\is_callable([$redis, $name])); // @phpstan-ignore-next-line
184-
return \call_user_func_array([$redis, $name], $args)->then(
183+
return $redis->callAsync($name, ...$args)->then(
185184
function ($result) {
186185
$this->idle();
187186
return $result;
188187
},
189-
function (\Exception $error) {
188+
function (\Throwable $e) {
189+
\assert($e instanceof \Exception);
190190
$this->idle();
191-
throw $error;
191+
throw $e;
192192
}
193193
);
194194
});

tests/Io/StreamingClientTest.php

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public function testSending(): void
4646
$this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping'))->will($this->returnValue('message'));
4747
$this->stream->expects($this->once())->method('write')->with($this->equalTo('message'));
4848

49-
$this->redis->ping();
49+
$this->redis->callAsync('ping');
5050
}
5151

5252
public function testClosingClientEmitsEvent(): void
@@ -121,7 +121,7 @@ public function testPingPong(): void
121121
{
122122
$this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping'));
123123

124-
$promise = $this->redis->ping();
124+
$promise = $this->redis->callAsync('ping');
125125

126126
$this->redis->handleMessage(new BulkReply('PONG'));
127127

@@ -131,7 +131,7 @@ public function testPingPong(): void
131131

132132
public function testMonitorCommandIsNotSupported(): void
133133
{
134-
$promise = $this->redis->monitor();
134+
$promise = $this->redis->callAsync('monitor');
135135

136136
$promise->then(null, $this->expectCallableOnceWith(
137137
$this->logicalAnd(
@@ -148,7 +148,7 @@ public function testMonitorCommandIsNotSupported(): void
148148

149149
public function testErrorReply(): void
150150
{
151-
$promise = $this->redis->invalid();
151+
$promise = $this->redis->callAsync('invalid');
152152

153153
$err = new ErrorReply("ERR unknown command 'invalid'");
154154
$this->redis->handleMessage($err);
@@ -158,7 +158,7 @@ public function testErrorReply(): void
158158

159159
public function testClosingClientRejectsAllRemainingRequests(): void
160160
{
161-
$promise = $this->redis->ping();
161+
$promise = $this->redis->callAsync('ping');
162162
$this->redis->close();
163163

164164
$promise->then(null, $this->expectCallableOnceWith(
@@ -183,7 +183,7 @@ public function testClosingStreamRejectsAllRemainingRequests(): void
183183
assert($this->serializer instanceof SerializerInterface);
184184
$this->redis = new StreamingClient($stream, $this->parser, $this->serializer);
185185

186-
$promise = $this->redis->ping();
186+
$promise = $this->redis->callAsync('ping');
187187
$stream->close();
188188

189189
$promise->then(null, $this->expectCallableOnceWith(
@@ -201,9 +201,9 @@ public function testClosingStreamRejectsAllRemainingRequests(): void
201201

202202
public function testEndingClientRejectsAllNewRequests(): void
203203
{
204-
$this->redis->ping();
204+
$this->redis->callAsync('ping');
205205
$this->redis->end();
206-
$promise = $this->redis->ping();
206+
$promise = $this->redis->callAsync('ping');
207207

208208
$promise->then(null, $this->expectCallableOnceWith(
209209
$this->logicalAnd(
@@ -221,7 +221,7 @@ public function testEndingClientRejectsAllNewRequests(): void
221221
public function testClosedClientRejectsAllNewRequests(): void
222222
{
223223
$this->redis->close();
224-
$promise = $this->redis->ping();
224+
$promise = $this->redis->callAsync('ping');
225225

226226
$promise->then(null, $this->expectCallableOnceWith(
227227
$this->logicalAnd(
@@ -250,7 +250,7 @@ public function testEndingBusyClosesClientWhenNotBusyAnymore(): void
250250
++$closed;
251251
});
252252

253-
$promise = $this->redis->ping();
253+
$promise = $this->redis->callAsync('ping');
254254
$this->assertEquals(0, $closed);
255255

256256
$this->redis->end();
@@ -277,7 +277,7 @@ public function testReceivingUnexpectedMessageThrowsException(): void
277277

278278
public function testPubsubSubscribe(): StreamingClient
279279
{
280-
$promise = $this->redis->subscribe('test');
280+
$promise = $this->redis->callAsync('subscribe', 'test');
281281
$this->expectPromiseResolve($promise);
282282

283283
$this->redis->on('subscribe', $this->expectCallableOnce());
@@ -291,7 +291,7 @@ public function testPubsubSubscribe(): StreamingClient
291291
*/
292292
public function testPubsubPatternSubscribe(StreamingClient $client): StreamingClient
293293
{
294-
$promise = $client->psubscribe('demo_*');
294+
$promise = $client->callAsync('psubscribe', 'demo_*');
295295
$this->expectPromiseResolve($promise);
296296

297297
$client->on('psubscribe', $this->expectCallableOnce());
@@ -311,7 +311,7 @@ public function testPubsubMessage(StreamingClient $client): void
311311

312312
public function testSubscribeWithMultipleArgumentsRejects(): void
313313
{
314-
$promise = $this->redis->subscribe('a', 'b');
314+
$promise = $this->redis->callAsync('subscribe', 'a', 'b');
315315

316316
$promise->then(null, $this->expectCallableOnceWith(
317317
$this->logicalAnd(
@@ -328,7 +328,7 @@ public function testSubscribeWithMultipleArgumentsRejects(): void
328328

329329
public function testUnsubscribeWithoutArgumentsRejects(): void
330330
{
331-
$promise = $this->redis->unsubscribe();
331+
$promise = $this->redis->callAsync('unsubscribe');
332332

333333
$promise->then(null, $this->expectCallableOnceWith(
334334
$this->logicalAnd(

0 commit comments

Comments
 (0)