diff --git a/README.md b/README.md index 996e0ee9..6c809c2b 100644 --- a/README.md +++ b/README.md @@ -637,9 +637,9 @@ know what you're doing. Internally, the `SecureServer` has to set the required TLS context options on the underlying stream resources. These resources are not exposed through any of the interfaces defined in this -package, but only through the `React\Stream\Stream` class. +package, but only through the internal `Connection` class. The `TcpServer` class is guaranteed to emit connections that implement -the `ConnectionInterface` and also extend the `Stream` class in order to +the `ConnectionInterface` and uses the internal `Connection` class in order to expose these underlying resources. If you use a custom `ServerInterface` and its `connection` event does not meet this requirement, the `SecureServer` will emit an `error` event and @@ -1237,8 +1237,11 @@ options as described above. All versions of PHP prior to 5.6.8 suffered from a buffering issue where reading from a streaming TLS connection could be one `data` event behind. This library implements a work-around to try to flush the complete incoming -data buffers on these versions, but we have seen reports of people saying this -could still affect some older versions (`5.5.23`, `5.6.7`, and `5.6.8`). +data buffers on these legacy PHP versions, which has a penalty of around 10% of +throughput on all connections. +With this work-around, we have not been able to reproduce this issue anymore, +but we have seen reports of people saying this could still affect some of the +older PHP versions (`5.5.23`, `5.6.7`, and `5.6.8`). Note that this only affects *some* higher-level streaming protocols, such as IRC over TLS, but should not affect HTTP over TLS (HTTPS). Further investigation of this issue is needed. diff --git a/composer.json b/composer.json index afdf3c51..8864e7a6 100644 --- a/composer.json +++ b/composer.json @@ -8,14 +8,14 @@ "evenement/evenement": "~2.0|~1.0", "react/dns": "0.4.*|0.3.*", "react/event-loop": "0.4.*|0.3.*", - "react/stream": "^0.6 || ^0.5 || ^0.4.5", + "react/stream": "^1.0 || ^0.7 || ^0.6 || ^0.5 || ^0.4.5", "react/promise": "^2.1 || ^1.2", "react/promise-timer": "~1.0" }, "require-dev": { "clue/block-react": "^1.1", "phpunit/phpunit": "~4.8", - "react/stream": "^0.6" + "react/stream": "^1.0 || ^0.7 || ^0.6" }, "autoload": { "psr-4": { diff --git a/src/Connection.php b/src/Connection.php index ad22025c..528cb76d 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -2,7 +2,12 @@ namespace React\Socket; +use Evenement\EventEmitter; +use React\EventLoop\LoopInterface; +use React\Stream\DuplexResourceStream; use React\Stream\Stream; +use React\Stream\Util; +use React\Stream\WritableStreamInterface; /** * The actual connection implementation for ConnectionInterface @@ -12,7 +17,7 @@ * @see ConnectionInterface * @internal */ -class Connection extends Stream implements ConnectionInterface +class Connection extends EventEmitter implements ConnectionInterface { /** * Internal flag whether encryption has been enabled on this connection @@ -24,6 +29,84 @@ class Connection extends Stream implements ConnectionInterface */ public $encryptionEnabled = false; + /** @internal */ + public $stream; + + private $input; + + public function __construct($resource, LoopInterface $loop) + { + // PHP < 5.6.8 suffers from a buffer indicator bug on secure TLS connections + // as a work-around we always read the complete buffer until its end. + // The buffer size is limited due to TCP/IP buffers anyway, so this + // should not affect usage otherwise. + // See https://bugs.php.net/bug.php?id=65137 + // https://bugs.php.net/bug.php?id=41631 + // https://github.com/reactphp/socket-client/issues/24 + $clearCompleteBuffer = (version_compare(PHP_VERSION, '5.6.8', '<')); + + // @codeCoverageIgnoreStart + if (class_exists('React\Stream\Stream')) { + // legacy react/stream < 0.7 requires additional buffer property + $this->input = new Stream($resource, $loop); + if ($clearCompleteBuffer) { + $this->input->bufferSize = null; + } + } else { + // preferred react/stream >= 0.7 accepts buffer parameter + $this->input = new DuplexResourceStream($resource, $loop, $clearCompleteBuffer ? -1 : null); + } + // @codeCoverageIgnoreEnd + + $this->stream = $resource; + + Util::forwardEvents($this->input, $this, array('data', 'end', 'error', 'close', 'pipe', 'drain')); + + $this->input->on('close', array($this, 'close')); + } + + public function isReadable() + { + return $this->input->isReadable(); + } + + public function isWritable() + { + return $this->input->isWritable(); + } + + public function pause() + { + $this->input->pause(); + } + + public function resume() + { + $this->input->resume(); + } + + public function pipe(WritableStreamInterface $dest, array $options = array()) + { + return $this->input->pipe($dest, $options); + } + + public function write($data) + { + return $this->input->write($data); + } + + public function end($data = null) + { + $this->input->end($data); + } + + public function close() + { + $this->input->close(); + $this->handleClose(); + $this->removeAllListeners(); + } + public function handleClose() { if (!is_resource($this->stream)) { diff --git a/src/SecureConnector.php b/src/SecureConnector.php index e418864d..99cde108 100644 --- a/src/SecureConnector.php +++ b/src/SecureConnector.php @@ -3,7 +3,6 @@ namespace React\Socket; use React\EventLoop\LoopInterface; -use React\Stream\Stream; use React\Promise; final class SecureConnector implements ConnectorInterface @@ -41,9 +40,9 @@ public function connect($uri) return $this->connector->connect($uri)->then(function (ConnectionInterface $connection) use ($context, $encryption) { // (unencrypted) TCP/IP connection succeeded - if (!$connection instanceof Stream) { + if (!$connection instanceof Connection) { $connection->close(); - throw new \UnexpectedValueException('Connection MUST extend Stream in order to access underlying stream resource'); + throw new \UnexpectedValueException('Base connector does not use internal Connection class exposing stream resource'); } // set required SSL/TLS context options diff --git a/src/SecureServer.php b/src/SecureServer.php index 64cbeda9..3d621a57 100644 --- a/src/SecureServer.php +++ b/src/SecureServer.php @@ -6,7 +6,6 @@ use React\EventLoop\LoopInterface; use React\Socket\TcpServer; use React\Socket\ConnectionInterface; -use React\Stream\Stream; /** * The `SecureServer` class implements the `ServerInterface` and is responsible @@ -101,9 +100,9 @@ final class SecureServer extends EventEmitter implements ServerInterface * Internally, the `SecureServer` has to set the required TLS context options on * the underlying stream resources. * These resources are not exposed through any of the interfaces defined in this - * package, but only through the `React\Stream\Stream` class. + * package, but only through the internal `Connection` class. * The `TcpServer` class is guaranteed to emit connections that implement - * the `ConnectionInterface` and also extend the `Stream` class in order to + * the `ConnectionInterface` and uses the internal `Connection` class in order to * expose these underlying resources. * If you use a custom `ServerInterface` and its `connection` event does not * meet this requirement, the `SecureServer` will emit an `error` event and @@ -163,8 +162,8 @@ public function close() /** @internal */ public function handleConnection(ConnectionInterface $connection) { - if (!$connection instanceof Stream) { - $this->emit('error', array(new \UnexpectedValueException('Connection event MUST emit an instance extending Stream in order to access underlying stream resource'))); + if (!$connection instanceof Connection) { + $this->emit('error', array(new \UnexpectedValueException('Base server does not use internal Connection class exposing stream resource'))); $connection->end(); return; } diff --git a/src/StreamEncryption.php b/src/StreamEncryption.php index e73d9416..bb3356ca 100644 --- a/src/StreamEncryption.php +++ b/src/StreamEncryption.php @@ -21,22 +21,11 @@ class StreamEncryption private $errstr; private $errno; - private $wrapSecure = false; - public function __construct(LoopInterface $loop, $server = true) { $this->loop = $loop; $this->server = $server; - // See https://bugs.php.net/bug.php?id=65137 - // https://bugs.php.net/bug.php?id=41631 - // https://github.com/reactphp/socket-client/issues/24 - // On versions affected by this bug we need to fread the stream until we - // get an empty string back because the buffer indicator could be wrong - if (version_compare(PHP_VERSION, '5.6.8', '<')) { - $this->wrapSecure = true; - } - if ($server) { $this->method = STREAM_CRYPTO_METHOD_TLS_SERVER; @@ -100,16 +89,11 @@ public function toggle(Connection $stream, $toggle) $toggleCrypto(); } - $wrap = $this->wrapSecure && $toggle; $loop = $this->loop; - return $deferred->promise()->then(function () use ($stream, $socket, $wrap, $loop, $toggle) { + return $deferred->promise()->then(function () use ($stream, $socket, $loop, $toggle) { $loop->removeReadStream($socket); - if ($wrap) { - $stream->bufferSize = null; - } - $stream->encryptionEnabled = $toggle; $stream->resume(); diff --git a/tests/FunctionalSecureServerTest.php b/tests/FunctionalSecureServerTest.php index b591cbf9..f9e719dd 100644 --- a/tests/FunctionalSecureServerTest.php +++ b/tests/FunctionalSecureServerTest.php @@ -3,7 +3,6 @@ namespace React\Tests\Socket; use React\EventLoop\Factory; -use React\Stream\Stream; use React\Socket\SecureServer; use React\Socket\ConnectionInterface; use React\Socket\TcpServer; @@ -60,7 +59,7 @@ public function testWritesDataToConnection() $promise = $connector->connect($server->getAddress()); $local = Block\await($promise, $loop, self::TIMEOUT); - /* @var $local React\Stream\Stream */ + /* @var $local ConnectionInterface */ $local->on('data', $this->expectCallableOnceWith('foo')); @@ -346,7 +345,7 @@ public function testEmitsErrorIfConnectionIsNotSecureHandshake() $connector = new TcpConnector($loop); $promise = $connector->connect(str_replace('tls://', '', $server->getAddress())); - $promise->then(function (Stream $stream) { + $promise->then(function (ConnectionInterface $stream) { $stream->write("GET / HTTP/1.0\r\n\r\n"); }); diff --git a/tests/IntegrationTest.php b/tests/IntegrationTest.php index d571cf0b..e17ea9c7 100644 --- a/tests/IntegrationTest.php +++ b/tests/IntegrationTest.php @@ -7,7 +7,6 @@ use React\Socket\Connector; use React\Socket\SecureConnector; use React\Socket\TcpConnector; -use React\Stream\BufferedSink; use Clue\React\Block; use React\Socket\DnsConnector; @@ -28,7 +27,7 @@ public function gettingStuffFromGoogleShouldWork() $conn->write("GET / HTTP/1.0\r\n\r\n"); - $response = Block\await(BufferedSink::createPromise($conn), $loop, self::TIMEOUT); + $response = $this->buffer($conn, $loop, self::TIMEOUT); $this->assertRegExp('#^HTTP/1\.0#', $response); } @@ -47,7 +46,7 @@ public function gettingEncryptedStuffFromGoogleShouldWork() $conn->write("GET / HTTP/1.0\r\n\r\n"); - $response = Block\await(BufferedSink::createPromise($conn), $loop, self::TIMEOUT); + $response = $this->buffer($conn, $loop, self::TIMEOUT); $this->assertRegExp('#^HTTP/1\.0#', $response); } @@ -76,7 +75,7 @@ public function gettingEncryptedStuffFromGoogleShouldWorkIfHostIsResolvedFirst() $conn->write("GET / HTTP/1.0\r\n\r\n"); - $response = Block\await(BufferedSink::createPromise($conn), $loop, self::TIMEOUT); + $response = $this->buffer($conn, $loop, self::TIMEOUT); $this->assertRegExp('#^HTTP/1\.0#', $response); } @@ -94,7 +93,7 @@ public function gettingPlaintextStuffFromEncryptedGoogleShouldNotWork() $conn->write("GET / HTTP/1.0\r\n\r\n"); - $response = Block\await(BufferedSink::createPromise($conn), $loop, self::TIMEOUT); + $response = $this->buffer($conn, $loop, self::TIMEOUT); $this->assertNotRegExp('#^HTTP/1\.0#', $response); } diff --git a/tests/SecureIntegrationTest.php b/tests/SecureIntegrationTest.php index c4cee886..357acb65 100644 --- a/tests/SecureIntegrationTest.php +++ b/tests/SecureIntegrationTest.php @@ -7,12 +7,11 @@ use React\Socket\SecureServer; use React\Socket\TcpConnector; use React\Socket\SecureConnector; -use React\Stream\Stream; use Clue\React\Block; use React\Promise\Promise; use Evenement\EventEmitterInterface; use React\Promise\Deferred; -use React\Stream\BufferedSink; +use React\Socket\ConnectionInterface; class SecureIntegrationTest extends TestCase { @@ -49,7 +48,7 @@ public function tearDown() public function testConnectToServer() { $client = Block\await($this->connector->connect($this->address), $this->loop, self::TIMEOUT); - /* @var $client Stream */ + /* @var $client ConnectionInterface */ $client->close(); } @@ -61,7 +60,7 @@ public function testConnectToServerEmitsConnection() $promiseClient = $this->connector->connect($this->address); list($_, $client) = Block\awaitAll(array($promiseServer, $promiseClient), $this->loop, self::TIMEOUT); - /* @var $client Stream */ + /* @var $client ConnectionInterface */ $client->close(); } @@ -70,14 +69,14 @@ public function testSendSmallDataToServerReceivesOneChunk() { // server expects one connection which emits one data event $received = new Deferred(); - $this->server->on('connection', function (Stream $peer) use ($received) { + $this->server->on('connection', function (ConnectionInterface $peer) use ($received) { $peer->on('data', function ($chunk) use ($received) { $received->resolve($chunk); }); }); $client = Block\await($this->connector->connect($this->address), $this->loop, self::TIMEOUT); - /* @var $client Stream */ + /* @var $client ConnectionInterface */ $client->write('hello'); @@ -92,7 +91,7 @@ public function testSendSmallDataToServerReceivesOneChunk() public function testSendDataWithEndToServerReceivesAllData() { $disconnected = new Deferred(); - $this->server->on('connection', function (Stream $peer) use ($disconnected) { + $this->server->on('connection', function (ConnectionInterface $peer) use ($disconnected) { $received = ''; $peer->on('data', function ($chunk) use (&$received) { $received .= $chunk; @@ -103,7 +102,7 @@ public function testSendDataWithEndToServerReceivesAllData() }); $client = Block\await($this->connector->connect($this->address), $this->loop, self::TIMEOUT); - /* @var $client Stream */ + /* @var $client ConnectionInterface */ $data = str_repeat('a', 200000); $client->end($data); @@ -117,14 +116,14 @@ public function testSendDataWithEndToServerReceivesAllData() public function testSendDataWithoutEndingToServerReceivesAllData() { $received = ''; - $this->server->on('connection', function (Stream $peer) use (&$received) { + $this->server->on('connection', function (ConnectionInterface $peer) use (&$received) { $peer->on('data', function ($chunk) use (&$received) { $received .= $chunk; }); }); $client = Block\await($this->connector->connect($this->address), $this->loop, self::TIMEOUT); - /* @var $client Stream */ + /* @var $client ConnectionInterface */ $data = str_repeat('d', 200000); $client->write($data); @@ -139,12 +138,12 @@ public function testSendDataWithoutEndingToServerReceivesAllData() public function testConnectToServerWhichSendsSmallDataReceivesOneChunk() { - $this->server->on('connection', function (Stream $peer) { + $this->server->on('connection', function (ConnectionInterface $peer) { $peer->write('hello'); }); $client = Block\await($this->connector->connect($this->address), $this->loop, self::TIMEOUT); - /* @var $client Stream */ + /* @var $client ConnectionInterface */ // await client to report one "data" event $receive = $this->createPromiseForEvent($client, 'data', $this->expectCallableOnceWith('hello')); @@ -156,15 +155,15 @@ public function testConnectToServerWhichSendsSmallDataReceivesOneChunk() public function testConnectToServerWhichSendsDataWithEndReceivesAllData() { $data = str_repeat('b', 100000); - $this->server->on('connection', function (Stream $peer) use ($data) { + $this->server->on('connection', function (ConnectionInterface $peer) use ($data) { $peer->end($data); }); $client = Block\await($this->connector->connect($this->address), $this->loop, self::TIMEOUT); - /* @var $client Stream */ + /* @var $client ConnectionInterface */ // await data from client until it closes - $received = Block\await(BufferedSink::createPromise($client), $this->loop, self::TIMEOUT); + $received = $this->buffer($client, $this->loop, self::TIMEOUT); $this->assertEquals($data, $received); } @@ -172,12 +171,12 @@ public function testConnectToServerWhichSendsDataWithEndReceivesAllData() public function testConnectToServerWhichSendsDataWithoutEndingReceivesAllData() { $data = str_repeat('c', 100000); - $this->server->on('connection', function (Stream $peer) use ($data) { + $this->server->on('connection', function (ConnectionInterface $peer) use ($data) { $peer->write($data); }); $client = Block\await($this->connector->connect($this->address), $this->loop, self::TIMEOUT); - /* @var $client Stream */ + /* @var $client ConnectionInterface */ // buffer incoming data for 0.1s (should be plenty of time) $received = ''; diff --git a/tests/TcpServerTest.php b/tests/TcpServerTest.php index 642cb5f3..9a23d64e 100644 --- a/tests/TcpServerTest.php +++ b/tests/TcpServerTest.php @@ -3,8 +3,8 @@ namespace React\Tests\Socket; use React\EventLoop\StreamSelectLoop; -use React\Stream\Stream; use React\Socket\TcpServer; +use React\Stream\DuplexResourceStream; class TcpServerTest extends TestCase { @@ -55,10 +55,6 @@ public function testConnectionWithManyClients() $this->loop->tick(); } - /** - * @covers React\EventLoop\StreamSelectLoop::tick - * @covers React\Socket\Connection::handleData - */ public function testDataEventWillNotBeEmittedWhenClientSendsNoData() { $client = stream_socket_client('tcp://localhost:'.$this->port); @@ -72,10 +68,6 @@ public function testDataEventWillNotBeEmittedWhenClientSendsNoData() $this->loop->tick(); } - /** - * @covers React\EventLoop\StreamSelectLoop::tick - * @covers React\Socket\Connection::handleData - */ public function testDataWillBeEmittedWithDataClientSends() { $client = stream_socket_client('tcp://localhost:'.$this->port); @@ -91,10 +83,6 @@ public function testDataWillBeEmittedWithDataClientSends() $this->loop->tick(); } - /** - * @covers React\EventLoop\StreamSelectLoop::tick - * @covers React\Socket\Connection::handleData - */ public function testDataWillBeEmittedEvenWhenClientShutsDownAfterSending() { $client = stream_socket_client('tcp://localhost:' . $this->port); @@ -110,22 +98,6 @@ public function testDataWillBeEmittedEvenWhenClientShutsDownAfterSending() $this->loop->tick(); } - public function testDataWillBeFragmentedToBufferSize() - { - $client = stream_socket_client('tcp://localhost:' . $this->port); - - fwrite($client, "Hello World!\n"); - - $mock = $this->expectCallableOnceWith("He"); - - $this->server->on('connection', function ($conn) use ($mock) { - $conn->bufferSize = 2; - $conn->on('data', $mock); - }); - $this->loop->tick(); - $this->loop->tick(); - } - public function testLoopWillEndWhenServerIsClosed() { // explicitly unset server because we already call close() @@ -167,7 +139,7 @@ public function testLoopWillEndWhenServerIsClosedAfterSingleConnection() public function testDataWillBeEmittedInMultipleChunksWhenClientSendsExcessiveAmounts() { $client = stream_socket_client('tcp://localhost:' . $this->port); - $stream = new Stream($client, $this->loop); + $stream = new DuplexResourceStream($client, $this->loop); $bytes = 1024 * 1024; $stream->end(str_repeat('*', $bytes)); diff --git a/tests/TestCase.php b/tests/TestCase.php index b1c850ec..3360e836 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -2,6 +2,11 @@ namespace React\Tests\Socket; +use React\Stream\ReadableStreamInterface; +use React\EventLoop\LoopInterface; +use Clue\React\Block; +use React\Promise\Promise; + class TestCase extends \PHPUnit_Framework_TestCase { protected function expectCallableExactly($amount) @@ -49,4 +54,30 @@ protected function createCallableMock() { return $this->getMock('React\Tests\Socket\Stub\CallableStub'); } + + protected function buffer(ReadableStreamInterface $stream, LoopInterface $loop, $timeout) + { + if (!$stream->isReadable()) { + return ''; + } + + return Block\await(new Promise( + function ($resolve, $reject) use ($stream) { + $buffer = ''; + $stream->on('data', function ($chunk) use (&$buffer) { + $buffer .= $chunk; + }); + + $stream->on('error', $reject); + + $stream->on('close', function () use (&$buffer, $resolve) { + $resolve($buffer); + }); + }, + function () use ($stream) { + $stream->close(); + throw new \RuntimeException(); + } + ), $loop, $timeout); + } }