Skip to content

Commit 6b90cb2

Browse files
authored
Merge pull request #84 from clue-labs/writablestreaminterface
Add WritableResourceStream
2 parents 844083c + bcc9c2e commit 6b90cb2

File tree

10 files changed

+186
-105
lines changed

10 files changed

+186
-105
lines changed

README.md

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ descriptor based implementation with an in-memory write buffer.
3535
* [close()](#close-1)
3636
* [DuplexStreamInterface](#duplexstreaminterface)
3737
* [ReadableResourceStream](#readableresourcestream)
38+
* [WritableResourceStream](#writableresourcestream)
3839
* [Usage](#usage)
3940
* [Install](#install)
4041
* [Tests](#tests)
@@ -761,12 +762,73 @@ mean it reached EOF.
761762
$stream->bufferSize = 8192;
762763
```
763764

765+
### WritableResourceStream
766+
767+
The `WritableResourceStream` is a concrete implementation of the
768+
[`WritableStreamInterface`](#writablestreaminterface) for PHP's stream resources.
769+
770+
This can be used to represent a write-only resource like a file stream opened in
771+
writable mode or a stream such as `STDOUT` or `STDERR`:
772+
773+
```php
774+
$stream = new WritableResourceStream(STDOUT, $loop);
775+
$stream->write('hello!');
776+
$stream->end();
777+
```
778+
779+
See also [`WritableStreamInterface`](#writablestreaminterface) for more details.
780+
781+
The first parameter given to the constructor MUST be a valid stream resource
782+
that is opened for writing.
783+
Otherwise, it will throw an `InvalidArgumentException`:
784+
785+
```php
786+
// throws InvalidArgumentException
787+
$stream = new WritableResourceStream(false, $loop);
788+
```
789+
790+
Internally, this class tries to enable non-blocking mode on the stream resource
791+
which may not be supported for all stream resources.
792+
Most notably, this is not supported by pipes on Windows (STDOUT, STDERR etc.).
793+
If this fails, it will throw a `RuntimeException`:
794+
795+
```php
796+
// throws RuntimeException on Windows
797+
$stream = new WritableResourceStream(STDOUT, $loop);
798+
```
799+
800+
Once the constructor is called with a valid stream resource, this class will
801+
take care of the underlying stream resource.
802+
You SHOULD only use its public API and SHOULD NOT interfere with the underlying
803+
stream resource manually.
804+
Should you need to access the underlying stream resource, you can use the public
805+
`$stream` property like this:
806+
807+
```php
808+
var_dump(stream_get_meta_data($stream->stream));
809+
```
810+
811+
Any `write()` calls to this class will not be performaned instantly, but will
812+
be performaned asynchronously, once the EventLoop reports the stream resource is
813+
ready to accept data.
814+
For this, it uses an in-memory buffer string to collect all outstanding writes.
815+
This buffer has a soft-limit applied which defines how much data it is willing
816+
to accept before the caller SHOULD stop sending further data.
817+
It currently defaults to 64 KiB and can be controlled through the public
818+
`$softLimit` property like this:
819+
820+
```php
821+
$stream->softLimit = 8192;
822+
```
823+
824+
See also [`write()`](#write) for more details.
825+
764826
## Usage
765827
```php
766828
$loop = React\EventLoop\Factory::create();
767829

768830
$source = new React\Stream\ReadableResourceStream(fopen('omg.txt', 'r'), $loop);
769-
$dest = new React\Stream\Stream(fopen('wtf.txt', 'w'), $loop);
831+
$dest = new React\Stream\WritableResourceStream(fopen('wtf.txt', 'w'), $loop);
770832

771833
$source->pipe($dest);
772834

examples/benchmark-throughput.php

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@
2323

2424
// setup input and output streams and pipe inbetween
2525
$in = new React\Stream\ReadableResourceStream(fopen($if, 'r'), $loop);
26-
$out = new React\Stream\Stream(fopen($of, 'w'), $loop);
27-
$out->pause();
26+
$out = new React\Stream\WritableResourceStream(fopen($of, 'w'), $loop);
2827
$in->pipe($out);
2928

3029
// stop input stream in $t seconds

examples/cat.php

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
<?php
22

33
use React\EventLoop\Factory;
4-
use React\Stream\Stream;
54
use React\Stream\ReadableResourceStream;
5+
use React\Stream\WritableResourceStream;
66

77
require __DIR__ . '/../vendor/autoload.php';
88

99
$loop = Factory::create();
1010

11-
$stdout = new Stream(STDOUT, $loop);
12-
$stdout->pause();
13-
11+
$stdout = new WritableResourceStream(STDOUT, $loop);
1412
$stdin = new ReadableResourceStream(STDIN, $loop);
1513
$stdin->pipe($stdout);
1614

src/ReadableResourceStream.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public function __construct($stream, LoopInterface $loop)
4444

4545
// ensure resource is opened for reading (fopen mode must contain "r" or "+")
4646
$meta = stream_get_meta_data($stream);
47-
if (isset($meta['mode']) && strpos($meta['mode'], 'r') === strpos($meta['mode'], '+')) {
47+
if (isset($meta['mode']) && $meta['mode'] !== '' && strpos($meta['mode'], 'r') === strpos($meta['mode'], '+')) {
4848
throw new InvalidArgumentException('Given stream resource is not opened in read mode');
4949
}
5050

src/Stream.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public function __construct($stream, LoopInterface $loop, WritableStreamInterfac
5858
}
5959

6060
if ($buffer === null) {
61-
$buffer = new Buffer($stream, $loop);
61+
$buffer = new WritableResourceStream($stream, $loop);
6262
}
6363

6464
$this->stream = $stream;
@@ -189,7 +189,7 @@ public function handleClose()
189189
}
190190

191191
/**
192-
* @return WritableStreamInterface|Buffer
192+
* @return WritableStreamInterface
193193
*/
194194
public function getBuffer()
195195
{
Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55
use Evenement\EventEmitter;
66
use React\EventLoop\LoopInterface;
77

8-
class Buffer extends EventEmitter implements WritableStreamInterface
8+
class WritableResourceStream extends EventEmitter implements WritableStreamInterface
99
{
1010
public $stream;
11-
public $listening = false;
1211
public $softLimit = 65536;
12+
13+
private $listening = false;
1314
private $writable = true;
1415
private $closed = false;
1516
private $loop;
@@ -21,6 +22,11 @@ public function __construct($stream, LoopInterface $loop)
2122
throw new \InvalidArgumentException('First parameter must be a valid stream resource');
2223
}
2324

25+
$meta = stream_get_meta_data($stream);
26+
if (isset($meta['mode']) && str_replace(array('b', 't'), '', $meta['mode']) === 'r') {
27+
throw new \InvalidArgumentException('Given stream resource is not opened in write mode');
28+
}
29+
2430
// this class relies on non-blocking I/O in order to not interrupt the event loop
2531
// e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
2632
if (stream_set_blocking($stream, 0) !== true) {
@@ -87,6 +93,7 @@ public function close()
8793
$this->removeAllListeners();
8894
}
8995

96+
/** @internal */
9097
public function handleWrite()
9198
{
9299
$error = null;

tests/ReadableResourceStreamTest.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ public function testConstructorThrowsExceptionOnInvalidStream()
3434
*/
3535
public function testConstructorThrowsExceptionOnWriteOnlyStream()
3636
{
37+
if (defined('HHVM_VERSION')) {
38+
$this->markTestSkipped('HHVM does not report fopen mode for STDOUT');
39+
}
40+
3741
$loop = $this->createLoopMock();
3842

3943
$this->setExpectedException('InvalidArgumentException');

tests/StreamIntegrationTest.php

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use React\Stream\Stream;
66
use React\EventLoop as rel;
7+
use React\Stream\ReadableResourceStream;
78

89
class StreamIntegrationTest extends TestCase
910
{
@@ -231,7 +232,7 @@ public function testReadsSingleChunkFromProcessPipe($condition, $loopFactory)
231232

232233
$loop = $loopFactory();
233234

234-
$stream = new Stream(popen('echo test', 'r'), $loop);
235+
$stream = new ReadableResourceStream(popen('echo test', 'r'), $loop);
235236
$stream->on('data', $this->expectCallableOnceWith("test\n"));
236237
$stream->on('end', $this->expectCallableOnce());
237238
$stream->on('error', $this->expectCallableNever());
@@ -250,7 +251,7 @@ public function testReadsMultipleChunksFromProcessPipe($condition, $loopFactory)
250251

251252
$loop = $loopFactory();
252253

253-
$stream = new Stream(popen('echo -n a;sleep 0.1;echo -n b;sleep 0.1;echo -n c', 'r'), $loop);
254+
$stream = new ReadableResourceStream(popen('echo -n a;sleep 0.1;echo -n b;sleep 0.1;echo -n c', 'r'), $loop);
254255

255256
$buffer = '';
256257
$stream->on('data', function ($chunk) use (&$buffer) {
@@ -276,7 +277,7 @@ public function testReadsLongChunksFromProcessPipe($condition, $loopFactory)
276277

277278
$loop = $loopFactory();
278279

279-
$stream = new Stream(popen('dd if=/dev/zero bs=12345 count=1234 2>&-', 'r'), $loop);
280+
$stream = new ReadableResourceStream(popen('dd if=/dev/zero bs=12345 count=1234 2>&-', 'r'), $loop);
280281

281282
$bytes = 0;
282283
$stream->on('data', function ($chunk) use (&$bytes) {
@@ -302,7 +303,7 @@ public function testReadsNothingFromProcessPipeWithNoOutput($condition, $loopFac
302303

303304
$loop = $loopFactory();
304305

305-
$stream = new Stream(popen('true', 'r'), $loop);
306+
$stream = new ReadableResourceStream(popen('true', 'r'), $loop);
306307
$stream->on('data', $this->expectCallableNever());
307308
$stream->on('end', $this->expectCallableOnce());
308309
$stream->on('error', $this->expectCallableNever());

tests/UtilTest.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace React\Tests\Stream;
44

5-
use React\Stream\Buffer;
5+
use React\Stream\WritableResourceStream;
66
use React\Stream\ReadableStream;
77
use React\Stream\Util;
88
use React\Stream\WritableStream;
@@ -171,13 +171,13 @@ public function testPipeWithTooSlowWritableShouldResumeOnDrain()
171171
$this->assertFalse($readable->paused);
172172
}
173173

174-
public function testPipeWithBuffer()
174+
public function testPipeWithWritableResourceStream()
175175
{
176176
$readable = new Stub\ReadableStreamStub();
177177

178178
$stream = fopen('php://temp', 'r+');
179179
$loop = $this->createLoopMock();
180-
$buffer = new Buffer($stream, $loop);
180+
$buffer = new WritableResourceStream($stream, $loop);
181181

182182
$readable->pipe($buffer);
183183

0 commit comments

Comments
 (0)