Skip to content

Commit 462a8a7

Browse files
authored
Merge pull request #2 from aakselrod/auto-reconnect
add auto-reconnect while still returning a timeout error
2 parents 0d266ba + 5844fb4 commit 462a8a7

File tree

1 file changed

+39
-2
lines changed

1 file changed

+39
-2
lines changed

zmq.go

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,18 @@ var (
2121
MaxBodySize uint64 = 0x02000000
2222
)
2323

24+
// reconnectError wraps any error with a timeout, which allows clients to keep
25+
// attempting to read normally while the underlying socket attempts to
26+
// reconnect.
27+
type reconnectError struct {
28+
error
29+
}
30+
31+
// Timeout implements the timeout interface from net and other packages.
32+
func (e *reconnectError) Timeout() bool {
33+
return true
34+
}
35+
2436
func connFromAddr(addr string) (net.Conn, error) {
2537
re, err := regexp.Compile(`((tcp|unix|ipc)://)?([^:]*):?(\d*)`)
2638
if err != nil {
@@ -50,6 +62,7 @@ func connFromAddr(addr string) (net.Conn, error) {
5062
// Conn is a connection to a ZMQ server.
5163
type Conn struct {
5264
conn net.Conn
65+
topics []string
5366
timeout time.Duration
5467
}
5568

@@ -326,7 +339,7 @@ func Subscribe(addr string, topics []string, timeout time.Duration) (*Conn, erro
326339

327340
conn.SetDeadline(time.Now().Add(10 * time.Second))
328341

329-
c := &Conn{conn, timeout}
342+
c := &Conn{conn, topics, timeout}
330343

331344
if err := c.writeGreeting(); err != nil {
332345
conn.Close()
@@ -362,7 +375,31 @@ func Subscribe(addr string, topics []string, timeout time.Duration) (*Conn, erro
362375
// Receive a message from the publisher. It blocks until a new message is
363376
// received.
364377
func (c *Conn) Receive() ([][]byte, error) {
365-
return c.readMessage()
378+
messages, err := c.readMessage()
379+
// If the error is either nil or a non-EOF error, we return it as-is.
380+
if err != io.EOF {
381+
return messages, err
382+
}
383+
// We got an EOF, so our socket is disconnected. We attempt to
384+
// reconnect. If successful, replace the existing connection with the
385+
// new one. Either way, return a timeout error.
386+
errTimeout := &net.OpError{
387+
Op: "read",
388+
Net: c.conn.LocalAddr().Network(),
389+
Source: c.conn.LocalAddr(),
390+
Addr: c.conn.RemoteAddr(),
391+
Err: &reconnectError{err},
392+
}
393+
newConn, err := Subscribe(c.conn.RemoteAddr().String(), c.topics,
394+
c.timeout)
395+
if err != nil {
396+
// Prevent CPU overuse by refused reconnection attempts.
397+
time.Sleep(c.timeout)
398+
} else {
399+
c.Close()
400+
*c = *newConn
401+
}
402+
return nil, errTimeout
366403
}
367404

368405
// Close the underlying connection. Any further operations will fail.

0 commit comments

Comments
 (0)