diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go
index 36a65d316f8e..b16c5802e00f 100644
--- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go
+++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go
@@ -1691,6 +1691,13 @@ func (tc *TxnCoordSender) HasPerformedWrites() bool {
 	return tc.hasPerformedWritesLocked()
 }
 
+// HasBufferedWrites is part of the TxnSender interface.
+func (tc *TxnCoordSender) HasBufferedWrites() bool {
+	tc.mu.Lock()
+	defer tc.mu.Unlock()
+	return tc.hasBufferedWritesLocked()
+}
+
 // TestingShouldRetry is part of the TxnSender interface.
 func (tc *TxnCoordSender) TestingShouldRetry() bool {
 	tc.mu.Lock()
@@ -1715,3 +1722,7 @@ func (tc *TxnCoordSender) hasPerformedReadsLocked() bool {
 func (tc *TxnCoordSender) hasPerformedWritesLocked() bool {
 	return tc.mu.txn.Sequence != 0
 }
+
+func (tc *TxnCoordSender) hasBufferedWritesLocked() bool {
+	return tc.interceptorAlloc.txnWriteBuffer.hasBufferedWrites()
+}
diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go
index 152246beaa06..7dcdb2c35d4a 100644
--- a/pkg/kv/mock_transactional_sender.go
+++ b/pkg/kv/mock_transactional_sender.go
@@ -282,6 +282,11 @@ func (m *MockTransactionalSender) HasPerformedWrites() bool {
 	panic("unimplemented")
 }
 
+// HasBufferedWrites is part of TxnSenderFactory.
+func (m *MockTransactionalSender) HasBufferedWrites() bool {
+	return false
+}
+
 // TestingShouldRetry is part of TxnSenderFactory.
 func (m *MockTransactionalSender) TestingShouldRetry() bool {
 	return false
diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go
index 6cd6c4c1b43b..faf6e4f5c8d3 100644
--- a/pkg/kv/sender.go
+++ b/pkg/kv/sender.go
@@ -134,10 +134,12 @@ type TxnSender interface {
 	SetOmitInRangefeeds()
 
 	// SetBufferedWritesEnabled toggles whether the writes are buffered on the
-	// gateway node until the commit time. Only allowed on the RootTxn. Buffered
-	// writes cannot be enabled on a txn that performed any requests. When
-	// disabling buffered writes, if there are any writes in the buffer, they
-	// are flushed with the next BatchRequest.
+	// gateway node until the commit time. Buffered writes cannot be enabled on
+	// a txn that performed any requests. When disabling buffered writes, if
+	// there are any writes in the buffer, they are flushed with the next
+	// BatchRequest.
+	//
+	// Only allowed on the RootTxn.
 	SetBufferedWritesEnabled(bool)
 
 	// BufferedWritesEnabled returns whether the buffered writes are enabled.
@@ -379,6 +381,10 @@ type TxnSender interface {
 	// transaction's current epoch.
 	HasPerformedWrites() bool
 
+	// HasBufferedWrites returns true if a write has been buffered for the
+	// transaction's current epoch.
+	HasBufferedWrites() bool
+
 	// TestingShouldRetry returns true if transaction retry errors should be
 	// randomly returned to callers. Note that it is the responsibility of
 	// (*kv.DB).Txn() to return the retries. This lives here since the
diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go
index 92d95e8b700f..a4a110d554ab 100644
--- a/pkg/kv/txn.go
+++ b/pkg/kv/txn.go
@@ -443,9 +443,18 @@ func (txn *Txn) debugNameLocked() string {
 	return fmt.Sprintf("%s (id: %s)", txn.mu.debugName, txn.mu.ID)
 }
 
+// SetBufferedWritesEnabled toggles whether the writes are buffered on the
+// gateway node until the commit time. Buffered writes cannot be enabled on a
+// txn that performed any requests. When disabling buffered writes, if there are
+// any writes in the buffer, they are flushed with the next BatchRequest.
+//
+// Only allowed on the RootTxn.
 func (txn *Txn) SetBufferedWritesEnabled(enabled bool) {
 	if txn.typ != RootTxn {
-		panic(errors.AssertionFailedf("SetBufferedWritesEnabled() called on leaf txn"))
+		panic(errors.AssertionFailedf(
+			"SetBufferedWritesEnabled(%t) called on leaf txn (has buffered writes: %t)",
+			enabled, txn.HasBufferedWrites(),
+		))
 	}
 
 	txn.mu.Lock()
@@ -1862,6 +1871,14 @@ func (txn *Txn) HasPerformedWrites() bool {
 	return txn.mu.sender.HasPerformedWrites()
 }
 
+// HasBufferedWrites returns true if a write has been buffered for the
+// transaction's current epoch.
+func (txn *Txn) HasBufferedWrites() bool {
+	txn.mu.Lock()
+	defer txn.mu.Unlock()
+	return txn.mu.sender.HasBufferedWrites()
+}
+
 // AdmissionHeader returns the admission header for work done in the context
 // of this transaction.
 func (txn *Txn) AdmissionHeader() kvpb.AdmissionHeader {
diff --git a/pkg/sql/apply_join.go b/pkg/sql/apply_join.go
index 89a1fd4162ef..16fccac9c249 100644
--- a/pkg/sql/apply_join.go
+++ b/pkg/sql/apply_join.go
@@ -315,10 +315,7 @@ func runPlanInsidePlan(
 		}
 	}
 
-	distributePlan, distSQLProhibitedErr := getPlanDistribution(
-		ctx, plannerCopy.Descriptors().HasUncommittedTypes(),
-		plannerCopy.SessionData(), plan.main, &plannerCopy.distSQLVisitor,
-	)
+	distributePlan, distSQLProhibitedErr := plannerCopy.getPlanDistribution(ctx, plan.main)
 	distributeType := DistributionType(LocalDistribution)
 	if distributePlan.WillDistribute() {
 		distributeType = FullDistribution
diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go
index a0cbce939428..963b2f691640 100644
--- a/pkg/sql/colfetcher/cfetcher.go
+++ b/pkg/sql/colfetcher/cfetcher.go
@@ -460,6 +460,15 @@ func (cf *cFetcher) Init(
 	// MVCC decoding.
 	if cf.mvccDecodeStrategy == storage.MVCCDecodingRequired {
 		if cf.txn != nil && cf.txn.BufferedWritesEnabled() {
+			if cf.txn.Type() == kv.LeafTxn {
+				// We're only allowed to disable buffered writes on the RootTxn.
+				// If we have a LeafTxn, we'll return an assertion error instead
+				// of crashing.
+				//
+				// Note that we might have a LeafTxn with no buffered writes, in
+				// which case BufferedWritesEnabled() is false.
+				return errors.AssertionFailedf("got LeafTxn when MVCC decoding is required")
+			}
 			cf.txn.SetBufferedWritesEnabled(false /* enabled */)
 		}
 	}
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index 83701c4d8cfb..9a1781cd9850 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -2858,10 +2858,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
 			}
 		}
 	}
-	distributePlan, distSQLProhibitedErr := getPlanDistribution(
-		ctx, planner.Descriptors().HasUncommittedTypes(),
-		ex.sessionData(), planner.curPlan.main, &planner.distSQLVisitor,
-	)
+	distributePlan, distSQLProhibitedErr := planner.getPlanDistribution(ctx, planner.curPlan.main)
 	if afterGetPlanDistribution != nil {
 		afterGetPlanDistribution()
 	}
diff --git a/pkg/sql/delete_range.go b/pkg/sql/delete_range.go
index 3fba33de503a..b78c5cbfb363 100644
--- a/pkg/sql/delete_range.go
+++ b/pkg/sql/delete_range.go
@@ -94,7 +94,7 @@ func (d *deleteRangeNode) startExec(params runParams) error {
 	// fetch kvs.
 	var spec fetchpb.IndexFetchSpec
 	if err := rowenc.InitIndexFetchSpec(
-		&spec, params.ExecCfg().Codec, d.desc, d.desc.GetPrimaryIndex(), nil, /* columnIDs */
+		&spec, params.ExecCfg().Codec, d.desc, d.desc.GetPrimaryIndex(), nil, /* fetchColumnIDs */
 	); err != nil {
 		return err
 	}
diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index c0cf2437b354..54cd8af195e6 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -461,6 +461,9 @@ var (
 	cannotDistributeVectorSearchErr = newQueryNotSupportedError(
 		"vector search operation cannot be distributed",
 	)
+	cannotDistributeSystemColumnsAndBufferedWrites = newQueryNotSupportedError(
+		"system column (that requires MVCC decoding) is requested when writes have been buffered",
+	)
 )
 
 // mustWrapNode returns true if a node has no DistSQL-processor equivalent.
@@ -546,6 +549,7 @@ func checkSupportForPlanNode(
 	node planNode,
 	distSQLVisitor *distSQLExprCheckVisitor,
 	sd *sessiondata.SessionData,
+	txnHasBufferedWrites bool,
 ) (retRec distRecommendation, retErr error) {
 	if buildutil.CrdbTestBuild {
 		defer func() {
@@ -563,19 +567,19 @@ func checkSupportForPlanNode(
 		return shouldDistribute, nil
 
 	case *distinctNode:
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 
 	case *exportNode:
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 
 	case *filterNode:
 		if err := checkExprForDistSQL(n.filter, distSQLVisitor); err != nil {
 			return cannotDistribute, err
 		}
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 
 	case *groupNode:
-		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -604,10 +608,14 @@ func checkSupportForPlanNode(
 			// TODO(nvanbenschoten): lift this restriction.
 			return cannotDistribute, cannotDistributeRowLevelLockingErr
 		}
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		if txnHasBufferedWrites && n.fetch.requiresMVCCDecoding() {
+			// TODO(#144166): relax this.
+			return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+		}
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 
 	case *invertedFilterNode:
-		return checkSupportForInvertedFilterNode(ctx, n, distSQLVisitor, sd)
+		return checkSupportForInvertedFilterNode(ctx, n, distSQLVisitor, sd, txnHasBufferedWrites)
 
 	case *invertedJoinNode:
 		if n.fetch.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
@@ -617,10 +625,14 @@ func checkSupportForPlanNode(
 			// TODO(nvanbenschoten): lift this restriction.
 			return cannotDistribute, cannotDistributeRowLevelLockingErr
 		}
+		if txnHasBufferedWrites && n.fetch.requiresMVCCDecoding() {
+			// TODO(#144166): relax this.
+			return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+		}
 		if err := checkExprForDistSQL(n.onExpr, distSQLVisitor); err != nil {
 			return cannotDistribute, err
 		}
-		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -633,11 +645,11 @@ func checkSupportForPlanNode(
 		if err := checkExprForDistSQL(n.pred.onCond, distSQLVisitor); err != nil {
 			return cannotDistribute, err
 		}
-		recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd)
+		recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
-		recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd)
+		recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -658,7 +670,7 @@ func checkSupportForPlanNode(
 		// Note that we don't need to check whether we support distribution of
 		// n.countExpr or n.offsetExpr because those expressions are evaluated
 		// locally, during the physical planning.
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 
 	case *lookupJoinNode:
 		if n.remoteLookupExpr != nil || n.remoteOnlyLookups {
@@ -672,7 +684,10 @@ func checkSupportForPlanNode(
 			// TODO(nvanbenschoten): lift this restriction.
 			return cannotDistribute, cannotDistributeRowLevelLockingErr
 		}
-
+		if txnHasBufferedWrites && n.fetch.requiresMVCCDecoding() {
+			// TODO(#144166): relax this.
+			return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+		}
 		if err := checkExprForDistSQL(n.lookupExpr, distSQLVisitor); err != nil {
 			return cannotDistribute, err
 		}
@@ -682,7 +697,7 @@ func checkSupportForPlanNode(
 		if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil {
 			return cannotDistribute, err
 		}
-		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -699,7 +714,7 @@ func checkSupportForPlanNode(
 				return cannotDistribute, err
 			}
 		}
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 
 	case *renderNode:
 		for _, e := range n.render {
@@ -707,7 +722,7 @@ func checkSupportForPlanNode(
 				return cannotDistribute, err
 			}
 		}
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 
 	case *scanNode:
 		if n.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
@@ -721,6 +736,10 @@ func checkSupportForPlanNode(
 			// This is a locality optimized scan.
 			return cannotDistribute, localityOptimizedOpNotDistributableErr
 		}
+		if txnHasBufferedWrites && n.fetchPlanningInfo.requiresMVCCDecoding() {
+			// TODO(#144166): relax this.
+			return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+		}
 		scanRec := canDistribute
 		if n.estimatedRowCount != 0 {
 			var suffix string
@@ -748,7 +767,7 @@ func checkSupportForPlanNode(
 		return scanRec, nil
 
 	case *sortNode:
-		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -765,7 +784,7 @@ func checkSupportForPlanNode(
 		return rec.compose(sortRec), nil
 
 	case *topKNode:
-		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -785,11 +804,11 @@ func checkSupportForPlanNode(
 		return canDistribute, nil
 
 	case *unionNode:
-		recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd)
+		recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
-		recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd)
+		recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -819,7 +838,7 @@ func checkSupportForPlanNode(
 		return cannotDistribute, cannotDistributeVectorSearchErr
 
 	case *windowNode:
-		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -845,6 +864,10 @@ func checkSupportForPlanNode(
 				// TODO(nvanbenschoten): lift this restriction.
 				return cannotDistribute, cannotDistributeRowLevelLockingErr
 			}
+			if txnHasBufferedWrites && side.fetch.requiresMVCCDecoding() {
+				// TODO(#144166): relax this.
+				return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+			}
 		}
 		if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil {
 			return cannotDistribute, err
 		}
@@ -864,8 +887,9 @@ func checkSupportForInvertedFilterNode(
 	n *invertedFilterNode,
 	distSQLVisitor *distSQLExprCheckVisitor,
 	sd *sessiondata.SessionData,
+	txnHasBufferedWrites bool,
 ) (distRecommendation, error) {
-	rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+	rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 	if err != nil {
 		return cannotDistribute, err
 	}
@@ -5399,7 +5423,16 @@ func checkScanParallelizationIfLocal(
 		if len(n.reqOrdering) == 0 && n.parallelize {
 			hasScanNodeToParallelize = true
 		}
-	case *distinctNode, *explainVecNode, *indexJoinNode, *limitNode,
+		if n.fetchPlanningInfo.requiresMVCCDecoding() {
+			prohibitParallelization = true
+			return
+		}
+	case *indexJoinNode:
+		if n.fetch.requiresMVCCDecoding() {
+			prohibitParallelization = true
+			return
+		}
+	case *distinctNode, *explainVecNode, *limitNode,
 		*ordinalityNode, *sortNode, *unionNode, *valuesNode:
 	default:
 		prohibitParallelization = true
diff --git a/pkg/sql/distsql_physical_planner_test.go b/pkg/sql/distsql_physical_planner_test.go
index fcc1b247cd0e..4a544bb4be9d 100644
--- a/pkg/sql/distsql_physical_planner_test.go
+++ b/pkg/sql/distsql_physical_planner_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
@@ -1947,6 +1948,10 @@ func TestCheckScanParallelizationIfLocal(t *testing.T) {
 		require.NoError(t, b.RunPostDeserializationChanges())
 		return b.BuildImmutableTable()
 	}
+	mvccTimestampSysCol, err := catalog.MustFindColumnByID(makeTableDesc(), colinfo.MVCCTimestampColumnID)
+	require.NoError(t, err)
+	tableOidSysCol, err := catalog.MustFindColumnByID(makeTableDesc(), colinfo.TableOIDColumnID)
+	require.NoError(t, err)
 	scanToParallelize := &scanNode{parallelize: true}
 
 	for _, tc := range []struct {
@@ -2070,6 +2075,34 @@ func TestCheckScanParallelizationIfLocal(t *testing.T) {
 			// windowNode is not fully supported by the vectorized.
 			prohibitParallelization: true,
 		},
+		{
+			plan: planComponents{main: planMaybePhysical{planNode: &scanNode{
+				fetchPlanningInfo: fetchPlanningInfo{catalogCols: []catalog.Column{mvccTimestampSysCol}},
+			}}},
+			// Usage of crdb_internal_mvcc_timestamp prohibits parallelization
+			// since it forces usage of the LeafTxn, yet for buffered writes we
+			// might need to force usage of the RootTxn.
+			// TODO(#144166): relax this.
+			prohibitParallelization: true,
+		},
+		{
+			plan: planComponents{main: planMaybePhysical{planNode: &scanNode{
+				fetchPlanningInfo: fetchPlanningInfo{catalogCols: []catalog.Column{tableOidSysCol}},
+			}}},
+			// Usage of the tableoid system column doesn't force usage of the
+			// LeafTxn, so parallelization is ok.
+			prohibitParallelization: false,
+		},
+		{
+			plan: planComponents{main: planMaybePhysical{planNode: &indexJoinNode{indexJoinPlanningInfo: indexJoinPlanningInfo{
+				fetch: fetchPlanningInfo{catalogCols: []catalog.Column{mvccTimestampSysCol}}},
+			}}},
+			// Usage of crdb_internal_mvcc_timestamp prohibits parallelization
+			// since it forces usage of the LeafTxn, yet for buffered writes we
+			// might need to force usage of the RootTxn.
+			// TODO(#144166): relax this.
+			prohibitParallelization: true,
+		},
 
 		// Unsupported edge cases.
 		{
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index 686f73caa1eb..d4e56055c88f 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -824,6 +824,12 @@ func (dsp *DistSQLPlanner) Run(
 			// which are using the internal executor is error-prone, so we just
 			// disable the Streamer API for the "super-set" of problematic
 			// cases.
+			//
+			// Furthermore, when we have buffered some writes and a system
+			// column that requires MVCC decoding is requested, we disable the
+			// usage of the streamer since we must have access to the RootTxn to
+			// handle such a scenario.
+			// TODO(#144166): relax this.
 			mustUseRootTxn := func() bool {
 				for _, p := range plan.Processors {
 					if n := p.Spec.Core.LocalPlanNode; n != nil {
 						log.VEventf(ctx, 3, "must use root txn due to %q wrapped planNode", n.Name)
 						return true
 					}
+				} else if txn.HasBufferedWrites() {
+					switch {
+					case p.Spec.Core.TableReader != nil:
+						if fetchSpecRequiresMVCCDecoding(p.Spec.Core.TableReader.FetchSpec) {
+							log.VEventf(ctx, 3, "must use root txn due to system column that requires MVCC decoding")
+							return true
+						}
+					case p.Spec.Core.JoinReader != nil:
+						if fetchSpecRequiresMVCCDecoding(p.Spec.Core.JoinReader.FetchSpec) {
+							log.VEventf(ctx, 3, "must use root txn due to system column that requires MVCC decoding")
+							return true
+						}
+					case p.Spec.Core.InvertedJoiner != nil:
+						if fetchSpecRequiresMVCCDecoding(p.Spec.Core.InvertedJoiner.FetchSpec) {
+							log.VEventf(ctx, 3, "must use root txn due to system column that requires MVCC decoding")
+							return true
+						}
+					case p.Spec.Core.ZigzagJoiner != nil:
+						for _, side := range p.Spec.Core.ZigzagJoiner.Sides {
+							if fetchSpecRequiresMVCCDecoding(side.FetchSpec) {
+								log.VEventf(ctx, 3, "must use root txn due to system column that requires MVCC decoding")
+								return true
+							}
+						}
+					}
 				}
 			}
 			return false
@@ -1901,10 +1932,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
 	skipDistSQLDiagramGeneration bool,
 	mustUseLeafTxn bool,
 ) error {
-	subqueryDistribution, distSQLProhibitedErr := getPlanDistribution(
-		ctx, planner.Descriptors().HasUncommittedTypes(),
-		planner.SessionData(), subqueryPlan.plan, &planner.distSQLVisitor,
-	)
+	subqueryDistribution, distSQLProhibitedErr := planner.getPlanDistribution(ctx, subqueryPlan.plan)
 	distribute := DistributionType(LocalDistribution)
 	if subqueryDistribution.WillDistribute() {
 		distribute = FullDistribution
@@ -2512,10 +2540,7 @@ func (dsp *DistSQLPlanner) planAndRunPostquery(
 	associateNodeWithComponents func(exec.Node, execComponents),
 	addTopLevelQueryStats func(stats *topLevelQueryStats),
 ) error {
-	postqueryDistribution, distSQLProhibitedErr := getPlanDistribution(
-		ctx, planner.Descriptors().HasUncommittedTypes(),
-		planner.SessionData(), postqueryPlan, &planner.distSQLVisitor,
-	)
+	postqueryDistribution, distSQLProhibitedErr := planner.getPlanDistribution(ctx, postqueryPlan)
 	distribute := DistributionType(LocalDistribution)
 	if postqueryDistribution.WillDistribute() {
 		distribute = FullDistribution
diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go
index c1313167be09..f272362ada02 100644
--- a/pkg/sql/distsql_spec_exec_factory.go
+++ b/pkg/sql/distsql_spec_exec_factory.go
@@ -330,6 +330,15 @@ func (e *distSQLSpecExecFactory) ConstructScan(
 		// TODO(nvanbenschoten): lift this restriction.
 		recommendation = cannotDistribute
 	}
+	for _, colID := range columnIDs {
+		if columnIDRequiresMVCCDecoding(colID) {
+			// TODO(yuzefovich): only require MVCC decoding when txn has
+			// buffered writes.
+			// TODO(#144166): relax this.
+			recommendation = cannotDistribute
+			break
+		}
+	}
 
 	// Note that we don't do anything about the possible filter here since we
 	// don't know yet whether we will have it. ConstructFilter is responsible
@@ -870,11 +879,15 @@ func (e *distSQLSpecExecFactory) ConstructIndexJoin(
 	}
 
 	recommendation := canDistribute
-	if locking.Strength != tree.ForNone {
+	if locking.Strength != tree.ForNone ||
 		// Index joins that are performing row-level locking cannot currently be
 		// distributed because their locks would not be propagated back to the root
 		// transaction coordinator.
 		// TODO(nvanbenschoten): lift this restriction.
+		fetch.requiresMVCCDecoding() {
+		// TODO(yuzefovich): only require MVCC decoding when txn has buffered
+		// writes.
+		// TODO(#144166): relax this.
 		recommendation = cannotDistribute
 		physPlan.EnsureSingleStreamOnGateway(e.ctx, nil /* finalizeLastStageCb */)
 	}
@@ -960,16 +973,19 @@ func (e *distSQLSpecExecFactory) ConstructLookupJoin(
 	}
 
 	recommendation := e.checkExprsAndMaybeMergeLastStage([]tree.TypedExpr{lookupExpr, onCond}, physPlan)
-	if locking.Strength != tree.ForNone {
-		// Lookup joins that are performing row-level locking cannot currently be
-		// distributed because their locks would not be propagated back to the root
-		// transaction coordinator.
+	noDistribution := locking.Strength != tree.ForNone ||
+		// Lookup joins that are performing row-level locking cannot
+		// currently be distributed because their locks would not be
+		// propagated back to the root transaction coordinator.
 		// TODO(nvanbenschoten): lift this restriction.
-		recommendation = cannotDistribute
-		physPlan.EnsureSingleStreamOnGateway(e.ctx, nil /* finalizeLastStageCb */)
-	} else if remoteLookupExpr != nil || remoteOnlyLookups {
-		// Do not distribute locality-optimized joins, since it would defeat the
-		// purpose of the optimization.
+		(remoteLookupExpr != nil || remoteOnlyLookups) ||
+		// Do not distribute locality-optimized joins, since it would defeat
+		// the purpose of the optimization.
+		fetch.requiresMVCCDecoding()
+	// TODO(yuzefovich): only require MVCC decoding when txn has buffered
+	// writes.
+	// TODO(#144166): relax this.
+	if noDistribution {
 		recommendation = cannotDistribute
 		physPlan.EnsureSingleStreamOnGateway(e.ctx, nil /* finalizeLastStageCb */)
 	}
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index 1af13e3e3d6e..2b2e91c93fbb 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -2212,12 +2212,8 @@ func shouldDistributeGivenRecAndMode(
 // remote node to the gateway.
 // TODO(yuzefovich): this will be easy to solve once the DistSQL spec factory is
 // completed but is quite annoying to do at the moment.
-func getPlanDistribution(
-	ctx context.Context,
-	txnHasUncommittedTypes bool,
-	sd *sessiondata.SessionData,
-	plan planMaybePhysical,
-	distSQLVisitor *distSQLExprCheckVisitor,
+func (p *planner) getPlanDistribution(
+	ctx context.Context, plan planMaybePhysical,
 ) (_ physicalplan.PlanDistribution, distSQLProhibitedErr error) {
 	if plan.isPhysicalPlan() {
 		// TODO(#47473): store the distSQLProhibitedErr for DistSQL spec factory
@@ -2228,10 +2224,11 @@ func getPlanDistribution(
 	// If this transaction has modified or created any types, it is not safe to
 	// distribute due to limitations around leasing descriptors modified in the
 	// current transaction.
-	if txnHasUncommittedTypes {
+	if p.Descriptors().HasUncommittedTypes() {
 		return physicalplan.LocalPlan, nil
 	}
 
+	sd := p.SessionData()
 	if sd.DistSQLMode == sessiondatapb.DistSQLOff {
 		return physicalplan.LocalPlan, nil
 	}
@@ -2241,7 +2238,20 @@ func getPlanDistribution(
 		return physicalplan.LocalPlan, nil
 	}
 
-	rec, err := checkSupportForPlanNode(ctx, plan.planNode, distSQLVisitor, sd)
+	// Determine whether the txn has buffered some writes.
+	txnHasBufferedWrites := p.txn.HasBufferedWrites()
+	if sd.BufferedWritesEnabled && p.curPlan.main == plan {
+		// Given that we're checking the plan distribution for the main query
+		// _before_ executing any of the subqueries, it's possible that some
+		// writes will have been buffered by one of the subqueries. In such a
+		// case, we'll assume that if the query as a whole has any mutations AND
+		// it has at least one subquery, then that subquery will perform some
+		// writes that will be buffered.
+		if p.curPlan.flags.IsSet(planFlagContainsMutation) && len(p.curPlan.subqueryPlans) > 0 {
+			txnHasBufferedWrites = true
+		}
+	}
+	rec, err := checkSupportForPlanNode(ctx, plan.planNode, &p.distSQLVisitor, sd, txnHasBufferedWrites)
 	if err != nil {
 		// Don't use distSQL for this request.
 		log.VEventf(ctx, 1, "query not supported for distSQL: %s", err)
diff --git a/pkg/sql/explain_plan.go b/pkg/sql/explain_plan.go
index dc4c876788ff..32a7694b7190 100644
--- a/pkg/sql/explain_plan.go
+++ b/pkg/sql/explain_plan.go
@@ -58,10 +58,7 @@ func (e *explainPlanNode) startExec(params runParams) error {
 	// Note that we delay adding the annotation about the distribution until
 	// after the plan is finalized (when the physical plan is successfully
 	// created).
-	distribution, _ := getPlanDistribution(
-		params.ctx, params.p.Descriptors().HasUncommittedTypes(),
-		params.extendedEvalCtx.SessionData(), plan.main, &params.p.distSQLVisitor,
-	)
+	distribution, _ := params.p.getPlanDistribution(params.ctx, plan.main)
 	outerSubqueries := params.p.curPlan.subqueryPlans
 	distSQLPlanner := params.extendedEvalCtx.DistSQLPlanner
diff --git a/pkg/sql/explain_vec.go b/pkg/sql/explain_vec.go
index 59957792a274..4abee9816e21 100644
--- a/pkg/sql/explain_vec.go
+++ b/pkg/sql/explain_vec.go
@@ -36,10 +36,7 @@ func (n *explainVecNode) startExec(params runParams) error {
 	n.run.values = make(tree.Datums, 1)
 	distSQLPlanner := params.extendedEvalCtx.DistSQLPlanner
-	distribution, _ := getPlanDistribution(
-		params.ctx, params.p.Descriptors().HasUncommittedTypes(),
-		params.extendedEvalCtx.SessionData(), n.plan.main, &params.p.distSQLVisitor,
-	)
+	distribution, _ := params.p.getPlanDistribution(params.ctx, n.plan.main)
 	outerSubqueries := params.p.curPlan.subqueryPlans
 	planCtx := newPlanningCtxForExplainPurposes(distSQLPlanner, params, n.plan.subqueryPlans, distribution)
 	defer func() {
diff --git a/pkg/sql/logictest/testdata/logic_test/distsql_buffered_writes b/pkg/sql/logictest/testdata/logic_test/distsql_buffered_writes
new file mode 100644
index 000000000000..b35d0cb0fd9a
--- /dev/null
+++ b/pkg/sql/logictest/testdata/logic_test/distsql_buffered_writes
@@ -0,0 +1,111 @@
+# LogicTest: 5node
+
+statement ok
+SET kv_transaction_buffered_writes_enabled=true
+
+statement ok
+SET CLUSTER SETTING kv.transaction.write_buffering.max_buffer_size = '2KiB';
+
+subtest regression_151325
+
+statement ok
+CREATE TABLE kv (k INT PRIMARY KEY, v INT);
+INSERT INTO kv VALUES (1, 1), (2, 2);
+
+statement ok
+ALTER TABLE kv SPLIT AT SELECT i FROM generate_series(1, 2) AS g(i)
+
+retry
+statement ok
+ALTER TABLE kv EXPERIMENTAL_RELOCATE
+  SELECT ARRAY[i+1], i FROM generate_series(0, 2) AS g(i)
+
+# First txn performs a write that is buffered and then a stmt for which we
+# should disable DistSQL.
+
+statement ok
+BEGIN;
+
+# No writes have been buffered yet, so we shouldn't disable DistSQL.
+query T
+SELECT info FROM [EXPLAIN SELECT crdb_internal_mvcc_timestamp FROM kv] WHERE info LIKE 'distribution%'
+----
+distribution: full
+
+statement ok
+INSERT INTO kv VALUES (3, 3);
+
+query T
+SELECT info FROM [EXPLAIN SELECT crdb_internal_mvcc_timestamp FROM kv] WHERE info LIKE 'distribution%'
+----
+distribution: local
+
+# The tableoid system column doesn't require MVCC decoding, so it doesn't
+# disable DistSQL.
+query T
+SELECT info FROM [EXPLAIN SELECT tableoid FROM kv] WHERE info LIKE 'distribution%'
+----
+distribution: full
+
+# Executing this query will flush the buffer and disable buffered writes for the
+# txn.
+statement count 3
+SELECT crdb_internal_mvcc_timestamp FROM kv;
+
+statement ok
+COMMIT;
+
+# Another txn where the system column is fetched via the lookup join.
+statement ok
+BEGIN;
+
+# No writes have been buffered yet, so we shouldn't disable DistSQL.
+query T
+SELECT info FROM [EXPLAIN SELECT kv2.crdb_internal_mvcc_timestamp FROM kv AS kv1 INNER LOOKUP JOIN kv AS kv2 ON kv1.v = kv2.k] WHERE info LIKE 'distribution%'
+----
+distribution: full
+
+statement ok
+INSERT INTO kv VALUES (4, 4);
+
+query T
+SELECT info FROM [EXPLAIN SELECT kv2.crdb_internal_mvcc_timestamp FROM kv AS kv1 INNER LOOKUP JOIN kv AS kv2 ON kv1.v = kv2.k] WHERE info LIKE 'distribution%'
+----
+distribution: local
+
+# Executing this query will flush the buffer and disable buffered writes for the
+# txn.
+statement count 4
+SELECT kv2.crdb_internal_mvcc_timestamp FROM kv AS kv1 INNER LOOKUP JOIN kv AS kv2 ON kv1.v = kv2.k;
+
+statement ok
+COMMIT;
+
+# Try another txn where the subquery in the first stmt buffers a write and
+# DistSQL is disabled for the main query, but then the next stmt should
+# encounter an error.
+
+statement ok
+BEGIN;
+
+statement count 4
+SELECT
+  crdb_internal_mvcc_timestamp
+FROM
+  [
+    INSERT INTO kv VALUES (5, 5) RETURNING NULL
+  ],
+  kv;
+
+statement error duplicate key value violates unique constraint \"kv_pkey\"
+INSERT INTO kv VALUES (5, 5);
+
+statement ok
+ROLLBACK;
+
+query I
+SELECT count(*) FROM kv;
+----
+4
+
+subtest end
diff --git a/pkg/sql/logictest/testdata/logic_test/do b/pkg/sql/logictest/testdata/logic_test/do
index 73e4355f19d2..2a3a68e916bc 100644
--- a/pkg/sql/logictest/testdata/logic_test/do
+++ b/pkg/sql/logictest/testdata/logic_test/do
@@ -286,3 +286,23 @@ END
 $$;
 
 subtest end
+
+statement ok
+CREATE TABLE seed (_int8 INT8, _float8 FLOAT8);
+
+statement ok
+INSERT INTO seed DEFAULT VALUES;
+
+statement ok
+CREATE INDEX on seed (_int8, _float8);
+
+# Use the DO block to trigger the runPlanInsidePlan code path, where we must
+# disable usage of the Streamer for the SELECT query that uses the
+# 'crdb_internal_mvcc_timestamp' column after we've buffered some writes.
+statement ok
+DO $$
+BEGIN
+  UPDATE seed SET _int8 = 1;
+  SELECT _float8, _int8, crdb_internal_mvcc_timestamp FROM seed@seed__int8__float8_idx;
+END;
+$$;
diff --git a/pkg/sql/logictest/tests/5node/BUILD.bazel b/pkg/sql/logictest/tests/5node/BUILD.bazel
index 52bc50430958..9583f7762cec 100644
--- a/pkg/sql/logictest/tests/5node/BUILD.bazel
+++ b/pkg/sql/logictest/tests/5node/BUILD.bazel
@@ -12,7 +12,7 @@ go_test(
         "//build/toolchains:is_heavy": {"test.Pool": "heavy"},
         "//conditions:default": {"test.Pool": "large"},
     }),
-    shard_count = 21,
+    shard_count = 22,
     tags = ["cpu:3"],
     deps = [
        "//pkg/base",
diff --git a/pkg/sql/logictest/tests/5node/generated_test.go b/pkg/sql/logictest/tests/5node/generated_test.go
index 7ca80935f06b..0e7795d90790 100644
--- a/pkg/sql/logictest/tests/5node/generated_test.go
+++ b/pkg/sql/logictest/tests/5node/generated_test.go
@@ -94,6 +94,13 @@ func TestLogic_distsql_agg(
 	t *testing.T,
 ) {
 	defer leaktest.AfterTest(t)()
 	runLogicTest(t, "distsql_agg")
 }
 
+func TestLogic_distsql_buffered_writes(
+	t *testing.T,
+) {
+	defer leaktest.AfterTest(t)()
+	runLogicTest(t, "distsql_buffered_writes")
+}
+
 func TestLogic_distsql_builtin(
 	t *testing.T,
 ) {
diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go
index a2f071db5a02..f7360bd194c2 100644
--- a/pkg/sql/row/fetcher.go
+++ b/pkg/sql/row/fetcher.go
@@ -413,6 +413,15 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error {
 	// MVCC decoding.
 	if rf.mvccDecodeStrategy == storage.MVCCDecodingRequired {
 		if rf.args.Txn != nil && rf.args.Txn.BufferedWritesEnabled() {
+			if rf.args.Txn.Type() == kv.LeafTxn {
+				// We're only allowed to disable buffered writes on the RootTxn.
+				// If we have a LeafTxn, we'll return an assertion error instead
+				// of crashing.
+				//
+				// Note that we might have a LeafTxn with no buffered writes, in
+				// which case BufferedWritesEnabled() is false.
+				return errors.AssertionFailedf("got LeafTxn when MVCC decoding is required")
+			}
 			rf.args.Txn.SetBufferedWritesEnabled(false /* enabled */)
 		}
 	}
@@ -540,6 +549,15 @@ func (rf *Fetcher) setTxnAndSendFn(txn *kv.Txn, sendFn sendFunc) error {
 	// MVCC decoding.
 	if rf.mvccDecodeStrategy == storage.MVCCDecodingRequired {
 		if txn != nil && txn.BufferedWritesEnabled() {
+			if txn.Type() == kv.LeafTxn {
+				// We're only allowed to disable buffered writes on the RootTxn.
+				// If we have a LeafTxn, we'll return an assertion error instead
+				// of crashing.
+				//
+				// Note that we might have a LeafTxn with no buffered writes, in
+				// which case BufferedWritesEnabled() is false.
+				return errors.AssertionFailedf("got LeafTxn when MVCC decoding is required")
+			}
 			txn.SetBufferedWritesEnabled(false /* enabled */)
 		}
 	}
diff --git a/pkg/sql/scan.go b/pkg/sql/scan.go
index cfb29910ae98..df9049bad7e1 100644
--- a/pkg/sql/scan.go
+++ b/pkg/sql/scan.go
@@ -11,8 +11,10 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/fetchpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
@@ -80,12 +82,7 @@ type fetchPlanningInfo struct {
 	index  catalog.Index
 	colCfg scanColumnsConfig
 
-	// The table columns, possibly including ones currently in schema changes.
-	// TODO(radu/knz): currently we always load the entire row from KV and only
-	// skip unnecessary decodes to Datum. Investigate whether performance is to
-	// be gained (e.g. for tables with wide rows) by reading only certain
-	// columns from KV using point lookups instead of a single range lookup for
-	// the entire row.
+	// catalogCols contains only the columns that need to be fetched.
 	catalogCols []catalog.Column
 	// There is a 1-1 correspondence between catalogCols and columns.
 	columns colinfo.ResultColumns
@@ -235,3 +232,43 @@ func (n *scanNode) initDescSpecificCol(colCfg scanColumnsConfig, prefixCol catal
 	n.columns = colinfo.ResultColumnsFromColumns(n.desc.GetID(), n.catalogCols)
 	return nil
 }
+
+// columnIDRequiresMVCCDecoding returns whether the given columnID corresponds
+// to a system column that requires MVCC decoding to be populated.
+func columnIDRequiresMVCCDecoding(columnID descpb.ColumnID) bool {
+	if !colinfo.IsColIDSystemColumn(columnID) {
+		return false
+	}
+	switch colinfo.GetSystemColumnKindFromColumnID(columnID) {
+	case catpb.SystemColumnKind_MVCCTIMESTAMP,
+		catpb.SystemColumnKind_ORIGINID,
+		catpb.SystemColumnKind_ORIGINTIMESTAMP:
+		return true
+	case catpb.SystemColumnKind_TABLEOID:
+		return false
+	default:
+		panic(errors.AssertionFailedf("unexpected system column: %d", columnID))
+	}
+}
+
+// requiresMVCCDecoding returns true if at least one system column that requires
+// MVCC decoding is fetched.
+func (n *fetchPlanningInfo) requiresMVCCDecoding() bool {
+	for _, col := range n.catalogCols {
+		if columnIDRequiresMVCCDecoding(col.GetID()) {
+			return true
+		}
+	}
+	return false
+}
+
+// fetchSpecRequiresMVCCDecoding returns true if at least one system column that
+// requires MVCC decoding is fetched according to the spec.
+func fetchSpecRequiresMVCCDecoding(fetchSpec fetchpb.IndexFetchSpec) bool {
+	for _, col := range fetchSpec.FetchedColumns {
+		if columnIDRequiresMVCCDecoding(col.ColumnID) {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go
index 43da48dd8619..29ba132bb924 100644
--- a/pkg/sql/schema_changer.go
+++ b/pkg/sql/schema_changer.go
@@ -580,11 +580,7 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
 			}
 		}
 	}
-	planDistribution, _ := getPlanDistribution(
-		ctx, localPlanner.Descriptors().HasUncommittedTypes(),
-		localPlanner.extendedEvalCtx.SessionData(),
-		localPlanner.curPlan.main, &localPlanner.distSQLVisitor,
-	)
+	planDistribution, _ := localPlanner.getPlanDistribution(ctx, localPlanner.curPlan.main)
 	isLocal := !planDistribution.WillDistribute()
 	out := execinfrapb.ProcessorCoreUnion{BulkRowWriter: &execinfrapb.BulkRowWriterSpec{
 		Table: *table.TableDesc(),
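
Reviewer note: the gating that this patch threads through checkSupportForPlanNode, checkScanParallelizationIfLocal, and mustUseRootTxn reduces to one predicate: a read may run on a LeafTxn (and hence be distributed, parallelized, or served by the Streamer) only if the txn has no buffered writes or none of the fetched system columns needs MVCC decoding, since only the RootTxn can flush the write buffer. The following is a minimal standalone Go sketch of that predicate; SystemColumnKind, requiresMVCCDecoding, and canUseLeafTxn here are simplified stand-ins for illustration, not the actual catpb/sql APIs.

package main

import "fmt"

// SystemColumnKind is a simplified stand-in for catpb.SystemColumnKind.
type SystemColumnKind int

const (
	MVCCTimestamp   SystemColumnKind = iota // crdb_internal_mvcc_timestamp
	OriginID                                // crdb_internal_origin_id
	OriginTimestamp                         // crdb_internal_origin_timestamp
	TableOID                                // tableoid
)

// requiresMVCCDecoding mirrors columnIDRequiresMVCCDecoding from the patch:
// only system columns whose values come from the row's MVCC metadata force
// MVCC decoding; tableoid is derived from the table descriptor alone.
func requiresMVCCDecoding(kind SystemColumnKind) bool {
	switch kind {
	case MVCCTimestamp, OriginID, OriginTimestamp:
		return true
	default:
		return false
	}
}

// canUseLeafTxn captures the restriction added throughout the patch: a plan
// may use a LeafTxn only if no writes are buffered or no fetched column
// requires MVCC decoding.
func canUseLeafTxn(txnHasBufferedWrites bool, fetched []SystemColumnKind) bool {
	if !txnHasBufferedWrites {
		return true
	}
	for _, kind := range fetched {
		if requiresMVCCDecoding(kind) {
			return false // TODO(#144166): relax this.
		}
	}
	return true
}

func main() {
	fmt.Println(canUseLeafTxn(true, []SystemColumnKind{MVCCTimestamp}))  // false
	fmt.Println(canUseLeafTxn(true, []SystemColumnKind{TableOID}))       // true
	fmt.Println(canUseLeafTxn(false, []SystemColumnKind{MVCCTimestamp})) // true
}

Under this sketch, the logic-test expectations above line up: once a write is buffered, EXPLAIN flips from "distribution: full" to "distribution: local" for crdb_internal_mvcc_timestamp, while tableoid keeps distributing.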