diff --git a/src/Server.php b/src/Server.php index 4c7eaf9f..0a623c95 100644 --- a/src/Server.php +++ b/src/Server.php @@ -33,8 +33,6 @@ */ class Server extends EventEmitter { - private $io; - /** * Creates a HTTP server that accepts connections from the given socket. * @@ -65,49 +63,36 @@ class Server extends EventEmitter */ public function __construct(SocketServerInterface $io) { - $this->io = $io; - $that = $this; - - $this->io->on('connection', function (ConnectionInterface $conn) use ($that) { - // TODO: http 1.1 keep-alive - // TODO: chunked transfer encoding (also for outgoing data) - // TODO: multipart parsing - - $parser = new RequestHeaderParser(); - $parser->on('headers', function (Request $request, $bodyBuffer) use ($conn, $parser, $that) { - // attach remote ip to the request as metadata - $request->remoteAddress = trim( - parse_url('tcp://' . $conn->getRemoteAddress(), PHP_URL_HOST), - '[]' - ); + $io->on('connection', array($this, 'handleConnection')); + } - // forward pause/resume calls to underlying connection - $request->on('pause', array($conn, 'pause')); - $request->on('resume', array($conn, 'resume')); + /** @internal */ + public function handleConnection(ConnectionInterface $conn) + { + $that = $this; + $parser = new RequestHeaderParser(); + $listener = array($parser, 'feed'); + $parser->on('headers', function (Request $request, $bodyBuffer) use ($conn, $listener, $parser, $that) { + // parsing request completed => stop feeding parser + $conn->removeListener('data', $listener); - $that->handleRequest($conn, $request, $bodyBuffer); + $that->handleRequest($conn, $request); - $conn->removeListener('data', array($parser, 'feed')); - $conn->on('end', function () use ($request) { - $request->emit('end'); - }); - $conn->on('data', function ($data) use ($request) { - $request->emit('data', array($data)); - }); - }); + if ($bodyBuffer !== '') { + $request->emit('data', array($bodyBuffer)); + } + }); - $listener = array($parser, 'feed'); - $conn->on('data', $listener); - $parser->on('error', function() use ($conn, $listener, $that) { - // TODO: return 400 response - $conn->removeListener('data', $listener); - $that->emit('error', func_get_args()); - }); + $conn->on('data', $listener); + $parser->on('error', function() use ($conn, $listener, $that) { + // TODO: return 400 response + $conn->removeListener('data', $listener); + $that->emit('error', func_get_args()); }); } /** @internal */ - public function handleRequest(ConnectionInterface $conn, Request $request, $bodyBuffer) + public function handleRequest(ConnectionInterface $conn, Request $request) { $response = new Response($conn); $response->on('close', array($request, 'close')); @@ -118,10 +103,30 @@ public function handleRequest(ConnectionInterface $conn, Request $request, $body return; } - $this->emit('request', array($request, $response)); + // attach remote ip to the request as metadata + $request->remoteAddress = trim( + parse_url('tcp://' . $conn->getRemoteAddress(), PHP_URL_HOST), + '[]' + ); - if ($bodyBuffer !== '') { - $request->emit('data', array($bodyBuffer)); - } + // forward pause/resume calls to underlying connection + $request->on('pause', array($conn, 'pause')); + $request->on('resume', array($conn, 'resume')); + + // closing the request currently emits an "end" event + // stop reading from the connection by pausing it + $request->on('end', function () use ($conn) { + $conn->pause(); + }); + + // forward connection events to request + $conn->on('end', function () use ($request) { + $request->emit('end'); + }); + $conn->on('data', function ($data) use ($request) { + $request->emit('data', array($data)); + }); + + $this->emit('request', array($request, $response)); } } diff --git a/tests/ServerTest.php b/tests/ServerTest.php index 0b934da3..90a7517d 100644 --- a/tests/ServerTest.php +++ b/tests/ServerTest.php @@ -117,6 +117,20 @@ public function testRequestResumeWillbeForwardedToConnection() $this->connection->emit('data', array($data)); } + public function testRequestCloseWillPauseConnection() + { + $server = new Server($this->socket); + $server->on('request', function (Request $request) { + $request->close(); + }); + + $this->connection->expects($this->once())->method('pause'); + $this->socket->emit('connection', array($this->connection)); + + $data = $this->createGetRequest(); + $this->connection->emit('data', array($data)); + } + public function testRequestEventWithoutBodyWillNotEmitData() { $never = $this->expectCallableNever();