Skip to content

Commit b3c209a

Browse files
committed
Improving the usage of streams on SQL fetches.
1 parent 0e3d14f commit b3c209a

File tree

4 files changed

+56
-21
lines changed

4 files changed

+56
-21
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
5050
- Removing `uuid` as required column from input and result time series [#826](https://github.com/ie3-institute/PowerSystemDataModel/issues/826)
5151
- Removing the support for the old csv format that was marked `deprecated` back in version `1.1.0` [#795](https://github.com/ie3-institute/PowerSystemDataModel/issues/795)
5252
- BREAKING: Updating PowerSystemUtils dependency to 2.2 [#1006](https://github.com/ie3-institute/PowerSystemDataModel/issues/1006)
53+
- Improving usage of streams on sql fetches [#827](https://github.com/ie3-institute/PowerSystemDataModel/issues/827)
5354

5455
## [4.1.0] - 2023-11-02
5556

src/main/java/edu/ie3/datamodel/io/connectors/SqlConnector.java

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import edu.ie3.util.TimeUtil;
1010
import java.sql.*;
1111
import java.util.*;
12+
import java.util.stream.Stream;
13+
import java.util.stream.StreamSupport;
1214
import org.slf4j.Logger;
1315
import org.slf4j.LoggerFactory;
1416

@@ -66,7 +68,9 @@ public ResultSet executeQuery(Statement stmt, String query) throws SQLException
6668
*/
6769
public int executeUpdate(String updateQuery) {
6870
try (Statement stmt = getConnection().createStatement()) {
69-
return stmt.executeUpdate(updateQuery);
71+
int res = stmt.executeUpdate(updateQuery);
72+
getConnection().commit();
73+
return res;
7074
} catch (SQLException e) {
7175
log.error(String.format("Error at execution of query \"%1.127s\": ", updateQuery), e);
7276
return -1;
@@ -85,7 +89,8 @@ public Connection getConnection() throws SQLException {
8589
}
8690

8791
/**
88-
* Establishes and returns a database connection
92+
* Establishes and returns a database connection. The {@link Connection#getAutoCommit()} is set to
93+
* {@code false}.
8994
*
9095
* @param reuseConnection should the connection be used again, if it is still valid? If not, a new
9196
* connection will be established
@@ -98,6 +103,7 @@ public Connection getConnection(boolean reuseConnection) throws SQLException {
98103
if (connection != null) connection.close();
99104

100105
connection = DriverManager.getConnection(jdbcUrl, connectionProps);
106+
connection.setAutoCommit(false);
101107
} catch (SQLException e) {
102108
throw new SQLException("Could not establish connection: ", e);
103109
}
@@ -115,21 +121,47 @@ public void shutdown() {
115121
}
116122

117123
/**
118-
* Extracts all field to value maps from the ResultSet, one for each row
124+
* Method to execute a {@link PreparedStatement} and return its result as a stream.
119125
*
120-
* @param rs the ResultSet to use
121-
* @return a list of field maps
126+
* @param ps to execute
127+
* @param fetchSize used for {@link PreparedStatement#setFetchSize(int)}
128+
* @return a stream of maps
129+
* @throws SQLException if an exception occurred while executing the query
122130
*/
123-
public List<Map<String, String>> extractFieldMaps(ResultSet rs) {
124-
List<Map<String, String>> fieldMaps = new ArrayList<>();
125-
try {
126-
while (rs.next()) {
127-
fieldMaps.add(extractFieldMap(rs));
131+
public Stream<Map<String, String>> toStream(PreparedStatement ps, int fetchSize)
132+
throws SQLException {
133+
ps.setFetchSize(fetchSize);
134+
Iterator<Map<String, String>> sqlIterator = getSqlIterator(ps.executeQuery());
135+
136+
return StreamSupport.stream(
137+
Spliterators.spliteratorUnknownSize(
138+
sqlIterator, Spliterator.NONNULL | Spliterator.IMMUTABLE),
139+
true);
140+
}
141+
142+
/**
143+
* Returns an {@link Iterator} for the given {@link ResultSet}.
144+
*
145+
* @param rs given result set
146+
* @return an iterator
147+
*/
148+
public Iterator<Map<String, String>> getSqlIterator(ResultSet rs) {
149+
return new Iterator<>() {
150+
@Override
151+
public boolean hasNext() {
152+
try {
153+
return rs.next();
154+
} catch (SQLException e) {
155+
log.error("Exception at extracting next ResultSet: ", e);
156+
throw new RuntimeException(e);
157+
}
128158
}
129-
} catch (SQLException e) {
130-
log.error("Exception at extracting ResultSet: ", e);
131-
}
132-
return fieldMaps;
159+
160+
@Override
161+
public Map<String, String> next() {
162+
return extractFieldMap(rs);
163+
}
164+
};
133165
}
134166

135167
/**

src/main/java/edu/ie3/datamodel/io/source/sql/SqlDataSource.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@ public Optional<Set<String>> getSourceFields(String tableName) {
125125
ResultSet rs =
126126
connector.getConnection().getMetaData().getColumns(null, null, tableName, null);
127127
Set<String> columnNames = new HashSet<>();
128-
129128
while (rs.next()) {
130129
String name = rs.getString("COLUMN_NAME");
131130
columnNames.add(StringUtils.snakeCaseToCamelCase(name));
@@ -177,11 +176,15 @@ protected Stream<Map<String, String>> buildStreamByTableName(String tableName) {
177176
* table name.
178177
*/
179178
protected Stream<Map<String, String>> executeQuery(String query, AddParams addParams) {
180-
try (PreparedStatement ps = connector.getConnection().prepareStatement(query)) {
179+
try {
180+
PreparedStatement ps = connector.getConnection().prepareStatement(query);
181181
addParams.addParams(ps);
182182

183-
ResultSet resultSet = ps.executeQuery();
184-
return connector.extractFieldMaps(resultSet).stream();
183+
Stream<Map<String, String>> stream = connector.toStream(ps, 1000);
184+
185+
// don't work with `try with resource`, but is necessary to get results
186+
ps.closeOnCompletion();
187+
return stream;
185188
} catch (SQLException e) {
186189
log.error("Error during execution of query {}", query, e);
187190
}

src/test/groovy/edu/ie3/datamodel/io/connectors/SqlConnectorIT.groovy

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,16 +121,15 @@ class SqlConnectorIT extends Specification implements TestContainerHelper {
121121
def "A SQL connector is able to extract all field to value maps from result set"() {
122122
given:
123123
def preparedStatement = connector.getConnection(false).prepareStatement("SELECT * FROM public.test;")
124-
def resultSet = preparedStatement.executeQuery()
125124

126125
when:
127-
def actual = connector.extractFieldMaps(resultSet)
126+
def actual = connector.toStream(preparedStatement, 1).toList()
128127

129128
then:
130129
actual.size() == 2
131130

132131
cleanup:
133-
resultSet.close()
132+
preparedStatement.close()
134133
}
135134

136135
def "A SQL connector shuts down correctly, if no connection was opened"() {

0 commit comments

Comments
 (0)