Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ dist

# misc
.DS_Store


.java-version
nohup.out
webui.log
119 changes: 54 additions & 65 deletions kafka-dse-core/pom.xml
Original file line number Diff line number Diff line change
@@ -1,67 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-dse-core</artifactId>
<name> + kafka-dse-core</name>
<description>DAO, BEAN</description>

<parent>
<groupId>com.datastax</groupId>
<artifactId>kafka-dse-example</artifactId>
<version>6.7-SNAPSHOT</version>
</parent>

<dependencies>

<!-- Java driver for DSE -->
<dependency>
<groupId>com.datastax.dse</groupId>
<artifactId>dse-java-driver-core</artifactId>
</dependency>
<dependency>
<groupId>com.datastax.dse</groupId>
<artifactId>dse-java-driver-mapping</artifactId>
</dependency>
<dependency>
<groupId>com.datastax.dse</groupId>
<artifactId>dse-java-driver-extras</artifactId>
</dependency>
<dependency>
<groupId>com.datastax.dse</groupId>
<artifactId>dse-java-driver-graph</artifactId>
</dependency>

<!-- Work with Json -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>


</dependencies>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-dse-core</artifactId>
<name>+ kafka-dse-core</name>
<description>DAO, BEAN</description>
<parent>
<groupId>com.datastax</groupId>
<artifactId>kafka-dse-example</artifactId>
<version>6.7-SNAPSHOT</version>
</parent>
<dependencies>
<!-- Java driver for DSE -->
<dependency>
<groupId>com.datastax.dse</groupId>
<artifactId>dse-java-driver-core</artifactId>
</dependency>
<dependency>
<groupId>com.datastax.dse</groupId>
<artifactId>dse-java-driver-reactor</artifactId>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-query-builder</artifactId>
</dependency>
<!-- Work with Json -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,130 +1,98 @@
package com.datastax.demo.conf;

import com.datastax.dse.driver.api.reactor.ReactorDseSession;
import com.datastax.dse.driver.api.reactor.ReactorDseSessionBuilder;
import com.datastax.dse.driver.internal.core.auth.DsePlainTextAuthProvider;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader;
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoaderBuilder;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StopWatch;
import org.springframework.util.StringUtils;

import com.datastax.demo.domain.LongToTimeStampCodec;
import com.datastax.driver.core.AuthProvider;
import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.schemabuilder.SchemaBuilder;
import com.datastax.driver.dse.DseCluster.Builder;
import com.datastax.driver.dse.DseSession;
import com.datastax.driver.dse.auth.DsePlainTextAuthProvider;
import com.datastax.driver.mapping.DefaultPropertyMapper;
import com.datastax.driver.mapping.MappingConfiguration;
import com.datastax.driver.mapping.MappingManager;
import com.datastax.driver.mapping.PropertyMapper;
import com.datastax.driver.mapping.PropertyTransienceStrategy;
import com.google.common.collect.ImmutableMap;

/**
* Connectivity to DSE (cassandra, graph, search).
*/
/** Connectivity to DSE (cassandra, graph, search). */
@Configuration
public class DseConfiguration {

/** Internal logger. */
private static final Logger LOGGER = LoggerFactory.getLogger(DseConfiguration.class);

@Value("#{'${dse.contactPoints}'.split(',')}")
public List < String > contactPoints;

@Value("${dse.port: 9042}")
public int port;

@Value("${dse.keyspace: system}")
public String keyspace;

@Value("${dse.username}")
public Optional < String > dseUsername;

@Value("${dse.password}")
public Optional < String > dsePassword;

@Value("${dse.localdc : dc1}")
public String localDc;

@Bean
public DseSession dseSession() {
long top = System.currentTimeMillis();
LOGGER.info("Initializing connection to DSE Cluster");

Builder clusterConfig = new Builder();
LOGGER.info(" + Contact Points : {}" , contactPoints);
contactPoints.stream().forEach(clusterConfig::addContactPoint);
LOGGER.info(" + Listening Port : {}", port);
clusterConfig.withPort(port);

if (dseUsername.isPresent() && dsePassword.isPresent() && dseUsername.get().length() > 0) {
AuthProvider cassandraAuthProvider = new DsePlainTextAuthProvider(dseUsername.get(), dsePassword.get());
clusterConfig.withAuthProvider(cassandraAuthProvider);
LOGGER.info(" + With username : {}", dseUsername.get());
}

// OPTIONS
clusterConfig.withQueryOptions(
new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM));

// Long <-> Timestamp
clusterConfig.withCodecRegistry(new CodecRegistry().register(new LongToTimeStampCodec()));

try {
// First Connect without Keyspace (to create if needed)
DseSession tmpSession = null;
try {
tmpSession = clusterConfig.build().connect();
tmpSession.execute(SchemaBuilder.createKeyspace(keyspace)
.ifNotExists().with()
.replication(ImmutableMap.of("class", "SimpleStrategy", "replication_factor", 1)));
LOGGER.info(" + Creating keyspace '{}' (if needed)", keyspace);
} finally {
if (tmpSession != null) {
tmpSession.close();
}
}

// Real Connection now
DseSession dseSession = clusterConfig.build().connect(keyspace);
LOGGER.info(" + Connection established to DSE Cluster \\_0_/ in {} millis.", System.currentTimeMillis() - top);
return dseSession;
} catch(InvalidQueryException iqe) {
LOGGER.error("\n-----------------------------------------\n\n"
+ "Keyspace '{}' seems does not exist. \nPlease update 'application.yml' with correct keyspace name or create one with:\n\n"
+ " create keyspace {} WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; \n\nI will create the "
+ "tables I need after that.\n-----------------------------------------",
keyspace, keyspace);
throw new IllegalStateException("", iqe);
}
/** Internal logger. */
private static final Logger LOGGER = LoggerFactory.getLogger(DseConfiguration.class);

@Value("#{'${dse.contactPoints}'.split(',')}")
private List<String> contactPoints;

@Value("${dse.port: 9042}")
private int port;

@Value(
"#{T(com.datastax.oss.driver.api.core.CqlIdentifier).fromInternal('${dse.keyspace: demo_kafka}')}")
public CqlIdentifier keyspace;

@Value("${dse.username}")
private String dseUsername;

@Value("${dse.password}")
private String dsePassword;

@Value("${dse.localdc: dc1}")
private String localDc;

@Bean
public ReactorDseSession dseSession() {

LOGGER.info("Initializing connection to DSE Cluster");
LOGGER.info("Contact Points : {}", contactPoints);
LOGGER.info("Listening Port : {}", port);
LOGGER.info("Local DC : {}", localDc);
LOGGER.info("Keyspace : {}", keyspace);

StopWatch stopWatch = new StopWatch();
stopWatch.start();

ReactorDseSessionBuilder sessionBuilder =
new ReactorDseSessionBuilder().withLocalDatacenter(localDc);

contactPoints
.stream()
.map(cp -> InetSocketAddress.createUnresolved(cp, port))
.forEach(sessionBuilder::addContactPoint);

DefaultDriverConfigLoaderBuilder configLoaderBuilder =
DefaultDriverConfigLoader.builder()
.withString(DefaultDriverOption.REQUEST_CONSISTENCY, "QUORUM");

if (!StringUtils.isEmpty(dseUsername) && !StringUtils.isEmpty(dsePassword)) {
LOGGER.info("Username : {}", dseUsername);
configLoaderBuilder
.withString(
DefaultDriverOption.AUTH_PROVIDER_CLASS, DsePlainTextAuthProvider.class.getName())
.withString(DefaultDriverOption.AUTH_PROVIDER_USER_NAME, dseUsername)
.withString(DefaultDriverOption.AUTH_PROVIDER_PASSWORD, dsePassword);
}

sessionBuilder.withConfigLoader(configLoaderBuilder.build());

// First Connect without Keyspace (to create it if needed)
try (ReactorDseSession tempSession = sessionBuilder.build()) {
LOGGER.info("Creating keyspace {} (if needed)", keyspace);
SimpleStatement createKeyspace =
SchemaBuilder.createKeyspace(keyspace).ifNotExists().withSimpleStrategy(1).build();
tempSession.execute(createKeyspace);
}

// Now create the actual session
try (ReactorDseSession dseSession = sessionBuilder.withKeyspace(keyspace).build()) {
stopWatch.stop();
LOGGER.info("Connection established to DSE Cluster \\_0_/ in {}.", stopWatch.prettyPrint());
return dseSession;
}

/**
* Use to create mapper and perform ORM on top of Cassandra tables.
*
* @param session
* current dse session.
* @return
* mapper
*/
@Bean
public MappingManager mappingManager(DseSession session) {
// Do not map all fields, only the annotated ones with @Column or @Fields
PropertyMapper propertyMapper = new DefaultPropertyMapper()
.setPropertyTransienceStrategy(PropertyTransienceStrategy.OPT_IN);
// Build configuration from mapping
MappingConfiguration configuration = MappingConfiguration.builder()
.withPropertyMapper(propertyMapper)
.build();
// Sample Manager with advance configuration
return new MappingManager(session, configuration);
}

}
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,32 @@
package com.datastax.demo.conf;

import com.datastax.oss.driver.api.core.CqlIdentifier;

/**
* Constants in DSE-DB Tables.
*
* @author DataStax Evangelist Team
*/
public interface DseConstants {

/** Table Names in Keyspace (Columns are defined in Beans). */
String STOCKS_MINUTE = "stocks_by_min";
String STOCKS_HOUR = "stocks_by_hour";
String STOCKS_DAY = "stocks_by_day";

String STOCKS_TICKS = "stocks_ticks";
String STOCKS_INFOS = "stocks_infos";

String TICKER_COL_EXCHANGE = "exchange";
String TICKER_COL_INDUSTRY = "industry";
String TICKER_COL_NAME = "name";
String TICKER_COL_SYMBOL = "symbol";

// Table names

CqlIdentifier STOCKS_MINUTE = CqlIdentifier.fromCql("stocks_by_min");
CqlIdentifier STOCKS_HOUR = CqlIdentifier.fromCql("stocks_by_hour");
CqlIdentifier STOCKS_TICKS = CqlIdentifier.fromCql("stocks_ticks");
CqlIdentifier STOCKS_INFOS = CqlIdentifier.fromCql("stocks_infos");

// Column names

CqlIdentifier EXCHANGE = CqlIdentifier.fromCql("exchange");
CqlIdentifier NAME = CqlIdentifier.fromCql("name");
CqlIdentifier INDUSTRY = CqlIdentifier.fromCql("industry");
CqlIdentifier SYMBOL = CqlIdentifier.fromCql("symbol");
CqlIdentifier VALUE_DATE = CqlIdentifier.fromCql("value_date");
CqlIdentifier VALUE = CqlIdentifier.fromCql("value");
CqlIdentifier OPEN = CqlIdentifier.fromCql("open");
CqlIdentifier CLOSE = CqlIdentifier.fromCql("close");
CqlIdentifier HIGH = CqlIdentifier.fromCql("high");
CqlIdentifier LOW = CqlIdentifier.fromCql("low");
CqlIdentifier VOLUME = CqlIdentifier.fromCql("volume");
}

Loading