add auto-reconnect while still returning a timeout error #2

Merged
merged 1 commit into from
Mar 24, 2018
41 changes: 39 additions & 2 deletions zmq.go
@@ -21,6 +21,18 @@ var (
	MaxBodySize uint64 = 0x02000000
)

+// reconnectError wraps any error with a timeout, which allows clients to keep
+// attempting to read normally while the underlying socket attempts to
+// reconnect.
+type reconnectError struct {
+	error
+}
+
+// Timeout implements the timeout interface from net and other packages.
+func (e *reconnectError) Timeout() bool {
+	return true
+}
+
func connFromAddr(addr string) (net.Conn, error) {
	re, err := regexp.Compile(`((tcp|unix|ipc)://)?([^:]*):?(\d*)`)
	if err != nil {
@@ -50,6 +62,7 @@ func connFromAddr(addr string) (net.Conn, error) {
// Conn is a connection to a ZMQ server.
type Conn struct {
	conn net.Conn
+	topics []string
Member

I don't see this value ever being set. So it would seem that with this, we'd be able to reconnect, but would then lose notifications for the topics we care about?

Member

Nvm :)

Author

See the comment on line 342.

	timeout time.Duration
}

@@ -326,7 +339,7 @@ func Subscribe(addr string, topics []string, timeout time.Duration) (*Conn, erro

	conn.SetDeadline(time.Now().Add(10 * time.Second))

-	c := &Conn{conn, timeout}
+	c := &Conn{conn, topics, timeout}
Author

This is where topics is set.


	if err := c.writeGreeting(); err != nil {
		conn.Close()
@@ -362,7 +375,31 @@ func Subscribe(addr string, topics []string, timeout time.Duration) (*Conn, erro
// Receive a message from the publisher. It blocks until a new message is
// received.
func (c *Conn) Receive() ([][]byte, error) {
-	return c.readMessage()
+	messages, err := c.readMessage()
+	// If the error is either nil or a non-EOF error, we return it as-is.
+	if err != io.EOF {
+		return messages, err
+	}
+	// We got an EOF, so our socket is disconnected. We attempt to
+	// reconnect. If successful, replace the existing connection with the
+	// new one. Either way, return a timeout error.
+	errTimeout := &net.OpError{
+		Op:     "read",
+		Net:    c.conn.LocalAddr().Network(),
+		Source: c.conn.LocalAddr(),
+		Addr:   c.conn.RemoteAddr(),
+		Err:    &reconnectError{err},
+	}
+	newConn, err := Subscribe(c.conn.RemoteAddr().String(), c.topics,
+		c.timeout)
+	if err != nil {
+		// Prevent CPU overuse by refused reconnection attempts.
+		time.Sleep(c.timeout)
Member

Perhaps we should use an exponential backoff here? For reference, the btcsuite rpcclient does something similar: https://github.com/btcsuite/btcd/blob/master/rpcclient/infrastructure.go#L641

Author

This is used synchronously, which means that each time we back off, we can delay processing of other events in an event loop that includes a call to this function. For example, https://github.com/Roasbeef/btcwallet/blob/master/chain/bitcoind.go#L632 is in a loop that also checks for updates to watch lists and rescans. The rpcclient reconnect handler, by contrast, runs as a goroutine.

An option for doing the incremental back off would be to do that in a goroutine, and just sleep/return a timeout error in the synchronous portion.
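
The goroutine option described above could look roughly like the following. This is only a sketch under assumptions: `dialFunc`, `reconnector`, `startReconnect`, `poll`, and `demo` are hypothetical names that do not exist in this PR, and the dial function stands in for a call such as `Subscribe`. The background goroutine retries with a doubling delay, while the synchronous side only polls and never sleeps for the full backoff.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// dialFunc is a hypothetical stand-in for a blocking call such as Subscribe.
type dialFunc func() (string, error)

// reconnector hands a new connection from the background goroutine to the
// synchronous caller through a buffered channel.
type reconnector struct {
	result chan string
}

// startReconnect retries dial in a goroutine, doubling the delay after each
// failure up to max, and delivers the first successful connection.
func startReconnect(dial dialFunc, base, max time.Duration) *reconnector {
	r := &reconnector{result: make(chan string, 1)}
	go func() {
		delay := base
		for {
			conn, err := dial()
			if err == nil {
				r.result <- conn
				return
			}
			time.Sleep(delay)
			delay *= 2
			if delay > max {
				delay = max
			}
		}
	}()
	return r
}

// poll is the synchronous part: it never blocks. When it reports false, the
// caller would return a timeout error and try again on its next pass.
func (r *reconnector) poll() (string, bool) {
	select {
	case conn := <-r.result:
		return conn, true
	default:
		return "", false
	}
}

// demo simulates a server that refuses the first two attempts.
func demo() string {
	attempts := 0 // touched only by the reconnect goroutine
	dial := func() (string, error) {
		attempts++
		if attempts < 3 {
			return "", errors.New("connection refused")
		}
		return fmt.Sprintf("conn-%d", attempts), nil
	}
	r := startReconnect(dial, time.Millisecond, 4*time.Millisecond)
	deadline := time.Now().Add(2 * time.Second)
	for time.Now().Before(deadline) {
		if conn, ok := r.poll(); ok {
			return conn
		}
		time.Sleep(time.Millisecond) // stands in for other event-loop work
	}
	return ""
}

func main() {
	fmt.Println(demo())
}
```

The trade-off is extra concurrency: the connection swap would need to happen on the caller's side after `poll` succeeds, so that the read path never races with the goroutine.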

Author

Another option for doing incremental backoff is to add more state to Conn to keep track of last attempted reconnection time when disconnected.
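
As a rough illustration of that state-based option, the sketch below tracks when the next reconnection attempt is allowed instead of sleeping. The `backoff` type and its methods are hypothetical and not part of this PR; in practice the fields would live on `Conn`, and `Receive` would skip the reconnect (and just return the timeout error) while `ready` reports false.

```go
package main

import (
	"fmt"
	"time"
)

// backoff is a hypothetical helper holding the extra state: the delay to
// apply after the next failure and the earliest time of the next attempt.
type backoff struct {
	base, max time.Duration
	next      time.Duration
	notBefore time.Time
}

func newBackoff(base, max time.Duration) *backoff {
	return &backoff{base: base, max: max, next: base}
}

// ready reports whether a reconnection attempt is allowed at time now.
func (b *backoff) ready(now time.Time) bool {
	return !now.Before(b.notBefore)
}

// fail records a failed attempt at time now and doubles the delay, capped
// at max.
func (b *backoff) fail(now time.Time) {
	b.notBefore = now.Add(b.next)
	b.next *= 2
	if b.next > b.max {
		b.next = b.max
	}
}

// reset clears the state after a successful reconnection.
func (b *backoff) reset() {
	b.next = b.base
	b.notBefore = time.Time{}
}

// simulateDelays returns the wait, in seconds, imposed after each of n
// consecutive failures with a 1s base and an 8s cap.
func simulateDelays(n int) []int {
	b := newBackoff(time.Second, 8*time.Second)
	now := time.Unix(0, 0)
	delays := make([]int, 0, n)
	for i := 0; i < n; i++ {
		b.fail(now)
		delays = append(delays, int(b.notBefore.Sub(now)/time.Second))
		now = b.notBefore // next attempt happens as soon as it is allowed
	}
	return delays
}

func main() {
	fmt.Println(simulateDelays(5)) // prints [1 2 4 8 8]
}
```

Because nothing here sleeps, the caller's event loop keeps its normal cadence; only the reconnection attempts are spaced out.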

Member

I see, so even with this patch, we need reconnection logic within the wallet as well? Since this just returns a timeout error in either case, even if we're able to reconnect successfully.

Author

No, the wallet already ignores read timeouts; this code just emulates those: https://github.com/Roasbeef/btcwallet/blob/master/chain/bitcoind.go#L636

Author

If we reconnect successfully, the next attempt at reading will return a message if there is one.

Member

Ahh, thanks! OK, will test out this patch locally.

+	} else {
+		c.Close()
+		*c = *newConn
+	}
+	return nil, errTimeout
}

// Close the underlying connection. Any further operations will fail.