Skip to content

Commit fe09733

Browse files
authored
Revert "Add file upload retry handling"
1 parent 10935c0 commit fe09733

File tree

5 files changed

+8
-51
lines changed

5 files changed

+8
-51
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ dependencies {
5151
testImplementation "org.embulk:embulk-parser-csv:0.10.31"
5252

5353
compile "org.embulk:embulk-output-jdbc:0.10.2"
54-
compile "net.snowflake:snowflake-jdbc:3.13.26"
54+
compile "net.snowflake:snowflake-jdbc:3.13.14"
5555
}
5656
embulkPlugin {
5757
mainClass = "org.embulk.output.SnowflakeOutputPlugin"

gradle/dependency-locks/embulkPluginRuntime.lockfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ com.fasterxml.jackson.core:jackson-core:2.6.7
66
com.fasterxml.jackson.core:jackson-databind:2.6.7
77
com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7
88
javax.validation:validation-api:1.1.0.Final
9-
net.snowflake:snowflake-jdbc:3.13.26
9+
net.snowflake:snowflake-jdbc:3.13.14
1010
org.embulk:embulk-output-jdbc:0.10.2
1111
org.embulk:embulk-util-config:0.3.0
1212
org.embulk:embulk-util-json:0.1.1

makefile

Lines changed: 0 additions & 9 deletions
This file was deleted.

src/main/java/org/embulk/output/SnowflakeOutputPlugin.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,6 @@ public interface SnowflakePluginTask extends PluginTask {
5151
@Config("delete_stage")
5252
@ConfigDefault("false")
5353
public boolean getDeleteStage();
54-
55-
@Config("max_upload_retries")
56-
@ConfigDefault("3")
57-
public int getMaxUploadRetries();
5854
}
5955

6056
@Override
@@ -149,11 +145,7 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> merg
149145
snowflakeCon.runCreateStage(this.stageIdentifier);
150146
}
151147

152-
return new SnowflakeCopyBatchInsert(
153-
getConnector(task, true),
154-
this.stageIdentifier,
155-
false,
156-
((SnowflakePluginTask) task).getMaxUploadRetries());
148+
return new SnowflakeCopyBatchInsert(getConnector(task, true), this.stageIdentifier, false);
157149
}
158150

159151
@Override

src/main/java/org/embulk/output/snowflake/SnowflakeCopyBatchInsert.java

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ public class SnowflakeCopyBatchInsert implements BatchInsert {
2626
protected static final String nullString = "\\N";
2727
protected static final String newLineString = "\n";
2828
protected static final String delimiterString = "\t";
29-
private final int maxUploadRetries;
3029

3130
private SnowflakeOutputConnection connection = null;
3231
private TableIdentifier tableIdentifier = null;
@@ -40,10 +39,7 @@ public class SnowflakeCopyBatchInsert implements BatchInsert {
4039
private List<Future<Void>> uploadAndCopyFutures;
4140

4241
public SnowflakeCopyBatchInsert(
43-
JdbcOutputConnector connector,
44-
StageIdentifier stageIdentifier,
45-
boolean deleteStageFile,
46-
int maxUploadRetries)
42+
JdbcOutputConnector connector, StageIdentifier stageIdentifier, boolean deleteStageFile)
4743
throws IOException {
4844
this.index = 0;
4945
openNewFile();
@@ -52,7 +48,6 @@ public SnowflakeCopyBatchInsert(
5248
this.executorService = Executors.newCachedThreadPool();
5349
this.deleteStageFile = deleteStageFile;
5450
this.uploadAndCopyFutures = new ArrayList();
55-
this.maxUploadRetries = maxUploadRetries;
5651
}
5752

5853
@Override
@@ -256,7 +251,7 @@ public void flush() throws IOException, SQLException {
256251
String snowflakeStageFileName = "embulk_snowflake_" + SnowflakeUtils.randomString(8);
257252

258253
UploadTask uploadTask =
259-
new UploadTask(file, batchRows, stageIdentifier, snowflakeStageFileName, maxUploadRetries);
254+
new UploadTask(file, batchRows, stageIdentifier, snowflakeStageFileName);
260255
Future<Void> uploadFuture = executorService.submit(uploadTask);
261256
uploadAndCopyFutures.add(uploadFuture);
262257

@@ -335,49 +330,28 @@ private class UploadTask implements Callable<Void> {
335330
private final int batchRows;
336331
private final String snowflakeStageFileName;
337332
private final StageIdentifier stageIdentifier;
338-
private final int maxUploadRetries;
339333

340334
public UploadTask(
341-
File file,
342-
int batchRows,
343-
StageIdentifier stageIdentifier,
344-
String snowflakeStageFileName,
345-
int maxUploadRetries) {
335+
File file, int batchRows, StageIdentifier stageIdentifier, String snowflakeStageFileName) {
346336
this.file = file;
347337
this.batchRows = batchRows;
348338
this.snowflakeStageFileName = snowflakeStageFileName;
349339
this.stageIdentifier = stageIdentifier;
350-
this.maxUploadRetries = maxUploadRetries;
351340
}
352341

353-
public Void call() throws IOException, SQLException, InterruptedException {
342+
public Void call() throws IOException, SQLException {
354343
logger.info(
355344
String.format(
356345
"Uploading file id %s to Snowflake (%,d bytes %,d rows)",
357346
snowflakeStageFileName, file.length(), batchRows));
358347

359-
int retries = 0;
360348
try {
361349
long startTime = System.currentTimeMillis();
362350
// put file to snowflake internal storage
363351
SnowflakeOutputConnection con = (SnowflakeOutputConnection) connector.connect(true);
364352

365353
FileInputStream fileInputStream = new FileInputStream(file);
366-
do {
367-
try {
368-
con.runUploadFile(stageIdentifier, snowflakeStageFileName, fileInputStream);
369-
} catch (SQLException e) {
370-
retries++;
371-
if (retries > this.maxUploadRetries) {
372-
throw e;
373-
}
374-
logger.warn(
375-
String.format(
376-
"Upload error %s file %s retries: %d", e, snowflakeStageFileName, retries));
377-
Thread.sleep(retries * retries * 1000);
378-
}
379-
break;
380-
} while (retries < this.maxUploadRetries);
354+
con.runUploadFile(stageIdentifier, snowflakeStageFileName, fileInputStream);
381355

382356
double seconds = (System.currentTimeMillis() - startTime) / 1000.0;
383357

0 commit comments

Comments
 (0)