diff --git a/README.md b/README.md index 34bd1d4..8a6e998 100644 --- a/README.md +++ b/README.md @@ -123,6 +123,31 @@ $secureConnector = new React\SocketClient\SecureConnector($dnsConnector, $loop, )); ``` +### Connection timeouts + +The `TimeoutConnector` class decorates any given `Connector` instance. +It provides the same `create()` method, but will automatically reject the +underlying connection attempt if it takes too long. + +```php +$timeoutConnector = new React\SocketClient\TimeoutConnector($connector, 3.0, $loop); + +$timeoutConnector->create('google.com', 80)->then(function (React\Stream\Stream $stream) { + // connection succeeded within 3.0 seconds +}); +``` + +Pending connection attempts can be cancelled by cancelling its pending promise like so: + +```php +$promise = $timeoutConnector->create($host, $port); + +$promise->cancel(); +``` + +Calling `cancel()` on a pending promise will cancel the underlying connection +attempt, abort the timer and reject the resulting promise. + ### Unix domain sockets Similarly, the `UnixConnector` class can be used to connect to Unix domain socket (UDS) diff --git a/composer.json b/composer.json index ff58dc5..4ab2c9c 100644 --- a/composer.json +++ b/composer.json @@ -8,7 +8,8 @@ "react/dns": "0.4.*|0.3.*", "react/event-loop": "0.4.*|0.3.*", "react/stream": "0.4.*|0.3.*", - "react/promise": "~2.0|~1.1" + "react/promise": "~2.0|~1.1", + "react/promise-timer": "~1.0" }, "autoload": { "psr-4": { diff --git a/src/TimeoutConnector.php b/src/TimeoutConnector.php new file mode 100644 index 0000000..c4cfd5e --- /dev/null +++ b/src/TimeoutConnector.php @@ -0,0 +1,50 @@ +connector = $connector; + $this->timeout = $timeout; + $this->loop = $loop; + } + + public function create($host, $port) + { + $promise = $this->connector->create($host, $port); + + return Timer\timeout(new Promise( + function ($resolve, $reject) use ($promise) { + // resolve/reject with result of TCP/IP connection + $promise->then($resolve, $reject); + }, + function ($_, $reject) use ($promise) { + // cancellation should reject connection attempt + $reject(new \RuntimeException('Connection attempt cancelled during connection')); + + // forefully close TCP/IP connection if it completes despite cancellation + $promise->then(function (Stream $stream) { + $stream->close(); + }); + + // (try to) cancel pending TCP/IP connection + if ($promise instanceof CancellablePromiseInterface) { + $promise->cancel(); + } + } + ), $this->timeout, $this->loop); + } +} diff --git a/tests/TimeoutConnectorTest.php b/tests/TimeoutConnectorTest.php new file mode 100644 index 0000000..2cb0969 --- /dev/null +++ b/tests/TimeoutConnectorTest.php @@ -0,0 +1,125 @@ +getMock('React\SocketClient\ConnectorInterface'); + $connector->expects($this->once())->method('create')->with('google.com', 80)->will($this->returnValue($promise)); + + $loop = Factory::create(); + + $timeout = new TimeoutConnector($connector, 0.01, $loop); + + $timeout->create('google.com', 80)->then( + $this->expectCallableNever(), + $this->expectCallableOnce() + ); + + $loop->run(); + } + + public function testRejectsWhenConnectorRejects() + { + $promise = Promise\reject(new \RuntimeException()); + + $connector = $this->getMock('React\SocketClient\ConnectorInterface'); + $connector->expects($this->once())->method('create')->with('google.com', 80)->will($this->returnValue($promise)); + + $loop = Factory::create(); + + $timeout = new TimeoutConnector($connector, 5.0, $loop); + + $timeout->create('google.com', 80)->then( + $this->expectCallableNever(), + $this->expectCallableOnce() + ); + + $loop->run(); + } + + public function testResolvesWhenConnectorResolves() + { + $promise = Promise\resolve(); + + $connector = $this->getMock('React\SocketClient\ConnectorInterface'); + $connector->expects($this->once())->method('create')->with('google.com', 80)->will($this->returnValue($promise)); + + $loop = Factory::create(); + + $timeout = new TimeoutConnector($connector, 5.0, $loop); + + $timeout->create('google.com', 80)->then( + $this->expectCallableOnce(), + $this->expectCallableNever() + ); + + $loop->run(); + } + + public function testRejectsAndCancelsPendingPromiseOnTimeout() + { + $promise = new Promise\Promise(function () { }, $this->expectCallableOnce()); + + $connector = $this->getMock('React\SocketClient\ConnectorInterface'); + $connector->expects($this->once())->method('create')->with('google.com', 80)->will($this->returnValue($promise)); + + $loop = Factory::create(); + + $timeout = new TimeoutConnector($connector, 0.01, $loop); + + $timeout->create('google.com', 80)->then( + $this->expectCallableNever(), + $this->expectCallableOnce() + ); + + $loop->run(); + } + + public function testCancelsPendingPromiseOnCancel() + { + $promise = new Promise\Promise(function () { }, $this->expectCallableOnce()); + + $connector = $this->getMock('React\SocketClient\ConnectorInterface'); + $connector->expects($this->once())->method('create')->with('google.com', 80)->will($this->returnValue($promise)); + + $loop = Factory::create(); + + $timeout = new TimeoutConnector($connector, 0.01, $loop); + + $out = $timeout->create('google.com', 80); + $out->cancel(); + + $out->then($this->expectCallableNever(), $this->expectCallableOnce()); + } + + public function testCancelClosesStreamIfTcpResolvesDespiteCancellation() + { + $stream = $this->getMockBuilder('React\Stream\Stream')->disableOriginalConstructor()->setMethods(array('close'))->getMock(); + $stream->expects($this->once())->method('close'); + + $promise = new Promise\Promise(function () { }, function ($resolve) use ($stream) { + $resolve($stream); + }); + + $connector = $this->getMock('React\SocketClient\ConnectorInterface'); + $connector->expects($this->once())->method('create')->with('google.com', 80)->will($this->returnValue($promise)); + + $loop = Factory::create(); + + $timeout = new TimeoutConnector($connector, 0.01, $loop); + + $out = $timeout->create('google.com', 80); + $out->cancel(); + + $out->then($this->expectCallableNever(), $this->expectCallableOnce()); + } +}