Skip to content

Commit 44a8c92

Browse files
committed
Reuse TCP/IP socket connection and pipeline outgoing queries
The TcpTransportExecutor will now keep the TCP/IP socket connection to the DNS server open for a short period. Additional queries can reuse the same socket connection and pipeline multiple queries over this single socket. This avoids creating multiple socket resources for concurrent requests and implies the TCP/IP connection overhead only applies once instead of for each query. Among others, this helps avoiding excessive resources and significantly improves query performance when using multiple (concurrent) queries. This implements RFC 7766 as per https://tools.ietf.org/html/rfc7766
1 parent f7f358f commit 44a8c92

File tree

4 files changed

+588
-108
lines changed

4 files changed

+588
-108
lines changed

README.md

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -319,12 +319,20 @@ Unlike the `UdpTransportExecutor`, this class uses a reliable TCP/IP
319319
transport, so you do not necessarily have to implement any retry logic.
320320

321321
Note that this executor is entirely async and as such allows you to execute
322-
any number of queries concurrently. You should probably limit the number of
323-
concurrent queries in your application or you're very likely going to face
324-
rate limitations and bans on the resolver end. For many common applications,
325-
you may want to avoid sending the same query multiple times when the first
326-
one is still pending, so you will likely want to use this in combination with
327-
a `CoopExecutor` like this:
322+
queries concurrently. The first query will establish a TCP/IP socket
323+
connection to the DNS server which will be kept open for a short period.
324+
Additional queries will automatically reuse this existing socket connection
325+
to the DNS server, will pipeline multiple requests over this single
326+
connection and will keep an idle connection open for a short period. The
327+
initial TCP/IP connection overhead may incur a slight delay if you only send
328+
occasional queries – when sending a larger number of concurrent queries over
329+
an existing connection, it becomes increasingly more efficient and avoids
330+
creating many concurrent sockets like the UDP-based executor. You may still
331+
want to limit the number of (concurrent) queries in your application or you
332+
may be facing rate limitations and bans on the resolver end. For many common
333+
applications, you may want to avoid sending the same query multiple times
334+
when the first one is still pending, so you will likely want to use this in
335+
combination with a `CoopExecutor` like this:
328336

329337
```php
330338
$executor = new CoopExecutor(

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
"require": {
77
"php": ">=5.3.0",
88
"react/cache": "^1.0 || ^0.6 || ^0.5",
9-
"react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3.5",
9+
"react/event-loop": "^1.0 || ^0.5",
1010
"react/promise": "^2.1 || ^1.2.1",
1111
"react/promise-timer": "^1.2"
1212
},

src/Query/TcpTransportExecutor.php

Lines changed: 208 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,20 @@
4848
* transport, so you do not necessarily have to implement any retry logic.
4949
*
5050
* Note that this executor is entirely async and as such allows you to execute
51-
* any number of queries concurrently. You should probably limit the number of
52-
* concurrent queries in your application or you're very likely going to face
53-
* rate limitations and bans on the resolver end. For many common applications,
54-
* you may want to avoid sending the same query multiple times when the first
55-
* one is still pending, so you will likely want to use this in combination with
56-
* a `CoopExecutor` like this:
51+
* queries concurrently. The first query will establish a TCP/IP socket
52+
* connection to the DNS server which will be kept open for a short period.
53+
* Additional queries will automatically reuse this existing socket connection
54+
* to the DNS server, will pipeline multiple requests over this single
55+
* connection and will keep an idle connection open for a short period. The
56+
* initial TCP/IP connection overhead may incur a slight delay if you only send
57+
* occasional queries – when sending a larger number of concurrent queries over
58+
* an existing connection, it becomes increasingly more efficient and avoids
59+
* creating many concurrent sockets like the UDP-based executor. You may still
60+
* want to limit the number of (concurrent) queries in your application or you
61+
* may be facing rate limitations and bans on the resolver end. For many common
62+
* applications, you may want to avoid sending the same query multiple times
63+
* when the first one is still pending, so you will likely want to use this in
64+
* combination with a `CoopExecutor` like this:
5765
*
5866
* ```php
5967
* $executor = new CoopExecutor(
@@ -78,6 +86,51 @@ class TcpTransportExecutor implements ExecutorInterface
7886
private $parser;
7987
private $dumper;
8088

89+
/**
90+
* @var ?resource
91+
*/
92+
private $socket;
93+
94+
/**
95+
* @var Deferred[]
96+
*/
97+
private $pending = array();
98+
99+
/**
100+
* @var string[]
101+
*/
102+
private $names = array();
103+
104+
/**
105+
* Maximum idle time when socket is current unused (i.e. no pending queries outstanding)
106+
*
107+
* If a new query is to be sent during the idle period, we can reuse the
108+
* existing socket without having to wait for a new socket connection.
109+
* This uses a rather small, hard-coded value to not keep any unneeded
110+
* sockets open and to not keep the loop busy longer than needed.
111+
*
112+
* A future implementation may take advantage of `edns-tcp-keepalive` to keep
113+
* the socket open for longer periods. This will likely require explicit
114+
* configuration because this may consume additional resources and also keep
115+
* the loop busy for longer than expected in some applications.
116+
*
117+
* @var float
118+
* @link https://tools.ietf.org/html/rfc7766#section-6.2.1
119+
* @link https://tools.ietf.org/html/rfc7828
120+
*/
121+
private $idlePeriod = 0.001;
122+
123+
/**
124+
* @var ?\React\EventLoop\TimerInterface
125+
*/
126+
private $idleTimer;
127+
128+
private $writeBuffer = '';
129+
private $writePending = false;
130+
131+
private $readBuffer = '';
132+
private $readPending = false;
133+
81134
/**
82135
* @param string $nameserver
83136
* @param LoopInterface $loop
@@ -103,6 +156,12 @@ public function __construct($nameserver, LoopInterface $loop)
103156
public function query(Query $query)
104157
{
105158
$request = Message::createRequestForQuery($query);
159+
160+
// keep shuffing message ID to avoid using the same message ID for two pending queries at the same time
161+
while (isset($this->pending[$request->id])) {
162+
$request->id = \mt_rand(0, 0xffff); // @codeCoverageIgnore
163+
}
164+
106165
$queryData = $this->dumper->toBinary($request);
107166
$length = \strlen($queryData);
108167
if ($length > 0xffff) {
@@ -113,106 +172,175 @@ public function query(Query $query)
113172

114173
$queryData = \pack('n', $length) . $queryData;
115174

116-
// create async TCP/IP connection (may take a while)
117-
$socket = @\stream_socket_client($this->nameserver, $errno, $errstr, 0, \STREAM_CLIENT_CONNECT | \STREAM_CLIENT_ASYNC_CONNECT);
118-
if ($socket === false) {
119-
return \React\Promise\reject(new \RuntimeException(
120-
'DNS query for ' . $query->name . ' failed: Unable to connect to DNS server (' . $errstr . ')',
121-
$errno
122-
));
175+
if ($this->socket === null) {
176+
// create async TCP/IP connection (may take a while)
177+
$socket = @\stream_socket_client($this->nameserver, $errno, $errstr, 0, \STREAM_CLIENT_CONNECT | \STREAM_CLIENT_ASYNC_CONNECT);
178+
if ($socket === false) {
179+
return \React\Promise\reject(new \RuntimeException(
180+
'DNS query for ' . $query->name . ' failed: Unable to connect to DNS server (' . $errstr . ')',
181+
$errno
182+
));
183+
}
184+
185+
// set socket to non-blocking and wait for it to become writable (connection success/rejected)
186+
\stream_set_blocking($socket, false);
187+
$this->socket = $socket;
188+
}
189+
190+
if ($this->idleTimer !== null) {
191+
$this->loop->cancelTimer($this->idleTimer);
192+
$this->idleTimer = null;
193+
}
194+
195+
// wait for socket to become writable to actually write out data
196+
$this->writeBuffer .= $queryData;
197+
if (!$this->writePending) {
198+
$this->writePending = true;
199+
$this->loop->addWriteStream($this->socket, array($this, 'handleWritable'));
123200
}
124201

125-
$loop = $this->loop;
126-
$deferred = new Deferred(function () use ($loop, $socket, $query) {
127-
// cancellation should remove socket from loop and close socket
128-
$loop->removeReadStream($socket);
129-
$loop->removeWriteStream($socket);
130-
\fclose($socket);
202+
$names =& $this->names;
203+
$that = $this;
204+
$deferred = new Deferred(function () use ($that, &$names, $request) {
205+
// remove from list of pending names, but remember pending query
206+
$name = $names[$request->id];
207+
unset($names[$request->id]);
208+
$that->checkIdle();
131209

132-
throw new CancellationException('DNS query for ' . $query->name . ' has been cancelled');
210+
throw new CancellationException('DNS query for ' . $name . ' has been cancelled');
133211
});
134212

135-
// set socket to non-blocking and wait for it to become writable (connection success/rejected)
136-
\stream_set_blocking($socket, false);
137-
$loop->addWriteStream($socket, function ($socket) use ($loop, $query, $queryData, $deferred) {
138-
$loop->removeWriteStream($socket);
139-
$name = @\stream_socket_get_name($socket, true);
213+
$this->pending[$request->id] = $deferred;
214+
$this->names[$request->id] = $query->name;
215+
216+
return $deferred->promise();
217+
}
218+
219+
/**
220+
* @internal
221+
*/
222+
public function handleWritable()
223+
{
224+
if ($this->readPending === false) {
225+
$name = @\stream_socket_get_name($this->socket, true);
140226
if ($name === false) {
141-
$loop->removeReadStream($socket);
142-
@\fclose($socket);
143-
$deferred->reject(new \RuntimeException(
144-
'DNS query for ' . $query->name . ' failed: Connection to DNS server rejected'
145-
));
227+
$this->closeError('Connection to DNS server rejected');
146228
return;
147229
}
148230

149-
$written = @\fwrite($socket, $queryData);
150-
if ($written !== \strlen($queryData)) {
151-
$loop->removeReadStream($socket);
152-
\fclose($socket);
153-
$deferred->reject(new \RuntimeException(
154-
'DNS query for ' . $query->name . ' failed: Unable to write DNS query message in one chunk'
155-
));
156-
}
157-
});
231+
$this->readPending = true;
232+
$this->loop->addReadStream($this->socket, array($this, 'handleRead'));
233+
}
158234

159-
$buffer = '';
160-
$parser = $this->parser;
161-
$loop->addReadStream($socket, function ($socket) use (&$buffer, $loop, $deferred, $query, $parser, $request) {
162-
// read one chunk of data from the DNS server
163-
// any error is fatal, this is a stream of TCP/IP data
164-
$chunk = @\fread($socket, 65536);
165-
if ($chunk === false || $chunk === '') {
166-
$loop->removeReadStream($socket);
167-
\fclose($socket);
168-
$deferred->reject(new \RuntimeException(
169-
'DNS query for ' . $query->name . ' failed: Connection to DNS server lost'
170-
));
171-
return;
172-
}
235+
$written = @\fwrite($this->socket, $this->writeBuffer);
236+
if ($written === false || $written === 0) {
237+
$this->closeError('Unable to write to closed socket');
238+
return;
239+
}
173240

174-
// reassemble complete message by concatenating all chunks.
175-
// response message header contains at least 12 bytes
176-
$buffer .= $chunk;
177-
if (!isset($buffer[11])) {
178-
return;
179-
}
241+
if (isset($this->writeBuffer[$written])) {
242+
$this->writeBuffer = \substr($this->writeBuffer, $written);
243+
} else {
244+
$this->loop->removeWriteStream($this->socket);
245+
$this->writePending = false;
246+
$this->writeBuffer = '';
247+
}
248+
}
249+
250+
/**
251+
* @internal
252+
*/
253+
public function handleRead()
254+
{
255+
// read one chunk of data from the DNS server
256+
// any error is fatal, this is a stream of TCP/IP data
257+
$chunk = @\fread($this->socket, 65536);
258+
if ($chunk === false || $chunk === '') {
259+
$this->closeError('Connection to DNS server lost');
260+
return;
261+
}
180262

263+
// reassemble complete message by concatenating all chunks.
264+
$this->readBuffer .= $chunk;
265+
266+
// response message header contains at least 12 bytes
267+
while (isset($this->readBuffer[11])) {
181268
// read response message length from first 2 bytes and ensure we have length + data in buffer
182-
list(, $length) = \unpack('n', $buffer);
183-
if (!isset($buffer[$length + 1])) {
269+
list(, $length) = \unpack('n', $this->readBuffer);
270+
if (!isset($this->readBuffer[$length + 1])) {
184271
return;
185272
}
186273

187-
// we only react to the first complete message, so remove socket from loop and close
188-
$loop->removeReadStream($socket);
189-
\fclose($socket);
190-
$data = \substr($buffer, 2, $length);
191-
$buffer = '';
274+
$data = \substr($this->readBuffer, 2, $length);
275+
$this->readBuffer = (string)substr($this->readBuffer, $length + 2);
192276

193277
try {
194-
$response = $parser->parseMessage($data);
278+
$response = $this->parser->parseMessage($data);
195279
} catch (\Exception $e) {
196-
// reject if we received an invalid message from remote server
197-
$deferred->reject(new \RuntimeException(
198-
'DNS query for ' . $query->name . ' failed: Invalid message received from DNS server',
199-
0,
200-
$e
201-
));
280+
// reject all pending queries if we received an invalid message from remote server
281+
$this->closeError('Invalid message received from DNS server');
202282
return;
203283
}
204284

205-
// reject if we received an unexpected response ID or truncated response
206-
if ($response->id !== $request->id || $response->tc) {
207-
$deferred->reject(new \RuntimeException(
208-
'DNS query for ' . $query->name . ' failed: Invalid response message received from DNS server'
209-
));
285+
// reject all pending queries if we received an unexpected response ID or truncated response
286+
if (!isset($this->pending[$response->id]) || $response->tc) {
287+
$this->closeError('Invalid response message received from DNS server');
210288
return;
211289
}
212290

291+
$deferred = $this->pending[$response->id];
292+
unset($this->pending[$response->id], $this->names[$response->id]);
293+
213294
$deferred->resolve($response);
214-
});
215295

216-
return $deferred->promise();
296+
$this->checkIdle();
297+
}
298+
}
299+
300+
/**
301+
* @internal
302+
* @param string $reason
303+
*/
304+
public function closeError($reason)
305+
{
306+
$this->readBuffer = '';
307+
if ($this->readPending) {
308+
$this->loop->removeReadStream($this->socket);
309+
$this->readPending = false;
310+
}
311+
312+
$this->writeBuffer = '';
313+
if ($this->writePending) {
314+
$this->loop->removeWriteStream($this->socket);
315+
$this->writePending = false;
316+
}
317+
318+
if ($this->idleTimer !== null) {
319+
$this->loop->cancelTimer($this->idleTimer);
320+
$this->idleTimer = null;
321+
}
322+
323+
@\fclose($this->socket);
324+
$this->socket = null;
325+
326+
foreach ($this->names as $id => $name) {
327+
$this->pending[$id]->reject(new \RuntimeException(
328+
'DNS query for ' . $name . ' failed: ' . $reason
329+
));
330+
}
331+
$this->pending = $this->names = array();
332+
}
333+
334+
/**
335+
* @internal
336+
*/
337+
public function checkIdle()
338+
{
339+
if ($this->idleTimer === null && !$this->names) {
340+
$that = $this;
341+
$this->idleTimer = $this->loop->addTimer($this->idlePeriod, function () use ($that) {
342+
$that->closeError('Idle timeout');
343+
});
344+
}
217345
}
218346
}

0 commit comments

Comments
 (0)