diff --git a/src/Stream.php b/src/Stream.php index 56ed511..a23a4a8 100644 --- a/src/Stream.php +++ b/src/Stream.php @@ -99,7 +99,7 @@ public function resume() public function write($data) { if (!$this->writable) { - return; + return false; } return $this->buffer->write($data); diff --git a/src/ThroughStream.php b/src/ThroughStream.php index 93db37f..2b9a9be 100644 --- a/src/ThroughStream.php +++ b/src/ThroughStream.php @@ -4,6 +4,8 @@ class ThroughStream extends CompositeStream { + private $paused = false; + public function __construct() { $readable = new ReadableStream(); @@ -17,19 +19,35 @@ public function filter($data) return $data; } + public function pause() + { + parent::pause(); + $this->paused = true; + } + + public function resume() + { + parent::resume(); + $this->paused = false; + } + public function write($data) { - if (!$this->writable) { + if (!$this->writable->isWritable()) { return false; } $this->readable->emit('data', array($this->filter($data))); - return true; + return $this->writable->isWritable() && !$this->paused; } public function end($data = null) { + if (!$this->writable->isWritable()) { + return; + } + if (null !== $data) { $this->readable->emit('data', array($this->filter($data))); } diff --git a/tests/BufferedSinkTest.php b/tests/BufferedSinkTest.php index 527bb39..d5ebcfb 100644 --- a/tests/BufferedSinkTest.php +++ b/tests/BufferedSinkTest.php @@ -133,6 +133,15 @@ public function writeShouldTriggerProgressOnPromise() $sink->end('baz'); } + /** @test */ + public function writeAfterEndShouldReturnFalse() + { + $sink = new BufferedSink(); + $sink->end(); + + $this->assertFalse($sink->write('foo')); + } + /** @test */ public function forwardedErrorsFromPipeShouldRejectPromise() { diff --git a/tests/CompositeStreamTest.php b/tests/CompositeStreamTest.php index ceaeee9..f802a87 100644 --- a/tests/CompositeStreamTest.php +++ b/tests/CompositeStreamTest.php @@ -125,6 +125,44 @@ public function itShouldHandlePipingCorrectly() $input->emit('data', array('foo')); } + /** @test */ + public function itShouldForwardPauseUpstreamWhenPipedTo() + { + $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable->expects($this->any())->method('isReadable')->willReturn(true); + $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable->expects($this->any())->method('isWritable')->willReturn(true); + + $composite = new CompositeStream($readable, $writable); + + $input = $this->getMockBuilder('React\Stream\ReadableStream')->setMethods(array('pause', 'resume'))->getMock(); + $input + ->expects($this->once()) + ->method('pause'); + + $input->pipe($composite); + $composite->pause(); + } + + /** @test */ + public function itShouldForwardResumeUpstreamWhenPipedTo() + { + $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable->expects($this->any())->method('isReadable')->willReturn(true); + $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable->expects($this->any())->method('isWritable')->willReturn(true); + + $composite = new CompositeStream($readable, $writable); + + $input = $this->getMockBuilder('React\Stream\ReadableStream')->setMethods(array('pause', 'resume'))->getMock(); + $input + ->expects($this->once()) + ->method('resume'); + + $input->pipe($composite); + $composite->resume(); + } + /** @test */ public function itShouldForwardPauseAndResumeUpstreamWhenPipedTo() { diff --git a/tests/StreamTest.php b/tests/StreamTest.php index b2fe2e9..b6d7d1d 100644 --- a/tests/StreamTest.php +++ b/tests/StreamTest.php @@ -57,6 +57,32 @@ public function testCloseShouldEmitCloseEvent() $this->assertFalse($conn->isReadable()); } + public function testEndShouldEndBuffer() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + + $buffer = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $buffer->expects($this->once())->method('end')->with('foo'); + + $conn = new Stream($stream, $loop, $buffer); + $conn->end('foo'); + } + + + public function testEndAfterCloseIsNoOp() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + + $buffer = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $buffer->expects($this->never())->method('end'); + + $conn = new Stream($stream, $loop); + $conn->close(); + $conn->end(); + } + /** * @covers React\Stream\Stream::__construct * @covers React\Stream\Stream::handleData @@ -193,7 +219,7 @@ public function testEndedStreamsShouldNotWrite() $stream = fopen($file, 'r'); $this->assertSame("foo\n", fgets($stream)); - $this->assertNull($res); + $this->assertFalse($res); unlink($file); } diff --git a/tests/ThroughStreamTest.php b/tests/ThroughStreamTest.php index d353c60..b20100e 100644 --- a/tests/ThroughStreamTest.php +++ b/tests/ThroughStreamTest.php @@ -62,6 +62,53 @@ public function endShouldWriteDataBeforeClosing() $this->assertFalse($through->isWritable()); } + /** @test */ + public function endTwiceShouldOnlyEmitOnce() + { + $through = new ThroughStream(); + $through->on('data', $this->expectCallableOnce('first')); + $through->end('first'); + $through->end('ignored'); + } + + /** @test */ + public function writeAfterEndShouldReturnFalse() + { + $through = new ThroughStream(); + $through->on('data', $this->expectCallableNever()); + $through->end(); + + $this->assertFalse($through->write('foo')); + } + + /** @test */ + public function writeDataWillCloseStreamShouldReturnFalse() + { + $through = new ThroughStream(); + $through->on('data', array($through, 'close')); + + $this->assertFalse($through->write('foo')); + } + + /** @test */ + public function writeDataToPausedShouldReturnFalse() + { + $through = new ThroughStream(); + $through->pause(); + + $this->assertFalse($through->write('foo')); + } + + /** @test */ + public function writeDataToResumedShouldReturnTrue() + { + $through = new ThroughStream(); + $through->pause(); + $through->resume(); + + $this->assertTrue($through->write('foo')); + } + /** @test */ public function itShouldBeReadableByDefault() {