Skip to content

Commit 65e740b

Browse files
committed
Happy Eye Balls Executor
1 parent d99bc4e commit 65e740b

File tree

2 files changed

+284
-0
lines changed

2 files changed

+284
-0
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
<?php
2+
3+
namespace React\Dns\Query;
4+
5+
use React\EventLoop\LoopInterface;
6+
use React\EventLoop\TimerInterface;
7+
use React\Promise\CancellablePromiseInterface;
8+
use React\Promise\Promise;
9+
use RuntimeException;
10+
11+
final class HappyEyeBallsExecutor implements ExecutorInterface
12+
{
13+
/**
14+
* @var ExecutorInterface[]
15+
*/
16+
public $executors = array();
17+
18+
private $loop;
19+
20+
private $executorsCount = 0;
21+
22+
/**
23+
* @param ExecutorInterface[] $executors
24+
* @param LoopInterface $loop
25+
*/
26+
public function __construct($executors, LoopInterface $loop)
27+
{
28+
$this->executors = $executors;
29+
$this->executorsCount = \count($this->executors);
30+
$this->loop = $loop;
31+
}
32+
33+
public function query(Query $query)
34+
{
35+
$that = $this;
36+
$executorsIndexes = \range(0, $this->executorsCount - 1);
37+
\shuffle($executorsIndexes);
38+
$loop = $this->loop;
39+
$promises = array();
40+
$timer = null;
41+
return new Promise(function ($resolve, $reject) use (&$promises, &$timer, &$executorsIndexes, $that, $loop, $query) {
42+
$resolveWrap = function ($index) use (&$promises, &$timer, $resolve, $loop) {
43+
return function ($result) use ($index, &$promises, &$timer, $resolve, $loop) {
44+
unset($promises[$index]);
45+
46+
$resolve($result);
47+
48+
if ($timer instanceof TimerInterface) {
49+
$loop->cancelTimer($timer);
50+
$timer = null;
51+
}
52+
53+
foreach ($promises as $promise) {
54+
if ($promise instanceof CancellablePromiseInterface) {
55+
$promise->cancel();
56+
}
57+
}
58+
};
59+
};
60+
$rejectWrap = function ($index) use (&$promises, &$timer, &$executorsIndexes, $reject, $loop) {
61+
return function ($error) use ($index, &$promises, &$timer, &$executorsIndexes, $reject, $loop) {
62+
unset($promises[$index]);
63+
64+
if (\count($promises) > 0 || \count($executorsIndexes) > 0) {
65+
return;
66+
}
67+
68+
$reject($error);
69+
};
70+
};
71+
$timer = $loop->addPeriodicTimer(0.05, function () use (&$promises, &$timer, &$executorsIndexes, $that, $loop, $query, $resolveWrap, $rejectWrap) {
72+
$index = \array_pop($executorsIndexes);
73+
$promise = $that->executors[$index]->query($query);
74+
$promise->then($resolveWrap($index), $rejectWrap($index));
75+
$promises[$index] = $promise;
76+
77+
78+
if (count($executorsIndexes) <= 0) {
79+
$loop->cancelTimer($timer);
80+
$timer = null;
81+
}
82+
});
83+
84+
$index = \array_pop($executorsIndexes);
85+
$promise = $that->executors[$index]->query($query);
86+
$promise->then($resolveWrap($index), $rejectWrap($index));
87+
$promises[$index] = $promise;
88+
}, function ($resolve, $reject) use (&$promises, &$timer, $loop) {
89+
if ($timer instanceof TimerInterface) {
90+
$loop->cancelTimer($timer);
91+
}
92+
93+
foreach ($promises as $promise) {
94+
if ($promise instanceof CancellablePromiseInterface) {
95+
$promise->cancel();
96+
}
97+
}
98+
99+
$reject(new RuntimeException('Lookup query has been canceled'));
100+
});
101+
}
102+
}
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
<?php
2+
3+
use React\Dns\Model\Message;
4+
use React\Dns\Protocol\BinaryDumper;
5+
use React\Dns\Protocol\Parser;
6+
use React\Dns\Query\HappyEyeBallsExecutor;
7+
use React\Dns\Query\Query;
8+
use React\Dns\Query\UdpTransportExecutor;
9+
use React\EventLoop\Factory;
10+
use React\EventLoop\LoopInterface;
11+
use React\Tests\Dns\TestCase;
12+
13+
class HappyEyeBallsExecutorTest extends TestCase
14+
{
15+
public $serverConnectCount = 0;
16+
public $serverWriteCount = 0;
17+
18+
public function testQueryWillResolve()
19+
{
20+
$loop = Factory::create();
21+
22+
$server = $this->createAnsweringServer($loop);
23+
$address = new UdpTransportExecutor(stream_socket_get_name($server, false), $loop);
24+
$executor = new HappyEyeBallsExecutor(array($address), $loop);
25+
26+
$query = new Query('google.com', Message::TYPE_A, Message::CLASS_IN);
27+
28+
$promise = $executor->query($query);
29+
$response = \Clue\React\Block\await($promise, $loop, 0.5);
30+
31+
$this->assertInstanceOf('React\Dns\Model\Message', $response);
32+
$this->assertSame(1, $this->serverConnectCount);
33+
$this->assertSame(1, $this->serverWriteCount);
34+
}
35+
36+
public function testQueryWillBeSendToAllServers()
37+
{
38+
$loop = Factory::create();
39+
40+
$answeringServer = $this->createWaitingAnsweringServer($loop, 0.1);
41+
$waitingServer = $this->createWaitingAnsweringServer($loop, 1);
42+
$answeringAddress = new UdpTransportExecutor(stream_socket_get_name($answeringServer, false), $loop);
43+
$waitingAddress = new UdpTransportExecutor(stream_socket_get_name($waitingServer, false), $loop);
44+
$executor = new HappyEyeBallsExecutor(array($answeringAddress, $waitingAddress), $loop);
45+
46+
$query = new Query('google.com', Message::TYPE_A, Message::CLASS_IN);
47+
48+
$promise = $executor->query($query);
49+
$response = \Clue\React\Block\await($promise, $loop, 0.5);
50+
51+
$this->assertInstanceOf('React\Dns\Model\Message', $response);
52+
$this->assertSame(2, $this->serverConnectCount);
53+
$this->assertSame(1, $this->serverWriteCount);
54+
}
55+
56+
public function testQueryWillNotFailWhenOneResponseIsTruncated()
57+
{
58+
$loop = Factory::create();
59+
60+
$servers = array();
61+
$addresses = array();
62+
$servers[] = $this->createWaitingAnsweringServer($loop, 0.1, true);
63+
$servers[] = $this->createWaitingAnsweringServer($loop, 0.2);
64+
$servers[] = $this->createWaitingAnsweringServer($loop, 0.2);
65+
foreach ($servers as $server) {
66+
$addresses[] = new UdpTransportExecutor(stream_socket_get_name($server, false), $loop);
67+
}
68+
$executor = new HappyEyeBallsExecutor($addresses, $loop);
69+
70+
$query = new Query('google.com', Message::TYPE_A, Message::CLASS_IN);
71+
72+
$promise = $executor->query($query);
73+
$response = \Clue\React\Block\await($promise, $loop, 0.5);
74+
75+
$this->assertInstanceOf('React\Dns\Model\Message', $response);
76+
$this->assertSame(3, $this->serverConnectCount);
77+
$this->assertSame(2, $this->serverWriteCount);
78+
}
79+
80+
/**
81+
* @expectedException RuntimeException
82+
* @expectedExceptionMessage DNS query for google.com failed: The server returned a truncated result for a UDP query, but retrying via TCP is currently not supported
83+
*/
84+
public function testQueryWillFailWhenAllResponseAraTruncated()
85+
{
86+
$loop = Factory::create();
87+
88+
$servers = array();
89+
$addresses = array();
90+
$servers[] = $this->createWaitingAnsweringServer($loop, 0.1, true);
91+
$servers[] = $this->createWaitingAnsweringServer($loop, 0.2, true);
92+
$servers[] = $this->createWaitingAnsweringServer($loop, 0.3, true);
93+
foreach ($servers as $server) {
94+
$addresses[] = new UdpTransportExecutor(stream_socket_get_name($server, false), $loop);
95+
}
96+
$executor = new HappyEyeBallsExecutor($addresses, $loop);
97+
98+
$query = new Query('google.com', Message::TYPE_A, Message::CLASS_IN);
99+
100+
$promise = $executor->query($query);
101+
$response = \Clue\React\Block\await($promise, $loop, 0.5);
102+
103+
$this->assertInstanceOf('React\Dns\Model\Message', $response);
104+
$this->assertSame(2, $this->serverConnectCount);
105+
$this->assertSame(2, $this->serverWriteCount);
106+
}
107+
108+
/**
109+
* @expectedException RuntimeException
110+
* @expectedExceptionMessage Lookup query has been canceled
111+
*/
112+
public function testCancelPromiseWillCancelAllPendingQueries()
113+
{
114+
$loop = Factory::create();
115+
116+
$servers = array();
117+
$addresses = array();
118+
$servers[] = $this->createWaitingAnsweringServer($loop, 0.1, true);
119+
$servers[] = $this->createWaitingAnsweringServer($loop, 0.2, true);
120+
$servers[] = $this->createWaitingAnsweringServer($loop, 0.3, true);
121+
foreach ($servers as $server) {
122+
$addresses[] = new UdpTransportExecutor(stream_socket_get_name($server, false), $loop);
123+
}
124+
$executor = new HappyEyeBallsExecutor($addresses, $loop);
125+
126+
$query = new Query('google.com', Message::TYPE_A, Message::CLASS_IN);
127+
128+
$promise = $executor->query($query);
129+
$loop->futureTick(function () use ($promise) {
130+
$promise->cancel();
131+
});
132+
$response = \Clue\React\Block\await($promise, $loop, 0.5);
133+
134+
$this->assertInstanceOf('React\Dns\Model\Message', $response);
135+
$this->assertSame(2, $this->serverConnectCount);
136+
$this->assertSame(2, $this->serverWriteCount);
137+
}
138+
139+
private function createAnsweringServer(LoopInterface $loop)
140+
{
141+
$that = $this;
142+
$server = stream_socket_server('udp://127.0.0.1:0', $errno, $errstr, STREAM_SERVER_BIND);
143+
$loop->addReadStream($server, function ($server) use ($that) {
144+
$that->serverConnectCount++;
145+
$parser = new Parser();
146+
$dumper = new BinaryDumper();
147+
148+
$data = stream_socket_recvfrom($server, 512, 0, $peer);
149+
150+
$message = $parser->parseMessage($data);
151+
152+
stream_socket_sendto($server, $dumper->toBinary($message), 0, $peer);
153+
$that->serverWriteCount++;
154+
});
155+
156+
return $server;
157+
}
158+
159+
private function createWaitingAnsweringServer(LoopInterface $loop, $timerout, $truncated = false)
160+
{
161+
$that = $this;
162+
$server = stream_socket_server('udp://127.0.0.1:0', $errno, $errstr, STREAM_SERVER_BIND);
163+
$loop->addReadStream($server, function ($server) use ($loop, $timerout, $that, $truncated) {
164+
$that->serverConnectCount++;
165+
$parser = new Parser();
166+
167+
$data = stream_socket_recvfrom($server, 512, 0, $peer);
168+
169+
$message = $parser->parseMessage($data);
170+
$message->tc = $truncated;
171+
172+
$loop->addTimer($timerout, function () use ($server, $message, $peer, $that) {
173+
$dumper = new BinaryDumper();
174+
175+
stream_socket_sendto($server, $dumper->toBinary($message), 0, $peer);
176+
$that->serverWriteCount++;
177+
});
178+
});
179+
180+
return $server;
181+
}
182+
}

0 commit comments

Comments
 (0)