From 7e0c3fa59a90df35a20b3e62b8b020a20de7be5b Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 4 Oct 2018 14:52:06 +0300 Subject: [PATCH 1/2] 0.9 version --- composer.json | 18 ++-- src/AmqpConnector.php | 6 +- src/AmqpQueue.php | 14 +-- src/Command/ConsumeCommand.php | 19 ++++ src/Command/ConsumeMessagesCommand.php | 17 --- src/Command/ProduceCommand.php | 17 +++ src/Command/ProduceMessageCommand.php | 12 --- src/Command/QueuesCommand.php | 12 --- src/Command/RoutesCommand.php | 17 +++ src/Command/SetupBrokerCommand.php | 7 +- src/Command/TopicsCommand.php | 12 --- src/Connector.php | 8 +- src/EnqueueServiceProvider.php | 20 ++-- src/Job.php | 56 ++++------ src/Queue.php | 47 ++++---- src/Tests/ConnectorTest.php | 12 +-- src/Tests/QueueTest.php | 142 ++++++++++++------------- 17 files changed, 206 insertions(+), 230 deletions(-) create mode 100644 src/Command/ConsumeCommand.php delete mode 100644 src/Command/ConsumeMessagesCommand.php create mode 100644 src/Command/ProduceCommand.php delete mode 100644 src/Command/ProduceMessageCommand.php delete mode 100644 src/Command/QueuesCommand.php create mode 100644 src/Command/RoutesCommand.php delete mode 100644 src/Command/TopicsCommand.php diff --git a/composer.json b/composer.json index 3d684c1..264d4ee 100644 --- a/composer.json +++ b/composer.json @@ -5,17 +5,17 @@ "keywords": ["messaging", "queue", "laravel"], "license": "MIT", "require": { - "php": ">=5.6", - "illuminate/queue": "^5.4", - "queue-interop/queue-interop": "^0.6", - "queue-interop/amqp-interop": "^0.7", - "enqueue/amqp-tools": "^0.8" + "php": ">=7.1", + "illuminate/queue": "^5.6", + "queue-interop/amqp-interop": "0.8.x-dev", + "queue-interop/queue-interop": "0.7.x-dev", + "enqueue/amqp-tools": "0.9.x-dev" }, "require-dev": { "phpunit/phpunit": "~5.5", - "enqueue/enqueue": "^0.8@dev", - "enqueue/null": "^0.8@dev", - "enqueue/test": "^0.8@dev" + "enqueue/enqueue": "0.9.x-dev", + "enqueue/null": "0.9.x-dev", + "enqueue/test": "0.9.x-dev" }, "autoload": { "psr-4": { "Enqueue\\LaravelQueue\\": "src/" }, @@ -28,7 +28,7 @@ }, "extra": { "branch-alias": { - "dev-master": "0.8.x-dev" + "dev-master": "0.9.x-dev" } } } diff --git a/src/AmqpConnector.php b/src/AmqpConnector.php index 50b03ac..9023103 100644 --- a/src/AmqpConnector.php +++ b/src/AmqpConnector.php @@ -16,12 +16,10 @@ public function connect(array $config) $config = array_replace(['delay_strategy' => 'rabbitmq_dlx'], $config); - - /** @var AmqpContext $amqpContext */ - $amqpContext = $queue->getPsrContext(); + $amqpContext = $queue->getQueueInteropContext(); if (false == $amqpContext instanceof AmqpContext) { - throw new \LogicException(sprintf('The context must be instance of "%s" but got "%s"', AmqpContext::class, get_class($queue->getPsrContext()))); + throw new \LogicException(sprintf('The context must be instance of "%s" but got "%s"', AmqpContext::class, get_class($queue->getQueueInteropContext()))); } if ($amqpContext instanceof DelayStrategyAware && 'rabbitmq_dlx' == $config['delay_strategy']) { diff --git a/src/AmqpQueue.php b/src/AmqpQueue.php index e06bfd1..9a67335 100644 --- a/src/AmqpQueue.php +++ b/src/AmqpQueue.php @@ -5,18 +5,18 @@ use Interop\Amqp\AmqpContext; /** - * @method AmqpContext getPsrContext() + * @method AmqpContext getQueueInteropContext() */ class AmqpQueue extends Queue { /** * {@inheritdoc} * - * @param AmqpContext $psrContext + * @param AmqpContext $amqpContext */ - public function __construct(AmqpContext $psrContext, $queueName, $timeToRun) + public function __construct(AmqpContext $amqpContext, $queueName, $timeToRun) { - parent::__construct($psrContext, $queueName, $timeToRun); + parent::__construct($amqpContext, $queueName, $timeToRun); } /** @@ -54,9 +54,9 @@ public function pop($queue = null) */ protected function declareQueue($queue = null) { - $psrQueue = $this->getPsrContext()->createQueue($this->getQueue($queue)); - $psrQueue->addFlag(\Interop\Amqp\AmqpQueue::FLAG_DURABLE); + $queue = $this->getQueueInteropContext()->createQueue($this->getQueue($queue)); + $queue->addFlag(\Interop\Amqp\AmqpQueue::FLAG_DURABLE); - $this->getPsrContext()->declareQueue($psrQueue); + $this->getQueueInteropContext()->declareQueue($queue); } } diff --git a/src/Command/ConsumeCommand.php b/src/Command/ConsumeCommand.php new file mode 100644 index 0000000..e38414d --- /dev/null +++ b/src/Command/ConsumeCommand.php @@ -0,0 +1,19 @@ + $client->getQueueConsumer(), + 'driver' => $client->getDriver(), + 'processor' => $client->getDelegateProcessor() + ]); + + parent::__construct($container, 'queue_consumer', 'driver', 'processor'); + } +} diff --git a/src/Command/ConsumeMessagesCommand.php b/src/Command/ConsumeMessagesCommand.php deleted file mode 100644 index cf70883..0000000 --- a/src/Command/ConsumeMessagesCommand.php +++ /dev/null @@ -1,17 +0,0 @@ -getQueueConsumer(), - $client->getDelegateProcessor(), - $client->getQueueMetaRegistry(), - $client->getDriver() - ); - } -} diff --git a/src/Command/ProduceCommand.php b/src/Command/ProduceCommand.php new file mode 100644 index 0000000..941e72b --- /dev/null +++ b/src/Command/ProduceCommand.php @@ -0,0 +1,17 @@ + $client->getProducer(), + ]); + + parent::__construct($container, 'producer'); + } +} diff --git a/src/Command/ProduceMessageCommand.php b/src/Command/ProduceMessageCommand.php deleted file mode 100644 index 8b85469..0000000 --- a/src/Command/ProduceMessageCommand.php +++ /dev/null @@ -1,12 +0,0 @@ -getProducer()); - } -} diff --git a/src/Command/QueuesCommand.php b/src/Command/QueuesCommand.php deleted file mode 100644 index 7d146f2..0000000 --- a/src/Command/QueuesCommand.php +++ /dev/null @@ -1,12 +0,0 @@ -getQueueMetaRegistry()); - } -} diff --git a/src/Command/RoutesCommand.php b/src/Command/RoutesCommand.php new file mode 100644 index 0000000..d207ded --- /dev/null +++ b/src/Command/RoutesCommand.php @@ -0,0 +1,17 @@ + $client->getDriver(), + ]); + + parent::__construct($container, 'driver'); + } +} diff --git a/src/Command/SetupBrokerCommand.php b/src/Command/SetupBrokerCommand.php index 1fcdea8..d04ce75 100644 --- a/src/Command/SetupBrokerCommand.php +++ b/src/Command/SetupBrokerCommand.php @@ -2,12 +2,17 @@ namespace Enqueue\LaravelQueue\Command; +use Enqueue\Container\Container; use Enqueue\SimpleClient\SimpleClient; class SetupBrokerCommand extends \Enqueue\Symfony\Client\SetupBrokerCommand { public function __construct(SimpleClient $client) { - parent::__construct($client->getDriver()); + $container = new Container([ + 'driver' => $client->getDriver(), + ]); + + parent::__construct($container, 'driver'); } } \ No newline at end of file diff --git a/src/Command/TopicsCommand.php b/src/Command/TopicsCommand.php deleted file mode 100644 index 61cf9b3..0000000 --- a/src/Command/TopicsCommand.php +++ /dev/null @@ -1,12 +0,0 @@ -getTopicMetaRegistry()); - } -} diff --git a/src/Connector.php b/src/Connector.php index 390c0d0..0dc1991 100644 --- a/src/Connector.php +++ b/src/Connector.php @@ -3,7 +3,7 @@ namespace Enqueue\LaravelQueue; use Illuminate\Queue\Connectors\ConnectorInterface; -use Interop\Queue\PsrConnectionFactory; +use Interop\Queue\ConnectionFactory; class Connector implements ConnectorInterface { @@ -28,11 +28,11 @@ public function connect(array $config) } $rc = new \ReflectionClass($factoryClass); - if (false == $rc->implementsInterface(PsrConnectionFactory::class)) { - throw new \LogicException(sprintf('The "connection_factory_class" option must contain a class that implements "%s" but it is not', PsrConnectionFactory::class)); + if (false == $rc->implementsInterface(ConnectionFactory::class)) { + throw new \LogicException(sprintf('The "connection_factory_class" option must contain a class that implements "%s" but it is not', ConnectionFactory::class)); } - /** @var PsrConnectionFactory $factory */ + /** @var ConnectionFactory $factory */ $factory = new $factoryClass($config); return new Queue($factory->createContext(), $config['queue'], $config['time_to_run']); diff --git a/src/EnqueueServiceProvider.php b/src/EnqueueServiceProvider.php index a7f73e5..44f5372 100644 --- a/src/EnqueueServiceProvider.php +++ b/src/EnqueueServiceProvider.php @@ -2,28 +2,21 @@ namespace Enqueue\LaravelQueue; -use Enqueue\LaravelQueue\Command\ConsumeMessagesCommand; -use Enqueue\LaravelQueue\Command\ProduceMessageCommand; -use Enqueue\LaravelQueue\Command\QueuesCommand; +use Enqueue\LaravelQueue\Command\ConsumeCommand; +use Enqueue\LaravelQueue\Command\ProduceCommand; +use Enqueue\LaravelQueue\Command\RoutesCommand; use Enqueue\LaravelQueue\Command\SetupBrokerCommand; -use Enqueue\LaravelQueue\Command\TopicsCommand; use Enqueue\SimpleClient\SimpleClient; use Illuminate\Queue\QueueManager; use Illuminate\Support\ServiceProvider; class EnqueueServiceProvider extends ServiceProvider { - /** - * {@inheritdoc} - */ public function boot() { $this->bootInteropQueueDriver(); } - /** - * {@inheritdoc} - */ public function register() { $this->registerClient(); @@ -49,10 +42,9 @@ private function registerClient() if ($this->app->runningInConsole()) { $this->commands([ SetupBrokerCommand::class, - ProduceMessageCommand::class, - QueuesCommand::class, - TopicsCommand::class, - ConsumeMessagesCommand::class, + ProduceCommand::class, + RoutesCommand::class, + ConsumeCommand::class, ]); } } diff --git a/src/Job.php b/src/Job.php index 1d4e7d4..1095345 100644 --- a/src/Job.php +++ b/src/Job.php @@ -5,40 +5,33 @@ use Illuminate\Container\Container; use Illuminate\Contracts\Queue\Job as JobContract; use Illuminate\Queue\Jobs\Job as BaseJob; -use Interop\Queue\PsrConsumer; -use Interop\Queue\PsrContext; -use Interop\Queue\PsrMessage; +use Interop\Queue\Consumer; +use Interop\Queue\Context; +use Interop\Queue\Message; class Job extends BaseJob implements JobContract { /** - * @var PsrContext + * @var Context */ - private $psrContext; + private $context; /** - * @var PsrConsumer + * @var Consumer */ - private $psrConsumer; + private $consumer; /** - * @var PsrMessage + * @var Message */ - private $psrMessage; + private $message; - /** - * @param Container $container - * @param PsrContext $psrContext - * @param PsrConsumer $psrConsumer - * @param PsrMessage $psrMessage - * @param string $connectionName - */ - public function __construct(Container $container, PsrContext $psrContext, PsrConsumer $psrConsumer, PsrMessage $psrMessage, $connectionName) + public function __construct(Container $container, Context $context, Consumer $consumer, Message $message, $connectionName) { $this->container = $container; - $this->psrContext = $psrContext; - $this->psrConsumer = $psrConsumer; - $this->psrMessage = $psrMessage; + $this->context = $context; + $this->consumer = $consumer; + $this->message = $message; $this->connectionName = $connectionName; } @@ -49,7 +42,7 @@ public function delete() { parent::delete(); - $this->psrConsumer->acknowledge($this->psrMessage); + $this->consumer->acknowledge($this->message); } /** @@ -61,35 +54,26 @@ public function release($delay = 0) throw new \LogicException('To be implemented'); } - $requeueMessage = clone $this->psrMessage; + $requeueMessage = clone $this->message; $requeueMessage->setProperty('x-attempts', $this->attempts() + 1); - $this->psrContext->createProducer()->send($this->psrConsumer->getQueue(), $requeueMessage); + $this->context->createProducer()->send($this->consumer->getQueue(), $requeueMessage); - $this->psrConsumer->acknowledge($this->psrMessage); + $this->consumer->acknowledge($this->message); } - /** - * {@inheritdoc} - */ public function getQueue() { - return $this->psrConsumer->getQueue()->getQueueName(); + return $this->consumer->getQueue()->getQueueName(); } - /** - * {@inheritdoc} - */ public function attempts() { - return $this->psrMessage->getProperty('x-attempts', 1); + return $this->message->getProperty('x-attempts', 1); } - /** - * {@inheritdoc} - */ public function getRawBody() { - return $this->psrMessage->getBody(); + return $this->message->getBody(); } } diff --git a/src/Queue.php b/src/Queue.php index fae0e3f..deea238 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -4,7 +4,7 @@ use Illuminate\Contracts\Queue\Queue as QueueContract; use Illuminate\Queue\Queue as BaseQueue; -use Interop\Queue\PsrContext; +use Interop\Queue\Context; class Queue extends BaseQueue implements QueueContract { @@ -19,18 +19,18 @@ class Queue extends BaseQueue implements QueueContract protected $timeToRun; /** - * @var PsrContext + * @var Context */ - protected $psrContext; + protected $context; /** - * @param PsrContext $psrContext - * @param string $queueName - * @param int $timeToRun + * @param Context $amqpContext + * @param string $queueName + * @param int $timeToRun */ - public function __construct(PsrContext $psrContext, $queueName, $timeToRun) + public function __construct(Context $amqpContext, $queueName, $timeToRun) { - $this->psrContext = $psrContext; + $this->context = $amqpContext; $this->queueName = $queueName; $this->timeToRun = $timeToRun; } @@ -56,9 +56,9 @@ public function push($job, $data = '', $queue = null) */ public function pushRaw($payload, $queue = null, array $options = []) { - return $this->psrContext->createProducer()->send( + $this->context->createProducer()->send( $this->getQueue($queue), - $this->psrContext->createMessage($payload) + $this->context->createMessage($payload) ); } @@ -67,29 +67,26 @@ public function pushRaw($payload, $queue = null, array $options = []) */ public function later($delay, $job, $data = '', $queue = null) { - $message = $this->psrContext->createMessage($this->createPayload($job, $data)); + $message = $this->context->createMessage($this->createPayload($job, $data)); - return $this->psrContext->createProducer() + $this->context->createProducer() ->setDeliveryDelay($this->secondsUntil($delay) * 1000) ->send($this->getQueue($queue), $message) ; } - /** - * {@inheritdoc} - */ public function pop($queue = null) { $queue = $this->getQueue($queue); - $psrConsumer = $this->psrContext->createConsumer($queue); - if ($psrMessage = $psrConsumer->receive(1000)) { // 1 sec + $consumer = $this->context->createConsumer($queue); + if ($message = $consumer->receive(1000)) { // 1 sec return new Job( $this->container, - $this->psrContext, - $psrConsumer, - $psrMessage, + $this->context, + $consumer, + $message, $this->connectionName ); } @@ -100,19 +97,19 @@ public function pop($queue = null) * * @param string|null $queue * - * @return \Interop\Queue\PsrQueue + * @return \Interop\Queue\Queue */ public function getQueue($queue = null) { - return $this->psrContext->createQueue($queue ?: $this->queueName); + return $this->context->createQueue($queue ?: $this->queueName); } /** - * @return PsrContext + * @return Context */ - public function getPsrContext() + public function getQueueInteropContext() { - return $this->psrContext; + return $this->context; } /** diff --git a/src/Tests/ConnectorTest.php b/src/Tests/ConnectorTest.php index f853c38..ac309f3 100644 --- a/src/Tests/ConnectorTest.php +++ b/src/Tests/ConnectorTest.php @@ -8,7 +8,7 @@ use Enqueue\Null\NullContext; use Enqueue\Test\ClassExtensionTrait; use Illuminate\Queue\Connectors\ConnectorInterface; -use Interop\Queue\PsrQueue; +use Interop\Queue\Queue as InteropQueue; use PHPUnit\Framework\TestCase; class ConnectorTest extends TestCase @@ -45,7 +45,7 @@ public function testThrowIfConnectorFactoryClassOptionIsNotValidClass() ]); } - public function testThrowIfConnectorFactoryClassOptionDoesNotImplementPsrConnectionFactoryInterface() + public function testThrowIfConnectorFactoryClassOptionDoesNotImplementConnectionFactoryInterface() { $connector = new Connector(); @@ -71,9 +71,9 @@ public function testShouldSetExpectedOptionsIfNotProvidedOnConnectMethodCall() $queue = $connector->connect(['connection_factory_class' => NullConnectionFactory::class]); - $this->assertInstanceOf(NullContext::class, $queue->getPsrContext()); + $this->assertInstanceOf(NullContext::class, $queue->getQueueInteropContext()); - $this->assertInstanceOf(PsrQueue::class, $queue->getQueue()); + $this->assertInstanceOf(InteropQueue::class, $queue->getQueue()); $this->assertSame('default', $queue->getQueue()->getQueueName()); $this->assertSame(0, $queue->getTimeToRun()); @@ -89,9 +89,9 @@ public function testShouldSetExpectedCustomOptionsIfProvidedOnConnectMethodCall( 'time_to_run' => 123, ]); - $this->assertInstanceOf(NullContext::class, $queue->getPsrContext()); + $this->assertInstanceOf(NullContext::class, $queue->getQueueInteropContext()); - $this->assertInstanceOf(PsrQueue::class, $queue->getQueue()); + $this->assertInstanceOf(InteropQueue::class, $queue->getQueue()); $this->assertSame('theCustomQueue', $queue->getQueue()->getQueueName()); $this->assertSame(123, $queue->getTimeToRun()); diff --git a/src/Tests/QueueTest.php b/src/Tests/QueueTest.php index fea8e6e..f9e275d 100644 --- a/src/Tests/QueueTest.php +++ b/src/Tests/QueueTest.php @@ -12,11 +12,11 @@ use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\Queue as BaseQueue; -use Interop\Queue\PsrConsumer; -use Interop\Queue\PsrContext; -use Interop\Queue\PsrMessage; -use Interop\Queue\PsrProducer; -use Interop\Queue\PsrQueue; +use Interop\Queue\Consumer as InteropConsumer; +use Interop\Queue\Context as InteropContext; +use Interop\Queue\Message as InteropMessage; +use Interop\Queue\Producer as InteropProducer; +use Interop\Queue\Queue as InteropQueue; use PHPUnit\Framework\TestCase; class QueueTest extends TestCase @@ -35,70 +35,70 @@ public function testShouldExtendsBaseQueue() public function testCouldBeConstructedWithExpectedArguments() { - new Queue($this->createPsrContextMock(), 'queueName', 123); + new Queue($this->createInteropContextMock(), 'queueName', 123); } - public function testShouldReturnPsrContextSetInConstructor() + public function testShouldReturnInteropContextSetInConstructor() { - $psrContext = $this->createPsrContextMock(); + $interopContext = $this->createInteropContextMock(); - $queue = new Queue($psrContext, 'queueName', 123); + $queue = new Queue($interopContext, 'queueName', 123); - $this->assertSame($psrContext, $queue->getPsrContext()); + $this->assertSame($interopContext, $queue->getQueueInteropContext()); } public function testShouldReturnTimeToRunSetInConstructor() { - $psrContext = $this->createPsrContextMock(); + $interopContext = $this->createInteropContextMock(); - $queue = new Queue($psrContext, 'queueName', 123); + $queue = new Queue($interopContext, 'queueName', 123); $this->assertSame(123, $queue->getTimeToRun()); } public function testShouldReturnDefaultQueueIfNotNameProvided() { - $psrQueue = new NullQueue('queueName'); + $interopQueue = new NullQueue('queueName'); - $psrContext = $this->createPsrContextMock(); - $psrContext + $interopContext = $this->createInteropContextMock(); + $interopContext ->expects($this->once()) ->method('createQueue') ->with('queueName') - ->willReturn($psrQueue) + ->willReturn($interopQueue) ; - $queue = new Queue($psrContext, 'queueName', 123); + $queue = new Queue($interopContext, 'queueName', 123); - $this->assertSame($psrQueue, $queue->getQueue()); + $this->assertSame($interopQueue, $queue->getQueue()); } public function testShouldReturnCustomQueueIfNameProvided() { - $psrQueue = new NullQueue('theCustomQueueName'); + $interopQueue = new NullQueue('theCustomQueueName'); - $psrContext = $this->createPsrContextMock(); - $psrContext + $interopContext = $this->createInteropContextMock(); + $interopContext ->expects($this->once()) ->method('createQueue') ->with('theCustomQueueName') - ->willReturn($psrQueue) + ->willReturn($interopQueue) ; - $queue = new Queue($psrContext, 'queueName', 123); + $queue = new Queue($interopContext, 'queueName', 123); - $this->assertSame($psrQueue, $queue->getQueue('theCustomQueueName')); + $this->assertSame($interopQueue, $queue->getQueue('theCustomQueueName')); } public function testShouldSendJobAsMessageToExpectedQueue() { - $psrQueue = new NullQueue('theCustomQueueName'); + $interopQueue = new NullQueue('theCustomQueueName'); - $psrProducer = $this->createMock(PsrProducer::class); - $psrProducer + $interopProducer = $this->createMock(InteropProducer::class); + $interopProducer ->expects($this->once()) ->method('send') - ->willReturnCallback(function (PsrQueue $queue, PsrMessage $message) { + ->willReturnCallback(function (InteropQueue $queue, InteropMessage $message) { $this->assertSame('theCustomQueueName', $queue->getQueueName()); $this->assertContains('"displayName":"Enqueue\\\LaravelQueue\\\Tests\\\TestJob"', $message->getBody()); @@ -107,19 +107,19 @@ public function testShouldSendJobAsMessageToExpectedQueue() }) ; - $psrContext = $this->createPsrContextMock(); - $psrContext + $interopContext = $this->createInteropContextMock(); + $interopContext ->expects($this->once()) ->method('createQueue') ->with('theCustomQueueName') - ->willReturn($psrQueue) + ->willReturn($interopQueue) ; - $psrContext + $interopContext ->expects($this->once()) ->method('createProducer') - ->willReturn($psrProducer) + ->willReturn($interopProducer) ; - $psrContext + $interopContext ->expects($this->once()) ->method('createMessage') ->willReturnCallback(function ($body, $properties, $headers) { @@ -127,20 +127,20 @@ public function testShouldSendJobAsMessageToExpectedQueue() }) ; - $queue = new Queue($psrContext, 'queueName', 123); + $queue = new Queue($interopContext, 'queueName', 123); $queue->push(new TestJob(), '', 'theCustomQueueName'); } public function testShouldSendDoRawPush() { - $psrQueue = new NullQueue('theCustomQueueName'); + $interopQueue = new NullQueue('theCustomQueueName'); - $psrProducer = $this->createMock(PsrProducer::class); - $psrProducer + $interopProducer = $this->createMock(InteropProducer::class); + $interopProducer ->expects($this->once()) ->method('send') - ->willReturnCallback(function (PsrQueue $queue, PsrMessage $message) { + ->willReturnCallback(function (InteropQueue $queue, InteropMessage $message) { $this->assertSame('theCustomQueueName', $queue->getQueueName()); $this->assertSame('thePayload', $message->getBody()); @@ -149,19 +149,19 @@ public function testShouldSendDoRawPush() }) ; - $psrContext = $this->createPsrContextMock(); - $psrContext + $interopContext = $this->createInteropContextMock(); + $interopContext ->expects($this->once()) ->method('createQueue') ->with('theCustomQueueName') - ->willReturn($psrQueue) + ->willReturn($interopQueue) ; - $psrContext + $interopContext ->expects($this->once()) ->method('createProducer') - ->willReturn($psrProducer) + ->willReturn($interopProducer) ; - $psrContext + $interopContext ->expects($this->once()) ->method('createMessage') ->willReturnCallback(function ($body, $properties, $headers) { @@ -169,70 +169,70 @@ public function testShouldSendDoRawPush() }) ; - $queue = new Queue($psrContext, 'queueName', 123); + $queue = new Queue($interopContext, 'queueName', 123); $queue->pushRaw('thePayload', 'theCustomQueueName'); } public function testShouldReturnNullIfNoMessageInQueue() { - $psrQueue = new NullQueue('theCustomQueueName'); + $interopQueue = new NullQueue('theCustomQueueName'); - $psrConsumer = $this->createMock(PsrConsumer::class); - $psrConsumer + $interopConsumer = $this->createMock(InteropConsumer::class); + $interopConsumer ->expects($this->once()) ->method('receive') ->with(1000) ->willReturn(null) ; - $psrContext = $this->createPsrContextMock(); - $psrContext + $interopContext = $this->createInteropContextMock(); + $interopContext ->expects($this->once()) ->method('createQueue') ->with('theCustomQueueName') - ->willReturn($psrQueue) + ->willReturn($interopQueue) ; - $psrContext + $interopContext ->expects($this->once()) ->method('createConsumer') - ->with($this->identicalTo($psrQueue)) - ->willReturn($psrConsumer) + ->with($this->identicalTo($interopQueue)) + ->willReturn($interopConsumer) ; - $queue = new Queue($psrContext, 'queueName', 123); + $queue = new Queue($interopContext, 'queueName', 123); $this->assertNull($queue->pop('theCustomQueueName')); } public function testShouldReturnJobForReceivedMessage() { - $psrQueue = new NullQueue('theCustomQueueName'); - $psrMessage = new NullMessage(); + $interopQueue = new NullQueue('theCustomQueueName'); + $interopMessage = new NullMessage(); - $psrConsumer = $this->createMock(PsrConsumer::class); - $psrConsumer + $interopConsumer = $this->createMock(InteropConsumer::class); + $interopConsumer ->expects($this->once()) ->method('receive') ->with(1000) - ->willReturn($psrMessage) + ->willReturn($interopMessage) ; - $psrContext = $this->createPsrContextMock(); - $psrContext + $interopContext = $this->createInteropContextMock(); + $interopContext ->expects($this->once()) ->method('createQueue') ->with('theCustomQueueName') - ->willReturn($psrQueue) + ->willReturn($interopQueue) ; - $psrContext + $interopContext ->expects($this->once()) ->method('createConsumer') - ->with($this->identicalTo($psrQueue)) - ->willReturn($psrConsumer) + ->with($this->identicalTo($interopQueue)) + ->willReturn($interopConsumer) ; - $queue = new Queue($psrContext, 'queueName', 123); + $queue = new Queue($interopContext, 'queueName', 123); $queue->setContainer(new Container()); $job = $queue->pop('theCustomQueueName'); @@ -241,11 +241,11 @@ public function testShouldReturnJobForReceivedMessage() } /** - * @return \PHPUnit_Framework_MockObject_MockObject|PsrContext + * @return \PHPUnit_Framework_MockObject_MockObject|InteropContext */ - private function createPsrContextMock() + private function createInteropContextMock() { - return $this->createMock(PsrContext::class); + return $this->createMock(InteropContext::class); } } From 9b01008d9ff8f7476d6874db05ab1a0daf1900ce Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 22 Oct 2018 08:55:10 +0300 Subject: [PATCH 2/2] Use connection factory factory. --- composer.json | 3 ++- src/AmqpConnector.php | 37 ----------------------------- src/Connector.php | 55 +++++++++++++++++++++++++++---------------- 3 files changed, 37 insertions(+), 58 deletions(-) delete mode 100644 src/AmqpConnector.php diff --git a/composer.json b/composer.json index 5691206..4d84a03 100644 --- a/composer.json +++ b/composer.json @@ -9,7 +9,8 @@ "illuminate/queue": "^5.6", "queue-interop/amqp-interop": "0.8.x-dev", "queue-interop/queue-interop": "0.7.x-dev", - "enqueue/amqp-tools": "0.9.x-dev" + "enqueue/enqueue": "0.9.x-dev", + "enqueue/dsn": "0.9.x-dev" }, "require-dev": { "phpunit/phpunit": "~5.5", diff --git a/src/AmqpConnector.php b/src/AmqpConnector.php deleted file mode 100644 index 9023103..0000000 --- a/src/AmqpConnector.php +++ /dev/null @@ -1,37 +0,0 @@ - 'rabbitmq_dlx'], $config); - - /** @var AmqpContext $amqpContext */ - $amqpContext = $queue->getQueueInteropContext(); - if (false == $amqpContext instanceof AmqpContext) { - throw new \LogicException(sprintf('The context must be instance of "%s" but got "%s"', AmqpContext::class, get_class($queue->getQueueInteropContext()))); - } - - if ($amqpContext instanceof DelayStrategyAware && 'rabbitmq_dlx' == $config['delay_strategy']) { - $amqpContext->setDelayStrategy(new RabbitMqDlxDelayStrategy()); - } - if ($amqpContext instanceof DelayStrategyAware && 'rabbitmq_delay_plugin' == $config['delay_strategy']) { - $amqpContext->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy()); - } - if ($amqpContext instanceof DelayStrategyAware && $config['delay_strategy'] instanceof DelayStrategy) { - $amqpContext->setDelayStrategy($config['delay_strategy']); - } - - return $queue; - } -} diff --git a/src/Connector.php b/src/Connector.php index 0dc1991..7e232c3 100644 --- a/src/Connector.php +++ b/src/Connector.php @@ -2,39 +2,54 @@ namespace Enqueue\LaravelQueue; +use Enqueue\AmqpTools\DelayStrategy; +use Enqueue\AmqpTools\DelayStrategyAware; +use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy; +use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy; +use Enqueue\ConnectionFactoryFactory; +use Enqueue\ConnectionFactoryFactoryInterface; +use Illuminate\Contracts\Queue\Queue as QueueContract; use Illuminate\Queue\Connectors\ConnectorInterface; -use Interop\Queue\ConnectionFactory; +use Interop\Amqp\AmqpContext; class Connector implements ConnectorInterface { - /** - * {@inheritdoc} - */ - public function connect(array $config) + public function connect(array $config): QueueContract { $config = array_replace([ - 'connection_factory_class' => null, + 'dsn' => null, + 'factory_class' => null, 'queue' => 'default', 'time_to_run' => 0, ], $config); - if (empty($config['connection_factory_class'])) { - throw new \LogicException('The "connection_factory_class" option is required'); - } + $queue = $config['queue']; + $timeToRum = $config['time_to_run']; + $connectionFactoryFactoryClass = $config['factory_class'] ?? ConnectionFactoryFactory::class; - $factoryClass = $config['connection_factory_class']; - if (false == class_exists($factoryClass)) { - throw new \LogicException(sprintf('The "connection_factory_class" option "%s" is not a class', $factoryClass)); - } + unset($config['factory_class']); - $rc = new \ReflectionClass($factoryClass); - if (false == $rc->implementsInterface(ConnectionFactory::class)) { - throw new \LogicException(sprintf('The "connection_factory_class" option must contain a class that implements "%s" but it is not', ConnectionFactory::class)); - } + /** @var ConnectionFactoryFactoryInterface $factory */ + $factory = new $connectionFactoryFactoryClass(); + $connection = $factory->create($config); + $context = $connection->createContext(); - /** @var ConnectionFactory $factory */ - $factory = new $factoryClass($config); + if ($context instanceof AmqpContext) { + $config = array_replace(['delay_strategy' => 'rabbitmq_dlx'], $config); + + if ($context instanceof DelayStrategyAware && 'rabbitmq_dlx' == $config['delay_strategy']) { + $context->setDelayStrategy(new RabbitMqDlxDelayStrategy()); + } + if ($context instanceof DelayStrategyAware && 'rabbitmq_delay_plugin' == $config['delay_strategy']) { + $context->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy()); + } + if ($context instanceof DelayStrategyAware && $config['delay_strategy'] instanceof DelayStrategy) { + $context->setDelayStrategy($config['delay_strategy']); + } + + return new AmqpQueue($context, $queue, $timeToRum); + } - return new Queue($factory->createContext(), $config['queue'], $config['time_to_run']); + return new Queue($context, $queue, $timeToRum); } }