diff --git a/README.md b/README.md index fa36171..133b653 100644 --- a/README.md +++ b/README.md @@ -14,9 +14,30 @@ descriptor based implementation with an in-memory write buffer. This component depends on `événement`, which is an implementation of the `EventEmitter`. -## Readable Streams - -### EventEmitter Events +**Table of contents** + +* [API](#api) + * [ReadableStreamInterface](#readablestreaminterface) + * [EventEmitter Events](#eventemitter-events) + * [isReadable()](#isreadable) + * [pause()](#pause) + * [resume()](#resume) + * [pipe()](#pipe) + * [close()](#close) + * [WritableStreamInterface](#writablestreaminterface) + * [EventEmitter Events](#eventemitter-events-1) + * [isWritable()](#iswritable) + * [write()](#write) + * [end()](#end) + * [close()](#close-1) +* [Usage](#usage) +* [Install](#install) + +## API + +### ReadableStreamInterface + +#### EventEmitter Events * `data`: Emitted whenever data was read from the source with a single mixed argument for incoming data. @@ -29,16 +50,91 @@ This component depends on `événement`, which is an implementation of the with a single `Exception` argument for error instance. * `close`: Emitted when the stream is closed. -### Methods +#### isReadable() + +The `isReadable(): bool` method can be used to +check whether this stream is in a readable state (not closed already). + +This method can be used to check if the stream still accepts incoming +data events or if it is ended or closed already. +Once the stream is non-readable, no further `data` or `end` events SHOULD +be emitted. + +```php +assert($stream->isReadable() === false); + +$stream->on('data', assertNeverCalled()); +$stream->on('end', assertNeverCalled()); +``` + +A successfully opened stream always MUST start in readable mode. + +Once the stream ends or closes, it MUST switch to non-readable mode. +This can happen any time, explicitly through `close()` or +implicitly due to a remote close or an unrecoverable transmission error. +Once a stream has switched to non-readable mode, it MUST NOT transition +back to readable mode. + +If this stream is a `DuplexStreamInterface`, you should also notice +how the writable side of the stream also implements an `isWritable()` +method. Unless this is a half-open duplex stream, they SHOULD usually +have the same return value. + +#### pause() + +The `pause(): void` method can be used to +pause reading incoming data events. + +Removes the data source file descriptor from the event loop. This +allows you to throttle incoming data. -* `isReadable()`: Check if the stream is still in a state allowing it to be - read from. It becomes unreadable when the stream ends, closes or an - error occurs. -* `pause()`: Remove the data source file descriptor from the event loop. This - allows you to throttle incoming data. -* `resume()`: Re-attach the data source after a `pause()`. -* `pipe(WritableStreamInterface $dest, array $options = [])`: -Pipe all the data from this readable source into the given writable destination. +Unless otherwise noted, a successfully opened stream SHOULD NOT start +in paused state. + +Once the stream is paused, no futher `data` or `end` events SHOULD +be emitted. + +```php +$stream->pause(); + +$stream->on('data', assertShouldNeverCalled()); +$stream->on('end', assertShouldNeverCalled()); +``` + +This method is advisory-only, though generally not recommended, the +stream MAY continue emitting `data` events. + +You can continue processing events by calling `resume()` again. + +Note that both methods can be called any number of times, in particular +calling `pause()` more than once SHOULD NOT have any effect. + +See also `resume()`. + +#### resume() + +The `resume(): void` method can be used to +resume reading incoming data events. + +Re-attach the data source after a previous `pause()`. + +```php +$stream->pause(); + +$loop->addTimer(1.0, function () use ($stream) { + $stream->resume(); +}); +``` + +Note that both methods can be called any number of times, in particular +calling `resume()` without a prior `pause()` SHOULD NOT have any effect. + +See also `pause()`. + +#### pipe() + +The `pipe(WritableStreamInterface $dest, array $options = [])` method can be used to +pipe all the data from this readable source into the given writable destination. Automatically sends all incoming data to the destination. Automatically throttles the source based on what the destination can handle. @@ -103,9 +199,38 @@ $source->pipe($dest); $dest->close(); // calls $source->pause() ``` -## Writable Streams +#### close() -### EventEmitter Events +The `close(): void` method can be used to +close the stream (forcefully). + +This method can be used to (forcefully) close the stream. + +```php +$stream->close(); +``` + +After calling this method, the stream MUST switch into a non-readable +mode, see also `isReadable()`. +This means that no further `data` or `end` events SHOULD be emitted. + +```php +$stream->close(); +assert($stream->isReadable() === false); + +$stream->on('data', assertNeverCalled()); +$stream->on('end', assertNeverCalled()); +``` + +If this stream is a `DuplexStreamInterface`, you should also notice +how the writable side of the stream also implements a `close()` method. +In other words, after calling this method, the stream MUST switch into +non-writable AND non-readable mode, see also `isWritable()`. +Note that this method should not be confused with the `end()` method. + +### WritableStreamInterface + +#### EventEmitter Events * `drain`: Emitted if the write buffer became full previously and is now ready to accept more data. @@ -115,16 +240,181 @@ $dest->close(); // calls $source->pause() * `pipe`: Emitted whenever a readable stream is `pipe()`d into this stream with a single `ReadableStreamInterface` argument for source stream. -### Methods +#### isWritable() + +The `isWritable(): bool` method can be used to +check whether this stream is in a writable state (not closed already). + +This method can be used to check if the stream still accepts writing +any data or if it is ended or closed already. +Writing any data to a non-writable stream is a NO-OP: + +```php +assert($stream->isWritable() === false); + +$stream->write('end'); // NO-OP +$stream->end('end'); // NO-OP +``` + +A successfully opened stream always MUST start in writable mode. + +Once the stream ends or closes, it MUST switch to non-writable mode. +This can happen any time, explicitly through `end()` or `close()` or +implicitly due to a remote close or an unrecoverable transmission error. +Once a stream has switched to non-writable mode, it MUST NOT transition +back to writable mode. + +If this stream is a `DuplexStreamInterface`, you should also notice +how the readable side of the stream also implements an `isReadable()` +method. Unless this is a half-open duplex stream, they SHOULD usually +have the same return value. + +#### write() + +The `write(mixed $data): bool` method can be used to +write some data into the stream. + +A successful write MUST be confirmed with a boolean `true`, which means +that either the data was written (flushed) immediately or is buffered and +scheduled for a future write. Note that this interface gives you no +control over explicitly flushing the buffered data, as finding the +appropriate time for this is beyond the scope of this interface and left +up to the implementation of this interface. + +Many common streams (such as a TCP/IP connection or file-based stream) +may choose to buffer all given data and schedule a future flush by using +an underlying EventLoop to check when the resource is actually writable. + +If a stream cannot handle writing (or flushing) the data, it SHOULD emit +an `error` event and MAY `close()` the stream if it can not recover from +this error. + +If the internal buffer is full after adding `$data`, then `write()` +SHOULD return `false`, indicating that the caller should stop sending +data until the buffer `drain`s. + +Similarly, if the the stream is not writable (already in a closed state) +it MUST NOT process the given `$data` and SHOULD return `false`, +indicating that the caller should stop sending data. + +The given `$data` argument MAY be of mixed type, but it's usually +recommended it SHOULD be a `string` value or MAY use a type that allows +representation as a `string` for maximum compatibility. + +Many common streams (such as a TCP/IP connection or a file-based stream) +will only accept the raw (binary) payload data that is transferred over +the wire as chunks of `string` values. + +Due to the stream-based nature of this, the sender may send any number +of chunks with varying sizes. There are no guarantees that these chunks +will be received with the exact same framing the sender intended to send. +In other words, many lower-level protocols (such as TCP/IP) transfer the +data in chunks that may be anywhere between single-byte values to several +dozens of kilobytes. You may want to apply a higher-level protocol to +these low-level data chunks in order to achieve proper message framing. + +#### end() + +The `end(mixed $data = null): void` method can be used to +successfully end the stream (after optionally sending some final data). + +This method can be used to successfully end the stream, i.e. close +the stream after sending out all data that is currently buffered. + +```php +$stream->write('hello'); +$stream->write('world'); +$stream->end(); +``` + +If there's no data currently buffered and nothing to be flushed, then +this method MAY `close()` the stream immediately. + +If there's still data in the buffer that needs to be flushed first, then +this method SHOULD try to write out this data and only then `close()` +the stream. + +Note that this interface gives you no control over explicitly flushing +the buffered data, as finding the appropriate time for this is beyond the +scope of this interface and left up to the implementation of this +interface. + +Many common streams (such as a TCP/IP connection or file-based stream) +may choose to buffer all given data and schedule a future flush by using +an underlying EventLoop to check when the resource is actually writable. + +You can optionally pass some final data that is written to the stream +before ending the stream. If a non-`null` value is given as `$data`, then +this method will behave just like calling `write($data)` before ending +with no data. + +```php +// shorter version +$stream->end('bye'); + +// same as longer version +$stream->write('bye'); +$stream->end(); +``` + +After calling this method, the stream MUST switch into a non-writable +mode, see also `isWritable()`. +This means that no further writes are possible, so any additional +`write()` or `end()` calls have no effect. + +```php +$stream->end(); +assert($stream->isWritable() === false); + +$stream->write('nope'); // NO-OP +$stream->end(); // NO-OP +``` + +Note that this method should not be confused with the `close()` method. + +#### close() + +The `close(): void` method can be used to +close the stream (forcefully). + +This method can be used to forcefully close the stream, i.e. close +the stream without waiting for any buffered data to be flushed. +If there's still data in the buffer, this data SHOULD be discarded. + +```php +$stream->close(); +``` + +After calling this method, the stream MUST switch into a non-writable +mode, see also `isWritable()`. +This means that no further writes are possible, so any additional +`write()` or `end()` calls have no effect. + +```php +$stream->close(); +assert($stream->isWritable() === false); + +$stream->write('nope'); // NO-OP +$stream->end(); // NO-OP +``` + +Note that this method should not be confused with the `end()` method. +Unlike the `end()` method, this method does not take care of any existing +buffers and simply discards any buffer contents. +Likewise, this method may also be called after calling `end()` on a +stream in order to stop waiting for the stream to flush its final data. + +```php +$stream->end(); +$loop->addTimer(1.0, function () use ($stream) { + $stream->close(); +}); +``` -* `isWritable()`: Check if the stream can still be written to. It cannot be - written to after an error or when it is closed. -* `write($data)`: Write some data into the stream. If the stream cannot handle - it, it should buffer the data or emit and `error` event. If the internal - buffer is full after adding `$data`, `write` should return false, indicating - that the caller should stop sending data until the buffer `drain`s. -* `end($data = null)`: Optionally write some final data to the stream, empty - the buffer, then close it. +If this stream is a `DuplexStreamInterface`, you should also notice +how the readable side of the stream also implements a `close()` method. +In other words, after calling this method, the stream MUST switch into +non-writable AND non-readable mode, see also `isReadable()`. ## Usage ```php diff --git a/src/Buffer.php b/src/Buffer.php index d2ea7b4..614edb3 100644 --- a/src/Buffer.php +++ b/src/Buffer.php @@ -33,7 +33,7 @@ public function isWritable() public function write($data) { if (!$this->writable) { - return; + return false; } $this->data .= $data; diff --git a/src/BufferedSink.php b/src/BufferedSink.php index 730c808..1ef409e 100644 --- a/src/BufferedSink.php +++ b/src/BufferedSink.php @@ -31,8 +31,14 @@ public function handleErrorEvent($e) public function write($data) { + if ($this->closed) { + return false; + } + $this->buffer .= $data; $this->deferred->progress($data); + + return true; } public function close() diff --git a/src/ReadableStreamInterface.php b/src/ReadableStreamInterface.php index e4a5d14..0009d1a 100644 --- a/src/ReadableStreamInterface.php +++ b/src/ReadableStreamInterface.php @@ -12,8 +12,89 @@ */ interface ReadableStreamInterface extends EventEmitterInterface { + /** + * Checks whether this stream is in a readable state (not closed already). + * + * This method can be used to check if the stream still accepts incoming + * data events or if it is ended or closed already. + * Once the stream is non-readable, no further `data` or `end` events SHOULD + * be emitted. + * + * ```php + * assert($stream->isReadable() === false); + * + * $stream->on('data', assertNeverCalled()); + * $stream->on('end', assertNeverCalled()); + * ``` + * + * A successfully opened stream always MUST start in readable mode. + * + * Once the stream ends or closes, it MUST switch to non-readable mode. + * This can happen any time, explicitly through `close()` or + * implicitly due to a remote close or an unrecoverable transmission error. + * Once a stream has switched to non-readable mode, it MUST NOT transition + * back to readable mode. + * + * If this stream is a `DuplexStreamInterface`, you should also notice + * how the writable side of the stream also implements an `isWritable()` + * method. Unless this is a half-open duplex stream, they SHOULD usually + * have the same return value. + * + * @return bool + */ public function isReadable(); + + /** + * Pauses reading incoming data events. + * + * Removes the data source file descriptor from the event loop. This + * allows you to throttle incoming data. + * + * Unless otherwise noted, a successfully opened stream SHOULD NOT start + * in paused state. + * + * Once the stream is paused, no futher `data` or `end` events SHOULD + * be emitted. + * + * ```php + * $stream->pause(); + * + * $stream->on('data', assertShouldNeverCalled()); + * $stream->on('end', assertShouldNeverCalled()); + * ``` + * + * This method is advisory-only, though generally not recommended, the + * stream MAY continue emitting `data` events. + * + * You can continue processing events by calling `resume()` again. + * + * Note that both methods can be called any number of times, in particular + * calling `pause()` more than once SHOULD NOT have any effect. + * + * @see self::resume() + * @return void + */ public function pause(); + + /** + * Resumes reading incoming data events. + * + * Re-attach the data source after a previous `pause()`. + * + * ```php + * $stream->pause(); + * + * $loop->addTimer(1.0, function () use ($stream) { + * $stream->resume(); + * }); + * ``` + * + * Note that both methods can be called any number of times, in particular + * calling `resume()` without a prior `pause()` SHOULD NOT have any effect. + * + * @see self::pause() + * @return void + */ public function resume(); /** @@ -88,5 +169,35 @@ public function resume(); */ public function pipe(WritableStreamInterface $dest, array $options = array()); + /** + * Closes the stream (forcefully). + * + * This method can be used to (forcefully) close the stream. + * + * ```php + * $stream->close(); + * ``` + * + * After calling this method, the stream MUST switch into a non-readable + * mode, see also `isReadable()`. + * This means that no further `data` or `end` events SHOULD be emitted. + * + * ```php + * $stream->close(); + * assert($stream->isReadable() === false); + * + * $stream->on('data', assertNeverCalled()); + * $stream->on('end', assertNeverCalled()); + * ``` + * + * If this stream is a `DuplexStreamInterface`, you should also notice + * how the writable side of the stream also implements a `close()` method. + * In other words, after calling this method, the stream MUST switch into + * non-writable AND non-readable mode, see also `isWritable()`. + * Note that this method should not be confused with the `end()` method. + * + * @return void + * @see WritableStreamInterface::close() + */ public function close(); } diff --git a/src/ThroughStream.php b/src/ThroughStream.php index 29789df..93db37f 100644 --- a/src/ThroughStream.php +++ b/src/ThroughStream.php @@ -19,7 +19,13 @@ public function filter($data) public function write($data) { + if (!$this->writable) { + return false; + } + $this->readable->emit('data', array($this->filter($data))); + + return true; } public function end($data = null) diff --git a/src/WritableStream.php b/src/WritableStream.php index 612450a..7b96b62 100644 --- a/src/WritableStream.php +++ b/src/WritableStream.php @@ -10,6 +10,7 @@ class WritableStream extends EventEmitter implements WritableStreamInterface public function write($data) { + return false; } public function end($data = null) diff --git a/src/WritableStreamInterface.php b/src/WritableStreamInterface.php index 93eac90..0f33686 100644 --- a/src/WritableStreamInterface.php +++ b/src/WritableStreamInterface.php @@ -12,8 +12,190 @@ */ interface WritableStreamInterface extends EventEmitterInterface { + /** + * Checks whether this stream is in a writable state (not closed already). + * + * This method can be used to check if the stream still accepts writing + * any data or if it is ended or closed already. + * Writing any data to a non-writable stream is a NO-OP: + * + * ```php + * assert($stream->isWritable() === false); + * + * $stream->write('end'); // NO-OP + * $stream->end('end'); // NO-OP + * ``` + * + * A successfully opened stream always MUST start in writable mode. + * + * Once the stream ends or closes, it MUST switch to non-writable mode. + * This can happen any time, explicitly through `end()` or `close()` or + * implicitly due to a remote close or an unrecoverable transmission error. + * Once a stream has switched to non-writable mode, it MUST NOT transition + * back to writable mode. + * + * If this stream is a `DuplexStreamInterface`, you should also notice + * how the readable side of the stream also implements an `isReadable()` + * method. Unless this is a half-open duplex stream, they SHOULD usually + * have the same return value. + * + * @return bool + */ public function isWritable(); + + /** + * Write some data into the stream. + * + * A successful write MUST be confirmed with a boolean `true`, which means + * that either the data was written (flushed) immediately or is buffered and + * scheduled for a future write. Note that this interface gives you no + * control over explicitly flushing the buffered data, as finding the + * appropriate time for this is beyond the scope of this interface and left + * up to the implementation of this interface. + * + * Many common streams (such as a TCP/IP connection or file-based stream) + * may choose to buffer all given data and schedule a future flush by using + * an underlying EventLoop to check when the resource is actually writable. + * + * If a stream cannot handle writing (or flushing) the data, it SHOULD emit + * an `error` event and MAY `close()` the stream if it can not recover from + * this error. + * + * If the internal buffer is full after adding `$data`, then `write()` + * SHOULD return `false`, indicating that the caller should stop sending + * data until the buffer `drain`s. + * + * Similarly, if the the stream is not writable (already in a closed state) + * it MUST NOT process the given `$data` and SHOULD return `false`, + * indicating that the caller should stop sending data. + * + * The given `$data` argument MAY be of mixed type, but it's usually + * recommended it SHOULD be a `string` value or MAY use a type that allows + * representation as a `string` for maximum compatibility. + * + * Many common streams (such as a TCP/IP connection or a file-based stream) + * will only accept the raw (binary) payload data that is transferred over + * the wire as chunks of `string` values. + * + * Due to the stream-based nature of this, the sender may send any number + * of chunks with varying sizes. There are no guarantees that these chunks + * will be received with the exact same framing the sender intended to send. + * In other words, many lower-level protocols (such as TCP/IP) transfer the + * data in chunks that may be anywhere between single-byte values to several + * dozens of kilobytes. You may want to apply a higher-level protocol to + * these low-level data chunks in order to achieve proper message framing. + * + * @param mixed|string $data + * @return bool + */ public function write($data); + + /** + * Successfully ends the stream (after optionally sending some final data). + * + * This method can be used to successfully end the stream, i.e. close + * the stream after sending out all data that is currently buffered. + * + * ```php + * $stream->write('hello'); + * $stream->write('world'); + * $stream->end(); + * ``` + * + * If there's no data currently buffered and nothing to be flushed, then + * this method MAY `close()` the stream immediately. + * + * If there's still data in the buffer that needs to be flushed first, then + * this method SHOULD try to write out this data and only then `close()` + * the stream. + * + * Note that this interface gives you no control over explicitly flushing + * the buffered data, as finding the appropriate time for this is beyond the + * scope of this interface and left up to the implementation of this + * interface. + * + * Many common streams (such as a TCP/IP connection or file-based stream) + * may choose to buffer all given data and schedule a future flush by using + * an underlying EventLoop to check when the resource is actually writable. + * + * You can optionally pass some final data that is written to the stream + * before ending the stream. If a non-`null` value is given as `$data`, then + * this method will behave just like calling `write($data)` before ending + * with no data. + * + * ```php + * // shorter version + * $stream->end('bye'); + * + * // same as longer version + * $stream->write('bye'); + * $stream->end(); + * ``` + * + * After calling this method, the stream MUST switch into a non-writable + * mode, see also `isWritable()`. + * This means that no further writes are possible, so any additional + * `write()` or `end()` calls have no effect. + * + * ```php + * $stream->end(); + * assert($stream->isWritable() === false); + * + * $stream->write('nope'); // NO-OP + * $stream->end(); // NO-OP + * ``` + * + * Note that this method should not be confused with the `close()` method. + * + * @param mixed|string|null $data + * @return void + */ public function end($data = null); + + /** + * Closes the stream (forcefully). + * + * This method can be used to forcefully close the stream, i.e. close + * the stream without waiting for any buffered data to be flushed. + * If there's still data in the buffer, this data SHOULD be discarded. + * + * ```php + * $stream->close(); + * ``` + * + * After calling this method, the stream MUST switch into a non-writable + * mode, see also `isWritable()`. + * This means that no further writes are possible, so any additional + * `write()` or `end()` calls have no effect. + * + * ```php + * $stream->close(); + * assert($stream->isWritable() === false); + * + * $stream->write('nope'); // NO-OP + * $stream->end(); // NO-OP + * ``` + * + * Note that this method should not be confused with the `end()` method. + * Unlike the `end()` method, this method does not take care of any existing + * buffers and simply discards any buffer contents. + * Likewise, this method may also be called after calling `end()` on a + * stream in order to stop waiting for the stream to flush its final data. + * + * ```php + * $stream->end(); + * $loop->addTimer(1.0, function () use ($stream) { + * $stream->close(); + * }); + * ``` + * + * If this stream is a `DuplexStreamInterface`, you should also notice + * how the readable side of the stream also implements a `close()` method. + * In other words, after calling this method, the stream MUST switch into + * non-writable AND non-readable mode, see also `isReadable()`. + * + * @return void + * @see ReadableStreamInterface::close() + */ public function close(); }