Skip to content

Commit 1fa7ad0

Browse files
authored
Merge pull request #61 from trocco-io/feat/EMPTY_FIELD_AS_NULL
empty_field_as_null
2 parents b1e562c + 421ca58 commit 1fa7ad0

File tree

3 files changed

+33
-9
lines changed

3 files changed

+33
-9
lines changed

src/main/java/org/embulk/output/SnowflakeOutputPlugin.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ public interface SnowflakePluginTask extends PluginTask {
5555
@Config("max_upload_retries")
5656
@ConfigDefault("3")
5757
public int getMaxUploadRetries();
58+
59+
@Config("empty_field_as_null")
60+
@ConfigDefault("true")
61+
public boolean getEmtpyFieldAsNull();
5862
}
5963

6064
@Override
@@ -153,12 +157,14 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> merg
153157
this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(t);
154158
snowflakeCon.runCreateStage(this.stageIdentifier);
155159
}
160+
SnowflakePluginTask pluginTask = (SnowflakePluginTask) task;
156161

157162
return new SnowflakeCopyBatchInsert(
158163
getConnector(task, true),
159164
this.stageIdentifier,
160165
false,
161-
((SnowflakePluginTask) task).getMaxUploadRetries());
166+
pluginTask.getMaxUploadRetries(),
167+
pluginTask.getEmtpyFieldAsNull());
162168
}
163169

164170
@Override

src/main/java/org/embulk/output/snowflake/SnowflakeCopyBatchInsert.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,14 @@ public class SnowflakeCopyBatchInsert implements BatchInsert {
3838
private long totalRows;
3939
private int fileCount;
4040
private List<Future<Void>> uploadAndCopyFutures;
41+
private boolean emptyFieldAsNull;
4142

4243
public SnowflakeCopyBatchInsert(
4344
JdbcOutputConnector connector,
4445
StageIdentifier stageIdentifier,
4546
boolean deleteStageFile,
46-
int maxUploadRetries)
47+
int maxUploadRetries,
48+
boolean emptyFieldAsNull)
4749
throws IOException {
4850
this.index = 0;
4951
openNewFile();
@@ -53,6 +55,7 @@ public SnowflakeCopyBatchInsert(
5355
this.deleteStageFile = deleteStageFile;
5456
this.uploadAndCopyFutures = new ArrayList();
5557
this.maxUploadRetries = maxUploadRetries;
58+
this.emptyFieldAsNull = emptyFieldAsNull;
5659
}
5760

5861
@Override
@@ -260,7 +263,7 @@ public void flush() throws IOException, SQLException {
260263
Future<Void> uploadFuture = executorService.submit(uploadTask);
261264
uploadAndCopyFutures.add(uploadFuture);
262265

263-
CopyTask copyTask = new CopyTask(uploadFuture, snowflakeStageFileName);
266+
CopyTask copyTask = new CopyTask(uploadFuture, snowflakeStageFileName, emptyFieldAsNull);
264267
uploadAndCopyFutures.add(executorService.submit(copyTask));
265268

266269
fileCount++;
@@ -393,10 +396,13 @@ public Void call() throws IOException, SQLException, InterruptedException {
393396
private class CopyTask implements Callable<Void> {
394397
private final Future<Void> uploadFuture;
395398
private final String snowflakeStageFileName;
399+
private final boolean emptyFieldAsNull;
396400

397-
public CopyTask(Future<Void> uploadFuture, String snowflakeStageFileName) {
401+
public CopyTask(
402+
Future<Void> uploadFuture, String snowflakeStageFileName, boolean emptyFieldAsNull) {
398403
this.uploadFuture = uploadFuture;
399404
this.snowflakeStageFileName = snowflakeStageFileName;
405+
this.emptyFieldAsNull = emptyFieldAsNull;
400406
}
401407

402408
public Void call() throws SQLException, InterruptedException, ExecutionException {
@@ -408,7 +414,12 @@ public Void call() throws SQLException, InterruptedException, ExecutionException
408414
logger.info("Running COPY from file {}", snowflakeStageFileName);
409415

410416
long startTime = System.currentTimeMillis();
411-
con.runCopy(tableIdentifier, stageIdentifier, snowflakeStageFileName, delimiterString);
417+
con.runCopy(
418+
tableIdentifier,
419+
stageIdentifier,
420+
snowflakeStageFileName,
421+
delimiterString,
422+
emptyFieldAsNull);
412423

413424
double seconds = (System.currentTimeMillis() - startTime) / 1000.0;
414425

src/main/java/org/embulk/output/snowflake/SnowflakeOutputConnection.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ public void runCopy(
2121
TableIdentifier tableIdentifier,
2222
StageIdentifier stageIdentifier,
2323
String filename,
24-
String delimiterString)
24+
String delimiterString,
25+
boolean emptyFieldAsNull)
2526
throws SQLException {
26-
String sql = buildCopySQL(tableIdentifier, stageIdentifier, filename, delimiterString);
27+
String sql =
28+
buildCopySQL(tableIdentifier, stageIdentifier, filename, delimiterString, emptyFieldAsNull);
2729
runUpdate(sql);
2830
}
2931

@@ -171,15 +173,20 @@ protected String buildCopySQL(
171173
TableIdentifier tableIdentifier,
172174
StageIdentifier stageIdentifier,
173175
String snowflakeStageFileName,
174-
String delimiterString) {
176+
String delimiterString,
177+
boolean emptyFieldAsNull) {
175178
StringBuilder sb = new StringBuilder();
176179
sb.append("COPY INTO ");
177180
quoteTableIdentifier(sb, tableIdentifier);
178181
sb.append(" FROM ");
179182
quoteInternalStoragePath(sb, stageIdentifier, snowflakeStageFileName);
180183
sb.append(" FILE_FORMAT = ( TYPE = CSV FIELD_DELIMITER = '");
181184
sb.append(delimiterString);
182-
sb.append("');");
185+
sb.append("') ");
186+
if (!emptyFieldAsNull) {
187+
sb.append("EMPTY_FIELD_AS_NULL = FALSE");
188+
}
189+
sb.append(";");
183190
return sb.toString();
184191
}
185192

0 commit comments

Comments
 (0)