@@ -161,41 +161,44 @@ protected Set<Path> getIndividualTimeSeriesFilePaths() {
161161 * occurred
162162 */
163163 protected Map <String , String > buildFieldsToAttributes (
164- final String csvRow , final String [] headline ) {
164+ final String csvRow , final String [] headline ) throws SourceException {
165165
166166 TreeMap <String , String > insensitiveFieldsToAttributes =
167167 new TreeMap <>(String .CASE_INSENSITIVE_ORDER );
168168
169- try {
170- String [] fieldVals = parseCsvRow (csvRow , csvSep );
169+ String [] fieldVals = parseCsvRow (csvRow , csvSep );
170+ insensitiveFieldsToAttributes .putAll (
171+ IntStream .range (0 , Math .min (fieldVals .length , headline .length ))
172+ .boxed ()
173+ .collect (
174+ Collectors .toMap (
175+ k -> StringUtils .snakeCaseToCamelCase (headline [k ]), v -> fieldVals [v ])));
171176
172- if (fieldVals .length != headline .length ) {
173- throw new SourceException (
174- "The size of the headline does not fit to the size of the attribute fields.\n Headline: "
175- + String .join (", " , headline )
176- + "\n CsvRow: "
177- + csvRow .trim ()
178- + ".\n Please check:"
179- + "\n - is the csv separator in the file matching the separator provided in the constructor ('"
180- + csvSep
181- + "')"
182- + "\n - does the number of columns match the number of headline fields "
183- + "\n - are you using a valid RFC 4180 formatted csv row?" );
184- }
177+ if (fieldVals .length != headline .length ) {
178+ throw new SourceException (
179+ "The size of the headline ("
180+ + headline .length
181+ + ") does not fit to the size of the attribute fields ("
182+ + fieldVals .length
183+ + ").\n Headline: "
184+ + String .join (", " , headline )
185+ + "\n Row: "
186+ + csvRow .trim ()
187+ + ".\n Please check:"
188+ + "\n - is the csv separator in the file matching the separator provided in the constructor ('"
189+ + csvSep
190+ + "')"
191+ + "\n - does the number of columns match the number of headline fields "
192+ + "\n - are you using a valid RFC 4180 formatted csv row?" );
193+ }
185194
186- insensitiveFieldsToAttributes .putAll (
187- IntStream .range (0 , headline .length )
188- .boxed ()
189- .collect (
190- Collectors .toMap (
191- k -> StringUtils .snakeCaseToCamelCase (headline [k ]), v -> fieldVals [v ])));
192- } catch (SourceException e ) {
193- log .error (
194- "Cannot build fields to attributes map for row '{}' with headline '{}'." ,
195- csvRow .trim (),
196- String .join ("," , headline ),
197- e );
195+ if (insensitiveFieldsToAttributes .size () != fieldVals .length ) {
196+ throw new SourceException (
197+ "There might be duplicate headline elements.\n Headline: "
198+ + String .join (", " , headline )
199+ + ".\n Please keep in mind that headlines are case-insensitive and underscores from snake case are ignored." );
198200 }
201+
199202 return insensitiveFieldsToAttributes ;
200203 }
201204
@@ -252,7 +255,7 @@ Try<Stream<Map<String, String>>, SourceException> buildStreamWithFieldsToAttribu
252255 // is wanted to avoid a lock on the file), but this causes a closing of the stream as well.
253256 // As we still want to consume the data at other places, we start a new stream instead of
254257 // returning the original one
255- return Success . of ( csvRowFieldValueMapping (reader , headline ). parallelStream () );
258+ return csvRowFieldValueMapping (reader , headline );
256259 } catch (FileNotFoundException e ) {
257260 if (allowFileNotExisting ) {
258261 log .warn ("Unable to find file '{}': {}" , filePath , e .getMessage ());
@@ -282,13 +285,20 @@ private Try<Path, SourceException> getFilePath(Class<? extends Entity> entityCla
282285 * @param headline of the file
283286 * @return a list of mapping
284287 */
285- protected List < Map <String , String >> csvRowFieldValueMapping (
288+ protected Try < Stream < Map <String , String >>, SourceException > csvRowFieldValueMapping (
286289 BufferedReader reader , String [] headline ) {
287- return reader
288- .lines ()
289- .parallel ()
290- .map (csvRow -> buildFieldsToAttributes (csvRow , headline ))
291- .filter (map -> !map .isEmpty ())
292- .toList ();
290+ return Try .scanStream (
291+ reader
292+ .lines ()
293+ .parallel ()
294+ .map (
295+ csvRow ->
296+ Try .of (
297+ () -> buildFieldsToAttributes (csvRow , headline ),
298+ SourceException .class )),
299+ "Map<String, String>" )
300+ .transform (
301+ stream -> stream .filter (map -> !map .isEmpty ()),
302+ e -> new SourceException ("Parsing csv row failed." , e ));
293303 }
294304}
0 commit comments