From 644ed628a86b1f9bdc7a81047b9bd5bea265cb6c Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Besselat Date: Mon, 25 Jan 2021 16:29:25 +0100 Subject: [PATCH 1/2] switch to kafkaAdminClient api, deprecate zookeeper api --- project.clj | 2 +- src/gregor/core.clj | 158 +++++++++++++++++++++++++------------------- 2 files changed, 92 insertions(+), 68 deletions(-) diff --git a/project.clj b/project.clj index 8f8bd21..1f6f284 100644 --- a/project.clj +++ b/project.clj @@ -5,7 +5,7 @@ :license {:name "Eclipse Public License" :url "http://www.eclipse.org/legal/epl-v10.html"} :dependencies [[org.clojure/clojure "1.10.0"] - [org.apache.kafka/kafka_2.12 "2.1.1"]] + [org.apache.kafka/kafka_2.12 "2.4.1"]] :plugins [[lein-eftest "0.5.6"]] :deploy-repositories {"clojars" {:url "https://clojars.org/repo" :sign-releases false diff --git a/src/gregor/core.clj b/src/gregor/core.clj index c9db455..9fb6747 100644 --- a/src/gregor/core.clj +++ b/src/gregor/core.clj @@ -6,8 +6,12 @@ ConsumerRebalanceListener] [org.apache.kafka.clients.producer Producer KafkaProducer Callback ProducerRecord RecordMetadata] - [kafka.admin AdminUtils] - [kafka.utils ZkUtils] + [org.apache.kafka.clients.admin + KafkaAdminClient + DescribeConfigsOptions + DescribeTopicsOptions + NewTopic] + [org.apache.kafka.common.config ConfigResource ConfigResource$Type] [java.util.concurrent TimeUnit] [scala.collection JavaConversions]) (:require [clojure.set :as set] @@ -449,6 +453,18 @@ ;;;;;;;;;;;;;;;;;;;;;; +(defn admin + "Return a `KafkaAdminClient` + + The kafkaAdminClient is used to manage topics and brokers. + Args: + - `servers`: comma-separated host:port strs or list of strs as bootstrap servers" + ^KafkaAdminClient + [servers] + (KafkaAdminClient/create (as-properties {"bootstrap.servers" servers}))) + + + (defn- validate-zookeeper-config "A helper for validating a Zookeeper configuration map and applying default values. Any invalid item in the provided config will result in an assertion failure. @@ -478,99 +494,107 @@ merged)) -(defmacro with-zookeeper - "A utility macro for interacting with Zookeeper. - - Args: - - `zk-config`: a map with Zookeeper connection details. This will be validated using - `validate-zookeeper-config` before use. - - `zookeeper`: this will be bound to an instance of `ZkUtils` while the body is executed. - The connection to Zookeeper will be cleaned up when the body exits." - [zk-config zookeeper & body] - `(let [zk-config# (validate-zookeeper-config ~zk-config) - client-and-conn# (ZkUtils/createZkClientAndConnection - (:connection-string zk-config#) - (:session-timeout zk-config#) - (:connect-timeout zk-config#))] - (with-open [client# (._1 client-and-conn#) - connection# (._2 client-and-conn#)] - (let [~zookeeper (ZkUtils. client# connection# false)] - ~@body)))) - - -(def rack-aware-modes - {:disabled (kafka.admin.RackAwareMode$Disabled$.) - :enforced (kafka.admin.RackAwareMode$Enforced$.) - :safe (kafka.admin.RackAwareMode$Safe$.)}) - - -(defn- rack-aware-mode-constant - "Convert a keyword name for a `RackAwareMode` into the appropriate constant from the - underlying Kafka library. - - Args: - - `mode`: a keyword of the same name as one of the constants in `kafka.admin.RackAwareMode`." - [mode] - (when-not (contains? rack-aware-modes mode) - (throw (IllegalArgumentException. (format "Bad RackAwareMode: %s" mode)))) - (get rack-aware-modes mode)) - - (defn create-topic "Create a topic. Args: - - `zk-config`: a map with Zookeeper connection details as expected by `with-zookeeper`. + - `admin`: a kafkaAdminClient. - `topic`: the name of the topic to create. - an unnamed configuration map. Valid keys are as follows: `:partitions` (optional) The number of ways to partition the topic. Defaults to 1. `:replication-factor` (optional) The replication factor for the topic. Defaults to 1. - `:config` (optional) A map of configuration options for the topic. - `:rack-aware-mode` (optional) Control how rack aware replica assignment is done. - Valid values are `:disabled`, `:enforced`, `:safe`. - Default is `:safe`." - [zk-config topic {:keys [partitions replication-factor config rack-aware-mode] + `:config` (optional) A map of configuration options for the topic." + [^KafkaAdminClient admin topic {:keys [partitions replication-factor config] :or {partitions 1 replication-factor 1 - config nil - rack-aware-mode :safe}}] - (with-zookeeper zk-config zookeeper - (AdminUtils/createTopic zookeeper - topic - (int partitions) - (int replication-factor) - (as-properties config) - (rack-aware-mode-constant rack-aware-mode)))) + config nil}}] + (let [^NewTopic new-topic (NewTopic. topic (int partitions) (short replication-factor)) + ^NewTopic new-topic (if (some? config) (.configs new-topic config) new-topic)] + (.get (.all (.createTopics admin [new-topic]))))) (defn topics "Query existing topics. Args: - - `zk-config`: a map with Zookeeper connection details as expected by `with-zookeeper`." - [zk-config] - (with-zookeeper zk-config zookeeper - (-> zookeeper .getAllTopics JavaConversions/seqAsJavaList seq))) + - `admin`: a kafkaAdminClient" + [^KafkaAdminClient admin] + (seq + (.get + (.names + (.listTopics admin))))) + +(defn topics-partitions-summary + "Return a summary (partitions and replication factor) of a list of topics as a clojure map. + + Args + - `admin`: a KafkaAdminClient + - `topics`: a list of topics" + [^KafkaAdminClient admin topics] + (let [topics-description (.get + (.all + (.describeTopics + admin + topics)))] + (reduce + (fn [acc desc] + (assoc acc (.name desc) + {:partitions (count (.partitions desc)) + :replication-factor (count (.replicas (first (.partitions desc))))})) + {} + (vals topics-description)))) + +(defn- topic-config-resource + [topic] + (ConfigResource. ConfigResource$Type/TOPIC topic)) + +(defn- entries->map + [config include-default?] + (reduce (fn [acc entry] + (if include-default? + (assoc acc (.name entry) (.value entry)) + (if-not (= "DEFAULT_CONFIG" (str (.source entry))) + (assoc acc (.name entry) (.value entry)) + acc))) {} config)) + +(defn topics-config + "Return topics configuration. + + Args: + - `admin`: a kafkaAdminClient + - `topics`: A collection of topics to retrieve configuration + - `include-default?`: A boolean to indicate whether return default setting or not (default to false)" + ([^KafkaAdminClient admin topics] + (topics-config admin topics false)) + ([^KafkaAdminClient admin topics include-default?] + (let [topics-config-res (map topic-config-resource topics) + raw-topics-config (.get + (.all + (.describeConfigs + admin + topics-config-res)))] + (into {} (map + (fn [[k v]] + [(.name k) (entries->map (.entries v) include-default?)]) + raw-topics-config))))) (defn topic-exists? "Query whether or not a topic exists. Args: - - `zk-config`: a map with Zookeeper connection details as expected `with-zookeeper`. + - `admin`: a kafkaAdminClient - `topic`: The name of the topic to check for." - [zk-config topic] - (with-zookeeper zk-config zookeeper - (AdminUtils/topicExists zookeeper topic))) + [^KafkaAdminClient admin topic] + (.contains (topics admin) topic)) (defn delete-topic "Delete a topic. Args: - - `zk-config`: A map with Zookeeper connection details as expected by `with-zookeeper`. + - `admin`: a kafkaAdminClient - `topic`: The name of the topic to delete." - [zk-config topic] - (with-zookeeper zk-config zookeeper - (AdminUtils/deleteTopic zookeeper topic))) + [^KafkaAdminClient admin topic] + (.get (.all (.deleteTopics admin [topic])))) From 70796e0b98a7be7072a101231fdacceaa541fc34 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Besselat Date: Mon, 25 Jan 2021 16:56:34 +0100 Subject: [PATCH 2/2] [readme] --- README.md | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index e5c352a..3e35d8d 100644 --- a/README.md +++ b/README.md @@ -75,32 +75,37 @@ you'd like to provide a callback to be invoked when the send has been acknowledg ## Topic Management +Create an admin client + +```clojure +(def admin-client (admin "localhost:9092")) +``` + Create a topic: ```clojure -(create-topic {:connection-string "localhost:2181"} "some-topic" {}) +(create-topic admin-client "some-topic" {}) ``` That empty map can be used to specify configuration for number of topic partitions, replication factor, Delete a topic: ``` clojure -(delete-topic {:connection-string "localhost:2181"} "some-topic") +(delete-topic admin-client "some-topic") ``` Query about a topic's existence: ``` clojure -(topic-exists? {:connection-string "localhost:2181"} "some-topic") +(topic-exists? admin-client "some-topic") ``` List existing topics: ``` clojure -(topics {:connection-string "localhost:2181"}) +(topics admin-client) ``` - ## License Distributed under the Eclipse Public License either version 1.0 or (at your option) any