Skip to content

Commit 7b29fb2

Browse files
authored
Merge pull request #4 from bckp/feature-dlx-without-reenqueue
Add: dlx queue without re-queue
2 parents f35c8a9 + a47dfbd commit 7b29fb2

File tree

2 files changed

+35
-20
lines changed

2 files changed

+35
-20
lines changed

src/DI/Helpers/QueuesHelper.php

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ public function getQueueSchema(): Schema
3434
'autoDelete' => Expect::bool(false),
3535
'noWait' => Expect::bool(false),
3636
'arguments' => Expect::array(),
37-
'dlx' => Expect::int()->min(1)->required(false)->before(
38-
static fn(string $time): int => (int) strtotime($time, 0)
37+
'dlx' => Expect::type('int|bool')->required(false)->before(
38+
fn (mixed $item) => $this->normalizeDlxEntry($item)
3939
),
4040
'autoCreate' => Expect::int(
4141
AbstractDataBag::AutoCreateLazy
@@ -73,4 +73,12 @@ public function setup(ContainerBuilder $builder, array $config = []): ServiceDef
7373
->setFactory(QueueFactory::class)
7474
->setArguments([$queuesDataBag]);
7575
}
76+
77+
protected function normalizeDlxEntry(string|bool $value): int|bool
78+
{
79+
if (is_string($value)) {
80+
return (int) strtotime($value, 0);
81+
}
82+
return $value;
83+
}
7684
}

src/DI/RabbitMQExtension.php

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -123,40 +123,47 @@ protected function processExtensions(array &$config): void
123123
continue;
124124
}
125125

126-
$exchangeOut = $name . '.dlx-out';
127-
$exchangeIn = $name . '.dlx-in';
128-
129-
$queueDlx = sprintf('%s.dlx-%s', $name, (string) $data['dlx']);
126+
$exchangeOut = "{$name}.dlx-out";
127+
$exchangeIn = "{$name}.dlx-in";
130128

131129
# Setup dead letter exchange
132130
$config['queues'][$name]['arguments']['x-dead-letter-exchange'] = $exchangeIn;
133131

134-
# DLX Exchange: will pass msg to queue
135-
$config['exchanges'][$exchangeOut] = $this->exchangesHelper->processConfiguration([
136-
'connection' => $data['connection'],
137-
'type' => ExchangesHelper::ExchangeTypes[3],
138-
'queueBindings' => [
139-
$name => [],
140-
],
141-
]);
132+
# Prepare variables
133+
$dlxSuffix = \is_numeric($data['dlx']) ? '-' . $data['dlx'] : '';
134+
$queueDlxName = "{$name}.dlx";
135+
$queueDlxArguments = [];
136+
137+
if (!is_bool($data['dlx'])) {
138+
$queueDlxName .= $dlxSuffix;
139+
$queueDlxArguments = [
140+
'x-dead-letter-exchange' => $exchangeOut,
141+
'x-message-ttl' => $data['dlx'] * 1000,
142+
];
143+
144+
$config['exchanges'][$exchangeOut] = $this->exchangesHelper->processConfiguration([
145+
'connection' => $data['connection'],
146+
'type' => ExchangesHelper::ExchangeTypes[3],
147+
'queueBindings' => [
148+
$name => [],
149+
],
150+
]);
151+
}
142152

143153
# DLX Exchange: will pass msg to dlx queue
144154
$config['exchanges'][$exchangeIn] = $this->exchangesHelper->processConfiguration([
145155
'connection' => $data['connection'],
146156
'type' => ExchangesHelper::ExchangeTypes[3],
147157
'queueBindings' => [
148-
$queueDlx => []
158+
$queueDlxName => []
149159
]
150160
]);
151161

152162
# Expand dlx into new queues and exchange for them
153-
$config['queues'][$queueDlx] = $this->queuesHelper->processConfiguration([
163+
$config['queues'][$queueDlxName] = $this->queuesHelper->processConfiguration([
154164
'connection' => $data['connection'],
155165
'autoCreate' => true,
156-
'arguments' => [
157-
'x-dead-letter-exchange' => $exchangeOut,
158-
'x-message-ttl' => $data['dlx'] * 1000,
159-
]
166+
'arguments' => $queueDlxArguments,
160167
]);
161168

162169
unset($config['queues'][$name]['dlx']);

0 commit comments

Comments
 (0)