@@ -26,6 +26,7 @@ public class SnowflakeCopyBatchInsert implements BatchInsert {
2626 protected static final String nullString = "\\ N" ;
2727 protected static final String newLineString = "\n " ;
2828 protected static final String delimiterString = "\t " ;
29+ private final int maxUploadRetries ;
2930
3031 private SnowflakeOutputConnection connection = null ;
3132 private TableIdentifier tableIdentifier = null ;
@@ -39,7 +40,10 @@ public class SnowflakeCopyBatchInsert implements BatchInsert {
3940 private List <Future <Void >> uploadAndCopyFutures ;
4041
4142 public SnowflakeCopyBatchInsert (
42- JdbcOutputConnector connector , StageIdentifier stageIdentifier , boolean deleteStageFile )
43+ JdbcOutputConnector connector ,
44+ StageIdentifier stageIdentifier ,
45+ boolean deleteStageFile ,
46+ int maxUploadRetries )
4347 throws IOException {
4448 this .index = 0 ;
4549 openNewFile ();
@@ -48,6 +52,7 @@ public SnowflakeCopyBatchInsert(
4852 this .executorService = Executors .newCachedThreadPool ();
4953 this .deleteStageFile = deleteStageFile ;
5054 this .uploadAndCopyFutures = new ArrayList ();
55+ this .maxUploadRetries = maxUploadRetries ;
5156 }
5257
5358 @ Override
@@ -251,7 +256,7 @@ public void flush() throws IOException, SQLException {
251256 String snowflakeStageFileName = "embulk_snowflake_" + SnowflakeUtils .randomString (8 );
252257
253258 UploadTask uploadTask =
254- new UploadTask (file , batchRows , stageIdentifier , snowflakeStageFileName );
259+ new UploadTask (file , batchRows , stageIdentifier , snowflakeStageFileName , maxUploadRetries );
255260 Future <Void > uploadFuture = executorService .submit (uploadTask );
256261 uploadAndCopyFutures .add (uploadFuture );
257262
@@ -330,28 +335,49 @@ private class UploadTask implements Callable<Void> {
330335 private final int batchRows ;
331336 private final String snowflakeStageFileName ;
332337 private final StageIdentifier stageIdentifier ;
338+ private final int maxUploadRetries ;
333339
334340 public UploadTask (
335- File file , int batchRows , StageIdentifier stageIdentifier , String snowflakeStageFileName ) {
341+ File file ,
342+ int batchRows ,
343+ StageIdentifier stageIdentifier ,
344+ String snowflakeStageFileName ,
345+ int maxUploadRetries ) {
336346 this .file = file ;
337347 this .batchRows = batchRows ;
338348 this .snowflakeStageFileName = snowflakeStageFileName ;
339349 this .stageIdentifier = stageIdentifier ;
350+ this .maxUploadRetries = maxUploadRetries ;
340351 }
341352
342- public Void call () throws IOException , SQLException {
353+ public Void call () throws IOException , SQLException , InterruptedException {
343354 logger .info (
344355 String .format (
345356 "Uploading file id %s to Snowflake (%,d bytes %,d rows)" ,
346357 snowflakeStageFileName , file .length (), batchRows ));
347358
359+ int retries = 0 ;
348360 try {
349361 long startTime = System .currentTimeMillis ();
350362 // put file to snowflake internal storage
351363 SnowflakeOutputConnection con = (SnowflakeOutputConnection ) connector .connect (true );
352364
353365 FileInputStream fileInputStream = new FileInputStream (file );
354- con .runUploadFile (stageIdentifier , snowflakeStageFileName , fileInputStream );
366+ do {
367+ try {
368+ con .runUploadFile (stageIdentifier , snowflakeStageFileName , fileInputStream );
369+ } catch (SQLException e ) {
370+ retries ++;
371+ if (retries > this .maxUploadRetries ) {
372+ throw e ;
373+ }
374+ logger .warn (
375+ String .format (
376+ "Upload error %s file %s retries: %d" , e , snowflakeStageFileName , retries ));
377+ Thread .sleep (retries * retries * 1000 );
378+ }
379+ break ;
380+ } while (retries < this .maxUploadRetries );
355381
356382 double seconds = (System .currentTimeMillis () - startTime ) / 1000.0 ;
357383
0 commit comments