Skip to content

Commit f8d881d

Browse files
author
xiehaopeng
committed
feat(ignore-binlog-events): dynamically expand MigrationRangeMaxValues
1 parent a27e432 commit f8d881d

File tree

2 files changed

+79
-2
lines changed

2 files changed

+79
-2
lines changed

go/base/context.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@ type MigrationContext struct {
204204
TotalDMLEventsIgnored int64
205205
DMLBatchSize int64
206206
IgnoreOverIterationRangeMaxBinlog bool
207+
IsMigrationRangeMaxValuesLocked bool
208+
MigrationRangeMaxValuesInitial *sql.ColumnValues
207209
isThrottled bool
208210
throttleReason string
209211
throttleReasonHint ThrottleReasonHint

go/logic/applier.go

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,15 @@ func (this *Applier) readMigrationMaxValues(tx *gosql.Tx, uniqueKey *sql.UniqueK
568568
return err
569569
}
570570
}
571-
this.migrationContext.Log.Infof("Migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues)
571+
572+
// Save a snapshot copy of the initial MigrationRangeMaxValues
573+
if this.migrationContext.MigrationRangeMaxValues == nil {
574+
this.migrationContext.MigrationRangeMaxValuesInitial = nil
575+
} else {
576+
abstractValues := make([]interface{}, len(this.migrationContext.MigrationRangeMaxValues.AbstractValues()))
577+
copy(abstractValues, this.migrationContext.MigrationRangeMaxValues.AbstractValues())
578+
this.migrationContext.MigrationRangeMaxValuesInitial = sql.ToColumnValues(abstractValues)
579+
}
572580

573581
return rows.Err()
574582
}
@@ -611,6 +619,63 @@ func (this *Applier) ReadMigrationRangeValues() error {
611619
return tx.Commit()
612620
}
613621

622+
// ResetMigrationRangeMaxValues updates the MigrationRangeMaxValues with new values
623+
func (this *Applier) ResetMigrationRangeMaxValues(uniqueKeyAbstractValues []interface{}) {
624+
abstractValues := make([]interface{}, len(uniqueKeyAbstractValues))
625+
copy(abstractValues, uniqueKeyAbstractValues)
626+
this.migrationContext.MigrationRangeMaxValues = sql.ToColumnValues(abstractValues)
627+
this.migrationContext.Log.Debugf("Reset migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues)
628+
}
629+
630+
// LockMigrationRangeMaxValues locks the MigrationRangeMaxValues to prevent further updates
631+
func (this *Applier) LockMigrationRangeMaxValues() {
632+
if this.migrationContext.IsMigrationRangeMaxValuesLocked {
633+
return
634+
}
635+
this.migrationContext.IsMigrationRangeMaxValuesLocked = true
636+
this.migrationContext.Log.Infof("Lock migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues)
637+
}
638+
639+
// AttemptToLockMigrationRangeMaxValues attempts to lock MigrationRangeMaxValues to prevent endless copying.
640+
// To avoid infinite updates of MigrationRangeMaxValues causing the copy to never end,
641+
// we need a strategy to stop updates. When the initial copy target is achieved,
642+
// MigrationRangeMaxValues will be locked.
643+
func (this *Applier) AttemptToLockMigrationRangeMaxValues() {
644+
if this.migrationContext.IsMigrationRangeMaxValuesLocked {
645+
return
646+
}
647+
648+
// Currently only supports single-column unique index of int type
649+
uniqueKeyCols := this.migrationContext.UniqueKey.Columns.Columns()
650+
if len(uniqueKeyCols) != 1 {
651+
this.LockMigrationRangeMaxValues()
652+
return
653+
}
654+
uniqueKeyCol := uniqueKeyCols[0]
655+
if uniqueKeyCol.CompareValueFunc == nil {
656+
this.LockMigrationRangeMaxValues()
657+
return
658+
}
659+
660+
// Compare MigrationIterationRangeMinValues with MigrationRangeMaxValuesInitial to determine copy progress
661+
if this.migrationContext.MigrationIterationRangeMinValues == nil {
662+
return
663+
}
664+
than, err := uniqueKeyCol.CompareValueFunc(
665+
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues()[0],
666+
this.migrationContext.MigrationRangeMaxValuesInitial.AbstractValues()[0],
667+
)
668+
if err != nil {
669+
// If comparison fails, fallback to locking MigrationRangeMaxValues
670+
this.migrationContext.Log.Errore(err)
671+
this.LockMigrationRangeMaxValues()
672+
return
673+
}
674+
if than >= 0 {
675+
this.LockMigrationRangeMaxValues()
676+
}
677+
}
678+
614679
// CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values,
615680
// which will be used for copying the next chunk of rows. Ir returns "false" if there is
616681
// no further chunk to work through, i.e. we're past the last chunk and are done with
@@ -620,6 +685,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
620685
if this.migrationContext.MigrationIterationRangeMinValues == nil {
621686
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
622687
}
688+
this.AttemptToLockMigrationRangeMaxValues()
623689
for i := 0; i < 2; i++ {
624690
buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
625691
if i == 1 {
@@ -661,6 +727,8 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
661727
}
662728
}
663729
this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate")
730+
// Ensure MigrationRangeMaxValues is locked after iteration is complete
731+
this.LockMigrationRangeMaxValues()
664732
return hasFurtherRange, nil
665733
}
666734

@@ -1282,7 +1350,14 @@ func (this *Applier) IsIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs []interface{})
12821350
case than < 0:
12831351
return true, nil
12841352
case than > 0:
1285-
return false, nil
1353+
// When the value is greater than MigrationRangeMaxValues boundary, attempt to dynamically expand MigrationRangeMaxValues
1354+
// After expand, treat this comparison as equal, otherwise it cannot be ignored
1355+
if !this.migrationContext.IsMigrationRangeMaxValuesLocked {
1356+
this.ResetMigrationRangeMaxValues(uniqueKeyArgs)
1357+
return true, nil
1358+
} else {
1359+
return false, nil
1360+
}
12861361
default:
12871362
// Since rowcopy is left-open-right-closed, when it is equal to the MigrationRangeMaxValues boundary value, it can be ignored.
12881363
return true, nil

0 commit comments

Comments
 (0)