diff --git a/README.md b/README.md index dce4b60..efcc90e 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,7 @@ descriptor based implementation with an in-memory write buffer. * [WritableResourceStream](#writableresourcestream) * [DuplexResourceStream](#duplexresourcestream) * [ThroughStream](#throughstream) + * [CompositeStream](#compositestream) * [Usage](#usage) * [Install](#install) * [Tests](#tests) @@ -1010,6 +1011,44 @@ $through->on('data', $this->expectCallableNever())); $through->write(2); ``` +### CompositeStream + +The `CompositeStream` implements the +[`DuplexStreamInterface`](#duplexstreaminterface) and can be used to create a +single duplex stream from two individual streams implementing +[`ReadableStreamInterface`](#readablestreaminterface) and +[`WritableStreamInterface`](#writablestreaminterface) respectively. + +This is useful for some APIs which may require a single +[`DuplexStreamInterface`](#duplexstreaminterface) or simply because it's often +more convenient to work with a single stream instance like this: + +```php +$stdin = new ReadableStreamResource(STDIN, $loop); +$stdout = new WritableStreamResource(STDOUT, $loop); + +$stdio = new CompositeStream($stdin, $stdout); + +$stdio->on('data', function ($chunk) use ($stdio) { + $stdio->write('You said: ' . $chunk); +}); +``` + +This is a well-behaving stream which forwards all stream events from the +underlying streams and forwards all streams calls to the underlying streams. + +If you `write()` to the duplex stream, it will simply `write()` to the +writable side and return its status. + +If you `end()` the duplex stream, it will `end()` the writable side and will +`pause()` the readable side. + +If you `close()` the duplex stream, both input streams will be closed. +If either of the two input streams emits a `close` event, the duplex stream +will also close. +If either of the two input streams is already closed while constructing the +duplex stream, it will `close()` the other side and return a closed stream. + ## Usage ```php $loop = React\EventLoop\Factory::create(); diff --git a/src/CompositeStream.php b/src/CompositeStream.php index 084385b..d9253e4 100644 --- a/src/CompositeStream.php +++ b/src/CompositeStream.php @@ -15,6 +15,10 @@ public function __construct(ReadableStreamInterface $readable, WritableStreamInt $this->readable = $readable; $this->writable = $writable; + if (!$readable->isReadable() || !$writable->isWritable()) { + return $this->close(); + } + Util::forwardEvents($this->readable, $this, array('data', 'end', 'error')); Util::forwardEvents($this->writable, $this, array('drain', 'error', 'pipe')); diff --git a/tests/CompositeStreamTest.php b/tests/CompositeStreamTest.php index d1c9257..4b4b2f4 100644 --- a/tests/CompositeStreamTest.php +++ b/tests/CompositeStreamTest.php @@ -10,18 +10,68 @@ */ class CompositeStreamTest extends TestCase { + /** @test */ + public function itShouldCloseReadableIfNotWritable() + { + $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable + ->expects($this->once()) + ->method('isReadable') + ->willReturn(true); + $readable + ->expects($this->once()) + ->method('close'); + + $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable + ->expects($this->once()) + ->method('isWritable') + ->willReturn(false); + + $composite = new CompositeStream($readable, $writable); + + $composite->on('close', $this->expectCallableNever()); + $composite->close(); + } + + /** @test */ + public function itShouldCloseWritableIfNotReadable() + { + $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable + ->expects($this->once()) + ->method('isReadable') + ->willReturn(false); + + $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable + ->expects($this->once()) + ->method('close'); + + $composite = new CompositeStream($readable, $writable); + + $composite->on('close', $this->expectCallableNever()); + $composite->close(); + } + /** @test */ public function itShouldForwardWritableCallsToWritableStream() { $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable + ->expects($this->once()) + ->method('isReadable') + ->willReturn(true); + $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); $writable ->expects($this->once()) ->method('write') ->with('foo'); $writable - ->expects($this->once()) - ->method('isWritable'); + ->expects($this->exactly(2)) + ->method('isWritable') + ->willReturn(true); $composite = new CompositeStream($readable, $writable); $composite->write('foo'); @@ -33,14 +83,16 @@ public function itShouldForwardReadableCallsToReadableStream() { $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); $readable - ->expects($this->once()) - ->method('isReadable'); + ->expects($this->exactly(2)) + ->method('isReadable') + ->willReturn(true); $readable ->expects($this->once()) ->method('pause'); $readable ->expects($this->once()) ->method('resume'); + $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); $writable ->expects($this->any()) @@ -57,15 +109,19 @@ public function itShouldForwardReadableCallsToReadableStream() public function itShouldNotForwardResumeIfStreamIsNotWritable() { $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable + ->expects($this->once()) + ->method('isReadable') + ->willReturn(true); $readable ->expects($this->never()) ->method('resume'); $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); $writable - ->expects($this->once()) + ->expects($this->exactly(2)) ->method('isWritable') - ->willReturn(false); + ->willReturnOnConsecutiveCalls(true, false); $composite = new CompositeStream($readable, $writable); $composite->resume(); @@ -75,7 +131,16 @@ public function itShouldNotForwardResumeIfStreamIsNotWritable() public function endShouldDelegateToWritableWithData() { $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable + ->expects($this->once()) + ->method('isReadable') + ->willReturn(true); + $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable + ->expects($this->once()) + ->method('isWritable') + ->willReturn(true); $writable ->expects($this->once()) ->method('end') @@ -89,10 +154,19 @@ public function endShouldDelegateToWritableWithData() public function closeShouldCloseBothStreams() { $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable + ->expects($this->once()) + ->method('isReadable') + ->willReturn(true); $readable ->expects($this->once()) ->method('close'); + $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable + ->expects($this->once()) + ->method('isWritable') + ->willReturn(true); $writable ->expects($this->once()) ->method('close'); @@ -132,6 +206,11 @@ public function itShouldReceiveForwardedEvents() public function itShouldHandlePipingCorrectly() { $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable + ->expects($this->once()) + ->method('isReadable') + ->willReturn(true); + $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); $writable->expects($this->any())->method('isWritable')->willReturn(True); $writable @@ -150,8 +229,10 @@ public function itShouldHandlePipingCorrectly() public function itShouldForwardPipeCallsToReadableStream() { $readable = new ThroughStream(); + $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); $writable->expects($this->any())->method('isWritable')->willReturn(True); + $composite = new CompositeStream($readable, $writable); $output = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();