
Commit b5b4b8d

sql: disable DistSQL when txn buffered writes and MVCC decoding is required
Previously, if we buffered some writes on the RootTxn and then issued a query that touches system columns requiring MVCC decoding (like `crdb_internal_mvcc_timestamp`) and that was executed via DistSQL, we would crash the process. The reason is that MVCC decoding is currently not supported by the txnWriteBuffer, so we disable buffered writes, yet disabling is only allowed on the RootTxn, and we'd have a LeafTxn because the plan was distributed.

This commit fixes the issue by disabling DistSQL if
- the txn has buffered some writes, and
- one of the system columns that requires MVCC decoding is requested.

Execution of the query then proceeds with the "local" execution model, which will flush the buffer since we still disable buffered writes because of MVCC decoding.

There are a couple of other complications to keep in mind:
- We decide on the plan distribution for the main query _before_ we execute any of the subqueries. Thus, to be safe in that case, if the query as a whole has a mutation AND it has at least one subquery, we "assume the worst" and treat the subquery as if it's guaranteed to buffer some writes.
- Usage of the LeafTxn can also be forced by parallel processors. One such case is "parallelize scans if local", so we now prohibit parallelization if one of these system columns is requested.

I decided to omit the release note given that it's an edge case in a disabled-by-default feature.

Release note: None
1 parent 85577c3 commit b5b4b8d
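
To make the interaction concrete, here is a minimal sketch of the planning-time gate described above. It is not CockroachDB code; all names below are simplified stand-ins. The rule: a plan that needs MVCC decoding cannot be distributed once the transaction holds buffered writes, because the resulting LeafTxns cannot flush the buffer.

package main

import "fmt"

// txnState and planInfo are simplified stand-ins for the real planner
// inputs; this is a sketch of the rule, not the actual implementation.
type txnState struct {
	hasBufferedWrites bool
}

type planInfo struct {
	requiresMVCCDecoding bool // e.g. crdb_internal_mvcc_timestamp is requested
}

// mustRunLocally captures the new gate: distributing the plan would run
// reads on LeafTxns, but flushing the write buffer (a prerequisite for
// MVCC decoding) is only legal on the RootTxn, so the plan must stay local.
func mustRunLocally(txn txnState, plan planInfo) bool {
	return txn.hasBufferedWrites && plan.requiresMVCCDecoding
}

func main() {
	txn := txnState{hasBufferedWrites: true}
	plan := planInfo{requiresMVCCDecoding: true}
	fmt.Println("force local execution:", mustRunLocally(txn, plan)) // true
}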

15 files changed: +346 −45 lines

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

Lines changed: 11 additions & 0 deletions
@@ -1691,6 +1691,13 @@ func (tc *TxnCoordSender) HasPerformedWrites() bool {
 	return tc.hasPerformedWritesLocked()
 }

+// HasBufferedWrites is part of the TxnSender interface.
+func (tc *TxnCoordSender) HasBufferedWrites() bool {
+	tc.mu.Lock()
+	defer tc.mu.Unlock()
+	return tc.hasBufferedWritesLocked()
+}
+
 // TestingShouldRetry is part of the TxnSender interface.
 func (tc *TxnCoordSender) TestingShouldRetry() bool {
 	tc.mu.Lock()
@@ -1715,3 +1722,7 @@ func (tc *TxnCoordSender) hasPerformedReadsLocked() bool {
 func (tc *TxnCoordSender) hasPerformedWritesLocked() bool {
 	return tc.mu.txn.Sequence != 0
 }
+
+func (tc *TxnCoordSender) hasBufferedWritesLocked() bool {
+	return tc.interceptorAlloc.txnWriteBuffer.hasBufferedWrites()
+}
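
For readers unfamiliar with the TxnCoordSender conventions: the new accessor follows the same mutex-guarded pattern as HasPerformedWrites, where an exported method takes the lock and delegates to a *Locked helper that assumes the lock is held. A self-contained sketch of that pattern (toy types, not the real sender):

package main

import (
	"fmt"
	"sync"
)

// coordSender sketches the locking discipline used by TxnCoordSender:
// public methods acquire the mutex, then delegate to *Locked helpers
// that assume it is already held.
type coordSender struct {
	mu struct {
		sync.Mutex
		bufferedWrites int
	}
}

func (c *coordSender) HasBufferedWrites() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.hasBufferedWritesLocked()
}

func (c *coordSender) hasBufferedWritesLocked() bool {
	return c.mu.bufferedWrites > 0
}

func main() {
	var c coordSender
	c.mu.Lock()
	c.mu.bufferedWrites = 2
	c.mu.Unlock()
	fmt.Println(c.HasBufferedWrites()) // true
}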

pkg/kv/mock_transactional_sender.go

Lines changed: 5 additions & 0 deletions
@@ -282,6 +282,11 @@ func (m *MockTransactionalSender) HasPerformedWrites() bool {
 	panic("unimplemented")
 }

+// HasBufferedWrites is part of TxnSenderFactory.
+func (m *MockTransactionalSender) HasBufferedWrites() bool {
+	return false
+}
+
 // TestingShouldRetry is part of TxnSenderFactory.
 func (m *MockTransactionalSender) TestingShouldRetry() bool {
 	return false
pkg/kv/sender.go

Lines changed: 10 additions & 4 deletions
@@ -134,10 +134,12 @@ type TxnSender interface {
 	SetOmitInRangefeeds()

 	// SetBufferedWritesEnabled toggles whether the writes are buffered on the
-	// gateway node until the commit time. Only allowed on the RootTxn. Buffered
-	// writes cannot be enabled on a txn that performed any requests. When
-	// disabling buffered writes, if there are any writes in the buffer, they
-	// are flushed with the next BatchRequest.
+	// gateway node until the commit time. Buffered writes cannot be enabled on
+	// a txn that performed any requests. When disabling buffered writes, if
+	// there are any writes in the buffer, they are flushed with the next
+	// BatchRequest.
+	//
+	// Only allowed on the RootTxn.
 	SetBufferedWritesEnabled(bool)

 	// BufferedWritesEnabled returns whether the buffered writes are enabled.
@@ -379,6 +381,10 @@
 	// transaction's current epoch.
 	HasPerformedWrites() bool

+	// HasBufferedWrites returns true if a write has been buffered for the
+	// transaction's current epoch.
+	HasBufferedWrites() bool
+
 	// TestingShouldRetry returns true if transaction retry errors should be
 	// randomly returned to callers. Note that it is the responsibility of
 	// (*kv.DB).Txn() to return the retries. This lives here since the
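
Widening the TxnSender interface forces every implementation, including mocks, to grow the new method, which is why this commit also touches MockTransactionalSender. A toy sketch of that pattern, including the compile-time assertion idiom Go codebases commonly use to catch stale implementations (simplified names, not the real interface):

package main

import "fmt"

// sender is a toy version of the TxnSender interface after it gains
// HasBufferedWrites.
type sender interface {
	HasPerformedWrites() bool
	HasBufferedWrites() bool
}

// mockSender mirrors the role of MockTransactionalSender: it satisfies
// the widened interface with a trivial implementation.
type mockSender struct{}

func (mockSender) HasPerformedWrites() bool { return false }
func (mockSender) HasBufferedWrites() bool  { return false }

// Compile-time assertion: adding a method to the interface breaks every
// implementation that has not been updated, so the build catches it.
var _ sender = mockSender{}

func main() {
	var s sender = mockSender{}
	fmt.Println(s.HasBufferedWrites()) // false
}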

pkg/kv/txn.go

Lines changed: 18 additions & 1 deletion
@@ -443,9 +443,18 @@ func (txn *Txn) debugNameLocked() string {
 	return fmt.Sprintf("%s (id: %s)", txn.mu.debugName, txn.mu.ID)
 }

+// SetBufferedWritesEnabled toggles whether the writes are buffered on the
+// gateway node until the commit time. Buffered writes cannot be enabled on a
+// txn that performed any requests. When disabling buffered writes, if there are
+// any writes in the buffer, they are flushed with the next BatchRequest.
+//
+// Only allowed on the RootTxn.
 func (txn *Txn) SetBufferedWritesEnabled(enabled bool) {
 	if txn.typ != RootTxn {
-		panic(errors.AssertionFailedf("SetBufferedWritesEnabled() called on leaf txn"))
+		panic(errors.AssertionFailedf(
+			"SetBufferedWritesEnabled(%t) called on leaf txn (buffer empty? %t)",
+			enabled, txn.HasBufferedWrites(),
+		))
 	}

 	txn.mu.Lock()
@@ -1862,6 +1871,14 @@ func (txn *Txn) HasPerformedWrites() bool {
 	return txn.mu.sender.HasPerformedWrites()
 }

+// HasBufferedWrites returns true if a write has been buffered for the
+// transaction's current epoch.
+func (txn *Txn) HasBufferedWrites() bool {
+	txn.mu.Lock()
+	defer txn.mu.Unlock()
+	return txn.mu.sender.HasBufferedWrites()
+}
+
 // AdmissionHeader returns the admission header for work done in the context
 // of this transaction.
 func (txn *Txn) AdmissionHeader() kvpb.AdmissionHeader {
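
The enriched panic message reports whether the leaf's buffer is empty, which helps distinguish a benign mis-call from the crash scenario this commit fixes. A runnable toy model of the root-only invariant (simplified types standing in for kv.Txn; not the real API):

package main

import "fmt"

type txnType int

const (
	rootTxn txnType = iota
	leafTxn
)

// txn models only the invariant at play: toggling write buffering is a
// root-only operation.
type txn struct {
	typ            txnType
	bufferedWrites bool
}

func (t *txn) setBufferedWritesEnabled(enabled bool) {
	if t.typ != rootTxn {
		// Mirrors the assertion in pkg/kv/txn.go: reporting whether the
		// buffer is empty helps debug how a leaf ended up here.
		panic(fmt.Sprintf(
			"SetBufferedWritesEnabled(%t) called on leaf txn (buffer empty? %t)",
			enabled, !t.bufferedWrites,
		))
	}
	t.bufferedWrites = enabled
}

func main() {
	root := &txn{typ: rootTxn}
	root.setBufferedWritesEnabled(true) // fine on the root

	defer func() { fmt.Println("recovered:", recover()) }()
	leaf := &txn{typ: leafTxn, bufferedWrites: true}
	leaf.setBufferedWritesEnabled(false) // panics with the message above
}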

pkg/sql/colfetcher/cfetcher.go

Lines changed: 9 additions & 0 deletions
@@ -460,6 +460,15 @@ func (cf *cFetcher) Init(
 	// MVCC decoding.
 	if cf.mvccDecodeStrategy == storage.MVCCDecodingRequired {
 		if cf.txn != nil && cf.txn.BufferedWritesEnabled() {
+			if cf.txn.Type() == kv.LeafTxn {
+				// We're only allowed to disable buffered writes on the RootTxn.
+				// If we have a LeafTxn, we'll return an assertion error instead
+				// of crashing.
+				//
+				// Note that we might have a LeafTxn with no buffered writes, in
+				// which case BufferedWritesEnabled() is false.
+				return errors.AssertionFailedf("got LeafTxn when MVCC decoding is required")
+			}
 			cf.txn.SetBufferedWritesEnabled(false /* enabled */)
 		}
 	}
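
The cFetcher change converts a would-be process crash into a query-level assertion error. A minimal sketch of that defensive shape (hypothetical helper and simplified txn state, not the real fetcher):

package main

import (
	"errors"
	"fmt"
)

// initFetcher sketches the defensive check added to cFetcher.Init: if the
// precondition for disabling buffered writes (being on the root txn) does
// not hold, surface an error to the query instead of panicking the node.
func initFetcher(isLeaf, buffered bool) error {
	if buffered {
		if isLeaf {
			return errors.New("assertion failed: got LeafTxn when MVCC decoding is required")
		}
		// On the root txn it is safe to flush the buffer and proceed.
		fmt.Println("disabling buffered writes; buffer will be flushed")
	}
	return nil
}

func main() {
	fmt.Println(initFetcher(false, true)) // nil: root txn, buffer flushed
	fmt.Println(initFetcher(true, true))  // query error, no crash
}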

pkg/sql/delete_range.go

Lines changed: 1 addition & 1 deletion
@@ -94,7 +94,7 @@ func (d *deleteRangeNode) startExec(params runParams) error {
 	// fetch kvs.
 	var spec fetchpb.IndexFetchSpec
 	if err := rowenc.InitIndexFetchSpec(
-		&spec, params.ExecCfg().Codec, d.desc, d.desc.GetPrimaryIndex(), nil, /* columnIDs */
+		&spec, params.ExecCfg().Codec, d.desc, d.desc.GetPrimaryIndex(), nil, /* fetchColumnIDs */
 	); err != nil {
 		return err
 	}

pkg/sql/distsql_physical_planner.go

Lines changed: 54 additions & 21 deletions
@@ -461,6 +461,9 @@
 	cannotDistributeVectorSearchErr = newQueryNotSupportedError(
 		"vector search operation cannot be distributed",
 	)
+	cannotDistributeSystemColumnsAndBufferedWrites = newQueryNotSupportedError(
+		"system column (that requires MVCC decoding) is requested when writes have been buffered",
+	)
 )

 // mustWrapNode returns true if a node has no DistSQL-processor equivalent.
@@ -546,6 +549,7 @@ func checkSupportForPlanNode(
 	node planNode,
 	distSQLVisitor *distSQLExprCheckVisitor,
 	sd *sessiondata.SessionData,
+	txnHasBufferedWrites bool,
 ) (retRec distRecommendation, retErr error) {
 	if buildutil.CrdbTestBuild {
 		defer func() {
@@ -563,19 +567,19 @@ func checkSupportForPlanNode(
 		return shouldDistribute, nil

 	case *distinctNode:
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)

 	case *exportNode:
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)

 	case *filterNode:
 		if err := checkExprForDistSQL(n.filter, distSQLVisitor); err != nil {
 			return cannotDistribute, err
 		}
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)

 	case *groupNode:
-		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -604,10 +608,14 @@ func checkSupportForPlanNode(
 			// TODO(nvanbenschoten): lift this restriction.
 			return cannotDistribute, cannotDistributeRowLevelLockingErr
 		}
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		if txnHasBufferedWrites && n.fetch.requiresMVCCDecoding() {
+			// TODO(#144166): relax this.
+			return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+		}
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)

 	case *invertedFilterNode:
-		return checkSupportForInvertedFilterNode(ctx, n, distSQLVisitor, sd)
+		return checkSupportForInvertedFilterNode(ctx, n, distSQLVisitor, sd, txnHasBufferedWrites)

 	case *invertedJoinNode:
 		if n.fetch.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
@@ -617,10 +625,14 @@ func checkSupportForPlanNode(
 			// TODO(nvanbenschoten): lift this restriction.
 			return cannotDistribute, cannotDistributeRowLevelLockingErr
 		}
+		if txnHasBufferedWrites && n.fetch.requiresMVCCDecoding() {
+			// TODO(#144166): relax this.
+			return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+		}
 		if err := checkExprForDistSQL(n.onExpr, distSQLVisitor); err != nil {
 			return cannotDistribute, err
 		}
-		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -633,11 +645,11 @@ func checkSupportForPlanNode(
 		if err := checkExprForDistSQL(n.pred.onCond, distSQLVisitor); err != nil {
 			return cannotDistribute, err
 		}
-		recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd)
+		recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
-		recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd)
+		recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -658,7 +670,7 @@ func checkSupportForPlanNode(
 		// Note that we don't need to check whether we support distribution of
 		// n.countExpr or n.offsetExpr because those expressions are evaluated
 		// locally, during the physical planning.
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)

 	case *lookupJoinNode:
 		if n.remoteLookupExpr != nil || n.remoteOnlyLookups {
@@ -672,7 +684,10 @@ func checkSupportForPlanNode(
 			// TODO(nvanbenschoten): lift this restriction.
 			return cannotDistribute, cannotDistributeRowLevelLockingErr
 		}
-
+		if txnHasBufferedWrites && n.fetch.requiresMVCCDecoding() {
+			// TODO(#144166): relax this.
+			return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+		}
 		if err := checkExprForDistSQL(n.lookupExpr, distSQLVisitor); err != nil {
 			return cannotDistribute, err
 		}
@@ -682,7 +697,7 @@ func checkSupportForPlanNode(
 		if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil {
 			return cannotDistribute, err
 		}
-		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -699,15 +714,15 @@ func checkSupportForPlanNode(
 				return cannotDistribute, err
 			}
 		}
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)

 	case *renderNode:
 		for _, e := range n.render {
 			if err := checkExprForDistSQL(e, distSQLVisitor); err != nil {
 				return cannotDistribute, err
 			}
 		}
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)

 	case *scanNode:
 		if n.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
@@ -721,6 +736,10 @@ func checkSupportForPlanNode(
 			// This is a locality optimized scan.
 			return cannotDistribute, localityOptimizedOpNotDistributableErr
 		}
+		if txnHasBufferedWrites && n.fetchPlanningInfo.requiresMVCCDecoding() {
+			// TODO(#144166): relax this.
+			return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+		}
 		scanRec := canDistribute
 		if n.estimatedRowCount != 0 {
 			var suffix string
@@ -748,7 +767,7 @@ func checkSupportForPlanNode(
 		return scanRec, nil

 	case *sortNode:
-		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -765,7 +784,7 @@ func checkSupportForPlanNode(
 		return rec.compose(sortRec), nil

 	case *topKNode:
-		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -785,11 +804,11 @@ func checkSupportForPlanNode(
 		return canDistribute, nil

 	case *unionNode:
-		recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd)
+		recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
-		recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd)
+		recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -819,7 +838,7 @@ func checkSupportForPlanNode(
 		return cannotDistribute, cannotDistributeVectorSearchErr

 	case *windowNode:
-		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -845,6 +864,10 @@ func checkSupportForPlanNode(
 				// TODO(nvanbenschoten): lift this restriction.
 				return cannotDistribute, cannotDistributeRowLevelLockingErr
 			}
+			if txnHasBufferedWrites && side.fetch.requiresMVCCDecoding() {
+				// TODO(#144166): relax this.
+				return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+			}
 		}
 		if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil {
 			return cannotDistribute, err
@@ -864,8 +887,9 @@ func checkSupportForInvertedFilterNode(
 	n *invertedFilterNode,
 	distSQLVisitor *distSQLExprCheckVisitor,
 	sd *sessiondata.SessionData,
+	txnHasBufferedWrites bool,
 ) (distRecommendation, error) {
-	rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+	rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 	if err != nil {
 		return cannotDistribute, err
 	}
@@ -5399,7 +5423,16 @@ func checkScanParallelizationIfLocal(
 		if len(n.reqOrdering) == 0 && n.parallelize {
 			hasScanNodeToParallelize = true
 		}
-	case *distinctNode, *explainVecNode, *indexJoinNode, *limitNode,
+		if n.fetchPlanningInfo.requiresMVCCDecoding() {
+			prohibitParallelization = true
+			return
+		}
+	case *indexJoinNode:
+		if n.fetch.requiresMVCCDecoding() {
+			prohibitParallelization = true
+			return
+		}
+	case *distinctNode, *explainVecNode, *limitNode,
 		*ordinalityNode, *sortNode, *unionNode, *valuesNode:
 	default:
 		prohibitParallelization = true
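
The planner change threads a single txnHasBufferedWrites flag through every recursive checkSupportForPlanNode call so that any fetching node in the tree can veto distribution. A compact sketch of that recursion shape with toy node types (not the real planNode hierarchy):

package main

import (
	"errors"
	"fmt"
)

// node is a toy plan node: it may need MVCC decoding and may have
// children, loosely mirroring scanNode, joinNode, and friends.
type node struct {
	needsMVCCDecoding bool
	children          []*node
}

var errBufferedWrites = errors.New(
	"system column (that requires MVCC decoding) is requested when writes have been buffered")

// checkSupport mimics how checkSupportForPlanNode threads one flag through
// the whole recursion: a single offending node anywhere in the tree makes
// the plan non-distributable.
func checkSupport(n *node, txnHasBufferedWrites bool) error {
	if txnHasBufferedWrites && n.needsMVCCDecoding {
		return errBufferedWrites
	}
	for _, c := range n.children {
		if err := checkSupport(c, txnHasBufferedWrites); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	plan := &node{children: []*node{{needsMVCCDecoding: true}}}
	fmt.Println(checkSupport(plan, true))  // not distributable
	fmt.Println(checkSupport(plan, false)) // nil: distribution allowed
}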
