Skip to content

Commit 48d84c4

Browse files
committed
fix oom
1 parent 502ae1a commit 48d84c4

File tree

1 file changed

+26
-6
lines changed

1 file changed

+26
-6
lines changed

src/main/java/org/embulk/output/snowflake/SnowflakeCopyBatchInsert.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ public class SnowflakeCopyBatchInsert implements BatchInsert {
3030
private SnowflakeOutputConnection connection = null;
3131
private TableIdentifier tableIdentifier = null;
3232
protected File currentFile;
33+
34+
private int batchWeight;
35+
3336
protected BufferedWriter writer;
3437
protected int index;
3538
protected int batchRows;
@@ -84,18 +87,14 @@ protected BufferedWriter openWriter(File newFile) throws IOException {
8487
}
8588

8689
public int getBatchWeight() {
87-
long fsize = currentFile.length();
88-
if (fsize > Integer.MAX_VALUE) {
89-
return Integer.MAX_VALUE;
90-
} else {
91-
return (int) fsize;
92-
}
90+
return batchWeight;
9391
}
9492

9593
public void add() throws IOException {
9694
writer.write(newLineString);
9795
batchRows++;
9896
index = 0;
97+
batchWeight += 32;
9998
}
10099

101100
private void appendDelimiter() throws IOException {
@@ -108,61 +107,73 @@ private void appendDelimiter() throws IOException {
108107
public void setNull(int sqlType) throws IOException {
109108
appendDelimiter();
110109
writer.write(nullString);
110+
nextColumn(0);
111111
}
112112

113113
public void setBoolean(boolean v) throws IOException {
114114
appendDelimiter();
115115
writer.write(String.valueOf(v));
116+
nextColumn(1);
116117
}
117118

118119
public void setByte(byte v) throws IOException {
119120
appendDelimiter();
120121
setEscapedString(String.valueOf(v));
122+
nextColumn(1);
121123
}
122124

123125
public void setShort(short v) throws IOException {
124126
appendDelimiter();
125127
writer.write(String.valueOf(v));
128+
nextColumn(2);
126129
}
127130

128131
public void setInt(int v) throws IOException {
129132
appendDelimiter();
130133
writer.write(String.valueOf(v));
134+
nextColumn(4);
131135
}
132136

133137
public void setLong(long v) throws IOException {
134138
appendDelimiter();
135139
writer.write(String.valueOf(v));
140+
nextColumn(8);
136141
}
137142

138143
public void setFloat(float v) throws IOException {
139144
appendDelimiter();
140145
writer.write(String.valueOf(v));
146+
nextColumn(4);
141147
}
142148

143149
public void setDouble(double v) throws IOException {
144150
appendDelimiter();
145151
writer.write(String.valueOf(v));
152+
nextColumn(8);
146153
}
147154

148155
public void setBigDecimal(BigDecimal v) throws IOException {
149156
appendDelimiter();
150157
writer.write(String.valueOf(v));
158+
nextColumn((v.precision() & ~2) / 2 + 8);
151159
}
152160

153161
public void setString(String v) throws IOException {
154162
appendDelimiter();
155163
setEscapedString(v);
164+
nextColumn(v.length() * 2 + 4);
156165
}
157166

158167
public void setNString(String v) throws IOException {
159168
appendDelimiter();
160169
setEscapedString(v);
170+
nextColumn(v.length() * 2 + 4);
161171
}
162172

163173
public void setBytes(byte[] v) throws IOException {
164174
appendDelimiter();
165175
setEscapedString(String.valueOf(v));
176+
nextColumn(v.length + 4);
166177
}
167178

168179
@Override
@@ -177,6 +188,7 @@ public void setSqlDate(final Instant v, final Calendar cal) throws IOException {
177188
cal.get(Calendar.MONTH) + 1,
178189
cal.get(Calendar.DAY_OF_MONTH));
179190
writer.write(f);
191+
nextColumn(32);
180192
}
181193

182194
@Override
@@ -192,6 +204,11 @@ public void setSqlTime(final Instant v, final Calendar cal) throws IOException {
192204
cal.get(Calendar.SECOND),
193205
v.getNano() / 1000);
194206
writer.write(f);
207+
nextColumn(32);
208+
}
209+
210+
private void nextColumn(int weight) {
211+
batchWeight += weight + 4; // add weight as overhead of each columns
195212
}
196213

197214
@Override
@@ -219,12 +236,14 @@ public void setSqlTimestamp(final Instant v, final Calendar cal) throws IOExcept
219236
v.getNano() / 1000,
220237
offset);
221238
writer.write(f);
239+
nextColumn(32);
222240
}
223241

224242
private void setEscapedString(String v) throws IOException {
225243
for (char c : v.toCharArray()) {
226244
writer.write(escape(c));
227245
}
246+
nextColumn(v.length() * 2 + 4);
228247
}
229248

230249
@Override
@@ -244,6 +263,7 @@ public void flush() throws IOException, SQLException {
244263
fileCount++;
245264
totalRows += batchRows;
246265
batchRows = 0;
266+
batchWeight = 0;
247267

248268
openNewFile();
249269
}

0 commit comments

Comments
 (0)