@@ -1079,35 +1079,46 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
1079
1079
return kvpb .NewErrorf ("empty batch" )
1080
1080
}
1081
1081
1082
- if ba .MaxSpanRequestKeys != 0 || ba .TargetBytes != 0 {
1083
- // Verify that the batch contains only specific requests. Verify that a
1084
- // batch with a ReverseScan only contains ReverseScan range requests.
1085
- var foundForward , foundReverse bool
1086
- for _ , req := range ba .Requests {
1087
- inner := req .GetInner ()
1088
- switch inner .(type ) {
1089
- case * kvpb.ScanRequest , * kvpb.ResolveIntentRangeRequest ,
1090
- * kvpb.DeleteRangeRequest , * kvpb.RevertRangeRequest ,
1091
- * kvpb.ExportRequest , * kvpb.QueryLocksRequest , * kvpb.IsSpanEmptyRequest :
1092
- // Accepted forward range requests.
1093
- foundForward = true
1082
+ // Verify that forward and reverse range requests are never in the same
1083
+ // batch. Also verify that the batch with limits contains only specific
1084
+ // requests.
1085
+ var foundForward , foundReverse bool
1086
+ var disallowedReq string
1087
+ for _ , req := range ba .Requests {
1088
+ inner := req .GetInner ()
1089
+ switch inner .(type ) {
1090
+ case * kvpb.ScanRequest , * kvpb.ResolveIntentRangeRequest ,
1091
+ * kvpb.DeleteRangeRequest , * kvpb.RevertRangeRequest ,
1092
+ * kvpb.ExportRequest , * kvpb.QueryLocksRequest , * kvpb.IsSpanEmptyRequest :
1093
+ // Accepted forward range requests.
1094
+ foundForward = true
1094
1095
1095
- case * kvpb.ReverseScanRequest :
1096
- // Accepted reverse range requests.
1097
- foundReverse = true
1096
+ case * kvpb.ReverseScanRequest :
1097
+ // Accepted reverse range requests.
1098
+ foundReverse = true
1098
1099
1099
- case * kvpb.QueryIntentRequest , * kvpb.EndTxnRequest ,
1100
- * kvpb.GetRequest , * kvpb.ResolveIntentRequest , * kvpb.DeleteRequest , * kvpb.PutRequest :
1101
- // Accepted point requests that can be in batches with limit.
1100
+ case * kvpb.QueryIntentRequest , * kvpb.EndTxnRequest ,
1101
+ * kvpb.GetRequest , * kvpb.ResolveIntentRequest , * kvpb.DeleteRequest , * kvpb.PutRequest :
1102
+ // Accepted point requests that can be in batches with limit. No
1103
+ // need to set disallowedReq.
1102
1104
1103
- default :
1104
- return kvpb .NewErrorf ("batch with limit contains %s request" , inner .Method ())
1105
- }
1106
- }
1107
- if foundForward && foundReverse {
1108
- return kvpb .NewErrorf ("batch with limit contains both forward and reverse scans" )
1105
+ default :
1106
+ disallowedReq = inner .Method ().String ()
1109
1107
}
1110
1108
}
1109
+ if foundForward && foundReverse {
1110
+ return kvpb .NewErrorf ("batch contains both forward and reverse requests" )
1111
+ }
1112
+ if (ba .MaxSpanRequestKeys != 0 || ba .TargetBytes != 0 ) && disallowedReq != "" {
1113
+ return kvpb .NewErrorf ("batch with limit contains %s request" , disallowedReq )
1114
+ }
1115
+ // Also verify that IsReverse is set accordingly on the batch header.
1116
+ if foundForward && ba .Header .IsReverse {
1117
+ return kvpb .NewErrorf ("batch contains forward requests but IsReverse is set" )
1118
+ }
1119
+ if foundReverse && ! ba .Header .IsReverse {
1120
+ return kvpb .NewErrorf ("batch contains reverse requests but IsReverse is not set" )
1121
+ }
1111
1122
1112
1123
switch ba .WaitPolicy {
1113
1124
case lock .WaitPolicy_Block , lock .WaitPolicy_Error :
@@ -1268,7 +1279,6 @@ func (ds *DistSender) Send(
1268
1279
if err != nil {
1269
1280
return nil , kvpb .NewError (err )
1270
1281
}
1271
- isReverse := ba .IsReverse ()
1272
1282
1273
1283
// Determine whether this part of the BatchRequest contains a committing
1274
1284
// EndTxn request.
@@ -1282,9 +1292,9 @@ func (ds *DistSender) Send(
1282
1292
var rpl * kvpb.BatchResponse
1283
1293
var pErr * kvpb.Error
1284
1294
if withParallelCommit {
1285
- rpl , pErr = ds .divideAndSendParallelCommit (ctx , ba , rs , isReverse , 0 /* batchIdx */ )
1295
+ rpl , pErr = ds .divideAndSendParallelCommit (ctx , ba , rs , 0 /* batchIdx */ )
1286
1296
} else {
1287
- rpl , pErr = ds .divideAndSendBatchToRanges (ctx , ba , rs , isReverse , withCommit , 0 /* batchIdx */ )
1297
+ rpl , pErr = ds .divideAndSendBatchToRanges (ctx , ba , rs , withCommit , 0 /* batchIdx */ )
1288
1298
}
1289
1299
1290
1300
if pErr == errNo1PCTxn {
@@ -1475,7 +1485,7 @@ type response struct {
1475
1485
// method is never invoked recursively, but it is exposed to maintain symmetry
1476
1486
// with divideAndSendBatchToRanges.
1477
1487
func (ds * DistSender ) divideAndSendParallelCommit (
1478
- ctx context.Context , ba * kvpb.BatchRequest , rs roachpb.RSpan , isReverse bool , batchIdx int ,
1488
+ ctx context.Context , ba * kvpb.BatchRequest , rs roachpb.RSpan , batchIdx int ,
1479
1489
) (br * kvpb.BatchResponse , pErr * kvpb.Error ) {
1480
1490
// Search backwards, looking for the first pre-commit QueryIntent.
1481
1491
swapIdx := - 1
@@ -1491,7 +1501,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
1491
1501
if swapIdx == - 1 {
1492
1502
// No pre-commit QueryIntents. Nothing to split.
1493
1503
log .VEvent (ctx , 3 , "no pre-commit QueryIntents found, sending batch as-is" )
1494
- return ds .divideAndSendBatchToRanges (ctx , ba , rs , isReverse , true /* withCommit */ , batchIdx )
1504
+ return ds .divideAndSendBatchToRanges (ctx , ba , rs , true /* withCommit */ , batchIdx )
1495
1505
}
1496
1506
1497
1507
// Swap the EndTxn request and the first pre-commit QueryIntent. This
@@ -1518,7 +1528,8 @@ func (ds *DistSender) divideAndSendParallelCommit(
1518
1528
if err != nil {
1519
1529
return br , kvpb .NewError (err )
1520
1530
}
1521
- qiIsReverse := false // QueryIntentRequests do not carry the isReverse flag
1531
+ // No need to process QueryIntentRequests in the reverse order.
1532
+ qiBa .IsReverse = false
1522
1533
qiBatchIdx := batchIdx + 1
1523
1534
qiResponseCh := make (chan response , 1 )
1524
1535
@@ -1544,7 +1555,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
1544
1555
1545
1556
// Send the batch with withCommit=true since it will be inflight
1546
1557
// concurrently with the EndTxn batch below.
1547
- reply , pErr := ds .divideAndSendBatchToRanges (ctx , qiBa , qiRS , qiIsReverse , true /* withCommit */ , qiBatchIdx )
1558
+ reply , pErr := ds .divideAndSendBatchToRanges (ctx , qiBa , qiRS , true /* withCommit */ , qiBatchIdx )
1548
1559
qiResponseCh <- response {reply : reply , positions : positions , pErr : pErr }
1549
1560
}
1550
1561
@@ -1576,10 +1587,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
1576
1587
if err != nil {
1577
1588
return nil , kvpb .NewError (err )
1578
1589
}
1579
- // Note that we don't need to recompute isReverse for the updated batch
1580
- // since we only separated out QueryIntentRequests which don't carry the
1581
- // isReverse flag.
1582
- br , pErr = ds .divideAndSendBatchToRanges (ctx , ba , rs , isReverse , true /* withCommit */ , batchIdx )
1590
+ br , pErr = ds .divideAndSendBatchToRanges (ctx , ba , rs , true /* withCommit */ , batchIdx )
1583
1591
1584
1592
// Wait for the QueryIntent-only batch to complete and stitch
1585
1593
// the responses together.
@@ -1748,12 +1756,6 @@ func mergeErrors(pErr1, pErr2 *kvpb.Error) *kvpb.Error {
1748
1756
// is trimmed against each range which is part of the span and sent
1749
1757
// either serially or in parallel, if possible.
1750
1758
//
1751
- // isReverse indicates the direction that the provided span should be
1752
- // iterated over while sending requests. It is passed in by callers
1753
- // instead of being recomputed based on the requests in the batch to
1754
- // prevent the iteration direction from switching midway through a
1755
- // batch, in cases where partial batches recurse into this function.
1756
- //
1757
1759
// withCommit indicates that the batch contains a transaction commit
1758
1760
// or that a transaction commit is being run concurrently with this
1759
1761
// batch. Either way, if this is true then sendToReplicas will need
@@ -1763,12 +1765,7 @@ func mergeErrors(pErr1, pErr2 *kvpb.Error) *kvpb.Error {
1763
1765
// being processed by this method. It's specified as non-zero when
1764
1766
// this method is invoked recursively.
1765
1767
func (ds * DistSender ) divideAndSendBatchToRanges (
1766
- ctx context.Context ,
1767
- ba * kvpb.BatchRequest ,
1768
- rs roachpb.RSpan ,
1769
- isReverse bool ,
1770
- withCommit bool ,
1771
- batchIdx int ,
1768
+ ctx context.Context , ba * kvpb.BatchRequest , rs roachpb.RSpan , withCommit bool , batchIdx int ,
1772
1769
) (br * kvpb.BatchResponse , pErr * kvpb.Error ) {
1773
1770
// Clone the BatchRequest's transaction so that future mutations to the
1774
1771
// proto don't affect the proto in this batch.
@@ -1778,7 +1775,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
1778
1775
// Get initial seek key depending on direction of iteration.
1779
1776
var scanDir ScanDirection
1780
1777
var seekKey roachpb.RKey
1781
- if ! isReverse {
1778
+ if ! ba . IsReverse {
1782
1779
scanDir = Ascending
1783
1780
seekKey = rs .Key
1784
1781
} else {
@@ -1793,7 +1790,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
1793
1790
// Take the fast path if this batch fits within a single range.
1794
1791
if ! ri .NeedAnother (rs ) {
1795
1792
resp := ds .sendPartialBatch (
1796
- ctx , ba , rs , isReverse , withCommit , batchIdx , ri .Token (),
1793
+ ctx , ba , rs , withCommit , batchIdx , ri .Token (),
1797
1794
)
1798
1795
// resp.positions remains nil since the original batch is fully
1799
1796
// contained within a single range.
@@ -1915,7 +1912,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
1915
1912
}
1916
1913
1917
1914
if pErr == nil && couldHaveSkippedResponses {
1918
- fillSkippedResponses (ba , br , seekKey , resumeReason , isReverse )
1915
+ fillSkippedResponses (ba , br , seekKey , resumeReason )
1919
1916
}
1920
1917
}()
1921
1918
@@ -1994,7 +1991,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
1994
1991
// If we can reserve one of the limited goroutines available for parallel
1995
1992
// batch RPCs, send asynchronously.
1996
1993
if canParallelize && ! lastRange && ! ds .disableParallelBatches {
1997
- if ds .sendPartialBatchAsync (ctx , curRangeBatch , curRangeRS , isReverse , withCommit , batchIdx , ri .Token (), responseCh , positions ) {
1994
+ if ds .sendPartialBatchAsync (ctx , curRangeBatch , curRangeRS , withCommit , batchIdx , ri .Token (), responseCh , positions ) {
1998
1995
asyncSent = true
1999
1996
} else {
2000
1997
asyncThrottled = true
@@ -2009,7 +2006,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
2009
2006
}()
2010
2007
}
2011
2008
return ds .sendPartialBatch (
2012
- ctx , curRangeBatch , curRangeRS , isReverse , withCommit , batchIdx , ri .Token (),
2009
+ ctx , curRangeBatch , curRangeRS , withCommit , batchIdx , ri .Token (),
2013
2010
)
2014
2011
}()
2015
2012
resp .positions = positions
@@ -2089,7 +2086,6 @@ func (ds *DistSender) sendPartialBatchAsync(
2089
2086
ctx context.Context ,
2090
2087
ba * kvpb.BatchRequest ,
2091
2088
rs roachpb.RSpan ,
2092
- isReverse bool ,
2093
2089
withCommit bool ,
2094
2090
batchIdx int ,
2095
2091
routing rangecache.EvictionToken ,
@@ -2111,7 +2107,7 @@ func (ds *DistSender) sendPartialBatchAsync(
2111
2107
ds .metrics .AsyncSentCount .Inc (1 )
2112
2108
ds .metrics .AsyncInProgress .Inc (1 )
2113
2109
defer ds .metrics .AsyncInProgress .Dec (1 )
2114
- resp := ds .sendPartialBatch (ctx , ba , rs , isReverse , withCommit , batchIdx , routing )
2110
+ resp := ds .sendPartialBatch (ctx , ba , rs , withCommit , batchIdx , routing )
2115
2111
resp .positions = positions
2116
2112
responseCh <- resp
2117
2113
}(ctx )
@@ -2175,7 +2171,6 @@ func (ds *DistSender) sendPartialBatch(
2175
2171
ctx context.Context ,
2176
2172
ba * kvpb.BatchRequest ,
2177
2173
rs roachpb.RSpan ,
2178
- isReverse bool ,
2179
2174
withCommit bool ,
2180
2175
batchIdx int ,
2181
2176
routingTok rangecache.EvictionToken ,
@@ -2203,7 +2198,7 @@ func (ds *DistSender) sendPartialBatch(
2203
2198
2204
2199
if ! routingTok .Valid () {
2205
2200
var descKey roachpb.RKey
2206
- if isReverse {
2201
+ if ba . IsReverse {
2207
2202
descKey = rs .EndKey
2208
2203
} else {
2209
2204
descKey = rs .Key
@@ -2219,7 +2214,7 @@ func (ds *DistSender) sendPartialBatch(
2219
2214
// replica, while detecting hazardous cases where the follower does
2220
2215
// not have the latest information and the current descriptor did
2221
2216
// not result in a successful send.
2222
- routingTok , err = ds .getRoutingInfo (ctx , descKey , prevTok , isReverse )
2217
+ routingTok , err = ds .getRoutingInfo (ctx , descKey , prevTok , ba . IsReverse )
2223
2218
if err != nil {
2224
2219
log .VErrEventf (ctx , 1 , "range descriptor re-lookup failed: %s" , err )
2225
2220
// We set pErr if we encountered an error getting the descriptor in
@@ -2242,7 +2237,7 @@ func (ds *DistSender) sendPartialBatch(
2242
2237
}
2243
2238
if ! intersection .Equal (rs ) {
2244
2239
log .Eventf (ctx , "range shrunk; sub-dividing the request" )
2245
- reply , pErr = ds .divideAndSendBatchToRanges (ctx , ba , rs , isReverse , withCommit , batchIdx )
2240
+ reply , pErr = ds .divideAndSendBatchToRanges (ctx , ba , rs , withCommit , batchIdx )
2246
2241
return response {reply : reply , pErr : pErr }
2247
2242
}
2248
2243
}
@@ -2344,7 +2339,7 @@ func (ds *DistSender) sendPartialBatch(
2344
2339
// batch here would give a potentially larger response slice
2345
2340
// with unknown mapping to our truncated reply).
2346
2341
log .VEventf (ctx , 1 , "likely split; will resend. Got new descriptors: %s" , tErr .Ranges )
2347
- reply , pErr = ds .divideAndSendBatchToRanges (ctx , ba , rs , isReverse , withCommit , batchIdx )
2342
+ reply , pErr = ds .divideAndSendBatchToRanges (ctx , ba , rs , withCommit , batchIdx )
2348
2343
return response {reply : reply , pErr : pErr }
2349
2344
}
2350
2345
break
@@ -2393,7 +2388,6 @@ func fillSkippedResponses(
2393
2388
br * kvpb.BatchResponse ,
2394
2389
nextKey roachpb.RKey ,
2395
2390
resumeReason kvpb.ResumeReason ,
2396
- isReverse bool ,
2397
2391
) {
2398
2392
// Some requests might have no response at all if we used a batch-wide
2399
2393
// limit; simply create trivial responses for those. Note that any type
@@ -2423,7 +2417,7 @@ func fillSkippedResponses(
2423
2417
for i , resp := range br .Responses {
2424
2418
req := ba .Requests [i ].GetInner ()
2425
2419
hdr := resp .GetInner ().Header ()
2426
- maybeSetResumeSpan (req , & hdr , nextKey , isReverse )
2420
+ maybeSetResumeSpan (req , & hdr , nextKey , ba . IsReverse )
2427
2421
if hdr .ResumeSpan != nil {
2428
2422
hdr .ResumeReason = resumeReason
2429
2423
}
0 commit comments