Skip to content

Commit d224147

Browse files
committed
Merge remote-tracking branch 'origin/imported-magento-magento2-31480' into 2.4-develop-pr118
2 parents 2c3bb67 + 051ccce commit d224147

File tree

9 files changed

+193
-26
lines changed

9 files changed

+193
-26
lines changed

app/code/Magento/MessageQueue/Model/ResourceModel/Lock.php

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,46 +5,52 @@
55
*/
66
namespace Magento\MessageQueue\Model\ResourceModel;
77

8-
use \Magento\Framework\MessageQueue\Lock\ReaderInterface;
9-
use \Magento\Framework\MessageQueue\Lock\WriterInterface;
8+
use DateInterval;
9+
use DateTime;
10+
use Magento\Framework\MessageQueue\Lock\ReaderInterface;
11+
use Magento\Framework\MessageQueue\Lock\WriterInterface;
12+
use Magento\Framework\MessageQueue\LockInterface;
13+
use Magento\Framework\Model\ResourceModel\Db\AbstractDb;
14+
use Magento\Framework\Model\ResourceModel\Db\Context;
15+
use Magento\MessageQueue\Model\LockFactory;
1016

1117
/**
1218
* Class Lock to handle database lock table db transactions.
1319
*/
14-
class Lock extends \Magento\Framework\Model\ResourceModel\Db\AbstractDb implements ReaderInterface, WriterInterface
20+
class Lock extends AbstractDb implements ReaderInterface, WriterInterface
1521
{
1622
/**#@+
1723
* Constants
1824
*/
19-
const QUEUE_LOCK_TABLE = 'queue_lock';
25+
public const QUEUE_LOCK_TABLE = 'queue_lock';
2026
/**#@-*/
2127

2228
/**#@-*/
2329
private $dateTime;
2430

2531
/**
26-
* @var \Magento\MessageQueue\Model\LockFactory
32+
* @var LockFactory
2733
*/
2834
private $lockFactory;
2935

3036
/**
31-
* @var integer
37+
* @var int
3238
*/
3339
private $interval;
3440

3541
/**
3642
* Initialize dependencies.
3743
*
38-
* @param \Magento\Framework\Model\ResourceModel\Db\Context $context
44+
* @param Context $context
3945
* @param \Magento\Framework\Stdlib\DateTime\DateTime $dateTime
40-
* @param \Magento\MessageQueue\Model\LockFactory $lockFactory
41-
* @param null $connectionName
42-
* @param integer $interval
46+
* @param LockFactory $lockFactory
47+
* @param ?string $connectionName
48+
* @param int $interval
4349
*/
4450
public function __construct(
45-
\Magento\Framework\Model\ResourceModel\Db\Context $context,
51+
Context $context,
4652
\Magento\Framework\Stdlib\DateTime\DateTime $dateTime,
47-
\Magento\MessageQueue\Model\LockFactory $lockFactory,
53+
LockFactory $lockFactory,
4854
$connectionName = null,
4955
$interval = 86400
5056
) {
@@ -55,17 +61,17 @@ public function __construct(
5561
}
5662

5763
/**
58-
* {@inheritDoc}
64+
* @inheritdoc
5965
*/
6066
protected function _construct()
6167
{
6268
$this->_init(self::QUEUE_LOCK_TABLE, 'id');
6369
}
6470

6571
/**
66-
* {@inheritDoc}
72+
* @inheritdoc
6773
*/
68-
public function read(\Magento\Framework\MessageQueue\LockInterface $lock, $code)
74+
public function read(LockInterface $lock, $code)
6975
{
7076
$object = $this->lockFactory->create();
7177
$object->load($code, 'message_code');
@@ -75,23 +81,25 @@ public function read(\Magento\Framework\MessageQueue\LockInterface $lock, $code)
7581
}
7682

7783
/**
78-
* {@inheritDoc}
84+
* @inheritdoc
7985
*/
80-
public function saveLock(\Magento\Framework\MessageQueue\LockInterface $lock)
86+
public function saveLock(LockInterface $lock)
8187
{
8288
$object = $this->lockFactory->create();
8389
$object->setMessageCode($lock->getMessageCode());
8490
$object->setCreatedAt($this->dateTime->gmtTimestamp());
8591
$object->save();
92+
$lock->setId($object->getId());
93+
$lock->setCreatedAt($object->getCreatedAt());
8694
}
8795

8896
/**
89-
* {@inheritDoc}
97+
* @inheritdoc
9098
*/
9199
public function releaseOutdatedLocks()
92100
{
93-
$date = (new \DateTime())->setTimestamp($this->dateTime->gmtTimestamp());
94-
$date->add(new \DateInterval('PT' . $this->interval . 'S'));
101+
$date = (new DateTime())->setTimestamp($this->dateTime->gmtTimestamp());
102+
$date->add(new DateInterval('PT' . $this->interval . 'S'));
95103
$this->getConnection()->delete($this->getTable(self::QUEUE_LOCK_TABLE), ['created_at <= ?' => $date]);
96104
}
97105
}

dev/tests/integration/_files/Magento/TestModuleMysqlMq/Model/Processor.php

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@
55
*/
66
namespace Magento\TestModuleMysqlMq\Model;
77

8+
use LogicException;
9+
use Magento\Framework\MessageQueue\ConnectionLostException;
10+
811
/**
912
* Test message processor is used by \Magento\MysqlMq\Model\PublisherConsumerTest
1013
*/
1114
class Processor
1215
{
1316
/**
14-
* @param \Magento\TestModuleMysqlMq\Model\DataObject $message
17+
* @param DataObject $message
1518
*/
1619
public function processMessage($message)
1720
{
@@ -23,7 +26,7 @@ public function processMessage($message)
2326
}
2427

2528
/**
26-
* @param \Magento\TestModuleMysqlMq\Model\DataObject $message
29+
* @param DataObject $message
2730
*/
2831
public function processObjectCreated($message)
2932
{
@@ -35,7 +38,7 @@ public function processObjectCreated($message)
3538
}
3639

3740
/**
38-
* @param \Magento\TestModuleMysqlMq\Model\DataObject $message
41+
* @param DataObject $message
3942
*/
4043
public function processCustomObjectCreated($message)
4144
{
@@ -47,7 +50,7 @@ public function processCustomObjectCreated($message)
4750
}
4851

4952
/**
50-
* @param \Magento\TestModuleMysqlMq\Model\DataObject $message
53+
* @param DataObject $message
5154
*/
5255
public function processObjectUpdated($message)
5356
{
@@ -59,13 +62,23 @@ public function processObjectUpdated($message)
5962
}
6063

6164
/**
62-
* @param \Magento\TestModuleMysqlMq\Model\DataObject $message
65+
* @param DataObject $message
6366
*/
6467
public function processMessageWithException($message)
6568
{
6669
file_put_contents($message->getOutputPath(), "Exception processing {$message->getEntityId()}");
67-
throw new \LogicException(
70+
throw new LogicException(
6871
"Exception during message processing happened. Entity: {{$message->getEntityId()}}"
6972
);
7073
}
74+
75+
/**
76+
* @throws ConnectionLostException
77+
*/
78+
public function processMessageWithConnectionException()
79+
{
80+
throw new ConnectionLostException(
81+
"Connection exception during message processing happened."
82+
);
83+
}
7184
}

dev/tests/integration/_files/Magento/TestModuleMysqlMq/etc/communication.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
-->
88
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
99
<topic name="demo.exception" request="Magento\TestModuleMysqlMq\Model\DataObject"/>
10+
<topic name="demo.connection.exception" request="Magento\TestModuleMysqlMq\Model\DataObject"/>
1011
<topic name="test.schema.defined.by.method" schema="Magento\TestModuleMysqlMq\Model\DataObjectRepository::delayedOperation" is_synchronous="false"/>
1112
<topic name="demo.object.created" request="Magento\TestModuleMysqlMq\Model\DataObject"/>
1213
<topic name="demo.object.updated" request="Magento\TestModuleMysqlMq\Model\DataObject"/>

dev/tests/integration/_files/Magento/TestModuleMysqlMq/etc/queue.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
<broker topic="demo.exception" type="db" exchange="magento">
1010
<queue consumer="demoConsumerWithException" name="queue-exception" handler="Magento\TestModuleMysqlMq\Model\Processor::processMessageWithException"/>
1111
</broker>
12+
<broker topic="demo.connection.exception" type="db" exchange="magento">
13+
<queue consumer="demoConsumerWithConnectionException" name="queue-connection-exception" handler="Magento\TestModuleMysqlMq\Model\Processor::processMessageWithConnectionException"/>
14+
</broker>
1215
<broker topic="test.schema.defined.by.method" type="db" exchange="magento">
1316
<queue consumer="delayedOperationConsumer" name="demo-queue-6" handler="Magento\TestModuleMysqlMq\Model\DataObjectRepository::delayedOperation"/>
1417
</broker>

dev/tests/integration/_files/Magento/TestModuleMysqlMq/etc/queue_consumer.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@
1010
<consumer name="demoConsumerQueueTwo" queue="queue-updated" connection="db" handler="Magento\TestModuleMysqlMq\Model\Processor::processObjectUpdated"/>
1111
<consumer name="demoConsumerQueueThree" queue="queue-custom-created" connection="db" handler="Magento\TestModuleMysqlMq\Model\Processor::processCustomObjectCreated"/>
1212
<consumer name="demoConsumerWithException" queue="queue-exception" connection="db" handler="Magento\TestModuleMysqlMq\Model\Processor::processMessageWithException"/>
13+
<consumer name="demoConsumerWithConnectionException" queue="queue-connection-exception" connection="db" handler="Magento\TestModuleMysqlMq\Model\Processor::processMessageWithConnectionException"/>
1314
<consumer name="delayedOperationConsumer" queue="demo-queue-6" connection="db" handler="Magento\TestModuleMysqlMq\Model\DataObjectRepository::delayedOperation"/>
1415
</config>

dev/tests/integration/_files/Magento/TestModuleMysqlMq/etc/queue_publisher.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
<publisher topic="demo.exception">
1010
<connection name="db" exchange="magento"/>
1111
</publisher>
12+
<publisher topic="demo.connection.exception">
13+
<connection name="db" exchange="magento"/>
14+
</publisher>
1215
<publisher topic="test.schema.defined.by.method">
1316
<connection name="db" exchange="magento"/>
1417
</publisher>

dev/tests/integration/_files/Magento/TestModuleMysqlMq/etc/queue_topology.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
<exchange name="magento" type="topic" connection="db">
1111
<binding id="demo.exception.consumer" topic="demo.exception" destination="queue-exception" destinationType="queue"/>
12+
<binding id="demo.connection.exception.consumer" topic="demo.connection.exception" destination="queue-connection-exception" destinationType="queue"/>
1213
<binding id="test.schema.defined.by.method" topic="test.schema.defined.by.method" destination="demo-queue-6" destinationType="queue"/>
1314
<binding id="demo.object.created" topic="demo.object.created" destination="queue-created" destinationType="queue"/>
1415
<binding id="demo.object.updated" topic="demo.object.updated" destination="queue-updated" destinationType="queue"/>
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
declare(strict_types=1);
7+
8+
namespace Magento\MessageQueue\Model;
9+
10+
use Magento\Framework\MessageQueue\Consumer;
11+
use Magento\Framework\MessageQueue\ConsumerFactory;
12+
use Magento\Framework\MessageQueue\EnvelopeFactory;
13+
use Magento\Framework\MessageQueue\QueueInterface;
14+
use Magento\MysqlMq\Model\QueueManagement;
15+
use Magento\MysqlMq\Model\ResourceModel\Queue;
16+
use Magento\TestFramework\ObjectManager;
17+
use PHPUnit\Framework\TestCase;
18+
19+
/**
20+
* Tests the different cases of consumers running by Consumer processor
21+
*/
22+
class ConsumerTest extends TestCase
23+
{
24+
/**
25+
* @var ObjectManager
26+
*/
27+
private $objectManager;
28+
29+
/**
30+
* @var Consumer
31+
*/
32+
private $model;
33+
34+
/**
35+
* @var Queue
36+
*/
37+
private $queueResource;
38+
39+
/**
40+
* @inheritdoc
41+
*/
42+
protected function setUp(): void
43+
{
44+
$this->objectManager = ObjectManager::getInstance();
45+
/** @var ConsumerFactory $factory */
46+
$factory = $this->objectManager->get(ConsumerFactory::class);
47+
$this->model = $factory->get('demoConsumerWithConnectionException');
48+
$this->queueResource = $this->objectManager->get(Queue::class);
49+
}
50+
51+
/**
52+
* Test if after connection exception and retry
53+
* message doesn't have success status but still has status in progress
54+
*
55+
* @return void
56+
*/
57+
public function testRunWithException(): void
58+
{
59+
/** @var EnvelopeFactory $envelopFactory */
60+
$envelopFactory = $this->objectManager->get(EnvelopeFactory::class);
61+
$messageBody = '{"name":"test"}';
62+
$topicName = 'demo.connection.exception';
63+
$queueName = 'queue-connection-exception';
64+
$envelope = $envelopFactory->create(['body' => $messageBody, 'properties' => ['topic_name' => $topicName]]);
65+
/** @var QueueInterface $queue */
66+
$queue = $this->objectManager->create(
67+
\Magento\MysqlMq\Model\Driver\Queue::class,
68+
['queueName' => $queueName]
69+
);
70+
$queue->push($envelope);
71+
$messages = $this->queueResource->getMessages($queueName, 1);
72+
$envelope = $envelopFactory->create(['body' => $messageBody, 'properties' => $messages[0]]);
73+
$this->model->process(1);
74+
$queue->reject($envelope);
75+
$this->model->process(1);
76+
$message = $this->getLastMessage($queueName);
77+
$this->assertEquals(QueueManagement::MESSAGE_STATUS_IN_PROGRESS, $message['status']);
78+
}
79+
80+
/**
81+
* Return last message by queue name
82+
*
83+
* @param string $queueName
84+
* @return array
85+
*/
86+
private function getLastMessage(string $queueName)
87+
{
88+
$connection = $this->queueResource->getConnection();
89+
$select = $connection->select()
90+
->from(
91+
['queue_message' => $this->queueResource->getTable('queue_message')],
92+
[]
93+
)->join(
94+
['queue_message_status' => $this->queueResource->getTable('queue_message_status')],
95+
'queue_message.id = queue_message_status.message_id',
96+
[
97+
QueueManagement::MESSAGE_QUEUE_RELATION_ID => 'id',
98+
QueueManagement::MESSAGE_STATUS => 'status',
99+
]
100+
)->join(
101+
['queue' => $this->queueResource->getTable('queue')],
102+
'queue.id = queue_message_status.queue_id',
103+
[QueueManagement::MESSAGE_QUEUE_NAME => 'name']
104+
)->where('queue.name = ?', $queueName)
105+
->order(['queue_message_status.id DESC']);
106+
107+
return $connection->fetchRow($select);
108+
}
109+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
declare(strict_types=1);
7+
8+
namespace Magento\MessageQueue\Model\ResourceModel;
9+
10+
use Magento\Framework\MessageQueue\LockInterface;
11+
use Magento\TestFramework\ObjectManager;
12+
use PHPUnit\Framework\TestCase;
13+
14+
/**
15+
* Covers Lock resource model test cases
16+
*/
17+
class LockTest extends TestCase
18+
{
19+
public function testSaveLock()
20+
{
21+
$objectManager = ObjectManager::getInstance();
22+
/** @var Lock $resourceModel */
23+
$resourceModel = $objectManager->get(Lock::class);
24+
$lock = $objectManager->create(LockInterface::class);
25+
$resourceModel->saveLock($lock);
26+
self::assertNotEquals(null, $lock->getId());
27+
}
28+
}

0 commit comments

Comments
 (0)