@@ -26,7 +26,6 @@ public class SnowflakeCopyBatchInsert implements BatchInsert {
2626 protected static final String nullString = "\\ N" ;
2727 protected static final String newLineString = "\n " ;
2828 protected static final String delimiterString = "\t " ;
29- private final int maxUploadRetries ;
3029
3130 private SnowflakeOutputConnection connection = null ;
3231 private TableIdentifier tableIdentifier = null ;
@@ -40,10 +39,7 @@ public class SnowflakeCopyBatchInsert implements BatchInsert {
4039 private List <Future <Void >> uploadAndCopyFutures ;
4140
4241 public SnowflakeCopyBatchInsert (
43- JdbcOutputConnector connector ,
44- StageIdentifier stageIdentifier ,
45- boolean deleteStageFile ,
46- int maxUploadRetries )
42+ JdbcOutputConnector connector , StageIdentifier stageIdentifier , boolean deleteStageFile )
4743 throws IOException {
4844 this .index = 0 ;
4945 openNewFile ();
@@ -52,7 +48,6 @@ public SnowflakeCopyBatchInsert(
5248 this .executorService = Executors .newCachedThreadPool ();
5349 this .deleteStageFile = deleteStageFile ;
5450 this .uploadAndCopyFutures = new ArrayList ();
55- this .maxUploadRetries = maxUploadRetries ;
5651 }
5752
5853 @ Override
@@ -256,7 +251,7 @@ public void flush() throws IOException, SQLException {
256251 String snowflakeStageFileName = "embulk_snowflake_" + SnowflakeUtils .randomString (8 );
257252
258253 UploadTask uploadTask =
259- new UploadTask (file , batchRows , stageIdentifier , snowflakeStageFileName , maxUploadRetries );
254+ new UploadTask (file , batchRows , stageIdentifier , snowflakeStageFileName );
260255 Future <Void > uploadFuture = executorService .submit (uploadTask );
261256 uploadAndCopyFutures .add (uploadFuture );
262257
@@ -335,49 +330,28 @@ private class UploadTask implements Callable<Void> {
335330 private final int batchRows ;
336331 private final String snowflakeStageFileName ;
337332 private final StageIdentifier stageIdentifier ;
338- private final int maxUploadRetries ;
339333
340334 public UploadTask (
341- File file ,
342- int batchRows ,
343- StageIdentifier stageIdentifier ,
344- String snowflakeStageFileName ,
345- int maxUploadRetries ) {
335+ File file , int batchRows , StageIdentifier stageIdentifier , String snowflakeStageFileName ) {
346336 this .file = file ;
347337 this .batchRows = batchRows ;
348338 this .snowflakeStageFileName = snowflakeStageFileName ;
349339 this .stageIdentifier = stageIdentifier ;
350- this .maxUploadRetries = maxUploadRetries ;
351340 }
352341
353- public Void call () throws IOException , SQLException , InterruptedException {
342+ public Void call () throws IOException , SQLException {
354343 logger .info (
355344 String .format (
356345 "Uploading file id %s to Snowflake (%,d bytes %,d rows)" ,
357346 snowflakeStageFileName , file .length (), batchRows ));
358347
359- int retries = 0 ;
360348 try {
361349 long startTime = System .currentTimeMillis ();
362350 // put file to snowflake internal storage
363351 SnowflakeOutputConnection con = (SnowflakeOutputConnection ) connector .connect (true );
364352
365353 FileInputStream fileInputStream = new FileInputStream (file );
366- do {
367- try {
368- con .runUploadFile (stageIdentifier , snowflakeStageFileName , fileInputStream );
369- } catch (SQLException e ) {
370- retries ++;
371- if (retries > this .maxUploadRetries ) {
372- throw e ;
373- }
374- logger .warn (
375- String .format (
376- "Upload error %s file %s retries: %d" , e , snowflakeStageFileName , retries ));
377- Thread .sleep (retries * retries * 1000 );
378- }
379- break ;
380- } while (retries < this .maxUploadRetries );
354+ con .runUploadFile (stageIdentifier , snowflakeStageFileName , fileInputStream );
381355
382356 double seconds = (System .currentTimeMillis () - startTime ) / 1000.0 ;
383357
0 commit comments