diff --git a/README.md b/README.md index 0b86c59f1..9dbf583a9 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ SAP HANA requires that password for DB is provided through url. Convenience script ```docker-compose/db-plugins-env/saphana-password-server.sh``` provided for this purpose. -Netezza requires VMware Player for running Netezza emulator. +Netezza and Teradata require VMware Player for running their emulators. * [Install Docker Compose](https://docs.docker.com/compose/install/) * Build local docker images @@ -66,6 +66,11 @@ grant all on *.* to 'root'@'%' identified by 'root' with grant option; * [Install and start Netezza emulator](http://dwgeek.com/install-vmware-player-netezza-emulator.html/) * Create database `mydb` in Netezza emulator + +* [Install and start Teradata Express](https://downloads.teradata.com/download/files/7671/200652/1/B035-5948-018K.pdf) +* Create database `mydb` in Teradata Express +* Create user `test` with password `test` in Teradata Express + ### Properties #### MySQL * **mysql.host** - Server host. Default: localhost. @@ -110,6 +115,12 @@ grant all on *.* to 'root'@'%' identified by 'root' with grant option; * **memsql.database** - Server namespace for test databases. Default: mydb. * **memsql.username** - Server username. Default: root. * **memsql.password** - Server password. Default: root. +#### Teradata +* **teradata.host** - Server host. Default: localhost. +* **teradata.port** - Server port. Default: 1025. +* **teradata.database** - Server namespace for test databases. Default: mydb. +* **teradata.username** - Server username. Default: test. +* **teradata.password** - Server password. Default: test. #### Aurora MySQL * **auroraMysql.clusterEndpoint** - Cluster endpoint. * **auroraMysql.port** - Server port. diff --git a/aurora-mysql-plugin/pom.xml b/aurora-mysql-plugin/pom.xml index e6fd22af6..76cfbccfb 100644 --- a/aurora-mysql-plugin/pom.xml +++ b/aurora-mysql-plugin/pom.xml @@ -20,12 +20,12 @@ database-plugins io.cdap.plugin - 1.5.0-SNAPSHOT + 1.5.0 Aurora DB MySQL plugin aurora-mysql-plugin - 1.5.0-SNAPSHOT + 1.5.0 4.0.0 diff --git a/aurora-postgresql-plugin/pom.xml b/aurora-postgresql-plugin/pom.xml index 00cf5cd21..2faf8b891 100644 --- a/aurora-postgresql-plugin/pom.xml +++ b/aurora-postgresql-plugin/pom.xml @@ -20,12 +20,12 @@ database-plugins io.cdap.plugin - 1.5.0-SNAPSHOT + 1.5.0 Aurora DB PostgreSQL plugin aurora-postgresql-plugin - 1.5.0-SNAPSHOT + 1.5.0 4.0.0 diff --git a/cloudsql-mysql-plugin/pom.xml b/cloudsql-mysql-plugin/pom.xml index acf871342..1eb1277c5 100644 --- a/cloudsql-mysql-plugin/pom.xml +++ b/cloudsql-mysql-plugin/pom.xml @@ -20,12 +20,12 @@ database-plugins io.cdap.plugin - 1.5.0-SNAPSHOT + 1.5.0 CloudSQL MySQL plugin cloudsql-mysql-plugin - 1.5.0-SNAPSHOT + 1.5.0 4.0.0 diff --git a/cloudsql-postgresql-plugin/pom.xml b/cloudsql-postgresql-plugin/pom.xml index cd41da6c8..3b35f31f4 100644 --- a/cloudsql-postgresql-plugin/pom.xml +++ b/cloudsql-postgresql-plugin/pom.xml @@ -20,12 +20,12 @@ database-plugins io.cdap.plugin - 1.5.0-SNAPSHOT + 1.5.0 CloudSQL PostgreSQL plugin cloudsql-postgresql-plugin - 1.5.0-SNAPSHOT + 1.5.0 4.0.0 @@ -46,7 +46,7 @@ io.cdap.plugin postgresql-plugin - 1.5.0-SNAPSHOT + 1.5.0 io.cdap.cdap diff --git a/database-commons/pom.xml b/database-commons/pom.xml index b07e68c00..4d3943710 100644 --- a/database-commons/pom.xml +++ b/database-commons/pom.xml @@ -20,7 +20,7 @@ database-plugins io.cdap.plugin - 1.5.0-SNAPSHOT + 1.5.0 Database Commons diff --git a/db2-plugin/pom.xml b/db2-plugin/pom.xml index
89c6c27d3..a042b5518 100644 --- a/db2-plugin/pom.xml +++ b/db2-plugin/pom.xml @@ -20,12 +20,12 @@ database-plugins io.cdap.plugin - 1.5.0-SNAPSHOT + 1.5.0 IBM DB2 plugin db2-plugin - 1.5.0-SNAPSHOT + 1.5.0 4.0.0 diff --git a/generic-database-plugin/pom.xml b/generic-database-plugin/pom.xml index 4a12082e3..b04a3af07 100644 --- a/generic-database-plugin/pom.xml +++ b/generic-database-plugin/pom.xml @@ -20,12 +20,12 @@ database-plugins io.cdap.plugin - 1.5.0-SNAPSHOT + 1.5.0 Generic database plugin generic-database-plugin - 1.5.0-SNAPSHOT + 1.5.0 4.0.0 diff --git a/mariadb-plugin/pom.xml b/mariadb-plugin/pom.xml index 3024ebefe..d738000a6 100644 --- a/mariadb-plugin/pom.xml +++ b/mariadb-plugin/pom.xml @@ -20,12 +20,12 @@ database-plugins io.cdap.plugin - 1.5.0-SNAPSHOT + 1.5.0 Maria DB plugin mariadb-plugin - 1.5.0-SNAPSHOT + 1.5.0 4.0.0 diff --git a/memsql-plugin/pom.xml b/memsql-plugin/pom.xml index ef07eec8d..bc8709bc8 100644 --- a/memsql-plugin/pom.xml +++ b/memsql-plugin/pom.xml @@ -20,12 +20,12 @@ database-plugins io.cdap.plugin - 1.5.0-SNAPSHOT + 1.5.0 Memsql plugin memsql-plugin - 1.5.0-SNAPSHOT + 1.5.0 4.0.0 diff --git a/mssql-plugin/pom.xml b/mssql-plugin/pom.xml index b5a91f122..c5e1f7646 100644 --- a/mssql-plugin/pom.xml +++ b/mssql-plugin/pom.xml @@ -20,12 +20,12 @@ database-plugins io.cdap.plugin - 1.5.0-SNAPSHOT + 1.5.0 Microsoft SQL Server plugin mssql-plugin - 1.5.0-SNAPSHOT + 1.5.0 4.0.0 diff --git a/mysql-plugin/pom.xml b/mysql-plugin/pom.xml index 83decb285..f178c745c 100644 --- a/mysql-plugin/pom.xml +++ b/mysql-plugin/pom.xml @@ -20,12 +20,12 @@ database-plugins io.cdap.plugin - 1.5.0-SNAPSHOT + 1.5.0 Mysql plugin mysql-plugin - 1.5.0-SNAPSHOT + 1.5.0 4.0.0 diff --git a/netezza-plugin/pom.xml b/netezza-plugin/pom.xml index dff96e54f..44f8af8f5 100644 --- a/netezza-plugin/pom.xml +++ b/netezza-plugin/pom.xml @@ -20,12 +20,12 @@ database-plugins io.cdap.plugin - 1.5.0-SNAPSHOT + 1.5.0 Netezza plugin netezza-plugin - 1.5.0-SNAPSHOT + 1.5.0 4.0.0 diff --git a/oracle-plugin/pom.xml b/oracle-plugin/pom.xml index e25f7efa1..dbebf2316 100644 --- a/oracle-plugin/pom.xml +++ b/oracle-plugin/pom.xml @@ -20,12 +20,12 @@ database-plugins io.cdap.plugin - 1.5.0-SNAPSHOT + 1.5.0 Oracle plugin oracle-plugin - 1.5.0-SNAPSHOT + 1.5.0 4.0.0 diff --git a/pom.xml b/pom.xml index 78ab99894..28c8d6dbc 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ io.cdap.plugin database-plugins - 1.5.0-SNAPSHOT + 1.5.0 pom Database Plugins Collection of database plugins @@ -42,6 +42,7 @@ saphana-plugin cloudsql-mysql-plugin cloudsql-postgresql-plugin + teradata-plugin diff --git a/postgresql-plugin/pom.xml b/postgresql-plugin/pom.xml index 756dc8c56..c6bebaf1f 100644 --- a/postgresql-plugin/pom.xml +++ b/postgresql-plugin/pom.xml @@ -20,12 +20,12 @@ database-plugins io.cdap.plugin - 1.5.0-SNAPSHOT + 1.5.0 PostgreSQL plugin postgresql-plugin - 1.5.0-SNAPSHOT + 1.5.0 4.0.0 diff --git a/saphana-plugin/pom.xml b/saphana-plugin/pom.xml index 6aa1e06d0..f222eae6d 100644 --- a/saphana-plugin/pom.xml +++ b/saphana-plugin/pom.xml @@ -20,7 +20,7 @@ database-plugins io.cdap.plugin - 1.5.0-SNAPSHOT + 1.5.0 SAP HANA plugin diff --git a/teradata-plugin/docs/Teradata-action.md b/teradata-plugin/docs/Teradata-action.md new file mode 100644 index 000000000..c8d386b1a --- /dev/null +++ b/teradata-plugin/docs/Teradata-action.md @@ -0,0 +1,48 @@ +# Teradata Action + + +Description +----------- +Action that runs a Teradata command. 
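+
+Under the hood the action connects over JDBC. As an illustrative sketch (derived from the plugin's
+connection string format; the CHARSET argument is just an example of an optional connection argument),
+the URL built for host "localhost", port 1025 and database "mydb" looks like:
+
+```
+jdbc:teradata://localhost/DATABASE=mydb,DBS_PORT=1025,CHARSET=UTF8
+```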
+ + +Use Case +-------- +The action can be used whenever you want to run a Teradata command before or after a data pipeline. +For example, you may want to run a SQL update command on a database before the pipeline source pulls data from tables. + + +Properties +---------- +**Driver Name:** Name of the JDBC driver to use. + +**Database Command:** Database command to execute. + +**Host:** Host that Teradata is running on. + +**Port:** Port that Teradata is running on. + +**Database:** Teradata database name. + +**Username:** User identity for connecting to the specified database. + +**Password:** Password to use to connect to the specified database. + +**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments +will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations. + +Example +------- +Suppose you want to execute a query against a Teradata database named "prod" that is running on "localhost", +port 1025 (Ensure that the driver for Teradata is installed. You can also provide the name of a specific driver; +otherwise "teradata" will be used), then configure the plugin with: + +``` +Driver Name: "teradata" +Database Command: "UPDATE table_name SET price = 20 WHERE ID = 6" +Host: "localhost" +Port: 1025 +Database: "prod" +Username: "dbc" +Password: "dbc" +``` diff --git a/teradata-plugin/docs/Teradata-batchsink.md b/teradata-plugin/docs/Teradata-batchsink.md new file mode 100644 index 000000000..0c1709f3f --- /dev/null +++ b/teradata-plugin/docs/Teradata-batchsink.md @@ -0,0 +1,93 @@ +# Teradata Batch Sink + + +Description +----------- +Writes records to a Teradata table. Each record will be written to a row in the table. + + +Use Case +-------- +This sink is used whenever you need to write to a Teradata table. +Suppose you periodically build a recommendation model for products on your online store. +The model is stored in a FileSet and you want to export the contents +of the FileSet to a Teradata table where it can be served to your users. + +Column names are auto-detected from the input schema. + +Properties +---------- +**Reference Name:** Name used to uniquely identify this sink for lineage, annotating metadata, etc. + +**Driver Name:** Name of the JDBC driver to use. + +**Host:** Host that Teradata is running on. + +**Port:** Port that Teradata is running on. + +**Database:** Teradata database name. + +**Table Name:** Name of the table to export to. + +**Username:** User identity for connecting to the specified database. + +**Password:** Password to use to connect to the specified database. + +**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments +will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations. + +Example +------- +Suppose you want to write output records to the "users" table of a Teradata database named "prod" that is running on "localhost", +port 1025, as the "dbc" user with the "dbc" password (Ensure that the driver for Teradata is installed.
You can also provide +the name of a specific driver; otherwise "teradata" will be used), then configure the plugin with: + +``` +Reference Name: "snk1" +Driver Name: "teradata" +Host: "localhost" +Port: 1025 +Database: "prod" +Table Name: "users" +Username: "dbc" +Password: "dbc" +``` +Data Types Mapping +------ +Teradata-specific data types are mapped to string and can have multiple input formats and one 'canonical' output form. +Refer to the Teradata data types documentation for the proper formats. + +| Teradata Data Type | CDAP Schema Data Type | Comment | |-----------------------------------------------------|-----------------------|----------------------------------------------| | BYTEINT | INT | | | SMALLINT | INT | | | INTEGER | INT | | | BIGINT | LONG | | | DECIMAL/NUMERIC | DECIMAL | | | FLOAT/REAL/DOUBLE PRECISION | DOUBLE | | | NUMBER | DECIMAL | | | BYTE | BYTES | | | VARBYTE | BYTES | | | BLOB | BYTES | | | CHAR | STRING | | | VARCHAR | STRING | | | CLOB | STRING | | | DATE | DATE | | | TIME | TIME_MICROS | | | TIMESTAMP | TIMESTAMP_MICROS | | | TIME WITH TIME ZONE | TIME_MICROS | | | TIMESTAMP WITH TIME ZONE | TIMESTAMP_MICROS | | | INTERVAL YEAR | STRING | | | INTERVAL YEAR TO MONTH | STRING | | | INTERVAL MONTH | STRING | | | INTERVAL DAY | STRING | | | INTERVAL DAY TO HOUR | STRING | | | INTERVAL DAY TO MINUTE | STRING | | | INTERVAL DAY TO SECOND | STRING | | | INTERVAL HOUR | STRING | | | INTERVAL HOUR TO MINUTE | STRING | | | INTERVAL HOUR TO SECOND | STRING | | | INTERVAL MINUTE | STRING | | | INTERVAL MINUTE TO SECOND | STRING | | | INTERVAL SECOND | STRING | | | ST_Geometry | STRING | | \ No newline at end of file diff --git a/teradata-plugin/docs/Teradata-batchsource.md b/teradata-plugin/docs/Teradata-batchsource.md new file mode 100644 index 000000000..d00069070 --- /dev/null +++ b/teradata-plugin/docs/Teradata-batchsource.md @@ -0,0 +1,121 @@ +# Teradata Batch Source + + +Description +----------- +Reads from a Teradata database using a configurable SQL query. +Outputs one record for each row returned by the query. + + +Use Case +-------- +The source is used whenever you need to read from a Teradata database. For example, you may want +to create daily snapshots of a database table by using this source and writing to +a TimePartitionedFileSet. + + +Properties +---------- +**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc. + +**Driver Name:** Name of the JDBC driver to use. + +**Host:** Host that Teradata is running on. + +**Port:** Port that Teradata is running on. + +**Database:** Teradata database name. + +**Import Query:** The SELECT query to use to import data from the specified table. +You can specify an arbitrary number of columns to import, or import all columns using \*. The query should +contain the '$CONDITIONS' string. For example, 'SELECT * FROM table WHERE $CONDITIONS'. +The '$CONDITIONS' string will be replaced by 'splitBy' field limits specified by the bounding query. +The '$CONDITIONS' string is not required if numSplits is set to one. + +**Bounding Query:** The bounding query should return the min and max of the values of the 'splitBy' field. +For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is set to one. + +**Split-By Field Name:** Field name that will be used to generate splits. Not required if numSplits is set to one. + +**Number of Splits to Generate:** Number of splits to generate (see the example below).
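+
+For example (an illustrative sketch of how these properties interact; the table and column names are
+assumptions): with Import Query 'SELECT * FROM table WHERE $CONDITIONS', Bounding Query
+'SELECT MIN(id),MAX(id) FROM table', Split-By Field Name 'id', and 4 splits over ids 0 to 99, each
+split runs the import query with '$CONDITIONS' replaced by a range predicate such as:
+
+```
+SELECT * FROM table WHERE ( id >= 0 ) AND ( id < 25 )
+```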
+ +**Username:** User identity for connecting to the specified database. + +**Password:** Password to use to connect to the specified database. + +**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments +will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations. + +**Schema:** The schema of records output by the source. This will be used in place of whatever schema comes +back from the query. However, it must match the schema that comes back from the query, +except it can mark fields as nullable and can contain a subset of the fields. + + +Example +------ +Suppose you want to read data from a Teradata database named "prod" that is running on "localhost", port 1025, +as the "dbc" user with the "dbc" password (Ensure that the driver for Teradata is installed. You can also provide +the name of a specific driver; otherwise "teradata" will be used), then configure the plugin with: + + +``` +Reference Name: "src1" +Driver Name: "teradata" +Host: "localhost" +Port: 1025 +Database: "prod" +Import Query: "select id, name, email, phone from users;" +Number of Splits to Generate: 1 +Username: "dbc" +Password: "dbc" +``` + +For example, if the 'id' column is a primary key of type int and the other columns are +non-nullable varchars, output records will have this schema: + + | field name | type | + | -------------- | ------------------- | + | id | int | + | name | string | + | email | string | + | phone | string | + +Data Types Mapping +------ +Teradata-specific data types are mapped to string and can have multiple input formats and one 'canonical' output form. +Refer to the Teradata data types documentation for the proper formats. + +| Teradata Data Type | CDAP Schema Data Type | Comment | |-----------------------------------------------------|-----------------------|----------------------------------------------| | BYTEINT | INT | | | SMALLINT | INT | | | INTEGER | INT | | | BIGINT | LONG | | | DECIMAL/NUMERIC | DECIMAL | | | FLOAT/REAL/DOUBLE PRECISION | DOUBLE | | | NUMBER | DECIMAL | | | BYTE | BYTES | | | VARBYTE | BYTES | | | BLOB | BYTES | | | CHAR | STRING | | | VARCHAR | STRING | | | CLOB | STRING | | | DATE | DATE | | | TIME | TIME_MICROS | | | TIMESTAMP | TIMESTAMP_MICROS | | | TIME WITH TIME ZONE | TIME_MICROS | | | TIMESTAMP WITH TIME ZONE | TIMESTAMP_MICROS | | | INTERVAL YEAR | STRING | | | INTERVAL YEAR TO MONTH | STRING | | | INTERVAL MONTH | STRING | | | INTERVAL DAY | STRING | | | INTERVAL DAY TO HOUR | STRING | | | INTERVAL DAY TO MINUTE | STRING | | | INTERVAL DAY TO SECOND | STRING | | | INTERVAL HOUR | STRING | | | INTERVAL HOUR TO MINUTE | STRING | | | INTERVAL HOUR TO SECOND | STRING | | | INTERVAL MINUTE | STRING | | | INTERVAL MINUTE TO SECOND | STRING | | | INTERVAL SECOND | STRING | | | ST_Geometry | STRING | | \ No newline at end of file diff --git a/teradata-plugin/docs/Teradata-postaction.md b/teradata-plugin/docs/Teradata-postaction.md new file mode 100644 index 000000000..5ce3e5586 --- /dev/null +++ b/teradata-plugin/docs/Teradata-postaction.md @@ -0,0 +1,59 @@ +# Teradata Query Post-run Action + + +Description +----------- +Runs a Teradata query at the end of the pipeline run. +Can be configured to run only on success, only on failure, or always at the end of the run. + + +Use Case +-------- +The action is used whenever you need to run a query at the end of a pipeline run.
For example, you may have a pipeline that imports data from a database table to +HDFS files. At the end of the run, you may want to run a query that deletes the data +that was read from the table. + + +Properties +---------- +**Run Condition:** When to run the action. Must be 'completion', 'success', or 'failure'. Defaults to 'success'. +If set to 'completion', the action will be executed regardless of whether the pipeline run succeeded or failed. +If set to 'success', the action will only be executed if the pipeline run succeeded. +If set to 'failure', the action will only be executed if the pipeline run failed. + +**Driver Name:** Name of the JDBC driver to use. + +**Query:** Query to run. + +**Host:** Host that Teradata is running on. + +**Port:** Port that Teradata is running on. + +**Database:** Teradata database name. + +**Username:** User identity for connecting to the specified database. + +**Password:** Password to use to connect to the specified database. + +**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments +will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations. + + +Example +------- +Suppose you want to delete all records from the Teradata table "userEvents" of the database "prod" running on "localhost", +port 1025, as the "dbc" user with the "dbc" password, using the driver "teradata", if the pipeline completes successfully +(Ensure that the driver for Teradata is installed. You can also provide the name of a specific driver; +otherwise "teradata" will be used), then configure the plugin with: + +``` +Run Condition: "success" +Driver Name: "teradata" +Query: "delete from userEvents" +Host: "localhost" +Port: 1025 +Database: "prod" +Username: "dbc" +Password: "dbc" +``` diff --git a/teradata-plugin/icons/Teradata-action.png b/teradata-plugin/icons/Teradata-action.png new file mode 100644 index 000000000..480587916 Binary files /dev/null and b/teradata-plugin/icons/Teradata-action.png differ diff --git a/teradata-plugin/icons/Teradata-batchsink.png b/teradata-plugin/icons/Teradata-batchsink.png new file mode 100644 index 000000000..480587916 Binary files /dev/null and b/teradata-plugin/icons/Teradata-batchsink.png differ diff --git a/teradata-plugin/icons/Teradata-batchsource.png b/teradata-plugin/icons/Teradata-batchsource.png new file mode 100644 index 000000000..480587916 Binary files /dev/null and b/teradata-plugin/icons/Teradata-batchsource.png differ diff --git a/teradata-plugin/icons/Teradata-postaction.png b/teradata-plugin/icons/Teradata-postaction.png new file mode 100644 index 000000000..480587916 Binary files /dev/null and b/teradata-plugin/icons/Teradata-postaction.png differ diff --git a/teradata-plugin/pom.xml b/teradata-plugin/pom.xml new file mode 100644 index 000000000..2390f4b23 --- /dev/null +++ b/teradata-plugin/pom.xml @@ -0,0 +1,119 @@ + + + + + + database-plugins + io.cdap.plugin + 1.5.0 + + + teradata-plugin + Teradata plugin + 1.5.0 + 4.0.0 + + + + io.cdap.cdap + cdap-etl-api + + + io.cdap.plugin + database-commons + ${project.version} + + + io.cdap.plugin + hydrator-common + + + com.google.guava + guava + + + + + io.cdap.plugin + database-commons + ${project.version} + test-jar + test + + + io.cdap.cdap + hydrator-test + + + io.cdap.cdap + cdap-data-pipeline + + + junit + junit + + + io.cdap.cdap + cdap-api + provided + + + org.jetbrains + annotations + RELEASE + compile + + + + + + + org.apache.felix + maven-bundle-plugin + 3.3.0 + true + + + <_exportcontents>
io.cdap.plugin.teradata.*; + io.cdap.plugin.db.*; + io.cdap.plugin.util.*; + + *;inline=false;scope=compile + true + lib + + + + + package + + bundle + + + + + + io.cdap + cdap-maven-plugin + + + + + \ No newline at end of file diff --git a/teradata-plugin/src/main/java/io/cdap/plugin/teradata/TeradataConstants.java b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/TeradataConstants.java new file mode 100644 index 000000000..4340ba97c --- /dev/null +++ b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/TeradataConstants.java @@ -0,0 +1,25 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.teradata; + +/** + * Teradata Constants. + */ +public final class TeradataConstants { + public static final String PLUGIN_NAME = "Teradata"; + public static final String TERADATA_CONNECTION_STRING_FORMAT = "jdbc:teradata://%s/DATABASE=%s,DBS_PORT=%s%s"; +} diff --git a/teradata-plugin/src/main/java/io/cdap/plugin/teradata/TeradataDBRecord.java b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/TeradataDBRecord.java new file mode 100644 index 000000000..d954907fd --- /dev/null +++ b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/TeradataDBRecord.java @@ -0,0 +1,80 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.teradata; + +import io.cdap.cdap.api.common.Bytes; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.db.ColumnType; +import io.cdap.plugin.db.DBRecord; +import io.cdap.plugin.db.SchemaReader; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.nio.ByteBuffer; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; +import java.util.List; + +/** + * Writable class for Teradata Source/Sink. + */ +public class TeradataDBRecord extends DBRecord { + + /** + * Used in map-reduce. Do not remove. + */ + @SuppressWarnings("unused") + public TeradataDBRecord() { + } + + /** + * Used to construct a DBRecord from a StructuredRecord in the ETL Pipeline. 
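+   * <p>A descriptive note: {@code columnTypes} is expected to list the SQL type of each column in
+   * the target table, in the same order in which the record's fields are written.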
+ * + * @param record the {@link StructuredRecord} to construct the {@link TeradataDBRecord} from + */ + public TeradataDBRecord(StructuredRecord record, List<ColumnType> columnTypes) { + super(record, columnTypes); + } + + @Override + protected SchemaReader getSchemaReader() { + return new TeradataSchemaReader(); + } + + @Override + protected void writeBytes(PreparedStatement stmt, int fieldIndex, int sqlIndex, Object fieldValue) + throws SQLException { + byte[] byteValue = fieldValue instanceof ByteBuffer ? Bytes.toBytes((ByteBuffer) fieldValue) : (byte[]) fieldValue; + // handles BLOB, BINARY, VARBINARY + stmt.setBytes(sqlIndex, byteValue); + } + + @Override + protected void setField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field, + int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException { + Object original = resultSet.getObject(columnIndex); + if (original != null && sqlType == Types.NUMERIC) { + BigDecimal decimal = (BigDecimal) original; + recordBuilder.setDecimal(field.getName(), decimal.setScale(sqlScale, RoundingMode.HALF_EVEN)); + } else { + super.setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale); + } + } +} diff --git a/teradata-plugin/src/main/java/io/cdap/plugin/teradata/TeradataSchemaReader.java b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/TeradataSchemaReader.java new file mode 100644 index 000000000..3223b4754 --- /dev/null +++ b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/TeradataSchemaReader.java @@ -0,0 +1,65 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.teradata; + +import com.google.common.collect.ImmutableSet; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.db.CommonSchemaReader; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Types; +import java.util.Set; + +/** + * Teradata schema reader.
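+ * <p>Compared to the common reader, Teradata INTERVAL types are read as STRING, and FLOAT columns
+ * are read as DOUBLE, since FLOAT, REAL and DOUBLE PRECISION are the same type in Teradata.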
+ */ +public class TeradataSchemaReader extends CommonSchemaReader { + public static final Set TERADATA_STRING_TYPES = ImmutableSet.of( + "INTERVAL YEAR", + "INTERVAL YEAR TO MONTH", + "INTERVAL MONTH", + "INTERVAL DAY", + "INTERVAL DAY TO HOUR", + "INTERVAL DAY TO MINUTE", + "INTERVAL DAY TO SECOND", + "INTERVAL HOUR", + "INTERVAL HOUR TO MINUTE", + "INTERVAL HOUR TO SECOND", + "INTERVAL MINUTE", + "INTERVAL MINUTE TO SECOND", + "INTERVAL SECOND" + ); + + @Override + public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException { + int sqlType = metadata.getColumnType(index); + String sqlTypeName = metadata.getColumnTypeName(index); + + // Teradata interval types are mapping to String + if (TERADATA_STRING_TYPES.contains(sqlTypeName)) { + return Schema.of(Schema.Type.STRING); + } + + // In Teradata FLOAT and DOUBLE are same types + if (sqlType == Types.FLOAT) { + return Schema.of(Schema.Type.DOUBLE); + } + + return super.getSchema(metadata, index); + } +} diff --git a/teradata-plugin/src/main/java/io/cdap/plugin/teradata/TeradataUtils.java b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/TeradataUtils.java new file mode 100644 index 000000000..e5e3ec09b --- /dev/null +++ b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/TeradataUtils.java @@ -0,0 +1,69 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.teradata; + +import com.google.common.base.Strings; +import io.cdap.plugin.common.KeyValueListParser; + +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import javax.annotation.Nullable; + +/** + * Teradata util methods. + */ +public final class TeradataUtils { + + /** + * Creates Teradata specific JDBC connection string. + * + * @param host server host. + * @param port server port. + * @param database database name. + * @param connectionArguments connection arguments. + * @return Teradata specific JDBC connection string + */ + public static String getConnectionString(String host, Integer port, String database, String connectionArguments) { + String arguments = getConnectionArguments(connectionArguments); + return String.format( + TeradataConstants.TERADATA_CONNECTION_STRING_FORMAT, + host, + database, + port, + arguments.length() == 1 ? "" : arguments + ); + } + + /** + * Format Teradata connection parameters. + * + * @param connectionArguments server host. + * @return Teradata connection parameters. 
+ */ + public static String getConnectionArguments(@Nullable String connectionArguments) { + KeyValueListParser kvParser = new KeyValueListParser("\\s*;\\s*", "="); + String result = ""; + + if (!Strings.isNullOrEmpty(connectionArguments)) { + result = StreamSupport.stream(kvParser.parse(connectionArguments).spliterator(), false) + .map(keyVal -> String.format("%s=%s", keyVal.getKey().toUpperCase(), keyVal.getValue())) + .collect(Collectors.joining(",", ",", "")); + } + + return result; + } +} diff --git a/teradata-plugin/src/main/java/io/cdap/plugin/teradata/action/TeradataAction.java b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/action/TeradataAction.java new file mode 100644 index 000000000..c399885a0 --- /dev/null +++ b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/action/TeradataAction.java @@ -0,0 +1,39 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.teradata.action; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.action.Action; +import io.cdap.plugin.db.batch.action.AbstractDBAction; +import io.cdap.plugin.teradata.TeradataConstants; + +/** + * Action that runs Teradata command. + */ +@Plugin(type = Action.PLUGIN_TYPE) +@Name(TeradataConstants.PLUGIN_NAME) +@Description("Action that runs a Teradata command") +public class TeradataAction extends AbstractDBAction { + private final TeradataActionConfig config; + + public TeradataAction(TeradataActionConfig config) { + super(config, false); + this.config = config; + } +} diff --git a/teradata-plugin/src/main/java/io/cdap/plugin/teradata/action/TeradataActionConfig.java b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/action/TeradataActionConfig.java new file mode 100644 index 000000000..dd7dc6438 --- /dev/null +++ b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/action/TeradataActionConfig.java @@ -0,0 +1,31 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.teradata.action; + +import io.cdap.plugin.db.batch.config.DBSpecificQueryConfig; +import io.cdap.plugin.teradata.TeradataUtils; + +/** + * Teradata Action Config. 
+ */ +public class TeradataActionConfig extends DBSpecificQueryConfig { + + @Override + public String getConnectionString() { + return TeradataUtils.getConnectionString(host, port, database, connectionArguments); + } +} diff --git a/teradata-plugin/src/main/java/io/cdap/plugin/teradata/postaction/TeradataPostAction.java b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/postaction/TeradataPostAction.java new file mode 100644 index 000000000..704885bfb --- /dev/null +++ b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/postaction/TeradataPostAction.java @@ -0,0 +1,39 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.teradata.postaction; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.batch.PostAction; +import io.cdap.plugin.db.batch.action.AbstractQueryAction; +import io.cdap.plugin.teradata.TeradataConstants; + +/** + * Represents Teradata post action. + */ +@Plugin(type = PostAction.PLUGIN_TYPE) +@Name(TeradataConstants.PLUGIN_NAME) +@Description("Runs a Teradata query after a pipeline run.") +public class TeradataPostAction extends AbstractQueryAction { + private final TeradataPostActionConfig config; + + public TeradataPostAction(TeradataPostActionConfig config) { + super(config, false); + this.config = config; + } +} diff --git a/teradata-plugin/src/main/java/io/cdap/plugin/teradata/postaction/TeradataPostActionConfig.java b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/postaction/TeradataPostActionConfig.java new file mode 100644 index 000000000..dbc518c35 --- /dev/null +++ b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/postaction/TeradataPostActionConfig.java @@ -0,0 +1,31 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.teradata.postaction; + +import io.cdap.plugin.db.batch.config.DBSpecificQueryActionConfig; +import io.cdap.plugin.teradata.TeradataUtils; + +/** + * Teradata post action configuration. 
+ */ +public class TeradataPostActionConfig extends DBSpecificQueryActionConfig { + + @Override + public String getConnectionString() { + return TeradataUtils.getConnectionString(host, port, database, connectionArguments); + } +} diff --git a/teradata-plugin/src/main/java/io/cdap/plugin/teradata/sink/TeradataFieldsValidator.java b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/sink/TeradataFieldsValidator.java new file mode 100644 index 000000000..b1dc0bcfc --- /dev/null +++ b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/sink/TeradataFieldsValidator.java @@ -0,0 +1,53 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.teradata.sink; + +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.db.batch.sink.CommonFieldsValidator; +import io.cdap.plugin.teradata.TeradataSchemaReader; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Types; + +/** + * Teradata validator for DB fields. + */ +public class TeradataFieldsValidator extends CommonFieldsValidator { + + @Override + public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata, int index) throws SQLException { + Schema fieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema(); + Schema.Type fieldType = fieldSchema.getType(); + Schema.LogicalType fieldLogicalType = fieldSchema.getLogicalType(); + + int sqlType = metadata.getColumnType(index); + String sqlTypeName = metadata.getColumnTypeName(index); + + // In Teradata FLOAT and DOUBLE are same types + if (fieldType == Schema.Type.DOUBLE && sqlType == Types.FLOAT) { + return true; + } + + // Teradata interval types are mapping to String + if (fieldType == Schema.Type.STRING && TeradataSchemaReader.TERADATA_STRING_TYPES.contains(sqlTypeName)) { + return true; + } + + return isFieldCompatible(fieldType, fieldLogicalType, sqlType); + } +} diff --git a/teradata-plugin/src/main/java/io/cdap/plugin/teradata/sink/TeradataSink.java b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/sink/TeradataSink.java new file mode 100644 index 000000000..d8b67a600 --- /dev/null +++ b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/sink/TeradataSink.java @@ -0,0 +1,60 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.teradata.sink; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.etl.api.batch.BatchSink; +import io.cdap.plugin.db.DBRecord; +import io.cdap.plugin.db.SchemaReader; +import io.cdap.plugin.db.batch.sink.AbstractDBSink; +import io.cdap.plugin.db.batch.sink.FieldsValidator; +import io.cdap.plugin.teradata.TeradataConstants; +import io.cdap.plugin.teradata.TeradataDBRecord; +import io.cdap.plugin.teradata.TeradataSchemaReader; + +/** + * Sink support for a Teradata database. + */ +@Plugin(type = BatchSink.PLUGIN_TYPE) +@Name(TeradataConstants.PLUGIN_NAME) +@Description("Writes records to a Teradata table. Each record will be written in a row in the table") +public class TeradataSink extends AbstractDBSink { + private final TeradataSinkConfig config; + + public TeradataSink(TeradataSinkConfig config) { + super(config); + this.config = config; + } + + @Override + protected SchemaReader getSchemaReader() { + return new TeradataSchemaReader(); + } + + @Override + protected FieldsValidator getFieldsValidator() { + return new TeradataFieldsValidator(); + } + + @Override + protected DBRecord getDBRecord(StructuredRecord output) { + return new TeradataDBRecord(output, columnTypes); + } +} diff --git a/teradata-plugin/src/main/java/io/cdap/plugin/teradata/sink/TeradataSinkConfig.java b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/sink/TeradataSinkConfig.java new file mode 100644 index 000000000..1d7db72d9 --- /dev/null +++ b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/sink/TeradataSinkConfig.java @@ -0,0 +1,31 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.teradata.sink; + +import io.cdap.plugin.db.batch.config.DBSpecificSinkConfig; +import io.cdap.plugin.teradata.TeradataUtils; + +/** + * Teradata sink config. + */ +public class TeradataSinkConfig extends DBSpecificSinkConfig { + + @Override + public String getConnectionString() { + return TeradataUtils.getConnectionString(host, port, database, connectionArguments); + } +} diff --git a/teradata-plugin/src/main/java/io/cdap/plugin/teradata/source/TeradataSource.java b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/source/TeradataSource.java new file mode 100644 index 000000000..ae7df707a --- /dev/null +++ b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/source/TeradataSource.java @@ -0,0 +1,59 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.teradata.source; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.batch.BatchSource; +import io.cdap.plugin.db.SchemaReader; +import io.cdap.plugin.db.batch.source.AbstractDBSource; +import io.cdap.plugin.teradata.TeradataConstants; +import io.cdap.plugin.teradata.TeradataDBRecord; +import io.cdap.plugin.teradata.TeradataSchemaReader; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; + +/** + * Batch source to read from Teradata. + */ +@Plugin(type = BatchSource.PLUGIN_TYPE) +@Name(TeradataConstants.PLUGIN_NAME) +@Description("Reads from a database table(s) using a configurable SQL query." + + " Outputs one record for each row returned by the query.") +public class TeradataSource extends AbstractDBSource { + private final TeradataSourceConfig config; + + public TeradataSource(TeradataSourceConfig config) { + super(config); + this.config = config; + } + + @Override + protected Class getDBRecordType() { + return TeradataDBRecord.class; + } + + @Override + protected String createConnectionString() { + return config.getConnectionString(); + } + + @Override + protected SchemaReader getSchemaReader() { + return new TeradataSchemaReader(); + } +} diff --git a/teradata-plugin/src/main/java/io/cdap/plugin/teradata/source/TeradataSourceConfig.java b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/source/TeradataSourceConfig.java new file mode 100644 index 000000000..1826ce59d --- /dev/null +++ b/teradata-plugin/src/main/java/io/cdap/plugin/teradata/source/TeradataSourceConfig.java @@ -0,0 +1,31 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.teradata.source; + +import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig; +import io.cdap.plugin.teradata.TeradataUtils; + +/** + * Teradata source config. + */ +public class TeradataSourceConfig extends DBSpecificSourceConfig { + + @Override + public String getConnectionString() { + return TeradataUtils.getConnectionString(host, port, database, connectionArguments); + } +} diff --git a/teradata-plugin/src/test/java/io/cdap/plugin/teradata/TeradataPluginTestBase.java b/teradata-plugin/src/test/java/io/cdap/plugin/teradata/TeradataPluginTestBase.java new file mode 100644 index 000000000..99c5caf97 --- /dev/null +++ b/teradata-plugin/src/test/java/io/cdap/plugin/teradata/TeradataPluginTestBase.java @@ -0,0 +1,270 @@ +/* + * Copyright © 2020 Cask Data, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.teradata; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import io.cdap.cdap.api.artifact.ArtifactSummary; +import io.cdap.cdap.api.plugin.PluginClass; +import io.cdap.cdap.datapipeline.DataPipelineApp; +import io.cdap.cdap.proto.id.ArtifactId; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.test.TestConfiguration; +import io.cdap.plugin.db.ConnectionConfig; +import io.cdap.plugin.db.DBRecord; +import io.cdap.plugin.db.batch.DatabasePluginTestBase; +import io.cdap.plugin.db.batch.sink.ETLDBOutputFormat; +import io.cdap.plugin.db.batch.source.DataDrivenETLDBInputFormat; +import io.cdap.plugin.teradata.action.TeradataAction; +import io.cdap.plugin.teradata.postaction.TeradataPostAction; +import io.cdap.plugin.teradata.sink.TeradataSink; +import io.cdap.plugin.teradata.source.TeradataSource; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; + +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.Date; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.Calendar; +import java.util.Collections; +import java.util.Map; +import java.util.TimeZone; + +public abstract class TeradataPluginTestBase extends DatabasePluginTestBase { + protected static final ArtifactId DATAPIPELINE_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-pipeline", "3.2.0"); + protected static final ArtifactSummary DATAPIPELINE_ARTIFACT = new ArtifactSummary("data-pipeline", "3.2.0"); + protected static final long CURRENT_TS = System.currentTimeMillis(); + + protected static final String JDBC_DRIVER_NAME = "teradata"; + protected static final String DRIVER_CLASS = "com.teradata.jdbc.TeraDriver"; + + protected static String connectionUrl; + protected static final int YEAR; + protected static final int PRECISION = 10; + protected static final int SCALE = 6; + protected static final ZoneId UTC_ZONE = ZoneId.ofOffset("UTC", ZoneOffset.UTC); + protected static boolean tearDown = true; + private static int startCount; + + @ClassRule + public static final TestConfiguration CONFIG = new TestConfiguration("explore.enabled", false); + + static { + Calendar calendar = Calendar.getInstance(); + calendar.setTime(new Date(CURRENT_TS)); + YEAR = calendar.get(Calendar.YEAR); + } + + protected static final Map<String, String> BASE_PROPS = ImmutableMap.<String, String>builder() + .put(ConnectionConfig.HOST, System.getProperty("teradata.host", "localhost")) + .put(ConnectionConfig.PORT, System.getProperty("teradata.port", "1025")) + .put(ConnectionConfig.DATABASE, System.getProperty("teradata.database", "mydb")) + .put(ConnectionConfig.USER, System.getProperty("teradata.username", "test")) + .put(ConnectionConfig.PASSWORD,
System.getProperty("teradata.password", "test")) + .put(ConnectionConfig.JDBC_PLUGIN_NAME, JDBC_DRIVER_NAME) + .build(); + + @BeforeClass + public static void setupTest() throws Exception { + if (startCount++ > 0) { + return; + } + + setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class); + + addPluginArtifact(NamespaceId.DEFAULT.artifact(JDBC_DRIVER_NAME, "1.0.0"), + DATAPIPELINE_ARTIFACT_ID, + TeradataSource.class, TeradataSink.class, TeradataDBRecord.class, ETLDBOutputFormat.class, + DataDrivenETLDBInputFormat.class, DBRecord.class, TeradataAction.class, TeradataPostAction.class); + + Class<?> driverClass = Class.forName(DRIVER_CLASS); + + // add teradata 3rd party plugin + PluginClass teradataDriver = new PluginClass( + ConnectionConfig.JDBC_PLUGIN_TYPE, JDBC_DRIVER_NAME, "teradata driver class", + driverClass.getName(), + null, + Collections.emptyMap() + ); + + addPluginArtifact(NamespaceId.DEFAULT.artifact("teradata-jdbc-connector", "1.0.0"), + DATAPIPELINE_ARTIFACT_ID, + Sets.newHashSet(teradataDriver), driverClass); + + TimeZone.setDefault(TimeZone.getTimeZone("UTC")); + + connectionUrl = TeradataUtils.getConnectionString( + BASE_PROPS.get(ConnectionConfig.HOST), + Integer.parseInt(BASE_PROPS.get(ConnectionConfig.PORT)), + BASE_PROPS.get(ConnectionConfig.DATABASE), + "" + ); + Connection conn = createConnection(); + + createTestTables(conn); + prepareTestData(conn); + } + + protected static void createTestTables(Connection conn) throws SQLException { + try (Statement stmt = conn.createStatement()) { + String columns = "ID INTEGER NOT NULL, " + + "NAME VARCHAR(40) NOT NULL, " + + "SCORE DOUBLE PRECISION, " + + "DATE_COL DATE, " + + "GRADUATED BYTEINT, " + + "NOT_IMPORTED VARCHAR(30), " + + "CHAR_COL CHAR(100)," + + "VARCHAR_COL VARCHAR(100)," + + "CLOB_COL CLOB," + + "BINARY_COL BYTE(5)," + + "VARBINARY_COL VARBYTE(20)," + + "BLOB_COL BLOB, " + + "SMALL SMALLINT, " + + "BIG BIGINT, " + + "DECIMAL_COL DECIMAL(" + PRECISION + "," + SCALE + "), " + + "NUMBER_COL NUMBER(" + PRECISION + "," + SCALE + "), " + + "TIME_COL TIME, " + + "TIMESTAMP_COL TIMESTAMP, " + + "TIMETZ_COL TIME With Time Zone, " + + "TIMESTAMPTZ_COL TIMESTAMP WITH TIME ZONE, " + + "INTERVAL_YEAR_COL INTERVAL YEAR (4), " + + "INTERVAL_YEAR_TO_MONTH_COL INTERVAL YEAR(4) TO MONTH, " + + "INTERVAL_MONTH_COL INTERVAL MONTH(2), " + + "INTERVAL_DAY_COL INTERVAL DAY(2), " + + "INTERVAL_DAY_TO_HOUR_COL INTERVAL DAY(2) TO HOUR, " + + "INTERVAL_DAY_TO_MINUTE_COL INTERVAL DAY(2) TO MINUTE, " + + "INTERVAL_DAY_TO_SECOND_COL INTERVAL DAY(2) TO SECOND(3), " + + "INTERVAL_HOUR_COL INTERVAL HOUR(2), " + + "INTERVAL_HOUR_TO_MINUTE_COL INTERVAL HOUR(2) TO MINUTE, " + + "INTERVAL_HOUR_TO_SECOND_COL INTERVAL HOUR(2) TO SECOND(3), " + + "INTERVAL_MINUTE_COL INTERVAL MINUTE(2), " + + "INTERVAL_MINUTE_TO_SECOND_COL INTERVAL MINUTE(2) TO SECOND(3), " + + "INTERVAL_SECOND_COL INTERVAL SECOND(2,3), " + + "ST_GEOMETRY_COL ST_Geometry"; + + stmt.execute("CREATE TABLE my_table(" + columns + ")"); + stmt.execute("CREATE TABLE your_table(" + columns + ")"); + stmt.execute("CREATE TABLE MY_DEST_TABLE(" + columns + ")"); + + // create a table that the action will delete from at the end of the run + stmt.execute("CREATE TABLE dbActionTest (x int, \"day\" varchar(10))"); + // create a table that the post-action will delete from at the end of the run + stmt.execute("CREATE TABLE postActionTest (x INT, \"day\" VARCHAR(10))"); + } + } + + protected static void prepareTestData(Connection conn) throws SQLException { + String insertTemplate = "INSERT INTO %s
VALUES(" + + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?," + + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?" + + ")"; + + try ( + Statement stmt = conn.createStatement(); + + PreparedStatement pStmt1 = + conn.prepareStatement(String.format(insertTemplate, "my_table")); + PreparedStatement pStmt2 = + conn.prepareStatement(String.format(insertTemplate, "your_table"))) { + + stmt.execute("insert into dbActionTest values (1, '1970-01-01')"); + stmt.execute("insert into postActionTest values (1, '1970-01-01')"); + + populateData(pStmt1, pStmt2); + } + } + + private static void populateData(PreparedStatement... stmts) throws SQLException { + // insert the same data into both tables: my_table and your_table + for (PreparedStatement pStmt : stmts) { + for (int i = 1; i <= 5; i++) { + String name = "user" + i; + pStmt.setInt(1, i); + pStmt.setString(2, name); + pStmt.setDouble(3, 123.45 + i); + pStmt.setDate(4, new Date(CURRENT_TS)); + pStmt.setBoolean(5, (i % 2 == 0)); + pStmt.setString(6, "random" + i); + pStmt.setString(7, name); + pStmt.setString(8, name); + pStmt.setString(9, name); + pStmt.setBytes(10, name.getBytes()); + pStmt.setBytes(11, name.getBytes()); + pStmt.setBytes(12, name.getBytes()); + pStmt.setShort(13, (short) i); + pStmt.setLong(14, (long) i); + pStmt.setBigDecimal(15, new BigDecimal(123.45).add(new BigDecimal(i))); + pStmt.setBigDecimal(16, new BigDecimal(54.65).add(new BigDecimal(i))); + pStmt.setTime(17, new Time(CURRENT_TS)); + pStmt.setTimestamp(18, new Timestamp(CURRENT_TS)); + pStmt.setTime(19, new Time(CURRENT_TS)); + pStmt.setTimestamp(20, new Timestamp(CURRENT_TS)); + pStmt.setString(21, "2019"); + pStmt.setString(22, "2019-10"); + pStmt.setString(23, "10"); + pStmt.setString(24, "11"); + pStmt.setString(25, "11 12"); + pStmt.setString(26, "11 12:13"); + pStmt.setString(27, "11 12:13:14.567"); + pStmt.setString(28, "12"); + pStmt.setString(29, "12:13"); + pStmt.setString(30, "12:13:14.567"); + pStmt.setString(31, "13"); + pStmt.setString(32, "13:14.567"); + pStmt.setString(33, "14.567"); + pStmt.setString(34, "POINT (10 20)"); + pStmt.executeUpdate(); + } + } + } + + public static Connection createConnection() { + try { + Class.forName(DRIVER_CLASS); + return DriverManager.getConnection(connectionUrl, BASE_PROPS.get(ConnectionConfig.USER), + BASE_PROPS.get(ConnectionConfig.PASSWORD)); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @AfterClass + public static void tearDownDB() throws SQLException { + if (!tearDown) { + return; + } + + try (Connection conn = createConnection(); + Statement stmt = conn.createStatement()) { + stmt.execute("DROP TABLE my_table"); + stmt.execute("DROP TABLE your_table"); + stmt.execute("DROP TABLE MY_DEST_TABLE"); + stmt.execute("DROP TABLE postActionTest"); + stmt.execute("DROP TABLE dbActionTest"); + } + } +} diff --git a/teradata-plugin/src/test/java/io/cdap/plugin/teradata/TeradataPluginTestSuite.java b/teradata-plugin/src/test/java/io/cdap/plugin/teradata/TeradataPluginTestSuite.java new file mode 100644 index 000000000..ac48296e9 --- /dev/null +++ b/teradata-plugin/src/test/java/io/cdap/plugin/teradata/TeradataPluginTestSuite.java @@ -0,0 +1,50 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.teradata; + +import io.cdap.cdap.common.test.TestSuite; +import io.cdap.plugin.teradata.action.TeradataActionTestRun; +import io.cdap.plugin.teradata.postaction.TeradataPostActionTestRun; +import io.cdap.plugin.teradata.sink.TeradataSinkTestRun; +import io.cdap.plugin.teradata.source.TeradataSourceTestRun; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +/** + * This is a test suite that runs all the tests for the Teradata plugins. + */ +@RunWith(TestSuite.class) +@Suite.SuiteClasses({ + TeradataSourceTestRun.class, + TeradataSinkTestRun.class, + TeradataActionTestRun.class, + TeradataPostActionTestRun.class +}) +public class TeradataPluginTestSuite extends TeradataPluginTestBase { + + @BeforeClass + public static void setup() { + tearDown = false; + } + + @AfterClass + public static void tearDown() throws Exception { + tearDown = true; + } +} diff --git a/teradata-plugin/src/test/java/io/cdap/plugin/teradata/action/TeradataActionTestRun.java b/teradata-plugin/src/test/java/io/cdap/plugin/teradata/action/TeradataActionTestRun.java new file mode 100644 index 000000000..c55eb39ba --- /dev/null +++ b/teradata-plugin/src/test/java/io/cdap/plugin/teradata/action/TeradataActionTestRun.java @@ -0,0 +1,75 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License.
+ */ + +package io.cdap.plugin.teradata.action; + +import com.google.common.collect.ImmutableMap; +import io.cdap.cdap.etl.api.action.Action; +import io.cdap.cdap.etl.mock.batch.MockSink; +import io.cdap.cdap.etl.mock.batch.MockSource; +import io.cdap.cdap.etl.proto.v2.ETLBatchConfig; +import io.cdap.cdap.etl.proto.v2.ETLPlugin; +import io.cdap.cdap.etl.proto.v2.ETLStage; +import io.cdap.cdap.proto.artifact.AppRequest; +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.test.ApplicationManager; +import io.cdap.plugin.db.batch.action.QueryConfig; +import io.cdap.plugin.teradata.TeradataConstants; +import io.cdap.plugin.teradata.TeradataPluginTestBase; +import org.junit.Assert; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; + +public class TeradataActionTestRun extends TeradataPluginTestBase { + + @Test + public void testDBAction() throws Exception { + + ETLStage source = new ETLStage("source", MockSource.getPlugin("actionInput")); + ETLStage sink = new ETLStage("sink", MockSink.getPlugin("actionOutput")); + ETLStage action = new ETLStage("action", new ETLPlugin( + TeradataConstants.PLUGIN_NAME, + Action.PLUGIN_TYPE, + ImmutableMap.<String, String>builder() + .putAll(BASE_PROPS) + .put(QueryConfig.QUERY, "delete from dbActionTest where \"day\" = '${logicalStartTime(yyyy-MM-dd,0m,UTC)}'") + .build(), + null)); + + ETLBatchConfig config = ETLBatchConfig.builder() + .addStage(source) + .addStage(sink) + .addStage(action) + .addConnection(sink.getName(), action.getName()) + .addConnection(source.getName(), sink.getName()) + .build(); + + AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, config); + ApplicationId appId = NamespaceId.DEFAULT.app("actionTest"); + ApplicationManager appManager = deployApplication(appId, appRequest); + runETLOnce(appManager, ImmutableMap.of("logical.start.time", "0")); + + try (Connection connection = createConnection(); + Statement statement = connection.createStatement(); + ResultSet results = statement.executeQuery("select * from dbActionTest")) { + Assert.assertFalse(results.next()); + } + } +} diff --git a/teradata-plugin/src/test/java/io/cdap/plugin/teradata/postaction/TeradataPostActionTestRun.java b/teradata-plugin/src/test/java/io/cdap/plugin/teradata/postaction/TeradataPostActionTestRun.java new file mode 100644 index 000000000..ca911ff2b --- /dev/null +++ b/teradata-plugin/src/test/java/io/cdap/plugin/teradata/postaction/TeradataPostActionTestRun.java @@ -0,0 +1,79 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License.
+ */ + +package io.cdap.plugin.teradata.postaction; + +import com.google.common.collect.ImmutableMap; +import io.cdap.cdap.etl.api.batch.PostAction; +import io.cdap.cdap.etl.mock.batch.MockSink; +import io.cdap.cdap.etl.mock.batch.MockSource; +import io.cdap.cdap.etl.proto.v2.ETLBatchConfig; +import io.cdap.cdap.etl.proto.v2.ETLPlugin; +import io.cdap.cdap.etl.proto.v2.ETLStage; +import io.cdap.cdap.proto.artifact.AppRequest; +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.test.ApplicationManager; +import io.cdap.plugin.common.batch.action.Condition; +import io.cdap.plugin.db.ConnectionConfig; +import io.cdap.plugin.db.batch.action.QueryActionConfig; +import io.cdap.plugin.db.batch.action.QueryConfig; +import io.cdap.plugin.teradata.TeradataConstants; +import io.cdap.plugin.teradata.TeradataPluginTestBase; +import org.junit.Assert; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; + +public class TeradataPostActionTestRun extends TeradataPluginTestBase { + + @Test + public void testAction() throws Exception { + + ETLStage source = new ETLStage("source", MockSource.getPlugin("actionInput")); + ETLStage sink = new ETLStage("sink", MockSink.getPlugin("actionOutput")); + ETLStage action = new ETLStage("action", new ETLPlugin( + TeradataConstants.PLUGIN_NAME, + PostAction.PLUGIN_TYPE, + ImmutableMap.<String, String>builder() + .putAll(BASE_PROPS) + .put(QueryConfig.QUERY, "delete from postActionTest where \"day\" = '${logicalStartTime(yyyy-MM-dd,0m,UTC)}'") + .put(ConnectionConfig.ENABLE_AUTO_COMMIT, "false") + .put(QueryActionConfig.RUN_CONDITION, Condition.SUCCESS.name()) + .build(), + null)); + + ETLBatchConfig config = ETLBatchConfig.builder() + .addStage(source) + .addStage(sink) + .addPostAction(action) + .addConnection(source.getName(), sink.getName()) + .build(); + + AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, config); + ApplicationId appId = NamespaceId.DEFAULT.app("postActionTest"); + ApplicationManager appManager = deployApplication(appId, appRequest); + runETLOnce(appManager, ImmutableMap.of("logical.start.time", "0")); + + try (Connection connection = createConnection(); + Statement statement = connection.createStatement(); + ResultSet results = statement.executeQuery("select * from postActionTest")) { + Assert.assertFalse(results.next()); + } + } +} diff --git a/teradata-plugin/src/test/java/io/cdap/plugin/teradata/sink/TeradataSinkTestRun.java b/teradata-plugin/src/test/java/io/cdap/plugin/teradata/sink/TeradataSinkTestRun.java new file mode 100644 index 000000000..d84a7049a --- /dev/null +++ b/teradata-plugin/src/test/java/io/cdap/plugin/teradata/sink/TeradataSinkTestRun.java @@ -0,0 +1,277 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License.
+ */ + +package io.cdap.plugin.teradata.sink; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableMap; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.dataset.table.Table; +import io.cdap.cdap.etl.api.batch.BatchSink; +import io.cdap.cdap.etl.mock.batch.MockSource; +import io.cdap.cdap.etl.proto.v2.ETLPlugin; +import io.cdap.cdap.test.ApplicationManager; +import io.cdap.cdap.test.DataSetManager; +import io.cdap.plugin.common.Constants; +import io.cdap.plugin.db.CustomAssertions; +import io.cdap.plugin.db.batch.sink.AbstractDBSink; +import io.cdap.plugin.teradata.TeradataConstants; +import io.cdap.plugin.teradata.TeradataPluginTestBase; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.math.BigDecimal; +import java.math.MathContext; +import java.math.RoundingMode; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; + +public class TeradataSinkTestRun extends TeradataPluginTestBase { + + private static final Schema SCHEMA = Schema.recordOf( + "dbRecord", + Schema.Field.of("ID", Schema.of(Schema.Type.INT)), + Schema.Field.of("NAME", Schema.of(Schema.Type.STRING)), + Schema.Field.of("SCORE", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("GRADUATED", Schema.of(Schema.Type.INT)), + Schema.Field.of("SMALL", Schema.of(Schema.Type.INT)), + Schema.Field.of("BIG", Schema.of(Schema.Type.LONG)), + Schema.Field.of("NUMBER_COL", Schema.decimalOf(PRECISION, SCALE)), + Schema.Field.of("DECIMAL_COL", Schema.decimalOf(PRECISION, SCALE)), + Schema.Field.of("DATE_COL", Schema.of(Schema.LogicalType.DATE)), + Schema.Field.of("TIME_COL", Schema.of(Schema.LogicalType.TIME_MICROS)), + Schema.Field.of("TIMESTAMP_COL", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)), + Schema.Field.of("TIMETZ_COL", Schema.of(Schema.LogicalType.TIME_MICROS)), + Schema.Field.of("TIMESTAMPTZ_COL", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)), + Schema.Field.of("CHAR_COL", Schema.of(Schema.Type.STRING)), + Schema.Field.of("VARCHAR_COL", Schema.of(Schema.Type.STRING)), + Schema.Field.of("BINARY_COL", Schema.of(Schema.Type.BYTES)), + Schema.Field.of("VARBINARY_COL", Schema.of(Schema.Type.BYTES)), + Schema.Field.of("BLOB_COL", Schema.of(Schema.Type.BYTES)), + Schema.Field.of("INTERVAL_YEAR_COL", Schema.of(Schema.Type.STRING)), + Schema.Field.of("INTERVAL_YEAR_TO_MONTH_COL", Schema.of(Schema.Type.STRING)), + Schema.Field.of("INTERVAL_MONTH_COL", Schema.of(Schema.Type.STRING)), + Schema.Field.of("INTERVAL_DAY_COL", Schema.of(Schema.Type.STRING)), + Schema.Field.of("INTERVAL_DAY_TO_HOUR_COL", Schema.of(Schema.Type.STRING)), + Schema.Field.of("INTERVAL_DAY_TO_MINUTE_COL", Schema.of(Schema.Type.STRING)), + Schema.Field.of("INTERVAL_DAY_TO_SECOND_COL", Schema.of(Schema.Type.STRING)), + Schema.Field.of("INTERVAL_HOUR_COL", Schema.of(Schema.Type.STRING)), + Schema.Field.of("INTERVAL_HOUR_TO_MINUTE_COL", Schema.of(Schema.Type.STRING)), + Schema.Field.of("INTERVAL_HOUR_TO_SECOND_COL", Schema.of(Schema.Type.STRING)), + Schema.Field.of("INTERVAL_MINUTE_COL", Schema.of(Schema.Type.STRING)), + Schema.Field.of("INTERVAL_MINUTE_TO_SECOND_COL", Schema.of(Schema.Type.STRING)), + Schema.Field.of("INTERVAL_SECOND_COL", Schema.of(Schema.Type.STRING)), + Schema.Field.of("ST_GEOMETRY_COL", Schema.of(Schema.Type.STRING)) + ); + + @Before + 
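// Clear the destination table before each test; "DELETE MY_DEST_TABLE ALL" is the Teradata form of an unqualified DELETE FROM.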
public void setup() throws Exception { + try (Connection connection = createConnection(); + Statement stmt = connection.createStatement()) { + stmt.execute("DELETE MY_DEST_TABLE ALL"); + } + } + + @Test + public void testDBSinkWithInvalidFieldType() throws Exception { + testDBInvalidFieldType("ID", Schema.Type.STRING, getSinkConfig(), DATAPIPELINE_ARTIFACT); + } + + @Test + public void testDBSinkWithInvalidFieldLogicalType() throws Exception { + testDBInvalidFieldLogicalType("TIMESTAMP_COL", Schema.Type.LONG, getSinkConfig(), DATAPIPELINE_ARTIFACT); + } + + @Test + public void testDBSinkWithDBSchemaAndInvalidData() throws Exception { + // The JDBC connector allows writing integer values to a STRING column, so use the ENUM column instead. + String enumColumnName = "ENUM_COL"; + startPipelineAndWriteInvalidData(enumColumnName, getSinkConfig(), DATAPIPELINE_ARTIFACT); + try (Connection conn = createConnection(); + Statement stmt = conn.createStatement(); + ResultSet resultSet = stmt.executeQuery("SELECT * FROM MY_DEST_TABLE")) { + testInvalidDataWrite(resultSet, enumColumnName); + } + } + + @Test + public void testDBSinkWithExplicitInputSchema() throws Exception { + testDBSink("testDBSinkWithExplicitInputSchema", "input-dbsinktest-explicit", SCHEMA); + } + + @Test + public void testDBSinkWithInferredInputSchema() throws Exception { + testDBSink("testDBSinkWithInferredInputSchema", "input-dbsinktest-inferred", null); + } + + private void testDBSink(String appName, String inputDatasetName, Schema schema) throws Exception { + ETLPlugin sourceConfig = (schema != null) + ? MockSource.getPlugin(inputDatasetName, schema) + : MockSource.getPlugin(inputDatasetName); + + ETLPlugin sinkConfig = getSinkConfig(); + + ApplicationManager appManager = deployETL(sourceConfig, sinkConfig, DATAPIPELINE_ARTIFACT, appName); + + // Prepare test input data + List<StructuredRecord> inputRecords = createInputData(); + DataSetManager<Table> inputManager = getDataset(inputDatasetName); + MockSource.writeInput(inputManager, inputRecords); + runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(CURRENT_TS))); + + try (Connection conn = createConnection(); + Statement stmt = conn.createStatement(); + ResultSet actual = stmt.executeQuery("SELECT * FROM MY_DEST_TABLE ORDER BY ID")) { + + for (StructuredRecord expected : inputRecords) { + Assert.assertTrue(actual.next()); + + // Verify data + CustomAssertions.assertObjectEquals(expected.get("ID"), actual.getInt("ID")); + CustomAssertions.assertObjectEquals(expected.get("NAME"), actual.getString("NAME")); + CustomAssertions.assertObjectEquals(expected.get("CHAR_COL"), actual.getString("CHAR_COL").trim()); + CustomAssertions.assertObjectEquals(expected.get("VARCHAR_COL"), + actual.getString("VARCHAR_COL").trim()); + CustomAssertions.assertObjectEquals(expected.get("GRADUATED"), actual.getInt("GRADUATED")); + Assert.assertNull(actual.getString("NOT_IMPORTED")); + CustomAssertions.assertObjectEquals(expected.get("SMALL"), actual.getInt("SMALL")); + CustomAssertions.assertObjectEquals(expected.get("BIG"), actual.getLong("BIG")); + CustomAssertions.assertNumericEquals(expected.get("SCORE"), actual.getDouble("SCORE")); + CustomAssertions.assertObjectEquals(expected.getDecimal("NUMBER_COL"), + actual.getBigDecimal("NUMBER_COL") + .setScale(SCALE, RoundingMode.HALF_EVEN)); + CustomAssertions.assertObjectEquals(expected.getDecimal("DECIMAL_COL"), + actual.getBigDecimal("DECIMAL_COL")); + + Clob clob = actual.getClob("ST_GEOMETRY_COL"); +
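// ST_GEOMETRY comes back as a CLOB, so its full character content is materialized below before comparing with the expected WKT text.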
CustomAssertions.assertObjectEquals(expected.get("ST_GEOMETRY_COL"), + clob.getSubString(1, (int) clob.length())); + + // Verify interval columns + CustomAssertions.assertObjectEquals(expected.get("INTERVAL_YEAR_COL"), + actual.getString("INTERVAL_YEAR_COL").trim()); + CustomAssertions.assertObjectEquals(expected.get("INTERVAL_YEAR_TO_MONTH_COL"), + actual.getString("INTERVAL_YEAR_TO_MONTH_COL").trim()); + CustomAssertions.assertObjectEquals(expected.get("INTERVAL_MONTH_COL"), + actual.getString("INTERVAL_MONTH_COL").trim()); + CustomAssertions.assertObjectEquals(expected.get("INTERVAL_DAY_COL"), + actual.getString("INTERVAL_DAY_COL").trim()); + CustomAssertions.assertObjectEquals(expected.get("INTERVAL_DAY_TO_HOUR_COL"), + actual.getString("INTERVAL_DAY_TO_HOUR_COL").trim()); + CustomAssertions.assertObjectEquals(expected.get("INTERVAL_DAY_TO_MINUTE_COL"), + actual.getString("INTERVAL_DAY_TO_MINUTE_COL").trim()); + CustomAssertions.assertObjectEquals(expected.get("INTERVAL_DAY_TO_SECOND_COL"), + actual.getString("INTERVAL_DAY_TO_SECOND_COL").trim()); + CustomAssertions.assertObjectEquals(expected.get("INTERVAL_HOUR_COL"), + actual.getString("INTERVAL_HOUR_COL").trim()); + CustomAssertions.assertObjectEquals(expected.get("INTERVAL_HOUR_TO_MINUTE_COL"), + actual.getString("INTERVAL_HOUR_TO_MINUTE_COL").trim()); + CustomAssertions.assertObjectEquals(expected.get("INTERVAL_HOUR_TO_SECOND_COL"), + actual.getString("INTERVAL_HOUR_TO_SECOND_COL").trim()); + CustomAssertions.assertObjectEquals(expected.get("INTERVAL_MINUTE_COL"), + actual.getString("INTERVAL_MINUTE_COL").trim()); + CustomAssertions.assertObjectEquals(expected.get("INTERVAL_MINUTE_TO_SECOND_COL"), + actual.getString("INTERVAL_MINUTE_TO_SECOND_COL").trim()); + CustomAssertions.assertObjectEquals(expected.get("INTERVAL_SECOND_COL"), + actual.getString("INTERVAL_SECOND_COL").trim()); + + // Verify binary columns + Assert.assertArrayEquals(expected.get("BINARY_COL"), actual.getBytes("BINARY_COL")); + Assert.assertArrayEquals(expected.get("VARBINARY_COL"), actual.getBytes("VARBINARY_COL")); + Assert.assertArrayEquals(expected.get("BLOB_COL"), actual.getBytes("BLOB_COL")); + + // Verify time columns + Assert.assertEquals(expected.getDate("DATE_COL"), + actual.getDate("DATE_COL").toLocalDate()); + + // compare seconds, since mysql 'time' type does not store milliseconds but 'LocalTime' does + Assert.assertEquals(expected.getTime("TIME_COL").toSecondOfDay(), + actual.getTime("TIME_COL").toLocalTime().toSecondOfDay()); + Assert.assertEquals(expected.getTimestamp("TIMESTAMP_COL"), + actual.getTimestamp("TIMESTAMP_COL").toInstant().atZone(UTC_ZONE)); + Assert.assertEquals(expected.getTime("TIMETZ_COL").toSecondOfDay(), + actual.getTime("TIMETZ_COL").toLocalTime().toSecondOfDay()); + Assert.assertEquals(expected.getTimestamp("TIMESTAMPTZ_COL"), + actual.getTimestamp("TIMESTAMPTZ_COL").toInstant().atZone(UTC_ZONE)); + } + } + } + + private List createInputData() throws Exception { + List inputRecords = new ArrayList<>(); + LocalDateTime localDateTime = new Timestamp(CURRENT_TS).toLocalDateTime(); + for (int i = 1; i <= 2; i++) { + String name = "user" + i; + StructuredRecord.Builder builder = StructuredRecord.builder(SCHEMA) + .set("ID", i) + .set("NAME", name) + .set("SCORE", 3.451) + .set("GRADUATED", (i % 2 == 0) ? 
1 : 0) + .set("SMALL", i) + .set("BIG", 3456987L) + .setDecimal("NUMBER_COL", new BigDecimal(3.458d, new MathContext(PRECISION)).setScale(SCALE)) + .setDecimal("DECIMAL_COL", new BigDecimal(3.459d, new MathContext(PRECISION)).setScale(SCALE)) + .setDate("DATE_COL", localDateTime.toLocalDate()) + .setTime("TIME_COL", localDateTime.toLocalTime()) + .setTimestamp("TIMESTAMP_COL", localDateTime.atZone(UTC_ZONE)) + .setTime("TIMETZ_COL", localDateTime.toLocalTime()) + .setTimestamp("TIMESTAMPTZ_COL", localDateTime.atZone(UTC_ZONE)) + .set("CHAR_COL", "char" + i) + .set("VARCHAR_COL", "char" + i) + .set("BINARY_COL", name.getBytes(Charsets.UTF_8)) + .set("VARBINARY_COL", name.getBytes(Charsets.UTF_8)) + .set("BLOB_COL", name.getBytes(Charsets.UTF_8)) + .set("INTERVAL_YEAR_COL", "2019") + .set("INTERVAL_YEAR_TO_MONTH_COL", "2019-10") + .set("INTERVAL_MONTH_COL", "10") + .set("INTERVAL_DAY_COL", "11") + .set("INTERVAL_DAY_TO_HOUR_COL", "11 12") + .set("INTERVAL_DAY_TO_MINUTE_COL", "11 12:13") + .set("INTERVAL_DAY_TO_SECOND_COL", "11 12:13:14.567") + .set("INTERVAL_HOUR_COL", "12") + .set("INTERVAL_HOUR_TO_MINUTE_COL", "12:13") + .set("INTERVAL_HOUR_TO_SECOND_COL", "12:13:14.567") + .set("INTERVAL_MINUTE_COL", "13") + .set("INTERVAL_MINUTE_TO_SECOND_COL", "13:14.567") + .set("INTERVAL_SECOND_COL", "14.567") + .set("ST_GEOMETRY_COL", "POINT (10 20)"); + + inputRecords.add(builder.build()); + } + + return inputRecords; + } + + private ETLPlugin getSinkConfig() { + return new ETLPlugin( + TeradataConstants.PLUGIN_NAME, + BatchSink.PLUGIN_TYPE, + ImmutableMap.<String, String>builder() + .putAll(BASE_PROPS) + .put(AbstractDBSink.DBSinkConfig.TABLE_NAME, "MY_DEST_TABLE") + .put(Constants.Reference.REFERENCE_NAME, "DBTest") + .build(), + null); + } +} diff --git a/teradata-plugin/src/test/java/io/cdap/plugin/teradata/source/TeradataSourceTestRun.java b/teradata-plugin/src/test/java/io/cdap/plugin/teradata/source/TeradataSourceTestRun.java new file mode 100644 index 000000000..a9d618a24 --- /dev/null +++ b/teradata-plugin/src/test/java/io/cdap/plugin/teradata/source/TeradataSourceTestRun.java @@ -0,0 +1,401 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License.
+ */ + +package io.cdap.plugin.teradata.source; + +import com.google.common.collect.ImmutableMap; +import io.cdap.cdap.api.common.Bytes; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.dataset.table.Table; +import io.cdap.cdap.etl.api.batch.BatchSource; +import io.cdap.cdap.etl.mock.batch.MockSink; +import io.cdap.cdap.etl.proto.v2.ETLBatchConfig; +import io.cdap.cdap.etl.proto.v2.ETLPlugin; +import io.cdap.cdap.etl.proto.v2.ETLStage; +import io.cdap.cdap.proto.artifact.AppRequest; +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.test.ApplicationManager; +import io.cdap.cdap.test.DataSetManager; +import io.cdap.plugin.common.Constants; +import io.cdap.plugin.db.ConnectionConfig; +import io.cdap.plugin.db.DBConfig; +import io.cdap.plugin.db.batch.source.AbstractDBSource; +import io.cdap.plugin.teradata.TeradataConstants; +import io.cdap.plugin.teradata.TeradataPluginTestBase; +import org.junit.Assert; +import org.junit.Test; + +import java.math.BigDecimal; +import java.math.MathContext; +import java.nio.ByteBuffer; +import java.sql.Date; +import java.sql.Time; +import java.text.SimpleDateFormat; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.ZonedDateTime; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TeradataSourceTestRun extends TeradataPluginTestBase { + + @Test + @SuppressWarnings("ConstantConditions") + public void testDBMacroSupport() throws Exception { + String importQuery = "SELECT * FROM my_table WHERE DATE_COL <= '${logicalStartTime(yyyy-MM-dd,1d)}' " + + "AND $CONDITIONS"; + String boundingQuery = "SELECT MIN(ID),MAX(ID) from my_table"; + String splitBy = "ID"; + + ImmutableMap<String, String> sourceProps = ImmutableMap.<String, String>builder() + .putAll(BASE_PROPS) + .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery) + .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery) + .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy) + .put(Constants.Reference.REFERENCE_NAME, "DBTestSource").build(); + + ETLPlugin sourceConfig = new ETLPlugin( + TeradataConstants.PLUGIN_NAME, + BatchSource.PLUGIN_TYPE, + sourceProps + ); + + ETLPlugin sinkConfig = MockSink.getPlugin("macroOutputTable"); + + ApplicationManager appManager = deployETL(sourceConfig, sinkConfig, + DATAPIPELINE_ARTIFACT, "testDBMacro"); + runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(CURRENT_TS))); + + DataSetManager<Table>
outputManager = getDataset("macroOutputTable"); + Assert.assertTrue(MockSink.readOutput(outputManager).isEmpty()); + } + + @Test + @SuppressWarnings("ConstantConditions") + public void testDBSource() throws Exception { + String importQuery = "SELECT " + + "ID, " + + "NAME, " + + "SCORE, " + + "DATE_COL, " + + "GRADUATED, " + + "CHAR_COL, " + + "VARCHAR_COL, " + + "CLOB_COL, " + + "BINARY_COL, " + + "VARBINARY_COL, " + + "BLOB_COL, " + + "SMALL, " + + "BIG, " + + "DECIMAL_COL, " + + "NUMBER_COL, " + + "TIME_COL, " + + "TIMESTAMP_COL, " + + "TIMETZ_COL, " + + "TIMESTAMPTZ_COL, " + + "INTERVAL_YEAR_COL, " + + "INTERVAL_YEAR_TO_MONTH_COL, " + + "INTERVAL_MONTH_COL, " + + "INTERVAL_DAY_COL, " + + "INTERVAL_DAY_TO_HOUR_COL, " + + "INTERVAL_DAY_TO_MINUTE_COL, " + + "INTERVAL_DAY_TO_SECOND_COL, " + + "INTERVAL_HOUR_COL, " + + "INTERVAL_HOUR_TO_MINUTE_COL, " + + "INTERVAL_HOUR_TO_SECOND_COL, " + + "INTERVAL_MINUTE_COL, " + + "INTERVAL_MINUTE_TO_SECOND_COL, " + + "INTERVAL_SECOND_COL, " + + "ST_GEOMETRY_COL " + + "FROM my_table WHERE ID < 3 AND $CONDITIONS"; + String boundingQuery = "SELECT MIN(ID),MAX(ID) from my_table"; + String splitBy = "ID"; + ETLPlugin sourceConfig = new ETLPlugin( + TeradataConstants.PLUGIN_NAME, + BatchSource.PLUGIN_TYPE, + ImmutableMap.builder() + .putAll(BASE_PROPS) + .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery) + .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery) + .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy) + .put(Constants.Reference.REFERENCE_NAME, "DBSourceTest") + .build(), + null + ); + + String outputDatasetName = "output-dbsourcetest"; + ETLPlugin sinkConfig = MockSink.getPlugin(outputDatasetName); + + ApplicationManager appManager = deployETL(sourceConfig, sinkConfig, + DATAPIPELINE_ARTIFACT, "testDBSource"); + runETLOnce(appManager); + + DataSetManager
outputManager = getDataset(outputDatasetName); + List<StructuredRecord> outputRecords = MockSink.readOutput(outputManager); + + Assert.assertEquals(2, outputRecords.size()); + String userid = outputRecords.get(0).get("NAME"); + StructuredRecord row1 = "user1".equals(userid) ? outputRecords.get(0) : outputRecords.get(1); + StructuredRecord row2 = "user1".equals(userid) ? outputRecords.get(1) : outputRecords.get(0); + + // Verify data + Assert.assertEquals("user1", row1.get("NAME")); + Assert.assertEquals("user2", row2.get("NAME")); + Assert.assertEquals("user1", row1.get("CHAR_COL").toString().trim()); + Assert.assertEquals("user2", row2.get("CHAR_COL").toString().trim()); + Assert.assertEquals("user1", row1.get("VARCHAR_COL")); + Assert.assertEquals("user2", row2.get("VARCHAR_COL")); + Assert.assertEquals("user1", row1.get("CLOB_COL")); + Assert.assertEquals("user2", row2.get("CLOB_COL")); + Assert.assertEquals("POINT (10 20)", row1.get("ST_GEOMETRY_COL")); + Assert.assertEquals("POINT (10 20)", row2.get("ST_GEOMETRY_COL")); + + // Verify interval columns + Assert.assertEquals("2019", row1.get("INTERVAL_YEAR_COL").toString().trim()); + Assert.assertEquals("2019", row2.get("INTERVAL_YEAR_COL").toString().trim()); + Assert.assertEquals("2019-10", row1.get("INTERVAL_YEAR_TO_MONTH_COL").toString().trim()); + Assert.assertEquals("2019-10", row2.get("INTERVAL_YEAR_TO_MONTH_COL").toString().trim()); + Assert.assertEquals("10", row1.get("INTERVAL_MONTH_COL").toString().trim()); + Assert.assertEquals("10", row2.get("INTERVAL_MONTH_COL").toString().trim()); + Assert.assertEquals("11", row1.get("INTERVAL_DAY_COL").toString().trim()); + Assert.assertEquals("11", row2.get("INTERVAL_DAY_COL").toString().trim()); + Assert.assertEquals("11 12", row1.get("INTERVAL_DAY_TO_HOUR_COL").toString().trim()); + Assert.assertEquals("11 12", row2.get("INTERVAL_DAY_TO_HOUR_COL").toString().trim()); + Assert.assertEquals("11 12:13", row1.get("INTERVAL_DAY_TO_MINUTE_COL").toString().trim()); + Assert.assertEquals("11 12:13", row2.get("INTERVAL_DAY_TO_MINUTE_COL").toString().trim()); + Assert.assertEquals("11 12:13:14.567", row1.get("INTERVAL_DAY_TO_SECOND_COL").toString().trim()); + Assert.assertEquals("11 12:13:14.567", row2.get("INTERVAL_DAY_TO_SECOND_COL").toString().trim()); + Assert.assertEquals("12", row1.get("INTERVAL_HOUR_COL").toString().trim()); + Assert.assertEquals("12", row2.get("INTERVAL_HOUR_COL").toString().trim()); + Assert.assertEquals("12:13", row1.get("INTERVAL_HOUR_TO_MINUTE_COL").toString().trim()); + Assert.assertEquals("12:13", row2.get("INTERVAL_HOUR_TO_MINUTE_COL").toString().trim()); + Assert.assertEquals("12:13:14.567", row1.get("INTERVAL_HOUR_TO_SECOND_COL").toString().trim()); + Assert.assertEquals("12:13:14.567", row2.get("INTERVAL_HOUR_TO_SECOND_COL").toString().trim()); + Assert.assertEquals("13", row1.get("INTERVAL_MINUTE_COL").toString().trim()); + Assert.assertEquals("13", row2.get("INTERVAL_MINUTE_COL").toString().trim()); + Assert.assertEquals("13:14.567", row1.get("INTERVAL_MINUTE_TO_SECOND_COL").toString().trim()); + Assert.assertEquals("13:14.567", row2.get("INTERVAL_MINUTE_TO_SECOND_COL").toString().trim()); + Assert.assertEquals("14.567", row1.get("INTERVAL_SECOND_COL").toString().trim()); + Assert.assertEquals("14.567", row2.get("INTERVAL_SECOND_COL").toString().trim()); + + // Verify numeric columns + Assert.assertEquals(0, (int) row1.get("GRADUATED")); + Assert.assertEquals(1, (int) row2.get("GRADUATED")); + Assert.assertEquals(124.45, row1.get("SCORE"), 0.000001); +
Assert.assertEquals(125.45, row2.get("SCORE"), 0.000001); + Assert.assertEquals(1, (int) row1.get("ID")); + Assert.assertEquals(2, (int) row2.get("ID")); + Assert.assertEquals(1, (int) row1.get("SMALL")); + Assert.assertEquals(2, (int) row2.get("SMALL")); + Assert.assertEquals(1, (long) row1.get("BIG")); + Assert.assertEquals(2, (long) row2.get("BIG")); + Assert.assertEquals(new BigDecimal(124.45, new MathContext(PRECISION)).setScale(SCALE), + row1.getDecimal("DECIMAL_COL")); + Assert.assertEquals(new BigDecimal(125.45, new MathContext(PRECISION)).setScale(SCALE), + row2.getDecimal("DECIMAL_COL")); + Assert.assertEquals(new BigDecimal(55.65, new MathContext(PRECISION)).setScale(SCALE), + row1.getDecimal("NUMBER_COL")); + Assert.assertEquals(new BigDecimal(56.65, new MathContext(PRECISION)).setScale(SCALE), + row2.getDecimal("NUMBER_COL")); + + // Verify time columns + java.util.Date date = new java.util.Date(CURRENT_TS); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + LocalDate expectedDate = Date.valueOf(sdf.format(date)).toLocalDate(); + sdf = new SimpleDateFormat("H:mm:ss"); + LocalTime expectedTime = Time.valueOf(sdf.format(date)).toLocalTime(); + ZonedDateTime expectedTs = date.toInstant().atZone(UTC_ZONE); + Assert.assertEquals(expectedDate, row1.getDate("DATE_COL")); + Assert.assertEquals(expectedTime, row1.getTime("TIME_COL")); + Assert.assertEquals(expectedTs, row1.getTimestamp("TIMESTAMP_COL", UTC_ZONE)); + Assert.assertEquals(expectedTime, row1.getTime("TIMETZ_COL")); + Assert.assertEquals(expectedTs, row1.getTimestamp("TIMESTAMPTZ_COL", UTC_ZONE)); + + // verify binary columns + Assert.assertEquals("user1", Bytes.toString(((ByteBuffer) row1.get("BINARY_COL")).array(), 0, 5)); + Assert.assertEquals("user2", Bytes.toString(((ByteBuffer) row2.get("BINARY_COL")).array(), 0, 5)); + Assert.assertEquals("user1", Bytes.toString(((ByteBuffer) row1.get("VARBINARY_COL")).array(), 0, 5)); + Assert.assertEquals("user2", Bytes.toString(((ByteBuffer) row2.get("VARBINARY_COL")).array(), 0, 5)); + Assert.assertEquals("user1", Bytes.toString(((ByteBuffer) row1.get("BLOB_COL")).array(), 0, 5)); + Assert.assertEquals("user2", Bytes.toString(((ByteBuffer) row2.get("BLOB_COL")).array(), 0, 5)); + } + + @Test + public void testDbSourceMultipleTables() throws Exception { + String importQuery = "SELECT \"my_table\".\"ID\", \"your_table\".\"NAME\" FROM \"my_table\", \"your_table\" " + + "WHERE \"my_table\".\"ID\" < 3 and \"my_table\".\"ID\" = \"your_table\".\"ID\" and $CONDITIONS"; + String boundingQuery = "SELECT MIN(MIN(\"my_table\".\"ID\"), MIN(\"your_table\".\"ID\")), " + + "MAX(MAX(\"my_table\".\"ID\"), MAX(\"your_table\".\"ID\"))"; + String splitBy = "\"my_table\".\"ID\""; + ETLPlugin sourceConfig = new ETLPlugin( + TeradataConstants.PLUGIN_NAME, + BatchSource.PLUGIN_TYPE, + ImmutableMap.<String, String>builder() + .putAll(BASE_PROPS) + .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery) + .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery) + .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy) + .put(Constants.Reference.REFERENCE_NAME, "DBMultipleTest") + .build(), + null + ); + + String outputDatasetName = "output-multitabletest"; + ETLPlugin sinkConfig = MockSink.getPlugin(outputDatasetName); + + ApplicationManager appManager = deployETL(sourceConfig, sinkConfig, + DATAPIPELINE_ARTIFACT, "testDBSourceWithMultipleTables"); + runETLOnce(appManager); + + // records should be written + DataSetManager<Table>
outputManager = getDataset(outputDatasetName); + List<StructuredRecord> outputRecords = MockSink.readOutput(outputManager); + Assert.assertEquals(2, outputRecords.size()); + String userid = outputRecords.get(0).get("NAME"); + StructuredRecord row1 = "user1".equals(userid) ? outputRecords.get(0) : outputRecords.get(1); + StructuredRecord row2 = "user1".equals(userid) ? outputRecords.get(1) : outputRecords.get(0); + // Verify data + Assert.assertEquals("user1", row1.get("NAME")); + Assert.assertEquals("user2", row2.get("NAME")); + Assert.assertEquals(1, row1.<Integer>get("ID").intValue()); + Assert.assertEquals(2, row2.<Integer>get("ID").intValue()); + } + + @Test + public void testUserNamePasswordCombinations() throws Exception { + String importQuery = "SELECT * FROM my_table WHERE $CONDITIONS"; + String boundingQuery = "SELECT MIN(ID),MAX(ID) from my_table"; + String splitBy = "ID"; + + ETLPlugin sinkConfig = MockSink.getPlugin("outputTable"); + + Map<String, String> baseSourceProps = ImmutableMap.<String, String>builder() + .put(ConnectionConfig.HOST, BASE_PROPS.get(ConnectionConfig.HOST)) + .put(ConnectionConfig.PORT, BASE_PROPS.get(ConnectionConfig.PORT)) + .put(ConnectionConfig.DATABASE, BASE_PROPS.get(ConnectionConfig.DATABASE)) + .put(ConnectionConfig.JDBC_PLUGIN_NAME, JDBC_DRIVER_NAME) + .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery) + .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery) + .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy) + .put(Constants.Reference.REFERENCE_NAME, "UserPassDBTest") + .build(); + + ApplicationId appId = NamespaceId.DEFAULT.app("dbTest"); + + // null user name, null password. Should succeed. + // as source + ETLPlugin dbConfig = new ETLPlugin(TeradataConstants.PLUGIN_NAME, BatchSource.PLUGIN_TYPE, baseSourceProps, null); + ETLStage table = new ETLStage("uniqueTableSink", sinkConfig); + ETLStage database = new ETLStage("databaseSource", dbConfig); + ETLBatchConfig etlConfig = ETLBatchConfig.builder() + .addStage(database) + .addStage(table) + .addConnection(database.getName(), table.getName()) + .build(); + AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, etlConfig); + deployApplication(appId, appRequest); + + // null user name, non-null password. Should fail. + // as source + Map<String, String> noUser = new HashMap<>(baseSourceProps); + noUser.put(DBConfig.PASSWORD, "password"); + database = new ETLStage("databaseSource", new ETLPlugin(TeradataConstants.PLUGIN_NAME, BatchSource.PLUGIN_TYPE, + noUser, null)); + etlConfig = ETLBatchConfig.builder() + .addStage(database) + .addStage(table) + .addConnection(database.getName(), table.getName()) + .build(); + assertDeploymentFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT, + "Deploying DB Source with null username but non-null password should have failed."); + + // non-null username, non-null, but empty password. Should succeed.
+ // as source + Map<String, String> emptyPassword = new HashMap<>(baseSourceProps); + emptyPassword.put(DBConfig.USER, "root"); + emptyPassword.put(DBConfig.PASSWORD, ""); + database = new ETLStage("databaseSource", new ETLPlugin(TeradataConstants.PLUGIN_NAME, BatchSource.PLUGIN_TYPE, + emptyPassword, null)); + etlConfig = ETLBatchConfig.builder() + .addStage(database) + .addStage(table) + .addConnection(database.getName(), table.getName()) + .build(); + appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, etlConfig); + deployApplication(appId, appRequest); + } + + @Test + public void testNonExistentDBTable() throws Exception { + // source + String importQuery = "SELECT ID, NAME FROM dummy WHERE ID < 3 AND $CONDITIONS"; + String boundingQuery = "SELECT MIN(ID),MAX(ID) FROM dummy"; + String splitBy = "ID"; + ETLPlugin sinkConfig = MockSink.getPlugin("table"); + ETLPlugin sourceBadNameConfig = new ETLPlugin( + TeradataConstants.PLUGIN_NAME, + BatchSource.PLUGIN_TYPE, + ImmutableMap.<String, String>builder() + .putAll(BASE_PROPS) + .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery) + .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery) + .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy) + .put(Constants.Reference.REFERENCE_NAME, "DBNonExistentTest") + .build(), + null); + ETLStage sink = new ETLStage("sink", sinkConfig); + ETLStage sourceBadName = new ETLStage("sourceBadName", sourceBadNameConfig); + + ETLBatchConfig etlConfig = ETLBatchConfig.builder() + .addStage(sourceBadName) + .addStage(sink) + .addConnection(sourceBadName.getName(), sink.getName()) + .build(); + ApplicationId appId = NamespaceId.DEFAULT.app("dbSourceNonExistingTest"); + assertRuntimeFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT, + "ETL Application with DB Source should have failed because of a " + + "non-existent source table.", 1); + + // Bad connection + ETLPlugin sourceBadConnConfig = new ETLPlugin( + TeradataConstants.PLUGIN_NAME, + BatchSource.PLUGIN_TYPE, + ImmutableMap.<String, String>builder() + .put(ConnectionConfig.HOST, BASE_PROPS.get(ConnectionConfig.HOST)) + .put(ConnectionConfig.PORT, BASE_PROPS.get(ConnectionConfig.PORT)) + .put(ConnectionConfig.DATABASE, "dumDB") + .put(ConnectionConfig.USER, BASE_PROPS.get(ConnectionConfig.USER)) + .put(ConnectionConfig.PASSWORD, BASE_PROPS.get(ConnectionConfig.PASSWORD)) + .put(ConnectionConfig.JDBC_PLUGIN_NAME, JDBC_DRIVER_NAME) + .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery) + .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery) + .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy) + .put(Constants.Reference.REFERENCE_NAME, "TeradataTest") + .build(), + null); + ETLStage sourceBadConn = new ETLStage("sourceBadConn", sourceBadConnConfig); + etlConfig = ETLBatchConfig.builder() + .addStage(sourceBadConn) + .addStage(sink) + .addConnection(sourceBadConn.getName(), sink.getName()) + .build(); + assertRuntimeFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT, + "ETL Application with DB Source should have failed because of a " + + "non-existent source database.", 2); + } +} diff --git a/teradata-plugin/widgets/Teradata-action.json b/teradata-plugin/widgets/Teradata-action.json new file mode 100644 index 000000000..2ffba361c --- /dev/null +++ b/teradata-plugin/widgets/Teradata-action.json @@ -0,0 +1,79 @@ +{ + "metadata": { + "spec-version": "1.5" + }, + "display-name": "Teradata Execute", + "configuration-groups": [ + { + "label": "Basic", + "properties": [ + { + "widget-type": "textbox", + "label": "Driver Name", + "name": "jdbcPluginName", +
"widget-attributes": { + "default": "teradata" + } + }, + { + "widget-type": "textbox", + "label": "Host", + "name": "host", + "widget-attributes": { + "default": "localhost" + } + }, + { + "widget-type": "number", + "label": "Port", + "name": "port", + "widget-attributes": { + "default": "1025" + } + }, + { + "widget-type": "textbox", + "label": "Database", + "name": "database" + }, + { + "widget-type": "textarea", + "label": "Database Command", + "name": "query" + } + ] + }, + { + "label": "Credentials", + "properties": [ + { + "widget-type": "textbox", + "label": "Username", + "name": "user" + }, + { + "widget-type": "password", + "label": "Password", + "name": "password" + } + ] + }, + { + "label": "Advanced", + "properties": [ + { + "widget-type": "keyvalue", + "label": "Connection Arguments", + "name": "connectionArguments", + "widget-attributes": { + "showDelimiter": "false", + "key-placeholder": "Key", + "value-placeholder": "Value", + "kv-delimiter": "=", + "delimiter": ";" + } + } + ] + } + ] +} diff --git a/teradata-plugin/widgets/Teradata-batchsink.json b/teradata-plugin/widgets/Teradata-batchsink.json new file mode 100644 index 000000000..c4169a64b --- /dev/null +++ b/teradata-plugin/widgets/Teradata-batchsink.json @@ -0,0 +1,95 @@ +{ + "metadata": { + "spec-version": "1.5" + }, + "display-name": "Teradata", + "configuration-groups": [ + { + "label": "Basic", + "properties": [ + { + "widget-type": "textbox", + "label": "Reference Name", + "name": "referenceName", + "widget-attributes": { + "placeholder": "Name used to identify this sink for lineage" + } + }, + { + "widget-type": "textbox", + "label": "Driver Name", + "name": "jdbcPluginName", + "widget-attributes": { + "default": "teradata" + } + }, + { + "widget-type": "textbox", + "label": "Host", + "name": "host", + "widget-attributes": { + "default": "localhost" + } + }, + { + "widget-type": "number", + "label": "Port", + "name": "port", + "widget-attributes": { + "default": "1025" + } + }, + { + "widget-type": "textbox", + "label": "Database", + "name": "database" + }, + { + "widget-type": "textbox", + "label": "Table Name", + "name": "tableName" + } + ] + }, + { + "label": "Credentials", + "properties": [ + { + "widget-type": "textbox", + "label": "Username", + "name": "user" + }, + { + "widget-type": "password", + "label": "Password", + "name": "password" + } + ] + }, + { + "label": "Advanced", + "properties": [ + { + "widget-type": "keyvalue", + "label": "Connection Arguments", + "name": "connectionArguments", + "widget-attributes": { + "showDelimiter": "false", + "key-placeholder": "Key", + "value-placeholder": "Value", + "kv-delimiter": "=", + "delimiter": ";" + } + } + ] + } + ], + "outputs": [], + "jump-config": { + "datasets": [ + { + "ref-property-name": "referenceName" + } + ] + } +} diff --git a/teradata-plugin/widgets/Teradata-batchsource.json b/teradata-plugin/widgets/Teradata-batchsource.json new file mode 100644 index 000000000..7c43a8886 --- /dev/null +++ b/teradata-plugin/widgets/Teradata-batchsource.json @@ -0,0 +1,136 @@ +{ + "metadata": { + "spec-version": "1.5" + }, + "display-name": "Teradata", + "configuration-groups": [ + { + "label": "Basic", + "properties": [ + { + "widget-type": "textbox", + "label": "Reference Name", + "name": "referenceName", + "widget-attributes": { + "placeholder": "Name used to identify this source for lineage" + } + }, + { + "widget-type": "textbox", + "label": "Driver Name", + "name": "jdbcPluginName", + "widget-attributes": { + "default": "teradata" + } + }, + { + 
"widget-type": "textbox", + "label": "Host", + "name": "host", + "widget-attributes": { + "default": "localhost" + } + }, + { + "widget-type": "number", + "label": "Port", + "name": "port", + "widget-attributes": { + "default": "1025" + } + }, + { + "widget-type": "textbox", + "label": "Database", + "name": "database" + }, + { + "widget-type": "textarea", + "label": "Import Query", + "name": "importQuery", + "widget-attributes": { + "rows": "4" + } + }, + { + "widget-type": "textarea", + "label": "Bounding Query", + "name": "boundingQuery", + "widget-attributes": { + "rows": "4" + } + }, + { + "widget-type": "textbox", + "label": "Split-By Field Name", + "name": "splitBy" + }, + { + "widget-type": "textbox", + "label": "Number of Splits to Generate", + "name": "numSplits", + "widget-attributes": { + "default": "1" + } + } + ] + }, + { + "label": "Credentials", + "properties": [ + { + "widget-type": "textbox", + "label": "Username", + "name": "user" + }, + { + "widget-type": "password", + "label": "Password", + "name": "password" + } + ] + }, + { + "label": "Advanced", + "properties": [ + { + "widget-type": "keyvalue", + "label": "Connection Arguments", + "name": "connectionArguments", + "widget-attributes": { + "showDelimiter": "false", + "key-placeholder": "Key", + "value-placeholder": "Value", + "kv-delimiter": "=", + "delimiter": ";" + } + } + ] + } + ], + "outputs": [ + { + "name": "schema", + "widget-type": "schema", + "widget-attributes": { + "schema-types": [ + "boolean", + "int", + "long", + "float", + "double", + "bytes", + "string" + ], + "schema-default-type": "string" + } + } + ], + "jump-config": { + "datasets": [ + { + "ref-property-name": "referenceName" + } + ] + } +} diff --git a/teradata-plugin/widgets/Teradata-postaction.json b/teradata-plugin/widgets/Teradata-postaction.json new file mode 100644 index 000000000..35ead0013 --- /dev/null +++ b/teradata-plugin/widgets/Teradata-postaction.json @@ -0,0 +1,95 @@ +{ + "metadata": { + "spec-version": "1.0" + }, + "display-name": "Teradata", + "configuration-groups": [ + { + "label": "Basic", + "properties": [ + { + "widget-type": "select", + "label": "Run Condition", + "name": "runCondition", + "widget-attributes": { + "values": [ + "completion", + "success", + "failure" + ], + "default": "success" + } + }, + { + "widget-type": "textbox", + "label": "Driver Name", + "name": "jdbcPluginName", + "widget-attributes": { + "default": "teradata" + } + }, + { + "widget-type": "textbox", + "label": "Host", + "name": "host", + "widget-attributes": { + "default": "localhost" + } + }, + { + "widget-type": "number", + "label": "Port", + "name": "port", + "widget-attributes": { + "default": "1025" + } + }, + { + "widget-type": "textbox", + "label": "Database", + "name": "database" + }, + { + "widget-type": "textarea", + "label": "Query", + "name": "query", + "widget-attributes": { + "rows": "4" + } + } + ] + }, + { + "label": "Credentials", + "properties": [ + { + "widget-type": "textbox", + "label": "Username", + "name": "user" + }, + { + "widget-type": "password", + "label": "Password", + "name": "password" + } + ] + }, + { + "label": "Advanced", + "properties": [ + { + "widget-type": "keyvalue", + "label": "Connection Arguments", + "name": "connectionArguments", + "widget-attributes": { + "showDelimiter": "false", + "key-placeholder": "Key", + "value-placeholder": "Value", + "kv-delimiter": "=", + "delimiter": ";" + } + } + ] + } + ] +}