From 220f00a9cf11bbd996f172fae8f8bd740bd86fab Mon Sep 17 00:00:00 2001 From: Niels Theen Date: Wed, 1 Mar 2017 12:46:14 +0100 Subject: [PATCH 1/3] Test error events from other streams on request object --- tests/ServerTest.php | 118 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) diff --git a/tests/ServerTest.php b/tests/ServerTest.php index 32db8ef3..700d34bc 100644 --- a/tests/ServerTest.php +++ b/tests/ServerTest.php @@ -1023,6 +1023,124 @@ function ($data) use (&$buffer) { $this->assertInstanceOf('InvalidArgumentException', $error); } + public function testInvalidChunkHeaderResultsInErrorOnRequestStream() + { + $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); + $server = new Server($this->socket); + $server->on('request', function ($request, $response) use ($errorEvent){ + $request->on('error', $errorEvent); + }); + + $this->socket->emit('connection', array($this->connection)); + + $data = "GET / HTTP/1.1\r\n"; + $data .= "Host: example.com:80\r\n"; + $data .= "Connection: close\r\n"; + $data .= "Transfer-Encoding: chunked\r\n"; + $data .= "\r\n"; + $data .= "hello\r\hello\r\n"; + + $this->connection->emit('data', array($data)); + } + + public function testTooLongChunkHeaderResultsInErrorOnRequestStream() + { + $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); + $server = new Server($this->socket); + $server->on('request', function ($request, $response) use ($errorEvent){ + $request->on('error', $errorEvent); + }); + + $this->socket->emit('connection', array($this->connection)); + + $data = "GET / HTTP/1.1\r\n"; + $data .= "Host: example.com:80\r\n"; + $data .= "Connection: close\r\n"; + $data .= "Transfer-Encoding: chunked\r\n"; + $data .= "\r\n"; + for ($i = 0; $i < 1025; $i++) { + $data .= 'a'; + } + + $this->connection->emit('data', array($data)); + } + + public function testTooLongChunkBodyResultsInErrorOnRequestStream() + { + $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); + $server = new Server($this->socket); + $server->on('request', function ($request, $response) use ($errorEvent){ + $request->on('error', $errorEvent); + }); + + $this->socket->emit('connection', array($this->connection)); + + $data = "GET / HTTP/1.1\r\n"; + $data .= "Host: example.com:80\r\n"; + $data .= "Connection: close\r\n"; + $data .= "Transfer-Encoding: chunked\r\n"; + $data .= "\r\n"; + $data .= "5\r\nhello world\r\n"; + + $this->connection->emit('data', array($data)); + } + + public function testUnexpectedEndOfConnectionWillResultsInErrorOnRequestStream() + { + $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); + $server = new Server($this->socket); + $server->on('request', function ($request, $response) use ($errorEvent){ + $request->on('error', $errorEvent); + }); + + $this->socket->emit('connection', array($this->connection)); + + $data = "GET / HTTP/1.1\r\n"; + $data .= "Host: example.com:80\r\n"; + $data .= "Connection: close\r\n"; + $data .= "Transfer-Encoding: chunked\r\n"; + $data .= "\r\n"; + $data .= "5\r\nhello\r\n"; + + $this->connection->emit('data', array($data)); + $this->connection->emit('end'); + } + + public function testErrorInChunkedDecoderNeverClosesConnection() + { + $server = new Server($this->socket); + $server->on('request', $this->expectCallableOnce()); + + $this->socket->emit('connection', array($this->connection)); + + $data = "GET / HTTP/1.1\r\n"; + $data .= "Host: example.com:80\r\n"; + $data .= "Connection: close\r\n"; + $data .= "Transfer-Encoding: chunked\r\n"; + $data .= "\r\n"; + $data .= "hello\r\nhello\r\n"; + + $this->connection->emit('data', array($data)); + } + + public function testErrorInLengthLimitedStreamNeverClosesConnection() + { + $server = new Server($this->socket); + $server->on('request', $this->expectCallableOnce()); + + $this->socket->emit('connection', array($this->connection)); + + $data = "GET / HTTP/1.1\r\n"; + $data .= "Host: example.com:80\r\n"; + $data .= "Connection: close\r\n"; + $data .= "Content-Length: 5\r\n"; + $data .= "\r\n"; + $data .= "hello"; + + $this->connection->emit('data', array($data)); + $this->connection->emit('end'); + } + private function createGetRequest() { $data = "GET / HTTP/1.1\r\n"; From f7d45c5327ccbc3ea704d75a07df1f1575c1ced5 Mon Sep 17 00:00:00 2001 From: Niels Theen Date: Tue, 21 Feb 2017 12:04:35 +0100 Subject: [PATCH 2/3] Protect streams against close of other streams on error --- src/CloseProtectionStream.php | 101 +++++++++++++++++++ src/Request.php | 3 +- src/Server.php | 6 +- tests/CloseProtectionStreamTest.php | 146 ++++++++++++++++++++++++++++ tests/RequestTest.php | 5 +- tests/ServerTest.php | 34 +++++++ 6 files changed, 287 insertions(+), 8 deletions(-) create mode 100644 src/CloseProtectionStream.php create mode 100644 tests/CloseProtectionStreamTest.php diff --git a/src/CloseProtectionStream.php b/src/CloseProtectionStream.php new file mode 100644 index 00000000..da4b2625 --- /dev/null +++ b/src/CloseProtectionStream.php @@ -0,0 +1,101 @@ +input = $input; + + $this->input->on('data', array($this, 'handleData')); + $this->input->on('end', array($this, 'handleEnd')); + $this->input->on('error', array($this, 'handleError')); + $this->input->on('close', array($this, 'close')); + } + + public function isReadable() + { + return !$this->closed && $this->input->isReadable(); + } + + public function pause() + { + if ($this->closed) { + return; + } + + $this->input->pause(); + } + + public function resume() + { + if ($this->closed) { + return; + } + + $this->input->resume(); + } + + public function pipe(WritableStreamInterface $dest, array $options = array()) + { + Util::pipe($this, $dest, $options); + + return $dest; + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->closed = true; + + $this->emit('close'); + + // 'pause' the stream avoids additional traffic transferred by this stream + $this->input->pause(); + + $this->input->removeListener('data', array($this, 'handleData')); + $this->input->removeListener('error', array($this, 'handleError')); + $this->input->removeListener('end', array($this, 'handleEnd')); + $this->input->removeListener('close', array($this, 'close')); + + $this->removeAllListeners(); + } + + /** @internal */ + public function handleData($data) + { + $this->emit('data', array($data)); + } + + /** @internal */ + public function handleEnd() + { + $this->emit('end'); + $this->close(); + } + + /** @internal */ + public function handleError(\Exception $e) + { + $this->emit('error', array($e)); + } + +} diff --git a/src/Request.php b/src/Request.php index ae8f6303..e3559560 100644 --- a/src/Request.php +++ b/src/Request.php @@ -196,9 +196,8 @@ public function close() return; } - // request closed => stop reading from the stream by pausing it $this->readable = false; - $this->stream->pause(); + $this->stream->close(); $this->emit('close'); $this->removeAllListeners(); diff --git a/src/Server.php b/src/Server.php index 4de7770b..a95028f0 100644 --- a/src/Server.php +++ b/src/Server.php @@ -140,12 +140,12 @@ public function handleRequest(ConnectionInterface $conn, RequestInterface $reque $response = new Response($conn, $request->getProtocolVersion()); - $stream = $conn; + $stream = new CloseProtectionStream($conn); if ($request->hasHeader('Transfer-Encoding')) { $transferEncodingHeader = $request->getHeader('Transfer-Encoding'); // 'chunked' must always be the final value of 'Transfer-Encoding' according to: https://tools.ietf.org/html/rfc7230#section-3.3.1 if (strtolower(end($transferEncodingHeader)) === 'chunked') { - $stream = new ChunkedDecoder($conn); + $stream = new ChunkedDecoder($stream); } } elseif ($request->hasHeader('Content-Length')) { $string = $request->getHeaderLine('Content-Length'); @@ -157,7 +157,7 @@ public function handleRequest(ConnectionInterface $conn, RequestInterface $reque return $this->writeError($conn, 400); } - $stream = new LengthLimitedStream($conn, $contentLength); + $stream = new LengthLimitedStream($stream, $contentLength); } $request = new Request($request, $stream); diff --git a/tests/CloseProtectionStreamTest.php b/tests/CloseProtectionStreamTest.php new file mode 100644 index 00000000..a85e7c10 --- /dev/null +++ b/tests/CloseProtectionStreamTest.php @@ -0,0 +1,146 @@ +getMockBuilder('React\Stream\ReadableStreamInterface')->disableOriginalConstructor()->getMock(); + $input->expects($this->once())->method('pause'); + $input->expects($this->never())->method('close'); + + $protection = new CloseProtectionStream($input); + $protection->close(); + } + + public function testErrorWontCloseStream() + { + $input = new ReadableStream(); + + $protection = new CloseProtectionStream($input); + $protection->on('error', $this->expectCallableOnce()); + $protection->on('close', $this->expectCallableNever()); + + $input->emit('error', array(new \RuntimeException())); + + $this->assertTrue($protection->isReadable()); + $this->assertTrue($input->isReadable()); + } + + public function testResumeStreamWillResumeInputStream() + { + $input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $input->expects($this->once())->method('pause'); + $input->expects($this->once())->method('resume'); + + $protection = new CloseProtectionStream($input); + $protection->pause(); + $protection->resume(); + } + + public function testInputStreamIsNotReadableAfterClose() + { + $input = new ReadableStream(); + + $protection = new CloseProtectionStream($input); + $protection->on('close', $this->expectCallableOnce()); + + $input->close(); + + $this->assertFalse($protection->isReadable()); + $this->assertFalse($input->isReadable()); + } + + public function testPipeStream() + { + $input = new ReadableStream(); + + $protection = new CloseProtectionStream($input); + $dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + + $ret = $protection->pipe($dest); + + $this->assertSame($dest, $ret); + } + + public function testStopEmittingDataAfterClose() + { + $input = new ReadableStream(); + + $protection = new CloseProtectionStream($input); + $protection->on('data', $this->expectCallableNever()); + + $protection->on('close', $this->expectCallableOnce()); + + $protection->close(); + + $input->emit('data', array('hello')); + + $this->assertFalse($protection->isReadable()); + $this->assertTrue($input->isReadable()); + } + + public function testErrorIsNeverCalledAfterClose() + { + $input = new ReadableStream(); + + $protection = new CloseProtectionStream($input); + $protection->on('data', $this->expectCallableNever()); + $protection->on('error', $this->expectCallableNever()); + $protection->on('close', $this->expectCallableOnce()); + + $protection->close(); + + $input->emit('error', array(new \Exception())); + + $this->assertFalse($protection->isReadable()); + $this->assertTrue($input->isReadable()); + } + + public function testEndWontBeEmittedAfterClose() + { + $input = new ReadableStream(); + + $protection = new CloseProtectionStream($input); + $protection->on('data', $this->expectCallableNever()); + $protection->on('close', $this->expectCallableOnce()); + + $protection->close(); + + $input->emit('end', array()); + + $this->assertFalse($protection->isReadable()); + $this->assertTrue($input->isReadable()); + } + + public function testPauseAfterCloseHasNoEffect() + { + $input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $input->expects($this->once())->method('pause'); + + $protection = new CloseProtectionStream($input); + $protection->on('data', $this->expectCallableNever()); + $protection->on('close', $this->expectCallableOnce()); + + $protection->close(); + $protection->pause(); + } + + public function testResumeAfterCloseHasNoEffect() + { + $input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $input->expects($this->once())->method('pause'); + $input->expects($this->never())->method('resume'); + + $protection = new CloseProtectionStream($input); + $protection->on('data', $this->expectCallableNever()); + $protection->on('close', $this->expectCallableOnce()); + + $protection->close(); + $protection->resume(); + } +} diff --git a/tests/RequestTest.php b/tests/RequestTest.php index fa9705d3..3d010b7f 100644 --- a/tests/RequestTest.php +++ b/tests/RequestTest.php @@ -103,10 +103,9 @@ public function testCloseMultipleTimesEmitsCloseEventOnce() $request->close(); } - public function testCloseWillPauseUnderlyingStream() + public function testCloseWillCloseUnderlyingStream() { - $this->stream->expects($this->once())->method('pause'); - $this->stream->expects($this->never())->method('close'); + $this->stream->expects($this->once())->method('close'); $request = new Request(new Psr('GET', '/'), $this->stream); diff --git a/tests/ServerTest.php b/tests/ServerTest.php index 700d34bc..73f29e8e 100644 --- a/tests/ServerTest.php +++ b/tests/ServerTest.php @@ -1031,6 +1031,9 @@ public function testInvalidChunkHeaderResultsInErrorOnRequestStream() $request->on('error', $errorEvent); }); + $this->connection->expects($this->never())->method('close'); + $this->connection->expects($this->once())->method('pause'); + $this->socket->emit('connection', array($this->connection)); $data = "GET / HTTP/1.1\r\n"; @@ -1051,6 +1054,9 @@ public function testTooLongChunkHeaderResultsInErrorOnRequestStream() $request->on('error', $errorEvent); }); + $this->connection->expects($this->never())->method('close'); + $this->connection->expects($this->once())->method('pause'); + $this->socket->emit('connection', array($this->connection)); $data = "GET / HTTP/1.1\r\n"; @@ -1073,6 +1079,9 @@ public function testTooLongChunkBodyResultsInErrorOnRequestStream() $request->on('error', $errorEvent); }); + $this->connection->expects($this->never())->method('close'); + $this->connection->expects($this->once())->method('pause'); + $this->socket->emit('connection', array($this->connection)); $data = "GET / HTTP/1.1\r\n"; @@ -1093,6 +1102,9 @@ public function testUnexpectedEndOfConnectionWillResultsInErrorOnRequestStream() $request->on('error', $errorEvent); }); + $this->connection->expects($this->never())->method('close'); + $this->connection->expects($this->once())->method('pause'); + $this->socket->emit('connection', array($this->connection)); $data = "GET / HTTP/1.1\r\n"; @@ -1111,6 +1123,9 @@ public function testErrorInChunkedDecoderNeverClosesConnection() $server = new Server($this->socket); $server->on('request', $this->expectCallableOnce()); + $this->connection->expects($this->never())->method('close'); + $this->connection->expects($this->once())->method('pause'); + $this->socket->emit('connection', array($this->connection)); $data = "GET / HTTP/1.1\r\n"; @@ -1128,6 +1143,9 @@ public function testErrorInLengthLimitedStreamNeverClosesConnection() $server = new Server($this->socket); $server->on('request', $this->expectCallableOnce()); + $this->connection->expects($this->never())->method('close'); + $this->connection->expects($this->once())->method('pause'); + $this->socket->emit('connection', array($this->connection)); $data = "GET / HTTP/1.1\r\n"; @@ -1141,6 +1159,22 @@ public function testErrorInLengthLimitedStreamNeverClosesConnection() $this->connection->emit('end'); } + public function testCloseRequestWillPauseConnection() + { + $server = new Server($this->socket); + $server->on('request', function ($request, $response) { + $request->close(); + }); + + $this->connection->expects($this->never())->method('close'); + $this->connection->expects($this->once())->method('pause'); + + $this->socket->emit('connection', array($this->connection)); + + $data = $this->createGetRequest(); + $this->connection->emit('data', array($data)); + } + private function createGetRequest() { $data = "GET / HTTP/1.1\r\n"; From 9969b6792fdc196b0199ae47b519a94785bfad1d Mon Sep 17 00:00:00 2001 From: Niels Theen Date: Wed, 1 Mar 2017 13:09:05 +0100 Subject: [PATCH 3/3] Update README --- README.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/README.md b/README.md index 39f3b901..cedaee25 100644 --- a/README.md +++ b/README.md @@ -103,6 +103,20 @@ $http->on('error', function (Exception $e) { }); ``` +An `error` event will be emitted for the `Request` if the validation of the body data fails. +This can be e.g. invalid chunked decoded data or an unexpected `end` event. + +```php +$http->on('request', function (Request $request, Response $response) { + $request->on('error', function (\Exception $error) { + echo $error->getMessage(); + }); +}); +``` + +Such an error will `pause` the connection instead of closing it. A response message +can still be sent. + ### Request The `Request` class is responsible for streaming the incoming request body