11 changes: 11 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
@@ -1691,6 +1691,13 @@ func (tc *TxnCoordSender) HasPerformedWrites() bool {
return tc.hasPerformedWritesLocked()
}

+ // HasBufferedWrites is part of the TxnSender interface.
+ func (tc *TxnCoordSender) HasBufferedWrites() bool {
+ tc.mu.Lock()
+ defer tc.mu.Unlock()
+ return tc.hasBufferedWritesLocked()
+ }

// TestingShouldRetry is part of the TxnSender interface.
func (tc *TxnCoordSender) TestingShouldRetry() bool {
tc.mu.Lock()
@@ -1715,3 +1722,7 @@ func (tc *TxnCoordSender) hasPerformedReadsLocked() bool {
func (tc *TxnCoordSender) hasPerformedWritesLocked() bool {
return tc.mu.txn.Sequence != 0
}

+ func (tc *TxnCoordSender) hasBufferedWritesLocked() bool {
+ return tc.interceptorAlloc.txnWriteBuffer.hasBufferedWrites()
+ }
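Review note: the new accessor follows this file's lock-then-delegate convention — the exported method takes tc.mu, and the *Locked variant assumes the caller already holds it. A minimal, self-contained sketch of that convention (toy type and field names, not the real TxnCoordSender):

```go
package main

import (
	"fmt"
	"sync"
)

// coordSender is a toy stand-in for TxnCoordSender, illustrating the
// lock-then-delegate accessor pattern used in this diff.
type coordSender struct {
	mu struct {
		sync.Mutex
		bufferedWrites int // hypothetical state guarded by mu
	}
}

// HasBufferedWrites is the exported accessor: acquire the mutex, then
// delegate to the *Locked variant.
func (cs *coordSender) HasBufferedWrites() bool {
	cs.mu.Lock()
	defer cs.mu.Unlock()
	return cs.hasBufferedWritesLocked()
}

// hasBufferedWritesLocked assumes cs.mu is held by the caller.
func (cs *coordSender) hasBufferedWritesLocked() bool {
	return cs.mu.bufferedWrites > 0
}

func main() {
	var cs coordSender
	fmt.Println(cs.HasBufferedWrites()) // false
}
```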
5 changes: 5 additions & 0 deletions pkg/kv/mock_transactional_sender.go
@@ -282,6 +282,11 @@ func (m *MockTransactionalSender) HasPerformedWrites() bool {
panic("unimplemented")
}

+ // HasBufferedWrites is part of TxnSenderFactory.
+ func (m *MockTransactionalSender) HasBufferedWrites() bool {
+ return false
+ }

// TestingShouldRetry is part of TxnSenderFactory.
func (m *MockTransactionalSender) TestingShouldRetry() bool {
return false
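Review note: unlike HasPerformedWrites just above, which panics as unimplemented, the new mock method returns a safe default — presumably because callers may probe buffering state on any txn, and a mock never buffers. A toy contrast of the two mock styles:

```go
package main

import "fmt"

// mockSender is a toy test double showing the two styles used in
// mock_transactional_sender.go: fail-fast panic vs. safe default.
type mockSender struct{}

// HasPerformedWrites keeps the fail-fast style: any caller reaching this
// path in a test is doing something the mock does not model.
func (mockSender) HasPerformedWrites() bool { panic("unimplemented") }

// HasBufferedWrites returns a safe default: a mock txn never buffers
// writes, so false is always an accurate answer.
func (mockSender) HasBufferedWrites() bool { return false }

func main() {
	var m mockSender
	fmt.Println(m.HasBufferedWrites()) // false
	defer func() { fmt.Println("recovered:", recover()) }()
	m.HasPerformedWrites() // panics
}
```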
14 changes: 10 additions & 4 deletions pkg/kv/sender.go
@@ -134,10 +134,12 @@ type TxnSender interface {
SetOmitInRangefeeds()

// SetBufferedWritesEnabled toggles whether the writes are buffered on the
- // gateway node until the commit time. Only allowed on the RootTxn. Buffered
- // writes cannot be enabled on a txn that performed any requests. When
- // disabling buffered writes, if there are any writes in the buffer, they
- // are flushed with the next BatchRequest.
+ // gateway node until the commit time. Buffered writes cannot be enabled on
+ // a txn that performed any requests. When disabling buffered writes, if
+ // there are any writes in the buffer, they are flushed with the next
+ // BatchRequest.
+ //
+ // Only allowed on the RootTxn.
SetBufferedWritesEnabled(bool)

// BufferedWritesEnabled returns whether the buffered writes are enabled.
@@ -379,6 +381,10 @@ type TxnSender interface {
// transaction's current epoch.
HasPerformedWrites() bool

+ // HasBufferedWrites returns true if a write has been buffered for the
+ // transaction's current epoch.
+ HasBufferedWrites() bool

// TestingShouldRetry returns true if transaction retry errors should be
// randomly returned to callers. Note that it is the responsibility of
// (*kv.DB).Txn() to return the retries. This lives here since the
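Review note: a usage sketch of the documented contract — buffering is enabled before any requests, probed via HasBufferedWrites, and the buffer is flushed when buffering is disabled. The fakeSender type below is hypothetical; only the two method names come from the interface above:

```go
package main

import "fmt"

// bufferingSender captures just the two TxnSender methods touched by this
// diff, so the contract can be exercised in isolation.
type bufferingSender interface {
	SetBufferedWritesEnabled(bool)
	HasBufferedWrites() bool
}

// fakeSender is a hypothetical in-memory implementation for illustration.
type fakeSender struct {
	enabled  bool
	buffered int
}

func (f *fakeSender) SetBufferedWritesEnabled(enabled bool) {
	if !enabled && f.buffered > 0 {
		// Per the contract, disabling flushes buffered writes with the
		// next BatchRequest; dropping the count simulates that flush.
		f.buffered = 0
	}
	f.enabled = enabled
}

func (f *fakeSender) HasBufferedWrites() bool { return f.buffered > 0 }

func main() {
	var s bufferingSender = &fakeSender{}
	s.SetBufferedWritesEnabled(true)
	s.(*fakeSender).buffered = 2       // pretend two writes were buffered
	fmt.Println(s.HasBufferedWrites()) // true
	s.SetBufferedWritesEnabled(false)  // flush
	fmt.Println(s.HasBufferedWrites()) // false
}
```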
19 changes: 18 additions & 1 deletion pkg/kv/txn.go
@@ -443,9 +443,18 @@ func (txn *Txn) debugNameLocked() string {
return fmt.Sprintf("%s (id: %s)", txn.mu.debugName, txn.mu.ID)
}

+ // SetBufferedWritesEnabled toggles whether the writes are buffered on the
+ // gateway node until the commit time. Buffered writes cannot be enabled on a
+ // txn that performed any requests. When disabling buffered writes, if there are
+ // any writes in the buffer, they are flushed with the next BatchRequest.
+ //
+ // Only allowed on the RootTxn.
func (txn *Txn) SetBufferedWritesEnabled(enabled bool) {
if txn.typ != RootTxn {
- panic(errors.AssertionFailedf("SetBufferedWritesEnabled() called on leaf txn"))
+ panic(errors.AssertionFailedf(
+ "SetBufferedWritesEnabled(%t) called on leaf txn (has buffered writes? %t)",
+ enabled, txn.HasBufferedWrites(),
+ ))
}

txn.mu.Lock()
@@ -1862,6 +1871,14 @@ func (txn *Txn) HasPerformedWrites() bool {
return txn.mu.sender.HasPerformedWrites()
}

+ // HasBufferedWrites returns true if a write has been buffered for the
+ // transaction's current epoch.
+ func (txn *Txn) HasBufferedWrites() bool {
+ txn.mu.Lock()
+ defer txn.mu.Unlock()
+ return txn.mu.sender.HasBufferedWrites()
+ }

// AdmissionHeader returns the admission header for work done in the context
// of this transaction.
func (txn *Txn) AdmissionHeader() kvpb.AdmissionHeader {
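Review note: the enriched panic message now reports both the argument and the buffer state (the wording above is adjusted so the label matches the value HasBufferedWrites actually returns). A compact, runnable sketch of the guard with a toy txn type; errors.AssertionFailedf is the real cockroachdb/errors helper:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

type txnType int

const (
	rootTxn txnType = iota
	leafTxn
)

// toyTxn mimics just enough of kv.Txn to show the Root-vs-Leaf guard.
type toyTxn struct {
	typ      txnType
	buffered bool
}

func (t *toyTxn) hasBufferedWrites() bool { return t.buffered }

// setBufferedWritesEnabled refuses to run on leaf txns, reporting the
// argument and the buffer state in the assertion, as in the diff above.
func (t *toyTxn) setBufferedWritesEnabled(enabled bool) {
	if t.typ != rootTxn {
		panic(errors.AssertionFailedf(
			"SetBufferedWritesEnabled(%t) called on leaf txn (has buffered writes? %t)",
			enabled, t.hasBufferedWrites(),
		))
	}
	// ... on a root txn, toggle buffering here ...
}

func main() {
	defer func() { fmt.Println("recovered:", recover()) }()
	(&toyTxn{typ: leafTxn}).setBufferedWritesEnabled(false)
}
```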
5 changes: 1 addition & 4 deletions pkg/sql/apply_join.go
@@ -315,10 +315,7 @@ func runPlanInsidePlan(
}
}

- distributePlan, distSQLProhibitedErr := getPlanDistribution(
- ctx, plannerCopy.Descriptors().HasUncommittedTypes(),
- plannerCopy.SessionData(), plan.main, &plannerCopy.distSQLVisitor,
- )
+ distributePlan, distSQLProhibitedErr := plannerCopy.getPlanDistribution(ctx, plan.main)
distributeType := DistributionType(LocalDistribution)
if distributePlan.WillDistribute() {
distributeType = FullDistribution
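Review note: this call site (and the matching one in conn_executor_exec.go below) now goes through a planner method instead of the free function, so callers no longer unpack descriptor and session state by hand. A toy, self-contained sketch of that refactor shape; the real method's signature in pkg/sql may differ:

```go
package main

import (
	"context"
	"fmt"
)

// planDistribution is a toy stand-in for physicalplan.PlanDistribution.
type planDistribution int

func (d planDistribution) WillDistribute() bool { return d > 0 }

// getPlanDistributionFree mirrors the old free function, which forced every
// caller to pass the planner's state explicitly.
func getPlanDistributionFree(
	_ context.Context, hasUncommittedTypes bool, sessionName string,
) planDistribution {
	if hasUncommittedTypes {
		return 0 // cannot distribute
	}
	_ = sessionName
	return 1
}

// planner is a toy planner carrying the state the free function needs.
type planner struct {
	hasUncommittedTypes bool
	sessionName         string
}

// getPlanDistribution is the method-style wrapper the diff switches to: it
// forwards the planner's own fields so call sites shrink to one line.
func (p *planner) getPlanDistribution(ctx context.Context) planDistribution {
	return getPlanDistributionFree(ctx, p.hasUncommittedTypes, p.sessionName)
}

func main() {
	p := planner{sessionName: "demo"}
	fmt.Println(p.getPlanDistribution(context.Background()).WillDistribute())
}
```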
9 changes: 9 additions & 0 deletions pkg/sql/colfetcher/cfetcher.go
@@ -460,6 +460,15 @@ func (cf *cFetcher) Init(
// MVCC decoding.
if cf.mvccDecodeStrategy == storage.MVCCDecodingRequired {
if cf.txn != nil && cf.txn.BufferedWritesEnabled() {
+ if cf.txn.Type() == kv.LeafTxn {
+ // We're only allowed to disable buffered writes on the RootTxn.
+ // If we have a LeafTxn, we'll return an assertion error instead
+ // of crashing.
+ //
+ // Note that we might have a LeafTxn with no buffered writes, in
+ // which case BufferedWritesEnabled() is false.
+ return errors.AssertionFailedf("got LeafTxn when MVCC decoding is required")
+ }
cf.txn.SetBufferedWritesEnabled(false /* enabled */)
}
}
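Review note: only a root txn may disable buffering (see SetBufferedWritesEnabled above), so the fetcher now converts what would otherwise be a crash into a returned assertion error when handed a leaf txn with buffering still on. A self-contained sketch of that guard with toy names; errors.AssertionFailedf is the real cockroachdb/errors helper:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

type txnKind int

const (
	rootKind txnKind = iota
	leafKind
)

// disableBufferingForMVCC sketches the guard added to cFetcher.Init: MVCC
// decoding requires buffering off, but only a root txn may turn it off, so
// a leaf txn yields an assertion error instead of crashing the node.
func disableBufferingForMVCC(kind txnKind, bufferingEnabled bool, disable func()) error {
	if !bufferingEnabled {
		return nil // nothing to do; also covers leaf txns with empty buffers
	}
	if kind == leafKind {
		return errors.AssertionFailedf("got LeafTxn when MVCC decoding is required")
	}
	disable()
	return nil
}

func main() {
	err := disableBufferingForMVCC(leafKind, true, func() {})
	fmt.Println(err) // assertion error rather than a panic
}
```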
5 changes: 1 addition & 4 deletions pkg/sql/conn_executor_exec.go
@@ -2858,10 +2858,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
}
}
}
- distributePlan, distSQLProhibitedErr := getPlanDistribution(
- ctx, planner.Descriptors().HasUncommittedTypes(),
- ex.sessionData(), planner.curPlan.main, &planner.distSQLVisitor,
- )
+ distributePlan, distSQLProhibitedErr := planner.getPlanDistribution(ctx, planner.curPlan.main)
if afterGetPlanDistribution != nil {
afterGetPlanDistribution()
}
2 changes: 1 addition & 1 deletion pkg/sql/delete_range.go
@@ -94,7 +94,7 @@ func (d *deleteRangeNode) startExec(params runParams) error {
// fetch kvs.
var spec fetchpb.IndexFetchSpec
if err := rowenc.InitIndexFetchSpec(
- &spec, params.ExecCfg().Codec, d.desc, d.desc.GetPrimaryIndex(), nil, /* columnIDs */
+ &spec, params.ExecCfg().Codec, d.desc, d.desc.GetPrimaryIndex(), nil, /* fetchColumnIDs */
); err != nil {
return err
}
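Review note: a comment-only fix — CockroachDB annotates literal arguments with the callee's parameter name, and the parameter of rowenc.InitIndexFetchSpec is evidently fetchColumnIDs, not columnIDs. The convention, in miniature (toy function, hypothetical names):

```go
package main

import "fmt"

// initSpec is a toy function with a nil-able parameter; callers annotate
// literal arguments with the parameter name so the call site stays legible.
func initSpec(table string, fetchColumnIDs []int) string {
	return fmt.Sprintf("%s: %d cols", table, len(fetchColumnIDs))
}

func main() {
	// The inline comment must track the parameter name, which is what the
	// delete_range.go change corrects.
	fmt.Println(initSpec("t", nil /* fetchColumnIDs */))
}
```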
75 changes: 54 additions & 21 deletions pkg/sql/distsql_physical_planner.go
@@ -461,6 +461,9 @@ var (
cannotDistributeVectorSearchErr = newQueryNotSupportedError(
"vector search operation cannot be distributed",
)
+ cannotDistributeSystemColumnsAndBufferedWrites = newQueryNotSupportedError(
+ "system column (that requires MVCC decoding) is requested when writes have been buffered",
+ )
)

// mustWrapNode returns true if a node has no DistSQL-processor equivalent.
@@ -546,6 +549,7 @@ func checkSupportForPlanNode(
node planNode,
distSQLVisitor *distSQLExprCheckVisitor,
sd *sessiondata.SessionData,
+ txnHasBufferedWrites bool,
) (retRec distRecommendation, retErr error) {
if buildutil.CrdbTestBuild {
defer func() {
@@ -563,19 +567,19 @@
return shouldDistribute, nil

case *distinctNode:
- return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+ return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)

case *exportNode:
- return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+ return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)

case *filterNode:
if err := checkExprForDistSQL(n.filter, distSQLVisitor); err != nil {
return cannotDistribute, err
}
- return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+ return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)

case *groupNode:
- rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+ rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
if err != nil {
return cannotDistribute, err
}
@@ -604,10 +608,14 @@
// TODO(nvanbenschoten): lift this restriction.
return cannotDistribute, cannotDistributeRowLevelLockingErr
}
- return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+ if txnHasBufferedWrites && n.fetch.requiresMVCCDecoding() {
+ // TODO(#144166): relax this.
+ return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+ }
+ return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)

case *invertedFilterNode:
- return checkSupportForInvertedFilterNode(ctx, n, distSQLVisitor, sd)
+ return checkSupportForInvertedFilterNode(ctx, n, distSQLVisitor, sd, txnHasBufferedWrites)

case *invertedJoinNode:
if n.fetch.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
@@ -617,10 +625,14 @@
// TODO(nvanbenschoten): lift this restriction.
return cannotDistribute, cannotDistributeRowLevelLockingErr
}
+ if txnHasBufferedWrites && n.fetch.requiresMVCCDecoding() {
+ // TODO(#144166): relax this.
+ return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+ }
if err := checkExprForDistSQL(n.onExpr, distSQLVisitor); err != nil {
return cannotDistribute, err
}
- rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+ rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
if err != nil {
return cannotDistribute, err
}
@@ -633,11 +645,11 @@
if err := checkExprForDistSQL(n.pred.onCond, distSQLVisitor); err != nil {
return cannotDistribute, err
}
- recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd)
+ recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd, txnHasBufferedWrites)
if err != nil {
return cannotDistribute, err
}
- recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd)
+ recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd, txnHasBufferedWrites)
if err != nil {
return cannotDistribute, err
}
@@ -658,7 +670,7 @@
// Note that we don't need to check whether we support distribution of
// n.countExpr or n.offsetExpr because those expressions are evaluated
// locally, during the physical planning.
- return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+ return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)

case *lookupJoinNode:
if n.remoteLookupExpr != nil || n.remoteOnlyLookups {
@@ -672,7 +684,10 @@
// TODO(nvanbenschoten): lift this restriction.
return cannotDistribute, cannotDistributeRowLevelLockingErr
}

+ if txnHasBufferedWrites && n.fetch.requiresMVCCDecoding() {
+ // TODO(#144166): relax this.
+ return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+ }
if err := checkExprForDistSQL(n.lookupExpr, distSQLVisitor); err != nil {
return cannotDistribute, err
}
@@ -682,7 +697,7 @@
if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil {
return cannotDistribute, err
}
- rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+ rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
if err != nil {
return cannotDistribute, err
}
@@ -699,15 +714,15 @@
return cannotDistribute, err
}
}
- return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+ return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)

case *renderNode:
for _, e := range n.render {
if err := checkExprForDistSQL(e, distSQLVisitor); err != nil {
return cannotDistribute, err
}
}
- return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+ return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)

case *scanNode:
if n.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
@@ -721,6 +736,10 @@
// This is a locality optimized scan.
return cannotDistribute, localityOptimizedOpNotDistributableErr
}
+ if txnHasBufferedWrites && n.fetchPlanningInfo.requiresMVCCDecoding() {
+ // TODO(#144166): relax this.
+ return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+ }
scanRec := canDistribute
if n.estimatedRowCount != 0 {
var suffix string
@@ -748,7 +767,7 @@
return scanRec, nil

case *sortNode:
- rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+ rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
if err != nil {
return cannotDistribute, err
}
@@ -765,7 +784,7 @@
return rec.compose(sortRec), nil

case *topKNode:
- rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+ rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
if err != nil {
return cannotDistribute, err
}
@@ -785,11 +804,11 @@
return canDistribute, nil

case *unionNode:
- recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd)
+ recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd, txnHasBufferedWrites)
if err != nil {
return cannotDistribute, err
}
- recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd)
+ recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd, txnHasBufferedWrites)
if err != nil {
return cannotDistribute, err
}
@@ -819,7 +838,7 @@
return cannotDistribute, cannotDistributeVectorSearchErr

case *windowNode:
- rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+ rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
if err != nil {
return cannotDistribute, err
}
@@ -845,6 +864,10 @@
// TODO(nvanbenschoten): lift this restriction.
return cannotDistribute, cannotDistributeRowLevelLockingErr
}
+ if txnHasBufferedWrites && side.fetch.requiresMVCCDecoding() {
+ // TODO(#144166): relax this.
+ return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+ }
}
if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil {
return cannotDistribute, err
@@ -864,8 +887,9 @@
n *invertedFilterNode,
distSQLVisitor *distSQLExprCheckVisitor,
sd *sessiondata.SessionData,
+ txnHasBufferedWrites bool,
) (distRecommendation, error) {
- rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+ rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
if err != nil {
return cannotDistribute, err
}
@@ -5399,7 +5423,16 @@
if len(n.reqOrdering) == 0 && n.parallelize {
hasScanNodeToParallelize = true
}
- case *distinctNode, *explainVecNode, *indexJoinNode, *limitNode,
+ if n.fetchPlanningInfo.requiresMVCCDecoding() {
+ prohibitParallelization = true
+ return
+ }
+ case *indexJoinNode:
+ if n.fetch.requiresMVCCDecoding() {
+ prohibitParallelization = true
+ return
+ }
+ case *distinctNode, *explainVecNode, *limitNode,
*ordinalityNode, *sortNode, *unionNode, *valuesNode:
default:
prohibitParallelization = true
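Review note: the bulk of this file threads a new txnHasBufferedWrites flag through the recursive checkSupportForPlanNode walk, so any fetch that requires MVCC decoding makes the whole plan non-distributable while writes are buffered. A toy reduction of that shape (stdlib errors here; the real code uses newQueryNotSupportedError):

```go
package main

import (
	"errors"
	"fmt"
)

var errBufferedWritesMVCC = errors.New(
	"system column (that requires MVCC decoding) is requested when writes have been buffered")

// node is a toy plan node: a fetch that may require MVCC decoding, plus an
// optional input, standing in for the recursive planNode walk.
type node struct {
	requiresMVCCDecoding bool
	input                *node
}

// checkSupport mirrors how the diff threads txnHasBufferedWrites through the
// recursion: any fetching node that needs MVCC decoding while writes are
// buffered makes the plan non-distributable.
func checkSupport(n *node, txnHasBufferedWrites bool) error {
	if n == nil {
		return nil
	}
	if txnHasBufferedWrites && n.requiresMVCCDecoding {
		// TODO(#144166) in the diff tracks relaxing this restriction.
		return errBufferedWritesMVCC
	}
	return checkSupport(n.input, txnHasBufferedWrites)
}

func main() {
	plan := &node{input: &node{requiresMVCCDecoding: true}}
	fmt.Println(checkSupport(plan, false)) // <nil>
	fmt.Println(checkSupport(plan, true))  // non-distributable error
}
```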