Skip to content

Commit cf678ba

Browse files
committed
Merge branch 'master' into 0.9
2 parents 7e0c3fa + 5906753 commit cf678ba

File tree

4 files changed

+36
-12
lines changed

4 files changed

+36
-12
lines changed

composer.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@
2727
"enqueue/simple-client": "If you want to use enqueue client and cli commands"
2828
},
2929
"extra": {
30+
"laravel": {
31+
"providers": [
32+
"Enqueue\\LaravelQueue\\EnqueueServiceProvider"
33+
]
34+
},
3035
"branch-alias": {
3136
"dev-master": "0.9.x-dev"
3237
}

src/AmqpQueue.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ public function pop($queue = null)
5454
*/
5555
protected function declareQueue($queue = null)
5656
{
57-
$queue = $this->getQueueInteropContext()->createQueue($this->getQueue($queue));
58-
$queue->addFlag(\Interop\Amqp\AmqpQueue::FLAG_DURABLE);
57+
$interopQueue = $this->getQueue($queue);
58+
$interopQueue->addFlag(\Interop\Amqp\AmqpQueue::FLAG_DURABLE);
5959

6060
$this->getQueueInteropContext()->declareQueue($queue);
6161
}

src/Job.php

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Illuminate\Queue\Jobs\Job as BaseJob;
88
use Interop\Queue\Consumer;
99
use Interop\Queue\Context;
10+
use Interop\Queue\Exception\DeliveryDelayNotSupportedException;
1011
use Interop\Queue\Message;
1112

1213
class Job extends BaseJob implements JobContract
@@ -35,6 +36,11 @@ public function __construct(Container $container, Context $context, Consumer $co
3536
$this->connectionName = $connectionName;
3637
}
3738

39+
public function getJobId()
40+
{
41+
return $this->message->getMessageId();
42+
}
43+
3844
/**
3945
* {@inheritdoc}
4046
*/
@@ -50,16 +56,20 @@ public function delete()
5056
*/
5157
public function release($delay = 0)
5258
{
53-
if ($delay) {
54-
throw new \LogicException('To be implemented');
55-
}
59+
parent::release($delay);
5660

5761
$requeueMessage = clone $this->message;
5862
$requeueMessage->setProperty('x-attempts', $this->attempts() + 1);
63+
64+
$producer = $this->context->createProducer();
5965

60-
$this->context->createProducer()->send($this->consumer->getQueue(), $requeueMessage);
66+
try {
67+
$producer->setDeliveryDelay($this->secondsUntil($delay) * 1000);
68+
} catch (DeliveryDelayNotSupportedException $e) {
69+
}
6170

6271
$this->consumer->acknowledge($this->message);
72+
$producer->send($this->consumer->getQueue(), $requeueMessage);
6373
}
6474

6575
public function getQueue()

src/Queue.php

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Illuminate\Contracts\Queue\Queue as QueueContract;
66
use Illuminate\Queue\Queue as BaseQueue;
77
use Interop\Queue\Context;
8+
use Interop\Amqp\Impl\AmqpMessage;
89

910
class Queue extends BaseQueue implements QueueContract
1011
{
@@ -56,9 +57,15 @@ public function push($job, $data = '', $queue = null)
5657
*/
5758
public function pushRaw($payload, $queue = null, array $options = [])
5859
{
59-
$this->context->createProducer()->send(
60+
$message = $this->context->createMessage($payload);
61+
62+
if ($message instanceof AmqpMessage) {
63+
$message->setDeliveryMode(\Interop\Amqp\AmqpMessage::DELIVERY_MODE_PERSISTENT);
64+
}
65+
66+
return $this->context->createProducer()->send(
6067
$this->getQueue($queue),
61-
$this->context->createMessage($payload)
68+
$message
6269
);
6370
}
6471

@@ -69,11 +76,13 @@ public function later($delay, $job, $data = '', $queue = null)
6976
{
7077
$message = $this->context->createMessage($this->createPayload($job, $data));
7178

72-
$this->context->createProducer()
73-
->setDeliveryDelay($this->secondsUntil($delay) * 1000)
79+
if ($message instanceof AmqpMessage) {
80+
$message->setDeliveryMode(\Interop\Amqp\AmqpMessage::DELIVERY_MODE_PERSISTENT);
81+
}
7482

75-
->send($this->getQueue($queue), $message)
76-
;
83+
return $this->context->createProducer()
84+
->setDeliveryDelay($this->secondsUntil($delay) * 1000)
85+
->send($this->getQueue($queue), $message);
7786
}
7887

7988
public function pop($queue = null)

0 commit comments

Comments
 (0)