From c11502a5fa76e0034a2a75a2d99efbf0e661dbff Mon Sep 17 00:00:00 2001 From: Mauro Valota Date: Tue, 21 Jun 2022 17:44:19 +0200 Subject: [PATCH 1/9] Add AWS MSK IAM support --- composer.json | 6 ++- examples/consumer_msk.php | 29 ++++++++++++ examples/producer_msk.php | 33 +++++++++++++ src/Sasl/AwsMskIamSasl.php | 96 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 163 insertions(+), 1 deletion(-) create mode 100644 examples/consumer_msk.php create mode 100644 examples/producer_msk.php create mode 100644 src/Sasl/AwsMskIamSasl.php diff --git a/composer.json b/composer.json index 3e9368a..f62b396 100644 --- a/composer.json +++ b/composer.json @@ -7,7 +7,11 @@ "google/crc32": "^0.1.0", "exussum12/xxhash": "^1.0.0", "chdemko/sorted-collections": "^1.0", - "symfony/polyfill-php81": "^1.23" + "symfony/polyfill-php81": "^1.23", + "guzzlehttp/guzzle": "^7.4", + "guzzlehttp/psr7": "^2.4", + "aws/aws-sdk-php": "^3.227", + "ext-json": "*" }, "require-dev": { "phpunit/phpunit": "^7.5|^8.0|^9.0", diff --git a/examples/consumer_msk.php b/examples/consumer_msk.php new file mode 100644 index 0000000..8326757 --- /dev/null +++ b/examples/consumer_msk.php @@ -0,0 +1,29 @@ +getKey() . ':' . $message->getValue()); +} +$config = new ConsumerConfig(); +$config->setBroker('127.0.0.1:9092'); +$config->setTopic('test'); // 主题名称 +$config->setGroupId('testGroup'); // 分组ID +$config->setClientId('test'); // 客户端ID +$config->setGroupInstanceId('test'); // 分组实例ID +$config->setInterval(0.1); +$config->setSasl([ + "type"=> AwsMskIamSasl::class, + "host"=>"localhost", + "region"=>"eu-west-1" +]); +$consumer = new Consumer($config, 'consume'); +$consumer->start(); diff --git a/examples/producer_msk.php b/examples/producer_msk.php new file mode 100644 index 0000000..b56d248 --- /dev/null +++ b/examples/producer_msk.php @@ -0,0 +1,33 @@ +setBootstrapServer('127.0.0.1:9092'); +$config->setUpdateBrokers(true); +$config->setAcks(-1); +$config->setSasl([ + "type"=> AwsMskIamSasl::class, + "host"=>"localhost", + "region"=>"eu-west-1" +]); +$producer = new Producer($config); +$topic = 'test'; +$value = (string) microtime(true); +$key = uniqid('', true); +$producer->send('test', $value, $key); + +// set headers +// key-value or use RecordHeader +$headers = [ + 'key1' => 'value1', + (new RecordHeader())->setHeaderKey('key2')->setValue('value2'), +]; +$producer->send('test', $value, $key, $headers); diff --git a/src/Sasl/AwsMskIamSasl.php b/src/Sasl/AwsMskIamSasl.php new file mode 100644 index 0000000..117c35b --- /dev/null +++ b/src/Sasl/AwsMskIamSasl.php @@ -0,0 +1,96 @@ +config = $config; + } + + public function setHost(string $host) + { + $this->host = $host; + } + + /** + * 授权模式. + */ + public function getName(): string + { + return 'AWS_MSK_IAM'; + } + + /** + * 获得加密串. + */ + public function getAuthBytes(): string + { + $config = $this->config->getSasl(); + if (empty($config['host']) || empty($config['region'])) { + // 不存在就报错 + throw new KafkaErrorException('sasl not found auth info'); + } + + $query = http_build_query(Array( + self::QUERY_ACTION_KEY => self::SIGN_ACTION + )); + + $host = $config['host']; + + $region = $config['region']; + + $url = "kafka://".$host."/".$query."/"; + + $req = new Request('GET', $url); + + $expiry = "+5 minutes"; + + $credentials = call_user_func(CredentialProvider::defaultProvider())->wait(); + + $signer = new SignatureV4(self::SIGN_SERVICE, $region); + $signedReq = $signer->presign($req, $credentials, $expiry); + $headers = $signedReq->getHeaders(); + + $signedMap = Array( + self::SIGN_VERSION_KEY => self::SIGN_VERSION, + self::SIGN_HOST_KEY => $host, + self::SIGN_USER_AGENT_KEY => "php-kafka/sasl/aws_msk_iam", + self::SIGN_ACTION_KEY => self::SIGN_ACTION + ); + + foreach (array_keys($headers) as $key) { + $signedMap[strtolower($key)] = $headers[$key]; + } + + $signedMap[strtolower(self::QUERY_ACTION_KEY)] = self::SIGN_ACTION; + + return json_encode($signedMap); + } +} From b955473c5efbe3822c101d7519bd771daa8605b3 Mon Sep 17 00:00:00 2001 From: Mauro Valota Date: Tue, 21 Jun 2022 18:08:09 +0200 Subject: [PATCH 2/9] Fixed AWS presign --- src/Sasl/AwsMskIamSasl.php | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/Sasl/AwsMskIamSasl.php b/src/Sasl/AwsMskIamSasl.php index 117c35b..5812a5b 100644 --- a/src/Sasl/AwsMskIamSasl.php +++ b/src/Sasl/AwsMskIamSasl.php @@ -76,7 +76,9 @@ public function getAuthBytes(): string $signer = new SignatureV4(self::SIGN_SERVICE, $region); $signedReq = $signer->presign($req, $credentials, $expiry); - $headers = $signedReq->getHeaders(); + $signedUri = $signedReq->getUri(); + $url_components = parse_url((string)$signedUri); + parse_str($url_components['query'], $params); $signedMap = Array( self::SIGN_VERSION_KEY => self::SIGN_VERSION, @@ -85,8 +87,8 @@ public function getAuthBytes(): string self::SIGN_ACTION_KEY => self::SIGN_ACTION ); - foreach (array_keys($headers) as $key) { - $signedMap[strtolower($key)] = $headers[$key]; + foreach (array_keys($params) as $key) { + $signedMap[strtolower($key)] = $params[$key]; } $signedMap[strtolower(self::QUERY_ACTION_KEY)] = self::SIGN_ACTION; From da383b3c486a75a872e041cc17d45653f235b30b Mon Sep 17 00:00:00 2001 From: Mauro Valota Date: Wed, 28 Sep 2022 15:06:04 +0200 Subject: [PATCH 3/9] Fixed AWS MSK IAM Signed JSON --- examples/producer_msk.php | 36 +++++++++++-------- src/Client/SyncClient.php | 35 ++++++++++++------- src/Sasl/AwsMskIamSasl.php | 71 ++++++++++++++++++++++---------------- 3 files changed, 87 insertions(+), 55 deletions(-) diff --git a/examples/producer_msk.php b/examples/producer_msk.php index b56d248..347fa0c 100644 --- a/examples/producer_msk.php +++ b/examples/producer_msk.php @@ -5,29 +5,37 @@ use longlang\phpkafka\Producer\Producer; use longlang\phpkafka\Producer\ProducerConfig; use longlang\phpkafka\Protocol\RecordBatch\RecordHeader; +use longlang\phpkafka\Config\SslConfig; use longlang\phpkafka\Sasl\AwsMskIamSasl; require dirname(__DIR__) . '/vendor/autoload.php'; +$sslConfig = new SslConfig(); +$sslConfig->setOpen(true); +$sslConfig->setCompression(true); $config = new ProducerConfig(); -$config->setBootstrapServer('127.0.0.1:9092'); +$config->setBootstrapServer('b-1.fakemskcluster.kafka.eu-west-1.amazonaws.com:9098,b-2.fakemskcluster.kafka.eu-west-1.amazonaws.com:9098'); $config->setUpdateBrokers(true); $config->setAcks(-1); $config->setSasl([ - "type"=> AwsMskIamSasl::class, - "host"=>"localhost", - "region"=>"eu-west-1" + "type" => AwsMskIamSasl::class, + "region" => "eu-west-1", + "expiration" => "+5 minutes" ]); +$config->setSsl($sslConfig); + $producer = new Producer($config); -$topic = 'test'; -$value = (string) microtime(true); +$topic = 'MSKTutorialTopic'; +$value = "It Works!!!"; $key = uniqid('', true); -$producer->send('test', $value, $key); +$producer->send($topic, $value, $key); + +for ($i = 1; $i <= 10; $i++) { + $value = "Message: " . $i; + $headers = [ + 'key1' => 'value1', + (new RecordHeader())->setHeaderKey('key2')->setValue('value2'), + ]; + $producer->send($topic, $value, $key, $headers); +} -// set headers -// key-value or use RecordHeader -$headers = [ - 'key1' => 'value1', - (new RecordHeader())->setHeaderKey('key2')->setValue('value2'), -]; -$producer->send('test', $value, $key, $headers); diff --git a/src/Client/SyncClient.php b/src/Client/SyncClient.php index 2cb0d8e..9bb74ac 100644 --- a/src/Client/SyncClient.php +++ b/src/Client/SyncClient.php @@ -23,6 +23,7 @@ use longlang\phpkafka\Protocol\SaslHandshake\SaslHandshakeRequest; use longlang\phpkafka\Protocol\SaslHandshake\SaslHandshakeResponse; use longlang\phpkafka\Protocol\Type\Int32; +use longlang\phpkafka\Sasl\AwsMskIamSasl; use longlang\phpkafka\Sasl\SaslInterface; use longlang\phpkafka\Socket\SocketInterface; use longlang\phpkafka\Socket\StreamSocket; @@ -101,7 +102,15 @@ public function connect(): void $this->socket->connect(); $this->waitResponseMaps = []; $this->updateApiVersions(); - $this->sendAuthInfo(); + $class = $this->getSaslConfig(); + if (!$class instanceof SaslInterface) { + return; + } else { + if ($class instanceof AwsMskIamSasl) { + $class->setHost($this->socket->getHost()); + } + $this->sendAuthInfo($class); + } } public function close(): bool @@ -152,8 +161,8 @@ public function send(AbstractRequest $request, ?RequestHeader $header = null, bo if ($hasResponse) { $this->waitResponseMaps[$correlationId] = [ - 'apiKey' => $apiKey, - 'apiVersion' => $header->getRequestApiVersion(), + 'apiKey' => $apiKey, + 'apiVersion' => $header->getRequestApiVersion(), 'flexibleVersions' => $request->getFlexibleVersions(), ]; } @@ -197,16 +206,8 @@ protected function updateApiVersions(): void $this->setApiKeys($response->getApiKeys()); } - protected function sendAuthInfo(): void + protected function sendAuthInfo(SaslInterface $class): void { - $config = $this->getConfig()->getSasl(); - if (!isset($config['type']) || empty($config['type'])) { - return; - } - $class = new $config['type']($this->getConfig()); - if (!$class instanceof SaslInterface) { - return; - } $handshakeRequest = new SaslHandshakeRequest(); $handshakeRequest->setMechanism($class->getName()); $correlationId = $this->send($handshakeRequest); @@ -221,4 +222,14 @@ protected function sendAuthInfo(): void $authenticateResponse = $this->recv($correlationId); ErrorCode::check($authenticateResponse->getErrorCode()); } + + private function getSaslConfig() + { + $config = $this->getConfig()->getSasl(); + if (!isset($config['type']) || empty($config['type'])) { + return null; + } + $class = new $config['type']($this->getConfig()); + return $class; + } } diff --git a/src/Sasl/AwsMskIamSasl.php b/src/Sasl/AwsMskIamSasl.php index 5812a5b..75fb197 100644 --- a/src/Sasl/AwsMskIamSasl.php +++ b/src/Sasl/AwsMskIamSasl.php @@ -12,20 +12,24 @@ class AwsMskIamSasl implements SaslInterface { - const SIGN_VERSION = "2020_10_22"; - const SIGN_SERVICE = "kafka-cluster"; const SIGN_ACTION = "kafka-cluster:Connect"; - const SIGN_VERSION_KEY = "version"; + const SIGN_SERVICE = "kafka-cluster"; + const SIGN_VERSION = "2020_10_22"; + const SIGN_ACTION_KEY = "action"; const SIGN_HOST_KEY = "host"; const SIGN_USER_AGENT_KEY = "user-agent"; - const SIGN_ACTION_KEY = "action"; + const SIGN_VERSION_KEY = "version"; const QUERY_ACTION_KEY = "Action"; + /** * @var CommonConfig */ protected $config; + /** + * @var string + */ protected $host; @@ -34,13 +38,13 @@ public function __construct(CommonConfig $config) $this->config = $config; } - public function setHost(string $host) + public function setHost(string $host): void { $this->host = $host; } /** - * 授权模式. + * Authorization mode */ public function getName(): string { @@ -48,51 +52,60 @@ public function getName(): string } /** - * 获得加密串. + * Generated the Signed JSON used by AWS_MSK_IAM as auth string + * @throws KafkaErrorException */ public function getAuthBytes(): string { $config = $this->config->getSasl(); - if (empty($config['host']) || empty($config['region'])) { - // 不存在就报错 - throw new KafkaErrorException('sasl not found auth info'); + if (empty($this->host) || empty($config['region'])) { + throw new KafkaErrorException('AWS MSK config params not found'); } - - $query = http_build_query(Array( - self::QUERY_ACTION_KEY => self::SIGN_ACTION + + $query = http_build_query(array( + self::QUERY_ACTION_KEY => self::SIGN_ACTION, )); - $host = $config['host']; + if (empty($config['expiration'])) { + $expiration = "+5 minutes"; + } else { + $expiration = $config['expiration']; + } $region = $config['region']; - $url = "kafka://".$host."/".$query."/"; + $url = "kafka://" . $this->host . "/?" . $query; + $provider = CredentialProvider::defaultProvider(); + // Returns a CredentialsInterface or throws. + $creds = $provider()->wait(); $req = new Request('GET', $url); - $expiry = "+5 minutes"; - - $credentials = call_user_func(CredentialProvider::defaultProvider())->wait(); - $signer = new SignatureV4(self::SIGN_SERVICE, $region); - $signedReq = $signer->presign($req, $credentials, $expiry); - $signedUri = $signedReq->getUri(); + $signedReq = $signer->presign($req, $creds, $expiration); + $signedUri = $signedReq->getUri(); + $url_components = parse_url((string)$signedUri); parse_str($url_components['query'], $params); - $signedMap = Array( + $headers = $signedReq->getHeaders(); + + $signedMap = array( self::SIGN_VERSION_KEY => self::SIGN_VERSION, - self::SIGN_HOST_KEY => $host, - self::SIGN_USER_AGENT_KEY => "php-kafka/sasl/aws_msk_iam", - self::SIGN_ACTION_KEY => self::SIGN_ACTION + self::SIGN_USER_AGENT_KEY => "php-kafka/sasl/aws_msk_iam/" . phpversion(), + self::SIGN_ACTION_KEY => self::SIGN_ACTION, + self::SIGN_HOST_KEY => $this->host ); - foreach (array_keys($params) as $key) { - $signedMap[strtolower($key)] = $params[$key]; + foreach ($params as $params_key => $params_value) { + $signedMap[strtolower($params_key)] = $params_value; } - $signedMap[strtolower(self::QUERY_ACTION_KEY)] = self::SIGN_ACTION; - + foreach ($headers as $header_key => $header_value) { + if (strtolower($header_key) != strtolower(self::SIGN_HOST_KEY)) { + $signedMap[strtolower($header_key)] = $header_value; + } + } return json_encode($signedMap); } } From 68d5d4a28ff1e5d89f8464071db8a8873325d777 Mon Sep 17 00:00:00 2001 From: Mauro Valota <7254293+valmoz@users.noreply.github.com> Date: Fri, 16 Dec 2022 15:03:47 +0100 Subject: [PATCH 4/9] Update src/Client/SyncClient.php Co-authored-by: einacio --- src/Client/SyncClient.php | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Client/SyncClient.php b/src/Client/SyncClient.php index 9bb74ac..938822c 100644 --- a/src/Client/SyncClient.php +++ b/src/Client/SyncClient.php @@ -223,7 +223,10 @@ protected function sendAuthInfo(SaslInterface $class): void ErrorCode::check($authenticateResponse->getErrorCode()); } - private function getSaslConfig() +/** + * @return SaslInterface|null + */ +protected function getSaslConnector() { $config = $this->getConfig()->getSasl(); if (!isset($config['type']) || empty($config['type'])) { From d1ed99fc77d4a2e36977b1459114d714dfb28f28 Mon Sep 17 00:00:00 2001 From: Mauro Valota <7254293+valmoz@users.noreply.github.com> Date: Fri, 16 Dec 2022 15:04:23 +0100 Subject: [PATCH 5/9] Update src/Sasl/AwsMskIamSasl.php Co-authored-by: einacio --- src/Sasl/AwsMskIamSasl.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Sasl/AwsMskIamSasl.php b/src/Sasl/AwsMskIamSasl.php index 75fb197..663b334 100644 --- a/src/Sasl/AwsMskIamSasl.php +++ b/src/Sasl/AwsMskIamSasl.php @@ -85,8 +85,7 @@ public function getAuthBytes(): string $signedReq = $signer->presign($req, $creds, $expiration); $signedUri = $signedReq->getUri(); - $url_components = parse_url((string)$signedUri); - parse_str($url_components['query'], $params); + parse_str($signedUri->getQuery(), $params); $headers = $signedReq->getHeaders(); From a92b4f8f4092a6297226bbf5f4c527a1dcb1c30d Mon Sep 17 00:00:00 2001 From: Mauro Valota <7254293+valmoz@users.noreply.github.com> Date: Fri, 16 Dec 2022 15:04:40 +0100 Subject: [PATCH 6/9] Update src/Sasl/AwsMskIamSasl.php Co-authored-by: einacio --- src/Sasl/AwsMskIamSasl.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Sasl/AwsMskIamSasl.php b/src/Sasl/AwsMskIamSasl.php index 663b334..2aaf4da 100644 --- a/src/Sasl/AwsMskIamSasl.php +++ b/src/Sasl/AwsMskIamSasl.php @@ -91,7 +91,7 @@ public function getAuthBytes(): string $signedMap = array( self::SIGN_VERSION_KEY => self::SIGN_VERSION, - self::SIGN_USER_AGENT_KEY => "php-kafka/sasl/aws_msk_iam/" . phpversion(), + self::SIGN_USER_AGENT_KEY => "php-kafka/sasl/aws_msk_iam/" . PHP_VERSION, self::SIGN_ACTION_KEY => self::SIGN_ACTION, self::SIGN_HOST_KEY => $this->host ); From e11a08a6b66608bb418a7420b9a0eb65d7e5db0a Mon Sep 17 00:00:00 2001 From: Mauro Valota <7254293+valmoz@users.noreply.github.com> Date: Fri, 16 Dec 2022 15:06:51 +0100 Subject: [PATCH 7/9] Update src/Sasl/AwsMskIamSasl.php Co-authored-by: einacio --- src/Sasl/AwsMskIamSasl.php | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Sasl/AwsMskIamSasl.php b/src/Sasl/AwsMskIamSasl.php index 2aaf4da..28d4935 100644 --- a/src/Sasl/AwsMskIamSasl.php +++ b/src/Sasl/AwsMskIamSasl.php @@ -101,8 +101,9 @@ public function getAuthBytes(): string } foreach ($headers as $header_key => $header_value) { - if (strtolower($header_key) != strtolower(self::SIGN_HOST_KEY)) { - $signedMap[strtolower($header_key)] = $header_value; + $header_key = strtolower($header_key); + if ($header_key !== self::SIGN_HOST_KEY) { + $signedMap[$header_key] = $header_value; } } return json_encode($signedMap); From 91dd206a499d99fad6ee68352dd3d009491465dc Mon Sep 17 00:00:00 2001 From: Mauro Valota Date: Mon, 19 Dec 2022 11:16:04 +0100 Subject: [PATCH 8/9] Added setSocket to SaslInterface --- src/Client/SyncClient.php | 13 +++++-------- src/Sasl/AwsMskIamSasl.php | 17 ++++++++++------- src/Sasl/PlainSasl.php | 6 ++++++ src/Sasl/SaslInterface.php | 3 +++ 4 files changed, 24 insertions(+), 15 deletions(-) diff --git a/src/Client/SyncClient.php b/src/Client/SyncClient.php index 938822c..111cfdc 100644 --- a/src/Client/SyncClient.php +++ b/src/Client/SyncClient.php @@ -23,7 +23,6 @@ use longlang\phpkafka\Protocol\SaslHandshake\SaslHandshakeRequest; use longlang\phpkafka\Protocol\SaslHandshake\SaslHandshakeResponse; use longlang\phpkafka\Protocol\Type\Int32; -use longlang\phpkafka\Sasl\AwsMskIamSasl; use longlang\phpkafka\Sasl\SaslInterface; use longlang\phpkafka\Socket\SocketInterface; use longlang\phpkafka\Socket\StreamSocket; @@ -102,15 +101,13 @@ public function connect(): void $this->socket->connect(); $this->waitResponseMaps = []; $this->updateApiVersions(); - $class = $this->getSaslConfig(); - if (!$class instanceof SaslInterface) { + $connector = $this->getSaslConnector(); + if (!$connector instanceof SaslInterface) { return; - } else { - if ($class instanceof AwsMskIamSasl) { - $class->setHost($this->socket->getHost()); - } - $this->sendAuthInfo($class); } + + $connector->setSocket($this->socket); + $this->sendAuthInfo($connector); } public function close(): bool diff --git a/src/Sasl/AwsMskIamSasl.php b/src/Sasl/AwsMskIamSasl.php index 28d4935..a42af81 100644 --- a/src/Sasl/AwsMskIamSasl.php +++ b/src/Sasl/AwsMskIamSasl.php @@ -9,6 +9,7 @@ use Aws\Credentials\CredentialProvider; use Aws\Signature\SignatureV4; use GuzzleHttp\Psr7\Request; +use longlang\phpkafka\Socket\SocketInterface; class AwsMskIamSasl implements SaslInterface { @@ -28,9 +29,9 @@ class AwsMskIamSasl implements SaslInterface protected $config; /** - * @var string + * @var SocketInterface */ - protected $host; + protected $socket; public function __construct(CommonConfig $config) @@ -38,9 +39,9 @@ public function __construct(CommonConfig $config) $this->config = $config; } - public function setHost(string $host): void + public function setSocket(SocketInterface $socket): void { - $this->host = $host; + $this->socket = $socket; } /** @@ -58,10 +59,12 @@ public function getName(): string public function getAuthBytes(): string { $config = $this->config->getSasl(); - if (empty($this->host) || empty($config['region'])) { + if (empty($this->socket) || empty($config['region'])) { throw new KafkaErrorException('AWS MSK config params not found'); } + $host = $this->socket->getHost(); + $query = http_build_query(array( self::QUERY_ACTION_KEY => self::SIGN_ACTION, )); @@ -74,7 +77,7 @@ public function getAuthBytes(): string $region = $config['region']; - $url = "kafka://" . $this->host . "/?" . $query; + $url = "kafka://" . $host . "/?" . $query; $provider = CredentialProvider::defaultProvider(); // Returns a CredentialsInterface or throws. $creds = $provider()->wait(); @@ -93,7 +96,7 @@ public function getAuthBytes(): string self::SIGN_VERSION_KEY => self::SIGN_VERSION, self::SIGN_USER_AGENT_KEY => "php-kafka/sasl/aws_msk_iam/" . PHP_VERSION, self::SIGN_ACTION_KEY => self::SIGN_ACTION, - self::SIGN_HOST_KEY => $this->host + self::SIGN_HOST_KEY => $host ); foreach ($params as $params_key => $params_value) { diff --git a/src/Sasl/PlainSasl.php b/src/Sasl/PlainSasl.php index 5f7c98b..c6bcfec 100644 --- a/src/Sasl/PlainSasl.php +++ b/src/Sasl/PlainSasl.php @@ -6,6 +6,7 @@ use longlang\phpkafka\Config\CommonConfig; use longlang\phpkafka\Exception\KafkaErrorException; +use longlang\phpkafka\Socket\SocketInterface; class PlainSasl implements SaslInterface { @@ -40,4 +41,9 @@ public function getAuthBytes(): string return sprintf("\x00%s\x00%s", $config['username'], $config['password']); } + + public function setSocket(SocketInterface $socket): void + { + // we do nothing here + } } diff --git a/src/Sasl/SaslInterface.php b/src/Sasl/SaslInterface.php index 68265a0..152b9f4 100644 --- a/src/Sasl/SaslInterface.php +++ b/src/Sasl/SaslInterface.php @@ -5,6 +5,7 @@ namespace longlang\phpkafka\Sasl; use longlang\phpkafka\Config\CommonConfig; +use longlang\phpkafka\Socket\SocketInterface; interface SaslInterface { @@ -19,4 +20,6 @@ public function getName(): string; * 返回授权信息. */ public function getAuthBytes(): string; + + public function setSocket(SocketInterface $socket): void; } From 9e1819143dede941431daefd9775d1af997d799e Mon Sep 17 00:00:00 2001 From: Mauro Valota Date: Mon, 19 Dec 2022 11:52:50 +0100 Subject: [PATCH 9/9] Fixed consumer_msk example --- examples/consumer_msk.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/consumer_msk.php b/examples/consumer_msk.php index 8326757..f8e351c 100644 --- a/examples/consumer_msk.php +++ b/examples/consumer_msk.php @@ -22,8 +22,8 @@ function consume(ConsumeMessage $message): void $config->setInterval(0.1); $config->setSasl([ "type"=> AwsMskIamSasl::class, - "host"=>"localhost", - "region"=>"eu-west-1" + "region"=>"eu-west-1", + "expiration" => "+5 minutes" ]); $consumer = new Consumer($config, 'consume'); $consumer->start();