 import static oracle.nosql.proxy.protocol.BinaryProtocol.RECOMPILE_QUERY;
 import static oracle.nosql.proxy.protocol.BinaryProtocol.REPLICA_STATS_LIMIT;
 import static oracle.nosql.proxy.protocol.BinaryProtocol.REQUEST_SIZE_LIMIT_EXCEEDED;
+import static oracle.nosql.proxy.protocol.BinaryProtocol.REQUEST_TIMEOUT;
 import static oracle.nosql.proxy.protocol.BinaryProtocol.SECURITY_INFO_UNAVAILABLE;
 import static oracle.nosql.proxy.protocol.BinaryProtocol.SERVER_ERROR;
 import static oracle.nosql.proxy.protocol.BinaryProtocol.TABLE_NOT_FOUND;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import oracle.nosql.proxy.ValueSerializer.RowSerializerImpl;
 import oracle.nosql.proxy.audit.ProxyAuditManager;
 import oracle.nosql.proxy.filter.FilterHandler;
-import oracle.nosql.proxy.filter.FilterHandler.Action;
 import oracle.nosql.proxy.filter.FilterHandler.Filter;
 import oracle.nosql.proxy.protocol.ByteInputStream;
 import oracle.nosql.proxy.protocol.ByteOutputStream;
 import oracle.nosql.proxy.util.ErrorManager;
 import oracle.nosql.proxy.util.ProxyThreadPoolExecutor;
 import oracle.nosql.proxy.util.TableCache.TableEntry;
+import oracle.nosql.util.filter.Rule;
 import oracle.nosql.util.tmi.IndexInfo;
 import oracle.nosql.util.tmi.TableInfo;
 import oracle.nosql.util.tmi.TableLimits;
@@ -469,11 +473,6 @@ protected FullHttpResponse handleRequest(FullHttpRequest request,
                                              ChannelHandlerContext ctx,
                                              LogContext lc,
                                              Object callerContext) {
-        /* Block all requests if there is "big red button" rule */
-        Action action = checkBlockAll(lc);
-        if (action != null) {
-            return action.handleRequest(null, null, lc);
-        }
 
         int threads = activeWorkerThreads.incrementAndGet();
         if (stats != null) {
@@ -487,6 +486,12 @@ protected FullHttpResponse handleRequest(FullHttpRequest request,
         /* message, retain() it here */
         request.retain();
         try {
+            /* Block all requests if there is "big red button" rule */
+            Rule rule = getBlockAll(lc);
+            if (rule != null) {
+                return filter.handleRequest(rc, rule);
+            }
+
             /* Handle OPTIONS method for pre-flight request. */
             if (HttpMethod.OPTIONS.equals(rc.request.method())) {
                 return handleOptions(rc.request, rc.lc);
@@ -988,7 +993,7 @@ private FullHttpResponse formulateErrorResponse(
             throw re;
         } catch (FilterRequestException fre) {
             /* this will currently always return null. Hmmm... */
-            return handleFilterRequest(fre, rc.requestId, rc.lc);
+            return handleFilterRequest(fre, rc);
         } catch (Throwable t) {
             /*
              * This error may indicate a bug in the proxy. Make sure
@@ -1764,6 +1769,10 @@ private boolean handleQuery(final RequestContext rc)
 
         if (info.isPrepared == false) {
 
+            if (info.numOperations != 0 || info.operationNumber != 0) {
+                throw new IllegalArgumentException(
+                    "Parallel queries require a prepared query");
+            }
             /* this method also enforces limit on query string length */
             cbInfo = TableUtils.getCallbackInfo(rc.actx, info.statement, tm);
             cbInfo.checkSupportedDml();
@@ -1905,7 +1914,7 @@ private boolean handleQuery(final RequestContext rc)
         }
 
         /*
-         * Set ExceuteOptions.updateLimit for update query to limit the max
+         * Set ExecuteOptions.updateLimit for update query to limit the max
          * number of records can be updated in a single query:
          * - In onprem, set it to the limit set by application.
          * - In cloud, the max number of records that can be updated in a
@@ -1919,6 +1928,13 @@ private boolean handleQuery(final RequestContext rc)
             }
         }
 
+        /* insert/update/upsert not allowed to be parallel */
+        if (isUpdateOp &&
+            (info.numOperations != 0 || info.operationNumber != 0)) {
+            throw new IllegalArgumentException(
+                "Cannot perform parallel query on inserts or updates");
+        }
+
         /* FUTURE: use info.durability */
 
         NsonSerializer ns = null;
@@ -1966,6 +1982,48 @@ private boolean handleQuery(final RequestContext rc)
             rc.bbos.writeInt(0);
         }
 
+        /*
+         * this method validates the parameters and will throw if invalid.
+         * It returns the total number of operations. If > 0 this is a parallel
+         * query
+         */
+        int numberOfOperations =
+            getParallelQueryOperations(info, prep, store.getTopology());
+
+        /*
+         * Compute synchronous query results. If this is a parallel query
+         * the appropriate set of shards or partitions needs to be passed
+         */
+        Set<RepGroupId> shards = null;
+        Set<Integer> partitions = null;
+
+        if (info.shardId > 0) {
+            /*
+             * this is where the caller is explicitly handling a shard and
+             * is never parallel
+             */
+            shards = new HashSet<>(1);
+            shards.add(new RepGroupId(info.shardId));
+        } else if (numberOfOperations > 1) {
+            execOpts.setIsSimpleQuery(info.isSimpleQuery);
+            if (prep.getDistributionKind().equals(
+                    PreparedStatementImpl.DistributionKind.ALL_SHARDS)) {
+                /* use shard-based split, even for an all-partition query */
+                shards = computeParallelShards(info, store, rc);
+            } else {
+                /*
+                 * there is no current async kv call to handle a set of
+                 * partitions, so turn off async for this path.
+                 * FUTURE: leave it async if KV supports it. See
+                 * doAsyncQuery()
+                 */
+                partitions =
+                    computeParallelPartitions(
+                        info, store.getTopology().getNumPartitions());
+                doAsync = false;
+            }
+        }
+
         if (doAsync) {
 
             if (info.traceLevel >= 5) {
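A minimal, self-contained sketch of the split decision made by the added block above. SplitMode, DistributionKind and the sample inputs below are hypothetical stand-ins rather than proxy classes, and invalid combinations are assumed to have already been rejected by the validation added later in this diff:

// Illustration only: how the proxy chooses the way a parallel query is split.
public class SplitModeExample {

    enum DistributionKind { ALL_SHARDS, ALL_PARTITIONS, SINGLE_PARTITION }

    enum SplitMode { SINGLE_SHARD, BY_SHARD, BY_PARTITION, NONE }

    /*
     * Mirrors the logic above: an explicit shard id always wins and is never
     * parallel; otherwise ALL_SHARDS queries split by shard and all other
     * distribution kinds split by partition (the partition path also disables
     * the async query path in the proxy).
     */
    static SplitMode chooseSplit(int shardId,
                                 int numberOfOperations,
                                 DistributionKind kind) {
        if (shardId > 0) {
            return SplitMode.SINGLE_SHARD;
        }
        if (numberOfOperations > 1) {
            return (kind == DistributionKind.ALL_SHARDS) ?
                SplitMode.BY_SHARD : SplitMode.BY_PARTITION;
        }
        return SplitMode.NONE;
    }

    public static void main(String[] args) {
        System.out.println(chooseSplit(3, 0, DistributionKind.ALL_SHARDS));     // SINGLE_SHARD
        System.out.println(chooseSplit(0, 4, DistributionKind.ALL_SHARDS));     // BY_SHARD
        System.out.println(chooseSplit(0, 4, DistributionKind.ALL_PARTITIONS)); // BY_PARTITION
        System.out.println(chooseSplit(0, 1, DistributionKind.ALL_PARTITIONS)); // NONE
    }
}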
@@ -1977,7 +2035,8 @@ private boolean handleQuery(final RequestContext rc)
             final NsonSerializer nser = ns;
 
             Publisher<RecordValue> qpub =
-                doAsyncQuery(store, prep, variables, execOpts, info.shardId,
+                doAsyncQuery(store, prep, variables, execOpts,
+                             shards, partitions,
                              info.traceLevel, rc.lc);
 
             Subscriber<RecordValue> qsub = new Subscriber<RecordValue>() {
@@ -2094,11 +2153,9 @@ public void onComplete() {
 
         int numResults = 0;
 
-        /*
-         * Compute synchronous query results
-         */
         QueryStatementResultImpl qres =
-            doQuery(store, prep, variables, execOpts, info.shardId,
+            doQuery(store, prep, variables, execOpts,
+                    shards, partitions,
                     info.traceLevel, rc.lc);
 
         if (ns == null && info.queryVersion > QUERY_V1) {
@@ -2138,6 +2195,147 @@ public void onComplete() {
         return false; // sync
     }
 
+    /**
+     * Validate parallel query operation parameters and return total number
+     * of operations
+     */
+    private int getParallelQueryOperations(QueryOpInfo info,
+                                           PreparedStatementImpl prep,
+                                           Topology topo) {
+        if (info.numOperations > 0) {
+            if (info.operationNumber <= 0 || info.operationNumber >
+                info.numOperations) {
+                throw new IllegalArgumentException(
+                    "Invalid parallel query parameters");
+            }
+            /*
+             * cannot trust prep.isSimpleQuery() on an already-prepared
+             * statement, use the info passed from the driver
+             */
+            if (!info.isSimpleQuery || prep.getDistributionKind().equals(
+                    PreparedStatementImpl.DistributionKind.SINGLE_PARTITION)) {
+                /* allow 1 but it's the same as if it were 0, not parallel */
+                if (info.numOperations > 1) {
+                    throw new IllegalArgumentException(
+                        "Invalid number of operations for parallel query");
+                }
+                /* a single partition query is not parallel */
+                return 0;
+            }
+            if (prep.getDistributionKind().equals(
+                    PreparedStatementImpl.DistributionKind.ALL_SHARDS)) {
+                if (info.numOperations > topo.getNumRepGroups()) {
+                    throw new IllegalArgumentException(
+                        "Invalid number of operations for parallel query, " +
+                        "it must be less than or equal to " +
+                        topo.getNumRepGroups());
+                }
+            } else if (info.numOperations > topo.getNumPartitions()) {
+                throw new IllegalArgumentException(
+                    "Invalid number of operations for parallel query, " +
+                    "it must be less than or equal to " +
+                    topo.getNumPartitions());
+            }
+            return info.numOperations;
+        } else if (info.operationNumber != 0) {
+            throw new IllegalArgumentException(
+                "Invalid parallel query parameters");
+        }
+        return 0;
+    }
+
+    /*
+     * These methods use a combination of the store topology, the total
+     * number of parallel operations and the operation number to return sets
+     * of items (shards/partitions) in a deterministic manner. The sets must be
+     * the same/repeatable for any <topology, number of ops, op number>
+     * combination in order to properly partition the data being
+     * queried.
+     *
+     * It has already been verified that the number of operations is <=
+     * number of shards or partitions in the topology. The simplest algorithm
+     * is to walk the items assigning each to an operation number "bucket"
+     * until all of the items have been assigned. If the items aren't evenly
+     * divisible by the number of operations some buckets will have additional
+     * items.
+     *
+     * For example if the number of items is 8 and number of operations is 3
+     * then bucket 1 gets items 1, 4, 7, bucket 2 gets 2, 5, 8, and
+     * bucket 3 gets 3, 6.
+     *
+     * These assignments are logically static for the duration of a query but
+     * rather than round-trip them it's simpler to recalculate, which is not
+     * deemed expensive.
+     *
+     * This calculation does the above. Items are 1-based. Starting at 1 and
+     * going to the last item these items go in the target bucket (B)
+     * B = bucket number
+     * I = item number (start at 1)
+     * N = number of operations
+     * for (int I = 1; I <= numberOfItems; I++) {
+     *     if ((I - B) % N == 0) {
+     *         add to bucket B
+     *     }
+     */
+    private Set<RepGroupId> computeParallelShards(QueryOpInfo info,
+                                                  KVStoreImpl store,
+                                                  RequestContext rc) {
+
+        /*
+         * Must use the driver's "base" topology for all queries
+         */
+        int numShards;
+        try {
+            numShards =
+                store.getDispatcher().getTopologyManager().getTopology(
+                    store, rc.driverTopoSeqNum, rc.timeoutMs).getNumRepGroups();
+            /*
+             * if the driver's notion of topology is different from the current
+             * store topology, it means elasticity is happening and all-shard
+             * parallel queries are not compatible with elasticity
+             */
+            if (store.getTopology().getNumRepGroups() != numShards) {
+                /*
+                 * use of RECOMPILE_QUERY is not very specific but can cause the
+                 * caller to "start over" which is the behavior expected, because
+                 * trying again will likely succeed
+                 */
+                throw new RequestException(
+                    RECOMPILE_QUERY, "Parallel queries on indexes are not " +
+                    "supported during certain points in an elasticity " +
+                    "operation. Please retry the entire coordinated operation");
+            }
+        } catch (TimeoutException te) {
+            throw new RequestException(
+                REQUEST_TIMEOUT, "Failed to get server state required to " +
+                "execute a query");
+        }
+
+        Set<RepGroupId> shards = new HashSet<>();
+        int numOperations = info.numOperations;
+        int bucket = info.operationNumber;
+        for (int i = 1; i <= numShards; i++) {
+            if ((i - bucket) % numOperations == 0) {
+                shards.add(new RepGroupId(i));
+            }
+        }
+        return shards;
+    }
+
+    /* see comment above. operation number is 1-based */
+    private Set<Integer> computeParallelPartitions(QueryOpInfo info,
+                                                   int numPartitions) {
+        Set<Integer> partitions = new HashSet<>();
+        int numOperations = info.numOperations;
+        int bucket = info.operationNumber;
+        for (int i = 1; i <= numPartitions; i++) {
+            if ((i - bucket) % numOperations == 0) {
+                partitions.add(i);
+            }
+        }
+        return partitions;
+    }
+
     private void finishQuery(
             QueryOpInfo qinfo,
             boolean isAbsolute,
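The assignment rule in the comment above can be exercised in isolation. A small, runnable sketch that reproduces the modulo rule for the 8-items/3-operations example and checks that every item lands in exactly one bucket; the class and helper names are invented for illustration, only the rule itself comes from the code above:

// Stand-alone illustration of the deterministic bucket assignment used by
// computeParallelShards/computeParallelPartitions.
import java.util.Set;
import java.util.TreeSet;

public class BucketExample {

    /* items and operation numbers are 1-based, as in the proxy code */
    static Set<Integer> bucketFor(int operationNumber,
                                  int numOperations,
                                  int numItems) {
        Set<Integer> items = new TreeSet<>();
        for (int i = 1; i <= numItems; i++) {
            if ((i - operationNumber) % numOperations == 0) {
                items.add(i);
            }
        }
        return items;
    }

    public static void main(String[] args) {
        int numItems = 8;
        int numOperations = 3;
        Set<Integer> seen = new TreeSet<>();
        for (int op = 1; op <= numOperations; op++) {
            Set<Integer> bucket = bucketFor(op, numOperations, numItems);
            System.out.println("operation " + op + " -> " + bucket);
            for (int item : bucket) {
                if (!seen.add(item)) {
                    throw new AssertionError("item assigned twice: " + item);
                }
            }
        }
        if (seen.size() != numItems) {
            throw new AssertionError("some items were never assigned");
        }
        /*
         * prints, matching the example in the comment above:
         *   operation 1 -> [1, 4, 7]
         *   operation 2 -> [2, 5, 8]
         *   operation 3 -> [3, 6]
         */
    }
}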
@@ -3593,6 +3791,8 @@ private static class QueryOpInfo {
         Map<String, FieldValue> bindVars;
         String queryName;
         String batchName;
+        int numOperations;
+        int operationNumber;
     }
 
     /*
@@ -4133,6 +4333,10 @@ private void getV4QueryOpInfo(QueryOpInfo info, RequestContext rc)
                 info.isSimpleQuery = Nson.readNsonBoolean(bis);
             } else if (name.equals(QUERY_NAME)) {
                 info.queryName = Nson.readNsonString(bis);
+            } else if (name.equals(NUM_QUERY_OPERATIONS)) {
+                info.numOperations = Nson.readNsonInt(bis);
+            } else if (name.equals(QUERY_OPERATION_NUM)) {
+                info.operationNumber = Nson.readNsonInt(bis);
             } else if (name.equals(VIRTUAL_SCAN)) {
                 info.virtualScan = readVirtualScan(bis);
             } else if (name.equals(SERVER_MEMORY_CONSUMPTION)) {