@@ -16,6 +16,7 @@ class LibUvLoop implements LoopInterface
1616 private $ futureTickQueue ;
1717 private $ timerEvents ;
1818 private $ events = [];
19+ private $ flags = [];
1920 private $ listeners = [];
2021 private $ running ;
2122 private $ streamListener ;
@@ -65,10 +66,7 @@ public function removeReadStream($stream)
6566
6667 unset($ this ->listeners [(int ) $ stream ]['read ' ]);
6768
68- if (!isset ($ this ->listeners [(int ) $ stream ]['read ' ])
69- && !isset ($ this ->listeners [(int ) $ stream ]['write ' ])) {
70- unset($ this ->events [(int ) $ stream ]);
71- }
69+ $ this ->____removeStream ($ stream );
7270 }
7371
7472 /**
@@ -82,10 +80,7 @@ public function removeWriteStream($stream)
8280
8381 unset($ this ->listeners [(int ) $ stream ]['write ' ]);
8482
85- if (!isset ($ this ->listeners [(int ) $ stream ]['read ' ])
86- && !isset ($ this ->listeners [(int ) $ stream ]['write ' ])) {
87- unset($ this ->events [(int ) $ stream ]);
88- }
83+ $ this ->____removeStream ($ stream );
8984 }
9085
9186 /**
@@ -94,12 +89,10 @@ public function removeWriteStream($stream)
9489 public function removeStream ($ stream )
9590 {
9691 if (isset ($ this ->events [(int ) $ stream ])) {
97-
98- \uv_poll_stop ($ this ->events [(int ) $ stream ]);
99-
10092 unset($ this ->listeners [(int ) $ stream ]['read ' ]);
10193 unset($ this ->listeners [(int ) $ stream ]['write ' ]);
102- unset($ this ->events [(int ) $ stream ]);
94+
95+ $ this ->____removeStream ($ stream );
10396 }
10497 }
10598
@@ -207,6 +200,39 @@ public function stop()
207200 }
208201
209202 private function addStream ($ stream )
203+ {
204+ // Run in tick or else things epically fail with loop->watchers[w->fd] == w
205+ $ this ->futureTick (function () use ($ stream ) {
206+ if (!isset ($ this ->events [(int ) $ stream ])) {
207+ $ this ->events [(int ) $ stream ] = \uv_poll_init_socket ($ this ->uv , $ stream );
208+ }
209+
210+ $ this ->pollStream ($ stream );
211+ });
212+ }
213+
214+ // To do: get latest changes in from react:master so we can use this method name internally
215+ private function ____removeStream ($ stream )
216+ {
217+ // Run in tick or else things epically fail with loop->watchers[w->fd] == w
218+ $ this ->futureTick (function () use ($ stream ) {
219+ if (!isset ($ this ->events [(int ) $ stream ])) {
220+ return ;
221+ }
222+
223+ if (!isset ($ this ->listeners [(int ) $ stream ]['read ' ])
224+ && !isset ($ this ->listeners [(int ) $ stream ]['write ' ])) {
225+ \uv_poll_stop ($ this ->events [(int ) $ stream ]);
226+ unset($ this ->events [(int ) $ stream ]);
227+ unset($ this ->flags [(int ) $ stream ]);
228+ return ;
229+ }
230+
231+ $ this ->pollStream ($ stream );
232+ });
233+ }
234+
235+ private function pollStream ($ stream )
210236 {
211237 $ flags = 0 ;
212238 if (isset ($ this ->listeners [(int ) $ stream ]['read ' ])) {
@@ -217,14 +243,13 @@ private function addStream($stream)
217243 $ flags |= \UV ::WRITABLE ;
218244 }
219245
220- if (!isset ($ this ->events [(int ) $ stream ])) {
221- $ event = \uv_poll_init_socket ($ this ->uv , $ stream );
222- $ this ->events [(int ) $ stream ] = $ event ;
223- } else {
224- $ event = $ this ->events [(int ) $ stream ];
246+ if (isset ($ this ->flags [(int ) $ stream ]) && $ this ->flags [(int ) $ stream ] == $ flags ) {
247+ return ;
225248 }
226249
227- \uv_poll_start ($ event , $ flags , $ this ->streamListener );
250+ $ this ->flags [(int ) $ stream ] = $ flags ;
251+
252+ \uv_poll_start ($ this ->events [(int ) $ stream ], $ flags , $ this ->streamListener );
228253 }
229254
230255 /**
@@ -236,24 +261,15 @@ private function createStreamListener()
236261 {
237262 $ callback = function ($ event , $ status , $ events , $ stream ) {
238263 if ($ status !== 0 ) {
239-
240- $ flags = 0 ;
241- if (isset ($ this ->listeners [(int ) $ stream ]['read ' ])) {
242- $ flags |= \UV ::READABLE ;
243- }
244-
245- if (isset ($ this ->listeners [(int ) $ stream ]['write ' ])) {
246- $ flags |= \UV ::WRITABLE ;
247- }
248-
249- \uv_poll_start ($ event , $ flags , $ this ->streamListener );
264+ unset($ this ->flags [(int ) $ stream ]);
265+ $ this ->pollStream ($ stream );
250266 }
251267
252- if (isset ($ this ->listeners [(int ) $ stream ]['read ' ])) {
268+ if (isset ($ this ->listeners [(int ) $ stream ]['read ' ]) && $ events & \ UV :: READABLE ) {
253269 call_user_func ($ this ->listeners [(int ) $ stream ]['read ' ], $ stream );
254270 }
255271
256- if (isset ($ this ->listeners [(int ) $ stream ]['write ' ])) {
272+ if (isset ($ this ->listeners [(int ) $ stream ]['write ' ]) && $ events & \ UV :: WRITABLE ) {
257273 call_user_func ($ this ->listeners [(int ) $ stream ]['write ' ], $ stream );
258274 }
259275 };
0 commit comments