22
33import java .io .IOException ;
44import java .sql .SQLException ;
5+ import java .sql .Types ;
6+ import java .util .ArrayList ;
57import java .util .Arrays ;
8+ import java .util .Collections ;
69import java .util .HashSet ;
10+ import java .util .List ;
11+ import java .util .Map ;
712import java .util .Optional ;
813import java .util .Properties ;
14+ import java .util .function .Supplier ;
15+ import java .util .stream .Collectors ;
916import org .embulk .config .ConfigDiff ;
17+ import org .embulk .config .ConfigException ;
1018import org .embulk .config .TaskSource ;
1119import org .embulk .output .jdbc .AbstractJdbcOutputPlugin ;
1220import org .embulk .output .jdbc .BatchInsert ;
21+ import org .embulk .output .jdbc .JdbcColumn ;
22+ import org .embulk .output .jdbc .JdbcColumnOption ;
1323import org .embulk .output .jdbc .JdbcOutputConnection ;
1424import org .embulk .output .jdbc .JdbcOutputConnector ;
25+ import org .embulk .output .jdbc .JdbcSchema ;
1526import org .embulk .output .jdbc .MergeConfig ;
27+ import org .embulk .output .jdbc .TableIdentifier ;
1628import org .embulk .output .snowflake .SnowflakeCopyBatchInsert ;
1729import org .embulk .output .snowflake .SnowflakeOutputConnection ;
1830import org .embulk .output .snowflake .SnowflakeOutputConnector ;
1931import org .embulk .output .snowflake .StageIdentifier ;
2032import org .embulk .output .snowflake .StageIdentifierHolder ;
33+ import org .embulk .spi .Column ;
2134import org .embulk .spi .OutputPlugin ;
2235import org .embulk .spi .Schema ;
2336import org .embulk .util .config .Config ;
2437import org .embulk .util .config .ConfigDefault ;
38+ import org .embulk .util .retryhelper .RetryExecutor ;
39+ import org .embulk .util .retryhelper .RetryGiveupException ;
40+ import org .embulk .util .retryhelper .Retryable ;
2541
2642public class SnowflakeOutputPlugin extends AbstractJdbcOutputPlugin {
2743 private StageIdentifier stageIdentifier ;
@@ -121,7 +137,143 @@ protected void doCommit(JdbcOutputConnection con, PluginTask task, int taskCount
  /**
   * Plans the whole load before any task runs: resolves the actual target table name,
   * builds the new-table and target-table schemas, creates intermediate tables for
   * non-direct-modify modes, and normalizes the merge_keys option.
   *
   * <p>Replaces {@code super.doBegin} because the parent plugin builds table identifiers
   * without the Snowflake database name; here the {@link TableIdentifier} is built from
   * the task's database, schema, and resolved table name.
   *
   * @param con open output connection used for catalog lookups and DDL
   * @param task plugin task; several derived fields (actual table, schemas, intermediate
   *     tables, merge keys) are written back into it for later phases
   * @param schema input schema from Embulk; must have at least one column
   * @param taskCount number of parallel tasks (one intermediate table each in per-task modes)
   * @throws SQLException on catalog lookup or table creation failure
   * @throws ConfigException on invalid configuration (empty schema, ambiguous table name,
   *     unsupported before_load, bad merge_keys)
   */
  protected void doBegin(
      JdbcOutputConnection con, PluginTask task, final Schema schema, int taskCount)
      throws SQLException {
    if (schema.getColumnCount() == 0) {
      throw new ConfigException("No column.");
    }

    Mode mode = task.getMode();
    logger.info("Using {} mode", mode);

    // Swap-table modes replace the target atomically at commit time, so a before_load
    // statement would run against a table that is about to be discarded.
    if (mode.commitBySwapTable() && task.getBeforeLoad().isPresent()) {
      throw new ConfigException(
          String.format("%s mode does not support 'before_load' option.", mode));
    }

    // Resolve the table name case-insensitively: prefer the configured name as-is, then
    // the upper-cased form (Snowflake folds unquoted identifiers to upper case), then the
    // lower-cased form. If both cased variants exist the reference is ambiguous.
    // NOTE(review): toUpperCase()/toLowerCase() use the default locale — presumably fine
    // for ASCII identifiers, but confirm for locales like Turkish (dotless i).
    String actualTable;
    if (con.tableExists(task.getTable())) {
      actualTable = task.getTable();
    } else {
      String upperTable = task.getTable().toUpperCase();
      String lowerTable = task.getTable().toLowerCase();
      if (con.tableExists(upperTable)) {
        if (con.tableExists(lowerTable)) {
          throw new ConfigException(
              String.format(
                  "Cannot specify table '%s' because both '%s' and '%s' exist.",
                  task.getTable(), upperTable, lowerTable));
        } else {
          actualTable = upperTable;
        }
      } else {
        if (con.tableExists(lowerTable)) {
          actualTable = lowerTable;
        } else {
          // Table does not exist in any casing; keep the configured name and let the
          // create-if-not-exists path below make it.
          actualTable = task.getTable();
        }
      }
    }
    // need to get database name
    SnowflakePluginTask sfTask = (SnowflakePluginTask) task;

    task.setActualTable(new TableIdentifier(sfTask.getDatabase(), sfTask.getSchema(), actualTable));

    // Schema of the existing target table, if any. Modes that replace the table ignore it.
    Optional<JdbcSchema> initialTargetTableSchema =
        mode.ignoreTargetTableSchema()
            ? Optional.<JdbcSchema>empty()
            : newJdbcSchemaFromTableIfExists(con, task.getActualTable());

    // TODO get CREATE TABLE statement from task if set
    // Schema used for any table we create: existing table's schema when present,
    // otherwise derived from the input schema, with column_options type overrides applied.
    JdbcSchema newTableSchema =
        applyColumnOptionsToNewTableSchema(
            initialTargetTableSchema.orElseGet(
                new Supplier<JdbcSchema>() {
                  public JdbcSchema get() {
                    return newJdbcSchemaForNewTable(schema);
                  }
                }),
            task.getColumnOptions());

    // create intermediate tables
    if (!mode.isDirectModify()) {
      // create the intermediate tables here
      task.setIntermediateTables(
          Optional.<List<TableIdentifier>>of(
              createIntermediateTables(con, task, taskCount, newTableSchema)));
    } else {
      // direct modify mode doesn't need intermediate tables.
      task.setIntermediateTables(Optional.<List<TableIdentifier>>empty());
      if (task.getBeforeLoad().isPresent()) {
        // executeSql is private need to cast
        SnowflakeOutputConnection sfCon = (SnowflakeOutputConnection) con;
        sfCon.executeSql(task.getBeforeLoad().get());
      }
    }

    // build JdbcSchema from a table
    // Determine the schema tasks will load against: the existing target table, the first
    // intermediate table (all intermediates share one schema), or a freshly created target.
    JdbcSchema targetTableSchema;
    if (initialTargetTableSchema.isPresent()) {
      targetTableSchema = initialTargetTableSchema.get();
      task.setNewTableSchema(Optional.<JdbcSchema>empty());
    } else if (task.getIntermediateTables().isPresent()
        && !task.getIntermediateTables().get().isEmpty()) {
      TableIdentifier firstItermTable = task.getIntermediateTables().get().get(0);
      targetTableSchema = newJdbcSchemaFromTableIfExists(con, firstItermTable).get();
      // Target table may still need creating at commit time; remember the schema for that.
      task.setNewTableSchema(Optional.of(newTableSchema));
    } else {
      // also create the target table if not exists
      // CREATE TABLE IF NOT EXISTS xyz
      con.createTableIfNotExists(
          task.getActualTable(),
          newTableSchema,
          task.getCreateTableConstraint(),
          task.getCreateTableOption());
      targetTableSchema = newJdbcSchemaFromTableIfExists(con, task.getActualTable()).get();
      task.setNewTableSchema(Optional.<JdbcSchema>empty());
    }
    // Align input columns to table columns by name; unmatched input columns are skipped.
    task.setTargetTableSchema(matchSchemaByColumnNames(schema, targetTableSchema));

    // validate column_options
    // Building the setters here (with a null BatchInsert) fails fast on bad column_options
    // before any task starts loading.
    newColumnSetters(
        newColumnSetterFactory(null, task.getDefaultTimeZone()), // TODO create a dummy BatchInsert
        task.getTargetTableSchema(),
        schema,
        task.getColumnOptions());

    // normalize merge_key parameter for merge modes
    if (mode.isMerge()) {
      Optional<List<String>> mergeKeys = task.getMergeKeys();
      if (task.getFeatures().getIgnoreMergeKeys()) {
        if (mergeKeys.isPresent()) {
          throw new ConfigException("This output type does not accept 'merge_key' option.");
        }
        task.setMergeKeys(Optional.<List<String>>of(Collections.emptyList()));
      } else if (mergeKeys.isPresent()) {
        if (task.getMergeKeys().get().isEmpty()) {
          throw new ConfigException("Empty 'merge_keys' option is invalid.");
        }
        // Every explicit merge key must resolve to a column of the target table.
        for (String key : mergeKeys.get()) {
          if (!targetTableSchema.findColumn(key).isPresent()) {
            throw new ConfigException(
                String.format("Merge key '%s' does not exist in the target table.", key));
          }
        }
      } else {
        // No merge_keys configured: default to the target table's unique-key columns.
        final ArrayList<String> builder = new ArrayList<>();
        for (JdbcColumn column : targetTableSchema.getColumns()) {
          if (column.isUniqueKey()) {
            builder.add(column.getName());
          }
        }
        task.setMergeKeys(Optional.<List<String>>of(Collections.unmodifiableList(builder)));
        if (task.getMergeKeys().get().isEmpty()) {
          throw new ConfigException(
              "Merging mode is used but the target table does not have primary keys. Please set merge_keys option.");
        }
      }
      logger.info("Using merge keys: {}", task.getMergeKeys().get());
    } else {
      task.setMergeKeys(Optional.<List<String>>empty());
    }
  }
126278
127279 @ Override
@@ -143,4 +295,159 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> merg
143295
144296 return new SnowflakeCopyBatchInsert (getConnector (task , true ), this .stageIdentifier , false );
145297 }
298+
299+ // borrow code from jdbc to fix TablIdentifer in doBegin
300+ private static JdbcSchema applyColumnOptionsToNewTableSchema (
301+ JdbcSchema schema , final Map <String , JdbcColumnOption > columnOptions ) {
302+ return new JdbcSchema (
303+ schema .getColumns ().stream ()
304+ .map (
305+ c -> {
306+ JdbcColumnOption option = columnOptionOf (columnOptions , c .getName ());
307+ if (option .getType ().isPresent ()) {
308+ return JdbcColumn .newTypeDeclaredColumn (
309+ c .getName (),
310+ Types .OTHER , // sqlType, isNotNull, and isUniqueKey are ignored
311+ option .getType ().get (),
312+ false ,
313+ false );
314+ }
315+ return c ;
316+ })
317+ .collect (Collectors .toList ()));
318+ }
319+
320+ // borrow code from jdbc to fix TablIdentifer in doBegin
321+ private static JdbcColumnOption columnOptionOf (
322+ Map <String , JdbcColumnOption > columnOptions , String columnName ) {
323+ return Optional .ofNullable (columnOptions .get (columnName ))
324+ .orElseGet (
325+ // default column option
326+ new Supplier <JdbcColumnOption >() {
327+ public JdbcColumnOption get () {
328+ return CONFIG_MAPPER .map (
329+ CONFIG_MAPPER_FACTORY .newConfigSource (), JdbcColumnOption .class );
330+ }
331+ });
332+ }
333+
334+ // borrow code from jdbc to fix TablIdentifer in doBegin
335+ private List <TableIdentifier > createIntermediateTables (
336+ final JdbcOutputConnection con ,
337+ final PluginTask task ,
338+ final int taskCount ,
339+ final JdbcSchema newTableSchema )
340+ throws SQLException {
341+ try {
342+ return buildRetryExecutor (task )
343+ .run (
344+ new Retryable <List <TableIdentifier >>() {
345+ private TableIdentifier table ;
346+ private ArrayList <TableIdentifier > intermTables ;
347+
348+ @ Override
349+ public List <TableIdentifier > call () throws Exception {
350+ intermTables = new ArrayList <>();
351+ if (task .getMode ().tempTablePerTask ()) {
352+ String tableNameFormat =
353+ generateIntermediateTableNameFormat (
354+ task .getActualTable ().getTableName (),
355+ con ,
356+ taskCount ,
357+ task .getFeatures ().getMaxTableNameLength (),
358+ task .getFeatures ().getTableNameLengthSemantics ());
359+ for (int taskIndex = 0 ; taskIndex < taskCount ; taskIndex ++) {
360+ String tableName = String .format (tableNameFormat , taskIndex );
361+ table = buildIntermediateTableId (con , task , tableName );
362+ // if table already exists, SQLException will be thrown
363+ con .createTable (
364+ table ,
365+ newTableSchema ,
366+ task .getCreateTableConstraint (),
367+ task .getCreateTableOption ());
368+ intermTables .add (table );
369+ }
370+ } else {
371+ String tableName =
372+ generateIntermediateTableNamePrefix (
373+ task .getActualTable ().getTableName (),
374+ con ,
375+ 0 ,
376+ task .getFeatures ().getMaxTableNameLength (),
377+ task .getFeatures ().getTableNameLengthSemantics ());
378+ table = buildIntermediateTableId (con , task , tableName );
379+ con .createTable (
380+ table ,
381+ newTableSchema ,
382+ task .getCreateTableConstraint (),
383+ task .getCreateTableOption ());
384+ intermTables .add (table );
385+ }
386+ return Collections .unmodifiableList (intermTables );
387+ }
388+
389+ @ Override
390+ public boolean isRetryableException (Exception exception ) {
391+ if (exception instanceof SQLException ) {
392+ try {
393+ // true means that creating table failed because the table already exists.
394+ return con .tableExists (table );
395+ } catch (SQLException e ) {
396+ }
397+ }
398+ return false ;
399+ }
400+
401+ @ Override
402+ public void onRetry (
403+ Exception exception , int retryCount , int retryLimit , int retryWait )
404+ throws RetryGiveupException {
405+ logger .info ("Try to create intermediate tables again because already exist" );
406+ try {
407+ dropTables ();
408+ } catch (SQLException e ) {
409+ throw new RetryGiveupException (e );
410+ }
411+ }
412+
413+ @ Override
414+ public void onGiveup (Exception firstException , Exception lastException )
415+ throws RetryGiveupException {
416+ try {
417+ dropTables ();
418+ } catch (SQLException e ) {
419+ logger .warn ("Cannot delete intermediate table" , e );
420+ }
421+ }
422+
423+ private void dropTables () throws SQLException {
424+ for (TableIdentifier table : intermTables ) {
425+ con .dropTableIfExists (table );
426+ }
427+ }
428+ });
429+ } catch (RetryGiveupException e ) {
430+ throw new RuntimeException (e );
431+ }
432+ }
433+
434+ // borrow code from jdbc to fix TablIdentifer in doBegin
435+ private static RetryExecutor buildRetryExecutor (PluginTask task ) {
436+ return RetryExecutor .retryExecutor ()
437+ .withRetryLimit (task .getRetryLimit ())
438+ .withInitialRetryWait (task .getRetryWait ())
439+ .withMaxRetryWait (task .getMaxRetryWait ());
440+ }
441+
442+ // borrow code from jdbc to fix TablIdentifer in doBegin
443+ private JdbcSchema matchSchemaByColumnNames (Schema inputSchema , JdbcSchema targetTableSchema ) {
444+ final ArrayList <JdbcColumn > jdbcColumns = new ArrayList <>();
445+
446+ for (Column column : inputSchema .getColumns ()) {
447+ Optional <JdbcColumn > c = targetTableSchema .findColumn (column .getName ());
448+ jdbcColumns .add (c .orElse (JdbcColumn .skipColumn ()));
449+ }
450+
451+ return new JdbcSchema (Collections .unmodifiableList (jdbcColumns ));
452+ }
146453}
0 commit comments