diff --git a/docker-compose/db-plugins-env/docker-compose.yml b/docker-compose/db-plugins-env/docker-compose.yml
index 2b5206e9f..2df4c7ad4 100644
--- a/docker-compose/db-plugins-env/docker-compose.yml
+++ b/docker-compose/db-plugins-env/docker-compose.yml
@@ -80,7 +80,6 @@ services:
     sysctls:
       - kernel.shmmax=1073741824
       - net.ipv4.ip_local_port_range=60000 65535
-      - kernel.shmmni=524288
       - kernel.shmall=8388608
     extra_hosts:
       # Alter this if running on non-Linux machine
diff --git a/saphana-plugin/pom.xml b/saphana-plugin/pom.xml
index 01bb2c815..de0b1a2f1 100644
--- a/saphana-plugin/pom.xml
+++ b/saphana-plugin/pom.xml
@@ -51,6 +51,12 @@
       test-jar
       test
     
+    
+      com.sap.cloud.db.jdbc
+      ngdbc
+      2.3.48
+      test
+    
     
       io.cdap.cdap
       hydrator-test
diff --git a/saphana-plugin/src/main/java/io/cdap/plugin/saphana/SapHanaPostAction.java b/saphana-plugin/src/main/java/io/cdap/plugin/saphana/SapHanaPostAction.java
index 4d14c6bfd..a2620d490 100644
--- a/saphana-plugin/src/main/java/io/cdap/plugin/saphana/SapHanaPostAction.java
+++ b/saphana-plugin/src/main/java/io/cdap/plugin/saphana/SapHanaPostAction.java
@@ -31,8 +31,11 @@
 @Description("Runs a SAP HANA query after a pipeline run.")
 public class SapHanaPostAction extends AbstractQueryAction {
 
+  private final SapHanaQueryActionConfig sapHanaQueryActionConfig;
+
   public SapHanaPostAction(SapHanaQueryActionConfig sapHanaQueryActionConfig) {
     super(sapHanaQueryActionConfig, false);
+    this.sapHanaQueryActionConfig = sapHanaQueryActionConfig;
   }
 
   /**
diff --git a/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaActionTestRun.java b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaActionTestRun.java
new file mode 100644
index 000000000..9e63bc272
--- /dev/null
+++ b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaActionTestRun.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.saphana;
+
+import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.etl.api.action.Action;
+import io.cdap.cdap.etl.mock.batch.MockSink;
+import io.cdap.cdap.etl.mock.batch.MockSource;
+import io.cdap.cdap.etl.proto.v2.ETLBatchConfig;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.etl.proto.v2.ETLStage;
+import io.cdap.cdap.proto.artifact.AppRequest;
+import io.cdap.cdap.proto.id.ApplicationId;
+import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.plugin.db.batch.action.QueryConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+public class SapHanaActionTestRun extends SapHanaPluginTestBase {
+
+    protected static final String ACTION_TEST_TABLE_NAME = "DB_ACTION_TEST";
+
+
+    @Before
+    public void createTables() throws Exception {
+        Connection conn = createConnection();
+        Statement statement = conn.createStatement();
+        statement.execute("CREATE TABLE " + ACTION_TEST_TABLE_NAME + "(ID INT);");
+        statement.execute("INSERT INTO " + ACTION_TEST_TABLE_NAME + " VALUES(1)");
+        statement.execute("INSERT INTO " + ACTION_TEST_TABLE_NAME + " VALUES(2)");
+    }
+
+
+    @Test
+    public void testDBAction() throws Exception {
+        ETLStage source = new ETLStage("source", MockSource.getPlugin("actionInput"));
+        ETLStage sink = new ETLStage("sink", MockSink.getPlugin("actionOutput"));
+        ETLStage action = new ETLStage("action", new ETLPlugin(
+                SapHanaConstants.PLUGIN_NAME,
+                Action.PLUGIN_TYPE,
+                ImmutableMap.builder()
+                        .putAll(BASE_PROPS)
+                        .put(QueryConfig.QUERY, "DELETE from " + ACTION_TEST_TABLE_NAME + " where ID=1")
+                        .build(),
+                null));
+        ETLBatchConfig config = ETLBatchConfig.builder()
+                .addStage(source)
+                .addStage(sink)
+                .addStage(action)
+                .addConnection(sink.getName(), action.getName())
+                .addConnection(source.getName(), sink.getName())
+                .build();
+        AppRequest appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, config);
+        ApplicationId appId = NamespaceId.DEFAULT.app("actionTest");
+        ApplicationManager appManager = deployApplication(appId, appRequest);
+        runETLOnce(appManager, ImmutableMap.of("logical.start.time", "0"));
+
+        Connection connection = createConnection();
+        Statement statement = connection.createStatement();
+
+        ResultSet results = statement.executeQuery("select * from " + ACTION_TEST_TABLE_NAME);
+        results.next();
+        int id = results.getInt("ID");
+        Assert.assertEquals(id, 2);
+        Assert.assertNotEquals(id, 1);
+        Assert.assertFalse(results.next());
+
+    }
+
+    @After
+    public void dropTables() throws Exception {
+        Connection conn = createConnection();
+        Statement statement = conn.createStatement();
+        statement.execute("DROP TABLE " + ACTION_TEST_TABLE_NAME + ";");
+    }
+
+}
diff --git a/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestBase.java b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestBase.java
new file mode 100644
index 000000000..42d894976
--- /dev/null
+++ b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestBase.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.saphana;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import com.sap.db.jdbc.Driver;
+import io.cdap.cdap.api.artifact.ArtifactSummary;
+import io.cdap.cdap.api.plugin.PluginClass;
+import io.cdap.cdap.datapipeline.DataPipelineApp;
+import io.cdap.cdap.proto.id.ArtifactId;
+import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.plugin.db.ConnectionConfig;
+import io.cdap.plugin.db.DBRecord;
+import io.cdap.plugin.db.batch.DatabasePluginTestBase;
+import io.cdap.plugin.db.batch.sink.ETLDBOutputFormat;
+import io.cdap.plugin.db.batch.source.DataDrivenETLDBInputFormat;
+import org.junit.BeforeClass;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Collections;
+import java.util.Map;
+
+
+public abstract class SapHanaPluginTestBase extends DatabasePluginTestBase {
+
+
+    protected static final String JDBC_DRIVER_NAME = "sap";
+    protected static final ArtifactId DATAPIPELINE_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-pipeline", "3.2.0");
+    protected static final ArtifactSummary DATAPIPELINE_ARTIFACT = new ArtifactSummary("data-pipeline", "3.2.0");
+    protected static final Map BASE_PROPS = ImmutableMap.builder()
+            .put(ConnectionConfig.HOST, System.getProperty("saphana.host", "localhost"))
+            .put(ConnectionConfig.PORT, System.getProperty("saphana.port", "39017"))
+            .put(ConnectionConfig.DATABASE, System.getProperty("sapahana.database", "SYSTEMDB"))
+            .put(ConnectionConfig.USER, System.getProperty("sapahana.user", "SYSTEM"))
+            .put(ConnectionConfig.PASSWORD, System.getProperty("sapahana.password", "SAPhxe123"))
+            .put(ConnectionConfig.JDBC_PLUGIN_NAME, JDBC_DRIVER_NAME)
+            .build();
+    private static String connectionUrl;
+    private static Boolean setupCompleted = false;
+
+
+    @BeforeClass
+    public static void setupTest() throws Exception {
+        if (setupCompleted) {
+            return;
+        }
+        System.out.println("Setting up batch artifacts");
+        setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class);
+        System.out.println("Adding plugin artifact");
+        addPluginArtifact(NamespaceId.DEFAULT.artifact(SapHanaConstants.PLUGIN_NAME, "1.0.0"),
+                DATAPIPELINE_ARTIFACT_ID, SapHanaSource.class, DBRecord.class, ETLDBOutputFormat.class,
+                DataDrivenETLDBInputFormat.class, SapHanaAction.class, SapHanaPostAction.class);
+
+        PluginClass sapHanaDriver = new PluginClass(ConnectionConfig.JDBC_PLUGIN_TYPE, JDBC_DRIVER_NAME,
+                "SapHana driver class", Driver.class.getName(), null, Collections.emptyMap());
+        addPluginArtifact(NamespaceId.DEFAULT.artifact("saphana-jdbc-connector", "1.0.0"), DATAPIPELINE_ARTIFACT_ID,
+                Sets.newHashSet(sapHanaDriver), Driver.class);
+
+        connectionUrl = "jdbc:sap://" + BASE_PROPS.get(ConnectionConfig.HOST) + ":" +
+                BASE_PROPS.get(ConnectionConfig.PORT) + "/" + BASE_PROPS.get(ConnectionConfig.DATABASE);
+        setupCompleted = true;
+    }
+
+
+    protected static Connection createConnection() throws Exception {
+        Class.forName(Driver.class.getCanonicalName());
+        return DriverManager.getConnection(connectionUrl, BASE_PROPS.get(ConnectionConfig.USER),
+                BASE_PROPS.get(ConnectionConfig.PASSWORD));
+    }
+
+
+}
+
diff --git a/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestSuite.java b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestSuite.java
new file mode 100644
index 000000000..714212e6f
--- /dev/null
+++ b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestSuite.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.saphana;
+
+import io.cdap.cdap.common.test.TestSuite;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+
+/**
+ * This is a test suite that runs all the tests for Database plugins.
+ */
+@RunWith(TestSuite.class)
+@Suite.SuiteClasses({
+        SapHanaActionTestRun.class,
+        SapHanaSourceTestRun.class,
+        SapHanaSinkTestRun.class,
+        SapHanaPostActionTestRun.class
+})
+public class SapHanaPluginTestSuite extends SapHanaPluginTestBase {
+
+}
diff --git a/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPostActionTestRun.java b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPostActionTestRun.java
new file mode 100644
index 000000000..1e9de305c
--- /dev/null
+++ b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPostActionTestRun.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.saphana;
+
+import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.etl.api.batch.PostAction;
+import io.cdap.cdap.etl.mock.batch.MockSink;
+import io.cdap.cdap.etl.mock.batch.MockSource;
+import io.cdap.cdap.etl.proto.v2.ETLBatchConfig;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.etl.proto.v2.ETLStage;
+import io.cdap.cdap.proto.artifact.AppRequest;
+import io.cdap.cdap.proto.id.ApplicationId;
+import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.plugin.common.batch.action.Condition;
+import io.cdap.plugin.db.batch.action.QueryActionConfig;
+import io.cdap.plugin.db.batch.action.QueryConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+public class SapHanaPostActionTestRun extends SapHanaPluginTestBase {
+
+    protected static final String POST_ACTION_TEST_TABLE_NAME = "POST_ACTION_TEST";
+
+    @Before
+    public void createTables() throws Exception {
+        Connection conn = createConnection();
+        Statement statement = conn.createStatement();
+        statement.execute("CREATE TABLE " + POST_ACTION_TEST_TABLE_NAME + "(ID INT);");
+        statement.execute("INSERT INTO " + POST_ACTION_TEST_TABLE_NAME + " VALUES(1)");
+        statement.execute("INSERT INTO " + POST_ACTION_TEST_TABLE_NAME + " VALUES(2)");
+    }
+
+
+    @Test
+    public void testDBPostAction() throws Exception {
+        ETLStage source = new ETLStage("source", MockSource.getPlugin("postActionInput"));
+        ETLStage sink = new ETLStage("sink", MockSink.getPlugin("postActionOutput"));
+        ETLStage action = new ETLStage("postAction", new ETLPlugin(
+                SapHanaConstants.PLUGIN_NAME,
+                PostAction.PLUGIN_TYPE,
+                ImmutableMap.builder()
+                        .putAll(BASE_PROPS)
+                        .put(QueryConfig.QUERY, "DELETE from " + POST_ACTION_TEST_TABLE_NAME + " where ID=1")
+                        .put(QueryActionConfig.RUN_CONDITION, Condition.SUCCESS.name())
+                        .build(),
+                null));
+
+        ETLBatchConfig config = ETLBatchConfig.builder()
+                .addStage(source)
+                .addStage(sink)
+                .addPostAction(action)
+                .addConnection(source.getName(), sink.getName())
+                .build();
+
+
+        AppRequest appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, config);
+        ApplicationId appId = NamespaceId.DEFAULT.app("postActionTest");
+        ApplicationManager appManager = deployApplication(appId, appRequest);
+        runETLOnce(appManager, ImmutableMap.of("logical.start.time", "0"));
+
+        Connection connection = createConnection();
+        Statement statement = connection.createStatement();
+
+        ResultSet results = statement.executeQuery("select * from " + POST_ACTION_TEST_TABLE_NAME);
+        results.next();
+        int id = results.getInt("ID");
+        Assert.assertEquals(id, 2);
+        Assert.assertFalse(results.next());
+        connection.close();
+    }
+
+    @After
+    public void dropTables() throws Exception {
+        Connection conn = createConnection();
+        Statement statement = conn.createStatement();
+        statement.execute("DROP TABLE " + POST_ACTION_TEST_TABLE_NAME + ";");
+    }
+}
diff --git a/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSinkTestRun.java b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSinkTestRun.java
new file mode 100644
index 000000000..45e473106
--- /dev/null
+++ b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSinkTestRun.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.saphana;
+
+import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.dataset.table.Table;
+import io.cdap.cdap.etl.api.batch.BatchSink;
+import io.cdap.cdap.etl.mock.batch.MockSource;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.cdap.test.DataSetManager;
+import io.cdap.plugin.common.Constants;
+import io.cdap.plugin.db.batch.sink.AbstractDBSink;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class SapHanaSinkTestRun extends SapHanaPluginTestBase {
+
+    public static final String SINK_TEST_TABLE_NAME = "SINK_TEST";
+
+
+    private static final Schema SCHEMA = Schema.recordOf(
+            "dbRecord",
+            Schema.Field.of("ID", Schema.of(Schema.Type.INT))
+    );
+
+    @Before
+    public void createTables() throws Exception {
+        Connection conn = createConnection();
+        Statement statement = conn.createStatement();
+        statement.execute("CREATE TABLE " + SINK_TEST_TABLE_NAME + "(ID INT);");
+    }
+
+
+    private void createInputData(String inputDatasetName) throws Exception {
+        // add some data to the input table
+        DataSetManager inputManager = getDataset(inputDatasetName);
+        List inputRecords = new ArrayList<>();
+        for (int i = 1; i <= 2; i++) {
+            inputRecords.add(StructuredRecord.builder(SCHEMA)
+                    .set("ID", i)
+                    .build());
+        }
+        MockSource.writeInput(inputManager, inputRecords);
+    }
+
+    @Test
+    public void testDBSinkWithExplicitInputSchema() throws Exception {
+        testDBSink("testDBSinkWithExplicitInputSchema", "input-dbsinktest-explicit", true);
+    }
+
+    public void testDBSink(String appName, String inputDatasetName, boolean setInputSchema) throws Exception {
+        ETLPlugin sourceConfig = (setInputSchema)
+                ? MockSource.getPlugin(inputDatasetName, SCHEMA)
+                : MockSource.getPlugin(inputDatasetName);
+
+        ETLPlugin sinkConfig = getSinkConfig();
+
+        ApplicationManager appManager = deployETL(sourceConfig, sinkConfig, DATAPIPELINE_ARTIFACT, appName);
+        createInputData(inputDatasetName);
+        runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(0)));
+        try (Connection conn = createConnection();
+             Statement stmt = conn.createStatement();
+             ResultSet resultSet = stmt.executeQuery("SELECT * FROM " + SINK_TEST_TABLE_NAME)) {
+            Assert.assertTrue(resultSet.next());
+
+        }
+
+    }
+
+    private ETLPlugin getSinkConfig() {
+        return new ETLPlugin(
+                SapHanaConstants.PLUGIN_NAME,
+                BatchSink.PLUGIN_TYPE,
+                ImmutableMap.builder()
+                        .putAll(BASE_PROPS)
+                        .put(AbstractDBSink.DBSinkConfig.TABLE_NAME, SINK_TEST_TABLE_NAME)
+                        .put(Constants.Reference.REFERENCE_NAME, "DBTest")
+                        .build(),
+                null);
+    }
+
+
+    @After
+    public void dropTables() throws Exception {
+        Connection conn = createConnection();
+        Statement statement = conn.createStatement();
+        statement.execute("DROP TABLE " + SINK_TEST_TABLE_NAME + ";");
+    }
+
+}
diff --git a/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSourceTestRun.java b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSourceTestRun.java
new file mode 100644
index 000000000..4084293df
--- /dev/null
+++ b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSourceTestRun.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.saphana;
+
+
+import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.api.dataset.table.Table;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.mock.batch.MockSink;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.cdap.test.DataSetManager;
+import io.cdap.plugin.common.Constants;
+import io.cdap.plugin.db.batch.source.AbstractDBSource;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+public class SapHanaSourceTestRun extends SapHanaPluginTestBase {
+
+    public static final String SOURCE_TEST_TABLE_NAME = "SOURCE_TEST";
+
+    @Before
+    public void createTables() throws Exception {
+        Connection conn = createConnection();
+        Statement statement = conn.createStatement();
+        statement.execute("CREATE TABLE " + SOURCE_TEST_TABLE_NAME + "(ID INT);");
+        statement.execute("INSERT INTO " + SOURCE_TEST_TABLE_NAME + " VALUES(1)");
+        statement.execute("INSERT INTO " + SOURCE_TEST_TABLE_NAME + " VALUES(2)");
+        statement.execute("INSERT INTO " + SOURCE_TEST_TABLE_NAME + " VALUES(3)");
+    }
+
+
+    @Test
+    public void testDBSource() throws Exception {
+        String importQuery = "SELECT ID from " + SOURCE_TEST_TABLE_NAME + " where $CONDITIONS";
+        String boundingQuery = "SELECT MIN(ID),MAX(ID) from " + SOURCE_TEST_TABLE_NAME + " where $CONDITIONS";
+        String splitBy = "ID";
+
+        ImmutableMap sourceProps = ImmutableMap.builder()
+                .putAll(BASE_PROPS)
+                .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery)
+                .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery)
+                .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy)
+                .put(Constants.Reference.REFERENCE_NAME, "DBTestSource").build();
+
+        ETLPlugin sourceConfig = new ETLPlugin(
+                SapHanaConstants.PLUGIN_NAME,
+                BatchSource.PLUGIN_TYPE,
+                sourceProps
+        );
+
+        ETLPlugin sinkConfig = MockSink.getPlugin("macroOutputTable");
+
+        ApplicationManager appManager = deployETL(sourceConfig, sinkConfig,
+                DATAPIPELINE_ARTIFACT, "testDBMacro");
+        runETLOnce(appManager, ImmutableMap.of("logical.start.time", "0"));
+
+        DataSetManager outputManager = getDataset("macroOutputTable");
+        Assert.assertEquals(3, MockSink.readOutput(outputManager).size());
+    }
+
+    @After
+    public void dropTables() throws Exception {
+        Connection conn = createConnection();
+        Statement statement = conn.createStatement();
+        statement.execute("DROP TABLE " + SOURCE_TEST_TABLE_NAME + ";");
+    }
+
+}