From 705f1dd382ba6c07a7420b20f597ded3ce95f34e Mon Sep 17 00:00:00 2001 From: soul11201 Date: Wed, 1 Aug 2018 17:10:17 +0800 Subject: [PATCH 1/5] fix: offset + 1 --- src/Consumer/Process.php | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/Consumer/Process.php b/src/Consumer/Process.php index ccc81d98..169817a5 100644 --- a/src/Consumer/Process.php +++ b/src/Consumer/Process.php @@ -490,7 +490,7 @@ public function succFetchOffset($result) foreach ($consumerOffsets as $topic => $value) { foreach ($value as $partId => $offset) { if (isset($lastOffsets[$topic][$partId]) && $lastOffsets[$topic][$partId] > $offset) { - $consumerOffsets[$topic][$partId] = $offset + 1; + $consumerOffsets[$topic][$partId] = $offset; } } } @@ -557,8 +557,8 @@ public function succFetch($result, $fd) continue; } - $consumerOffset = $assign->getConsumerOffset($topic['topicName'], $part['partition']); - if ($consumerOffset === false) { + $offset = $assign->getConsumerOffset($topic['topicName'], $part['partition']); + if ($offset === false) { return; // current is rejoin.... } foreach ($part['messages'] as $message) { @@ -566,14 +566,11 @@ public function succFetch($result, $fd) //if ($this->consumer != null) { // call_user_func($this->consumer, $topic['topicName'], $part['partition'], $message); //} - $commitOffset = $message['offset']; + $offset = $message['offset'] + 1; } - $commitOffset = isset($commitOffset) ? $commitOffset : $consumerOffset - 1; - $consumerOffset = $commitOffset + 1; - - $assign->setConsumerOffset($topic['topicName'], $part['partition'], $consumerOffset); - $assign->setCommitOffset($topic['topicName'], $part['partition'], $commitOffset); + $assign->setConsumerOffset($topic['topicName'], $part['partition'], $offset); + $assign->setCommitOffset($topic['topicName'], $part['partition'], $offset); } } $this->state->succRun(\Kafka\Consumer\State::REQUEST_FETCH, $fd); From fc3b3f116e45c3e6378796214fc8283ce161dea8 Mon Sep 17 00:00:00 2001 From: soul11201 Date: Fri, 14 Sep 2018 19:00:59 +0800 Subject: [PATCH 2/5] Fix bug: broker instance will be wrong used when the consumer and producer was instanced at the same time --- src/Broker.php | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/Broker.php b/src/Broker.php index d3ddc7aa..41571759 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -7,8 +7,6 @@ class Broker { - use SingletonTrait; - private $groupBrokerId = null; private $topics = []; @@ -23,6 +21,21 @@ class Broker private $config; + private static $instance = []; + + /** + * @return static + */ + public static function getInstance($instance_name = 'default') + { + if(self::$instance[$instance_name]) { + return self::$instance[$instance_name]; + }else{ + return self::$instance[$instance_name] = new static(); + } + } + + public function setProcess(callable $process) { $this->process = $process; From 4ae5dbbfdbc36ec6dbe841d77181826a063a339c Mon Sep 17 00:00:00 2001 From: soul11201 Date: Fri, 14 Sep 2018 19:09:44 +0800 Subject: [PATCH 3/5] php version restrict --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index a0d44015..1204b88e 100644 --- a/composer.json +++ b/composer.json @@ -16,7 +16,7 @@ "sort-packages": true }, "require": { - "php": "^5.6", + "php": ">=5.6", "amphp/amp": "^v1.2.2", "psr/log": "^1.0.2" }, From c277bef9b5e76285ec0aa84b23f2fa9025ceb8f0 Mon Sep 17 00:00:00 2001 From: soul11201 Date: Fri, 14 Sep 2018 19:36:54 +0800 Subject: [PATCH 4/5] fix --- src/Broker.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Broker.php b/src/Broker.php index 41571759..f3184043 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -28,7 +28,8 @@ class Broker */ public static function getInstance($instance_name = 'default') { - if(self::$instance[$instance_name]) { + + if(isset(self::$instance[$instance_name] )) { return self::$instance[$instance_name]; }else{ return self::$instance[$instance_name] = new static(); From 385678dd1ce155f4a7bf2f8fd550c862e9828236 Mon Sep 17 00:00:00 2001 From: soul11201 Date: Fri, 14 Sep 2018 20:09:08 +0800 Subject: [PATCH 5/5] Fix producer call --- src/Producer/Process.php | 10 +++++----- src/Producer/SyncProcess.php | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/Producer/Process.php b/src/Producer/Process.php index 4ad5314a..34eb0440 100644 --- a/src/Producer/Process.php +++ b/src/Producer/Process.php @@ -34,7 +34,7 @@ public function init() \Kafka\Protocol::init($config->getBrokerVersion(), $this->logger); // init process request - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $broker->setConfig($config); $broker->setProcess(function ($data, $fd) { $this->processRequest($data, $fd); @@ -124,7 +124,7 @@ public function syncMeta() } shuffle($brokerHost); - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); foreach ($brokerHost as $host) { $socket = $broker->getMetaConnect($host); if ($socket) { @@ -160,7 +160,7 @@ protected function processRequest($data, $fd) $this->error('Get metadata is fail, brokers or topics is null.'); $this->state->failRun(\Kafka\Producer\State::REQUEST_METADATA); } else { - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $isChange = $broker->setData($result['topics'], $result['brokers']); $this->state->succRun(\Kafka\Producer\State::REQUEST_METADATA, $isChange); } @@ -177,7 +177,7 @@ protected function processRequest($data, $fd) protected function produce() { $context = []; - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $requiredAck = \Kafka\ProducerConfig::getInstance()->getRequiredAck(); $timeout = \Kafka\ProducerConfig::getInstance()->getTimeout(); @@ -260,7 +260,7 @@ protected function stateConvert($errorCode, $context = null) protected function convertMessage($data) { $sendData = []; - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $topicInfos = $broker->getTopics(); foreach ($data as $value) { if (! isset($value['topic']) || ! trim($value['topic'])) { diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index d09096f9..182a256d 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -12,7 +12,7 @@ public function __construct() $config = \Kafka\ProducerConfig::getInstance(); \Kafka\Protocol::init($config->getBrokerVersion(), $this->logger); // init broker - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $broker->setConfig($config); $this->syncMeta(); @@ -20,7 +20,7 @@ public function __construct() public function send($data) { - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $requiredAck = \Kafka\ProducerConfig::getInstance()->getRequiredAck(); $timeout = \Kafka\ProducerConfig::getInstance()->getTimeout(); @@ -80,7 +80,7 @@ public function syncMeta() } shuffle($brokerHost); - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); foreach ($brokerHost as $host) { $socket = $broker->getMetaConnect($host, true); if ($socket) { @@ -95,7 +95,7 @@ public function syncMeta() if (! isset($result['brokers']) || ! isset($result['topics'])) { throw new \Kafka\Exception('Get metadata is fail, brokers or topics is null.'); } else { - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $broker->setData($result['topics'], $result['brokers']); } return; @@ -113,7 +113,7 @@ public function syncMeta() protected function convertMessage($data) { $sendData = []; - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $topicInfos = $broker->getTopics(); foreach ($data as $value) { if (! isset($value['topic']) || ! trim($value['topic'])) {