15
15
*/
16
16
package org .springframework .data .cassandra .core .cql ;
17
17
18
- import reactor .core .publisher .Flux ;
19
- import reactor .core .publisher .Mono ;
20
-
21
18
import java .util .Map ;
22
19
import java .util .function .Function ;
23
20
24
- import org .reactivestreams .Publisher ;
25
-
26
- import org .springframework .dao .DataAccessException ;
27
- import org .springframework .dao .support .DataAccessUtils ;
28
- import org .springframework .data .cassandra .ReactiveResultSet ;
29
- import org .springframework .data .cassandra .ReactiveSession ;
30
- import org .springframework .data .cassandra .ReactiveSessionFactory ;
31
- import org .springframework .data .cassandra .core .cql .session .DefaultReactiveSessionFactory ;
32
- import org .springframework .lang .Nullable ;
33
- import org .springframework .util .Assert ;
34
-
35
21
import com .datastax .oss .driver .api .core .ConsistencyLevel ;
36
22
import com .datastax .oss .driver .api .core .CqlIdentifier ;
37
23
import com .datastax .oss .driver .api .core .CqlSession ;
43
29
import com .datastax .oss .driver .api .core .cql .SimpleStatement ;
44
30
import com .datastax .oss .driver .api .core .cql .Statement ;
45
31
import com .datastax .oss .driver .api .core .retry .RetryPolicy ;
32
+ import org .reactivestreams .Publisher ;
33
+ import reactor .core .publisher .Flux ;
34
+ import reactor .core .publisher .Mono ;
35
+
36
+ import org .springframework .dao .DataAccessException ;
37
+ import org .springframework .dao .support .DataAccessUtils ;
38
+ import org .springframework .data .cassandra .ReactiveResultSet ;
39
+ import org .springframework .data .cassandra .ReactiveSession ;
40
+ import org .springframework .data .cassandra .ReactiveSessionFactory ;
41
+ import org .springframework .data .cassandra .core .cql .session .DefaultReactiveSessionFactory ;
42
+ import org .springframework .lang .Nullable ;
43
+ import org .springframework .util .Assert ;
46
44
47
45
/**
48
46
* <b>This is the central class in the CQL core package for reactive Cassandra data access.</b> It simplifies the use of
@@ -302,7 +300,7 @@ public <T> Flux<T> execute(ReactiveSessionCallback<T> action) throws DataAccessE
302
300
303
301
Assert .notNull (action , "Callback object must not be null" );
304
302
305
- return createFlux (action ).onErrorMap (translateException ("ReactiveSessionCallback" , getCql (action )));
303
+ return createFlux (action ).onErrorMap (translateException ("ReactiveSessionCallback" , toCql (action )));
306
304
}
307
305
308
306
// -------------------------------------------------------------------------
@@ -439,11 +437,12 @@ public <T> Flux<T> query(Statement<?> statement, ReactiveResultSetExtractor<T> r
439
437
return createFlux (statement , (session , stmt ) -> {
440
438
441
439
if (logger .isDebugEnabled ()) {
442
- logger .debug ("Executing statement [{} ]" , QueryExtractorDelegate . getCql (statement ));
440
+ logger .debug (String . format ( "Executing statement [%s ]" , toCql (statement ) ));
443
441
}
444
442
445
- return session .execute (applyStatementSettings (statement )).flatMapMany (rse ::extractData );
446
- }).onErrorMap (translateException ("Query" , statement .toString ()));
443
+ return session .execute (applyStatementSettings (statement ))
444
+ .flatMapMany (rse ::extractData );
445
+ }).onErrorMap (translateException ("Query" , toCql (statement )));
447
446
}
448
447
449
448
/* (non-Javadoc)
@@ -506,17 +505,17 @@ public Mono<ReactiveResultSet> queryForResultSet(Statement<?> statement) throws
506
505
return createMono (statement , (session , executedStatement ) -> {
507
506
508
507
if (logger .isDebugEnabled ()) {
509
- logger .debug ("Executing statement [{} ]" , QueryExtractorDelegate . getCql (statement ));
508
+ logger .debug (String . format ( "Executing statement [%s ]" , toCql (statement ) ));
510
509
}
511
510
512
511
return session .execute (applyStatementSettings (executedStatement ));
513
- }).onErrorMap (translateException ("QueryForResultSet" , statement . toString ( )));
512
+ }).onErrorMap (translateException ("QueryForResultSet" , toCql ( statement )));
514
513
}
515
514
516
515
@ Override
517
516
public Flux <Row > queryForRows (Statement <?> statement ) throws DataAccessException {
518
517
return queryForResultSet (statement ).flatMapMany (ReactiveResultSet ::rows )
519
- .onErrorMap (translateException ("QueryForRows" , statement . toString ( )));
518
+ .onErrorMap (translateException ("QueryForRows" , toCql ( statement )));
520
519
}
521
520
522
521
// -------------------------------------------------------------------------
@@ -535,10 +534,11 @@ public <T> Flux<T> execute(ReactivePreparedStatementCreator psc, ReactivePrepare
535
534
536
535
return createFlux (session -> {
537
536
538
- logger .debug ("Preparing statement [{} ] using {} " , getCql (psc ), psc );
537
+ logger .debug (String . format ( "Preparing statement [%s ] using %s " , toCql (psc ), psc ) );
539
538
540
- return psc .createPreparedStatement (session ).flatMapMany (ps -> action .doInPreparedStatement (session , ps ));
541
- }).onErrorMap (translateException ("ReactivePreparedStatementCallback" , getCql (psc )));
539
+ return psc .createPreparedStatement (session )
540
+ .flatMapMany (ps -> action .doInPreparedStatement (session , ps ));
541
+ }).onErrorMap (translateException ("ReactivePreparedStatementCallback" , toCql (psc )));
542
542
}
543
543
544
544
/* (non-Javadoc)
@@ -569,15 +569,15 @@ public <T> Flux<T> query(ReactivePreparedStatementCreator psc,
569
569
return execute (psc , (session , preparedStatement ) -> Mono .just (preparedStatement ).flatMapMany (pps -> {
570
570
571
571
if (logger .isDebugEnabled ()) {
572
- logger .debug ("Executing prepared statement [{} ]" , QueryExtractorDelegate . getCql (preparedStatement ));
572
+ logger .debug (String . format ( "Executing prepared statement [%s ]" , toCql (preparedStatement ) ));
573
573
}
574
574
575
575
BoundStatement boundStatement = (preparedStatementBinder != null
576
576
? preparedStatementBinder .bindValues (preparedStatement )
577
577
: preparedStatement .bind ());
578
578
579
579
return session .execute (applyStatementSettings (boundStatement ));
580
- }).flatMap (rse ::extractData )).onErrorMap (translateException ("Query" , getCql (psc )));
580
+ }).flatMap (rse ::extractData )).onErrorMap (translateException ("Query" , toCql (psc )));
581
581
}
582
582
583
583
/* (non-Javadoc)
@@ -737,7 +737,7 @@ public Flux<Boolean> execute(String cql, Publisher<Object[]> args) throws DataAc
737
737
return execute (newReactivePreparedStatementCreator (cql ), (session , ps ) -> Flux .from (args ).flatMap (objects -> {
738
738
739
739
if (logger .isDebugEnabled ()) {
740
- logger .debug ("Executing prepared CQL statement [{} ]" , cql );
740
+ logger .debug (String . format ( "Executing prepared CQL statement [%s ]" , cql ) );
741
741
}
742
742
743
743
BoundStatement boundStatement = newArgPreparedStatementBinder (objects ).bindValues (ps );
@@ -908,18 +908,6 @@ private Mono<ReactiveSession> getSession() {
908
908
return sessionFactory .getSession ();
909
909
}
910
910
911
- /**
912
- * Determine CQL from potential provider object.
913
- *
914
- * @param cqlProvider object that's potentially a {@link CqlProvider}
915
- * @return the CQL string, or {@literal null}
916
- * @see CqlProvider
917
- */
918
- @ Nullable
919
- private static String getCql (@ Nullable Object cqlProvider ) {
920
- return QueryExtractorDelegate .getCql (cqlProvider );
921
- }
922
-
923
911
static class SimpleReactivePreparedStatementCreator implements ReactivePreparedStatementCreator , CqlProvider {
924
912
925
913
private final SimpleStatement statement ;
0 commit comments