Skip to content

Commit 3db5932

Browse files
author
Dai MIKURUBE
committed
Make CsvTokenizer independent from LineDecoder
1 parent 5206fe0 commit 3db5932

File tree

4 files changed

+44
-50
lines changed

4 files changed

+44
-50
lines changed

embulk-guess-csv/src/main/java/org/embulk/guess/csv/CsvGuessPlugin.java

Lines changed: 19 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -307,28 +307,26 @@ private static List<List<String>> splitLines(
307307
final CsvTokenizer tokenizer = buildCsvTokenizer(parserTask, sample);
308308

309309
final ArrayList<List<String>> rows = new ArrayList<>();
310-
while (tokenizer.nextFile()) {
311-
while (tokenizer.nextRecord(skipEmptyLines)) {
312-
try {
313-
final ArrayList<String> columns = new ArrayList<>();
314-
while (true) {
315-
try {
316-
final String column = tokenizer.nextColumn();
317-
final boolean quoted = tokenizer.wasQuotedColumn();
318-
if (nullString != null && !quoted && nullString.equals(column)) {
319-
columns.add(null);
320-
} else {
321-
columns.add(column);
322-
}
323-
} catch (final TooFewColumnsException ex) {
324-
rows.add(Collections.unmodifiableList(columns));
325-
break;
310+
while (tokenizer.nextRecord(skipEmptyLines)) {
311+
try {
312+
final ArrayList<String> columns = new ArrayList<>();
313+
while (true) {
314+
try {
315+
final String column = tokenizer.nextColumn();
316+
final boolean quoted = tokenizer.wasQuotedColumn();
317+
if (nullString != null && !quoted && nullString.equals(column)) {
318+
columns.add(null);
319+
} else {
320+
columns.add(column);
326321
}
322+
} catch (final TooFewColumnsException ex) {
323+
rows.add(Collections.unmodifiableList(columns));
324+
break;
327325
}
328-
} catch (final InvalidValueException ex) {
329-
// TODO warning
330-
tokenizer.skipCurrentLine();
331326
}
327+
} catch (final InvalidValueException ex) {
328+
// TODO warning
329+
tokenizer.skipCurrentLine();
332330
}
333331
}
334332
return Collections.unmodifiableList(rows);
@@ -366,7 +364,8 @@ private static CsvTokenizer buildCsvTokenizer(final CsvParserPlugin.PluginTask p
366364
final LineDecoder decoder = LineDecoder.of(
367365
new ListFileInput(listListBuffer), parserTask.getCharset(), parserTask.getLineDelimiterRecognized().orElse(null));
368366

369-
return builder.build(decoder);
367+
decoder.nextFile();
368+
return builder.build(decoder.iterator());
370369
}
371370

372371
private String guessDelimiter(final List<String> sampleLines) {

embulk-parser-csv/src/main/java/org/embulk/parser/csv/CsvParserPlugin.java

Lines changed: 7 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -314,14 +314,17 @@ public void run(TaskSource taskSource, final Schema schema,
314314
final PluginTask task = CONFIG_MAPPER_FACTORY.createTaskMapper().map(taskSource, PluginTask.class);
315315
final TimestampFormatter[] timestampFormatters = newTimestampColumnFormatters(task, task.getSchemaConfig());
316316
final JsonParser jsonParser = new JsonParser();
317-
final CsvTokenizer tokenizer = buildCsvTokenizer(task, input);
317+
final CsvTokenizer.Builder tokenizerBuilder = buildCsvTokenizerBuilder(task);
318318
final boolean allowOptionalColumns = task.getAllowOptionalColumns();
319319
final boolean allowExtraColumns = task.getAllowExtraColumns();
320320
final boolean stopOnInvalidRecord = task.getStopOnInvalidRecord();
321321
final int skipHeaderLines = task.getSkipHeaderLines();
322322

323323
try (final PageBuilder pageBuilder = getPageBuilder(Exec.getBufferAllocator(), schema, output)) {
324-
while (tokenizer.nextFile()) {
324+
while (input.nextFile()) {
325+
final CsvTokenizer tokenizer = tokenizerBuilder.build(
326+
LineDecoder.of(input, task.getCharset(), task.getLineDelimiterRecognized().orElse(null)).iterator());
327+
325328
final String fileName = input.hintOfCurrentInputFileNameForLogging().orElse("-");
326329

327330
// skip the header lines for each file
@@ -468,7 +471,7 @@ static class CsvRecordValidateException extends DataException {
468471
}
469472
}
470473

471-
private static CsvTokenizer buildCsvTokenizer(final PluginTask task, final FileInput input) {
474+
private static CsvTokenizer.Builder buildCsvTokenizerBuilder(final PluginTask task) {
472475
final CsvTokenizer.Builder builder = CsvTokenizer.builder(task.getDelimiter());
473476
task.getQuoteChar().ifPresent(q -> builder.setQuote(q.getCharacter()));
474477
task.getEscapeChar().ifPresent(e -> builder.setEscape(e.getCharacter()));
@@ -482,7 +485,7 @@ private static CsvTokenizer buildCsvTokenizer(final PluginTask task, final FileI
482485
builder.setMaxQuotedFieldLength(task.getMaxQuotedSizeLimit());
483486
task.getCommentLineMarker().ifPresent(m -> builder.setCommentLineMarker(m));
484487
task.getNullString().ifPresent(n -> builder.setNullString(n));
485-
return builder.build(LineDecoder.of(input, task.getCharset(), task.getLineDelimiterRecognized().orElse(null)));
488+
return builder;
486489
}
487490

488491
@SuppressWarnings("deprecation") // For the use of new PageBuilder().

embulk-parser-csv/src/main/java/org/embulk/parser/csv/CsvTokenizer.java

Lines changed: 14 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -19,13 +19,13 @@
1919
import java.util.ArrayDeque;
2020
import java.util.ArrayList;
2121
import java.util.Deque;
22+
import java.util.Iterator;
2223
import java.util.List;
2324
import java.util.stream.Collectors;
24-
import org.embulk.util.text.LineDecoder;
2525

2626
public class CsvTokenizer {
2727
private CsvTokenizer(
28-
final LineDecoder input,
28+
final Iterator<String> iterator,
2929
final char delimiterChar,
3030
final String delimiterFollowingString,
3131
final char quote,
@@ -49,7 +49,7 @@ private CsvTokenizer(
4949

5050
this.quotedValueLines = new ArrayList<>();
5151
this.unreadLines = new ArrayDeque<>();
52-
this.input = input;
52+
this.iterator = iterator;
5353

5454
this.recordState = RecordState.END; // initial state is end of a record. nextRecord() must be called first
5555
this.lineNumber = 0;
@@ -142,15 +142,15 @@ public Builder setNullString(final String nullString) {
142142
return this;
143143
}
144144

145-
public CsvTokenizer build(final LineDecoder input) {
145+
public CsvTokenizer build(final Iterator<String> iterator) {
146146
if (this.trimIfNotQuoted && this.quotesInQuotedFields != QuotesInQuotedFields.ACCEPT_ONLY_RFC4180_ESCAPED) {
147147
// The combination makes some syntax very ambiguous such as:
148148
// val1, \"\"val2\"\" ,val3
149149
throw new IllegalStateException(
150150
"[quotes_in_quoted_fields != ACCEPT_ONLY_RFC4180_ESCAPED] is not allowed to specify with [trim_if_not_quoted = true]");
151151
}
152152
return new CsvTokenizer(
153-
input,
153+
iterator,
154154
delimiterChar,
155155
delimiterFollowingString,
156156
quote,
@@ -185,11 +185,12 @@ public long getCurrentLineNumber() {
185185
}
186186

187187
public boolean skipHeaderLine() {
188-
final boolean skipped = this.input.poll() != null;
189-
if (skipped) {
190-
this.lineNumber++;
188+
if (!this.iterator.hasNext()) {
189+
return false;
191190
}
192-
return skipped;
191+
this.iterator.next();
192+
this.lineNumber++;
193+
return true;
193194
}
194195

195196
// returns skipped line
@@ -212,14 +213,6 @@ public String skipCurrentLine() {
212213
return skippedLine;
213214
}
214215

215-
public boolean nextFile() {
216-
final boolean next = this.input.nextFile();
217-
if (next) {
218-
this.lineNumber = 0;
219-
}
220-
return next;
221-
}
222-
223216
// used by guess-csv
224217
public boolean nextRecord() {
225218
return this.nextRecord(true);
@@ -245,10 +238,10 @@ private boolean nextLine(final boolean skipEmptyLine) {
245238
if (!this.unreadLines.isEmpty()) {
246239
this.line = this.unreadLines.removeFirst();
247240
} else {
248-
this.line = this.input.poll();
249-
if (this.line == null) {
241+
if (!this.iterator.hasNext()) {
250242
return false;
251243
}
244+
this.line = this.iterator.next();
252245
}
253246
this.linePos = 0;
254247
this.lineNumber++;
@@ -625,6 +618,8 @@ private static String escapeControl(final String from) {
625618

626619
private static final char END_OF_LINE = '\0';
627620

621+
private final Iterator<String> iterator;
622+
628623
private final char delimiterChar;
629624
private final String delimiterFollowingString;
630625
private final char quote;
@@ -634,7 +629,6 @@ private static String escapeControl(final String from) {
634629
private final QuotesInQuotedFields quotesInQuotedFields;
635630
private final long maxQuotedFieldLength;
636631
private final String commentLineMarker;
637-
private final LineDecoder input;
638632
private final String nullString;
639633

640634
private final List<String> quotedValueLines;

embulk-parser-csv/src/test/java/org/embulk/parser/csv/TestCsvTokenizer.java

Lines changed: 4 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -61,9 +61,8 @@ private static List<List<String>> parse(final CsvTokenizer.Builder builder, fina
6161

6262
private static List<List<String>> parse(final CsvTokenizer.Builder builder, final int columns, final FileInput input) {
6363
LineDecoder decoder = LineDecoder.of(input, StandardCharsets.UTF_8, null);
64-
final CsvTokenizer tokenizer = builder.build(decoder);
65-
66-
tokenizer.nextFile();
64+
decoder.nextFile();
65+
final CsvTokenizer tokenizer = builder.build(decoder.iterator());
6766

6867
List<List<String>> records = new ArrayList<>();
6968
while (tokenizer.nextRecord()) {
@@ -376,9 +375,8 @@ public void recoverFromQuotedSizeLimitExceededException() throws Exception {
376375
};
377376
final FileInput input = newFileInputFromLines("\n", lines);
378377
final LineDecoder decoder = LineDecoder.of(input, StandardCharsets.UTF_8, null);
379-
final CsvTokenizer tokenizer = builder.build(decoder);
380-
381-
tokenizer.nextFile();
378+
decoder.nextFile();
379+
final CsvTokenizer tokenizer = builder.build(decoder.iterator());
382380

383381
assertTrue(tokenizer.nextRecord());
384382
assertEquals("v1", tokenizer.nextColumn());

0 commit comments

Comments
 (0)