diff --git a/composer.json b/composer.json index 3e9368a..f62b396 100644 --- a/composer.json +++ b/composer.json @@ -7,7 +7,11 @@ "google/crc32": "^0.1.0", "exussum12/xxhash": "^1.0.0", "chdemko/sorted-collections": "^1.0", - "symfony/polyfill-php81": "^1.23" + "symfony/polyfill-php81": "^1.23", + "guzzlehttp/guzzle": "^7.4", + "guzzlehttp/psr7": "^2.4", + "aws/aws-sdk-php": "^3.227", + "ext-json": "*" }, "require-dev": { "phpunit/phpunit": "^7.5|^8.0|^9.0", diff --git a/examples/consumer_msk.php b/examples/consumer_msk.php new file mode 100644 index 0000000..f8e351c --- /dev/null +++ b/examples/consumer_msk.php @@ -0,0 +1,29 @@ +getKey() . ':' . $message->getValue()); +} +$config = new ConsumerConfig(); +$config->setBroker('127.0.0.1:9092'); +$config->setTopic('test'); // 主题名称 +$config->setGroupId('testGroup'); // 分组ID +$config->setClientId('test'); // 客户端ID +$config->setGroupInstanceId('test'); // 分组实例ID +$config->setInterval(0.1); +$config->setSasl([ + "type"=> AwsMskIamSasl::class, + "region"=>"eu-west-1", + "expiration" => "+5 minutes" +]); +$consumer = new Consumer($config, 'consume'); +$consumer->start(); diff --git a/examples/producer_msk.php b/examples/producer_msk.php new file mode 100644 index 0000000..347fa0c --- /dev/null +++ b/examples/producer_msk.php @@ -0,0 +1,41 @@ +setOpen(true); +$sslConfig->setCompression(true); + +$config = new ProducerConfig(); +$config->setBootstrapServer('b-1.fakemskcluster.kafka.eu-west-1.amazonaws.com:9098,b-2.fakemskcluster.kafka.eu-west-1.amazonaws.com:9098'); +$config->setUpdateBrokers(true); +$config->setAcks(-1); +$config->setSasl([ + "type" => AwsMskIamSasl::class, + "region" => "eu-west-1", + "expiration" => "+5 minutes" +]); +$config->setSsl($sslConfig); + +$producer = new Producer($config); +$topic = 'MSKTutorialTopic'; +$value = "It Works!!!"; +$key = uniqid('', true); +$producer->send($topic, $value, $key); + +for ($i = 1; $i <= 10; $i++) { + $value = "Message: " . $i; + $headers = [ + 'key1' => 'value1', + (new RecordHeader())->setHeaderKey('key2')->setValue('value2'), + ]; + $producer->send($topic, $value, $key, $headers); +} + diff --git a/src/Client/SyncClient.php b/src/Client/SyncClient.php index 2cb0d8e..111cfdc 100644 --- a/src/Client/SyncClient.php +++ b/src/Client/SyncClient.php @@ -101,7 +101,13 @@ public function connect(): void $this->socket->connect(); $this->waitResponseMaps = []; $this->updateApiVersions(); - $this->sendAuthInfo(); + $connector = $this->getSaslConnector(); + if (!$connector instanceof SaslInterface) { + return; + } + + $connector->setSocket($this->socket); + $this->sendAuthInfo($connector); } public function close(): bool @@ -152,8 +158,8 @@ public function send(AbstractRequest $request, ?RequestHeader $header = null, bo if ($hasResponse) { $this->waitResponseMaps[$correlationId] = [ - 'apiKey' => $apiKey, - 'apiVersion' => $header->getRequestApiVersion(), + 'apiKey' => $apiKey, + 'apiVersion' => $header->getRequestApiVersion(), 'flexibleVersions' => $request->getFlexibleVersions(), ]; } @@ -197,16 +203,8 @@ protected function updateApiVersions(): void $this->setApiKeys($response->getApiKeys()); } - protected function sendAuthInfo(): void + protected function sendAuthInfo(SaslInterface $class): void { - $config = $this->getConfig()->getSasl(); - if (!isset($config['type']) || empty($config['type'])) { - return; - } - $class = new $config['type']($this->getConfig()); - if (!$class instanceof SaslInterface) { - return; - } $handshakeRequest = new SaslHandshakeRequest(); $handshakeRequest->setMechanism($class->getName()); $correlationId = $this->send($handshakeRequest); @@ -221,4 +219,17 @@ protected function sendAuthInfo(): void $authenticateResponse = $this->recv($correlationId); ErrorCode::check($authenticateResponse->getErrorCode()); } + +/** + * @return SaslInterface|null + */ +protected function getSaslConnector() + { + $config = $this->getConfig()->getSasl(); + if (!isset($config['type']) || empty($config['type'])) { + return null; + } + $class = new $config['type']($this->getConfig()); + return $class; + } } diff --git a/src/Sasl/AwsMskIamSasl.php b/src/Sasl/AwsMskIamSasl.php new file mode 100644 index 0000000..a42af81 --- /dev/null +++ b/src/Sasl/AwsMskIamSasl.php @@ -0,0 +1,114 @@ +config = $config; + } + + public function setSocket(SocketInterface $socket): void + { + $this->socket = $socket; + } + + /** + * Authorization mode + */ + public function getName(): string + { + return 'AWS_MSK_IAM'; + } + + /** + * Generated the Signed JSON used by AWS_MSK_IAM as auth string + * @throws KafkaErrorException + */ + public function getAuthBytes(): string + { + $config = $this->config->getSasl(); + if (empty($this->socket) || empty($config['region'])) { + throw new KafkaErrorException('AWS MSK config params not found'); + } + + $host = $this->socket->getHost(); + + $query = http_build_query(array( + self::QUERY_ACTION_KEY => self::SIGN_ACTION, + )); + + if (empty($config['expiration'])) { + $expiration = "+5 minutes"; + } else { + $expiration = $config['expiration']; + } + + $region = $config['region']; + + $url = "kafka://" . $host . "/?" . $query; + $provider = CredentialProvider::defaultProvider(); + // Returns a CredentialsInterface or throws. + $creds = $provider()->wait(); + + $req = new Request('GET', $url); + + $signer = new SignatureV4(self::SIGN_SERVICE, $region); + $signedReq = $signer->presign($req, $creds, $expiration); + $signedUri = $signedReq->getUri(); + + parse_str($signedUri->getQuery(), $params); + + $headers = $signedReq->getHeaders(); + + $signedMap = array( + self::SIGN_VERSION_KEY => self::SIGN_VERSION, + self::SIGN_USER_AGENT_KEY => "php-kafka/sasl/aws_msk_iam/" . PHP_VERSION, + self::SIGN_ACTION_KEY => self::SIGN_ACTION, + self::SIGN_HOST_KEY => $host + ); + + foreach ($params as $params_key => $params_value) { + $signedMap[strtolower($params_key)] = $params_value; + } + + foreach ($headers as $header_key => $header_value) { + $header_key = strtolower($header_key); + if ($header_key !== self::SIGN_HOST_KEY) { + $signedMap[$header_key] = $header_value; + } + } + return json_encode($signedMap); + } +} diff --git a/src/Sasl/PlainSasl.php b/src/Sasl/PlainSasl.php index 5f7c98b..c6bcfec 100644 --- a/src/Sasl/PlainSasl.php +++ b/src/Sasl/PlainSasl.php @@ -6,6 +6,7 @@ use longlang\phpkafka\Config\CommonConfig; use longlang\phpkafka\Exception\KafkaErrorException; +use longlang\phpkafka\Socket\SocketInterface; class PlainSasl implements SaslInterface { @@ -40,4 +41,9 @@ public function getAuthBytes(): string return sprintf("\x00%s\x00%s", $config['username'], $config['password']); } + + public function setSocket(SocketInterface $socket): void + { + // we do nothing here + } } diff --git a/src/Sasl/SaslInterface.php b/src/Sasl/SaslInterface.php index 68265a0..152b9f4 100644 --- a/src/Sasl/SaslInterface.php +++ b/src/Sasl/SaslInterface.php @@ -5,6 +5,7 @@ namespace longlang\phpkafka\Sasl; use longlang\phpkafka\Config\CommonConfig; +use longlang\phpkafka\Socket\SocketInterface; interface SaslInterface { @@ -19,4 +20,6 @@ public function getName(): string; * 返回授权信息. */ public function getAuthBytes(): string; + + public function setSocket(SocketInterface $socket): void; }