Binary file removed .DS_Store
Binary file not shown.
6 changes: 5 additions & 1 deletion .gitignore
@@ -1,3 +1,6 @@
# Jar to be downloaded from DataStax Academy
kafka-connect-dse-1.0.0.jar

# eclipse conf file
.settings
.classpath
@@ -19,4 +22,5 @@ dist
# misc
.DS_Store
.java-version

nohup.out
webui.log
17 changes: 0 additions & 17 deletions .project

This file was deleted.

4 changes: 0 additions & 4 deletions .settings/org.eclipse.m2e.core.prefs

This file was deleted.

110 changes: 88 additions & 22 deletions README.md
@@ -1,51 +1,117 @@
# Reactive Driver and Kafka

### Projects overview
- *[kafka-dse-core](kafka-dse-core)* contains domain beans and utility classes
- [kafka-dse-producer](kafka-dse-producer) is a standalone Spring Boot application that generates random Tick Data load from AlphaVantage and then random
# DataStax Apache Kafka™ Connector and Reactive Driver

This project is a demo illustrating the *DataStax Apache Kafka™ Connector* and the new reactive driver. Stock valuation events are generated, pushed to Kafka, and sent to DataStax Enterprise.

<img src="./pics/kafkadse-3.png" height="200" />


## Overview

### Architecture

<img src="./pics/kafkadse-2.png" height="200" />

### Maven modules

- [kafka-dse-core](kafka-dse-core) contains domain beans and utility classes
- [kafka-dse-producer](kafka-dse-producer) is a standalone Spring Boot application that generates random tick data from AlphaVantage and sends JSON events to the Kafka topic `stocks_ticks`
- [kafka-dse-consumer](kafka-dse-consumer) is a standalone Spring Boot application that reads events from the `stocks_ticks` topic (for testing only; the sink connector is the preferred path)
- [kafka-dse-webui](kafka-dse-webui) is the standalone web UI.

<img src="./pics/02-archi.png" height="300" />

## Install and run

## Start the project
1. Clone this repository on your laptop (branch `driver2`):

#### Use docker compose to start all components
```bash
git clone -b driver2 https://github.com/clun/kafka-dse.git
```

2. Compile source code and modules

```
cd kafka-dse

mvn clean install
```

3. Start the `Kafka` and `DataStax` components using the following command:

```bash
docker-compose up -d
```

A `docker ps` command will list the following containers:
- **datastax/dse-server:6.7.0** is the DataStax Enterprise server (listening on 9042)
- **datastax/dse-studio:6.7.0** is the web UI [DataStax Studio](http://localhost:9091), available on port 9091
- **wurstmeister/zookeeper:3.4.6** is the ZooKeeper node coordinating Kafka (listening on 2181)
- **wurstmeister/kafka:1.0.0** is the Kafka broker (listening on 9092)
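A quick way to check that all four containers are up, using standard `docker ps` formatting flags:

```bash
# List running containers with their image, ports and status
docker ps --format "table {{.Image}}\t{{.Ports}}\t{{.Status}}"
```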

### 2. Start the `kafka-dse-producer` component.
You now have access to:
- [KafkaHQ](http://localhost:8080/docker-kafka-server/topic), a web UI to look into Kafka topics.
- [DataStax Studio](http://localhost:9091): edit the connection to point to the `dse` host name of DataStax Enterprise.

Open the folder `kafka-dse-producer`:
4. Start the `kafka-dse-producer` component. Open the folder `kafka-dse-producer` and execute the following:

```
# Start Web Application
cd kafka-dse-producer

mvn spring-boot:run
```

This will create the `KeySpace` `demo_kafka` if needed, the tables to work with, and fill the reference table `stocks_infos` with data coming from the CSV
This will create the keyspace `demo_kafka` if needed, along with the expected tables `stocks_infos` and `stocks_ticks`. `stocks_infos` is loaded with the list of stock codes to retrieve.
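To double-check what was created, you can describe the keyspace from the DSE container (the container name below is an assumption based on the compose project name; verify it with `docker ps`):

```bash
# Inspect the keyspace and tables created by the producer
# (container name is an assumption, check `docker ps` for the actual one)
docker exec -it kafka-dse_dse_1 cqlsh -e "DESCRIBE KEYSPACE demo_kafka"
```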

This is a Camel application with no UI; still, you can access the [Spring Boot Administration Console](http://localhost:8088/admin#/wallboard)
To see incoming events in the topic, you can use the [KafkaHQ UI](http://localhost:8080/docker-kafka-server/topic) or the console consumer embedded in the Kafka container:

<img src="./pics/AdminConsole.png" height="300px" />
```
docker exec -i -t kafka-dse_kafka_1 kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic stocks_ticks
```

### 3. Start the `kafka-dse-webui` component.
You should see something like this:

```
{"symbol":"TGT","valueDate":"2019-03-06T09:42:51.242Z","value":72.29042675857293}
{"symbol":"MPC","valueDate":"2019-03-06T09:42:51.243Z","value":59.04528781377921}
{"symbol":"CVS","valueDate":"2019-03-06T09:42:51.243Z","value":53.99304938100401}
{"symbol":"ABC","valueDate":"2019-03-06T09:42:51.243Z","value":81.10990156983966}
{"symbol":"C","valueDate":"2019-03-06T09:42:51.243Z","value":62.885685814641356}
{"symbol":"VZ","valueDate":"2019-03-06T09:42:51.243Z","value":58.235384086812275}
{"symbol":"CVX","valueDate":"2019-03-06T09:42:51.243Z","value":122.11841963153303}
{"symbol":"JPM","valueDate":"2019-03-06T09:42:51.243Z","value":107.49725481587726}
{"symbol":"F","valueDate":"2019-03-06T09:42:51.243Z","value":8.861661896519792}
{"symbol":"BRK.A","valueDate":"2019-03-06T09:42:51.243Z","value":301526.64877576346}
{"symbol":"JNJ","valueDate":"2019-03-06T09:42:51.243Z","value":142.508769918913}
{"symbol":"ADM","valueDate":"2019-03-06T09:42:51.243Z","value":42.757409759352036}
{"symbol":"UNH","valueDate":"2019-03-06T09:42:51.243Z","value":243.075302209574}
{"symbol":"PFE","valueDate":"2019-03-06T09:42:51.243Z","value":42.58821518419893}
{"symbol":"PRU","valueDate":"2019-03-06T09:42:51.243Z","value":101.21939105941746}
{"symbol":"T","valueDate":"2019-03-06T09:42:51.243Z","value":33.00476819346831}
{"symbol":"WFC","valueDate":"2019-03-06T09:42:51.243Z","value":49.8785431886429}
{"symbol":"IBM","valueDate":"2019-03-06T09:42:51.243Z","value":135.1453199656281}
{"symbol":"VLO","valueDate":"2019-03-06T09:42:51.244Z","value":80.76656130882697}
```

5. Start the `kafka-dse-webui` component.

Open the folder `kafka-dse-webui`:

```
# Start Web Application
cd ../kafka-dse-webui

mvn spring-boot:run
```

[Access the web UI](http://localhost:8082)
You will have access to:
- [The web UI](http://localhost:8082), a user interface showing live charts


You can also start the `kafka-dse-consumer` component with `spring-boot:run` and watch events arriving in DataStax Enterprise and the UI, but this component is for testing only; the recommended path is the sink connector described next.
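If you want to try it, the consumer starts like the other modules (the command below is assumed by analogy with the producer and web UI):

```bash
# Optional: run the test consumer the same way as the other modules
cd kafka-dse-consumer
mvn spring-boot:run
```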

6. Download the *DataStax Apache Kafka™ Connector* (`kafka-connect-dse-1.0.0.jar`) from [academy.datastax.com](https://academy.datastax.com) and put the jar in the `kafka-dse-sink` folder. Then execute the following:

```
docker-compose -f docker-compose-connect.yml up
```

Kafka Connect starts up. You can see in the logs that the class `com.datastax.kafkaconnector.DseSinkConnector` is detected on the classpath. When the service is up (the HTTP interface responds), the sink is registered through a POST request.
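To follow the startup logs yourself (the service name comes from `docker-compose-connect.yml` below):

```bash
# Tail the Kafka Connect logs and watch for the DseSinkConnector class
docker-compose -f docker-compose-connect.yml logs -f kafka-connect
```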

You can look at the `kafka-connect` [sinks here](http://localhost:18083/connectors/) or at our [config here](http://localhost:18083/connectors/dse_stock_ticks).
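The standard Kafka Connect REST API also exposes the sink's status (the port comes from the compose file):

```bash
# List registered connectors, then check the sink's status
curl -s http://localhost:18083/connectors/
curl -s http://localhost:18083/connectors/dse_stock_ticks/status
```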

More information is available in the [DataStax documentation](https://docs.datastax.com/en/kafka/doc/index.html).
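Once the sink is running, rows should land in the DSE table; a quick check from the DSE container (container name is an assumption, verify with `docker ps`):

```bash
# Verify that ticks are being written to DataStax Enterprise
docker exec -it kafka-dse_dse_1 cqlsh -e "SELECT * FROM demo_kafka.stocks_ticks LIMIT 10;"
```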


51 changes: 51 additions & 0 deletions demo.MD
@@ -0,0 +1,51 @@

DEMO

curl -s \
-X "POST" "http://localhost:18083/connectors/" \
-H "Content-Type: application/json" \
-d '{
"name": "dse_tx_flat",
"config": {
"connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
"tasks.max": "1",
"topics": "stocks_ticks",
"contactPoints": "dse",
"loadBalancing.localDc": "DC1",
"port": 9042,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"jmx": false,
"compression": "None",
"auth.provider": "None",s
"auth.username": "",
"auth.password": "",
"auth.gssapi.keyTab": "",
"auth.gssapi.principal": "",
"auth.gssapi.service": "dse",
"ssl.provider": "None",
"ssl.hostnameValidation": true,
"ssl.keystore.password": "",
"ssl.keystore.path": "",
"ssl.openssl.keyCertChain": "",
"ssl.openssl.privateKey": "",
"ssl.truststore.password": "",
"ssl.truststore.path": "",
"ssl.cipherSuites": "",
"topic.stocks_ticks.demo_kafka.stocks_ticks.mapping": "symbol=value.symbol, value=value.value, value_date=value.valueDate",
"topic.stocks_ticks.demo_kafka.stocks_ticks.consistencyLevel": "LOCAL_ONE",
"topic.stocks_ticks.demo_kafka.stocks_ticks.ttl": -1,
"topic.stocks_ticks.demo_kafka.stocks_ticks.nullToUnset": "true",
"topic.stocks_ticks.demo_kafka.stocks_ticks.deletesEnabled": "true",
"topic.stocks_ticks.codec.locale": "en_US",
"topic.stocks_ticks.codec.timeZone": "UTC",
"topic.stocks_ticks.codec.timestamp": "CQL_TIMESTAMP",
"topic.stocks_ticks.codec.date": "ISO_LOCAL_DATE",
"topic.stocks_ticks.codec.time": "ISO_LOCAL_TIME",
"topic.stocks_ticks.codec.unit": "MILLISECONDS"
}
}'
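To unregister this demo sink afterwards, the standard Kafka Connect REST API delete endpoint can be used:

curl -s -X DELETE "http://localhost:18083/connectors/dse_tx_flat"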
92 changes: 92 additions & 0 deletions docker-compose-connect.yml
@@ -0,0 +1,92 @@
version: '3'
services:
# The keyspaces and tables must already exist, so we start the applications first.
kafka-connect:
image: confluentinc/cp-kafka-connect:5.1.0
#depends_on:
# - zookeeper
# - kafka
ports:
- 18083:18083
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
CONNECT_REST_PORT: 18083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars,/connectors,/tmp/dse-kafka-connect/jars'
volumes:
- $PWD/kafka-dse-sink:/tmp/dse-kafka-connect/jars
command:
- bash
- -c
- |
/etc/confluent/docker/run &
echo "Waiting for Kafka Connect to start listening on $$CONNECT_REST_ADVERTISED_HOST_NAME ⏳"
while [ $$(curl -s -o /dev/null -w %{http_code} http://$$CONNECT_REST_ADVERTISED_HOST_NAME:$$CONNECT_REST_PORT/connectors) -eq 000 ] ; do
echo -e $$(date) " Kafka Connect listener HTTP state: " $$(curl -s -o /dev/null -w %{http_code} http://$$CONNECT_REST_ADVERTISED_HOST_NAME:$$CONNECT_REST_PORT/connectors) " (waiting for 200)"
sleep 5
done
nc -vz $$CONNECT_REST_ADVERTISED_HOST_NAME $$CONNECT_REST_PORT
echo -e "\n--\n+> Creating Kafka Connect DataStax Sink"
curl -s \
-X "POST" "http://localhost:18083/connectors/" \
-H "Content-Type: application/json" \
-d '{
"name": "dse_stock_ticks",
"config": {
"connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
"tasks.max": "1",
"topics": "stocks_ticks",
"contactPoints": "dse",
"loadBalancing.localDc": "DC1",
"port": 9042,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"jmx": false,
"compression": "None",
"auth.provider": "None",
"auth.username": "",
"auth.password": "",
"auth.gssapi.keyTab": "",
"auth.gssapi.principal": "",
"auth.gssapi.service": "dse",
"ssl.provider": "None",
"ssl.hostnameValidation": true,
"ssl.keystore.password": "",
"ssl.keystore.path": "",
"ssl.openssl.keyCertChain": "",
"ssl.openssl.privateKey": "",
"ssl.truststore.password": "",
"ssl.truststore.path": "",
"ssl.cipherSuites": "",
"topic.stocks_ticks.demo_kafka.stocks_ticks.mapping": "symbol=value.symbol, value=value.value, value_date=value.valueDate",
"topic.stocks_ticks.demo_kafka.stocks_ticks.consistencyLevel": "LOCAL_ONE",
"topic.stocks_ticks.demo_kafka.stocks_ticks.ttl": -1,
"topic.stocks_ticks.demo_kafka.stocks_ticks.nullToUnset": "true",
"topic.stocks_ticks.demo_kafka.stocks_ticks.deletesEnabled": "true",
"topic.stocks_ticks.codec.locale": "en_US",
"topic.stocks_ticks.codec.timeZone": "UTC",
"topic.stocks_ticks.codec.timestamp": "CQL_TIMESTAMP",
"topic.stocks_ticks.codec.date": "ISO_LOCAL_DATE",
"topic.stocks_ticks.codec.time": "ISO_LOCAL_TIME",
"topic.stocks_ticks.codec.unit": "MILLISECONDS"
}
}'
sleep infinity

45 changes: 0 additions & 45 deletions docker-compose-kafka.yml

This file was deleted.
