diff --git a/src/ChunkedDecoder.php b/src/ChunkedDecoder.php new file mode 100644 index 00000000..e9b68608 --- /dev/null +++ b/src/ChunkedDecoder.php @@ -0,0 +1,144 @@ +input = $input; + + $this->input->on('data', array($this, 'handleChunkHeader')); + $this->input->on('end', array($this, 'handleEnd')); + $this->input->on('error', array($this, 'handleError')); + $this->input->on('close', array($this, 'close')); + } + + public function isReadable() + { + return ! $this->closed && $this->input->isReadable(); + } + + public function pause() + { + $this->input->pause(); + } + + public function resume() + { + $this->input->resume(); + } + + public function pipe(WritableStreamInterface $dest, array $options = array()) + { + Util::pipe($this, $dest, $options); + + return $dest; + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->buffer = ''; + + $this->closed = true; + + $this->input->close(); + + $this->emit('close'); + $this->removeAllListeners(); + } + + + /** @internal */ + public function handleChunkHeader($data) + { + $this->buffer .= $data; + + $hexValue = strtok($this->buffer, static::CRLF); + if (dechex(hexdec($hexValue)) != $hexValue) { + $this->emit('error', array(new \Exception('Unable to identify ' . $hexValue . 'as hexadecimal number'))); + $this->close(); + return; + } + + if (strpos($this->buffer, static::CRLF) !== false) { + $this->chunkSize = hexdec($hexValue); + + $data = substr($this->buffer, strlen($hexValue) + 2); + $this->buffer = ''; + // Chunk header is complete + $this->input->removeListener('data', array($this, 'handleChunkHeader')); + $this->input->on('data', array($this, 'handleChunkData')); + if ($data !== '') { + $this->input->emit('data', array($data)); + } + } + } + + /** @internal */ + public function handleChunkData($data) + { + $this->buffer .= $data; + $chunk = substr($this->buffer, 0, $this->chunkSize); + $this->actualChunksize = strlen($chunk); + + if ($this->chunkSize === $this->actualChunksize) { + if ($this->chunkSize === 0 && strpos($this->buffer, static::CRLF) !== false) { + $this->emit('end', array()); + $this->close(); + return; + } + + if (strpos($this->buffer, static::CRLF) === false) { + return; + } + + $data = substr($this->buffer , $this->chunkSize + 2); + $this->emit('data', array($chunk)); + + $this->buffer = ''; + $this->chunkSize = 0; + // chunk body is complete + $this->input->removeListener('data', array($this, 'handleChunkData')); + $this->input->on('data', array($this, 'handleChunkHeader')); + if ($data !== '') { + $this->input->emit('data', array($data)); + } + } + } + + /** @internal */ + public function handleEnd() + { + if (!$this->closed) { + $this->buffer = ''; + $this->emit('end'); + $this->close(); + } + } + + /** @internal */ + public function handleError(\Exception $e) + { + $this->emit('error', array($e)); + $this->close(); + } +} diff --git a/src/Server.php b/src/Server.php index f6e45a53..add44d6e 100644 --- a/src/Server.php +++ b/src/Server.php @@ -22,7 +22,9 @@ public function __construct(SocketServerInterface $io) // TODO: multipart parsing $parser = new RequestHeaderParser(); - $parser->on('headers', function (Request $request, $bodyBuffer) use ($conn, $parser, $that) { + $listener = array($parser, 'feed'); + + $parser->on('headers', function (Request $request, $bodyBuffer) use ($conn, $parser, $that, $listener) { // attach remote ip to the request as metadata $request->remoteAddress = $conn->getRemoteAddress(); @@ -30,18 +32,11 @@ public function __construct(SocketServerInterface $io) $request->on('pause', array($conn, 'pause')); $request->on('resume', array($conn, 'resume')); - $that->handleRequest($conn, $request, $bodyBuffer); + $conn->removeListener('data', $listener); - $conn->removeListener('data', array($parser, 'feed')); - $conn->on('end', function () use ($request) { - $request->emit('end'); - }); - $conn->on('data', function ($data) use ($request) { - $request->emit('data', array($data)); - }); + $that->handleRequest($conn, $request, $bodyBuffer); }); - $listener = array($parser, 'feed'); $conn->on('data', $listener); $parser->on('error', function() use ($conn, $listener, $that) { // TODO: return 400 response @@ -62,7 +57,24 @@ public function handleRequest(ConnectionInterface $conn, Request $request, $body return; } + $stream = $conn; + if ($request->hasHeader('Transfer-Encoding')) { + $transferEncodingHeader = $request->getHeader('Transfer-Encoding'); + // 'chunked' must always be the final value of 'Transfer-Encoding' according to: https://tools.ietf.org/html/rfc7230#section-3.3.1 + if (strtolower(end($transferEncodingHeader)) === 'chunked') { + $stream = new ChunkedDecoder($conn); + } + } + + $stream->on('data', function ($data) use ($request) { + $request->emit('data', array($data)); + }); + + $stream->on('end', function () use ($request) { + $request->emit('end', array()); + }); + $this->emit('request', array($request, $response)); - $request->emit('data', array($bodyBuffer)); + $conn->emit('data', array($bodyBuffer)); } } diff --git a/tests/ChunkedDecoderTest.php b/tests/ChunkedDecoderTest.php new file mode 100644 index 00000000..98a719cd --- /dev/null +++ b/tests/ChunkedDecoderTest.php @@ -0,0 +1,230 @@ +input = new ReadableStream(); + $this->parser = new ChunkedDecoder($this->input); + } + + public function testSimpleChunk() + { + $this->parser->on('data', $this->expectCallableOnceWith('hello')); + $this->parser->on('end', $this->expectCallableNever()); + $this->parser->on('close', $this->expectCallableNever()); + + $this->input->emit('data', array("5\r\nhello\r\n")); + } + + public function testTwoChunks() + { + $this->parser->on('data', $this->expectCallableConsecutive(2, array('hello', 'bla'))); + $this->parser->on('end', $this->expectCallableNever()); + $this->parser->on('close', $this->expectCallableNever()); + + $this->input->emit('data', array("5\r\nhello\r\n3\r\nbla\r\n")); + } + + public function testEnd() + { + $this->parser->on('end', $this->expectCallableOnce()); + $this->parser->on('close', $this->expectCallableOnce()); + $this->parser->on('error', $this->expectCallableNever()); + + $this->input->emit('data', array("0\r\n\r\n")); + } + + public function testParameterWithEnd() + { + $this->parser->on('data', $this->expectCallableConsecutive(2, array('hello', 'bla'))); + $this->parser->on('end', $this->expectCallableOnce()); + $this->parser->on('close', $this->expectCallableOnce()); + $this->parser->on('error', $this->expectCallableNever()); + + $this->input->emit('data', array("5\r\nhello\r\n3\r\nbla\r\n0\r\n\r\n")); + } + + public function testInvalidChunk() + { + $this->parser->on('data', $this->expectCallableNever()); + $this->parser->on('end', $this->expectCallableNever()); + $this->parser->on('close', $this->expectCallableOnce()); + $this->parser->on('error', $this->expectCallableOnce()); + + $this->input->emit('data', array("bla\r\n")); + } + + public function testNeverEnd() + { + $this->parser->on('end', $this->expectCallableNever()); + $this->parser->on('close', $this->expectCallableNever()); + $this->parser->on('error', $this->expectCallableNever()); + + $this->input->emit('data', array("0\r\n")); + } + + public function testWrongChunkHex() + { + $this->parser->on('error', $this->expectCallableOnce()); + $this->parser->on('close', $this->expectCallableOnce()); + $this->parser->on('end', $this->expectCallableNever()); + + $this->input->emit('data', array("2\r\na\r\n5\r\nhello\r\n")); + } + + public function testSplittedChunk() + { + $this->parser->on('data', $this->expectCallableOnceWith('welt')); + $this->parser->on('close', $this->expectCallableNever()); + $this->parser->on('end', $this->expectCallableNever()); + $this->parser->on('error', $this->expectCallableNever()); + + $this->input->emit('data', array("4\r\n")); + $this->input->emit('data', array("welt\r\n")); + } + + public function testSplittedHeader() + { + $this->parser->on('data', $this->expectCallableOnceWith('welt')); + $this->parser->on('close', $this->expectCallableNever()); + $this->parser->on('end', $this->expectCallableNever());# + $this->parser->on('error', $this->expectCallableNever()); + + + $this->input->emit('data', array("4")); + $this->input->emit('data', array("\r\nwelt\r\n")); + } + + public function testSplittedBoth() + { + $this->parser->on('data', $this->expectCallableOnceWith('welt')); + $this->parser->on('close', $this->expectCallableNever()); + $this->parser->on('end', $this->expectCallableNever()); + $this->parser->on('error', $this->expectCallableNever()); + + $this->input->emit('data', array("4")); + $this->input->emit('data', array("\r\n")); + $this->input->emit('data', array("welt\r\n")); + } + + public function testCompletlySplitted() + { + $this->parser->on('data', $this->expectCallableOnceWith('welt')); + $this->parser->on('close', $this->expectCallableNever()); + $this->parser->on('end', $this->expectCallableNever()); + $this->parser->on('error', $this->expectCallableNever()); + + $this->input->emit('data', array("4")); + $this->input->emit('data', array("\r\n")); + $this->input->emit('data', array("we")); + $this->input->emit('data', array("lt\r\n")); + } + + public function testMixed() + { + $this->parser->on('data', $this->expectCallableConsecutive(2, array('welt', 'hello'))); + $this->parser->on('close', $this->expectCallableNever()); + $this->parser->on('end', $this->expectCallableNever()); + $this->parser->on('error', $this->expectCallableNever()); + + $this->input->emit('data', array("4")); + $this->input->emit('data', array("\r\n")); + $this->input->emit('data', array("we")); + $this->input->emit('data', array("lt\r\n")); + $this->input->emit('data', array("5\r\nhello\r\n")); + } + + public function testBigger() + { + $this->parser->on('data', $this->expectCallableConsecutive(2, array('abcdeabcdeabcdea', 'hello'))); + $this->parser->on('close', $this->expectCallableNever()); + $this->parser->on('end', $this->expectCallableNever()); + $this->parser->on('error', $this->expectCallableNever()); + + $this->input->emit('data', array("1")); + $this->input->emit('data', array("0")); + $this->input->emit('data', array("\r\n")); + $this->input->emit('data', array("abcdeabcdeabcdea\r\n")); + $this->input->emit('data', array("5\r\nhello\r\n")); + } + + public function testOneUnfinished() + { + $this->parser->on('data', $this->expectCallableOnceWith('bla')); + $this->parser->on('close', $this->expectCallableNever()); + $this->parser->on('end', $this->expectCallableNever()); + $this->parser->on('error', $this->expectCallableNever()); + + $this->input->emit('data', array("3\r\n")); + $this->input->emit('data', array("bla\r\n")); + $this->input->emit('data', array("5\r\nhello")); + } + + public function testHandleError() + { + $this->parser->on('error', $this->expectCallableOnce()); + $this->parser->on('close', $this->expectCallableOnce()); + $this->parser->on('end', $this->expectCallableNever()); + + $this->input->emit('error', array(new \RuntimeException())); + + $this->assertFalse($this->parser->isReadable()); + } + + public function testPauseStream() + { + $input = $this->getMock('React\Stream\ReadableStreamInterface'); + $input->expects($this->once())->method('pause'); + + $parser = new ChunkedDecoder($input); + $parser->pause(); + } + + public function testResumeStream() + { + $input = $this->getMock('React\Stream\ReadableStreamInterface'); + $input->expects($this->once())->method('pause'); + + $parser = new ChunkedDecoder($input); + $parser->pause(); + $parser->resume(); + } + + public function testPipeStream() + { + $dest = $this->getMock('React\Stream\WritableStreamInterface'); + + $ret = $this->parser->pipe($dest); + + $this->assertSame($dest, $ret); + } + + public function testHandleClose() + { + $this->parser->on('close', $this->expectCallableOnce()); + + $this->input->close(); + $this->input->emit('end', array()); + + $this->assertFalse($this->parser->isReadable()); + } + + public function testOutputStreamCanCloseInputStream() + { + $input = new ReadableStream(); + $input->on('close', $this->expectCallableOnce()); + + $stream = new ChunkedDecoder($input); + $stream->on('close', $this->expectCallableOnce()); + + $stream->close(); + + $this->assertFalse($input->isReadable()); + } +} diff --git a/tests/ServerTest.php b/tests/ServerTest.php index b9234521..61623e8b 100644 --- a/tests/ServerTest.php +++ b/tests/ServerTest.php @@ -155,6 +155,221 @@ public function testParserErrorEmitted() $this->connection->expects($this->never())->method('write'); } + public function testBodyDataWillBeSendViaRequestEvent() + { + $server = new Server($this->socket); + + $dataEvent = $this->expectCallableOnceWith('hello'); + $endEvent = $this->expectCallableNever(); + + $server->on('request', function (Request $request, Response $response) use ($dataEvent, $endEvent) { + $request->on('data', $dataEvent); + $request->on('end', $endEvent); + }); + + $this->socket->emit('connection', array($this->connection)); + + $data = "GET / HTTP/1.1\r\n"; + $data .= "Host: example.com:80\r\n"; + $data .= "Connection: close\r\n"; + $data .= "Content-Length: 5\r\n"; + $data .= "\r\n"; + $data .= "hello"; + + $this->connection->emit('data', array($data)); + } + + public function testChunkedEncodedRequestWillBeParsedForRequestEvent() + { + $server = new Server($this->socket); + + $dataEvent = $this->expectCallableOnceWith('hello'); + $endEvent = $this->expectCallableOnce(); + + $server->on('request', function (Request $request, Response $response) use ($dataEvent, $endEvent) { + $request->on('data', $dataEvent); + $request->on('end', $endEvent); + }); + + $this->socket->emit('connection', array($this->connection)); + + $data = "GET / HTTP/1.1\r\n"; + $data .= "Host: example.com:80\r\n"; + $data .= "Connection: close\r\n"; + $data .= "Transfer-Encoding: chunked\r\n"; + $data .= "\r\n"; + $data .= "5\r\nhello\r\n"; + $data .= "0\r\n\r\n"; + + $this->connection->emit('data', array($data)); + } + + public function testChunkedEncodedRequestAdditionalDataWontBeEmitted() + { + $server = new Server($this->socket); + + $dataEvent = $this->expectCallableOnceWith('hello'); + $endEvent = $this->expectCallableOnce(); + + $server->on('request', function (Request $request, Response $response) use ($dataEvent, $endEvent) { + $request->on('data', $dataEvent); + $request->on('end', $endEvent); + }); + + $this->socket->emit('connection', array($this->connection)); + + $data = "GET / HTTP/1.1\r\n"; + $data .= "Host: example.com:80\r\n"; + $data .= "Connection: close\r\n"; + $data .= "Transfer-Encoding: chunked\r\n"; + $data .= "\r\n"; + $data .= "5\r\nhello\r\n"; + $data .= "0\r\n\r\n"; + $data .= "2\r\nhi\r\n"; + + $this->connection->emit('data', array($data)); + } + + public function testEmptyChunkedEncodedRequest() + { + $server = new Server($this->socket); + + $dataEvent = $this->expectCallableNever(); + $endEvent = $this->expectCallableOnce(); + + $server->on('request', function (Request $request, Response $response) use ($dataEvent, $endEvent) { + $request->on('data', $dataEvent); + $request->on('end', $endEvent); + }); + + $this->socket->emit('connection', array($this->connection)); + + $data = "GET / HTTP/1.1\r\n"; + $data .= "Host: example.com:80\r\n"; + $data .= "Connection: close\r\n"; + $data .= "Transfer-Encoding: chunked\r\n"; + $data .= "\r\n"; + $data .= "0\r\n\r\n"; + + $this->connection->emit('data', array($data)); + } + + public function testOneChunkWillBeEmittedDelayed() + { + $server = new Server($this->socket); + + $dataEvent = $this->expectCallableOnceWith('hello'); + $endEvent = $this->expectCallableOnce(); + + $server->on('request', function (Request $request, Response $response) use ($dataEvent, $endEvent) { + $request->on('data', $dataEvent); + $request->on('end', $endEvent); + }); + + $this->socket->emit('connection', array($this->connection)); + + $data = "GET / HTTP/1.1\r\n"; + $data .= "Host: example.com:80\r\n"; + $data .= "Connection: close\r\n"; + $data .= "Transfer-Encoding: chunked\r\n"; + $data .= "\r\n"; + $data .= "5\r\nhel"; + + $this->connection->emit('data', array($data)); + + $data = "lo\r\n"; + $data .= "0\r\n\r\n"; + + $this->connection->emit('data', array($data)); + } + + public function testEmitTwoChunksDelayed() + { + $server = new Server($this->socket); + + $dataEvent = $this->expectCallableConsecutive(2, array('hello', 'world')); + $endEvent = $this->expectCallableOnce(); + + $server->on('request', function (Request $request, Response $response) use ($dataEvent, $endEvent) { + $request->on('data', $dataEvent); + $request->on('end', $endEvent); + }); + + $this->socket->emit('connection', array($this->connection)); + + $data = "GET / HTTP/1.1\r\n"; + $data .= "Host: example.com:80\r\n"; + $data .= "Connection: close\r\n"; + $data .= "Transfer-Encoding: chunked\r\n"; + $data .= "\r\n"; + $data .= "5\r\nhello\r\n"; + + $this->connection->emit('data', array($data)); + + $data = "5\r\nworld\r\n"; + $data .= "0\r\n\r\n"; + + $this->connection->emit('data', array($data)); + } + + /** + * All transfer-coding names are case-insensitive according to: + * https://tools.ietf.org/html/rfc7230#section-4 + */ + public function testChunkedIsUpperCase() + { + $server = new Server($this->socket); + + $dataEvent = $this->expectCallableOnceWith('hello'); + $endEvent = $this->expectCallableOnce(); + + $server->on('request', function (Request $request, Response $response) use ($dataEvent, $endEvent) { + $request->on('data', $dataEvent); + $request->on('end', $endEvent); + }); + + $this->socket->emit('connection', array($this->connection)); + + $data = "GET / HTTP/1.1\r\n"; + $data .= "Host: example.com:80\r\n"; + $data .= "Connection: close\r\n"; + $data .= "Transfer-Encoding: CHUNKED\r\n"; + $data .= "\r\n"; + $data .= "5\r\nhello\r\n"; + $data .= "0\r\n\r\n"; + + $this->connection->emit('data', array($data)); + } + + /** + * All transfer-coding names are case-insensitive according to: + * https://tools.ietf.org/html/rfc7230#section-4 + */ + public function testChunkedIsMixedUpperAndLowerCase() + { + $server = new Server($this->socket); + + $dataEvent = $this->expectCallableOnceWith('hello'); + $endEvent = $this->expectCallableOnce(); + + $server->on('request', function (Request $request, Response $response) use ($dataEvent, $endEvent) { + $request->on('data', $dataEvent); + $request->on('end', $endEvent); + }); + + $this->socket->emit('connection', array($this->connection)); + + $data = "GET / HTTP/1.1\r\n"; + $data .= "Host: example.com:80\r\n"; + $data .= "Connection: close\r\n"; + $data .= "Transfer-Encoding: CHunKeD\r\n"; + $data .= "\r\n"; + $data .= "5\r\nhello\r\n"; + $data .= "0\r\n\r\n"; + + $this->connection->emit('data', array($data)); + } + private function createGetRequest() { $data = "GET / HTTP/1.1\r\n"; diff --git a/tests/TestCase.php b/tests/TestCase.php index 24fe27f2..ee6d3191 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -34,6 +34,31 @@ protected function expectCallableNever() return $mock; } + protected function expectCallableOnceWith($value) + { + $mock = $this->createCallableMock(); + $mock + ->expects($this->once()) + ->method('__invoke') + ->with($this->equalTo($value)); + + return $mock; + } + + protected function expectCallableConsecutive($numberOfCalls, array $with) + { + $mock = $this->createCallableMock(); + + for ($i = 0; $i < $numberOfCalls; $i++) { + $mock + ->expects($this->at($i)) + ->method('__invoke') + ->with($this->equalTo($with[$i])); + } + + return $mock; + } + protected function createCallableMock() { return $this