|
2 | 2 |
|
3 | 3 | namespace Enqueue\LaravelQueue;
|
4 | 4 |
|
| 5 | +use Enqueue\AmqpTools\DelayStrategy; |
| 6 | +use Enqueue\AmqpTools\DelayStrategyAware; |
| 7 | +use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy; |
| 8 | +use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy; |
| 9 | +use Enqueue\ConnectionFactoryFactory; |
| 10 | +use Enqueue\ConnectionFactoryFactoryInterface; |
| 11 | +use Illuminate\Contracts\Queue\Queue as QueueContract; |
5 | 12 | use Illuminate\Queue\Connectors\ConnectorInterface;
|
6 |
| -use Interop\Queue\ConnectionFactory; |
| 13 | +use Interop\Amqp\AmqpContext; |
7 | 14 |
|
8 | 15 | class Connector implements ConnectorInterface
|
9 | 16 | {
|
10 |
| - /** |
11 |
| - * {@inheritdoc} |
12 |
| - */ |
13 |
| - public function connect(array $config) |
| 17 | + public function connect(array $config): QueueContract |
14 | 18 | {
|
15 | 19 | $config = array_replace([
|
16 |
| - 'connection_factory_class' => null, |
| 20 | + 'dsn' => null, |
| 21 | + 'factory_class' => null, |
17 | 22 | 'queue' => 'default',
|
18 | 23 | 'time_to_run' => 0,
|
19 | 24 | ], $config);
|
20 | 25 |
|
21 |
| - if (empty($config['connection_factory_class'])) { |
22 |
| - throw new \LogicException('The "connection_factory_class" option is required'); |
23 |
| - } |
| 26 | + $queue = $config['queue']; |
| 27 | + $timeToRum = $config['time_to_run']; |
| 28 | + $connectionFactoryFactoryClass = $config['factory_class'] ?? ConnectionFactoryFactory::class; |
24 | 29 |
|
25 |
| - $factoryClass = $config['connection_factory_class']; |
26 |
| - if (false == class_exists($factoryClass)) { |
27 |
| - throw new \LogicException(sprintf('The "connection_factory_class" option "%s" is not a class', $factoryClass)); |
28 |
| - } |
| 30 | + unset($config['factory_class']); |
29 | 31 |
|
30 |
| - $rc = new \ReflectionClass($factoryClass); |
31 |
| - if (false == $rc->implementsInterface(ConnectionFactory::class)) { |
32 |
| - throw new \LogicException(sprintf('The "connection_factory_class" option must contain a class that implements "%s" but it is not', ConnectionFactory::class)); |
33 |
| - } |
| 32 | + /** @var ConnectionFactoryFactoryInterface $factory */ |
| 33 | + $factory = new $connectionFactoryFactoryClass(); |
| 34 | + $connection = $factory->create($config); |
| 35 | + $context = $connection->createContext(); |
34 | 36 |
|
35 |
| - /** @var ConnectionFactory $factory */ |
36 |
| - $factory = new $factoryClass($config); |
| 37 | + if ($context instanceof AmqpContext) { |
| 38 | + $config = array_replace(['delay_strategy' => 'rabbitmq_dlx'], $config); |
| 39 | + |
| 40 | + if ($context instanceof DelayStrategyAware && 'rabbitmq_dlx' == $config['delay_strategy']) { |
| 41 | + $context->setDelayStrategy(new RabbitMqDlxDelayStrategy()); |
| 42 | + } |
| 43 | + if ($context instanceof DelayStrategyAware && 'rabbitmq_delay_plugin' == $config['delay_strategy']) { |
| 44 | + $context->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy()); |
| 45 | + } |
| 46 | + if ($context instanceof DelayStrategyAware && $config['delay_strategy'] instanceof DelayStrategy) { |
| 47 | + $context->setDelayStrategy($config['delay_strategy']); |
| 48 | + } |
| 49 | + |
| 50 | + return new AmqpQueue($context, $queue, $timeToRum); |
| 51 | + } |
37 | 52 |
|
38 |
| - return new Queue($factory->createContext(), $config['queue'], $config['time_to_run']); |
| 53 | + return new Queue($context, $queue, $timeToRum); |
39 | 54 | }
|
40 | 55 | }
|
0 commit comments