diff --git a/src/DI/Helpers/QueuesHelper.php b/src/DI/Helpers/QueuesHelper.php index 35c3207..a7e3ac0 100644 --- a/src/DI/Helpers/QueuesHelper.php +++ b/src/DI/Helpers/QueuesHelper.php @@ -34,8 +34,8 @@ public function getQueueSchema(): Schema 'autoDelete' => Expect::bool(false), 'noWait' => Expect::bool(false), 'arguments' => Expect::array(), - 'dlx' => Expect::int()->min(1)->required(false)->before( - static fn(string $time): int => (int) strtotime($time, 0) + 'dlx' => Expect::type('int|bool')->required(false)->before( + fn (mixed $item) => $this->normalizeDlxEntry($item) ), 'autoCreate' => Expect::int( AbstractDataBag::AutoCreateLazy @@ -73,4 +73,12 @@ public function setup(ContainerBuilder $builder, array $config = []): ServiceDef ->setFactory(QueueFactory::class) ->setArguments([$queuesDataBag]); } + + protected function normalizeDlxEntry(string|bool $value): int|bool + { + if (is_string($value)) { + return (int) strtotime($value, 0); + } + return $value; + } } diff --git a/src/DI/RabbitMQExtension.php b/src/DI/RabbitMQExtension.php index 9554ee2..201a24d 100644 --- a/src/DI/RabbitMQExtension.php +++ b/src/DI/RabbitMQExtension.php @@ -123,40 +123,47 @@ protected function processExtensions(array &$config): void continue; } - $exchangeOut = $name . '.dlx-out'; - $exchangeIn = $name . '.dlx-in'; - - $queueDlx = sprintf('%s.dlx-%s', $name, (string) $data['dlx']); + $exchangeOut = "{$name}.dlx-out"; + $exchangeIn = "{$name}.dlx-in"; # Setup dead letter exchange $config['queues'][$name]['arguments']['x-dead-letter-exchange'] = $exchangeIn; - # DLX Exchange: will pass msg to queue - $config['exchanges'][$exchangeOut] = $this->exchangesHelper->processConfiguration([ - 'connection' => $data['connection'], - 'type' => ExchangesHelper::ExchangeTypes[3], - 'queueBindings' => [ - $name => [], - ], - ]); + # Prepare variables + $dlxSuffix = \is_numeric($data['dlx']) ? '-' . $data['dlx'] : ''; + $queueDlxName = "{$name}.dlx"; + $queueDlxArguments = []; + + if (!is_bool($data['dlx'])) { + $queueDlxName .= $dlxSuffix; + $queueDlxArguments = [ + 'x-dead-letter-exchange' => $exchangeOut, + 'x-message-ttl' => $data['dlx'] * 1000, + ]; + + $config['exchanges'][$exchangeOut] = $this->exchangesHelper->processConfiguration([ + 'connection' => $data['connection'], + 'type' => ExchangesHelper::ExchangeTypes[3], + 'queueBindings' => [ + $name => [], + ], + ]); + } # DLX Exchange: will pass msg to dlx queue $config['exchanges'][$exchangeIn] = $this->exchangesHelper->processConfiguration([ 'connection' => $data['connection'], 'type' => ExchangesHelper::ExchangeTypes[3], 'queueBindings' => [ - $queueDlx => [] + $queueDlxName => [] ] ]); # Expand dlx into new queues and exchange for them - $config['queues'][$queueDlx] = $this->queuesHelper->processConfiguration([ + $config['queues'][$queueDlxName] = $this->queuesHelper->processConfiguration([ 'connection' => $data['connection'], 'autoCreate' => true, - 'arguments' => [ - 'x-dead-letter-exchange' => $exchangeOut, - 'x-message-ttl' => $data['dlx'] * 1000, - ] + 'arguments' => $queueDlxArguments, ]); unset($config['queues'][$name]['dlx']);