From f871275bdfe09e20533d2137760ee9732a0cedd7 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 25 Dec 2025 12:25:25 +0800 Subject: [PATCH 01/23] * Signed-off-by: Weizhen Wang --- pkg/executor/analyze_col_v2.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/executor/analyze_col_v2.go b/pkg/executor/analyze_col_v2.go index a0734e5b5399e..1c631e0b27c6e 100644 --- a/pkg/executor/analyze_col_v2.go +++ b/pkg/executor/analyze_col_v2.go @@ -235,7 +235,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( // Start workers to merge the result from collectors. mergeResultCh := make(chan *samplingMergeResult, 1) mergeTaskCh := make(chan []byte, 1) - var taskEg errgroup.Group + taskEg, taskCtx := errgroup.WithContext(context.Background()) // Start read data from resultHandler and send them to mergeTaskCh. taskEg.Go(func() (err error) { defer func() { @@ -243,7 +243,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( err = getAnalyzePanicErr(r) } }() - return readDataAndSendTask(e.ctx, e.resultHandler, mergeTaskCh, e.memTracker) + return readDataAndSendTask(taskCtx,e.ctx, e.resultHandler, mergeTaskCh, e.memTracker) }) e.samplingMergeWg = &util.WaitGroupWrapper{} e.samplingMergeWg.Add(samplingStatsConcurrency) @@ -255,7 +255,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( } // Merge the result from collectors. 
mergeWorkerPanicCnt := 0 - mergeEg, mergeCtx := errgroup.WithContext(context.Background()) + mergeEg, mergeCtx := errgroup.WithContext(taskCtx) mergeEg.Go(func() (err error) { defer func() { if r := recover(); r != nil { @@ -863,24 +863,24 @@ type samplingBuildTask struct { slicePos int } -func readDataAndSendTask(ctx sessionctx.Context, handler *tableResultHandler, mergeTaskCh chan []byte, memTracker *memory.Tracker) error { +func readDataAndSendTask(ctx context.Context, sctx sessionctx.Context, handler *tableResultHandler, mergeTaskCh chan []byte, memTracker *memory.Tracker) error { // After all tasks are sent, close the mergeTaskCh to notify the mergeWorker that all tasks have been sent. defer close(mergeTaskCh) for { failpoint.Inject("mockKillRunningV2AnalyzeJob", func() { - dom := domain.GetDomain(ctx) + dom := domain.GetDomain(sctx) for _, id := range handleutil.GlobalAutoAnalyzeProcessList.All() { dom.SysProcTracker().KillSysProcess(id) } }) - if err := ctx.GetSessionVars().SQLKiller.HandleSignal(); err != nil { + if err := sctx.GetSessionVars().SQLKiller.HandleSignal(); err != nil { return err } failpoint.Inject("mockSlowAnalyzeV2", func() { time.Sleep(1000 * time.Second) }) - data, err := handler.nextRaw(context.TODO()) + data, err := handler.nextRaw(ctx) if err != nil { return errors.Trace(err) } From 4812821b55f4859e74ff6cb17b4133fb183ce098 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 25 Dec 2025 12:42:25 +0800 Subject: [PATCH 02/23] * Signed-off-by: Weizhen Wang --- pkg/executor/analyze_col_v2.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/executor/analyze_col_v2.go b/pkg/executor/analyze_col_v2.go index 1c631e0b27c6e..8103f8f573858 100644 --- a/pkg/executor/analyze_col_v2.go +++ b/pkg/executor/analyze_col_v2.go @@ -243,7 +243,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( err = getAnalyzePanicErr(r) } }() - return readDataAndSendTask(taskCtx,e.ctx, e.resultHandler, mergeTaskCh, e.memTracker) + return 
readDataAndSendTask(taskCtx, e.ctx, e.resultHandler, mergeTaskCh, e.memTracker) }) e.samplingMergeWg = &util.WaitGroupWrapper{} e.samplingMergeWg.Add(samplingStatsConcurrency) From 64f33053ff25fba24954c7145b4abee5e5feeca1 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 31 Dec 2025 19:28:45 +0800 Subject: [PATCH 03/23] *: use context Signed-off-by: Weizhen Wang --- pkg/executor/analyze.go | 25 +++++++--- pkg/executor/analyze_col.go | 2 +- pkg/executor/analyze_col_v2.go | 85 ++++++++++++++++++++------------- pkg/executor/analyze_idx.go | 28 ++++++----- pkg/util/sqlkiller/sqlkiller.go | 35 +++++++++++++- 5 files changed, 120 insertions(+), 55 deletions(-) diff --git a/pkg/executor/analyze.go b/pkg/executor/analyze.go index 3858b7f97f953..8bcf593d1c9f0 100644 --- a/pkg/executor/analyze.go +++ b/pkg/executor/analyze.go @@ -101,6 +101,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) (err error) { statsHandle := domain.GetDomain(e.Ctx()).StatsHandle() infoSchema := sessiontxn.GetTxnManager(e.Ctx()).GetTxnInfoSchema() sessionVars := e.Ctx().GetSessionVars() + ctx = e.Ctx().GetSessionVars().SQLKiller.GetKillEventCtx(ctx) // Filter the locked tables. 
tasks, needAnalyzeTableCnt, skippedTables, err := filterAndCollectTasks(e.tasks, statsHandle, infoSchema) @@ -503,7 +504,8 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency( return err } -func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<- *statistics.AnalyzeResults) { +// ctx must be from SQLKiller.GetKillEventCtx +func (e *AnalyzeExec) analyzeWorker(ctx context.Context, taskCh <-chan *analyzeTask, resultsCh chan<- *statistics.AnalyzeResults) { var task *analyzeTask statsHandle := domain.GetDomain(e.Ctx()).StatsHandle() defer func() { @@ -524,24 +526,33 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<- }() for { var ok bool - task, ok = <-taskCh - if !ok { - break + select { + case task, ok = <-taskCh: + if !ok { + return + } } failpoint.Inject("handleAnalyzeWorkerPanic", nil) - statsHandle.StartAnalyzeJob(task.job) switch task.taskType { case colTask: select { + case <-ctx.Done(): + return case <-e.errExitCh: return - case resultsCh <- analyzeColumnsPushDownEntry(e.gp, task.colExec): + default: + statsHandle.StartAnalyzeJob(task.job) + resultsCh <- analyzeColumnsPushDownEntry(ctx, e.gp, task.colExec) } case idxTask: select { + case <-ctx.Done(): + return case <-e.errExitCh: return - case resultsCh <- analyzeIndexPushdown(task.idxExec): + default: + statsHandle.StartAnalyzeJob(task.job) + resultsCh <- analyzeIndexPushdown(ctx, task.idxExec) } } } diff --git a/pkg/executor/analyze_col.go b/pkg/executor/analyze_col.go index 2ae1ad2764bf6..f79e849a03dc7 100644 --- a/pkg/executor/analyze_col.go +++ b/pkg/executor/analyze_col.go @@ -64,7 +64,7 @@ type AnalyzeColumnsExec struct { memTracker *memory.Tracker } -func analyzeColumnsPushDownEntry(gp *gp.Pool, e *AnalyzeColumnsExec) *statistics.AnalyzeResults { +func analyzeColumnsPushDownEntry(ctx context.Context,gp *gp.Pool, e *AnalyzeColumnsExec) *statistics.AnalyzeResults { if e.AnalyzeInfo.StatsVersion >= statistics.Version2 { return 
e.toV2().analyzeColumnsPushDownV2(gp) } diff --git a/pkg/executor/analyze_col_v2.go b/pkg/executor/analyze_col_v2.go index 8103f8f573858..bb78d28dd275e 100644 --- a/pkg/executor/analyze_col_v2.go +++ b/pkg/executor/analyze_col_v2.go @@ -54,7 +54,7 @@ type AnalyzeColumnsExecV2 struct { *AnalyzeColumnsExec } -func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2(gp *gp.Pool) *statistics.AnalyzeResults { +func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2(ctx context.Context, gp *gp.Pool) *statistics.AnalyzeResults { var ranges []*ranger.Range if hc := e.handleCols; hc != nil { if hc.IsInt() { @@ -94,8 +94,8 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2(gp *gp.Pool) *statistics return &statistics.AnalyzeResults{Err: err, Job: e.job} } idxNDVPushDownCh := make(chan analyzeIndexNDVTotalResult, 1) - e.handleNDVForSpecialIndexes(specialIndexes, idxNDVPushDownCh, samplingStatsConcurrency) - count, hists, topNs, fmSketches, extStats, err := e.buildSamplingStats(gp, ranges, collExtStats, specialIndexesOffsets, idxNDVPushDownCh, samplingStatsConcurrency) + e.handleNDVForSpecialIndexes(ctx, specialIndexes, idxNDVPushDownCh, samplingStatsConcurrency) + count, hists, topNs, fmSketches, extStats, err := e.buildSamplingStats(ctx, gp, ranges, collExtStats, specialIndexesOffsets, idxNDVPushDownCh, samplingStatsConcurrency) if err != nil { e.memTracker.Release(e.memTracker.BytesConsumed()) return &statistics.AnalyzeResults{Err: err, Job: e.job} @@ -200,6 +200,7 @@ func printAnalyzeMergeCollectorLog(oldRootCount, newRootCount, subCount, tableID } func (e *AnalyzeColumnsExecV2) buildSamplingStats( + ctx context.Context, gp *gp.Pool, ranges []*ranger.Range, needExtStats bool, @@ -235,7 +236,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( // Start workers to merge the result from collectors. 
mergeResultCh := make(chan *samplingMergeResult, 1) mergeTaskCh := make(chan []byte, 1) - taskEg, taskCtx := errgroup.WithContext(context.Background()) + taskEg, taskCtx := errgroup.WithContext(ctx) // Start read data from resultHandler and send them to mergeTaskCh. taskEg.Go(func() (err error) { defer func() { @@ -250,7 +251,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( for i := range samplingStatsConcurrency { id := i gp.Go(func() { - e.subMergeWorker(mergeResultCh, mergeTaskCh, l, id) + e.subMergeWorker(ctx, mergeResultCh, mergeTaskCh, l, id) }) } // Merge the result from collectors. @@ -356,7 +357,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( // Start workers to build stats. for range samplingStatsConcurrency { e.samplingBuilderWg.Run(func() { - e.subBuildWorker(buildResultChan, buildTaskChan, hists, topns, sampleCollectors, exitCh) + e.subBuildWorker(ctx, buildResultChan, buildTaskChan, hists, topns, sampleCollectors, exitCh) }) } // Generate tasks for building stats. @@ -435,7 +436,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( } // handleNDVForSpecialIndexes deals with the logic to analyze the index containing the virtual column when the mode is full sampling. 
-func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.IndexInfo, totalResultCh chan analyzeIndexNDVTotalResult, samplingStatsConcurrency int) { +func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(ctx context.Context, indexInfos []*model.IndexInfo, totalResultCh chan analyzeIndexNDVTotalResult, samplingStatsConcurrency int) { defer func() { if r := recover(); r != nil { logutil.BgLogger().Warn("analyze ndv for special index panicked", zap.Any("recover", r), zap.Stack("stack")) @@ -457,7 +458,7 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In var subIndexWorkerWg = NewAnalyzeResultsNotifyWaitGroupWrapper(resultsCh) subIndexWorkerWg.Add(samplingStatsConcurrency) for range samplingStatsConcurrency { - subIndexWorkerWg.Run(func() { e.subIndexWorkerForNDV(taskCh, resultsCh) }) + subIndexWorkerWg.Run(func() { e.subIndexWorkerForNDV(ctx, taskCh, resultsCh) }) } for _, task := range tasks { taskCh <- task @@ -469,21 +470,27 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In } var err error statsHandle := domain.GetDomain(e.ctx).StatsHandle() +LOOP: for panicCnt < samplingStatsConcurrency { - results, ok := <-resultsCh - if !ok { - break - } - if results.Err != nil { - err = results.Err - statsHandle.FinishAnalyzeJob(results.Job, err, statistics.TableAnalysisJob) - if isAnalyzeWorkerPanic(err) { - panicCnt++ + select { + case results, ok := <-resultsCh: + if !ok { + break LOOP } - continue + if results.Err != nil { + err = results.Err + statsHandle.FinishAnalyzeJob(results.Job, err, statistics.TableAnalysisJob) + if isAnalyzeWorkerPanic(err) { + panicCnt++ + } + continue LOOP + } + statsHandle.FinishAnalyzeJob(results.Job, nil, statistics.TableAnalysisJob) + totalResult.results[results.Ars[0].Hist[0].ID] = results + case <-ctx.Done(): + err = ctx.Err() + break LOOP } - statsHandle.FinishAnalyzeJob(results.Job, nil, statistics.TableAnalysisJob) - 
totalResult.results[results.Ars[0].Hist[0].ID] = results } if err != nil { totalResult.err = err @@ -492,7 +499,7 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In } // subIndexWorker receive the task for each index and return the result for them. -func (e *AnalyzeColumnsExecV2) subIndexWorkerForNDV(taskCh chan *analyzeTask, resultsCh chan *statistics.AnalyzeResults) { +func (e *AnalyzeColumnsExecV2) subIndexWorkerForNDV(ctx context.Context, taskCh chan *analyzeTask, resultsCh chan *statistics.AnalyzeResults) { var task *analyzeTask statsHandle := domain.GetDomain(e.ctx).StatsHandle() defer func() { @@ -507,10 +514,15 @@ func (e *AnalyzeColumnsExecV2) subIndexWorkerForNDV(taskCh chan *analyzeTask, re }() for { var ok bool - task, ok = <-taskCh - if !ok { - break + select { + case task, ok = <-taskCh: + if !ok { + return + } + case <-ctx.Done(): + return } + statsHandle.StartAnalyzeJob(task.job) if task.taskType != idxTask { resultsCh <- &statistics.AnalyzeResults{ @@ -520,7 +532,7 @@ func (e *AnalyzeColumnsExecV2) subIndexWorkerForNDV(taskCh chan *analyzeTask, re continue } task.idxExec.job = task.job - resultsCh <- analyzeIndexNDVPushDown(task.idxExec) + resultsCh <- analyzeIndexNDVPushDown(ctx, task.idxExec) } } @@ -589,7 +601,7 @@ func (e *AnalyzeColumnsExecV2) buildSubIndexJobForSpecialIndex(indexInfos []*mod return tasks } -func (e *AnalyzeColumnsExecV2) subMergeWorker(resultCh chan<- *samplingMergeResult, taskCh <-chan []byte, l int, index int) { +func (e *AnalyzeColumnsExecV2) subMergeWorker(ctx context.Context, resultCh chan<- *samplingMergeResult, taskCh <-chan []byte, l int, index int) { // Only close the resultCh in the first worker. 
closeTheResultCh := index == 0 defer func() { @@ -626,12 +638,11 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(resultCh chan<- *samplingMergeResu retCollector.Base().FMSketches = append(retCollector.Base().FMSketches, statistics.NewFMSketch(statistics.MaxSketchSize)) } statsHandle := domain.GetDomain(e.ctx).StatsHandle() - for { - data, ok := <-taskCh + select { + case data, ok := <-taskCh: if !ok { break } - // Unmarshal the data. dataSize := int64(cap(data)) colResp := &tipb.AnalyzeColumnsResp{} @@ -664,15 +675,23 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(resultCh chan<- *samplingMergeResu e.memTracker.Consume(newRetCollectorSize - oldRetCollectorSize - subCollectorSize) e.memTracker.Release(dataSize + colRespSize) subCollector.DestroyAndPutToPool() + case <-ctx.Done(): + err := ctx.Err() + if err != nil { + resultCh <- &samplingMergeResult{err: err} + return + } + if intest.InTest { + panic("this ctx should be cancelled with the error") + } } - resultCh <- &samplingMergeResult{collector: retCollector} } -func (e *AnalyzeColumnsExecV2) subBuildWorker(resultCh chan error, taskCh chan *samplingBuildTask, hists []*statistics.Histogram, topns []*statistics.TopN, collectors []*statistics.SampleCollector, exitCh chan struct{}) { +func (e *AnalyzeColumnsExecV2) subBuildWorker(ctx context.Context, resultCh chan error, taskCh chan *samplingBuildTask, hists []*statistics.Histogram, topns []*statistics.TopN, collectors []*statistics.SampleCollector, exitCh chan struct{}) { defer func() { if r := recover(); r != nil { - logutil.BgLogger().Warn("analyze worker panicked", zap.Any("recover", r), zap.Stack("stack")) + logutil.BgLogger().Warn("analyze subBuildWorker panicked", zap.Any("recover", r), zap.Stack("stack")) metrics.PanicCounter.WithLabelValues(metrics.LabelAnalyze).Inc() resultCh <- getAnalyzePanicErr(r) } @@ -841,6 +860,8 @@ workLoop: releaseCollectorMemory() case <-exitCh: return + case <-ctx.Done(): + return } } } diff --git a/pkg/executor/analyze_idx.go 
b/pkg/executor/analyze_idx.go index 45834a73bb6d7..7f8a46a01da6d 100644 --- a/pkg/executor/analyze_idx.go +++ b/pkg/executor/analyze_idx.go @@ -47,7 +47,7 @@ type AnalyzeIndexExec struct { countNullRes distsql.SelectResult } -func analyzeIndexPushdown(idxExec *AnalyzeIndexExec) *statistics.AnalyzeResults { +func analyzeIndexPushdown(ctx context.Context, idxExec *AnalyzeIndexExec) *statistics.AnalyzeResults { ranges := ranger.FullRange() // For single-column index, we do not load null rows from TiKV, so the built histogram would not include // null values, and its `NullCount` would be set by result of another distsql call to get null rows. @@ -57,7 +57,7 @@ func analyzeIndexPushdown(idxExec *AnalyzeIndexExec) *statistics.AnalyzeResults if len(idxExec.idxInfo.Columns) == 1 { ranges = ranger.FullNotNullRange() } - hist, cms, fms, topN, err := idxExec.buildStats(ranges, true) + hist, cms, fms, topN, err := idxExec.buildStats(ctx, ranges, true) if err != nil { return &statistics.AnalyzeResults{Err: err, Job: idxExec.job} } @@ -95,7 +95,7 @@ func analyzeIndexPushdown(idxExec *AnalyzeIndexExec) *statistics.AnalyzeResults return result } -func (e *AnalyzeIndexExec) buildStats(ranges []*ranger.Range, considerNull bool) (hist *statistics.Histogram, cms *statistics.CMSketch, fms *statistics.FMSketch, topN *statistics.TopN, err error) { +func (e *AnalyzeIndexExec) buildStats(ctx context.Context, ranges []*ranger.Range, considerNull bool) (hist *statistics.Histogram, cms *statistics.CMSketch, fms *statistics.FMSketch, topN *statistics.TopN, err error) { if err = e.open(ranges, considerNull); err != nil { return nil, nil, nil, nil, err } @@ -105,12 +105,12 @@ func (e *AnalyzeIndexExec) buildStats(ranges []*ranger.Range, considerNull bool) err = err1 } }() - hist, cms, fms, topN, err = e.buildStatsFromResult(e.result, true) + hist, cms, fms, topN, err = e.buildStatsFromResult(ctx, e.result, true) if err != nil { return nil, nil, nil, nil, err } if e.countNullRes != nil { - 
nullHist, _, _, _, err := e.buildStatsFromResult(e.countNullRes, false) + nullHist, _, _, _, err := e.buildStatsFromResult(ctx, e.countNullRes, false) if err != nil { return nil, nil, nil, nil, err } @@ -179,7 +179,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang return nil } -func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, needCMS bool) (*statistics.Histogram, *statistics.CMSketch, *statistics.FMSketch, *statistics.TopN, error) { +func (e *AnalyzeIndexExec) buildStatsFromResult(killerCtx context.Context, result distsql.SelectResult, needCMS bool) (*statistics.Histogram, *statistics.CMSketch, *statistics.FMSketch, *statistics.TopN, error) { failpoint.Inject("buildStatsFromResult", func(val failpoint.Value) { if val.(bool) { failpoint.Return(nil, nil, nil, nil, errors.New("mock buildStatsFromResult error")) @@ -204,8 +204,10 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee dom.SysProcTracker().KillSysProcess(id) } }) - if err := e.ctx.GetSessionVars().SQLKiller.HandleSignal(); err != nil { - return nil, nil, nil, nil, err + select { + case <-killerCtx.Done(): + return nil, nil, nil, nil, killerCtx.Err() + default: } failpoint.Inject("mockSlowAnalyzeIndex", func() { time.Sleep(1000 * time.Second) @@ -240,7 +242,7 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee return hist, cms, fms, topn, nil } -func (e *AnalyzeIndexExec) buildSimpleStats(ranges []*ranger.Range, considerNull bool) (fms *statistics.FMSketch, nullHist *statistics.Histogram, err error) { +func (e *AnalyzeIndexExec) buildSimpleStats(killerCtx context.Context, ranges []*ranger.Range, considerNull bool) (fms *statistics.FMSketch, nullHist *statistics.Histogram, err error) { if err = e.open(ranges, considerNull); err != nil { return nil, nil, err } @@ -250,9 +252,9 @@ func (e *AnalyzeIndexExec) buildSimpleStats(ranges []*ranger.Range, considerNull err = err1 } }() - _, _, 
fms, _, err = e.buildStatsFromResult(e.result, false) + _, _, fms, _, err = e.buildStatsFromResult(killerCtx, e.result, false) if e.countNullRes != nil { - nullHist, _, _, _, err := e.buildStatsFromResult(e.countNullRes, false) + nullHist, _, _, _, err := e.buildStatsFromResult(killerCtx, e.countNullRes, false) if err != nil { return nil, nil, err } @@ -263,7 +265,7 @@ func (e *AnalyzeIndexExec) buildSimpleStats(ranges []*ranger.Range, considerNull return fms, nil, nil } -func analyzeIndexNDVPushDown(idxExec *AnalyzeIndexExec) *statistics.AnalyzeResults { +func analyzeIndexNDVPushDown(killerCtx context.Context, idxExec *AnalyzeIndexExec) *statistics.AnalyzeResults { ranges := ranger.FullRange() // For single-column index, we do not load null rows from TiKV, so the built histogram would not include // null values, and its `NullCount` would be set by result of another distsql call to get null rows. @@ -273,7 +275,7 @@ func analyzeIndexNDVPushDown(idxExec *AnalyzeIndexExec) *statistics.AnalyzeResul if len(idxExec.idxInfo.Columns) == 1 { ranges = ranger.FullNotNullRange() } - fms, nullHist, err := idxExec.buildSimpleStats(ranges, len(idxExec.idxInfo.Columns) == 1) + fms, nullHist, err := idxExec.buildSimpleStats(killerCtx, ranges, len(idxExec.idxInfo.Columns) == 1) if err != nil { return &statistics.AnalyzeResults{Err: err, Job: idxExec.job} } diff --git a/pkg/util/sqlkiller/sqlkiller.go b/pkg/util/sqlkiller/sqlkiller.go index 363a07269708f..a1cf6c5e8bb1e 100644 --- a/pkg/util/sqlkiller/sqlkiller.go +++ b/pkg/util/sqlkiller/sqlkiller.go @@ -15,11 +15,13 @@ package sqlkiller import ( + "context" "math/rand" "sync" "sync/atomic" "time" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" "github.com/pingcap/tidb/pkg/util/intest" @@ -42,12 +44,16 @@ const ( // so that errors in client can be correctly converted to tidb errors. 
) +var killError = errors.New("it has been killed by the sql killer") + // SQLKiller is used to kill a query. type SQLKiller struct { Finish func() killEvent struct { - ch chan struct{} - desc string + ch chan struct{} + cancelFn context.CancelCauseFunc + ctx context.Context + desc string sync.Mutex triggered bool } @@ -82,6 +88,25 @@ func (killer *SQLKiller) GetKillEventChan() <-chan struct{} { return killer.killEvent.ch } +func (killer *SQLKiller) GetKillEventCtx(parent context.Context) context.Context { + killer.killEvent.Lock() + defer killer.killEvent.Unlock() + + if killer.killEvent.ctx != nil { + return killer.killEvent.ctx + } + if parent == nil { + killer.killEvent.ctx, killer.killEvent.cancelFn = context.WithCancelCause(context.Background()) + } else { + killer.killEvent.ctx, killer.killEvent.cancelFn = context.WithCancelCause(parent) + } + if killer.killEvent.triggered { + killer.killEvent.cancelFn(killError) + } + + return killer.killEvent.ctx +} + func (killer *SQLKiller) triggerKillEvent() { killer.killEvent.Lock() defer killer.killEvent.Unlock() @@ -93,6 +118,9 @@ func (killer *SQLKiller) triggerKillEvent() { if killer.killEvent.ch != nil { close(killer.killEvent.ch) } + if killer.killEvent.ctx != nil && killer.killEvent.cancelFn != nil { + killer.killEvent.cancelFn(killError) + } killer.killEvent.triggered = true } @@ -103,6 +131,9 @@ func (killer *SQLKiller) resetKillEvent() { if !killer.killEvent.triggered && killer.killEvent.ch != nil { close(killer.killEvent.ch) } + if !killer.killEvent.triggered && killer.killEvent.ctx != nil && killer.killEvent.cancelFn != nil { + killer.killEvent.cancelFn(errors.New("sql killer: killed by resetting sql killer")) + } killer.killEvent.ch = nil killer.killEvent.triggered = false killer.killEvent.desc = "" From 64e72d85dde41473bb1818213cc1dd2c0d5bb708 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 31 Dec 2025 19:42:26 +0800 Subject: [PATCH 04/23] * Signed-off-by: Weizhen Wang --- 
pkg/util/sqlkiller/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/util/sqlkiller/BUILD.bazel b/pkg/util/sqlkiller/BUILD.bazel index 5a3a76d5a558e..965f8152243fd 100644 --- a/pkg/util/sqlkiller/BUILD.bazel +++ b/pkg/util/sqlkiller/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//pkg/util/dbterror/exeerrors", "//pkg/util/intest", "//pkg/util/logutil", + "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@org_uber_go_zap//:zap", ], From d6863449c8f599bf653d5bdce0b36e293d87c6cd Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 31 Dec 2025 22:40:37 +0800 Subject: [PATCH 05/23] * Signed-off-by: Weizhen Wang --- pkg/util/sqlkiller/sqlkiller.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/util/sqlkiller/sqlkiller.go b/pkg/util/sqlkiller/sqlkiller.go index a1cf6c5e8bb1e..4f2286827e7c6 100644 --- a/pkg/util/sqlkiller/sqlkiller.go +++ b/pkg/util/sqlkiller/sqlkiller.go @@ -44,7 +44,7 @@ const ( // so that errors in client can be correctly converted to tidb errors. ) -var killError = errors.New("it has been killed by the sql killer") +var errKilled = errors.New("it has been killed by the sql killer") // SQLKiller is used to kill a query. type SQLKiller struct { @@ -88,6 +88,7 @@ func (killer *SQLKiller) GetKillEventChan() <-chan struct{} { return killer.killEvent.ch } +// GetKillEventCtx returns a context which will be canceled when the kill signal is sent. 
func (killer *SQLKiller) GetKillEventCtx(parent context.Context) context.Context { killer.killEvent.Lock() defer killer.killEvent.Unlock() @@ -101,7 +102,7 @@ func (killer *SQLKiller) GetKillEventCtx(parent context.Context) context.Context killer.killEvent.ctx, killer.killEvent.cancelFn = context.WithCancelCause(parent) } if killer.killEvent.triggered { - killer.killEvent.cancelFn(killError) + killer.killEvent.cancelFn(errKilled) } return killer.killEvent.ctx @@ -119,7 +120,7 @@ func (killer *SQLKiller) triggerKillEvent() { close(killer.killEvent.ch) } if killer.killEvent.ctx != nil && killer.killEvent.cancelFn != nil { - killer.killEvent.cancelFn(killError) + killer.killEvent.cancelFn(errKilled) } killer.killEvent.triggered = true } From 30fd1df6da911202caf5b1df1092e3f19a6912c1 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 26 Jan 2026 16:48:31 +0800 Subject: [PATCH 06/23] update Signed-off-by: Weizhen Wang --- pkg/distsql/distsql.go | 6 ++++ pkg/executor/analyze.go | 2 +- pkg/executor/analyze_col.go | 25 +++++++------ pkg/executor/analyze_col_v2.go | 2 +- pkg/executor/test/analyzetest/analyze_test.go | 36 +++++++++++++++++++ 5 files changed, 56 insertions(+), 15 deletions(-) diff --git a/pkg/distsql/distsql.go b/pkg/distsql/distsql.go index 0701ad12b9cba..11bb4c17d588d 100644 --- a/pkg/distsql/distsql.go +++ b/pkg/distsql/distsql.go @@ -177,6 +177,12 @@ func SelectWithRuntimeStats(ctx context.Context, dctx *distsqlctx.DistSQLContext func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars any, isRestrict bool, dctx *distsqlctx.DistSQLContext) (SelectResult, error) { ctx = WithSQLKvExecCounterInterceptor(ctx, dctx.KvExecCounter) + failpoint.Inject("mockAnalyzeRequestWaitForCancel", func(val failpoint.Value) { + if val.(bool) { + <-ctx.Done() + failpoint.Return(nil, ctx.Err()) + } + }) kvReq.RequestSource.RequestSourceInternal = true kvReq.RequestSource.RequestSourceType = kv.InternalTxnStats resp := client.Send(ctx, kvReq, vars, 
&kv.ClientSendOption{}) diff --git a/pkg/executor/analyze.go b/pkg/executor/analyze.go index 8bcf593d1c9f0..e83d5d955dde7 100644 --- a/pkg/executor/analyze.go +++ b/pkg/executor/analyze.go @@ -133,7 +133,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) (err error) { taskCh := make(chan *analyzeTask, buildStatsConcurrency) resultsCh := make(chan *statistics.AnalyzeResults, 1) for range buildStatsConcurrency { - e.wg.Run(func() { e.analyzeWorker(taskCh, resultsCh) }) + e.wg.Run(func() { e.analyzeWorker(ctx, taskCh, resultsCh) }) } pruneMode := variable.PartitionPruneMode(sessionVars.PartitionPruneMode.Load()) // needGlobalStats used to indicate whether we should merge the partition-level stats to global-level stats. diff --git a/pkg/executor/analyze_col.go b/pkg/executor/analyze_col.go index f79e849a03dc7..d31f36e2bc0c4 100644 --- a/pkg/executor/analyze_col.go +++ b/pkg/executor/analyze_col.go @@ -64,11 +64,11 @@ type AnalyzeColumnsExec struct { memTracker *memory.Tracker } -func analyzeColumnsPushDownEntry(ctx context.Context,gp *gp.Pool, e *AnalyzeColumnsExec) *statistics.AnalyzeResults { +func analyzeColumnsPushDownEntry(ctx context.Context, gp *gp.Pool, e *AnalyzeColumnsExec) *statistics.AnalyzeResults { if e.AnalyzeInfo.StatsVersion >= statistics.Version2 { - return e.toV2().analyzeColumnsPushDownV2(gp) + return e.toV2().analyzeColumnsPushDownV2(ctx, gp) } - return e.toV1().analyzeColumnsPushDownV1() + return e.toV1().analyzeColumnsPushDownV1(ctx) } func (e *AnalyzeColumnsExec) toV1() *AnalyzeColumnsExecV1 { @@ -83,12 +83,12 @@ func (e *AnalyzeColumnsExec) toV2() *AnalyzeColumnsExecV2 { } } -func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error { +func (e *AnalyzeColumnsExec) open(ctx context.Context, ranges []*ranger.Range) error { e.memTracker = memory.NewTracker(int(e.ctx.GetSessionVars().PlanID.Load()), -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) e.resultHandler = &tableResultHandler{} firstPartRanges, 
secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(ranges, true, false, !hasPkHist(e.handleCols)) - firstResult, err := e.buildResp(firstPartRanges) + firstResult, err := e.buildResp(ctx, firstPartRanges) if err != nil { return err } @@ -97,7 +97,7 @@ func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error { return nil } var secondResult distsql.SelectResult - secondResult, err = e.buildResp(secondPartRanges) + secondResult, err = e.buildResp(ctx, secondPartRanges) if err != nil { return err } @@ -106,7 +106,7 @@ func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error { return nil } -func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectResult, error) { +func (e *AnalyzeColumnsExec) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) { var builder distsql.RequestBuilder reqBuilder := builder.SetHandleRangesForTables(e.ctx.GetDistSQLCtx(), []int64{e.TableID.GetStatisticsID()}, e.handleCols != nil && !e.handleCols.IsInt(), ranges) builder.SetResourceGroupTagger(e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger()) @@ -130,7 +130,6 @@ func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectRe if err != nil { return nil, err } - ctx := context.TODO() result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetDistSQLCtx()) if err != nil { return nil, err @@ -138,8 +137,8 @@ func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectRe return result, nil } -func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats bool) (hists []*statistics.Histogram, cms []*statistics.CMSketch, topNs []*statistics.TopN, fms []*statistics.FMSketch, extStats *statistics.ExtendedStatsColl, err error) { - if err = e.open(ranges); err != nil { +func (e *AnalyzeColumnsExec) buildStats(ctx context.Context, ranges []*ranger.Range, needExtStats bool) (hists 
[]*statistics.Histogram, cms []*statistics.CMSketch, topNs []*statistics.TopN, fms []*statistics.FMSketch, extStats *statistics.ExtendedStatsColl, err error) { + if err = e.open(ctx, ranges); err != nil { return nil, nil, nil, nil, nil, err } defer func() { @@ -188,7 +187,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo failpoint.Inject("mockSlowAnalyzeV1", func() { time.Sleep(1000 * time.Second) }) - data, err1 := e.resultHandler.nextRaw(context.TODO()) + data, err1 := e.resultHandler.nextRaw(ctx) if err1 != nil { return nil, nil, nil, nil, nil, err1 } @@ -310,7 +309,7 @@ type AnalyzeColumnsExecV1 struct { *AnalyzeColumnsExec } -func (e *AnalyzeColumnsExecV1) analyzeColumnsPushDownV1() *statistics.AnalyzeResults { +func (e *AnalyzeColumnsExecV1) analyzeColumnsPushDownV1(ctx context.Context) *statistics.AnalyzeResults { var ranges []*ranger.Range if hc := e.handleCols; hc != nil { if hc.IsInt() { @@ -322,7 +321,7 @@ func (e *AnalyzeColumnsExecV1) analyzeColumnsPushDownV1() *statistics.AnalyzeRes ranges = ranger.FullIntRange(false) } collExtStats := e.ctx.GetSessionVars().EnableExtendedStats - hists, cms, topNs, fms, extStats, err := e.buildStats(ranges, collExtStats) + hists, cms, topNs, fms, extStats, err := e.buildStats(ctx, ranges, collExtStats) if err != nil { return &statistics.AnalyzeResults{Err: err, Job: e.job} } diff --git a/pkg/executor/analyze_col_v2.go b/pkg/executor/analyze_col_v2.go index bb78d28dd275e..a19c2c99b26ba 100644 --- a/pkg/executor/analyze_col_v2.go +++ b/pkg/executor/analyze_col_v2.go @@ -216,7 +216,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( err error, ) { // Open memory tracker and resultHandler. 
- if err = e.open(ranges); err != nil { + if err = e.open(ctx, ranges); err != nil { return 0, nil, nil, nil, nil, err } defer func() { diff --git a/pkg/executor/test/analyzetest/analyze_test.go b/pkg/executor/test/analyzetest/analyze_test.go index 4dbee17dc9303..bf5d86582628a 100644 --- a/pkg/executor/test/analyzetest/analyze_test.go +++ b/pkg/executor/test/analyzetest/analyze_test.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + goerrors "errors" "strconv" "strings" "testing" @@ -143,6 +144,41 @@ func TestAnalyzeRestrict(t *testing.T) { require.Nil(t, rs) } +func TestAnalyzeCancelOnCtx(t *testing.T) { + if kerneltype.IsNextGen() { + t.Skip("analyze V1 cannot support in the next gen") + } + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + tk.MustExec("insert into t values (1), (2)") + tk.MustExec("set @@tidb_analyze_version = 1") + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/distsql/mockAnalyzeRequestWaitForCancel", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/distsql/mockAnalyzeRequestWaitForCancel")) + }() + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { + _, err := tk.Session().ExecuteInternal(ctx, "analyze table t") + done <- err + }() + cancel() + + select { + case err := <-done: + require.Error(t, err) + require.True(t, goerrors.Is(err, context.Canceled)) + case <-time.After(5 * time.Second): + t.Fatal("analyze does not stop after context canceled") + } +} + func TestAnalyzeParameters(t *testing.T) { if kerneltype.IsNextGen() { t.Skip("analyze V1 cannot support in the next gen") From 2dc0b325baf762c7b87983c3429311def408f8a4 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 26 Jan 2026 16:55:03 +0800 Subject: [PATCH 07/23] update Signed-off-by: Weizhen Wang --- 
pkg/executor/test/analyzetest/analyze_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/executor/test/analyzetest/analyze_test.go b/pkg/executor/test/analyzetest/analyze_test.go index bf5d86582628a..d422d85c36b17 100644 --- a/pkg/executor/test/analyzetest/analyze_test.go +++ b/pkg/executor/test/analyzetest/analyze_test.go @@ -145,9 +145,6 @@ func TestAnalyzeRestrict(t *testing.T) { } func TestAnalyzeCancelOnCtx(t *testing.T) { - if kerneltype.IsNextGen() { - t.Skip("analyze V1 cannot support in the next gen") - } store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) @@ -155,7 +152,7 @@ func TestAnalyzeCancelOnCtx(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("create table t(a int)") tk.MustExec("insert into t values (1), (2)") - tk.MustExec("set @@tidb_analyze_version = 1") + tk.MustExec("set @@tidb_analyze_version = 2") require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/distsql/mockAnalyzeRequestWaitForCancel", "return(true)")) defer func() { From 011aa3152287a47df9d6feb0dcec9302cc233b8c Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 26 Jan 2026 17:42:40 +0800 Subject: [PATCH 08/23] update Signed-off-by: Weizhen Wang --- pkg/executor/analyze.go | 74 +++++++++++++++++++++++---------- pkg/executor/analyze_col_v2.go | 9 +++- pkg/executor/analyze_idx.go | 21 ++++++---- pkg/util/sqlkiller/sqlkiller.go | 4 +- 4 files changed, 73 insertions(+), 35 deletions(-) diff --git a/pkg/executor/analyze.go b/pkg/executor/analyze.go index e83d5d955dde7..611f6207fa47c 100644 --- a/pkg/executor/analyze.go +++ b/pkg/executor/analyze.go @@ -38,6 +38,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/vardef" "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/util/exeerrors" "github.com/pingcap/tidb/pkg/sessiontxn" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle" @@ -504,6 +505,43 @@ 
func (e *AnalyzeExec) handleResultsErrorWithConcurrency( return err } +func analyzeWorkerExitErr(ctx context.Context, errExitCh <-chan struct{}) error { + select { + case <-ctx.Done(): + if err := context.Cause(ctx); err != nil { + return err + } + if err := ctx.Err(); err != nil { + return err + } + return exeerrors.ErrQueryInterrupted + case <-errExitCh: + return exeerrors.ErrQueryInterrupted + default: + return nil + } +} + +func (e *AnalyzeExec) sendAnalyzeResult(ctx context.Context, statsHandle *handle.Handle, resultsCh chan<- *statistics.AnalyzeResults, result *statistics.AnalyzeResults) { + select { + case resultsCh <- result: + return + case <-ctx.Done(): + case <-e.errExitCh: + } + err := result.Err + if err == nil { + err = context.Cause(ctx) + } + if err == nil { + err = ctx.Err() + } + if err == nil { + err = exeerrors.ErrQueryInterrupted + } + finishJobWithLog(statsHandle, result.Job, err) +} + // ctx must be from SQLKiller.GetKillEventCtx func (e *AnalyzeExec) analyzeWorker(ctx context.Context, taskCh <-chan *analyzeTask, resultsCh chan<- *statistics.AnalyzeResults) { var task *analyzeTask @@ -526,34 +564,24 @@ func (e *AnalyzeExec) analyzeWorker(ctx context.Context, taskCh <-chan *analyzeT }() for { var ok bool - select { - case task, ok = <-taskCh: - if !ok { - return - } + task, ok = <-taskCh + if !ok { + return + } + if err := analyzeWorkerExitErr(ctx, e.errExitCh); err != nil { + finishJobWithLog(statsHandle, task.job, err) + return } failpoint.Inject("handleAnalyzeWorkerPanic", nil) switch task.taskType { case colTask: - select { - case <-ctx.Done(): - return - case <-e.errExitCh: - return - default: - statsHandle.StartAnalyzeJob(task.job) - resultsCh <- analyzeColumnsPushDownEntry(ctx, e.gp, task.colExec) - } + statsHandle.StartAnalyzeJob(task.job) + result := analyzeColumnsPushDownEntry(ctx, e.gp, task.colExec) + e.sendAnalyzeResult(ctx, statsHandle, resultsCh, result) case idxTask: - select { - case <-ctx.Done(): - return - case 
<-e.errExitCh: - return - default: - statsHandle.StartAnalyzeJob(task.job) - resultsCh <- analyzeIndexPushdown(ctx, task.idxExec) - } + statsHandle.StartAnalyzeJob(task.job) + result := analyzeIndexPushdown(ctx, task.idxExec) + e.sendAnalyzeResult(ctx, statsHandle, resultsCh, result) } } } diff --git a/pkg/executor/analyze_col_v2.go b/pkg/executor/analyze_col_v2.go index a19c2c99b26ba..54683a9c868e9 100644 --- a/pkg/executor/analyze_col_v2.go +++ b/pkg/executor/analyze_col_v2.go @@ -676,13 +676,18 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(ctx context.Context, resultCh chan e.memTracker.Release(dataSize + colRespSize) subCollector.DestroyAndPutToPool() case <-ctx.Done(): - err := ctx.Err() + err := context.Cause(ctx) + if err != nil { + resultCh <- &samplingMergeResult{err: err} + return + } + err = ctx.Err() if err != nil { resultCh <- &samplingMergeResult{err: err} return } if intest.InTest { - panic("this ctx should be canncelled with the error") + panic("this ctx should be canceled with the error") } } resultCh <- &samplingMergeResult{collector: retCollector} diff --git a/pkg/executor/analyze_idx.go b/pkg/executor/analyze_idx.go index 7f8a46a01da6d..f4c217b123ec1 100644 --- a/pkg/executor/analyze_idx.go +++ b/pkg/executor/analyze_idx.go @@ -96,7 +96,7 @@ func analyzeIndexPushdown(ctx context.Context, idxExec *AnalyzeIndexExec) *stati } func (e *AnalyzeIndexExec) buildStats(ctx context.Context, ranges []*ranger.Range, considerNull bool) (hist *statistics.Histogram, cms *statistics.CMSketch, fms *statistics.FMSketch, topN *statistics.TopN, err error) { - if err = e.open(ranges, considerNull); err != nil { + if err = e.open(ctx, ranges, considerNull); err != nil { return nil, nil, nil, nil, err } defer func() { @@ -122,14 +122,14 @@ func (e *AnalyzeIndexExec) buildStats(ctx context.Context, ranges []*ranger.Rang return hist, cms, fms, topN, nil } -func (e *AnalyzeIndexExec) open(ranges []*ranger.Range, considerNull bool) error { - err := 
e.fetchAnalyzeResult(ranges, false) +func (e *AnalyzeIndexExec) open(ctx context.Context, ranges []*ranger.Range, considerNull bool) error { + err := e.fetchAnalyzeResult(ctx, ranges, false) if err != nil { return err } if considerNull && len(e.idxInfo.Columns) == 1 { ranges = ranger.NullRange() - err = e.fetchAnalyzeResult(ranges, true) + err = e.fetchAnalyzeResult(ctx, ranges, true) if err != nil { return err } @@ -140,7 +140,7 @@ func (e *AnalyzeIndexExec) open(ranges []*ranger.Range, considerNull bool) error // fetchAnalyzeResult builds and dispatches the `kv.Request` from given ranges, and stores the `SelectResult` // in corresponding fields based on the input `isNullRange` argument, which indicates if the range is the // special null range for single-column index to get the null count. -func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRange bool) error { +func (e *AnalyzeIndexExec) fetchAnalyzeResult(ctx context.Context, ranges []*ranger.Range, isNullRange bool) error { var builder distsql.RequestBuilder var kvReqBuilder *distsql.RequestBuilder if e.isCommonHandle && e.idxInfo.Primary { @@ -166,7 +166,6 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang if err != nil { return err } - ctx := context.TODO() result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetDistSQLCtx()) if err != nil { return err @@ -206,13 +205,17 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(killerCtx context.Context, resul }) select { case <-killerCtx.Done(): - return nil, nil, nil, nil, killerCtx.Err() + err := context.Cause(killerCtx) + if err == nil { + err = killerCtx.Err() + } + return nil, nil, nil, nil, err default: } failpoint.Inject("mockSlowAnalyzeIndex", func() { time.Sleep(1000 * time.Second) }) - data, err := result.NextRaw(context.TODO()) + data, err := result.NextRaw(killerCtx) if err != nil { return nil, nil, nil, nil, 
err } @@ -243,7 +246,7 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(killerCtx context.Context, resul } func (e *AnalyzeIndexExec) buildSimpleStats(killerCtx context.Context, ranges []*ranger.Range, considerNull bool) (fms *statistics.FMSketch, nullHist *statistics.Histogram, err error) { - if err = e.open(ranges, considerNull); err != nil { + if err = e.open(killerCtx, ranges, considerNull); err != nil { return nil, nil, err } defer func() { diff --git a/pkg/util/sqlkiller/sqlkiller.go b/pkg/util/sqlkiller/sqlkiller.go index 4f2286827e7c6..7e4d4e883cb67 100644 --- a/pkg/util/sqlkiller/sqlkiller.go +++ b/pkg/util/sqlkiller/sqlkiller.go @@ -133,9 +133,11 @@ func (killer *SQLKiller) resetKillEvent() { close(killer.killEvent.ch) } if !killer.killEvent.triggered && killer.killEvent.ctx != nil && killer.killEvent.cancelFn != nil { - killer.killEvent.cancelFn(errors.New("sql killer: killed by reseting sql killer")) + killer.killEvent.cancelFn(errors.New("sql killer: killed by resetting sql killer")) } killer.killEvent.ch = nil + killer.killEvent.ctx = nil + killer.killEvent.cancelFn = nil killer.killEvent.triggered = false killer.killEvent.desc = "" } From 4ccfa5461ffdd9daf9e5c440ff20f6db7acaea38 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 26 Jan 2026 18:16:06 +0800 Subject: [PATCH 09/23] update Signed-off-by: Weizhen Wang --- pkg/executor/analyze.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/executor/analyze.go b/pkg/executor/analyze.go index 611f6207fa47c..6e219db4b178f 100644 --- a/pkg/executor/analyze.go +++ b/pkg/executor/analyze.go @@ -38,7 +38,6 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/vardef" "github.com/pingcap/tidb/pkg/sessionctx/variable" - "github.com/pingcap/tidb/pkg/util/exeerrors" "github.com/pingcap/tidb/pkg/sessiontxn" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle" @@ -47,6 +46,7 @@ import ( handleutil 
"github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/sqlescape" "github.com/pingcap/tipb/go-tipb" From 90b5fa5d33468bfb182fcf19ea44d384b6b90673 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 26 Jan 2026 19:32:27 +0800 Subject: [PATCH 10/23] update Signed-off-by: Weizhen Wang --- pkg/executor/test/analyzetest/analyze_test.go | 60 +++++++++---------- 1 file changed, 28 insertions(+), 32 deletions(-) diff --git a/pkg/executor/test/analyzetest/analyze_test.go b/pkg/executor/test/analyzetest/analyze_test.go index d422d85c36b17..bdc058aaef5de 100644 --- a/pkg/executor/test/analyzetest/analyze_test.go +++ b/pkg/executor/test/analyzetest/analyze_test.go @@ -17,13 +17,15 @@ package analyzetest import ( "context" "encoding/json" - "fmt" goerrors "errors" + "fmt" "strconv" "strings" "testing" "time" + "github.com/stretchr/testify/require" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config/kerneltype" @@ -47,7 +49,6 @@ import ( "github.com/pingcap/tidb/pkg/testkit/analyzehelper" "github.com/pingcap/tidb/pkg/testkit/testfailpoint" "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" - "github.com/stretchr/testify/require" ) func TestAnalyzePartition(t *testing.T) { @@ -142,38 +143,33 @@ func TestAnalyzeRestrict(t *testing.T) { rs, err := tk.Session().ExecuteInternal(ctx, "analyze table t") require.Nil(t, err) require.Nil(t, rs) -} - -func TestAnalyzeCancelOnCtx(t *testing.T) { - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - - tk.MustExec("use test") - tk.MustExec("drop table if exists t") - tk.MustExec("create table t(a int)") - tk.MustExec("insert into t values (1), (2)") - tk.MustExec("set @@tidb_analyze_version = 2") + t.Run("cancel_on_ctx", func(t *testing.T) { + tk.MustExec("drop table 
if exists t") + tk.MustExec("create table t(a int)") + tk.MustExec("insert into t values (1), (2)") + tk.MustExec("set @@tidb_analyze_version = 2") - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/distsql/mockAnalyzeRequestWaitForCancel", "return(true)")) - defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/distsql/mockAnalyzeRequestWaitForCancel")) - }() + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/distsql/mockAnalyzeRequestWaitForCancel", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/distsql/mockAnalyzeRequestWaitForCancel")) + }() - ctx, cancel := context.WithCancel(context.Background()) - done := make(chan error, 1) - go func() { - _, err := tk.Session().ExecuteInternal(ctx, "analyze table t") - done <- err - }() - cancel() - - select { - case err := <-done: - require.Error(t, err) - require.True(t, goerrors.Is(err, context.Canceled)) - case <-time.After(5 * time.Second): - t.Fatal("analyze does not stop after context canceled") - } + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { + _, err := tk.Session().ExecuteInternal(ctx, "analyze table t") + done <- err + }() + cancel() + + select { + case err := <-done: + require.Error(t, err) + require.True(t, goerrors.Is(err, context.Canceled)) + case <-time.After(5 * time.Second): + t.Fatal("analyze does not stop after context canceled") + } + }) } func TestAnalyzeParameters(t *testing.T) { From 224081b5ff093ada0a55165374d62d6a8b29e3fe Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 26 Jan 2026 21:08:10 +0800 Subject: [PATCH 11/23] update Signed-off-by: Weizhen Wang --- pkg/executor/test/analyzetest/analyze_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/executor/test/analyzetest/analyze_test.go b/pkg/executor/test/analyzetest/analyze_test.go index bdc058aaef5de..338830a04fb12 100644 --- 
a/pkg/executor/test/analyzetest/analyze_test.go +++ b/pkg/executor/test/analyzetest/analyze_test.go @@ -24,8 +24,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config/kerneltype" @@ -49,6 +47,7 @@ import ( "github.com/pingcap/tidb/pkg/testkit/analyzehelper" "github.com/pingcap/tidb/pkg/testkit/testfailpoint" "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" + "github.com/stretchr/testify/require" ) func TestAnalyzePartition(t *testing.T) { From 6f83757d5147c88b225a7da86ba6043459a2fc61 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 26 Jan 2026 21:42:54 +0800 Subject: [PATCH 12/23] update Signed-off-by: Weizhen Wang --- pkg/executor/analyze_col_v2.go | 47 +++++++++++++++++++-------------- pkg/util/sqlkiller/sqlkiller.go | 6 ++++- 2 files changed, 32 insertions(+), 21 deletions(-) diff --git a/pkg/executor/analyze_col_v2.go b/pkg/executor/analyze_col_v2.go index 54683a9c868e9..91c74f8eb5000 100644 --- a/pkg/executor/analyze_col_v2.go +++ b/pkg/executor/analyze_col_v2.go @@ -251,7 +251,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( for i := range samplingStatsConcurrency { id := i gp.Go(func() { - e.subMergeWorker(ctx, mergeResultCh, mergeTaskCh, l, id) + e.subMergeWorker(taskCtx, mergeResultCh, mergeTaskCh, l, id) }) } // Merge the result from collectors. 
@@ -488,7 +488,10 @@ LOOP: statsHandle.FinishAnalyzeJob(results.Job, nil, statistics.TableAnalysisJob) totalResult.results[results.Ars[0].Hist[0].ID] = results case <-ctx.Done(): - err = ctx.Err() + err = context.Cause(ctx) + if err == nil { + err = ctx.Err() + } break LOOP } } @@ -638,11 +641,13 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(ctx context.Context, resultCh chan retCollector.Base().FMSketches = append(retCollector.Base().FMSketches, statistics.NewFMSketch(statistics.MaxSketchSize)) } statsHandle := domain.GetDomain(e.ctx).StatsHandle() - select { - case data, ok := <-taskCh: - if !ok { - break - } + for { + select { + case data, ok := <-taskCh: + if !ok { + resultCh <- &samplingMergeResult{collector: retCollector} + return + } // Unmarshal the data. dataSize := int64(cap(data)) colResp := &tipb.AnalyzeColumnsResp{} @@ -675,22 +680,24 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(ctx context.Context, resultCh chan e.memTracker.Consume(newRetCollectorSize - oldRetCollectorSize - subCollectorSize) e.memTracker.Release(dataSize + colRespSize) subCollector.DestroyAndPutToPool() - case <-ctx.Done(): - err := context.Cause(ctx) - if err != nil { - resultCh <- &samplingMergeResult{err: err} - return - } - err = ctx.Err() - if err != nil { - resultCh <- &samplingMergeResult{err: err} + case <-ctx.Done(): + err := context.Cause(ctx) + if err != nil { + resultCh <- &samplingMergeResult{err: err} + return + } + err = ctx.Err() + if err != nil { + resultCh <- &samplingMergeResult{err: err} + return + } + if intest.InTest { + panic("this ctx should be canceled with the error") + } + resultCh <- &samplingMergeResult{err: errors.New("context canceled without error")} return } - if intest.InTest { - panic("this ctx should be canceled with the error") - } } - resultCh <- &samplingMergeResult{collector: retCollector} } func (e *AnalyzeColumnsExecV2) subBuildWorker(ctx context.Context, resultCh chan error, taskCh chan *samplingBuildTask, hists 
[]*statistics.Histogram, topns []*statistics.TopN, collectors []*statistics.SampleCollector, exitCh chan struct{}) { diff --git a/pkg/util/sqlkiller/sqlkiller.go b/pkg/util/sqlkiller/sqlkiller.go index 7e4d4e883cb67..18621391d08a5 100644 --- a/pkg/util/sqlkiller/sqlkiller.go +++ b/pkg/util/sqlkiller/sqlkiller.go @@ -120,7 +120,11 @@ func (killer *SQLKiller) triggerKillEvent() { close(killer.killEvent.ch) } if killer.killEvent.ctx != nil && killer.killEvent.cancelFn != nil { - killer.killEvent.cancelFn(errKilled) + err := killer.getKillError(atomic.LoadUint32(&killer.Signal)) + if err == nil { + err = errKilled + } + killer.killEvent.cancelFn(err) } killer.killEvent.triggered = true } From 643191f3102d3587f5ac6f6d6eb69eea61d533d3 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 26 Jan 2026 21:54:39 +0800 Subject: [PATCH 13/23] update Signed-off-by: Weizhen Wang --- pkg/executor/analyze_col_v2.go | 64 +++++++++++++++++----------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/pkg/executor/analyze_col_v2.go b/pkg/executor/analyze_col_v2.go index 91c74f8eb5000..15b585223661e 100644 --- a/pkg/executor/analyze_col_v2.go +++ b/pkg/executor/analyze_col_v2.go @@ -648,38 +648,38 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(ctx context.Context, resultCh chan resultCh <- &samplingMergeResult{collector: retCollector} return } - // Unmarshal the data. - dataSize := int64(cap(data)) - colResp := &tipb.AnalyzeColumnsResp{} - err := colResp.Unmarshal(data) - if err != nil { - resultCh <- &samplingMergeResult{err: err} - return - } - // Consume the memory of the data. - colRespSize := int64(colResp.Size()) - e.memTracker.Consume(colRespSize) - - // Update processed rows. 
- subCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), e.analyzePB.ColReq.GetSampleRate(), l) - subCollector.Base().FromProto(colResp.RowCollector, e.memTracker) - statsHandle.UpdateAnalyzeJobProgress(e.job, subCollector.Base().Count) - - // Print collect log. - oldRetCollectorSize := retCollector.Base().MemSize - oldRetCollectorCount := retCollector.Base().Count - retCollector.MergeCollector(subCollector) - newRetCollectorCount := retCollector.Base().Count - printAnalyzeMergeCollectorLog(oldRetCollectorCount, newRetCollectorCount, subCollector.Base().Count, - e.tableID.TableID, e.tableID.PartitionID, e.TableID.IsPartitionTable(), - "merge subCollector in concurrency in AnalyzeColumnsExecV2", index) - - // Consume the memory of the result. - newRetCollectorSize := retCollector.Base().MemSize - subCollectorSize := subCollector.Base().MemSize - e.memTracker.Consume(newRetCollectorSize - oldRetCollectorSize - subCollectorSize) - e.memTracker.Release(dataSize + colRespSize) - subCollector.DestroyAndPutToPool() + // Unmarshal the data. + dataSize := int64(cap(data)) + colResp := &tipb.AnalyzeColumnsResp{} + err := colResp.Unmarshal(data) + if err != nil { + resultCh <- &samplingMergeResult{err: err} + return + } + // Consume the memory of the data. + colRespSize := int64(colResp.Size()) + e.memTracker.Consume(colRespSize) + + // Update processed rows. + subCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), e.analyzePB.ColReq.GetSampleRate(), l) + subCollector.Base().FromProto(colResp.RowCollector, e.memTracker) + statsHandle.UpdateAnalyzeJobProgress(e.job, subCollector.Base().Count) + + // Print collect log. 
+ oldRetCollectorSize := retCollector.Base().MemSize + oldRetCollectorCount := retCollector.Base().Count + retCollector.MergeCollector(subCollector) + newRetCollectorCount := retCollector.Base().Count + printAnalyzeMergeCollectorLog(oldRetCollectorCount, newRetCollectorCount, subCollector.Base().Count, + e.tableID.TableID, e.tableID.PartitionID, e.TableID.IsPartitionTable(), + "merge subCollector in concurrency in AnalyzeColumnsExecV2", index) + + // Consume the memory of the result. + newRetCollectorSize := retCollector.Base().MemSize + subCollectorSize := subCollector.Base().MemSize + e.memTracker.Consume(newRetCollectorSize - oldRetCollectorSize - subCollectorSize) + e.memTracker.Release(dataSize + colRespSize) + subCollector.DestroyAndPutToPool() case <-ctx.Done(): err := context.Cause(ctx) if err != nil { From e9c35aa563a06098afe4c486c3d7ddee3ff49bf4 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 26 Jan 2026 22:16:07 +0800 Subject: [PATCH 14/23] update Signed-off-by: Weizhen Wang --- pkg/util/sqlkiller/sqlkiller.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/util/sqlkiller/sqlkiller.go b/pkg/util/sqlkiller/sqlkiller.go index 18621391d08a5..a2dec4d013021 100644 --- a/pkg/util/sqlkiller/sqlkiller.go +++ b/pkg/util/sqlkiller/sqlkiller.go @@ -102,7 +102,11 @@ func (killer *SQLKiller) GetKillEventCtx(parent context.Context) context.Context killer.killEvent.ctx, killer.killEvent.cancelFn = context.WithCancelCause(parent) } if killer.killEvent.triggered { - killer.killEvent.cancelFn(errKilled) + err := killer.getKillError(atomic.LoadUint32(&killer.Signal)) + if err == nil { + err = errKilled + } + killer.killEvent.cancelFn(err) } return killer.killEvent.ctx From da96e1162b6a90ae80022286661e1daf4f948fa1 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 26 Jan 2026 23:30:31 +0800 Subject: [PATCH 15/23] update Signed-off-by: Weizhen Wang --- pkg/executor/analyze.go | 34 ++++++++++++++++++++++++++-- 
pkg/util/sqlkiller/sqlkiller.go | 39 --------------------------------- 2 files changed, 32 insertions(+), 41 deletions(-) diff --git a/pkg/executor/analyze.go b/pkg/executor/analyze.go index 6e219db4b178f..cab7f9a1cd381 100644 --- a/pkg/executor/analyze.go +++ b/pkg/executor/analyze.go @@ -24,6 +24,7 @@ import ( "slices" "strconv" "strings" + "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -102,7 +103,8 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) (err error) { statsHandle := domain.GetDomain(e.Ctx()).StatsHandle() infoSchema := sessiontxn.GetTxnManager(e.Ctx()).GetTxnInfoSchema() sessionVars := e.Ctx().GetSessionVars() - ctx = e.Ctx().GetSessionVars().SQLKiller.GetKillEventCtx(ctx) + ctx, cancel := e.buildAnalyzeKillCtx(ctx) + defer cancel(nil) // Filter the locked tables. tasks, needAnalyzeTableCnt, skippedTables, err := filterAndCollectTasks(e.tasks, statsHandle, infoSchema) @@ -505,6 +507,34 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency( return err } +func (e *AnalyzeExec) buildAnalyzeKillCtx(parent context.Context) (context.Context, context.CancelCauseFunc) { + ctx, cancel := context.WithCancelCause(parent) + killer := &e.Ctx().GetSessionVars().SQLKiller + killCh := killer.GetKillEventChan() + ticker := time.NewTicker(100 * time.Millisecond) + go func() { + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-killCh: + if err := killer.HandleSignal(); err != nil { + cancel(err) + return + } + killCh = nil + case <-ticker.C: + if err := killer.HandleSignal(); err != nil { + cancel(err) + return + } + } + } + }() + return ctx, cancel +} + func analyzeWorkerExitErr(ctx context.Context, errExitCh <-chan struct{}) error { select { case <-ctx.Done(): @@ -542,7 +572,7 @@ func (e *AnalyzeExec) sendAnalyzeResult(ctx context.Context, statsHandle *handle finishJobWithLog(statsHandle, result.Job, err) } -// ctx must be from SQLKiller.GetKillEventCtx +// ctx must be from 
AnalyzeExec.buildAnalyzeKillCtx func (e *AnalyzeExec) analyzeWorker(ctx context.Context, taskCh <-chan *analyzeTask, resultsCh chan<- *statistics.AnalyzeResults) { var task *analyzeTask statsHandle := domain.GetDomain(e.Ctx()).StatsHandle() diff --git a/pkg/util/sqlkiller/sqlkiller.go b/pkg/util/sqlkiller/sqlkiller.go index a2dec4d013021..6902efc6b238a 100644 --- a/pkg/util/sqlkiller/sqlkiller.go +++ b/pkg/util/sqlkiller/sqlkiller.go @@ -15,7 +15,6 @@ package sqlkiller import ( - "context" "math/rand" "sync" "sync/atomic" @@ -51,8 +50,6 @@ type SQLKiller struct { Finish func() killEvent struct { ch chan struct{} - cancelFn context.CancelCauseFunc - ctx context.Context desc string sync.Mutex triggered bool @@ -88,30 +85,6 @@ func (killer *SQLKiller) GetKillEventChan() <-chan struct{} { return killer.killEvent.ch } -// GetKillEventCtx returns a context which will be canceled when the kill signal is sent. -func (killer *SQLKiller) GetKillEventCtx(parent context.Context) context.Context { - killer.killEvent.Lock() - defer killer.killEvent.Unlock() - - if killer.killEvent.ctx != nil { - return killer.killEvent.ctx - } - if parent == nil { - killer.killEvent.ctx, killer.killEvent.cancelFn = context.WithCancelCause(context.Background()) - } else { - killer.killEvent.ctx, killer.killEvent.cancelFn = context.WithCancelCause(parent) - } - if killer.killEvent.triggered { - err := killer.getKillError(atomic.LoadUint32(&killer.Signal)) - if err == nil { - err = errKilled - } - killer.killEvent.cancelFn(err) - } - - return killer.killEvent.ctx -} - func (killer *SQLKiller) triggerKillEvent() { killer.killEvent.Lock() defer killer.killEvent.Unlock() @@ -123,13 +96,6 @@ func (killer *SQLKiller) triggerKillEvent() { if killer.killEvent.ch != nil { close(killer.killEvent.ch) } - if killer.killEvent.ctx != nil && killer.killEvent.cancelFn != nil { - err := killer.getKillError(atomic.LoadUint32(&killer.Signal)) - if err == nil { - err = errKilled - } - killer.killEvent.cancelFn(err) - 
} killer.killEvent.triggered = true } @@ -140,12 +106,7 @@ func (killer *SQLKiller) resetKillEvent() { if !killer.killEvent.triggered && killer.killEvent.ch != nil { close(killer.killEvent.ch) } - if !killer.killEvent.triggered && killer.killEvent.ctx != nil && killer.killEvent.cancelFn != nil { - killer.killEvent.cancelFn(errors.New("sql killer: killed by resetting sql killer")) - } killer.killEvent.ch = nil - killer.killEvent.ctx = nil - killer.killEvent.cancelFn = nil killer.killEvent.triggered = false killer.killEvent.desc = "" } From 60610cf74c9c42544b2d3bc08f13a1c138cbb180 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 27 Jan 2026 00:36:10 +0800 Subject: [PATCH 16/23] update Signed-off-by: Weizhen Wang --- pkg/util/sqlkiller/sqlkiller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/util/sqlkiller/sqlkiller.go b/pkg/util/sqlkiller/sqlkiller.go index 6902efc6b238a..e92ecf996e67c 100644 --- a/pkg/util/sqlkiller/sqlkiller.go +++ b/pkg/util/sqlkiller/sqlkiller.go @@ -49,8 +49,8 @@ var errKilled = errors.New("it has been killed by the sql killer") type SQLKiller struct { Finish func() killEvent struct { - ch chan struct{} - desc string + ch chan struct{} + desc string sync.Mutex triggered bool } From ee89249d77b8a9b75e601ff676b975f18d977a45 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 27 Jan 2026 10:30:08 +0800 Subject: [PATCH 17/23] update Signed-off-by: Weizhen Wang --- pkg/executor/analyze.go | 29 ++++++++++++++--------------- pkg/util/sqlkiller/sqlkiller.go | 3 --- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/pkg/executor/analyze.go b/pkg/executor/analyze.go index cab7f9a1cd381..a3f7de86f8d4c 100644 --- a/pkg/executor/analyze.go +++ b/pkg/executor/analyze.go @@ -24,7 +24,6 @@ import ( "slices" "strconv" "strings" - "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -103,8 +102,8 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) (err error) { statsHandle := 
domain.GetDomain(e.Ctx()).StatsHandle() infoSchema := sessiontxn.GetTxnManager(e.Ctx()).GetTxnInfoSchema() sessionVars := e.Ctx().GetSessionVars() - ctx, cancel := e.buildAnalyzeKillCtx(ctx) - defer cancel(nil) + ctx, stop := e.buildAnalyzeKillCtx(ctx) + defer stop() // Filter the locked tables. tasks, needAnalyzeTableCnt, skippedTables, err := filterAndCollectTasks(e.tasks, statsHandle, infoSchema) @@ -507,32 +506,32 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency( return err } -func (e *AnalyzeExec) buildAnalyzeKillCtx(parent context.Context) (context.Context, context.CancelCauseFunc) { +func (e *AnalyzeExec) buildAnalyzeKillCtx(parent context.Context) (context.Context, func()) { ctx, cancel := context.WithCancelCause(parent) killer := &e.Ctx().GetSessionVars().SQLKiller killCh := killer.GetKillEventChan() - ticker := time.NewTicker(100 * time.Millisecond) + stopCh := make(chan struct{}) go func() { - defer ticker.Stop() for { select { case <-ctx.Done(): return + case <-stopCh: + return case <-killCh: - if err := killer.HandleSignal(); err != nil { - cancel(err) - return - } - killCh = nil - case <-ticker.C: - if err := killer.HandleSignal(); err != nil { - cancel(err) + status := killer.GetKillSignal() + if status == 0 { return } + err := exeerrors.ErrQueryInterrupted + cancel(err) + return } } }() - return ctx, cancel + return ctx, func() { + close(stopCh) + } } func analyzeWorkerExitErr(ctx context.Context, errExitCh <-chan struct{}) error { diff --git a/pkg/util/sqlkiller/sqlkiller.go b/pkg/util/sqlkiller/sqlkiller.go index e92ecf996e67c..363a07269708f 100644 --- a/pkg/util/sqlkiller/sqlkiller.go +++ b/pkg/util/sqlkiller/sqlkiller.go @@ -20,7 +20,6 @@ import ( "sync/atomic" "time" - "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" "github.com/pingcap/tidb/pkg/util/intest" @@ -43,8 +42,6 @@ const ( // so that errors in client can be correctly converted to tidb errors. 
) -var errKilled = errors.New("it has been killed by the sql killer") - // SQLKiller is used to kill a query. type SQLKiller struct { Finish func() From 4cb00b435f52ecb0684cc4db18615946f0ec4bfd Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 27 Jan 2026 13:20:19 +0800 Subject: [PATCH 18/23] update Signed-off-by: Weizhen Wang --- pkg/executor/analyze.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/executor/analyze.go b/pkg/executor/analyze.go index a3f7de86f8d4c..02d01a7afd2bd 100644 --- a/pkg/executor/analyze.go +++ b/pkg/executor/analyze.go @@ -472,6 +472,15 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency( break } if results.Err != nil { + if intest.InTest && stderrors.Is(results.Err, context.Canceled) { + statslogutil.StatsLogger().Info("analyze result canceled", + zap.Uint32("killSignal", e.Ctx().GetSessionVars().SQLKiller.GetKillSignal()), + zap.Uint64("connID", e.Ctx().GetSessionVars().ConnectionID), + zap.String("jobInfo", results.Job.String()), + zap.Error(results.Err), + zap.Stack("stack"), + ) + } err = results.Err if isAnalyzeWorkerPanic(err) { panicCnt++ From ff0aec87a2b81d04ff2a03342003193fa1161128 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 27 Jan 2026 13:51:11 +0800 Subject: [PATCH 19/23] update Signed-off-by: Weizhen Wang --- pkg/executor/analyze_col.go | 36 +++++++++++++++++++++++++++++++++--- pkg/executor/analyze_idx.go | 15 +++++++++++++++ 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/pkg/executor/analyze_col.go b/pkg/executor/analyze_col.go index d31f36e2bc0c4..e30e1ad986d7d 100644 --- a/pkg/executor/analyze_col.go +++ b/pkg/executor/analyze_col.go @@ -16,6 +16,7 @@ package executor import ( "context" + stderrors "errors" "fmt" "math" "strings" @@ -32,14 +33,17 @@ import ( "github.com/pingcap/tidb/pkg/planner/core" plannerutil "github.com/pingcap/tidb/pkg/planner/util" "github.com/pingcap/tidb/pkg/statistics" + statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" 
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/memory" "github.com/pingcap/tidb/pkg/util/ranger" "github.com/pingcap/tipb/go-tipb" "github.com/tiancaiamao/gp" + "go.uber.org/zap" ) // AnalyzeColumnsExec represents Analyze columns push down executor. @@ -66,9 +70,35 @@ type AnalyzeColumnsExec struct { func analyzeColumnsPushDownEntry(ctx context.Context, gp *gp.Pool, e *AnalyzeColumnsExec) *statistics.AnalyzeResults { if e.AnalyzeInfo.StatsVersion >= statistics.Version2 { - return e.toV2().analyzeColumnsPushDownV2(ctx, gp) - } - return e.toV1().analyzeColumnsPushDownV1(ctx) + res := e.toV2().analyzeColumnsPushDownV2(ctx, gp) + if intest.InTest && res.Err != nil && stderrors.Is(res.Err, context.Canceled) { + cause := context.Cause(ctx) + ctxErr := ctx.Err() + statslogutil.StatsLogger().Info("analyze columns canceled", + zap.Uint32("killSignal", e.ctx.GetSessionVars().SQLKiller.GetKillSignal()), + zap.Uint64("connID", e.ctx.GetSessionVars().ConnectionID), + zap.Error(res.Err), + zap.Error(cause), + zap.Error(ctxErr), + zap.Stack("stack"), + ) + } + return res + } + res := e.toV1().analyzeColumnsPushDownV1(ctx) + if intest.InTest && res.Err != nil && stderrors.Is(res.Err, context.Canceled) { + cause := context.Cause(ctx) + ctxErr := ctx.Err() + statslogutil.StatsLogger().Info("analyze columns canceled", + zap.Uint32("killSignal", e.ctx.GetSessionVars().SQLKiller.GetKillSignal()), + zap.Uint64("connID", e.ctx.GetSessionVars().ConnectionID), + zap.Error(res.Err), + zap.Error(cause), + zap.Error(ctxErr), + zap.Stack("stack"), + ) + } + return res } func (e *AnalyzeColumnsExec) toV1() *AnalyzeColumnsExecV1 { diff --git a/pkg/executor/analyze_idx.go b/pkg/executor/analyze_idx.go index f4c217b123ec1..057e280cd74b6 100644 --- a/pkg/executor/analyze_idx.go +++ 
b/pkg/executor/analyze_idx.go @@ -16,6 +16,7 @@ package executor import ( "context" + stderrors "errors" "math" "time" @@ -29,8 +30,10 @@ import ( "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/statistics" + statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/ranger" "github.com/pingcap/tipb/go-tipb" @@ -59,6 +62,18 @@ func analyzeIndexPushdown(ctx context.Context, idxExec *AnalyzeIndexExec) *stati } hist, cms, fms, topN, err := idxExec.buildStats(ctx, ranges, true) if err != nil { + if intest.InTest && stderrors.Is(err, context.Canceled) { + cause := context.Cause(ctx) + ctxErr := ctx.Err() + statslogutil.StatsLogger().Info("analyze index canceled", + zap.Uint32("killSignal", idxExec.ctx.GetSessionVars().SQLKiller.GetKillSignal()), + zap.Uint64("connID", idxExec.ctx.GetSessionVars().ConnectionID), + zap.Error(err), + zap.Error(cause), + zap.Error(ctxErr), + zap.Stack("stack"), + ) + } return &statistics.AnalyzeResults{Err: err, Job: idxExec.job} } var statsVer = statistics.Version1 From cae7fa435c857525d9a3525774376a83521a1f07 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 27 Jan 2026 13:53:36 +0800 Subject: [PATCH 20/23] update Signed-off-by: Weizhen Wang --- pkg/executor/analyze.go | 25 ++- pkg/executor/analyze_col.go | 55 +++---- pkg/executor/analyze_col_v2.go | 145 ++++++++++++++---- pkg/executor/analyze_idx.go | 42 +++-- pkg/executor/analyze_utils.go | 13 ++ pkg/executor/table_reader.go | 4 +- pkg/executor/test/analyzetest/analyze_test.go | 21 ++- .../memorycontrol/memory_control_test.go | 2 +- .../handle/autoanalyze/autoanalyze.go | 50 +++--- pkg/util/sqlkiller/BUILD.bazel | 1 - 10 files changed, 264 insertions(+), 94 deletions(-) diff --git 
a/pkg/executor/analyze.go b/pkg/executor/analyze.go index 02d01a7afd2bd..6f3a1a6d82825 100644 --- a/pkg/executor/analyze.go +++ b/pkg/executor/analyze.go @@ -176,6 +176,11 @@ TASKLOOP: err = e.waitFinish(ctx, g, resultsCh) if err != nil { + if stderrors.Is(err, context.Canceled) { + if cause := context.Cause(ctx); cause != nil { + err = cause + } + } return err } @@ -473,10 +478,23 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency( } if results.Err != nil { if intest.InTest && stderrors.Is(results.Err, context.Canceled) { + jobInfo := "" + dbName := "" + tableName := "" + partitionName := "" + if results.Job != nil { + jobInfo = results.Job.JobInfo + dbName = results.Job.DBName + tableName = results.Job.TableName + partitionName = results.Job.PartitionName + } statslogutil.StatsLogger().Info("analyze result canceled", zap.Uint32("killSignal", e.Ctx().GetSessionVars().SQLKiller.GetKillSignal()), zap.Uint64("connID", e.Ctx().GetSessionVars().ConnectionID), - zap.String("jobInfo", results.Job.String()), + zap.String("jobInfo", jobInfo), + zap.String("dbName", dbName), + zap.String("tableName", tableName), + zap.String("partitionName", partitionName), zap.Error(results.Err), zap.Stack("stack"), ) @@ -532,7 +550,10 @@ func (e *AnalyzeExec) buildAnalyzeKillCtx(parent context.Context) (context.Conte if status == 0 { return } - err := exeerrors.ErrQueryInterrupted + err := killer.HandleSignal() + if err == nil { + err = exeerrors.ErrQueryInterrupted + } cancel(err) return } diff --git a/pkg/executor/analyze_col.go b/pkg/executor/analyze_col.go index e30e1ad986d7d..0a9ee89d53b85 100644 --- a/pkg/executor/analyze_col.go +++ b/pkg/executor/analyze_col.go @@ -71,36 +71,30 @@ type AnalyzeColumnsExec struct { func analyzeColumnsPushDownEntry(ctx context.Context, gp *gp.Pool, e *AnalyzeColumnsExec) *statistics.AnalyzeResults { if e.AnalyzeInfo.StatsVersion >= statistics.Version2 { res := e.toV2().analyzeColumnsPushDownV2(ctx, gp) - if intest.InTest && res.Err != nil && 
stderrors.Is(res.Err, context.Canceled) { - cause := context.Cause(ctx) - ctxErr := ctx.Err() - statslogutil.StatsLogger().Info("analyze columns canceled", - zap.Uint32("killSignal", e.ctx.GetSessionVars().SQLKiller.GetKillSignal()), - zap.Uint64("connID", e.ctx.GetSessionVars().ConnectionID), - zap.Error(res.Err), - zap.Error(cause), - zap.Error(ctxErr), - zap.Stack("stack"), - ) - } + e.logAnalyzeCanceledInTest(ctx, res.Err, "analyze columns canceled") return res } res := e.toV1().analyzeColumnsPushDownV1(ctx) - if intest.InTest && res.Err != nil && stderrors.Is(res.Err, context.Canceled) { - cause := context.Cause(ctx) - ctxErr := ctx.Err() - statslogutil.StatsLogger().Info("analyze columns canceled", - zap.Uint32("killSignal", e.ctx.GetSessionVars().SQLKiller.GetKillSignal()), - zap.Uint64("connID", e.ctx.GetSessionVars().ConnectionID), - zap.Error(res.Err), - zap.Error(cause), - zap.Error(ctxErr), - zap.Stack("stack"), - ) - } + e.logAnalyzeCanceledInTest(ctx, res.Err, "analyze columns canceled") return res } +func (e *AnalyzeColumnsExec) logAnalyzeCanceledInTest(ctx context.Context, err error, msg string) { + if !intest.InTest || err == nil || !stderrors.Is(err, context.Canceled) { + return + } + cause := context.Cause(ctx) + ctxErr := ctx.Err() + statslogutil.StatsLogger().Info(msg, + zap.Uint32("killSignal", e.ctx.GetSessionVars().SQLKiller.GetKillSignal()), + zap.Uint64("connID", e.ctx.GetSessionVars().ConnectionID), + zap.Error(err), + zap.Error(cause), + zap.Error(ctxErr), + zap.Stack("stack"), + ) +} + func (e *AnalyzeColumnsExec) toV1() *AnalyzeColumnsExecV1 { return &AnalyzeColumnsExecV1{ AnalyzeColumnsExec: e, @@ -162,6 +156,7 @@ func (e *AnalyzeColumnsExec) buildResp(ctx context.Context, ranges []*ranger.Ran } result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetDistSQLCtx()) if err != nil { + e.logAnalyzeCanceledInTest(ctx, err, "analyze columns distsql 
canceled") return nil, err } return result, nil @@ -215,11 +210,19 @@ func (e *AnalyzeColumnsExec) buildStats(ctx context.Context, ranges []*ranger.Ra return nil, nil, nil, nil, nil, err } failpoint.Inject("mockSlowAnalyzeV1", func() { - time.Sleep(1000 * time.Second) + select { + case <-ctx.Done(): + err := context.Cause(ctx) + if err == nil { + err = ctx.Err() + } + failpoint.Return(nil, nil, nil, nil, nil, err) + case <-time.After(1000 * time.Second): + } }) data, err1 := e.resultHandler.nextRaw(ctx) if err1 != nil { - return nil, nil, nil, nil, nil, err1 + return nil, nil, nil, nil, nil, normalizeCtxErrWithCause(ctx, err1) } if data == nil { break diff --git a/pkg/executor/analyze_col_v2.go b/pkg/executor/analyze_col_v2.go index 15b585223661e..98f22e299b8eb 100644 --- a/pkg/executor/analyze_col_v2.go +++ b/pkg/executor/analyze_col_v2.go @@ -236,7 +236,9 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( // Start workers to merge the result from collectors. mergeResultCh := make(chan *samplingMergeResult, 1) mergeTaskCh := make(chan []byte, 1) - taskEg, taskCtx := errgroup.WithContext(ctx) + taskCtx, taskCancel := context.WithCancelCause(ctx) + defer taskCancel(nil) + var taskEg errgroup.Group // Start read data from resultHandler and send them to mergeTaskCh. 
taskEg.Go(func() (err error) { defer func() { @@ -244,19 +246,23 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( err = getAnalyzePanicErr(r) } }() - return readDataAndSendTask(taskCtx, e.ctx, e.resultHandler, mergeTaskCh, e.memTracker) + err = readDataAndSendTask(taskCtx, e.ctx, e.resultHandler, mergeTaskCh, e.memTracker) + if err != nil { + taskCancel(err) + } + return err }) e.samplingMergeWg = &util.WaitGroupWrapper{} e.samplingMergeWg.Add(samplingStatsConcurrency) + mergeWorkerPanicCnt := 0 + mergeEg, mergeCtx := errgroup.WithContext(taskCtx) for i := range samplingStatsConcurrency { id := i gp.Go(func() { - e.subMergeWorker(taskCtx, mergeResultCh, mergeTaskCh, l, id) + e.subMergeWorker(mergeCtx, taskCtx, mergeResultCh, mergeTaskCh, l, id) }) } // Merge the result from collectors. - mergeWorkerPanicCnt := 0 - mergeEg, mergeCtx := errgroup.WithContext(taskCtx) mergeEg.Go(func() (err error) { defer func() { if r := recover(); r != nil { @@ -290,15 +296,36 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( }) err = taskEg.Wait() if err != nil { - mergeCtx.Done() + err = normalizeCtxErrWithCause(taskCtx, err) + if intest.InTest { + cause := context.Cause(taskCtx) + ctxErr := taskCtx.Err() + logutil.BgLogger().Info("analyze columns read task failed", + zap.Uint32("killSignal", e.ctx.GetSessionVars().SQLKiller.GetKillSignal()), + zap.Uint64("connID", e.ctx.GetSessionVars().ConnectionID), + zap.Error(err), + zap.Bool("isCtxCanceled", stderrors.Is(err, context.Canceled)), + zap.Error(cause), + zap.Error(ctxErr), + zap.Stack("stack"), + ) + } if err1 := mergeEg.Wait(); err1 != nil { - err = stderrors.Join(err, err1) + err1 = normalizeCtxErrWithCause(taskCtx, err1) + if !stderrors.Is(err1, err) && err1.Error() != err.Error() { + err = stderrors.Join(err, err1) + } } return 0, nil, nil, nil, nil, getAnalyzePanicErr(err) } err = mergeEg.Wait() + if err != nil { + err = normalizeCtxErrWithCause(taskCtx, err) + } defer 
e.memTracker.Release(rootRowCollector.Base().MemSize) if err != nil { + taskCancel(err) + e.logAnalyzeCanceledInTest(mergeCtx, err, "analyze columns merge canceled") return 0, nil, nil, nil, nil, err } @@ -448,8 +475,12 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(ctx context.Context, i }() tasks := e.buildSubIndexJobForSpecialIndex(indexInfos) taskCh := make(chan *analyzeTask, len(tasks)) + pendingJobs := make(map[uint64]*statistics.AnalyzeJob, len(tasks)) for _, task := range tasks { AddNewAnalyzeJob(e.ctx, task.job) + if task.job != nil && task.job.ID != nil { + pendingJobs[*task.job.ID] = task.job + } } resultsCh := make(chan *statistics.AnalyzeResults, len(tasks)) if len(tasks) < samplingStatsConcurrency { @@ -472,27 +503,35 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(ctx context.Context, i statsHandle := domain.GetDomain(e.ctx).StatsHandle() LOOP: for panicCnt < samplingStatsConcurrency { - select { - case results, ok := <-resultsCh: - if !ok { - break LOOP - } - if results.Err != nil { - err = results.Err - statsHandle.FinishAnalyzeJob(results.Job, err, statistics.TableAnalysisJob) - if isAnalyzeWorkerPanic(err) { - panicCnt++ - } - continue LOOP + results, ok := <-resultsCh + if !ok { + break LOOP + } + if results.Job != nil && results.Job.ID != nil { + delete(pendingJobs, *results.Job.ID) + } + if results.Err != nil { + err = results.Err + statsHandle.FinishAnalyzeJob(results.Job, err, statistics.TableAnalysisJob) + if isAnalyzeWorkerPanic(err) { + panicCnt++ } - statsHandle.FinishAnalyzeJob(results.Job, nil, statistics.TableAnalysisJob) - totalResult.results[results.Ars[0].Hist[0].ID] = results - case <-ctx.Done(): + continue LOOP + } + statsHandle.FinishAnalyzeJob(results.Job, nil, statistics.TableAnalysisJob) + totalResult.results[results.Ars[0].Hist[0].ID] = results + } + if err == nil { + if ctxErr := ctx.Err(); ctxErr != nil { err = context.Cause(ctx) if err == nil { - err = ctx.Err() + err = ctxErr } - break LOOP + } 
+ } + if err != nil && len(pendingJobs) > 0 { + for _, job := range pendingJobs { + statsHandle.FinishAnalyzeJob(job, err, statistics.TableAnalysisJob) } } if err != nil { @@ -604,7 +643,7 @@ func (e *AnalyzeColumnsExecV2) buildSubIndexJobForSpecialIndex(indexInfos []*mod return tasks } -func (e *AnalyzeColumnsExecV2) subMergeWorker(ctx context.Context, resultCh chan<- *samplingMergeResult, taskCh <-chan []byte, l int, index int) { +func (e *AnalyzeColumnsExecV2) subMergeWorker(ctx context.Context, parentCtx context.Context, resultCh chan<- *samplingMergeResult, taskCh <-chan []byte, l int, index int) { // Only close the resultCh in the first worker. closeTheResultCh := index == 0 defer func() { @@ -640,6 +679,10 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(ctx context.Context, resultCh chan for range l { retCollector.Base().FMSketches = append(retCollector.Base().FMSketches, statistics.NewFMSketch(statistics.MaxSketchSize)) } + cleanupCollector := func() { + // Ensure collector resources are released on early exit paths. 
+ retCollector.DestroyAndPutToPool() + } statsHandle := domain.GetDomain(e.ctx).StatsHandle() for { select { @@ -653,6 +696,7 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(ctx context.Context, resultCh chan colResp := &tipb.AnalyzeColumnsResp{} err := colResp.Unmarshal(data) if err != nil { + cleanupCollector() resultCh <- &samplingMergeResult{err: err} return } @@ -682,24 +726,51 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(ctx context.Context, resultCh chan subCollector.DestroyAndPutToPool() case <-ctx.Done(): err := context.Cause(ctx) + if (err == nil || stderrors.Is(err, context.Canceled)) && parentCtx != nil { + parentErr := context.Cause(parentCtx) + if parentErr != nil { + err = parentErr + } + } if err != nil { + e.logAnalyzeCanceledInTest(ctx, err, "analyze columns subMergeWorker canceled") + cleanupCollector() resultCh <- &samplingMergeResult{err: err} return } err = ctx.Err() if err != nil { + e.logAnalyzeCanceledInTest(ctx, err, "analyze columns subMergeWorker canceled") + cleanupCollector() resultCh <- &samplingMergeResult{err: err} return } if intest.InTest { panic("this ctx should be canceled with the error") } + cleanupCollector() resultCh <- &samplingMergeResult{err: errors.New("context canceled without error")} return } } } +func (e *AnalyzeColumnsExecV2) logAnalyzeCanceledInTest(ctx context.Context, err error, msg string) { + if !intest.InTest || err == nil || !stderrors.Is(err, context.Canceled) { + return + } + cause := context.Cause(ctx) + ctxErr := ctx.Err() + logutil.BgLogger().Info(msg, + zap.Uint32("killSignal", e.ctx.GetSessionVars().SQLKiller.GetKillSignal()), + zap.Uint64("connID", e.ctx.GetSessionVars().ConnectionID), + zap.Error(err), + zap.Error(cause), + zap.Error(ctxErr), + zap.Stack("stack"), + ) +} + func (e *AnalyzeColumnsExecV2) subBuildWorker(ctx context.Context, resultCh chan error, taskCh chan *samplingBuildTask, hists []*statistics.Histogram, topns []*statistics.TopN, collectors []*statistics.SampleCollector, exitCh 
chan struct{}) { defer func() { if r := recover(); r != nil { @@ -910,11 +981,33 @@ func readDataAndSendTask(ctx context.Context, sctx sessionctx.Context, handler * return err } failpoint.Inject("mockSlowAnalyzeV2", func() { - time.Sleep(1000 * time.Second) + select { + case <-ctx.Done(): + err := context.Cause(ctx) + if err == nil { + err = ctx.Err() + } + failpoint.Return(err) + case <-time.After(1000 * time.Second): + } }) data, err := handler.nextRaw(ctx) if err != nil { + err = normalizeCtxErrWithCause(ctx, err) + if intest.InTest { + cause := context.Cause(ctx) + ctxErr := ctx.Err() + logutil.BgLogger().Info("analyze columns nextRaw failed", + zap.Uint32("killSignal", sctx.GetSessionVars().SQLKiller.GetKillSignal()), + zap.Uint64("connID", sctx.GetSessionVars().ConnectionID), + zap.Error(err), + zap.Bool("isCtxCanceled", stderrors.Is(err, context.Canceled)), + zap.Error(cause), + zap.Error(ctxErr), + zap.Stack("stack"), + ) + } return errors.Trace(err) } if data == nil { diff --git a/pkg/executor/analyze_idx.go b/pkg/executor/analyze_idx.go index 057e280cd74b6..a4ec54e6a8e97 100644 --- a/pkg/executor/analyze_idx.go +++ b/pkg/executor/analyze_idx.go @@ -62,18 +62,7 @@ func analyzeIndexPushdown(ctx context.Context, idxExec *AnalyzeIndexExec) *stati } hist, cms, fms, topN, err := idxExec.buildStats(ctx, ranges, true) if err != nil { - if intest.InTest && stderrors.Is(err, context.Canceled) { - cause := context.Cause(ctx) - ctxErr := ctx.Err() - statslogutil.StatsLogger().Info("analyze index canceled", - zap.Uint32("killSignal", idxExec.ctx.GetSessionVars().SQLKiller.GetKillSignal()), - zap.Uint64("connID", idxExec.ctx.GetSessionVars().ConnectionID), - zap.Error(err), - zap.Error(cause), - zap.Error(ctxErr), - zap.Stack("stack"), - ) - } + idxExec.logAnalyzeCanceledInTest(ctx, err, "analyze index canceled") return &statistics.AnalyzeResults{Err: err, Job: idxExec.job} } var statsVer = statistics.Version1 @@ -183,6 +172,7 @@ func (e *AnalyzeIndexExec) 
fetchAnalyzeResult(ctx context.Context, ranges []*ran } result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetDistSQLCtx()) if err != nil { + e.logAnalyzeCanceledInTest(ctx, err, "analyze index distsql canceled") return err } if isNullRange { @@ -228,10 +218,20 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(killerCtx context.Context, resul default: } failpoint.Inject("mockSlowAnalyzeIndex", func() { - time.Sleep(1000 * time.Second) + select { + case <-killerCtx.Done(): + err := context.Cause(killerCtx) + if err == nil { + err = killerCtx.Err() + } + failpoint.Return(nil, nil, nil, nil, err) + case <-time.After(1000 * time.Second): + } }) data, err := result.NextRaw(killerCtx) if err != nil { + err = normalizeCtxErrWithCause(killerCtx, err) + e.logAnalyzeCanceledInTest(killerCtx, err, "analyze index nextRaw canceled") return nil, nil, nil, nil, err } if data == nil { @@ -260,6 +260,22 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(killerCtx context.Context, resul return hist, cms, fms, topn, nil } +func (e *AnalyzeIndexExec) logAnalyzeCanceledInTest(ctx context.Context, err error, msg string) { + if !intest.InTest || err == nil || !stderrors.Is(err, context.Canceled) { + return + } + cause := context.Cause(ctx) + ctxErr := ctx.Err() + statslogutil.StatsLogger().Info(msg, + zap.Uint32("killSignal", e.ctx.GetSessionVars().SQLKiller.GetKillSignal()), + zap.Uint64("connID", e.ctx.GetSessionVars().ConnectionID), + zap.Error(err), + zap.Error(cause), + zap.Error(ctxErr), + zap.Stack("stack"), + ) +} + func (e *AnalyzeIndexExec) buildSimpleStats(killerCtx context.Context, ranges []*ranger.Range, considerNull bool) (fms *statistics.FMSketch, nullHist *statistics.Histogram, err error) { if err = e.open(killerCtx, ranges, considerNull); err != nil { return nil, nil, err diff --git a/pkg/executor/analyze_utils.go b/pkg/executor/analyze_utils.go index b1b0b45ac2854..db62f30dd37cb 
100644 --- a/pkg/executor/analyze_utils.go +++ b/pkg/executor/analyze_utils.go @@ -16,6 +16,7 @@ package executor import ( "context" + stderrors "errors" "strconv" "sync" @@ -108,6 +109,18 @@ func getAnalyzePanicErr(r any) error { return errors.Trace(errAnalyzeWorkerPanic) } +func normalizeCtxErrWithCause(ctx context.Context, err error) error { + if err == nil { + return nil + } + if stderrors.Is(err, context.Canceled) || stderrors.Is(err, context.DeadlineExceeded) { + if cause := context.Cause(ctx); cause != nil { + return cause + } + } + return err +} + // analyzeResultsNotifyWaitGroupWrapper is a wrapper for sync.WaitGroup // Please add all goroutine count when to `Add` to avoid exiting in advance. type analyzeResultsNotifyWaitGroupWrapper struct { diff --git a/pkg/executor/table_reader.go b/pkg/executor/table_reader.go index dc150673f7bf7..d375e79c01776 100644 --- a/pkg/executor/table_reader.go +++ b/pkg/executor/table_reader.go @@ -690,7 +690,7 @@ func (tr *tableResultHandler) nextRaw(ctx context.Context) (data []byte, err err if !tr.optionalFinished { data, err = tr.optionalResult.NextRaw(ctx) if err != nil { - return nil, err + return nil, normalizeCtxErrWithCause(ctx, err) } if data != nil { return data, nil @@ -699,7 +699,7 @@ func (tr *tableResultHandler) nextRaw(ctx context.Context) (data []byte, err err } data, err = tr.result.NextRaw(ctx) if err != nil { - return nil, err + return nil, normalizeCtxErrWithCause(ctx, err) } return data, nil } diff --git a/pkg/executor/test/analyzetest/analyze_test.go b/pkg/executor/test/analyzetest/analyze_test.go index 338830a04fb12..1c18f955c0d8c 100644 --- a/pkg/executor/test/analyzetest/analyze_test.go +++ b/pkg/executor/test/analyzetest/analyze_test.go @@ -17,7 +17,6 @@ package analyzetest import ( "context" "encoding/json" - goerrors "errors" "fmt" "strconv" "strings" @@ -142,6 +141,7 @@ func TestAnalyzeRestrict(t *testing.T) { rs, err := tk.Session().ExecuteInternal(ctx, "analyze table t") require.Nil(t, err) 
require.Nil(t, rs) + tk.MustExec("truncate table mysql.analyze_jobs") t.Run("cancel_on_ctx", func(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("create table t(a int)") @@ -153,18 +153,27 @@ func TestAnalyzeRestrict(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/distsql/mockAnalyzeRequestWaitForCancel")) }() - ctx, cancel := context.WithCancel(context.Background()) + baseCtx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + ctx, cancel := context.WithCancel(baseCtx) done := make(chan error, 1) go func() { _, err := tk.Session().ExecuteInternal(ctx, "analyze table t") done <- err }() - cancel() select { case err := <-done: - require.Error(t, err) - require.True(t, goerrors.Is(err, context.Canceled)) + t.Fatalf("analyze finished before cancel, err=%v", err) + case <-time.After(50 * time.Millisecond): + } + cancel() + + select { + case <-done: + rows := tk.MustQuery("select state, fail_reason from mysql.analyze_jobs where table_name = 't' order by end_time desc limit 1").Rows() + require.Len(t, rows, 1) + require.Equal(t, "failed", strings.ToLower(rows[0][0].(string))) + require.Contains(t, rows[0][1].(string), "context canceled") case <-time.After(5 * time.Second): t.Fatal("analyze does not stop after context canceled") } @@ -1986,6 +1995,7 @@ func testKillAutoAnalyze(t *testing.T, ver int) { }() } require.True(t, h.HandleAutoAnalyze(), comment) + require.NoError(t, h.Update(context.Background(), is)) currentVersion := h.GetPhysicalTableStats(tableInfo.ID, tableInfo).Version if status == "finished" { // If we kill a finished job, after kill command the status is still finished and the table stats are updated. 
@@ -2062,6 +2072,7 @@ func TestKillAutoAnalyzeIndex(t *testing.T) { }() } require.True(t, h.HandleAutoAnalyze(), comment) + require.NoError(t, h.Update(context.Background(), is)) currentVersion := h.GetPhysicalTableStats(tblInfo.ID, tblInfo).Version if status == "finished" { // If we kill a finished job, after kill command the status is still finished and the index stats are updated. diff --git a/pkg/executor/test/analyzetest/memorycontrol/memory_control_test.go b/pkg/executor/test/analyzetest/memorycontrol/memory_control_test.go index 66462343c208d..bb9ff3cd90525 100644 --- a/pkg/executor/test/analyzetest/memorycontrol/memory_control_test.go +++ b/pkg/executor/test/analyzetest/memorycontrol/memory_control_test.go @@ -98,7 +98,7 @@ func TestGlobalMemoryControlForPrepareAnalyze(t *testing.T) { require.NoError(t, err0) _, err1 := tk0.Exec(sqlExecute) // Killed and the WarnMsg is WarnMsgSuffixForInstance instead of WarnMsgSuffixForSingleQuery - require.True(t, strings.Contains(err1.Error(), "Your query has been cancelled due to exceeding the allowed memory limit for the tidb-server instance and this query is currently using the most memory. Please try narrowing your query scope or increase the tidb_server_memory_limit and try again.")) + require.True(t, strings.Contains(err1.Error(), "Your query has been cancelled due to exceeding the allowed memory limit for the tidb-server instance and this query is currently using the most memory. 
Please try narrowing your query scope or increase the tidb_server_memory_limit and try again."), err1.Error()) runtime.GC() require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/util/memory/ReadMemStats")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/mockAnalyzeMergeWorkerSlowConsume")) diff --git a/pkg/statistics/handle/autoanalyze/autoanalyze.go b/pkg/statistics/handle/autoanalyze/autoanalyze.go index 5d92137e24c53..e83fb3132b1aa 100644 --- a/pkg/statistics/handle/autoanalyze/autoanalyze.go +++ b/pkg/statistics/handle/autoanalyze/autoanalyze.go @@ -845,34 +845,48 @@ func finishAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, analy } job.EndTime = time.Now() - var sql string - var args []any + setStartTime := false + if job.StartTime.IsZero() { + // If the job is killed before it starts, ensure start_time is set for display. + job.StartTime = job.EndTime + setStartTime = true + } // process_id is used to see which process is running the analyze job and kill the analyze job. After the analyze job // is finished(or failed), process_id is useless and we set it to NULL to avoid `kill tidb process_id` wrongly. + state := statistics.AnalyzeFinished + failReason := "" if analyzeErr != nil { - failReason := analyzeErr.Error() + state = statistics.AnalyzeFailed + failReason = analyzeErr.Error() const textMaxLength = 65535 if len(failReason) > textMaxLength { failReason = failReason[:textMaxLength] } + } - if analyzeType == statistics.TableAnalysisJob { - sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %?, end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, fail_reason = %?, process_id = NULL WHERE id = %?" 
- args = []any{job.Progress.GetDeltaCount(), job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFailed, failReason, *job.ID} - } else { - sql = "UPDATE mysql.analyze_jobs SET end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, fail_reason = %?, process_id = NULL WHERE id = %?" - args = []any{job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFailed, failReason, *job.ID} - } - } else { - if analyzeType == statistics.TableAnalysisJob { - sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %?, end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, process_id = NULL WHERE id = %?" - args = []any{job.Progress.GetDeltaCount(), job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFinished, *job.ID} - } else { - sql = "UPDATE mysql.analyze_jobs SET end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, process_id = NULL WHERE id = %?" - args = []any{job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFinished, *job.ID} - } + setClauses := make([]string, 0, 6) + args := make([]any, 0, 6) + if setStartTime { + setClauses = append(setClauses, "start_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE)") + args = append(args, job.StartTime.UTC().Format(types.TimeFormat)) } + if analyzeType == statistics.TableAnalysisJob { + setClauses = append(setClauses, "processed_rows = processed_rows + %?") + args = append(args, job.Progress.GetDeltaCount()) + } + setClauses = append(setClauses, "end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE)") + args = append(args, job.EndTime.UTC().Format(types.TimeFormat)) + setClauses = append(setClauses, "state = %?") + args = append(args, state) + if analyzeErr != nil { + setClauses = append(setClauses, "fail_reason = %?") + args = append(args, failReason) + } + setClauses = append(setClauses, "process_id = NULL") + + sql := fmt.Sprintf("UPDATE mysql.analyze_jobs SET %s WHERE id = %%?", strings.Join(setClauses, ", ")) + args = append(args, *job.ID) _, _, err := 
statsutil.ExecRows(sctx, sql, args...) if err != nil { diff --git a/pkg/util/sqlkiller/BUILD.bazel b/pkg/util/sqlkiller/BUILD.bazel index 965f8152243fd..5a3a76d5a558e 100644 --- a/pkg/util/sqlkiller/BUILD.bazel +++ b/pkg/util/sqlkiller/BUILD.bazel @@ -9,7 +9,6 @@ go_library( "//pkg/util/dbterror/exeerrors", "//pkg/util/intest", "//pkg/util/logutil", - "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@org_uber_go_zap//:zap", ], From e4f2a12b93cdaee8baaa23743ff7275e38c4a3c6 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Fri, 30 Jan 2026 15:10:20 +0800 Subject: [PATCH 21/23] executor: cancel analyze kill ctx on stop --- pkg/executor/analyze.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/executor/analyze.go b/pkg/executor/analyze.go index 6f3a1a6d82825..88808a68e73d4 100644 --- a/pkg/executor/analyze.go +++ b/pkg/executor/analyze.go @@ -560,6 +560,7 @@ func (e *AnalyzeExec) buildAnalyzeKillCtx(parent context.Context) (context.Conte } }() return ctx, func() { + cancel(context.Canceled) close(stopCh) } } From a7247c6c31094764183a4aeff4cf4562a775e5f5 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Fri, 30 Jan 2026 15:19:31 +0800 Subject: [PATCH 22/23] update Signed-off-by: Weizhen Wang --- pkg/executor/analyze.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/executor/analyze.go b/pkg/executor/analyze.go index 88808a68e73d4..8f11e84a02f45 100644 --- a/pkg/executor/analyze.go +++ b/pkg/executor/analyze.go @@ -156,9 +156,11 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) (err error) { } }) TASKLOOP: + sentTasks := 0 for _, task := range tasks { select { case taskCh <- task: + sentTasks++ case <-e.errExitCh: break TASKLOOP case <-gctx.Done(): @@ -181,6 +183,12 @@ TASKLOOP: err = cause } } + for task := range taskCh { + finishJobWithLog(statsHandle, task.job, err) + } + for i := sentTasks; i < len(tasks); i++ { + finishJobWithLog(statsHandle, tasks[i].job, err) + } return err } From 
f250aad1ba682e311d0c230bd43a05e81198ddfa Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Fri, 30 Jan 2026 15:33:17 +0800 Subject: [PATCH 23/23] update Signed-off-by: Weizhen Wang --- pkg/executor/analyze.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/executor/analyze.go b/pkg/executor/analyze.go index 8f11e84a02f45..473ac6d1d8487 100644 --- a/pkg/executor/analyze.go +++ b/pkg/executor/analyze.go @@ -155,8 +155,8 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) (err error) { dom.SysProcTracker().KillSysProcess(id) } }) -TASKLOOP: sentTasks := 0 +TASKLOOP: for _, task := range tasks { select { case taskCh <- task: