From 7fe3a36ea74e5794b1ac1137b2ecb2ae35db84f9 Mon Sep 17 00:00:00 2001 From: Stephane Maarek Date: Fri, 12 Jan 2018 11:33:23 +1100 Subject: [PATCH 01/11] updated scala bits to Kafka Streams 1.0.0 --- favourite-colour-scala/build.sbt | 2 +- .../udemy/kafka/streams/FavouriteColourAppScala.scala | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/favourite-colour-scala/build.sbt b/favourite-colour-scala/build.sbt index ba01c4a..de6c6f2 100644 --- a/favourite-colour-scala/build.sbt +++ b/favourite-colour-scala/build.sbt @@ -5,7 +5,7 @@ scalaVersion := "2.12.3" // https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams libraryDependencies ++= Seq( - "org.apache.kafka" % "kafka-streams" % "0.11.0.0", + "org.apache.kafka" % "kafka-streams" % "1.0.0", "org.slf4j" % "slf4j-api" % "1.7.25", "org.slf4j" % "slf4j-log4j12" % "1.7.25" ) diff --git a/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala b/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala index d3f1073..f21854d 100644 --- a/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala +++ b/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala @@ -5,8 +5,8 @@ import java.util.Properties import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable} -import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig} +import org.apache.kafka.streams.kstream.{KStream, KTable} +import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsBuilder, StreamsConfig} object FavouriteColourAppScala { def main(args: Array[String]): Unit = { @@ -21,7 +21,7 @@ object FavouriteColourAppScala { // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0") - val builder: KStreamBuilder = new KStreamBuilder + val builder: StreamsBuilder = new StreamsBuilder // Step 1: We create the topic of users keys to colours val textLines: KStream[String, String] = builder.stream[String, String]("favourite-colour-input") @@ -51,7 +51,7 @@ object FavouriteColourAppScala { // 6 - we output the results to a Kafka Topic - don't forget the serializers favouriteColours.to(Serdes.String, Serdes.Long, "favourite-colour-output-scala") - val streams: KafkaStreams = new KafkaStreams(builder, config) + val streams: KafkaStreams = new KafkaStreams(builder.build(), config) streams.cleanUp() streams.start() From 2ce8fccd8c4a7fd31a15adcfd04f95b701520854 Mon Sep 17 00:00:00 2001 From: Stephane Maarek Date: Fri, 12 Jan 2018 11:52:42 +1100 Subject: [PATCH 02/11] more API changes due to deprecation --- .../udemy/kafka/streams/FavouriteColourAppScala.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala b/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala index f21854d..2c5ac0c 100644 --- a/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala +++ b/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala @@ -5,7 +5,9 @@ import java.util.Properties import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.kstream.{KStream, KTable} +import org.apache.kafka.common.utils.Bytes +import org.apache.kafka.streams.kstream.{KStream, KTable, Materialized, Produced} +import org.apache.kafka.streams.state.KeyValueStore import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsBuilder, StreamsConfig} object FavouriteColourAppScala { @@ -45,11 +47,11 @@ object FavouriteColourAppScala { // step 3 - we count the occurences of colours val favouriteColours: KTable[String, lang.Long] = usersAndColoursTable // 5 - we group by colour within the KTable - .groupBy((user: String, colour: String) => new KeyValue[String, String](colour, colour)) - .count("CountsByColours") + .groupBy[String, String]((user: String, colour: String) => new KeyValue[String, String](colour, colour)) + .count(Materialized.as[String, lang.Long, KeyValueStore[Bytes, Array[Byte]]]("CountsByColours")) // 6 - we output the results to a Kafka Topic - don't forget the serializers - favouriteColours.to(Serdes.String, Serdes.Long, "favourite-colour-output-scala") + favouriteColours.toStream.to("favourite-colour-output-scala", Produced.`with`(Serdes.String, Serdes.Long)) val streams: KafkaStreams = new KafkaStreams(builder.build(), config) streams.cleanUp() From 9f04592eaa66afd8c9ca12c124f23de77a5896bd Mon Sep 17 00:00:00 2001 From: Stephane Maarek Date: Fri, 12 Jan 2018 11:54:51 +1100 Subject: [PATCH 03/11] last bit of deprecation --- .../udemy/kafka/streams/FavouriteColourAppScala.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala b/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala index 2c5ac0c..9eace49 100644 --- a/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala +++ b/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala @@ -58,7 +58,7 @@ object FavouriteColourAppScala { streams.start() // print the topology - System.out.println(streams.toString) + streams.localThreadsMetadata().forEach(t => System.out.print(t.toString)) // shutdown hook to correctly close the streams application Runtime.getRuntime.addShutdownHook(new Thread { From 61c6d4021593e54b5c2e469f4495842155fc6e64 Mon Sep 17 00:00:00 2001 From: Stephane Maarek Date: Fri, 12 Jan 2018 12:14:04 +1100 Subject: [PATCH 04/11] final type checks needed by Scala --- .../kafka/streams/FavouriteColourAppScala.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala b/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala index 9eace49..bc23942 100644 --- a/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala +++ b/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala @@ -4,9 +4,9 @@ import java.lang import java.util.Properties import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.common.serialization.{Serde, Serdes} import org.apache.kafka.common.utils.Bytes -import org.apache.kafka.streams.kstream.{KStream, KTable, Materialized, Produced} +import org.apache.kafka.streams.kstream._ import org.apache.kafka.streams.state.KeyValueStore import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsBuilder, StreamsConfig} @@ -44,14 +44,20 @@ object FavouriteColourAppScala { // step 2 - we read that topic as a KTable so that updates are read correctly val usersAndColoursTable: KTable[String, String] = builder.table(intermediaryTopic) + val stringSerde: Serde[String] = Serdes.String + val longSerde: Serde[lang.Long] = Serdes.Long + // step 3 - we count the occurences of colours val favouriteColours: KTable[String, lang.Long] = usersAndColoursTable // 5 - we group by colour within the KTable - .groupBy[String, String]((user: String, colour: String) => new KeyValue[String, String](colour, colour)) + .groupBy( + (user: String, colour: String) => new KeyValue[String, String](colour, colour), + Serialized.`with`(stringSerde, stringSerde) + ) .count(Materialized.as[String, lang.Long, KeyValueStore[Bytes, Array[Byte]]]("CountsByColours")) // 6 - we output the results to a Kafka Topic - don't forget the serializers - favouriteColours.toStream.to("favourite-colour-output-scala", Produced.`with`(Serdes.String, Serdes.Long)) + favouriteColours.toStream.to("favourite-colour-output-scala", Produced.`with`(stringSerde, longSerde)) val streams: KafkaStreams = new KafkaStreams(builder.build(), config) streams.cleanUp() From 6669f10e9b3f2fa35dc3fb06100bbf047bade9e8 Mon Sep 17 00:00:00 2001 From: Mithun Singh Date: Wed, 10 Jan 2018 00:27:14 -0600 Subject: [PATCH 05/11] Updated kafka versions to 1.0.0 modified: .gitignore --- .gitignore | 8 ++++- README.md | 2 +- bank-balance-exactly-once/pom.xml | 4 +-- .../streams/BankBalanceExactlyOnceApp.java | 33 ++++++++++--------- favourite-colour-java/pom.xml | 2 +- .../kafka/streams/FavouriteColourApp.java | 16 ++++----- streams-starter-project/pom.xml | 2 +- .../kafka/streams/StreamsStarterApp.java | 8 ++--- user-event-enricher/pom.xml | 2 +- .../kafka/streams/UserEventEnricherApp.java | 8 ++--- word-count/.gitignore | 1 + word-count/pom.xml | 2 +- .../udemy/kafka/streams/WordCountApp.java | 14 ++++---- 13 files changed, 57 insertions(+), 45 deletions(-) create mode 100644 word-count/.gitignore diff --git a/.gitignore b/.gitignore index 692e68f..b7e2b3a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,10 @@ target/ .idea/ *.iml -.DS_STORE \ No newline at end of file +.DS_STORE +/.classpath +*/.classpath +/.project +*/.project +*/.settings +bin/ diff --git a/README.md b/README.md index b520e72..ced3cc5 100644 --- a/README.md +++ b/README.md @@ -18,4 +18,4 @@ Happy learning! - Word Count to learn the basic API - Favourite Colour for a more advanced example (`Scala` version included) - Bank Balance to demonstrate exactly once semantics - - User Event matcher to learn about joins between `KStream` and `GlobalKTable`. \ No newline at end of file + - User Event matcher to learn about joins between `KStream` and `GlobalKTable`. diff --git a/bank-balance-exactly-once/pom.xml b/bank-balance-exactly-once/pom.xml index 95e0266..dbfbebc 100644 --- a/bank-balance-exactly-once/pom.xml +++ b/bank-balance-exactly-once/pom.xml @@ -16,7 +16,7 @@ org.apache.kafka kafka-streams - 0.11.0.1 + 1.0.0 @@ -24,7 +24,7 @@ org.apache.kafka kafka-clients - 0.11.0.1 + 1.0.0 diff --git a/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java b/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java index a71f573..9a91630 100644 --- a/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java +++ b/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java @@ -10,14 +10,19 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.connect.json.JsonDeserializer; import org.apache.kafka.connect.json.JsonSerializer; +import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Serialized; +import org.apache.kafka.streams.processor.ThreadMetadata; import java.time.Instant; import java.util.Properties; +import java.util.Set; public class BankBalanceExactlyOnceApp { @@ -33,18 +38,17 @@ public static void main(String[] args) { // Exactly once processing!! config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - + // json Serde final Serializer jsonSerializer = new JsonSerializer(); final Deserializer jsonDeserializer = new JsonDeserializer(); final Serde jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer); - - - KStreamBuilder builder = new KStreamBuilder(); - - KStream bankTransactions = - builder.stream(Serdes.String(), jsonSerde, "bank-transactions"); - + + StreamsBuilder builder = new StreamsBuilder(); + + KStream bankTransactions = builder.stream("bank-transactions"); + + // create the initial json object for balances ObjectNode initialBalance = JsonNodeFactory.instance.objectNode(); @@ -53,22 +57,21 @@ public static void main(String[] args) { initialBalance.put("time", Instant.ofEpochMilli(0L).toString()); KTable bankBalance = bankTransactions - .groupByKey(Serdes.String(), jsonSerde) + .groupByKey(Serialized.with(Serdes.String(), jsonSerde)) .aggregate( () -> initialBalance, (key, transaction, balance) -> newBalance(transaction, balance), - jsonSerde, - "bank-balance-agg" + Materialized.as("bank-balance-agg") ); - bankBalance.to(Serdes.String(), jsonSerde,"bank-balance-exactly-once"); + bankBalance.toStream().to("bank-balance-exactly-once"); - KafkaStreams streams = new KafkaStreams(builder, config); + KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.cleanUp(); streams.start(); // print the topology - System.out.println(streams.toString()); + streams.localThreadsMetadata().forEach(data -> System.out.println(data)); // shutdown hook to correctly close the streams application Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); diff --git a/favourite-colour-java/pom.xml b/favourite-colour-java/pom.xml index a973825..eef3d1a 100644 --- a/favourite-colour-java/pom.xml +++ b/favourite-colour-java/pom.xml @@ -15,7 +15,7 @@ org.apache.kafka kafka-streams - 0.11.0.0 + 1.0.0 diff --git a/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java b/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java index 4828745..117b72a 100644 --- a/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java +++ b/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java @@ -2,16 +2,17 @@ import java.util.Properties; import java.util.Arrays; -import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; public class FavouriteColourApp { @@ -26,8 +27,7 @@ public static void main(String[] args) { // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0"); - KStreamBuilder builder = new KStreamBuilder(); - + StreamsBuilder builder = new StreamsBuilder(); // Step 1: We create the topic of users keys to colours KStream textLines = builder.stream("favourite-colour-input"); @@ -50,18 +50,18 @@ public static void main(String[] args) { KTable favouriteColours = usersAndColoursTable // 5 - we group by colour within the KTable .groupBy((user, colour) -> new KeyValue<>(colour, colour)) - .count("CountsByColours"); + .count(Materialized.as("CountsByColours")); // 6 - we output the results to a Kafka Topic - don't forget the serializers - favouriteColours.to(Serdes.String(), Serdes.Long(),"favourite-colour-output"); + favouriteColours.toStream().to("favourite-colour-output",Produced.with(Serdes.String(),Serdes.Long())); - KafkaStreams streams = new KafkaStreams(builder, config); + KafkaStreams streams = new KafkaStreams(builder.build(), config); // only do this in dev - not in prod streams.cleanUp(); streams.start(); // print the topology - System.out.println(streams.toString()); + streams.localThreadsMetadata().forEach(data -> System.out.println(data)); // shutdown hook to correctly close the streams application Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); diff --git a/streams-starter-project/pom.xml b/streams-starter-project/pom.xml index 84f1104..3e5713c 100644 --- a/streams-starter-project/pom.xml +++ b/streams-starter-project/pom.xml @@ -13,7 +13,7 @@ org.apache.kafka kafka-streams - 0.11.0.0 + 1.0.0 diff --git a/streams-starter-project/src/main/java/com/github/simplesteph/udemy/kafka/streams/StreamsStarterApp.java b/streams-starter-project/src/main/java/com/github/simplesteph/udemy/kafka/streams/StreamsStarterApp.java index 5a88d5f..3f4f842 100644 --- a/streams-starter-project/src/main/java/com/github/simplesteph/udemy/kafka/streams/StreamsStarterApp.java +++ b/streams-starter-project/src/main/java/com/github/simplesteph/udemy/kafka/streams/StreamsStarterApp.java @@ -3,9 +3,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; import java.util.Properties; @@ -20,18 +20,18 @@ public static void main(String[] args) { config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - KStreamBuilder builder = new KStreamBuilder(); + StreamsBuilder builder = new StreamsBuilder(); KStream kStream = builder.stream("input-topic-name"); // do stuff kStream.to("word-count-output"); - KafkaStreams streams = new KafkaStreams(builder, config); + KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.cleanUp(); // only do this in dev - not in prod streams.start(); // print the topology - System.out.println(streams.toString()); + streams.localThreadsMetadata().forEach(data -> System.out.println(data)); // shutdown hook to correctly close the streams application Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); diff --git a/user-event-enricher/pom.xml b/user-event-enricher/pom.xml index 0be5452..d8d6cef 100644 --- a/user-event-enricher/pom.xml +++ b/user-event-enricher/pom.xml @@ -14,7 +14,7 @@ org.apache.kafka kafka-streams - 0.11.0.0 + 1.0.0 diff --git a/user-event-enricher/src/main/java/com/github/simplesteph/udemy/kafka/streams/UserEventEnricherApp.java b/user-event-enricher/src/main/java/com/github/simplesteph/udemy/kafka/streams/UserEventEnricherApp.java index 180aae6..314e1fd 100644 --- a/user-event-enricher/src/main/java/com/github/simplesteph/udemy/kafka/streams/UserEventEnricherApp.java +++ b/user-event-enricher/src/main/java/com/github/simplesteph/udemy/kafka/streams/UserEventEnricherApp.java @@ -3,10 +3,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; import java.util.Properties; @@ -20,7 +20,7 @@ public static void main(String[] args) { config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - KStreamBuilder builder = new KStreamBuilder(); + StreamsBuilder builder = new StreamsBuilder(); // we get a global table out of Kafka. This table will be replicated on each Kafka Streams application // the key of our globalKTable is the user ID @@ -55,12 +55,12 @@ public static void main(String[] args) { userPurchasesEnrichedLeftJoin.to("user-purchases-enriched-left-join"); - KafkaStreams streams = new KafkaStreams(builder, config); + KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.cleanUp(); // only do this in dev - not in prod streams.start(); // print the topology - System.out.println(streams.toString()); + streams.localThreadsMetadata().forEach(data -> System.out.println(data)); // shutdown hook to correctly close the streams application Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); diff --git a/word-count/.gitignore b/word-count/.gitignore new file mode 100644 index 0000000..ae3c172 --- /dev/null +++ b/word-count/.gitignore @@ -0,0 +1 @@ +/bin/ diff --git a/word-count/pom.xml b/word-count/pom.xml index 8e0f93f..ea7b6d6 100644 --- a/word-count/pom.xml +++ b/word-count/pom.xml @@ -15,7 +15,7 @@ org.apache.kafka kafka-streams - 0.11.0.0 + 1.0.0 diff --git a/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java b/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java index 21f64a9..a8b42ac 100644 --- a/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java +++ b/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java @@ -6,10 +6,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; public class WordCountApp { public static void main(String[] args) { @@ -20,7 +22,7 @@ public static void main(String[] args) { config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - KStreamBuilder builder = new KStreamBuilder(); + StreamsBuilder builder = new StreamsBuilder(); // 1 - stream from Kafka KStream textLines = builder.stream("word-count-input"); @@ -36,18 +38,18 @@ public static void main(String[] args) { // 5 - group by key before aggregation .groupByKey() // 6 - count occurences - .count("Counts"); + .count(Materialized.as("Counts")); // 7 - to in order to write the results back to kafka - wordCounts.to(Serdes.String(), Serdes.Long(), "word-count-output"); + wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long())); - KafkaStreams streams = new KafkaStreams(builder, config); + KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.start(); // Update: // print the topology every 10 seconds for learning purposes while(true){ - System.out.println(streams.toString()); + streams.localThreadsMetadata().forEach(data -> System.out.println(data)); try { Thread.sleep(5000); } catch (InterruptedException e) { From 5393c7f71781449ee46139af5acbb9624a9f2659 Mon Sep 17 00:00:00 2001 From: Stephane Maarek Date: Fri, 19 Jan 2018 19:22:59 +1100 Subject: [PATCH 06/11] pushing last fixes to add Serdes to count to Scala operator --- .../udemy/kafka/streams/FavouriteColourAppScala.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala b/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala index bc23942..18d5c25 100644 --- a/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala +++ b/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala @@ -54,7 +54,9 @@ object FavouriteColourAppScala { (user: String, colour: String) => new KeyValue[String, String](colour, colour), Serialized.`with`(stringSerde, stringSerde) ) - .count(Materialized.as[String, lang.Long, KeyValueStore[Bytes, Array[Byte]]]("CountsByColours")) + .count(Materialized.as[String, lang.Long, KeyValueStore[Bytes, Array[Byte]]]("CountsByColours") + .withKeySerde(stringSerde) + .withValueSerde(longSerde)) // 6 - we output the results to a Kafka Topic - don't forget the serializers favouriteColours.toStream.to("favourite-colour-output-scala", Produced.`with`(stringSerde, longSerde)) From 2b7f3b9162f4f7fb1fef92c19912fa207c7c90eb Mon Sep 17 00:00:00 2001 From: Stephane Maarek Date: Fri, 26 Jan 2018 09:14:53 +1100 Subject: [PATCH 07/11] finalised PR --- .../streams/BankBalanceExactlyOnceApp.java | 19 ++++++++++--------- .../kafka/streams/FavouriteColourApp.java | 12 ++++++++++-- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java b/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java index 9a91630..4bcb4cb 100644 --- a/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java +++ b/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java @@ -8,17 +8,16 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.connect.json.JsonDeserializer; import org.apache.kafka.connect.json.JsonSerializer; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Serialized; +import org.apache.kafka.streams.kstream.*; import org.apache.kafka.streams.processor.ThreadMetadata; +import org.apache.kafka.streams.state.KeyValueStore; import java.time.Instant; import java.util.Properties; @@ -45,9 +44,9 @@ public static void main(String[] args) { final Serde jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer); StreamsBuilder builder = new StreamsBuilder(); - - KStream bankTransactions = builder.stream("bank-transactions"); - + + KStream bankTransactions = builder.stream("bank-transactions", + Consumed.with(Serdes.String(), jsonSerde)); // create the initial json object for balances @@ -61,10 +60,12 @@ public static void main(String[] args) { .aggregate( () -> initialBalance, (key, transaction, balance) -> newBalance(transaction, balance), - Materialized.as("bank-balance-agg") + Materialized.>as("bank-balance-agg") + .withKeySerde(Serdes.String()) + .withValueSerde(jsonSerde) ); - bankBalance.toStream().to("bank-balance-exactly-once"); + bankBalance.toStream().to("bank-balance-exactly-once", Produced.with(Serdes.String(), jsonSerde)); KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.cleanUp(); diff --git a/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java b/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java index 117b72a..c5d115f 100644 --- a/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java +++ b/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java @@ -4,7 +4,9 @@ import java.util.Arrays; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -13,6 +15,7 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.state.KeyValueStore; public class FavouriteColourApp { @@ -43,6 +46,9 @@ public static void main(String[] args) { usersAndColours.to("user-keys-and-colours"); + Serde stringSerde = Serdes.String(); + Serde longSerde = Serdes.Long(); + // step 2 - we read that topic as a KTable so that updates are read correctly KTable usersAndColoursTable = builder.table("user-keys-and-colours"); @@ -50,10 +56,12 @@ public static void main(String[] args) { KTable favouriteColours = usersAndColoursTable // 5 - we group by colour within the KTable .groupBy((user, colour) -> new KeyValue<>(colour, colour)) - .count(Materialized.as("CountsByColours")); + .count(Materialized.>as("CountsByColours") + .withKeySerde(stringSerde) + .withValueSerde(longSerde)); // 6 - we output the results to a Kafka Topic - don't forget the serializers - favouriteColours.toStream().to("favourite-colour-output",Produced.with(Serdes.String(),Serdes.Long())); + favouriteColours.toStream().to("favourite-colour-output", Produced.with(Serdes.String(),Serdes.Long())); KafkaStreams streams = new KafkaStreams(builder.build(), config); // only do this in dev - not in prod From d6628aca60251cff3bf9d7ae5f58cda74aa29346 Mon Sep 17 00:00:00 2001 From: Chris Coy Date: Wed, 14 Feb 2018 15:05:42 -0700 Subject: [PATCH 08/11] Moved shutdown hook to reachable code block --- .../simplesteph/udemy/kafka/streams/WordCountApp.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java b/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java index a8b42ac..2cdbf20 100644 --- a/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java +++ b/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java @@ -46,6 +46,9 @@ public static void main(String[] args) { KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.start(); + // shutdown hook to correctly close the streams application + Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); + // Update: // print the topology every 10 seconds for learning purposes while(true){ @@ -57,7 +60,6 @@ public static void main(String[] args) { } } - // shutdown hook to correctly close the streams application - Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); + } } From 8d52ad66020a5936501ffbd65c18d1116200fcf0 Mon Sep 17 00:00:00 2001 From: Stephane Maarek Date: Thu, 29 Mar 2018 16:59:05 +0530 Subject: [PATCH 09/11] updated to kafka streams 1.1.0 --- bank-balance-exactly-once/pom.xml | 4 ++-- .../udemy/kafka/streams/BankBalanceExactlyOnceApp.java | 2 -- favourite-colour-java/pom.xml | 2 +- favourite-colour-scala/build.sbt | 2 +- streams-starter-project/pom.xml | 2 +- user-event-enricher/pom.xml | 2 +- word-count/pom.xml | 2 +- 7 files changed, 7 insertions(+), 9 deletions(-) diff --git a/bank-balance-exactly-once/pom.xml b/bank-balance-exactly-once/pom.xml index dbfbebc..9c02fde 100644 --- a/bank-balance-exactly-once/pom.xml +++ b/bank-balance-exactly-once/pom.xml @@ -16,7 +16,7 @@ org.apache.kafka kafka-streams - 1.0.0 + 1.1.0 @@ -24,7 +24,7 @@ org.apache.kafka kafka-clients - 1.0.0 + 1.1.0 diff --git a/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java b/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java index 4bcb4cb..6ecd706 100644 --- a/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java +++ b/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java @@ -16,12 +16,10 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.*; -import org.apache.kafka.streams.processor.ThreadMetadata; import org.apache.kafka.streams.state.KeyValueStore; import java.time.Instant; import java.util.Properties; -import java.util.Set; public class BankBalanceExactlyOnceApp { diff --git a/favourite-colour-java/pom.xml b/favourite-colour-java/pom.xml index eef3d1a..e5d1d84 100644 --- a/favourite-colour-java/pom.xml +++ b/favourite-colour-java/pom.xml @@ -15,7 +15,7 @@ org.apache.kafka kafka-streams - 1.0.0 + 1.1.0 diff --git a/favourite-colour-scala/build.sbt b/favourite-colour-scala/build.sbt index de6c6f2..3b71bd4 100644 --- a/favourite-colour-scala/build.sbt +++ b/favourite-colour-scala/build.sbt @@ -5,7 +5,7 @@ scalaVersion := "2.12.3" // https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams libraryDependencies ++= Seq( - "org.apache.kafka" % "kafka-streams" % "1.0.0", + "org.apache.kafka" % "kafka-streams" % "1.1.0", "org.slf4j" % "slf4j-api" % "1.7.25", "org.slf4j" % "slf4j-log4j12" % "1.7.25" ) diff --git a/streams-starter-project/pom.xml b/streams-starter-project/pom.xml index 3e5713c..9f1df0c 100644 --- a/streams-starter-project/pom.xml +++ b/streams-starter-project/pom.xml @@ -13,7 +13,7 @@ org.apache.kafka kafka-streams - 1.0.0 + 1.1.0 diff --git a/user-event-enricher/pom.xml b/user-event-enricher/pom.xml index d8d6cef..a0e9bc0 100644 --- a/user-event-enricher/pom.xml +++ b/user-event-enricher/pom.xml @@ -14,7 +14,7 @@ org.apache.kafka kafka-streams - 1.0.0 + 1.1.0 diff --git a/word-count/pom.xml b/word-count/pom.xml index ea7b6d6..08802e1 100644 --- a/word-count/pom.xml +++ b/word-count/pom.xml @@ -15,7 +15,7 @@ org.apache.kafka kafka-streams - 1.0.0 + 1.1.0 From 384a89b9dcd883748427179b97ab6c76fb6c30e9 Mon Sep 17 00:00:00 2001 From: Stephane Maarek Date: Thu, 29 Mar 2018 17:10:21 +0530 Subject: [PATCH 10/11] scala weirdness in 1.1.0 for ValueMappers --- .../udemy/kafka/streams/FavouriteColourAppScala.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala b/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala index 18d5c25..9428b81 100644 --- a/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala +++ b/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala @@ -34,7 +34,9 @@ object FavouriteColourAppScala { // 2 - we select a key that will be the user id (lowercase for safety) .selectKey[String]((key: String, value: String) => value.split(",")(0).toLowerCase) // 3 - we get the colour from the value (lowercase for safety) - .mapValues[String]((value: String) => value.split(",")(1).toLowerCase) + .mapValues[String](new ValueMapper[String, String] { + override def apply(value: String): String = { value.split(",")(1).toLowerCase } + }) // 4 - we filter undesired colours (could be a data sanitization step) .filter((user: String, colour: String) => List("green", "blue", "red").contains(colour)) From 5d9b7fad6760a3f3fc462b7d45686e69e9f8587f Mon Sep 17 00:00:00 2001 From: Stephane Maarek Date: Sat, 7 Apr 2018 14:10:44 +0530 Subject: [PATCH 11/11] added wordcount tests --- word-count/pom.xml | 15 ++++ .../udemy/kafka/streams/WordCountApp.java | 24 +++-- .../udemy/kafka/streams/WordCountAppTest.java | 89 +++++++++++++++++++ 3 files changed, 120 insertions(+), 8 deletions(-) create mode 100644 word-count/src/test/java/com/github/simplesteph/udemy/kafka/streams/WordCountAppTest.java diff --git a/word-count/pom.xml b/word-count/pom.xml index 08802e1..489ff40 100644 --- a/word-count/pom.xml +++ b/word-count/pom.xml @@ -18,6 +18,21 @@ 1.1.0 + + org.apache.kafka + kafka-streams-test-utils + 1.1.0 + test + + + + + junit + junit + 4.12 + test + + diff --git a/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java b/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java index 2cdbf20..4e8a118 100644 --- a/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java +++ b/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java @@ -8,20 +8,15 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; public class WordCountApp { - public static void main(String[] args) { - Properties config = new Properties(); - config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application"); - config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); - config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + public Topology createTopology(){ StreamsBuilder builder = new StreamsBuilder(); // 1 - stream from Kafka @@ -43,7 +38,20 @@ public static void main(String[] args) { // 7 - to in order to write the results back to kafka wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long())); - KafkaStreams streams = new KafkaStreams(builder.build(), config); + return builder.build(); + } + + public static void main(String[] args) { + Properties config = new Properties(); + config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application"); + config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); + config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + + WordCountApp wordCountApp = new WordCountApp(); + + KafkaStreams streams = new KafkaStreams(wordCountApp.createTopology(), config); streams.start(); // shutdown hook to correctly close the streams application diff --git a/word-count/src/test/java/com/github/simplesteph/udemy/kafka/streams/WordCountAppTest.java b/word-count/src/test/java/com/github/simplesteph/udemy/kafka/streams/WordCountAppTest.java new file mode 100644 index 0000000..f88d610 --- /dev/null +++ b/word-count/src/test/java/com/github/simplesteph/udemy/kafka/streams/WordCountAppTest.java @@ -0,0 +1,89 @@ +package com.github.simplesteph.udemy.kafka.streams; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.test.ConsumerRecordFactory; +import org.apache.kafka.streams.test.OutputVerifier; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + +public class WordCountAppTest { + + TopologyTestDriver testDriver; + + StringSerializer stringSerializer = new StringSerializer(); + + ConsumerRecordFactory recordFactory = + new ConsumerRecordFactory<>(stringSerializer, stringSerializer); + + + @Before + public void setUpTopologyTestDriver(){ + Properties config = new Properties(); + config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test"); + config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); + config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + + WordCountApp wordCountApp = new WordCountApp(); + Topology topology = wordCountApp.createTopology(); + testDriver = new TopologyTestDriver(topology, config); + } + + @After + public void closeTestDriver(){ + testDriver.close(); + } + + public void pushNewInputRecord(String value){ + testDriver.pipeInput(recordFactory.create("word-count-input", null, value)); + } + + @Test + public void dummyTest(){ + String dummy = "Du" + "mmy"; + assertEquals(dummy, "Dummy"); + } + + public ProducerRecord readOutput(){ + return testDriver.readOutput("word-count-output", new StringDeserializer(), new LongDeserializer()); + } + + @Test + public void makeSureCountsAreCorrect(){ + String firstExample = "testing Kafka Streams"; + pushNewInputRecord(firstExample); + OutputVerifier.compareKeyValue(readOutput(), "testing", 1L); + OutputVerifier.compareKeyValue(readOutput(), "kafka", 1L); + OutputVerifier.compareKeyValue(readOutput(), "streams", 1L); + assertEquals(readOutput(), null); + + String secondExample = "testing Kafka again"; + pushNewInputRecord(secondExample); + OutputVerifier.compareKeyValue(readOutput(), "testing", 2L); + OutputVerifier.compareKeyValue(readOutput(), "kafka", 2L); + OutputVerifier.compareKeyValue(readOutput(), "again", 1L); + + } + + @Test + public void makeSureWordsBecomeLowercase(){ + String upperCaseString = "KAFKA kafka Kafka"; + pushNewInputRecord(upperCaseString); + OutputVerifier.compareKeyValue(readOutput(), "kafka", 1L); + OutputVerifier.compareKeyValue(readOutput(), "kafka", 2L); + OutputVerifier.compareKeyValue(readOutput(), "kafka", 3L); + + } +}