|
7 | 7 |
|
8 | 8 | import java.sql.ResultSet; |
9 | 9 | import java.sql.SQLException; |
| 10 | +import java.util.List; |
10 | 11 | import java.util.Objects; |
11 | 12 | import java.util.concurrent.CompletableFuture; |
12 | 13 | import java.util.concurrent.CompletionStage; |
13 | 14 | import java.util.function.Consumer; |
| 15 | +import java.util.function.Supplier; |
14 | 16 |
|
15 | 17 | import org.hibernate.engine.jdbc.internal.FormatStyle; |
16 | 18 | import org.hibernate.engine.jdbc.spi.SqlExceptionHelper; |
|
26 | 28 | import io.vertx.sqlclient.RowSet; |
27 | 29 | import io.vertx.sqlclient.SqlConnection; |
28 | 30 | import io.vertx.sqlclient.Tuple; |
| 31 | +import io.vertx.sqlclient.spi.DatabaseMetadata; |
29 | 32 |
|
| 33 | +import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture; |
30 | 34 | import static org.hibernate.reactive.util.impl.CompletionStages.rethrow; |
| 35 | +import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; |
31 | 36 |
|
32 | 37 | /** |
33 | 38 | * A pool of reactive connections backed by a supplier of |
@@ -72,13 +77,31 @@ public abstract class SqlClientPool implements ReactiveConnectionPool { |
72 | 77 | * subclasses which support multitenancy. |
73 | 78 | * |
74 | 79 | * @param tenantId the id of the tenant |
| 80 | + * |
75 | 81 | * @throws UnsupportedOperationException if multitenancy is not supported |
76 | 82 | * @see ReactiveConnectionPool#getConnection(String) |
77 | 83 | */ |
78 | 84 | protected Pool getTenantPool(String tenantId) { |
79 | 85 | throw new UnsupportedOperationException( "multitenancy not supported by built-in SqlClientPool" ); |
80 | 86 | } |
81 | 87 |
|
| 88 | + @Override |
| 89 | + public ReactiveConnection getProxyConnection() { |
| 90 | + return new ProxyConnection( this::getConnection ); |
| 91 | + } |
| 92 | + |
| 93 | + @Override |
| 94 | + public ReactiveConnection getProxyConnection(String tenantId) { |
| 95 | + return tenantId == null |
| 96 | + ? new ProxyConnection( this::getConnection ) |
| 97 | + : new ProxyConnection( () -> getConnection( tenantId ) ); |
| 98 | + } |
| 99 | + |
| 100 | + @Override |
| 101 | + public ReactiveConnection getProxyConnection(SqlExceptionHelper sqlExceptionHelper) { |
| 102 | + return new ProxyConnection( () -> getConnection( sqlExceptionHelper ) ); |
| 103 | + } |
| 104 | + |
82 | 105 | @Override |
83 | 106 | public CompletionStage<ReactiveConnection> getConnection() { |
84 | 107 | return getConnectionFromPool( getPool() ); |
@@ -143,10 +166,13 @@ private <T> T convertException(T rows, String sql, Throwable sqlException) { |
143 | 166 | if ( sqlException == null ) { |
144 | 167 | return rows; |
145 | 168 | } |
146 | | - if ( sqlException instanceof DatabaseException ) { |
147 | | - DatabaseException de = (DatabaseException) sqlException; |
| 169 | + if ( sqlException instanceof DatabaseException de ) { |
148 | 170 | sqlException = getSqlExceptionHelper() |
149 | | - .convert( new SQLException( de.getMessage(), de.getSqlState(), de.getErrorCode() ), "error executing SQL statement", sql ); |
| 171 | + .convert( |
| 172 | + new SQLException( de.getMessage(), de.getSqlState(), de.getErrorCode() ), |
| 173 | + "error executing SQL statement", |
| 174 | + sql |
| 175 | + ); |
150 | 176 | } |
151 | 177 | return rethrow( sqlException ); |
152 | 178 | } |
@@ -186,4 +212,155 @@ private SqlClientConnection newConnection(SqlConnection connection) { |
186 | 212 | private SqlClientConnection newConnection(SqlConnection connection, SqlExceptionHelper sqlExceptionHelper) { |
187 | 213 | return new SqlClientConnection( connection, getPool(), getSqlStatementLogger(), sqlExceptionHelper ); |
188 | 214 | } |
| 215 | + |
| 216 | + private static class ProxyConnection implements ReactiveConnection { |
| 217 | + private final Supplier<CompletionStage<ReactiveConnection>> connectionSupplier; |
| 218 | + private Integer batchSize; |
| 219 | + private ReactiveConnection connection; |
| 220 | + |
| 221 | + public ProxyConnection(Supplier<CompletionStage<ReactiveConnection>> connectionSupplier) { |
| 222 | + this.connectionSupplier = connectionSupplier; |
| 223 | + } |
| 224 | + |
| 225 | + /** |
| 226 | + * @return the existing {@link ReactiveConnection}, or open a new one |
| 227 | + */ |
| 228 | + CompletionStage<ReactiveConnection> connection() { |
| 229 | + if ( connection == null ) { |
| 230 | + return connectionSupplier.get() |
| 231 | + .thenApply( conn -> { |
| 232 | + if ( batchSize != null ) { |
| 233 | + conn.withBatchSize( batchSize ); |
| 234 | + } |
| 235 | + connection = conn; |
| 236 | + return connection; |
| 237 | + } ); |
| 238 | + } |
| 239 | + return completedFuture( connection ); |
| 240 | + } |
| 241 | + |
| 242 | + @Override |
| 243 | + public boolean isTransactionInProgress() { |
| 244 | + return connection != null && connection.isTransactionInProgress(); |
| 245 | + } |
| 246 | + |
| 247 | + @Override |
| 248 | + public DatabaseMetadata getDatabaseMetadata() { |
| 249 | + Objects.requireNonNull( connection, "Database metadata not available until the connection is opened" ); |
| 250 | + return connection.getDatabaseMetadata(); |
| 251 | + } |
| 252 | + |
| 253 | + @Override |
| 254 | + public CompletionStage<Void> execute(String sql) { |
| 255 | + return connection().thenCompose( conn -> conn.execute( sql ) ); |
| 256 | + } |
| 257 | + |
| 258 | + @Override |
| 259 | + public CompletionStage<Void> executeOutsideTransaction(String sql) { |
| 260 | + return connection().thenCompose( conn -> conn.executeOutsideTransaction( sql ) ); |
| 261 | + } |
| 262 | + |
| 263 | + @Override |
| 264 | + public CompletionStage<Void> executeUnprepared(String sql) { |
| 265 | + return connection().thenCompose( conn -> conn.executeUnprepared( sql ) ); |
| 266 | + } |
| 267 | + |
| 268 | + @Override |
| 269 | + public CompletionStage<Integer> update(String sql) { |
| 270 | + return connection().thenCompose( conn -> conn.update( sql ) ); |
| 271 | + } |
| 272 | + |
| 273 | + @Override |
| 274 | + public CompletionStage<Integer> update(String sql, Object[] paramValues) { |
| 275 | + return connection().thenCompose( conn -> conn.update( sql, paramValues ) ); |
| 276 | + } |
| 277 | + |
| 278 | + @Override |
| 279 | + public CompletionStage<Void> update(String sql, Object[] paramValues, boolean allowBatching, Expectation expectation) { |
| 280 | + return connection().thenCompose( conn -> conn.update( sql, paramValues, allowBatching, expectation ) ); |
| 281 | + } |
| 282 | + |
| 283 | + @Override |
| 284 | + public CompletionStage<int[]> update(String sql, List<Object[]> paramValues) { |
| 285 | + return connection().thenCompose( conn -> conn.update( sql, paramValues ) ); |
| 286 | + } |
| 287 | + |
| 288 | + @Override |
| 289 | + public CompletionStage<Result> select(String sql) { |
| 290 | + return connection().thenCompose( conn -> conn.select( sql ) ); |
| 291 | + } |
| 292 | + |
| 293 | + @Override |
| 294 | + public CompletionStage<Result> select(String sql, Object[] paramValues) { |
| 295 | + return connection().thenCompose( conn -> conn.select( sql ) ); |
| 296 | + } |
| 297 | + |
| 298 | + @Override |
| 299 | + public CompletionStage<ResultSet> selectJdbc(String sql, Object[] paramValues) { |
| 300 | + return connection().thenCompose( conn -> conn.selectJdbc( sql, paramValues ) ); |
| 301 | + } |
| 302 | + |
| 303 | + @Override |
| 304 | + public <T> CompletionStage<T> insertAndSelectIdentifier( |
| 305 | + String sql, |
| 306 | + Object[] paramValues, |
| 307 | + Class<T> idClass, |
| 308 | + String idColumnName) { |
| 309 | + return connection().thenCompose( conn -> conn |
| 310 | + .insertAndSelectIdentifier( sql, paramValues, idClass, idColumnName ) ); |
| 311 | + } |
| 312 | + |
| 313 | + @Override |
| 314 | + public CompletionStage<ResultSet> insertAndSelectIdentifierAsResultSet( |
| 315 | + String sql, |
| 316 | + Object[] paramValues, |
| 317 | + Class<?> idClass, |
| 318 | + String idColumnName) { |
| 319 | + return connection().thenCompose( conn -> conn |
| 320 | + .insertAndSelectIdentifierAsResultSet( sql, paramValues, idClass, idColumnName ) ); |
| 321 | + } |
| 322 | + |
| 323 | + @Override |
| 324 | + public <T> CompletionStage<T> selectIdentifier(String sql, Object[] paramValues, Class<T> idClass) { |
| 325 | + return connection().thenCompose( conn -> conn.selectIdentifier( sql, paramValues, idClass ) ); |
| 326 | + } |
| 327 | + |
| 328 | + @Override |
| 329 | + public CompletionStage<Void> beginTransaction() { |
| 330 | + return connection().thenCompose( ReactiveConnection::beginTransaction ); |
| 331 | + } |
| 332 | + |
| 333 | + @Override |
| 334 | + public CompletionStage<Void> commitTransaction() { |
| 335 | + return connection().thenCompose( ReactiveConnection::commitTransaction ); |
| 336 | + } |
| 337 | + |
| 338 | + @Override |
| 339 | + public CompletionStage<Void> rollbackTransaction() { |
| 340 | + return connection().thenCompose( ReactiveConnection::rollbackTransaction ); |
| 341 | + } |
| 342 | + |
| 343 | + @Override |
| 344 | + public ReactiveConnection withBatchSize(int batchSize) { |
| 345 | + if ( connection == null ) { |
| 346 | + this.batchSize = batchSize; |
| 347 | + } |
| 348 | + else { |
| 349 | + connection = connection.withBatchSize( batchSize ); |
| 350 | + } |
| 351 | + return this; |
| 352 | + } |
| 353 | + |
| 354 | + @Override |
| 355 | + public CompletionStage<Void> executeBatch() { |
| 356 | + return connection().thenCompose( ReactiveConnection::executeBatch ); |
| 357 | + } |
| 358 | + |
| 359 | + @Override |
| 360 | + public CompletionStage<Void> close() { |
| 361 | + return connection != null |
| 362 | + ? connection.close().thenAccept( v -> connection = null ) |
| 363 | + : voidFuture(); |
| 364 | + } |
| 365 | + } |
189 | 366 | } |
0 commit comments