diff --git a/canal/canal_test.go b/canal/canal_test.go index a5bb6ed92..e1c8ad7e3 100644 --- a/canal/canal_test.go +++ b/canal/canal_test.go @@ -141,10 +141,48 @@ func (s *canalTestSuite) TestCanal(c *C) { s.execute(c, "ALTER TABLE test.canal_test ADD `age` INT(5) NOT NULL AFTER `name`") s.execute(c, "INSERT INTO test.canal_test (name,age) VALUES (?,?)", "d", "18") - err := s.c.CatchMasterPos(10 * time.Second) + err := CatchMasterPos(s.c, 10*time.Second) c.Assert(err, IsNil) } +func CatchMasterPos(c *Canal, timeout time.Duration) error { + pos, err := c.GetMasterPos() + if err != nil { + return errors.Trace(err) + } + + return WaitUntilPos(c, pos, timeout) +} + +func FlushBinlog(c *Canal) error { + _, err := c.Execute("FLUSH BINARY LOGS") + return errors.Trace(err) +} + +func WaitUntilPos(c *Canal, pos mysql.Position, timeout time.Duration) error { + timer := time.NewTimer(timeout) + for { + select { + case <-timer.C: + return errors.Errorf("wait position %v too long > %s", pos, timeout) + default: + err := FlushBinlog(c) + if err != nil { + return errors.Trace(err) + } + curPos := c.master.Position() + if curPos.Compare(pos) >= 0 { + return nil + } else { + log.Debugf("master pos is %v, wait catching %v", curPos, pos) + time.Sleep(100 * time.Millisecond) + } + } + } + + return nil +} + func (s *canalTestSuite) TestCanalFilter(c *C) { // included sch, err := s.c.GetTable("test", "canal_test") diff --git a/canal/sync.go b/canal/sync.go index 9a8df4b96..27554c905 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -274,35 +274,6 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { return c.eventHandler.OnRow(events) } -func (c *Canal) FlushBinlog() error { - _, err := c.Execute("FLUSH BINARY LOGS") - return errors.Trace(err) -} - -func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error { - timer := time.NewTimer(timeout) - for { - select { - case <-timer.C: - return errors.Errorf("wait position %v too long > %s", pos, timeout) - default: - err := c.FlushBinlog() - if err != nil { - return errors.Trace(err) - } - curPos := c.master.Position() - if curPos.Compare(pos) >= 0 { - return nil - } else { - log.Debugf("master pos is %v, wait catching %v", curPos, pos) - time.Sleep(100 * time.Millisecond) - } - } - } - - return nil -} - func (c *Canal) GetMasterPos() (mysql.Position, error) { rr, err := c.Execute("SHOW MASTER STATUS") if err != nil { @@ -337,12 +308,3 @@ func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error) { } return gset, nil } - -func (c *Canal) CatchMasterPos(timeout time.Duration) error { - pos, err := c.GetMasterPos() - if err != nil { - return errors.Trace(err) - } - - return c.WaitUntilPos(pos, timeout) -}