77
88import edu .ie3 .datamodel .exceptions .ConnectorException ;
99import edu .ie3 .datamodel .io .IoUtil ;
10- import edu .ie3 .datamodel .io .csv .*;
11- import edu .ie3 .datamodel .io .naming .FileNamingStrategy ;
12- import edu .ie3 .datamodel .io .naming .TimeSeriesMetaInformation ;
13- import edu .ie3 .datamodel .io .naming .timeseries .ColumnScheme ;
14- import edu .ie3 .datamodel .io .naming .timeseries .IndividualTimeSeriesMetaInformation ;
10+ import edu .ie3 .datamodel .io .csv .BufferedCsvWriter ;
11+ import edu .ie3 .datamodel .io .csv .CsvFileDefinition ;
1512import edu .ie3 .datamodel .models .Entity ;
1613import edu .ie3 .datamodel .models .timeseries .TimeSeries ;
1714import edu .ie3 .datamodel .models .timeseries .TimeSeriesEntry ;
1815import edu .ie3 .datamodel .models .value .Value ;
1916import java .io .*;
2017import java .nio .charset .StandardCharsets ;
21- import java .nio .file .Files ;
2218import java .nio .file .Path ;
2319import java .util .*;
24- import java .util .function .Function ;
25- import java .util .stream .Collectors ;
2620import java .util .stream .Stream ;
2721import org .slf4j .Logger ;
2822import org .slf4j .LoggerFactory ;
2923
3024/**
3125 * Provides the connector (here: buffered writer) for specific files to be used by a {@link
32- * edu.ie3.datamodel.io.sink.CsvFileSink}
26+ * edu.ie3.datamodel.io.sink.CsvFileSink} or {@link edu.ie3.datamodel.io.source.csv.CsvDataSource}
3327 *
3428 * @version 0.1
3529 * @since 19.03.20
@@ -39,27 +33,26 @@ public class CsvFileConnector implements DataConnector {
3933
4034 private final Map <Class <? extends Entity >, BufferedCsvWriter > entityWriters = new HashMap <>();
4135 private final Map <UUID , BufferedCsvWriter > timeSeriesWriters = new HashMap <>();
42-
43- private final FileNamingStrategy fileNamingStrategy ;
4436 private final Path baseDirectory ;
45-
4637 private static final String FILE_ENDING = ".csv" ;
4738
48- public CsvFileConnector (Path baseDirectory , FileNamingStrategy fileNamingStrategy ) {
39+ public CsvFileConnector (Path baseDirectory ) {
4940 this .baseDirectory = baseDirectory ;
50- this .fileNamingStrategy = fileNamingStrategy ;
41+ }
42+
43+ /** Returns the base directory of this connector. */
44+ public Path getBaseDirectory () {
45+ return baseDirectory ;
5146 }
5247
5348 public synchronized BufferedCsvWriter getOrInitWriter (
54- Class <? extends Entity > clz , String [] headerElements , String csvSep )
55- throws ConnectorException {
56- /* Try to the the right writer */
49+ Class <? extends Entity > clz , CsvFileDefinition fileDefinition ) throws ConnectorException {
50+ /* Try to get the right writer */
5751 BufferedCsvWriter predefinedWriter = entityWriters .get (clz );
5852 if (predefinedWriter != null ) return predefinedWriter ;
5953
6054 /* If it is not available, build and register one */
6155 try {
62- CsvFileDefinition fileDefinition = buildFileDefinition (clz , headerElements , csvSep );
6356 BufferedCsvWriter newWriter = initWriter (baseDirectory , fileDefinition );
6457
6558 entityWriters .put (clz , newWriter );
@@ -71,15 +64,14 @@ public synchronized BufferedCsvWriter getOrInitWriter(
7164 }
7265
7366 public synchronized <T extends TimeSeries <E , V >, E extends TimeSeriesEntry <V >, V extends Value >
74- BufferedCsvWriter getOrInitWriter (T timeSeries , String [] headerElements , String csvSep )
67+ BufferedCsvWriter getOrInitWriter (T timeSeries , CsvFileDefinition fileDefinition )
7568 throws ConnectorException {
76- /* Try to the the right writer */
69+ /* Try to get the right writer */
7770 BufferedCsvWriter predefinedWriter = timeSeriesWriters .get (timeSeries .getUuid ());
7871 if (predefinedWriter != null ) return predefinedWriter ;
7972
8073 /* If it is not available, build and register one */
8174 try {
82- CsvFileDefinition fileDefinition = buildFileDefinition (timeSeries , headerElements , csvSep );
8375 BufferedCsvWriter newWriter = initWriter (baseDirectory , fileDefinition );
8476
8577 timeSeriesWriters .put (timeSeries .getUuid (), newWriter );
@@ -131,8 +123,7 @@ public synchronized void closeTimeSeriesWriter(UUID uuid) throws IOException {
131123 Optional <BufferedCsvWriter > maybeWriter = Optional .ofNullable (timeSeriesWriters .get (uuid ));
132124 if (maybeWriter .isPresent ()) {
133125 log .debug ("Remove reference to time series writer for UUID '{}'." , uuid );
134- timeSeriesWriters .remove (uuid );
135- maybeWriter .get ().close ();
126+ timeSeriesWriters .remove (uuid ).close ();
136127 } else {
137128 log .warn ("No writer found for time series '{}'." , uuid );
138129 }
@@ -149,8 +140,7 @@ public synchronized <C extends Entity> void closeEntityWriter(Class<C> clz) thro
149140 Optional <BufferedCsvWriter > maybeWriter = Optional .ofNullable (entityWriters .get (clz ));
150141 if (maybeWriter .isPresent ()) {
151142 log .debug ("Remove reference to entity writer for class '{}'." , clz );
152- entityWriters .remove (clz );
153- maybeWriter .get ().close ();
143+ entityWriters .remove (clz ).close ();
154144 } else {
155145 log .warn ("No writer found for class '{}'." , clz );
156146 }
@@ -170,106 +160,6 @@ public BufferedReader initReader(Path filePath) throws FileNotFoundException {
170160 new InputStreamReader (new FileInputStream (fullPath ), StandardCharsets .UTF_8 ), 16384 );
171161 }
172162
173- /**
174- * Receive the information for specific time series. They are given back filtered by the column
175- * scheme in order to allow for accounting the different content types.
176- *
177- * @param columnSchemes the column schemes to initialize readers for. If no scheme is given, all
178- * possible readers will be initialized.
179- * @return A mapping from column scheme to the individual time series meta information
180- */
181- public Map <UUID , CsvIndividualTimeSeriesMetaInformation >
182- getCsvIndividualTimeSeriesMetaInformation (final ColumnScheme ... columnSchemes ) {
183- return getIndividualTimeSeriesFilePaths ().parallelStream ()
184- .map (
185- filePath -> {
186- /* Extract meta information from file path and enhance it with the file path itself */
187- IndividualTimeSeriesMetaInformation metaInformation =
188- fileNamingStrategy .individualTimeSeriesMetaInformation (filePath .toString ());
189- return new CsvIndividualTimeSeriesMetaInformation (
190- metaInformation , FileNamingStrategy .removeFileNameEnding (filePath .getFileName ()));
191- })
192- .filter (
193- metaInformation ->
194- columnSchemes == null
195- || columnSchemes .length == 0
196- || Stream .of (columnSchemes )
197- .anyMatch (scheme -> scheme .equals (metaInformation .getColumnScheme ())))
198- .collect (Collectors .toMap (TimeSeriesMetaInformation ::getUuid , Function .identity ()));
199- }
200-
201- /**
202- * Returns a set of relative paths strings to time series files, with respect to the base folder
203- * path
204- *
205- * @return A set of relative paths to time series files, with respect to the base folder path
206- */
207- private Set <Path > getIndividualTimeSeriesFilePaths () {
208- try (Stream <Path > pathStream = Files .walk (baseDirectory )) {
209- return pathStream
210- .map (baseDirectory ::relativize )
211- .filter (
212- path -> {
213- Path withoutEnding =
214- Path .of (FileNamingStrategy .removeFileNameEnding (path .toString ()));
215- return fileNamingStrategy
216- .getIndividualTimeSeriesPattern ()
217- .matcher (withoutEnding .toString ())
218- .matches ();
219- })
220- .collect (Collectors .toSet ());
221- } catch (IOException e ) {
222- log .error ("Unable to determine time series files readers for time series." , e );
223- return Collections .emptySet ();
224- }
225- }
226-
227- /**
228- * Builds a new file definition consisting of file name and head line elements
229- *
230- * @param timeSeries Time series to derive naming information from
231- * @param headLineElements Array of head line elements
232- * @param csvSep Separator for csv columns
233- * @return A suitable file definition
234- * @throws ConnectorException If the definition cannot be determined
235- */
236- private <T extends TimeSeries <E , V >, E extends TimeSeriesEntry <V >, V extends Value >
237- CsvFileDefinition buildFileDefinition (T timeSeries , String [] headLineElements , String csvSep )
238- throws ConnectorException {
239- Path directoryPath = fileNamingStrategy .getDirectoryPath (timeSeries ).orElse (Path .of ("" ));
240- String fileName =
241- fileNamingStrategy
242- .getEntityName (timeSeries )
243- .orElseThrow (
244- () ->
245- new ConnectorException (
246- "Cannot determine the file name for time series '" + timeSeries + "'." ));
247- return new CsvFileDefinition (fileName , directoryPath , headLineElements , csvSep );
248- }
249-
250- /**
251- * Builds a new file definition consisting of file name and head line elements
252- *
253- * @param clz Class that is meant to be serialized into this file
254- * @param headLineElements Array of head line elements
255- * @param csvSep Separator for csv columns
256- * @return A suitable file definition
257- * @throws ConnectorException If the definition cannot be determined
258- */
259- private CsvFileDefinition buildFileDefinition (
260- Class <? extends Entity > clz , String [] headLineElements , String csvSep )
261- throws ConnectorException {
262- Path directoryPath = fileNamingStrategy .getDirectoryPath (clz ).orElse (Path .of ("" ));
263- String fileName =
264- fileNamingStrategy
265- .getEntityName (clz )
266- .orElseThrow (
267- () ->
268- new ConnectorException (
269- "Cannot determine the file name for class '" + clz .getSimpleName () + "'." ));
270- return new CsvFileDefinition (fileName , directoryPath , headLineElements , csvSep );
271- }
272-
273163 @ Override
274164 public void shutdown () {
275165 Stream .of (entityWriters .values (), timeSeriesWriters .values ())
0 commit comments