diff --git a/zmq.go b/zmq.go index 8ec11d4..a3774c8 100644 --- a/zmq.go +++ b/zmq.go @@ -21,6 +21,18 @@ var ( MaxBodySize uint64 = 0x02000000 ) +// reconnectError wraps any error with a timeout, which allows clients to keep +// attempting to read normally while the underlying socket attempts to +// reconnect. +type reconnectError struct { + error +} + +// Timeout implements the timeout interface from net and other packages. +func (e *reconnectError) Timeout() bool { + return true +} + func connFromAddr(addr string) (net.Conn, error) { re, err := regexp.Compile(`((tcp|unix|ipc)://)?([^:]*):?(\d*)`) if err != nil { @@ -50,6 +62,7 @@ func connFromAddr(addr string) (net.Conn, error) { // Conn is a connection to a ZMQ server. type Conn struct { conn net.Conn + topics []string timeout time.Duration } @@ -326,7 +339,7 @@ func Subscribe(addr string, topics []string, timeout time.Duration) (*Conn, erro conn.SetDeadline(time.Now().Add(10 * time.Second)) - c := &Conn{conn, timeout} + c := &Conn{conn, topics, timeout} if err := c.writeGreeting(); err != nil { conn.Close() @@ -362,7 +375,31 @@ func Subscribe(addr string, topics []string, timeout time.Duration) (*Conn, erro // Receive a message from the publisher. It blocks until a new message is // received. func (c *Conn) Receive() ([][]byte, error) { - return c.readMessage() + messages, err := c.readMessage() + // If the error is either nil or a non-EOF error, we return it as-is. + if err != io.EOF { + return messages, err + } + // We got an EOF, so our socket is disconnected. We attempt to + // reconnect. If successful, replace the existing connection with the + // new one. Either way, return a timeout error. + errTimeout := &net.OpError{ + Op: "read", + Net: c.conn.LocalAddr().Network(), + Source: c.conn.LocalAddr(), + Addr: c.conn.RemoteAddr(), + Err: &reconnectError{err}, + } + newConn, err := Subscribe(c.conn.RemoteAddr().String(), c.topics, + c.timeout) + if err != nil { + // Prevent CPU overuse by refused reconnection attempts. + time.Sleep(c.timeout) + } else { + c.Close() + *c = *newConn + } + return nil, errTimeout } // Close the underlying connection. Any further operations will fail.