Skip to content

Commit 7a3fca6

Browse files
Amit-CloudSufivikasrathee-cs
authored andcommitted
Add hidden treatTimestampLTZAsTimestamp
1 parent f10bbf9 commit 7a3fca6

File tree

8 files changed

+165
-10
lines changed

8 files changed

+165
-10
lines changed

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ protected DBConnectorPath getDBConnectorPath(String path) {
113113
@Override
114114
protected SchemaReader getSchemaReader(String sessionID) {
115115
return new OracleSourceSchemaReader(sessionID, config.getTreatAsOldTimestamp(),
116-
config.getTreatPrecisionlessNumAsDeci());
116+
config.getTreatPrecisionlessNumAsDeci(),
117+
config.getTreatTimestampLTZAsTimestamp());
117118
}
118119

119120
@Override

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnectorConfig.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,14 @@ public OracleConnectorConfig(String host, int port, String user, String password
4242
public OracleConnectorConfig(String host, int port, String user, String password, String jdbcPluginName,
4343
String connectionArguments, String connectionType, String database) {
4444
this(host, port, user, password, jdbcPluginName, connectionArguments, connectionType, database, null, null, null,
45-
null);
45+
null, null);
4646
}
4747

4848
public OracleConnectorConfig(String host, int port, String user, String password, String jdbcPluginName,
4949
String connectionArguments, String connectionType, String database,
5050
String role, Boolean useSSL, @Nullable Boolean treatAsOldTimestamp,
51-
@Nullable Boolean treatPrecisionlessNumAsDeci) {
51+
@Nullable Boolean treatPrecisionlessNumAsDeci,
52+
@Nullable Boolean treatTimestampLTZAsTimestamp) {
5253

5354
this.host = host;
5455
this.port = port;
@@ -62,6 +63,7 @@ public OracleConnectorConfig(String host, int port, String user, String password
6263
this.useSSL = useSSL;
6364
this.treatAsOldTimestamp = treatAsOldTimestamp;
6465
this.treatPrecisionlessNumAsDeci = treatPrecisionlessNumAsDeci;
66+
this.treatTimestampLTZAsTimestamp = treatTimestampLTZAsTimestamp;
6567
}
6668

6769
@Override
@@ -98,6 +100,11 @@ public String getConnectionString() {
98100
@Nullable
99101
public Boolean treatPrecisionlessNumAsDeci;
100102

103+
@Name(OracleConstants.TREAT_TIMESTAMP_LTZ_AS_TIMESTAMP)
104+
@Description("A hidden field to handle mapping of Oracle Timestamp_LTZ data type to BQ Timestamp.")
105+
@Nullable
106+
public Boolean treatTimestampLTZAsTimestamp;
107+
101108
@Override
102109
protected int getDefaultPort() {
103110
return 1521;
@@ -128,6 +135,10 @@ public Boolean getTreatPrecisionlessNumAsDeci() {
128135
return Boolean.TRUE.equals(treatPrecisionlessNumAsDeci);
129136
}
130137

138+
public Boolean getTreatTimestampLTZAsTimestamp() {
139+
return Boolean.TRUE.equals(treatTimestampLTZAsTimestamp);
140+
}
141+
131142
@Override
132143
public Properties getConnectionArgumentsProperties() {
133144
Properties prop = super.getConnectionArgumentsProperties();

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ private OracleConstants() {
4545
public static final String USE_SSL = "useSSL";
4646
public static final String TREAT_AS_OLD_TIMESTAMP = "treatAsOldTimestamp";
4747
public static final String TREAT_PRECISIONLESSNUM_AS_DECI = "treatPrecisionlessNumAsDeci";
48+
public static final String TREAT_TIMESTAMP_LTZ_AS_TIMESTAMP = "treatTimestampLTZAsTimestamp";
4849

4950
/**
5051
* Constructs the Oracle connection string based on the provided connection type, host, port, and database.

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,10 @@ protected SchemaReader getSchemaReader() {
6767
// handle schema to make it backward compatible.
6868
boolean treatAsOldTimestamp = oracleSourceConfig.getConnection().getTreatAsOldTimestamp();
6969
boolean treatPrecisionlessNumAsDeci = oracleSourceConfig.getConnection().getTreatPrecisionlessNumAsDeci();
70+
boolean treatTimestampLTZAsTimestamp = oracleSourceConfig.getConnection().getTreatTimestampLTZAsTimestamp();
7071

71-
return new OracleSourceSchemaReader(null, treatAsOldTimestamp, treatPrecisionlessNumAsDeci);
72+
return new OracleSourceSchemaReader(null, treatAsOldTimestamp, treatPrecisionlessNumAsDeci,
73+
treatTimestampLTZAsTimestamp);
7274
}
7375

7476
@Override
@@ -133,10 +135,10 @@ public OracleSourceConfig(String host, int port, String user, String password, S
133135
int defaultBatchValue, int defaultRowPrefetch,
134136
String importQuery, Integer numSplits, int fetchSize,
135137
String boundingQuery, String splitBy, Boolean useSSL, Boolean treatAsOldTimestamp,
136-
Boolean treatPrecisionlessNumAsDeci) {
138+
Boolean treatPrecisionlessNumAsDeci, Boolean treatTimestampLTZAsTimestamp) {
137139
this.connection = new OracleConnectorConfig(host, port, user, password, jdbcPluginName, connectionArguments,
138140
connectionType, database, role, useSSL, treatAsOldTimestamp,
139-
treatPrecisionlessNumAsDeci);
141+
treatPrecisionlessNumAsDeci, treatTimestampLTZAsTimestamp);
140142
this.defaultBatchValue = defaultBatchValue;
141143
this.defaultRowPrefetch = defaultRowPrefetch;
142144
this.fetchSize = fetchSize;

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.common.collect.ImmutableSet;
2020
import io.cdap.cdap.api.data.schema.Schema;
2121
import io.cdap.plugin.db.CommonSchemaReader;
22+
import org.jetbrains.annotations.NotNull;
2223
import org.slf4j.Logger;
2324
import org.slf4j.LoggerFactory;
2425

@@ -68,15 +69,17 @@ public class OracleSourceSchemaReader extends CommonSchemaReader {
6869
private final String sessionID;
6970
private final Boolean isTimestampOldBehavior;
7071
private final Boolean isPrecisionlessNumAsDecimal;
72+
private final Boolean isTimestampLtzFieldTimestamp;
7173

7274
public OracleSourceSchemaReader() {
73-
this(null, false, false);
75+
this(null, false, false, false);
7476
}
7577
public OracleSourceSchemaReader(@Nullable String sessionID, boolean isTimestampOldBehavior,
76-
boolean isPrecisionlessNumAsDecimal) {
78+
boolean isPrecisionlessNumAsDecimal, boolean isTimestampLtzFieldTimestamp) {
7779
this.sessionID = sessionID;
7880
this.isTimestampOldBehavior = isTimestampOldBehavior;
7981
this.isPrecisionlessNumAsDecimal = isPrecisionlessNumAsDecimal;
82+
this.isTimestampLtzFieldTimestamp = isTimestampLtzFieldTimestamp;
8083
}
8184

8285
@Override
@@ -87,8 +90,7 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
8790
case TIMESTAMP_TZ:
8891
return isTimestampOldBehavior ? Schema.of(Schema.Type.STRING) : Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
8992
case TIMESTAMP_LTZ:
90-
return isTimestampOldBehavior ? Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)
91-
: Schema.of(Schema.LogicalType.DATETIME);
93+
return getTimestampLtzSchema();
9294
case Types.TIMESTAMP:
9395
return isTimestampOldBehavior ? super.getSchema(metadata, index) : Schema.of(Schema.LogicalType.DATETIME);
9496
case BINARY_FLOAT:
@@ -139,6 +141,12 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
139141
}
140142
}
141143

144+
private @NotNull Schema getTimestampLtzSchema() {
145+
return isTimestampOldBehavior || isTimestampLtzFieldTimestamp
146+
? Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)
147+
: Schema.of(Schema.LogicalType.DATETIME);
148+
}
149+
142150
@Override
143151
public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException {
144152
if (sessionID == null) {
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.oracle;
18+
19+
import com.google.common.collect.Lists;
20+
import io.cdap.cdap.api.data.schema.Schema;
21+
import org.junit.Assert;
22+
import org.junit.Test;
23+
import org.junit.runner.RunWith;
24+
import org.mockito.Mockito;
25+
import org.mockito.junit.MockitoJUnitRunner;
26+
27+
import java.sql.ResultSet;
28+
import java.sql.ResultSetMetaData;
29+
import java.sql.SQLException;
30+
import java.util.List;
31+
32+
public class OracleSchemaReaderTest {
33+
34+
@Test
35+
public void getSchema_timestampLTZFieldTrue_returnTimestamp() throws SQLException {
36+
OracleSourceSchemaReader schemaReader = new OracleSourceSchemaReader(null, false, false, true);
37+
38+
ResultSet resultSet = Mockito.mock(ResultSet.class);
39+
ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
40+
41+
Mockito.when(resultSet.getMetaData()).thenReturn(metadata);
42+
43+
Mockito.when(metadata.getColumnCount()).thenReturn(2);
44+
// -101 is for TIMESTAMP_TZ
45+
Mockito.when(metadata.getColumnType(1)).thenReturn(-101);
46+
Mockito.when(metadata.getColumnName(1)).thenReturn("column1");
47+
48+
// -102 is for TIMESTAMP_LTZ
49+
Mockito.when(metadata.getColumnType(2)).thenReturn(-102);
50+
Mockito.when(metadata.getColumnName(2)).thenReturn("column2");
51+
52+
List<Schema.Field> expectedSchemaFields = Lists.newArrayList();
53+
expectedSchemaFields.add(Schema.Field.of("column1", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)));
54+
expectedSchemaFields.add(Schema.Field.of("column2", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)));
55+
56+
List<Schema.Field> actualSchemaFields = schemaReader.getSchemaFields(resultSet);
57+
58+
Assert.assertEquals(expectedSchemaFields.get(0).getName(), actualSchemaFields.get(0).getName());
59+
Assert.assertEquals(expectedSchemaFields.get(0).getSchema(), actualSchemaFields.get(0).getSchema());
60+
Assert.assertEquals(expectedSchemaFields.get(1).getName(), actualSchemaFields.get(1).getName());
61+
Assert.assertEquals(expectedSchemaFields.get(1).getSchema(), actualSchemaFields.get(1).getSchema());
62+
63+
}
64+
65+
@Test
66+
public void getSchema_timestampLTZFieldFalse_returnDatetime() throws SQLException {
67+
OracleSourceSchemaReader schemaReader = new OracleSourceSchemaReader(null, false, false, false);
68+
69+
ResultSet resultSet = Mockito.mock(ResultSet.class);
70+
ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
71+
72+
Mockito.when(resultSet.getMetaData()).thenReturn(metadata);
73+
74+
Mockito.when(metadata.getColumnCount()).thenReturn(2);
75+
// -101 is for TIMESTAMP_TZ
76+
Mockito.when(metadata.getColumnType(1)).thenReturn(-101);
77+
Mockito.when(metadata.getColumnName(1)).thenReturn("column1");
78+
79+
// -102 is for TIMESTAMP_LTZ
80+
Mockito.when(metadata.getColumnType(2)).thenReturn(-102);
81+
Mockito.when(metadata.getColumnName(2)).thenReturn("column2");
82+
83+
List<Schema.Field> expectedSchemaFields = Lists.newArrayList();
84+
expectedSchemaFields.add(Schema.Field.of("column1", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)));
85+
expectedSchemaFields.add(Schema.Field.of("column2", Schema.of(Schema.LogicalType.DATETIME)));
86+
87+
List<Schema.Field> actualSchemaFields = schemaReader.getSchemaFields(resultSet);
88+
89+
Assert.assertEquals(expectedSchemaFields.get(0).getName(), actualSchemaFields.get(0).getName());
90+
Assert.assertEquals(expectedSchemaFields.get(0).getSchema(), actualSchemaFields.get(0).getSchema());
91+
Assert.assertEquals(expectedSchemaFields.get(1).getName(), actualSchemaFields.get(1).getName());
92+
Assert.assertEquals(expectedSchemaFields.get(1).getSchema(), actualSchemaFields.get(1).getSchema());
93+
}
94+
}

oracle-plugin/widgets/Oracle-batchsource.json

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,25 @@
158158
]
159159
}
160160
},
161+
{
162+
"widget-type": "hidden",
163+
"label": "Treat Timestamp_LTZ as Timestamp",
164+
"name": "treatTimestampLTZAsTimestamp",
165+
"widget-attributes": {
166+
"layout": "inline",
167+
"default": "false",
168+
"options": [
169+
{
170+
"id": "true",
171+
"label": "true"
172+
},
173+
{
174+
"id": "false",
175+
"label": "false"
176+
}
177+
]
178+
}
179+
},
161180
{
162181
"name": "connectionType",
163182
"label": "Connection Type",

oracle-plugin/widgets/Oracle-connector.json

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,25 @@
167167
}
168168
]
169169
}
170+
},
171+
{
172+
"widget-type": "hidden",
173+
"label": "Treat Timestamp_LTZ as Timestamp",
174+
"name": "treatTimestampLTZAsTimestamp",
175+
"widget-attributes": {
176+
"layout": "inline",
177+
"default": "false",
178+
"options": [
179+
{
180+
"id": "true",
181+
"label": "true"
182+
},
183+
{
184+
"id": "false",
185+
"label": "false"
186+
}
187+
]
188+
}
170189
}
171190
]
172191
},

0 commit comments

Comments
 (0)