diff --git a/config/parameters.yml.dist b/config/parameters.yml.dist index 54c649d8..e34a7d2b 100644 --- a/config/parameters.yml.dist +++ b/config/parameters.yml.dist @@ -68,3 +68,16 @@ parameters: graylog_host: 'graylog.example.com' graylog_port: 12201 + + app.phplist_isp_conf_path: '%%env(APP_PHPLIST_ISP_CONF_PATH)%%' + env(APP_PHPLIST_ISP_CONF_PATH): '/etc/phplist.conf' + + # Message sending + messaging.mail_queue_batch_size: '%%env(MAILQUEUE_BATCH_SIZE)%%' + env(MAILQUEUE_BATCH_SIZE): '5' + messaging.mail_queue_period: '%%env(MAILQUEUE_BATCH_PERIOD)%%' + env(MAILQUEUE_BATCH_PERIOD): '5' + messaging.mail_queue_throttle: '%%env(MAILQUEUE_THROTTLE)%%' + env(MAILQUEUE_THROTTLE): '5' + messaging.max_process_time: '%%env(MESSAGING_MAX_PROCESS_TIME)%%' + env(MESSAGING_MAX_PROCESS_TIME): '600' diff --git a/config/services.yml b/config/services.yml index 47be8241..b21dc5aa 100644 --- a/config/services.yml +++ b/config/services.yml @@ -7,10 +7,6 @@ services: autoconfigure: true public: false - PhpList\Core\Core\ConfigProvider: - arguments: - $config: '%app.config%' - PhpList\Core\Core\ApplicationStructure: public: true diff --git a/config/services/providers.yml b/config/services/providers.yml index 226c4e81..bb4524c3 100644 --- a/config/services/providers.yml +++ b/config/services/providers.yml @@ -2,3 +2,13 @@ services: PhpList\Core\Domain\Subscription\Service\Provider\SubscriberProvider: autowire: true autoconfigure: true + + PhpList\Core\Core\ConfigProvider: + arguments: + $config: '%app.config%' + + PhpList\Core\Domain\Common\IspRestrictionsProvider: + autowire: true + autoconfigure: true + arguments: + $confPath: '%app.phplist_isp_conf_path%' diff --git a/config/services/services.yml b/config/services/services.yml index f1b68e74..1afd1fc5 100644 --- a/config/services/services.yml +++ b/config/services/services.yml @@ -36,6 +36,14 @@ services: autoconfigure: true public: true + PhpList\Core\Domain\Messaging\Service\SendRateLimiter: + autowire: true + autoconfigure: true + arguments: + $mailqueueBatchSize: '%messaging.mail_queue_batch_size%' + $mailqueueBatchPeriod: '%messaging.mail_queue_period%' + $mailqueueThrottle: '%messaging.mail_queue_throttle%' + PhpList\Core\Domain\Common\ClientIpResolver: autowire: true autoconfigure: true @@ -44,6 +52,10 @@ services: autowire: true autoconfigure: true + PhpList\Core\Domain\Messaging\Service\RateLimitedCampaignMailer: + autowire: true + autoconfigure: true + PhpList\Core\Domain\Messaging\Service\ConsecutiveBounceHandler: autowire: true autoconfigure: true @@ -108,6 +120,13 @@ services: arguments: - !tagged_iterator { tag: 'phplist.bounce_action_handler' } + PhpList\Core\Domain\Messaging\Service\MaxProcessTimeLimiter: + autowire: true + autoconfigure: true + arguments: + $maxSeconds: '%messaging.max_process_time%' + + PhpList\Core\Domain\Identity\Service\PermissionChecker: autowire: true autoconfigure: true diff --git a/src/Domain/Common/IspRestrictionsProvider.php b/src/Domain/Common/IspRestrictionsProvider.php new file mode 100644 index 00000000..4095f5ce --- /dev/null +++ b/src/Domain/Common/IspRestrictionsProvider.php @@ -0,0 +1,137 @@ +readConfigFile(); + if ($contents === null) { + return new IspRestrictions(null, null, null); + } + + [$raw, $maxBatch, $minBatchPeriod, $lockFile] = $this->parseContents($contents); + + $this->logIfDetected($maxBatch, $minBatchPeriod, $lockFile); + + return new IspRestrictions($maxBatch, $minBatchPeriod, $lockFile, $raw); + } + + private function readConfigFile(): ?string + { + if (!is_file($this->confPath) || !is_readable($this->confPath)) { + return null; + } + $contents = file_get_contents($this->confPath); + if ($contents === false) { + $this->logger->warning('Cannot read ISP restrictions file', ['path' => $this->confPath]); + return null; + } + return $contents; + } + + /** + * @return array{0: array, 1: ?int, 2: ?int, 3: ?string} + */ + private function parseContents(string $contents): array + { + $maxBatch = null; + $minBatchPeriod = null; + $lockFile = null; + $raw = []; + + foreach (preg_split('/\R/', $contents) as $line) { + [$key, $val] = $this->parseLine($line); + if ($key === null) { + continue; + } + $raw[$key] = $val; + [$maxBatch, $minBatchPeriod, $lockFile] = $this->applyKeyValue( + $key, + $val, + $maxBatch, + $minBatchPeriod, + $lockFile + ); + } + + return [$raw, $maxBatch, $minBatchPeriod, $lockFile]; + } + + /** + * @return array{0: ?string, 1: string} + */ + private function parseLine(string $line): array + { + $line = trim($line); + if ($line === '' || str_starts_with($line, '#') || str_starts_with($line, ';')) { + return [null, '']; + } + $parts = explode('=', $line, 2); + if (\count($parts) !== 2) { + return [null, '']; + } + + return array_map('trim', $parts); + } + + /** + * @param string $key + * @param string $val + * @param ?int $maxBatch + * @param ?int $minBatchPeriod + * @param ?string $lockFile + * @return array{0: ?int, 1: ?int, 2: ?string} + */ + private function applyKeyValue( + string $key, + string $val, + ?int $maxBatch, + ?int $minBatchPeriod, + ?string $lockFile + ): array { + if ($key === 'maxbatch') { + if ($val !== '' && ctype_digit($val)) { + $maxBatch = (int) $val; + } + return [$maxBatch, $minBatchPeriod, $lockFile]; + } + if ($key === 'minbatchperiod') { + if ($val !== '' && ctype_digit($val)) { + $minBatchPeriod = (int) $val; + } + return [$maxBatch, $minBatchPeriod, $lockFile]; + } + if ($key === 'lockfile') { + if ($val !== '') { + $lockFile = $val; + } + return [$maxBatch, $minBatchPeriod, $lockFile]; + } + return [$maxBatch, $minBatchPeriod, $lockFile]; + } + + private function logIfDetected(?int $maxBatch, ?int $minBatchPeriod, ?string $lockFile): void + { + if ($maxBatch !== null || $minBatchPeriod !== null || $lockFile !== null) { + $this->logger->info('ISP restrictions detected', [ + 'path' => $this->confPath, + 'maxbatch' => $maxBatch, + 'minbatchperiod' => $minBatchPeriod, + 'lockfile' => $lockFile, + ]); + } + } +} diff --git a/src/Domain/Common/Model/IspRestrictions.php b/src/Domain/Common/Model/IspRestrictions.php new file mode 100644 index 00000000..c3fc56b4 --- /dev/null +++ b/src/Domain/Common/Model/IspRestrictions.php @@ -0,0 +1,21 @@ +maxBatch === null && $this->minBatchPeriod === null && $this->lockFile === null; + } +} diff --git a/src/Domain/Configuration/Service/Manager/ConfigManager.php b/src/Domain/Configuration/Service/Manager/ConfigManager.php index cae380be..1a9c356f 100644 --- a/src/Domain/Configuration/Service/Manager/ConfigManager.php +++ b/src/Domain/Configuration/Service/Manager/ConfigManager.php @@ -17,6 +17,12 @@ public function __construct(ConfigRepository $configRepository) $this->configRepository = $configRepository; } + public function inMaintenanceMode(): bool + { + $config = $this->getByItem('maintenancemode'); + return $config?->getValue() === '1'; + } + /** * Get a configuration item by its key */ diff --git a/src/Domain/Messaging/Command/ProcessQueueCommand.php b/src/Domain/Messaging/Command/ProcessQueueCommand.php index 820d403d..d2c7cbfa 100644 --- a/src/Domain/Messaging/Command/ProcessQueueCommand.php +++ b/src/Domain/Messaging/Command/ProcessQueueCommand.php @@ -4,6 +4,9 @@ namespace PhpList\Core\Domain\Messaging\Command; +use DateTimeImmutable; +use PhpList\Core\Domain\Configuration\Service\Manager\ConfigManager; +use PhpList\Core\Domain\Messaging\Model\Message\MessageStatus; use PhpList\Core\Domain\Messaging\Repository\MessageRepository; use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator; use PhpList\Core\Domain\Messaging\Service\Processor\CampaignProcessor; @@ -24,18 +27,21 @@ class ProcessQueueCommand extends Command private LockFactory $lockFactory; private MessageProcessingPreparator $messagePreparator; private CampaignProcessor $campaignProcessor; + private ConfigManager $configManager; public function __construct( MessageRepository $messageRepository, LockFactory $lockFactory, MessageProcessingPreparator $messagePreparator, CampaignProcessor $campaignProcessor, + ConfigManager $configManager ) { parent::__construct(); $this->messageRepository = $messageRepository; $this->lockFactory = $lockFactory; $this->messagePreparator = $messagePreparator; $this->campaignProcessor = $campaignProcessor; + $this->configManager = $configManager; } /** @@ -50,11 +56,20 @@ protected function execute(InputInterface $input, OutputInterface $output): int return Command::FAILURE; } + if ($this->configManager->inMaintenanceMode()) { + $output->writeln('The system is in maintenance mode, stopping. Try again later.'); + + return Command::FAILURE; + } + try { $this->messagePreparator->ensureSubscribersHaveUuid($output); $this->messagePreparator->ensureCampaignsHaveUuid($output); - $campaigns = $this->messageRepository->findBy(['status' => 'submitted']); + $campaigns = $this->messageRepository->getByStatusAndEmbargo( + status: MessageStatus::Submitted, + embargo: new DateTimeImmutable() + ); foreach ($campaigns as $campaign) { $this->campaignProcessor->process($campaign, $output); diff --git a/src/Domain/Messaging/Model/Dto/Message/MessageMetadataDto.php b/src/Domain/Messaging/Model/Dto/Message/MessageMetadataDto.php index 0776c1d1..91802df2 100644 --- a/src/Domain/Messaging/Model/Dto/Message/MessageMetadataDto.php +++ b/src/Domain/Messaging/Model/Dto/Message/MessageMetadataDto.php @@ -4,10 +4,12 @@ namespace PhpList\Core\Domain\Messaging\Model\Dto\Message; +use PhpList\Core\Domain\Messaging\Model\Message\MessageStatus; + class MessageMetadataDto { public function __construct( - public readonly string $status, + public readonly MessageStatus $status, ) { } } diff --git a/src/Domain/Messaging/Model/Message/MessageMetadata.php b/src/Domain/Messaging/Model/Message/MessageMetadata.php index 123103ff..156539b2 100644 --- a/src/Domain/Messaging/Model/Message/MessageMetadata.php +++ b/src/Domain/Messaging/Model/Message/MessageMetadata.php @@ -6,6 +6,7 @@ use DateTime; use Doctrine\ORM\Mapping as ORM; +use InvalidArgumentException; use PhpList\Core\Domain\Common\Model\Interfaces\EmbeddableInterface; #[ORM\Embeddable] @@ -33,13 +34,13 @@ class MessageMetadata implements EmbeddableInterface private ?DateTime $sendStart; public function __construct( - ?string $status = null, + ?MessageStatus $status = null, int $bounceCount = 0, ?DateTime $entered = null, ?DateTime $sent = null, ?DateTime $sendStart = null, ) { - $this->status = $status; + $this->status = $status->value ?? null; $this->processed = false; $this->viewed = 0; $this->bounceCount = $bounceCount; @@ -48,14 +49,21 @@ public function __construct( $this->sendStart = $sendStart; } - public function getStatus(): ?string + /** + * @SuppressWarnings("PHPMD.StaticAccess") + */ + public function getStatus(): ?MessageStatus { - return $this->status; + return MessageStatus::from($this->status); } - public function setStatus(string $status): self + public function setStatus(MessageStatus $status): self { - $this->status = $status; + if (!$this->getStatus()->canTransitionTo($status)) { + throw new InvalidArgumentException('Invalid status transition'); + } + $this->status = $status->value; + return $this; } diff --git a/src/Domain/Messaging/Model/Message/MessageStatus.php b/src/Domain/Messaging/Model/Message/MessageStatus.php new file mode 100644 index 00000000..90b7f987 --- /dev/null +++ b/src/Domain/Messaging/Model/Message/MessageStatus.php @@ -0,0 +1,38 @@ + [self::Submitted], + self::Submitted => [self::Prepared, self::InProcess], + self::Prepared => [self::InProcess], + self::InProcess => [self::Sent, self::Suspended, self::Submitted], + self::Requeued => [self::InProcess, self::Suspended], + self::Sent => [], + }; + } + + public function canTransitionTo(self $next): bool + { + return in_array($next, $this->allowedTransitions(), true); + } +} diff --git a/src/Domain/Messaging/Model/Message/UserMessageStatus.php b/src/Domain/Messaging/Model/Message/UserMessageStatus.php new file mode 100644 index 00000000..1237cfe8 --- /dev/null +++ b/src/Domain/Messaging/Model/Message/UserMessageStatus.php @@ -0,0 +1,16 @@ +viewed; } - public function getStatus(): ?string + /** + * @SuppressWarnings("PHPMD.StaticAccess") + */ + public function getStatus(): ?UserMessageStatus { - return $this->status; + return UserMessageStatus::from($this->status); } public function setViewed(?DateTime $viewed): self @@ -76,9 +80,9 @@ public function setViewed(?DateTime $viewed): self return $this; } - public function setStatus(?string $status): self + public function setStatus(?UserMessageStatus $status): self { - $this->status = $status; + $this->status = $status->value; return $this; } } diff --git a/src/Domain/Messaging/Repository/MessageRepository.php b/src/Domain/Messaging/Repository/MessageRepository.php index 3da7ebf3..0ae8bc18 100644 --- a/src/Domain/Messaging/Repository/MessageRepository.php +++ b/src/Domain/Messaging/Repository/MessageRepository.php @@ -4,6 +4,7 @@ namespace PhpList\Core\Domain\Messaging\Repository; +use DateTimeImmutable; use PhpList\Core\Domain\Common\Model\Filter\FilterRequestInterface; use PhpList\Core\Domain\Common\Repository\AbstractRepository; use PhpList\Core\Domain\Common\Repository\Interfaces\PaginatableRepositoryInterface; @@ -74,4 +75,15 @@ public function incrementBounceCount(int $messageId): void ->getQuery() ->execute(); } + + public function getByStatusAndEmbargo(Message\MessageStatus $status, DateTimeImmutable $embargo): array + { + return $this->createQueryBuilder('m') + ->where('m.status = :status') + ->andWhere('m.embargo IS NULL OR m.embargo <= :embargo') + ->setParameter('status', $status->value) + ->setParameter('embargo', $embargo) + ->getQuery() + ->getResult(); + } } diff --git a/src/Domain/Messaging/Repository/UserMessageRepository.php b/src/Domain/Messaging/Repository/UserMessageRepository.php index a19c5823..e8268025 100644 --- a/src/Domain/Messaging/Repository/UserMessageRepository.php +++ b/src/Domain/Messaging/Repository/UserMessageRepository.php @@ -4,8 +4,32 @@ namespace PhpList\Core\Domain\Messaging\Repository; +use DateTimeInterface; use PhpList\Core\Domain\Common\Repository\AbstractRepository; +use PhpList\Core\Domain\Messaging\Model\Message; +use PhpList\Core\Domain\Messaging\Model\Message\UserMessageStatus; +use PhpList\Core\Domain\Messaging\Model\UserMessage; +use PhpList\Core\Domain\Subscription\Model\Subscriber; class UserMessageRepository extends AbstractRepository { + public function findOneByUserAndMessage(Subscriber $subscriber, Message $campaign): ?UserMessage + { + return $this->findOneBy(['user' => $subscriber, 'message' => $campaign]); + } + + /** + * Counts how many user messages have status "sent" since the given time. + */ + public function countSentSince(DateTimeInterface $since): int + { + $queryBuilder = $this->createQueryBuilder('um'); + $queryBuilder->select('COUNT(um)') + ->where('um.createdAt > :since') + ->andWhere('um.status = :status') + ->setParameter('since', $since) + ->setParameter('status', UserMessageStatus::Sent->value); + + return (int) $queryBuilder->getQuery()->getSingleScalarResult(); + } } diff --git a/src/Domain/Messaging/Service/Builder/MessageBuilder.php b/src/Domain/Messaging/Service/Builder/MessageBuilder.php index e8807170..85e74331 100644 --- a/src/Domain/Messaging/Service/Builder/MessageBuilder.php +++ b/src/Domain/Messaging/Service/Builder/MessageBuilder.php @@ -45,7 +45,7 @@ public function build(MessageDtoInterface $createMessageDto, object $context = n return $context->getExisting(); } - $metadata = new Message\MessageMetadata($createMessageDto->getMetadata()->status); + $metadata = new Message\MessageMetadata(Message\MessageStatus::Draft); return new Message($format, $schedule, $metadata, $content, $options, $context->getOwner(), $template); } diff --git a/src/Domain/Messaging/Service/Handler/RequeueHandler.php b/src/Domain/Messaging/Service/Handler/RequeueHandler.php new file mode 100644 index 00000000..6a1d9d95 --- /dev/null +++ b/src/Domain/Messaging/Service/Handler/RequeueHandler.php @@ -0,0 +1,60 @@ +getSchedule(); + $interval = $schedule->getRequeueInterval() ?? 0; + $until = $schedule->getRequeueUntil(); + + if ($interval <= 0) { + return false; + } + $now = new DateTime(); + if ($until instanceof DateTime && $now > $until) { + return false; + } + + $embargoIsInFuture = $schedule->getEmbargo() instanceof DateTime && $schedule->getEmbargo() > new DateTime(); + $base = $embargoIsInFuture ? clone $schedule->getEmbargo() : new DateTime(); + $next = (clone $base)->add(new DateInterval('PT' . max(1, $interval) . 'M')); + if ($until instanceof DateTime && $next > $until) { + return false; + } + + $schedule->setEmbargo($next); + $campaign->setSchedule($schedule); + $campaign->getMetadata()->setStatus(MessageStatus::Submitted); + $this->entityManager->flush(); + + $output?->writeln(sprintf( + 'Requeued campaign; next embargo at %s', + $next->format(DateTime::ATOM) + )); + $this->logger->info('Campaign requeued with new embargo', [ + 'campaign_id' => $campaign->getId(), + 'embargo' => $next->format(DateTime::ATOM), + ]); + + return true; + } +} diff --git a/src/Domain/Messaging/Service/Manager/MessageManager.php b/src/Domain/Messaging/Service/Manager/MessageManager.php index 7b263083..c73f31ca 100644 --- a/src/Domain/Messaging/Service/Manager/MessageManager.php +++ b/src/Domain/Messaging/Service/Manager/MessageManager.php @@ -43,6 +43,14 @@ public function updateMessage( return $message; } + public function updateStatus(Message $message, Message\MessageStatus $status): Message + { + $message->getMetadata()->setStatus($status); + $this->messageRepository->save($message); + + return $message; + } + public function delete(Message $message): void { $this->messageRepository->remove($message); diff --git a/src/Domain/Messaging/Service/MaxProcessTimeLimiter.php b/src/Domain/Messaging/Service/MaxProcessTimeLimiter.php new file mode 100644 index 00000000..c5269aaa --- /dev/null +++ b/src/Domain/Messaging/Service/MaxProcessTimeLimiter.php @@ -0,0 +1,46 @@ +maxSeconds = $maxSeconds ?? 600; + } + + public function start(): void + { + $this->startedAt = microtime(true); + } + + public function shouldStop(?OutputInterface $output = null): bool + { + if ($this->maxSeconds <= 0) { + return false; + } + if ($this->startedAt <= 0.0) { + $this->start(); + } + $elapsed = microtime(true) - $this->startedAt; + if ($elapsed >= $this->maxSeconds) { + $this->logger->warning(sprintf('Reached max processing time of %d seconds', $this->maxSeconds)); + $output?->writeln('Reached max processing time; stopping cleanly.'); + + return true; + } + + return false; + } +} diff --git a/src/Domain/Messaging/Service/Processor/CampaignProcessor.php b/src/Domain/Messaging/Service/Processor/CampaignProcessor.php index 13a100a3..92313e28 100644 --- a/src/Domain/Messaging/Service/Processor/CampaignProcessor.php +++ b/src/Domain/Messaging/Service/Processor/CampaignProcessor.php @@ -6,70 +6,126 @@ use Doctrine\ORM\EntityManagerInterface; use PhpList\Core\Domain\Messaging\Model\Message; +use PhpList\Core\Domain\Messaging\Model\UserMessage; +use PhpList\Core\Domain\Messaging\Model\Message\UserMessageStatus; +use PhpList\Core\Domain\Messaging\Model\Message\MessageStatus; +use PhpList\Core\Domain\Messaging\Repository\UserMessageRepository; +use PhpList\Core\Domain\Messaging\Service\Handler\RequeueHandler; +use PhpList\Core\Domain\Messaging\Service\RateLimitedCampaignMailer; +use PhpList\Core\Domain\Messaging\Service\MaxProcessTimeLimiter; use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator; use PhpList\Core\Domain\Subscription\Service\Provider\SubscriberProvider; +use PhpList\Core\Domain\Subscription\Model\Subscriber; use Psr\Log\LoggerInterface; use Symfony\Component\Console\Output\OutputInterface; -use Symfony\Component\Mailer\MailerInterface; -use Symfony\Component\Mime\Email; use Throwable; +/** + * @SuppressWarnings(PHPMD.CouplingBetweenObjects) + */ class CampaignProcessor { - private MailerInterface $mailer; + private RateLimitedCampaignMailer $mailer; private EntityManagerInterface $entityManager; private SubscriberProvider $subscriberProvider; private MessageProcessingPreparator $messagePreparator; private LoggerInterface $logger; + private UserMessageRepository $userMessageRepository; + private MaxProcessTimeLimiter $timeLimiter; + private RequeueHandler $requeueHandler; public function __construct( - MailerInterface $mailer, + RateLimitedCampaignMailer $mailer, EntityManagerInterface $entityManager, SubscriberProvider $subscriberProvider, MessageProcessingPreparator $messagePreparator, LoggerInterface $logger, + UserMessageRepository $userMessageRepository, + MaxProcessTimeLimiter $timeLimiter, + RequeueHandler $requeueHandler ) { $this->mailer = $mailer; $this->entityManager = $entityManager; $this->subscriberProvider = $subscriberProvider; $this->messagePreparator = $messagePreparator; $this->logger = $logger; + $this->userMessageRepository = $userMessageRepository; + $this->timeLimiter = $timeLimiter; + $this->requeueHandler = $requeueHandler; } public function process(Message $campaign, ?OutputInterface $output = null): void { + $this->updateMessageStatus($campaign, MessageStatus::Prepared); $subscribers = $this->subscriberProvider->getSubscribersForMessage($campaign); - // phpcs:ignore Generic.Commenting.Todo - // @todo check $ISPrestrictions logic + + $this->updateMessageStatus($campaign, MessageStatus::InProcess); + + $this->timeLimiter->start(); + $stoppedEarly = false; + foreach ($subscribers as $subscriber) { + if ($this->timeLimiter->shouldStop($output)) { + $stoppedEarly = true; + break; + } + + $existing = $this->userMessageRepository->findOneByUserAndMessage($subscriber, $campaign); + if ($existing && $existing->getStatus() !== UserMessageStatus::Todo) { + continue; + } + + $userMessage = $existing ?? new UserMessage($subscriber, $campaign); + $userMessage->setStatus(UserMessageStatus::Active); + $this->userMessageRepository->save($userMessage); + if (!filter_var($subscriber->getEmail(), FILTER_VALIDATE_EMAIL)) { + $this->updateUserMessageStatus($userMessage, UserMessageStatus::InvalidEmailAddress); + $this->unconfirmSubscriber($subscriber); + $output?->writeln('Invalid email, marking unconfirmed: ' . $subscriber->getEmail()); continue; } - $this->messagePreparator->processMessageLinks($campaign, $subscriber->getId()); - $email = (new Email()) - ->from('news@example.com') - ->to($subscriber->getEmail()) - ->subject($campaign->getContent()->getSubject()) - ->text($campaign->getContent()->getTextMessage()) - ->html($campaign->getContent()->getText()); + + $processed = $this->messagePreparator->processMessageLinks($campaign, $subscriber->getId()); try { + $email = $this->mailer->composeEmail($processed, $subscriber); $this->mailer->send($email); - - // phpcs:ignore Generic.Commenting.Todo - // @todo log somewhere that this subscriber got email + $this->updateUserMessageStatus($userMessage, UserMessageStatus::Sent); } catch (Throwable $e) { + $this->updateUserMessageStatus($userMessage, UserMessageStatus::NotSent); $this->logger->error($e->getMessage(), [ 'subscriber_id' => $subscriber->getId(), 'campaign_id' => $campaign->getId(), ]); $output?->writeln('Failed to send to: ' . $subscriber->getEmail()); } + } - usleep(100000); + if ($stoppedEarly && $this->requeueHandler->handle($campaign, $output)) { + return; } - $campaign->getMetadata()->setStatus('sent'); + $this->updateMessageStatus($campaign, MessageStatus::Sent); + } + + private function unconfirmSubscriber(Subscriber $subscriber): void + { + if ($subscriber->isConfirmed()) { + $subscriber->setConfirmed(false); + $this->entityManager->flush(); + } + } + + private function updateMessageStatus(Message $message, MessageStatus $status): void + { + $message->getMetadata()->setStatus($status); + $this->entityManager->flush(); + } + + private function updateUserMessageStatus(UserMessage $userMessage, UserMessageStatus $status): void + { + $userMessage->setStatus($status); $this->entityManager->flush(); } } diff --git a/src/Domain/Messaging/Service/RateLimitedCampaignMailer.php b/src/Domain/Messaging/Service/RateLimitedCampaignMailer.php new file mode 100644 index 00000000..7691f970 --- /dev/null +++ b/src/Domain/Messaging/Service/RateLimitedCampaignMailer.php @@ -0,0 +1,50 @@ +mailer = $mailer; + $this->limiter = $limiter; + } + + public function composeEmail(Message $processed, Subscriber $subscriber): Email + { + $email = new Email(); + if ($processed->getOptions()->getFromField() !== '') { + $email->from($processed->getOptions()->getFromField()); + } + + if ($processed->getOptions()->getReplyTo() !== '') { + $email->replyTo($processed->getOptions()->getReplyTo()); + } + + return $email + ->to($subscriber->getEmail()) + ->subject($processed->getContent()->getSubject()) + ->text($processed->getContent()->getTextMessage()) + ->html($processed->getContent()->getText()); + } + + /** + * @throws TransportExceptionInterface + */ + public function send(Email $email): void + { + $this->limiter->awaitTurn(); + $this->mailer->send($email); + $this->limiter->afterSend(); + } +} diff --git a/src/Domain/Messaging/Service/SendRateLimiter.php b/src/Domain/Messaging/Service/SendRateLimiter.php new file mode 100644 index 00000000..378b80d5 --- /dev/null +++ b/src/Domain/Messaging/Service/SendRateLimiter.php @@ -0,0 +1,103 @@ +initializeLimits(); + } + + private function initializeLimits(): void + { + $isp = $this->ispRestrictionsProvider->load(); + + $cfgBatch = $this->mailqueueBatchSize ?? 0; + $ispMax = isset($isp->maxBatch) ? (int)$isp->maxBatch : null; + + $cfgPeriod = $this->mailqueueBatchPeriod ?? 0; + $ispMinPeriod = $isp->minBatchPeriod ?? 0; + + $cfgThrottle = $this->mailqueueThrottle ?? 0; + $ispMinThrottle = (int)($isp->minThrottle ?? 0); + + if ($cfgBatch <= 0) { + $this->batchSize = $ispMax !== null ? max(0, $ispMax) : 0; + } else { + $this->batchSize = $ispMax !== null ? min($cfgBatch, max(1, $ispMax)) : $cfgBatch; + } + $this->batchPeriod = max(0, $cfgPeriod, $ispMinPeriod); + $this->throttleSec = max(0, $cfgThrottle, $ispMinThrottle); + + $this->sentInBatch = 0; + $this->batchStart = microtime(true); + $this->initializedFromHistory = false; + } + + /** + * Call before attempting to send another message. It will sleep if needed to + * respect batch limits. Returns true when it's okay to proceed. + */ + public function awaitTurn(?OutputInterface $output = null): bool + { + if (!$this->initializedFromHistory && $this->batchSize > 0 && $this->batchPeriod > 0) { + $since = (new DateTimeImmutable())->sub(new DateInterval('PT' . $this->batchPeriod . 'S')); + $alreadySent = $this->userMessageRepository->countSentSince($since); + $this->sentInBatch = max($this->sentInBatch, $alreadySent); + $this->initializedFromHistory = true; + } + + if ($this->batchSize > 0 && $this->batchPeriod > 0 && $this->sentInBatch >= $this->batchSize) { + $elapsed = microtime(true) - $this->batchStart; + $remaining = (int)ceil($this->batchPeriod - $elapsed); + if ($remaining > 0) { + $output?->writeln(sprintf( + 'Batch limit reached, sleeping %ds to respect MAILQUEUE_BATCH_PERIOD', + $remaining + )); + sleep($remaining); + } + $this->batchStart = microtime(true); + $this->sentInBatch = 0; + $this->initializedFromHistory = false; + } + + return true; + } + + /** + * Call after a successful sending to update counters and apply per-message throttle. + */ + public function afterSend(): void + { + $this->sentInBatch++; + if ($this->throttleSec > 0) { + sleep($this->throttleSec); + } + } +} diff --git a/src/Domain/Subscription/Service/Provider/SubscriberProvider.php b/src/Domain/Subscription/Service/Provider/SubscriberProvider.php index 5ec6b177..72b473be 100644 --- a/src/Domain/Subscription/Service/Provider/SubscriberProvider.php +++ b/src/Domain/Subscription/Service/Provider/SubscriberProvider.php @@ -36,7 +36,7 @@ public function getSubscribersForMessage(Message $message): array foreach ($lists as $list) { $listSubscribers = $this->subscriberRepository->getSubscribersBySubscribedListId($list->getId()); foreach ($listSubscribers as $subscriber) { - $subscribers[$subscriber->getId()] = $subscriber; + $subscribers[$subscriber->getEmail()] = $subscriber; } } diff --git a/tests/Integration/Domain/Messaging/Repository/MessageRepositoryTest.php b/tests/Integration/Domain/Messaging/Repository/MessageRepositoryTest.php index d7435815..1bd98735 100644 --- a/tests/Integration/Domain/Messaging/Repository/MessageRepositoryTest.php +++ b/tests/Integration/Domain/Messaging/Repository/MessageRepositoryTest.php @@ -48,7 +48,7 @@ public function testMessageIsPersistedAndFetchedCorrectly(): void $message = new Message( new MessageFormat(true, 'text'), new MessageSchedule(1, null, 3, null, null), - new MessageMetadata('done'), + new MessageMetadata(Message\MessageStatus::Sent), new MessageContent('Hello world!'), new MessageOptions(), $admin @@ -62,7 +62,7 @@ public function testMessageIsPersistedAndFetchedCorrectly(): void self::assertCount(1, $foundMessages); self::assertInstanceOf(Message::class, $foundMessages[0]); - self::assertSame('done', $foundMessages[0]->getMetadata()->getStatus()); + self::assertSame(Message\MessageStatus::Sent, $foundMessages[0]->getMetadata()->getStatus()); self::assertSame('Hello world!', $foundMessages[0]->getContent()->getSubject()); } @@ -77,7 +77,7 @@ public function testGetByOwnerIdReturnsOnlyOwnedMessages(): void $msg1 = new Message( new MessageFormat(true, MessageFormat::FORMAT_TEXT), new MessageSchedule(1, null, 3, null, null), - new MessageMetadata('done'), + new MessageMetadata(Message\MessageStatus::Sent), new MessageContent('Owned by Admin 1!'), new MessageOptions(), $admin1 diff --git a/tests/Integration/Domain/Subscription/Service/SubscriberDeletionServiceTest.php b/tests/Integration/Domain/Subscription/Service/SubscriberDeletionServiceTest.php index b3bfda0c..9019fd30 100644 --- a/tests/Integration/Domain/Subscription/Service/SubscriberDeletionServiceTest.php +++ b/tests/Integration/Domain/Subscription/Service/SubscriberDeletionServiceTest.php @@ -58,7 +58,7 @@ public function testDeleteSubscriberWithRelatedDataDoesNotThrowDoctrineError(): $msg = new Message( format: new MessageFormat(true, MessageFormat::FORMAT_TEXT), schedule: new MessageSchedule(1, null, 3, null, null), - metadata: new MessageMetadata('done'), + metadata: new MessageMetadata(Message\MessageStatus::Sent), content: new MessageContent('Owned by Admin 1!'), options: new MessageOptions(), owner: $admin @@ -92,7 +92,7 @@ public function testDeleteSubscriberWithRelatedDataDoesNotThrowDoctrineError(): $this->entityManager->persist($linkTrackUmlClick); $userMessage = new UserMessage($subscriber, $msg); - $userMessage->setStatus('sent'); + $userMessage->setStatus(Message\UserMessageStatus::Sent); $this->entityManager->persist($userMessage); $userMessageBounce = new UserMessageBounce(1, new DateTime()); diff --git a/tests/Unit/Domain/Messaging/Command/ProcessQueueCommandTest.php b/tests/Unit/Domain/Messaging/Command/ProcessQueueCommandTest.php index 79ece9bd..d76f63c0 100644 --- a/tests/Unit/Domain/Messaging/Command/ProcessQueueCommandTest.php +++ b/tests/Unit/Domain/Messaging/Command/ProcessQueueCommandTest.php @@ -5,6 +5,7 @@ namespace PhpList\Core\Tests\Unit\Domain\Messaging\Command; use Exception; +use PhpList\Core\Domain\Configuration\Service\Manager\ConfigManager; use PhpList\Core\Domain\Messaging\Command\ProcessQueueCommand; use PhpList\Core\Domain\Messaging\Model\Message; use PhpList\Core\Domain\Messaging\Repository\MessageRepository; @@ -41,7 +42,8 @@ protected function setUp(): void $this->messageRepository, $lockFactory, $this->messageProcessingPreparator, - $this->campaignProcessor + $this->campaignProcessor, + $this->createMock(ConfigManager::class), ); $application = new Application(); @@ -82,8 +84,8 @@ public function testExecuteWithNoCampaigns(): void ->method('ensureCampaignsHaveUuid'); $this->messageRepository->expects($this->once()) - ->method('findBy') - ->with(['status' => 'submitted']) + ->method('getByStatusAndEmbargo') + ->with($this->anything(), $this->anything()) ->willReturn([]); $this->campaignProcessor->expects($this->never()) @@ -112,8 +114,8 @@ public function testExecuteWithCampaigns(): void $campaign = $this->createMock(Message::class); $this->messageRepository->expects($this->once()) - ->method('findBy') - ->with(['status' => 'submitted']) + ->method('getByStatusAndEmbargo') + ->with($this->anything(), $this->anything()) ->willReturn([$campaign]); $this->campaignProcessor->expects($this->once()) @@ -145,8 +147,8 @@ public function testExecuteWithMultipleCampaigns(): void $campaign2 = $this->createMock(Message::class); $this->messageRepository->expects($this->once()) - ->method('findBy') - ->with(['status' => 'submitted']) + ->method('getByStatusAndEmbargo') + ->with($this->anything(), $this->anything()) ->willReturn([$campaign1, $campaign2]); $this->campaignProcessor->expects($this->exactly(2)) @@ -179,8 +181,8 @@ public function testExecuteWithProcessorException(): void $campaign = $this->createMock(Message::class); $this->messageRepository->expects($this->once()) - ->method('findBy') - ->with(['status' => 'submitted']) + ->method('getByStatusAndEmbargo') + ->with($this->anything(), $this->anything()) ->willReturn([$campaign]); $this->campaignProcessor->expects($this->once()) diff --git a/tests/Unit/Domain/Messaging/Service/Builder/MessageBuilderTest.php b/tests/Unit/Domain/Messaging/Service/Builder/MessageBuilderTest.php index 564bd34d..d99d041a 100644 --- a/tests/Unit/Domain/Messaging/Service/Builder/MessageBuilderTest.php +++ b/tests/Unit/Domain/Messaging/Service/Builder/MessageBuilderTest.php @@ -2,9 +2,8 @@ declare(strict_types=1); -namespace PhpList\Core\Tests\Unit\Domain\Service\Builder; +namespace PhpList\Core\Tests\Unit\Domain\Messaging\Service\Builder; -use Error; use InvalidArgumentException; use PhpList\Core\Domain\Identity\Model\Administrator; use PhpList\Core\Domain\Messaging\Model\Dto\CreateMessageDto; @@ -64,7 +63,7 @@ private function createRequest(): CreateMessageDto formatOptions: [] ), metadata: new MessageMetadataDto( - status: 'draft' + status: Message\MessageStatus::Draft ), options: new MessageOptionsDto( fromField: '', @@ -117,16 +116,6 @@ public function testBuildsNewMessage(): void $this->builder->build($request, $context); } - public function testThrowsExceptionOnInvalidRequest(): void - { - $this->expectException(Error::class); - - $this->builder->build( - $this->createMock(CreateMessageDto::class), - new MessageContext($this->createMock(Administrator::class)) - ); - } - public function testThrowsExceptionOnInvalidContext(): void { $this->expectException(InvalidArgumentException::class); diff --git a/tests/Unit/Domain/Messaging/Service/Builder/MessageContentBuilderTest.php b/tests/Unit/Domain/Messaging/Service/Builder/MessageContentBuilderTest.php index 2b1aa771..21f90692 100644 --- a/tests/Unit/Domain/Messaging/Service/Builder/MessageContentBuilderTest.php +++ b/tests/Unit/Domain/Messaging/Service/Builder/MessageContentBuilderTest.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace PhpList\Core\Tests\Unit\Domain\Service\Builder; +namespace PhpList\Core\Tests\Unit\Domain\Messaging\Service\Builder; use InvalidArgumentException; use PhpList\Core\Domain\Messaging\Model\Dto\Message\MessageContentDto; diff --git a/tests/Unit/Domain/Messaging/Service/Builder/MessageFormatBuilderTest.php b/tests/Unit/Domain/Messaging/Service/Builder/MessageFormatBuilderTest.php index 8d9320a0..1bd576f5 100644 --- a/tests/Unit/Domain/Messaging/Service/Builder/MessageFormatBuilderTest.php +++ b/tests/Unit/Domain/Messaging/Service/Builder/MessageFormatBuilderTest.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace PhpList\Core\Tests\Unit\Domain\Service\Builder; +namespace PhpList\Core\Tests\Unit\Domain\Messaging\Service\Builder; use InvalidArgumentException; use PhpList\Core\Domain\Messaging\Model\Dto\Message\MessageFormatDto; diff --git a/tests/Unit/Domain/Messaging/Service/Builder/MessageOptionsBuilderTest.php b/tests/Unit/Domain/Messaging/Service/Builder/MessageOptionsBuilderTest.php index c6795d29..754177a2 100644 --- a/tests/Unit/Domain/Messaging/Service/Builder/MessageOptionsBuilderTest.php +++ b/tests/Unit/Domain/Messaging/Service/Builder/MessageOptionsBuilderTest.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace PhpList\Core\Tests\Unit\Domain\Service\Builder; +namespace PhpList\Core\Tests\Unit\Domain\Messaging\Service\Builder; use InvalidArgumentException; use PhpList\Core\Domain\Messaging\Model\Dto\Message\MessageOptionsDto; diff --git a/tests/Unit/Domain/Messaging/Service/Builder/MessageScheduleBuilderTest.php b/tests/Unit/Domain/Messaging/Service/Builder/MessageScheduleBuilderTest.php index 38f04338..25a89052 100644 --- a/tests/Unit/Domain/Messaging/Service/Builder/MessageScheduleBuilderTest.php +++ b/tests/Unit/Domain/Messaging/Service/Builder/MessageScheduleBuilderTest.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace PhpList\Core\Tests\Unit\Domain\Service\Builder; +namespace PhpList\Core\Tests\Unit\Domain\Messaging\Service\Builder; use DateTime; use InvalidArgumentException; diff --git a/tests/Unit/Domain/Messaging/Service/Handler/RequeueHandlerTest.php b/tests/Unit/Domain/Messaging/Service/Handler/RequeueHandlerTest.php new file mode 100644 index 00000000..5bfb1114 --- /dev/null +++ b/tests/Unit/Domain/Messaging/Service/Handler/RequeueHandlerTest.php @@ -0,0 +1,155 @@ +logger = $this->createMock(LoggerInterface::class); + $this->em = $this->createMock(EntityManagerInterface::class); + $this->output = $this->createMock(OutputInterface::class); + } + + private function createMessage( + ?int $requeueInterval, + ?DateTime $requeueUntil, + ?DateTime $embargo + ): Message { + $format = new MessageFormat(htmlFormatted: false, sendFormat: null); + $schedule = new MessageSchedule( + repeatInterval: null, + repeatUntil: null, + requeueInterval: $requeueInterval, + requeueUntil: $requeueUntil, + embargo: $embargo + ); + $metadata = new MessageMetadata(MessageStatus::Draft); + $content = new MessageContent('(no subject)'); + $options = new MessageOptions(); + + return new Message($format, $schedule, $metadata, $content, $options, owner: null, template: null); + } + + public function testReturnsFalseWhenIntervalIsZeroOrNegative(): void + { + $handler = new RequeueHandler($this->logger, $this->em); + $message = $this->createMessage(0, null, null); + + $this->em->expects($this->never())->method('flush'); + $this->output->expects($this->never())->method('writeln'); + $this->logger->expects($this->never())->method('info'); + + $result = $handler->handle($message, $this->output); + + $this->assertFalse($result); + $this->assertSame(MessageStatus::Draft, $message->getMetadata()->getStatus()); + } + + public function testReturnsFalseWhenNowIsAfterRequeueUntil(): void + { + $handler = new RequeueHandler($this->logger, $this->em); + $past = (new DateTime())->sub(new DateInterval('PT5M')); + $message = $this->createMessage(5, $past, null); + + $this->em->expects($this->never())->method('flush'); + $this->logger->expects($this->never())->method('info'); + + $result = $handler->handle($message, $this->output); + + $this->assertFalse($result); + $this->assertSame(MessageStatus::Draft, $message->getMetadata()->getStatus()); + } + + public function testRequeuesFromFutureEmbargoAndSetsSubmittedStatus(): void + { + $handler = new RequeueHandler($this->logger, $this->em); + $embargo = (new DateTime())->add(new DateInterval('PT5M')); + $interval = 10; + $message = $this->createMessage($interval, null, $embargo); + + $this->em->expects($this->once())->method('flush'); + $this->output->expects($this->once())->method('writeln'); + $this->logger->expects($this->once())->method('info'); + + $result = $handler->handle($message, $this->output); + + $this->assertTrue($result); + $this->assertSame(MessageStatus::Submitted, $message->getMetadata()->getStatus()); + + $expectedNext = (clone $embargo)->add(new DateInterval('PT' . $interval . 'M')); + $actualNext = $message->getSchedule()->getEmbargo(); + $this->assertInstanceOf(DateTime::class, $actualNext); + $this->assertEquals($expectedNext->format(DateTime::ATOM), $actualNext->format(DateTime::ATOM)); + } + + public function testRequeuesFromNowWhenEmbargoIsNullOrPast(): void + { + $handler = new RequeueHandler($this->logger, $this->em); + $interval = 3; + $message = $this->createMessage($interval, null, null); + + $this->em->expects($this->once())->method('flush'); + $this->logger->expects($this->once())->method('info'); + + $before = new DateTime(); + $result = $handler->handle($message, $this->output); + $after = new DateTime(); + + $this->assertTrue($result); + $this->assertSame(MessageStatus::Submitted, $message->getMetadata()->getStatus()); + + $embargo = $message->getSchedule()->getEmbargo(); + $this->assertInstanceOf(DateTime::class, $embargo); + + $minExpected = (clone $before)->add(new DateInterval('PT' . $interval . 'M')); + $maxExpected = (clone $after)->add(new DateInterval('PT' . $interval . 'M')); + + $this->assertGreaterThanOrEqual($minExpected->getTimestamp(), $embargo->getTimestamp()); + $this->assertLessThanOrEqual($maxExpected->getTimestamp(), $embargo->getTimestamp()); + } + + public function testReturnsFalseWhenNextEmbargoExceedsUntil(): void + { + $handler = new RequeueHandler($this->logger, $this->em); + $embargo = (new DateTime())->add(new DateInterval('PT1M')); + $interval = 10; + // next would be +10, which exceeds until + $until = (clone $embargo)->add(new DateInterval('PT5M')); + $message = $this->createMessage($interval, $until, $embargo); + + $this->em->expects($this->never())->method('flush'); + $this->logger->expects($this->never())->method('info'); + + $result = $handler->handle($message, $this->output); + + $this->assertFalse($result); + $this->assertSame(MessageStatus::Draft, $message->getMetadata()->getStatus()); + $this->assertEquals( + $embargo->format(DateTime::ATOM), + $message->getSchedule()->getEmbargo()?->format(DateTime::ATOM) + ); + } +} diff --git a/tests/Unit/Domain/Messaging/Service/Manager/MessageManagerTest.php b/tests/Unit/Domain/Messaging/Service/Manager/MessageManagerTest.php index aa1a47e0..94064485 100644 --- a/tests/Unit/Domain/Messaging/Service/Manager/MessageManagerTest.php +++ b/tests/Unit/Domain/Messaging/Service/Manager/MessageManagerTest.php @@ -13,6 +13,7 @@ use PhpList\Core\Domain\Messaging\Model\Dto\Message\MessageScheduleDto; use PhpList\Core\Domain\Messaging\Model\Dto\UpdateMessageDto; use PhpList\Core\Domain\Messaging\Model\Message; +use PhpList\Core\Domain\Messaging\Model\Message\MessageContent; use PhpList\Core\Domain\Messaging\Repository\MessageRepository; use PhpList\Core\Domain\Messaging\Service\Builder\MessageBuilder; use PhpList\Core\Domain\Messaging\Service\Manager\MessageManager; @@ -34,7 +35,7 @@ public function testCreateMessageReturnsPersistedMessage(): void requeueInterval: 60 * 12, requeueUntil: '2025-04-20T00:00:00+00:00', ); - $metadata = new MessageMetadataDto('draft'); + $metadata = new MessageMetadataDto(Message\MessageStatus::Draft); $content = new MessageContentDto('Subject', 'Full text', 'Short text', 'Footer'); $options = new MessageOptionsDto('from@example.com', 'to@example.com', 'reply@example.com', 'all-users'); @@ -50,11 +51,11 @@ public function testCreateMessageReturnsPersistedMessage(): void $authUser = $this->createMock(Administrator::class); $expectedMessage = $this->createMock(Message::class); - $expectedContent = $this->createMock(\PhpList\Core\Domain\Messaging\Model\Message\MessageContent::class); + $expectedContent = $this->createMock(MessageContent::class); $expectedMetadata = $this->createMock(Message\MessageMetadata::class); $expectedContent->method('getSubject')->willReturn('Subject'); - $expectedMetadata->method('getStatus')->willReturn('draft'); + $expectedMetadata->method('getStatus')->willReturn(Message\MessageStatus::Draft); $expectedMessage->method('getContent')->willReturn($expectedContent); $expectedMessage->method('getMetadata')->willReturn($expectedMetadata); @@ -71,7 +72,7 @@ public function testCreateMessageReturnsPersistedMessage(): void $message = $manager->createMessage($request, $authUser); $this->assertSame('Subject', $message->getContent()->getSubject()); - $this->assertSame('draft', $message->getMetadata()->getStatus()); + $this->assertSame(Message\MessageStatus::Draft, $message->getMetadata()->getStatus()); } public function testUpdateMessageReturnsUpdatedMessage(): void @@ -88,7 +89,7 @@ public function testUpdateMessageReturnsUpdatedMessage(): void requeueInterval: 0, requeueUntil: '2025-04-20T00:00:00+00:00', ); - $metadata = new MessageMetadataDto('draft'); + $metadata = new MessageMetadataDto(Message\MessageStatus::Draft); $content = new MessageContentDto( 'Updated Subject', 'Updated Full text', @@ -115,11 +116,11 @@ public function testUpdateMessageReturnsUpdatedMessage(): void $authUser = $this->createMock(Administrator::class); $existingMessage = $this->createMock(Message::class); - $expectedContent = $this->createMock(\PhpList\Core\Domain\Messaging\Model\Message\MessageContent::class); + $expectedContent = $this->createMock(MessageContent::class); $expectedMetadata = $this->createMock(Message\MessageMetadata::class); $expectedContent->method('getSubject')->willReturn('Updated Subject'); - $expectedMetadata->method('getStatus')->willReturn('draft'); + $expectedMetadata->method('getStatus')->willReturn(Message\MessageStatus::Draft); $existingMessage->method('getContent')->willReturn($expectedContent); $existingMessage->method('getMetadata')->willReturn($expectedMetadata); @@ -136,6 +137,6 @@ public function testUpdateMessageReturnsUpdatedMessage(): void $message = $manager->updateMessage($updateRequest, $existingMessage, $authUser); $this->assertSame('Updated Subject', $message->getContent()->getSubject()); - $this->assertSame('draft', $message->getMetadata()->getStatus()); + $this->assertSame(Message\MessageStatus::Draft, $message->getMetadata()->getStatus()); } } diff --git a/tests/Unit/Domain/Messaging/Service/MaxProcessTimeLimiterTest.php b/tests/Unit/Domain/Messaging/Service/MaxProcessTimeLimiterTest.php new file mode 100644 index 00000000..5944ca3e --- /dev/null +++ b/tests/Unit/Domain/Messaging/Service/MaxProcessTimeLimiterTest.php @@ -0,0 +1,53 @@ +logger = $this->createMock(LoggerInterface::class); + } + + public function testShouldNotStopWhenMaxSecondsIsZero(): void + { + $limiter = new MaxProcessTimeLimiter(logger: $this->logger, maxSeconds: 0); + + $output = $this->createMock(OutputInterface::class); + $output->expects($this->never())->method('writeln'); + $this->logger->expects($this->never())->method('warning'); + + $limiter->start(); + usleep(200_000); + $this->assertFalse($limiter->shouldStop($output)); + } + + public function testShouldStopAfterThresholdAndLogAndOutput(): void + { + $limiter = new MaxProcessTimeLimiter(logger: $this->logger, maxSeconds: 1); + + $output = $this->createMock(OutputInterface::class); + $output->expects($this->once()) + ->method('writeln') + ->with('Reached max processing time; stopping cleanly.'); + + $this->logger->expects($this->once()) + ->method('warning') + ->with($this->stringContains('Reached max processing time of 1 seconds')); + + $this->assertFalse($limiter->shouldStop($output)); + + usleep(1_200_000); + $this->assertTrue($limiter->shouldStop($output)); + } +} diff --git a/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php b/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php index b2c51c71..26aec09f 100644 --- a/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php +++ b/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php @@ -9,42 +9,50 @@ use PhpList\Core\Domain\Messaging\Model\Message; use PhpList\Core\Domain\Messaging\Model\Message\MessageContent; use PhpList\Core\Domain\Messaging\Model\Message\MessageMetadata; +use PhpList\Core\Domain\Messaging\Repository\UserMessageRepository; +use PhpList\Core\Domain\Messaging\Service\Handler\RequeueHandler; +use PhpList\Core\Domain\Messaging\Service\MaxProcessTimeLimiter; use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator; use PhpList\Core\Domain\Messaging\Service\Processor\CampaignProcessor; +use PhpList\Core\Domain\Messaging\Service\RateLimitedCampaignMailer; use PhpList\Core\Domain\Subscription\Model\Subscriber; use PhpList\Core\Domain\Subscription\Service\Provider\SubscriberProvider; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface; use Symfony\Component\Console\Output\OutputInterface; -use Symfony\Component\Mailer\MailerInterface; use Symfony\Component\Mime\Email; class CampaignProcessorTest extends TestCase { - private MailerInterface&MockObject $mailer; - private EntityManagerInterface&MockObject $entityManager; - private SubscriberProvider&MockObject $subscriberProvider; - private MessageProcessingPreparator&MockObject $messagePreparator; - private LoggerInterface&MockObject $logger; - private OutputInterface&MockObject $output; + private RateLimitedCampaignMailer|MockObject $mailer; + private EntityManagerInterface|MockObject $entityManager; + private SubscriberProvider|MockObject $subscriberProvider; + private MessageProcessingPreparator|MockObject $messagePreparator; + private LoggerInterface|MockObject $logger; + private OutputInterface|MockObject $output; private CampaignProcessor $campaignProcessor; + private UserMessageRepository|MockObject $userMessageRepository; protected function setUp(): void { - $this->mailer = $this->createMock(MailerInterface::class); + $this->mailer = $this->createMock(RateLimitedCampaignMailer::class); $this->entityManager = $this->createMock(EntityManagerInterface::class); $this->subscriberProvider = $this->createMock(SubscriberProvider::class); $this->messagePreparator = $this->createMock(MessageProcessingPreparator::class); $this->logger = $this->createMock(LoggerInterface::class); $this->output = $this->createMock(OutputInterface::class); + $this->userMessageRepository = $this->createMock(UserMessageRepository::class); $this->campaignProcessor = new CampaignProcessor( - $this->mailer, - $this->entityManager, - $this->subscriberProvider, - $this->messagePreparator, - $this->logger + mailer: $this->mailer, + entityManager: $this->entityManager, + subscriberProvider: $this->subscriberProvider, + messagePreparator: $this->messagePreparator, + logger: $this->logger, + userMessageRepository: $this->userMessageRepository, + timeLimiter: $this->createMock(MaxProcessTimeLimiter::class), + requeueHandler: $this->createMock(RequeueHandler::class), ); } @@ -59,11 +67,10 @@ public function testProcessWithNoSubscribers(): void ->with($campaign) ->willReturn([]); - $metadata->expects($this->once()) - ->method('setStatus') - ->with('sent'); + $metadata->expects($this->atLeastOnce()) + ->method('setStatus'); - $this->entityManager->expects($this->once()) + $this->entityManager->expects($this->atLeastOnce()) ->method('flush'); $this->mailer->expects($this->never()) @@ -87,11 +94,10 @@ public function testProcessWithInvalidSubscriberEmail(): void ->with($campaign) ->willReturn([$subscriber]); - $metadata->expects($this->once()) - ->method('setStatus') - ->with('sent'); + $metadata->expects($this->atLeastOnce()) + ->method('setStatus'); - $this->entityManager->expects($this->once()) + $this->entityManager->expects($this->atLeastOnce()) ->method('flush'); $this->messagePreparator->expects($this->never()) @@ -123,22 +129,28 @@ public function testProcessWithValidSubscriberEmail(): void ->with($campaign, 1) ->willReturn($campaign); + $this->mailer->expects($this->once()) + ->method('composeEmail') + ->with($campaign, $subscriber) + ->willReturnCallback(function ($processed, $sub) use ($campaign, $subscriber) { + $this->assertSame($campaign, $processed); + $this->assertSame($subscriber, $sub); + return (new Email()) + ->from('news@example.com') + ->to('test@example.com') + ->subject('Test Subject') + ->text('Test text message') + ->html('

Test HTML message

'); + }); + $this->mailer->expects($this->once()) ->method('send') - ->with($this->callback(function (Email $email) { - $this->assertEquals('test@example.com', $email->getTo()[0]->getAddress()); - $this->assertEquals('news@example.com', $email->getFrom()[0]->getAddress()); - $this->assertEquals('Test Subject', $email->getSubject()); - $this->assertEquals('Test text message', $email->getTextBody()); - $this->assertEquals('

Test HTML message

', $email->getHtmlBody()); - return true; - })); - - $metadata->expects($this->once()) - ->method('setStatus') - ->with('sent'); - - $this->entityManager->expects($this->once()) + ->with($this->isInstanceOf(Email::class)); + + $metadata->expects($this->atLeastOnce()) + ->method('setStatus'); + + $this->entityManager->expects($this->atLeastOnce()) ->method('flush'); $this->campaignProcessor->process($campaign, $this->output); @@ -181,11 +193,10 @@ public function testProcessWithMailerException(): void ->method('writeln') ->with('Failed to send to: test@example.com'); - $metadata->expects($this->once()) - ->method('setStatus') - ->with('sent'); + $metadata->expects($this->atLeastOnce()) + ->method('setStatus'); - $this->entityManager->expects($this->once()) + $this->entityManager->expects($this->atLeastOnce()) ->method('flush'); $this->campaignProcessor->process($campaign, $this->output); @@ -221,11 +232,10 @@ public function testProcessWithMultipleSubscribers(): void $this->mailer->expects($this->exactly(2)) ->method('send'); - $metadata->expects($this->once()) - ->method('setStatus') - ->with('sent'); + $metadata->expects($this->atLeastOnce()) + ->method('setStatus'); - $this->entityManager->expects($this->once()) + $this->entityManager->expects($this->atLeastOnce()) ->method('flush'); $this->campaignProcessor->process($campaign, $this->output); @@ -264,11 +274,10 @@ public function testProcessWithNullOutput(): void 'campaign_id' => 123, ]); - $metadata->expects($this->once()) - ->method('setStatus') - ->with('sent'); + $metadata->expects($this->atLeastOnce()) + ->method('setStatus'); - $this->entityManager->expects($this->once()) + $this->entityManager->expects($this->atLeastOnce()) ->method('flush'); $this->campaignProcessor->process($campaign, null); @@ -277,7 +286,7 @@ public function testProcessWithNullOutput(): void /** * Creates a mock for the Message class with content */ - private function createCampaignMock(): Message&MockObject + private function createCampaignMock(): Message|MockObject { $campaign = $this->createMock(Message::class); $content = $this->createMock(MessageContent::class); diff --git a/tests/Unit/Domain/Messaging/Service/RateLimitedCampaignMailerTest.php b/tests/Unit/Domain/Messaging/Service/RateLimitedCampaignMailerTest.php new file mode 100644 index 00000000..6e2011ff --- /dev/null +++ b/tests/Unit/Domain/Messaging/Service/RateLimitedCampaignMailerTest.php @@ -0,0 +1,134 @@ +mailer = $this->createMock(MailerInterface::class); + $this->limiter = $this->createMock(SendRateLimiter::class); + $this->sut = new RateLimitedCampaignMailer($this->mailer, $this->limiter); + } + + public function testComposeEmailSetsHeadersAndBody(): void + { + $message = $this->buildMessage( + subject: 'Subject', + textBody: 'Plain text', + htmlBody: '

HTML

', + from: 'from@example.com', + replyTo: 'reply@example.com' + ); + + $subscriber = new Subscriber(); + $this->setSubscriberEmail($subscriber, 'user@example.com'); + + $email = $this->sut->composeEmail($message, $subscriber); + + $this->assertInstanceOf(Email::class, $email); + $this->assertSame('user@example.com', $email->getTo()[0]->getAddress()); + $this->assertSame('Subject', $email->getSubject()); + $this->assertSame('from@example.com', $email->getFrom()[0]->getAddress()); + $this->assertSame('reply@example.com', $email->getReplyTo()[0]->getAddress()); + $this->assertSame('Plain text', $email->getTextBody()); + $this->assertSame('

HTML

', $email->getHtmlBody()); + } + + public function testComposeEmailWithoutOptionalHeaders(): void + { + $message = $this->buildMessage( + subject: 'No headers', + textBody: 'text', + htmlBody: 'h', + from: '', + replyTo: '' + ); + + $subscriber = new Subscriber(); + $this->setSubscriberEmail($subscriber, 'user2@example.com'); + + $email = $this->sut->composeEmail($message, $subscriber); + + $this->assertSame('user2@example.com', $email->getTo()[0]->getAddress()); + $this->assertSame('No headers', $email->getSubject()); + $this->assertSame([], $email->getFrom()); + $this->assertSame([], $email->getReplyTo()); + } + + public function testSendUsesLimiterAroundMailer(): void + { + $email = (new Email())->to('someone@example.com'); + + $this->limiter->expects($this->once())->method('awaitTurn'); + $this->mailer + ->expects($this->once()) + ->method('send') + ->with($this->isInstanceOf(Email::class)); + $this->limiter->expects($this->once())->method('afterSend'); + + $this->sut->send($email); + } + + private function buildMessage( + string $subject, + string $textBody, + string $htmlBody, + string $from, + string $replyTo + ): Message { + $content = new MessageContent( + subject: $subject, + text: $htmlBody, + textMessage: $textBody, + footer: null, + ); + $format = new MessageFormat( + htmlFormatted: true, + sendFormat: MessageFormat::FORMAT_HTML, + formatOptions: [MessageFormat::FORMAT_HTML] + ); + $schedule = new MessageSchedule( + repeatInterval: 0, + repeatUntil: null, + requeueInterval: 0, + requeueUntil: null, + embargo: null + ); + $metadata = new MessageMetadata(); + $options = new MessageOptions(fromField: $from, toField: '', replyTo: $replyTo); + + return new Message($format, $schedule, $metadata, $content, $options, null, null); + } + + /** + * Subscriber has no public setter for email, so we use reflection. + */ + private function setSubscriberEmail(Subscriber $subscriber, string $email): void + { + $ref = new ReflectionProperty($subscriber, 'email'); + $ref->setValue($subscriber, $email); + } +} diff --git a/tests/Unit/Domain/Messaging/Service/SendRateLimiterTest.php b/tests/Unit/Domain/Messaging/Service/SendRateLimiterTest.php new file mode 100644 index 00000000..e9ba27c0 --- /dev/null +++ b/tests/Unit/Domain/Messaging/Service/SendRateLimiterTest.php @@ -0,0 +1,90 @@ +ispProvider = $this->createMock(IspRestrictionsProvider::class); + } + + public function testInitializesLimitsFromConfigOnly(): void + { + $this->ispProvider->method('load')->willReturn(new IspRestrictions(null, null, null)); + $limiter = new SendRateLimiter( + ispRestrictionsProvider: $this->ispProvider, + userMessageRepository: $this->createMock(UserMessageRepository::class), + mailqueueBatchSize: 5, + mailqueueBatchPeriod: 10, + mailqueueThrottle: 2 + ); + + $output = $this->createMock(OutputInterface::class); + $output->expects($this->never())->method('writeln'); + + $this->assertTrue($limiter->awaitTurn($output)); + } + + public function testBatchLimitTriggersWaitMessageAndResetsCounters(): void + { + $this->ispProvider->method('load')->willReturn(new IspRestrictions(2, 1, null)); + $limiter = new SendRateLimiter( + ispRestrictionsProvider: $this->ispProvider, + userMessageRepository: $this->createMock(UserMessageRepository::class), + mailqueueBatchSize: 10, + mailqueueBatchPeriod: 1, + mailqueueThrottle: 0 + ); + + $limiter->afterSend(); + $limiter->afterSend(); + + $output = $this->createMock(OutputInterface::class); + // We cannot reliably assert the exact second, but we assert a message called at least once + $output->expects($this->atLeast(0))->method('writeln'); + + // Now awaitTurn should detect batch full and attempt to sleep and reset. + $this->assertTrue($limiter->awaitTurn($output)); + + // Next afterSend should increase the counter again without exception + $limiter->afterSend(); + // Reaching here means no fatal due to internal counter/reset logic + $this->assertTrue(true); + } + + public function testThrottleSleepsPerMessagePathIsCallable(): void + { + $this->ispProvider->method('load')->willReturn(new IspRestrictions(null, null, null)); + $limiter = new SendRateLimiter( + ispRestrictionsProvider: $this->ispProvider, + userMessageRepository: $this->createMock(UserMessageRepository::class), + mailqueueBatchSize: 0, + mailqueueBatchPeriod: 0, + mailqueueThrottle: 1 + ); + + // We cannot speed up sleep without extensions; just call method to ensure no exceptions + $start = microtime(true); + $limiter->afterSend(); + $elapsed = microtime(true) - $start; + + // Ensure it likely slept at least ~0.5s + if ($elapsed < 0.3) { + $this->markTestIncomplete('Environment too fast to detect sleep; logic path executed.'); + } + $this->assertTrue(true); + } +} diff --git a/tests/Unit/Domain/Subscription/Service/Provider/SubscriberProviderTest.php b/tests/Unit/Domain/Subscription/Service/Provider/SubscriberProviderTest.php index 6adbde10..9efdeac2 100644 --- a/tests/Unit/Domain/Subscription/Service/Provider/SubscriberProviderTest.php +++ b/tests/Unit/Domain/Subscription/Service/Provider/SubscriberProviderTest.php @@ -26,8 +26,8 @@ protected function setUp(): void $this->subscriberListRepository = $this->createMock(SubscriberListRepository::class); $this->subscriberProvider = new SubscriberProvider( - $this->subscriberRepository, - $this->subscriberListRepository, + subscriberRepository: $this->subscriberRepository, + subscriberListRepository: $this->subscriberListRepository, ); } @@ -82,9 +82,9 @@ public function testGetSubscribersForMessageWithOneListAndSubscribersReturnsSubs ->willReturn([$subscriberList]); $subscriber1 = $this->createMock(Subscriber::class); - $subscriber1->method('getId')->willReturn(1); + $subscriber1->method('getEmail')->willReturn('test1@example.am'); $subscriber2 = $this->createMock(Subscriber::class); - $subscriber2->method('getId')->willReturn(2); + $subscriber2->method('getEmail')->willReturn('test2@exsmple.am'); $this->subscriberRepository ->expects($this->once()) @@ -114,11 +114,11 @@ public function testGetSubscribersForMessageWithMultipleListsReturnsUniqueSubscr ->willReturn([$subscriberList1, $subscriberList2]); $subscriber1 = $this->createMock(Subscriber::class); - $subscriber1->method('getId')->willReturn(1); + $subscriber1->method('getEmail')->willReturn('test1@example.am'); $subscriber2 = $this->createMock(Subscriber::class); - $subscriber2->method('getId')->willReturn(2); + $subscriber2->method('getEmail')->willReturn('test2@example.am'); $subscriber3 = $this->createMock(Subscriber::class); - $subscriber3->method('getId')->willReturn(3); + $subscriber3->method('getEmail')->willReturn('test3@example.am'); $this->subscriberRepository ->expects($this->exactly(2))