diff --git a/composer.json b/composer.json index a0d44015..1204b88e 100644 --- a/composer.json +++ b/composer.json @@ -16,7 +16,7 @@ "sort-packages": true }, "require": { - "php": "^5.6", + "php": ">=5.6", "amphp/amp": "^v1.2.2", "psr/log": "^1.0.2" }, diff --git a/src/Broker.php b/src/Broker.php index d3ddc7aa..f3184043 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -7,8 +7,6 @@ class Broker { - use SingletonTrait; - private $groupBrokerId = null; private $topics = []; @@ -23,6 +21,22 @@ class Broker private $config; + private static $instance = []; + + /** + * @return static + */ + public static function getInstance($instance_name = 'default') + { + + if(isset(self::$instance[$instance_name] )) { + return self::$instance[$instance_name]; + }else{ + return self::$instance[$instance_name] = new static(); + } + } + + public function setProcess(callable $process) { $this->process = $process; diff --git a/src/Consumer/Process.php b/src/Consumer/Process.php index ccc81d98..169817a5 100644 --- a/src/Consumer/Process.php +++ b/src/Consumer/Process.php @@ -490,7 +490,7 @@ public function succFetchOffset($result) foreach ($consumerOffsets as $topic => $value) { foreach ($value as $partId => $offset) { if (isset($lastOffsets[$topic][$partId]) && $lastOffsets[$topic][$partId] > $offset) { - $consumerOffsets[$topic][$partId] = $offset + 1; + $consumerOffsets[$topic][$partId] = $offset; } } } @@ -557,8 +557,8 @@ public function succFetch($result, $fd) continue; } - $consumerOffset = $assign->getConsumerOffset($topic['topicName'], $part['partition']); - if ($consumerOffset === false) { + $offset = $assign->getConsumerOffset($topic['topicName'], $part['partition']); + if ($offset === false) { return; // current is rejoin.... } foreach ($part['messages'] as $message) { @@ -566,14 +566,11 @@ public function succFetch($result, $fd) //if ($this->consumer != null) { // call_user_func($this->consumer, $topic['topicName'], $part['partition'], $message); //} - $commitOffset = $message['offset']; + $offset = $message['offset'] + 1; } - $commitOffset = isset($commitOffset) ? $commitOffset : $consumerOffset - 1; - $consumerOffset = $commitOffset + 1; - - $assign->setConsumerOffset($topic['topicName'], $part['partition'], $consumerOffset); - $assign->setCommitOffset($topic['topicName'], $part['partition'], $commitOffset); + $assign->setConsumerOffset($topic['topicName'], $part['partition'], $offset); + $assign->setCommitOffset($topic['topicName'], $part['partition'], $offset); } } $this->state->succRun(\Kafka\Consumer\State::REQUEST_FETCH, $fd); diff --git a/src/Producer/Process.php b/src/Producer/Process.php index 4ad5314a..34eb0440 100644 --- a/src/Producer/Process.php +++ b/src/Producer/Process.php @@ -34,7 +34,7 @@ public function init() \Kafka\Protocol::init($config->getBrokerVersion(), $this->logger); // init process request - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $broker->setConfig($config); $broker->setProcess(function ($data, $fd) { $this->processRequest($data, $fd); @@ -124,7 +124,7 @@ public function syncMeta() } shuffle($brokerHost); - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); foreach ($brokerHost as $host) { $socket = $broker->getMetaConnect($host); if ($socket) { @@ -160,7 +160,7 @@ protected function processRequest($data, $fd) $this->error('Get metadata is fail, brokers or topics is null.'); $this->state->failRun(\Kafka\Producer\State::REQUEST_METADATA); } else { - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $isChange = $broker->setData($result['topics'], $result['brokers']); $this->state->succRun(\Kafka\Producer\State::REQUEST_METADATA, $isChange); } @@ -177,7 +177,7 @@ protected function processRequest($data, $fd) protected function produce() { $context = []; - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $requiredAck = \Kafka\ProducerConfig::getInstance()->getRequiredAck(); $timeout = \Kafka\ProducerConfig::getInstance()->getTimeout(); @@ -260,7 +260,7 @@ protected function stateConvert($errorCode, $context = null) protected function convertMessage($data) { $sendData = []; - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $topicInfos = $broker->getTopics(); foreach ($data as $value) { if (! isset($value['topic']) || ! trim($value['topic'])) { diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index d09096f9..182a256d 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -12,7 +12,7 @@ public function __construct() $config = \Kafka\ProducerConfig::getInstance(); \Kafka\Protocol::init($config->getBrokerVersion(), $this->logger); // init broker - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $broker->setConfig($config); $this->syncMeta(); @@ -20,7 +20,7 @@ public function __construct() public function send($data) { - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $requiredAck = \Kafka\ProducerConfig::getInstance()->getRequiredAck(); $timeout = \Kafka\ProducerConfig::getInstance()->getTimeout(); @@ -80,7 +80,7 @@ public function syncMeta() } shuffle($brokerHost); - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); foreach ($brokerHost as $host) { $socket = $broker->getMetaConnect($host, true); if ($socket) { @@ -95,7 +95,7 @@ public function syncMeta() if (! isset($result['brokers']) || ! isset($result['topics'])) { throw new \Kafka\Exception('Get metadata is fail, brokers or topics is null.'); } else { - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $broker->setData($result['topics'], $result['brokers']); } return; @@ -113,7 +113,7 @@ public function syncMeta() protected function convertMessage($data) { $sendData = []; - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $topicInfos = $broker->getTopics(); foreach ($data as $value) { if (! isset($value['topic']) || ! trim($value['topic'])) {