Skip to content

Commit 7f7ed92

Browse files
committed
Use nested classes for fetch size retrieval in session callbacks.
We now use nested classes implementing CqlProvider to retrieve the fetch size in session callbacks. This enables CQL retrieval. Previously, CQL retrieval saw a class that didn't implement CqlProvider and reported therefore unknown CQL. See #1186
1 parent d9ddf0d commit 7f7ed92

File tree

7 files changed

+143
-117
lines changed

7 files changed

+143
-117
lines changed

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/AsyncCassandraTemplate.java

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,24 @@
2222
import java.util.stream.Collectors;
2323
import java.util.stream.StreamSupport;
2424

25+
import com.datastax.oss.driver.api.core.CqlIdentifier;
26+
import com.datastax.oss.driver.api.core.CqlSession;
27+
import com.datastax.oss.driver.api.core.DriverException;
28+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
29+
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
30+
import com.datastax.oss.driver.api.core.cql.BoundStatement;
31+
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
32+
import com.datastax.oss.driver.api.core.cql.ResultSet;
33+
import com.datastax.oss.driver.api.core.cql.Row;
34+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
35+
import com.datastax.oss.driver.api.core.cql.Statement;
36+
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
37+
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
38+
import com.datastax.oss.driver.api.querybuilder.insert.Insert;
39+
import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
40+
import com.datastax.oss.driver.api.querybuilder.select.Select;
41+
import com.datastax.oss.driver.api.querybuilder.truncate.Truncate;
42+
import com.datastax.oss.driver.api.querybuilder.update.Update;
2543
import org.slf4j.Logger;
2644
import org.slf4j.LoggerFactory;
2745

@@ -63,25 +81,6 @@
6381
import org.springframework.util.Assert;
6482
import org.springframework.util.concurrent.ListenableFuture;
6583

66-
import com.datastax.oss.driver.api.core.CqlIdentifier;
67-
import com.datastax.oss.driver.api.core.CqlSession;
68-
import com.datastax.oss.driver.api.core.DriverException;
69-
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
70-
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
71-
import com.datastax.oss.driver.api.core.cql.BoundStatement;
72-
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
73-
import com.datastax.oss.driver.api.core.cql.ResultSet;
74-
import com.datastax.oss.driver.api.core.cql.Row;
75-
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
76-
import com.datastax.oss.driver.api.core.cql.Statement;
77-
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
78-
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
79-
import com.datastax.oss.driver.api.querybuilder.insert.Insert;
80-
import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
81-
import com.datastax.oss.driver.api.querybuilder.select.Select;
82-
import com.datastax.oss.driver.api.querybuilder.truncate.Truncate;
83-
import com.datastax.oss.driver.api.querybuilder.update.Update;
84-
8584
/**
8685
* Primary implementation of {@link AsyncCassandraOperations}. It simplifies the use of asynchronous Cassandra usage and
8786
* helps to avoid common errors. It executes core Cassandra workflow. This class executes CQL queries or updates,
@@ -933,9 +932,20 @@ private int getEffectivePageSize(Statement<?> statement) {
933932
return accessor.getFetchSize();
934933
}
935934
}
935+
class GetConfiguredPageSize implements AsyncSessionCallback<Integer>, CqlProvider {
936+
@Override
937+
public ListenableFuture<Integer> doInSession(CqlSession session) {
938+
return AsyncResult.forValue(getConfiguredPageSize(session));
939+
}
940+
941+
@Override
942+
public String getCql() {
943+
return QueryExtractorDelegate.getCql(statement);
944+
}
945+
}
936946

937947
return getAsyncCqlOperations()
938-
.execute((AsyncSessionCallback<Integer>) session -> AsyncResult.forValue(getConfiguredPageSize(session)))
948+
.execute(new GetConfiguredPageSize())
939949
.completable().join();
940950
}
941951

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraTemplate.java

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,24 @@
2020
import java.util.function.Function;
2121
import java.util.stream.Stream;
2222

23+
import com.datastax.oss.driver.api.core.CqlIdentifier;
24+
import com.datastax.oss.driver.api.core.CqlSession;
25+
import com.datastax.oss.driver.api.core.DriverException;
26+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
27+
import com.datastax.oss.driver.api.core.cql.BatchType;
28+
import com.datastax.oss.driver.api.core.cql.BoundStatement;
29+
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
30+
import com.datastax.oss.driver.api.core.cql.ResultSet;
31+
import com.datastax.oss.driver.api.core.cql.Row;
32+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
33+
import com.datastax.oss.driver.api.core.cql.Statement;
34+
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
35+
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
36+
import com.datastax.oss.driver.api.querybuilder.insert.Insert;
37+
import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
38+
import com.datastax.oss.driver.api.querybuilder.select.Select;
39+
import com.datastax.oss.driver.api.querybuilder.truncate.Truncate;
40+
import com.datastax.oss.driver.api.querybuilder.update.Update;
2341
import org.slf4j.Logger;
2442
import org.slf4j.LoggerFactory;
2543

@@ -37,16 +55,7 @@
3755
import org.springframework.data.cassandra.core.convert.MappingCassandraConverter;
3856
import org.springframework.data.cassandra.core.convert.QueryMapper;
3957
import org.springframework.data.cassandra.core.convert.UpdateMapper;
40-
import org.springframework.data.cassandra.core.cql.CassandraAccessor;
41-
import org.springframework.data.cassandra.core.cql.CqlOperations;
42-
import org.springframework.data.cassandra.core.cql.CqlProvider;
43-
import org.springframework.data.cassandra.core.cql.CqlTemplate;
44-
import org.springframework.data.cassandra.core.cql.PreparedStatementBinder;
45-
import org.springframework.data.cassandra.core.cql.PreparedStatementCreator;
46-
import org.springframework.data.cassandra.core.cql.QueryOptions;
47-
import org.springframework.data.cassandra.core.cql.RowMapper;
48-
import org.springframework.data.cassandra.core.cql.SingleColumnRowMapper;
49-
import org.springframework.data.cassandra.core.cql.WriteOptions;
58+
import org.springframework.data.cassandra.core.cql.*;
5059
import org.springframework.data.cassandra.core.cql.session.DefaultSessionFactory;
5160
import org.springframework.data.cassandra.core.cql.util.StatementBuilder;
5261
import org.springframework.data.cassandra.core.mapping.CassandraPersistentEntity;
@@ -69,25 +78,6 @@
6978
import org.springframework.lang.Nullable;
7079
import org.springframework.util.Assert;
7180

72-
import com.datastax.oss.driver.api.core.CqlIdentifier;
73-
import com.datastax.oss.driver.api.core.CqlSession;
74-
import com.datastax.oss.driver.api.core.DriverException;
75-
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
76-
import com.datastax.oss.driver.api.core.cql.BatchType;
77-
import com.datastax.oss.driver.api.core.cql.BoundStatement;
78-
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
79-
import com.datastax.oss.driver.api.core.cql.ResultSet;
80-
import com.datastax.oss.driver.api.core.cql.Row;
81-
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
82-
import com.datastax.oss.driver.api.core.cql.Statement;
83-
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
84-
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
85-
import com.datastax.oss.driver.api.querybuilder.insert.Insert;
86-
import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
87-
import com.datastax.oss.driver.api.querybuilder.select.Select;
88-
import com.datastax.oss.driver.api.querybuilder.truncate.Truncate;
89-
import com.datastax.oss.driver.api.querybuilder.update.Update;
90-
9181
/**
9282
* Primary implementation of {@link CassandraOperations}. It simplifies the use of Cassandra usage and helps to avoid
9383
* common errors. It executes core Cassandra workflow. This class executes CQL queries or updates, initiating iteration
@@ -992,7 +982,19 @@ private int getEffectivePageSize(Statement<?> statement) {
992982
}
993983
}
994984

995-
return getCqlOperations().execute(this::getConfiguredPageSize);
985+
class GetConfiguredPageSize implements SessionCallback<Integer>, CqlProvider {
986+
@Override
987+
public Integer doInSession(CqlSession session) {
988+
return getConfiguredPageSize(session);
989+
}
990+
991+
@Override
992+
public String getCql() {
993+
return QueryExtractorDelegate.getCql(statement);
994+
}
995+
}
996+
997+
return getCqlOperations().execute(new GetConfiguredPageSize());
996998
}
997999

9981000
@SuppressWarnings("unchecked")

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraTemplate.java

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,33 @@
1515
*/
1616
package org.springframework.data.cassandra.core;
1717

18-
import reactor.core.publisher.Flux;
19-
import reactor.core.publisher.Mono;
20-
import reactor.core.publisher.SynchronousSink;
21-
2218
import java.util.Collections;
2319
import java.util.function.BiConsumer;
2420
import java.util.function.Function;
2521

22+
import com.datastax.oss.driver.api.core.CqlIdentifier;
23+
import com.datastax.oss.driver.api.core.DriverException;
24+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
25+
import com.datastax.oss.driver.api.core.context.DriverContext;
26+
import com.datastax.oss.driver.api.core.cql.BatchType;
27+
import com.datastax.oss.driver.api.core.cql.BoundStatement;
28+
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
29+
import com.datastax.oss.driver.api.core.cql.Row;
30+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
31+
import com.datastax.oss.driver.api.core.cql.Statement;
32+
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
33+
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
34+
import com.datastax.oss.driver.api.querybuilder.insert.Insert;
35+
import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
36+
import com.datastax.oss.driver.api.querybuilder.select.Select;
37+
import com.datastax.oss.driver.api.querybuilder.truncate.Truncate;
38+
import com.datastax.oss.driver.api.querybuilder.update.Update;
39+
import org.reactivestreams.Publisher;
2640
import org.slf4j.Logger;
2741
import org.slf4j.LoggerFactory;
42+
import reactor.core.publisher.Flux;
43+
import reactor.core.publisher.Mono;
44+
import reactor.core.publisher.SynchronousSink;
2845

2946
import org.springframework.beans.BeansException;
3047
import org.springframework.context.ApplicationContext;
@@ -65,24 +82,6 @@
6582
import org.springframework.lang.Nullable;
6683
import org.springframework.util.Assert;
6784

68-
import com.datastax.oss.driver.api.core.CqlIdentifier;
69-
import com.datastax.oss.driver.api.core.DriverException;
70-
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
71-
import com.datastax.oss.driver.api.core.context.DriverContext;
72-
import com.datastax.oss.driver.api.core.cql.BatchType;
73-
import com.datastax.oss.driver.api.core.cql.BoundStatement;
74-
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
75-
import com.datastax.oss.driver.api.core.cql.Row;
76-
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
77-
import com.datastax.oss.driver.api.core.cql.Statement;
78-
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
79-
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
80-
import com.datastax.oss.driver.api.querybuilder.insert.Insert;
81-
import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
82-
import com.datastax.oss.driver.api.querybuilder.select.Select;
83-
import com.datastax.oss.driver.api.querybuilder.truncate.Truncate;
84-
import com.datastax.oss.driver.api.querybuilder.update.Update;
85-
8685
/**
8786
* Primary implementation of {@link ReactiveCassandraOperations}. It simplifies the use of Reactive Cassandra usage and
8887
* helps to avoid common errors. It executes core Cassandra workflow. This class executes CQL queries or updates,
@@ -937,8 +936,20 @@ private Mono<Integer> getEffectiveFetchSize(Statement<?> statement) {
937936
}
938937
}
939938

939+
class GetConfiguredPageSize implements ReactiveSessionCallback<Integer>, CqlProvider {
940+
@Override
941+
public Publisher<Integer> doInSession(ReactiveSession session) {
942+
return Mono.just(getConfiguredPageSize(session.getContext()));
943+
}
944+
945+
@Override
946+
public String getCql() {
947+
return QueryExtractorDelegate.getCql(statement);
948+
}
949+
}
950+
940951
return getReactiveCqlOperations()
941-
.execute((ReactiveSessionCallback<Integer>) session -> Mono.just(getConfiguredPageSize(session.getContext())))
952+
.execute(new GetConfiguredPageSize())
942953
.single();
943954
}
944955

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/AsyncCqlTemplate.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,6 @@
2222
import java.util.concurrent.ExecutionException;
2323
import java.util.function.Function;
2424

25-
import com.datastax.oss.driver.api.core.CqlSession;
26-
import com.datastax.oss.driver.api.core.DriverException;
27-
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
28-
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
29-
import com.datastax.oss.driver.api.core.cql.ResultSet;
30-
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
31-
import com.datastax.oss.driver.api.core.cql.Statement;
32-
3325
import org.springframework.dao.DataAccessException;
3426
import org.springframework.dao.support.DataAccessUtils;
3527
import org.springframework.dao.support.PersistenceExceptionTranslator;
@@ -40,6 +32,14 @@
4032
import org.springframework.util.concurrent.ListenableFuture;
4133
import org.springframework.util.concurrent.SettableListenableFuture;
4234

35+
import com.datastax.oss.driver.api.core.CqlSession;
36+
import com.datastax.oss.driver.api.core.DriverException;
37+
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
38+
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
39+
import com.datastax.oss.driver.api.core.cql.ResultSet;
40+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
41+
import com.datastax.oss.driver.api.core.cql.Statement;
42+
4343
/**
4444
* <b>This is the central class in the CQL core package for asynchronous Cassandra data access.</b> It simplifies the
4545
* use of CQL and helps to avoid common errors. It executes core CQL workflow, leaving application code to provide CQL
@@ -293,8 +293,7 @@ public <T> ListenableFuture<T> query(Statement<?> statement, AsyncResultSetExtra
293293
.thenApply(resultSetExtractor::extractData) //
294294
.thenCompose(ListenableFuture::completable);
295295

296-
return new CassandraFutureAdapter<>(results,
297-
ex -> translateExceptionIfPossible("Query", toCql(statement), ex));
296+
return new CassandraFutureAdapter<>(results, ex -> translateExceptionIfPossible("Query", toCql(statement), ex));
298297
} catch (DriverException e) {
299298
throw translateException("Query", toCql(statement), e);
300299
}
@@ -446,7 +445,8 @@ public <T> ListenableFuture<T> execute(AsyncPreparedStatementCreator preparedSta
446445
toCql(preparedStatementCreator), ex);
447446
try {
448447
if (logger.isDebugEnabled()) {
449-
logger.debug(String.format("Preparing statement [%s] using %s", toCql(preparedStatementCreator), preparedStatementCreator));
448+
logger.debug(String.format("Preparing statement [%s] using %s", toCql(preparedStatementCreator),
449+
preparedStatementCreator));
450450
}
451451

452452
CqlSession currentSession = getCurrentSession();
@@ -517,7 +517,8 @@ public <T> ListenableFuture<T> query(AsyncPreparedStatementCreator preparedState
517517
try {
518518

519519
if (logger.isDebugEnabled()) {
520-
logger.debug(String.format("Preparing statement [%s] using %s", toCql(preparedStatementCreator), preparedStatementCreator));
520+
logger.debug(String.format("Preparing statement [%s] using %s", toCql(preparedStatementCreator),
521+
preparedStatementCreator));
521522
}
522523

523524
CqlSession session = getCurrentSession();

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/CqlTemplate.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@
2525
import java.util.stream.Stream;
2626
import java.util.stream.StreamSupport;
2727

28+
import org.springframework.dao.DataAccessException;
29+
import org.springframework.dao.support.DataAccessUtils;
30+
import org.springframework.data.cassandra.SessionFactory;
31+
import org.springframework.lang.Nullable;
32+
import org.springframework.util.Assert;
33+
2834
import com.datastax.oss.driver.api.core.CqlSession;
2935
import com.datastax.oss.driver.api.core.DriverException;
3036
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
@@ -34,12 +40,6 @@
3440
import com.datastax.oss.driver.api.core.cql.Statement;
3541
import com.datastax.oss.driver.api.core.metadata.Node;
3642

37-
import org.springframework.dao.DataAccessException;
38-
import org.springframework.dao.support.DataAccessUtils;
39-
import org.springframework.data.cassandra.SessionFactory;
40-
import org.springframework.lang.Nullable;
41-
import org.springframework.util.Assert;
42-
4343
/**
4444
* <b>This is the central class in the CQL core package.</b> It simplifies the use of CQL and helps to avoid common
4545
* errors. It executes core CQL workflow, leaving application code to provide CQL and extract results. This class

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/ReactiveCqlTemplate.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,23 @@
1515
*/
1616
package org.springframework.data.cassandra.core.cql;
1717

18+
import reactor.core.publisher.Flux;
19+
import reactor.core.publisher.Mono;
20+
1821
import java.util.Map;
1922
import java.util.function.Function;
2023

24+
import org.reactivestreams.Publisher;
25+
26+
import org.springframework.dao.DataAccessException;
27+
import org.springframework.dao.support.DataAccessUtils;
28+
import org.springframework.data.cassandra.ReactiveResultSet;
29+
import org.springframework.data.cassandra.ReactiveSession;
30+
import org.springframework.data.cassandra.ReactiveSessionFactory;
31+
import org.springframework.data.cassandra.core.cql.session.DefaultReactiveSessionFactory;
32+
import org.springframework.lang.Nullable;
33+
import org.springframework.util.Assert;
34+
2135
import com.datastax.oss.driver.api.core.ConsistencyLevel;
2236
import com.datastax.oss.driver.api.core.CqlIdentifier;
2337
import com.datastax.oss.driver.api.core.CqlSession;
@@ -29,18 +43,6 @@
2943
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
3044
import com.datastax.oss.driver.api.core.cql.Statement;
3145
import com.datastax.oss.driver.api.core.retry.RetryPolicy;
32-
import org.reactivestreams.Publisher;
33-
import reactor.core.publisher.Flux;
34-
import reactor.core.publisher.Mono;
35-
36-
import org.springframework.dao.DataAccessException;
37-
import org.springframework.dao.support.DataAccessUtils;
38-
import org.springframework.data.cassandra.ReactiveResultSet;
39-
import org.springframework.data.cassandra.ReactiveSession;
40-
import org.springframework.data.cassandra.ReactiveSessionFactory;
41-
import org.springframework.data.cassandra.core.cql.session.DefaultReactiveSessionFactory;
42-
import org.springframework.lang.Nullable;
43-
import org.springframework.util.Assert;
4446

4547
/**
4648
* <b>This is the central class in the CQL core package for reactive Cassandra data access.</b> It simplifies the use of
@@ -440,8 +442,7 @@ public <T> Flux<T> query(Statement<?> statement, ReactiveResultSetExtractor<T> r
440442
logger.debug(String.format("Executing statement [%s]", toCql(statement)));
441443
}
442444

443-
return session.execute(applyStatementSettings(statement))
444-
.flatMapMany(rse::extractData);
445+
return session.execute(applyStatementSettings(statement)).flatMapMany(rse::extractData);
445446
}).onErrorMap(translateException("Query", toCql(statement)));
446447
}
447448

@@ -536,8 +537,7 @@ public <T> Flux<T> execute(ReactivePreparedStatementCreator psc, ReactivePrepare
536537

537538
logger.debug(String.format("Preparing statement [%s] using %s", toCql(psc), psc));
538539

539-
return psc.createPreparedStatement(session)
540-
.flatMapMany(ps -> action.doInPreparedStatement(session, ps));
540+
return psc.createPreparedStatement(session).flatMapMany(ps -> action.doInPreparedStatement(session, ps));
541541
}).onErrorMap(translateException("ReactivePreparedStatementCallback", toCql(psc)));
542542
}
543543

0 commit comments

Comments
 (0)