From 0ab8750935e53ade9b94acaec3e65021f726d1ea Mon Sep 17 00:00:00 2001 From: Ashish Tilara Date: Mon, 9 Sep 2013 16:37:22 +1000 Subject: [PATCH] Keeping track of Process status as well as result - if the "Perform()" function returns result, store it along with process status - return process status+result in Resque_Job_Status->get() - allow user to delete the status from redis Prefix while enqueuing - to add one more level of saperation, (e.g. If you have a class between application and resque, the class can handle this, so a prefix can be userid or something in background, so one user do not access another user's queue status items) --- lib/Resque.php | 4 ++-- lib/Resque/Job.php | 20 +++++++++++--------- lib/Resque/Job/Status.php | 33 +++++++++++++++++++++++++++------ lib/Resque/Worker.php | 5 +++-- 4 files changed, 43 insertions(+), 19 deletions(-) diff --git a/lib/Resque.php b/lib/Resque.php index e01ab750..2e78f247 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -181,9 +181,9 @@ public static function size($queue) * * @return string */ - public static function enqueue($queue, $class, $args = null, $trackStatus = false) + public static function enqueue($queue, $class, $args = null, $trackStatus = false, $prefix = "") { - $result = Resque_Job::create($queue, $class, $args, $trackStatus); + $result = Resque_Job::create($queue, $class, $args, $trackStatus, $prefix); if ($result) { Resque_Event::trigger('afterEnqueue', array( 'class' => $class, diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index ddc37d04..458d3ef7 100755 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -50,7 +50,7 @@ public function __construct($queue, $payload) * * @return string */ - public static function create($queue, $class, $args = null, $monitor = false) + public static function create($queue, $class, $args = null, $monitor = false, $prefix = "") { if($args !== null && !is_array($args)) { throw new InvalidArgumentException( @@ -62,10 +62,11 @@ public static function create($queue, $class, $args = null, $monitor = false) 'class' => $class, 'args' => array($args), 'id' => $id, + 'prefix' => $prefix )); if($monitor) { - Resque_Job_Status::create($id); + Resque_Job_Status::create($id, $prefix); } return $id; @@ -112,14 +113,14 @@ public static function reserveBlocking(array $queues, $timeout = null) * * @param int $status Status constant from Resque_Job_Status indicating the current status of a job. */ - public function updateStatus($status) + public function updateStatus($status, $result = "") { if(empty($this->payload['id'])) { return; } - $statusInstance = new Resque_Job_Status($this->payload['id']); - $statusInstance->update($status); + $statusInstance = new Resque_Job_Status($this->payload['id'], $this->payload['prefix']); + $statusInstance->update($status, $result); } /** @@ -129,7 +130,7 @@ public function updateStatus($status) */ public function getStatus() { - $status = new Resque_Job_Status($this->payload['id']); + $status = new Resque_Job_Status($this->payload['id'], $this->payload['prefix']); return $status->get(); } @@ -186,6 +187,7 @@ public function getInstance() */ public function perform() { + $result = true; $instance = $this->getInstance(); try { Resque_Event::trigger('beforePerform', $this); @@ -194,7 +196,7 @@ public function perform() $instance->setUp(); } - $instance->perform(); + $result = $instance->perform(); if(method_exists($instance, 'tearDown')) { $instance->tearDown(); @@ -207,7 +209,7 @@ public function perform() return false; } - return true; + return $result; } /** @@ -239,7 +241,7 @@ public function fail($exception) */ public function recreate() { - $status = new Resque_Job_Status($this->payload['id']); + $status = new Resque_Job_Status($this->payload['id'], $this->payload['prefix']); $monitor = false; if($status->isTracking()) { $monitor = true; diff --git a/lib/Resque/Job/Status.php b/lib/Resque/Job/Status.php index ffa351ba..bad381e4 100644 --- a/lib/Resque/Job/Status.php +++ b/lib/Resque/Job/Status.php @@ -32,14 +32,20 @@ class Resque_Job_Status self::STATUS_COMPLETE ); + /** + * @var string prefix for the key + */ + private $prefix = ""; + /** * Setup a new instance of the job monitor class for the supplied job ID. * * @param string $id The ID of the job to manage the status for. */ - public function __construct($id) + public function __construct($id, $prefix = "") { $this->id = $id; + $this->prefix = $prefix; } /** @@ -48,14 +54,14 @@ public function __construct($id) * * @param string $id The ID of the job to monitor the status of. */ - public static function create($id) + public static function create($id, $prefix = "") { $statusPacket = array( 'status' => self::STATUS_WAITING, 'updated' => time(), 'started' => time(), ); - Resque::redis()->set('job:' . $id . ':status', json_encode($statusPacket)); + Resque::redis()->set($prefix . ':job:' . $id . ':status', json_encode($statusPacket)); } /** @@ -84,7 +90,7 @@ public function isTracking() * * @param int The status of the job (see constants in Resque_Job_Status) */ - public function update($status) + public function update($status, $result = "") { if(!$this->isTracking()) { return; @@ -93,6 +99,7 @@ public function update($status) $statusPacket = array( 'status' => $status, 'updated' => time(), + 'result' => $result ); Resque::redis()->set((string)$this, json_encode($statusPacket)); @@ -119,8 +126,22 @@ public function get() return false; } - return $statusPacket['status']; + return $statusPacket; } + + /** + * Delete the job monitoring from the queue + * + * @return boolean/int + */ + public function del() + { + if(!$this->isTracking()) { + return false; + } + + return Resque::redis()->del((string)$this); + } /** * Stop tracking the status of a job. @@ -137,7 +158,7 @@ public function stop() */ public function __toString() { - return 'job:' . $this->id . ':status'; + return $this->prefix . ':job:' . $this->id . ':status'; } } ?> \ No newline at end of file diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index d94aef54..f55e4207 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -236,9 +236,10 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) */ public function perform(Resque_Job $job) { + $result = ""; try { Resque_Event::trigger('afterFork', $job); - $job->perform(); + $result = $job->perform(); } catch(Exception $e) { $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {stack}', array('job' => $job, 'stack' => $e->getMessage())); @@ -246,7 +247,7 @@ public function perform(Resque_Job $job) return; } - $job->updateStatus(Resque_Job_Status::STATUS_COMPLETE); + $job->updateStatus(Resque_Job_Status::STATUS_COMPLETE, $result); $this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job)); }