diff --git a/src/ExtEventLoop.php b/src/ExtEventLoop.php index b1215de8..a15f5315 100644 --- a/src/ExtEventLoop.php +++ b/src/ExtEventLoop.php @@ -27,11 +27,12 @@ final class ExtEventLoop implements LoopInterface private $timerCallback; private $timerEvents; private $streamCallback; - private $streamEvents = []; - private $streamFlags = []; - private $streamRefs = []; + private $readEvents = []; + private $writeEvents = []; private $readListeners = []; private $writeListeners = []; + private $readRefs = []; + private $writeRefs = []; private $running; private $signals; private $signalEvents = []; @@ -70,56 +71,65 @@ function ($signal) { public function addReadStream($stream, callable $listener) { $key = (int) $stream; + if (isset($this->readListeners[$key])) { + return; + } - if (!isset($this->readListeners[$key])) { - $this->readListeners[$key] = $listener; - $this->subscribeStreamEvent($stream, Event::READ); + $event = new Event($this->eventBase, $stream, Event::PERSIST | Event::READ, $this->streamCallback); + $event->add(); + $this->readEvents[$key] = $event; + $this->readListeners[$key] = $listener; + + // ext-event does not increase refcount on stream resources for PHP 7+ + // manually keep track of stream resource to prevent premature garbage collection + if (PHP_VERSION_ID >= 70000) { + $this->readRefs[$key] = $stream; } } public function addWriteStream($stream, callable $listener) { $key = (int) $stream; - - if (!isset($this->writeListeners[$key])) { - $this->writeListeners[$key] = $listener; - $this->subscribeStreamEvent($stream, Event::WRITE); + if (isset($this->writeListeners[$key])) { + return; } - } - public function removeReadStream($stream) - { - $key = (int) $stream; + $event = new Event($this->eventBase, $stream, Event::PERSIST | Event::WRITE, $this->streamCallback); + $event->add(); + $this->writeEvents[$key] = $event; + $this->writeListeners[$key] = $listener; - if (isset($this->readListeners[$key])) { - unset($this->readListeners[$key]); - $this->unsubscribeStreamEvent($stream, Event::READ); + // ext-event does not increase refcount on stream resources for PHP 7+ + // manually keep track of stream resource to prevent premature garbage collection + if (PHP_VERSION_ID >= 70000) { + $this->writeRefs[$key] = $stream; } } - public function removeWriteStream($stream) + public function removeReadStream($stream) { $key = (int) $stream; - if (isset($this->writeListeners[$key])) { - unset($this->writeListeners[$key]); - $this->unsubscribeStreamEvent($stream, Event::WRITE); + if (isset($this->readEvents[$key])) { + $this->readEvents[$key]->free(); + unset( + $this->readEvents[$key], + $this->readListeners[$key], + $this->readRefs[$key] + ); } } - private function removeStream($stream) + public function removeWriteStream($stream) { $key = (int) $stream; - if (isset($this->streamEvents[$key])) { - $this->streamEvents[$key]->free(); - + if (isset($this->writeEvents[$key])) { + $this->writeEvents[$key]->free(); unset( - $this->streamFlags[$key], - $this->streamEvents[$key], - $this->readListeners[$key], + $this->writeEvents[$key], $this->writeListeners[$key], - $this->streamRefs[$key] + $this->writeRefs[$key] ); } } @@ -175,7 +185,7 @@ public function run() $flags = EventBase::LOOP_ONCE; if (!$this->running || !$this->futureTickQueue->isEmpty()) { $flags |= EventBase::LOOP_NONBLOCK; - } elseif (!$this->streamEvents && !$this->timerEvents->count()) { + } elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count()) { break; } @@ -207,64 +217,6 @@ private function scheduleTimer(TimerInterface $timer) $event->add($timer->getInterval()); } - /** - * Create a new ext-event Event object, or update the existing one. - * - * @param resource $stream - * @param integer $flag Event::READ or Event::WRITE - */ - private function subscribeStreamEvent($stream, $flag) - { - $key = (int) $stream; - - if (isset($this->streamEvents[$key])) { - $event = $this->streamEvents[$key]; - $flags = ($this->streamFlags[$key] |= $flag); - - $event->del(); - $event->set($this->eventBase, $stream, Event::PERSIST | $flags, $this->streamCallback); - } else { - $event = new Event($this->eventBase, $stream, Event::PERSIST | $flag, $this->streamCallback); - - $this->streamEvents[$key] = $event; - $this->streamFlags[$key] = $flag; - - // ext-event does not increase refcount on stream resources for PHP 7+ - // manually keep track of stream resource to prevent premature garbage collection - if (PHP_VERSION_ID >= 70000) { - $this->streamRefs[$key] = $stream; - } - } - - $event->add(); - } - - /** - * Update the ext-event Event object for this stream to stop listening to - * the given event type, or remove it entirely if it's no longer needed. - * - * @param resource $stream - * @param integer $flag Event::READ or Event::WRITE - */ - private function unsubscribeStreamEvent($stream, $flag) - { - $key = (int) $stream; - - $flags = $this->streamFlags[$key] &= ~$flag; - - if (0 === $flags) { - $this->removeStream($stream); - - return; - } - - $event = $this->streamEvents[$key]; - - $event->del(); - $event->set($this->eventBase, $stream, Event::PERSIST | $flags, $this->streamCallback); - $event->add(); - } - /** * Create a callback used as the target of timer events. * diff --git a/src/ExtLibeventLoop.php b/src/ExtLibeventLoop.php index 30750afb..6f6379ff 100644 --- a/src/ExtLibeventLoop.php +++ b/src/ExtLibeventLoop.php @@ -43,8 +43,8 @@ final class ExtLibeventLoop implements LoopInterface private $timerCallback; private $timerEvents; private $streamCallback; - private $streamEvents = []; - private $streamFlags = []; + private $readEvents = []; + private $writeEvents = []; private $readListeners = []; private $writeListeners = []; private $running; @@ -88,21 +88,33 @@ function ($signal) { public function addReadStream($stream, callable $listener) { $key = (int) $stream; - - if (!isset($this->readListeners[$key])) { - $this->readListeners[$key] = $listener; - $this->subscribeStreamEvent($stream, EV_READ); + if (isset($this->readListeners[$key])) { + return; } + + $event = event_new(); + event_set($event, $stream, EV_PERSIST | EV_READ, $this->streamCallback); + event_base_set($event, $this->eventBase); + event_add($event); + + $this->readEvents[$key] = $event; + $this->readListeners[$key] = $listener; } public function addWriteStream($stream, callable $listener) { $key = (int) $stream; - - if (!isset($this->writeListeners[$key])) { - $this->writeListeners[$key] = $listener; - $this->subscribeStreamEvent($stream, EV_WRITE); + if (isset($this->writeListeners[$key])) { + return; } + + $event = event_new(); + event_set($event, $stream, EV_PERSIST | EV_WRITE, $this->streamCallback); + event_base_set($event, $this->eventBase); + event_add($event); + + $this->writeEvents[$key] = $event; + $this->writeListeners[$key] = $listener; } public function removeReadStream($stream) @@ -110,8 +122,14 @@ public function removeReadStream($stream) $key = (int) $stream; if (isset($this->readListeners[$key])) { - unset($this->readListeners[$key]); - $this->unsubscribeStreamEvent($stream, EV_READ); + $event = $this->readEvents[$key]; + event_del($event); + event_free($event); + + unset( + $this->readEvents[$key], + $this->readListeners[$key] + ); } } @@ -120,25 +138,12 @@ public function removeWriteStream($stream) $key = (int) $stream; if (isset($this->writeListeners[$key])) { - unset($this->writeListeners[$key]); - $this->unsubscribeStreamEvent($stream, EV_WRITE); - } - } - - private function removeStream($stream) - { - $key = (int) $stream; - - if (isset($this->streamEvents[$key])) { - $event = $this->streamEvents[$key]; - + $event = $this->writeEvents[$key]; event_del($event); event_free($event); unset( - $this->streamFlags[$key], - $this->streamEvents[$key], - $this->readListeners[$key], + $this->writeEvents[$key], $this->writeListeners[$key] ); } @@ -166,7 +171,6 @@ public function cancelTimer(TimerInterface $timer) { if ($this->timerEvents->contains($timer)) { $event = $this->timerEvents[$timer]; - event_del($event); event_free($event); @@ -199,7 +203,7 @@ public function run() $flags = EVLOOP_ONCE; if (!$this->running || !$this->futureTickQueue->isEmpty()) { $flags |= EVLOOP_NONBLOCK; - } elseif (!$this->streamEvents && !$this->timerEvents->count()) { + } elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count()) { break; } @@ -226,61 +230,6 @@ private function scheduleTimer(TimerInterface $timer) event_add($event, $timer->getInterval() * self::MICROSECONDS_PER_SECOND); } - /** - * Create a new ext-libevent event resource, or update the existing one. - * - * @param resource $stream - * @param integer $flag EV_READ or EV_WRITE - */ - private function subscribeStreamEvent($stream, $flag) - { - $key = (int) $stream; - - if (isset($this->streamEvents[$key])) { - $event = $this->streamEvents[$key]; - $flags = $this->streamFlags[$key] |= $flag; - - event_del($event); - event_set($event, $stream, EV_PERSIST | $flags, $this->streamCallback); - } else { - $event = event_new(); - - event_set($event, $stream, EV_PERSIST | $flag, $this->streamCallback); - event_base_set($event, $this->eventBase); - - $this->streamEvents[$key] = $event; - $this->streamFlags[$key] = $flag; - } - - event_add($event); - } - - /** - * Update the ext-libevent event resource for this stream to stop listening to - * the given event type, or remove it entirely if it's no longer needed. - * - * @param resource $stream - * @param integer $flag EV_READ or EV_WRITE - */ - private function unsubscribeStreamEvent($stream, $flag) - { - $key = (int) $stream; - - $flags = $this->streamFlags[$key] &= ~$flag; - - if (0 === $flags) { - $this->removeStream($stream); - - return; - } - - $event = $this->streamEvents[$key]; - - event_del($event); - event_set($event, $stream, EV_PERSIST | $flags, $this->streamCallback); - event_add($event); - } - /** * Create a callback used as the target of timer events. * diff --git a/tests/AbstractLoopTest.php b/tests/AbstractLoopTest.php index 7ef881a3..f07f6877 100644 --- a/tests/AbstractLoopTest.php +++ b/tests/AbstractLoopTest.php @@ -184,6 +184,77 @@ public function testRemoveStreamForWriteOnly() $this->tickLoop($this->loop); } + public function testRemoveReadAndWriteStreamFromLoopOnceResourceClosesEndsLoop() + { + list($stream, $other) = $this->createSocketPair(); + stream_set_blocking($stream, false); + stream_set_blocking($other, false); + + // dummy writable handler + $this->loop->addWriteStream($stream, function () { }); + + // remove stream when the stream is readable (closes) + $this->loop->addReadStream($stream, function ($stream) { + $this->loop->removeReadStream($stream); + $this->loop->removeWriteStream($stream); + fclose($stream); + }); + + // close other side + fclose($other); + + $this->assertRunFasterThan($this->tickTimeout); + } + + public function testRemoveReadAndWriteStreamFromLoopOnceResourceClosesOnEndOfFileEndsLoop() + { + list($stream, $other) = $this->createSocketPair(); + stream_set_blocking($stream, false); + stream_set_blocking($other, false); + + // dummy writable handler + $this->loop->addWriteStream($stream, function () { }); + + // remove stream when the stream is readable (closes) + $this->loop->addReadStream($stream, function ($stream) { + $data = fread($stream, 1024); + if ($data !== '') { + return; + } + + $this->loop->removeReadStream($stream); + $this->loop->removeWriteStream($stream); + fclose($stream); + }); + + // send data and close stream + fwrite($other, str_repeat('.', 60000)); + $this->loop->addTimer(0.01, function () use ($other) { + fclose($other); + }); + + $this->assertRunFasterThan(0.1); + } + + public function testRemoveReadAndWriteStreamFromLoopWithClosingResourceEndsLoop() + { + // get only one part of the pair to ensure the other side will close immediately + list($stream) = $this->createSocketPair(); + stream_set_blocking($stream, false); + + // dummy writable handler + $this->loop->addWriteStream($stream, function () { }); + + // remove stream when the stream is readable (closes) + $this->loop->addReadStream($stream, function ($stream) { + $this->loop->removeReadStream($stream); + $this->loop->removeWriteStream($stream); + fclose($stream); + }); + + $this->assertRunFasterThan($this->tickTimeout); + } + public function testRemoveInvalid() { list ($stream) = $this->createSocketPair();