6 changes: 6 additions & 0 deletions pkg/distsql/distsql.go
@@ -177,6 +177,12 @@ func SelectWithRuntimeStats(ctx context.Context, dctx *distsqlctx.DistSQLContext
func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars any,
isRestrict bool, dctx *distsqlctx.DistSQLContext) (SelectResult, error) {
ctx = WithSQLKvExecCounterInterceptor(ctx, dctx.KvExecCounter)
failpoint.Inject("mockAnalyzeRequestWaitForCancel", func(val failpoint.Value) {
if val.(bool) {
<-ctx.Done()
failpoint.Return(nil, ctx.Err())
}
})
kvReq.RequestSource.RequestSourceInternal = true
kvReq.RequestSource.RequestSourceType = kv.InternalTxnStats
resp := client.Send(ctx, kvReq, vars, &kv.ClientSendOption{})
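
For reviewers trying this locally: failpoints are toggled through the pingcap/failpoint package. Below is a minimal sketch of enabling the new failpoint in a test; the full failpoint path and the test wiring are assumptions based on TiDB's usual <package import path>/<failpoint name> convention:

```go
package distsql_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

func TestAnalyzeRequestWaitsForCancel(t *testing.T) {
	// Assumed failpoint path, following the <import path>/<name> convention.
	fp := "github.com/pingcap/tidb/pkg/distsql/mockAnalyzeRequestWaitForCancel"
	require.NoError(t, failpoint.Enable(fp, "return(true)"))
	defer func() { require.NoError(t, failpoint.Disable(fp)) }()

	// ... issue an ANALYZE, deliver a kill signal, and assert that the
	// returned error carries the cancellation cause rather than a bare
	// context.Canceled.
}
```
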
135 changes: 121 additions & 14 deletions pkg/executor/analyze.go
@@ -46,6 +46,7 @@ import (
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/sqlescape"
"github.com/pingcap/tipb/go-tipb"
@@ -101,6 +102,8 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) (err error) {
statsHandle := domain.GetDomain(e.Ctx()).StatsHandle()
infoSchema := sessiontxn.GetTxnManager(e.Ctx()).GetTxnInfoSchema()
sessionVars := e.Ctx().GetSessionVars()
ctx, stop := e.buildAnalyzeKillCtx(ctx)
defer stop()

// Filter the locked tables.
tasks, needAnalyzeTableCnt, skippedTables, err := filterAndCollectTasks(e.tasks, statsHandle, infoSchema)
@@ -132,7 +135,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) (err error) {
taskCh := make(chan *analyzeTask, buildStatsConcurrency)
resultsCh := make(chan *statistics.AnalyzeResults, 1)
for range buildStatsConcurrency {
e.wg.Run(func() { e.analyzeWorker(taskCh, resultsCh) })
e.wg.Run(func() { e.analyzeWorker(ctx, taskCh, resultsCh) })
}
pruneMode := variable.PartitionPruneMode(sessionVars.PartitionPruneMode.Load())
// needGlobalStats used to indicate whether we should merge the partition-level stats to global-level stats.
@@ -152,10 +155,12 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) (err error) {
dom.SysProcTracker().KillSysProcess(id)
}
})
sentTasks := 0
TASKLOOP:
for _, task := range tasks {
select {
case taskCh <- task:
sentTasks++
case <-e.errExitCh:
break TASKLOOP
case <-gctx.Done():
@@ -173,6 +178,17 @@

err = e.waitFinish(ctx, g, resultsCh)
if err != nil {
if stderrors.Is(err, context.Canceled) {
if cause := context.Cause(ctx); cause != nil {
err = cause
}
}
for task := range taskCh {
finishJobWithLog(statsHandle, task.job, err)
}
for i := sentTasks; i < len(tasks); i++ {
finishJobWithLog(statsHandle, tasks[i].job, err)
}
return err
}
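
The cleanup above runs in two passes: drain the tasks that were queued on taskCh but never picked up by a worker, then walk the tail of tasks that were never sent at all. A standalone sketch of that pattern (job and finish are hypothetical stand-ins for analyzeTask and finishJobWithLog; it assumes the sender has already closed taskCh, otherwise the drain would block):

```go
package main

import (
	"errors"
	"fmt"
)

type job struct{ id int }

func finish(j job, err error) { fmt.Printf("job %d: %v\n", j.id, err) }

func main() {
	jobs := []job{{1}, {2}, {3}, {4}}
	taskCh := make(chan job, len(jobs))

	// Suppose the run failed after only the first two jobs were sent.
	sent := 2
	for _, j := range jobs[:sent] {
		taskCh <- j
	}
	close(taskCh) // sender must close, or the drain below blocks forever

	cause := errors.New("query interrupted")
	for j := range taskCh { // pass 1: queued but never consumed
		finish(j, cause)
	}
	for _, j := range jobs[sent:] { // pass 2: never sent
		finish(j, cause)
	}
}
```
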

@@ -469,6 +485,28 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(
break
}
if results.Err != nil {
if intest.InTest && stderrors.Is(results.Err, context.Canceled) {
jobInfo := ""
dbName := ""
tableName := ""
partitionName := ""
if results.Job != nil {
jobInfo = results.Job.JobInfo
dbName = results.Job.DBName
tableName = results.Job.TableName
partitionName = results.Job.PartitionName
}
statslogutil.StatsLogger().Info("analyze result canceled",
zap.Uint32("killSignal", e.Ctx().GetSessionVars().SQLKiller.GetKillSignal()),
zap.Uint64("connID", e.Ctx().GetSessionVars().ConnectionID),
zap.String("jobInfo", jobInfo),
zap.String("dbName", dbName),
zap.String("tableName", tableName),
zap.String("partitionName", partitionName),
zap.Error(results.Err),
zap.Stack("stack"),
)
}
err = results.Err
if isAnalyzeWorkerPanic(err) {
panicCnt++
@@ -503,7 +541,77 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(
return err
}

func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<- *statistics.AnalyzeResults) {
func (e *AnalyzeExec) buildAnalyzeKillCtx(parent context.Context) (context.Context, func()) {
ctx, cancel := context.WithCancelCause(parent)
killer := &e.Ctx().GetSessionVars().SQLKiller
killCh := killer.GetKillEventChan()
stopCh := make(chan struct{})
go func() {
for {
select {
case <-ctx.Done():
return
case <-stopCh:
return
case <-killCh:
status := killer.GetKillSignal()
if status == 0 {
return
}
err := killer.HandleSignal()
if err == nil {
err = exeerrors.ErrQueryInterrupted
}
cancel(err)
return
}
}
}()
return ctx, func() {
Copilot AI commented on Jan 27, 2026:

buildAnalyzeKillCtx creates a context.WithCancelCause, but the returned stop() only closes stopCh and never calls the cancel func when the analyze finishes normally. This can keep resources associated with the derived context alive until the parent context is done. Consider having stop() call cancel(...) as well (with an appropriate cause) to ensure timely cleanup.

Suggested change:
 	return ctx, func() {
+		cancel(context.Canceled)
cancel(context.Canceled)
close(stopCh)
}
}
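
Both the Canceled-to-cause unwrapping in Next and the review note above hinge on context.WithCancelCause semantics (Go 1.20+): after cancellation, ctx.Err() still reports the generic context.Canceled, while context.Cause(ctx) surfaces the specific error passed to cancel. A self-contained sketch:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errKilled = errors.New("query interrupted")

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(errKilled) // what the kill-watcher goroutine does on a kill signal

	<-ctx.Done()
	fmt.Println(ctx.Err())                              // context canceled
	fmt.Println(context.Cause(ctx))                     // query interrupted
	fmt.Println(errors.Is(ctx.Err(), context.Canceled)) // true
}
```
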

func analyzeWorkerExitErr(ctx context.Context, errExitCh <-chan struct{}) error {
select {
case <-ctx.Done():
if err := context.Cause(ctx); err != nil {
return err
}
if err := ctx.Err(); err != nil {
return err
}
return exeerrors.ErrQueryInterrupted
case <-errExitCh:
return exeerrors.ErrQueryInterrupted
default:
return nil
}
}

func (e *AnalyzeExec) sendAnalyzeResult(ctx context.Context, statsHandle *handle.Handle, resultsCh chan<- *statistics.AnalyzeResults, result *statistics.AnalyzeResults) {
select {
case resultsCh <- result:
return
case <-ctx.Done():
case <-e.errExitCh:
}
err := result.Err
if err == nil {
err = context.Cause(ctx)
}
if err == nil {
err = ctx.Err()
}
if err == nil {
err = exeerrors.ErrQueryInterrupted
}
finishJobWithLog(statsHandle, result.Job, err)
}

// ctx must be from AnalyzeExec.buildAnalyzeKillCtx
func (e *AnalyzeExec) analyzeWorker(ctx context.Context, taskCh <-chan *analyzeTask, resultsCh chan<- *statistics.AnalyzeResults) {
var task *analyzeTask
statsHandle := domain.GetDomain(e.Ctx()).StatsHandle()
defer func() {
@@ -526,23 +634,22 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<-
var ok bool
task, ok = <-taskCh
if !ok {
break
return
}
if err := analyzeWorkerExitErr(ctx, e.errExitCh); err != nil {
finishJobWithLog(statsHandle, task.job, err)
return
}
failpoint.Inject("handleAnalyzeWorkerPanic", nil)
statsHandle.StartAnalyzeJob(task.job)
switch task.taskType {
case colTask:
select {
case <-e.errExitCh:
return
case resultsCh <- analyzeColumnsPushDownEntry(e.gp, task.colExec):
}
statsHandle.StartAnalyzeJob(task.job)
result := analyzeColumnsPushDownEntry(ctx, e.gp, task.colExec)
e.sendAnalyzeResult(ctx, statsHandle, resultsCh, result)
case idxTask:
select {
case <-e.errExitCh:
return
case resultsCh <- analyzeIndexPushdown(task.idxExec):
}
statsHandle.StartAnalyzeJob(task.job)
result := analyzeIndexPushdown(ctx, task.idxExec)
e.sendAnalyzeResult(ctx, statsHandle, resultsCh, result)
}
}
}
62 changes: 47 additions & 15 deletions pkg/executor/analyze_col.go
@@ -16,6 +16,7 @@ package executor

import (
"context"
stderrors "errors"
"fmt"
"math"
"strings"
@@ -32,14 +33,17 @@ import (
"github.com/pingcap/tidb/pkg/planner/core"
plannerutil "github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/statistics"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"github.com/tiancaiamao/gp"
"go.uber.org/zap"
)

// AnalyzeColumnsExec represents Analyze columns push down executor.
@@ -64,11 +68,31 @@ type AnalyzeColumnsExec struct {
memTracker *memory.Tracker
}

func analyzeColumnsPushDownEntry(gp *gp.Pool, e *AnalyzeColumnsExec) *statistics.AnalyzeResults {
func analyzeColumnsPushDownEntry(ctx context.Context, gp *gp.Pool, e *AnalyzeColumnsExec) *statistics.AnalyzeResults {
if e.AnalyzeInfo.StatsVersion >= statistics.Version2 {
return e.toV2().analyzeColumnsPushDownV2(gp)
res := e.toV2().analyzeColumnsPushDownV2(ctx, gp)
e.logAnalyzeCanceledInTest(ctx, res.Err, "analyze columns canceled")
return res
}
return e.toV1().analyzeColumnsPushDownV1()
res := e.toV1().analyzeColumnsPushDownV1(ctx)
e.logAnalyzeCanceledInTest(ctx, res.Err, "analyze columns canceled")
return res
}

func (e *AnalyzeColumnsExec) logAnalyzeCanceledInTest(ctx context.Context, err error, msg string) {
if !intest.InTest || err == nil || !stderrors.Is(err, context.Canceled) {
return
}
cause := context.Cause(ctx)
ctxErr := ctx.Err()
statslogutil.StatsLogger().Info(msg,
zap.Uint32("killSignal", e.ctx.GetSessionVars().SQLKiller.GetKillSignal()),
zap.Uint64("connID", e.ctx.GetSessionVars().ConnectionID),
zap.Error(err),
zap.Error(cause),
zap.Error(ctxErr),
zap.Stack("stack"),
)
}

func (e *AnalyzeColumnsExec) toV1() *AnalyzeColumnsExecV1 {
@@ -83,12 +107,12 @@ func (e *AnalyzeColumnsExec) toV2() *AnalyzeColumnsExecV2 {
}
}

func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error {
func (e *AnalyzeColumnsExec) open(ctx context.Context, ranges []*ranger.Range) error {
e.memTracker = memory.NewTracker(int(e.ctx.GetSessionVars().PlanID.Load()), -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.resultHandler = &tableResultHandler{}
firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(ranges, true, false, !hasPkHist(e.handleCols))
firstResult, err := e.buildResp(firstPartRanges)
firstResult, err := e.buildResp(ctx, firstPartRanges)
if err != nil {
return err
}
@@ -97,7 +121,7 @@ func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error {
return nil
}
var secondResult distsql.SelectResult
secondResult, err = e.buildResp(secondPartRanges)
secondResult, err = e.buildResp(ctx, secondPartRanges)
if err != nil {
return err
}
@@ -106,7 +130,7 @@ func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error {
return nil
}

func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectResult, error) {
func (e *AnalyzeColumnsExec) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) {
var builder distsql.RequestBuilder
reqBuilder := builder.SetHandleRangesForTables(e.ctx.GetDistSQLCtx(), []int64{e.TableID.GetStatisticsID()}, e.handleCols != nil && !e.handleCols.IsInt(), ranges)
builder.SetResourceGroupTagger(e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger())
@@ -130,16 +154,16 @@ func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectRe
if err != nil {
return nil, err
}
ctx := context.TODO()
result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetDistSQLCtx())
if err != nil {
e.logAnalyzeCanceledInTest(ctx, err, "analyze columns distsql canceled")
return nil, err
}
return result, nil
}

func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats bool) (hists []*statistics.Histogram, cms []*statistics.CMSketch, topNs []*statistics.TopN, fms []*statistics.FMSketch, extStats *statistics.ExtendedStatsColl, err error) {
if err = e.open(ranges); err != nil {
func (e *AnalyzeColumnsExec) buildStats(ctx context.Context, ranges []*ranger.Range, needExtStats bool) (hists []*statistics.Histogram, cms []*statistics.CMSketch, topNs []*statistics.TopN, fms []*statistics.FMSketch, extStats *statistics.ExtendedStatsColl, err error) {
if err = e.open(ctx, ranges); err != nil {
return nil, nil, nil, nil, nil, err
}
defer func() {
@@ -186,11 +210,19 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
return nil, nil, nil, nil, nil, err
}
failpoint.Inject("mockSlowAnalyzeV1", func() {
time.Sleep(1000 * time.Second)
select {
case <-ctx.Done():
err := context.Cause(ctx)
if err == nil {
err = ctx.Err()
}
failpoint.Return(nil, nil, nil, nil, nil, err)
case <-time.After(1000 * time.Second):
}
})
data, err1 := e.resultHandler.nextRaw(context.TODO())
data, err1 := e.resultHandler.nextRaw(ctx)
if err1 != nil {
return nil, nil, nil, nil, nil, err1
return nil, nil, nil, nil, nil, normalizeCtxErrWithCause(ctx, err1)
}
if data == nil {
break
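
The mockSlowAnalyzeV1 change above swaps an unconditional 1000-second sleep for a cancellation-aware wait. The same pattern in isolation, as a sketch rather than TiDB code:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sleepOrCancel waits up to d, but returns early with the cancellation
// cause (falling back to ctx.Err()) if ctx is canceled first.
func sleepOrCancel(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		if cause := context.Cause(ctx); cause != nil {
			return cause
		}
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())
	go cancel(errors.New("query interrupted"))
	fmt.Println(sleepOrCancel(ctx, time.Hour)) // returns almost immediately
}
```
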
@@ -310,7 +342,7 @@ type AnalyzeColumnsExecV1 struct {
*AnalyzeColumnsExec
}

func (e *AnalyzeColumnsExecV1) analyzeColumnsPushDownV1() *statistics.AnalyzeResults {
func (e *AnalyzeColumnsExecV1) analyzeColumnsPushDownV1(ctx context.Context) *statistics.AnalyzeResults {
var ranges []*ranger.Range
if hc := e.handleCols; hc != nil {
if hc.IsInt() {
@@ -322,7 +354,7 @@ func (e *AnalyzeColumnsExecV1) analyzeColumnsPushDownV1() *statistics.AnalyzeRes
ranges = ranger.FullIntRange(false)
}
collExtStats := e.ctx.GetSessionVars().EnableExtendedStats
hists, cms, topNs, fms, extStats, err := e.buildStats(ranges, collExtStats)
hists, cms, topNs, fms, extStats, err := e.buildStats(ctx, ranges, collExtStats)
if err != nil {
return &statistics.AnalyzeResults{Err: err, Job: e.job}
}