Skip to content

Commit fa42258

Browse files
TatevikGrtatevikg1
andauthored
Message processor (#359)
* MessageStatusEnum * Status validate * Embargo check * IspRestrictions * IspRestrictions * SendRateLimiter * UserMessageStatus * Refactor * RateLimitedCampaignMailer * RateLimitedCampaignMailerTest * Rate limit initialized from history * Check maintenance mode * Max processing time limiter --------- Co-authored-by: Tatevik <[email protected]>
1 parent 936cabc commit fa42258

38 files changed

+1207
-130
lines changed

config/parameters.yml.dist

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,16 @@ parameters:
6868

6969
graylog_host: 'graylog.example.com'
7070
graylog_port: 12201
71+
72+
app.phplist_isp_conf_path: '%%env(APP_PHPLIST_ISP_CONF_PATH)%%'
73+
env(APP_PHPLIST_ISP_CONF_PATH): '/etc/phplist.conf'
74+
75+
# Message sending
76+
messaging.mail_queue_batch_size: '%%env(MAILQUEUE_BATCH_SIZE)%%'
77+
env(MAILQUEUE_BATCH_SIZE): '5'
78+
messaging.mail_queue_period: '%%env(MAILQUEUE_BATCH_PERIOD)%%'
79+
env(MAILQUEUE_BATCH_PERIOD): '5'
80+
messaging.mail_queue_throttle: '%%env(MAILQUEUE_THROTTLE)%%'
81+
env(MAILQUEUE_THROTTLE): '5'
82+
messaging.max_process_time: '%%env(MESSAGING_MAX_PROCESS_TIME)%%'
83+
env(MESSAGING_MAX_PROCESS_TIME): '600'

config/services.yml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,6 @@ services:
77
autoconfigure: true
88
public: false
99

10-
PhpList\Core\Core\ConfigProvider:
11-
arguments:
12-
$config: '%app.config%'
13-
1410
PhpList\Core\Core\ApplicationStructure:
1511
public: true
1612

config/services/providers.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,13 @@ services:
22
PhpList\Core\Domain\Subscription\Service\Provider\SubscriberProvider:
33
autowire: true
44
autoconfigure: true
5+
6+
PhpList\Core\Core\ConfigProvider:
7+
arguments:
8+
$config: '%app.config%'
9+
10+
PhpList\Core\Domain\Common\IspRestrictionsProvider:
11+
autowire: true
12+
autoconfigure: true
13+
arguments:
14+
$confPath: '%app.phplist_isp_conf_path%'

config/services/services.yml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,14 @@ services:
3636
autoconfigure: true
3737
public: true
3838

39+
PhpList\Core\Domain\Messaging\Service\SendRateLimiter:
40+
autowire: true
41+
autoconfigure: true
42+
arguments:
43+
$mailqueueBatchSize: '%messaging.mail_queue_batch_size%'
44+
$mailqueueBatchPeriod: '%messaging.mail_queue_period%'
45+
$mailqueueThrottle: '%messaging.mail_queue_throttle%'
46+
3947
PhpList\Core\Domain\Common\ClientIpResolver:
4048
autowire: true
4149
autoconfigure: true
@@ -44,6 +52,10 @@ services:
4452
autowire: true
4553
autoconfigure: true
4654

55+
PhpList\Core\Domain\Messaging\Service\RateLimitedCampaignMailer:
56+
autowire: true
57+
autoconfigure: true
58+
4759
PhpList\Core\Domain\Messaging\Service\ConsecutiveBounceHandler:
4860
autowire: true
4961
autoconfigure: true
@@ -108,6 +120,13 @@ services:
108120
arguments:
109121
- !tagged_iterator { tag: 'phplist.bounce_action_handler' }
110122

123+
PhpList\Core\Domain\Messaging\Service\MaxProcessTimeLimiter:
124+
autowire: true
125+
autoconfigure: true
126+
arguments:
127+
$maxSeconds: '%messaging.max_process_time%'
128+
129+
111130
PhpList\Core\Domain\Identity\Service\PermissionChecker:
112131
autowire: true
113132
autoconfigure: true
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace PhpList\Core\Domain\Common;
6+
7+
use PhpList\Core\Domain\Common\Model\IspRestrictions;
8+
use Psr\Log\LoggerInterface;
9+
10+
class IspRestrictionsProvider
11+
{
12+
public function __construct(
13+
private readonly string $confPath,
14+
private readonly LoggerInterface $logger,
15+
) {
16+
}
17+
18+
public function load(): IspRestrictions
19+
{
20+
$contents = $this->readConfigFile();
21+
if ($contents === null) {
22+
return new IspRestrictions(null, null, null);
23+
}
24+
25+
[$raw, $maxBatch, $minBatchPeriod, $lockFile] = $this->parseContents($contents);
26+
27+
$this->logIfDetected($maxBatch, $minBatchPeriod, $lockFile);
28+
29+
return new IspRestrictions($maxBatch, $minBatchPeriod, $lockFile, $raw);
30+
}
31+
32+
private function readConfigFile(): ?string
33+
{
34+
if (!is_file($this->confPath) || !is_readable($this->confPath)) {
35+
return null;
36+
}
37+
$contents = file_get_contents($this->confPath);
38+
if ($contents === false) {
39+
$this->logger->warning('Cannot read ISP restrictions file', ['path' => $this->confPath]);
40+
return null;
41+
}
42+
return $contents;
43+
}
44+
45+
/**
46+
* @return array{0: array<string,string>, 1: ?int, 2: ?int, 3: ?string}
47+
*/
48+
private function parseContents(string $contents): array
49+
{
50+
$maxBatch = null;
51+
$minBatchPeriod = null;
52+
$lockFile = null;
53+
$raw = [];
54+
55+
foreach (preg_split('/\R/', $contents) as $line) {
56+
[$key, $val] = $this->parseLine($line);
57+
if ($key === null) {
58+
continue;
59+
}
60+
$raw[$key] = $val;
61+
[$maxBatch, $minBatchPeriod, $lockFile] = $this->applyKeyValue(
62+
$key,
63+
$val,
64+
$maxBatch,
65+
$minBatchPeriod,
66+
$lockFile
67+
);
68+
}
69+
70+
return [$raw, $maxBatch, $minBatchPeriod, $lockFile];
71+
}
72+
73+
/**
74+
* @return array{0: ?string, 1: string}
75+
*/
76+
private function parseLine(string $line): array
77+
{
78+
$line = trim($line);
79+
if ($line === '' || str_starts_with($line, '#') || str_starts_with($line, ';')) {
80+
return [null, ''];
81+
}
82+
$parts = explode('=', $line, 2);
83+
if (\count($parts) !== 2) {
84+
return [null, ''];
85+
}
86+
87+
return array_map('trim', $parts);
88+
}
89+
90+
/**
91+
* @param string $key
92+
* @param string $val
93+
* @param ?int $maxBatch
94+
* @param ?int $minBatchPeriod
95+
* @param ?string $lockFile
96+
* @return array{0: ?int, 1: ?int, 2: ?string}
97+
*/
98+
private function applyKeyValue(
99+
string $key,
100+
string $val,
101+
?int $maxBatch,
102+
?int $minBatchPeriod,
103+
?string $lockFile
104+
): array {
105+
if ($key === 'maxbatch') {
106+
if ($val !== '' && ctype_digit($val)) {
107+
$maxBatch = (int) $val;
108+
}
109+
return [$maxBatch, $minBatchPeriod, $lockFile];
110+
}
111+
if ($key === 'minbatchperiod') {
112+
if ($val !== '' && ctype_digit($val)) {
113+
$minBatchPeriod = (int) $val;
114+
}
115+
return [$maxBatch, $minBatchPeriod, $lockFile];
116+
}
117+
if ($key === 'lockfile') {
118+
if ($val !== '') {
119+
$lockFile = $val;
120+
}
121+
return [$maxBatch, $minBatchPeriod, $lockFile];
122+
}
123+
return [$maxBatch, $minBatchPeriod, $lockFile];
124+
}
125+
126+
private function logIfDetected(?int $maxBatch, ?int $minBatchPeriod, ?string $lockFile): void
127+
{
128+
if ($maxBatch !== null || $minBatchPeriod !== null || $lockFile !== null) {
129+
$this->logger->info('ISP restrictions detected', [
130+
'path' => $this->confPath,
131+
'maxbatch' => $maxBatch,
132+
'minbatchperiod' => $minBatchPeriod,
133+
'lockfile' => $lockFile,
134+
]);
135+
}
136+
}
137+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace PhpList\Core\Domain\Common\Model;
6+
7+
final class IspRestrictions
8+
{
9+
public function __construct(
10+
public readonly ?int $maxBatch,
11+
public readonly ?int $minBatchPeriod,
12+
public readonly ?string $lockFile,
13+
public readonly array $raw = [],
14+
) {
15+
}
16+
17+
public function isEmpty(): bool
18+
{
19+
return $this->maxBatch === null && $this->minBatchPeriod === null && $this->lockFile === null;
20+
}
21+
}

src/Domain/Configuration/Service/Manager/ConfigManager.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ public function __construct(ConfigRepository $configRepository)
1717
$this->configRepository = $configRepository;
1818
}
1919

20+
public function inMaintenanceMode(): bool
21+
{
22+
$config = $this->getByItem('maintenancemode');
23+
return $config?->getValue() === '1';
24+
}
25+
2026
/**
2127
* Get a configuration item by its key
2228
*/

src/Domain/Messaging/Command/ProcessQueueCommand.php

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
namespace PhpList\Core\Domain\Messaging\Command;
66

7+
use DateTimeImmutable;
8+
use PhpList\Core\Domain\Configuration\Service\Manager\ConfigManager;
9+
use PhpList\Core\Domain\Messaging\Model\Message\MessageStatus;
710
use PhpList\Core\Domain\Messaging\Repository\MessageRepository;
811
use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator;
912
use PhpList\Core\Domain\Messaging\Service\Processor\CampaignProcessor;
@@ -24,18 +27,21 @@ class ProcessQueueCommand extends Command
2427
private LockFactory $lockFactory;
2528
private MessageProcessingPreparator $messagePreparator;
2629
private CampaignProcessor $campaignProcessor;
30+
private ConfigManager $configManager;
2731

2832
public function __construct(
2933
MessageRepository $messageRepository,
3034
LockFactory $lockFactory,
3135
MessageProcessingPreparator $messagePreparator,
3236
CampaignProcessor $campaignProcessor,
37+
ConfigManager $configManager
3338
) {
3439
parent::__construct();
3540
$this->messageRepository = $messageRepository;
3641
$this->lockFactory = $lockFactory;
3742
$this->messagePreparator = $messagePreparator;
3843
$this->campaignProcessor = $campaignProcessor;
44+
$this->configManager = $configManager;
3945
}
4046

4147
/**
@@ -50,11 +56,20 @@ protected function execute(InputInterface $input, OutputInterface $output): int
5056
return Command::FAILURE;
5157
}
5258

59+
if ($this->configManager->inMaintenanceMode()) {
60+
$output->writeln('The system is in maintenance mode, stopping. Try again later.');
61+
62+
return Command::FAILURE;
63+
}
64+
5365
try {
5466
$this->messagePreparator->ensureSubscribersHaveUuid($output);
5567
$this->messagePreparator->ensureCampaignsHaveUuid($output);
5668

57-
$campaigns = $this->messageRepository->findBy(['status' => 'submitted']);
69+
$campaigns = $this->messageRepository->getByStatusAndEmbargo(
70+
status: MessageStatus::Submitted,
71+
embargo: new DateTimeImmutable()
72+
);
5873

5974
foreach ($campaigns as $campaign) {
6075
$this->campaignProcessor->process($campaign, $output);

src/Domain/Messaging/Model/Dto/Message/MessageMetadataDto.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44

55
namespace PhpList\Core\Domain\Messaging\Model\Dto\Message;
66

7+
use PhpList\Core\Domain\Messaging\Model\Message\MessageStatus;
8+
79
class MessageMetadataDto
810
{
911
public function __construct(
10-
public readonly string $status,
12+
public readonly MessageStatus $status,
1113
) {
1214
}
1315
}

src/Domain/Messaging/Model/Message/MessageMetadata.php

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
use DateTime;
88
use Doctrine\ORM\Mapping as ORM;
9+
use InvalidArgumentException;
910
use PhpList\Core\Domain\Common\Model\Interfaces\EmbeddableInterface;
1011

1112
#[ORM\Embeddable]
@@ -33,13 +34,13 @@ class MessageMetadata implements EmbeddableInterface
3334
private ?DateTime $sendStart;
3435

3536
public function __construct(
36-
?string $status = null,
37+
?MessageStatus $status = null,
3738
int $bounceCount = 0,
3839
?DateTime $entered = null,
3940
?DateTime $sent = null,
4041
?DateTime $sendStart = null,
4142
) {
42-
$this->status = $status;
43+
$this->status = $status->value ?? null;
4344
$this->processed = false;
4445
$this->viewed = 0;
4546
$this->bounceCount = $bounceCount;
@@ -48,14 +49,21 @@ public function __construct(
4849
$this->sendStart = $sendStart;
4950
}
5051

51-
public function getStatus(): ?string
52+
/**
53+
* @SuppressWarnings("PHPMD.StaticAccess")
54+
*/
55+
public function getStatus(): ?MessageStatus
5256
{
53-
return $this->status;
57+
return MessageStatus::from($this->status);
5458
}
5559

56-
public function setStatus(string $status): self
60+
public function setStatus(MessageStatus $status): self
5761
{
58-
$this->status = $status;
62+
if (!$this->getStatus()->canTransitionTo($status)) {
63+
throw new InvalidArgumentException('Invalid status transition');
64+
}
65+
$this->status = $status->value;
66+
5967
return $this;
6068
}
6169

0 commit comments

Comments
 (0)