From 0b0f9dc3d30720626dd1a699c522190a57a68e52 Mon Sep 17 00:00:00 2001 From: Richard K Miller Date: Tue, 12 Mar 2013 16:49:20 -0600 Subject: [PATCH 01/27] Make workerPids() work on Windows using WMIC instead of ps --- lib/Resque/Worker.php | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 45852bb..b21b788 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -441,9 +441,18 @@ public function pruneDeadWorkers() public function workerPids() { $pids = array(); - exec('ps -A -o pid,command | grep [r]esque', $cmdOutput); - foreach($cmdOutput as $line) { - list($pids[],) = explode(' ', trim($line), 2); + if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') { + exec('WMIC path win32_process get Processid,Commandline | findstr resque | findstr /V findstr', $cmdOutput); + foreach($cmdOutput as $line) { + $line = preg_replace('/\s+/m', ' ', $line); + list(,,$pids[]) = explode(' ', trim($line), 3); + } + } + else { + exec('ps -A -o pid,command | grep [r]esque', $cmdOutput); + foreach($cmdOutput as $line) { + list($pids[],) = explode(' ', trim($line), 2); + } } return $pids; } From e60a477fdb628dab7eea185a23b7d80a2e488daf Mon Sep 17 00:00:00 2001 From: Richard K Miller Date: Tue, 12 Mar 2013 16:53:23 -0600 Subject: [PATCH 02/27] Set job timestamp to ISO 8601 format so it can be properly 'relatized' by Resque-web --- lib/Resque/Worker.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 45852bb..656ddd6 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -486,7 +486,7 @@ public function workingOn(Resque_Job $job) $job->updateStatus(Resque_Job_Status::STATUS_RUNNING); $data = json_encode(array( 'queue' => $job->queue, - 'run_at' => strftime('%a %b %d %H:%M:%S %Z %Y'), + 'run_at' => date('c'), 'payload' => $job->payload )); Resque::redis()->set('worker:' . $job->worker, $data); From 94155244b8f2919a8759b83887bef37779fcdeb6 Mon Sep 17 00:00:00 2001 From: Richard K Miller Date: Tue, 26 Mar 2013 09:38:46 -0600 Subject: [PATCH 03/27] Can't fork on Windows but we can run the job anyway --- lib/Resque/Worker.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index b21b788..d25db23 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -186,8 +186,8 @@ public function work($interval = 5) $this->child = Resque::fork(); - // Forked and we're the child. Run the job. - if ($this->child === 0 || $this->child === false) { + // Forked and we're the child. Or this is Windows. Run the job. + if ($this->child === 0 || $this->child === false || strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') { $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); $this->updateProcLine($status); $this->log($status, self::LOG_VERBOSE); From b42d8a183470438552e989d306df1e95e61ee464 Mon Sep 17 00:00:00 2001 From: Richard K Miller Date: Wed, 27 Mar 2013 08:32:43 -0600 Subject: [PATCH 04/27] update the timestamp format in another location --- lib/Resque/Failure/Redis.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Resque/Failure/Redis.php b/lib/Resque/Failure/Redis.php index cfac5b6..84cdec4 100644 --- a/lib/Resque/Failure/Redis.php +++ b/lib/Resque/Failure/Redis.php @@ -20,7 +20,7 @@ class Resque_Failure_Redis implements Resque_Failure_Interface public function __construct($payload, $exception, $worker, $queue) { $data = new stdClass; - $data->failed_at = strftime('%a %b %d %H:%M:%S %Z %Y'); + $data->failed_at = date('c'); $data->payload = $payload; $data->exception = get_class($exception); $data->error = $exception->getMessage(); From f1ccf60e4d2f7f9aa613783592b38fc3b15d2922 Mon Sep 17 00:00:00 2001 From: Richard K Miller Date: Wed, 27 Mar 2013 08:32:52 -0600 Subject: [PATCH 05/27] update the timestamp format in another location --- lib/Resque/Worker.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 656ddd6..0ffe79c 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -454,7 +454,7 @@ public function workerPids() public function registerWorker() { Resque::redis()->sadd('workers', (string)$this); - Resque::redis()->set('worker:' . (string)$this . ':started', strftime('%a %b %d %H:%M:%S %Z %Y')); + Resque::redis()->set('worker:' . (string)$this . ':started', date('c')); } /** From a84812f0f2bb5019c7b4e6cbfa45fcfbc1f88535 Mon Sep 17 00:00:00 2001 From: Richard K Miller Date: Wed, 27 Mar 2013 08:49:56 -0600 Subject: [PATCH 06/27] even better: test for functionality, not OS, before forking --- lib/Resque/Worker.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index d25db23..e740b59 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -186,8 +186,8 @@ public function work($interval = 5) $this->child = Resque::fork(); - // Forked and we're the child. Or this is Windows. Run the job. - if ($this->child === 0 || $this->child === false || strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') { + // Forked and we're the child. Or PCNTL is not installed. Run the job. + if ($this->child === 0 || $this->child === false || !function_exists('pcntl_fork')) { $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); $this->updateProcLine($status); $this->log($status, self::LOG_VERBOSE); From 954d093b769b809b707d2f0b098e8eddc2673630 Mon Sep 17 00:00:00 2001 From: Richard K Miller Date: Wed, 27 Mar 2013 08:54:50 -0600 Subject: [PATCH 07/27] the best: Resque::fork already tests for ability to fork, so use that instead --- lib/Resque/Worker.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index e740b59..4a01d7d 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -187,7 +187,7 @@ public function work($interval = 5) $this->child = Resque::fork(); // Forked and we're the child. Or PCNTL is not installed. Run the job. - if ($this->child === 0 || $this->child === false || !function_exists('pcntl_fork')) { + if ($this->child === 0 || $this->child === false || $this->child === -1) { $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); $this->updateProcLine($status); $this->log($status, self::LOG_VERBOSE); From 5290fde3ecf0610b405df65218b4ed1cd9f054a3 Mon Sep 17 00:00:00 2001 From: varvar Date: Thu, 31 Dec 2015 10:58:18 +0200 Subject: [PATCH 08/27] Add method to retrieve all the items of a list --- lib/Resque.php | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/lib/Resque.php b/lib/Resque.php index 4729189..ed10f92 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -259,6 +259,20 @@ public static function queues() return $queues; } + /** + * Retrieve all the items of a list with Redis + * + * @return array Array of items. + */ + public static function lrange($list,$start,$stop) + { + $list = self::redis()->lrange($list,$start,$stop ); + if(!is_array($list)) { + $list = array(); + } + return $list; + } + /** * Remove Items from the queue * Safely moving each item to a temporary queue before processing it From f993b9f5dbf230149e51c6891531e9f0eaa59619 Mon Sep 17 00:00:00 2001 From: Jared King Date: Tue, 29 Nov 2016 13:07:23 -0600 Subject: [PATCH 09/27] Catch PHP 7 errors in Resque_Worker::perform() and adhere to PSR-3 when logging exceptions --- lib/Resque/Failure.php | 14 ++++++++++++++ lib/Resque/Job.php | 21 +++++++++++++++------ lib/Resque/Worker.php | 7 ++++++- 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/lib/Resque/Failure.php b/lib/Resque/Failure.php index deb678f..066fc6e 100644 --- a/lib/Resque/Failure.php +++ b/lib/Resque/Failure.php @@ -28,6 +28,20 @@ public static function create($payload, Exception $exception, Resque_Worker $wor new $backend($payload, $exception, $worker, $queue); } + /** + * Create a new failed job on the backend from PHP 7 errors. + * + * @param object $payload The contents of the job that has just failed. + * @param \Error $exception The PHP 7 error generated when the job failed to run. + * @param \Resque_Worker $worker Instance of Resque_Worker that was running this job when it failed. + * @param string $queue The name of the queue that this job was fetched from. + */ + public static function createFromError($payload, Error $exception, Resque_Worker $worker, $queue) + { + $backend = self::getBackend(); + new $backend($payload, $exception, $worker, $queue); + } + /** * Return an instance of the backend for saving job failures. * diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index 8508f76..558142a 100755 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -220,12 +220,21 @@ public function fail($exception) )); $this->updateStatus(Resque_Job_Status::STATUS_FAILED); - Resque_Failure::create( - $this->payload, - $exception, - $this->worker, - $this->queue - ); + if ($exception instanceof Error) { + Resque_Failure::createFromError( + $this->payload, + $exception, + $this->worker, + $this->queue + ); + } else { + Resque_Failure::create( + $this->payload, + $exception, + $this->worker, + $this->queue + ); + } Resque_Stat::incr('failed'); Resque_Stat::incr('failed:' . $this->worker); } diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 04714c1..221c638 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -240,7 +240,12 @@ public function perform(Resque_Job $job) $job->perform(); } catch(Exception $e) { - $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {stack}', array('job' => $job, 'stack' => $e)); + $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e)); + $job->fail($e); + return; + } + catch(Error $e) { + $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e)); $job->fail($e); return; } From c38f047321da654ffde6fc36361917f5b97f480c Mon Sep 17 00:00:00 2001 From: Mateusz Uzdowski Date: Fri, 31 Mar 2017 14:01:51 +1300 Subject: [PATCH 10/27] Introduce Resque_Job_PID so a process PID can be obtained. --- HOWITWORKS.md | 41 +++++++++++++------------ README.md | 13 ++++++++ lib/Resque/Job/PID.php | 43 ++++++++++++++++++++++++++ lib/Resque/Worker.php | 17 +++++++++++ test/Resque/Tests/JobPIDTest.php | 47 +++++++++++++++++++++++++++++ test/Resque/Tests/JobStatusTest.php | 8 ++--- test/bootstrap.php | 18 +++++++++++ 7 files changed, 164 insertions(+), 23 deletions(-) create mode 100644 lib/Resque/Job/PID.php create mode 100644 test/Resque/Tests/JobPIDTest.php diff --git a/HOWITWORKS.md b/HOWITWORKS.md index ec85fa3..3aa18b4 100644 --- a/HOWITWORKS.md +++ b/HOWITWORKS.md @@ -101,47 +101,50 @@ How do the workers process the queues? 8. `Resque_Job->fail()` returns control to the worker (still in `Resque_Worker::work()`) without a value * Job - 1. The job calls `Resque_Worker->perform()` with the `Resque_Job` as its + 1. `Resque_Job_PID` is created, registering the PID of the actual process + doing the job. + 2. The job calls `Resque_Worker->perform()` with the `Resque_Job` as its only argument. - 2. `Resque_Worker->perform()` sets up a `try...catch` block so it can + 3. `Resque_Worker->perform()` sets up a `try...catch` block so it can properly handle exceptions by marking jobs as failed (by calling `Resque_Job->fail()`, as above) - 3. Inside the `try...catch`, `Resque_Worker->perform()` triggers an + 4. Inside the `try...catch`, `Resque_Worker->perform()` triggers an `afterFork` event - 4. Still inside the `try...catch`, `Resque_Worker->perform()` calls + 5. Still inside the `try...catch`, `Resque_Worker->perform()` calls `Resque_Job->perform()` with no arguments - 5. `Resque_Job->perform()` calls `Resque_Job->getInstance()` with no + 6. `Resque_Job->perform()` calls `Resque_Job->getInstance()` with no arguments - 6. If `Resque_Job->getInstance()` has already been called, it returns the + 7. If `Resque_Job->getInstance()` has already been called, it returns the existing instance; otherwise: - 7. `Resque_Job->getInstance()` checks that the job's class (type) exists + 8. `Resque_Job->getInstance()` checks that the job's class (type) exists and has a `perform()` method; if not, in either case, it throws an exception which will be caught by `Resque_Worker->perform()` - 8. `Resque_Job->getInstance()` creates an instance of the job's class, and + 9. `Resque_Job->getInstance()` creates an instance of the job's class, and initializes it with a reference to the `Resque_Job` itself, the job's arguments (which it gets by calling `Resque_Job->getArguments()`, which in turn simply returns the value of `args[0]`, or an empty array if no arguments were passed), and the queue name - 9. `Resque_Job->getInstance()` returns control, along with the job class + 10. `Resque_Job->getInstance()` returns control, along with the job class instance, to `Resque_Job->perform()` - 10. `Resque_Job->perform()` sets up its own `try...catch` block to handle + 11. `Resque_Job->perform()` sets up its own `try...catch` block to handle `Resque_Job_DontPerform` exceptions; any other exceptions are passed up to `Resque_Worker->perform()` - 11. `Resque_Job->perform()` triggers a `beforePerform` event - 12. `Resque_Job->perform()` calls `setUp()` on the instance, if it exists - 13. `Resque_Job->perform()` calls `perform()` on the instance - 14. `Resque_Job->perform()` calls `tearDown()` on the instance, if it + 12. `Resque_Job->perform()` triggers a `beforePerform` event + 13. `Resque_Job->perform()` calls `setUp()` on the instance, if it exists + 14. `Resque_Job->perform()` calls `perform()` on the instance + 15. `Resque_Job->perform()` calls `tearDown()` on the instance, if it exists - 15. `Resque_Job->perform()` triggers an `afterPerform` event - 16. The `try...catch` block ends, suppressing `Resque_Job_DontPerform` + 16. `Resque_Job->perform()` triggers an `afterPerform` event + 17. The `try...catch` block ends, suppressing `Resque_Job_DontPerform` exceptions by returning control, and the value `FALSE`, to `Resque_Worker->perform()`; any other situation returns the value `TRUE` along with control, instead - 17. The `try...catch` block in `Resque_Worker->perform()` ends - 18. `Resque_Worker->perform()` updates the job status from `RUNNING` to + 18. The `try...catch` block in `Resque_Worker->perform()` ends + 19. `Resque_Worker->perform()` updates the job status from `RUNNING` to `COMPLETE`, then returns control, with no value, to the worker (again still in `Resque_Worker::work()`) - 19. `Resque_Worker::work()` calls `exit(0)` to terminate the job process + 20. `Resque_Job_PID()` is removed, the forked process will terminate soon + 21. `Resque_Worker::work()` calls `exit(0)` to terminate the job process cleanly * SPECIAL CASE: Non-forking OS (Windows) 1. Same as the job above, except it doesn't call `exit(0)` when done diff --git a/README.md b/README.md index cd8f83f..d00c59d 100644 --- a/README.md +++ b/README.md @@ -193,6 +193,19 @@ or failed, and are then automatically expired. A status can also forcefully be expired by calling the `stop()` method on a status class. +### Obtaining job PID ### + +You can obtain the PID of the actual process doing the work through `Resque_Job_PID`. On a forking OS this will be the +PID of the forked process. + +CAUTION: on a non-forking OS, the PID returned will be of the worker itself. + +```php +echo Resque_Job_PID::get($token); +``` + +Function returns `0` if the `perform` hasn't started yet, or if it has already ended. + ## Workers ## Workers work in the exact same way as the Ruby workers. For complete diff --git a/lib/Resque/Job/PID.php b/lib/Resque/Job/PID.php new file mode 100644 index 0000000..f72710d --- /dev/null +++ b/lib/Resque/Job/PID.php @@ -0,0 +1,43 @@ + + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Job_PID +{ + /** + * Create a new PID tracker item for the supplied job ID. + * + * @param string $id The ID of the job to track the PID of. + */ + public static function create($id) + { + Resque::redis()->set('job:' . $id . ':pid', (string)getmypid()); + } + + /** + * Fetch the PID for the process actually executing the job. + * + * @param string $id The ID of the job to get the PID of. + * + * @return int PID of the process doing the job (on non-forking OS, PID of the worker, otherwise forked PID). + */ + public static function get($id) + { + return (int)Resque::redis()->get('job:' . $id . ':pid'); + } + + /** + * Remove the PID tracker for the job. + * + * @param string $id The ID of the job to remove the tracker from. + */ + public static function del($id) + { + Resque::redis()->del('job:' . $id . ':pid'); + } +} + diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 04714c1..9acce78 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -199,7 +199,17 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); $this->updateProcLine($status); $this->logger->log(Psr\Log\LogLevel::INFO, $status); + + if(!empty($job->payload['id'])) { + Resque_Job_PID::create($job->payload['id']); + } + $this->perform($job); + + if(!empty($job->payload['id'])) { + Resque_Job_PID::del($job->payload['id']); + } + if ($this->child === 0) { exit(0); } @@ -394,6 +404,13 @@ public function shutdownNow() $this->killChild(); } + /** + * @return int Child process PID. + */ + public function getChildPID() { + return $this->child; + } + /** * Kill a forked child job immediately. The job it is processing will not * be completed. diff --git a/test/Resque/Tests/JobPIDTest.php b/test/Resque/Tests/JobPIDTest.php new file mode 100644 index 0000000..2d4a93d --- /dev/null +++ b/test/Resque/Tests/JobPIDTest.php @@ -0,0 +1,47 @@ + + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Tests_JobPIDTest extends Resque_Tests_TestCase +{ + /** + * @var \Resque_Worker + */ + protected $worker; + + public function setUp() + { + parent::setUp(); + + // Register a worker to test with + $this->worker = new Resque_Worker('jobs'); + $this->worker->setLogger(new Resque_Log()); + } + + public function testQueuedJobDoesNotReturnPID() + { + $token = Resque::enqueue('jobs', 'Test_Job', null, true); + $this->assertEquals(0, Resque_Job_PID::get($token)); + } + + public function testRunningJobReturnsPID() + { + // Cannot use InProgress_Job on non-forking OS. + if(!function_exists('pcntl_fork')) return; + + $token = Resque::enqueue('jobs', 'InProgress_Job', null, true); + $this->worker->work(0); + $this->assertNotEquals(0, Resque_Job_PID::get($token)); + } + + public function testFinishedJobDoesNotReturnPID() + { + $token = Resque::enqueue('jobs', 'Test_Job', null, true); + $this->worker->work(0); + $this->assertEquals(0, Resque_Job_PID::get($token)); + } +} diff --git a/test/Resque/Tests/JobStatusTest.php b/test/Resque/Tests/JobStatusTest.php index d751c37..8be7753 100644 --- a/test/Resque/Tests/JobStatusTest.php +++ b/test/Resque/Tests/JobStatusTest.php @@ -8,10 +8,10 @@ */ class Resque_Tests_JobStatusTest extends Resque_Tests_TestCase { - /** - * @var \Resque_Worker - */ - protected $worker; + /** + * @var \Resque_Worker + */ + protected $worker; public function setUp() { diff --git a/test/bootstrap.php b/test/bootstrap.php index a4b6837..d28137f 100644 --- a/test/bootstrap.php +++ b/test/bootstrap.php @@ -109,6 +109,24 @@ public function perform() } } +/** + * This job exits the forked worker process, which simulates the job being (forever) in progress, + * so that we can verify the state of the system for "running jobs". Does not work on a non-forking OS. + * + * CAUTION Use this test job only with Worker::work, i.e. only when you actually trigger the fork in tests. + */ +class InProgress_Job +{ + public function perform() + { + if(!function_exists('pcntl_fork')) { + // We can't lose the worker on a non-forking OS. + throw new Failing_Job_Exception('Do not use InProgress_Job for tests on non-forking OS!'); + } + exit(0); + } +} + class Test_Job_Without_Perform_Method { From e473ef00acc6ffca2dc4b88b7dfe8ff01f6ecb44 Mon Sep 17 00:00:00 2001 From: Hikariii Date: Tue, 11 Apr 2017 11:03:31 +0200 Subject: [PATCH 11/27] use a variable to store process name prefix --- lib/Resque/Worker.php | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 04714c1..3892812 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -11,6 +11,11 @@ */ class Resque_Worker { + /** + * @var string Prefix for the process name + */ + public static $processPrefix = 'resque-'; + /** * @var LoggerInterface Logging object that impliments the PSR-3 LoggerInterface */ @@ -323,7 +328,7 @@ private function startup() */ private function updateProcLine($status) { - $processTitle = 'resque-' . Resque::VERSION . ': ' . $status; + $processTitle = static::$processPrefix . Resque::VERSION . ': ' . $status; if(function_exists('cli_set_process_title') && PHP_OS !== 'Darwin') { cli_set_process_title($processTitle); } From 63ce150db6e524425f8ae4880ef29fbf2a572350 Mon Sep 17 00:00:00 2001 From: Hikariii Date: Thu, 13 Apr 2017 08:40:44 +0200 Subject: [PATCH 12/27] moved setting the process name to setter --- lib/Resque/Worker.php | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 3892812..5ffa7a9 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -14,7 +14,7 @@ class Resque_Worker /** * @var string Prefix for the process name */ - public static $processPrefix = 'resque-'; + private static $processPrefix = 'resque-'; /** * @var LoggerInterface Logging object that impliments the PSR-3 LoggerInterface @@ -81,6 +81,15 @@ public function __construct($queues) $this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues); } + /** + * Set the process prefix of the workers to the given prefix string. + * @param string $prefix The new process prefix + */ + public static function setProcessPrefix($prefix) + { + self::$processPrefix = $prefix; + } + /** * Return all workers known to Resque as instantiated instances. * @return array From 981ef3da53f671c14f8fc61a233c3974f04128a9 Mon Sep 17 00:00:00 2001 From: Damien Tardy-Panis Date: Tue, 9 May 2017 14:23:49 +0200 Subject: [PATCH 13/27] worker process name always includes all queues name this facilitates monitoring of each worker process --- lib/Resque/Worker.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 04714c1..e557430 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -157,9 +157,9 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) if(!$this->paused) { if($blocking === true) { $this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval)); - $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval); + $this->updateProcLine('Waiting with blocking timeout ' . $interval); } else { - $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); + $this->updateProcLine('Waiting with interval ' . $interval); } $job = $this->reserve($blocking, $interval); @@ -179,7 +179,7 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) $this->updateProcLine('Paused'); } else { - $this->updateProcLine('Waiting for ' . implode(',', $this->queues)); + $this->updateProcLine('Waiting'); } usleep($interval * 1000000); @@ -323,7 +323,7 @@ private function startup() */ private function updateProcLine($status) { - $processTitle = 'resque-' . Resque::VERSION . ': ' . $status; + $processTitle = 'resque-' . Resque::VERSION . ' (' . implode(',', $this->queues) . '): ' . $status; if(function_exists('cli_set_process_title') && PHP_OS !== 'Darwin') { cli_set_process_title($processTitle); } From bf44f146685770d1eb94a8a4e6b72bbfc7b438d8 Mon Sep 17 00:00:00 2001 From: Javier Spagnoletti Date: Fri, 1 Sep 2017 16:10:37 -0300 Subject: [PATCH 14/27] Add `extra.branch-alias` directive at `composer.json` in order to allow semver installs for development versions --- composer.json | 83 +++++++++++++++++++++++++++------------------------ composer.lock | 23 +++++++------- 2 files changed, 55 insertions(+), 51 deletions(-) diff --git a/composer.json b/composer.json index b12fa29..70cebcc 100644 --- a/composer.json +++ b/composer.json @@ -1,41 +1,46 @@ { - "name": "chrisboulton/php-resque", - "type": "library", - "description": "Redis backed library for creating background jobs and processing them later. Based on resque for Ruby.", - "keywords": ["job", "background", "redis", "resque"], - "homepage": "http://www.github.com/chrisboulton/php-resque/", - "license": "MIT", - "authors": [ - { - "name": "Chris Boulton", - "email": "chris@bigcommerce.com" - } - ], - "repositories": [ - { - "type": "vcs", - "url": "https://github.com/chrisboulton/credis" - } - ], - "require": { - "php": ">=5.3.0", - "ext-pcntl": "*", - "colinmollenhour/credis": "~1.7", - "psr/log": "~1.0" - }, - "suggest": { - "ext-proctitle": "Allows php-resque to rename the title of UNIX processes to show the status of a worker.", - "ext-redis": "Native PHP extension for Redis connectivity. Credis will automatically utilize when available." - }, - "require-dev": { - "phpunit/phpunit": "3.7.*" - }, - "bin": [ - "bin/resque" - ], - "autoload": { - "psr-0": { - "Resque": "lib" - } - } + "name": "chrisboulton/php-resque", + "type": "library", + "description": "Redis backed library for creating background jobs and processing them later. Based on resque for Ruby.", + "keywords": ["job", "background", "redis", "resque"], + "homepage": "http://www.github.com/chrisboulton/php-resque/", + "license": "MIT", + "authors": [ + { + "name": "Chris Boulton", + "email": "chris@bigcommerce.com" + } + ], + "repositories": [ + { + "type": "vcs", + "url": "https://github.com/chrisboulton/credis" + } + ], + "require": { + "php": ">=5.3.0", + "ext-pcntl": "*", + "colinmollenhour/credis": "~1.7", + "psr/log": "~1.0" + }, + "suggest": { + "ext-proctitle": "Allows php-resque to rename the title of UNIX processes to show the status of a worker.", + "ext-redis": "Native PHP extension for Redis connectivity. Credis will automatically utilize when available." + }, + "require-dev": { + "phpunit/phpunit": "3.7.*" + }, + "bin": [ + "bin/resque" + ], + "autoload": { + "psr-0": { + "Resque": "lib" + } + }, + "extra": { + "branch-alias": { + "dev-master": "1.0-dev" + } + } } diff --git a/composer.lock b/composer.lock index 0f431b9..33d3f26 100644 --- a/composer.lock +++ b/composer.lock @@ -4,8 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file", "This file is @generated automatically" ], - "hash": "41124ffd15a15b52947e430b92b8f10f", - "content-hash": "11906622d4e017ff6807c6dff51f208d", + "content-hash": "e813c4b5ecd745892ddc86e410592220", "packages": [ { "name": "colinmollenhour/credis", @@ -44,7 +43,7 @@ ], "description": "Credis is a lightweight interface to the Redis key-value store which wraps the phpredis library when available for better performance.", "homepage": "https://github.com/colinmollenhour/credis", - "time": "2016-03-24 15:50:52" + "time": "2016-03-24T15:50:52+00:00" }, { "name": "psr/log", @@ -82,7 +81,7 @@ "psr", "psr-3" ], - "time": "2012-12-21 11:40:51" + "time": "2012-12-21T11:40:51+00:00" } ], "packages-dev": [ @@ -145,7 +144,7 @@ "testing", "xunit" ], - "time": "2014-09-02 10:13:14" + "time": "2014-09-02T10:13:14+00:00" }, { "name": "phpunit/php-file-iterator", @@ -192,7 +191,7 @@ "filesystem", "iterator" ], - "time": "2015-06-21 13:08:43" + "time": "2015-06-21T13:08:43+00:00" }, { "name": "phpunit/php-text-template", @@ -233,7 +232,7 @@ "keywords": [ "template" ], - "time": "2015-06-21 13:50:34" + "time": "2015-06-21T13:50:34+00:00" }, { "name": "phpunit/php-timer", @@ -277,7 +276,7 @@ "keywords": [ "timer" ], - "time": "2016-05-12 18:03:57" + "time": "2016-05-12T18:03:57+00:00" }, { "name": "phpunit/php-token-stream", @@ -327,7 +326,7 @@ "keywords": [ "tokenizer" ], - "time": "2014-03-03 05:10:30" + "time": "2014-03-03T05:10:30+00:00" }, { "name": "phpunit/phpunit", @@ -400,7 +399,7 @@ "testing", "xunit" ], - "time": "2014-10-17 09:04:17" + "time": "2014-10-17T09:04:17+00:00" }, { "name": "phpunit/phpunit-mock-objects", @@ -449,7 +448,7 @@ "mock", "xunit" ], - "time": "2013-01-13 10:24:48" + "time": "2013-01-13T10:24:48+00:00" }, { "name": "symfony/yaml", @@ -498,7 +497,7 @@ ], "description": "Symfony Yaml Component", "homepage": "https://symfony.com", - "time": "2016-09-02 01:57:56" + "time": "2016-09-02T01:57:56+00:00" } ], "aliases": [], From 19006186289800e98fd4bdcbad803d9657f27808 Mon Sep 17 00:00:00 2001 From: Ben Morris Date: Wed, 3 Jan 2018 16:51:46 -0800 Subject: [PATCH 15/27] Use is_callable instead of method_exists to check for setup/teardown methods `method_exists` will return true even if an object has a protected method; calling it from Resque will then fail. `is_callable` only returns true when the method both exists and can be called externally. --- lib/Resque/Job.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index 8508f76..f12fd82 100755 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -187,13 +187,13 @@ public function perform() Resque_Event::trigger('beforePerform', $this); $instance = $this->getInstance(); - if(method_exists($instance, 'setUp')) { + if(is_callable([$instance, 'setUp'])) { $instance->setUp(); } $instance->perform(); - if(method_exists($instance, 'tearDown')) { + if(is_callable([$instance, 'tearDown'])) { $instance->tearDown(); } From 84439b45b22ef52c46d53acd4fe812cf8f451ace Mon Sep 17 00:00:00 2001 From: razonyang Date: Wed, 22 Aug 2018 14:57:50 +0800 Subject: [PATCH 16/27] sleep x seconds if no blocking queue was found --- lib/Resque/Worker.php | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 04714c1..8f2b9e7 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -262,6 +262,11 @@ public function reserve($blocking = false, $timeout = null) } if($blocking === true) { + if(empty($queues)){ + $this->logger->log(Psr\Log\LogLevel::INFO, 'No queue was found, sleeping for {interval}', array('interval' => $timeout)); + usleep($timeout * 1000000); + return false; + } $job = Resque_Job::reserveBlocking($queues, $timeout); if($job) { $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); From 23e641dfa8a2bd9b1f50c1602c1429caf64fddfa Mon Sep 17 00:00:00 2001 From: Dan Hunsaker Date: Mon, 10 Dec 2018 19:14:10 -0700 Subject: [PATCH 17/27] Use official Credis Chris's fork is outdated and unnecessary. --- composer.json | 6 ------ 1 file changed, 6 deletions(-) diff --git a/composer.json b/composer.json index 629472d..88c3ee2 100644 --- a/composer.json +++ b/composer.json @@ -27,12 +27,6 @@ "role": "Creator" } ], - "repositories": [ - { - "type": "vcs", - "url": "https://github.com/chrisboulton/credis" - } - ], "require": { "php": ">=5.3.0", "ext-pcntl": "*", From 415eb00b8615afef0c57c4c1ba1726b8884c0c5e Mon Sep 17 00:00:00 2001 From: Dan Hunsaker Date: Mon, 10 Dec 2018 19:00:23 -0700 Subject: [PATCH 18/27] Apply old PR 293 Closes chrisboulton/php-resque#293 --- lib/Resque/Worker.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 3add46e..946a215 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -427,7 +427,7 @@ public function killChild() } $this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child)); - if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) { + if(exec('ps -o pid,s -p ' . $this->child, $output, $returnCode) && $returnCode != 1) { $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child)); posix_kill($this->child, SIGKILL); $this->child = null; @@ -471,7 +471,7 @@ public function pruneDeadWorkers() public function workerPids() { $pids = array(); - exec('ps -A -o pid,command | grep [r]esque', $cmdOutput); + exec('ps -A -o pid,args | grep [r]esque', $cmdOutput); foreach($cmdOutput as $line) { list($pids[],) = explode(' ', trim($line), 2); } From f87b00223eb86f54ed550a84c0bd14e679ddf071 Mon Sep 17 00:00:00 2001 From: Dan Hunsaker Date: Mon, 10 Dec 2018 19:47:38 -0700 Subject: [PATCH 19/27] Apply old PR 134 Closes chrisboulton/php-resque#134 --- lib/Resque/Job.php | 11 ++++++----- lib/Resque/Job/Status.php | 39 +++++++++++++++++++++++++++++++++++++-- lib/Resque/Worker.php | 5 +++-- 3 files changed, 46 insertions(+), 9 deletions(-) diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index 8508f76..e8f57f0 100755 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -123,14 +123,14 @@ public static function reserveBlocking(array $queues, $timeout = null) * * @param int $status Status constant from Resque_Job_Status indicating the current status of a job. */ - public function updateStatus($status) + public function updateStatus($status, $result = null) { if(empty($this->payload['id'])) { return; } $statusInstance = new Resque_Job_Status($this->payload['id']); - $statusInstance->update($status); + $statusInstance->update($status, $result); } /** @@ -183,6 +183,7 @@ public function getInstance() */ public function perform() { + $result = true; try { Resque_Event::trigger('beforePerform', $this); @@ -191,7 +192,7 @@ public function perform() $instance->setUp(); } - $instance->perform(); + $result = $instance->perform(); if(method_exists($instance, 'tearDown')) { $instance->tearDown(); @@ -201,10 +202,10 @@ public function perform() } // beforePerform/setUp have said don't perform this job. Return. catch(Resque_Job_DontPerform $e) { - return false; + $result = false; } - return true; + return $result; } /** diff --git a/lib/Resque/Job/Status.php b/lib/Resque/Job/Status.php index 00fc40c..e61cb6f 100644 --- a/lib/Resque/Job/Status.php +++ b/lib/Resque/Job/Status.php @@ -84,7 +84,7 @@ public function isTracking() * * @param int The status of the job (see constants in Resque_Job_Status) */ - public function update($status) + public function update($status, $result = null) { if(!$this->isTracking()) { return; @@ -93,6 +93,7 @@ public function update($status) $statusPacket = array( 'status' => $status, 'updated' => time(), + 'result' => $result, ); Resque::redis()->set((string)$this, json_encode($statusPacket)); @@ -105,7 +106,7 @@ public function update($status) /** * Fetch the status for the job being monitored. * - * @return mixed False if the status is not being monitored, otherwise the status as + * @return mixed False if the status is not being monitored, otherwise the status * as an integer, based on the Resque_Job_Status constants. */ public function get() @@ -122,6 +123,40 @@ public function get() return $statusPacket['status']; } + /** + * Fetch the result of the job being monitored. + * + * @return mixed False if the job is not being monitored, otherwise the result + * as mixed + */ + public function getResult() + { + if(!$this->isTracking()) { + return false; + } + + $statusPacket = json_decode(Resque::redis()->get((string)$this), true); + if(!$statusPacket) { + return false; + } + + return $statusPacket['result']; + } + + /** + * Delete the job monitoring from the queue + * + * @return boolean|int + */ + public function del() + { + if(!$this->isTracking()) { + return false; + } + + return Resque::redis()->del((string)$this); + } + /** * Stop tracking the status of a job. */ diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 178b33d..74a567d 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -250,9 +250,10 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) */ public function perform(Resque_Job $job) { + $result = null; try { Resque_Event::trigger('afterFork', $job); - $job->perform(); + $result = $job->perform(); } catch(Exception $e) { $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {stack}', array('job' => $job, 'stack' => $e)); @@ -260,7 +261,7 @@ public function perform(Resque_Job $job) return; } - $job->updateStatus(Resque_Job_Status::STATUS_COMPLETE); + $job->updateStatus(Resque_Job_Status::STATUS_COMPLETE, $result); $this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job)); } From e52ee4cc0a1fb45ba909baeba8a1403410f6abfb Mon Sep 17 00:00:00 2001 From: Dan Hunsaker Date: Mon, 10 Dec 2018 20:14:14 -0700 Subject: [PATCH 20/27] Apply old PR 135 Closes chrisboulton/php-resque#135 --- lib/Resque.php | 5 +-- lib/Resque/Job.php | 20 ++++++----- lib/Resque/Job/Status.php | 71 +++++++++++++++++++-------------------- 3 files changed, 49 insertions(+), 47 deletions(-) diff --git a/lib/Resque.php b/lib/Resque.php index d03b2ec..06748a3 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -213,10 +213,11 @@ public static function size($queue) * @param string $class The name of the class that contains the code to execute the job. * @param array $args Any optional arguments that should be passed when the job is executed. * @param boolean $trackStatus Set to true to be able to monitor the status of a job. + * @param string $prefix The prefix needs to be set for the status key * * @return string|boolean Job ID when the job was created, false if creation was cancelled due to beforeEnqueue */ - public static function enqueue($queue, $class, $args = null, $trackStatus = false) + public static function enqueue($queue, $class, $args = null, $trackStatus = false, $prefix = "") { $id = Resque::generateJobId(); $hookParams = array( @@ -232,7 +233,7 @@ public static function enqueue($queue, $class, $args = null, $trackStatus = fals return false; } - Resque_Job::create($queue, $class, $args, $trackStatus, $id); + Resque_Job::create($queue, $class, $args, $trackStatus, $id, $prefix); Resque_Event::trigger('afterEnqueue', $hookParams); return $id; diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index e8f57f0..fa5b133 100755 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -53,11 +53,12 @@ public function __construct($queue, $payload) * @param array $args Any optional arguments that should be passed when the job is executed. * @param boolean $monitor Set to true to be able to monitor the status of a job. * @param string $id Unique identifier for tracking the job. Generated if not supplied. + * @param string $prefix The prefix needs to be set for the status key * * @return string * @throws \InvalidArgumentException */ - public static function create($queue, $class, $args = null, $monitor = false, $id = null) + public static function create($queue, $class, $args = null, $monitor = false, $id = null, $prefix = "") { if (is_null($id)) { $id = Resque::generateJobId(); @@ -69,14 +70,15 @@ public static function create($queue, $class, $args = null, $monitor = false, $i ); } Resque::push($queue, array( - 'class' => $class, - 'args' => array($args), - 'id' => $id, + 'class' => $class, + 'args' => array($args), + 'id' => $id, + 'prefix' => $prefix, 'queue_time' => microtime(true), )); if($monitor) { - Resque_Job_Status::create($id); + Resque_Job_Status::create($id, $prefix); } return $id; @@ -129,7 +131,7 @@ public function updateStatus($status, $result = null) return; } - $statusInstance = new Resque_Job_Status($this->payload['id']); + $statusInstance = new Resque_Job_Status($this->payload['id'], $this->payload['prefix']); $statusInstance->update($status, $result); } @@ -140,7 +142,7 @@ public function updateStatus($status, $result = null) */ public function getStatus() { - $status = new Resque_Job_Status($this->payload['id']); + $status = new Resque_Job_Status($this->payload['id'], $this->payload['prefix']); return $status->get(); } @@ -237,13 +239,13 @@ public function fail($exception) */ public function recreate() { - $status = new Resque_Job_Status($this->payload['id']); + $status = new Resque_Job_Status($this->payload['id'], $this->payload['prefix']); $monitor = false; if($status->isTracking()) { $monitor = true; } - return self::create($this->queue, $this->payload['class'], $this->getArguments(), $monitor); + return self::create($this->queue, $this->payload['class'], $this->getArguments(), $monitor, $this->payload['prefix']); } /** diff --git a/lib/Resque/Job/Status.php b/lib/Resque/Job/Status.php index e61cb6f..6bc7746 100644 --- a/lib/Resque/Job/Status.php +++ b/lib/Resque/Job/Status.php @@ -13,6 +13,11 @@ class Resque_Job_Status const STATUS_FAILED = 3; const STATUS_COMPLETE = 4; + /** + * @var string The prefix of the job status id. + */ + private $prefix; + /** * @var string The ID of the job this status class refers back to. */ @@ -37,9 +42,10 @@ class Resque_Job_Status * * @param string $id The ID of the job to manage the status for. */ - public function __construct($id) + public function __construct($id, $prefix = '') { $this->id = $id; + $this->prefix = empty($prefix) ? '' : "${prefix}_"; } /** @@ -48,14 +54,18 @@ public function __construct($id) * * @param string $id The ID of the job to monitor the status of. */ - public static function create($id) + public static function create($id, $prefix = "") { + $status = new self($id, $prefix); $statusPacket = array( - 'status' => self::STATUS_WAITING, + 'status' => self::STATUS_WAITING, 'updated' => time(), 'started' => time(), + 'result' => null, ); - Resque::redis()->set('job:' . $id . ':status', json_encode($statusPacket)); + Resque::redis()->set((string) $status, json_encode($statusPacket)); + + return $status; } /** @@ -91,9 +101,10 @@ public function update($status, $result = null) } $statusPacket = array( - 'status' => $status, + 'status' => $status, 'updated' => time(), - 'result' => $result, + 'started' => $this->getValue('started'), + 'result' => $result, ); Resque::redis()->set((string)$this, json_encode($statusPacket)); @@ -104,12 +115,12 @@ public function update($status, $result = null) } /** - * Fetch the status for the job being monitored. + * Fetch a value from the status packet for the job being monitored. * - * @return mixed False if the status is not being monitored, otherwise the status - * as an integer, based on the Resque_Job_Status constants. + * @return mixed False if the status is not being monitored, otherwise the + * requested value from the status packet. */ - public function get() + protected function getValue($value = null) { if(!$this->isTracking()) { return false; @@ -120,7 +131,18 @@ public function get() return false; } - return $statusPacket['status']; + return empty($value) ? $statusPacket : $statusPacket[$value]; + } + + /** + * Fetch the status for the job being monitored. + * + * @return mixed False if the status is not being monitored, otherwise the status + * as an integer, based on the Resque_Job_Status constants. + */ + public function get() + { + return $this->getValue('status'); } /** @@ -131,32 +153,9 @@ public function get() */ public function getResult() { - if(!$this->isTracking()) { - return false; - } - - $statusPacket = json_decode(Resque::redis()->get((string)$this), true); - if(!$statusPacket) { - return false; - } - - return $statusPacket['result']; + return $this->getValue('result'); } - /** - * Delete the job monitoring from the queue - * - * @return boolean|int - */ - public function del() - { - if(!$this->isTracking()) { - return false; - } - - return Resque::redis()->del((string)$this); - } - /** * Stop tracking the status of a job. */ @@ -172,6 +171,6 @@ public function stop() */ public function __toString() { - return 'job:' . $this->id . ':status'; + return 'job:' . $this->prefix . $this->id . ':status'; } } From dec0d9c6109e171b996fbd50cae495e20586eee0 Mon Sep 17 00:00:00 2001 From: Dan Hunsaker Date: Mon, 10 Dec 2018 20:44:48 -0700 Subject: [PATCH 21/27] Apply old PR 210 Closes chrisboulton/php-resque#210 --- lib/Resque/Job/Status.php | 86 ++++++++++++++++++++++++++++++--------- 1 file changed, 67 insertions(+), 19 deletions(-) diff --git a/lib/Resque/Job/Status.php b/lib/Resque/Job/Status.php index 6bc7746..249ba8c 100644 --- a/lib/Resque/Job/Status.php +++ b/lib/Resque/Job/Status.php @@ -96,14 +96,20 @@ public function isTracking() */ public function update($status, $result = null) { + $status = (int) $status; + if(!$this->isTracking()) { return; } + if($status < self::STATUS_WAITING || $status > self::STATUS_COMPLETE) { + return; + } + $statusPacket = array( 'status' => $status, 'updated' => time(), - 'started' => $this->getValue('started'), + 'started' => $this->fetch('started'), 'result' => $result, ); Resque::redis()->set((string)$this, json_encode($statusPacket)); @@ -115,23 +121,14 @@ public function update($status, $result = null) } /** - * Fetch a value from the status packet for the job being monitored. + * Fetch the status for the job being monitored. * - * @return mixed False if the status is not being monitored, otherwise the - * requested value from the status packet. + * @return mixed False if the status is not being monitored, otherwise the status + * as an integer, based on the Resque_Job_Status constants. */ - protected function getValue($value = null) + public function get() { - if(!$this->isTracking()) { - return false; - } - - $statusPacket = json_decode(Resque::redis()->get((string)$this), true); - if(!$statusPacket) { - return false; - } - - return empty($value) ? $statusPacket : $statusPacket[$value]; + return $this->status(); } /** @@ -140,20 +137,42 @@ protected function getValue($value = null) * @return mixed False if the status is not being monitored, otherwise the status * as an integer, based on the Resque_Job_Status constants. */ - public function get() + public function status() { - return $this->getValue('status'); + return $this->fetch('status'); } + /** + * Fetch the last update timestamp of the job being monitored. + * + * @return mixed False if the job is not being monitored, otherwise the + * update timestamp. + */ + public function updated() + { + return $this->fetch('updated'); + } + + /** + * Fetch the start timestamp of the job being monitored. + * + * @return mixed False if the job is not being monitored, otherwise the + * start timestamp. + */ + public function started() + { + return $this->fetch('started'); + } + /** * Fetch the result of the job being monitored. * * @return mixed False if the job is not being monitored, otherwise the result * as mixed */ - public function getResult() + public function result() { - return $this->getValue('result'); + return $this->fetch('result'); } /** @@ -173,4 +192,33 @@ public function __toString() { return 'job:' . $this->prefix . $this->id . ':status'; } + + /** + * Fetch a value from the status packet for the job being monitored. + * + * @return mixed False if the status is not being monitored, otherwise the + * requested value from the status packet. + */ + protected function fetch($value = null) + { + if(!$this->isTracking()) { + return false; + } + + $statusPacket = json_decode(Resque::redis()->get((string)$this), true); + if(!$statusPacket) { + return false; + } + + if(empty($value)) { + return $statusPacket; + } else { + if(isset($statusPacket[$value])) { + return $statusPacket[$value]; + } else { + return null; + } + } + + } } From 642199844114d8c0a8cb325e3a18ece7a08769b1 Mon Sep 17 00:00:00 2001 From: Dan Hunsaker Date: Mon, 10 Dec 2018 21:00:38 -0700 Subject: [PATCH 22/27] Apply old PR 243 Closes chrisboulton/php-resque#243 --- lib/Resque/Worker.php | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 74a567d..9e927bd 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -157,6 +157,18 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) break; } + // is redis still alive? + try { + if (Resque::redis()->ping() === false) { + throw new CredisException('redis ping() failed'); + } + } catch (CredisException $e) { + $this->logger->log(Psr\Log\LogLevel::ERROR, 'redis went away. trying to reconnect'); + Resque::$redis = null; + usleep($interval * 1000000); + continue; + } + // Attempt to find and reserve a job $job = false; if(!$this->paused) { From bb25f4bb4713011b67b55f01646e734b63107fa4 Mon Sep 17 00:00:00 2001 From: Dan Hunsaker Date: Mon, 10 Dec 2018 21:05:13 -0700 Subject: [PATCH 23/27] Apply old PR 276 Closes chrisboulton/php-resque#276 --- demo/resque.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/demo/resque.php b/demo/resque.php index fcfe578..05d3da6 100644 --- a/demo/resque.php +++ b/demo/resque.php @@ -4,4 +4,4 @@ require 'job.php'; require 'php_error_job.php'; -require '../bin/resque'; \ No newline at end of file +require __DIR__ . '/../bin/resque'; From 28d46d5abe41cd7283630be22416d5be3bcfb281 Mon Sep 17 00:00:00 2001 From: Dan Hunsaker Date: Mon, 10 Dec 2018 21:43:32 -0700 Subject: [PATCH 24/27] Apply old PR 328 Closes chrisboulton/php-resque#328 --- lib/Resque.php | 13 ++++++++++++- lib/Resque/Worker.php | 4 ++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/lib/Resque.php b/lib/Resque.php index e81476a..f33c15b 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -28,6 +28,11 @@ class Resque */ protected static $redisDatabase = 0; + /** + * @var string auth of Redis database + */ + protected static $auth; + /** * Given a host/port combination separated by a colon, set it as * the redis server that Resque will talk to. @@ -37,11 +42,13 @@ class Resque * and returns a Resque_Redis instance, or * a nested array of servers with host/port pairs. * @param int $database + * @param string $auth */ - public static function setBackend($server, $database = 0) + public static function setBackend($server, $database = 0, $auth = null) { self::$redisServer = $server; self::$redisDatabase = $database; + self::$auth = $auth; self::$redis = null; } @@ -62,6 +69,10 @@ public static function redis() self::$redis = new Resque_Redis(self::$redisServer, self::$redisDatabase); } + if (!empty(self::$auth)) { + self::$redis->auth(self::$auth); + } + return self::$redis; } diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index e2c7224..6e97fad 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -152,6 +152,10 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) $this->updateProcLine('Starting'); $this->startup(); + if(function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } + while(true) { if($this->shutdown) { break; From 5212318be620914302b51ca3c0d07191183ee481 Mon Sep 17 00:00:00 2001 From: Dan Hunsaker Date: Tue, 11 Dec 2018 00:11:16 -0700 Subject: [PATCH 25/27] Add tests for PHP 7.2 and 7.3 Allow those to fail, as we expect them to until we upgrade PHPUnit to one compatible with 7.2+. Also allow HHVM to fail, because we're going to be deprecating it soon anyway. Closes chrisboulton/php-resque#358 --- .travis.yml | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 9bb4841..069d58b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,13 +1,19 @@ language: php php: - - 5.6 - - 7.0 - - 7.1 + - '7.3' + - '7.2' + - '7.1' + - '7.0' + - '5.6' - hhvm matrix: exclude: - php: hhvm env: ENABLE_REDIS_EXT=1 + allow_failures: + - php: '7.3' + - php: '7.2' + - php: hhvm env: - ENABLE_REDIS_EXT=0 - ENABLE_REDIS_EXT=1 From 6ff26b9c2c01ffd236f6b503bbd9247fd7bf9954 Mon Sep 17 00:00:00 2001 From: Dan Hunsaker Date: Tue, 11 Dec 2018 00:26:59 -0700 Subject: [PATCH 26/27] Apply old PR 367 Closes chrisboulton/php-resque#367 --- lib/Resque/Redis.php | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/Resque/Redis.php b/lib/Resque/Redis.php index 153bd40..cf05444 100644 --- a/lib/Resque/Redis.php +++ b/lib/Resque/Redis.php @@ -113,12 +113,15 @@ public static function prefix($namespace) public function __construct($server, $database = null, $client = null) { try { - if (is_array($server)) { - $this->driver = new Credis_Cluster($server); - } - else if (is_object($client)) { + if (is_object($client)) { $this->driver = $client; } + elseif (is_object($server)) { + $this->driver = $server; + } + elseif (is_array($server)) { + $this->driver = new Credis_Cluster($server); + } else { list($host, $port, $dsnDatabase, $user, $password, $options) = self::parseDsn($server); // $user is not used, only $password From d8308512b73cb2f26a337a1ed5f3f1a71e18983f Mon Sep 17 00:00:00 2001 From: Dan Hunsaker Date: Tue, 11 Dec 2018 01:00:35 -0700 Subject: [PATCH 27/27] Apply old PR 189 Closes chrisboulton/php-resque#189 --- lib/Resque/Worker.php | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 2408445..7935cec 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -257,7 +257,14 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) $this->logger->log(Psr\Log\LogLevel::INFO, $status); // Wait until the child process finishes before continuing - pcntl_wait($status); + while (pcntl_wait($status, WNOHANG) === 0) { + if(function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } + + // Pause for a half a second to conserve system resources + usleep(500000); + } if (pcntl_wifexited($status) !== true) { $job->fail(new Resque_Job_DirtyExitException('Job exited abnormally'));