@@ -2,22 +2,18 @@
 
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Properties;
+import java.sql.Types;
+import java.util.*;
 import org.embulk.config.ConfigDiff;
 import org.embulk.config.TaskSource;
-import org.embulk.output.jdbc.AbstractJdbcOutputPlugin;
-import org.embulk.output.jdbc.BatchInsert;
-import org.embulk.output.jdbc.JdbcOutputConnection;
-import org.embulk.output.jdbc.JdbcOutputConnector;
-import org.embulk.output.jdbc.MergeConfig;
+import org.embulk.output.jdbc.*;
 import org.embulk.output.snowflake.SnowflakeCopyBatchInsert;
 import org.embulk.output.snowflake.SnowflakeOutputConnection;
 import org.embulk.output.snowflake.SnowflakeOutputConnector;
 import org.embulk.output.snowflake.StageIdentifier;
 import org.embulk.output.snowflake.StageIdentifierHolder;
+import org.embulk.spi.Column;
+import org.embulk.spi.ColumnVisitor;
 import org.embulk.spi.OutputPlugin;
 import org.embulk.spi.Schema;
 import org.embulk.util.config.Config;
@@ -166,4 +162,71 @@ protected void logConnectionProperties(String url, Properties props) { |
     }
     logger.info("Connecting to {} options {}", url, maskedProps);
   }
+
+  // TODO: This is almost a copy of AbstractJdbcOutputPlugin, except that the JSON type maps to
+  // OBJECT. AbstractJdbcOutputPlugin should have better extensibility.
+  @Override
+  protected JdbcSchema newJdbcSchemaForNewTable(Schema schema) {
+    final ArrayList<JdbcColumn> columns = new ArrayList<>();
+    for (Column c : schema.getColumns()) {
+      final String columnName = c.getName();
+      c.visit(
+          new ColumnVisitor() {
+            public void booleanColumn(Column column) {
+              columns.add(
+                  JdbcColumn.newGenericTypeColumn(
+                      columnName, Types.BOOLEAN, "BOOLEAN", 1, 0, false, false));
+            }
+
+            public void longColumn(Column column) {
+              columns.add(
+                  JdbcColumn.newGenericTypeColumn(
+                      columnName, Types.BIGINT, "BIGINT", 22, 0, false, false));
+            }
+
+            public void doubleColumn(Column column) {
+              columns.add(
+                  JdbcColumn.newGenericTypeColumn(
+                      columnName, Types.FLOAT, "DOUBLE PRECISION", 24, 0, false, false));
+            }
+
+            public void stringColumn(Column column) {
+              columns.add(
+                  JdbcColumn.newGenericTypeColumn(
+                      columnName,
+                      Types.CLOB,
+                      "CLOB",
+                      4000,
+                      0,
+                      false,
+                      false)); // TODO size type param
+            }
+
+            public void jsonColumn(Column column) {
+              columns.add(
+                  JdbcColumn.newGenericTypeColumn(
+                      columnName,
+                      Types.OTHER,
+                      "OBJECT",
+                      4000,
+                      0,
+                      false,
+                      false)); // TODO size type param
+            }
+
+            public void timestampColumn(Column column) {
+              columns.add(
+                  JdbcColumn.newGenericTypeColumn(
+                      columnName,
+                      Types.TIMESTAMP,
+                      "TIMESTAMP",
+                      26,
+                      0,
+                      false,
+                      false)); // size type param is from postgresql
+            }
+          });
+    }
+    return new JdbcSchema(Collections.unmodifiableList(columns));
+  }
 }
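
For context, a minimal sketch (not part of the commit) of how the new override could be exercised, for example from a test class placed in the same package as the plugin so the protected method is visible. SnowflakeOutputPlugin is assumed to be the plugin class shown in this diff; Schema.builder() and org.embulk.spi.type.Types come from the standard Embulk SPI, and the column names are illustrative only.

import org.embulk.output.jdbc.JdbcSchema;
import org.embulk.spi.Schema;
import org.embulk.spi.type.Types;

class NewTableSchemaSketch {
  static JdbcSchema sketch() {
    // Build an Embulk schema covering the visited types; names are made up.
    Schema schema =
        Schema.builder()
            .add("id", Types.LONG) // -> BIGINT
            .add("name", Types.STRING) // -> CLOB
            .add("payload", Types.JSON) // -> OBJECT, the point of this change
            .add("created_at", Types.TIMESTAMP) // -> TIMESTAMP
            .build();

    // The returned JdbcSchema carries the generic type names above, which the
    // plugin later uses when it creates a new target table.
    return new SnowflakeOutputPlugin().newJdbcSchemaForNewTable(schema);
  }
}

Per the TODO in the diff, the only intended difference from the base-class mapping is the jsonColumn branch, which now emits OBJECT so Snowflake stores the value as semi-structured data.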