Skip to content

Commit 96a55a9

Browse files
authored
Add possibility to have more routing keys for one exchange (#61)
1 parent 4d51745 commit 96a55a9

File tree

5 files changed

+55
-22
lines changed

5 files changed

+55
-22
lines changed

.docs/README.md

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,11 @@ rabbitmq:
5252
type: fanout
5353
queueBindings:
5454
testQueue:
55+
# use only one option from `routingKey` and `routingKeys`, otherwise an exception will be thrown
5556
routingKey: testRoutingKey
57+
# routingKeys:
58+
# - testRoutingKey1
59+
# - testRoutingKey2
5660
# force exchange declare on first exchange operation during request
5761
# autoCreate: true
5862
@@ -160,14 +164,14 @@ final class LongRunningTestQueue
160164
* @var Producer
161165
*/
162166
private $testProducer;
163-
167+
164168
/**
165-
* @var DataProvider Some data provider
169+
* @var DataProvider Some data provider
166170
*/
167171
private $dataProvider;
168-
172+
169173
/**
170-
* @var bool
174+
* @var bool
171175
*/
172176
private $running;
173177

@@ -177,15 +181,15 @@ final class LongRunningTestQueue
177181
$this->testProducer = $testProducer;
178182
$this->dataProvider = $dataProvider;
179183
}
180-
184+
181185
public function run(): void {
182186
do {
183187
$message = $this->dataProvider->getMessage();
184188
if (!$message) {
185189
$this->testProducer->sendHeartbeat();
186190
continue;
187191
}
188-
192+
189193
$this->publish($message);
190194
} while ($this->running);
191195
}
@@ -265,16 +269,16 @@ final class TestConsumer
265269
foreach($messages as $message) {
266270
$data[$message->deliveryTag] = json_decode($message->content);
267271
}
268-
272+
269273
/**
270274
* @todo bulk message action
271-
*/
272-
275+
*/
276+
273277
foreach(array_keys($data) as $tag) {
274278
$return[$tag] = IConsumer::MESSAGE_ACK; // Or ::MESSAGE_NACK || ::MESSAGE_REJECT
275279
}
276-
277-
return $return;
280+
281+
return $return;
278282
}
279283

280284
}

src/DI/Helpers/ExchangesHelper.php

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ final class ExchangesHelper extends AbstractHelper
3838
*/
3939
private array $queueBindingDefaults = [
4040
'routingKey' => '',
41+
'routingKeys' => [],
4142
'noWait' => false,
4243
'arguments' => [],
4344
];
@@ -67,12 +68,21 @@ public function setup(ContainerBuilder $builder, array $config = []): ServiceDef
6768

6869
if ($exchangeConfig['queueBindings'] !== []) {
6970
foreach ($exchangeConfig['queueBindings'] as $queueName => $queueBindingData) {
70-
$queueBindingData['routingKey'] = (string) $queueBindingData['routingKey'];
71+
if (isset($queueBindingData['routingKey']) && isset($queueBindingData['routingKeys'])) {
72+
throw new \InvalidArgumentException(
73+
"Options `routingKey` and `routingKeys` cannot be specified at the same time"
74+
);
75+
}
7176

72-
$exchangeConfig['queueBindings'][$queueName] = $this->extension->validateConfig(
77+
$queueBindingConfig = $this->extension->validateConfig(
7378
$this->queueBindingDefaults,
7479
$queueBindingData
7580
);
81+
82+
$queueBindingConfig['routingKey'] = (string) $queueBindingConfig['routingKey'];
83+
$queueBindingConfig['routingKeys'] = array_map('strval', (array) $queueBindingConfig['routingKeys']);
84+
85+
$exchangeConfig['queueBindings'][$queueName] = $queueBindingConfig;
7686
}
7787
}
7888

src/Exchange/ExchangeDeclarator.php

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,22 @@ public function declareExchange(string $name): void
5252
if ($exchangeData['queueBindings'] !== []) {
5353
foreach ($exchangeData['queueBindings'] as $queueName => $queueBinding) {
5454
$queue = $this->queueFactory->getQueue($queueName);
55+
56+
if ($queueBinding['routingKeys'] !== []) {
57+
$routingKeysToBind = $queueBinding['routingKeys'];
58+
} else {
59+
$routingKeysToBind = [$queueBinding['routingKey']];
60+
}
5561

56-
$connection->getChannel()->queueBind(
57-
$queue->getName(),
58-
$name,
59-
$queueBinding['routingKey'],
60-
$queueBinding['noWait'],
61-
$queueBinding['arguments']
62-
);
62+
foreach ($routingKeysToBind as $routingKey) {
63+
$connection->getChannel()->queueBind(
64+
$queue->getName(),
65+
$name,
66+
$routingKey,
67+
$queueBinding['noWait'],
68+
$queueBinding['arguments']
69+
);
70+
}
6371
}
6472
}
6573
}

src/Exchange/ExchangeFactory.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ private function create(string $name): IExchange
7878

7979
$queueBindings[] = new QueueBinding(
8080
$queue,
81-
$queueBinding['routingKey']
81+
$queueBinding['routingKey'],
82+
...$queueBinding['routingKeys']
8283
);
8384
}
8485
}

src/Exchange/QueueBinding.php

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,25 @@
44

55
namespace Contributte\RabbitMQ\Exchange;
66

7+
use Contributte\RabbitMQ\Exchange\Exception\QueueBindingException;
78
use Contributte\RabbitMQ\Queue\IQueue;
89

910
final class QueueBinding
1011
{
1112

1213
private IQueue $queue;
1314
private string $routingKey;
15+
private array $routingKeys;
1416

1517

1618
public function __construct(
1719
IQueue $queue,
18-
string $routingKey
20+
string $routingKey,
21+
string ...$routingKeys
1922
) {
2023
$this->queue = $queue;
2124
$this->routingKey = $routingKey;
25+
$this->routingKeys = $routingKeys;
2226
}
2327

2428

@@ -32,4 +36,10 @@ public function getRoutingKey(): string
3236
{
3337
return $this->routingKey;
3438
}
39+
40+
41+
public function getRoutingKeys(): array
42+
{
43+
return $this->routingKeys;
44+
}
3545
}

0 commit comments

Comments
 (0)