Skip to content

Restructure library #143

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion websocket.go → conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (c *Conn) close(err error) {
// closeErr.
c.closer.Close()

// See comment in dial.go
// See comment on bufioReaderPool in handshake.go
if c.client {
// By acquiring the locks, we ensure no goroutine will touch the bufio reader or writer
// and we can safely return them.
Expand Down
369 changes: 369 additions & 0 deletions websocket_test.go → conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ import (
"io"
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/http/cookiejar"
"net/http/httptest"
"net/url"
"os"
"os/exec"
"reflect"
"strconv"
"strings"
Expand Down Expand Up @@ -1962,3 +1965,369 @@ func assertReadMessage(ctx context.Context, c *websocket.Conn, typ websocket.Mes
}
return assertEqualf(p, actP, "unexpected frame %v payload", actTyp)
}

func BenchmarkConn(b *testing.B) {
sizes := []int{
2,
16,
32,
512,
4096,
16384,
}

b.Run("write", func(b *testing.B) {
for _, size := range sizes {
b.Run(strconv.Itoa(size), func(b *testing.B) {
b.Run("stream", func(b *testing.B) {
benchConn(b, false, true, size)
})
b.Run("buffer", func(b *testing.B) {
benchConn(b, false, false, size)
})
})
}
})

b.Run("echo", func(b *testing.B) {
for _, size := range sizes {
b.Run(strconv.Itoa(size), func(b *testing.B) {
benchConn(b, true, true, size)
})
}
})
}

func benchConn(b *testing.B, echo, stream bool, size int) {
s, closeFn := testServer(b, func(w http.ResponseWriter, r *http.Request) error {
c, err := websocket.Accept(w, r, nil)
if err != nil {
return err
}
if echo {
wsecho.Loop(r.Context(), c)
} else {
discardLoop(r.Context(), c)
}
return nil
}, false)
defer closeFn()

wsURL := strings.Replace(s.URL, "http", "ws", 1)

ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()

c, _, err := websocket.Dial(ctx, wsURL, nil)
if err != nil {
b.Fatal(err)
}
defer c.Close(websocket.StatusInternalError, "")

msg := []byte(strings.Repeat("2", size))
readBuf := make([]byte, len(msg))
b.SetBytes(int64(len(msg)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
if stream {
w, err := c.Writer(ctx, websocket.MessageText)
if err != nil {
b.Fatal(err)
}

_, err = w.Write(msg)
if err != nil {
b.Fatal(err)
}

err = w.Close()
if err != nil {
b.Fatal(err)
}
} else {
err = c.Write(ctx, websocket.MessageText, msg)
if err != nil {
b.Fatal(err)
}
}

if echo {
_, r, err := c.Reader(ctx)
if err != nil {
b.Fatal(err)
}

_, err = io.ReadFull(r, readBuf)
if err != nil {
b.Fatal(err)
}
}
}
b.StopTimer()

c.Close(websocket.StatusNormalClosure, "")
}

func discardLoop(ctx context.Context, c *websocket.Conn) {
defer c.Close(websocket.StatusInternalError, "")

ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

b := make([]byte, 32768)
echo := func() error {
_, r, err := c.Reader(ctx)
if err != nil {
return err
}

_, err = io.CopyBuffer(ioutil.Discard, r, b)
if err != nil {
return err
}
return nil
}

for {
err := echo()
if err != nil {
return
}
}
}

func TestAutobahnPython(t *testing.T) {
// This test contains the old autobahn test suite tests that use the
// python binary. The approach is clunky and slow so new tests
// have been written in pure Go in websocket_test.go.
// These have been kept for correctness purposes and are occasionally ran.
if os.Getenv("AUTOBAHN_PYTHON") == "" {
t.Skip("Set $AUTOBAHN_PYTHON to run tests against the python autobahn test suite")
}

t.Run("server", testServerAutobahnPython)
t.Run("client", testClientAutobahnPython)
}

// https://github.com/crossbario/autobahn-python/tree/master/wstest
func testServerAutobahnPython(t *testing.T) {
t.Parallel()

s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
Subprotocols: []string{"echo"},
})
if err != nil {
t.Logf("server handshake failed: %+v", err)
return
}
wsecho.Loop(r.Context(), c)
}))
defer s.Close()

spec := map[string]interface{}{
"outdir": "ci/out/wstestServerReports",
"servers": []interface{}{
map[string]interface{}{
"agent": "main",
"url": strings.Replace(s.URL, "http", "ws", 1),
},
},
"cases": []string{"*"},
// We skip the UTF-8 handling tests as there isn't any reason to reject invalid UTF-8, just
// more performance overhead. 7.5.1 is the same.
// 12.* and 13.* as we do not support compression.
"exclude-cases": []string{"6.*", "7.5.1", "12.*", "13.*"},
}
specFile, err := ioutil.TempFile("", "websocketFuzzingClient.json")
if err != nil {
t.Fatalf("failed to create temp file for fuzzingclient.json: %v", err)
}
defer specFile.Close()

e := json.NewEncoder(specFile)
e.SetIndent("", "\t")
err = e.Encode(spec)
if err != nil {
t.Fatalf("failed to write spec: %v", err)
}

err = specFile.Close()
if err != nil {
t.Fatalf("failed to close file: %v", err)
}

ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()

args := []string{"--mode", "fuzzingclient", "--spec", specFile.Name()}
wstest := exec.CommandContext(ctx, "wstest", args...)
out, err := wstest.CombinedOutput()
if err != nil {
t.Fatalf("failed to run wstest: %v\nout:\n%s", err, out)
}

checkWSTestIndex(t, "./ci/out/wstestServerReports/index.json")
}

func unusedListenAddr() (string, error) {
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
return "", err
}
l.Close()
return l.Addr().String(), nil
}

// https://github.com/crossbario/autobahn-python/blob/master/wstest/testee_client_aio.py
func testClientAutobahnPython(t *testing.T) {
t.Parallel()

if os.Getenv("AUTOBAHN_PYTHON") == "" {
t.Skip("Set $AUTOBAHN_PYTHON to test against the python autobahn test suite")
}

serverAddr, err := unusedListenAddr()
if err != nil {
t.Fatalf("failed to get unused listen addr for wstest: %v", err)
}

wsServerURL := "ws://" + serverAddr

spec := map[string]interface{}{
"url": wsServerURL,
"outdir": "ci/out/wstestClientReports",
"cases": []string{"*"},
// See TestAutobahnServer for the reasons why we exclude these.
"exclude-cases": []string{"6.*", "7.5.1", "12.*", "13.*"},
}
specFile, err := ioutil.TempFile("", "websocketFuzzingServer.json")
if err != nil {
t.Fatalf("failed to create temp file for fuzzingserver.json: %v", err)
}
defer specFile.Close()

e := json.NewEncoder(specFile)
e.SetIndent("", "\t")
err = e.Encode(spec)
if err != nil {
t.Fatalf("failed to write spec: %v", err)
}

err = specFile.Close()
if err != nil {
t.Fatalf("failed to close file: %v", err)
}

ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()

args := []string{"--mode", "fuzzingserver", "--spec", specFile.Name(),
// Disables some server that runs as part of fuzzingserver mode.
// See https://github.com/crossbario/autobahn-testsuite/blob/058db3a36b7c3a1edf68c282307c6b899ca4857f/autobahntestsuite/autobahntestsuite/wstest.py#L124
"--webport=0",
}
wstest := exec.CommandContext(ctx, "wstest", args...)
err = wstest.Start()
if err != nil {
t.Fatal(err)
}
defer func() {
err := wstest.Process.Kill()
if err != nil {
t.Error(err)
}
}()

// Let it come up.
time.Sleep(time.Second * 5)

var cases int
func() {
c, _, err := websocket.Dial(ctx, wsServerURL+"/getCaseCount", nil)
if err != nil {
t.Fatal(err)
}
defer c.Close(websocket.StatusInternalError, "")

_, r, err := c.Reader(ctx)
if err != nil {
t.Fatal(err)
}
b, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}
cases, err = strconv.Atoi(string(b))
if err != nil {
t.Fatal(err)
}

c.Close(websocket.StatusNormalClosure, "")
}()

for i := 1; i <= cases; i++ {
func() {
ctx, cancel := context.WithTimeout(ctx, time.Second*45)
defer cancel()

c, _, err := websocket.Dial(ctx, fmt.Sprintf(wsServerURL+"/runCase?case=%v&agent=main", i), nil)
if err != nil {
t.Fatal(err)
}
wsecho.Loop(ctx, c)
}()
}

c, _, err := websocket.Dial(ctx, fmt.Sprintf(wsServerURL+"/updateReports?agent=main"), nil)
if err != nil {
t.Fatal(err)
}
c.Close(websocket.StatusNormalClosure, "")

checkWSTestIndex(t, "./ci/out/wstestClientReports/index.json")
}

func checkWSTestIndex(t *testing.T, path string) {
wstestOut, err := ioutil.ReadFile(path)
if err != nil {
t.Fatalf("failed to read index.json: %v", err)
}

var indexJSON map[string]map[string]struct {
Behavior string `json:"behavior"`
BehaviorClose string `json:"behaviorClose"`
}
err = json.Unmarshal(wstestOut, &indexJSON)
if err != nil {
t.Fatalf("failed to unmarshal index.json: %v", err)
}

var failed bool
for _, tests := range indexJSON {
for test, result := range tests {
switch result.Behavior {
case "OK", "NON-STRICT", "INFORMATIONAL":
default:
failed = true
t.Errorf("test %v failed", test)
}
switch result.BehaviorClose {
case "OK", "INFORMATIONAL":
default:
failed = true
t.Errorf("bad close behaviour for test %v", test)
}
}
}

if failed {
path = strings.Replace(path, ".json", ".html", 1)
if os.Getenv("CI") == "" {
t.Errorf("wstest found failure, see %q (output as an artifact in CI)", path)
}
}
}
Loading