Symfony bundle for convenient work with queues. Currently it supports RabbitMQ.
-
Install bundle
composer require lamoda/queue-bundle
-
Extend Lamoda\QueueBundle\Entity\QueueEntityMappedSuperclass:

    use Doctrine\ORM\Mapping as ORM;
    use Lamoda\QueueBundle\Entity\QueueEntityMappedSuperclass;

    /**
     * @ORM\Entity(repositoryClass="Lamoda\QueueBundle\Entity\QueueRepository")
     */
    class Queue extends QueueEntityMappedSuperclass
    {
    }
-
Configure bundle parameters

    lamoda_queue:
        ## required
        entity_class: App\Entity\Queue
        max_attempts: 5
        batch_size_per_requeue: 5
        batch_size_per_republish: 5

        ## optional (used by the default delay strategy, Geometric Progression)
        strategy_delay_geometric_progression_start_interval_sec: 60
        strategy_delay_geometric_progression_multiplier: 2
-
Register bundle

    class AppKernel extends Kernel
    {
        // ...

        public function registerBundles()
        {
            $bundles = [
                // ...
                new Lamoda\QueueBundle\LamodaQueueBundle(),
                // ...
            ];

            return $bundles;
        }

        // ...
    }

or add to config/bundles.php

    return [
        // ...
        Lamoda\QueueBundle\LamodaQueueBundle::class => ['all' => true],
        // ...
    ];
-
Migrate schema

  doctrine:migrations:diff - create a migration for the queue table
  doctrine:migrations:migrate - apply the migration
-
Define a new exchange constant

    namespace App\Constant;

    class Exchanges
    {
        public const DEFAULT = 'default';
    }
-
Add a new node to old_sound_rabbit_mq.producers with the previously defined constant name, for example:

    old_sound_rabbit_mq:
        producers:
            default:
                connection: default
                exchange_options:
                    name: !php/const App\Constant\Exchanges::DEFAULT
                    type: "direct"
-
Define a new queue constant

    namespace App\Constant;

    class Queues
    {
        public const NOTIFICATION = 'notification';
    }
-
Register a consumer for the queue in old_sound_rabbit_mq.consumers with the previously defined constant name, for example:

    old_sound_rabbit_mq:
        consumers:
            notification:
                connection: default
                exchange_options:
                    name: !php/const App\Constant\Exchanges::DEFAULT
                    type: "direct"
                queue_options:
                    name: !php/const App\Constant\Queues::NOTIFICATION
                    routing_keys:
                        - !php/const App\Constant\Queues::NOTIFICATION
                callback: "lamoda_queue.consumer"
-
Create a job class that extends AbstractJob, for example:

    namespace App\Job;

    use App\Constant\Exchanges;
    use App\Constant\Queues;
    use Lamoda\QueueBundle\Job\AbstractJob;
    use JMS\Serializer\Annotation as JMS;

    class SendNotificationJob extends AbstractJob
    {
        /**
         * @var string
         *
         * @JMS\Type("string")
         */
        private $message;

        public function __construct(string $message)
        {
            $this->message = $message;
        }

        public function getDefaultQueue(): string
        {
            return Queues::NOTIFICATION;
        }

        public function getDefaultExchange(): string
        {
            return Exchanges::DEFAULT;
        }
    }
-
Create a job handler that implements HandlerInterface, for example:

    namespace App\Handler;

    use Lamoda\QueueBundle\Handler\HandlerInterface;
    use Lamoda\QueueBundle\QueueInterface;

    class SendNotificationHandler implements HandlerInterface
    {
        public function handle(QueueInterface $job): void
        {
            // implement service logic here
        }
    }
-
Tag the handler in the service container

    services:
        App\Handler\SendNotificationHandler:
            public: true
            tags:
                - { name: queue.handler, handle: App\Job\SendNotificationJob }
-
Configure delay strategy parameters (optional)

    lamoda_queue:
        ...
        queues:
            queue_one: 'delay_arithmetic_progression'
            queue_two: 'delay_special_geometric_progression'

    # Settings of special behaviors for Delay strategies (optional)
    services:
        lamoda_queue.strategy.delay.arithmetic_progression:
            class: Lamoda\QueueBundle\Strategy\Delay\ArithmeticProgressionStrategy
            tags:
                - { name: 'lamoda_queue_strategy', key: 'delay_arithmetic_progression' }
            arguments:
                - 60 # start_interval_sec parameter
                - 1700 # multiplier parameter

        lamoda_queue.strategy.delay.geometric_progression:
            class: Lamoda\QueueBundle\Strategy\Delay\GeometricProgressionStrategy
            tags:
                - { name: 'lamoda_queue_strategy', key: 'delay_special_geometric_progression' }
            arguments:
                - 70 # start_interval_sec parameter
                - 4 # multiplier parameter

In this block, you can configure a special delay behavior for each queue. To do so, register new services that use one of the base strategies (ArithmeticProgressionStrategy, GeometricProgressionStrategy) or your own strategy, which must be a service class that implements DelayStrategyInterface.
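To get a feel for how the two base strategies space out retries, here is a minimal, self-contained sketch. It does not use the bundle's classes: the helper functions are hypothetical, and the formulas (start * multiplier^(attempt - 1) for the geometric case, start + step * (attempt - 1) for the arithmetic case) are assumptions based on the strategy names and may differ from what the bundle actually computes.

```php
<?php
declare(strict_types=1);

// Hypothetical helpers for illustration only; they are not part of the bundle.

/** Delay in seconds before the given attempt (1-based), growing geometrically. */
function geometricDelaySec(int $startIntervalSec, float $multiplier, int $attempt): int
{
    // Assumed formula: start * multiplier^(attempt - 1)
    return (int) round($startIntervalSec * $multiplier ** ($attempt - 1));
}

/** Delay in seconds before the given attempt (1-based), growing arithmetically. */
function arithmeticDelaySec(int $startIntervalSec, int $stepSec, int $attempt): int
{
    // Assumed formula: start + step * (attempt - 1)
    return $startIntervalSec + $stepSec * ($attempt - 1);
}

// Print the delays for five attempts using the values from the config above.
foreach (range(1, 5) as $attempt) {
    printf(
        "attempt %d: geometric %d s, arithmetic %d s\n",
        $attempt,
        geometricDelaySec(60, 2, $attempt),
        arithmeticDelaySec(60, 1700, $attempt)
    );
}
```

Under these assumptions the default geometric settings (60, 2) give delays of 60, 120, 240, 480 and 960 seconds, so a multiplier above 1 backs off progressively harder, while the arithmetic variant adds a fixed step each time.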
Each strategy service must have a tag named lamoda_queue_strategy and a unique key. You can then use these keys to map strategies to queues in the lamoda_queue.queues section.

By default, GeometricProgressionStrategy is used with the following parameters (you can customize them in the lamoda_queue config section):

    strategy_delay_geometric_progression_start_interval_sec: 60
    strategy_delay_geometric_progression_multiplier: 2
-
Add the queue name to "codeception.yml" under modules.config.AMQP.queues
-
Execute the queue:init command:

    ./bin/console queue:init
Publish a job:

    $job = new SendNotificationJob($message);
    $container->get(Lamoda\QueueBundle\Factory\PublisherFactory::class)->publish($job);

Run the consumer for the notification queue:

    ./bin/console queue:consume notification

Requeue:

    ./bin/console queue:requeue
You can queue any primitive class; just implement QueueInterface:

    namespace App\Process;

    use Lamoda\QueueBundle\Entity\QueueInterface;

    class MyProcess implements QueueInterface
    {
        // implement interface functions
    }

Tag its handler:

    services:
        App\Handler\MyProcessHandler:
            public: true
            tags:
                - { name: queue.handler, handle: App\Process\MyProcess }

Publish it:

    $process = new MyProcess();
    $container->get('queue.publisher')->publish($process);

If you want to rerun a queue, throw Lamoda\QueueBundle\Exception\RuntimeException.
If you want to mark a queue as failed, throw any other kind of exception.
    namespace App\Handler;

    use Lamoda\QueueBundle\Exception\RuntimeException;
    use Lamoda\QueueBundle\Handler\HandlerInterface;
    use Lamoda\QueueBundle\QueueInterface;

    class SendNotificationHandler implements HandlerInterface
    {
        public function handle(QueueInterface $job): void
        {
            // implement service logic here
            // ($rerun and $failed below stand for your own conditions)

            // Rerun queue
            if ($rerun === true) {
                throw new RuntimeException('Error message');
            }

            // Mark queue as failed
            if ($failed === true) {
                throw new \Exception();
            }
        }
    }

By default, the delay time grows exponentially. You can adjust it through configuration.
    lamoda_queue:
        ## required
        ## ...
        max_attempts: 5

        ## optional
        strategy_delay_geometric_progression_start_interval_sec: 60
        strategy_delay_geometric_progression_multiplier: 2

QueueAttemptsReachedEvent occurs when a consumer is about to execute a queue that has already reached the maximum number of attempts.

Properties:
- Queue Entity: QueueAttemptsReachedEvent::getQueue()
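The interplay of the rerun exception, other exceptions, and max_attempts described in this section can be pictured with a small self-contained simulation. This is only a sketch and not the bundle's consumer: RetryableException is a stand-in for Lamoda\QueueBundle\Exception\RuntimeException, runWithRetries and the three sample handlers are hypothetical, and delays between attempts are left out.

```php
<?php
declare(strict_types=1);

// Stand-in for Lamoda\QueueBundle\Exception\RuntimeException (illustration only).
final class RetryableException extends \RuntimeException
{
}

/**
 * Calls $handler until it succeeds, fails permanently, or uses up $maxAttempts.
 * Returns 'done', 'failed' or 'attempts_reached'.
 */
function runWithRetries(callable $handler, int $maxAttempts): string
{
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            $handler($attempt);

            return 'done';
        } catch (RetryableException $e) {
            // Retryable error: go on to the next attempt.
            continue;
        } catch (\Throwable $e) {
            // Any other exception: mark as failed immediately.
            return 'failed';
        }
    }

    // All attempts used; roughly the situation QueueAttemptsReachedEvent reports.
    return 'attempts_reached';
}

function succeedsOnThirdAttempt(int $attempt): void
{
    if ($attempt < 3) {
        throw new RetryableException('temporary problem');
    }
}

function alwaysRetryable(int $attempt): void
{
    throw new RetryableException('still unavailable');
}

function brokenPayload(int $attempt): void
{
    throw new \InvalidArgumentException('bad payload');
}

echo runWithRetries('succeedsOnThirdAttempt', 5), "\n"; // done
echo runWithRetries('alwaysRetryable', 5), "\n";        // attempts_reached
echo runWithRetries('brokenPayload', 5), "\n";          // failed
```

The order of the catch blocks matters in this sketch: the retryable type is checked first, so only exceptions outside that type fall through to the permanent-failure branch.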
    make php-cs-check
    make php-cs-fix

Unit

    make test-unit

