Skip to content

Commit ff7a43d

Browse files
Merge remote-tracking branch 'magento2/2.4-develop' into unlock-admin-user
2 parents 6c92add + 6b196b5 commit ff7a43d

File tree

630 files changed

+46901
-48270
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

630 files changed

+46901
-48270
lines changed

app/code/Magento/Analytics/Test/Unit/ReportXml/SelectHydratorTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ public function testRecreateWithExpression(
198198
->willReturn($this->selectMock);
199199
$withArgs = [];
200200

201-
foreach (array_keys($selectParts) as $key => $partName) {
201+
foreach (array_keys($selectParts) as $partName) {
202202
$withArgs[] = [$partName, $expectedParts[$partName]];
203203
}
204204
$this->selectMock
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
declare(strict_types=1);
7+
8+
namespace Magento\AsynchronousOperations\Cron;
9+
10+
use Magento\AsynchronousOperations\Api\Data\OperationInterface;
11+
use Magento\AsynchronousOperations\Model\ResourceModel\Operation;
12+
use Magento\Framework\Stdlib\DateTime\DateTime;
13+
14+
/**
15+
* Marks incomplete operations as failed
16+
*/
17+
class MarkIncompleteOperationsAsFailed
18+
{
19+
/**
20+
* Default message maximum processing time. Default to 12h
21+
*/
22+
private const DEFAULT_MESSAGE_MAX_PROCESSING_TIME = 43200;
23+
24+
/**
25+
* Default error code
26+
*/
27+
private const ERROR_CODE = 0;
28+
29+
/**
30+
* Default error message
31+
*/
32+
private const ERROR_MESSAGE = 'Unknown Error';
33+
34+
/**
35+
* @var Operation
36+
*/
37+
private $resource;
38+
39+
/**
40+
* @var DateTime
41+
*/
42+
private $dateTime;
43+
44+
/**
45+
* @var int
46+
*/
47+
private $messageMaxProcessingTime;
48+
49+
/**
50+
* @var string
51+
*/
52+
private $errorMessage;
53+
54+
/**
55+
* @var int
56+
*/
57+
private $errorCode;
58+
59+
/**
60+
* @var int
61+
*/
62+
private $failedStatus;
63+
64+
/**
65+
* @param Operation $resource
66+
* @param DateTime $dateTime
67+
* @param int $messageMaxProcessingTime
68+
* @param int $failedStatus
69+
* @param int $errorCode
70+
* @param string $errorMessage
71+
*/
72+
public function __construct(
73+
Operation $resource,
74+
DateTime $dateTime,
75+
int $messageMaxProcessingTime = self::DEFAULT_MESSAGE_MAX_PROCESSING_TIME,
76+
int $failedStatus = OperationInterface::STATUS_TYPE_RETRIABLY_FAILED,
77+
int $errorCode = self::ERROR_CODE,
78+
string $errorMessage = self::ERROR_MESSAGE
79+
) {
80+
$this->resource = $resource;
81+
$this->dateTime = $dateTime;
82+
$this->messageMaxProcessingTime = $messageMaxProcessingTime;
83+
$this->errorMessage = $errorMessage;
84+
$this->errorCode = $errorCode;
85+
$this->failedStatus = $failedStatus;
86+
}
87+
88+
/**
89+
* Marks incomplete operations as failed
90+
*/
91+
public function execute(): void
92+
{
93+
$connection = $this->resource->getConnection();
94+
$now = $this->dateTime->gmtTimestamp();
95+
$idField = $this->resource->getIdFieldName();
96+
$select = $connection->select()
97+
->from($this->resource->getMainTable(), [$idField])
98+
->where('status = ?', OperationInterface::STATUS_TYPE_OPEN)
99+
->where('started_at <= ?', $connection->formatDate($now - $this->messageMaxProcessingTime));
100+
101+
foreach ($connection->fetchCol($select) as $id) {
102+
$connection->update(
103+
$this->resource->getMainTable(),
104+
[
105+
'status' => $this->failedStatus,
106+
'result_message' => $this->errorMessage,
107+
'error_code' => $this->errorCode,
108+
],
109+
[
110+
"$idField = ?" => (int) $id
111+
]
112+
);
113+
}
114+
}
115+
}

app/code/Magento/AsynchronousOperations/Model/BulkManagement.php

Lines changed: 62 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77

88
namespace Magento\AsynchronousOperations\Model;
99

10+
use Exception;
1011
use Magento\AsynchronousOperations\Api\Data\BulkSummaryInterface;
1112
use Magento\AsynchronousOperations\Api\Data\BulkSummaryInterfaceFactory;
1213
use Magento\AsynchronousOperations\Api\Data\OperationInterface;
14+
use Magento\AsynchronousOperations\Model\ResourceModel\Operation\Collection;
1315
use Magento\AsynchronousOperations\Model\ResourceModel\Operation\CollectionFactory;
1416
use Magento\Authorization\Model\UserContextInterface;
1517
use Magento\Framework\App\ResourceConnection;
@@ -21,6 +23,8 @@
2123

2224
/**
2325
* Asynchronous Bulk Management
26+
*
27+
* @SuppressWarnings(PHPMD.CouplingBetweenObjects)
2428
*/
2529
class BulkManagement implements BulkManagementInterface
2630
{
@@ -109,7 +113,7 @@ public function scheduleBulk($bulkUuid, array $operations, $description, $userId
109113
$userType = UserContextInterface::USER_TYPE_ADMIN;
110114
}
111115
try {
112-
/** @var \Magento\AsynchronousOperations\Api\Data\BulkSummaryInterface $bulkSummary */
116+
/** @var BulkSummaryInterface $bulkSummary */
113117
$bulkSummary = $this->bulkSummaryFactory->create();
114118
$this->entityManager->load($bulkSummary, $bulkUuid);
115119
$bulkSummary->setBulkId($bulkUuid);
@@ -122,7 +126,7 @@ public function scheduleBulk($bulkUuid, array $operations, $description, $userId
122126
$this->publishOperations($operations);
123127

124128
$connection->commit();
125-
} catch (\Exception $exception) {
129+
} catch (Exception $exception) {
126130
$connection->rollBack();
127131
$this->logger->critical($exception->getMessage());
128132
return false;
@@ -140,57 +144,69 @@ public function scheduleBulk($bulkUuid, array $operations, $description, $userId
140144
*/
141145
public function retryBulk($bulkUuid, array $errorCodes)
142146
{
143-
$metadata = $this->metadataPool->getMetadata(BulkSummaryInterface::class);
144-
145-
$connection = $this->resourceConnection->getConnectionByName($metadata->getEntityConnectionName());
146-
/** @var \Magento\AsynchronousOperations\Model\ResourceModel\Operation[] $retriablyFailedOperations */
147-
$retriablyFailedOperations = $this->operationCollectionFactory->create()
148-
->addFieldToFilter('error_code', ['in' => $errorCodes])
149-
->addFieldToFilter('bulk_uuid', ['eq' => $bulkUuid])
147+
/** @var Collection $collection */
148+
$collection = $this->operationCollectionFactory->create();
149+
/** @var Operation[] $retriablyFailedOperations */
150+
$retriablyFailedOperations = $collection
151+
->addFieldToFilter(OperationInterface::BULK_ID, ['eq' => $bulkUuid])
152+
->addFieldToFilter(OperationInterface::ERROR_CODE, ['in' => $errorCodes])
150153
->getItems();
151-
152-
// remove corresponding operations from database (i.e. move them to 'open' status)
153-
$connection->beginTransaction();
154-
try {
155-
$operationIds = [];
156-
$currentBatchSize = 0;
157-
$maxBatchSize = 10000;
158-
/** @var OperationInterface $operation */
154+
$affectedOperations = count($retriablyFailedOperations);
155+
if ($retriablyFailedOperations) {
156+
$operation = reset($retriablyFailedOperations);
157+
//async consumer expects operations to be in the database
158+
// thus such operation should not be deleted but reopened
159+
$shouldReopen = strpos($operation->getTopicName(), ConfigInterface::TOPIC_PREFIX) === 0;
160+
$metadata = $this->metadataPool->getMetadata(OperationInterface::class);
161+
$linkField = $metadata->getLinkField();
162+
$ids = [];
159163
foreach ($retriablyFailedOperations as $operation) {
160-
if ($currentBatchSize === $maxBatchSize) {
161-
$whereCondition = $connection->quoteInto('operation_key IN (?)', $operationIds)
162-
. " AND "
163-
. $connection->quoteInto('bulk_uuid = ?', $bulkUuid);
164-
$connection->delete(
165-
$this->resourceConnection->getTableName('magento_operation'),
166-
$whereCondition
167-
);
168-
$operationIds = [];
169-
$currentBatchSize = 0;
170-
}
171-
$currentBatchSize++;
172-
$operationIds[] = $operation->getId();
164+
$ids[] = (int) $operation->getData($linkField);
173165
}
174-
// remove operations from the last batch
175-
if (!empty($operationIds)) {
176-
$whereCondition = $connection->quoteInto('operation_key IN (?)', $operationIds)
177-
. " AND "
178-
. $connection->quoteInto('bulk_uuid = ?', $bulkUuid);
179-
$connection->delete(
180-
$this->resourceConnection->getTableName('magento_operation'),
181-
$whereCondition
182-
);
166+
$batchSize = 10000;
167+
$chunks = array_chunk($ids, $batchSize);
168+
$connection = $this->resourceConnection->getConnectionByName($metadata->getEntityConnectionName());
169+
$connection->beginTransaction();
170+
try {
171+
if ($shouldReopen) {
172+
foreach ($chunks as $chunk) {
173+
$connection->update(
174+
$metadata->getEntityTable(),
175+
[
176+
OperationInterface::STATUS => OperationInterface::STATUS_TYPE_OPEN,
177+
OperationInterface::RESULT_SERIALIZED_DATA => null,
178+
OperationInterface::ERROR_CODE => null,
179+
OperationInterface::RESULT_MESSAGE => null,
180+
'started_at' => null,
181+
],
182+
[
183+
$linkField . ' IN (?)' => $chunk,
184+
]
185+
);
186+
}
187+
} else {
188+
foreach ($chunks as $chunk) {
189+
$connection->delete(
190+
$metadata->getEntityTable(),
191+
[
192+
$linkField . ' IN (?)' => $chunk,
193+
]
194+
);
195+
}
196+
}
197+
$connection->commit();
198+
} catch (\Throwable $exception) {
199+
$connection->rollBack();
200+
$this->logger->critical($exception->getMessage());
201+
$affectedOperations = 0;
183202
}
184203

185-
$connection->commit();
186-
} catch (\Exception $exception) {
187-
$connection->rollBack();
188-
$this->logger->critical($exception->getMessage());
189-
return 0;
204+
if ($affectedOperations) {
205+
$this->publishOperations($retriablyFailedOperations);
206+
}
190207
}
191-
$this->publishOperations($retriablyFailedOperations);
192208

193-
return count($retriablyFailedOperations);
209+
return $affectedOperations;
194210
}
195211

196212
/**

app/code/Magento/AsynchronousOperations/Model/MassConsumerEnvelopeCallback.php

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
namespace Magento\AsynchronousOperations\Model;
1010

11+
use Magento\Framework\App\ObjectManager;
1112
use Magento\Framework\App\ResourceConnection;
1213
use Psr\Log\LoggerInterface;
1314
use Magento\Framework\MessageQueue\MessageLockException;
@@ -55,21 +56,28 @@ class MassConsumerEnvelopeCallback
5556
*/
5657
private $operationProcessor;
5758

59+
/**
60+
* @var MessageControllerDecorator
61+
*/
62+
private $messageControllerDecorator;
63+
5864
/**
5965
* @param ResourceConnection $resource
6066
* @param MessageController $messageController
6167
* @param ConsumerConfigurationInterface $configuration
6268
* @param OperationProcessorFactory $operationProcessorFactory
6369
* @param LoggerInterface $logger
6470
* @param QueueInterface $queue
71+
* @param MessageControllerDecorator|null $messageControllerDecorator
6572
*/
6673
public function __construct(
6774
ResourceConnection $resource,
6875
MessageController $messageController,
6976
ConsumerConfigurationInterface $configuration,
7077
OperationProcessorFactory $operationProcessorFactory,
7178
LoggerInterface $logger,
72-
QueueInterface $queue
79+
QueueInterface $queue,
80+
?MessageControllerDecorator $messageControllerDecorator = null
7381
) {
7482
$this->resource = $resource;
7583
$this->messageController = $messageController;
@@ -81,6 +89,8 @@ public function __construct(
8189
);
8290
$this->logger = $logger;
8391
$this->queue = $queue;
92+
$this->messageControllerDecorator = $messageControllerDecorator
93+
?: ObjectManager::getInstance()->get(MessageControllerDecorator::class);
8494
}
8595

8696
/**
@@ -96,7 +106,7 @@ public function execute(EnvelopeInterface $message)
96106
$lock = null;
97107
try {
98108
$topicName = $message->getProperties()['topic_name'];
99-
$lock = $this->messageController->lock($message, $this->configuration->getConsumerName());
109+
$lock = $this->messageControllerDecorator->lock($message, $this->configuration->getConsumerName());
100110

101111
$allowedTopics = $this->configuration->getTopicNames();
102112
if (in_array($topicName, $allowedTopics)) {

0 commit comments

Comments
 (0)