Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ This component depends on `événement`, which is an implementation of the

* `data`: Emitted whenever data was read from the source
with a single mixed argument for incoming data.
* `end`: Emitted when the source has reached the `eof`.
* `end`: Emitted when the source has successfully reached the end
of the stream (EOF).
This event will only be emitted if the *end* was reached successfully, not
if the stream was interrupted due to an error or explicitly closed.
Also note that not all streams know the concept of a "successful end".
* `error`: Emitted when an error occurs
with a single `Exception` argument for error instance.
* `close`: Emitted when the connection is closed.
* `close`: Emitted when the stream is closed.

### Methods

* `isReadable()`: Check if the stream is still in a state allowing it to be
read from. It becomes unreadable when the connection ends, closes or an
read from. It becomes unreadable when the stream ends, closes or an
error occurs.
* `pause()`: Remove the data source file descriptor from the event loop. This
allows you to throttle incoming data.
Expand All @@ -46,7 +50,7 @@ This component depends on `événement`, which is an implementation of the
to accept more data.
* `error`: Emitted whenever an error occurs
with a single `Exception` argument for error instance.
* `close`: Emitted whenever the connection is closed.
* `close`: Emitted whenever the stream is closed.
* `pipe`: Emitted whenever a readable stream is `pipe()`d into this stream
with a single `ReadableStreamInterface` argument for source stream.

Expand Down
1 change: 1 addition & 0 deletions src/BufferedSink.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public function __construct()
public function handlePipeEvent($source)
{
Util::forwardEvents($source, $this, array('error'));
$source->on('close', array($this, 'close'));
}

public function handleErrorEvent($e)
Expand Down
1 change: 0 additions & 1 deletion src/ReadableStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public function close()
}

$this->closed = true;
$this->emit('end');
$this->emit('close');
$this->removeAllListeners();
}
Expand Down
9 changes: 4 additions & 5 deletions src/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ public function close()
$this->readable = false;
$this->writable = false;

$this->emit('end');
$this->emit('close');
$this->loop->removeStream($this->stream);
$this->buffer->close();
Expand Down Expand Up @@ -171,10 +170,10 @@ public function handleData($stream)

if ($data !== '') {
$this->emit('data', array($data));
}

if (!is_resource($stream) || feof($stream)) {
$this->end();
} else{
// no data read => we reached the end and close the stream
$this->emit('end');
$this->close();
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/ThroughStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public function end($data = null)
$this->readable->emit('data', array($this->filter($data)));
}

$this->writable->end($data);
$this->readable->emit('end');

$this->writable->end();
}
}
1 change: 0 additions & 1 deletion src/WritableStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public function close()
}

$this->closed = true;
$this->emit('end');
$this->emit('close');
$this->removeAllListeners();
}
Expand Down
10 changes: 10 additions & 0 deletions tests/ReadableStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ public function closeShouldClose()
$this->assertFalse($readable->isReadable());
}

/** @test */
public function closeShouldEmitCloseEvent()
{
$readable = new ReadableStream();
$readable->on('close', $this->expectCallableOnce());
$readable->on('end', $this->expectCallableNever());

$readable->close();
}

/** @test */
public function doubleCloseShouldWork()
{
Expand Down
16 changes: 15 additions & 1 deletion tests/StreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ public function testConstructorAcceptsBuffer()
$this->assertSame($buffer, $conn->getBuffer());
}

public function testCloseShouldEmitCloseEvent()
{
$stream = fopen('php://temp', 'r+');
$loop = $this->createLoopMock();

$conn = new Stream($stream, $loop);
$conn->on('close', $this->expectCallableOnce());
$conn->on('end', $this->expectCallableNever());

$conn->close();

$this->assertFalse($conn->isReadable());
}

/**
* @covers React\Stream\Stream::__construct
* @covers React\Stream\Stream::handleData
Expand Down Expand Up @@ -114,7 +128,7 @@ public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfi

$conn->handleData($stream);

$this->assertFalse($conn->isReadable());
$this->assertTrue($conn->isReadable());
$this->assertEquals(100000, strlen($capturedData));
}

Expand Down
10 changes: 10 additions & 0 deletions tests/ThroughStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ public function pipingStuffIntoItShouldWork()
$readable->emit('data', array('foo'));
}

/** @test */
public function endShouldEmitEndAndClose()
{
$through = new ThroughStream();
$through->on('data', $this->expectCallableNever());
$through->on('end', $this->expectCallableOnce());
$through->on('close', $this->expectCallableOnce());
$through->end();
}

/** @test */
public function endShouldCloseTheStream()
{
Expand Down
10 changes: 10 additions & 0 deletions tests/WritableStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ public function closeShouldClose()
$this->assertFalse($through->isWritable());
}

/** @test */
public function closeShouldEmitCloseEvent()
{
$through = new WritableStream();
$through->on('close', $this->expectCallableOnce());
$through->on('end', $this->expectCallableNever());

$through->close();
}

/** @test */
public function doubleCloseShouldWork()
{
Expand Down