diff --git a/src/Factory.php b/src/Factory.php index 9a481e35..36cd2d5c 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -13,6 +13,8 @@ public static function create() return new LibEvLoop; } elseif (class_exists('EventBase', false)) { return new ExtEventLoop; + } elseif (function_exists('uv_default_loop')) { + return new LibUvLoop(); } return new StreamSelectLoop(); diff --git a/src/LibUvLoop.php b/src/LibUvLoop.php new file mode 100644 index 00000000..df19aba2 --- /dev/null +++ b/src/LibUvLoop.php @@ -0,0 +1,334 @@ +loop; + } + + return NULL; + } + + public function __construct() + { + $this->loop = uv_loop_new(); + $this->timers = new SplObjectStorage(); + $this->nextTickQueue = new NextTickQueue($this); + $this->futureTickQueue = new FutureTickQueue($this); + } + + /** + * {@inheritdoc} + */ + public function addReadStream($stream, callable $listener) + { + $this->addStream($stream, $listener, \UV::READABLE); + } + + /** + * {@inheritdoc} + */ + public function addWriteStream($stream, callable $listener) + { + $this->addStream($stream, $listener, \UV::WRITABLE); + } + + /** + * {@inheritdoc} + */ + public function removeReadStream($stream) + { + if (!isset($this->events[(int) $stream])) { + return; + } + + uv_poll_stop($this->events[(int) $stream]); + unset($this->listeners[(int) $stream]['read']); + + if (!isset($this->listeners[(int) $stream]['read']) + && !isset($this->listeners[(int) $stream]['write'])) { + unset($this->events[(int) $stream]); + } + } + + /** + * {@inheritdoc} + */ + public function removeWriteStream($stream) + { + if (!isset($this->events[(int) $stream])) { + return; + } + + uv_poll_stop($this->events[(int) $stream]); + unset($this->listeners[(int) $stream]['write']); + + if (!isset($this->listeners[(int) $stream]['read']) + && !isset($this->listeners[(int) $stream]['write'])) { + unset($this->events[(int) $stream]); + } + } + + /** + * {@inheritdoc} + */ + public function removeStream($stream) + { + if (isset($this->events[(int) $stream])) { + + uv_poll_stop($this->events[(int) $stream]); + + unset($this->listeners[(int) $stream]['read']); + unset($this->listeners[(int) $stream]['write']); + unset($this->events[(int) $stream]); + } + } + + /** + * {@inheritdoc} + */ + public function addTimer($interval, callable $callback) + { + return $this->createTimer($interval, $callback, 0); + } + + /** + * {@inheritdoc} + */ + public function addPeriodicTimer($interval, callable $callback) + { + return $this->createTimer($interval, $callback, 1); + } + + /** + * {@inheritdoc} + */ + public function cancelTimer(TimerInterface $timer) + { + uv_timer_stop($this->timers[$timer]); + + $this->timers->detach($timer); + } + + /** + * {@inheritdoc} + */ + public function isTimerActive(TimerInterface $timer) + { + return $this->timers->contains($timer); + } + + /** + * {@inheritdoc} + */ + public function nextTick(callable $listener) + { + $this->nextTickQueue->add($listener); + } + + /** + * {@inheritdoc} + */ + public function futureTick(callable $listener) + { + $this->futureTickQueue->add($listener); + } + + /** + * {@inheritdoc} + */ + public function tick() + { + $this->nextTickQueue->tick(); + + $this->futureTickQueue->tick(); + + $flags = \UV::RUN_ONCE; + if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) { + $flags = \UV::RUN_NOWAIT; + } elseif (empty($this->events) && !$this->timers->count() && !$this->tasks) { + $this->running = false; + return; + } + + uv_run($this->loop, $flags); + } + + /** + * Wrap a task related callback, this will keep track of pending tasks + * and keep the event loop running accordingly. + * + * @param callable $task the callback you want executed when task completes + * @return callable wrapper to task tracking + */ + public function taskCallback($task) + { + $callback = function() use ($task) { + $this->tasks--; + call_user_func_array($task, func_get_args()); + }; + $this->tasks++; + + return $callback; + } + + /** + * {@inheritdoc} + */ + public function run() + { + $this->running = true; + + while ($this->running) { + $this->tick(); + } + } + + /** + * {@inheritdoc} + */ + public function stop() + { + $this->running = false; + } + + /* PRIVATE */ + + private function addStream($stream, $listener, $flags) + { + $meta = stream_get_meta_data($stream); + if (get_resource_type($stream) == "Unknown" || !(strpos($meta['stream_type'], 'socket')) ) { + throw new \InvalidArgumentException("Stream must be a resource of type socket."); + + return false; + } + + $currentFlag = 0; + if (isset($this->listeners[(int) $stream]['read'])) { + $currentFlag |= \UV::READABLE; + } + + if (isset($this->listeners[(int) $stream]['write'])) { + $currentFlag |= \UV::WRITABLE; + } + + if (($flags & \UV::READABLE) === $flags) { + $this->listeners[(int) $stream]['read'] = $listener; + } elseif (($flags & \UV::WRITABLE) === $flags) { + $this->listeners[(int) $stream]['write'] = $listener; + } + + if (!isset($this->events[(int) $stream])) { + $event = uv_poll_init($this->loop, $stream); + $this->events[(int) $stream] = $event; + } else { + $event = $this->events[(int) $stream]; + } + + $listener = $this->createStreamListener(); + uv_poll_start($event, $currentFlag | $flags, $listener); + } + + /** + * Create a stream listener + * + * @return callable Returns a callback + */ + private function createStreamListener() + { + $loop = $this; + + $callback = function ($poll, $status, $event, $stream) use ($loop, &$callback) { + if ($status < 0) { + + if (isset($loop->listeners[(int) $stream]['read'])) { + call_user_func(array($this, 'removeReadStream'), $stream); + } + + if (isset($loop->writeListeners[(int) $stream]['write'])) { + call_user_func(array($this, 'removeWriteStream'), $stream); + } + + return; + } + + if (($event & \UV::READABLE) && isset($loop->listeners[(int) $stream]['read'])) { + call_user_func($loop->listeners[(int) $stream]['read'], $stream); + } + + if (($event & \UV::WRITABLE) && isset($loop->listeners[(int) $stream]['write'])) { + call_user_func($loop->listeners[(int) $stream]['write'], $stream); + } + }; + + return $callback; + } + + /** + * Add callback and configured a timer + * + * @param Int $interval The interval of the timer + * @param Callable $callback The callback to be executed + * @param int $periodic 0 = one-off, 1 = periodic + * @return Timer Returns a timer instance + */ + private function createTimer($interval, $callback, $periodic) + { + $timer = new Timer($this, $interval, $callback, $periodic); + $resource = uv_timer_init($this->loop); + + $timers = $this->timers; + $timers->attach($timer, $resource); + + $callback = $this->wrapTimerCallback($timer, $periodic); + uv_timer_start($resource, $interval * 1000, $interval * 1000, $callback); + + return $timer; + } + + /** + * Create a timer wrapper for periodic/one-off timers + * + * @param Timer $timer Timer object + * @param int $periodic 0 = one-off, 1 = periodic + * @return Callable wrapper + */ + private function wrapTimerCallback($timer, $periodic) + { + $callback = function () use ($timer, $periodic) { + + call_user_func($timer->getCallback(), $timer); + + if (!$periodic) { + $timer->cancel(); + } + }; + + return $callback; + } +} diff --git a/tests/LibUvLoopTest.php b/tests/LibUvLoopTest.php new file mode 100644 index 00000000..5d341edd --- /dev/null +++ b/tests/LibUvLoopTest.php @@ -0,0 +1,541 @@ +markTestSkipped('libuv tests skipped because ext-uv is not installed.'); + } + + if(!is_null($this->loop)) { + return $this->loop; + } + + return new LibUvLoop(); + } + + public function testLibEventConstructor() + { + $loop = new LibUvLoop(); + } + + public function createStream() + { + $sockets = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); + + stream_set_blocking($sockets[0], 0); + stream_set_blocking($sockets[1], 0); + + return $sockets; + } + + public function writeToStream($stream, $content) + { + fwrite($stream, $content); + } + + /** + * Make sure event loop throws exception, as libuv only supports + * network socket streams. + * @group socketonly + */ + public function testCanOnlyAddSocketStream() + { + $this->setExpectedException('InvalidArgumentException'); + + $fp = fopen("php://temp", "r+"); + $this->loop->addReadStream($fp, function(){}); + + } + + public function testAddReadStream() + { + $streams = $this->createStream(); + + $this->loop->addReadStream($streams[1], $this->expectCallableExactly(2)); + + $this->writeToStream($streams[0], "foo\n"); + $this->loop->tick(); + + $this->writeToStream($streams[0], "bar\n"); + $this->loop->tick(); + } + + public function testAddWriteStream() + { + $input = $this->createStream(); + + $this->loop->addWriteStream($input[0], $this->expectCallableExactly(2)); + $this->loop->tick(); + $this->loop->tick(); + } + + public function testRemoveReadStreamInstantly() + { + $input = $this->createStream(); + + $this->loop->addReadStream($input[1], $this->expectCallableNever()); + $this->loop->removeReadStream($input[1]); + + $this->writeToStream($input[0], "bar\n"); + $this->loop->tick(); + + //cleanup + $this->loop->removeStream($input[0]); + } + + public function testRemoveReadStreamAfterReading() + { + $input = $this->createStream(); + + $this->loop->addReadStream($input[1], $this->expectCallableOnce()); + + $this->writeToStream($input[0], "foo\n"); + $this->loop->tick(); + + $this->loop->removeReadStream($input[1]); + + $this->writeToStream($input[0], "bar\n"); + $this->loop->tick(); + + //cleanup + $this->loop->removeStream($input[0]); + } + + public function testRemoveWriteStreamInstantly() + { + $input = $this->createStream(); + + $this->loop->addWriteStream($input[0], $this->expectCallableNever()); + $this->loop->removeWriteStream($input[0]); + $this->loop->tick(); + } + + public function testRemoveWriteStreamAfterWriting() + { + $input = $this->createStream(); + + $this->loop->addWriteStream($input[0], $this->expectCallableOnce()); + $this->loop->tick(); + + $this->loop->removeWriteStream($input[0]); + $this->loop->tick(); + } + + public function testRemoveStreamInstantly() + { + $input = $this->createStream(); + + $this->loop->addReadStream($input[0], $this->expectCallableNever()); + $this->loop->addWriteStream($input[0], $this->expectCallableNever()); + $this->loop->removeStream($input[0]); + + $this->writeToStream($input[0], "bar\n"); + $this->loop->tick(); + } + + public function testRemoveStreamForReadOnly() + { + $input = $this->createStream(); + + $this->loop->addReadStream($input[1], $this->expectCallableNever()); + $this->loop->addWriteStream($input[0], $this->expectCallableOnce()); + $this->loop->removeReadStream($input[1]); + + $this->writeToStream($input[0], "foo\n"); + $this->loop->tick(); + + //cleanup + $this->loop->removeStream($input[0]); + } + + public function testRemoveStreamForWriteOnly() + { + $input = $this->createStream(); + + $this->writeToStream($input[0], "foo\n"); + + $this->loop->addReadStream($input[1], $this->expectCallableOnce()); + $this->loop->addWriteStream($input[0], $this->expectCallableNever()); + $this->loop->removeWriteStream($input[0]); + + $this->loop->tick(); + + //cleanup + $this->loop->removeStream($input[1]); + } + + public function testRemoveStream() + { + $input = $this->createStream(); + + $this->loop->addReadStream($input[1], $this->expectCallableOnce()); + $this->loop->addWriteStream($input[0], $this->expectCallableOnce()); + + $this->writeToStream($input[0], "bar\n"); + $this->loop->tick(); + + $this->loop->removeStream($input[0]); + $this->loop->removeStream($input[1]); + + $this->writeToStream($input[0], "bar\n"); + $this->loop->tick(); + } + + public function testRemoveInvalid() + { + $input = $this->createStream(); + + // remove a valid stream from the event loop that was never added in the first place + $this->loop->removeReadStream($input[0]); + $this->loop->removeWriteStream($input[0]); + $this->loop->removeStream($input[0]); + } + + /** @test */ + public function emptyRunShouldSimplyReturn() + { + $this->assertRunFasterThan(0.005); + } + + /** @test */ + public function runShouldReturnWhenNoMoreFds() + { + $input = $this->createStream(); + + $loop = $this->loop; + $this->loop->addReadStream($input[1], function ($stream) use ($input) { + $this->loop->removeStream($stream); + $this->loop->removeStream($input[0]); + }); + + $this->writeToStream($input[0], "foo\n"); + + $this->assertRunFasterThan(0.005); + } + + /** @test */ + public function stopShouldStopRunningLoop() + { + $input = $this->createStream(); + + $loop = $this->loop; + $this->loop->addReadStream($input[1], function ($stream) use ($loop) { + $loop->stop(); + }); + + $this->writeToStream($input[0], "foo\n"); + + $this->assertRunFasterThan(0.005); + } + + public function testStopShouldPreventRunFromBlocking() + { + $this->loop->addTimer( + 1, + function () { + $this->fail('Timer was executed.'); + } + ); + + $this->loop->nextTick( + function () { + $this->loop->stop(); + } + ); + + $this->assertRunFasterThan(0.005); + } + + public function testIgnoreRemovedCallback() + { + // two independent streams, both should be readable right away + $input = $this->createStream(); + // $stream2 = $this->createStream(); + + $loop = $this->loop; + $loop->addReadStream($input[1], function ($stream) use ($loop, $input) { + // stream1 is readable, remove stream2 as well => this will invalidate its callback + $loop->removeReadStream($stream); + $loop->removeReadStream($input[0]); + }); + + // this callback would have to be called as well, but the first stream already removed us + $loop->addReadStream($input[0], $this->expectCallableNever()); + + $this->writeToStream($input[0], "foo\n"); + $this->writeToStream($input[1], "foo\n"); + + $loop->run(); + } + + public function testNextTick() + { + $called = false; + + $callback = function ($loop) use (&$called) { + $this->assertSame($this->loop, $loop); + $called = true; + }; + + $this->loop->nextTick($callback); + + $this->assertFalse($called); + + $this->loop->tick(); + + $this->assertTrue($called); + } + + public function testNextTickFiresBeforeIO() + { + $stream = $this->createStream(); + + $this->loop->addWriteStream( + $stream[0], + function () { + echo 'stream' . PHP_EOL; + } + ); + + $this->loop->nextTick( + function () { + echo 'next-tick' . PHP_EOL; + } + ); + + $this->expectOutputString('next-tick' . PHP_EOL . 'stream' . PHP_EOL); + + $this->loop->tick(); + } + + public function testRecursiveNextTick() + { + $stream = $this->createStream(); + + $this->loop->addWriteStream( + $stream[0], + function () { + echo 'stream' . PHP_EOL; + } + ); + + $this->loop->nextTick( + function () { + $this->loop->nextTick( + function () { + echo 'next-tick' . PHP_EOL; + } + ); + } + ); + + $this->expectOutputString('next-tick' . PHP_EOL . 'stream' . PHP_EOL); + + $this->loop->tick(); + } + + public function testRunWaitsForNextTickEvents() + { + $stream = $this->createStream(); + + $this->loop->addWriteStream( + $stream[0], + function () use ($stream) { + $this->loop->removeStream($stream[0]); + $this->loop->nextTick( + function () { + echo 'next-tick' . PHP_EOL; + } + ); + } + ); + + $this->expectOutputString('next-tick' . PHP_EOL); + + $this->loop->run(); + + $this->writeToStream($stream[0], "foo\n"); + } + + public function testNextTickEventGeneratedByFutureTick() + { + $this->loop->futureTick( + function () { + $this->loop->nextTick( + function () { + echo 'next-tick' . PHP_EOL; + } + ); + } + ); + + $this->expectOutputString('next-tick' . PHP_EOL); + + $this->loop->run(); + } + + public function testNextTickEventGeneratedByTimer() + { + $this->loop->addTimer( + 0.001, + function () { + $this->loop->nextTick( + function () { + echo 'next-tick' . PHP_EOL; + } + ); + } + ); + + $this->expectOutputString('next-tick' . PHP_EOL); + + $this->loop->run(); + } + + public function testFutureTick() + { + $called = false; + + $callback = function ($loop) use (&$called) { + $this->assertSame($this->loop, $loop); + $called = true; + }; + + $this->loop->futureTick($callback); + + $this->assertFalse($called); + + $this->loop->tick(); + + $this->assertTrue($called); + } + + public function testFutureTickFiresBeforeIO() + { + $stream = $this->createStream(); + + $this->loop->addWriteStream( + $stream[0], + function () { + echo 'stream' . PHP_EOL; + } + ); + + $this->loop->futureTick( + function () { + echo 'future-tick' . PHP_EOL; + } + ); + + $this->writeToStream($stream[0], "foo\n"); + $this->expectOutputString('future-tick' . PHP_EOL . 'stream' . PHP_EOL); + + $this->loop->tick(); + } + + public function testRecursiveFutureTick() + { + $stream = $this->createStream(); + + $this->loop->addWriteStream( + $stream[0], + function () use ($stream) { + echo 'stream' . PHP_EOL; + $this->loop->removeWriteStream($stream[0]); + } + ); + + $this->loop->futureTick( + function () { + echo 'future-tick-1' . PHP_EOL; + $this->loop->futureTick( + function () { + echo 'future-tick-2' . PHP_EOL; + } + ); + } + ); + + $this->expectOutputString('future-tick-1' . PHP_EOL . 'stream' . PHP_EOL . 'future-tick-2' . PHP_EOL); + + $this->loop->run(); + + $this->writeToStream($stream[0], "foo\n"); + } + + public function testRunWaitsForFutureTickEvents() + { + $stream = $this->createStream(); + + $this->loop->addWriteStream( + $stream[0], + function () use ($stream) { + $this->loop->removeStream($stream[0]); + $this->loop->futureTick( + function () { + echo 'future-tick' . PHP_EOL; + } + ); + } + ); + + $this->expectOutputString('future-tick' . PHP_EOL); + + $this->loop->run(); + + $this->writeToStream($stream[0], "foo\n"); + } + + public function testFutureTickEventGeneratedByNextTick() + { + $this->loop->nextTick( + function () { + $this->loop->futureTick( + function () { + echo 'future-tick' . PHP_EOL; + } + ); + } + ); + + $this->expectOutputString('future-tick' . PHP_EOL); + + $this->loop->run(); + } + + public function testFutureTickEventGeneratedByTimer() + { + $this->loop->addTimer( + 0.001, + function () { + $this->loop->futureTick( + function () { + echo 'future-tick' . PHP_EOL; + } + ); + } + ); + + $this->expectOutputString('future-tick' . PHP_EOL); + + $this->loop->run(); + } + + private function assertRunFasterThan($maxInterval) + { + $start = microtime(true); + + $this->loop->run(); + + $end = microtime(true); + $interval = $end - $start; + + $this->assertLessThan($maxInterval, $interval); + } +} diff --git a/travis-init.sh b/travis-init.sh index 63deeb85..59166a22 100755 --- a/travis-init.sh +++ b/travis-init.sh @@ -31,6 +31,19 @@ if [[ "$TRAVIS_PHP_VERSION" != "hhvm" && popd echo "extension=libev.so" >> "$(php -r 'echo php_ini_loaded_file();')" + # install 'libuv' + git clone --recursive --branch v1.0.0-rc2 --depth 1 https://github.com/joyent/libuv + pushd libuv + ./autogen.sh && ./configure && make && sudo make install + popd + + #install 'php-uv' + git clone --recursive --branch libuv-1.0 --depth 1 https://github.com/steverhoades/php-uv + pushd php-uv + phpize && ./configure --with-uv --enable-httpparser && make && sudo make install + echo "extension=uv.so" >> "$(php -r 'echo php_ini_loaded_file();')" + popd + fi composer install --dev --prefer-source