Skip to content

Process queue command #344

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@
"symfony/mailchimp-mailer": "^6.4",
"symfony/sendgrid-mailer": "^6.4",
"symfony/twig-bundle": "^6.4",
"symfony/messenger": "^6.4"
"symfony/messenger": "^6.4",
"symfony/lock": "^6.4"
},
"require-dev": {
"phpunit/phpunit": "^9.5",
Expand Down
5 changes: 0 additions & 5 deletions config/services/managers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,3 @@ services:
PhpList\Core\Domain\Configuration\Service\Manager\ConfigManager:
autowire: true
autoconfigure: true

PhpList\Core\Domain\Subscription\Service\SubscriberDeletionService:
autowire: true
autoconfigure: true
public: true
4 changes: 4 additions & 0 deletions config/services/providers.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
services:
PhpList\Core\Domain\Subscription\Service\Provider\SubscriberProvider:
autowire: true
autoconfigure: true
5 changes: 5 additions & 0 deletions config/services/repositories.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,8 @@ services:
parent: PhpList\Core\Domain\Common\Repository\AbstractRepository
arguments:
- PhpList\Core\Domain\Subscription\Model\SubscriberHistory

PhpList\Core\Domain\Messaging\Repository\ListMessageRepository:
parent: PhpList\Core\Domain\Common\Repository\AbstractRepository
arguments:
- PhpList\Core\Domain\Subscription\Model\ListMessage
10 changes: 10 additions & 0 deletions config/services/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,13 @@ services:
autoconfigure: true
arguments:
$defaultFromEmail: '%app.mailer_from%'

PhpList\Core\Domain\Subscription\Service\SubscriberDeletionService:
autowire: true
autoconfigure: true
public: true

PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator:
autowire: true
autoconfigure: true
public: true
106 changes: 106 additions & 0 deletions src/Domain/Messaging/Command/ProcessQueueCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
<?php

declare(strict_types=1);

namespace PhpList\Core\Domain\Messaging\Command;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Doctrine\ORM\EntityManagerInterface;
use PhpList\Core\Domain\Messaging\Model\Message;
use PhpList\Core\Domain\Messaging\Repository\MessageRepository;
use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator;
use PhpList\Core\Domain\Subscription\Service\Provider\SubscriberProvider;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Mailer\MailerInterface;
use Symfony\Component\Mime\Email;
use Throwable;

class ProcessQueueCommand extends Command
{
protected static $defaultName = 'phplist:process-queue';

private MessageRepository $messageRepository;
private MailerInterface $mailer;
private LockFactory $lockFactory;
private EntityManagerInterface $entityManager;
private SubscriberProvider $subscriberProvider;
private MessageProcessingPreparator $messageProcessingPreparator;

public function __construct(
MessageRepository $messageRepository,
MailerInterface $mailer,
LockFactory $lockFactory,
EntityManagerInterface $entityManager,
SubscriberProvider $subscriberProvider,
MessageProcessingPreparator $messageProcessingPreparator
) {
parent::__construct();
$this->messageRepository = $messageRepository;
$this->mailer = $mailer;
$this->lockFactory = $lockFactory;
$this->entityManager = $entityManager;
$this->subscriberProvider = $subscriberProvider;
$this->messageProcessingPreparator = $messageProcessingPreparator;
}

/**
* @SuppressWarnings("PHPMD.UnusedFormalParameter")
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$lock = $this->lockFactory->createLock('queue_processor');
if (!$lock->acquire()) {
$output->writeln('Queue is already being processed by another instance.');

return Command::FAILURE;
}

try {
$this->messageProcessingPreparator->ensureSubscribersHaveUuid($output);
$this->messageProcessingPreparator->ensureCampaignsHaveUuid($output);

$campaigns = $this->messageRepository->findBy(['status' => 'submitted']);

foreach ($campaigns as $campaign) {
$this->processCampaign($campaign, $output);
}
} finally {
$lock->release();
}

return Command::SUCCESS;
}

private function processCampaign(Message $campaign, OutputInterface $output): void
{
$subscribers = $this->subscriberProvider->getSubscribersForMessage($campaign);
// todo: check $ISPrestrictions logic
foreach ($subscribers as $subscriber) {
if (!filter_var($subscriber->getEmail(), FILTER_VALIDATE_EMAIL)) {
continue;
}

$email = (new Email())
->from('[email protected]')
->to($subscriber->getEmail())
->subject($campaign->getContent()->getSubject())
->text($campaign->getContent()->getTextMessage())
->html($campaign->getContent()->getText());

try {
$this->mailer->send($email);

// todo: log somewhere that this subscriber got email
} catch (Throwable $e) {
$output->writeln('Failed to send to: ' . $subscriber->getEmail());
}

usleep(100000);
}

$campaign->getMetadata()->setStatus('sent');
$this->entityManager->flush();
}
}
6 changes: 6 additions & 0 deletions src/Domain/Messaging/Model/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ public function getUuid(): ?string
return $this->uuid;
}

public function setUuid(string $uuid): self
{
$this->uuid = $uuid;
return $this;
}

public function getOwner(): ?Administrator
{
return $this->owner;
Expand Down
11 changes: 11 additions & 0 deletions src/Domain/Messaging/Repository/ListMessageRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,15 @@
class ListMessageRepository extends AbstractRepository implements PaginatableRepositoryInterface
{
use CursorPaginationTrait;

/** @return int[] */
public function getListIdsByMessageId(int $messageId): array
{
return $this->createQueryBuilder('lm')
->select('IDENTITY(lm.list)')
->where('lm.messageId = :messageId')
->setParameter('messageId', $messageId)
->getQuery()
->getSingleColumnResult();
}
}
12 changes: 12 additions & 0 deletions src/Domain/Messaging/Repository/MessageRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@

class MessageRepository extends AbstractRepository implements PaginatableRepositoryInterface
{
/**
* @return Message[]
*/
public function findCampaignsWithoutUuid(): array
{
return $this->createQueryBuilder('m')
->where('m.uuid IS NULL OR m.uuid = :emptyString')
->setParameter('emptyString', '')
->getQuery()
->getResult();
}

public function getByOwnerId(int $ownerId): array
{
return $this->createQueryBuilder('m')
Expand Down
55 changes: 55 additions & 0 deletions src/Domain/Messaging/Service/MessageProcessingPreparator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?php

declare(strict_types=1);

namespace PhpList\Core\Domain\Messaging\Service;

use Doctrine\ORM\EntityManagerInterface;
use PhpList\Core\Domain\Messaging\Repository\MessageRepository;
use PhpList\Core\Domain\Subscription\Repository\SubscriberRepository;
use Symfony\Component\Console\Output\OutputInterface;

class MessageProcessingPreparator
{
private EntityManagerInterface $entityManager;
private SubscriberRepository $subscriberRepository;
private MessageRepository $messageRepository;

public function __construct(
EntityManagerInterface $entityManager,
SubscriberRepository $subscriberRepository,
MessageRepository $messageRepository
) {
$this->entityManager = $entityManager;
$this->subscriberRepository = $subscriberRepository;
$this->messageRepository = $messageRepository;
}

public function ensureSubscribersHaveUuid(OutputInterface $output): void
{
$subscribersWithoutUuid = $this->subscriberRepository->findSubscribersWithoutUuid();

$numSubscribers = count($subscribersWithoutUuid);
if ($numSubscribers > 0) {
$output->writeln(sprintf('Giving a UUID to %d subscribers, this may take a while', $numSubscribers));
foreach ($subscribersWithoutUuid as $subscriber) {
$subscriber->setUniqueId(bin2hex(random_bytes(16)));
}
$this->entityManager->flush();
}
}

public function ensureCampaignsHaveUuid(OutputInterface $output): void
{
$campaignsWithoutUuid = $this->messageRepository->findCampaignsWithoutUuid();

$numCampaigns = count($campaignsWithoutUuid);
if ($numCampaigns > 0) {
$output->writeln(sprintf('Giving a UUID to %d campaigns', $numCampaigns));
foreach ($campaignsWithoutUuid as $campaign) {
$campaign->setUuid(bin2hex(random_bytes(18)));
}
$this->entityManager->flush();
}
}
}
12 changes: 12 additions & 0 deletions src/Domain/Subscription/Repository/SubscriberRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@
*/
class SubscriberRepository extends AbstractRepository implements PaginatableRepositoryInterface
{
/**
* @return Subscriber[]
*/
public function findSubscribersWithoutUuid(): array
{
return $this->createQueryBuilder('s')
->where('s.uniqueId IS NULL OR s.uniqueId = :emptyString')
->setParameter('emptyString', '')
->getQuery()
->getResult();
}

public function findOneByEmail(string $email): ?Subscriber
{
return $this->findOneBy(['email' => $email]);
Expand Down
45 changes: 45 additions & 0 deletions src/Domain/Subscription/Service/Provider/SubscriberProvider.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);

namespace PhpList\Core\Domain\Subscription\Service\Provider;

use PhpList\Core\Domain\Messaging\Model\Message;
use PhpList\Core\Domain\Messaging\Repository\ListMessageRepository;
use PhpList\Core\Domain\Subscription\Model\Subscriber;
use PhpList\Core\Domain\Subscription\Repository\SubscriberRepository;

class SubscriberProvider
{
private ListMessageRepository $listMessageRepository;
private SubscriberRepository $subscriberRepository;

public function __construct(
ListMessageRepository $listMessageRepository,
SubscriberRepository $subscriberRepository
) {
$this->listMessageRepository = $listMessageRepository;
$this->subscriberRepository = $subscriberRepository;
}

/**
* Get subscribers for a message
*
* @param Message $message The message to get subscribers for
* @return Subscriber[] Array of subscribers
*/
public function getSubscribersForMessage(Message $message): array
{
$listIds = $this->listMessageRepository->getListIdsByMessageId($message->getId());

$subscribers = [];
foreach ($listIds as $listId) {
$listSubscribers = $this->subscriberRepository->getSubscribersBySubscribedListId($listId);
foreach ($listSubscribers as $subscriber) {
$subscribers[$subscriber->getId()] = $subscriber;
}
}

return array_values($subscribers);
}
}
Loading
Loading