Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/Resque.php
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,9 @@ public static function size($queue)
*
* @return string
*/
public static function enqueue($queue, $class, $args = null, $trackStatus = false)
public static function enqueue($queue, $class, $args = null, $trackStatus = false, $prefix = "")
{
$result = Resque_Job::create($queue, $class, $args, $trackStatus);
$result = Resque_Job::create($queue, $class, $args, $trackStatus, $prefix);
if ($result) {
Resque_Event::trigger('afterEnqueue', array(
'class' => $class,
Expand Down
20 changes: 11 additions & 9 deletions lib/Resque/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public function __construct($queue, $payload)
*
* @return string
*/
public static function create($queue, $class, $args = null, $monitor = false)
public static function create($queue, $class, $args = null, $monitor = false, $prefix = "")
{
if($args !== null && !is_array($args)) {
throw new InvalidArgumentException(
Expand All @@ -62,10 +62,11 @@ public static function create($queue, $class, $args = null, $monitor = false)
'class' => $class,
'args' => array($args),
'id' => $id,
'prefix' => $prefix
));

if($monitor) {
Resque_Job_Status::create($id);
Resque_Job_Status::create($id, $prefix);
}

return $id;
Expand Down Expand Up @@ -112,14 +113,14 @@ public static function reserveBlocking(array $queues, $timeout = null)
*
* @param int $status Status constant from Resque_Job_Status indicating the current status of a job.
*/
public function updateStatus($status)
public function updateStatus($status, $result = "")
{
if(empty($this->payload['id'])) {
return;
}

$statusInstance = new Resque_Job_Status($this->payload['id']);
$statusInstance->update($status);
$statusInstance = new Resque_Job_Status($this->payload['id'], $this->payload['prefix']);
$statusInstance->update($status, $result);
}

/**
Expand All @@ -129,7 +130,7 @@ public function updateStatus($status)
*/
public function getStatus()
{
$status = new Resque_Job_Status($this->payload['id']);
$status = new Resque_Job_Status($this->payload['id'], $this->payload['prefix']);
return $status->get();
}

Expand Down Expand Up @@ -186,6 +187,7 @@ public function getInstance()
*/
public function perform()
{
$result = true;
$instance = $this->getInstance();
try {
Resque_Event::trigger('beforePerform', $this);
Expand All @@ -194,7 +196,7 @@ public function perform()
$instance->setUp();
}

$instance->perform();
$result = $instance->perform();

if(method_exists($instance, 'tearDown')) {
$instance->tearDown();
Expand All @@ -207,7 +209,7 @@ public function perform()
return false;
}

return true;
return $result;
}

/**
Expand Down Expand Up @@ -239,7 +241,7 @@ public function fail($exception)
*/
public function recreate()
{
$status = new Resque_Job_Status($this->payload['id']);
$status = new Resque_Job_Status($this->payload['id'], $this->payload['prefix']);
$monitor = false;
if($status->isTracking()) {
$monitor = true;
Expand Down
33 changes: 27 additions & 6 deletions lib/Resque/Job/Status.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,20 @@ class Resque_Job_Status
self::STATUS_COMPLETE
);

/**
* @var string prefix for the key
*/
private $prefix = "";

/**
* Setup a new instance of the job monitor class for the supplied job ID.
*
* @param string $id The ID of the job to manage the status for.
*/
public function __construct($id)
public function __construct($id, $prefix = "")
{
$this->id = $id;
$this->prefix = $prefix;
}

/**
Expand All @@ -48,14 +54,14 @@ public function __construct($id)
*
* @param string $id The ID of the job to monitor the status of.
*/
public static function create($id)
public static function create($id, $prefix = "")
{
$statusPacket = array(
'status' => self::STATUS_WAITING,
'updated' => time(),
'started' => time(),
);
Resque::redis()->set('job:' . $id . ':status', json_encode($statusPacket));
Resque::redis()->set($prefix . ':job:' . $id . ':status', json_encode($statusPacket));
}

/**
Expand Down Expand Up @@ -84,7 +90,7 @@ public function isTracking()
*
* @param int The status of the job (see constants in Resque_Job_Status)
*/
public function update($status)
public function update($status, $result = "")
{
if(!$this->isTracking()) {
return;
Expand All @@ -93,6 +99,7 @@ public function update($status)
$statusPacket = array(
'status' => $status,
'updated' => time(),
'result' => $result
);
Resque::redis()->set((string)$this, json_encode($statusPacket));

Expand All @@ -119,8 +126,22 @@ public function get()
return false;
}

return $statusPacket['status'];
return $statusPacket;
}

/**
* Delete the job monitoring from the queue
*
* @return boolean/int
*/
public function del()
{
if(!$this->isTracking()) {
return false;
}

return Resque::redis()->del((string)$this);
}

/**
* Stop tracking the status of a job.
Expand All @@ -137,7 +158,7 @@ public function stop()
*/
public function __toString()
{
return 'job:' . $this->id . ':status';
return $this->prefix . ':job:' . $this->id . ':status';
}
}
?>
5 changes: 3 additions & 2 deletions lib/Resque/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -236,17 +236,18 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
*/
public function perform(Resque_Job $job)
{
$result = "";
try {
Resque_Event::trigger('afterFork', $job);
$job->perform();
$result = $job->perform();
}
catch(Exception $e) {
$this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {stack}', array('job' => $job, 'stack' => $e->getMessage()));
$job->fail($e);
return;
}

$job->updateStatus(Resque_Job_Status::STATUS_COMPLETE);
$job->updateStatus(Resque_Job_Status::STATUS_COMPLETE, $result);
$this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job));
}

Expand Down