Skip to content

Commit f59c0cd

Browse files
[8.x] Add queue:monitor command (#38168)
* add queue:monitor command * fix style * formatting Co-authored-by: Taylor Otwell <[email protected]>
1 parent 6662f49 commit f59c0cd

File tree

3 files changed

+193
-0
lines changed

3 files changed

+193
-0
lines changed

src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
use Illuminate\Queue\Console\ForgetFailedCommand as ForgetFailedQueueCommand;
6969
use Illuminate\Queue\Console\ListenCommand as QueueListenCommand;
7070
use Illuminate\Queue\Console\ListFailedCommand as ListFailedQueueCommand;
71+
use Illuminate\Queue\Console\MonitorCommand as QueueMonitorCommand;
7172
use Illuminate\Queue\Console\PruneBatchesCommand as PruneBatchesQueueCommand;
7273
use Illuminate\Queue\Console\PruneFailedJobsCommand;
7374
use Illuminate\Queue\Console\RestartCommand as QueueRestartCommand;
@@ -111,6 +112,7 @@ class ArtisanServiceProvider extends ServiceProvider implements DeferrableProvid
111112
'QueueFlush' => 'command.queue.flush',
112113
'QueueForget' => 'command.queue.forget',
113114
'QueueListen' => 'command.queue.listen',
115+
'QueueMonitor' => 'command.queue.monitor',
114116
'QueuePruneBatches' => 'command.queue.prune-batches',
115117
'QueuePruneFailedJobs' => 'command.queue.prune-failed-jobs',
116118
'QueueRestart' => 'command.queue.restart',
@@ -702,6 +704,18 @@ protected function registerQueueListenCommand()
702704
});
703705
}
704706

707+
/**
708+
* Register the command.
709+
*
710+
* @return void
711+
*/
712+
protected function registerQueueMonitorCommand()
713+
{
714+
$this->app->singleton('command.queue.monitor', function ($app) {
715+
return new QueueMonitorCommand($app['queue'], $app['events']);
716+
});
717+
}
718+
705719
/**
706720
* Register the command.
707721
*
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
<?php
2+
3+
namespace Illuminate\Queue\Console;
4+
5+
use Illuminate\Console\Command;
6+
use Illuminate\Contracts\Events\Dispatcher;
7+
use Illuminate\Contracts\Queue\Factory;
8+
use Illuminate\Queue\Events\QueueBusy;
9+
use Illuminate\Support\Collection;
10+
11+
class MonitorCommand extends Command
12+
{
13+
/**
14+
* The console command name.
15+
*
16+
* @var string
17+
*/
18+
protected $signature = 'queue:monitor
19+
{queues : The names of the queues to monitor}
20+
{--max=1000 : The maximum number of jobs that can be on the queue before an event is dispatched}';
21+
22+
/**
23+
* The console command description.
24+
*
25+
* @var string
26+
*/
27+
protected $description = 'Monitor the size of the specified queues';
28+
29+
/**
30+
* The queue manager instance.
31+
*
32+
* @var \Illuminate\Contracts\Queue\Factory
33+
*/
34+
protected $manager;
35+
36+
/**
37+
* The events dispatcher instance.
38+
*
39+
* @var \Illuminate\Contracts\Events\Dispatcher
40+
*/
41+
protected $events;
42+
43+
/**
44+
* The table headers for the command.
45+
*
46+
* @var string[]
47+
*/
48+
protected $headers = ['Connection', 'Queue', 'Size', 'Status'];
49+
50+
/**
51+
* Create a new queue listen command.
52+
*
53+
* @param \Illuminate\Contracts\Queue\Factory $manager
54+
* @param \Illuminate\Contracts\Events\Dispatcher $events
55+
* @return void
56+
*/
57+
public function __construct(Factory $manager, Dispatcher $events)
58+
{
59+
parent::__construct();
60+
61+
$this->manager = $manager;
62+
$this->events = $events;
63+
}
64+
65+
/**
66+
* Execute the console command.
67+
*
68+
* @return void
69+
*/
70+
public function handle()
71+
{
72+
$queues = $this->parseQueues($this->argument('queues'));
73+
74+
$this->displaySizes($queues);
75+
76+
$this->dispatchEvents($queues);
77+
}
78+
79+
/**
80+
* Parse the queues into an array of the connections and queues.
81+
*
82+
* @param string $queues
83+
* @return \Illuminate\Support\Collection
84+
*/
85+
protected function parseQueues($queues)
86+
{
87+
return collect(explode(',', $queues))->map(function ($queue) {
88+
[$connection, $queue] = array_pad(explode(':', $queue, 2), 2, null);
89+
90+
if (! isset($queue)) {
91+
$queue = $connection;
92+
$connection = $this->laravel['config']['queue.default'];
93+
}
94+
95+
return [
96+
'connection' => $connection,
97+
'queue' => $queue,
98+
'size' => $size = $this->manager->connection($connection)->size($queue),
99+
'status' => $size >= $this->option('max') ? '<fg=red>ALERT</>' : 'OK',
100+
];
101+
});
102+
}
103+
104+
/**
105+
* Display the failed jobs in the console.
106+
*
107+
* @param \Illuminate\Support\Collection $queues
108+
* @return void
109+
*/
110+
protected function displaySizes(Collection $queues)
111+
{
112+
$this->table($this->headers, $queues);
113+
}
114+
115+
/**
116+
* Fire the monitoring events.
117+
*
118+
* @param \Illuminate\Support\Collection $queues
119+
* @return void
120+
*/
121+
protected function dispatchEvents(Collection $queues)
122+
{
123+
foreach ($queues as $queue) {
124+
if ($queue['status'] == 'OK') {
125+
continue;
126+
}
127+
128+
$this->events->dispatch(
129+
new QueueBusy(
130+
$queue['connection'],
131+
$queue['queue'],
132+
$queue['size'],
133+
)
134+
);
135+
}
136+
}
137+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
3+
namespace Illuminate\Queue\Events;
4+
5+
class QueueBusy
6+
{
7+
/**
8+
* The connection name.
9+
*
10+
* @var string
11+
*/
12+
public $connection;
13+
14+
/**
15+
* The queue name.
16+
*
17+
* @var string
18+
*/
19+
public $queue;
20+
21+
/**
22+
* The size of the queue.
23+
*
24+
* @var int
25+
*/
26+
public $size;
27+
28+
/**
29+
* Create a new event instance.
30+
*
31+
* @param string $connection
32+
* @param string $queue
33+
* @param int $size
34+
* @return void
35+
*/
36+
public function __construct($connection, $queue, $size)
37+
{
38+
$this->connection = $connection;
39+
$this->queue = $queue;
40+
$this->size = $size;
41+
}
42+
}

0 commit comments

Comments
 (0)