Skip to content

zmq: prevent unnecessary timeouts and return EOF on connection termination #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/lightninglabs/gozmq

go 1.12
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this defined here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

86 changes: 70 additions & 16 deletions zmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"regexp"
"strings"
"sync"
"time"
)

Expand Down Expand Up @@ -64,6 +65,9 @@ type Conn struct {
conn net.Conn
topics []string
timeout time.Duration

closeConn sync.Once
quit chan struct{}
}

func (c *Conn) writeAll(buf []byte) error {
Expand Down Expand Up @@ -193,7 +197,7 @@ func (c *Conn) subscribe(prefix string) error {
}

func (c *Conn) readCommand() (string, []byte, error) {
flag, buf, err := c.readFrame()
flag, buf, err := c.readFrame(true)
if err != nil {
return "", nil, err
}
Expand Down Expand Up @@ -256,10 +260,20 @@ func (c *Conn) readReady() error {
}

// Read a frame from the socket, setting deadline before each read to prevent
// timeouts during or between frames.
func (c *Conn) readFrame() (byte, []byte, error) {
// timeouts during or between frames. The initialFrame parameter denotes
// whether this is the first frame we'll read for a _new_ message.
//
// NOTE: This is a blocking call if there is nothing to read from the
// connection.
func (c *Conn) readFrame(initialFrame bool) (byte, []byte, error) {
// We'll only set a read deadline if this is not the first frame of a
// message. We do this to ensure we receive complete messages in a
// timely manner.
if !initialFrame {
c.conn.SetReadDeadline(time.Now().Add(c.timeout))
}

var flagBuf [1]byte
c.conn.SetReadDeadline(time.Now().Add(c.timeout))
if _, err := io.ReadFull(c.conn, flagBuf[:1]); err != nil {
return 0, nil, err
}
Expand Down Expand Up @@ -305,11 +319,22 @@ func (c *Conn) readFrame() (byte, []byte, error) {
return flag, buf, nil
}

// Read a message from the socket.
// readMessage reads a new message from the connection.
//
// NOTE: This is a blocking call if there is nothing to read from the
// connection.
func (c *Conn) readMessage() ([][]byte, error) {
// We'll only set read deadlines on the underlying connection when
// reading messages of multiple frames after the first frame has been
// read. This is done to ensure we receive all of the frames of a
// message within a reasonable time frame. When reading the first frame,
// we want to avoid setting them as we don't know when a new message
// will be available for us to read.
initialFrame := true

var parts [][]byte
for {
flag, buf, err := c.readFrame()
flag, buf, err := c.readFrame(initialFrame)
if err != nil {
return nil, err
}
Expand All @@ -326,6 +351,8 @@ func (c *Conn) readMessage() ([][]byte, error) {
if len(parts) > 16 {
return nil, errors.New("message has too many parts")
}

initialFrame = false
}
return parts, nil
}
Expand All @@ -339,7 +366,12 @@ func Subscribe(addr string, topics []string, timeout time.Duration) (*Conn, erro

conn.SetDeadline(time.Now().Add(10 * time.Second))

c := &Conn{conn, topics, timeout}
c := &Conn{
conn: conn,
topics: topics,
timeout: timeout,
quit: make(chan struct{}),
}

if err := c.writeGreeting(); err != nil {
conn.Close()
Expand Down Expand Up @@ -373,36 +405,58 @@ func Subscribe(addr string, topics []string, timeout time.Duration) (*Conn, erro
}

// Receive a message from the publisher. It blocks until a new message is
// received.
// received. Errors other than io.EOF are returned as-is. If the connection is
// dropped (io.EOF) without having been explicitly closed, a reconnection is
// attempted and a timeout error is returned. If it was explicitly closed,
// io.EOF is returned.
func (c *Conn) Receive() ([][]byte, error) {
messages, err := c.readMessage()
// If the error is either nil or a non-EOF error, we return it as-is.
if err != io.EOF {
return messages, err
}
// We got an EOF, so our socket is disconnected. We attempt to
// reconnect. If successful, replace the existing connection with the
// new one. Either way, return a timeout error.

// We got an EOF, so our socket is disconnected. If the connection was
// explicitly terminated, we'll return the EOF error.
select {
case <-c.quit:
return nil, io.EOF
default:
}

// Otherwise, we'll attempt to reconnect. If successful, we'll replace
// the existing connection with the new one. Either way, return a
// timeout error.
errTimeout := &net.OpError{
Op: "read",
Net: c.conn.LocalAddr().Network(),
Source: c.conn.LocalAddr(),
Addr: c.conn.RemoteAddr(),
Err: &reconnectError{err},
}
newConn, err := Subscribe(c.conn.RemoteAddr().String(), c.topics,
c.timeout)
newConn, err := Subscribe(
c.conn.RemoteAddr().String(), c.topics, c.timeout,
)
if err != nil {
// Prevent CPU overuse by refused reconnection attempts.
time.Sleep(c.timeout)
} else {
c.Close()
*c = *newConn
c.conn.Close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No error to return here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still want to return errTimeout here. Closing the old connection is only a best-effort cleanup, so its error is intentionally not returned.

c.conn = newConn.conn
}
return nil, errTimeout
}

// Close the underlying connection. Any further operations will fail.
func (c *Conn) Close() error {
return c.conn.Close()
var err error
c.closeConn.Do(func() {
close(c.quit)
err = c.conn.Close()
})
return err
}

// RemoteAddr returns the remote network address of the underlying connection.
// It delegates directly to the net.Conn held by c.
func (c *Conn) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}