From d1c1d2c64fe37439747ecec90037508d072cf446 Mon Sep 17 00:00:00 2001 From: Alexander Trauzzi Date: Fri, 4 Jun 2021 10:21:07 -0500 Subject: [PATCH 1/5] Parameterize config to accept connection names. - `TaskHandler` still needs to be updated. Might look for some input there. --- src/CloudTasksQueue.php | 6 +++--- src/Config.php | 20 ++++++++++---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/CloudTasksQueue.php b/src/CloudTasksQueue.php index 7c892a4..102841c 100644 --- a/src/CloudTasksQueue.php +++ b/src/CloudTasksQueue.php @@ -52,11 +52,11 @@ public function later($delay, $job, $data = '', $queue = null) protected function pushToCloudTasks($queue, $payload, $delay = 0, $attempts = 0) { $queue = $this->getQueue($queue); - $queueName = $this->client->queueName(Config::project(), Config::location(), $queue); + $queueName = $this->client->queueName(Config::project($this->connectionName), Config::location($this->connectionName), $queue); $availableAt = $this->availableAt($delay); $httpRequest = $this->createHttpRequest(); - $httpRequest->setUrl(Config::handler()); + $httpRequest->setUrl(Config::handler($this->connectionName)); $httpRequest->setHttpMethod(HttpMethod::POST); $httpRequest->setBody($payload); @@ -64,7 +64,7 @@ protected function pushToCloudTasks($queue, $payload, $delay = 0, $attempts = 0) $task->setHttpRequest($httpRequest); $token = new OidcToken; - $token->setServiceAccountEmail(Config::serviceAccountEmail()); + $token->setServiceAccountEmail(Config::serviceAccountEmail($this->connectionName)); $httpRequest->setOidcToken($token); if ($availableAt > time()) { diff --git a/src/Config.php b/src/Config.php index f0ea54d..11c274a 100644 --- a/src/Config.php +++ b/src/Config.php @@ -6,29 +6,29 @@ class Config { - public static function credentials() + public static function credentials($connection = 'cloudtasks') { - return config('queue.connections.cloudtasks.credentials'); + return config("queue.connections.{$connection}.credentials"); } - public static function project() + public static function project($connection = 'cloudtasks') { - return config('queue.connections.cloudtasks.project'); + return config("queue.connections.{$connection}.project"); } - public static function location() + public static function location($connection = 'cloudtasks') { - return config('queue.connections.cloudtasks.location'); + return config("queue.connections.{$connection}.location"); } - public static function handler() + public static function handler($connection = 'cloudtasks') { - return config('queue.connections.cloudtasks.handler'); + return config("queue.connections.{$connection}.handler"); } - public static function serviceAccountEmail() + public static function serviceAccountEmail($connection = 'cloudtasks') { - return config('queue.connections.cloudtasks.service_account_email'); + return config("queue.connections.{$connection}.service_account_email"); } public static function validate(array $config) From f0baa25c906b2ac4a607b026ff237b5755e94505 Mon Sep 17 00:00:00 2001 From: Alexander Trauzzi Date: Fri, 4 Jun 2021 11:13:53 -0500 Subject: [PATCH 2/5] First pass at updating TaskHandler. - This attempts to obtain the connection name as early as possible so that it can be passed as needed. - We'll continue to default to `"cloudtasks"` --- src/TaskHandler.php | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/src/TaskHandler.php b/src/TaskHandler.php index 58be40c..0796b16 100644 --- a/src/TaskHandler.php +++ b/src/TaskHandler.php @@ -26,19 +26,22 @@ public function __construct(CloudTasksClient $client, Request $request, OpenIdVe */ public function handle($task = null) { - $this->authorizeRequest(); - $task = $task ?: $this->captureTask(); - $this->listenForEvents(); + $command = unserialize($task['data']['command']); + $connection = $command->connection ?? 'cloudtasks'; + + $this->authorizeRequest($connection); + + $this->listenForEvents($connection); - $this->handleTask($task); + $this->handleTask($connection, $task); } /** * @throws CloudTasksException */ - public function authorizeRequest() + public function authorizeRequest($connection) { if (!$this->request->hasHeader('Authorization')) { throw new CloudTasksException('Missing [Authorization] header'); @@ -49,7 +52,7 @@ public function authorizeRequest() $decodedToken = $this->publicKey->decodeOpenIdToken($openIdToken, $kid); - $this->validateToken($decodedToken); + $this->validateToken($connection, $decodedToken); } /** @@ -58,13 +61,13 @@ public function authorizeRequest() * @param $openIdToken * @throws CloudTasksException */ - protected function validateToken($openIdToken) + protected function validateToken($connection, $openIdToken) { if (!in_array($openIdToken->iss, ['https://accounts.google.com', 'accounts.google.com'])) { throw new CloudTasksException('The given OpenID token is not valid'); } - if ($openIdToken->aud != Config::handler()) { + if ($openIdToken->aud != Config::handler($connection)) { throw new CloudTasksException('The given OpenID token is not valid'); } @@ -93,11 +96,11 @@ private function captureTask() return $task; } - private function listenForEvents() + private function listenForEvents($connection) { - app('events')->listen(JobFailed::class, function ($event) { + app('events')->listen(JobFailed::class, function ($event) use ($connection) { app('queue.failer')->log( - 'cloudtasks', $event->job->getQueue(), + $connection, $event->job->getQueue(), $event->job->getRawBody(), $event->exception ); }); @@ -107,24 +110,24 @@ private function listenForEvents() * @param $task * @throws CloudTasksException */ - private function handleTask($task) + private function handleTask($connection, $task) { $job = new CloudTasksJob($task); $job->setAttempts(request()->header('X-CloudTasks-TaskRetryCount') + 1); $job->setQueue(request()->header('X-Cloudtasks-Queuename')); - $job->setMaxTries($this->getQueueMaxTries($job)); + $job->setMaxTries($this->getQueueMaxTries($connection, $job)); $worker = $this->getQueueWorker(); - $worker->process('cloudtasks', $job, new WorkerOptions()); + $worker->process($connection, $job, new WorkerOptions()); } - private function getQueueMaxTries(CloudTasksJob $job) + private function getQueueMaxTries($connection, CloudTasksJob $job) { $queueName = $this->client->queueName( - Config::project(), - Config::location(), + Config::project($connection), + Config::location($connection), $job->getQueue() ); From acc5e91436a98c5fe67d638c22c9fd1c7ec85fdf Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Fri, 4 Jun 2021 21:35:01 +0200 Subject: [PATCH 3/5] Fix failing test --- tests/TaskHandlerTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/TaskHandlerTest.php b/tests/TaskHandlerTest.php index 24a5343..845230c 100644 --- a/tests/TaskHandlerTest.php +++ b/tests/TaskHandlerTest.php @@ -77,7 +77,7 @@ public function it_needs_an_authorization_header() $this->expectException(CloudTasksException::class); $this->expectExceptionMessage('Missing [Authorization] header'); - $this->handler->handle(); + $this->handler->handle($this->simpleJob()); } /** @test */ From 62a03c4e910e865eed98eb754989af4210de0ca3 Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Fri, 4 Jun 2021 21:39:13 +0200 Subject: [PATCH 4/5] Add support for custom connection name and update tests to test this scenario --- src/TaskHandler.php | 2 +- tests/TaskHandlerTest.php | 2 +- tests/TestCase.php | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/TaskHandler.php b/src/TaskHandler.php index 0796b16..04a62d6 100644 --- a/src/TaskHandler.php +++ b/src/TaskHandler.php @@ -29,7 +29,7 @@ public function handle($task = null) $task = $task ?: $this->captureTask(); $command = unserialize($task['data']['command']); - $connection = $command->connection ?? 'cloudtasks'; + $connection = $command->connection ?? config('queue.default'); $this->authorizeRequest($connection); diff --git a/tests/TaskHandlerTest.php b/tests/TaskHandlerTest.php index 845230c..c0d2f3e 100644 --- a/tests/TaskHandlerTest.php +++ b/tests/TaskHandlerTest.php @@ -165,7 +165,7 @@ public function after_max_attempts_it_will_log_to_failed_table() } $this->assertDatabaseHas('failed_jobs', [ - 'connection' => 'cloudtasks', + 'connection' => 'my-cloudtasks-connection', 'queue' => 'my-queue', 'payload' => rtrim($this->failingJobPayload()), ]); diff --git a/tests/TestCase.php b/tests/TestCase.php index 71d043d..f0fdf78 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -59,8 +59,8 @@ protected function getEnvironmentSetUp($app) } $app['config']->set('cache.default', 'file'); - $app['config']->set('queue.default', 'cloudtasks'); - $app['config']->set('queue.connections.cloudtasks', [ + $app['config']->set('queue.default', 'my-cloudtasks-connection'); + $app['config']->set('queue.connections.my-cloudtasks-connection', [ 'driver' => 'cloudtasks', 'queue' => 'test-queue', 'project' => 'test-project', @@ -72,6 +72,6 @@ protected function getEnvironmentSetUp($app) protected function setConfigValue($key, $value) { - $this->app['config']->set('queue.connections.cloudtasks.' . $key, $value); + $this->app['config']->set('queue.connections.my-cloudtasks-connection.' . $key, $value); } } From c17011ef11fbb866472a3c999af89b9adba71441 Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Fri, 4 Jun 2021 22:13:19 +0200 Subject: [PATCH 5/5] Refactor accessing queue configuration --- src/CloudTasksQueue.php | 8 ++++--- src/Config.php | 25 ---------------------- src/TaskHandler.php | 46 +++++++++++++++++++++++++---------------- 3 files changed, 33 insertions(+), 46 deletions(-) diff --git a/src/CloudTasksQueue.php b/src/CloudTasksQueue.php index 102841c..73b7764 100644 --- a/src/CloudTasksQueue.php +++ b/src/CloudTasksQueue.php @@ -18,11 +18,13 @@ class CloudTasksQueue extends LaravelQueue implements QueueContract private $client; private $default; + private $config; public function __construct(array $config, CloudTasksClient $client) { $this->client = $client; $this->default = $config['queue']; + $this->config = $config; } public function size($queue = null) @@ -52,11 +54,11 @@ public function later($delay, $job, $data = '', $queue = null) protected function pushToCloudTasks($queue, $payload, $delay = 0, $attempts = 0) { $queue = $this->getQueue($queue); - $queueName = $this->client->queueName(Config::project($this->connectionName), Config::location($this->connectionName), $queue); + $queueName = $this->client->queueName($this->config['project'], $this->config['location'], $queue); $availableAt = $this->availableAt($delay); $httpRequest = $this->createHttpRequest(); - $httpRequest->setUrl(Config::handler($this->connectionName)); + $httpRequest->setUrl($this->config['handler']); $httpRequest->setHttpMethod(HttpMethod::POST); $httpRequest->setBody($payload); @@ -64,7 +66,7 @@ protected function pushToCloudTasks($queue, $payload, $delay = 0, $attempts = 0) $task->setHttpRequest($httpRequest); $token = new OidcToken; - $token->setServiceAccountEmail(Config::serviceAccountEmail($this->connectionName)); + $token->setServiceAccountEmail($this->config['service_account_email']); $httpRequest->setOidcToken($token); if ($availableAt > time()) { diff --git a/src/Config.php b/src/Config.php index 11c274a..ab82603 100644 --- a/src/Config.php +++ b/src/Config.php @@ -6,31 +6,6 @@ class Config { - public static function credentials($connection = 'cloudtasks') - { - return config("queue.connections.{$connection}.credentials"); - } - - public static function project($connection = 'cloudtasks') - { - return config("queue.connections.{$connection}.project"); - } - - public static function location($connection = 'cloudtasks') - { - return config("queue.connections.{$connection}.location"); - } - - public static function handler($connection = 'cloudtasks') - { - return config("queue.connections.{$connection}.handler"); - } - - public static function serviceAccountEmail($connection = 'cloudtasks') - { - return config("queue.connections.{$connection}.service_account_email"); - } - public static function validate(array $config) { if (empty($config['project'])) { diff --git a/src/TaskHandler.php b/src/TaskHandler.php index 04a62d6..a8b2b5f 100644 --- a/src/TaskHandler.php +++ b/src/TaskHandler.php @@ -12,6 +12,7 @@ class TaskHandler { private $request; private $publicKey; + private $config; public function __construct(CloudTasksClient $client, Request $request, OpenIdVerificator $publicKey) { @@ -28,20 +29,29 @@ public function handle($task = null) { $task = $task ?: $this->captureTask(); - $command = unserialize($task['data']['command']); - $connection = $command->connection ?? config('queue.default'); + $this->loadQueueConnectionConfiguration($task); - $this->authorizeRequest($connection); + $this->authorizeRequest(); - $this->listenForEvents($connection); + $this->listenForEvents(); + + $this->handleTask($task); + } - $this->handleTask($connection, $task); + private function loadQueueConnectionConfiguration($task) + { + $command = unserialize($task['data']['command']); + $connection = $command->connection ?? config('queue.default'); + $this->config = array_merge( + config("queue.connections.{$connection}"), + ['connection' => $connection] + ); } /** * @throws CloudTasksException */ - public function authorizeRequest($connection) + public function authorizeRequest() { if (!$this->request->hasHeader('Authorization')) { throw new CloudTasksException('Missing [Authorization] header'); @@ -52,7 +62,7 @@ public function authorizeRequest($connection) $decodedToken = $this->publicKey->decodeOpenIdToken($openIdToken, $kid); - $this->validateToken($connection, $decodedToken); + $this->validateToken($decodedToken); } /** @@ -61,13 +71,13 @@ public function authorizeRequest($connection) * @param $openIdToken * @throws CloudTasksException */ - protected function validateToken($connection, $openIdToken) + protected function validateToken($openIdToken) { if (!in_array($openIdToken->iss, ['https://accounts.google.com', 'accounts.google.com'])) { throw new CloudTasksException('The given OpenID token is not valid'); } - if ($openIdToken->aud != Config::handler($connection)) { + if ($openIdToken->aud != $this->config['handler']) { throw new CloudTasksException('The given OpenID token is not valid'); } @@ -96,11 +106,11 @@ private function captureTask() return $task; } - private function listenForEvents($connection) + private function listenForEvents() { - app('events')->listen(JobFailed::class, function ($event) use ($connection) { + app('events')->listen(JobFailed::class, function ($event) { app('queue.failer')->log( - $connection, $event->job->getQueue(), + $this->config['connection'], $event->job->getQueue(), $event->job->getRawBody(), $event->exception ); }); @@ -110,24 +120,24 @@ private function listenForEvents($connection) * @param $task * @throws CloudTasksException */ - private function handleTask($connection, $task) + private function handleTask($task) { $job = new CloudTasksJob($task); $job->setAttempts(request()->header('X-CloudTasks-TaskRetryCount') + 1); $job->setQueue(request()->header('X-Cloudtasks-Queuename')); - $job->setMaxTries($this->getQueueMaxTries($connection, $job)); + $job->setMaxTries($this->getQueueMaxTries($job)); $worker = $this->getQueueWorker(); - $worker->process($connection, $job, new WorkerOptions()); + $worker->process($this->config['connection'], $job, new WorkerOptions()); } - private function getQueueMaxTries($connection, CloudTasksJob $job) + private function getQueueMaxTries(CloudTasksJob $job) { $queueName = $this->client->queueName( - Config::project($connection), - Config::location($connection), + $this->config['project'], + $this->config['location'], $job->getQueue() );