Skip to content

Commit 2e1c2c9

Browse files
Merge pull request #1062 from ie3-institute/ms/#827-use-java-streams-on-sql-fetsches-effectively
Improving the usage of streams on SQL fetches.
2 parents e472dde + 5dd424e commit 2e1c2c9

File tree

6 files changed

+140
-87
lines changed

6 files changed

+140
-87
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1515
### Changed
1616
- Storage minimum level parameter removed from cylindrical thermal storage [#1123](https://github.com/ie3-institute/PowerSystemDataModel/issues/1123)
1717
- Converted eval-rst to myst syntax in ReadTheDocs, fixed line wrapping and widths[#1137](https://github.com/ie3-institute/PowerSystemDataModel/issues/1137)
18+
- Improving usage of streams on sql fetches [#827](https://github.com/ie3-institute/PowerSystemDataModel/issues/827)
1819

1920
## [5.1.0] - 2024-06-24
2021

src/main/java/edu/ie3/datamodel/io/connectors/SqlConnector.java

Lines changed: 95 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import edu.ie3.util.TimeUtil;
1010
import java.sql.*;
1111
import java.util.*;
12+
import java.util.stream.Stream;
13+
import java.util.stream.StreamSupport;
1214
import org.slf4j.Logger;
1315
import org.slf4j.LoggerFactory;
1416

@@ -55,6 +57,9 @@ public ResultSet executeQuery(Statement stmt, String query) throws SQLException
5557
return stmt.executeQuery(query);
5658
} catch (SQLException e) {
5759
throw new SQLException(String.format("Error at execution of query \"%1.127s\": ", query), e);
60+
} finally {
61+
// commits any changes made and unlocks database
62+
getConnection().commit();
5863
}
5964
}
6065

@@ -70,6 +75,9 @@ public int executeUpdate(String query) throws SQLException {
7075
} catch (SQLException e) {
7176
throw new SQLException(
7277
String.format("Error at execution of query, SQLReason: '%s'", e.getMessage()), e);
78+
} finally {
79+
// commits any changes made and unlocks database
80+
getConnection().commit();
7381
}
7482
}
7583

@@ -85,7 +93,8 @@ public Connection getConnection() throws SQLException {
8593
}
8694

8795
/**
88-
* Establishes and returns a database connection
96+
* Establishes and returns a database connection. The {@link Connection#getAutoCommit()} is set to
97+
* {@code false}.
8998
*
9099
* @param reuseConnection should the connection be used again, if it is still valid? If not, a new
91100
* connection will be established
@@ -98,6 +107,7 @@ public Connection getConnection(boolean reuseConnection) throws SQLException {
98107
if (connection != null) connection.close();
99108

100109
connection = DriverManager.getConnection(jdbcUrl, connectionProps);
110+
connection.setAutoCommit(false);
101111
} catch (SQLException e) {
102112
throw new SQLException("Could not establish connection: ", e);
103113
}
@@ -115,21 +125,82 @@ public void shutdown() {
115125
}
116126

117127
/**
118-
* Extracts all field to value maps from the ResultSet, one for each row
128+
* Method to execute a {@link PreparedStatement} and return its result as a stream.
119129
*
120-
* @param rs the ResultSet to use
121-
* @return a list of field maps
130+
* @param ps to execute
131+
* @param fetchSize used for {@link PreparedStatement#setFetchSize(int)}
132+
* @return a stream of maps
133+
* @throws SQLException if an exception occurred while executing the query
122134
*/
123-
public List<Map<String, String>> extractFieldMaps(ResultSet rs) {
124-
List<Map<String, String>> fieldMaps = new ArrayList<>();
135+
public Stream<Map<String, String>> toStream(PreparedStatement ps, int fetchSize)
136+
throws SQLException {
125137
try {
126-
while (rs.next()) {
127-
fieldMaps.add(extractFieldMap(rs));
138+
ps.setFetchSize(fetchSize);
139+
ResultSet resultSet = ps.executeQuery();
140+
Iterator<Map<String, String>> sqlIterator = getSqlIterator(resultSet);
141+
142+
return StreamSupport.stream(
143+
Spliterators.spliteratorUnknownSize(
144+
sqlIterator, Spliterator.NONNULL | Spliterator.IMMUTABLE),
145+
true)
146+
.onClose(() -> closeResultSet(ps, resultSet));
147+
} catch (SQLException e) {
148+
// catches the exception, closes the statement and re-throws the exception
149+
closeResultSet(ps, null);
150+
throw e;
151+
}
152+
}
153+
154+
/**
155+
* Returns an {@link Iterator} for the given {@link ResultSet}.
156+
*
157+
* @param rs given result set
158+
* @return an iterator
159+
*/
160+
public Iterator<Map<String, String>> getSqlIterator(ResultSet rs) {
161+
return new Iterator<>() {
162+
@Override
163+
public boolean hasNext() {
164+
try {
165+
return rs.next();
166+
} catch (SQLException e) {
167+
log.error("Exception at extracting next ResultSet: ", e);
168+
closeResultSet(null, rs);
169+
return false;
170+
}
128171
}
172+
173+
@Override
174+
public Map<String, String> next() {
175+
try {
176+
boolean isEmpty = !rs.isBeforeFirst() && rs.getRow() == 0;
177+
178+
if (isEmpty || rs.isAfterLast())
179+
throw new NoSuchElementException(
180+
"There is no more element to iterate to in the ResultSet.");
181+
182+
return extractFieldMap(rs);
183+
} catch (SQLException e) {
184+
log.error("Exception at extracting ResultSet: ", e);
185+
closeResultSet(null, rs);
186+
return Collections.emptyMap();
187+
}
188+
}
189+
};
190+
}
191+
192+
/**
193+
* Method for closing a {@link ResultSet}.
194+
*
195+
* @param rs to close
196+
*/
197+
private void closeResultSet(PreparedStatement ps, ResultSet rs) {
198+
try (ps;
199+
rs) {
200+
log.debug("Resources successfully closed.");
129201
} catch (SQLException e) {
130-
log.error("Exception at extracting ResultSet: ", e);
202+
log.warn("Failed to properly close sources.", e);
131203
}
132-
return fieldMaps;
133204
}
134205

135206
/**
@@ -138,26 +209,24 @@ public List<Map<String, String>> extractFieldMaps(ResultSet rs) {
138209
* @param rs the ResultSet to use
139210
* @return the field map for the current row
140211
*/
141-
public Map<String, String> extractFieldMap(ResultSet rs) {
212+
public Map<String, String> extractFieldMap(ResultSet rs) throws SQLException {
142213
TreeMap<String, String> insensitiveFieldsToAttributes =
143214
new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
144-
try {
145-
ResultSetMetaData metaData = rs.getMetaData();
146-
int columnCount = metaData.getColumnCount();
147-
for (int i = 1; i <= columnCount; i++) {
148-
String columnName = StringUtils.snakeCaseToCamelCase(metaData.getColumnName(i));
149-
String value;
150-
Object result = rs.getObject(i);
151-
if (result instanceof Timestamp) {
152-
value = TimeUtil.withDefaults.toString(rs.getTimestamp(i).toInstant());
153-
} else {
154-
value = String.valueOf(rs.getObject(i));
155-
}
156-
insensitiveFieldsToAttributes.put(columnName, value);
215+
216+
ResultSetMetaData metaData = rs.getMetaData();
217+
int columnCount = metaData.getColumnCount();
218+
for (int i = 1; i <= columnCount; i++) {
219+
String columnName = StringUtils.snakeCaseToCamelCase(metaData.getColumnName(i));
220+
String value;
221+
Object result = rs.getObject(i);
222+
if (result instanceof Timestamp) {
223+
value = TimeUtil.withDefaults.toString(rs.getTimestamp(i).toInstant());
224+
} else {
225+
value = String.valueOf(rs.getObject(i));
157226
}
158-
} catch (SQLException e) {
159-
log.error("Exception at extracting ResultSet: ", e);
227+
insensitiveFieldsToAttributes.put(columnName, value);
160228
}
229+
161230
return insensitiveFieldsToAttributes;
162231
}
163232
}

src/main/java/edu/ie3/datamodel/io/source/sql/SqlDataSource.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,15 @@ protected Stream<Map<String, String>> buildStreamByTableName(String tableName) {
177177
* table name.
178178
*/
179179
protected Stream<Map<String, String>> executeQuery(String query, AddParams addParams) {
180-
try (PreparedStatement ps = connector.getConnection().prepareStatement(query)) {
180+
try {
181+
PreparedStatement ps = connector.getConnection().prepareStatement(query);
181182
addParams.addParams(ps);
182-
ResultSet resultSet = ps.executeQuery();
183-
return connector.extractFieldMaps(resultSet).stream();
183+
184+
// don't work with `try with resource`, therefore manual closing is necessary
185+
// closes automatically after all dependent resultSets are closed
186+
ps.closeOnCompletion();
187+
188+
return connector.toStream(ps, 1000);
184189
} catch (SQLException e) {
185190
log.error("Error during execution of query {}", query, e);
186191
}

src/main/java/edu/ie3/datamodel/io/source/sql/SqlIdCoordinateSource.java

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
import edu.ie3.util.geo.CoordinateDistance;
1919
import edu.ie3.util.geo.GeoUtils;
2020
import java.sql.Array;
21-
import java.sql.PreparedStatement;
2221
import java.util.*;
22+
import java.util.stream.Stream;
2323
import javax.measure.quantity.Length;
2424
import org.locationtech.jts.geom.Envelope;
2525
import org.locationtech.jts.geom.Point;
@@ -98,7 +98,8 @@ public Optional<Set<String>> getSourceFields() {
9898

9999
@Override
100100
public Optional<Point> getCoordinate(int id) {
101-
List<CoordinateValue> values = executeQueryToList(queryForPoint, ps -> ps.setInt(1, id));
101+
List<CoordinateValue> values =
102+
executeQueryToStream(queryForPoint, ps -> ps.setInt(1, id)).toList();
102103

103104
if (values.isEmpty()) {
104105
return Optional.empty();
@@ -111,15 +112,14 @@ public Optional<Point> getCoordinate(int id) {
111112
public Collection<Point> getCoordinates(int... ids) {
112113
Object[] idSet = Arrays.stream(ids).boxed().distinct().toArray();
113114

114-
List<CoordinateValue> values =
115-
executeQueryToList(
115+
return executeQueryToStream(
116116
queryForPoints,
117117
ps -> {
118118
Array sqlArray = ps.getConnection().createArrayOf("int", idSet);
119119
ps.setArray(1, sqlArray);
120-
});
121-
122-
return values.stream().map(value -> value.coordinate).toList();
120+
})
121+
.map(value -> value.coordinate)
122+
.toList();
123123
}
124124

125125
@Override
@@ -128,12 +128,13 @@ public Optional<Integer> getId(Point coordinate) {
128128
double longitude = coordinate.getX();
129129

130130
List<CoordinateValue> values =
131-
executeQueryToList(
132-
queryForId,
133-
ps -> {
134-
ps.setDouble(1, longitude);
135-
ps.setDouble(2, latitude);
136-
});
131+
executeQueryToStream(
132+
queryForId,
133+
ps -> {
134+
ps.setDouble(1, longitude);
135+
ps.setDouble(2, latitude);
136+
})
137+
.toList();
137138

138139
if (values.isEmpty()) {
139140
return Optional.empty();
@@ -144,23 +145,21 @@ public Optional<Integer> getId(Point coordinate) {
144145

145146
@Override
146147
public Collection<Point> getAllCoordinates() {
147-
List<CoordinateValue> values = executeQueryToList(basicQuery + ";", PreparedStatement::execute);
148-
149-
return values.stream().map(value -> value.coordinate).toList();
148+
return executeQueryToStream(basicQuery + ";").map(value -> value.coordinate).toList();
150149
}
151150

152151
@Override
153152
public List<CoordinateDistance> getNearestCoordinates(Point coordinate, int n) {
154-
List<CoordinateValue> values =
155-
executeQueryToList(
156-
queryForNearestPoints,
157-
ps -> {
158-
ps.setDouble(1, coordinate.getX());
159-
ps.setDouble(2, coordinate.getY());
160-
ps.setInt(3, n);
161-
});
162-
163-
List<Point> points = values.stream().map(value -> value.coordinate).toList();
153+
List<Point> points =
154+
executeQueryToStream(
155+
queryForNearestPoints,
156+
ps -> {
157+
ps.setDouble(1, coordinate.getX());
158+
ps.setDouble(2, coordinate.getY());
159+
ps.setInt(3, n);
160+
})
161+
.map(value -> value.coordinate)
162+
.toList();
164163
return calculateCoordinateDistances(coordinate, n, points);
165164
}
166165

@@ -185,15 +184,14 @@ private List<Point> getCoordinatesInBoundingBox(
185184
Point coordinate, ComparableQuantity<Length> distance) {
186185
Envelope envelope = GeoUtils.calculateBoundingBox(coordinate, distance);
187186

188-
return executeQueryToList(
187+
return executeQueryToStream(
189188
queryForBoundingBox,
190189
ps -> {
191190
ps.setDouble(1, envelope.getMinX());
192191
ps.setDouble(2, envelope.getMinY());
193192
ps.setDouble(3, envelope.getMaxX());
194193
ps.setDouble(4, envelope.getMaxY());
195194
})
196-
.stream()
197195
.map(value -> value.coordinate)
198196
.toList();
199197
}
@@ -208,9 +206,13 @@ private CoordinateValue createCoordinateValue(Map<String, String> fieldToValues)
208206
return new CoordinateValue(idCoordinate.id(), idCoordinate.point());
209207
}
210208

211-
private List<CoordinateValue> executeQueryToList(
209+
private Stream<CoordinateValue> executeQueryToStream(String query) {
210+
return dataSource.executeQuery(query).map(this::createCoordinateValue);
211+
}
212+
213+
private Stream<CoordinateValue> executeQueryToStream(
212214
String query, SqlDataSource.AddParams addParams) {
213-
return dataSource.executeQuery(query, addParams).map(this::createCoordinateValue).toList();
215+
return dataSource.executeQuery(query, addParams).map(this::createCoordinateValue);
214216
}
215217

216218
/**

src/test/groovy/edu/ie3/datamodel/io/connectors/SqlConnectorIT.groovy

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,16 +121,15 @@ class SqlConnectorIT extends Specification implements TestContainerHelper {
121121
def "A SQL connector is able to extract all field to value maps from result set"() {
122122
given:
123123
def preparedStatement = connector.getConnection(false).prepareStatement("SELECT * FROM public.test;")
124-
def resultSet = preparedStatement.executeQuery()
125124

126125
when:
127-
def actual = connector.extractFieldMaps(resultSet)
126+
def actual = connector.toStream(preparedStatement, 1).toList()
128127

129128
then:
130129
actual.size() == 2
131130

132131
cleanup:
133-
resultSet.close()
132+
preparedStatement.close()
134133
}
135134

136135
def "A SQL connector shuts down correctly, if no connection was opened"() {

0 commit comments

Comments
 (0)