From 440e96a6304f0cd0bc0b533c9c71ea08c15aa396 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Thu, 27 Apr 2023 00:14:10 -0400 Subject: [PATCH 01/27] clientPromise.fulfill: use a deferred.Queue This makes the implementation of Promise.Resolve a bit cleaner. --- answer.go | 12 +++++------- capability.go | 13 +++++++++---- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/answer.go b/answer.go index c3a7343b..22357f3c 100644 --- a/answer.go +++ b/answer.go @@ -7,6 +7,7 @@ import ( "capnproto.org/go/capnp/v3/exc" "capnproto.org/go/capnp/v3/internal/str" + "zenhack.net/go/util/deferred" "zenhack.net/go/util/sync/mutex" ) @@ -157,9 +158,10 @@ func (p *Promise) Reject(e error) { // If e != nil, then this is equivalent to p.Reject(e). // Otherwise, it is equivalent to p.Fulfill(r). func (p *Promise) Resolve(r Ptr, e error) { - var ( - shutdownPromises []*clientPromise + dq := &deferred.Queue{} + defer dq.Run() + var ( // We need to access some of these fields from p.state while // not holding the lock, so we store them here while holding it. // p.clients cannot be touched in the pending resolution state, @@ -190,8 +192,7 @@ func (p *Promise) Resolve(r Ptr, e error) { res := resolution{p.method, r, e} for path, cp := range clients { t := path.transform() - cp.promise.fulfill(res.client(t)) - shutdownPromises = append(shutdownPromises, cp.promise) + cp.promise.fulfill(dq, res.client(t)) cp.promise = nil } if callsStopped != nil { @@ -207,9 +208,6 @@ func (p *Promise) Resolve(r Ptr, e error) { } p.signals = nil }) - for _, promise := range shutdownPromises { - promise.shutdown() - } } // requireUnresolved is a helper method for checking for duplicate diff --git a/capability.go b/capability.go index fcbbe57c..d9532452 100644 --- a/capability.go +++ b/capability.go @@ -11,6 +11,7 @@ import ( "capnproto.org/go/capnp/v3/exp/bufferpool" "capnproto.org/go/capnp/v3/flowcontrol" "capnproto.org/go/capnp/v3/internal/str" + "zenhack.net/go/util/deferred" "zenhack.net/go/util/maybe" "zenhack.net/go/util/sync/mutex" ) @@ -720,8 +721,9 @@ func (cp *clientPromise) Reject(err error) { // hook may have been shut down earlier if the client ran out of // references. func (cp *clientPromise) Fulfill(c Client) { - cp.fulfill(c) - cp.shutdown() + dq := &deferred.Queue{} + defer dq.Run() + cp.fulfill(dq, c) } // shutdown waits for all outstanding calls on the hook to complete and @@ -732,8 +734,11 @@ func (cp *clientPromise) shutdown() { } // fulfill is like Fulfill, except that it does not wait for outsanding calls -// to return answers or shut down the underlying hook. -func (cp *clientPromise) fulfill(c Client) { +// to return answers or shut down the underlying hook; instead, it adds functions +// to do this to dq. +func (cp *clientPromise) fulfill(dq *deferred.Queue, c Client) { + dq.Defer(cp.shutdown) + // Obtain next client hook. var rh *clientHook if (c != Client{}) { From 2b31fa23d19ac2fa2af80b1c137dc98ef96b544f Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Thu, 27 Apr 2023 18:54:02 -0400 Subject: [PATCH 02/27] Move the responsibility for resolveHook onto a new type ...called clientCursor. Step 1 in a broader refactoring. --- capability.go | 141 +++++++++++++++++++++++++++++--------------------- 1 file changed, 82 insertions(+), 59 deletions(-) diff --git a/capability.go b/capability.go index 22b90f70..e04f2c42 100644 --- a/capability.go +++ b/capability.go @@ -120,7 +120,7 @@ type client struct { type clientState struct { limiter flowcontrol.FlowLimiter - h *clientHook // nil if resolved to nil or released + cursor *clientCursor // never nil released bool stream struct { @@ -129,6 +129,55 @@ type clientState struct { } } +// clientCursor is an indirection pointing to a link in the resolution +// chain of clientHooks. Places that need to do path shortening should +// store one of these, rather than storing clientHook directly. +// +// TODO: we want to move towards sharing one clientCursor (protected +// by a mutex) between all Clients and the like, to allow us to +// shorten from afar, rather than waiting for methods to be called +// on the relevant Client. +type clientCursor struct { + hook *clientHook // nil if resolved to nil or released +} + +// compress advances the hook referred to by this cursor as far +// as possible without blocking on a resolution. +func (c *clientCursor) compress() { + if c.hook == nil { + return + } + l := c.hook.state.Lock() + c.hook, l = resolveHook(c.hook, l) + if c.hook != nil { + l.Unlock() + } +} + +// resolveHook is a helper for clientCursor.compress. It resolves h as much +// as possible without blocking. +// +// l must point to the state belonging to h. When resolveHook returns, +// l will be invalid. The returned Locked will point at the state of +// the returned clientHook if they are not nil. +func resolveHook(h *clientHook, l *mutex.Locked[clientHookState]) (*clientHook, *mutex.Locked[clientHookState]) { + for { + if !l.Value().isResolved() { + return h, l + } + r, ok := l.Value().resolvedHook.Get() + if !ok { + return h, l + } + l.Unlock() + h = r + if h == nil { + return nil, nil + } + l = h.state.Lock() + } +} + // clientHook is a reference-counted wrapper for a ClientHook. // It is assumed that a clientHook's address uniquely identifies a hook, // since they are only created in NewClient and NewPromisedClient. @@ -171,7 +220,7 @@ func NewClient(hook ClientHook) Client { refs: 1, }), } - cs := mutex.New(clientState{h: h}) + cs := mutex.New(clientState{cursor: &clientCursor{hook: h}}) c := Client{client: &client{state: cs}} setupLeakReporting(c) return c @@ -202,7 +251,7 @@ func newPromisedClient(hook ClientHook) (Client, *clientPromise) { refs: 1, }), } - cs := mutex.New(clientState{h: h}) + cs := mutex.New(clientState{cursor: &clientCursor{hook: h}}) c := Client{client: &client{state: cs}} setupLeakReporting(c) return c, &clientPromise{h: h} @@ -216,18 +265,18 @@ func (c Client) startCall() (hook ClientHook, resolved, released bool, finish fu return nil, true, false, func() {} } return mutex.With4(&c.state, func(c *clientState) (hook ClientHook, resolved, released bool, finish func()) { - if c.h == nil { + if c.cursor.hook == nil { return nil, true, c.released, func() {} } - l := c.h.state.Lock() - c.h, l = resolveHook(c.h, l) - if c.h == nil { + c.cursor.compress() + l := c.cursor.hook.state.Lock() + if c.cursor.hook == nil { return nil, true, false, func() {} } l.Value().refs++ isResolved := l.Value().isResolved() l.Unlock() - savedHook := c.h + savedHook := c.cursor.hook return savedHook.ClientHook, isResolved, false, func() { shutdown := func() {} savedHook.state.With(func(s *clientHookState) { @@ -246,42 +295,20 @@ func (c Client) peek() (hook *clientHook, released bool, resolved bool) { return nil, false, true } return mutex.With3(&c.state, func(c *clientState) (hook *clientHook, released bool, resolved bool) { - if c.h == nil { + if c.cursor.hook == nil { return nil, c.released, true } - l := c.h.state.Lock() - c.h, l = resolveHook(c.h, l) - if c.h == nil { + c.cursor.compress() + if c.cursor.hook == nil { return nil, false, true } + l := c.cursor.hook.state.Lock() resolved = l.Value().isResolved() l.Unlock() - return c.h, false, resolved + return c.cursor.hook, false, resolved }) } -// resolveHook resolves h as much as possible without blocking. -// l must point to the state belonging to h. When resolveHook returns, -// l will be invalid. The returned Locked will point at the state of -// the returned clientHook if they are not nil. -func resolveHook(h *clientHook, l *mutex.Locked[clientHookState]) (*clientHook, *mutex.Locked[clientHookState]) { - for { - if !l.Value().isResolved() { - return h, l - } - r, ok := l.Value().resolvedHook.Get() - if !ok { - return h, l - } - l.Unlock() - h = r - if h == nil { - return nil, nil - } - l = h.state.Lock() - } -} - // Get the current flowcontrol.FlowLimiter used to manage flow control // for this client. func (c Client) GetFlowLimiter() flowcontrol.FlowLimiter { @@ -520,17 +547,17 @@ func (c Client) AddRef() Client { if c.released { panic("AddRef on released client") } - if c.h == nil { + if c.cursor.hook == nil { return Client{} } - l := c.h.state.Lock() - c.h, l = resolveHook(c.h, l) - if c.h == nil { + c.cursor.compress() + if c.cursor.hook == nil { return Client{} } + l := c.cursor.hook.state.Lock() l.Value().refs++ l.Unlock() - cs := mutex.New(clientState{h: c.h}) + cs := mutex.New(clientState{cursor: &clientCursor{hook: c.cursor.hook}}) d := Client{client: &client{state: cs}} setupLeakReporting(d) return d @@ -562,7 +589,7 @@ func (c Client) State() ClientState { Brand: h.Brand(), IsPromise: !resolved, Metadata: mutex.With1(&c.state, func(c *clientState) *Metadata { - return &c.h.metadata + return &c.cursor.hook.metadata }), } } @@ -600,21 +627,21 @@ func (c Client) String() string { cl.Unlock() return "" } - if cl.Value().h == nil { + if cl.Value().cursor.hook == nil { cl.Unlock() return "" } - hl := cl.Value().h.state.Lock() - cl.Value().h, hl = resolveHook(cl.Value().h, hl) - if cl.Value().h == nil { + cl.Value().cursor.compress() + hl := cl.Value().cursor.hook.state.Lock() + if cl.Value().cursor.hook == nil { cl.Unlock() return "" } var s string if hl.Value().isResolved() { - s = "" + s = "" } else { - s = "" + s = "" } hl.Unlock() cl.Unlock() @@ -632,19 +659,15 @@ func (c Client) Release() { return } cl := c.state.Lock() - if cl.Value().released || cl.Value().h == nil { + cl.Value().cursor.compress() + if cl.Value().released || cl.Value().cursor.hook == nil { cl.Unlock() return } cl.Value().released = true - hl := cl.Value().h.state.Lock() - cl.Value().h, hl = resolveHook(cl.Value().h, hl) - if cl.Value().h == nil { - cl.Unlock() - return - } - h := cl.Value().h - cl.Value().h = nil + h := cl.Value().cursor.hook + hl := h.state.Lock() + cl.Value().cursor.hook = nil hl.Value().refs-- if hl.Value().refs > 0 { hl.Unlock() @@ -747,7 +770,7 @@ func (cp *clientPromise) fulfill(dq *deferred.Queue, c Client) { panic("ClientPromise.Fulfill with a released client") } // TODO(maybe): c.h = resolveHook(c.h) - rh = c.h + rh = c.cursor.hook }) } @@ -800,7 +823,7 @@ func (wc *WeakClient) AddRef() (c Client, ok bool) { } l.Value().refs++ l.Unlock() - cs := mutex.New(clientState{h: wc.h}) + cs := mutex.New(clientState{cursor: &clientCursor{hook: wc.h}}) c = Client{client: &client{state: cs}} setupLeakReporting(c) return c, true @@ -1006,7 +1029,7 @@ func ErrorClient(e error) Client { refs: 1, }), } - cs := mutex.New(clientState{h: h}) + cs := mutex.New(clientState{cursor: &clientCursor{hook: h}}) return Client{client: &client{state: cs}} } From 86e7e4a3762adf7f646185130a49b6fc4ea25e16 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Thu, 27 Apr 2023 18:55:59 -0400 Subject: [PATCH 03/27] Factor clientHook deref into its own function. --- capability.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/capability.go b/capability.go index e04f2c42..e6b42298 100644 --- a/capability.go +++ b/capability.go @@ -192,6 +192,17 @@ type clientHook struct { state mutex.Mutex[clientHookState] } +func (h *clientHook) deref() { + shutdown := func() {} + h.state.With(func(s *clientHookState) { + s.refs-- + if s.refs == 0 { + shutdown = h.Shutdown + } + }) + shutdown() +} + type clientHookState struct { // resolved is closed after resolvedHook is set resolved chan struct{} @@ -277,16 +288,7 @@ func (c Client) startCall() (hook ClientHook, resolved, released bool, finish fu isResolved := l.Value().isResolved() l.Unlock() savedHook := c.cursor.hook - return savedHook.ClientHook, isResolved, false, func() { - shutdown := func() {} - savedHook.state.With(func(s *clientHookState) { - s.refs-- - if s.refs == 0 { - shutdown = savedHook.Shutdown - } - }) - shutdown() - } + return savedHook.ClientHook, isResolved, false, savedHook.deref }) } From a6a1e7e7afdcb8bcb9d9a17a64eca48ffd0522d4 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sat, 29 Apr 2023 08:48:13 -0400 Subject: [PATCH 04/27] WIP: rework internal refcounting in clients. We now use the rc package. There are some test failures that need to be pinned down. --- capability.go | 313 +++++++++++++++++++++----------------------------- 1 file changed, 132 insertions(+), 181 deletions(-) diff --git a/capability.go b/capability.go index 42b39c4e..a8ac8c87 100644 --- a/capability.go +++ b/capability.go @@ -13,6 +13,7 @@ import ( "capnproto.org/go/capnp/v3/internal/str" "zenhack.net/go/util/deferred" "zenhack.net/go/util/maybe" + "zenhack.net/go/util/rc" "zenhack.net/go/util/sync/mutex" ) @@ -120,7 +121,7 @@ type client struct { type clientState struct { limiter flowcontrol.FlowLimiter - cursor *clientCursor // never nil + cursor *rc.Ref[mutex.Mutex[clientCursor]] // nil iff this is an initially-nil client released bool stream struct { @@ -138,7 +139,23 @@ type clientState struct { // shorten from afar, rather than waiting for methods to be called // on the relevant Client. type clientCursor struct { - hook *clientHook // nil if resolved to nil or released + hook *rc.Ref[clientHook] // nil if resolved to nil or released +} + +func newClientCursor(hook clientHook) *rc.Ref[mutex.Mutex[clientCursor]] { + return rc.NewRefInPlace(func(c *mutex.Mutex[clientCursor]) func() { + c.With(func(c *clientCursor) { + c.hook = rc.NewRefInPlace(func(h *clientHook) func() { + *h = hook + return h.Shutdown + }) + }) + return func() { + c.With(func(c *clientCursor) { + c.hook.Release() + }) + } + }) } // compress advances the hook referred to by this cursor as far @@ -147,7 +164,7 @@ func (c *clientCursor) compress() { if c.hook == nil { return } - l := c.hook.state.Lock() + l := c.hook.Value().state.Lock() c.hook, l = resolveHook(c.hook, l) if c.hook != nil { l.Unlock() @@ -160,7 +177,7 @@ func (c *clientCursor) compress() { // l must point to the state belonging to h. When resolveHook returns, // l will be invalid. The returned Locked will point at the state of // the returned clientHook if they are not nil. -func resolveHook(h *clientHook, l *mutex.Locked[clientHookState]) (*clientHook, *mutex.Locked[clientHookState]) { +func resolveHook(h *rc.Ref[clientHook], l *mutex.Locked[clientHookState]) (*rc.Ref[clientHook], *mutex.Locked[clientHookState]) { for { if !l.Value().isResolved() { return h, l @@ -174,7 +191,7 @@ func resolveHook(h *clientHook, l *mutex.Locked[clientHookState]) (*clientHook, if h == nil { return nil, nil } - l = h.state.Lock() + l = h.Value().state.Lock() } } @@ -192,26 +209,13 @@ type clientHook struct { state mutex.Mutex[clientHookState] } -func (h *clientHook) deref() { - shutdown := func() {} - h.state.With(func(s *clientHookState) { - s.refs-- - if s.refs == 0 { - shutdown = h.Shutdown - } - }) - shutdown() -} - type clientHookState struct { // resolved is closed after resolvedHook is set resolved chan struct{} - refs int // how many open Clients reference this clientHook - // Valid only if resolved is closed. Absent if this // was not a promise. - resolvedHook maybe.Maybe[*clientHook] + resolvedHook maybe.Maybe[*rc.Ref[clientHook]] } // NewClient creates the first reference to a capability. @@ -223,15 +227,14 @@ func NewClient(hook ClientHook) Client { if hook == nil { return Client{} } - h := &clientHook{ + h := clientHook{ ClientHook: hook, metadata: *NewMetadata(), state: mutex.New(clientHookState{ resolved: closedSignal, - refs: 1, }), } - cs := mutex.New(clientState{cursor: &clientCursor{hook: h}}) + cs := mutex.New(clientState{cursor: newClientCursor(h)}) c := Client{client: &client{state: cs}} setupLeakReporting(c) return c @@ -254,60 +257,45 @@ func newPromisedClient(hook ClientHook) (Client, *clientPromise) { if hook == nil { panic("NewPromisedClient(nil)") } - h := &clientHook{ + h := clientHook{ ClientHook: hook, metadata: *NewMetadata(), state: mutex.New(clientHookState{ resolved: make(chan struct{}), - refs: 1, }), } - cs := mutex.New(clientState{cursor: &clientCursor{hook: h}}) + cursor := newClientCursor(h) + cs := mutex.New(clientState{cursor: newClientCursor(h)}) c := Client{client: &client{state: cs}} setupLeakReporting(c) - return c, &clientPromise{h: h} + return c, &clientPromise{cursor: cursor.Weak()} } // startCall holds onto a hook to prevent it from shutting down until // finish is called. It resolves the client's hook as much as possible // first. The caller must not be holding onto c.mu. -func (c Client) startCall() (hook ClientHook, resolved, released bool, finish func()) { - if c.client == nil { - return nil, true, false, func() {} - } - return mutex.With4(&c.state, func(c *clientState) (hook ClientHook, resolved, released bool, finish func()) { - if c.cursor.hook == nil { - return nil, true, c.released, func() {} - } - c.cursor.compress() - l := c.cursor.hook.state.Lock() - if c.cursor.hook == nil { - return nil, true, false, func() {} - } - l.Value().refs++ - isResolved := l.Value().isResolved() - l.Unlock() - savedHook := c.cursor.hook - return savedHook.ClientHook, isResolved, false, savedHook.deref - }) -} - -func (c Client) peek() (hook *clientHook, resolved, released bool) { +func (c Client) startCall() (hook *rc.Ref[clientHook], resolved, released bool) { if c.client == nil { return nil, true, false } - return mutex.With3(&c.state, func(c *clientState) (hook *clientHook, released bool, resolved bool) { - if c.cursor.hook == nil { + return mutex.With3(&c.state, func(c *clientState) (hook *rc.Ref[clientHook], resolved, released bool) { + if c.released || !c.cursor.IsValid() { return nil, true, c.released } - c.cursor.compress() - if c.cursor.hook == nil { + hook, ok := mutex.With2(c.cursor.Value(), func(c *clientCursor) (*rc.Ref[clientHook], bool) { + c.compress() + if c.hook.IsValid() { + return c.hook.AddRef(), true + } + return nil, false + }) + if !ok { return nil, true, false } - l := c.cursor.hook.state.Lock() - resolved = l.Value().isResolved() - l.Unlock() - return c.cursor.hook, resolved, false + resolved = mutex.With1(&hook.Value().state, func(s *clientHookState) bool { + return s.isResolved() + }) + return hook, resolved, false }) } @@ -344,8 +332,8 @@ func (c Client) SetFlowLimiter(lim flowcontrol.FlowLimiter) { // This method respects the flow control policy configured with SetFlowLimiter; // it may block if the sender is sending too fast. func (c Client) SendCall(ctx context.Context, s Send) (*Answer, ReleaseFunc) { - h, _, released, finish := c.startCall() - defer finish() + h, _, released := c.startCall() + defer h.Release() if released { return ErrorAnswer(s.Method, errors.New("call on released client")), func() {} } @@ -381,7 +369,7 @@ func (c Client) SendCall(ctx context.Context, s Send) (*Answer, ReleaseFunc) { return err } - ans, rel := h.Send(ctx, s) + ans, rel := h.Value().Send(ctx, s) // FIXME: an earlier version of this code called StartMessage() from // within PlaceArgs -- but that can result in a deadlock, since it means // the client hook is holding a lock while we're waiting on the limiter. @@ -476,8 +464,8 @@ func (c Client) WaitStreaming() error { // Note that unlike SendCall, this method does *not* respect the flow // control policy configured with SetFlowLimiter. func (c Client) RecvCall(ctx context.Context, r Recv) PipelineCaller { - h, _, released, finish := c.startCall() - defer finish() + h, _, released := c.startCall() + defer h.Release() if released { r.Reject(errors.New("call on released client")) return nil @@ -486,14 +474,15 @@ func (c Client) RecvCall(ctx context.Context, r Recv) PipelineCaller { r.Reject(errors.New("call on null client")) return nil } - return h.Recv(ctx, r) + return h.Value().Recv(ctx, r) } // IsValid reports whether c is a valid reference to a capability. // A reference is invalid if it is nil, has resolved to null, or has // been released. func (c Client) IsValid() bool { - h, _, released := c.peek() + h, _, released := c.startCall() + defer h.Release() return !released && h != nil } @@ -502,15 +491,23 @@ func (c Client) IsValid() bool { // are not fully resolved: use Resolve if this is an issue. If either // c or c2 are released, then IsSame panics. func (c Client) IsSame(c2 Client) bool { - h1, _, released := c.peek() + h1, _, released := c.startCall() + defer h1.Release() if released { panic("IsSame on released client") } - h2, _, released := c2.peek() + h2, _, released := c2.startCall() + defer h2.Release() if released { panic("IsSame on released client") } - return h1 == h2 + if !h1.IsValid() && !h2.IsValid() { + return true + } + if !h1.IsValid() || !h2.IsValid() { + return false + } + return h1.Value() == h2.Value() } // Resolve blocks until the capability is fully resolved or the Context is Done. @@ -518,7 +515,8 @@ func (c Client) IsSame(c2 Client) bool { // if the capability resolves to an error. func (c Client) Resolve(ctx context.Context) error { for { - h, resolved, released := c.peek() + h, resolved, released := c.startCall() + defer h.Release() if released { return errors.New("cannot resolve released client") } @@ -527,7 +525,7 @@ func (c Client) Resolve(ctx context.Context) error { return nil } - resolvedCh := mutex.With1(&h.state, func(s *clientHookState) <-chan struct{} { + resolvedCh := mutex.With1(&h.Value().state, func(s *clientHookState) <-chan struct{} { return s.resolved }) @@ -545,22 +543,13 @@ func (c Client) AddRef() Client { if c.client == nil { return Client{} } + h, _, released := c.startCall() + defer h.Release() + if released { + panic("AddRef on released client") + } return mutex.With1(&c.state, func(c *clientState) Client { - if c.released { - panic("AddRef on released client") - } - if c.cursor.hook == nil { - return Client{} - } - c.cursor.compress() - if c.cursor.hook == nil { - return Client{} - } - l := c.cursor.hook.state.Lock() - l.Value().refs++ - l.Unlock() - cs := mutex.New(clientState{cursor: &clientCursor{hook: c.cursor.hook}}) - d := Client{client: &client{state: cs}} + d := Client{client: &client{state: mutex.New(clientState{cursor: c.cursor.AddRef()})}} setupLeakReporting(d) return d }) @@ -569,30 +558,27 @@ func (c Client) AddRef() Client { // WeakRef creates a new WeakClient that refers to the same capability // as c. If c is nil or has resolved to null, then WeakRef returns nil. func (c Client) WeakRef() *WeakClient { - h, _, released := c.peek() - if released { - panic("WeakRef on released client") - } - if h == nil { - return nil - } - return &WeakClient{h: h} + cursor := mutex.With1(&c.state, func(s *clientState) *rc.WeakRef[mutex.Mutex[clientCursor]] { + if s.released { + panic("WeakRef on released client") + } + return s.cursor.Weak() + }) + return &WeakClient{r: cursor} } // State reads the current state of the client. It returns the zero // ClientState if c is nil, has resolved to null, or has been released. func (c Client) State() ClientState { - h, resolved, _, finish := c.startCall() - defer finish() + h, resolved, _ := c.startCall() + defer h.Release() if h == nil { return ClientState{} } return ClientState{ - Brand: h.Brand(), + Brand: h.Value().Brand(), IsPromise: !resolved, - Metadata: mutex.With1(&c.state, func(c *clientState) *Metadata { - return &c.cursor.hook.metadata - }), + Metadata: &h.Value().metadata, } } @@ -624,29 +610,20 @@ func (c Client) String() string { if c.client == nil { return "" } - cl := c.state.Lock() - if cl.Value().released { - cl.Unlock() + h, resolved, released := c.startCall() + defer h.Release() + if released { return "" } - if cl.Value().cursor.hook == nil { - cl.Unlock() - return "" - } - cl.Value().cursor.compress() - hl := cl.Value().cursor.hook.state.Lock() - if cl.Value().cursor.hook == nil { - cl.Unlock() + if h == nil { return "" } var s string - if hl.Value().isResolved() { - s = "" + if resolved { + s = "" } else { - s = "" + s = "" } - hl.Unlock() - cl.Unlock() return s } @@ -660,26 +637,13 @@ func (c Client) Release() { if c.client == nil { return } - cl := c.state.Lock() - cl.Value().cursor.compress() - if cl.Value().released || cl.Value().cursor.hook == nil { - cl.Unlock() - return - } - cl.Value().released = true - h := cl.Value().cursor.hook - hl := h.state.Lock() - cl.Value().cursor.hook = nil - hl.Value().refs-- - if hl.Value().refs > 0 { - hl.Unlock() - cl.Unlock() - return - } - hl.Unlock() - cl.Unlock() - h.Shutdown() - c.GetFlowLimiter().Release() + limiter := c.GetFlowLimiter() + c.state.With(func(s *clientState) { + if s.cursor.IsValid() { + s.cursor.Release() + limiter.Release() + } + }) } func (c Client) EncodeAsPtr(seg *Segment) Ptr { @@ -732,7 +696,7 @@ func SetClientLeakFunc(clientLeakFunc func(msg string)) { // A ClientPromise resolves the identity of a client created by NewPromisedClient. type clientPromise struct { - h *clientHook + cursor *rc.WeakRef[mutex.Mutex[clientCursor]] } func (cp *clientPromise) Reject(err error) { @@ -751,82 +715,70 @@ func (cp *clientPromise) Fulfill(c Client) { cp.fulfill(dq, c) } +/* // shutdown waits for all outstanding calls on the hook to complete and // references to be dropped, and then shuts down the hook. The caller // must have previously invoked cp.fulfill(). func (cp *clientPromise) shutdown() { cp.h.Shutdown() } +*/ // fulfill is like Fulfill, except that it does not wait for outsanding calls // to return answers or shut down the underlying hook; instead, it adds functions // to do this to dq. func (cp *clientPromise) fulfill(dq *deferred.Queue, c Client) { - dq.Defer(cp.shutdown) + // dq.Defer(cp.shutdown) + + dq.Defer(c.Release) + cursor, ok := cp.cursor.AddRef() + if !ok { + return + } + dq.Defer(cursor.Release) // Obtain next client hook. - var rh *clientHook + var rh *rc.Ref[clientHook] if (c != Client{}) { - c.state.With(func(c *clientState) { - if c.released { - panic("ClientPromise.Fulfill with a released client") - } - // TODO(maybe): c.h = resolveHook(c.h) - rh = c.cursor.hook - }) + h, _, released := c.startCall() + if released { + panic("ClientPromise.Fulfill with a released client") + } + rh = h } // Mark hook as resolved. - l := cp.h.state.Lock() - if l.Value().isResolved() { - l.Unlock() - panic("ClientPromise.Fulfill called more than once") - } - l.Value().resolvedHook = maybe.New(rh) - close(l.Value().resolved) - refs := l.Value().refs - l.Value().refs = 0 - if refs == 0 { - l.Unlock() - return - } - - rh, l = resolveHook(cp.h, l) // swaps mutex on cp.h for mutex on rh - if rh != nil { - l.Value().refs += refs - l.Unlock() - } + cursor.Value().With(func(c *clientCursor) { + h := c.hook + h.Value().state.With(func(s *clientHookState) { + if s.isResolved() { + panic("ClientPromise.Fulfill called more than once") + } + s.resolvedHook = maybe.New(rh) + close(s.resolved) + }) + c.compress() + }) } // A WeakClient is a weak reference to a capability: it refers to a // capability without preventing it from being shut down. The zero // value is a null reference. type WeakClient struct { - h *clientHook + r *rc.WeakRef[mutex.Mutex[clientCursor]] } // AddRef creates a new Client that refers to the same capability as c // as long as the capability hasn't already been shut down. func (wc *WeakClient) AddRef() (c Client, ok bool) { - if wc == nil { + if wc == nil || wc.r == nil { return Client{}, true } - if wc.h == nil { - return Client{}, true - } - l := wc.h.state.Lock() - wc.h, l = resolveHook(wc.h, l) - if wc.h == nil { - return Client{}, true - } - if l.Value().refs == 0 { - l.Unlock() + cursor, ok := wc.r.AddRef() + if !ok { return Client{}, false } - l.Value().refs++ - l.Unlock() - cs := mutex.New(clientState{cursor: &clientCursor{hook: wc.h}}) - c = Client{client: &client{state: cs}} + c = Client{client: &client{state: mutex.New(clientState{cursor: cursor.AddRef()})}} setupLeakReporting(c) return c, true } @@ -1023,15 +975,14 @@ func ErrorClient(e error) Client { } // Avoid NewClient because it can set a finalizer. - h := &clientHook{ + h := clientHook{ ClientHook: errorClient{e}, metadata: *NewMetadata(), state: mutex.New(clientHookState{ resolved: closedSignal, - refs: 1, }), } - cs := mutex.New(clientState{cursor: &clientCursor{hook: h}}) + cs := mutex.New(clientState{cursor: newClientCursor(h)}) return Client{client: &client{state: cs}} } From 8cd32692dd30e94675969b8f5518487753816417 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sat, 29 Apr 2023 08:55:31 -0400 Subject: [PATCH 05/27] resolveHook: fiddle with refcounts as we walk down the chain. --- capability.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/capability.go b/capability.go index a8ac8c87..1205f671 100644 --- a/capability.go +++ b/capability.go @@ -178,6 +178,7 @@ func (c *clientCursor) compress() { // l will be invalid. The returned Locked will point at the state of // the returned clientHook if they are not nil. func resolveHook(h *rc.Ref[clientHook], l *mutex.Locked[clientHookState]) (*rc.Ref[clientHook], *mutex.Locked[clientHookState]) { + h = h.Steal() for { if !l.Value().isResolved() { return h, l @@ -187,10 +188,11 @@ func resolveHook(h *rc.Ref[clientHook], l *mutex.Locked[clientHookState]) (*rc.R return h, l } l.Unlock() - h = r - if h == nil { + h.Release() + if r == nil { return nil, nil } + h = r.AddRef() l = h.Value().state.Lock() } } From d42f6fbde8b0af504a84f348ab4ce332c36958a5 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sat, 29 Apr 2023 09:12:03 -0400 Subject: [PATCH 06/27] dummyHook: show a bit more info in String() --- capability_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/capability_test.go b/capability_test.go index 24944244..95f6ea90 100644 --- a/capability_test.go +++ b/capability_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "fmt" "testing" "time" ) @@ -269,7 +270,10 @@ type dummyHook struct { } func (dh *dummyHook) String() string { - return "&dummyHook{}" + return fmt.Sprintf( + "&dummyHook{calls: %v, brand: %v, shutdowns: %v}", + dh.calls, dh.brand, dh.shutdowns, + ) } func (dh *dummyHook) Send(_ context.Context, s Send) (*Answer, ReleaseFunc) { From 1bc69d82b8e6f1cbcb04c64d07fcda4e918b561b Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sat, 29 Apr 2023 09:14:01 -0400 Subject: [PATCH 07/27] More info in test error message --- capability_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capability_test.go b/capability_test.go index 95f6ea90..51866d13 100644 --- a/capability_test.go +++ b/capability_test.go @@ -203,7 +203,7 @@ func TestPromisedClient(t *testing.T) { finish() if !ca.IsSame(cb) { - t.Error("after resolution, ca != cb") + t.Errorf("after resolution, ca != cb (%v vs. %v)", ca, cb) } state = ca.State() if state.Brand.Value != int(222) { From e2d067e68ed9ffe558f40330b0192767eba55dd9 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sat, 29 Apr 2023 09:16:20 -0400 Subject: [PATCH 08/27] clientPromise.fulfill: don't release the argument. Tests seem to expect this to still be good. --- capability.go | 1 - 1 file changed, 1 deletion(-) diff --git a/capability.go b/capability.go index 1205f671..281be2ff 100644 --- a/capability.go +++ b/capability.go @@ -732,7 +732,6 @@ func (cp *clientPromise) shutdown() { func (cp *clientPromise) fulfill(dq *deferred.Queue, c Client) { // dq.Defer(cp.shutdown) - dq.Defer(c.Release) cursor, ok := cp.cursor.AddRef() if !ok { return From 1e0fdb06ec5c22b35f2cc6d0b121abc624c56597 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sat, 29 Apr 2023 09:17:33 -0400 Subject: [PATCH 09/27] Delete commented out code. --- capability.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/capability.go b/capability.go index 281be2ff..335c74fd 100644 --- a/capability.go +++ b/capability.go @@ -717,21 +717,10 @@ func (cp *clientPromise) Fulfill(c Client) { cp.fulfill(dq, c) } -/* -// shutdown waits for all outstanding calls on the hook to complete and -// references to be dropped, and then shuts down the hook. The caller -// must have previously invoked cp.fulfill(). -func (cp *clientPromise) shutdown() { - cp.h.Shutdown() -} -*/ - // fulfill is like Fulfill, except that it does not wait for outsanding calls // to return answers or shut down the underlying hook; instead, it adds functions // to do this to dq. func (cp *clientPromise) fulfill(dq *deferred.Queue, c Client) { - // dq.Defer(cp.shutdown) - cursor, ok := cp.cursor.AddRef() if !ok { return From 888eece94adc6ff45560e16df6954c00674e7d5e Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sat, 29 Apr 2023 09:20:02 -0400 Subject: [PATCH 10/27] Fix leak in WeakClient.AddRef() --- capability.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capability.go b/capability.go index 335c74fd..c846c666 100644 --- a/capability.go +++ b/capability.go @@ -768,7 +768,7 @@ func (wc *WeakClient) AddRef() (c Client, ok bool) { if !ok { return Client{}, false } - c = Client{client: &client{state: mutex.New(clientState{cursor: cursor.AddRef()})}} + c = Client{client: &client{state: mutex.New(clientState{cursor: cursor})}} setupLeakReporting(c) return c, true } From aa2bf27148f0275dabcb85d860792955c33de96b Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sat, 29 Apr 2023 10:03:13 -0400 Subject: [PATCH 11/27] Fix incorrect comment. With an initially null Client, the client pointer is nil so this field doesn't even exist. --- capability.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capability.go b/capability.go index c846c666..c6592d2f 100644 --- a/capability.go +++ b/capability.go @@ -121,7 +121,7 @@ type client struct { type clientState struct { limiter flowcontrol.FlowLimiter - cursor *rc.Ref[mutex.Mutex[clientCursor]] // nil iff this is an initially-nil client + cursor *rc.Ref[mutex.Mutex[clientCursor]] // never nil released bool stream struct { From 84cf90ed88e712d1614aa304af95590b281f3727 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sat, 29 Apr 2023 10:11:21 -0400 Subject: [PATCH 12/27] Remember to set released flag in Client.Release --- capability.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/capability.go b/capability.go index c6592d2f..241a6e57 100644 --- a/capability.go +++ b/capability.go @@ -641,7 +641,8 @@ func (c Client) Release() { } limiter := c.GetFlowLimiter() c.state.With(func(s *clientState) { - if s.cursor.IsValid() { + if !s.released { + s.released = true s.cursor.Release() limiter.Release() } From 404ca4491826961dce633e728489964c9737b103 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Mon, 8 May 2023 23:51:45 -0400 Subject: [PATCH 13/27] Fix bogus duplicate call to newClientCursor() This fixes a number of tests, but not all of them. --- capability.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capability.go b/capability.go index 241a6e57..b86c1265 100644 --- a/capability.go +++ b/capability.go @@ -267,7 +267,7 @@ func newPromisedClient(hook ClientHook) (Client, *clientPromise) { }), } cursor := newClientCursor(h) - cs := mutex.New(clientState{cursor: newClientCursor(h)}) + cs := mutex.New(clientState{cursor: cursor}) c := Client{client: &client{state: cs}} setupLeakReporting(c) return c, &clientPromise{cursor: cursor.Weak()} From ace3d487bc552bb9f32ef6482bfcc006fe0feb4f Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Tue, 9 May 2023 00:19:54 -0400 Subject: [PATCH 14/27] clientHook: release resolution when released. --- capability.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/capability.go b/capability.go index b86c1265..d712f6e7 100644 --- a/capability.go +++ b/capability.go @@ -147,7 +147,17 @@ func newClientCursor(hook clientHook) *rc.Ref[mutex.Mutex[clientCursor]] { c.With(func(c *clientCursor) { c.hook = rc.NewRefInPlace(func(h *clientHook) func() { *h = hook - return h.Shutdown + return func() { + h.Shutdown() + h.state.With(func(s *clientHookState) { + if !s.isResolved() { + return + } + if ref, ok := s.resolvedHook.Get(); ok { + ref.Release() + } + }) + } }) }) return func() { @@ -187,12 +197,15 @@ func resolveHook(h *rc.Ref[clientHook], l *mutex.Locked[clientHookState]) (*rc.R if !ok { return h, l } + if r != nil { + r = r.AddRef() + } l.Unlock() h.Release() if r == nil { return nil, nil } - h = r.AddRef() + h = r l = h.Value().state.Lock() } } From 8e192424e421a2b77ced5192dcbd48c764fd3323 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Wed, 10 May 2023 23:46:04 -0400 Subject: [PATCH 15/27] Simplify the contract for resolveHook. We no longer have a bunch of call sites that need to do other things with the locked value, so let's just acquire it inside the function and simplify the heck out of how this is used. --- capability.go | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/capability.go b/capability.go index d712f6e7..bda19940 100644 --- a/capability.go +++ b/capability.go @@ -174,28 +174,24 @@ func (c *clientCursor) compress() { if c.hook == nil { return } - l := c.hook.Value().state.Lock() - c.hook, l = resolveHook(c.hook, l) - if c.hook != nil { - l.Unlock() - } + c.hook = resolveHook(c.hook) } // resolveHook is a helper for clientCursor.compress. It resolves h as much -// as possible without blocking. -// -// l must point to the state belonging to h. When resolveHook returns, -// l will be invalid. The returned Locked will point at the state of -// the returned clientHook if they are not nil. -func resolveHook(h *rc.Ref[clientHook], l *mutex.Locked[clientHookState]) (*rc.Ref[clientHook], *mutex.Locked[clientHookState]) { +// as possible without blocking. Takes ownership of h, and the returned +// reference will be owned by the caller. +func resolveHook(h *rc.Ref[clientHook]) *rc.Ref[clientHook] { h = h.Steal() + l := h.Value().state.Lock() for { if !l.Value().isResolved() { - return h, l + l.Unlock() + return h } r, ok := l.Value().resolvedHook.Get() if !ok { - return h, l + l.Unlock() + return h } if r != nil { r = r.AddRef() @@ -203,7 +199,7 @@ func resolveHook(h *rc.Ref[clientHook], l *mutex.Locked[clientHookState]) (*rc.R l.Unlock() h.Release() if r == nil { - return nil, nil + return nil } h = r l = h.Value().state.Lock() From 21e36c0cea7e68c12e174f02ff6025d4358d588c Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Thu, 11 May 2023 00:06:55 -0400 Subject: [PATCH 16/27] Cleanup the structure of resolution state. ...as well as naming it as such; there's no other misc. clientHookState data, it's all promise resolution. --- capability.go | 88 +++++++++++++++++++++++++++------------------------ 1 file changed, 47 insertions(+), 41 deletions(-) diff --git a/capability.go b/capability.go index bda19940..bc09ce5c 100644 --- a/capability.go +++ b/capability.go @@ -149,14 +149,14 @@ func newClientCursor(hook clientHook) *rc.Ref[mutex.Mutex[clientCursor]] { *h = hook return func() { h.Shutdown() - h.state.With(func(s *clientHookState) { - if !s.isResolved() { - return - } - if ref, ok := s.resolvedHook.Get(); ok { - ref.Release() - } - }) + r, ok := h.resolution.Get() + if ok { + r.With(func(s *resolveState) { + if s.isResolved() { + s.resolvedHook.Release() + } + }) + } } }) }) @@ -181,28 +181,26 @@ func (c *clientCursor) compress() { // as possible without blocking. Takes ownership of h, and the returned // reference will be owned by the caller. func resolveHook(h *rc.Ref[clientHook]) *rc.Ref[clientHook] { - h = h.Steal() - l := h.Value().state.Lock() for { - if !l.Value().isResolved() { - l.Unlock() + if h == nil { return h } - r, ok := l.Value().resolvedHook.Get() + res, ok := h.Value().resolution.Get() if !ok { + return h + } + l := res.Lock() + if !l.Value().isResolved() { l.Unlock() return h } + r := l.Value().resolvedHook if r != nil { r = r.AddRef() } l.Unlock() h.Release() - if r == nil { - return nil - } h = r - l = h.Value().state.Lock() } } @@ -217,16 +215,17 @@ type clientHook struct { // Place for callers to attach arbitrary metadata to the client. metadata Metadata - state mutex.Mutex[clientHookState] + // State of the promise's resolution. If this is absent, then + // this clientHook is not a promise. + resolution maybe.Maybe[*mutex.Mutex[resolveState]] } -type clientHookState struct { +type resolveState struct { // resolved is closed after resolvedHook is set resolved chan struct{} - // Valid only if resolved is closed. Absent if this - // was not a promise. - resolvedHook maybe.Maybe[*rc.Ref[clientHook]] + // Valid only if resolved is closed. + resolvedHook *rc.Ref[clientHook] } // NewClient creates the first reference to a capability. @@ -241,9 +240,6 @@ func NewClient(hook ClientHook) Client { h := clientHook{ ClientHook: hook, metadata: *NewMetadata(), - state: mutex.New(clientHookState{ - resolved: closedSignal, - }), } cs := mutex.New(clientState{cursor: newClientCursor(h)}) c := Client{client: &client{state: cs}} @@ -268,14 +264,14 @@ func newPromisedClient(hook ClientHook) (Client, *clientPromise) { if hook == nil { panic("NewPromisedClient(nil)") } - h := clientHook{ + rs := mutex.New(resolveState{ + resolved: make(chan struct{}), + }) + cursor := newClientCursor(clientHook{ ClientHook: hook, metadata: *NewMetadata(), - state: mutex.New(clientHookState{ - resolved: make(chan struct{}), - }), - } - cursor := newClientCursor(h) + resolution: maybe.New(&rs), + }) cs := mutex.New(clientState{cursor: cursor}) c := Client{client: &client{state: cs}} setupLeakReporting(c) @@ -303,7 +299,11 @@ func (c Client) startCall() (hook *rc.Ref[clientHook], resolved, released bool) if !ok { return nil, true, false } - resolved = mutex.With1(&hook.Value().state, func(s *clientHookState) bool { + r, ok := hook.Value().resolution.Get() + if !ok { + return hook, true, false + } + resolved = mutex.With1(r, func(s *resolveState) bool { return s.isResolved() }) return hook, resolved, false @@ -526,17 +526,22 @@ func (c Client) IsSame(c2 Client) bool { // if the capability resolves to an error. func (c Client) Resolve(ctx context.Context) error { for { - h, resolved, released := c.startCall() + h, _, released := c.startCall() defer h.Release() if released { return errors.New("cannot resolve released client") } - if resolved { + if h == nil { + return nil + } + + r, ok := h.Value().resolution.Get() + if !ok { return nil } - resolvedCh := mutex.With1(&h.Value().state, func(s *clientHookState) <-chan struct{} { + resolvedCh := mutex.With1(r, func(s *resolveState) <-chan struct{} { return s.resolved }) @@ -670,7 +675,7 @@ func (Client) DecodeFromPtr(p Ptr) Client { var _ TypeParam[Client] = Client{} // isResolve reports whether the clientHook s belongs to is resolved. -func (s *clientHookState) isResolved() bool { +func (s *resolveState) isResolved() bool { select { case <-s.resolved: return true @@ -750,11 +755,15 @@ func (cp *clientPromise) fulfill(dq *deferred.Queue, c Client) { // Mark hook as resolved. cursor.Value().With(func(c *clientCursor) { h := c.hook - h.Value().state.With(func(s *clientHookState) { + r, ok := h.Value().resolution.Get() + if !ok { + panic("BUG: clientPromise referred to a clientHook that was not a promise") + } + r.With(func(s *resolveState) { if s.isResolved() { panic("ClientPromise.Fulfill called more than once") } - s.resolvedHook = maybe.New(rh) + s.resolvedHook = rh close(s.resolved) }) c.compress() @@ -978,9 +987,6 @@ func ErrorClient(e error) Client { h := clientHook{ ClientHook: errorClient{e}, metadata: *NewMetadata(), - state: mutex.New(clientHookState{ - resolved: closedSignal, - }), } cs := mutex.New(clientState{cursor: newClientCursor(h)}) return Client{client: &client{state: cs}} From a70c592f65cc3f9bca3e1f488868159fbd5c0bc9 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Thu, 11 May 2023 16:33:51 -0400 Subject: [PATCH 17/27] Remove superfluous resolveHook helper. ...just inline it into compress; there's no other logic in there anyway. --- capability.go | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/capability.go b/capability.go index bc09ce5c..79c7ae9f 100644 --- a/capability.go +++ b/capability.go @@ -171,36 +171,26 @@ func newClientCursor(hook clientHook) *rc.Ref[mutex.Mutex[clientCursor]] { // compress advances the hook referred to by this cursor as far // as possible without blocking on a resolution. func (c *clientCursor) compress() { - if c.hook == nil { - return - } - c.hook = resolveHook(c.hook) -} - -// resolveHook is a helper for clientCursor.compress. It resolves h as much -// as possible without blocking. Takes ownership of h, and the returned -// reference will be owned by the caller. -func resolveHook(h *rc.Ref[clientHook]) *rc.Ref[clientHook] { for { - if h == nil { - return h + if c.hook == nil { + return } - res, ok := h.Value().resolution.Get() + res, ok := c.hook.Value().resolution.Get() if !ok { - return h + return } l := res.Lock() if !l.Value().isResolved() { l.Unlock() - return h + return } r := l.Value().resolvedHook if r != nil { r = r.AddRef() } l.Unlock() - h.Release() - h = r + c.hook.Release() + c.hook = r } } From 4bd640545f20b7a1dfa91f0a68d1763828e988ad Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Thu, 11 May 2023 19:46:15 -0400 Subject: [PATCH 18/27] Bump go-util version --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index a2b85986..46eaa4f6 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/stretchr/testify v1.8.2 github.com/tinylib/msgp v1.1.5 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 - zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5 + zenhack.net/go/util v0.0.0-20230414211804-99ae9bf14f02 ) require ( diff --git a/go.sum b/go.sum index a57a0ee2..4f790f48 100644 --- a/go.sum +++ b/go.sum @@ -54,3 +54,5 @@ zenhack.net/go/util v0.0.0-20230327231740-da8cb323921c h1:L+T38E+u91e956ykUrYKHZ zenhack.net/go/util v0.0.0-20230327231740-da8cb323921c/go.mod h1:0lafdGg7tDb7RcXASgmJmRbLFLkAxu328+KGIs7icDE= zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5 h1:yksDCGMVzyn3vlyf0GZ3huiF5FFaMGQpQ3UJvR0EoGA= zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5/go.mod h1:1LtNdPAs8WH+BTcQiZAOo2MIKD/5jyK/u7sZ9ZPe5SE= +zenhack.net/go/util v0.0.0-20230414211804-99ae9bf14f02 h1:0iYx7hqltFcLvxDeCdg84gEJp8BJ6SgGti431U2ztIU= +zenhack.net/go/util v0.0.0-20230414211804-99ae9bf14f02/go.mod h1:1LtNdPAs8WH+BTcQiZAOo2MIKD/5jyK/u7sZ9ZPe5SE= From 27ae512047e158cdf0ef56aea8447f9430f9f6ac Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Fri, 12 May 2023 01:45:18 -0400 Subject: [PATCH 19/27] Factor out release logic for clientHook newClientCursor was a bit too hard to read. --- capability.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/capability.go b/capability.go index 79c7ae9f..2d1a9bc7 100644 --- a/capability.go +++ b/capability.go @@ -147,17 +147,7 @@ func newClientCursor(hook clientHook) *rc.Ref[mutex.Mutex[clientCursor]] { c.With(func(c *clientCursor) { c.hook = rc.NewRefInPlace(func(h *clientHook) func() { *h = hook - return func() { - h.Shutdown() - r, ok := h.resolution.Get() - if ok { - r.With(func(s *resolveState) { - if s.isResolved() { - s.resolvedHook.Release() - } - }) - } - } + return h.Release }) }) return func() { @@ -210,6 +200,18 @@ type clientHook struct { resolution maybe.Maybe[*mutex.Mutex[resolveState]] } +func (h *clientHook) Release() { + h.Shutdown() + r, ok := h.resolution.Get() + if ok { + r.With(func(s *resolveState) { + if s.isResolved() { + s.resolvedHook.Release() + } + }) + } +} + type resolveState struct { // resolved is closed after resolvedHook is set resolved chan struct{} From 39f9e3d7bd508c2f3b2f69d0519685a4b60f0645 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Fri, 12 May 2023 01:47:35 -0400 Subject: [PATCH 20/27] Delete a done TODO --- capability.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/capability.go b/capability.go index 2d1a9bc7..6acf0f41 100644 --- a/capability.go +++ b/capability.go @@ -133,11 +133,6 @@ type clientState struct { // clientCursor is an indirection pointing to a link in the resolution // chain of clientHooks. Places that need to do path shortening should // store one of these, rather than storing clientHook directly. -// -// TODO: we want to move towards sharing one clientCursor (protected -// by a mutex) between all Clients and the like, to allow us to -// shorten from afar, rather than waiting for methods to be called -// on the relevant Client. type clientCursor struct { hook *rc.Ref[clientHook] // nil if resolved to nil or released } From 9da2ba2eb231886d2cb20c8b6793d957d0179663 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sat, 13 May 2023 20:20:38 -0400 Subject: [PATCH 21/27] Client.IsSame: remove duplicate calls to IsValid() ...which isn't entirely trivial. --- capability.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/capability.go b/capability.go index 6acf0f41..b50fbaf4 100644 --- a/capability.go +++ b/capability.go @@ -499,10 +499,12 @@ func (c Client) IsSame(c2 Client) bool { if released { panic("IsSame on released client") } - if !h1.IsValid() && !h2.IsValid() { + valid1 := h1.IsValid() + valid2 := h2.IsValid() + if !valid1 && !valid2 { return true } - if !h1.IsValid() || !h2.IsValid() { + if !valid1 || !valid2 { return false } return h1.Value() == h2.Value() From 7970dd840844f9f11567a82414c0488280afdef9 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sat, 13 May 2023 20:29:23 -0400 Subject: [PATCH 22/27] Remove unnecessary indirection for WeakClient. --- capability.go | 8 ++++---- rpc/import.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/capability.go b/capability.go index b50fbaf4..9443de99 100644 --- a/capability.go +++ b/capability.go @@ -562,14 +562,14 @@ func (c Client) AddRef() Client { // WeakRef creates a new WeakClient that refers to the same capability // as c. If c is nil or has resolved to null, then WeakRef returns nil. -func (c Client) WeakRef() *WeakClient { +func (c Client) WeakRef() WeakClient { cursor := mutex.With1(&c.state, func(s *clientState) *rc.WeakRef[mutex.Mutex[clientCursor]] { if s.released { panic("WeakRef on released client") } return s.cursor.Weak() }) - return &WeakClient{r: cursor} + return WeakClient{r: cursor} } // State reads the current state of the client. It returns the zero @@ -768,8 +768,8 @@ type WeakClient struct { // AddRef creates a new Client that refers to the same capability as c // as long as the capability hasn't already been shut down. -func (wc *WeakClient) AddRef() (c Client, ok bool) { - if wc == nil || wc.r == nil { +func (wc WeakClient) AddRef() (c Client, ok bool) { + if wc.r == nil { return Client{}, true } cursor, ok := wc.r.AddRef() diff --git a/rpc/import.go b/rpc/import.go index b99b4393..0d6288ec 100644 --- a/rpc/import.go +++ b/rpc/import.go @@ -16,7 +16,7 @@ type importID uint32 // impent is an entry in the import table. All fields are protected by // Conn.mu. type impent struct { - wc *capnp.WeakClient + wc capnp.WeakClient // wireRefs is the number of times that the importID has appeared in // messages received from the remote vat. Used to populate the From e0d6e81182df0c2798854765ddcaceb70febd1c7 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sat, 13 May 2023 22:34:43 -0400 Subject: [PATCH 23/27] Slightly refactor clientCursor. We always pass this around in a mutex anyway, so just move that inside the struct like everything else that has synchronized state. --- capability.go | 94 +++++++++++++++++++++++++++------------------------ 1 file changed, 49 insertions(+), 45 deletions(-) diff --git a/capability.go b/capability.go index 9443de99..d4423017 100644 --- a/capability.go +++ b/capability.go @@ -121,7 +121,7 @@ type client struct { type clientState struct { limiter flowcontrol.FlowLimiter - cursor *rc.Ref[mutex.Mutex[clientCursor]] // never nil + cursor *rc.Ref[clientCursor] // never nil released bool stream struct { @@ -134,49 +134,53 @@ type clientState struct { // chain of clientHooks. Places that need to do path shortening should // store one of these, rather than storing clientHook directly. type clientCursor struct { - hook *rc.Ref[clientHook] // nil if resolved to nil or released + hook mutex.Mutex[*rc.Ref[clientHook]] // nil if resolved to nil or released } -func newClientCursor(hook clientHook) *rc.Ref[mutex.Mutex[clientCursor]] { - return rc.NewRefInPlace(func(c *mutex.Mutex[clientCursor]) func() { - c.With(func(c *clientCursor) { - c.hook = rc.NewRefInPlace(func(h *clientHook) func() { - *h = hook - return h.Release - }) - }) - return func() { - c.With(func(c *clientCursor) { - c.hook.Release() - }) - } +func newClientCursor(hook clientHook) *rc.Ref[clientCursor] { + hookRef := rc.NewRefInPlace(func(h *clientHook) func() { + *h = hook + return h.Release + }) + return rc.NewRefInPlace(func(c *clientCursor) func() { + *c = clientCursor{hook: mutex.New(hookRef)} + return c.Release }) } // compress advances the hook referred to by this cursor as far // as possible without blocking on a resolution. func (c *clientCursor) compress() { - for { - if c.hook == nil { - return - } - res, ok := c.hook.Value().resolution.Get() - if !ok { - return - } - l := res.Lock() - if !l.Value().isResolved() { + c.hook.With(func(hook **rc.Ref[clientHook]) { + for { + h := *hook + if h == nil { + return + } + res, ok := h.Value().resolution.Get() + if !ok { + return + } + l := res.Lock() + if !l.Value().isResolved() { + l.Unlock() + return + } + r := l.Value().resolvedHook + if r != nil { + r = r.AddRef() + } l.Unlock() - return - } - r := l.Value().resolvedHook - if r != nil { - r = r.AddRef() + h.Release() + *hook = r } - l.Unlock() - c.hook.Release() - c.hook = r - } + }) +} + +func (c *clientCursor) Release() { + c.hook.With(func(hook **rc.Ref[clientHook]) { + (*hook).Release() + }) } // clientHook is a reference-counted wrapper for a ClientHook. @@ -276,10 +280,11 @@ func (c Client) startCall() (hook *rc.Ref[clientHook], resolved, released bool) if c.released || !c.cursor.IsValid() { return nil, true, c.released } - hook, ok := mutex.With2(c.cursor.Value(), func(c *clientCursor) (*rc.Ref[clientHook], bool) { - c.compress() - if c.hook.IsValid() { - return c.hook.AddRef(), true + c.cursor.Value().compress() + hook, ok := mutex.With2(&c.cursor.Value().hook, func(h **rc.Ref[clientHook]) (*rc.Ref[clientHook], bool) { + ret := *h + if ret.IsValid() { + return ret.AddRef(), true } return nil, false }) @@ -563,7 +568,7 @@ func (c Client) AddRef() Client { // WeakRef creates a new WeakClient that refers to the same capability // as c. If c is nil or has resolved to null, then WeakRef returns nil. func (c Client) WeakRef() WeakClient { - cursor := mutex.With1(&c.state, func(s *clientState) *rc.WeakRef[mutex.Mutex[clientCursor]] { + cursor := mutex.With1(&c.state, func(s *clientState) *rc.WeakRef[clientCursor] { if s.released { panic("WeakRef on released client") } @@ -702,7 +707,7 @@ func SetClientLeakFunc(clientLeakFunc func(msg string)) { // A ClientPromise resolves the identity of a client created by NewPromisedClient. type clientPromise struct { - cursor *rc.WeakRef[mutex.Mutex[clientCursor]] + cursor *rc.WeakRef[clientCursor] } func (cp *clientPromise) Reject(err error) { @@ -742,9 +747,8 @@ func (cp *clientPromise) fulfill(dq *deferred.Queue, c Client) { } // Mark hook as resolved. - cursor.Value().With(func(c *clientCursor) { - h := c.hook - r, ok := h.Value().resolution.Get() + cursor.Value().hook.With(func(h **rc.Ref[clientHook]) { + r, ok := (*h).Value().resolution.Get() if !ok { panic("BUG: clientPromise referred to a clientHook that was not a promise") } @@ -755,15 +759,15 @@ func (cp *clientPromise) fulfill(dq *deferred.Queue, c Client) { s.resolvedHook = rh close(s.resolved) }) - c.compress() }) + cursor.Value().compress() } // A WeakClient is a weak reference to a capability: it refers to a // capability without preventing it from being shut down. The zero // value is a null reference. type WeakClient struct { - r *rc.WeakRef[mutex.Mutex[clientCursor]] + r *rc.WeakRef[clientCursor] } // AddRef creates a new Client that refers to the same capability as c From 99c0b23ad7d721031fc77711d1f0791c1a6d2358 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Mon, 15 May 2023 20:21:09 -0400 Subject: [PATCH 24/27] Remember to release promise clients in answer entries. Fixes a leak. I'm not sure why this worked before the rc refactor. --- rpc/answer.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rpc/answer.go b/rpc/answer.go index b5ab5c96..362070be 100644 --- a/rpc/answer.go +++ b/rpc/answer.go @@ -292,6 +292,7 @@ func (ans *ansent) completeSendReturn(dq *deferred.Queue) error { } else { ans.promise.Resolve(ans.returner.results.Content()) } + dq.Defer(ans.promise.ReleaseClients) ans.promise = nil } ans.sendMsg() @@ -338,6 +339,7 @@ func (ans *ansent) completeSendException(dq *deferred.Queue) { if ans.promise != nil { ans.promise.Reject(ex) + dq.Defer(ans.promise.ReleaseClients) ans.promise = nil } if ans.sendMsg != nil { From ca34bdef5cd2dcaa4c6e846d65fc9cb77ebcbb63 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Mon, 15 May 2023 21:01:36 -0400 Subject: [PATCH 25/27] Fix typo in test name --- rpc/level0_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/level0_test.go b/rpc/level0_test.go index a103274a..901d929e 100644 --- a/rpc/level0_test.go +++ b/rpc/level0_test.go @@ -1360,7 +1360,7 @@ func TestRecvBootstrapPipelineCall(t *testing.T) { // TestDuplicateBootstrap calls Bootstrap twice on the same connection, // and verifies that the results are the same. -func TestDuplicateBoostrap(t *testing.T) { +func TestDuplicateBootstrap(t *testing.T) { t.Parallel() left, right := transport.NewPipe(1) From 9b715d04993906af7caef05e673c15fb684d4bd9 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Tue, 23 May 2023 21:34:24 -0400 Subject: [PATCH 26/27] Change test to match new WeakRef semantics. Per some discussion on matrix, the way weakrefs interact with promise resolution has changed, but I don't think we actually care, so this just changes the test to match. --- capability_test.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/capability_test.go b/capability_test.go index 51866d13..0fc2d131 100644 --- a/capability_test.go +++ b/capability_test.go @@ -7,6 +7,8 @@ import ( "fmt" "testing" "time" + + "github.com/stretchr/testify/assert" ) func TestClient(t *testing.T) { @@ -600,20 +602,12 @@ func TestWeakPromisedClient(t *testing.T) { defer ca.Release() cb2, ok := wa.AddRef() defer cb2.Release() - if !ok { - t.Error("wa.AddRef() failed after releasing ca") - } - if !cb.IsSame(cb2) { - t.Error("cb != cb2") - } + assert.False(t, ok, "wa.AddRef() failed after releasing ca") + assert.False(t, cb.IsSame(cb2), "cb != cb2") + cb.Release() - defer cb.Release() - if b.shutdowns > 0 { - t.Error("b shut down before cb2.Release") - } - cb2.Release() if b.shutdowns == 0 { - t.Error("b not shut down after cb2.Release") + t.Error("b not shut down after cb.Release") } } From ad5855002f1d0c5161e51a2288ee294fd01ef257 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Wed, 24 May 2023 19:19:04 -0400 Subject: [PATCH 27/27] Revert change to resolved check in Client.Resolve As Louis pointed out, this is a bit weird, and I'm not sure why I changed it. --- capability.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/capability.go b/capability.go index d4423017..71275eef 100644 --- a/capability.go +++ b/capability.go @@ -520,13 +520,13 @@ func (c Client) IsSame(c2 Client) bool { // if the capability resolves to an error. func (c Client) Resolve(ctx context.Context) error { for { - h, _, released := c.startCall() + h, resolved, released := c.startCall() defer h.Release() if released { return errors.New("cannot resolve released client") } - if h == nil { + if resolved { return nil }