diff --git a/pom.xml b/pom.xml
index 5e34e7d..2c5ea20 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,8 @@
UTF-8
http://packages.confluent.io/maven/
2.6.3
+ 5.1.6
+ 9.4.1209
@@ -173,8 +175,13 @@
mysql
- mysql-connector-java
- 5.1.6
+ mysql-connector-java
+ ${mysql.version}
+
+
+ org.postgresql
+ postgresql
+ ${postgresql.version}
com.fasterxml.jackson.core
diff --git a/src/main/java/com/qubole/streamx/s3/S3Util.java b/src/main/java/com/qubole/streamx/s3/S3Util.java
new file mode 100644
index 0000000..96dc797
--- /dev/null
+++ b/src/main/java/com/qubole/streamx/s3/S3Util.java
@@ -0,0 +1,15 @@
+package com.qubole.streamx.s3;
+
+/**
+ * Static class for S3 utility functions
+ */
+public class S3Util {
+ /**
+ * Simply clean topic name for creating database.
+ * @param topicName
+ * @return
+ */
+ public static String cleanTopicNameForDBWal(String topicName){
+ return topicName.toLowerCase().replaceAll("\\W", "_");
+ }
+}
diff --git a/src/main/java/com/qubole/streamx/s3/wal/AbstractDBWALAccessor.java b/src/main/java/com/qubole/streamx/s3/wal/AbstractDBWALAccessor.java
new file mode 100644
index 0000000..e3c97c2
--- /dev/null
+++ b/src/main/java/com/qubole/streamx/s3/wal/AbstractDBWALAccessor.java
@@ -0,0 +1,191 @@
+package com.qubole.streamx.s3.wal;
+
+ import org.apache.kafka.common.TopicPartition;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import java.sql.Connection;
+ import java.sql.DatabaseMetaData;
+ import java.sql.DriverManager;
+ import java.sql.PreparedStatement;
+ import java.sql.ResultSet;
+ import java.sql.SQLException;
+ import java.sql.Statement;
+ import java.util.Locale;
+
+/**
+ * Abstract class for accessing DB Wal.
+ */
+public abstract class AbstractDBWALAccessor implements DBWALAccessor{
+ private static final Logger log = LoggerFactory.getLogger(AbstractDBWALAccessor.class);
+ public static final String LEASE_TABLE_NAME = "streamx_lease";
+
+ /**
+ * Factory class for initiating {@link AbstractDBWALAccessor}
+ */
+ private static class DBWALAccessorFactory{
+ private static final Logger log = LoggerFactory.getLogger(DBWALAccessorFactory.class);
+ private static AbstractDBWALAccessor getInstance(String connectionUrl, String user, String password, String tableName) throws SQLException{
+ String[] splitted = connectionUrl.split(":");
+
+ // check if the format is wrong
+ if (splitted.length < 3){
+ throw new IllegalArgumentException("incorrect database connectionUrl for " + connectionUrl);
+ }
+
+ String jdbcType = splitted[1].toLowerCase(); //lowercased to prevent mistakes
+ log.info("Initializing DBWAL with jdbcType: " + jdbcType);
+ // still using the old way to load jdbc class
+ try{
+ if ("postgresql".equals(jdbcType)){
+ Class.forName("org.postgresql.Driver");
+ return new PostgresqlWALAccessor(connectionUrl, user, password, tableName);
+ } else if ("mysql".equals(jdbcType)){
+ Class.forName("com.mysql.jdbc.Driver");
+ return new MysqlWALAccessor(connectionUrl, user, password, tableName);
+ }
+ }catch (ClassNotFoundException e) {
+ log.error("cannot find suitable class for: {}", jdbcType, e);
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Get the correct instance of {@link AbstractDBWALAccessor}
+ * @param connectionUrl
+ * @param user
+ * @param password
+ * @return
+ */
+ public static AbstractDBWALAccessor getInstance(String connectionUrl, String user, String password, String tableName) throws SQLException{
+ return DBWALAccessorFactory.getInstance(connectionUrl, user, password, tableName);
+ }
+
+ protected Connection connection;
+ protected final String _user;
+ protected final String _connectionURL;
+ protected final String tableName;
+
+ public AbstractDBWALAccessor(String connectionURL, String user, String password, String tableName) throws SQLException{
+ _connectionURL = connectionURL;
+ _user = user;
+ this.tableName = tableName;
+ connection = DriverManager.getConnection(_connectionURL, _user, password);
+ connection.setAutoCommit(false);
+ }
+
+ public String getUser() {
+ return _user;
+ }
+
+ public String getConnectionURL() {
+ return _connectionURL;
+ }
+
+ @Override
+ public void createTableIfNotExists(String destTableName, String sql) throws SQLException {
+ Statement statement = connection.createStatement();
+ statement.setQueryTimeout(30); // set timeout to 30 sec.
+ DatabaseMetaData dbm = connection.getMetaData();
+ ResultSet tables = dbm.getTables(null, null, destTableName, null);
+ if (tables.next()) {
+ // No op
+ log.info("Table " + destTableName + " has already exists");
+ }
+ else {
+ log.info("Creating table "+ sql);
+ statement.executeUpdate(sql);
+ connection.commit();
+ }
+ }
+
+ @Override
+ public void createLeaseTable() throws SQLException {
+ String sql = String.format("CREATE TABLE `%s` (\n" +
+ " `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,\n" +
+ " `id` int(11) DEFAULT NULL,\n" +
+ " `wal` VARCHAR(500)" +
+ ") ", AbstractDBWALAccessor.LEASE_TABLE_NAME);
+ createTableIfNotExists(AbstractDBWALAccessor.LEASE_TABLE_NAME, sql);
+ }
+
+ @Override
+ public void createWalTable(String tablePrefixName, TopicPartition topicPartition) throws SQLException {
+ String sql = String.format(
+ "create table %s (id INT AUTO_INCREMENT, tempFiles VARCHAR(500), committedFiles VARCHAR(500), primary key (id))",
+ tableName
+ );
+ createTableIfNotExists(tableName, sql);
+ }
+
+ @Override
+ public void insertCommitedFile(String tempFile, String commitedFile) throws SQLException {
+ Statement statement = connection.createStatement();
+ statement.setQueryTimeout(30); // set timeout to 30 sec.
+
+ String sql = String.format("insert into %s (tempFiles,committedFiles) values ('%s','%s')", tableName, tempFile, commitedFile);
+ log.info("committing " + sql);
+ statement.executeUpdate(sql);
+ connection.commit();
+ }
+
+ @Override
+ public ResultSet getLastResultSetFromWalTable() throws SQLException{
+ Statement statement = connection.createStatement();
+ statement.setQueryTimeout(30); // set timeout to 30 sec.
+
+ String sql = String.format("select * from %s order by id desc limit 1", tableName);
+ log.info("Reading wal " + sql);
+ return statement.executeQuery(sql);
+ }
+
+ @Override
+ public ResultSet getLastNResultsetFromWalTable(int n) throws SQLException{
+ Statement statement = connection.createStatement();
+ statement.setQueryTimeout(30); // set timeout to 30 sec.
+
+ String sql = String.format("select * from %s order by id desc limit %s", tableName, n);
+ log.info("Reading wal " + sql);
+ return statement.executeQuery(sql);
+ }
+
+ @Override
+ public void truncateTableLessThanId(String id) throws SQLException{
+ Statement statement = connection.createStatement();
+ String sql = String.format("delete from %s where id < %s", tableName, id);
+
+ log.info("truncating table " + sql);
+
+ statement.executeUpdate(sql);
+ connection.commit();
+ }
+
+
+ @Override
+ public ResultSet getLeaseResultSetLockedRow() throws SQLException{
+ Statement statement = connection.createStatement();
+ statement.setQueryTimeout(5); // set timeout to 30 sec.
+ String sql = String.format("select now() as currentTS,l1.* from %s as l1 where wal = '%s' for update", LEASE_TABLE_NAME, tableName);
+
+ return statement.executeQuery(sql);
+ }
+
+ @Override
+ public void insertLeaseTableRow(int threadId) throws SQLException{
+ Statement statement = connection.createStatement();
+ statement.setQueryTimeout(5); // set timeout to 30 sec.
+
+ String sql = String.format("insert into %s(id,wal) values (%s,'%s')", LEASE_TABLE_NAME, threadId, tableName);
+ statement.executeUpdate(sql);
+ connection.commit();
+ }
+
+ @Override
+ public void updateLeaseTableRow(int threadId) throws SQLException{
+ Statement statement = connection.createStatement();
+ statement.setQueryTimeout(5); // set timeout to 30 sec.
+
+ String sql = String.format("update %s set id=%s,ts=now() where wal='%s'", LEASE_TABLE_NAME, threadId, tableName);
+ statement.executeUpdate(sql);
+ connection.commit();
+ }
+}
diff --git a/src/main/java/com/qubole/streamx/s3/wal/DBWAL.java b/src/main/java/com/qubole/streamx/s3/wal/DBWAL.java
index 805eb51..d687e4d 100644
--- a/src/main/java/com/qubole/streamx/s3/wal/DBWAL.java
+++ b/src/main/java/com/qubole/streamx/s3/wal/DBWAL.java
@@ -15,29 +15,25 @@
package com.qubole.streamx.s3.wal;
import com.qubole.streamx.s3.S3SinkConnectorConfig;
+import com.qubole.streamx.s3.S3Util;
import io.confluent.connect.hdfs.FileUtils;
import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
+import io.confluent.connect.hdfs.storage.Storage;
+import io.confluent.connect.hdfs.wal.WAL;
import org.apache.hadoop.util.StringUtils;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
-import io.confluent.connect.hdfs.wal.WAL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.DatabaseMetaData;
import java.util.ArrayList;
import java.util.concurrent.ThreadLocalRandom;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import io.confluent.connect.hdfs.storage.Storage;
-
public class DBWAL implements WAL {
private static final Logger log = LoggerFactory.getLogger(DBWAL.class);
String tableName;
@@ -49,41 +45,28 @@ public class DBWAL implements WAL {
int id = ThreadLocalRandom.current().nextInt(1, 100000 + 1);
HdfsSinkConnectorConfig config;
String lease_table = "streamx_lease";
+ AbstractDBWALAccessor dbwalAccessor;
public DBWAL(String logsDir, TopicPartition topicPartition, Storage storage, HdfsSinkConnectorConfig config) {
this.storage = storage;
this.config = config;
- try {
- Class.forName("com.mysql.jdbc.Driver");
- }catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
-
partitionId = topicPartition.partition();
try {
String name = config.getString(S3SinkConnectorConfig.NAME_CONFIG);
- tableName = name + "_" + topicPartition.topic() + "_" + partitionId;
-
+ tableName = name + "_" + S3Util.cleanTopicNameForDBWal(topicPartition.topic()) + "_" + partitionId;
+
String connectionURL = config.getString(S3SinkConnectorConfig.DB_CONNECTION_URL_CONFIG);
String user = config.getString(S3SinkConnectorConfig.DB_USER_CONFIG);
String password = config.getString(S3SinkConnectorConfig.DB_PASSWORD_CONFIG);
if(connectionURL.length()==0 || user.length()==0 || password.length()==0)
throw new ConnectException("db.connection.url,db.user,db.password - all three properties must be specified");
+ log.info("jdbc wal connecting to " + connectionURL);
+ dbwalAccessor = AbstractDBWALAccessor.getInstance(connectionURL, user, password, tableName);
connection = DriverManager.getConnection(connectionURL, user, password);
- connection.setAutoCommit(false);
-
- String sql = String.format("create table %s (id INT AUTO_INCREMENT, tempFiles VARCHAR(500), committedFiles VARCHAR(500), primary key (id))", tableName);
- createIfNotExists(tableName, sql);
-
- sql = String.format("CREATE TABLE `%s` (\n" +
- " `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,\n" +
- " `id` int(11) DEFAULT NULL,\n" +
- " `wal` VARCHAR(500)" +
- ") ", lease_table);
- createIfNotExists("streamx_lease", sql);
-
+ dbwalAccessor.createWalTable(name, topicPartition);
+ dbwalAccessor.createLeaseTable();
}catch (SQLException e) {
log.error(e.toString());
@@ -91,46 +74,21 @@ public DBWAL(String logsDir, TopicPartition topicPartition, Storage storage, Hdf
}
}
- private void createIfNotExists(String tableName, String sql) throws SQLException {
- Statement statement = connection.createStatement();
- statement.setQueryTimeout(30); // set timeout to 30 sec.
- DatabaseMetaData dbm = connection.getMetaData();
- ResultSet tables = dbm.getTables(null, null, tableName, null);
-
- if (tables.next()) {
- // No op
- }
- else {
- log.info("Creating table "+ sql);
- statement.executeUpdate(sql);
- connection.commit();
- }
- }
-
@Override
public void acquireLease() throws ConnectException {
long sleepIntervalMs = 1000L;
long MAX_SLEEP_INTERVAL_MS = 16000L;
while (sleepIntervalMs < MAX_SLEEP_INTERVAL_MS) {
-
try {
- Statement statement = connection.createStatement();
- statement.setQueryTimeout(5); // set timeout to 30 sec.
- String sql = String.format("select now() as currentTS,l1.* from %s as l1 where wal = '%s' for update", lease_table, tableName);
-
- ResultSet rs = statement.executeQuery(sql);
+ ResultSet rs = dbwalAccessor.getLeaseResultSetLockedRow();
if(!rs.next()) {
- sql = String.format("insert into %s(id,wal) values (%s,'%s')", lease_table, id, tableName);
- statement.executeUpdate(sql);
- connection.commit();
+ dbwalAccessor.insertLeaseTableRow(id);
return;
}
if(canAcquireLock(rs)) {
- sql = String.format("update %s set id=%s,ts=now() where wal='%s'", lease_table, id, tableName);
- statement.executeUpdate(sql);
- connection.commit();
+ dbwalAccessor.updateLeaseTableRow(id);
return;
}
connection.commit();
@@ -174,23 +132,16 @@ private boolean canAcquireLock(ResultSet rs) {
@Override
public void append(String tempFile, String committedFile) throws ConnectException {
try {
- if(tempFile==WAL.beginMarker) {
+ if(WAL.beginMarker.equals(tempFile)) {
tempFiles.clear();
committedFiles.clear();
}
- else if(tempFile==WAL.endMarker) {
+ else if(WAL.endMarker.equals(tempFile)) {
String tempFilesCommaSeparated = StringUtils.join(",",tempFiles);
String committedFilesCommaSeparated = StringUtils.join(",",committedFiles);
acquireLease();
-
- Statement statement = connection.createStatement();
- statement.setQueryTimeout(30); // set timeout to 30 sec.
-
- String sql = String.format("insert into %s (tempFiles,committedFiles) values ('%s','%s')", tableName, tempFilesCommaSeparated, committedFilesCommaSeparated);
- log.info("committing " + sql);
- statement.executeUpdate(sql);
- connection.commit();
+ dbwalAccessor.insertCommitedFile(tempFilesCommaSeparated, committedFilesCommaSeparated);
}
else {
tempFiles.add(tempFile);
@@ -206,13 +157,7 @@ else if(tempFile==WAL.endMarker) {
public void apply() throws ConnectException {
try {
acquireLease();
-
- Statement statement = connection.createStatement();
- statement.setQueryTimeout(30); // set timeout to 30 sec.
-
- String sql = String.format("select * from %s order by id desc limit 1", tableName);
- log.info("Reading wal " + sql);
- ResultSet rs=statement.executeQuery(sql);
+ ResultSet rs = dbwalAccessor.getLastResultSetFromWalTable();
while(rs.next()) {
String tempFiles = rs.getString("tempFiles");
@@ -239,10 +184,7 @@ public void apply() throws ConnectException {
@Override
public void truncate() throws ConnectException {
try {
- Statement statement = connection.createStatement();
- statement.setQueryTimeout(30); // set timeout to 30 sec.
- String sql = String.format("select * from %s order by id desc limit 2", tableName);
- ResultSet rs = statement.executeQuery(sql);
+ ResultSet rs = dbwalAccessor.getLastNResultsetFromWalTable(2);
int rows = 0;
while(rs.next()){
rows++;
@@ -251,10 +193,7 @@ public void truncate() throws ConnectException {
return;
rs.absolute(2);
String id = rs.getString("id");
- sql = String.format("delete from %s where id < %s", tableName, id);
- log.info("truncating table " + sql);
- statement.executeUpdate(sql);
- connection.commit();
+ dbwalAccessor.truncateTableLessThanId(id);
}catch (SQLException e){
log.error(e.toString());
throw new ConnectException(e);
@@ -329,10 +268,7 @@ private boolean checkFileExists(String files[]) {
private ResultSet fetch() throws ConnectException {
try {
- Statement statement = connection.createStatement();
- statement.setQueryTimeout(30); // set timeout to 30 sec.
- String sql = String.format("select * from %s order by id desc limit 2", tableName);
- ResultSet rs = statement.executeQuery(sql);
+ ResultSet rs = dbwalAccessor.getLastNResultsetFromWalTable(2);
return rs;
}catch (SQLException e){
log.error(e.toString());
diff --git a/src/main/java/com/qubole/streamx/s3/wal/DBWALAccessor.java b/src/main/java/com/qubole/streamx/s3/wal/DBWALAccessor.java
new file mode 100644
index 0000000..294ce3a
--- /dev/null
+++ b/src/main/java/com/qubole/streamx/s3/wal/DBWALAccessor.java
@@ -0,0 +1,22 @@
+package com.qubole.streamx.s3.wal;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * DBWalAccessor
+ */
+public interface DBWALAccessor {
+ void createTableIfNotExists(String tableName, String sql) throws SQLException;
+ void createLeaseTable() throws SQLException;
+ void createWalTable(String tablePrefixName, TopicPartition topicPartition) throws SQLException;
+ void insertCommitedFile(String tempFile, String commitedFile) throws SQLException;
+ ResultSet getLastResultSetFromWalTable() throws SQLException;
+ ResultSet getLastNResultsetFromWalTable(int n) throws SQLException;
+ void truncateTableLessThanId(String id) throws SQLException;
+ ResultSet getLeaseResultSetLockedRow() throws SQLException;
+ void insertLeaseTableRow(int threadId) throws SQLException;
+ void updateLeaseTableRow(int threadId) throws SQLException;
+}
diff --git a/src/main/java/com/qubole/streamx/s3/wal/MysqlWALAccessor.java b/src/main/java/com/qubole/streamx/s3/wal/MysqlWALAccessor.java
new file mode 100644
index 0000000..abc934d
--- /dev/null
+++ b/src/main/java/com/qubole/streamx/s3/wal/MysqlWALAccessor.java
@@ -0,0 +1,20 @@
+package com.qubole.streamx.s3.wal;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+
+/**
+ * DBWAL Accessor for Mysql.
+ */
+public class MysqlWALAccessor extends AbstractDBWALAccessor {
+ private static final Logger log = LoggerFactory.getLogger(MysqlWALAccessor.class);
+
+ public MysqlWALAccessor(String connectionURL, String user, String password, String tableName) throws SQLException {
+ super(connectionURL, user, password, tableName);
+ }
+
+
+
+}
diff --git a/src/main/java/com/qubole/streamx/s3/wal/PostgresqlWALAccessor.java b/src/main/java/com/qubole/streamx/s3/wal/PostgresqlWALAccessor.java
new file mode 100644
index 0000000..36cedb4
--- /dev/null
+++ b/src/main/java/com/qubole/streamx/s3/wal/PostgresqlWALAccessor.java
@@ -0,0 +1,68 @@
+package com.qubole.streamx.s3.wal;
+
+import com.qubole.streamx.s3.S3Util;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/**
+ * Postgresql WAL Accessor.
+ */
+public class PostgresqlWALAccessor extends AbstractDBWALAccessor {
+ private static final Logger log = LoggerFactory.getLogger(PostgresqlWALAccessor.class);
+
+ public PostgresqlWALAccessor(String connectionURL, String user, String password, String tableName) throws SQLException {
+ super(connectionURL, user, password, tableName);
+ }
+
+ @Override
+ public void createLeaseTable() throws SQLException {
+ Statement statement = connection.createStatement();
+ statement.setQueryTimeout(30); // set timeout to 30 sec.
+
+ log.info("Creating lease table");
+ String sql = String.format("CREATE TABLE %s (\n" +
+ " ts timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
+ " id int DEFAULT NULL,\n" +
+ " wal VARCHAR" +
+ ") ", AbstractDBWALAccessor.LEASE_TABLE_NAME);
+ createTableIfNotExists(AbstractDBWALAccessor.LEASE_TABLE_NAME, sql);
+
+ log.info("Creating trigger on update, update timestamp field");
+ sql = String.format("CREATE FUNCTION update_ts_column() RETURNS trigger\n" +
+ " LANGUAGE plpgsql\n" +
+ " AS $$\n" +
+ " BEGIN\n" +
+ " NEW.ts = NOW();\n" +
+ " RETURN NEW;\n" +
+ " END;\n" +
+ "$$;");
+ statement.executeUpdate(sql);
+
+ try{
+ sql = String.format("CREATE TRIGGER least_table_ts_modtime BEFORE UPDATE ON %s FOR EACH ROW EXECUTE PROCEDURE update_ts_column();", AbstractDBWALAccessor.LEASE_TABLE_NAME);
+ statement.executeUpdate(sql);
+ } catch(SQLException e){
+ log.info("CREATE TRIGGER on update exception is ignored", e);
+ }
+
+
+ }
+
+ @Override
+ public void createWalTable(String tablePrefixName, TopicPartition topicPartition) throws SQLException {
+ String topicName = topicPartition.topic();
+ int partitionId = topicPartition.partition();
+
+ String tableName = tablePrefixName + "_" + S3Util.cleanTopicNameForDBWal(topicName) + "_" + partitionId;
+
+ String sql = String.format(
+ "create table %s (id SERIAL, tempFiles VARCHAR, committedFiles VARCHAR, primary key (id))",
+ tableName
+ );
+ createTableIfNotExists(tableName, sql);
+ }
+}
diff --git a/src/test/java/com/qubole/streamx/s3/S3UtilTest.java b/src/test/java/com/qubole/streamx/s3/S3UtilTest.java
new file mode 100644
index 0000000..f1f5b8e
--- /dev/null
+++ b/src/test/java/com/qubole/streamx/s3/S3UtilTest.java
@@ -0,0 +1,14 @@
+package com.qubole.streamx.s3;
+
+import org.junit.Test;
+
+ public class S3UtilTest {
+   /**
+    * Checks that {@link S3Util#cleanTopicNameForDBWal(String)} lower-cases the topic name
+    * and replaces the dot with an underscore.
+    */
+   @Test
+   public void cleanTopicNameForDBWal() throws Exception {
+     String name = "S3.Test";
+     String expectedName = "s3_test";
+
+     // A JUnit assertion instead of the `assert` keyword: `assert` is skipped entirely when
+     // the JVM runs without -ea, and assertEquals reports both values on failure.
+     org.junit.Assert.assertEquals(expectedName, S3Util.cleanTopicNameForDBWal(name));
+   }
+
+ }
\ No newline at end of file