dsePassword;
-
- @Value("${dse.localdc : dc1}")
- public String localDc;
-
- @Bean
- public DseSession dseSession() {
- long top = System.currentTimeMillis();
- LOGGER.info("Initializing connection to DSE Cluster");
-
- Builder clusterConfig = new Builder();
- LOGGER.info(" + Contact Points : {}", contactPoints);
- contactPoints.stream().forEach(clusterConfig::addContactPoint);
- LOGGER.info(" + Listening Port : {}", port);
- clusterConfig.withPort(port);
-
- if (dseUsername.isPresent() && dsePassword.isPresent() && dseUsername.get().length() > 0) {
- AuthProvider cassandraAuthProvider =
- new DsePlainTextAuthProvider(dseUsername.get(), dsePassword.get());
- clusterConfig.withAuthProvider(cassandraAuthProvider);
- LOGGER.info(" + With username : {}", dseUsername.get());
- }
-
- // OPTIONS
- clusterConfig.withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM));
-
- // Long <-> Timestamp
- clusterConfig.withCodecRegistry(new CodecRegistry().register(new LongToTimeStampCodec()));
-
- try {
- // First Connect without Keyspace (to create if needed)
- DseSession tmpSession = null;
- try {
- tmpSession = clusterConfig.build().connect();
- tmpSession.execute(
- SchemaBuilder.createKeyspace(keyspace)
- .ifNotExists()
- .with()
- .replication(ImmutableMap.of("class", "SimpleStrategy", "replication_factor", 1)));
- LOGGER.info(" + Creating keyspace '{}' (if needed)", keyspace);
- } finally {
- if (tmpSession != null) {
- tmpSession.close();
- }
- }
-
- // Real Connection now
- DseSession dseSession = clusterConfig.build().connect(keyspace);
- LOGGER.info(
- " + Connection established to DSE Cluster \\_0_/ in {} millis.",
- System.currentTimeMillis() - top);
- return dseSession;
- } catch (InvalidQueryException iqe) {
- LOGGER.error(
- "\n-----------------------------------------\n\n"
- + "Keyspace '{}' seems does not exist. \nPlease update 'application.yml' with correct keyspace name or create one with:\n\n"
- + " create keyspace {} WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; \n\nI will create the "
- + "tables I need after that.\n-----------------------------------------",
- keyspace,
- keyspace);
- throw new IllegalStateException("", iqe);
- }
- }
-
- /**
- * Use to create mapper and perform ORM on top of Cassandra tables.
- *
- * @param session current dse session.
- * @return mapper
- */
- @Bean
- public MappingManager mappingManager(DseSession session) {
- // Do not map all fields, only the annotated ones with @Column or @Fields
- PropertyMapper propertyMapper =
- new DefaultPropertyMapper()
- .setPropertyTransienceStrategy(PropertyTransienceStrategy.OPT_IN);
- // Build configuration from mapping
- MappingConfiguration configuration =
- MappingConfiguration.builder().withPropertyMapper(propertyMapper).build();
- // Sample Manager with advance configuration
- return new MappingManager(session, configuration);
- }
-}
diff --git a/kafka-dse-core/src/main/java/com/datastax/demo/conf/DseConstants.java b/kafka-dse-core/src/main/java/com/datastax/demo/conf/DseConstants.java
deleted file mode 100644
index 013c94d..0000000
--- a/kafka-dse-core/src/main/java/com/datastax/demo/conf/DseConstants.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.datastax.demo.conf;
-
-/**
- * Constants in DSE-DB Tables.
- *
- * @author DataStax Evangelist Team
- */
-public interface DseConstants {
-
- /** Table Names in Keyspace (Columns are defined in Beans). */
- String STOCKS_MINUTE = "stocks_by_min";
-
- String STOCKS_HOUR = "stocks_by_hour";
- String STOCKS_DAY = "stocks_by_day";
-
- String STOCKS_TICKS = "stocks_ticks";
- String STOCKS_INFOS = "stocks_infos";
-
- String TICKER_COL_EXCHANGE = "exchange";
- String TICKER_COL_INDUSTRY = "industry";
- String TICKER_COL_NAME = "name";
- String TICKER_COL_SYMBOL = "symbol";
-}
diff --git a/kafka-dse-core/src/main/java/com/datastax/demo/domain/LongToTimeStampCodec.java b/kafka-dse-core/src/main/java/com/datastax/demo/domain/LongToTimeStampCodec.java
deleted file mode 100644
index 3452cdb..0000000
--- a/kafka-dse-core/src/main/java/com/datastax/demo/domain/LongToTimeStampCodec.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package com.datastax.demo.domain;
-
-import com.datastax.driver.core.TypeCodec;
-import com.datastax.driver.extras.codecs.MappingCodec;
-import java.util.Date;
-
-/**
- * Column expect a blob, attribute is a String, we need a codec here for conversion.
- *
- * In CQL you would be able to use textAsBlob().
- *
- * @author DataStax evangelist team.
- */
-public class LongToTimeStampCodec extends MappingCodec {
-
- /** Default charset will be UTF8. */
- public LongToTimeStampCodec() {
- super(TypeCodec.timestamp(), Long.class);
- }
-
- /** {@inheritDoc} */
- @Override
- protected Long deserialize(Date value) {
- return value.getTime();
- }
-
- /** {@inheritDoc} */
- @Override
- protected Date serialize(Long value) {
- return new Date(value);
- }
-}
diff --git a/kafka-dse-core/src/main/java/com/datastax/demo/domain/Stock1Hour.java b/kafka-dse-core/src/main/java/com/datastax/demo/domain/Stock1Hour.java
deleted file mode 100644
index 19f2763..0000000
--- a/kafka-dse-core/src/main/java/com/datastax/demo/domain/Stock1Hour.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.datastax.demo.domain;
-
-import com.datastax.demo.conf.DseConstants;
-import com.datastax.driver.mapping.annotations.Table;
-
-/** Bean to save into table for minutes aggregation. */
-@Table(name = DseConstants.STOCKS_HOUR)
-public class Stock1Hour extends Stock {
-
- /** Specialization for a dedicated table. */
- private static final long serialVersionUID = 6789940996895471880L;
-
- /** Specialization. */
- public Stock1Hour(Stock parent) {
- super(parent);
- }
-}
diff --git a/kafka-dse-core/src/main/java/com/datastax/demo/domain/Stock1Min.java b/kafka-dse-core/src/main/java/com/datastax/demo/domain/Stock1Min.java
deleted file mode 100644
index e03d5b8..0000000
--- a/kafka-dse-core/src/main/java/com/datastax/demo/domain/Stock1Min.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.datastax.demo.domain;
-
-import com.datastax.demo.conf.DseConstants;
-import com.datastax.driver.mapping.annotations.Table;
-
-/** Bean to save into table for minutes aggregation. */
-@Table(name = DseConstants.STOCKS_MINUTE)
-public class Stock1Min extends Stock {
-
- /** Specialization for a dedicated table. */
- private static final long serialVersionUID = 6789940996895471880L;
-
- /** Specialization. */
- public Stock1Min(Stock parent) {
- super(parent);
- }
-}
diff --git a/kafka-dse-core/src/main/java/com/datastax/demo/utils/FileUtils.java b/kafka-dse-core/src/main/java/com/datastax/demo/utils/FileUtils.java
deleted file mode 100644
index 70a2c71..0000000
--- a/kafka-dse-core/src/main/java/com/datastax/demo/utils/FileUtils.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.datastax.demo.utils;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Scanner;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** utility class to parse files. */
-public class FileUtils {
-
- /** Hide default. */
- private FileUtils() {}
-
- /** Load file content as a String. */
- public static String readFileIntoString(String filename) {
- try (Scanner s = new Scanner(new File(filename))) {
- return s.useDelimiter("\\Z").next();
- } catch (FileNotFoundException e) {
- throw new IllegalArgumentException("Cannot find the file", e);
- }
- }
-
- /** Load each line as a row. */
- public static List readFileIntoList(String filename) {
- try (Stream lines = Files.lines(Paths.get(filename))) {
- return lines.collect(Collectors.toList());
- } catch (IOException e1) {
- throw new IllegalArgumentException("Cannot read file", e1);
- }
- }
-}
diff --git a/kafka-dse-core/src/main/java/com/datastax/kafkadse/core/conf/DseConfiguration.java b/kafka-dse-core/src/main/java/com/datastax/kafkadse/core/conf/DseConfiguration.java
new file mode 100644
index 0000000..fbd080c
--- /dev/null
+++ b/kafka-dse-core/src/main/java/com/datastax/kafkadse/core/conf/DseConfiguration.java
@@ -0,0 +1,100 @@
+package com.datastax.kafkadse.core.conf;
+
+import com.datastax.dse.driver.api.core.DseSession;
+import com.datastax.dse.driver.api.core.DseSessionBuilder;
+import com.datastax.dse.driver.internal.core.auth.DsePlainTextAuthProvider;
+import com.datastax.oss.driver.api.core.CqlIdentifier;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
+import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader;
+import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoaderBuilder;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.util.StopWatch;
+import org.springframework.util.StringUtils;
+
+/** Connectivity to DSE (cassandra, graph, search). */
+@Configuration
+public class DseConfiguration {
+
+ /** Internal logger. */
+ private static final Logger LOGGER = LoggerFactory.getLogger(DseConfiguration.class);
+
+ @Value("#{'${dse.contactPoints}'.split(',')}")
+ private List contactPoints;
+
+ @Value("${dse.port: 9042}")
+ private int port;
+
+ @Value(
+ "#{T(com.datastax.oss.driver.api.core.CqlIdentifier).fromInternal('${dse.keyspace: demo_kafka}')}")
+ public CqlIdentifier keyspace;
+
+ @Value("${dse.username}")
+ private String dseUsername;
+
+ @Value("${dse.password}")
+ private String dsePassword;
+
+ @Value("${dse.localdc: dc1}")
+ private String localDc;
+
+ @Bean
+ public DseSession dseSession() {
+
+ LOGGER.info("Initializing connection to DSE Cluster");
+ LOGGER.info("Contact Points : {}", contactPoints);
+ LOGGER.info("Listening Port : {}", port);
+ LOGGER.info("Local DC : {}", localDc);
+ LOGGER.info("Keyspace : {}", keyspace);
+
+ StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
+
+ DseSessionBuilder sessionBuilder = new DseSessionBuilder().withLocalDatacenter(localDc);
+
+ contactPoints
+ .stream()
+ .map(cp -> InetSocketAddress.createUnresolved(cp, port))
+ .forEach(sessionBuilder::addContactPoint);
+
+ DefaultDriverConfigLoaderBuilder configLoaderBuilder =
+ DefaultDriverConfigLoader.builder()
+ .withString(DefaultDriverOption.REQUEST_CONSISTENCY, "QUORUM")
+ .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(30));
+
+ if (!StringUtils.isEmpty(dseUsername) && !StringUtils.isEmpty(dsePassword)) {
+ LOGGER.info("Username : {}", dseUsername);
+ configLoaderBuilder
+ .withString(
+ DefaultDriverOption.AUTH_PROVIDER_CLASS, DsePlainTextAuthProvider.class.getName())
+ .withString(DefaultDriverOption.AUTH_PROVIDER_USER_NAME, dseUsername)
+ .withString(DefaultDriverOption.AUTH_PROVIDER_PASSWORD, dsePassword);
+ }
+
+ sessionBuilder.withConfigLoader(configLoaderBuilder.build());
+
+ // First Connect without Keyspace (to create it if needed)
+ try (DseSession tempSession = sessionBuilder.build()) {
+ LOGGER.info("Creating keyspace {} (if needed)", keyspace);
+ SimpleStatement createKeyspace =
+ SchemaBuilder.createKeyspace(keyspace).ifNotExists().withSimpleStrategy(1).build();
+ tempSession.execute(createKeyspace);
+ }
+
+ // Now create the actual session
+ DseSession dseSession = sessionBuilder.withKeyspace(keyspace).build();
+ stopWatch.stop();
+ LOGGER.info(
+ "Connection established to DSE Cluster \\_0_/ in {} seconds.",
+ stopWatch.getTotalTimeSeconds());
+ return dseSession;
+ }
+}
diff --git a/kafka-dse-core/src/main/java/com/datastax/kafkadse/core/conf/DseConstants.java b/kafka-dse-core/src/main/java/com/datastax/kafkadse/core/conf/DseConstants.java
new file mode 100644
index 0000000..0dab8ac
--- /dev/null
+++ b/kafka-dse-core/src/main/java/com/datastax/kafkadse/core/conf/DseConstants.java
@@ -0,0 +1,32 @@
+package com.datastax.kafkadse.core.conf;
+
+import com.datastax.oss.driver.api.core.CqlIdentifier;
+
+/**
+ * Constants in DSE-DB Tables.
+ *
+ * @author DataStax Evangelist Team
+ */
+public interface DseConstants {
+
+ // Table names
+
+ CqlIdentifier STOCKS_MINUTE = CqlIdentifier.fromCql("stocks_by_min");
+ CqlIdentifier STOCKS_HOUR = CqlIdentifier.fromCql("stocks_by_hour");
+ CqlIdentifier STOCKS_TICKS = CqlIdentifier.fromCql("stocks_ticks");
+ CqlIdentifier STOCKS_INFOS = CqlIdentifier.fromCql("stocks_infos");
+
+ // Column names
+
+ CqlIdentifier EXCHANGE = CqlIdentifier.fromCql("exchange");
+ CqlIdentifier NAME = CqlIdentifier.fromCql("name");
+ CqlIdentifier INDUSTRY = CqlIdentifier.fromCql("industry");
+ CqlIdentifier SYMBOL = CqlIdentifier.fromCql("symbol");
+ CqlIdentifier VALUE_DATE = CqlIdentifier.fromCql("value_date");
+ CqlIdentifier VALUE = CqlIdentifier.fromCql("value");
+ CqlIdentifier OPEN = CqlIdentifier.fromCql("open");
+ CqlIdentifier CLOSE = CqlIdentifier.fromCql("close");
+ CqlIdentifier HIGH = CqlIdentifier.fromCql("high");
+ CqlIdentifier LOW = CqlIdentifier.fromCql("low");
+ CqlIdentifier VOLUME = CqlIdentifier.fromCql("volume");
+}
diff --git a/kafka-dse-core/src/main/java/com/datastax/kafkadse/core/dao/DseDao.java b/kafka-dse-core/src/main/java/com/datastax/kafkadse/core/dao/DseDao.java
new file mode 100644
index 0000000..6b6cecb
--- /dev/null
+++ b/kafka-dse-core/src/main/java/com/datastax/kafkadse/core/dao/DseDao.java
@@ -0,0 +1,130 @@
+package com.datastax.kafkadse.core.dao;
+
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
+import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable;
+import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;
+
+import com.datastax.dse.driver.api.core.DseSession;
+import com.datastax.kafkadse.core.conf.DseConstants;
+import com.datastax.kafkadse.core.domain.StockInfo;
+import com.datastax.kafkadse.core.domain.StockTick;
+import com.datastax.oss.driver.api.core.CqlIdentifier;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import java.util.Set;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
+import javax.annotation.PostConstruct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+@Repository
+public class DseDao implements DseConstants {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DseDao.class);
+
+ @Autowired private DseSession dseSession;
+
+ private CqlIdentifier keyspace;
+
+ private PreparedStatement insertIntoStockInfos;
+ private PreparedStatement insertIntoStockTicks;
+
+ @PostConstruct
+ public void createOrUpdateSchema() {
+ keyspace = dseSession.getKeyspace().orElseThrow(IllegalStateException::new);
+ createTableStockInfosIfNotExists();
+ createTableStockTicksIfNotExists();
+ prepareStatements();
+ LOGGER.info("Connection established to DSE and schema successfully created or updated.");
+ }
+
+ /** Metadata table (Home page for webUI) */
+ private void createTableStockInfosIfNotExists() {
+ dseSession.execute(
+ createTable(STOCKS_INFOS)
+ .ifNotExists()
+ .withPartitionKey(EXCHANGE, DataTypes.TEXT)
+ .withClusteringColumn(NAME, DataTypes.TEXT)
+ .withColumn(INDUSTRY, DataTypes.TEXT)
+ .withColumn(SYMBOL, DataTypes.TEXT)
+ .withClusteringOrder(NAME, ClusteringOrder.ASC)
+ .build());
+ LOGGER.info(" + Table {} created in keyspace {} (if needed)", STOCKS_INFOS, keyspace);
+ }
+
+ /** Random ticks where seed is last AlphaVantage */
+ private void createTableStockTicksIfNotExists() {
+ dseSession.execute(
+ createTable(STOCKS_TICKS)
+ .ifNotExists()
+ .withPartitionKey(SYMBOL, DataTypes.TEXT)
+ .withClusteringColumn(VALUE_DATE, DataTypes.TIMESTAMP)
+ .withColumn(VALUE, DataTypes.DOUBLE)
+ .withClusteringOrder(VALUE_DATE, ClusteringOrder.DESC)
+ .build());
+ LOGGER.info(" + Table {} created in keyspace {} (if needed)", STOCKS_TICKS, keyspace);
+ }
+
+ private void prepareStatements() {
+ insertIntoStockInfos =
+ dseSession.prepare(
+ insertInto(STOCKS_INFOS)
+ .value(EXCHANGE, bindMarker(EXCHANGE))
+ .value(NAME, bindMarker(NAME))
+ .value(INDUSTRY, bindMarker(INDUSTRY))
+ .value(SYMBOL, bindMarker(SYMBOL))
+ .build());
+ insertIntoStockTicks =
+ dseSession.prepare(
+ insertInto(STOCKS_TICKS)
+ .value(SYMBOL, bindMarker(SYMBOL))
+ .value(VALUE_DATE, bindMarker(VALUE_DATE))
+ .value(VALUE, bindMarker(VALUE))
+ .build());
+ }
+
+ public CompletionStage saveTickerAsync(StockTick tick) {
+ return dseSession
+ .executeAsync(
+ insertIntoStockTicks
+ .boundStatementBuilder()
+ .setString(SYMBOL, tick.getSymbol())
+ .setInstant(VALUE_DATE, tick.getValueDate())
+ .setDouble(VALUE, tick.getValue())
+ .build())
+ .thenApply(rs -> tick);
+ }
+
+ public CompletionStage saveStockInfoAsync(StockInfo info) {
+ return dseSession
+ .executeAsync(
+ insertIntoStockInfos
+ .boundStatementBuilder()
+ .setString(EXCHANGE, info.getExchange())
+ .setString(NAME, info.getName())
+ .setString(INDUSTRY, info.getIndustry())
+ .setString(SYMBOL, info.getSymbol())
+ .build())
+ .thenApply(rs -> info);
+ }
+
+ public Set getSymbolsNYSE() {
+ return dseSession
+ .execute(
+ selectFrom(STOCKS_INFOS)
+ .column(SYMBOL)
+ .where(column(EXCHANGE).isEqualTo(literal("NYSE")))
+ .build())
+ .all()
+ .stream()
+ .map(row -> row.getString(SYMBOL))
+ .collect(Collectors.toSet());
+ }
+}
diff --git a/kafka-dse-core/src/main/java/com/datastax/demo/domain/Stock.java b/kafka-dse-core/src/main/java/com/datastax/kafkadse/core/domain/Stock.java
similarity index 52%
rename from kafka-dse-core/src/main/java/com/datastax/demo/domain/Stock.java
rename to kafka-dse-core/src/main/java/com/datastax/kafkadse/core/domain/Stock.java
index fa2937b..78a0510 100644
--- a/kafka-dse-core/src/main/java/com/datastax/demo/domain/Stock.java
+++ b/kafka-dse-core/src/main/java/com/datastax/kafkadse/core/domain/Stock.java
@@ -1,11 +1,10 @@
-package com.datastax.demo.domain;
+package com.datastax.kafkadse.core.domain;
-import com.datastax.driver.mapping.annotations.ClusteringColumn;
-import com.datastax.driver.mapping.annotations.Column;
-import com.datastax.driver.mapping.annotations.PartitionKey;
-import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Serializable;
-import java.util.Date;
+import java.time.Instant;
+import java.util.Objects;
/** POJO Representing stock from Alpha Vantage. */
public class Stock implements Serializable {
@@ -14,41 +13,53 @@ public class Stock implements Serializable {
private static final long serialVersionUID = -5240591446495279713L;
/** Stock symbol. */
- @PartitionKey private String symbol;
+ private String symbol;
/** timestamp. */
- @ClusteringColumn
- @Column(name = "value_date")
- @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss")
- private Date valueDate;
+ private Instant valueDate;
- /** value at begining of period. */
- @Column private double open;
+ /** value at beginning of period. */
+ private double open;
/** value at end of period. */
- @Column private double close;
+ private double close;
/** low value. */
- @Column private double low;
+ private double low;
/** high value. */
- @Column private double high;
-
- /** number exchanged. */
- @Column private long volume;
-
- /** Default constructor (unmarshalling) */
- public Stock() {}
+ private double high;
+
+ /** volume exchanged. */
+ private long volume;
+
+ @JsonCreator
+ public Stock(
+ @JsonProperty("symbol") String symbol,
+ @JsonProperty("valueDate") Instant valueDate,
+ @JsonProperty("open") double open,
+ @JsonProperty("close") double close,
+ @JsonProperty("low") double low,
+ @JsonProperty("high") double high,
+ @JsonProperty("volume") long volume) {
+ this.symbol = symbol;
+ this.valueDate = valueDate;
+ this.open = open;
+ this.close = close;
+ this.low = low;
+ this.high = high;
+ this.volume = volume;
+ }
/** Copy constructor (specialization) */
- public Stock(Stock parent) {
- this.valueDate = parent.getValueDate();
- this.high = parent.getHigh();
- this.low = parent.getLow();
- this.open = parent.getOpen();
- this.close = parent.getClose();
- this.volume = parent.getVolume();
- this.symbol = parent.getSymbol();
+ public Stock(Stock toCopy) {
+ this.symbol = toCopy.getSymbol();
+ this.valueDate = toCopy.getValueDate();
+ this.open = toCopy.getOpen();
+ this.close = toCopy.getClose();
+ this.high = toCopy.getHigh();
+ this.low = toCopy.getLow();
+ this.volume = toCopy.getVolume();
}
/**
@@ -56,7 +67,7 @@ public Stock(Stock parent) {
*
* @return current value of 'valueDate'
*/
- public Date getValueDate() {
+ public Instant getValueDate() {
return valueDate;
}
@@ -65,7 +76,7 @@ public Date getValueDate() {
*
* @param valueDate new value for 'valueDate '
*/
- public void setValueDate(Date valueDate) {
+ public void setValueDate(Instant valueDate) {
this.valueDate = valueDate;
}
@@ -176,4 +187,48 @@ public String getSymbol() {
public void setSymbol(String symbol) {
this.symbol = symbol;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Stock stock = (Stock) o;
+ return Double.compare(stock.open, open) == 0
+ && Double.compare(stock.close, close) == 0
+ && Double.compare(stock.low, low) == 0
+ && Double.compare(stock.high, high) == 0
+ && volume == stock.volume
+ && symbol.equals(stock.symbol)
+ && valueDate.equals(stock.valueDate);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(symbol, valueDate, open, close, low, high, volume);
+ }
+
+ @Override
+ public String toString() {
+ return "Stock{"
+ + "symbol='"
+ + symbol
+ + '\''
+ + ", valueDate="
+ + valueDate
+ + ", open="
+ + open
+ + ", close="
+ + close
+ + ", low="
+ + low
+ + ", high="
+ + high
+ + ", volume="
+ + volume
+ + '}';
+ }
}
diff --git a/kafka-dse-core/src/main/java/com/datastax/demo/domain/StockInfo.java b/kafka-dse-core/src/main/java/com/datastax/kafkadse/core/domain/StockInfo.java
similarity index 53%
rename from kafka-dse-core/src/main/java/com/datastax/demo/domain/StockInfo.java
rename to kafka-dse-core/src/main/java/com/datastax/kafkadse/core/domain/StockInfo.java
index c747e9f..32465a4 100644
--- a/kafka-dse-core/src/main/java/com/datastax/demo/domain/StockInfo.java
+++ b/kafka-dse-core/src/main/java/com/datastax/kafkadse/core/domain/StockInfo.java
@@ -1,34 +1,39 @@
-package com.datastax.demo.domain;
+package com.datastax.kafkadse.core.domain;
-import static com.datastax.demo.conf.DseConstants.STOCKS_INFOS;
-
-import com.datastax.driver.mapping.annotations.ClusteringColumn;
-import com.datastax.driver.mapping.annotations.Column;
-import com.datastax.driver.mapping.annotations.PartitionKey;
-import com.datastax.driver.mapping.annotations.Table;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Serializable;
+import java.util.Objects;
/** Value for Ticks. */
-@Table(name = STOCKS_INFOS)
public class StockInfo implements Serializable {
/** serial. */
private static final long serialVersionUID = 5806346188526710465L;
/** value. */
- @PartitionKey private String exchange;
+ private String exchange;
/** Value Date. */
- @ClusteringColumn private String name;
+ private String name;
/** code. */
- @Column private String symbol;
+ private String symbol;
/** value. */
- @Column private String industry;
-
- /** Default Constructor */
- public StockInfo() {}
+ private String industry;
+
+ @JsonCreator
+ public StockInfo(
+ @JsonProperty("exchange") String exchange,
+ @JsonProperty("name") String name,
+ @JsonProperty("symbol") String symbol,
+ @JsonProperty("industry") String industry) {
+ this.exchange = exchange;
+ this.name = name;
+ this.symbol = symbol;
+ this.industry = industry;
+ }
/**
* Getter accessor for attribute 'symbol'.
@@ -101,4 +106,42 @@ public String getExchange() {
public void setExchange(String exchange) {
this.exchange = exchange;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ StockInfo stockInfo = (StockInfo) o;
+ return exchange.equals(stockInfo.exchange)
+ && name.equals(stockInfo.name)
+ && symbol.equals(stockInfo.symbol)
+ && industry.equals(stockInfo.industry);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(exchange, name, symbol, industry);
+ }
+
+ @Override
+ public String toString() {
+ return "StockInfo{"
+ + "exchange='"
+ + exchange
+ + '\''
+ + ", name='"
+ + name
+ + '\''
+ + ", symbol='"
+ + symbol
+ + '\''
+ + ", industry='"
+ + industry
+ + '\''
+ + '}';
+ }
}
diff --git a/kafka-dse-core/src/main/java/com/datastax/demo/domain/StockTick.java b/kafka-dse-core/src/main/java/com/datastax/kafkadse/core/domain/StockTick.java
similarity index 51%
rename from kafka-dse-core/src/main/java/com/datastax/demo/domain/StockTick.java
rename to kafka-dse-core/src/main/java/com/datastax/kafkadse/core/domain/StockTick.java
index 150653a..d2d6664 100644
--- a/kafka-dse-core/src/main/java/com/datastax/demo/domain/StockTick.java
+++ b/kafka-dse-core/src/main/java/com/datastax/kafkadse/core/domain/StockTick.java
@@ -1,40 +1,33 @@
-package com.datastax.demo.domain;
+package com.datastax.kafkadse.core.domain;
-import static com.datastax.demo.conf.DseConstants.STOCKS_TICKS;
-
-import com.datastax.driver.mapping.annotations.ClusteringColumn;
-import com.datastax.driver.mapping.annotations.Column;
-import com.datastax.driver.mapping.annotations.PartitionKey;
-import com.datastax.driver.mapping.annotations.Table;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Serializable;
+import java.time.Instant;
+import java.util.Objects;
/** Value for Ticks. */
-@Table(name = STOCKS_TICKS)
public class StockTick implements Serializable {
/** serial. */
private static final long serialVersionUID = 5806346188526710465L;
/** code. */
- @PartitionKey private String symbol;
+ private String symbol;
/** Value Date. */
- @ClusteringColumn private long valueDate;
+ private Instant valueDate;
/** value. */
- @Column private double value;
-
- /** Default Constructor */
- public StockTick() {}
-
- /** Constructor with parameters. */
- public StockTick(String tickSymbol, double value) {
- this(tickSymbol, value, System.currentTimeMillis());
- }
+ private double value;
/** Constructor with parameters. */
- public StockTick(String tickSymbol, double value, long valueDate) {
- this.symbol = tickSymbol;
+ @JsonCreator
+ public StockTick(
+ @JsonProperty("symbol") String symbol,
+ @JsonProperty("valueDate") Instant valueDate,
+ @JsonProperty("value") double value) {
+ this.symbol = symbol;
this.value = value;
this.valueDate = valueDate;
}
@@ -62,7 +55,7 @@ public void setValue(double value) {
*
* @return current value of 'valueDate'
*/
- public long getValueDate() {
+ public Instant getValueDate() {
return valueDate;
}
@@ -71,7 +64,7 @@ public long getValueDate() {
*
* @param valueDate new value for 'valueDate '
*/
- public void setValueDate(long valueDate) {
+ public void setValueDate(Instant valueDate) {
this.valueDate = valueDate;
}
@@ -92,4 +85,36 @@ public String getSymbol() {
public void setSymbol(String symbol) {
this.symbol = symbol;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ StockTick stockTick = (StockTick) o;
+ return Double.compare(stockTick.value, value) == 0
+ && symbol.equals(stockTick.symbol)
+ && valueDate.equals(stockTick.valueDate);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(symbol, valueDate, value);
+ }
+
+ @Override
+ public String toString() {
+ return "StockTick{"
+ + "symbol='"
+ + symbol
+ + '\''
+ + ", valueDate="
+ + valueDate
+ + ", value="
+ + value
+ + '}';
+ }
}
diff --git a/kafka-dse-core/src/main/java/com/datastax/kafkadse/core/domain/package-info.java b/kafka-dse-core/src/main/java/com/datastax/kafkadse/core/domain/package-info.java
new file mode 100644
index 0000000..8bd644b
--- /dev/null
+++ b/kafka-dse-core/src/main/java/com/datastax/kafkadse/core/domain/package-info.java
@@ -0,0 +1,6 @@
+@NonNullApi
+@NonNullFields
+package com.datastax.kafkadse.core.domain;
+
+import org.springframework.lang.NonNullApi;
+import org.springframework.lang.NonNullFields;
diff --git a/kafka-dse-core/src/main/resources/cql/create_schema.cql b/kafka-dse-core/src/main/resources/cql/create_schema.cql
index 6512cd5..a035b94 100644
--- a/kafka-dse-core/src/main/resources/cql/create_schema.cql
+++ b/kafka-dse-core/src/main/resources/cql/create_schema.cql
@@ -4,8 +4,8 @@ use demo_sdc;
CREATE TABLE stocks_ticks (
symbol text,
- valueDate timestamp,
+ value_date timestamp,
value double,
- PRIMARY KEY (symbol, valueDate)
-) WITH CLUSTERING ORDER BY (valueDate DESC);
+ PRIMARY KEY (symbol, value_date)
+) WITH CLUSTERING ORDER BY (value_date DESC);
diff --git a/kafka-dse-producer/.DS_Store b/kafka-dse-producer/.DS_Store
deleted file mode 100644
index 9a874b5..0000000
Binary files a/kafka-dse-producer/.DS_Store and /dev/null differ
diff --git a/kafka-dse-producer/.classpath b/kafka-dse-producer/.classpath
deleted file mode 100644
index 0ca1374..0000000
--- a/kafka-dse-producer/.classpath
+++ /dev/null
@@ -1,55 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/kafka-dse-producer/.factorypath b/kafka-dse-producer/.factorypath
index 978be56..66fa196 100644
--- a/kafka-dse-producer/.factorypath
+++ b/kafka-dse-producer/.factorypath
@@ -16,16 +16,49 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
+
+
+
+
@@ -33,47 +66,13 @@
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
@@ -83,10 +82,8 @@
-
-
@@ -102,34 +99,7 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -155,17 +125,15 @@
-
-
-
+
@@ -173,19 +141,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-