diff --git a/answer.go b/answer.go index b59b4f92..b1fd04d2 100644 --- a/answer.go +++ b/answer.go @@ -7,6 +7,7 @@ import ( "capnproto.org/go/capnp/v3/exc" "capnproto.org/go/capnp/v3/internal/str" + "zenhack.net/go/util/deferred" "zenhack.net/go/util/sync/mutex" ) @@ -135,7 +136,8 @@ func (p *Promise) Reject(e error) { // If e != nil, then this is equivalent to p.Reject(e). // Otherwise, it is equivalent to p.Fulfill(r). func (p *Promise) Resolve(r Ptr, e error) { - var shutdownPromises []*clientPromise + dq := &deferred.Queue{} + defer dq.Run() // It's ok to extract p.clients and use it while not holding the lock: // it may not be accessed in the pending resolution state, so we have @@ -155,8 +157,7 @@ func (p *Promise) Resolve(r Ptr, e error) { res := resolution{p.method, r, e} for path, cp := range clients { t := path.transform() - cp.promise.fulfill(res.client(t)) - shutdownPromises = append(shutdownPromises, cp.promise) + cp.promise.fulfill(dq, res.client(t)) cp.promise = nil } @@ -168,9 +169,6 @@ func (p *Promise) Resolve(r Ptr, e error) { } p.signals = nil }) - for _, promise := range shutdownPromises { - promise.shutdown() - } } // requireUnresolved is a helper method for checking for duplicate diff --git a/capability.go b/capability.go index fd684845..71275eef 100644 --- a/capability.go +++ b/capability.go @@ -11,7 +11,9 @@ import ( "capnproto.org/go/capnp/v3/exp/bufferpool" "capnproto.org/go/capnp/v3/flowcontrol" "capnproto.org/go/capnp/v3/internal/str" + "zenhack.net/go/util/deferred" "zenhack.net/go/util/maybe" + "zenhack.net/go/util/rc" "zenhack.net/go/util/sync/mutex" ) @@ -119,7 +121,7 @@ type client struct { type clientState struct { limiter flowcontrol.FlowLimiter - h *clientHook // nil if resolved to nil or released + cursor *rc.Ref[clientCursor] // never nil released bool stream struct { @@ -128,6 +130,59 @@ type clientState struct { } } +// clientCursor is an indirection pointing to a link in the resolution +// chain of clientHooks. Places that need to do path shortening should +// store one of these, rather than storing clientHook directly. +type clientCursor struct { + hook mutex.Mutex[*rc.Ref[clientHook]] // nil if resolved to nil or released +} + +func newClientCursor(hook clientHook) *rc.Ref[clientCursor] { + hookRef := rc.NewRefInPlace(func(h *clientHook) func() { + *h = hook + return h.Release + }) + return rc.NewRefInPlace(func(c *clientCursor) func() { + *c = clientCursor{hook: mutex.New(hookRef)} + return c.Release + }) +} + +// compress advances the hook referred to by this cursor as far +// as possible without blocking on a resolution. +func (c *clientCursor) compress() { + c.hook.With(func(hook **rc.Ref[clientHook]) { + for { + h := *hook + if h == nil { + return + } + res, ok := h.Value().resolution.Get() + if !ok { + return + } + l := res.Lock() + if !l.Value().isResolved() { + l.Unlock() + return + } + r := l.Value().resolvedHook + if r != nil { + r = r.AddRef() + } + l.Unlock() + h.Release() + *hook = r + } + }) +} + +func (c *clientCursor) Release() { + c.hook.With(func(hook **rc.Ref[clientHook]) { + (*hook).Release() + }) +} + // clientHook is a reference-counted wrapper for a ClientHook. // It is assumed that a clientHook's address uniquely identifies a hook, // since they are only created in NewClient and NewPromisedClient. @@ -139,18 +194,29 @@ type clientHook struct { // Place for callers to attach arbitrary metadata to the client. metadata Metadata - state mutex.Mutex[clientHookState] + // State of the promise's resolution. If this is absent, then + // this clientHook is not a promise. + resolution maybe.Maybe[*mutex.Mutex[resolveState]] } -type clientHookState struct { +func (h *clientHook) Release() { + h.Shutdown() + r, ok := h.resolution.Get() + if ok { + r.With(func(s *resolveState) { + if s.isResolved() { + s.resolvedHook.Release() + } + }) + } +} + +type resolveState struct { // resolved is closed after resolvedHook is set resolved chan struct{} - refs int // how many open Clients reference this clientHook - - // Valid only if resolved is closed. Absent if this - // was not a promise. - resolvedHook maybe.Maybe[*clientHook] + // Valid only if resolved is closed. + resolvedHook *rc.Ref[clientHook] } // NewClient creates the first reference to a capability. @@ -162,15 +228,11 @@ func NewClient(hook ClientHook) Client { if hook == nil { return Client{} } - h := &clientHook{ + h := clientHook{ ClientHook: hook, metadata: *NewMetadata(), - state: mutex.New(clientHookState{ - resolved: closedSignal, - refs: 1, - }), } - cs := mutex.New(clientState{h: h}) + cs := mutex.New(clientState{cursor: newClientCursor(h)}) c := Client{client: &client{state: cs}} setupLeakReporting(c) return c @@ -193,92 +255,51 @@ func newPromisedClient(hook ClientHook) (Client, *clientPromise) { if hook == nil { panic("NewPromisedClient(nil)") } - h := &clientHook{ + rs := mutex.New(resolveState{ + resolved: make(chan struct{}), + }) + cursor := newClientCursor(clientHook{ ClientHook: hook, metadata: *NewMetadata(), - state: mutex.New(clientHookState{ - resolved: make(chan struct{}), - refs: 1, - }), - } - cs := mutex.New(clientState{h: h}) + resolution: maybe.New(&rs), + }) + cs := mutex.New(clientState{cursor: cursor}) c := Client{client: &client{state: cs}} setupLeakReporting(c) - return c, &clientPromise{h: h} + return c, &clientPromise{cursor: cursor.Weak()} } // startCall holds onto a hook to prevent it from shutting down until // finish is called. It resolves the client's hook as much as possible // first. The caller must not be holding onto c.mu. -func (c Client) startCall() (hook ClientHook, resolved, released bool, finish func()) { - if c.client == nil { - return nil, true, false, func() {} - } - return mutex.With4(&c.state, func(c *clientState) (hook ClientHook, resolved, released bool, finish func()) { - if c.h == nil { - return nil, true, c.released, func() {} - } - l := c.h.state.Lock() - c.h, l = resolveHook(c.h, l) - if c.h == nil { - return nil, true, false, func() {} - } - l.Value().refs++ - isResolved := l.Value().isResolved() - l.Unlock() - savedHook := c.h - return savedHook.ClientHook, isResolved, false, func() { - shutdown := func() {} - savedHook.state.With(func(s *clientHookState) { - s.refs-- - if s.refs == 0 { - shutdown = savedHook.Shutdown - } - }) - shutdown() - } - }) -} - -func (c Client) peek() (hook *clientHook, resolved, released bool) { +func (c Client) startCall() (hook *rc.Ref[clientHook], resolved, released bool) { if c.client == nil { return nil, true, false } - return mutex.With3(&c.state, func(c *clientState) (hook *clientHook, resolved, released bool) { - if c.h == nil { + return mutex.With3(&c.state, func(c *clientState) (hook *rc.Ref[clientHook], resolved, released bool) { + if c.released || !c.cursor.IsValid() { return nil, true, c.released } - l := c.h.state.Lock() - c.h, l = resolveHook(c.h, l) - if c.h == nil { + c.cursor.Value().compress() + hook, ok := mutex.With2(&c.cursor.Value().hook, func(h **rc.Ref[clientHook]) (*rc.Ref[clientHook], bool) { + ret := *h + if ret.IsValid() { + return ret.AddRef(), true + } + return nil, false + }) + if !ok { return nil, true, false } - resolved = l.Value().isResolved() - l.Unlock() - return c.h, resolved, false - }) -} - -// resolveHook resolves h as much as possible without blocking. -// l must point to the state belonging to h. When resolveHook returns, -// l will be invalid. The returned Locked will point at the state of -// the returned clientHook if they are not nil. -func resolveHook(h *clientHook, l *mutex.Locked[clientHookState]) (*clientHook, *mutex.Locked[clientHookState]) { - for { - if !l.Value().isResolved() { - return h, l - } - r, ok := l.Value().resolvedHook.Get() + r, ok := hook.Value().resolution.Get() if !ok { - return h, l + return hook, true, false } - l.Unlock() - h = r - if h == nil { - return nil, nil - } - l = h.state.Lock() - } + resolved = mutex.With1(r, func(s *resolveState) bool { + return s.isResolved() + }) + return hook, resolved, false + }) } // Get the current flowcontrol.FlowLimiter used to manage flow control @@ -314,8 +335,8 @@ func (c Client) SetFlowLimiter(lim flowcontrol.FlowLimiter) { // This method respects the flow control policy configured with SetFlowLimiter; // it may block if the sender is sending too fast. func (c Client) SendCall(ctx context.Context, s Send) (*Answer, ReleaseFunc) { - h, _, released, finish := c.startCall() - defer finish() + h, _, released := c.startCall() + defer h.Release() if released { return ErrorAnswer(s.Method, errors.New("call on released client")), func() {} } @@ -351,7 +372,7 @@ func (c Client) SendCall(ctx context.Context, s Send) (*Answer, ReleaseFunc) { return err } - ans, rel := h.Send(ctx, s) + ans, rel := h.Value().Send(ctx, s) // FIXME: an earlier version of this code called StartMessage() from // within PlaceArgs -- but that can result in a deadlock, since it means // the client hook is holding a lock while we're waiting on the limiter. @@ -446,8 +467,8 @@ func (c Client) WaitStreaming() error { // Note that unlike SendCall, this method does *not* respect the flow // control policy configured with SetFlowLimiter. func (c Client) RecvCall(ctx context.Context, r Recv) PipelineCaller { - h, _, released, finish := c.startCall() - defer finish() + h, _, released := c.startCall() + defer h.Release() if released { r.Reject(errors.New("call on released client")) return nil @@ -456,14 +477,15 @@ func (c Client) RecvCall(ctx context.Context, r Recv) PipelineCaller { r.Reject(errors.New("call on null client")) return nil } - return h.Recv(ctx, r) + return h.Value().Recv(ctx, r) } // IsValid reports whether c is a valid reference to a capability. // A reference is invalid if it is nil, has resolved to null, or has // been released. func (c Client) IsValid() bool { - h, _, released := c.peek() + h, _, released := c.startCall() + defer h.Release() return !released && h != nil } @@ -472,15 +494,25 @@ func (c Client) IsValid() bool { // are not fully resolved: use Resolve if this is an issue. If either // c or c2 are released, then IsSame panics. func (c Client) IsSame(c2 Client) bool { - h1, _, released := c.peek() + h1, _, released := c.startCall() + defer h1.Release() if released { panic("IsSame on released client") } - h2, _, released := c2.peek() + h2, _, released := c2.startCall() + defer h2.Release() if released { panic("IsSame on released client") } - return h1 == h2 + valid1 := h1.IsValid() + valid2 := h2.IsValid() + if !valid1 && !valid2 { + return true + } + if !valid1 || !valid2 { + return false + } + return h1.Value() == h2.Value() } // Resolve blocks until the capability is fully resolved or the Context is Done. @@ -488,7 +520,8 @@ func (c Client) IsSame(c2 Client) bool { // if the capability resolves to an error. func (c Client) Resolve(ctx context.Context) error { for { - h, resolved, released := c.peek() + h, resolved, released := c.startCall() + defer h.Release() if released { return errors.New("cannot resolve released client") } @@ -497,7 +530,12 @@ func (c Client) Resolve(ctx context.Context) error { return nil } - resolvedCh := mutex.With1(&h.state, func(s *clientHookState) <-chan struct{} { + r, ok := h.Value().resolution.Get() + if !ok { + return nil + } + + resolvedCh := mutex.With1(r, func(s *resolveState) <-chan struct{} { return s.resolved }) @@ -515,22 +553,13 @@ func (c Client) AddRef() Client { if c.client == nil { return Client{} } + h, _, released := c.startCall() + defer h.Release() + if released { + panic("AddRef on released client") + } return mutex.With1(&c.state, func(c *clientState) Client { - if c.released { - panic("AddRef on released client") - } - if c.h == nil { - return Client{} - } - l := c.h.state.Lock() - c.h, l = resolveHook(c.h, l) - if c.h == nil { - return Client{} - } - l.Value().refs++ - l.Unlock() - cs := mutex.New(clientState{h: c.h}) - d := Client{client: &client{state: cs}} + d := Client{client: &client{state: mutex.New(clientState{cursor: c.cursor.AddRef()})}} setupLeakReporting(d) return d }) @@ -538,31 +567,28 @@ func (c Client) AddRef() Client { // WeakRef creates a new WeakClient that refers to the same capability // as c. If c is nil or has resolved to null, then WeakRef returns nil. -func (c Client) WeakRef() *WeakClient { - h, _, released := c.peek() - if released { - panic("WeakRef on released client") - } - if h == nil { - return nil - } - return &WeakClient{h: h} +func (c Client) WeakRef() WeakClient { + cursor := mutex.With1(&c.state, func(s *clientState) *rc.WeakRef[clientCursor] { + if s.released { + panic("WeakRef on released client") + } + return s.cursor.Weak() + }) + return WeakClient{r: cursor} } // State reads the current state of the client. It returns the zero // ClientState if c is nil, has resolved to null, or has been released. func (c Client) State() ClientState { - h, resolved, _, finish := c.startCall() - defer finish() + h, resolved, _ := c.startCall() + defer h.Release() if h == nil { return ClientState{} } return ClientState{ - Brand: h.Brand(), + Brand: h.Value().Brand(), IsPromise: !resolved, - Metadata: mutex.With1(&c.state, func(c *clientState) *Metadata { - return &c.h.metadata - }), + Metadata: &h.Value().metadata, } } @@ -594,29 +620,20 @@ func (c Client) String() string { if c.client == nil { return "" } - cl := c.state.Lock() - if cl.Value().released { - cl.Unlock() + h, resolved, released := c.startCall() + defer h.Release() + if released { return "" } - if cl.Value().h == nil { - cl.Unlock() - return "" - } - hl := cl.Value().h.state.Lock() - cl.Value().h, hl = resolveHook(cl.Value().h, hl) - if cl.Value().h == nil { - cl.Unlock() + if h == nil { return "" } var s string - if hl.Value().isResolved() { - s = "" + if resolved { + s = "" } else { - s = "" + s = "" } - hl.Unlock() - cl.Unlock() return s } @@ -630,30 +647,14 @@ func (c Client) Release() { if c.client == nil { return } - cl := c.state.Lock() - if cl.Value().released || cl.Value().h == nil { - cl.Unlock() - return - } - cl.Value().released = true - hl := cl.Value().h.state.Lock() - cl.Value().h, hl = resolveHook(cl.Value().h, hl) - if cl.Value().h == nil { - cl.Unlock() - return - } - h := cl.Value().h - cl.Value().h = nil - hl.Value().refs-- - if hl.Value().refs > 0 { - hl.Unlock() - cl.Unlock() - return - } - hl.Unlock() - cl.Unlock() - h.Shutdown() - c.GetFlowLimiter().Release() + limiter := c.GetFlowLimiter() + c.state.With(func(s *clientState) { + if !s.released { + s.released = true + s.cursor.Release() + limiter.Release() + } + }) } func (c Client) EncodeAsPtr(seg *Segment) Ptr { @@ -668,7 +669,7 @@ func (Client) DecodeFromPtr(p Ptr) Client { var _ TypeParam[Client] = Client{} // isResolve reports whether the clientHook s belongs to is resolved. -func (s *clientHookState) isResolved() bool { +func (s *resolveState) isResolved() bool { select { case <-s.resolved: return true @@ -706,7 +707,7 @@ func SetClientLeakFunc(clientLeakFunc func(msg string)) { // A ClientPromise resolves the identity of a client created by NewPromisedClient. type clientPromise struct { - h *clientHook + cursor *rc.WeakRef[clientCursor] } func (cp *clientPromise) Reject(err error) { @@ -720,83 +721,66 @@ func (cp *clientPromise) Reject(err error) { // hook may have been shut down earlier if the client ran out of // references. func (cp *clientPromise) Fulfill(c Client) { - cp.fulfill(c) - cp.shutdown() -} - -// shutdown waits for all outstanding calls on the hook to complete and -// references to be dropped, and then shuts down the hook. The caller -// must have previously invoked cp.fulfill(). -func (cp *clientPromise) shutdown() { - cp.h.Shutdown() + dq := &deferred.Queue{} + defer dq.Run() + cp.fulfill(dq, c) } // fulfill is like Fulfill, except that it does not wait for outsanding calls -// to return answers or shut down the underlying hook. -func (cp *clientPromise) fulfill(c Client) { +// to return answers or shut down the underlying hook; instead, it adds functions +// to do this to dq. +func (cp *clientPromise) fulfill(dq *deferred.Queue, c Client) { + cursor, ok := cp.cursor.AddRef() + if !ok { + return + } + dq.Defer(cursor.Release) + // Obtain next client hook. - var rh *clientHook + var rh *rc.Ref[clientHook] if (c != Client{}) { - c.state.With(func(c *clientState) { - if c.released { - panic("ClientPromise.Fulfill with a released client") - } - // TODO(maybe): c.h = resolveHook(c.h) - rh = c.h - }) + h, _, released := c.startCall() + if released { + panic("ClientPromise.Fulfill with a released client") + } + rh = h } // Mark hook as resolved. - l := cp.h.state.Lock() - if l.Value().isResolved() { - l.Unlock() - panic("ClientPromise.Fulfill called more than once") - } - l.Value().resolvedHook = maybe.New(rh) - close(l.Value().resolved) - refs := l.Value().refs - l.Value().refs = 0 - if refs == 0 { - l.Unlock() - return - } - - rh, l = resolveHook(cp.h, l) // swaps mutex on cp.h for mutex on rh - if rh != nil { - l.Value().refs += refs - l.Unlock() - } + cursor.Value().hook.With(func(h **rc.Ref[clientHook]) { + r, ok := (*h).Value().resolution.Get() + if !ok { + panic("BUG: clientPromise referred to a clientHook that was not a promise") + } + r.With(func(s *resolveState) { + if s.isResolved() { + panic("ClientPromise.Fulfill called more than once") + } + s.resolvedHook = rh + close(s.resolved) + }) + }) + cursor.Value().compress() } // A WeakClient is a weak reference to a capability: it refers to a // capability without preventing it from being shut down. The zero // value is a null reference. type WeakClient struct { - h *clientHook + r *rc.WeakRef[clientCursor] } // AddRef creates a new Client that refers to the same capability as c // as long as the capability hasn't already been shut down. -func (wc *WeakClient) AddRef() (c Client, ok bool) { - if wc == nil { - return Client{}, true - } - if wc.h == nil { +func (wc WeakClient) AddRef() (c Client, ok bool) { + if wc.r == nil { return Client{}, true } - l := wc.h.state.Lock() - wc.h, l = resolveHook(wc.h, l) - if wc.h == nil { - return Client{}, true - } - if l.Value().refs == 0 { - l.Unlock() + cursor, ok := wc.r.AddRef() + if !ok { return Client{}, false } - l.Value().refs++ - l.Unlock() - cs := mutex.New(clientState{h: wc.h}) - c = Client{client: &client{state: cs}} + c = Client{client: &client{state: mutex.New(clientState{cursor: cursor})}} setupLeakReporting(c) return c, true } @@ -993,15 +977,11 @@ func ErrorClient(e error) Client { } // Avoid NewClient because it can set a finalizer. - h := &clientHook{ + h := clientHook{ ClientHook: errorClient{e}, metadata: *NewMetadata(), - state: mutex.New(clientHookState{ - resolved: closedSignal, - refs: 1, - }), } - cs := mutex.New(clientState{h: h}) + cs := mutex.New(clientState{cursor: newClientCursor(h)}) return Client{client: &client{state: cs}} } diff --git a/capability_test.go b/capability_test.go index 24944244..0fc2d131 100644 --- a/capability_test.go +++ b/capability_test.go @@ -4,8 +4,11 @@ import ( "bytes" "context" "errors" + "fmt" "testing" "time" + + "github.com/stretchr/testify/assert" ) func TestClient(t *testing.T) { @@ -202,7 +205,7 @@ func TestPromisedClient(t *testing.T) { finish() if !ca.IsSame(cb) { - t.Error("after resolution, ca != cb") + t.Errorf("after resolution, ca != cb (%v vs. %v)", ca, cb) } state = ca.State() if state.Brand.Value != int(222) { @@ -269,7 +272,10 @@ type dummyHook struct { } func (dh *dummyHook) String() string { - return "&dummyHook{}" + return fmt.Sprintf( + "&dummyHook{calls: %v, brand: %v, shutdowns: %v}", + dh.calls, dh.brand, dh.shutdowns, + ) } func (dh *dummyHook) Send(_ context.Context, s Send) (*Answer, ReleaseFunc) { @@ -596,20 +602,12 @@ func TestWeakPromisedClient(t *testing.T) { defer ca.Release() cb2, ok := wa.AddRef() defer cb2.Release() - if !ok { - t.Error("wa.AddRef() failed after releasing ca") - } - if !cb.IsSame(cb2) { - t.Error("cb != cb2") - } + assert.False(t, ok, "wa.AddRef() failed after releasing ca") + assert.False(t, cb.IsSame(cb2), "cb != cb2") + cb.Release() - defer cb.Release() - if b.shutdowns > 0 { - t.Error("b shut down before cb2.Release") - } - cb2.Release() if b.shutdowns == 0 { - t.Error("b not shut down after cb2.Release") + t.Error("b not shut down after cb.Release") } } diff --git a/go.mod b/go.mod index a2b85986..46eaa4f6 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/stretchr/testify v1.8.2 github.com/tinylib/msgp v1.1.5 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 - zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5 + zenhack.net/go/util v0.0.0-20230414211804-99ae9bf14f02 ) require ( diff --git a/go.sum b/go.sum index a57a0ee2..4f790f48 100644 --- a/go.sum +++ b/go.sum @@ -54,3 +54,5 @@ zenhack.net/go/util v0.0.0-20230327231740-da8cb323921c h1:L+T38E+u91e956ykUrYKHZ zenhack.net/go/util v0.0.0-20230327231740-da8cb323921c/go.mod h1:0lafdGg7tDb7RcXASgmJmRbLFLkAxu328+KGIs7icDE= zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5 h1:yksDCGMVzyn3vlyf0GZ3huiF5FFaMGQpQ3UJvR0EoGA= zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5/go.mod h1:1LtNdPAs8WH+BTcQiZAOo2MIKD/5jyK/u7sZ9ZPe5SE= +zenhack.net/go/util v0.0.0-20230414211804-99ae9bf14f02 h1:0iYx7hqltFcLvxDeCdg84gEJp8BJ6SgGti431U2ztIU= +zenhack.net/go/util v0.0.0-20230414211804-99ae9bf14f02/go.mod h1:1LtNdPAs8WH+BTcQiZAOo2MIKD/5jyK/u7sZ9ZPe5SE= diff --git a/rpc/answer.go b/rpc/answer.go index b5ab5c96..362070be 100644 --- a/rpc/answer.go +++ b/rpc/answer.go @@ -292,6 +292,7 @@ func (ans *ansent) completeSendReturn(dq *deferred.Queue) error { } else { ans.promise.Resolve(ans.returner.results.Content()) } + dq.Defer(ans.promise.ReleaseClients) ans.promise = nil } ans.sendMsg() @@ -338,6 +339,7 @@ func (ans *ansent) completeSendException(dq *deferred.Queue) { if ans.promise != nil { ans.promise.Reject(ex) + dq.Defer(ans.promise.ReleaseClients) ans.promise = nil } if ans.sendMsg != nil { diff --git a/rpc/import.go b/rpc/import.go index b99b4393..0d6288ec 100644 --- a/rpc/import.go +++ b/rpc/import.go @@ -16,7 +16,7 @@ type importID uint32 // impent is an entry in the import table. All fields are protected by // Conn.mu. type impent struct { - wc *capnp.WeakClient + wc capnp.WeakClient // wireRefs is the number of times that the importID has appeared in // messages received from the remote vat. Used to populate the diff --git a/rpc/level0_test.go b/rpc/level0_test.go index a103274a..901d929e 100644 --- a/rpc/level0_test.go +++ b/rpc/level0_test.go @@ -1360,7 +1360,7 @@ func TestRecvBootstrapPipelineCall(t *testing.T) { // TestDuplicateBootstrap calls Bootstrap twice on the same connection, // and verifies that the results are the same. -func TestDuplicateBoostrap(t *testing.T) { +func TestDuplicateBootstrap(t *testing.T) { t.Parallel() left, right := transport.NewPipe(1)