Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,20 @@ $http->on('error', function (Exception $e) {
});
```

An `error` event will be emitted for the `Request` if the validation of the body data fails.
This can be e.g. invalid chunked decoded data or an unexpected `end` event.

```php
$http->on('request', function (Request $request, Response $response) {
$request->on('error', function (\Exception $error) {
echo $error->getMessage();
});
});
```

Such an error will `pause` the connection instead of closing it. A response message
can still be sent.

### Request

The `Request` class is responsible for streaming the incoming request body
Expand Down
101 changes: 101 additions & 0 deletions src/CloseProtectionStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
<?php

namespace React\Http;

use Evenement\EventEmitter;
use React\Stream\ReadableStreamInterface;
use React\Stream\WritableStreamInterface;
use React\Stream\Util;

/** @internal
* This stream is used to protect the passed stream against closing.
* */
class CloseProtectionStream extends EventEmitter implements ReadableStreamInterface
{
private $connection;
private $closed = false;

/**
* @param ReadableStreamInterface $input stream that will be paused instead of closed on an 'close' event.
*/
public function __construct(ReadableStreamInterface $input)
{
$this->input = $input;

$this->input->on('data', array($this, 'handleData'));
$this->input->on('end', array($this, 'handleEnd'));
$this->input->on('error', array($this, 'handleError'));
$this->input->on('close', array($this, 'close'));
}

public function isReadable()
{
return !$this->closed && $this->input->isReadable();
}

public function pause()
{
if ($this->closed) {
return;
}

$this->input->pause();
}

public function resume()
{
if ($this->closed) {
return;
}

$this->input->resume();
}

public function pipe(WritableStreamInterface $dest, array $options = array())
{
Util::pipe($this, $dest, $options);

return $dest;
}

public function close()
{
if ($this->closed) {
return;
}

$this->closed = true;

$this->emit('close');

// 'pause' the stream avoids additional traffic transferred by this stream
$this->input->pause();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see this covered, but what happens if I call this:

$protection->close();
$protection->resume();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, this wasn't handled correctly. Fixed it and added some tests for it.


$this->input->removeListener('data', array($this, 'handleData'));
$this->input->removeListener('error', array($this, 'handleError'));
$this->input->removeListener('end', array($this, 'handleEnd'));
$this->input->removeListener('close', array($this, 'close'));

$this->removeAllListeners();
}

/** @internal */
public function handleData($data)
{
$this->emit('data', array($data));
}

/** @internal */
public function handleEnd()
{
$this->emit('end');
$this->close();
}

/** @internal */
public function handleError(\Exception $e)
{
$this->emit('error', array($e));
}

}
3 changes: 1 addition & 2 deletions src/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,8 @@ public function close()
return;
}

// request closed => stop reading from the stream by pausing it
$this->readable = false;
$this->stream->pause();
$this->stream->close();

$this->emit('close');
$this->removeAllListeners();
Expand Down
6 changes: 3 additions & 3 deletions src/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ public function handleRequest(ConnectionInterface $conn, RequestInterface $reque

$response = new Response($conn, $request->getProtocolVersion());

$stream = $conn;
$stream = new CloseProtectionStream($conn);
if ($request->hasHeader('Transfer-Encoding')) {
$transferEncodingHeader = $request->getHeader('Transfer-Encoding');
// 'chunked' must always be the final value of 'Transfer-Encoding' according to: https://tools.ietf.org/html/rfc7230#section-3.3.1
if (strtolower(end($transferEncodingHeader)) === 'chunked') {
$stream = new ChunkedDecoder($conn);
$stream = new ChunkedDecoder($stream);
}
} elseif ($request->hasHeader('Content-Length')) {
$string = $request->getHeaderLine('Content-Length');
Expand All @@ -157,7 +157,7 @@ public function handleRequest(ConnectionInterface $conn, RequestInterface $reque
return $this->writeError($conn, 400);
}

$stream = new LengthLimitedStream($conn, $contentLength);
$stream = new LengthLimitedStream($stream, $contentLength);
}

$request = new Request($request, $stream);
Expand Down
146 changes: 146 additions & 0 deletions tests/CloseProtectionStreamTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
<?php

namespace React\Tests\Http;

use React\Http\CloseProtectionStream;
use React\Stream\ReadableStream;

class CloseProtectionStreamTest extends TestCase
{
public function testClosePausesTheInputStreamInsteadOfClosing()
{
$input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->disableOriginalConstructor()->getMock();
$input->expects($this->once())->method('pause');
$input->expects($this->never())->method('close');

$protection = new CloseProtectionStream($input);
$protection->close();
}

public function testErrorWontCloseStream()
{
$input = new ReadableStream();

$protection = new CloseProtectionStream($input);
$protection->on('error', $this->expectCallableOnce());
$protection->on('close', $this->expectCallableNever());

$input->emit('error', array(new \RuntimeException()));

$this->assertTrue($protection->isReadable());
$this->assertTrue($input->isReadable());
}

public function testResumeStreamWillResumeInputStream()
{
$input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
$input->expects($this->once())->method('pause');
$input->expects($this->once())->method('resume');

$protection = new CloseProtectionStream($input);
$protection->pause();
$protection->resume();
}

public function testInputStreamIsNotReadableAfterClose()
{
$input = new ReadableStream();

$protection = new CloseProtectionStream($input);
$protection->on('close', $this->expectCallableOnce());

$input->close();

$this->assertFalse($protection->isReadable());
$this->assertFalse($input->isReadable());
}

public function testPipeStream()
{
$input = new ReadableStream();

$protection = new CloseProtectionStream($input);
$dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();

$ret = $protection->pipe($dest);

$this->assertSame($dest, $ret);
}

public function testStopEmittingDataAfterClose()
{
$input = new ReadableStream();

$protection = new CloseProtectionStream($input);
$protection->on('data', $this->expectCallableNever());

$protection->on('close', $this->expectCallableOnce());

$protection->close();

$input->emit('data', array('hello'));

$this->assertFalse($protection->isReadable());
$this->assertTrue($input->isReadable());
}

public function testErrorIsNeverCalledAfterClose()
{
$input = new ReadableStream();

$protection = new CloseProtectionStream($input);
$protection->on('data', $this->expectCallableNever());
$protection->on('error', $this->expectCallableNever());
$protection->on('close', $this->expectCallableOnce());

$protection->close();

$input->emit('error', array(new \Exception()));

$this->assertFalse($protection->isReadable());
$this->assertTrue($input->isReadable());
}

public function testEndWontBeEmittedAfterClose()
{
$input = new ReadableStream();

$protection = new CloseProtectionStream($input);
$protection->on('data', $this->expectCallableNever());
$protection->on('close', $this->expectCallableOnce());

$protection->close();

$input->emit('end', array());

$this->assertFalse($protection->isReadable());
$this->assertTrue($input->isReadable());
}

public function testPauseAfterCloseHasNoEffect()
{
$input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
$input->expects($this->once())->method('pause');

$protection = new CloseProtectionStream($input);
$protection->on('data', $this->expectCallableNever());
$protection->on('close', $this->expectCallableOnce());

$protection->close();
$protection->pause();
}

public function testResumeAfterCloseHasNoEffect()
{
$input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
$input->expects($this->once())->method('pause');
$input->expects($this->never())->method('resume');

$protection = new CloseProtectionStream($input);
$protection->on('data', $this->expectCallableNever());
$protection->on('close', $this->expectCallableOnce());

$protection->close();
$protection->resume();
}
}
5 changes: 2 additions & 3 deletions tests/RequestTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,9 @@ public function testCloseMultipleTimesEmitsCloseEventOnce()
$request->close();
}

public function testCloseWillPauseUnderlyingStream()
public function testCloseWillCloseUnderlyingStream()
{
$this->stream->expects($this->once())->method('pause');
$this->stream->expects($this->never())->method('close');
$this->stream->expects($this->once())->method('close');

$request = new Request(new Psr('GET', '/'), $this->stream);

Expand Down
Loading