Skip to content

Commit 0ddc7ab

Browse files
Add circuit breaker job middleware
1 parent 34418f3 commit 0ddc7ab

File tree

2 files changed

+273
-0
lines changed

2 files changed

+273
-0
lines changed
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
<?php
2+
3+
namespace Illuminate\Queue\Middleware;
4+
5+
use Illuminate\Cache\RateLimiter;
6+
use Illuminate\Container\Container;
7+
use Throwable;
8+
9+
class CircuitBreaker
10+
{
11+
/**
12+
* The maximum number of attempts allowed before the circuit is opened.
13+
*
14+
* @var int
15+
*/
16+
protected $maxAttempts;
17+
18+
/**
19+
* The number of minutes until the maximum attempts are reset.
20+
*
21+
* @var int
22+
*/
23+
protected $decayMinutes;
24+
25+
/**
26+
* The number of minutes to wait before retrying the job after an exception.
27+
*
28+
* @var int
29+
*/
30+
protected $retryAfterMinutes;
31+
32+
/**
33+
* The rate limiter key.
34+
*
35+
* @var string
36+
*/
37+
protected $key;
38+
39+
/**
40+
* The prefix of the rate limiter key.
41+
*
42+
* @var string
43+
*/
44+
protected $prefix = 'circuit_breaker:';
45+
46+
/**
47+
* The rate limiter instance.
48+
*
49+
* @var \Illuminate\Cache\RateLimiter
50+
*/
51+
protected $limiter;
52+
53+
/**
54+
* Create a new middleware instance.
55+
*
56+
* @param int $maxAttempts
57+
* @param int $decayMinutes
58+
* @param int $retryAfterMinutes
59+
* @param string $key
60+
*/
61+
public function __construct($maxAttempts = 10, $decayMinutes = 10, $retryAfterMinutes = 0, string $key = '')
62+
{
63+
$this->maxAttempts = $maxAttempts;
64+
$this->decayMinutes = $decayMinutes;
65+
$this->retryAfterMinutes = $retryAfterMinutes;
66+
$this->key = $key;
67+
}
68+
69+
/**
70+
* Process the job.
71+
*
72+
* @param mixed $job
73+
* @param callable $next
74+
* @return mixed
75+
*/
76+
public function handle($job, $next)
77+
{
78+
$this->limiter = Container::getInstance()->make(RateLimiter::class);
79+
80+
if ($this->limiter->tooManyAttempts($jobKey = $this->getKey($job), $this->maxAttempts)) {
81+
return $job->release($this->getTimeUntilNextRetry($jobKey));
82+
}
83+
84+
try {
85+
$next($job);
86+
87+
$this->limiter->clear($jobKey);
88+
} catch (Throwable $throwable) {
89+
$this->limiter->hit($jobKey, $this->decayMinutes * 60);
90+
91+
return $job->release($this->retryAfterMinutes * 60);
92+
}
93+
}
94+
95+
/**
96+
* Set the prefix of the rate limiter key.
97+
*
98+
* @param string $prefix
99+
* @return $this
100+
*/
101+
public function withPrefix(string $prefix)
102+
{
103+
$this->prefix = $prefix;
104+
105+
return $this;
106+
}
107+
108+
/**
109+
* Get the number of seconds that should elapse before the job is retried.
110+
*
111+
* @param string $key
112+
* @return int
113+
*/
114+
protected function getTimeUntilNextRetry($key)
115+
{
116+
return $this->limiter->availableIn($key) + 3;
117+
}
118+
119+
/**
120+
* Get the cache key associated for the rate limiter.
121+
*
122+
* @param mixed $job
123+
* @return string
124+
*/
125+
protected function getKey($job)
126+
{
127+
return md5($this->prefix.(empty($this->key) ? get_class($job) : $this->key));
128+
}
129+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
<?php
2+
3+
namespace Illuminate\Tests\Integration\Queue;
4+
5+
use Exception;
6+
use Illuminate\Bus\Dispatcher;
7+
use Illuminate\Bus\Queueable;
8+
use Illuminate\Contracts\Queue\Job;
9+
use Illuminate\Queue\CallQueuedHandler;
10+
use Illuminate\Queue\InteractsWithQueue;
11+
use Illuminate\Queue\Middleware\CircuitBreaker;
12+
use Mockery as m;
13+
use Orchestra\Testbench\TestCase;
14+
15+
/**
16+
* @group integration
17+
*/
18+
class CircuitBreakerTest extends TestCase
19+
{
20+
protected function tearDown(): void
21+
{
22+
parent::tearDown();
23+
24+
m::close();
25+
}
26+
27+
public function testCircuitIsOpenedForJobErrors()
28+
{
29+
$this->assertJobWasReleasedImmediately(CircuitBreakerTestJob::class);
30+
$this->assertJobWasReleasedImmediately(CircuitBreakerTestJob::class);
31+
$this->assertJobWasReleasedWithDelay(CircuitBreakerTestJob::class);
32+
}
33+
34+
public function testCircuitStaysClosedForSuccessfulJobs()
35+
{
36+
$this->assertJobRanSuccessfully(CircuitBreakerSuccessfulJob::class);
37+
$this->assertJobRanSuccessfully(CircuitBreakerSuccessfulJob::class);
38+
$this->assertJobRanSuccessfully(CircuitBreakerSuccessfulJob::class);
39+
}
40+
41+
public function testCircuitResetsAfterSuccess()
42+
{
43+
$this->assertJobWasReleasedImmediately(CircuitBreakerTestJob::class);
44+
$this->assertJobRanSuccessfully(CircuitBreakerSuccessfulJob::class);
45+
$this->assertJobWasReleasedImmediately(CircuitBreakerTestJob::class);
46+
$this->assertJobWasReleasedImmediately(CircuitBreakerTestJob::class);
47+
$this->assertJobWasReleasedWithDelay(CircuitBreakerTestJob::class);
48+
}
49+
50+
protected function assertJobWasReleasedImmediately($class)
51+
{
52+
$class::$handled = false;
53+
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);
54+
55+
$job = m::mock(Job::class);
56+
57+
$job->shouldReceive('hasFailed')->once()->andReturn(false);
58+
$job->shouldReceive('release')->with(0)->once();
59+
$job->shouldReceive('isReleased')->andReturn(true);
60+
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(true);
61+
62+
$instance->call($job, [
63+
'command' => serialize($command = new $class),
64+
]);
65+
66+
$this->assertTrue($class::$handled);
67+
}
68+
69+
protected function assertJobWasReleasedWithDelay($class)
70+
{
71+
$class::$handled = false;
72+
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);
73+
74+
$job = m::mock(Job::class);
75+
76+
$job->shouldReceive('hasFailed')->once()->andReturn(false);
77+
$job->shouldReceive('release')->withArgs(function ($delay) {
78+
return $delay >= 600;
79+
})->once();
80+
$job->shouldReceive('isReleased')->andReturn(true);
81+
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(true);
82+
83+
$instance->call($job, [
84+
'command' => serialize($command = new $class),
85+
]);
86+
87+
$this->assertFalse($class::$handled);
88+
}
89+
90+
protected function assertJobRanSuccessfully($class)
91+
{
92+
$class::$handled = false;
93+
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);
94+
95+
$job = m::mock(Job::class);
96+
97+
$job->shouldReceive('hasFailed')->once()->andReturn(false);
98+
$job->shouldReceive('isReleased')->andReturn(false);
99+
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(false);
100+
$job->shouldReceive('delete')->once();
101+
102+
$instance->call($job, [
103+
'command' => serialize($command = new $class),
104+
]);
105+
106+
$this->assertTrue($class::$handled);
107+
}
108+
}
109+
110+
class CircuitBreakerTestJob
111+
{
112+
use InteractsWithQueue, Queueable;
113+
114+
public static $handled = false;
115+
116+
public function handle()
117+
{
118+
static::$handled = true;
119+
120+
throw new Exception;
121+
}
122+
123+
public function middleware()
124+
{
125+
return [new CircuitBreaker(2, 10, 0, 'test')];
126+
}
127+
}
128+
129+
class CircuitBreakerSuccessfulJob
130+
{
131+
use InteractsWithQueue, Queueable;
132+
133+
public static $handled = false;
134+
135+
public function handle()
136+
{
137+
static::$handled = true;
138+
}
139+
140+
public function middleware()
141+
{
142+
return [new CircuitBreaker(2, 10, 0, 'test')];
143+
}
144+
}

0 commit comments

Comments
 (0)