From 4a94efcd4f2adbd73896b9eb9cc9894eacacbada Mon Sep 17 00:00:00 2001 From: Ray Ward Date: Thu, 13 Apr 2017 13:58:44 -0700 Subject: [PATCH 01/10] Add ReserverInterface. This interface allows implementations to define different behaviours for reserving jobs from the queues. --- lib/Resque/Reserver/ReserverInterface.php | 40 +++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 lib/Resque/Reserver/ReserverInterface.php diff --git a/lib/Resque/Reserver/ReserverInterface.php b/lib/Resque/Reserver/ReserverInterface.php new file mode 100644 index 00000000..6e151e51 --- /dev/null +++ b/lib/Resque/Reserver/ReserverInterface.php @@ -0,0 +1,40 @@ + Date: Thu, 13 Apr 2017 13:39:34 -0700 Subject: [PATCH 02/10] Add QueueOrderReserver. This reserver checks queues in the order they were specified. As long as jobs exist on higher priority queues, they will be reserved before moving to the next lowest priority queue. This implements the current default job reservation behaviour. --- composer.json | 5 ++ composer.lock | 62 ++++++++-------- lib/Resque/Reserver/AbstractReserver.php | 59 +++++++++++++++ lib/Resque/Reserver/QueueOrderReserver.php | 41 ++++++++++ .../Tests/Reserver/AbstractReserverTest.php | 59 +++++++++++++++ .../Tests/Reserver/QueueOrderReserverTest.php | 74 +++++++++++++++++++ test/bootstrap.php | 1 - 7 files changed, 271 insertions(+), 30 deletions(-) create mode 100644 lib/Resque/Reserver/AbstractReserver.php create mode 100644 lib/Resque/Reserver/QueueOrderReserver.php create mode 100644 test/Resque/Tests/Reserver/AbstractReserverTest.php create mode 100644 test/Resque/Tests/Reserver/QueueOrderReserverTest.php diff --git a/composer.json b/composer.json index b12fa291..e45aabe6 100644 --- a/composer.json +++ b/composer.json @@ -37,5 +37,10 @@ "psr-0": { "Resque": "lib" } + }, + "autoload-dev": { + "psr-0": { + "Resque": "test/" + } } } diff --git a/composer.lock b/composer.lock index 0f431b90..128cee07 100644 --- a/composer.lock +++ b/composer.lock @@ -4,21 +4,20 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file", "This file is @generated automatically" ], - "hash": "41124ffd15a15b52947e430b92b8f10f", "content-hash": "11906622d4e017ff6807c6dff51f208d", "packages": [ { "name": "colinmollenhour/credis", - "version": "1.7", + "version": "1.8.1", "source": { "type": "git", "url": "https://github.com/colinmollenhour/credis.git", - "reference": "74b2b703da5c58dc07fb97e8954bc63280b469bf" + "reference": "215810e7161748a99dbc37020d38068a80aa0805" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/colinmollenhour/credis/zipball/74b2b703da5c58dc07fb97e8954bc63280b469bf", - "reference": "74b2b703da5c58dc07fb97e8954bc63280b469bf", + "url": "https://api.github.com/repos/colinmollenhour/credis/zipball/215810e7161748a99dbc37020d38068a80aa0805", + "reference": "215810e7161748a99dbc37020d38068a80aa0805", "shasum": "" }, "require": { @@ -44,7 +43,7 @@ ], "description": "Credis is a lightweight interface to the Redis key-value store which wraps the phpredis library when available for better performance.", "homepage": "https://github.com/colinmollenhour/credis", - "time": "2016-03-24 15:50:52" + "time": "2017-03-25T03:27:34+00:00" }, { "name": "psr/log", @@ -82,7 +81,7 @@ "psr", "psr-3" ], - "time": "2012-12-21 11:40:51" + "time": "2012-12-21T11:40:51+00:00" } ], "packages-dev": [ @@ -145,20 +144,20 @@ "testing", "xunit" ], - "time": "2014-09-02 10:13:14" + "time": "2014-09-02T10:13:14+00:00" }, { "name": "phpunit/php-file-iterator", - "version": "1.4.1", + "version": "1.4.2", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/php-file-iterator.git", - "reference": "6150bf2c35d3fc379e50c7602b75caceaa39dbf0" + "reference": "3cc8f69b3028d0f96a9078e6295d86e9bf019be5" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/php-file-iterator/zipball/6150bf2c35d3fc379e50c7602b75caceaa39dbf0", - "reference": "6150bf2c35d3fc379e50c7602b75caceaa39dbf0", + "url": "https://api.github.com/repos/sebastianbergmann/php-file-iterator/zipball/3cc8f69b3028d0f96a9078e6295d86e9bf019be5", + "reference": "3cc8f69b3028d0f96a9078e6295d86e9bf019be5", "shasum": "" }, "require": { @@ -192,7 +191,7 @@ "filesystem", "iterator" ], - "time": "2015-06-21 13:08:43" + "time": "2016-10-03T07:40:28+00:00" }, { "name": "phpunit/php-text-template", @@ -233,29 +232,34 @@ "keywords": [ "template" ], - "time": "2015-06-21 13:50:34" + "time": "2015-06-21T13:50:34+00:00" }, { "name": "phpunit/php-timer", - "version": "1.0.8", + "version": "1.0.9", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/php-timer.git", - "reference": "38e9124049cf1a164f1e4537caf19c99bf1eb260" + "reference": "3dcf38ca72b158baf0bc245e9184d3fdffa9c46f" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/php-timer/zipball/38e9124049cf1a164f1e4537caf19c99bf1eb260", - "reference": "38e9124049cf1a164f1e4537caf19c99bf1eb260", + "url": "https://api.github.com/repos/sebastianbergmann/php-timer/zipball/3dcf38ca72b158baf0bc245e9184d3fdffa9c46f", + "reference": "3dcf38ca72b158baf0bc245e9184d3fdffa9c46f", "shasum": "" }, "require": { - "php": ">=5.3.3" + "php": "^5.3.3 || ^7.0" }, "require-dev": { - "phpunit/phpunit": "~4|~5" + "phpunit/phpunit": "^4.8.35 || ^5.7 || ^6.0" }, "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0-dev" + } + }, "autoload": { "classmap": [ "src/" @@ -277,7 +281,7 @@ "keywords": [ "timer" ], - "time": "2016-05-12 18:03:57" + "time": "2017-02-26T11:10:40+00:00" }, { "name": "phpunit/php-token-stream", @@ -327,7 +331,7 @@ "keywords": [ "tokenizer" ], - "time": "2014-03-03 05:10:30" + "time": "2014-03-03T05:10:30+00:00" }, { "name": "phpunit/phpunit", @@ -400,7 +404,7 @@ "testing", "xunit" ], - "time": "2014-10-17 09:04:17" + "time": "2014-10-17T09:04:17+00:00" }, { "name": "phpunit/phpunit-mock-objects", @@ -449,20 +453,20 @@ "mock", "xunit" ], - "time": "2013-01-13 10:24:48" + "time": "2013-01-13T10:24:48+00:00" }, { "name": "symfony/yaml", - "version": "v2.8.12", + "version": "v2.8.19", "source": { "type": "git", "url": "https://github.com/symfony/yaml.git", - "reference": "e7540734bad981fe59f8ef14b6fc194ae9df8d9c" + "reference": "286d84891690b0e2515874717e49360d1c98a703" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/yaml/zipball/e7540734bad981fe59f8ef14b6fc194ae9df8d9c", - "reference": "e7540734bad981fe59f8ef14b6fc194ae9df8d9c", + "url": "https://api.github.com/repos/symfony/yaml/zipball/286d84891690b0e2515874717e49360d1c98a703", + "reference": "286d84891690b0e2515874717e49360d1c98a703", "shasum": "" }, "require": { @@ -498,7 +502,7 @@ ], "description": "Symfony Yaml Component", "homepage": "https://symfony.com", - "time": "2016-09-02 01:57:56" + "time": "2017-03-20T09:41:44+00:00" } ], "aliases": [], diff --git a/lib/Resque/Reserver/AbstractReserver.php b/lib/Resque/Reserver/AbstractReserver.php new file mode 100644 index 00000000..54cde8b6 --- /dev/null +++ b/lib/Resque/Reserver/AbstractReserver.php @@ -0,0 +1,59 @@ +logger = $logger; + $this->queues = $queues; + } + + /** + * {@inheritDoc} + */ + public function getQueues() + { + if (in_array('*', $this->queues)) { + $queues = Resque::queues(); + sort($queues); + return $queues; + } + + return $this->queues; + } + + /** + * {@inheritDoc} + */ + public function waitAfterReservationAttempt() + { + return true; + } + + /** + * {@inheritDoc} + */ + public function getName() + { + $name = get_class($this); + $name = str_replace(__NAMESPACE__, '', $name); + return trim($name, '\\'); + } +} diff --git a/lib/Resque/Reserver/QueueOrderReserver.php b/lib/Resque/Reserver/QueueOrderReserver.php new file mode 100644 index 00000000..8c6a0dbc --- /dev/null +++ b/lib/Resque/Reserver/QueueOrderReserver.php @@ -0,0 +1,41 @@ +getQueues() as $queue) { + $this->logger->debug("[{reserver}] Checking queue '{queue}' for jobs", array( + 'queue' => $queue, + 'reserver' => $this->getName(), + )); + + $job = Resque_Job::reserve($queue); + if ($job) { + $this->logger->info("[{reserver}] Found job on queue '{queue}'", array( + 'queue' => $queue, + 'reserver' => $this->getName(), + )); + return $job; + } + } + + return null; + } +} diff --git a/test/Resque/Tests/Reserver/AbstractReserverTest.php b/test/Resque/Tests/Reserver/AbstractReserverTest.php new file mode 100644 index 00000000..34b1828a --- /dev/null +++ b/test/Resque/Tests/Reserver/AbstractReserverTest.php @@ -0,0 +1,59 @@ +assertEquals($this->reserverName, $this->getReserver()->getName()); + } + + public function testGetQueuesReturnsConfiguredQueues() + { + $queues = array( + 'queue_' . rand(1, 100), + 'queue_' . rand(101, 200), + 'queue_' . rand(201, 300), + ); + $this->assertEquals($queues, $this->getReserver($queues)->getQueues()); + } + + public function testGetQueuesWithAsterixQueueReturnsAllQueuesFromRedisInSortedOrder() + { + $queues = array( + 'queue_b', + 'queue_c', + 'queue_d', + 'queue_a', + ); + + // register queues in redis + foreach ($queues as $queue) { + Resque::redis()->sadd('queues', $queue); + } + + $expected = array( + 'queue_a', + 'queue_b', + 'queue_c', + 'queue_d', + ); + + $this->assertEquals($expected, $this->getReserver(array('*'))->getQueues()); + } +} diff --git a/test/Resque/Tests/Reserver/QueueOrderReserverTest.php b/test/Resque/Tests/Reserver/QueueOrderReserverTest.php new file mode 100644 index 00000000..c19e2d1e --- /dev/null +++ b/test/Resque/Tests/Reserver/QueueOrderReserverTest.php @@ -0,0 +1,74 @@ +assertTrue($this->getReserver()->waitAfterReservationAttempt()); + } + + public function testReserverWhenNoJobsEnqueuedReturnsNull() + { + $queues = array( + 'queue_1', + 'queue_2', + 'queue_3', + ); + $this->assertNull($this->getReserver($queues)->reserve()); + } + + public function testReserveReservesJobsInSpecifiedQueueOrder() + { + $queues = array( + 'high', + 'medium', + 'low', + ); + $reserver = $this->getReserver($queues); + + // Queue the jobs in a different order + Resque::enqueue('low', 'Low_Job_1'); + Resque::enqueue('high', 'High_Job_1'); + Resque::enqueue('medium', 'Medium_Job_1'); + Resque::enqueue('medium', 'Medium_Job_2'); + Resque::enqueue('high', 'High_Job_2'); + Resque::enqueue('low', 'Low_Job_2'); + + // Now check we get the jobs back in the right order + $job = $reserver->reserve(); + $this->assertEquals('high', $job->queue); + $this->assertEquals('High_Job_1', $job->payload['class']); + + $job = $reserver->reserve(); + $this->assertEquals('high', $job->queue); + $this->assertEquals('High_Job_2', $job->payload['class']); + + $job = $reserver->reserve(); + $this->assertEquals('medium', $job->queue); + $this->assertEquals('Medium_Job_1', $job->payload['class']); + + $job = $reserver->reserve(); + $this->assertEquals('medium', $job->queue); + $this->assertEquals('Medium_Job_2', $job->payload['class']); + + $job = $reserver->reserve(); + $this->assertEquals('low', $job->queue); + $this->assertEquals('Low_Job_1', $job->payload['class']); + + $job = $reserver->reserve(); + $this->assertEquals('low', $job->queue); + $this->assertEquals('Low_Job_2', $job->payload['class']); + } +} diff --git a/test/bootstrap.php b/test/bootstrap.php index a4b68377..e08c78fd 100644 --- a/test/bootstrap.php +++ b/test/bootstrap.php @@ -8,7 +8,6 @@ */ $loader = require __DIR__ . '/../vendor/autoload.php'; -$loader->add('Resque_Tests', __DIR__); define('TEST_MISC', realpath(__DIR__ . '/misc/')); define('REDIS_CONF', TEST_MISC . '/redis.conf'); From 3de7e5c0f7ee2df930631b4013d12838edf37b4f Mon Sep 17 00:00:00 2001 From: Ray Ward Date: Thu, 13 Apr 2017 14:56:15 -0700 Subject: [PATCH 03/10] Add RandomQueueOrderReserver. This reserver simply shuffles the list of available queues (whether configured or retrieved dynamically from redis) on each call to reserve() meaning a random queue is used to reserve a job from. --- .../Reserver/RandomQueueOrderReserver.php | 21 +++ .../Reserver/RandomQueueOrderReserverTest.php | 134 ++++++++++++++++++ 2 files changed, 155 insertions(+) create mode 100644 lib/Resque/Reserver/RandomQueueOrderReserver.php create mode 100644 test/Resque/Tests/Reserver/RandomQueueOrderReserverTest.php diff --git a/lib/Resque/Reserver/RandomQueueOrderReserver.php b/lib/Resque/Reserver/RandomQueueOrderReserver.php new file mode 100644 index 00000000..1072243a --- /dev/null +++ b/lib/Resque/Reserver/RandomQueueOrderReserver.php @@ -0,0 +1,21 @@ +assertEquals('RandomQueueOrderReserver', $this->getReserver()->getName()); + } + + public function testWaitAfterReservationAttemptReturnsTrue() + { + $this->assertTrue($this->getReserver()->waitAfterReservationAttempt()); + } + + private function assertQueuesAreShuffled(RandomQueueOrderReserver $reserver, array $queues) + { + // retrieve the queues 20 times + $shuffledQueues = array(); + for ($x = 0; $x < 20; $x++) { + $shuffledQueues[] = $reserver->getQueues(); + } + + $ordered = 0; + foreach ($shuffledQueues as $shuffled) { + // check if the order + if ($shuffled === $queues) { + $ordered++; + } + + // check that the shuffled queues contain all the right elements though + sort($shuffled); + $this->assertEquals($queues, $shuffled); + } + + // if all the shuffled queues were actually returned in sorted order then the shuffling is (unlikely) to be working + $this->assertNotEquals(20, $ordered, "queues were ordered 20 times; queues not shuffled correctly"); + } + + public function testGetQueuesReturnsConfiguredQueuesInShuffledOrder() + { + $queues = array( + 'queue_a', + 'queue_b', + 'queue_c', + 'queue_d', + 'queue_e', + 'queue_f', + ); + + $reserver = $this->getReserver($queues); + + $this->assertQueuesAreShuffled($reserver, $queues); + } + + public function testGetQueuesWithAsterixQueueReturnsAllQueuesFromRedisInShuffledOrder() + { + $queues = array( + 'queue_a', + 'queue_b', + 'queue_c', + 'queue_d', + 'queue_e', + 'queue_f', + ); + + // register queues in redis + foreach ($queues as $queue) { + Resque::redis()->sadd('queues', $queue); + } + + $reserver = $this->getReserver(array('*')); + + $this->assertQueuesAreShuffled($reserver, $queues); + } + + public function testReserverWhenNoJobsEnqueuedReturnsNull() + { + $queues = array( + 'queue_1', + 'queue_2', + 'queue_3', + ); + $this->assertNull($this->getReserver($queues)->reserve()); + } + + public function testReserveReservesJobsFromRandomQueue() + { + $queues = array( + 'queue_a', + 'queue_b', + 'queue_c', + 'queue_d', + 'queue_e', + 'queue_f', + ); + + $reserver = $this->getReserver($queues); + + $jobsPerQueue = 5; + + // enqueue a bunch of jobs in each queue + foreach ($queues as $queue) { + for ($x = 0; $x < $jobsPerQueue; $x++) { + $queuesForAllJobs[] = $queue; + Resque::enqueue($queue, 'Test_Job'); + } + } + + $totalJobs = count($queues) * $jobsPerQueue; + + // track the queue for each reserved job + $reservedQueues = array(); + for ($x = 0; $x < $totalJobs; $x++) { + $job = $reserver->reserve(); + $this->assertNotNull($job); + $reservedQueues[] = $job->queue; + } + + // if jobs are reserved randomly, then $queueOrder shouldn't be ordered + $orderedQueues = $reservedQueues; + sort($orderedQueues); + $this->assertNotEquals($orderedQueues, $reservedQueues, "queues were ordered; queues not shuffled correctly"); + } +} From dbf8a21606f0521c949fed1928c03e904b0afa4d Mon Sep 17 00:00:00 2001 From: Ray Ward Date: Thu, 13 Apr 2017 15:33:37 -0700 Subject: [PATCH 04/10] Add BlockingListPopReserver. This behaves similarly to QueueOrderReserver, but uses the redis blpop command to do a blocking wait for a job to come onto the specified queues. --- .../Reserver/BlockingListPopReserver.php | 59 +++++++++++++++++ .../Reserver/BlockingListPopReserverTest.php | 65 +++++++++++++++++++ 2 files changed, 124 insertions(+) create mode 100644 lib/Resque/Reserver/BlockingListPopReserver.php create mode 100644 test/Resque/Tests/Reserver/BlockingListPopReserverTest.php diff --git a/lib/Resque/Reserver/BlockingListPopReserver.php b/lib/Resque/Reserver/BlockingListPopReserver.php new file mode 100644 index 00000000..3008a77a --- /dev/null +++ b/lib/Resque/Reserver/BlockingListPopReserver.php @@ -0,0 +1,59 @@ +timeout = $timeout; + parent::__construct($logger, $queues); + } + + /** + * {@inheritDoc} + */ + public function reserve() + { + $job = Resque_Job::reserveBlocking($this->getQueues(), $this->timeout); + if ($job) { + $this->logger->info("[{reserver}] Found job on queue '{queue}'", array( + 'queue' => $job->queue, + 'reserver' => $this->getName(), + )); + return $job; + } + return null; + } + + /** + * {@inheritDoc} + */ + public function waitAfterReservationAttempt() + { + return false; + } +} diff --git a/test/Resque/Tests/Reserver/BlockingListPopReserverTest.php b/test/Resque/Tests/Reserver/BlockingListPopReserverTest.php new file mode 100644 index 00000000..caf8fadc --- /dev/null +++ b/test/Resque/Tests/Reserver/BlockingListPopReserverTest.php @@ -0,0 +1,65 @@ +assertFalse($this->getReserver()->waitAfterReservationAttempt()); + } + + public function testReserveCallsBlpopWithTimeout() + { + $timeout = rand(1, 100); + + $queues = array( + 'high', + 'medium', + 'low', + ); + + $redisQueues = array( + 'queue:high', + 'queue:medium', + 'queue:low', + ); + + $payload = array('class' => 'Test_Job'); + $item = array('resque:queue:high', json_encode($payload)); + + $redis = $this->getMockBuilder('\Resque_Redis') + ->disableOriginalConstructor() + ->setMethods(['__call']) + ->getMock(); + + $redis + ->expects($this->once()) + ->method('__call') + ->with($this->equalTo('blpop'), $this->equalTo(array($redisQueues, $timeout))) + ->will($this->returnValue($item)); + + $originalRedis = Resque::$redis; + + Resque::$redis = $redis; + + $job = $this->getReserver($queues, $timeout)->reserve(); + $this->assertEquals('high', $job->queue); + $this->assertEquals($payload, $job->payload); + + Resque::$redis = $originalRedis; + } +} From eeb2d25fc7d96c21fdf329a9b57a510dea567969 Mon Sep 17 00:00:00 2001 From: Ray Ward Date: Thu, 13 Apr 2017 15:47:27 -0700 Subject: [PATCH 05/10] Add ReserverFactory. This creates the various reserver implementations. --- lib/Resque/Reserver/ReserverFactory.php | 92 +++++++++++++++++++ .../Reserver/UnknownReserverException.php | 14 +++ .../Tests/Reserver/ReserverFactoryTest.php | 60 ++++++++++++ 3 files changed, 166 insertions(+) create mode 100644 lib/Resque/Reserver/ReserverFactory.php create mode 100644 lib/Resque/Reserver/UnknownReserverException.php create mode 100644 test/Resque/Tests/Reserver/ReserverFactoryTest.php diff --git a/lib/Resque/Reserver/ReserverFactory.php b/lib/Resque/Reserver/ReserverFactory.php new file mode 100644 index 00000000..cea38e64 --- /dev/null +++ b/lib/Resque/Reserver/ReserverFactory.php @@ -0,0 +1,92 @@ +logger = $logger; + } + + /** + * Creates a reserver given its name in snake case format. + * + * @param string $name + * @return ReserverInterface + * @throws UnknownReserverException + */ + public function createReserverFromName($name, array $queues) + { + $parts = explode('_', $name); + $parts = array_map(function ($word) { + return ucfirst(strtolower($word)); + }, $parts); + + $methodName = 'create' . implode('', $parts) . 'Reserver'; + + if (!method_exists($this, $methodName)) { + throw new UnknownReserverException("Unknown reserver '$name' - could not find factory method $methodName"); + } + + return $this->$methodName($queues); + } + + /** + * Creates the default reserver. + * + * @param array $queues + * @return ReserverInterface + */ + public function createDefaultReserver(array $queues) + { + return $this->createReserverFromName(self::DEFAULT_RESERVER, $queues); + } + + /** + * @param array $queues + * @return QueueOrderReserver + */ + public function createQueueOrderReserver(array $queues) + { + return new QueueOrderReserver($this->logger, $queues); + } + + /** + * @param array $queues + * @return RandomQueueOrderReserver + */ + public function createRandomQueueOrderReserver(array $queues) + { + return new RandomQueueOrderReserver($this->logger, $queues); + } + + /** + * @param array $queues + * @return BlockingListPopReserver + */ + public function createBlockingListPopReserver(array $queues) + { + $timeout = getenv('BPLOP_TIMEOUT'); + if ($timeout === false) { + $timeout = getenv('INTERVAL'); + } + + if ($timeout === false || $timeout < 0) { + $timeout = BlockingListPopReserver::DEFAULT_TIMEOUT; + } + + return new BlockingListPopReserver($this->logger, $queues, (int)$timeout); + } +} diff --git a/lib/Resque/Reserver/UnknownReserverException.php b/lib/Resque/Reserver/UnknownReserverException.php new file mode 100644 index 00000000..ded045fc --- /dev/null +++ b/lib/Resque/Reserver/UnknownReserverException.php @@ -0,0 +1,14 @@ +getFactory()->createReserverFromName('foo', array()); + } + + public function createReserverFromNameDataProvider() + { + return array( + array('queue_order', '\Resque\Reserver\QueueOrderReserver'), + array('RANDOM_QUEUE_ORDER', '\Resque\Reserver\RandomQueueOrderReserver'), + array('Blocking_List_Pop', '\Resque\Reserver\BlockingListPopReserver'), + ); + } + + /** + * @dataProvider createReserverFromNameDataProvider + */ + public function testCreateReserverFromNameCreatesExpectedReserver($name, $expectedReserver) + { + $queues = array( + 'queue_a', + 'queue_b', + 'queue_c', + 'queue_d', + ); + + $reserver = $this->getFactory()->createReserverFromName($name, $queues); + $this->assertInstanceOf($expectedReserver, $reserver); + + // account for shuffling by RandomQueueOrderReserver + $actualQueues = $reserver->getQueues(); + sort($actualQueues); + + $this->assertEquals($queues, $actualQueues); + } + + public function testCreateDefaultReserverCreatesExpectedReserver() + { + $reserver = $this->getFactory()->createDefaultReserver(array()); + $this->assertInstanceOf('\Resque\Reserver\QueueOrderReserver', $reserver); + } +} From 0bdb6c7b6a33cecd909f66abf81c4e53f56af170 Mon Sep 17 00:00:00 2001 From: Ray Ward Date: Thu, 13 Apr 2017 11:29:07 -0700 Subject: [PATCH 06/10] Refactor Resque_Worker to use reservers. Resque_Worker now takes an instance of ReserverInterface as a dependency. The body of the Resque_Worker::reserve() method has been replaced with a call to ReserverInterface::reserve(). Blocking and non-blocking reservation behaviour is now implemented in BlockingListPopReserver and QueueOrderReserver respectively. Logic that decides whether the worker should sleep after an attempt to reserve a job has been replaced with a call to ReserverInterface::waitAfterReservationAttempt(). --- lib/Resque/Worker.php | 100 +++++++++++++--------------- test/Resque/Tests/EventTest.php | 11 ++- test/Resque/Tests/JobStatusTest.php | 13 +++- test/Resque/Tests/JobTest.php | 24 ++++--- test/Resque/Tests/WorkerTest.php | 88 ++++++++++++------------ test/bootstrap.php | 5 ++ 6 files changed, 131 insertions(+), 110 deletions(-) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 04714c1c..efbc96f2 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -1,6 +1,9 @@ logger = new Resque_Log(); + $this->reserver = $reserver; + $this->logger = new Resque_Log(); if(!is_array($queues)) { $queues = array($queues); @@ -76,6 +91,16 @@ public function __construct($queues) $this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues); } + /** + * Sets the reserver factory instance. Used by the find() method to create worker instances. + * + * @param ReserverFactory $reserverFactory + */ + public static function setReserverFactory(ReserverFactory $reserverFactory) + { + self::$reserverFactory = $reserverFactory; + } + /** * Return all workers known to Resque as instantiated instances. * @return array @@ -119,7 +144,9 @@ public static function find($workerId) list($hostname, $pid, $queues) = explode(':', $workerId, 3); $queues = explode(',', $queues); - $worker = new self($queues); + + $reserver = self::$reserverFactory->createDefaultReserver($queues); + $worker = new self($reserver, $queues); $worker->setId($workerId); return $worker; } @@ -142,7 +169,7 @@ public function setId($workerId) * * @param int $interval How often to check for new jobs across the queues. */ - public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) + public function work($interval = Resque::DEFAULT_INTERVAL) { $this->updateProcLine('Starting'); $this->startup(); @@ -154,36 +181,25 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) // Attempt to find and reserve a job $job = false; - if(!$this->paused) { - if($blocking === true) { - $this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval)); - $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval); - } else { - $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); - } + if (!$this->paused) { + $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); - $job = $this->reserve($blocking, $interval); + $job = $this->reserve(); + } else { + $this->updateProcLine('Paused'); } - if(!$job) { + if (!$job) { // For an interval of 0, break now - helps with unit testing etc - if($interval == 0) { + if ($interval == 0) { break; } - if($blocking === false) - { - // If no job was found, we sleep for $interval before continuing and checking again + // If no job was found, we sleep for $interval before continuing and checking again + if ($this->reserver->waitAfterReservationAttempt()) { $this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval)); - if($this->paused) { - $this->updateProcLine('Paused'); - } - else { - $this->updateProcLine('Waiting for ' . implode(',', $this->queues)); - } - - usleep($interval * 1000000); - } + usleep($interval * 1000000); + } continue; } @@ -252,33 +268,11 @@ public function perform(Resque_Job $job) /** * @param bool $blocking * @param int $timeout - * @return object|boolean Instance of Resque_Job if a job is found, false if not. + * @return object|boolean Instance of Resque_Job if a job is found, false if not. */ - public function reserve($blocking = false, $timeout = null) + public function reserve() { - $queues = $this->queues(); - if(!is_array($queues)) { - return; - } - - if($blocking === true) { - $job = Resque_Job::reserveBlocking($queues, $timeout); - if($job) { - $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); - return $job; - } - } else { - foreach($queues as $queue) { - $this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue)); - $job = Resque_Job::reserve($queue); - if($job) { - $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); - return $job; - } - } - } - - return false; + return $this->reserver->reserve() ?: false; } /** diff --git a/test/Resque/Tests/EventTest.php b/test/Resque/Tests/EventTest.php index 6e102cf4..02149e2d 100644 --- a/test/Resque/Tests/EventTest.php +++ b/test/Resque/Tests/EventTest.php @@ -1,4 +1,7 @@ createDefaultReserver(array('jobs')); + // Register a worker to test with - $this->worker = new Resque_Worker('jobs'); - $this->worker->setLogger(new Resque_Log()); + $this->worker = new Resque_Worker($reserver, 'jobs'); + $this->worker->setLogger($logger); $this->worker->registerWorker(); } diff --git a/test/Resque/Tests/JobStatusTest.php b/test/Resque/Tests/JobStatusTest.php index d751c37f..accd375e 100644 --- a/test/Resque/Tests/JobStatusTest.php +++ b/test/Resque/Tests/JobStatusTest.php @@ -1,4 +1,7 @@ createDefaultReserver(array('jobs')); + // Register a worker to test with - $this->worker = new Resque_Worker('jobs'); - $this->worker->setLogger(new Resque_Log()); + $this->worker = new Resque_Worker($reserver, 'jobs'); + $this->worker->setLogger($logger); } public function testJobStatusCanBeTracked() @@ -103,4 +110,4 @@ public function testRecreatedJobWithTrackingStillTracksStatus() $newJob = Resque_Job::reserve('jobs'); $this->assertEquals(Resque_Job_Status::STATUS_WAITING, $newJob->getStatus()); } -} \ No newline at end of file +} diff --git a/test/Resque/Tests/JobTest.php b/test/Resque/Tests/JobTest.php index fb55d13b..3797f6a1 100644 --- a/test/Resque/Tests/JobTest.php +++ b/test/Resque/Tests/JobTest.php @@ -1,5 +1,7 @@ createDefaultReserver(array('jobs')); + // Register a worker to test with - $this->worker = new Resque_Worker('jobs'); - $this->worker->setLogger(new Resque_Log()); + $this->worker = new Resque_Worker($reserver, 'jobs'); + $this->worker->setLogger($logger); $this->worker->registerWorker(); } @@ -153,7 +159,7 @@ public function testInvalidJobThrowsException() $job->worker = $this->worker; $job->perform(); } - + public function testJobWithSetUpCallbackFiresSetUp() { $payload = array( @@ -165,10 +171,10 @@ public function testJobWithSetUpCallbackFiresSetUp() ); $job = new Resque_Job('jobs', $payload); $job->perform(); - + $this->assertTrue(Test_Job_With_SetUp::$called); } - + public function testJobWithTearDownCallbackFiresTearDown() { $payload = array( @@ -180,7 +186,7 @@ public function testJobWithTearDownCallbackFiresTearDown() ); $job = new Resque_Job('jobs', $payload); $job->perform(); - + $this->assertTrue(Test_Job_With_TearDown::$called); } @@ -329,7 +335,7 @@ public function testDequeueItemWithArg() $this->assertEquals(Resque::dequeue($queue, $test), 1); #$this->assertEquals(Resque::size($queue), 1); } - + public function testDequeueSeveralItemsWithArgs() { // GIVEN @@ -340,11 +346,11 @@ public function testDequeueSeveralItemsWithArgs() Resque::enqueue($queue, 'Test_Job_Dequeue9', $removeArgs); Resque::enqueue($queue, 'Test_Job_Dequeue9', $removeArgs); $this->assertEquals(Resque::size($queue), 3); - + // WHEN $test = array('Test_Job_Dequeue9' => $removeArgs); $removedItems = Resque::dequeue($queue, $test); - + // THEN $this->assertEquals($removedItems, 2); $this->assertEquals(Resque::size($queue), 1); diff --git a/test/Resque/Tests/WorkerTest.php b/test/Resque/Tests/WorkerTest.php index 93c0621a..e0115dd3 100644 --- a/test/Resque/Tests/WorkerTest.php +++ b/test/Resque/Tests/WorkerTest.php @@ -1,4 +1,7 @@ createDefaultReserver($queues); + + $worker = new Resque_Worker($reserver, $queues); + $worker->setLogger($logger); + + return $worker; + } + public function testWorkerRegistersInList() { - $worker = new Resque_Worker('*'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('*')); $worker->registerWorker(); // Make sure the worker is in the list @@ -23,8 +44,7 @@ public function testGetAllWorkers() $num = 3; // Register a few workers for($i = 0; $i < $num; ++$i) { - $worker = new Resque_Worker('queue_' . $i); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('queue_' . $i)); $worker->registerWorker(); } @@ -34,8 +54,7 @@ public function testGetAllWorkers() public function testGetWorkerById() { - $worker = new Resque_Worker('*'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('*')); $worker->registerWorker(); $newWorker = Resque_Worker::find((string)$worker); @@ -49,8 +68,7 @@ public function testInvalidWorkerDoesNotExist() public function testWorkerCanUnregister() { - $worker = new Resque_Worker('*'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('*')); $worker->registerWorker(); $worker->unregisterWorker(); @@ -61,8 +79,7 @@ public function testWorkerCanUnregister() public function testPausedWorkerDoesNotPickUpJobs() { - $worker = new Resque_Worker('*'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('*')); $worker->pauseProcessing(); Resque::enqueue('jobs', 'Test_Job'); $worker->work(0); @@ -72,8 +89,7 @@ public function testPausedWorkerDoesNotPickUpJobs() public function testResumedWorkerPicksUpJobs() { - $worker = new Resque_Worker('*'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('*')); $worker->pauseProcessing(); Resque::enqueue('jobs', 'Test_Job'); $worker->work(0); @@ -85,11 +101,10 @@ public function testResumedWorkerPicksUpJobs() public function testWorkerCanWorkOverMultipleQueues() { - $worker = new Resque_Worker(array( + $worker = $this->getWorker(array( 'queue1', - 'queue2' + 'queue2', )); - $worker->setLogger(new Resque_Log()); $worker->registerWorker(); Resque::enqueue('queue1', 'Test_Job_1'); Resque::enqueue('queue2', 'Test_Job_2'); @@ -103,12 +118,11 @@ public function testWorkerCanWorkOverMultipleQueues() public function testWorkerWorksQueuesInSpecifiedOrder() { - $worker = new Resque_Worker(array( + $worker = $this->getWorker(array( 'high', 'medium', - 'low' + 'low', )); - $worker->setLogger(new Resque_Log()); $worker->registerWorker(); // Queue the jobs in a different order @@ -129,8 +143,7 @@ public function testWorkerWorksQueuesInSpecifiedOrder() public function testWildcardQueueWorkerWorksAllQueues() { - $worker = new Resque_Worker('*'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('*')); $worker->registerWorker(); Resque::enqueue('queue1', 'Test_Job_1'); @@ -145,8 +158,7 @@ public function testWildcardQueueWorkerWorksAllQueues() public function testWorkerDoesNotWorkOnUnknownQueues() { - $worker = new Resque_Worker('queue1'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('queue1')); $worker->registerWorker(); Resque::enqueue('queue2', 'Test_Job'); @@ -156,8 +168,7 @@ public function testWorkerDoesNotWorkOnUnknownQueues() public function testWorkerClearsItsStatusWhenNotWorking() { Resque::enqueue('jobs', 'Test_Job'); - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('jobs')); $job = $worker->reserve(); $worker->workingOn($job); $worker->doneWorking(); @@ -166,8 +177,7 @@ public function testWorkerClearsItsStatusWhenNotWorking() public function testWorkerRecordsWhatItIsWorkingOn() { - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('jobs')); $worker->registerWorker(); $payload = array( @@ -189,8 +199,7 @@ public function testWorkerErasesItsStatsWhenShutdown() Resque::enqueue('jobs', 'Test_Job'); Resque::enqueue('jobs', 'Invalid_Job'); - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('jobs')); $worker->work(0); $worker->work(0); @@ -201,19 +210,16 @@ public function testWorkerErasesItsStatsWhenShutdown() public function testWorkerCleansUpDeadWorkersOnStartup() { // Register a good worker - $goodWorker = new Resque_Worker('jobs'); - $goodWorker->setLogger(new Resque_Log()); + $goodWorker = $this->getWorker(array('jobs')); $goodWorker->registerWorker(); $workerId = explode(':', $goodWorker); // Register some bad workers - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('jobs')); $worker->setId($workerId[0].':1:jobs'); $worker->registerWorker(); - $worker = new Resque_Worker(array('high', 'low')); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('high', 'low')); $worker->setId($workerId[0].':2:high,low'); $worker->registerWorker(); @@ -228,15 +234,13 @@ public function testWorkerCleansUpDeadWorkersOnStartup() public function testDeadWorkerCleanUpDoesNotCleanUnknownWorkers() { // Register a bad worker on this machine - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('jobs')); $workerId = explode(':', $worker); $worker->setId($workerId[0].':1:jobs'); $worker->registerWorker(); // Register some other false workers - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('jobs')); $worker->setId('my.other.host:1:jobs'); $worker->registerWorker(); @@ -252,8 +256,7 @@ public function testDeadWorkerCleanUpDoesNotCleanUnknownWorkers() public function testWorkerFailsUncompletedJobsOnExit() { - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('jobs')); $worker->registerWorker(); $payload = array( @@ -269,8 +272,7 @@ public function testWorkerFailsUncompletedJobsOnExit() public function testBlockingListPop() { - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('jobs')); $worker->registerWorker(); Resque::enqueue('jobs', 'Test_Job_1'); @@ -290,4 +292,4 @@ public function testBlockingListPop() $this->assertEquals(2, $i); } -} \ No newline at end of file +} diff --git a/test/bootstrap.php b/test/bootstrap.php index e08c78fd..6e71ceb7 100644 --- a/test/bootstrap.php +++ b/test/bootstrap.php @@ -9,6 +9,8 @@ $loader = require __DIR__ . '/../vendor/autoload.php'; +use Resque\Reserver\ReserverFactory; + define('TEST_MISC', realpath(__DIR__ . '/misc/')); define('REDIS_CONF', TEST_MISC . '/redis.conf'); @@ -36,6 +38,9 @@ Resque::setBackend('localhost:' . $matches[1]); +$reserverFactory = new ReserverFactory(new Resque_Log()); +Resque_Worker::setReserverFactory($reserverFactory); + // Shutdown function killRedis($pid) { From 601f2bec410133ffc59699acb71019e02d31633e Mon Sep 17 00:00:00 2001 From: Ray Ward Date: Thu, 13 Apr 2017 14:11:19 -0700 Subject: [PATCH 07/10] Update bootstrap script to create an appropriate reserver based off the environment. --- bin/resque | 31 ++++++++--- lib/Resque/Reserver/ReserverFactory.php | 26 ++++++++++ .../Tests/Reserver/ReserverFactoryTest.php | 51 +++++++++++++++++++ 3 files changed, 100 insertions(+), 8 deletions(-) diff --git a/bin/resque b/bin/resque index 1d604851..541abad0 100755 --- a/bin/resque +++ b/bin/resque @@ -25,6 +25,9 @@ if (!class_exists('Composer\Autoload\ClassLoader', false)) { ); } +use Resque\Reserver\ReserverFactory; +use Resque\Reserver\UnknownReserverException; + $QUEUE = getenv('QUEUE'); if(empty($QUEUE)) { die("Set QUEUE env var containing the list of queues to work.\n"); @@ -73,7 +76,22 @@ if (!isset($logger) || !is_object($logger)) { $logger = new Resque_Log($logLevel); } -$BLOCKING = getenv('BLOCKING') !== FALSE; +$reserverFactory = new ReserverFactory($logger); +Resque_Worker::setReserverFactory($reserverFactory); + +$queues = explode(',', $QUEUE); +if (!is_array($queues)) { + $queues = array($queues); +} + +$reserver = null; +try { + $reserver = $reserverFactory->createReserverFromEnvironment($queues); +} catch (UnknownReserverException $exception) { + $logger->emergency("Could not create reserver: {error}", ['error' => $exception->getMessage()]); + die; +} + $interval = 5; $INTERVAL = getenv('INTERVAL'); @@ -102,19 +120,17 @@ if($count > 1) { } // Child, start the worker else if(!$pid) { - $queues = explode(',', $QUEUE); - $worker = new Resque_Worker($queues); + $worker = new Resque_Worker($reserver, $queues); $worker->setLogger($logger); $logger->log(Psr\Log\LogLevel::NOTICE, 'Starting worker {worker}', array('worker' => $worker)); - $worker->work($interval, $BLOCKING); + $worker->work($interval); break; } } } // Start a single worker else { - $queues = explode(',', $QUEUE); - $worker = new Resque_Worker($queues); + $worker = new Resque_Worker($reserver, $queues); $worker->setLogger($logger); $PIDFILE = getenv('PIDFILE'); @@ -124,6 +140,5 @@ else { } $logger->log(Psr\Log\LogLevel::NOTICE, 'Starting worker {worker}', array('worker' => $worker)); - $worker->work($interval, $BLOCKING); + $worker->work($interval); } -?> diff --git a/lib/Resque/Reserver/ReserverFactory.php b/lib/Resque/Reserver/ReserverFactory.php index cea38e64..fe7213df 100644 --- a/lib/Resque/Reserver/ReserverFactory.php +++ b/lib/Resque/Reserver/ReserverFactory.php @@ -43,6 +43,32 @@ public function createReserverFromName($name, array $queues) return $this->$methodName($queues); } + /** + * Creates a reserver based off the environment configuration. + * + * The following environment vars are checked (in this order): + * - BLOCKING: Creates a BlockingListPopReserver (any non empty value) + * - RESERVER: Creates a reserver specified in snake case format without the reserver suffix, eg. 'random_queue_order' + * + * If neither var is specified, the default resever (QueueOrderReserver) is created. + * + * @param array $queues + * @return ReserverInterface + * @throws UnknownReserverException If the reserver specified in RESERVER could not be found. + */ + public function createReserverFromEnvironment(array $queues) + { + if (!empty(getenv('BLOCKING'))) { + $reserver = $this->createBlockingListPopReserver($queues); + } elseif (getenv('RESERVER') !== false) { + $reserver = $this->createReserverFromName((string)getenv('RESERVER'), $queues); + } else { + $reserver = $this->createDefaultReserver($queues); + } + + return $reserver; + } + /** * Creates the default reserver. * diff --git a/test/Resque/Tests/Reserver/ReserverFactoryTest.php b/test/Resque/Tests/Reserver/ReserverFactoryTest.php index 63330270..e20106a9 100644 --- a/test/Resque/Tests/Reserver/ReserverFactoryTest.php +++ b/test/Resque/Tests/Reserver/ReserverFactoryTest.php @@ -57,4 +57,55 @@ public function testCreateDefaultReserverCreatesExpectedReserver() $reserver = $this->getFactory()->createDefaultReserver(array()); $this->assertInstanceOf('\Resque\Reserver\QueueOrderReserver', $reserver); } + + public function createReserverFromEnvironmentDataProvider() + { + return array( + array(array('BLOCKING=1'), '\Resque\Reserver\BlockingListPopReserver'), + array(array('BLOCKING=0', 'RESERVER=random_queue_order'), '\Resque\Reserver\RandomQueueOrderReserver'), + array(array('BLOCKING=', 'RESERVER=random_queue_order'), '\Resque\Reserver\RandomQueueOrderReserver'), + array(array('RESERVER=Queue_Order'), '\Resque\Reserver\QueueOrderReserver'), + array(array(), '\Resque\Reserver\QueueOrderReserver'), + ); + } + + /** + * @dataProvider createReserverFromEnvironmentDataProvider + */ + public function testCreateReserverFromEnvironmentCreatesExpectedReserver($env, $expectedReserver) + { + $queues = array( + 'queue_a', + 'queue_b', + 'queue_c', + 'queue_d', + ); + + foreach ($env as $var) { + putenv($var); + } + + $reserver = $this->getFactory()->createReserverFromEnvironment($queues); + $this->assertInstanceOf($expectedReserver, $reserver); + + // account for shuffling by RandomQueueOrderReserver + $actualQueues = $reserver->getQueues(); + sort($actualQueues); + + $this->assertEquals($queues, $actualQueues); + + putenv('BLOCKING'); + putenv('RESERVER'); + } + + /** + * @expectedException Resque\Reserver\UnknownReserverException + * @expectedExceptionMessage Unknown reserver 'foobar' + */ + public function testCreateReserverFromEnvironmentThrowsExceptionForUnknownReserver() + { + putenv('RESERVER=foobar'); + $this->getFactory()->createReserverFromEnvironment(array()); + putenv('RESERVER'); + } } From 380e0c7fee2e7fb11bbf3916b05526f3f72e3091 Mon Sep 17 00:00:00 2001 From: Ray Ward Date: Tue, 18 Apr 2017 11:34:41 -0700 Subject: [PATCH 08/10] Re-register the composer autoloader so that Resque from _this_ repo is used and not another version from APP_INCLUDE. Requiring composer from external apps causes their autoloader to register which takes precedence over any prior registered autoloaders. This may cause issues when using an APP_INCLUDE that depends on an older version of Resque which ends up replacing this version. --- bin/resque | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/bin/resque b/bin/resque index 541abad0..4770772d 100755 --- a/bin/resque +++ b/bin/resque @@ -9,15 +9,15 @@ $files = array( __DIR__ . '/../vendor/autoload.php', ); -$found = false; +$loader = null; foreach ($files as $file) { if (file_exists($file)) { - require_once $file; + $loader = require_once $file; break; } } -if (!class_exists('Composer\Autoload\ClassLoader', false)) { +if (!($loader instanceof Composer\Autoload\ClassLoader)) { die( 'You need to set up the project dependencies using the following commands:' . PHP_EOL . 'curl -s http://getcomposer.org/installer | php' . PHP_EOL . @@ -44,10 +44,11 @@ $REDIS_BACKEND = getenv('REDIS_BACKEND'); // A redis database number $REDIS_BACKEND_DB = getenv('REDIS_BACKEND_DB'); if(!empty($REDIS_BACKEND)) { - if (empty($REDIS_BACKEND_DB)) + if (empty($REDIS_BACKEND_DB)) { Resque::setBackend($REDIS_BACKEND); - else + } else { Resque::setBackend($REDIS_BACKEND, $REDIS_BACKEND_DB); + } } $logLevel = false; @@ -70,6 +71,9 @@ if($APP_INCLUDE) { require_once $APP_INCLUDE; } +// re-register the composer autoloader so that we use Resque here, in case APP_INCLUDE depends on a different version +$loader->register(true); + // See if the APP_INCLUDE containes a logger object, // If none exists, fallback to internal logger if (!isset($logger) || !is_object($logger)) { From c932e183514fae0ce62d9d27f15450eb0a4ecf9e Mon Sep 17 00:00:00 2001 From: Ray Ward Date: Tue, 18 Apr 2017 15:39:06 -0700 Subject: [PATCH 09/10] Mock redis for blpop tests as hhvm doesn't respect the timeout arg. --- .../Reserver/BlockingListPopReserverTest.php | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/test/Resque/Tests/Reserver/BlockingListPopReserverTest.php b/test/Resque/Tests/Reserver/BlockingListPopReserverTest.php index caf8fadc..9664db54 100644 --- a/test/Resque/Tests/Reserver/BlockingListPopReserverTest.php +++ b/test/Resque/Tests/Reserver/BlockingListPopReserverTest.php @@ -22,6 +22,42 @@ public function testWaitAfterReservationAttemptReturnsTrue() $this->assertFalse($this->getReserver()->waitAfterReservationAttempt()); } + public function testReserverWhenNoJobsEnqueuedReturnsNull() + { + $queues = array( + 'queue_1', + 'queue_2', + 'queue_3', + ); + + $redisQueues = array( + 'queue:queue_1', + 'queue:queue_2', + 'queue:queue_3', + ); + + // hhvm doesn't respect the timeout arg for blpop, so we need to mock this command + // https://github.com/facebook/hhvm/issues/6286 + $redis = $this->getMockBuilder('\Resque_Redis') + ->disableOriginalConstructor() + ->setMethods(['__call']) + ->getMock(); + + $redis + ->expects($this->once()) + ->method('__call') + ->with($this->equalTo('blpop'), $this->equalTo(array($redisQueues, 1))) + ->will($this->returnValue(null)); + + $originalRedis = Resque::$redis; + + Resque::$redis = $redis; + + $this->assertNull($this->getReserver($queues)->reserve()); + + Resque::$redis = $originalRedis; + } + public function testReserveCallsBlpopWithTimeout() { $timeout = rand(1, 100); From b934212c0b328d44f4be6aec62c35781ef1f258b Mon Sep 17 00:00:00 2001 From: Ray Ward Date: Wed, 19 Apr 2017 14:56:14 -0700 Subject: [PATCH 10/10] Log the reserver in use when starting workers. --- bin/resque | 1 + 1 file changed, 1 insertion(+) diff --git a/bin/resque b/bin/resque index 4770772d..da0f3e9d 100755 --- a/bin/resque +++ b/bin/resque @@ -91,6 +91,7 @@ if (!is_array($queues)) { $reserver = null; try { $reserver = $reserverFactory->createReserverFromEnvironment($queues); + $logger->notice('Using reserver {reserver}', array('reserver' => $reserver->getName())); } catch (UnknownReserverException $exception) { $logger->emergency("Could not create reserver: {error}", ['error' => $exception->getMessage()]); die;