From a5958fbcbf369792b9072bb1942751249fd05c98 Mon Sep 17 00:00:00 2001 From: "akihito.nakano" Date: Wed, 15 Aug 2018 15:34:49 +0900 Subject: [PATCH 01/10] Add a configuration parameter "pollingDuration" --- src/Snidel/Config.php | 2 ++ src/Snidel/Fork/Container.php | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Snidel/Config.php b/src/Snidel/Config.php index 278e357..e0c42e9 100644 --- a/src/Snidel/Config.php +++ b/src/Snidel/Config.php @@ -17,6 +17,8 @@ public function __construct($params = []) 'concurrency' => 5, 'logger' => null, 'driver' => null, + // a polling duration(in seconds) of queueing + 'pollingDuration' => 1, ]; $this->params = array_merge($default, $params); diff --git a/src/Snidel/Fork/Container.php b/src/Snidel/Fork/Container.php index 92c17ca..724d743 100644 --- a/src/Snidel/Fork/Container.php +++ b/src/Snidel/Fork/Container.php @@ -267,7 +267,7 @@ public function results() { for (; $this->queuedCount() > $this->dequeuedCount();) { for (;;) { - if ($envelope = $this->resultQueue->dequeue()) { + if ($envelope = $this->resultQueue->dequeue($this->config->get('pollingDuration'))) { $this->dequeuedCount++; break; } From 9029873ee6e79542ef355b91be0f9dda236cc53a Mon Sep 17 00:00:00 2001 From: "akihito.nakano" Date: Wed, 15 Aug 2018 21:48:20 +0900 Subject: [PATCH 02/10] Use queue instead of consumer object --- src/Snidel/Worker.php | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/Snidel/Worker.php b/src/Snidel/Worker.php index 2568ca7..cf0e836 100644 --- a/src/Snidel/Worker.php +++ b/src/Snidel/Worker.php @@ -1,10 +1,10 @@ process = $process; $this->factory = $this->createFactory($driver); - $router = new SimpleRouter(); - $router->add('Task', $this); - $this->consumer = $this->createConsumer($router); $this->producer = $this->createProducer($this->factory); + $this->taskQueue = @$this->factory->create('task'); + $this->taskQueue = $this->factory->create('task'); } /** @@ -62,7 +61,11 @@ public function getPid() */ public function run() { - $this->consumer->consume($this->factory->create('task')); + while (true) { + if ($envelope = $this->taskQueue->dequeue(1)) { + $this->task($envelope->getMessage()); + } + } } /** From 02c3220432a422a2e4999f020383e17ea07ddafd Mon Sep 17 00:00:00 2001 From: "akihito.nakano" Date: Wed, 15 Aug 2018 22:00:03 +0900 Subject: [PATCH 03/10] Apply "pollingDuration" configuration params --- src/Snidel/Fork/Container.php | 2 +- src/Snidel/Worker.php | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/Snidel/Fork/Container.php b/src/Snidel/Fork/Container.php index 724d743..c1b3e88 100644 --- a/src/Snidel/Fork/Container.php +++ b/src/Snidel/Fork/Container.php @@ -186,7 +186,7 @@ private function forkWorker() throw new \RuntimeException($message); } - $worker = new Worker($process, $this->config->get('driver')); + $worker = new Worker($process, $this->config->get('driver'), $this->config->get('pollingDuration')); if (getmypid() === $this->master->getPid()) { // master diff --git a/src/Snidel/Worker.php b/src/Snidel/Worker.php index cf0e836..dcbdff4 100644 --- a/src/Snidel/Worker.php +++ b/src/Snidel/Worker.php @@ -31,19 +31,24 @@ class Worker /** @var \Bernard\Queue */ private $taskQueue; + /** @var int */ + private $pollingDuration; + /** * @param \Ackintosh\Snidel\Fork\Process $process * @param \Bernard\Driver $driver + * @param int $pollingDuration */ - public function __construct($process, $driver) + public function __construct($process, $driver, $pollingDuration) { $this->pcntl = new Pcntl(); $this->process = $process; $this->factory = $this->createFactory($driver); $this->producer = $this->createProducer($this->factory); - $this->taskQueue = @$this->factory->create('task'); $this->taskQueue = $this->factory->create('task'); + + $this->pollingDuration = $pollingDuration; } /** @@ -62,7 +67,7 @@ public function getPid() public function run() { while (true) { - if ($envelope = $this->taskQueue->dequeue(1)) { + if ($envelope = $this->taskQueue->dequeue($this->pollingDuration)) { $this->task($envelope->getMessage()); } } From 5ab3a2b27cbacc33a3e628257a86d9390fd1016f Mon Sep 17 00:00:00 2001 From: "akihito.nakano" Date: Wed, 15 Aug 2018 22:27:19 +0900 Subject: [PATCH 04/10] Flat-file driver may causes E_WARNING (mkdir(): File exists) in race condition --- src/Snidel/Worker.php | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/Snidel/Worker.php b/src/Snidel/Worker.php index dcbdff4..b773c6e 100644 --- a/src/Snidel/Worker.php +++ b/src/Snidel/Worker.php @@ -46,7 +46,16 @@ public function __construct($process, $driver, $pollingDuration) $this->factory = $this->createFactory($driver); $this->producer = $this->createProducer($this->factory); - $this->taskQueue = $this->factory->create('task'); + + /* + * Flat-file driver may causes E_WARNING (mkdir(): File exists) in race condition. + * Suppress the warning as it isn't matter and we should progress this task. + */ + if ($driver instanceof \Bernard\Driver\FlatFileDriver) { + $this->taskQueue = @$this->factory->create('task'); + } else { + $this->taskQueue = $this->factory->create('task'); + } $this->pollingDuration = $pollingDuration; } From 9c2e689110423e57ecc4ca1b33ea1fe0bf19007b Mon Sep 17 00:00:00 2001 From: "akihito.nakano" Date: Wed, 15 Aug 2018 22:34:56 +0900 Subject: [PATCH 05/10] Minify the delay time due to the issue of bernard's polling --- tests/Snidel/SnidelTest.php | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/Snidel/SnidelTest.php b/tests/Snidel/SnidelTest.php index ca80ebe..6481b54 100644 --- a/tests/Snidel/SnidelTest.php +++ b/tests/Snidel/SnidelTest.php @@ -65,7 +65,11 @@ public function passMultipleArguments() */ public function concurrency() { - $snidel = new Snidel(['concurrency' => 3]); + $snidel = new Snidel([ + 'concurrency' => 3, + // in order to minify the delay time due to the issue of bernard's polling, specifying a small number. + 'pollingDuration' => 0.5, + ]); $start = time(); $snidel->process('sleepsTwoSeconds'); From 84b31ad38c816f8179096db1c82a5183c53c822a Mon Sep 17 00:00:00 2001 From: "akihito.nakano" Date: Wed, 15 Aug 2018 22:59:34 +0900 Subject: [PATCH 06/10] Fix missing argument in unit test --- tests/TestCase.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/TestCase.php b/tests/TestCase.php index dfacd59..2561a80 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -61,7 +61,8 @@ protected function makeWorker($pid = null) return new Worker( new Process($pid), - (new Config())->get('driver') + (new Config())->get('driver'), + (new Config())->get('pollingDuration') ); } } From 415fd174393e9daac686ee4f32c24d5e3a108153 Mon Sep 17 00:00:00 2001 From: "akihito.nakano" Date: Wed, 15 Aug 2018 23:03:32 +0900 Subject: [PATCH 07/10] Update README --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 4c344d0..0d0bbf0 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,8 @@ new Snidel([ 'logger' => $monolog, // Please refer to `Using custom queue` 'driver' => $driver, + // a polling duration(in seconds) of queueing + 'pollingDuration' => 1, ]); ``` From 385956c8209018c87013b45701f615392cef0b3c Mon Sep 17 00:00:00 2001 From: "akihito.nakano" Date: Wed, 15 Aug 2018 23:15:31 +0900 Subject: [PATCH 08/10] Tweak spacing --- src/Snidel.php | 2 +- src/Snidel/Fork/Container.php | 2 +- src/Snidel/Worker.php | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Snidel.php b/src/Snidel.php index e25adc3..75673fd 100644 --- a/src/Snidel.php +++ b/src/Snidel.php @@ -1,5 +1,5 @@ Date: Thu, 16 Aug 2018 20:32:51 +0900 Subject: [PATCH 09/10] Fix missing constructor argument in unit test --- tests/Snidel/ActiveWorkerSetTest.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/Snidel/ActiveWorkerSetTest.php b/tests/Snidel/ActiveWorkerSetTest.php index 4a7db80..ddc5047 100644 --- a/tests/Snidel/ActiveWorkerSetTest.php +++ b/tests/Snidel/ActiveWorkerSetTest.php @@ -70,7 +70,7 @@ public function terminate() { $driver = (new Config())->get('driver'); $worker1 = $this->getMockBuilder('\Ackintosh\Snidel\Worker') - ->setConstructorArgs([$this->makeProcess(1), $driver]) + ->setConstructorArgs([$this->makeProcess(1), $driver, 1]) ->setMethods(['terminate']) ->getMock(); $worker1->expects($this->once()) @@ -78,7 +78,7 @@ public function terminate() ->with(SIGTERM); $worker2 = $this->getMockBuilder('\Ackintosh\Snidel\Worker') - ->setConstructorArgs([$this->makeProcess(2), $driver]) + ->setConstructorArgs([$this->makeProcess(2), $driver, 1]) ->setMethods(['terminate']) ->getMock(); $worker2->expects($this->once()) From 7a1004a82c7cc4615fb937e6db66a104a3578a40 Mon Sep 17 00:00:00 2001 From: "akihito.nakano" Date: Sat, 18 Aug 2018 13:00:23 +0900 Subject: [PATCH 10/10] Condition expressions are not tickable... --- src/Snidel/Worker.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Snidel/Worker.php b/src/Snidel/Worker.php index f153257..0a11070 100644 --- a/src/Snidel/Worker.php +++ b/src/Snidel/Worker.php @@ -79,6 +79,10 @@ public function run() if ($envelope = $this->taskQueue->dequeue($this->pollingDuration)) { $this->task($envelope->getMessage()); } + // We need to insert some statements here as condition expressions are not tickable. + // Worker process can't receive signals sent from Master if there's no statements here. + // @see http://jp2.php.net/manual/en/control-structures.declare.php#control-structures.declare.ticks + usleep(1); } }