Skip to content

Commit 088f9ea

Browse files
committed
[doctrine] sync index with object change
1 parent 2995b54 commit 088f9ea

File tree

8 files changed

+182
-143
lines changed

8 files changed

+182
-143
lines changed

DependencyInjection/Configuration.php

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?php
22

3-
namespace Enqueue\Bundle\DependencyInjection;
3+
namespace Enqueue\ElasticaBundle\DependencyInjection;
44

55
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
66
use Symfony\Component\Config\Definition\ConfigurationInterface;
@@ -16,16 +16,22 @@ public function getConfigTreeBuilder()
1616
$rootNode = $tb->root('enqueue_elastica');
1717
$rootNode
1818
->children()
19-
->arrayNode('doctrine_queue_listeners')
20-
->prototype('array')
21-
->addDefaultsIfNotSet()
22-
->children()
23-
->booleanNode('on_insert')->defaultTrue()->end()
24-
->booleanNode('on_update')->defaultTrue()->end()
25-
->booleanNode('on_remove')->defaultTrue()->end()
26-
->scalarNode('index_name')->isRequired()->cannotBeEmpty()->end()
27-
->scalarNode('type_name')->isRequired()->cannotBeEmpty()->end()
28-
->scalarNode('model_class')->isRequired()->cannotBeEmpty()->end()
19+
->arrayNode('doctrine')
20+
->children()
21+
->arrayNode('queue_listeners')
22+
->prototype('array')
23+
->addDefaultsIfNotSet()
24+
->children()
25+
->booleanNode('insert')->defaultTrue()->end()
26+
->booleanNode('update')->defaultTrue()->end()
27+
->booleanNode('remove')->defaultTrue()->end()
28+
->scalarNode('connection')->defaultValue('default')->cannotBeEmpty()->end()
29+
->scalarNode('index_name')->isRequired()->cannotBeEmpty()->end()
30+
->scalarNode('type_name')->isRequired()->cannotBeEmpty()->end()
31+
->scalarNode('model_class')->isRequired()->cannotBeEmpty()->end()
32+
->scalarNode('model_id')->defaultValue('id')->cannotBeEmpty()->end()->end()
2933
;
34+
35+
return $tb;
3036
}
3137
}

DependencyInjection/EnqueueElasticaExtension.php

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@ class EnqueueElasticaExtension extends Extension
1616
*/
1717
public function load(array $configs, ContainerBuilder $container)
1818
{
19-
$configuration = $this->getConfiguration($configs, $container);
20-
$config = $this->processConfiguration($configuration, $configs);
19+
$config = $this->processConfiguration(new Configuration(), $configs);
2120

2221
$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
2322
$loader->load('services.yml');
2423

25-
if (isset($config['doctrine_queue_listeners'])) {
26-
foreach ($config['doctrine_queue_listeners'] as $listenerConfig) {
24+
if (false == empty($config['doctrine']['queue_listeners'])) {
25+
foreach ($config['doctrine']['queue_listeners'] as $listenerConfig) {
2726
$listenerId = sprintf(
2827
'enqueue_elastica.doctrine_queue_listener.%s.%s',
2928
$listenerConfig['index_name'],
@@ -32,9 +31,9 @@ public function load(array $configs, ContainerBuilder $container)
3231

3332
$container->register($listenerId, SyncIndexWithObjectChangeListener::class)
3433
->addArgument(new Reference('enqueue.transport.context'))
35-
->addArgument($listenerConfig['modelClass'])
34+
->addArgument($listenerConfig['model_class'])
3635
->addArgument($listenerConfig)
37-
->addTag('doctrine.event_subscriber')
36+
->addTag('doctrine.event_subscriber', ['connection' => $listenerConfig['connection']])
3837
;
3938
}
4039
}

Doctrine/Queue/Commands.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<?php
2+
namespace Enqueue\ElasticaBundle\Doctrine\Queue;
3+
4+
final class Commands
5+
{
6+
const SYNC_INDEX_WITH_OBJECT_CHANGE = 'fos_elastica_doctrine_orm_sync_index_with_object_change';
7+
8+
private function __construct()
9+
{
10+
}
11+
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
<?php
2+
namespace Enqueue\ElasticaBundle\Doctrine\Queue;
3+
4+
use Enqueue\Client\CommandSubscriberInterface;
5+
use Enqueue\Consumption\QueueSubscriberInterface;
6+
use Enqueue\Consumption\Result;
7+
use Enqueue\Util\JSON;
8+
use FOS\ElasticaBundle\Persister\PersisterRegistry;
9+
use FOS\ElasticaBundle\Provider\IndexableInterface;
10+
use Interop\Queue\PsrContext;
11+
use Interop\Queue\PsrMessage;
12+
use Interop\Queue\PsrProcessor;
13+
use Symfony\Bridge\Doctrine\RegistryInterface;
14+
15+
final class SyncIndexWithObjectChangeProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface
16+
{
17+
const INSERT_ACTION = 'insert';
18+
19+
const UPDATE_ACTION = 'update';
20+
21+
const REMOVE_ACTION = 'remove';
22+
23+
/**
24+
* @var PersisterRegistry
25+
*/
26+
private $persisterRegistry;
27+
28+
/**
29+
* @var IndexableInterface
30+
*/
31+
private $indexable;
32+
33+
/**
34+
* @var RegistryInterface
35+
*/
36+
private $doctrine;
37+
38+
public function __construct(RegistryInterface $doctrine, PersisterRegistry $persisterRegistry, IndexableInterface $indexable)
39+
{
40+
$this->persisterRegistry = $persisterRegistry;
41+
$this->indexable = $indexable;
42+
$this->doctrine = $doctrine;
43+
}
44+
45+
/**
46+
* {@inheritdoc}
47+
*/
48+
public function process(PsrMessage $message, PsrContext $context)
49+
{
50+
$data = JSON::decode($message->getBody());
51+
52+
if (false == isset($data['action'])) {
53+
return Result::reject('The message data misses action');
54+
}
55+
if (false == isset($data['model_class'])) {
56+
return Result::reject('The message data misses model_class');
57+
}
58+
if (false == isset($data['id'])) {
59+
return Result::reject('The message data misses id');
60+
}
61+
if (false == isset($data['index_name'])) {
62+
return Result::reject('The message data misses id');
63+
}
64+
if (false == isset($data['type_name'])) {
65+
return Result::reject('The message data misses id');
66+
}
67+
68+
$action = $data['action'];
69+
$modelClass = $data['model_class'];
70+
$id = $data['id'];
71+
$index = $data['index_name'];
72+
$type = $data['type_name'];
73+
74+
$repository = $this->doctrine->getManagerForClass($modelClass)->getRepository($modelClass);
75+
$persister = $this->persisterRegistry->getPersister($index, $type);
76+
77+
switch ($action) {
78+
case self::UPDATE_ACTION:
79+
if (false == $object = $repository->find($id)) {
80+
$persister->deleteById($id);
81+
82+
return Result::ack(sprintf('The object "%s" with id "%s" could not be found.', $modelClass, $id));
83+
}
84+
85+
if ($persister->handlesObject($object)) {
86+
if ($this->indexable->isObjectIndexable($index, $type, $object)) {
87+
$persister->replaceOne($object);
88+
} else {
89+
$persister->deleteOne($object);
90+
}
91+
}
92+
93+
return self::ACK;
94+
case self::INSERT_ACTION:
95+
if (false == $object = $repository->find($id)) {
96+
$persister->deleteById($id);
97+
98+
return Result::ack(sprintf('The object "%s" with id "%s" could not be found.', $modelClass, $id));
99+
}
100+
101+
if ($persister->handlesObject($object) && $this->indexable->isObjectIndexable($index, $type, $object)) {
102+
$persister->insertOne($object);
103+
}
104+
105+
return self::ACK;
106+
case self::REMOVE_ACTION:
107+
$persister->deleteById($id);
108+
109+
return self::ACK;
110+
default:
111+
return Result::reject(sprintf('The action "%s" is not supported', $action));
112+
}
113+
}
114+
115+
/**
116+
* {@inheritdoc}
117+
*/
118+
public static function getSubscribedCommand()
119+
{
120+
return [
121+
'processorName' => Commands::SYNC_INDEX_WITH_OBJECT_CHANGE,
122+
'queueName' => Commands::SYNC_INDEX_WITH_OBJECT_CHANGE,
123+
'queueNameHardcoded' => true,
124+
'exclusive' => true,
125+
];
126+
}
127+
128+
/**
129+
* {@inheritdoc}
130+
*/
131+
public static function getSubscribedQueues()
132+
{
133+
return [Commands::SYNC_INDEX_WITH_OBJECT_CHANGE];
134+
}
135+
}

Doctrine/SyncIndexWithObjectChangeListener.php

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
namespace Enqueue\ElasticaBundle\Doctrine;
33

44
use Doctrine\Common\Persistence\Event\LifecycleEventArgs;
5-
use Enqueue\ElasticaBundle\Queue\Commands;
5+
use Enqueue\ElasticaBundle\Doctrine\Queue\Commands;
6+
use Enqueue\ElasticaBundle\Doctrine\Queue\SyncIndexWithObjectChangeProcessor as SyncProcessor;
67
use Enqueue\Util\JSON;
78
use Interop\Queue\PsrContext;
89
use Doctrine\Common\EventSubscriber;
@@ -33,22 +34,23 @@ public function __construct(PsrContext $context, $modelClass, array $config)
3334

3435
public function postUpdate(LifecycleEventArgs $args)
3536
{
37+
3638
if ($args->getObject() instanceof $this->modelClass) {
37-
$this->sendUpdateIndexMessage('update', $args);
39+
$this->sendUpdateIndexMessage(SyncProcessor::UPDATE_ACTION, $args);
3840
}
3941
}
4042

4143
public function postPersist(LifecycleEventArgs $args)
4244
{
4345
if ($args->getObject() instanceof $this->modelClass) {
44-
$this->sendUpdateIndexMessage('insert', $args);
46+
$this->sendUpdateIndexMessage(SyncProcessor::INSERT_ACTION, $args);
4547
}
4648
}
4749

4850
public function preRemove(LifecycleEventArgs $args)
4951
{
5052
if ($args->getObject() instanceof $this->modelClass) {
51-
$this->sendUpdateIndexMessage('remove', $args);
53+
$this->sendUpdateIndexMessage(SyncProcessor::REMOVE_ACTION, $args);
5254
}
5355
}
5456

@@ -69,19 +71,20 @@ private function sendUpdateIndexMessage($action, LifecycleEventArgs $args)
6971
{
7072
$object = $args->getObject();
7173

72-
$rp = new \ReflectionProperty($object, $this->config['identifier']);
74+
$rp = new \ReflectionProperty($object, $this->config['model_id']);
7375
$rp->setAccessible(true);
7476
$id = $rp->getValue($object);
7577
$rp->setAccessible(false);
7678

77-
$queue = $this->context->createQueue(Commands::SYNC_INDEX_WITH_DOCTRINE_ORM_OBJECT_CHANGE);
79+
$queue = $this->context->createQueue(Commands::SYNC_INDEX_WITH_OBJECT_CHANGE);
7880

7981
$message = $this->context->createMessage(JSON::encode([
8082
'action' => $action,
81-
'modelClass' => $this->modelClass,
83+
'model_class' => $this->modelClass,
84+
'model_id' => $this->config['model_id'],
8285
'id' => $id,
83-
'indexName' => $this->config['indexName'],
84-
'typeName' => $this->config['typeName'],
86+
'index_name' => $this->config['index_name'],
87+
'type_name' => $this->config['type_name'],
8588
]));
8689

8790
$this->context->createProducer()->send($queue, $message);

Queue/Commands.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ final class Commands
55
{
66
const POPULATE = 'fos_elastica_populate';
77

8-
const SYNC_INDEX_WITH_DOCTRINE_ORM_OBJECT_CHANGE = 'fos_elastica_sync_index_with_doctrine_orm_object_change';
9-
108
private function __construct()
119
{
1210
}

0 commit comments

Comments
 (0)