Skip to content

Commit c8fe156

Browse files
authored
RPC Implementation using multiple relays to enable async communication (#25)
1 parent 05269a1 commit c8fe156

16 files changed

+1792
-80
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@
7575
"scripts": {
7676
"test": "phpunit --no-coverage --colors=always",
7777
"test-cover": "phpunit --coverage-clover=coverage.xml",
78-
"test-static": "psalm",
78+
"test-static": "psalm --no-cache",
7979
"test-mutations": "infection"
8080
},
8181
"minimum-stability": "dev",

src/ConnectedRelayInterface.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
3+
namespace Spiral\Goridge;
4+
5+
use Spiral\Goridge\Exception\RelayException;
6+
7+
/**
8+
* This interface describes a Relay that explictily establishes a connection.
9+
* That connection can also be re-established on the fly (in comparison to StreamRelay, which relies on the existence of the streams).
10+
* The object is also clonable, i.e. supports cloning without data errors due to shared state.
11+
*/
12+
interface ConnectedRelayInterface extends RelayInterface
13+
{
14+
/**
15+
* Returns true if the underlying connection is already established
16+
*/
17+
public function isConnected(): bool;
18+
19+
/**
20+
* Establishes the underlying connection and returns true on success, false on failure, or throws an exception in case of an error.
21+
*
22+
* @throws RelayException
23+
*/
24+
public function connect(): bool;
25+
26+
/**
27+
* Closes the underlying connection.
28+
*/
29+
public function close(): void;
30+
31+
/**
32+
* Enforce implementation of __clone magic method
33+
* @psalm-return void
34+
*/
35+
public function __clone();
36+
}

src/MultiRelayHelper.php

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Spiral\Goridge;
6+
7+
use Spiral\Goridge\RPC\Exception\RPCException;
8+
use function socket_select;
9+
10+
class MultiRelayHelper
11+
{
12+
/**
13+
* @param array<array-key, RelayInterface> $relays
14+
* @return array-key[]|false
15+
* @internal
16+
* Returns either
17+
* - an array of array keys, even if only one
18+
* - or false if none
19+
*/
20+
public static function findRelayWithMessage(array $relays, int $timeoutInMicroseconds = 0): array|false
21+
{
22+
if (count($relays) === 0) {
23+
return false;
24+
}
25+
26+
if ($relays[array_key_first($relays)] instanceof SocketRelay) {
27+
$sockets = [];
28+
$socketIdToRelayIndexMap = [];
29+
foreach ($relays as $relayIndex => $relay) {
30+
assert($relay instanceof SocketRelay);
31+
32+
// Enforce connection
33+
if ($relay->socket === null) {
34+
// Important: Do not force reconnect here as it would otherwise completely ruin further handling
35+
continue;
36+
}
37+
38+
$sockets[] = $relay->socket;
39+
$socketIdToRelayIndexMap[spl_object_id($relay->socket)] = $relayIndex;
40+
}
41+
42+
if (count($sockets) === 0) {
43+
return false;
44+
}
45+
46+
$writes = null;
47+
$except = null;
48+
$changes = socket_select($sockets, $writes, $except, 0, $timeoutInMicroseconds);
49+
50+
if ($changes > 0) {
51+
$indexes = [];
52+
foreach ($sockets as $socket) {
53+
$indexes[] = $socketIdToRelayIndexMap[spl_object_id($socket)] ?? throw new RPCException("Invalid socket??");
54+
}
55+
56+
return $indexes;
57+
} else {
58+
return false;
59+
}
60+
}
61+
62+
if ($relays[array_key_first($relays)] instanceof StreamRelay) {
63+
$streams = [];
64+
$streamNameToRelayIndexMap = [];
65+
foreach ($relays as $relayIndex => $relay) {
66+
assert($relay instanceof StreamRelay);
67+
68+
$streams[] = $relay->in;
69+
$streamNameToRelayIndexMap[(string)$relay->in] = $relayIndex;
70+
}
71+
72+
$writes = null;
73+
$except = null;
74+
$changes = stream_select($streams, $writes, $except, 0, $timeoutInMicroseconds);
75+
76+
if ($changes > 0) {
77+
$indexes = [];
78+
foreach ($streams as $stream) {
79+
$indexes[] = $streamNameToRelayIndexMap[(string)$stream] ?? throw new RPCException("Invalid stream??");
80+
}
81+
82+
return $indexes;
83+
} else {
84+
return false;
85+
}
86+
}
87+
88+
return false;
89+
}
90+
91+
/**
92+
* @param array<array-key, RelayInterface> $relays
93+
* @return array-key[]|false
94+
* @internal
95+
* Returns either
96+
* - an array of array keys, even if only one
97+
* - or false if none
98+
*/
99+
public static function checkConnected(array $relays): array|false
100+
{
101+
if (count($relays) === 0) {
102+
return false;
103+
}
104+
105+
$keysNotConnected = [];
106+
foreach ($relays as $key => $relay) {
107+
if ($relay instanceof ConnectedRelayInterface && !$relay->isConnected()) {
108+
$relay->connect();
109+
$keysNotConnected[] = $key;
110+
}
111+
}
112+
113+
return $keysNotConnected;
114+
}
115+
}

src/RPC/AbstractRPC.php

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Spiral\Goridge\RPC;
6+
7+
use Spiral\Goridge\Frame;
8+
use Spiral\Goridge\RelayInterface;
9+
use Spiral\Goridge\RPC\Exception\ServiceException;
10+
use Stringable;
11+
use function sprintf;
12+
use function strlen;
13+
use function substr;
14+
use function ucfirst;
15+
16+
abstract class AbstractRPC implements RPCInterface
17+
{
18+
/**
19+
* RPC calls service prefix.
20+
*
21+
* @var non-empty-string|null
22+
*/
23+
protected ?string $service = null;
24+
25+
/**
26+
* @var positive-int
27+
*/
28+
protected static int $seq = 1;
29+
30+
public function __construct(
31+
protected CodecInterface $codec
32+
) {
33+
}
34+
35+
/**
36+
* @psalm-pure
37+
*/
38+
public function withServicePrefix(string $service): self
39+
{
40+
/** @psalm-suppress ImpureVariable */
41+
$rpc = clone $this;
42+
$rpc->service = $service;
43+
44+
return $rpc;
45+
}
46+
47+
/**
48+
* @psalm-pure
49+
*/
50+
public function withCodec(CodecInterface $codec): self
51+
{
52+
/** @psalm-suppress ImpureVariable */
53+
$rpc = clone $this;
54+
$rpc->codec = $codec;
55+
56+
return $rpc;
57+
}
58+
59+
/**
60+
* @throws Exception\ServiceException
61+
*/
62+
protected function decodeResponse(Frame $frame, RelayInterface $relay, mixed $options = null): mixed
63+
{
64+
// exclude method name
65+
$body = substr((string)$frame->payload, $frame->options[1]);
66+
67+
if ($frame->hasFlag(Frame::ERROR)) {
68+
$name = $relay instanceof Stringable
69+
? (string)$relay
70+
: $relay::class;
71+
72+
throw new ServiceException(sprintf("Error '%s' on %s", $body, $name));
73+
}
74+
75+
return $this->codec->decode($body, $options);
76+
}
77+
78+
/**
79+
* @param non-empty-string $method
80+
*/
81+
protected function packFrame(string $method, mixed $payload): Frame
82+
{
83+
if ($this->service !== null) {
84+
$method = $this->service . '.' . ucfirst($method);
85+
}
86+
87+
$body = $method . $this->codec->encode($payload);
88+
return new Frame($body, [self::$seq, strlen($method)], $this->codec->getIndex());
89+
}
90+
}

src/RPC/AsyncRPCInterface.php

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
<?php
2+
3+
namespace Spiral\Goridge\RPC;
4+
5+
use Spiral\Goridge\Exception\GoridgeException;
6+
use Spiral\Goridge\Exception\RelayException;
7+
use Spiral\Goridge\RPC\Exception\RPCException;
8+
use Spiral\Goridge\RPC\Exception\ServiceException;
9+
10+
interface AsyncRPCInterface extends RPCInterface
11+
{
12+
/**
13+
* Invoke remote RoadRunner service method using given payload (free form) non-blockingly and ignore the response.
14+
*
15+
* @param non-empty-string $method
16+
*
17+
* @throws GoridgeException
18+
*/
19+
public function callIgnoreResponse(string $method, mixed $payload): void;
20+
21+
/**
22+
* Invoke remote RoadRunner service method using given payload (free form) non-blockingly but accept a response.
23+
*
24+
* @param non-empty-string $method
25+
* @return positive-int An "ID" to check whether a response has been received and to fetch said response.
26+
*
27+
* @throws GoridgeException
28+
*/
29+
public function callAsync(string $method, mixed $payload): int;
30+
31+
/**
32+
* Check whether a response has been received using the "ID" obtained through @see AsyncRPCInterface::callAsync() .
33+
*
34+
* @param positive-int $seq
35+
* @return bool
36+
*/
37+
public function hasResponse(int $seq): bool;
38+
39+
/**
40+
* Checks the "ID"s obtained through @see AsyncRPCInterface::callAsync() if they've got a response yet.
41+
* Returns an array of "ID"s that do.
42+
*
43+
* @param positive-int[] $seqs
44+
* @return positive-int[]
45+
*/
46+
public function hasResponses(array $seqs): array;
47+
48+
/**
49+
* Fetch the response for the "ID" obtained through @see AsyncRPCInterface::callAsync() .
50+
* @param positive-int $seq
51+
* @throws RPCException
52+
* @throws ServiceException
53+
* @throws RelayException
54+
*/
55+
public function getResponse(int $seq, mixed $options = null): mixed;
56+
57+
/**
58+
* Fetches the responses for the "ID"s obtained through @see AsyncRPCInterface::callAsync()
59+
* and returns a map of "ID" => Response.
60+
* @throws RelayException
61+
* @throws ServiceException
62+
* @throws RPCException
63+
*
64+
* @param array<array-key, positive-int> $seqs
65+
* @return iterable<positive-int, mixed>
66+
*
67+
*/
68+
public function getResponses(array $seqs, mixed $options = null): iterable;
69+
}

0 commit comments

Comments
 (0)