Skip to content

Commit 5206fe0

Browse files
author
Dai MIKURUBE
committed
Make CsvTokenizer independent from CsvParserPlugin
1 parent 0a83220 commit 5206fe0

File tree

4 files changed

+301
-139
lines changed

4 files changed

+301
-139
lines changed

embulk-guess-csv/src/main/java/org/embulk/guess/csv/CsvGuessPlugin.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -304,13 +304,7 @@ private static List<List<String>> splitLines(
304304
sample.setBytes(0, data, 0, data.length);
305305
sample.limit(data.length);
306306

307-
final ArrayList<Buffer> listBuffer = new ArrayList<>();
308-
listBuffer.add(sample);
309-
final ArrayList<ArrayList<Buffer>> listListBuffer = new ArrayList<>();
310-
listListBuffer.add(listBuffer);
311-
final LineDecoder decoder = LineDecoder.of(
312-
new ListFileInput(listListBuffer), parserTask.getCharset(), parserTask.getLineDelimiterRecognized().orElse(null));
313-
final CsvTokenizer tokenizer = new CsvTokenizer(decoder, parserTask);
307+
final CsvTokenizer tokenizer = buildCsvTokenizer(parserTask, sample);
314308

315309
final ArrayList<List<String>> rows = new ArrayList<>();
316310
while (tokenizer.nextFile()) {
@@ -349,6 +343,32 @@ private static List<List<String>> splitLines(
349343
}
350344
}
351345

346+
private static CsvTokenizer buildCsvTokenizer(final CsvParserPlugin.PluginTask parserTask, final Buffer sample) {
347+
final CsvTokenizer.Builder builder = CsvTokenizer.builder(parserTask.getDelimiter());
348+
parserTask.getQuoteChar().ifPresent(q -> builder.setQuote(q.getCharacter()));
349+
parserTask.getEscapeChar().ifPresent(e -> builder.setEscape(e.getCharacter()));
350+
builder.setNewline(parserTask.getNewline().getString());
351+
if (parserTask.getTrimIfNotQuoted()) {
352+
builder.enableTrimIfNotQuoted();
353+
}
354+
if (parserTask.getQuotesInQuotedFields()
355+
== CsvParserPlugin.QuotesInQuotedFields.ACCEPT_STRAY_QUOTES_ASSUMING_NO_DELIMITERS_IN_FIELDS) {
356+
builder.acceptStrayQuotesAssumingNoDelimitersInFields();
357+
}
358+
builder.setMaxQuotedFieldLength(parserTask.getMaxQuotedSizeLimit());
359+
parserTask.getCommentLineMarker().ifPresent(m -> builder.setCommentLineMarker(m));
360+
parserTask.getNullString().ifPresent(n -> builder.setNullString(n));
361+
362+
final ArrayList<Buffer> listBuffer = new ArrayList<>();
363+
listBuffer.add(sample);
364+
final ArrayList<ArrayList<Buffer>> listListBuffer = new ArrayList<>();
365+
listListBuffer.add(listBuffer);
366+
final LineDecoder decoder = LineDecoder.of(
367+
new ListFileInput(listListBuffer), parserTask.getCharset(), parserTask.getLineDelimiterRecognized().orElse(null));
368+
369+
return builder.build(decoder);
370+
}
371+
352372
private String guessDelimiter(final List<String> sampleLines) {
353373
String selectedDelimiter = null;
354374
double mostWeight = 0.0;

embulk-parser-csv/src/main/java/org/embulk/parser/csv/CsvParserPlugin.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,8 +314,7 @@ public void run(TaskSource taskSource, final Schema schema,
314314
final PluginTask task = CONFIG_MAPPER_FACTORY.createTaskMapper().map(taskSource, PluginTask.class);
315315
final TimestampFormatter[] timestampFormatters = newTimestampColumnFormatters(task, task.getSchemaConfig());
316316
final JsonParser jsonParser = new JsonParser();
317-
final CsvTokenizer tokenizer = new CsvTokenizer(
318-
LineDecoder.of(input, task.getCharset(), task.getLineDelimiterRecognized().orElse(null)), task);
317+
final CsvTokenizer tokenizer = buildCsvTokenizer(task, input);
319318
final boolean allowOptionalColumns = task.getAllowOptionalColumns();
320319
final boolean allowExtraColumns = task.getAllowExtraColumns();
321320
final boolean stopOnInvalidRecord = task.getStopOnInvalidRecord();
@@ -469,6 +468,23 @@ static class CsvRecordValidateException extends DataException {
469468
}
470469
}
471470

471+
private static CsvTokenizer buildCsvTokenizer(final PluginTask task, final FileInput input) {
472+
final CsvTokenizer.Builder builder = CsvTokenizer.builder(task.getDelimiter());
473+
task.getQuoteChar().ifPresent(q -> builder.setQuote(q.getCharacter()));
474+
task.getEscapeChar().ifPresent(e -> builder.setEscape(e.getCharacter()));
475+
builder.setNewline(task.getNewline().getString());
476+
if (task.getTrimIfNotQuoted()) {
477+
builder.enableTrimIfNotQuoted();
478+
}
479+
if (task.getQuotesInQuotedFields() == QuotesInQuotedFields.ACCEPT_STRAY_QUOTES_ASSUMING_NO_DELIMITERS_IN_FIELDS) {
480+
builder.acceptStrayQuotesAssumingNoDelimitersInFields();
481+
}
482+
builder.setMaxQuotedFieldLength(task.getMaxQuotedSizeLimit());
483+
task.getCommentLineMarker().ifPresent(m -> builder.setCommentLineMarker(m));
484+
task.getNullString().ifPresent(n -> builder.setNullString(n));
485+
return builder.build(LineDecoder.of(input, task.getCharset(), task.getLineDelimiterRecognized().orElse(null)));
486+
}
487+
472488
@SuppressWarnings("deprecation") // For the use of new PageBuilder().
473489
private static PageBuilder getPageBuilder(final BufferAllocator allocator, final Schema schema, final PageOutput output) {
474490
try {

embulk-parser-csv/src/main/java/org/embulk/parser/csv/CsvTokenizer.java

Lines changed: 183 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -20,36 +20,33 @@
2020
import java.util.ArrayList;
2121
import java.util.Deque;
2222
import java.util.List;
23-
import org.embulk.config.ConfigException;
24-
import org.embulk.parser.csv.CsvParserPlugin.QuotesInQuotedFields;
23+
import java.util.stream.Collectors;
2524
import org.embulk.util.text.LineDecoder;
2625

2726
public class CsvTokenizer {
28-
public CsvTokenizer(final LineDecoder input, final CsvParserPlugin.PluginTask task) {
29-
final String delimiter = task.getDelimiter();
30-
if (delimiter.length() == 0) {
31-
throw new ConfigException("Empty delimiter is not allowed");
32-
} else {
33-
this.delimiterChar = delimiter.charAt(0);
34-
if (delimiter.length() > 1) {
35-
this.delimiterFollowingString = delimiter.substring(1);
36-
} else {
37-
this.delimiterFollowingString = null;
38-
}
39-
}
40-
this.quote = task.getQuoteChar().orElse(CsvParserPlugin.QuoteCharacter.noQuote()).getCharacter();
41-
this.escape = task.getEscapeChar().orElse(CsvParserPlugin.EscapeCharacter.noEscape()).getCharacter();
42-
this.newline = task.getNewline().getString();
43-
this.trimIfNotQuoted = task.getTrimIfNotQuoted();
44-
this.quotesInQuotedFields = task.getQuotesInQuotedFields();
45-
if (this.trimIfNotQuoted && this.quotesInQuotedFields != QuotesInQuotedFields.ACCEPT_ONLY_RFC4180_ESCAPED) {
46-
// The combination makes some syntax very ambiguous such as:
47-
// val1, \"\"val2\"\" ,val3
48-
throw new ConfigException("[quotes_in_quoted_fields != ACCEPT_ONLY_RFC4180_ESCAPED] is not allowed to specify with [trim_if_not_quoted = true]");
49-
}
50-
this.maxQuotedSizeLimit = task.getMaxQuotedSizeLimit();
51-
this.commentLineMarker = task.getCommentLineMarker().orElse(null);
52-
this.nullStringOrNull = task.getNullString().orElse(null);
27+
private CsvTokenizer(
28+
final LineDecoder input,
29+
final char delimiterChar,
30+
final String delimiterFollowingString,
31+
final char quote,
32+
final char escape,
33+
final String newline,
34+
final boolean trimIfNotQuoted,
35+
final QuotesInQuotedFields quotesInQuotedFields,
36+
final long maxQuotedFieldLength,
37+
final String commentLineMarker,
38+
final String nullString) {
39+
this.delimiterChar = delimiterChar;
40+
this.delimiterFollowingString = delimiterFollowingString;
41+
this.quote = quote;
42+
this.escape = escape;
43+
this.newline = newline;
44+
this.trimIfNotQuoted = trimIfNotQuoted;
45+
this.quotesInQuotedFields = quotesInQuotedFields;
46+
this.maxQuotedFieldLength = maxQuotedFieldLength;
47+
this.commentLineMarker = commentLineMarker;
48+
this.nullString = nullString;
49+
5350
this.quotedValueLines = new ArrayList<>();
5451
this.unreadLines = new ArrayDeque<>();
5552
this.input = input;
@@ -61,6 +58,127 @@ public CsvTokenizer(final LineDecoder input, final CsvParserPlugin.PluginTask ta
6158
this.wasQuotedColumn = false;
6259
}
6360

61+
public static class Builder {
62+
private Builder(final String delimiter) {
63+
if (delimiter == null) {
64+
throw new NullPointerException("CsvTokenizer does not accept null as a delimiter.");
65+
}
66+
if (delimiter.isEmpty()) {
67+
throw new IllegalArgumentException("CsvTokenizer does not accept an empty delimiter.");
68+
}
69+
70+
this.delimiterChar = delimiter.charAt(0);
71+
if (delimiter.length() > 1) {
72+
this.delimiterFollowingString = delimiter.substring(1);
73+
} else {
74+
this.delimiterFollowingString = null;
75+
}
76+
77+
this.quote = '\"';
78+
this.escape = '\\';
79+
this.newline = "\r\n";
80+
this.trimIfNotQuoted = false;
81+
this.quotesInQuotedFields = QuotesInQuotedFields.ACCEPT_ONLY_RFC4180_ESCAPED;
82+
this.maxQuotedFieldLength = 131072L; // 128KB
83+
this.commentLineMarker = null;
84+
this.nullString = null;
85+
}
86+
87+
public Builder setQuote(final char quote) {
88+
this.quote = quote;
89+
return this;
90+
}
91+
92+
public Builder noQuote() {
93+
this.quote = NO_QUOTE;
94+
return this;
95+
}
96+
97+
public Builder setEscape(final char escape) {
98+
this.escape = escape;
99+
return this;
100+
}
101+
102+
public Builder noEscape() {
103+
this.escape = NO_ESCAPE;
104+
return this;
105+
}
106+
107+
public Builder setNewline(final String newline) {
108+
if (newline == null) {
109+
throw new NullPointerException("CsvTokenizer does not accept null as a newline.");
110+
}
111+
112+
if ("\r\n".equals(newline) || "\r".equals(newline) || "\n".equals(newline)) {
113+
this.newline = newline;
114+
return this;
115+
}
116+
117+
throw new IllegalArgumentException("CsvTokenizer does not accept \"" + escapeControl(newline) + "\" as a newline.");
118+
}
119+
120+
public Builder enableTrimIfNotQuoted() {
121+
this.trimIfNotQuoted = true;
122+
return this;
123+
}
124+
125+
public Builder acceptStrayQuotesAssumingNoDelimitersInFields() {
126+
this.quotesInQuotedFields = QuotesInQuotedFields.ACCEPT_STRAY_QUOTES_ASSUMING_NO_DELIMITERS_IN_FIELDS;
127+
return this;
128+
}
129+
130+
public Builder setMaxQuotedFieldLength(final long maxQuotedFieldLength) {
131+
this.maxQuotedFieldLength = maxQuotedFieldLength;
132+
return this;
133+
}
134+
135+
public Builder setCommentLineMarker(final String commentLineMarker) {
136+
this.commentLineMarker = commentLineMarker;
137+
return this;
138+
}
139+
140+
public Builder setNullString(final String nullString) {
141+
this.nullString = nullString;
142+
return this;
143+
}
144+
145+
public CsvTokenizer build(final LineDecoder input) {
146+
if (this.trimIfNotQuoted && this.quotesInQuotedFields != QuotesInQuotedFields.ACCEPT_ONLY_RFC4180_ESCAPED) {
147+
// The combination makes some syntax very ambiguous such as:
148+
// val1, \"\"val2\"\" ,val3
149+
throw new IllegalStateException(
150+
"[quotes_in_quoted_fields != ACCEPT_ONLY_RFC4180_ESCAPED] is not allowed to specify with [trim_if_not_quoted = true]");
151+
}
152+
return new CsvTokenizer(
153+
input,
154+
delimiterChar,
155+
delimiterFollowingString,
156+
quote,
157+
escape,
158+
newline,
159+
trimIfNotQuoted,
160+
quotesInQuotedFields,
161+
maxQuotedFieldLength,
162+
commentLineMarker,
163+
nullString);
164+
}
165+
166+
private final char delimiterChar;
167+
private final String delimiterFollowingString;
168+
169+
private char quote;
170+
private char escape;
171+
private String newline;
172+
private boolean trimIfNotQuoted;
173+
private QuotesInQuotedFields quotesInQuotedFields;
174+
private long maxQuotedFieldLength;
175+
private String commentLineMarker;
176+
private String nullString;
177+
}
178+
179+
public static Builder builder(final String delimiter) {
180+
return new Builder(delimiter);
181+
}
64182

65183
public long getCurrentLineNumber() {
66184
return this.lineNumber;
@@ -311,8 +429,8 @@ public String nextColumn() {
311429
&& !(this.isDelimiter(next) || this.isEndOfLine(next))) {
312430
// A non-escaped stray "quote character" in the field is processed as a regular character
313431
// if ACCEPT_STRAY_QUOTES_ASSUMING_NO_DELIMITERS_IN_FIELDS is specified,
314-
if ((this.linePos - valueStartPos) + quotedValue.length() > this.maxQuotedSizeLimit) {
315-
throw new QuotedSizeLimitExceededException("The size of the quoted value exceeds the limit size (" + this.maxQuotedSizeLimit + ")");
432+
if ((this.linePos - valueStartPos) + quotedValue.length() > this.maxQuotedFieldLength) {
433+
throw new QuotedSizeLimitExceededException("The size of the quoted value exceeds the limit size (" + this.maxQuotedFieldLength + ")");
316434
}
317435
} else {
318436
quotedValue.append(this.line.substring(valueStartPos, this.linePos - 1));
@@ -337,8 +455,8 @@ public String nextColumn() {
337455
}
338456

339457
} else {
340-
if ((this.linePos - valueStartPos) + quotedValue.length() > this.maxQuotedSizeLimit) {
341-
throw new QuotedSizeLimitExceededException("The size of the quoted value exceeds the limit size (" + this.maxQuotedSizeLimit + ")");
458+
if ((this.linePos - valueStartPos) + quotedValue.length() > this.maxQuotedFieldLength) {
459+
throw new QuotedSizeLimitExceededException("The size of the quoted value exceeds the limit size (" + this.maxQuotedFieldLength + ")");
342460
}
343461
// keep QUOTED_VALUE state
344462
}
@@ -374,7 +492,7 @@ public String nextColumn() {
374492

375493
public String nextColumnOrNull() {
376494
final String v = this.nextColumn();
377-
if (this.nullStringOrNull == null) {
495+
if (this.nullString == null) {
378496
if (v.isEmpty()) {
379497
if (this.wasQuotedColumn) {
380498
return "";
@@ -385,7 +503,7 @@ public String nextColumnOrNull() {
385503
return v;
386504
}
387505
} else {
388-
if (v.equals(this.nullStringOrNull)) {
506+
if (v.equals(this.nullString)) {
389507
return null;
390508
} else {
391509
return v;
@@ -473,6 +591,35 @@ private static enum ColumnState {
473591
BEGIN, VALUE, QUOTED_VALUE, AFTER_QUOTED_VALUE, FIRST_TRIM, LAST_TRIM_OR_VALUE,
474592
}
475593

594+
private static enum QuotesInQuotedFields {
595+
ACCEPT_ONLY_RFC4180_ESCAPED,
596+
ACCEPT_STRAY_QUOTES_ASSUMING_NO_DELIMITERS_IN_FIELDS,
597+
;
598+
}
599+
600+
private static String escapeControl(final String from) {
601+
return from.chars().mapToObj(c -> {
602+
if (c > 0x20) {
603+
return "" + (char) c;
604+
}
605+
606+
switch (c) {
607+
case '\b':
608+
return "\\b";
609+
case '\n':
610+
return "\\n";
611+
case '\t':
612+
return "\\t";
613+
case '\f':
614+
return "\\f";
615+
case '\r':
616+
return "\\r";
617+
default:
618+
return String.format("\\u%04x", c);
619+
}
620+
}).collect(Collectors.joining());
621+
}
622+
476623
static final char NO_QUOTE = '\0';
477624
static final char NO_ESCAPE = '\0';
478625

@@ -485,10 +632,11 @@ private static enum ColumnState {
485632
private final String newline;
486633
private final boolean trimIfNotQuoted;
487634
private final QuotesInQuotedFields quotesInQuotedFields;
488-
private final long maxQuotedSizeLimit;
635+
private final long maxQuotedFieldLength;
489636
private final String commentLineMarker;
490637
private final LineDecoder input;
491-
private final String nullStringOrNull;
638+
private final String nullString;
639+
492640
private final List<String> quotedValueLines;
493641
private final Deque<String> unreadLines;
494642

0 commit comments

Comments
 (0)