Skip to content

Commit 1d3c783

Browse files
committed
refactor cutover for MySQL 8.x rename feature && support OceanBase
1 parent 0263a20 commit 1d3c783

File tree

6 files changed

+173
-30
lines changed

6 files changed

+173
-30
lines changed

go/base/context.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ type MigrationContext struct {
103103
GoogleCloudPlatform bool
104104
AzureMySQL bool
105105
AttemptInstantDDL bool
106+
OceanBase bool
106107

107108
// SkipPortValidation allows skipping the port validation in `ValidateConnection`
108109
// This is useful when connecting to a MySQL instance where the external port

go/base/utils.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package base
77

88
import (
9+
"errors"
910
"fmt"
1011
"os"
1112
"regexp"
@@ -62,6 +63,10 @@ func StringContainsAll(s string, substrings ...string) bool {
6263
}
6364

6465
func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, migrationContext *MigrationContext, name string) (string, error) {
66+
if err := validateOceanBaseConnection(db, migrationContext); err != nil {
67+
return "", err
68+
}
69+
6570
versionQuery := `select @@global.version`
6671

6772
var version string
@@ -84,7 +89,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
8489
// GCP set users port to "NULL", replace it by gh-ost param
8590
// Azure MySQL sets the user's port to a different value by design, replace it by gh-ost param
8691
var port int
87-
if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL {
92+
if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL || migrationContext.OceanBase {
8893
port = connectionConfig.Key.Port
8994
} else {
9095
portQuery := `select @@global.port`
@@ -102,3 +107,27 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
102107
return "", fmt.Errorf("Unexpected database port reported: %+v / extra_port: %+v", port, extraPort)
103108
}
104109
}
110+
111+
func validateOceanBaseConnection(db *gosql.DB, migrationContext *MigrationContext) error {
112+
versionCommentQuery := `select @@global.version_comment`
113+
var versionComment string
114+
if err := db.QueryRow(versionCommentQuery).Scan(&versionComment); err != nil {
115+
return nil
116+
}
117+
if !strings.Contains(versionComment, "OceanBase") {
118+
return nil
119+
}
120+
121+
migrationContext.Log.Infof("OceanBase connection identified, version_comment: %v", versionComment)
122+
migrationContext.OceanBase = true
123+
124+
enableLockPriorityQuery := `select value from oceanbase.GV$OB_PARAMETERS where name='enable_lock_priority'`
125+
var enableLockPriority bool
126+
if err := db.QueryRow(enableLockPriorityQuery).Scan(&enableLockPriority); err != nil {
127+
return err
128+
}
129+
if !enableLockPriority {
130+
return errors.New("system parameter 'enable_lock_priority' should be true to support cut-over")
131+
}
132+
return nil
133+
}

go/logic/applier.go

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func (this *Applier) InitDBConnections() (err error) {
100100
if err := this.validateAndReadGlobalVariables(); err != nil {
101101
return err
102102
}
103-
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
103+
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBase {
104104
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
105105
return err
106106
} else {
@@ -773,24 +773,28 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
773773
return chunkSize, rowsAffected, duration, nil
774774
}
775775

776-
// LockOriginalTable places a write lock on the original table
777-
func (this *Applier) LockOriginalTable() error {
778-
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
779-
sql.EscapeName(this.migrationContext.DatabaseName),
780-
sql.EscapeName(this.migrationContext.OriginalTableName),
781-
)
782-
this.migrationContext.Log.Infof("Locking %s.%s",
783-
sql.EscapeName(this.migrationContext.DatabaseName),
784-
sql.EscapeName(this.migrationContext.OriginalTableName),
785-
)
776+
// lockTable places a write lock on the specific table
777+
func (this *Applier) lockTable(databaseName, tableName string) error {
778+
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, databaseName, tableName)
779+
this.migrationContext.Log.Infof("Locking %s.%s", databaseName, tableName)
786780
this.migrationContext.LockTablesStartTime = time.Now()
787781
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
788782
return err
789783
}
790-
this.migrationContext.Log.Infof("Table locked")
784+
this.migrationContext.Log.Infof("Table %s.%s locked", databaseName, tableName)
791785
return nil
792786
}
793787

788+
// LockOriginalTable places a write lock on the original table
789+
func (this *Applier) LockOriginalTable() error {
790+
return this.lockTable(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
791+
}
792+
793+
// LockGhostTable places a write lock on the ghost table
794+
func (this *Applier) LockGhostTable() error {
795+
return this.lockTable(this.migrationContext.DatabaseName, this.migrationContext.GetGhostTableName())
796+
}
797+
794798
// UnlockTables makes tea. No wait, it unlocks tables.
795799
func (this *Applier) UnlockTables() error {
796800
query := `unlock /* gh-ost */ tables`
@@ -1096,7 +1100,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
10961100

10971101
tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2
10981102
this.migrationContext.Log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
1099-
query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
1103+
query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, tableLockTimeoutSeconds)
11001104
if _, err := tx.Exec(query); err != nil {
11011105
tableLocked <- err
11021106
return err
@@ -1171,25 +1175,31 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
11711175
return nil
11721176
}
11731177

1174-
// AtomicCutoverRename
1175-
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
1176-
tx, err := this.db.Begin()
1178+
func (this *Applier) atomicCutoverRename(db *gosql.DB, sessionIdChan chan int64, tablesRenamed chan<- error) error {
1179+
tx, err := db.Begin()
11771180
if err != nil {
11781181
return err
11791182
}
11801183
defer func() {
11811184
tx.Rollback()
1182-
sessionIdChan <- -1
1183-
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
1185+
if sessionIdChan != nil {
1186+
sessionIdChan <- -1
1187+
}
1188+
if tablesRenamed != nil {
1189+
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
1190+
}
11841191
}()
1185-
var sessionId int64
1186-
if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil {
1187-
return err
1192+
1193+
if sessionIdChan != nil {
1194+
var sessionId int64
1195+
if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil {
1196+
return err
1197+
}
1198+
sessionIdChan <- sessionId
11881199
}
1189-
sessionIdChan <- sessionId
11901200

11911201
this.migrationContext.Log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds)
1192-
query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
1202+
query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
11931203
if _, err := tx.Exec(query); err != nil {
11941204
return err
11951205
}
@@ -1206,14 +1216,28 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
12061216
)
12071217
this.migrationContext.Log.Infof("Issuing and expecting this to block: %s", query)
12081218
if _, err := tx.Exec(query); err != nil {
1209-
tablesRenamed <- err
1219+
if tablesRenamed != nil {
1220+
tablesRenamed <- err
1221+
}
12101222
return this.migrationContext.Log.Errore(err)
12111223
}
1212-
tablesRenamed <- nil
1224+
if tablesRenamed != nil {
1225+
tablesRenamed <- nil
1226+
}
12131227
this.migrationContext.Log.Infof("Tables renamed")
12141228
return nil
12151229
}
12161230

1231+
// AtomicCutoverRename renames tables for atomic cut over in non lock session
1232+
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
1233+
return this.atomicCutoverRename(this.db, sessionIdChan, tablesRenamed)
1234+
}
1235+
1236+
// AtomicCutoverRenameWithLock renames tables for atomic cut over in the lock session
1237+
func (this *Applier) AtomicCutoverRenameWithLock() error {
1238+
return this.atomicCutoverRename(this.singletonDB, nil, nil)
1239+
}
1240+
12171241
func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
12181242
query := fmt.Sprintf(`show /* gh-ost */ global status like '%s'`, variableName)
12191243
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {

go/logic/inspect.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (this *Inspector) InitDBConnections() (err error) {
6060
}
6161
this.dbVersion = this.migrationContext.InspectorMySQLVersion
6262

63-
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
63+
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBase {
6464
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
6565
return err
6666
} else {

go/logic/migrator.go

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,11 @@ func (this *Migrator) canStopStreaming() bool {
202202

203203
// onChangelogEvent is called when a binlog event operation on the changelog table is intercepted.
204204
func (this *Migrator) onChangelogEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
205+
if dmlEvent.NewColumnValues == nil {
206+
// in some compatible systems, such as OceanBase Binlog Service, an UPSERT event is
207+
// converted to a DELETE event and an INSERT event, we need to skip the DELETE event.
208+
return nil
209+
}
205210
// Hey, I created the changelog table, I know the type of columns it has!
206211
switch hint := dmlEvent.NewColumnValues.StringColumn(2); hint {
207212
case "state":
@@ -564,9 +569,15 @@ func (this *Migrator) cutOver() (err error) {
564569

565570
switch this.migrationContext.CutOverType {
566571
case base.CutOverAtomic:
567-
// Atomic solution: we use low timeout and multiple attempts. But for
568-
// each failed attempt, we throttle until replication lag is back to normal
569-
err = this.atomicCutOver()
572+
if this.migrationContext.OceanBase || !mysql.IsSmallerMinorVersion(this.migrationContext.ApplierMySQLVersion, "8.0.13") {
573+
// Atomic solution for latest MySQL: cut over the tables in the same session where the origin
574+
// table and ghost table are both locked, it can only work on MySQL 8.0.13 or later versions
575+
err = this.atomicCutOverMySQL8()
576+
} else {
577+
// Atomic solution: we use low timeout and multiple attempts. But for
578+
// each failed attempt, we throttle until replication lag is back to normal
579+
err = this.atomicCutOver()
580+
}
570581
case base.CutOverTwoStep:
571582
err = this.cutOverTwoStep()
572583
default:
@@ -651,6 +662,39 @@ func (this *Migrator) cutOverTwoStep() (err error) {
651662
return nil
652663
}
653664

665+
// atomicCutOverMySQL8 will lock down the original table and the ghost table, execute
666+
// what's left of last DML entries, and atomically swap original->old, then new->original.
667+
// It requires to execute RENAME TABLE when the table is LOCKED under WRITE LOCK, which is
668+
// supported from MySQL 8.0.13, see https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-13.html.
669+
func (this *Migrator) atomicCutOverMySQL8() (err error) {
670+
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
671+
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
672+
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)
673+
674+
if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
675+
return err
676+
}
677+
678+
if err := this.retryOperation(this.waitForEventsUpToLock); err != nil {
679+
return err
680+
}
681+
if err := this.retryOperation(this.applier.LockGhostTable); err != nil {
682+
return err
683+
}
684+
685+
if err := this.applier.AtomicCutoverRenameWithLock(); err != nil {
686+
return err
687+
}
688+
if err := this.retryOperation(this.applier.UnlockTables); err != nil {
689+
return err
690+
}
691+
692+
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
693+
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
694+
this.migrationContext.Log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
695+
return nil
696+
}
697+
654698
// atomicCutOver
655699
func (this *Migrator) atomicCutOver() (err error) {
656700
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)

go/mysql/utils.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package mysql
88
import (
99
gosql "database/sql"
1010
"fmt"
11+
"strconv"
1112
"strings"
1213
"sync"
1314
"time"
@@ -252,3 +253,47 @@ func GetTriggers(db *gosql.DB, databaseName, tableName string) (triggers []Trigg
252253
}
253254
return triggers, nil
254255
}
256+
257+
// versionTokens splits a version string into exactly `digits` integer components.
//
// Anything from the first "-" onwards is discarded, the remainder is split on ".",
// and the leading components are converted with strconv.Atoi. Components beyond
// `digits` are ignored; missing components are left as 0.
func versionTokens(version string, digits int) []int {
	result := make([]int, digits)

	numeric := version
	if dash := strings.Index(version, "-"); dash >= 0 {
		numeric = version[:dash]
	}

	parts := strings.Split(numeric, ".")
	if len(parts) > digits {
		parts = parts[:digits]
	}
	for idx, part := range parts {
		// The conversion error is deliberately ignored: the component simply takes
		// the value Atoi returns alongside the error (0 for non-numeric text).
		result[idx], _ = strconv.Atoi(part)
	}
	return result
}
269+
270+
func isSmallerVersion(version string, otherVersion string, digits int) bool {
271+
v := versionTokens(version, digits)
272+
o := versionTokens(otherVersion, digits)
273+
for i := 0; i < len(v); i++ {
274+
if v[i] < o[i] {
275+
return true
276+
}
277+
if v[i] > o[i] {
278+
return false
279+
}
280+
if i == digits {
281+
break
282+
}
283+
}
284+
return false
285+
}
286+
287+
// IsSmallerMajorVersion tests two versions against another and returns true if
288+
// the former is a smaller "major" version than the latter.
289+
// e.g. 5.5.36 is NOT a smaller major version as compared to 5.5.40, but IS as compared to 5.6.9
290+
func IsSmallerMajorVersion(version string, otherVersion string) bool {
291+
return isSmallerVersion(version, otherVersion, 2)
292+
}
293+
294+
// IsSmallerMinorVersion tests two versions against another and returns true if
295+
// the former is a smaller "minor" version than the latter.
296+
// e.g. 5.5.36 is a smaller major version as compared to 5.5.40, as well as compared to 5.6.7
297+
func IsSmallerMinorVersion(version string, otherVersion string) bool {
298+
return isSmallerVersion(version, otherVersion, 3)
299+
}

0 commit comments

Comments
 (0)