Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions src/Query/MultiServerExecutor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
<?php

namespace React\Dns\Query;

use React\EventLoop\LoopInterface;
use React\EventLoop\TimerInterface;
use React\Promise\CancellablePromiseInterface;
use React\Promise\Promise;
use RuntimeException;

/**
* Resolves hosts in a Happy Eye Balls fashion by spreading queries out over multiple executors.
*
* Wraps existing `ExecutorInterface`'s and delegates sending queries to them in order. In order
* to prevent overloading the first server(s) in the list each query the starting point in the list
* moves one step with each query and wraps when it reaches the end. Once one of of the servers
* responds with a successful query all pending queries will be canceled and sending out new queries
* will be stopped. Any unsuccessful queries will be ignored until the last one, that error will be
* passed up the chain as reason for look up failure. Between each query there is an delay of about
* 50ms giving the first contacted server time to respond.
*
* This executor accepts an array of an arbitrary number of executors as long as it's more then zero:
*
* ```php
* $executor = new MultiServerExecutor(
* array(
* new UdpTransportExecutor('1.1.1.1, $loop),
* new UdpTransportExecutor('8.8.8.8, $loop),
* )
* $loop
* );
* ```
*
* @link https://tools.ietf.org/html/rfc8305#section-3.1
*/
final class MultiServerExecutor implements ExecutorInterface
{
/**
* @var ExecutorInterface[]
*/
public $executors = array();

private $loop;

public $executorsCount = 0;
private $executorsPosition = 0;

/**
* @param ExecutorInterface[] $executors
* @param LoopInterface $loop
*/
public function __construct($executors, LoopInterface $loop)
{
$this->executors = array_filter($executors, function ($executor) {
return $executor instanceof ExecutorInterface;
});
$this->executorsCount = \count($this->executors);
if ($this->executorsCount <= 0) {
throw new RuntimeException('No executors provided');
}
$this->loop = $loop;
}

public function query(Query $query)
{
$executorsLeft = $this->executorsCount;
$executorsPosition = $this->executorsPosition++;

if ($this->executorsPosition >= $this->executorsCount) {
$this->executorsPosition = 0;
}

$that = $this;
$loop = $this->loop;
$promises = array();
$timer = null;
$success = false;
return new Promise(function ($resolve, $reject) use (&$promises, &$timer, &$executorsLeft, &$executorsPosition, &$success, $that, $loop, $query) {
$resolveWrap = function ($index) use (&$promises, &$timer, &$success, $resolve, $loop) {
return function ($result) use ($index, &$promises, &$timer, &$success, $resolve, $loop) {
$success = true;
unset($promises[$index]);

if ($timer instanceof TimerInterface) {
$loop->cancelTimer($timer);
$timer = null;
}

foreach ($promises as $promise) {
if ($promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}
}

$resolve($result);
};
};
$rejectWrap = function ($index) use (&$promises, &$timer, &$executorsLeft, &$success, $reject, $loop) {
return function ($error) use ($index, &$promises, &$timer, &$executorsLeft, &$success, $reject, $loop) {
unset($promises[$index]);

if (\count($promises) > 0 || $executorsLeft > 0 || $success === true) {
return;
}

$reject($error);
};
};

$promise = $that->executors[$executorsPosition]->query($query);
$promise->then($resolveWrap($executorsPosition), $rejectWrap($executorsPosition));
$promises[$executorsPosition] = $promise;

$executorsPosition++;
$executorsLeft--;

if ($executorsPosition >= $that->executorsCount) {
$executorsPosition = 0;
}

if ($executorsLeft <= 0) {
return;
}

$timer = $loop->addPeriodicTimer(0.05, function () use (&$promises, &$timer, &$executorsLeft, &$executorsPosition, $that, $loop, $query, $resolveWrap, $rejectWrap) {
$promise = $that->executors[$executorsPosition]->query($query);
$promise->then($resolveWrap($executorsPosition), $rejectWrap($executorsPosition));
$promises[$executorsPosition] = $promise;

$executorsPosition++;
$executorsLeft--;

if ($executorsPosition >= $that->executorsCount) {
$executorsPosition = 0;
}

if ($executorsLeft <= 0) {
$loop->cancelTimer($timer);
$timer = null;
}
});
}, function ($_, $reject) use (&$promises, &$timer, $loop) {
if ($timer instanceof TimerInterface) {
$loop->cancelTimer($timer);
}

foreach ($promises as $promise) {
if ($promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}
}

$reject(new RuntimeException('Lookup query has been canceled'));
});
}
}
217 changes: 217 additions & 0 deletions tests/Query/MultiServerExecutorTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
<?php

use React\Dns\Model\Message;
use React\Dns\Protocol\BinaryDumper;
use React\Dns\Protocol\Parser;
use React\Dns\Query\MultiServerExecutor;
use React\Dns\Query\Query;
use React\Dns\Query\UdpTransportExecutor;
use React\EventLoop\Factory;
use React\EventLoop\LoopInterface;
use React\Tests\Dns\TestCase;

class MultiServerExecutorTest extends TestCase
{
public $serverConnectCount = 0;
public $serverWriteCount = 0;

/**
* @expectedException RuntimeException
* @expectedExceptionMessage No executors provided
*/
public function testNoExecutorsSupplied()
{
$loop = Factory::create();

new MultiServerExecutor(array(), $loop);
}

public function testQueryWillResolve()
{
$loop = Factory::create();

$server = $this->createAnsweringServer($loop);
$address = new UdpTransportExecutor(stream_socket_get_name($server, false), $loop);
$executor = new MultiServerExecutor(array($address), $loop);

$query = new Query('google.com', Message::TYPE_A, Message::CLASS_IN);

$promise = $executor->query($query);
$response = \Clue\React\Block\await($promise, $loop, 0.5);

$this->assertInstanceOf('React\Dns\Model\Message', $response);
$this->assertSame(1, $this->serverConnectCount);
$this->assertSame(1, $this->serverWriteCount);
}

public function testQueryWillBeSendToAllServers()
{
$loop = Factory::create();

$answeringServer = $this->createWaitingAnsweringServer($loop, 0.1);
$waitingServer = $this->createWaitingAnsweringServer($loop, 1);
$answeringAddress = new UdpTransportExecutor(stream_socket_get_name($answeringServer, false), $loop);
$waitingAddress = new UdpTransportExecutor(stream_socket_get_name($waitingServer, false), $loop);
$executor = new MultiServerExecutor(array($answeringAddress, $waitingAddress), $loop);

$query = new Query('google.com', Message::TYPE_A, Message::CLASS_IN);

$promise = $executor->query($query);
$response = \Clue\React\Block\await($promise, $loop, 0.5);

$this->assertInstanceOf('React\Dns\Model\Message', $response);
$this->assertSame(2, $this->serverConnectCount);
$this->assertSame(1, $this->serverWriteCount);
}

public function testQueryWillNotFailWhenOneResponseIsTruncated()
{
$loop = Factory::create();

$servers = array();
$addresses = array();
$servers[] = $this->createWaitingAnsweringServer($loop, 0.1, true);
$servers[] = $this->createWaitingAnsweringServer($loop, 0.2);
$servers[] = $this->createWaitingAnsweringServer($loop, 0.2);
foreach ($servers as $server) {
$addresses[] = new UdpTransportExecutor(stream_socket_get_name($server, false), $loop);
}
$executor = new MultiServerExecutor($addresses, $loop);

$query = new Query('google.com', Message::TYPE_A, Message::CLASS_IN);

$promise = $executor->query($query);
$response = \Clue\React\Block\await($promise, $loop, 0.5);

$this->assertInstanceOf('React\Dns\Model\Message', $response);
$this->assertSame(3, $this->serverConnectCount);
$this->assertSame(2, $this->serverWriteCount);
}

/**
* @expectedException RuntimeException
* @expectedExceptionMessage DNS query for google.com failed: The server returned a truncated result for a UDP query, but retrying via TCP is currently not supported
*/
public function testQueryWillFailWhenAllResponseAraTruncated()
{
$loop = Factory::create();

$servers = array();
$addresses = array();
$servers[] = $this->createWaitingAnsweringServer($loop, 0.1, true);
$servers[] = $this->createWaitingAnsweringServer($loop, 0.2, true);
$servers[] = $this->createWaitingAnsweringServer($loop, 0.3, true);
foreach ($servers as $server) {
$addresses[] = new UdpTransportExecutor(stream_socket_get_name($server, false), $loop);
}
$executor = new MultiServerExecutor($addresses, $loop);

$query = new Query('google.com', Message::TYPE_A, Message::CLASS_IN);

$promise = $executor->query($query);
$response = \Clue\React\Block\await($promise, $loop, 0.5);

$this->assertInstanceOf('React\Dns\Model\Message', $response);
$this->assertSame(2, $this->serverConnectCount);
$this->assertSame(2, $this->serverWriteCount);
}

/**
* @expectedException RuntimeException
* @expectedExceptionMessage Lookup query has been canceled
*/
public function testCancelPromiseWillCancelAllPendingQueries()
{
$loop = Factory::create();

$servers = array();
$addresses = array();
$servers[] = $this->createWaitingAnsweringServer($loop, 0.1, true);
$servers[] = $this->createWaitingAnsweringServer($loop, 0.2, true);
$servers[] = $this->createWaitingAnsweringServer($loop, 0.3, true);
foreach ($servers as $server) {
$addresses[] = new UdpTransportExecutor(stream_socket_get_name($server, false), $loop);
}
$executor = new MultiServerExecutor($addresses, $loop);

$query = new Query('google.com', Message::TYPE_A, Message::CLASS_IN);

$promise = $executor->query($query);
$loop->futureTick(function () use ($promise) {
$promise->cancel();
});
$response = \Clue\React\Block\await($promise, $loop, 0.5);

$this->assertInstanceOf('React\Dns\Model\Message', $response);
$this->assertSame(2, $this->serverConnectCount);
$this->assertSame(2, $this->serverWriteCount);
}

public function testResolvingWilCancelActiveTimer()
{
$loop = Factory::create();

$servers = array();
$addresses = array();
$servers[] = $this->createWaitingAnsweringServer($loop, 0.0001);
$servers[] = $this->createWaitingAnsweringServer($loop, 0.2);
$servers[] = $this->createWaitingAnsweringServer($loop, 0.3);
foreach ($servers as $server) {
$addresses[] = new UdpTransportExecutor(stream_socket_get_name($server, false), $loop);
}
$executor = new MultiServerExecutor($addresses, $loop);

$query = new Query('google.com', Message::TYPE_A, Message::CLASS_IN);

$promise = $executor->query($query);
$response = \Clue\React\Block\await($promise, $loop, 0.5);

$this->assertInstanceOf('React\Dns\Model\Message', $response);
$this->assertSame(1, $this->serverConnectCount);
$this->assertSame(1, $this->serverWriteCount);
}

private function createAnsweringServer(LoopInterface $loop)
{
$that = $this;
$server = stream_socket_server('udp://127.0.0.1:0', $errno, $errstr, STREAM_SERVER_BIND);
$loop->addReadStream($server, function ($server) use ($that) {
$that->serverConnectCount++;
$parser = new Parser();
$dumper = new BinaryDumper();

$data = stream_socket_recvfrom($server, 512, 0, $peer);

$message = $parser->parseMessage($data);

stream_socket_sendto($server, $dumper->toBinary($message), 0, $peer);
$that->serverWriteCount++;
});

return $server;
}

private function createWaitingAnsweringServer(LoopInterface $loop, $timerout, $truncated = false)
{
$that = $this;
$server = stream_socket_server('udp://127.0.0.1:0', $errno, $errstr, STREAM_SERVER_BIND);
$loop->addReadStream($server, function ($server) use ($loop, $timerout, $that, $truncated) {
$that->serverConnectCount++;
$parser = new Parser();

$data = stream_socket_recvfrom($server, 512, 0, $peer);

$message = $parser->parseMessage($data);
$message->tc = $truncated;

$loop->addTimer($timerout, function () use ($server, $message, $peer, $that) {
$dumper = new BinaryDumper();

stream_socket_sendto($server, $dumper->toBinary($message), 0, $peer);
$that->serverWriteCount++;
});
});

return $server;
}
}