diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..4d93228 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,20 @@ +本仓库是在 [longlang/kafka](https://github.com/swoole/phpkafka) 的基础上增加了 `SCRAM-SHA-512` 加密方式的连接。 + +使用时 `sasl` 配置为 +```php +... +'sasl' => [ + 'type' => \longlang\phpkafka\Sasl\ScramSha512Sasl::class, + 'username' => env('KAFKA_SASL_USERNAME', ''), + 'password' => env('KAFKA_SASL_PASSWORD', ''), + // 是否验证第二次握手的服务器响应消息的签名 + 'verify_final_signature' => (bool) env('KAFKA_SASL_VERIFY_FINAL_SIGNATURE', false), +], +... +``` + + +# Changed Log +## [v1.2.3] - 2023-08-24 +### Added + - 增加基于 `SCRAM-SHA-512` 加密方式的连接; \ No newline at end of file diff --git a/src/Client/SyncClient.php b/src/Client/SyncClient.php index 2cb0d8e..83f396c 100644 --- a/src/Client/SyncClient.php +++ b/src/Client/SyncClient.php @@ -23,7 +23,9 @@ use longlang\phpkafka\Protocol\SaslHandshake\SaslHandshakeRequest; use longlang\phpkafka\Protocol\SaslHandshake\SaslHandshakeResponse; use longlang\phpkafka\Protocol\Type\Int32; +use longlang\phpkafka\Sasl\PlainSasl; use longlang\phpkafka\Sasl\SaslInterface; +use longlang\phpkafka\Sasl\ScramSha512Sasl; use longlang\phpkafka\Socket\SocketInterface; use longlang\phpkafka\Socket\StreamSocket; @@ -207,6 +209,21 @@ protected function sendAuthInfo(): void if (!$class instanceof SaslInterface) { return; } + + if ($class instanceof PlainSasl) { + /* \longlang\phpkafka\Sasl\PlainSasl $class */ + $this->sendPlainAuthInfo($class); + } elseif ($class instanceof ScramSha512Sasl) { + /* \longlang\phpkafka\Sasl\ScramSha512Sasl $class */ + $this->sendScramSha512AuthInfo($class); + } else { + return; + } + } + + private function sendPlainAuthInfo(SaslInterface $class): void + { + /** @var \longlang\phpkafka\Sasl\PlainSasl $class */ $handshakeRequest = new SaslHandshakeRequest(); $handshakeRequest->setMechanism($class->getName()); $correlationId = $this->send($handshakeRequest); @@ -221,4 +238,37 @@ protected function sendAuthInfo(): void $authenticateResponse = $this->recv($correlationId); ErrorCode::check($authenticateResponse->getErrorCode()); } + + private function sendScramSha512AuthInfo(SaslInterface $class): void + { + /** @var \longlang\phpkafka\Sasl\ScramSha512Sasl $class */ + // Send first verification message + $handshakeRequest = new SaslHandshakeRequest(); + $handshakeRequest->setMechanism($class->getName()); + $correlationId = $this->send($handshakeRequest); + /** @var SaslHandshakeResponse $handshakeResponse */ + $handshakeResponse = $this->recv($correlationId); + ErrorCode::check($handshakeResponse->getErrorCode()); + + // First handshake + $authenticateRequest = new SaslAuthenticateRequest(); + $authenticateRequest->setAuthBytes($class->getAuthBytes()); + $correlationId = $this->send($authenticateRequest); + /** @var SaslAuthenticateResponse $authenticateResponse */ + $authenticateResponse = $this->recv($correlationId); + ErrorCode::check($authenticateResponse->getErrorCode()); + + // Second handshake + $authenticateRequest = new SaslAuthenticateRequest(); + $authenticateRequest->setAuthBytes($class->getFinalMessage($authenticateResponse->getAuthBytes())); + $correlationId = $this->send($authenticateRequest); + /** @var SaslAuthenticateResponse $authenticateResponse */ + $authenticateResponse = $this->recv($correlationId); + ErrorCode::check($authenticateResponse->getErrorCode()); + + // Verify the second server response + if ($class->enableFinalSignatureVerification()) { + $class->verifyFinalMessage($authenticateResponse->getAuthBytes()); + } + } } diff --git a/src/Consumer/Consumer.php b/src/Consumer/Consumer.php index 357a749..410fbd1 100644 --- a/src/Consumer/Consumer.php +++ b/src/Consumer/Consumer.php @@ -411,11 +411,21 @@ protected function heartbeat(): void } } + protected function getLastHeartbeatTime(): float + { + return $this->lastHeartbeatTime > 0 ? $this->lastHeartbeatTime : microtime(true); + } + + protected function setLastHeartbeatTime(float $lastHeartbeatTime): void + { + $this->lastHeartbeatTime = $lastHeartbeatTime; + } + protected function checkBeartbeat(): void { $time = microtime(true); - if ($time - $this->lastHeartbeatTime >= $this->config->getGroupHeartbeat()) { - $this->lastHeartbeatTime = $time; + if ($time - $this->getLastHeartbeatTime() >= $this->config->getGroupHeartbeat()) { + $this->setLastHeartbeatTime($time); $this->heartbeat(); } } diff --git a/src/Sasl/ScramSha512Sasl.php b/src/Sasl/ScramSha512Sasl.php new file mode 100644 index 0000000..6b894b4 --- /dev/null +++ b/src/Sasl/ScramSha512Sasl.php @@ -0,0 +1,196 @@ +config = $config; + $this->nonce = base64_encode(random_bytes(16)); + } + + /** + * 授权模式. + */ + public function getName(): string + { + return 'SCRAM-SHA-512'; + } + + /** + * SCRAM-SHA-512 first handshake. + */ + public function getAuthBytes(): string + { + $config = $this->config->getSasl(); + if (empty($config['username']) || empty($config['password'])) { + throw new KafkaErrorException('sasl not found auth info'); + } + + return sprintf('n,,%s', $this->getFirstMessageBare()); + } + + /** + * Get all SASL configurations. + */ + public function getSaslConfigs(): array + { + return $this->config->getSasl(); + } + + /** + * Get SASL simple configuration. + */ + public function getSaslConfig(string $key): mixed + { + return $this->getSaslConfigs()[$key] ?? null; + } + + /** + * Second handshake of SCRAM-SHA-512. + */ + public function getFinalMessage(string $response): string + { + // Split the response after the first handshake + [$r, $s, $i] = explode(',', $response); + + // Extract the random number, salt, and number of iterations + $serverNonce = $this->ltrimMessage($r); + $salt = base64_decode($this->ltrimMessage($s)); + $iterations = (int) $this->ltrimMessage($i); + + // Calculate the parameters for the second handshake + $saltedPassword = $this->calculateSaltedPassword($this->getPassword(), $salt, $iterations); + $this->saltedPassword = $saltedPassword; + + $clientKey = $this->calculateClientKey($saltedPassword); + $storedKey = $this->calculateStoredKey($clientKey); + + $clientFirstMessageBare = $this->getFirstMessageBare(); + $serverFirstMessage = $response; + $clientFinalMessageWithoutProof = $this->getMessageWithoutProof($serverNonce); + + $authMessage = sprintf('%s,%s,%s', $clientFirstMessageBare, $serverFirstMessage, $clientFinalMessageWithoutProof); + $this->authMessage = $authMessage; + $clientSignature = $this->hmac($authMessage, $storedKey); + + return sprintf('%s,p=%s', $clientFinalMessageWithoutProof, base64_encode($clientKey ^ $clientSignature)); + } + + /** + * SHA-512 encryption. + */ + public function hmac(string $data, string $key): string + { + return hash_hmac('sha512', $data, $key, true); + } + + /** + * Remove the first two characters of the server response message. + */ + public function ltrimMessage(string $param): string + { + return substr($param, 2); + } + + /** + * Whether to enable final signature verification. + */ + public function enableFinalSignatureVerification(): bool + { + return (bool) $this->getSaslConfig('verify_final_signature'); + } + + /** + * Verify final signature. + */ + public function verifyFinalMessage(string $message): void + { + $receivedSignature = $this->ltrimMessage($message); + $receivedSignature = base64_decode($receivedSignature); + + $serverKey = $this->hmac('Server Key', $this->saltedPassword); + $expectedSignature = $this->hmac($this->authMessage, $serverKey); + + if (!hash_equals($receivedSignature, $expectedSignature)) { + ErrorCode::check(ErrorCode::SASL_AUTHENTICATION_FAILED); + } + } + + /** + * Get first handshake information of SCRAM-SHA-512. + */ + private function getFirstMessageBare(): string + { + return sprintf('n=%s,r=%s', $this->getSaslConfig('username'), $this->nonce); + } + + /** + * Get SASL password. + */ + private function getPassword(): string + { + return $this->getSaslConfigs()['password'] ?? ''; + } + + /** + * Compute salted password using PBKDF2 function and the salt and iteration count provided by the server. + */ + private function calculateSaltedPassword(string $password, string $salt, int $iterations): string + { + return hash_pbkdf2('sha512', $password, $salt, $iterations, 0, true); + } + + /** + * Compute client key using salted password and HMAC function to calculate client key. + */ + private function calculateClientKey(string $saltedPassword): string + { + // In SCRAM-SHA-512, a salted password is required to encrypt the calculation secret + // and the key is fixed to "Client Key" + return $this->hmac('Client Key', $saltedPassword); + } + + /** + * Compute stored key using client key and SHA-256 function to calculate stored key. + */ + private function calculateStoredKey(string $clientKey): string + { + return hash('sha512', $clientKey, true); + } + + /** + * Get message without proof. + */ + private function getMessageWithoutProof(string $nonce): string + { + return sprintf('c=biws,r=%s', $nonce); + } +}