diff --git a/src/Io/ConnectionUpcaster.php b/src/Io/ConnectionUpcaster.php index 3ce6159..7f0b7e0 100644 --- a/src/Io/ConnectionUpcaster.php +++ b/src/Io/ConnectionUpcaster.php @@ -22,6 +22,7 @@ public function __construct(DuplexStreamInterface $stream) $this->stream = $stream; Util::forwardEvents($stream, $this, array('data', 'end', 'close', 'error', 'drain')); + $this->stream->on('close', array($this, 'close')); } public function isReadable() diff --git a/src/Message/ReadableBodyStream.php b/src/Message/ReadableBodyStream.php index df4e997..a577788 100644 --- a/src/Message/ReadableBodyStream.php +++ b/src/Message/ReadableBodyStream.php @@ -2,10 +2,9 @@ namespace Clue\React\Buzz\Message; +use Evenement\EventEmitter; use Psr\Http\Message\StreamInterface; -use React\Stream\ReadableStream; use React\Stream\ReadableStreamInterface; -use Evenement\EventEmitter; use React\Stream\Util; use React\Stream\WritableStreamInterface; @@ -14,6 +13,9 @@ */ class ReadableBodyStream extends EventEmitter implements ReadableStreamInterface, StreamInterface { + private $input; + private $closed = false; + public function __construct(ReadableStreamInterface $input) { $this->input = $input; @@ -24,19 +26,24 @@ public function __construct(ReadableStreamInterface $input) }); $input->on('error', function ($error) use ($that) { $that->emit('error', array($error, $that)); + $that->close(); }); $input->on('end', function () use ($that) { $that->emit('end', array($that)); - $that->emit('close', array($that)); - }); - $input->on('close', function () use ($that) { - $that->emit('close', array($that)); + $that->close(); }); + $input->on('close', array($that, 'close')); } public function close() { - $this->input->close(); + if (!$this->closed) { + $this->closed = true; + $this->input->close(); + + $this->emit('close', array($this)); + $this->removeAllListeners(); + } } public function isReadable() diff --git a/tests/FunctionalBrowserTest.php b/tests/FunctionalBrowserTest.php index 2b4181b..46bae23 100644 --- a/tests/FunctionalBrowserTest.php +++ b/tests/FunctionalBrowserTest.php @@ -154,6 +154,7 @@ public function testPostString() $this->assertEquals('hello world', $data['data']); } + /** @group online */ public function testPostStreamChunked() { // httpbin used to support `Transfer-Encoding: chunked` for requests, diff --git a/tests/Message/ReadableBodyStreamTest.php b/tests/Message/ReadableBodyStreamTest.php index c4fe2ab..dc23c2a 100644 --- a/tests/Message/ReadableBodyStreamTest.php +++ b/tests/Message/ReadableBodyStreamTest.php @@ -1,6 +1,7 @@ stream->close(); } + public function testCloseWillEmitCloseEvent() + { + $this->input = new ThroughStream(); + $this->stream = new ReadableBodyStream($this->input); + + $called = 0; + $this->stream->on('close', function () use (&$called) { + ++$called; + }); + + $this->stream->close(); + $this->stream->close(); + + $this->assertEquals(1, $called); + } + + public function testCloseInputWillEmitCloseEvent() + { + $this->input = new ThroughStream(); + $this->stream = new ReadableBodyStream($this->input); + + $called = 0; + $this->stream->on('close', function () use (&$called) { + ++$called; + }); + + $this->input->close(); + $this->input->close(); + + $this->assertEquals(1, $called); + } + + public function testEndInputWillEmitCloseEvent() + { + $this->input = new ThroughStream(); + $this->stream = new ReadableBodyStream($this->input); + + $called = 0; + $this->stream->on('close', function () use (&$called) { + ++$called; + }); + + $this->input->end(); + $this->input->end(); + + $this->assertEquals(1, $called); + } + public function testPauseWillPauseInputStream() { $this->input->expects($this->once())->method('pause');