Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@
"phpstan": "tools/phpstan analyse",
"stan-baseline": "tools/phpstan --generate-baseline",
"stan-setup": "phive install",
"rector-setup": "cp composer.json composer.backup && composer require --dev rector/rector:\"^2.2\" && mv composer.backup composer.json",
"rector-check": "vendor/bin/rector process --dry-run",
"rector-fix": "vendor/bin/rector process",
"test": "phpunit",
"test-coverage": "phpunit --coverage-clover=clover.xml"
}
Expand Down
35 changes: 35 additions & 0 deletions rector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php
declare(strict_types=1);

use Rector\CodeQuality\Rector\If_\SimplifyIfElseToTernaryRector;
use Rector\CodingStyle\Rector\ClassMethod\MakeInheritedMethodVisibilitySameAsParentRector;
use Rector\Config\RectorConfig;
use Rector\DeadCode\Rector\ClassMethod\RemoveNullTagValueNodeRector;
use Rector\DeadCode\Rector\ClassMethod\RemoveUselessReturnTagRector;
use Rector\Strict\Rector\Empty_\DisallowedEmptyRuleFixerRector;
use Rector\TypeDeclarationDocblocks\Rector\ClassMethod\DocblockReturnArrayFromDirectArrayInstanceRector;
use Rector\ValueObject\PhpVersion;

return RectorConfig::configure()
->withPhpVersion(PhpVersion::PHP_82)
->withPaths([
__DIR__ . '/src',
__DIR__ . '/tests',
])
->withSkip([
DisallowedEmptyRuleFixerRector::class,
SimplifyIfElseToTernaryRector::class,
MakeInheritedMethodVisibilitySameAsParentRector::class,
RemoveNullTagValueNodeRector::class,
RemoveUselessReturnTagRector::class,
DocblockReturnArrayFromDirectArrayInstanceRector::class => [
__DIR__ . '/src/Mailer/Transport/QueueTransport.php',
],
])
->withParallel()
->withPreparedSets(
deadCode: true,
codeQuality: true,
codingStyle: true,
typeDeclarationDocblocks: true,
);
16 changes: 9 additions & 7 deletions src/Command/PurgeFailedCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ public function getOptionParser(): ConsoleOptionParser
/**
* @param \Cake\Console\Arguments $args Arguments
* @param \Cake\Console\ConsoleIo $io ConsoleIo
* @return void
* @return int
*/
public function execute(Arguments $args, ConsoleIo $io): void
public function execute(Arguments $args, ConsoleIo $io): int
{
/** @var \Cake\Queue\Model\Table\FailedJobsTable $failedJobsTable */
$failedJobsTable = $this->getTableLocator()->get('Cake/Queue.FailedJobs');
Expand Down Expand Up @@ -108,21 +108,23 @@ public function execute(Arguments $args, ConsoleIo $io): void
if (!$deletingCount) {
$io->out('0 jobs found.');

return;
return self::CODE_SUCCESS;
}

if (!$args->getOption('force')) {
$confirmed = $io->askChoice("Delete {$deletingCount} jobs?", ['y', 'n'], 'n');
$confirmed = $io->askChoice(sprintf('Delete %s jobs?', $deletingCount), ['y', 'n'], 'n');

if ($confirmed !== 'y') {
return;
return self::CODE_SUCCESS;
}
}

$io->out("Deleting {$deletingCount} jobs.");
$io->out(sprintf('Deleting %s jobs.', $deletingCount));

$failedJobsTable->deleteManyOrFail($jobsToDelete);

$io->success("{$deletingCount} jobs deleted.");
$io->success($deletingCount . ' jobs deleted.');

return self::CODE_SUCCESS;
}
}
26 changes: 14 additions & 12 deletions src/Command/RequeueCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ public function getOptionParser(): ConsoleOptionParser
/**
* @param \Cake\Console\Arguments $args Arguments
* @param \Cake\Console\ConsoleIo $io ConsoleIo
* @return void
* @return int
*/
public function execute(Arguments $args, ConsoleIo $io): void
public function execute(Arguments $args, ConsoleIo $io): int
{
/** @var \Cake\Queue\Model\Table\FailedJobsTable $failedJobsTable */
$failedJobsTable = $this->getTableLocator()->get('Cake/Queue.FailedJobs');
Expand Down Expand Up @@ -110,26 +110,26 @@ public function execute(Arguments $args, ConsoleIo $io): void
if (!$requeueingCount) {
$io->out('0 jobs found.');

return;
return self::CODE_SUCCESS;
}

if (!$args->getOption('force')) {
$confirmed = $io->askChoice("Requeue {$requeueingCount} jobs?", ['y', 'n'], 'n');
$confirmed = $io->askChoice(sprintf('Requeue %s jobs?', $requeueingCount), ['y', 'n'], 'n');

if ($confirmed !== 'y') {
return;
return self::CODE_SUCCESS;
}
}

$io->out("Requeueing {$requeueingCount} jobs.");
$io->out(sprintf('Requeueing %s jobs.', $requeueingCount));

$succeededCount = 0;
$failedCount = 0;

/** @var array<\Cake\Queue\Model\Entity\FailedJob> $jobsToRequeue */
$jobsToRequeue = $jobsToRequeueQuery->all();
foreach ($jobsToRequeue as $failedJob) {
$io->verbose("Requeueing FailedJob with ID {$failedJob->id}.");
$io->verbose(sprintf('Requeueing FailedJob with ID %d.', $failedJob->id));
try {
QueueManager::push(
[$failedJob->class, $failedJob->method],
Expand All @@ -145,19 +145,21 @@ public function execute(Arguments $args, ConsoleIo $io): void

$succeededCount++;
} catch (Exception $e) {
$io->err("Exception occurred while requeueing FailedJob with ID {$failedJob->id}");
$io->err('Exception occurred while requeueing FailedJob with ID ' . $failedJob->id);
$io->err((string)$e);

$failedCount++;
}
}

if ($failedCount) {
$io->err("Failed to requeue {$failedCount} jobs.");
if ($failedCount !== 0) {
$io->err(sprintf('Failed to requeue %d jobs.', $failedCount));
}

if ($succeededCount) {
$io->success("{$succeededCount} jobs requeued.");
if ($succeededCount !== 0) {
$io->success($succeededCount . ' jobs requeued.');
}

return self::CODE_SUCCESS;
}
}
22 changes: 9 additions & 13 deletions src/Command/WorkerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,12 @@
*/
class WorkerCommand extends Command
{
/**
* @var \Cake\Core\ContainerInterface|null
*/
protected ?ContainerInterface $container = null;

/**
* @param \Cake\Core\ContainerInterface|null $container DI container instance
*/
public function __construct(?ContainerInterface $container = null)
{
$this->container = $container;
public function __construct(
protected readonly ?ContainerInterface $container = null,
) {
}

/**
Expand Down Expand Up @@ -138,12 +133,12 @@ protected function getQueueExtension(Arguments $args, LoggerInterface $logger):
$limitAttempsExtension,
];

if (!is_null($args->getOption('max-jobs'))) {
if ($args->getOption('max-jobs') !== null) {
$maxJobs = (int)$args->getOption('max-jobs');
$extensions[] = new LimitConsumedMessagesExtension($maxJobs);
}

if (!is_null($args->getOption('max-runtime'))) {
if ($args->getOption('max-runtime') !== null) {
$endTime = new DateTime(sprintf('+%d seconds', (int)$args->getOption('max-runtime')));
$extensions[] = new LimitConsumptionTimeExtension($endTime);
}
Expand Down Expand Up @@ -187,12 +182,12 @@ protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface
$processorClass = $config['processor'] ?? Processor::class;

if (!class_exists($processorClass)) {
$io->error(sprintf(sprintf('Processor class %s not found', $processorClass)));
$io->error(sprintf('Processor class %s not found', $processorClass));
$this->abort();
}

if (!is_subclass_of($processorClass, InteropProcessor::class)) {
$io->error(sprintf(sprintf('Processor class %s must implement Interop\Queue\Processor', $processorClass)));
$io->error(sprintf('Processor class %s must implement Interop\Queue\Processor', $processorClass));
$this->abort();
}

Expand Down Expand Up @@ -231,10 +226,11 @@ public function execute(Arguments $args, ConsoleIo $io): int
$processor->getEventManager()->on($listener);
}
}

$client = QueueManager::engine($config);
$queue = $args->getOption('queue')
? (string)$args->getOption('queue')
: Configure::read("Queue.{$config}.queue", 'default');
: Configure::read(sprintf('Queue.%s.queue', $config), 'default');
$processorName = $args->getOption('processor') ? (string)$args->getOption('processor') : 'default';

$client->bindTopic($queue, $processor, $processorName);
Expand Down
16 changes: 4 additions & 12 deletions src/Consumption/LimitAttemptsExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,12 @@ class LimitAttemptsExtension implements MessageResultExtensionInterface
public const ATTEMPTS_PROPERTY = 'attempts';

/**
* The maximum number of times a job may be attempted. $maxAttempts defined on a
* Job will override this value.
*
* @var int|null
*/
protected ?int $maxAttempts = null;

/**
* @param int|null $maxAttempts The maximum number of times a job may be attempted.
* @param int|null $maxAttempts The maximum number of times a job may be attempted. $maxAttempts defined on a Job will override this value.
* @return void
*/
public function __construct(?int $maxAttempts = null)
{
$this->maxAttempts = $maxAttempts;
public function __construct(
protected readonly ?int $maxAttempts = null,
) {
}

/**
Expand Down
15 changes: 4 additions & 11 deletions src/Consumption/LimitConsumedMessagesExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,15 @@
*/
class LimitConsumedMessagesExtension implements PreConsumeExtensionInterface, PostConsumeExtensionInterface
{
/**
* @var int
*/
protected int $messageLimit;

/**
* @var int
*/
protected int $messageConsumed = 0;

/**
* @param int $messageLimit The number of messages to process before exiting.
* @return void
*/
public function __construct(int $messageLimit)
{
$this->messageLimit = $messageLimit;
public function __construct(
protected readonly int $messageLimit,
) {
}

/**
Expand Down
13 changes: 3 additions & 10 deletions src/Consumption/RemoveUniqueJobIdFromCacheExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,13 @@

class RemoveUniqueJobIdFromCacheExtension implements MessageResultExtensionInterface
{
/**
* Cache engine name.
*
* @var string
*/
protected string $cache;

/**
* @param string $cache Cache engine name.
* @return void
*/
public function __construct(string $cache)
{
$this->cache = $cache;
public function __construct(
protected readonly string $cache,
) {
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/Job/MailerJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ public function execute(Message $message): ?string

try {
$mailer = $this->getMailer($mailerName, $mailerConfig);
} catch (MissingMailerException $e) {
} catch (MissingMailerException $missingMailerException) {
return Processor::REJECT;
}

try {
$mailer->send($action, $args, $headers);
} catch (BadMethodCallException $e) {
} catch (BadMethodCallException $badMethodCallException) {
return Processor::REJECT;
}

Expand Down
Loading