Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 36 additions & 33 deletions src/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use React\Stream\ReadableStreamInterface;
use React\Stream\WritableStreamInterface;
use React\Stream\Util;
use Psr\Http\Message\RequestInterface;

/**
* The `Request` class is responsible for streaming the incoming request body
Expand All @@ -24,11 +25,8 @@
class Request extends EventEmitter implements ReadableStreamInterface
{
private $readable = true;
private $method;
private $path;
private $query;
private $httpVersion;
private $headers;
private $request;
private $stream;

// metadata, implicitly added externally
public $remoteAddress;
Expand All @@ -42,13 +40,23 @@ class Request extends EventEmitter implements ReadableStreamInterface
*
* @internal
*/
public function __construct($method, $path, $query = array(), $httpVersion = '1.1', $headers = array())
public function __construct(RequestInterface $request, ReadableStreamInterface $stream)
{
$this->method = $method;
$this->path = $path;
$this->query = $query;
$this->httpVersion = $httpVersion;
$this->headers = $headers;
$this->request = $request;
$this->stream = $stream;

$that = $this;
// forward data and end events from body stream to request
$stream->on('data', function ($data) use ($that) {
$that->emit('data', array($data));
});
$stream->on('end', function () use ($that) {
$that->emit('end');
});
$stream->on('error', function ($error) use ($that) {
$that->emit('error', array($error));
});
$stream->on('close', array($this, 'close'));
}

/**
Expand All @@ -58,7 +66,7 @@ public function __construct($method, $path, $query = array(), $httpVersion = '1.
*/
public function getMethod()
{
return $this->method;
return $this->request->getMethod();
}

/**
Expand All @@ -68,7 +76,7 @@ public function getMethod()
*/
public function getPath()
{
return $this->path;
return $this->request->getUri()->getPath();
}

/**
Expand All @@ -78,7 +86,10 @@ public function getPath()
*/
public function getQueryParams()
{
return $this->query;
$params = array();
parse_str($this->request->getUri()->getQuery(), $params);

return $params;
}

/**
Expand All @@ -88,7 +99,7 @@ public function getQueryParams()
*/
public function getProtocolVersion()
{
return $this->httpVersion;
return $this->request->getProtocolVersion();
}

/**
Expand All @@ -102,7 +113,7 @@ public function getProtocolVersion()
*/
public function getHeaders()
{
return $this->headers;
return $this->request->getHeaders();
}

/**
Expand All @@ -113,18 +124,7 @@ public function getHeaders()
*/
public function getHeader($name)
{
$found = array();

$name = strtolower($name);
foreach ($this->headers as $key => $value) {
if (strtolower($key) === $name) {
foreach((array)$value as $one) {
$found[] = $one;
}
}
}

return $found;
return $this->request->getHeader($name);
}

/**
Expand All @@ -135,7 +135,7 @@ public function getHeader($name)
*/
public function getHeaderLine($name)
{
return implode(', ', $this->getHeader($name));
return $this->request->getHeaderLine($name);
}

/**
Expand All @@ -146,7 +146,7 @@ public function getHeaderLine($name)
*/
public function hasHeader($name)
{
return !!$this->getHeader($name);
return $this->request->hasHeader($name);
}

/**
Expand All @@ -164,7 +164,7 @@ public function hasHeader($name)
*/
public function expectsContinue()
{
return $this->httpVersion !== '1.0' && '100-continue' === strtolower($this->getHeaderLine('Expect'));
return $this->getProtocolVersion() !== '1.0' && '100-continue' === strtolower($this->getHeaderLine('Expect'));
}

public function isReadable()
Expand All @@ -178,7 +178,7 @@ public function pause()
return;
}

$this->emit('pause');
$this->stream->pause();
}

public function resume()
Expand All @@ -187,7 +187,7 @@ public function resume()
return;
}

$this->emit('resume');
$this->stream->resume();
}

public function close()
Expand All @@ -196,7 +196,10 @@ public function close()
return;
}

// request closed => stop reading from the stream by pausing it
$this->readable = false;
$this->stream->pause();

$this->emit('close');
$this->removeAllListeners();
}
Expand Down
16 changes: 1 addition & 15 deletions src/RequestHeaderParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,7 @@ private function parseRequest($data)
{
list($headers, $bodyBuffer) = explode("\r\n\r\n", $data, 2);

$psrRequest = g7\parse_request($headers);

$parsedQuery = array();
$queryString = $psrRequest->getUri()->getQuery();
if ($queryString) {
parse_str($queryString, $parsedQuery);
}

$request = new Request(
$psrRequest->getMethod(),
$psrRequest->getUri()->getPath(),
$parsedQuery,
$psrRequest->getProtocolVersion(),
$psrRequest->getHeaders()
);
$request = g7\parse_request($headers);

return array($request, $bodyBuffer);
}
Expand Down
33 changes: 9 additions & 24 deletions src/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Evenement\EventEmitter;
use React\Socket\ServerInterface as SocketServerInterface;
use React\Socket\ConnectionInterface;
use Psr\Http\Message\RequestInterface;

/**
* The `Server` class is responsible for handling incoming connections and then
Expand Down Expand Up @@ -87,7 +88,7 @@ public function handleConnection(ConnectionInterface $conn)
$that = $this;
$parser = new RequestHeaderParser();
$listener = array($parser, 'feed');
$parser->on('headers', function (Request $request, $bodyBuffer) use ($conn, $listener, $parser, $that) {
$parser->on('headers', function (RequestInterface $request, $bodyBuffer) use ($conn, $listener, $parser, $that) {
// parsing request completed => stop feeding parser
$conn->removeListener('data', $listener);

Expand All @@ -111,7 +112,7 @@ public function handleConnection(ConnectionInterface $conn)
}

/** @internal */
public function handleRequest(ConnectionInterface $conn, Request $request)
public function handleRequest(ConnectionInterface $conn, RequestInterface $request)
{
// only support HTTP/1.1 and HTTP/1.0 requests
if ($request->getProtocolVersion() !== '1.1' && $request->getProtocolVersion() !== '1.0') {
Expand All @@ -138,13 +139,6 @@ public function handleRequest(ConnectionInterface $conn, Request $request)
}

$response = new Response($conn, $request->getProtocolVersion());
$response->on('close', array($request, 'close'));

// attach remote ip to the request as metadata
$request->remoteAddress = trim(
parse_url('tcp://' . $conn->getRemoteAddress(), PHP_URL_HOST),
'[]'
);

$stream = $conn;
if ($request->hasHeader('Transfer-Encoding')) {
Expand All @@ -155,22 +149,13 @@ public function handleRequest(ConnectionInterface $conn, Request $request)
}
}

// forward pause/resume calls to underlying connection
$request->on('pause', array($conn, 'pause'));
$request->on('resume', array($conn, 'resume'));

// request closed => stop reading from the stream by pausing it
// stream closed => close request
$request->on('close', array($stream, 'pause'));
$stream->on('close', array($request, 'close'));
$request = new Request($request, $stream);

// forward data and end events from body stream to request
$stream->on('end', function() use ($request) {
$request->emit('end');
});
$stream->on('data', function ($data) use ($request) {
$request->emit('data', array($data));
});
// attach remote ip to the request as metadata
$request->remoteAddress = trim(
parse_url('tcp://' . $conn->getRemoteAddress(), PHP_URL_HOST),
'[]'
);

$this->emit('request', array($request, $response));
}
Expand Down
10 changes: 4 additions & 6 deletions tests/RequestHeaderParserTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ public function testHeadersEventShouldReturnRequestAndBodyBuffer()
$data .= 'RANDOM DATA';
$parser->feed($data);

$this->assertInstanceOf('React\Http\Request', $request);
$this->assertInstanceOf('Psr\Http\Message\RequestInterface', $request);
$this->assertSame('GET', $request->getMethod());
$this->assertSame('/', $request->getPath());
$this->assertSame(array(), $request->getQueryParams());
$this->assertEquals('http://example.com/', $request->getUri());
$this->assertSame('1.1', $request->getProtocolVersion());
$this->assertSame(array('Host' => array('example.com:80'), 'Connection' => array('close')), $request->getHeaders());

Expand Down Expand Up @@ -83,10 +82,9 @@ public function testHeadersEventShouldParsePathAndQueryString()
$data = $this->createAdvancedPostRequest();
$parser->feed($data);

$this->assertInstanceOf('React\Http\Request', $request);
$this->assertInstanceOf('Psr\Http\Message\RequestInterface', $request);
$this->assertSame('POST', $request->getMethod());
$this->assertSame('/foo', $request->getPath());
$this->assertSame(array('bar' => 'baz'), $request->getQueryParams());
$this->assertEquals('http://example.com/foo?bar=baz', $request->getUri());
$this->assertSame('1.1', $request->getProtocolVersion());
$headers = array(
'Host' => array('example.com:80'),
Expand Down
Loading