1+ <?php
2+
3+ namespace React \Socket ;
4+
5+ use React \Dns \Model \Message ;
6+ use React \Dns \Resolver \ResolverInterface ;
7+ use React \EventLoop \LoopInterface ;
8+ use React \EventLoop \TimerInterface ;
9+ use React \Promise ;
10+ use React \Promise \CancellablePromiseInterface ;
11+
12+ /**
13+ * @internal
14+ */
15+ final class HappyEyeBallsConnectionBuilder
16+ {
17+ const CONNECT_INTERVAL = 0.1 ;
18+ const RESOLVE_WAIT = 0.5 ;
19+
20+ public $ loop ;
21+ public $ connector ;
22+ public $ resolver ;
23+ public $ uri ;
24+ public $ host ;
25+ public $ resolved = array (
26+ Message::TYPE_A => false ,
27+ Message::TYPE_AAAA => false ,
28+ );
29+ public $ resolverPromises = array ();
30+ public $ connectionPromises = array ();
31+ public $ connectQueue = array ();
32+ public $ timer ;
33+ public $ parts ;
34+ public $ ipsCount = 0 ;
35+ public $ failureCount = 0 ;
36+ public $ resolve ;
37+ public $ reject ;
38+
39+ public function __construct (LoopInterface $ loop , ConnectorInterface $ connector , ResolverInterface $ resolver , $ uri , $ host , $ parts )
40+ {
41+ $ this ->loop = $ loop ;
42+ $ this ->connector = $ connector ;
43+ $ this ->resolver = $ resolver ;
44+ $ this ->uri = $ uri ;
45+ $ this ->host = $ host ;
46+ $ this ->parts = $ parts ;
47+ }
48+
49+ public function connect ()
50+ {
51+ $ that = $ this ;
52+ return new Promise \Promise (function ($ resolve , $ reject ) use ($ that ) {
53+ $ lookupResolve = function ($ type ) use ($ that , $ resolve , $ reject ) {
54+ return function (array $ ips ) use ($ that , $ type , $ resolve , $ reject ) {
55+ unset($ that ->resolverPromises [$ type ]);
56+ $ that ->resolved [$ type ] = true ;
57+
58+ $ that ->mixIpsIntoConnectQueue ($ ips );
59+
60+ if ($ that ->timer instanceof TimerInterface) {
61+ return ;
62+ }
63+
64+ $ that ->check ($ resolve , $ reject );
65+ };
66+ };
67+
68+ $ ipv4Deferred = null ;
69+ $ that ->resolverPromises [Message::TYPE_AAAA ] = $ that ->resolve (Message::TYPE_AAAA , $ reject )->then ($ lookupResolve (Message::TYPE_AAAA ))->then (function () use (&$ ipv4Deferred ) {
70+ if ($ ipv4Deferred instanceof Promise \Deferred) {
71+ $ ipv4Deferred ->resolve ();
72+ }
73+ });
74+ $ that ->resolverPromises [Message::TYPE_A ] = $ that ->resolve (Message::TYPE_A , $ reject )->then (function ($ ips ) use ($ that , &$ ipv4Deferred ) {
75+ if ($ that ->resolved [Message::TYPE_AAAA ] === true ) {
76+ return Promise \resolve ($ ips );
77+ }
78+
79+ /**
80+ * Delay A lookup by 50ms sending out connection to IPv4 addresses when IPv6 records haven't
81+ * resolved yet as per RFC.
82+ *
83+ * @link https://tools.ietf.org/html/rfc8305#section-3
84+ */
85+ $ ipv4Deferred = new Promise \Deferred ();
86+ $ deferred = new Promise \Deferred ();
87+
88+ $ timer = $ that ->loop ->addTimer ($ that ::RESOLVE_WAIT , function () use ($ deferred , $ ips ) {
89+ $ ipv4Deferred = null ;
90+ $ deferred ->resolve ($ ips );
91+ });
92+
93+ $ ipv4Deferred ->promise ()->then (function () use ($ that , $ timer , $ deferred , $ ips ) {
94+ $ that ->loop ->cancelTimer ($ timer );
95+ $ deferred ->resolve ($ ips );
96+ });
97+
98+ return $ deferred ->promise ();
99+ })->then ($ lookupResolve (Message::TYPE_A ));
100+ }, function ($ _ , $ reject ) use ($ that ) {
101+ $ that ->cleanUp ();
102+
103+ $ reject (new \RuntimeException ('Connection to ' . $ that ->uri . ' cancelled during DNS lookup ' ));
104+
105+ $ _ = $ reject = null ;
106+ });
107+ }
108+
109+ /**
110+ * @internal
111+ */
112+ public function resolve ($ type , $ reject )
113+ {
114+ $ that = $ this ;
115+ return $ that ->resolver ->resolveAll ($ that ->host , $ type )->then (null , function () use ($ type , $ reject , $ that ) {
116+ unset($ that ->resolverPromises [$ type ]);
117+ $ that ->resolved [$ type ] = true ;
118+
119+ if ($ that ->hasBeenResolved () === false ) {
120+ return ;
121+ }
122+
123+ if ($ that ->ipsCount === 0 ) {
124+ $ that ->resolved = null ;
125+ $ that ->resolverPromises = null ;
126+ $ reject (new \RuntimeException ('Connection to ' . $ that ->uri . ' failed during DNS lookup: DNS error ' ));
127+ }
128+ });
129+ }
130+
131+ /**
132+ * @internal
133+ */
134+ public function check ($ resolve , $ reject )
135+ {
136+ $ ip = \array_shift ($ this ->connectQueue );
137+
138+ if (\count ($ this ->connectQueue ) === 0 && $ this ->resolved [Message::TYPE_A ] === true && $ this ->resolved [Message::TYPE_AAAA ] === true && $ this ->timer instanceof TimerInterface) {
139+ $ this ->loop ->cancelTimer ($ this ->timer );
140+ $ this ->timer = null ;
141+ }
142+
143+ if ($ ip === null ) {
144+ return ;
145+ }
146+
147+ $ that = $ this ;
148+ $ that ->connectionPromises [$ ip ] = $ this ->attemptConnection ($ ip )->then (function ($ connection ) use ($ that , $ ip , $ resolve ) {
149+ unset($ that ->connectionPromises [$ ip ]);
150+
151+ $ that ->cleanUp ();
152+
153+ $ resolve ($ connection );
154+ }, function () use ($ that , $ ip , $ resolve , $ reject ) {
155+ unset($ that ->connectionPromises [$ ip ]);
156+
157+ $ that ->failureCount ++;
158+
159+ if ($ that ->hasBeenResolved () === false ) {
160+ return ;
161+ }
162+
163+ if ($ that ->ipsCount === $ that ->failureCount ) {
164+ $ that ->cleanUp ();
165+
166+ $ reject (new \RuntimeException ('All attempts to connect to " ' . $ that ->host . '" have failed ' ));
167+ }
168+ });
169+
170+ /**
171+ * As long as we haven't connected yet keep popping an IP address of the connect queue until one of them
172+ * succeeds or they all fail. We will wait 100ms between connection attempts as per RFC.
173+ *
174+ * @link https://tools.ietf.org/html/rfc8305#section-5
175+ */
176+ if ((\count ($ this ->connectQueue ) > 0 || ($ this ->resolved [Message::TYPE_A ] === false || $ this ->resolved [Message::TYPE_AAAA ] === false )) && $ this ->timer === null ) {
177+ $ this ->timer = $ this ->loop ->addPeriodicTimer (self ::CONNECT_INTERVAL , function () use ($ that , $ resolve , $ reject ) {
178+ $ that ->check ($ resolve , $ reject );
179+ });
180+ }
181+ }
182+
183+ /**
184+ * @internal
185+ */
186+ public function attemptConnection ($ ip )
187+ {
188+ $ promise = null ;
189+ $ that = $ this ;
190+
191+ return new Promise \Promise (
192+ function ($ resolve , $ reject ) use (&$ promise , $ that , $ ip ) {
193+ $ uri = '' ;
194+
195+ // prepend original scheme if known
196+ if (isset ($ that ->parts ['scheme ' ])) {
197+ $ uri .= $ that ->parts ['scheme ' ] . ':// ' ;
198+ }
199+
200+ if (\strpos ($ ip , ': ' ) !== false ) {
201+ // enclose IPv6 addresses in square brackets before appending port
202+ $ uri .= '[ ' . $ ip . '] ' ;
203+ } else {
204+ $ uri .= $ ip ;
205+ }
206+
207+ // append original port if known
208+ if (isset ($ that ->parts ['port ' ])) {
209+ $ uri .= ': ' . $ that ->parts ['port ' ];
210+ }
211+
212+ // append orignal path if known
213+ if (isset ($ that ->parts ['path ' ])) {
214+ $ uri .= $ that ->parts ['path ' ];
215+ }
216+
217+ // append original query if known
218+ if (isset ($ that ->parts ['query ' ])) {
219+ $ uri .= '? ' . $ that ->parts ['query ' ];
220+ }
221+
222+ // append original hostname as query if resolved via DNS and if
223+ // destination URI does not contain "hostname" query param already
224+ $ args = array ();
225+ \parse_str (isset ($ that ->parts ['query ' ]) ? $ that ->parts ['query ' ] : '' , $ args );
226+ if ($ that ->host !== $ ip && !isset ($ args ['hostname ' ])) {
227+ $ uri .= (isset ($ that ->parts ['query ' ]) ? '& ' : '? ' ) . 'hostname= ' . \rawurlencode ($ that ->host );
228+ }
229+
230+ // append original fragment if known
231+ if (isset ($ that ->parts ['fragment ' ])) {
232+ $ uri .= '# ' . $ that ->parts ['fragment ' ];
233+ }
234+
235+ $ promise = $ that ->connector ->connect ($ uri );
236+ $ promise ->then ($ resolve , $ reject );
237+ },
238+ function ($ _ , $ reject ) use (&$ promise , $ that ) {
239+ // cancellation should reject connection attempt
240+ // (try to) cancel pending connection attempt
241+ $ reject (new \RuntimeException ('Connection to ' . $ that ->uri . ' cancelled during connection attempt ' ));
242+
243+ if ($ promise instanceof CancellablePromiseInterface) {
244+ // overwrite callback arguments for PHP7+ only, so they do not show
245+ // up in the Exception trace and do not cause a possible cyclic reference.
246+ $ _ = $ reject = null ;
247+
248+ $ promise ->cancel ();
249+ $ promise = null ;
250+ }
251+ }
252+ );
253+ }
254+
255+ /**
256+ * @internal
257+ */
258+ public function cleanUp ()
259+ {
260+ /** @var CancellablePromiseInterface $promise */
261+ foreach ($ this ->connectionPromises as $ index => $ connectionPromise ) {
262+ if ($ connectionPromise instanceof CancellablePromiseInterface) {
263+ $ connectionPromise ->cancel ();
264+ }
265+ }
266+
267+ /** @var CancellablePromiseInterface $promise */
268+ foreach ($ this ->resolverPromises as $ index => $ resolverPromise ) {
269+ if ($ resolverPromise instanceof CancellablePromiseInterface) {
270+ $ resolverPromise ->cancel ();
271+ }
272+ }
273+
274+ if ($ this ->timer instanceof TimerInterface) {
275+ $ this ->loop ->cancelTimer ($ this ->timer );
276+ $ this ->timer = null ;
277+ }
278+ }
279+
280+ /**
281+ * @internal
282+ */
283+ public function hasBeenResolved ()
284+ {
285+ foreach ($ this ->resolved as $ typeHasBeenResolved ) {
286+ if ($ typeHasBeenResolved === false ) {
287+ return false ;
288+ }
289+ }
290+
291+ return true ;
292+ }
293+
294+ /**
295+ * Mixes an array of IP addresses into the connect queue in such a way they alternate when attempting to connect.
296+ * The goal behind it is first attempt to connect to IPv6, then to IPv4, then to IPv6 again until one of those
297+ * attempts succeeds.
298+ *
299+ * @link https://tools.ietf.org/html/rfc8305#section-4
300+ *
301+ * @internal
302+ */
303+ public function mixIpsIntoConnectQueue (array $ ips )
304+ {
305+ $ this ->ipsCount += \count ($ ips );
306+ $ connectQueueStash = $ this ->connectQueue ;
307+ $ this ->connectQueue = array ();
308+ while (\count ($ connectQueueStash ) > 0 || \count ($ ips ) > 0 ) {
309+ if (\count ($ ips ) > 0 ) {
310+ $ this ->connectQueue [] = \array_shift ($ ips );
311+ }
312+ if (\count ($ connectQueueStash ) > 0 ) {
313+ $ this ->connectQueue [] = \array_shift ($ connectQueueStash );
314+ }
315+ }
316+ }
317+ }
0 commit comments