From 5c6ccf32e994e602bb70158d9886a615a94ea53d Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 9 Jul 2019 00:04:48 -0700 Subject: [PATCH 1/4] build: init go module --- go.mod | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 go.mod diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..abd616b --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/lightninglabs/gozmq + +go 1.12 From e047b846879ed0e3e06334780c0fa56e01604682 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 9 Jul 2019 00:16:28 -0700 Subject: [PATCH 2/4] zmq: block on the first frame when reading new messages We'll only set read deadlines on the underlying connection when reading messages of multiple frames after the first frame has been read. This is done to ensure we receive all of the frames of a message within a reasonable time frame. When reading the first frame, we want to avoid setting them as we don't know when a new message will be available for us to read, causing us to block until its delivery. --- zmq.go | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/zmq.go b/zmq.go index a3774c8..80ab6e4 100644 --- a/zmq.go +++ b/zmq.go @@ -193,7 +193,7 @@ func (c *Conn) subscribe(prefix string) error { } func (c *Conn) readCommand() (string, []byte, error) { - flag, buf, err := c.readFrame() + flag, buf, err := c.readFrame(true) if err != nil { return "", nil, err } @@ -256,10 +256,20 @@ func (c *Conn) readReady() error { } // Read a frame from the socket, setting deadline before each read to prevent -// timeouts during or between frames. -func (c *Conn) readFrame() (byte, []byte, error) { +// timeouts during or between frames. The initialFrame should be used to denote +// whether this is the first frame we'll read for a _new_ message. +// +// NOTE: This is a blocking call if there is nothing to read from the +// connection. +func (c *Conn) readFrame(initialFrame bool) (byte, []byte, error) { + // We'll only set a read deadline if this is not the first frame of a + // message. We do this to ensure we receive complete messages in a + // timely manner. + if !initialFrame { + c.conn.SetReadDeadline(time.Now().Add(c.timeout)) + } + var flagBuf [1]byte - c.conn.SetReadDeadline(time.Now().Add(c.timeout)) if _, err := io.ReadFull(c.conn, flagBuf[:1]); err != nil { return 0, nil, err } @@ -305,11 +315,22 @@ func (c *Conn) readFrame() (byte, []byte, error) { return flag, buf, nil } -// Read a message from the socket. +// readMessage reads a new message from the connection. +// +// NOTE: This is a blocking call if there is nothing to read from the +// connection. func (c *Conn) readMessage() ([][]byte, error) { + // We'll only set read deadlines on the underlying connection when + // reading messages of multiple frames after the first frame has been + // read. This is done to ensure we receive all of the frames of a + // message within a reasonable time frame. When reading the first frame, + // we want to avoid setting them as we don't know when a new message + // will be available for us to read. + initialFrame := true + var parts [][]byte for { - flag, buf, err := c.readFrame() + flag, buf, err := c.readFrame(initialFrame) if err != nil { return nil, err } @@ -326,6 +347,8 @@ func (c *Conn) readMessage() ([][]byte, error) { if len(parts) > 16 { return nil, errors.New("message has too many parts") } + + initialFrame = false } return parts, nil } From 09dc028c9271ae0117c63ab24103f9422f58944e Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 9 Jul 2019 00:18:49 -0700 Subject: [PATCH 3/4] zmq: prevent reconnecting when connection is explicitly terminated If the connection is explicitly terminated, we should not try to re-establish the connection since doing so signals that the connection is no longer needed. --- zmq.go | 46 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/zmq.go b/zmq.go index 80ab6e4..2cc24a9 100644 --- a/zmq.go +++ b/zmq.go @@ -12,6 +12,7 @@ import ( "os" "regexp" "strings" + "sync" "time" ) @@ -64,6 +65,9 @@ type Conn struct { conn net.Conn topics []string timeout time.Duration + + closeConn sync.Once + quit chan struct{} } func (c *Conn) writeAll(buf []byte) error { @@ -362,7 +366,12 @@ func Subscribe(addr string, topics []string, timeout time.Duration) (*Conn, erro conn.SetDeadline(time.Now().Add(10 * time.Second)) - c := &Conn{conn, topics, timeout} + c := &Conn{ + conn: conn, + topics: topics, + timeout: timeout, + quit: make(chan struct{}), + } if err := c.writeGreeting(); err != nil { conn.Close() @@ -396,16 +405,27 @@ func Subscribe(addr string, topics []string, timeout time.Duration) (*Conn, erro } // Receive a message from the publisher. It blocks until a new message is -// received. +// received. If the connection times out and it was not explicitly terminated, +// then a timeout error is returned. Otherwise, if it was explicitly terminated, +// then io.EOF is returned. func (c *Conn) Receive() ([][]byte, error) { messages, err := c.readMessage() // If the error is either nil or a non-EOF error, we return it as-is. if err != io.EOF { return messages, err } - // We got an EOF, so our socket is disconnected. We attempt to - // reconnect. If successful, replace the existing connection with the - // new one. Either way, return a timeout error. + + // We got an EOF, so our socket is disconnected. If the connection was + // explicitly terminated, we'll return the EOF error. + select { + case <-c.quit: + return nil, io.EOF + default: + } + + // Otherwise, we'll attempt to reconnect. If successful, we'll replace + // the existing connection with the new one. Either way, return a + // timeout error. errTimeout := &net.OpError{ Op: "read", Net: c.conn.LocalAddr().Network(), @@ -413,19 +433,25 @@ func (c *Conn) Receive() ([][]byte, error) { Addr: c.conn.RemoteAddr(), Err: &reconnectError{err}, } - newConn, err := Subscribe(c.conn.RemoteAddr().String(), c.topics, - c.timeout) + newConn, err := Subscribe( + c.conn.RemoteAddr().String(), c.topics, c.timeout, + ) if err != nil { // Prevent CPU overuse by refused reconnection attempts. time.Sleep(c.timeout) } else { - c.Close() - *c = *newConn + c.conn.Close() + c.conn = newConn.conn } return nil, errTimeout } // Close the underlying connection. Any further operations will fail. func (c *Conn) Close() error { - return c.conn.Close() + var err error + c.closeConn.Do(func() { + close(c.quit) + err = c.conn.Close() + }) + return err } From 6e9a863447205823f560e9721743b95ac48be7c2 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 9 Jul 2019 00:21:22 -0700 Subject: [PATCH 4/4] zmq: expose underlying connection RemoteAddr method --- zmq.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/zmq.go b/zmq.go index 2cc24a9..8796ac9 100644 --- a/zmq.go +++ b/zmq.go @@ -455,3 +455,8 @@ func (c *Conn) Close() error { }) return err } + +// RemoteAddr returns the remote network address. +func (c *Conn) RemoteAddr() net.Addr { + return c.conn.RemoteAddr() +}