Skip to content
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"guzzlehttp/psr7": "^1.0",
"react/socket": "^0.4",
"react/stream": "^0.4",
"evenement/evenement": "^2.0"
"evenement/evenement": "^2.0",
"react/promise": "^2.2"
},
"autoload": {
"psr-4": {
Expand Down
113 changes: 113 additions & 0 deletions src/StreamingBodyParser/ContentLengthBufferedSink.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
<?php

namespace React\Http\StreamingBodyParser;

use React\Http\Request;
use React\Promise\Deferred;
use React\Promise\ExtendedPromiseInterface;

/**
* Buffer the data coming in from a request until the specified length is reached.
* Or until the promise is canceled.
*
* @internal
*/
class ContentLengthBufferedSink
{
/**
* @var Deferred
*/
private $deferred;

/**
* @var Request
*/
private $request;

/**
* @var string
*/
private $buffer = '';

/**
* @var int
*/
private $length;

/**
* @param Request $request
* @param int $length
* @return ExtendedPromiseInterface
*/
public static function createPromise(Request $request, $length)
{
return (new static($request, $length))->getDeferred()->promise();
}

/**
* @param Request $request
* @param int $length
*/
protected function __construct(Request $request, $length)
{
$this->deferred = new Deferred(function (callable $resolve) {
$this->request->removeListener('data', [$this, 'feed']);
$this->request->removeListener('end', [$this, 'finish']);

$resolve($this->buffer);
});
$this->request = $request;
$this->length = (int)$length;
$this->request->on('data', [$this, 'feed']);
$this->request->on('end', [$this, 'finish']);
$this->check();
}

/**
* @param string $data
*
* @internal
*/
public function feed($data)
{
$this->buffer .= $data;

$this->check();
}

/**
* @internal
*/
public function finish()
{
$this->resolve();
}

/**
* Check if we reached the expected length and when so resolve promise
*/
protected function check()
{
if (strlen($this->buffer) >= $this->length) {
$this->resolve();
}
}

protected function resolve()
{
if (strlen($this->buffer) > $this->length) {
$this->buffer = substr($this->buffer, 0, $this->length);
}
$this->request->removeListener('data', [$this, 'feed']);
$this->request->removeListener('end', [$this, 'finish']);
$this->deferred->resolve($this->buffer);
}

/**
* @return Deferred
*/
protected function getDeferred()
{
return $this->deferred;
}
}
64 changes: 64 additions & 0 deletions tests/StreamingBodyParser/ContentLengthBufferedSinkTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?php

namespace React\Tests\Http\StreamingBodyParser;

use React\Http\StreamingBodyParser\ContentLengthBufferedSink;
use React\Http\Request;
use React\Tests\Http\TestCase;

class ContentLengthBufferedSinkTest extends TestCase
{
public function testCreatePromise()
{
$expectedBuffer = '0123456789';
$catchedBuffer = '';
$length = 10;
$request = new Request('GET', 'http://example.com/');
ContentLengthBufferedSink::createPromise($request, $length)->then(function ($buffer) use (&$catchedBuffer) {
$catchedBuffer = $buffer;
});
$request->emit('data', ['012345678']);
$request->emit('data', ['90123456789']);
$this->assertSame($expectedBuffer, $catchedBuffer);
}

public function testCancelPromise()
{
$expectedBuffer = '012345678';
$catchedBuffer = '';
$length = 10;
$request = new Request('GET', 'http://example.com/');
$promise = ContentLengthBufferedSink::createPromise($request, $length)->then(function ($buffer) use (&$catchedBuffer) {
$catchedBuffer = $buffer;
});
$request->emit('data', ['012345678']);
$promise->cancel();
$request->emit('data', ['90123456789']);
$this->assertSame($expectedBuffer, $catchedBuffer);
}

public function testRequestEnd()
{
$expectedBuffer = '012345678';
$catchedBuffer = '';
$length = 10;
$request = new Request('GET', 'http://example.com/');
ContentLengthBufferedSink::createPromise($request, $length)->then(function ($buffer) use (&$catchedBuffer) {
$catchedBuffer = $buffer;
});
$request->emit('data', ['012345678']);
$request->close();
$request->emit('data', ['90123456789']);
$this->assertSame($expectedBuffer, $catchedBuffer);
}

public function testZeroLengthBuffer()
{
$catchedBuffer = null;
$request = new Request('GET', 'http://example.com/');
ContentLengthBufferedSink::createPromise($request, 0)->then(function ($buffer) use (&$catchedBuffer) {
$catchedBuffer = $buffer;
});
$this->assertSame('', $catchedBuffer);
}
}