Skip to content

Commit 0bb5c5d

Browse files
author
xiehaopeng
committed
feat(ignore-binlog-events): dynamically expand MigrationRangeMaxValues
1 parent a27e432 commit 0bb5c5d

File tree

2 files changed

+75
-1
lines changed

2 files changed

+75
-1
lines changed

go/base/context.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@ type MigrationContext struct {
204204
TotalDMLEventsIgnored int64
205205
DMLBatchSize int64
206206
IgnoreOverIterationRangeMaxBinlog bool
207+
IsMigrationRangeMaxValuesLocked bool
208+
MigrationRangeMaxValuesInitial *sql.ColumnValues
207209
isThrottled bool
208210
throttleReason string
209211
throttleReasonHint ThrottleReasonHint

go/logic/applier.go

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,11 @@ func (this *Applier) readMigrationMaxValues(tx *gosql.Tx, uniqueKey *sql.UniqueK
568568
return err
569569
}
570570
}
571+
// Save a snapshot copy of the initial MigrationRangeMaxValues
572+
abstractValues := make([]interface{}, len(this.migrationContext.MigrationRangeMaxValues.AbstractValues()))
573+
copy(abstractValues, this.migrationContext.MigrationRangeMaxValues.AbstractValues())
574+
this.migrationContext.MigrationRangeMaxValuesInitial = sql.ToColumnValues(abstractValues)
575+
571576
this.migrationContext.Log.Infof("Migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues)
572577

573578
return rows.Err()
@@ -611,6 +616,63 @@ func (this *Applier) ReadMigrationRangeValues() error {
611616
return tx.Commit()
612617
}
613618

619+
// ResetMigrationRangeMaxValues updates the MigrationRangeMaxValues with new values
620+
func (this *Applier) ResetMigrationRangeMaxValues(uniqueKeyAbstractValues []interface{}) {
621+
abstractValues := make([]interface{}, len(uniqueKeyAbstractValues))
622+
copy(abstractValues, uniqueKeyAbstractValues)
623+
this.migrationContext.MigrationRangeMaxValues = sql.ToColumnValues(abstractValues)
624+
this.migrationContext.Log.Debugf("Reset migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues)
625+
}
626+
627+
// LockMigrationRangeMaxValues locks the MigrationRangeMaxValues to prevent further updates
628+
func (this *Applier) LockMigrationRangeMaxValues() {
629+
if this.migrationContext.IsMigrationRangeMaxValuesLocked {
630+
return
631+
}
632+
this.migrationContext.IsMigrationRangeMaxValuesLocked = true
633+
this.migrationContext.Log.Infof("Lock migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues)
634+
}
635+
636+
// AttemptToLockMigrationRangeMaxValues attempts to lock MigrationRangeMaxValues to prevent endless copying.
637+
// To avoid infinite updates of MigrationRangeMaxValues causing the copy to never end,
638+
// we need a strategy to stop updates. When the initial copy target is achieved,
639+
// MigrationRangeMaxValues will be locked.
640+
func (this *Applier) AttemptToLockMigrationRangeMaxValues() {
641+
if this.migrationContext.IsMigrationRangeMaxValuesLocked {
642+
return
643+
}
644+
645+
// Currently only supports single-column unique index of int type
646+
uniqueKeyCols := this.migrationContext.UniqueKey.Columns.Columns()
647+
if len(uniqueKeyCols) != 1 {
648+
this.LockMigrationRangeMaxValues()
649+
return
650+
}
651+
uniqueKeyCol := uniqueKeyCols[0]
652+
if uniqueKeyCol.CompareValueFunc == nil {
653+
this.LockMigrationRangeMaxValues()
654+
return
655+
}
656+
657+
// Compare MigrationIterationRangeMinValues with MigrationRangeMaxValuesInitial to determine copy progress
658+
if this.migrationContext.MigrationIterationRangeMinValues == nil {
659+
return
660+
}
661+
than, err := uniqueKeyCol.CompareValueFunc(
662+
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues()[0],
663+
this.migrationContext.MigrationRangeMaxValuesInitial.AbstractValues()[0],
664+
)
665+
if err != nil {
666+
// If comparison fails, fallback to locking MigrationRangeMaxValues
667+
this.migrationContext.Log.Errore(err)
668+
this.LockMigrationRangeMaxValues()
669+
return
670+
}
671+
if than >= 0 {
672+
this.LockMigrationRangeMaxValues()
673+
}
674+
}
675+
614676
// CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values,
615677
// which will be used for copying the next chunk of rows. Ir returns "false" if there is
616678
// no further chunk to work through, i.e. we're past the last chunk and are done with
@@ -620,6 +682,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
620682
if this.migrationContext.MigrationIterationRangeMinValues == nil {
621683
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
622684
}
685+
this.AttemptToLockMigrationRangeMaxValues()
623686
for i := 0; i < 2; i++ {
624687
buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
625688
if i == 1 {
@@ -661,6 +724,8 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
661724
}
662725
}
663726
this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate")
727+
// Ensure MigrationRangeMaxValues is locked after iteration is complete
728+
this.LockMigrationRangeMaxValues()
664729
return hasFurtherRange, nil
665730
}
666731

@@ -1282,7 +1347,14 @@ func (this *Applier) IsIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs []interface{})
12821347
case than < 0:
12831348
return true, nil
12841349
case than > 0:
1285-
return false, nil
1350+
// When the value is greater than MigrationRangeMaxValues boundary, attempt to dynamically expand MigrationRangeMaxValues
1351+
// After expand, treat this comparison as equal, otherwise it cannot be ignored
1352+
if !this.migrationContext.IsMigrationRangeMaxValuesLocked {
1353+
this.ResetMigrationRangeMaxValues(uniqueKeyArgs)
1354+
return true, nil
1355+
} else {
1356+
return false, nil
1357+
}
12861358
default:
12871359
// Since rowcopy is left-open-right-closed, when it is equal to the MigrationRangeMaxValues boundary value, it can be ignored.
12881360
return true, nil

0 commit comments

Comments
 (0)