Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
b3c209a
Improving the usage of streams on SQL fetches.
staudtMarius Apr 9, 2024
4b1b4a8
Fixing `Codacy` issue.
staudtMarius Apr 9, 2024
ba15e89
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius Apr 9, 2024
46fc1b1
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius Apr 15, 2024
88023cb
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius Apr 23, 2024
f952cd2
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
danielfeismann Apr 24, 2024
b3c6db7
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius Apr 30, 2024
f01cbbd
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius May 1, 2024
19cd068
Updating `CHANGELOG`.
staudtMarius May 1, 2024
1194a43
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius May 6, 2024
fd57b05
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius May 7, 2024
167f0b7
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
danielfeismann May 14, 2024
ea320e7
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
danielfeismann May 16, 2024
ed1fa01
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius May 23, 2024
fa0337e
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius Jun 4, 2024
abad5ed
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius Jun 10, 2024
76b1c29
Merge branch 'refs/heads/dev' into ms/#827-use-java-streams-on-sql-fe…
sebastian-peter Jun 13, 2024
d8ba515
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius Jun 13, 2024
e804380
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius Jun 20, 2024
fef513d
Implementing requested changes.
staudtMarius Jun 20, 2024
fcf58c7
Adding some comments.
staudtMarius Jun 24, 2024
784c5c4
Merge branch 'refs/heads/dev' into ms/#827-use-java-streams-on-sql-fe…
staudtMarius Jun 24, 2024
1cffc66
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius Jun 24, 2024
3101166
Update `CHANGELOG`.
staudtMarius Jun 24, 2024
710a886
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius Jul 1, 2024
538906c
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius Jul 4, 2024
75538a0
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius Jul 9, 2024
09d581b
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius Jul 29, 2024
81ef0e1
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius Aug 9, 2024
a398154
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
sebastian-peter Oct 25, 2024
55374c4
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
Nov 5, 2024
61e7ca3
Small improvements.
staudtMarius Nov 5, 2024
357de53
Fixing failing sql test.
staudtMarius Nov 6, 2024
1937279
Adding missing import in `SqlSinkTest`.
staudtMarius Nov 6, 2024
0595b72
Small improvements.
staudtMarius Nov 6, 2024
2a62cc2
Merge branch 'dev' into ms/#827-use-java-streams-on-sql-fetsches-effe…
staudtMarius Nov 8, 2024
3e47beb
Improving closing of sql resources on error.
staudtMarius Nov 8, 2024
919e10b
Adding NoSuchElementException to solve SQ bug
sebastian-peter Nov 8, 2024
3e4a7b9
Solving code smell regarding method name
sebastian-peter Nov 8, 2024
5dd424e
Solving new code smell
sebastian-peter Nov 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Storage minimum level parameter removed from cylindrical thermal storage [#1123](https://github.com/ie3-institute/PowerSystemDataModel/issues/1123)
- Converted eval-rst to myst syntax in ReadTheDocs, fixed line wrapping and widths[#1137](https://github.com/ie3-institute/PowerSystemDataModel/issues/1137)
- Improving usage of streams on sql fetches [#827](https://github.com/ie3-institute/PowerSystemDataModel/issues/827)

## [5.1.0] - 2024-06-24

Expand Down
121 changes: 95 additions & 26 deletions src/main/java/edu/ie3/datamodel/io/connectors/SqlConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import edu.ie3.util.TimeUtil;
import java.sql.*;
import java.util.*;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -55,6 +57,9 @@ public ResultSet executeQuery(Statement stmt, String query) throws SQLException
return stmt.executeQuery(query);
} catch (SQLException e) {
throw new SQLException(String.format("Error at execution of query \"%1.127s\": ", query), e);
} finally {
// commits any changes made and unlocks database
getConnection().commit();
}
}

Expand All @@ -70,6 +75,9 @@ public int executeUpdate(String query) throws SQLException {
} catch (SQLException e) {
throw new SQLException(
String.format("Error at execution of query, SQLReason: '%s'", e.getMessage()), e);
} finally {
// commits any changes made and unlocks database
getConnection().commit();
}
}

Expand All @@ -85,7 +93,8 @@ public Connection getConnection() throws SQLException {
}

/**
* Establishes and returns a database connection
* Establishes and returns a database connection. The {@link Connection#getAutoCommit()} is set to
* {@code false}.
*
* @param reuseConnection should the connection be used again, if it is still valid? If not, a new
* connection will be established
Expand All @@ -98,6 +107,7 @@ public Connection getConnection(boolean reuseConnection) throws SQLException {
if (connection != null) connection.close();

connection = DriverManager.getConnection(jdbcUrl, connectionProps);
connection.setAutoCommit(false);
} catch (SQLException e) {
throw new SQLException("Could not establish connection: ", e);
}
Expand All @@ -115,21 +125,82 @@ public void shutdown() {
}

/**
* Extracts all field to value maps from the ResultSet, one for each row
* Method to execute a {@link PreparedStatement} and return its result as a stream.
*
* @param rs the ResultSet to use
* @return a list of field maps
* @param ps to execute
* @param fetchSize used for {@link PreparedStatement#setFetchSize(int)}
* @return a stream of maps
* @throws SQLException if an exception occurred while executing the query
*/
public List<Map<String, String>> extractFieldMaps(ResultSet rs) {
List<Map<String, String>> fieldMaps = new ArrayList<>();
public Stream<Map<String, String>> toStream(PreparedStatement ps, int fetchSize)
throws SQLException {
try {
while (rs.next()) {
fieldMaps.add(extractFieldMap(rs));
ps.setFetchSize(fetchSize);
ResultSet resultSet = ps.executeQuery();
Iterator<Map<String, String>> sqlIterator = getSqlIterator(resultSet);

return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(
sqlIterator, Spliterator.NONNULL | Spliterator.IMMUTABLE),
true)
.onClose(() -> closeResultSet(ps, resultSet));
} catch (SQLException e) {
// catches the exception, closes the statement and re-throws the exception
closeResultSet(ps, null);
throw e;
}
}

/**
* Returns an {@link Iterator} for the given {@link ResultSet}.
*
* @param rs given result set
* @return an iterator
*/
public Iterator<Map<String, String>> getSqlIterator(ResultSet rs) {
return new Iterator<>() {
@Override
public boolean hasNext() {
try {
return rs.next();
} catch (SQLException e) {
log.error("Exception at extracting next ResultSet: ", e);
closeResultSet(null, rs);
return false;
}
}

@Override
public Map<String, String> next() {
try {
boolean isEmpty = !rs.isBeforeFirst() && rs.getRow() == 0;

if (isEmpty || rs.isAfterLast())
throw new NoSuchElementException(
"There is no more element to iterate to in the ResultSet.");

return extractFieldMap(rs);
} catch (SQLException e) {
log.error("Exception at extracting ResultSet: ", e);
closeResultSet(null, rs);
return Collections.emptyMap();
}
}
};
}

/**
* Method for closing a {@link ResultSet}.
*
* @param rs to close
*/
private void closeResultSet(PreparedStatement ps, ResultSet rs) {
try (ps;
rs) {
log.debug("Resources successfully closed.");
} catch (SQLException e) {
log.error("Exception at extracting ResultSet: ", e);
log.warn("Failed to properly close sources.", e);
}
return fieldMaps;
}

/**
Expand All @@ -138,26 +209,24 @@ public List<Map<String, String>> extractFieldMaps(ResultSet rs) {
* @param rs the ResultSet to use
* @return the field map for the current row
*/
public Map<String, String> extractFieldMap(ResultSet rs) {
public Map<String, String> extractFieldMap(ResultSet rs) throws SQLException {
TreeMap<String, String> insensitiveFieldsToAttributes =
new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
try {
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
for (int i = 1; i <= columnCount; i++) {
String columnName = StringUtils.snakeCaseToCamelCase(metaData.getColumnName(i));
String value;
Object result = rs.getObject(i);
if (result instanceof Timestamp) {
value = TimeUtil.withDefaults.toString(rs.getTimestamp(i).toInstant());
} else {
value = String.valueOf(rs.getObject(i));
}
insensitiveFieldsToAttributes.put(columnName, value);

ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
for (int i = 1; i <= columnCount; i++) {
String columnName = StringUtils.snakeCaseToCamelCase(metaData.getColumnName(i));
String value;
Object result = rs.getObject(i);
if (result instanceof Timestamp) {
value = TimeUtil.withDefaults.toString(rs.getTimestamp(i).toInstant());
} else {
value = String.valueOf(rs.getObject(i));
}
} catch (SQLException e) {
log.error("Exception at extracting ResultSet: ", e);
insensitiveFieldsToAttributes.put(columnName, value);
}

return insensitiveFieldsToAttributes;
}
}
11 changes: 8 additions & 3 deletions src/main/java/edu/ie3/datamodel/io/source/sql/SqlDataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,15 @@ protected Stream<Map<String, String>> buildStreamByTableName(String tableName) {
* table name.
*/
protected Stream<Map<String, String>> executeQuery(String query, AddParams addParams) {
try (PreparedStatement ps = connector.getConnection().prepareStatement(query)) {
try {
PreparedStatement ps = connector.getConnection().prepareStatement(query);
addParams.addParams(ps);
ResultSet resultSet = ps.executeQuery();
return connector.extractFieldMaps(resultSet).stream();

// don't work with `try with resource`, therefore manual closing is necessary
// closes automatically after all dependent resultSets are closed
ps.closeOnCompletion();

return connector.toStream(ps, 1000);
} catch (SQLException e) {
log.error("Error during execution of query {}", query, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import edu.ie3.util.geo.CoordinateDistance;
import edu.ie3.util.geo.GeoUtils;
import java.sql.Array;
import java.sql.PreparedStatement;
import java.util.*;
import java.util.stream.Stream;
import javax.measure.quantity.Length;
import org.locationtech.jts.geom.Envelope;
import org.locationtech.jts.geom.Point;
Expand Down Expand Up @@ -98,7 +98,8 @@ public Optional<Set<String>> getSourceFields() {

@Override
public Optional<Point> getCoordinate(int id) {
List<CoordinateValue> values = executeQueryToList(queryForPoint, ps -> ps.setInt(1, id));
List<CoordinateValue> values =
executeQueryToStream(queryForPoint, ps -> ps.setInt(1, id)).toList();

if (values.isEmpty()) {
return Optional.empty();
Expand All @@ -111,15 +112,14 @@ public Optional<Point> getCoordinate(int id) {
public Collection<Point> getCoordinates(int... ids) {
Object[] idSet = Arrays.stream(ids).boxed().distinct().toArray();

List<CoordinateValue> values =
executeQueryToList(
return executeQueryToStream(
queryForPoints,
ps -> {
Array sqlArray = ps.getConnection().createArrayOf("int", idSet);
ps.setArray(1, sqlArray);
});

return values.stream().map(value -> value.coordinate).toList();
})
.map(value -> value.coordinate)
.toList();
}

@Override
Expand All @@ -128,12 +128,13 @@ public Optional<Integer> getId(Point coordinate) {
double longitude = coordinate.getX();

List<CoordinateValue> values =
executeQueryToList(
queryForId,
ps -> {
ps.setDouble(1, longitude);
ps.setDouble(2, latitude);
});
executeQueryToStream(
queryForId,
ps -> {
ps.setDouble(1, longitude);
ps.setDouble(2, latitude);
})
.toList();

if (values.isEmpty()) {
return Optional.empty();
Expand All @@ -144,23 +145,21 @@ public Optional<Integer> getId(Point coordinate) {

@Override
public Collection<Point> getAllCoordinates() {
List<CoordinateValue> values = executeQueryToList(basicQuery + ";", PreparedStatement::execute);

return values.stream().map(value -> value.coordinate).toList();
return executeQueryToStream(basicQuery + ";").map(value -> value.coordinate).toList();
}

@Override
public List<CoordinateDistance> getNearestCoordinates(Point coordinate, int n) {
List<CoordinateValue> values =
executeQueryToList(
queryForNearestPoints,
ps -> {
ps.setDouble(1, coordinate.getX());
ps.setDouble(2, coordinate.getY());
ps.setInt(3, n);
});

List<Point> points = values.stream().map(value -> value.coordinate).toList();
List<Point> points =
executeQueryToStream(
queryForNearestPoints,
ps -> {
ps.setDouble(1, coordinate.getX());
ps.setDouble(2, coordinate.getY());
ps.setInt(3, n);
})
.map(value -> value.coordinate)
.toList();
return calculateCoordinateDistances(coordinate, n, points);
}

Expand All @@ -185,15 +184,14 @@ private List<Point> getCoordinatesInBoundingBox(
Point coordinate, ComparableQuantity<Length> distance) {
Envelope envelope = GeoUtils.calculateBoundingBox(coordinate, distance);

return executeQueryToList(
return executeQueryToStream(
queryForBoundingBox,
ps -> {
ps.setDouble(1, envelope.getMinX());
ps.setDouble(2, envelope.getMinY());
ps.setDouble(3, envelope.getMaxX());
ps.setDouble(4, envelope.getMaxY());
})
.stream()
.map(value -> value.coordinate)
.toList();
}
Expand All @@ -208,9 +206,13 @@ private CoordinateValue createCoordinateValue(Map<String, String> fieldToValues)
return new CoordinateValue(idCoordinate.id(), idCoordinate.point());
}

private List<CoordinateValue> executeQueryToList(
private Stream<CoordinateValue> executeQueryToStream(String query) {
return dataSource.executeQuery(query).map(this::createCoordinateValue);
}

private Stream<CoordinateValue> executeQueryToStream(
String query, SqlDataSource.AddParams addParams) {
return dataSource.executeQuery(query, addParams).map(this::createCoordinateValue).toList();
return dataSource.executeQuery(query, addParams).map(this::createCoordinateValue);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,15 @@ class SqlConnectorIT extends Specification implements TestContainerHelper {
def "A SQL connector is able to extract all field to value maps from result set"() {
given:
def preparedStatement = connector.getConnection(false).prepareStatement("SELECT * FROM public.test;")
def resultSet = preparedStatement.executeQuery()

when:
def actual = connector.extractFieldMaps(resultSet)
def actual = connector.toStream(preparedStatement, 1).toList()

then:
actual.size() == 2

cleanup:
resultSet.close()
preparedStatement.close()
}

def "A SQL connector shuts down correctly, if no connection was opened"() {
Expand Down
Loading