-
Notifications
You must be signed in to change notification settings - Fork 1k
support download flushed binlog and parse event for cloud comput… #741
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 6 commits
f558bf9
226e137
5c3feac
e7f35db
17fa910
b0193e0
3f517c8
d3e25a8
9f98eda
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
package canal | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/go-mysql-org/go-mysql/mysql" | ||
"github.com/go-mysql-org/go-mysql/replication" | ||
"github.com/pingcap/errors" | ||
) | ||
|
||
// BinlogFileDownloader downloads the binlog file and return the path to it. It's often used to download binlog backup from RDS service. | ||
type BinlogFileDownloader func(mysql.Position) (localBinFilePath string, err error) | ||
|
||
// WithLocalBinlogDownloader registers the local bin file downloader, | ||
// that allows download the backup binlog file from RDS service to local | ||
func (c *Canal) WithLocalBinlogDownloader(d BinlogFileDownloader) { | ||
c.binFileDownloader = d | ||
} | ||
|
||
func (c *Canal) adaptLocalBinFileStreamer(remoteBinlogStreamer *replication.BinlogStreamer, err error) (*localBinFileAdapterStreamer, error) { | ||
return &localBinFileAdapterStreamer{ | ||
BinlogStreamer: remoteBinlogStreamer, | ||
syncMasterStreamer: remoteBinlogStreamer, | ||
canal: c, | ||
binFileDownloader: c.binFileDownloader, | ||
}, err | ||
} | ||
|
||
// localBinFileAdapterStreamer will support to download flushed binlog file for continuous sync in cloud computing platform | ||
type localBinFileAdapterStreamer struct { | ||
*replication.BinlogStreamer // the running streamer, it will be localStreamer or sync master streamer | ||
syncMasterStreamer *replication.BinlogStreamer // syncMasterStreamer is the streamer from canal startSyncer | ||
canal *Canal | ||
binFileDownloader BinlogFileDownloader | ||
} | ||
|
||
// GetEvent will auto switch the local and remote streamer to get binlog event if possible. | ||
func (s *localBinFileAdapterStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { | ||
if s.binFileDownloader == nil { // not support to use local bin file | ||
return s.BinlogStreamer.GetEvent(ctx) | ||
} | ||
|
||
ev, err := s.BinlogStreamer.GetEvent(ctx) | ||
|
||
if err == nil { | ||
switch ev.Event.(type) { | ||
case *replication.RotateEvent: // RotateEvent means need to change steamer back to sync master to retry sync | ||
s.BinlogStreamer = s.syncMasterStreamer | ||
} | ||
return ev, err | ||
} | ||
|
||
if err == replication.ErrNeedSyncAgain { // restart master if last sync master syncer has error | ||
s.canal.syncer.Close() | ||
_ = s.canal.prepareSyncer() | ||
|
||
newStreamer, startErr := s.canal.startSyncer() | ||
if startErr != nil { | ||
return nil, startErr | ||
} | ||
ev, err = newStreamer.GetEvent(ctx) | ||
// set all streamer to the new sync master streamer | ||
s.BinlogStreamer = newStreamer | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
s.syncMasterStreamer = newStreamer | ||
} | ||
|
||
if mysqlErr, ok := err.(*mysql.MyError); ok { | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// change to local binlog file streamer to adapter the steamer | ||
if mysqlErr.Code == mysql.ER_MASTER_FATAL_ERROR_READING_BINLOG && | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the binlog file is purged just at the time we are switching to it, should we also generate a "fake rotate evnet" as the real binlog streamer? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if syncer switch to a purged binlog file, 'not find first log file' happen and syncer closed. it try to download binlog to local. the local file contains rotate event, and will put the rotate event to streamer. then canal's masterInfo will change to the new position and it restart syncer to try if new position file is on master, if not the new started syncer will also closed by 'not find first log file' and download next binlog to local. so generate a "fake rotate evnet" maybe not necessary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean the "fake rorate event" which comes before the beginning of any events in binlog, not the real rotate event at the end of each binlog file. You can find it when reading binlog
for above two events, the first one is the real rotate event which can be found at the end of mysql-bin.000001, and the second one is a fake rotate event There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "fake rorate event" not exist in binlog file, so no need to process? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "fake rorate event" does not exist in binlog file but it is sent to downstream and can be find when reading binlog 😅 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. when using local binlog, the real syncer is closed, canal will not read event from server's binlog, all event read from AdapterStreamer is parsed from binlog file, that will not exist "fake rorate event". "fake rorate event" will send to streamer after finish parse binlog file and restart the read syncer, and runSyncBinlog will process that event |
||
mysqlErr.Message == "Could not find first log file name in binary log index file" { | ||
gset := s.canal.master.GTIDSet() | ||
if gset == nil || gset.String() == "" { // currently only support position based replication | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
s.canal.cfg.Logger.Info("Could not find first log, try to download the local binlog for retry") | ||
pos := s.canal.master.Position() | ||
newStreamer := newLocalBinFileStreamer(s.binFileDownloader, pos) | ||
|
||
s.syncMasterStreamer = s.BinlogStreamer | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
s.BinlogStreamer = newStreamer | ||
|
||
return newStreamer.GetEvent(ctx) | ||
} | ||
} | ||
} | ||
|
||
return ev, err | ||
} | ||
|
||
func newLocalBinFileStreamer(download BinlogFileDownloader, position mysql.Position) *replication.BinlogStreamer { | ||
streamer := replication.NewBinlogStreamer() | ||
binFilePath, err := download(position) | ||
if err != nil { | ||
streamer.CloseWithError(errors.New("local binlog file not exist")) | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
go func(binFilePath string, streamer *replication.BinlogStreamer) { | ||
beginFromHere := false | ||
_ = replication.NewBinlogParser().ParseFile(binFilePath, 0, func(be *replication.BinlogEvent) error { | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if be.Header.LogPos == position.Pos || position.Pos == 4 { // go ahead to check if begin | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
beginFromHere = true | ||
} | ||
if beginFromHere { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need to add an error that no matching position for |
||
streamer.PutEvent(be) | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return nil | ||
}) | ||
}(binFilePath, streamer) | ||
|
||
return streamer | ||
} |
Uh oh!
There was an error while loading. Please reload this page.