From 5c1026b5c1816622eadab0fea4927cb9fc2cb45b Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 27 Apr 2017 14:41:29 +0300 Subject: [PATCH] Sync with change in Enqueue 0.3.x version. --- Async/ElasticaPopulateProcessor.php | 25 +++++++++++++------ Elastica/AsyncDoctrineOrmProvider.php | 10 ++++---- .../PurgeFosElasticPopulateQueueListener.php | 10 ++++---- README.md | 6 ++--- composer.json | 3 ++- 5 files changed, 32 insertions(+), 22 deletions(-) diff --git a/Async/ElasticaPopulateProcessor.php b/Async/ElasticaPopulateProcessor.php index 164a4ab..6148ded 100644 --- a/Async/ElasticaPopulateProcessor.php +++ b/Async/ElasticaPopulateProcessor.php @@ -1,13 +1,14 @@ getReplyTo()) { return self::REJECT; @@ -54,11 +55,11 @@ public function process(Message $message, Context $context) } /** - * @param Context $context - * @param Message $message + * @param PsrContext $context + * @param PsrMessage $message * @param bool $successful */ - private function sendReply(Context $context, Message $message, $successful) + private function sendReply(PsrContext $context, PsrMessage $message, $successful) { $replyMessage = $context->createMessage($message->getBody(), $message->getProperties(), $message->getHeaders()); $replyMessage->setProperty('fos-populate-successful', (int) $successful); @@ -67,4 +68,12 @@ private function sendReply(Context $context, Message $message, $successful) $context->createProducer()->send($replyQueue, $replyMessage); } + + /** + * {@inheritdoc} + */ + public static function getSubscribedQueues() + { + return ['fos_elastica_populate']; + } } diff --git a/Elastica/AsyncDoctrineOrmProvider.php b/Elastica/AsyncDoctrineOrmProvider.php index 50c9ee0..2a8060c 100644 --- a/Elastica/AsyncDoctrineOrmProvider.php +++ b/Elastica/AsyncDoctrineOrmProvider.php @@ -1,7 +1,7 @@ context = $context; } @@ -42,7 +42,7 @@ protected function doPopulate($options, \Closure $loggerClosure = null) $nbObjects = $this->countObjects($queryBuilder); $offset = $options['offset']; - $queue = $this->context->createQueue('fos_elastica.populate'); + $queue = $this->context->createQueue('fos_elastica_populate'); $resultQueue = $this->context->createTemporaryQueue(); $consumer = $this->context->createConsumer($resultQueue); diff --git a/Listener/PurgeFosElasticPopulateQueueListener.php b/Listener/PurgeFosElasticPopulateQueueListener.php index 94e4f99..5ce59c8 100644 --- a/Listener/PurgeFosElasticPopulateQueueListener.php +++ b/Listener/PurgeFosElasticPopulateQueueListener.php @@ -1,21 +1,21 @@ context = $context; } @@ -23,7 +23,7 @@ public function __construct(Context $context) public function onPreIndexPopulate(IndexPopulateEvent $event) { if (method_exists($this->context, 'purge')) { - $queue = $this->context->createQueue('fos_elastica.populate'); + $queue = $this->context->createQueue('fos_elastica_populate'); $this->context->purge($queue); diff --git a/README.md b/README.md index 02f152d..c216c89 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,7 @@ enqueue: ``` Sure you can configure other transports like: [rabbitmq, amqp, stomp and so on](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/bundle/config_reference.md) -Create a `fos_elastica.populate` queue on broker side, if needed. +Create a `fos_elastica_populate` queue on broker side, if needed. ## Usage @@ -87,7 +87,7 @@ $ ENQUEUE_ELASTICA_DISABLE_ASYNC=1 ./bin/console fos:elastica:populate and have pull of consumer commands run somewhere, run them as many as you'd like ```bash -$ ./bin/console enqueue:transport:consume fos_elastica.populate enqueue_elastica.populate_processor +$ ./bin/console enqueue:transport:consume enqueue_elastica.populate_processor -vv ``` We suggest to use [supervisor](http://supervisord.org/) on production to control numbers of consumers and restart them. @@ -97,7 +97,7 @@ Here's config example ``` # cat /etc/supervisor/conf.d/fos_elastica_populate.conf [program:fos_elastica_populate] -command=/mqs/symfony/bin/console enqueue:transport:consume fos_elastica.populate enqueue_elastica.populate_processor +command=/mqs/symfony/bin/console enqueue:transport:consume fos_elastica_populate enqueue_elastica.populate_processor process_name=%(program_name)s_%(process_num)02d numprocs=10 autostart=true diff --git a/composer.json b/composer.json index 26b88c9..8816c3c 100644 --- a/composer.json +++ b/composer.json @@ -7,7 +7,8 @@ "require": { "php": ">=5.6", "friendsofsymfony/elastica-bundle": "^3", - "enqueue/enqueue-bundle": "^0.2" + "enqueue/enqueue-bundle": "^0.3.5", + "enqueue/enqueue": "^0.3.5" }, "autoload": { "psr-4": { "Enqueue\\ElasticaBundle\\": "" }