Skip to content

Commit 95d8299

Browse files
committed
Cooperatively resolve hosts to avoid running same query concurrently
1 parent 0f30c6c commit 95d8299

File tree

6 files changed

+342
-2
lines changed

6 files changed

+342
-2
lines changed

README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,26 @@ $executor = new RetryExecutor(
249249
);
250250
```
251251

252+
Note that this executor is entirely async and as such allows you to execute
253+
any number of queries concurrently. You should probably limit the number of
254+
concurrent queries in your application or you're very likely going to face
255+
rate limitations and bans on the resolver end. For many common applications,
256+
you may want to avoid sending the same query multiple times when the first
257+
one is still pending, so you will likely want to use this in combination with
258+
a `CoopExecutor` like this:
259+
260+
```php
261+
$executor = new CoopExecutor(
262+
new RetryExecutor(
263+
new TimeoutExecutor(
264+
new UdpTransportExecutor($loop),
265+
3.0,
266+
$loop
267+
)
268+
)
269+
);
270+
```
271+
252272
> Internally, this class uses PHP's UDP sockets and does not take advantage
253273
of [react/datagram](https://github.com/reactphp/datagram) purely for
254274
organizational reasons to avoid a cyclic dependency between the two

src/Query/CoopExecutor.php

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
<?php
2+
3+
namespace React\Dns\Query;
4+
5+
use React\Promise\Promise;
6+
7+
/**
8+
* Cooperatively resolves hosts via the given base executor to ensure same query is not run concurrently
9+
*
10+
* Wraps an existing `ExecutorInterface` to keep tracking of pending queries
11+
* and only starts a new query when the same query is not already pending. Once
12+
* the underlying query is fulfilled/rejected, it will forward its value to all
13+
* promises awaiting the same query.
14+
*
15+
* This means it will not limit concurrency for queries that differ, for example
16+
* when sending many queries for different host names or types.
17+
*
18+
* This is useful because all executors are entirely async and as such allow you
19+
* to execute any number of queries concurrently. You should probably limit the
20+
* number of concurrent queries in your application or you're very likely going
21+
* to face rate limitations and bans on the resolver end. For many common
22+
* applications, you may want to avoid sending the same query multiple times
23+
* when the first one is still pending, so you will likely want to use this in
24+
* combination with some other executor like this:
25+
*
26+
* ```php
27+
* $executor = new CoopExecutor(
28+
* new RetryExecutor(
29+
* new TimeoutExecutor(
30+
* new UdpTransportExecutor($loop),
31+
* 3.0,
32+
* $loop
33+
* )
34+
* )
35+
* );
36+
* ```
37+
*/
38+
class CoopExecutor implements ExecutorInterface
39+
{
40+
private $executor;
41+
private $pending = array();
42+
private $counts = array();
43+
44+
public function __construct(ExecutorInterface $base)
45+
{
46+
$this->executor = $base;
47+
}
48+
49+
public function query($nameserver, Query $query)
50+
{
51+
$key = $this->serializeQueryToIdentity($query);
52+
if (isset($this->pending[$key])) {
53+
// same query is already pending, so use shared reference to pending query
54+
$promise = $this->pending[$key];
55+
++$this->counts[$key];
56+
} else {
57+
// no such query pending, so start new query and keep reference until it's fulfilled or rejected
58+
$promise = $this->executor->query($nameserver, $query);
59+
$this->pending[$key] = $promise;
60+
$this->counts[$key] = 1;
61+
62+
$pending =& $this->pending;
63+
$counts =& $this->counts;
64+
$promise->then(function () use ($key, &$pending, &$counts) {
65+
unset($pending[$key], $counts[$key]);
66+
}, function () use ($key, &$pending, &$counts) {
67+
unset($pending[$key], $counts[$key]);
68+
});
69+
}
70+
71+
// Return a child promise awaiting the pending query.
72+
// Cancelling this child promise should only cancel the pending query
73+
// when no other child promise is awaiting the same query.
74+
$pending =& $this->pending;
75+
$counts =& $this->counts;
76+
return new Promise(function ($resolve, $reject) use ($promise) {
77+
$promise->then($resolve, $reject);
78+
}, function () use ($promise, $key, $query, &$pending, &$counts) {
79+
if (--$counts[$key] < 1) {
80+
unset($pending[$key], $counts[$key]);
81+
$promise->cancel();
82+
}
83+
throw new \RuntimeException('DNS query for ' . $query->name . ' has been cancelled');
84+
});
85+
}
86+
87+
private function serializeQueryToIdentity(Query $query)
88+
{
89+
return sprintf('%s:%s:%s', $query->name, $query->type, $query->class);
90+
}
91+
}

src/Query/HostsFileExecutor.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
use React\Promise;
99

1010
/**
11-
* Resolves hosts from the givne HostsFile or falls back to another executor
11+
* Resolves hosts from the given HostsFile or falls back to another executor
1212
*
1313
* If the host is found in the hosts file, it will not be passed to the actual
1414
* DNS executor. If the host is not found in the hosts file, it will be passed

src/Query/UdpTransportExecutor.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,26 @@
6060
* );
6161
* ```
6262
*
63+
* Note that this executor is entirely async and as such allows you to execute
64+
* any number of queries concurrently. You should probably limit the number of
65+
* concurrent queries in your application or you're very likely going to face
66+
* rate limitations and bans on the resolver end. For many common applications,
67+
* you may want to avoid sending the same query multiple times when the first
68+
* one is still pending, so you will likely want to use this in combination with
69+
* a `CoopExecutor` like this:
70+
*
71+
* ```php
72+
* $executor = new CoopExecutor(
73+
* new RetryExecutor(
74+
* new TimeoutExecutor(
75+
* new UdpTransportExecutor($loop),
76+
* 3.0,
77+
* $loop
78+
* )
79+
* )
80+
* );
81+
* ```
82+
*
6383
* > Internally, this class uses PHP's UDP sockets and does not take advantage
6484
* of [react/datagram](https://github.com/reactphp/datagram) purely for
6585
* organizational reasons to avoid a cyclic dependency between the two

src/Resolver/Factory.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use React\Cache\CacheInterface;
77
use React\Dns\Config\HostsFile;
88
use React\Dns\Query\CachedExecutor;
9+
use React\Dns\Query\CoopExecutor;
910
use React\Dns\Query\ExecutorInterface;
1011
use React\Dns\Query\HostsFileExecutor;
1112
use React\Dns\Query\RecordCache;
@@ -77,7 +78,7 @@ protected function createExecutor(LoopInterface $loop)
7778

7879
protected function createRetryExecutor(LoopInterface $loop)
7980
{
80-
return new RetryExecutor($this->createExecutor($loop));
81+
return new CoopExecutor(new RetryExecutor($this->createExecutor($loop)));
8182
}
8283

8384
protected function createCachedExecutor(LoopInterface $loop, CacheInterface $cache)

tests/Query/CoopExecutorTest.php

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
<?php
2+
3+
use React\Dns\Query\CoopExecutor;
4+
use React\Dns\Model\Message;
5+
use React\Dns\Query\Query;
6+
use React\Promise\Promise;
7+
use React\Tests\Dns\TestCase;
8+
use React\Promise\Deferred;
9+
10+
class CoopExecutorTest extends TestCase
11+
{
12+
public function testQueryOnceWillPassExactQueryToBaseExecutor()
13+
{
14+
$pending = new Promise(function () { });
15+
$query = new Query('reactphp.org', Message::TYPE_A, Message::CLASS_IN);
16+
$base = $this->getMockBuilder('React\Dns\Query\ExecutorInterface')->getMock();
17+
$base->expects($this->once())->method('query')->with('8.8.8.8', $query)->willReturn($pending);
18+
$connector = new CoopExecutor($base);
19+
20+
$connector->query('8.8.8.8', $query);
21+
}
22+
23+
public function testQueryOnceWillResolveWhenBaseExecutorResolves()
24+
{
25+
$message = new Message();
26+
27+
$base = $this->getMockBuilder('React\Dns\Query\ExecutorInterface')->getMock();
28+
$base->expects($this->once())->method('query')->willReturn(\React\Promise\resolve($message));
29+
$connector = new CoopExecutor($base);
30+
31+
$query = new Query('reactphp.org', Message::TYPE_A, Message::CLASS_IN);
32+
$promise = $connector->query('8.8.8.8', $query);
33+
34+
$this->assertInstanceOf('React\Promise\PromiseInterface', $promise);
35+
36+
$promise->then($this->expectCallableOnceWith($message));
37+
}
38+
39+
public function testQueryOnceWillRejectWhenBaseExecutorRejects()
40+
{
41+
$exception = new RuntimeException();
42+
43+
$base = $this->getMockBuilder('React\Dns\Query\ExecutorInterface')->getMock();
44+
$base->expects($this->once())->method('query')->willReturn(\React\Promise\reject($exception));
45+
$connector = new CoopExecutor($base);
46+
47+
$query = new Query('reactphp.org', Message::TYPE_A, Message::CLASS_IN);
48+
$promise = $connector->query('8.8.8.8', $query);
49+
50+
$this->assertInstanceOf('React\Promise\PromiseInterface', $promise);
51+
52+
$promise->then(null, $this->expectCallableOnceWith($exception));
53+
}
54+
55+
public function testQueryTwoDifferentQueriesWillPassExactQueryToBaseExecutorTwice()
56+
{
57+
$pending = new Promise(function () { });
58+
$query1 = new Query('reactphp.org', Message::TYPE_A, Message::CLASS_IN);
59+
$query2 = new Query('reactphp.org', Message::TYPE_AAAA, Message::CLASS_IN);
60+
$base = $this->getMockBuilder('React\Dns\Query\ExecutorInterface')->getMock();
61+
$base->expects($this->exactly(2))->method('query')->withConsecutive(
62+
['8.8.8.8', $query1],
63+
['8.8.8.8', $query2]
64+
)->willReturn($pending);
65+
$connector = new CoopExecutor($base);
66+
67+
$connector->query('8.8.8.8', $query1);
68+
$connector->query('8.8.8.8', $query2);
69+
}
70+
71+
public function testQueryTwiceWillPassExactQueryToBaseExecutorOnceWhenQueryIsStillPending()
72+
{
73+
$pending = new Promise(function () { });
74+
$query = new Query('reactphp.org', Message::TYPE_A, Message::CLASS_IN);
75+
$base = $this->getMockBuilder('React\Dns\Query\ExecutorInterface')->getMock();
76+
$base->expects($this->once())->method('query')->with('8.8.8.8', $query)->willReturn($pending);
77+
$connector = new CoopExecutor($base);
78+
79+
$connector->query('8.8.8.8', $query);
80+
$connector->query('8.8.8.8', $query);
81+
}
82+
83+
public function testQueryTwiceWillPassExactQueryToBaseExecutorTwiceWhenFirstQueryIsAlreadyResolved()
84+
{
85+
$deferred = new Deferred();
86+
$pending = new Promise(function () { });
87+
$query = new Query('reactphp.org', Message::TYPE_A, Message::CLASS_IN);
88+
$base = $this->getMockBuilder('React\Dns\Query\ExecutorInterface')->getMock();
89+
$base->expects($this->exactly(2))->method('query')->with('8.8.8.8', $query)->willReturnOnConsecutiveCalls($deferred->promise(), $pending);
90+
91+
$connector = new CoopExecutor($base);
92+
93+
$connector->query('8.8.8.8', $query);
94+
95+
$deferred->resolve(new Message());
96+
97+
$connector->query('8.8.8.8', $query);
98+
}
99+
100+
public function testQueryTwiceWillPassExactQueryToBaseExecutorTwiceWhenFirstQueryIsAlreadyRejected()
101+
{
102+
$deferred = new Deferred();
103+
$pending = new Promise(function () { });
104+
$query = new Query('reactphp.org', Message::TYPE_A, Message::CLASS_IN);
105+
$base = $this->getMockBuilder('React\Dns\Query\ExecutorInterface')->getMock();
106+
$base->expects($this->exactly(2))->method('query')->with('8.8.8.8', $query)->willReturnOnConsecutiveCalls($deferred->promise(), $pending);
107+
108+
$connector = new CoopExecutor($base);
109+
110+
$connector->query('8.8.8.8', $query);
111+
112+
$deferred->reject(new RuntimeException());
113+
114+
$connector->query('8.8.8.8', $query);
115+
}
116+
117+
public function testCancelQueryWillCancelPromiseFromBaseExecutorAndReject()
118+
{
119+
$promise = new Promise(function () { }, $this->expectCallableOnce());
120+
121+
$base = $this->getMockBuilder('React\Dns\Query\ExecutorInterface')->getMock();
122+
$base->expects($this->once())->method('query')->willReturn($promise);
123+
$connector = new CoopExecutor($base);
124+
125+
$query = new Query('reactphp.org', Message::TYPE_A, Message::CLASS_IN);
126+
$promise = $connector->query('8.8.8.8', $query);
127+
128+
$promise->cancel();
129+
130+
$promise->then(null, $this->expectCallableOnce());
131+
}
132+
133+
public function testCancelOneQueryWhenOtherQueryIsStillPendingWillNotCancelPromiseFromBaseExecutorAndRejectCancelled()
134+
{
135+
$promise = new Promise(function () { }, $this->expectCallableNever());
136+
137+
$base = $this->getMockBuilder('React\Dns\Query\ExecutorInterface')->getMock();
138+
$base->expects($this->once())->method('query')->willReturn($promise);
139+
$connector = new CoopExecutor($base);
140+
141+
$query = new Query('reactphp.org', Message::TYPE_A, Message::CLASS_IN);
142+
$promise1 = $connector->query('8.8.8.8', $query);
143+
$promise2 = $connector->query('8.8.8.8', $query);
144+
145+
$promise1->cancel();
146+
147+
$promise1->then(null, $this->expectCallableOnce());
148+
$promise2->then(null, $this->expectCallableNever());
149+
}
150+
151+
public function testCancelSecondQueryWhenFirstQueryIsStillPendingWillNotCancelPromiseFromBaseExecutorAndRejectCancelled()
152+
{
153+
$promise = new Promise(function () { }, $this->expectCallableNever());
154+
155+
$base = $this->getMockBuilder('React\Dns\Query\ExecutorInterface')->getMock();
156+
$base->expects($this->once())->method('query')->willReturn($promise);
157+
$connector = new CoopExecutor($base);
158+
159+
$query = new Query('reactphp.org', Message::TYPE_A, Message::CLASS_IN);
160+
$promise1 = $connector->query('8.8.8.8', $query);
161+
$promise2 = $connector->query('8.8.8.8', $query);
162+
163+
$promise2->cancel();
164+
165+
$promise2->then(null, $this->expectCallableOnce());
166+
$promise1->then(null, $this->expectCallableNever());
167+
}
168+
169+
public function testCancelAllPendingQueriesWillCancelPromiseFromBaseExecutorAndRejectCancelled()
170+
{
171+
$promise = new Promise(function () { }, $this->expectCallableOnce());
172+
173+
$base = $this->getMockBuilder('React\Dns\Query\ExecutorInterface')->getMock();
174+
$base->expects($this->once())->method('query')->willReturn($promise);
175+
$connector = new CoopExecutor($base);
176+
177+
$query = new Query('reactphp.org', Message::TYPE_A, Message::CLASS_IN);
178+
$promise1 = $connector->query('8.8.8.8', $query);
179+
$promise2 = $connector->query('8.8.8.8', $query);
180+
181+
$promise1->cancel();
182+
$promise2->cancel();
183+
184+
$promise1->then(null, $this->expectCallableOnce());
185+
$promise2->then(null, $this->expectCallableOnce());
186+
}
187+
188+
public function testQueryTwiceWillQueryBaseExecutorTwiceIfFirstQueryHasAlreadyBeenCancelledWhenSecondIsStarted()
189+
{
190+
$promise = new Promise(function () { }, $this->expectCallableOnce());
191+
$pending = new Promise(function () { });
192+
193+
$base = $this->getMockBuilder('React\Dns\Query\ExecutorInterface')->getMock();
194+
$base->expects($this->exactly(2))->method('query')->willReturnOnConsecutiveCalls($promise, $pending);
195+
$connector = new CoopExecutor($base);
196+
197+
$query = new Query('reactphp.org', Message::TYPE_A, Message::CLASS_IN);
198+
199+
$promise1 = $connector->query('8.8.8.8', $query);
200+
$promise1->cancel();
201+
202+
$promise2 = $connector->query('8.8.8.8', $query);
203+
204+
$promise1->then(null, $this->expectCallableOnce());
205+
206+
$promise2->then(null, $this->expectCallableNever());
207+
}
208+
}

0 commit comments

Comments
 (0)