Skip to content

Commit 8d7abf3

Browse files
committed
Multi Server Executor
1 parent d99bc4e commit 8d7abf3

File tree

1 file changed

+102
-0
lines changed

1 file changed

+102
-0
lines changed

src/Query/MultiServerExecutor.php

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
<?php
2+
3+
namespace React\Dns\Query;
4+
5+
use React\EventLoop\LoopInterface;
6+
use React\EventLoop\TimerInterface;
7+
use React\Promise\CancellablePromiseInterface;
8+
use React\Promise\Promise;
9+
10+
final class MultiServerExecutor implements ExecutorInterface
11+
{
12+
private $loop;
13+
14+
/**
15+
* @var ExecutorInterface[]
16+
*/
17+
public $executors = array();
18+
19+
private $executorsCount = 0;
20+
21+
public function __construct($nameservers, LoopInterface $loop)
22+
{
23+
$this->loop = $loop;
24+
foreach ($nameservers as $nameserver) {
25+
$this->executors[] = new UdpTransportExecutor($nameserver, $loop);
26+
$this->executorsCount++;
27+
}
28+
}
29+
30+
public function query(Query $query)
31+
{
32+
$that = $this;
33+
$executorsIndexes = range(0, $this->executorsCount - 1);
34+
shuffle($executorsIndexes);
35+
$loop = $this->loop;
36+
$promises = array();
37+
$timer = null;
38+
return new Promise(function ($resolve, $reject) use (&$promises, &$timer, &$executorsIndexes, $that, $loop, $query) {
39+
$resolveWrap = function ($index) use (&$promises, &$timer, $resolve, $loop) {
40+
return function ($result) use ($index, &$promises, &$timer, $resolve, $loop) {
41+
$resolve($result);
42+
43+
unset($promises[$index]);
44+
45+
if ($timer instanceof TimerInterface) {
46+
$loop->cancelTimer($timer);
47+
}
48+
49+
foreach ($promises as $promise) {
50+
if ($promise instanceof CancellablePromiseInterface) {
51+
$promise->cancel();
52+
}
53+
}
54+
};
55+
};
56+
$rejectWrap = function ($index) use (&$promises, &$timer, $reject, $loop) {
57+
return function ($error) use ($index, &$promises, &$timer, $reject, $loop) {
58+
$reject($error);
59+
60+
unset($promises[$index]);
61+
62+
if ($timer instanceof TimerInterface) {
63+
$loop->cancelTimer($timer);
64+
}
65+
66+
foreach ($promises as $promise) {
67+
if ($promise instanceof CancellablePromiseInterface) {
68+
$promise->cancel();
69+
}
70+
}
71+
};
72+
};
73+
$timer = $loop->addPeriodicTimer(0.05, function () use (&$promises, &$timer, &$executorsIndexes, $that, $loop, $query, $resolveWrap, $rejectWrap) {
74+
if (count($executorsIndexes) <= 0) {
75+
$loop->cancelTimer($timer);
76+
77+
return;
78+
}
79+
80+
$index = array_pop($executorsIndexes);
81+
$promise = $that->executors[$index]->query($query);
82+
$promise->then($resolveWrap($index), $rejectWrap);
83+
$promises[$index] = $promise;
84+
});
85+
86+
$index = array_pop($executorsIndexes);
87+
$promise = $that->executors[$index]->query($query);
88+
$promise->then($resolveWrap($index), $rejectWrap);
89+
$promises[$index] = $promise;
90+
}, function () use (&$promises, &$timer, $loop) {
91+
if ($timer instanceof TimerInterface) {
92+
$loop->cancelTimer($timer);
93+
}
94+
95+
foreach ($promises as $promise) {
96+
if ($promise instanceof CancellablePromiseInterface) {
97+
$promise->cancel();
98+
}
99+
}
100+
});
101+
}
102+
}

0 commit comments

Comments
 (0)