Skip to content

Commit ceec254

Browse files
author
Admin
committed
1 parent de4ac8c commit ceec254

File tree

5 files changed

+71
-15
lines changed

5 files changed

+71
-15
lines changed

illuminate/Bus/UniqueLock.php

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
namespace Illuminate\Bus;
44

55
use Illuminate\Contracts\Cache\Repository as Cache;
6+
use Illuminate\Contracts\Queue\ShouldBeUnique;
7+
use Illuminate\Queue\SerializesModels;
68

79
class UniqueLock
810
{
@@ -28,10 +30,17 @@ public function __construct(Cache $cache)
2830
* Attempt to acquire a lock for the given job.
2931
*
3032
* @param mixed $job
31-
* @return bool
33+
* @throws \RuntimeException
3234
*/
33-
public function acquire($job)
35+
public function acquire($job): bool
3436
{
37+
if (
38+
$job instanceof ShouldBeUnique
39+
&& \in_array(SerializesModels::class, \class_uses_recursive($job::class), true)
40+
) {
41+
throw new \RuntimeException('ShouldBeUnique not supported in combination with SerializesModels');
42+
}
43+
3544
$uniqueFor = method_exists($job, 'uniqueFor')
3645
? $job->uniqueFor()
3746
: ($job->uniqueFor ?? 0);
@@ -49,7 +58,7 @@ public function acquire($job)
4958
* @param mixed $job
5059
* @return void
5160
*/
52-
public function release($job)
61+
public function release($job): void
5362
{
5463
$cache = method_exists($job, 'uniqueVia')
5564
? $job->uniqueVia()
@@ -62,9 +71,8 @@ public function release($job)
6271
* Generate the lock key for the given job.
6372
*
6473
* @param mixed $job
65-
* @return string
6674
*/
67-
protected function getKey($job)
75+
protected function getKey($job): string
6876
{
6977
$uniqueId = method_exists($job, 'uniqueId')
7078
? $job->uniqueId()

illuminate/Pipeline/Pipeline.php

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ class Pipeline implements PipelineContract
4141
*/
4242
protected $method = 'handle';
4343

44+
/**
45+
* The final callback to be executed after the pipeline ends regardless of the outcome.
46+
*
47+
* @var Closure|null
48+
*/
49+
protected $finally;
50+
4451
/**
4552
* Create a new class instance.
4653
*
@@ -118,7 +125,13 @@ public function then(Closure $destination)
118125
$this->prepareDestination($destination)
119126
);
120127

121-
return $pipeline($this->passable);
128+
try {
129+
return $pipeline($this->passable);
130+
} finally {
131+
if ($this->finally instanceof Closure) {
132+
($this->finally)($this->passable);
133+
}
134+
}
122135
}
123136

124137
/**
@@ -133,6 +146,16 @@ public function thenReturn()
133146
});
134147
}
135148

149+
/**
150+
* Set a final callback to be executed after the pipeline ends regardless of the outcome.
151+
*/
152+
public function finally(Closure $callback): static
153+
{
154+
$this->finally = $callback;
155+
156+
return $this;
157+
}
158+
136159
/**
137160
* Get the final piece of the Closure onion.
138161
*

illuminate/Queue/CallQueuedHandler.php

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,6 @@ public function call(Job $job, array $data)
6666
return;
6767
}
6868

69-
if ($command instanceof ShouldBeUniqueUntilProcessing) {
70-
$this->ensureUniqueJobLockIsReleased($command);
71-
}
72-
7369
$this->dispatchThroughMiddleware($job, $command);
7470

7571
if (!$job->isReleased() && !$command instanceof ShouldBeUniqueUntilProcessing) {
@@ -113,21 +109,40 @@ protected function getCommand(array $data)
113109
* @param \Illuminate\Contracts\Queue\Job $job
114110
* @param mixed $command
115111
* @return mixed
112+
* @throws Exception
116113
*/
117114
protected function dispatchThroughMiddleware(Job $job, $command)
118115
{
119116
if ($command instanceof \__PHP_Incomplete_Class) {
120117
throw new Exception('Job is incomplete class: ' . json_encode($command));
121118
}
122119

120+
$lockReleased = false;
121+
123122
return (new Pipeline($this->container))->send($command)
124123
->through(
125124
array_merge(
126125
method_exists($command, 'middleware') ? $command->middleware() : [],
127126
$command->middleware ?? []
128127
)
129128
)
130-
->then(function ($command) use ($job) {
129+
->finally(function ($command) use (&$lockReleased): void {
130+
if (
131+
!$lockReleased
132+
&& $command instanceof ShouldBeUniqueUntilProcessing
133+
&& ($command->job ?? null) instanceof Job
134+
&& !$command->job->isReleased()
135+
) {
136+
$this->ensureUniqueJobLockIsReleased($command);
137+
}
138+
})
139+
->then(function ($command) use ($job, &$lockReleased): mixed {
140+
if ($command instanceof ShouldBeUniqueUntilProcessing) {
141+
$this->ensureUniqueJobLockIsReleased($command);
142+
143+
$lockReleased = true;
144+
}
145+
131146
return $this->dispatcher->dispatchNow(
132147
$command,
133148
$this->resolveHandler($job, $command)
@@ -252,12 +267,16 @@ protected function handleModelNotFound(Job $job, $e)
252267
* @param array $data
253268
* @param \Throwable|null $e
254269
* @param string $uuid
255-
* @return void
270+
* @param \Illuminate\Contracts\Queue\Job|null $job
256271
*/
257-
public function failed(array $data, $e, string $uuid)
272+
public function failed(array $data, $e, string $uuid, ?Job $job = null): void
258273
{
259274
$command = $this->getCommand($data);
260275

276+
if ($job instanceof Job) {
277+
$command = $this->setJobInstanceIfNecessary($job, $command);
278+
}
279+
261280
if (!$command instanceof ShouldBeUniqueUntilProcessing) {
262281
$this->ensureUniqueJobLockIsReleased($command);
263282
}

illuminate/Queue/Jobs/Job.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ protected function failed($e)
239239
[$class, $method] = JobName::parse($payload['job']);
240240

241241
if (method_exists($this->instance = $this->resolve($class), 'failed')) {
242-
$this->instance->failed($payload['data'], $e, $payload['uuid'] ?? '');
242+
$this->instance->failed($payload['data'], $e, $payload['uuid'] ?? '', $this);
243243
}
244244
}
245245

src/Bus/PendingDispatch.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Illuminate\Contracts\Bus\Dispatcher;
77
use Illuminate\Contracts\Cache\Repository as Cache;
88
use Illuminate\Contracts\Queue\ShouldBeUnique;
9+
use Illuminate\Queue\SerializesModels;
910

1011
class PendingDispatch
1112
{
@@ -57,13 +58,18 @@ public function onQueue($queue)
5758
* Determine if the job should be dispatched.
5859
*
5960
* @return bool
61+
* @throws \RuntimeException
6062
*/
61-
protected function shouldDispatch()
63+
protected function shouldDispatch(): bool
6264
{
6365
if (!$this->job instanceof ShouldBeUnique) {
6466
return true;
6567
}
6668

69+
if (\in_array(SerializesModels::class, \class_uses_recursive($this->job::class), true)) {
70+
throw new \RuntimeException('ShouldBeUnique not supported in combination with SerializesModels');
71+
}
72+
6773
$uniqueId = method_exists($this->job, 'uniqueId')
6874
? $this->job->uniqueId()
6975
: ($this->job->uniqueId ?? '');

0 commit comments

Comments
 (0)