From ff63b191ded94e60736449335780d05025fd0dc7 Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Fri, 27 Sep 2019 13:39:17 -0500 Subject: [PATCH 1/2] Fix unaligned 64 bit atomic loads on 32 bit platforms Closes #153 --- conn.go | 5 +++-- conn_common.go | 2 +- websocket_js.go | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/conn.go b/conn.go index 3d7d574e..2d0339e4 100644 --- a/conn.go +++ b/conn.go @@ -70,7 +70,8 @@ type Conn struct { activeReader *messageReader // readFrameLock is acquired to read from bw. readFrameLock chan struct{} - readClosed int64 + // Not int32 because of https://github.com/nhooyr/websocket/issues/153 + readClosed int32 readHeaderBuf []byte controlPayloadBuf []byte @@ -341,7 +342,7 @@ func (c *Conn) handleControl(ctx context.Context, h header) error { // See https://github.com/nhooyr/websocket/issues/87#issue-451703332 // Most users should not need this. func (c *Conn) Reader(ctx context.Context) (MessageType, io.Reader, error) { - if atomic.LoadInt64(&c.readClosed) == 1 { + if atomic.LoadInt32(&c.readClosed) == 1 { return 0, nil, fmt.Errorf("websocket connection read closed") } diff --git a/conn_common.go b/conn_common.go index ae0fe554..9a4f9043 100644 --- a/conn_common.go +++ b/conn_common.go @@ -178,7 +178,7 @@ func (c *netConn) SetReadDeadline(t time.Time) error { // Use this when you do not want to read data messages from the connection anymore but will // want to write messages to it. func (c *Conn) CloseRead(ctx context.Context) context.Context { - atomic.StoreInt64(&c.readClosed, 1) + atomic.StoreInt32(&c.readClosed, 1) ctx, cancel := context.WithCancel(ctx) go func() { diff --git a/websocket_js.go b/websocket_js.go index 3822797b..dcb02061 100644 --- a/websocket_js.go +++ b/websocket_js.go @@ -89,7 +89,7 @@ func (c *Conn) closeWithInternal() { // Read attempts to read a message from the connection. // The maximum time spent waiting is bounded by the context. func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) { - if atomic.LoadInt64(&c.readClosed) == 1 { + if atomic.LoadInt32(&c.readClosed) == 1 { return 0, nil, fmt.Errorf("websocket connection read closed") } From 4b51f4a8c995a1cf2bd4aaec3a54c519d9c3ea36 Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Fri, 27 Sep 2019 14:05:11 -0500 Subject: [PATCH 2/2] Remove atomic integer loads and stores in favour of atomic.Value - Also allows SetReadLimit to be called concurrently which is a nice touch --- ci/wasm.sh | 2 ++ conn.go | 16 ++++++++-------- conn_common.go | 25 +++++++++++++++++++++++-- websocket_js.go | 16 ++++++++++------ 4 files changed, 43 insertions(+), 16 deletions(-) diff --git a/ci/wasm.sh b/ci/wasm.sh index 0290f188..1497ba24 100755 --- a/ci/wasm.sh +++ b/ci/wasm.sh @@ -26,5 +26,7 @@ GOOS=js GOARCH=wasm go test -exec=wasmbrowsertest ./... -args "$WS_ECHO_SERVER_U if ! wait "$wsjstestPID"; then echo "wsjstest exited unsuccessfully" + echo "output:" + cat "$wsjstestOut" exit 1 fi diff --git a/conn.go b/conn.go index 2d0339e4..2679edcc 100644 --- a/conn.go +++ b/conn.go @@ -20,8 +20,7 @@ import ( ) // Conn represents a WebSocket connection. -// All methods may be called concurrently except for Reader, Read -// and SetReadLimit. +// All methods may be called concurrently except for Reader and Read. // // You must always read from the connection. Otherwise control // frames will not be handled. See the docs on Reader and CloseRead. @@ -56,7 +55,7 @@ type Conn struct { writeHeaderBuf []byte writeHeader *header // read limit for a message in bytes. - msgReadLimit int64 + msgReadLimit *atomicInt64 // Used to ensure a previous writer is not used after being closed. activeWriter atomic.Value @@ -70,8 +69,7 @@ type Conn struct { activeReader *messageReader // readFrameLock is acquired to read from bw. readFrameLock chan struct{} - // Not int32 because of https://github.com/nhooyr/websocket/issues/153 - readClosed int32 + readClosed *atomicInt64 readHeaderBuf []byte controlPayloadBuf []byte @@ -91,7 +89,8 @@ type Conn struct { func (c *Conn) init() { c.closed = make(chan struct{}) - c.msgReadLimit = 32768 + c.msgReadLimit = &atomicInt64{} + c.msgReadLimit.Store(32768) c.writeMsgLock = make(chan struct{}, 1) c.writeFrameLock = make(chan struct{}, 1) @@ -106,6 +105,7 @@ func (c *Conn) init() { c.writeHeaderBuf = makeWriteHeaderBuf() c.writeHeader = &header{} c.readHeaderBuf = makeReadHeaderBuf() + c.readClosed = &atomicInt64{} c.controlPayloadBuf = make([]byte, maxControlFramePayload) runtime.SetFinalizer(c, func(c *Conn) { @@ -342,7 +342,7 @@ func (c *Conn) handleControl(ctx context.Context, h header) error { // See https://github.com/nhooyr/websocket/issues/87#issue-451703332 // Most users should not need this. func (c *Conn) Reader(ctx context.Context) (MessageType, io.Reader, error) { - if atomic.LoadInt32(&c.readClosed) == 1 { + if c.readClosed.Load() == 1 { return 0, nil, fmt.Errorf("websocket connection read closed") } @@ -392,7 +392,7 @@ func (c *Conn) reader(ctx context.Context) (MessageType, io.Reader, error) { c.readerMsgHeader = h c.readerFrameEOF = false c.readerMaskPos = 0 - c.readMsgLeft = c.msgReadLimit + c.readMsgLeft = c.msgReadLimit.Load() r := &messageReader{ c: c, diff --git a/conn_common.go b/conn_common.go index 9a4f9043..47146110 100644 --- a/conn_common.go +++ b/conn_common.go @@ -178,7 +178,7 @@ func (c *netConn) SetReadDeadline(t time.Time) error { // Use this when you do not want to read data messages from the connection anymore but will // want to write messages to it. func (c *Conn) CloseRead(ctx context.Context) context.Context { - atomic.StoreInt32(&c.readClosed, 1) + c.readClosed.Store(1) ctx, cancel := context.WithCancel(ctx) go func() { @@ -200,7 +200,7 @@ func (c *Conn) CloseRead(ctx context.Context) context.Context { // // When the limit is hit, the connection will be closed with StatusMessageTooBig. func (c *Conn) SetReadLimit(n int64) { - c.msgReadLimit = n + c.msgReadLimit.Store(n) } func (c *Conn) setCloseErr(err error) { @@ -208,3 +208,24 @@ func (c *Conn) setCloseErr(err error) { c.closeErr = fmt.Errorf("websocket closed: %w", err) }) } + +// See https://github.com/nhooyr/websocket/issues/153 +type atomicInt64 struct { + v atomic.Value +} + +func (v *atomicInt64) Load() int64 { + i, ok := v.v.Load().(int64) + if !ok { + return 0 + } + return i +} + +func (v *atomicInt64) Store(i int64) { + v.v.Store(i) +} + +func (v *atomicInt64) String() string { + return fmt.Sprint(v.v.Load()) +} diff --git a/websocket_js.go b/websocket_js.go index dcb02061..2226d3a4 100644 --- a/websocket_js.go +++ b/websocket_js.go @@ -10,7 +10,6 @@ import ( "reflect" "runtime" "sync" - "sync/atomic" "syscall/js" "nhooyr.io/websocket/internal/bpool" @@ -21,9 +20,10 @@ import ( type Conn struct { ws wsjs.WebSocket - msgReadLimit int64 + // read limit for a message in bytes. + msgReadLimit *atomicInt64 - readClosed int64 + readClosed *atomicInt64 closeOnce sync.Once closed chan struct{} closeErrOnce sync.Once @@ -49,7 +49,11 @@ func (c *Conn) close(err error) { func (c *Conn) init() { c.closed = make(chan struct{}) c.readSignal = make(chan struct{}, 1) - c.msgReadLimit = 32768 + + c.msgReadLimit = &atomicInt64{} + c.msgReadLimit.Store(32768) + + c.readClosed = &atomicInt64{} c.releaseOnClose = c.ws.OnClose(func(e wsjs.CloseEvent) { cerr := CloseError{ @@ -89,7 +93,7 @@ func (c *Conn) closeWithInternal() { // Read attempts to read a message from the connection. // The maximum time spent waiting is bounded by the context. func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) { - if atomic.LoadInt32(&c.readClosed) == 1 { + if c.readClosed.Load() == 1 { return 0, nil, fmt.Errorf("websocket connection read closed") } @@ -97,7 +101,7 @@ func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) { if err != nil { return 0, nil, fmt.Errorf("failed to read: %w", err) } - if int64(len(p)) > c.msgReadLimit { + if int64(len(p)) > c.msgReadLimit.Load() { c.Close(StatusMessageTooBig, fmt.Sprintf("read limited at %v bytes", c.msgReadLimit)) return 0, nil, c.closeErr }