-
Notifications
You must be signed in to change notification settings - Fork 1k
support download flushed binlog and parse event for cloud comput… #741
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 4 commits
f558bf9
226e137
5c3feac
e7f35db
17fa910
b0193e0
3f517c8
d3e25a8
9f98eda
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
package canal | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/go-mysql-org/go-mysql/mysql" | ||
"github.com/go-mysql-org/go-mysql/replication" | ||
"github.com/pingcap/errors" | ||
) | ||
|
||
// BinlogFileDownload download the binlog file from cloud computing platform (etc. aliyun) | ||
type BinlogFileDownload func(mysql.Position) (localBinFilePath string, err error) | ||
|
||
// WithLocalBinlogDownload registers the local bin file download, | ||
// that allows download the flushed binlog file to local (etc. aliyun) | ||
func (c *Canal) WithLocalBinlogDownload(d BinlogFileDownload) { | ||
c.binFileDownload = d | ||
} | ||
|
||
func (c *Canal) adaptLocalBinFileStreamer(syncMasterStreamer *replication.BinlogStreamer, err error) (*LocalBinFileAdapterStreamer, error) { | ||
lance6716 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
return &LocalBinFileAdapterStreamer{ | ||
BinlogStreamer: syncMasterStreamer, | ||
syncMasterStreamer: syncMasterStreamer, | ||
canal: c, | ||
binFileDownload: c.binFileDownload, | ||
}, err | ||
} | ||
|
||
// LocalBinFileAdapterStreamer will support to download flushed binlog file for continuous sync in cloud computing platform | ||
type LocalBinFileAdapterStreamer struct { | ||
lance6716 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
*replication.BinlogStreamer // the running streamer, it will be localStreamer or sync master streamer | ||
syncMasterStreamer *replication.BinlogStreamer // syncMasterStreamer is the streamer from startSyncer | ||
canal *Canal | ||
binFileDownload BinlogFileDownload | ||
} | ||
|
||
// GetEvent will auto switch the running streamer and return replication.BinlogEvent | ||
lance6716 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
func (s *LocalBinFileAdapterStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { | ||
if s.binFileDownload == nil { // not support to use local bin file | ||
return s.BinlogStreamer.GetEvent(ctx) | ||
} | ||
|
||
ev, err := s.BinlogStreamer.GetEvent(ctx) | ||
|
||
if err == nil { | ||
switch ev.Event.(type) { | ||
case *replication.RotateEvent: // RotateEvent means need to change steamer back to sync master to retry sync | ||
s.BinlogStreamer = s.syncMasterStreamer | ||
} | ||
return ev, err | ||
} | ||
|
||
if err == replication.ErrNeedSyncAgain { // restart master if last sync master syncer has error | ||
s.canal.syncer.Close() | ||
_ = s.canal.prepareSyncer() | ||
|
||
newStreamer, startErr := s.canal.startSyncer() | ||
if startErr == nil { | ||
ev, err = newStreamer.GetEvent(ctx) | ||
} | ||
// set all streamer to the new sync master streamer | ||
s.BinlogStreamer = newStreamer | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
s.syncMasterStreamer = newStreamer | ||
} | ||
|
||
if mysqlErr, ok := err.(*mysql.MyError); ok { | ||
lance6716 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
// change to local binlog file streamer to adapter the steamer | ||
if mysqlErr.Code == mysql.ER_MASTER_FATAL_ERROR_READING_BINLOG && | ||
|
||
mysqlErr.Message == "Could not find first log file name in binary log index file" { | ||
gset := s.canal.master.GTIDSet() | ||
if gset == nil || gset.String() == "" { // currently only support xid mode | ||
lance6716 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
s.canal.cfg.Logger.Info("Could not find first log, try to download the local binlog for retry") | ||
pos := s.canal.master.Position() | ||
newStreamer := newLocalBinFileStreamer(s.binFileDownload, pos) | ||
|
||
s.syncMasterStreamer = s.BinlogStreamer | ||
lance6716 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
s.BinlogStreamer = newStreamer | ||
|
||
return newStreamer.GetEvent(ctx) | ||
} | ||
} | ||
} | ||
|
||
return ev, err | ||
} | ||
|
||
func newLocalBinFileStreamer(download BinlogFileDownload, position mysql.Position) *replication.BinlogStreamer { | ||
streamer := replication.NewBinlogStreamer() | ||
binFilePath, err := download(position) | ||
if err != nil { | ||
streamer.CloseWithError(errors.New("local binlog file not exist")) | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
go func(binFilePath string, streamer *replication.BinlogStreamer) { | ||
beginFromHere := false | ||
_ = replication.NewBinlogParser().ParseFile(binFilePath, 0, func(be *replication.BinlogEvent) error { | ||
lance6716 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
if be.Header.LogPos == position.Pos || position.Pos == 4 { // go ahead to check if begin | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
beginFromHere = true | ||
} | ||
if beginFromHere { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need to add an error that no matching position for |
||
streamer.PutEvent(be) | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return nil | ||
}) | ||
}(binFilePath, streamer) | ||
|
||
return streamer | ||
} |
Uh oh!
There was an error while loading. Please reload this page.