Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/CloudTasksQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,19 @@ public function later($delay, $job, $data = '', $queue = null)
protected function pushToCloudTasks($queue, $payload, $delay = 0, $attempts = 0)
{
$queue = $this->getQueue($queue);
$queueName = $this->client->queueName(Config::project(), Config::location(), $queue);
$queueName = $this->client->queueName(Config::project($this->connectionName), Config::location($this->connectionName), $queue);
$availableAt = $this->availableAt($delay);

$httpRequest = $this->createHttpRequest();
$httpRequest->setUrl(Config::handler());
$httpRequest->setUrl(Config::handler($this->connectionName));
$httpRequest->setHttpMethod(HttpMethod::POST);
$httpRequest->setBody($payload);

$task = $this->createTask();
$task->setHttpRequest($httpRequest);

$token = new OidcToken;
$token->setServiceAccountEmail(Config::serviceAccountEmail());
$token->setServiceAccountEmail(Config::serviceAccountEmail($this->connectionName));
$httpRequest->setOidcToken($token);

if ($availableAt > time()) {
Expand Down
20 changes: 10 additions & 10 deletions src/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,29 @@

class Config
{
public static function credentials()
public static function credentials($connection = 'cloudtasks')
{
return config('queue.connections.cloudtasks.credentials');
return config("queue.connections.{$connection}.credentials");
}

public static function project()
public static function project($connection = 'cloudtasks')
{
return config('queue.connections.cloudtasks.project');
return config("queue.connections.{$connection}.project");
}

public static function location()
public static function location($connection = 'cloudtasks')
{
return config('queue.connections.cloudtasks.location');
return config("queue.connections.{$connection}.location");
}

public static function handler()
public static function handler($connection = 'cloudtasks')
{
return config('queue.connections.cloudtasks.handler');
return config("queue.connections.{$connection}.handler");
}

public static function serviceAccountEmail()
public static function serviceAccountEmail($connection = 'cloudtasks')
{
return config('queue.connections.cloudtasks.service_account_email');
return config("queue.connections.{$connection}.service_account_email");
}

public static function validate(array $config)
Expand Down
37 changes: 20 additions & 17 deletions src/TaskHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,22 @@ public function __construct(CloudTasksClient $client, Request $request, OpenIdVe
*/
public function handle($task = null)
{
$this->authorizeRequest();

$task = $task ?: $this->captureTask();

$this->listenForEvents();
$command = unserialize($task['data']['command']);
$connection = $command->connection ?? 'cloudtasks';
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to flip the order here a little. The payload is being unserialized and read for the connection value as it cannot be authorized without knowing which connection to use! 😄


$this->authorizeRequest($connection);

$this->listenForEvents($connection);

$this->handleTask($task);
$this->handleTask($connection, $task);
}

/**
* @throws CloudTasksException
*/
public function authorizeRequest()
public function authorizeRequest($connection)
{
if (!$this->request->hasHeader('Authorization')) {
throw new CloudTasksException('Missing [Authorization] header');
Expand All @@ -49,7 +52,7 @@ public function authorizeRequest()

$decodedToken = $this->publicKey->decodeOpenIdToken($openIdToken, $kid);

$this->validateToken($decodedToken);
$this->validateToken($connection, $decodedToken);
}

/**
Expand All @@ -58,13 +61,13 @@ public function authorizeRequest()
* @param $openIdToken
* @throws CloudTasksException
*/
protected function validateToken($openIdToken)
protected function validateToken($connection, $openIdToken)
{
if (!in_array($openIdToken->iss, ['https://accounts.google.com', 'accounts.google.com'])) {
throw new CloudTasksException('The given OpenID token is not valid');
}

if ($openIdToken->aud != Config::handler()) {
if ($openIdToken->aud != Config::handler($connection)) {
throw new CloudTasksException('The given OpenID token is not valid');
}

Expand Down Expand Up @@ -93,11 +96,11 @@ private function captureTask()
return $task;
}

private function listenForEvents()
private function listenForEvents($connection)
{
app('events')->listen(JobFailed::class, function ($event) {
app('events')->listen(JobFailed::class, function ($event) use ($connection) {
app('queue.failer')->log(
'cloudtasks', $event->job->getQueue(),
$connection, $event->job->getQueue(),
$event->job->getRawBody(), $event->exception
);
});
Expand All @@ -107,24 +110,24 @@ private function listenForEvents()
* @param $task
* @throws CloudTasksException
*/
private function handleTask($task)
private function handleTask($connection, $task)
{
$job = new CloudTasksJob($task);

$job->setAttempts(request()->header('X-CloudTasks-TaskRetryCount') + 1);
$job->setQueue(request()->header('X-Cloudtasks-Queuename'));
$job->setMaxTries($this->getQueueMaxTries($job));
$job->setMaxTries($this->getQueueMaxTries($connection, $job));

$worker = $this->getQueueWorker();

$worker->process('cloudtasks', $job, new WorkerOptions());
$worker->process($connection, $job, new WorkerOptions());
}

private function getQueueMaxTries(CloudTasksJob $job)
private function getQueueMaxTries($connection, CloudTasksJob $job)
{
$queueName = $this->client->queueName(
Config::project(),
Config::location(),
Config::project($connection),
Config::location($connection),
$job->getQueue()
);

Expand Down