99 */
1010namespace Magento \Cron \Observer ;
1111
12+ use Magento \Cron \Model \ResourceModel \Schedule \Collection as ScheduleCollection ;
1213use Magento \Cron \Model \Schedule ;
1314use Magento \Framework \App \State ;
1415use Magento \Framework \Console \Cli ;
@@ -83,7 +84,7 @@ class ProcessCronQueueObserver implements ObserverInterface
8384 const MAX_RETRIES = 5 ;
8485
8586 /**
86- * @var \Magento\Cron\Model\ResourceModel\Schedule\Collection
87+ * @var ScheduleCollection
8788 */
8889 protected $ _pendingSchedules ;
8990
@@ -278,12 +279,12 @@ function ($groupId) use ($currentTime) {
278279 *
279280 * It should be taken by standalone (child) process, not by the parent process.
280281 *
281- * @param int $groupId
282+ * @param string $groupId
282283 * @param callable $callback
283284 *
284285 * @return void
285286 */
286- private function lockGroup ($ groupId , callable $ callback )
287+ private function lockGroup (string $ groupId , callable $ callback ): void
287288 {
288289 if (!$ this ->lockManager ->lock (self ::LOCK_PREFIX . $ groupId , self ::LOCK_TIMEOUT )) {
289290 $ this ->logger ->warning (
@@ -399,7 +400,7 @@ function () use ($schedule) {
399400 * @param string $jobName
400401 * @return void
401402 */
402- private function startProfiling (string $ jobName = '' )
403+ private function startProfiling (string $ jobName = '' ): void
403404 {
404405 $ this ->statProfiler ->clear ();
405406 $ this ->statProfiler ->start (
@@ -416,7 +417,7 @@ private function startProfiling(string $jobName = '')
416417 * @param string $jobName
417418 * @return void
418419 */
419- private function stopProfiling (string $ jobName = '' )
420+ private function stopProfiling (string $ jobName = '' ): void
420421 {
421422 $ this ->statProfiler ->stop (
422423 sprintf (self ::CRON_TIMERID , $ jobName ),
@@ -445,9 +446,9 @@ private function getProfilingStat(string $jobName): string
445446 * Return job collection from data base with status 'pending'.
446447 *
447448 * @param string $groupId
448- * @return \Magento\Cron\Model\ResourceModel\Schedule\Collection
449+ * @return ScheduleCollection
449450 */
450- private function getPendingSchedules ($ groupId )
451+ private function getPendingSchedules (string $ groupId ): ScheduleCollection
451452 {
452453 $ jobs = $ this ->_config ->getJobs ();
453454 $ pendingJobs = $ this ->_scheduleFactory ->create ()->getCollection ();
@@ -462,7 +463,7 @@ private function getPendingSchedules($groupId)
462463 * @param string $groupId
463464 * @return $this
464465 */
465- private function generateSchedules ($ groupId )
466+ private function generateSchedules (string $ groupId ): self
466467 {
467468 /**
468469 * check if schedule generation is needed
@@ -533,13 +534,13 @@ protected function _generateJobs($jobs, $exists, $groupId)
533534 * @param int $currentTime
534535 * @return void
535536 */
536- private function cleanupJobs ($ groupId , $ currentTime )
537+ private function cleanupJobs (string $ groupId , int $ currentTime ): void
537538 {
538539 // check if history cleanup is needed
539540 $ lastCleanup = (int )$ this ->_cache ->load (self ::CACHE_KEY_LAST_HISTORY_CLEANUP_AT . $ groupId );
540541 $ historyCleanUp = (int )$ this ->getCronGroupConfigurationValue ($ groupId , self ::XML_PATH_HISTORY_CLEANUP_EVERY );
541542 if ($ lastCleanup > $ this ->dateTime ->gmtTimestamp () - $ historyCleanUp * self ::SECONDS_IN_MINUTE ) {
542- return $ this ;
543+ return ;
543544 }
544545 // save time history cleanup was ran with no expiration
545546 $ this ->_cache ->save (
@@ -550,6 +551,7 @@ private function cleanupJobs($groupId, $currentTime)
550551 );
551552
552553 $ this ->cleanupDisabledJobs ($ groupId );
554+ $ this ->cleanupRunningJobs ($ groupId );
553555
554556 $ historySuccess = (int )$ this ->getCronGroupConfigurationValue ($ groupId , self ::XML_PATH_HISTORY_SUCCESS );
555557 $ historyFailure = (int )$ this ->getCronGroupConfigurationValue ($ groupId , self ::XML_PATH_HISTORY_FAILURE );
@@ -673,7 +675,7 @@ protected function getScheduleTimeInterval($groupId)
673675 * @param string $groupId
674676 * @return void
675677 */
676- private function cleanupDisabledJobs ($ groupId )
678+ private function cleanupDisabledJobs (string $ groupId ): void
677679 {
678680 $ jobs = $ this ->_config ->getJobs ();
679681 $ jobsToCleanup = [];
@@ -696,6 +698,33 @@ private function cleanupDisabledJobs($groupId)
696698 }
697699 }
698700
701+ /**
702+ * Cleanup jobs that were left in a running state due to an unexpected stop
703+ *
704+ * @param string $groupId
705+ * @return void
706+ */
707+ private function cleanupRunningJobs (string $ groupId ): void
708+ {
709+ $ scheduleResource = $ this ->_scheduleFactory ->create ()->getResource ();
710+ $ connection = $ scheduleResource ->getConnection ();
711+
712+ $ jobs = $ this ->_config ->getJobs ();
713+
714+ $ connection ->update (
715+ $ scheduleResource ->getTable ('cron_schedule ' ),
716+ [
717+ 'status ' => \Magento \Cron \Model \Schedule::STATUS_ERROR ,
718+ 'messages ' => 'Time out '
719+ ],
720+ [
721+ $ connection ->quoteInto ('status = ? ' , \Magento \Cron \Model \Schedule::STATUS_RUNNING ),
722+ $ connection ->quoteInto ('job_code IN (?) ' , array_keys ($ jobs [$ groupId ])),
723+ 'scheduled_at < UTC_TIMESTAMP() - INTERVAL 1 DAY '
724+ ]
725+ );
726+ }
727+
699728 /**
700729 * Get cron expression of cron job.
701730 *
@@ -773,13 +802,13 @@ private function isGroupInFilter($groupId): bool
773802 * @param array $jobsRoot
774803 * @param int $currentTime
775804 */
776- private function processPendingJobs ($ groupId , $ jobsRoot , $ currentTime )
805+ private function processPendingJobs (string $ groupId , array $ jobsRoot , int $ currentTime ): void
777806 {
778- $ procesedJobs = [];
807+ $ processedJobs = [];
779808 $ pendingJobs = $ this ->getPendingSchedules ($ groupId );
780809 /** @var Schedule $schedule */
781810 foreach ($ pendingJobs as $ schedule ) {
782- if (isset ($ procesedJobs [$ schedule ->getJobCode ()])) {
811+ if (isset ($ processedJobs [$ schedule ->getJobCode ()])) {
783812 // process only on job per run
784813 continue ;
785814 }
@@ -796,7 +825,7 @@ private function processPendingJobs($groupId, $jobsRoot, $currentTime)
796825 $ this ->tryRunJob ($ scheduledTime , $ currentTime , $ jobConfig , $ schedule , $ groupId );
797826
798827 if ($ schedule ->getStatus () === Schedule::STATUS_SUCCESS ) {
799- $ procesedJobs [$ schedule ->getJobCode ()] = true ;
828+ $ processedJobs [$ schedule ->getJobCode ()] = true ;
800829 }
801830
802831 $ this ->retrier ->execute (
@@ -821,7 +850,7 @@ private function tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule,
821850 {
822851 // use sha1 to limit length
823852 // phpcs:ignore Magento2.Security.InsecureFunction
824- $ lockName = self ::LOCK_PREFIX . md5 ($ groupId . '_ ' . $ schedule ->getJobCode ());
853+ $ lockName = self ::LOCK_PREFIX . md5 ($ groupId . '_ ' . $ schedule ->getJobCode ());
825854
826855 try {
827856 for ($ retries = self ::MAX_RETRIES ; $ retries > 0 ; $ retries --) {
0 commit comments