Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
a3ffc4e
WIP: handle incoming resolve messages.
zenhack Mar 16, 2023
36b13bf
cleanup: remove unnecessary declaration of importClient.
zenhack Mar 18, 2023
3cbff97
Factor some logic out of handleDisembargo
zenhack Mar 18, 2023
9fbd8c6
First stab at disembargos on imports.
zenhack Mar 18, 2023
2061cfa
Add a test for disembargos on senderPromises.
zenhack Mar 19, 2023
d50b6ce
Add another test wrt promise resolution.
zenhack Mar 19, 2023
b11ac49
Fix the ownership semantics for NewLocalPromise
zenhack Mar 21, 2023
a1028be
Remove no longer necessary .Release()
zenhack Mar 21, 2023
0799051
Merge remote-tracking branch 'origin/main' into handle-resolve
zenhack Mar 22, 2023
9a40e87
Minor style cleanup
zenhack Mar 23, 2023
e46bf47
Correctly send receiverLoopbacks that target promisedAnswers.
zenhack Mar 23, 2023
14b7864
Merge remote-tracking branch 'origin/main' into handle-resolve
zenhack Mar 24, 2023
49578d5
Merge remote-tracking branch 'origin/main' into handle-resolve
zenhack Mar 30, 2023
3a32a77
Fix build errors due to bitrot.
zenhack Mar 30, 2023
c78b076
Clean up the way local promises work.
zenhack Mar 30, 2023
d7861ab
Merge branch 'localpromise-fixes' into handle-resolve
zenhack Mar 30, 2023
2e138f0
WIP: put a ClientSnapshot in the exports table
zenhack May 25, 2023
9999902
ClientSnapshot.Release(): take pointer receiver.
zenhack May 26, 2023
7213fc1
Do leak reporting for ClientSnapshots, not just Clients.
zenhack May 26, 2023
1cc3130
Merge remote-tracking branch 'origin/main' into handle-resolve
zenhack May 27, 2023
8d71f0b
Merge branch 'export-snapshot' into handle-resolve
zenhack May 27, 2023
91d062a
CapTable: rename Get -> GetClient, Add -> AddClient
zenhack May 27, 2023
5e0023a
CapTable: Add *Snapshot variants of Add & Get
zenhack May 27, 2023
abf193b
Separate client/snapshot versions of CapTable.At()
zenhack May 27, 2023
76f95d5
Remove argument to CapTable.Reset()
zenhack May 27, 2023
61ca9e5
CapTable.Set: split into client/snapshot variants
zenhack May 27, 2023
f967703
CapTable: change internal table to store snapshots.
zenhack May 27, 2023
fe08d87
WIP: Add a Steal() method to Client and ClientSnapshot
zenhack May 27, 2023
afeb534
Add missing call to .AddRef()
zenhack May 27, 2023
7a23f9b
pogs tests: clone input before inserting.
zenhack May 27, 2023
2372b53
Mark TestDuplicateBootstrap as flaky.
zenhack May 27, 2023
1acb315
Merge branch 'client.Steal' into captable-snapshots
zenhack May 27, 2023
843e8b6
Merge branch '523-flaky' into captable-snapshots
zenhack May 27, 2023
575916e
CapTable: maintain snapshots & clients in parallel.
zenhack May 27, 2023
75200fc
Merge branch 'captable-snapshots' into handle-resolve
zenhack May 27, 2023
0a99d60
Fix build errors.
zenhack May 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions answer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Promise struct {
// - Resolved. Fulfill or Reject has finished.

state mutex.Mutex[promiseState]

resolver Resolver[Ptr]
}

type promiseState struct {
Expand Down Expand Up @@ -64,11 +66,13 @@ type clientAndPromise struct {
}

// NewPromise creates a new unresolved promise. The PipelineCaller will
// be used to make pipelined calls before the promise resolves.
func NewPromise(m Method, pc PipelineCaller) *Promise {
// be used to make pipelined calls before the promise resolves. If resolver
// is not nil, calls to Fulfill will be forwarded to it.
func NewPromise(m Method, pc PipelineCaller, resolver Resolver[Ptr]) *Promise {
if pc == nil {
panic("NewPromise(nil)")
}

resolved := make(chan struct{})
p := &Promise{
method: m,
Expand All @@ -77,6 +81,7 @@ func NewPromise(m Method, pc PipelineCaller) *Promise {
signals: []func(){func() { close(resolved) }},
caller: pc,
}),
resolver: resolver,
}
p.ans.f.promise = p
p.ans.metadata = *NewMetadata()
Expand Down Expand Up @@ -152,6 +157,14 @@ func (p *Promise) Resolve(r Ptr, e error) {
return p.clients
})

if p.resolver != nil {
if e == nil {
p.resolver.Fulfill(r)
} else {
p.resolver.Reject(e)
}
}

// Pending resolution state: wait for clients to be fulfilled
// and calls to have answers.
res := resolution{p.method, r, e}
Expand Down
14 changes: 7 additions & 7 deletions answer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var dummyMethod = Method{

func TestPromiseReject(t *testing.T) {
t.Run("Done", func(t *testing.T) {
p := NewPromise(dummyMethod, dummyPipelineCaller{})
p := NewPromise(dummyMethod, dummyPipelineCaller{}, nil)
done := p.Answer().Done()
p.Reject(errors.New("omg bbq"))
select {
Expand All @@ -27,7 +27,7 @@ func TestPromiseReject(t *testing.T) {
}
})
t.Run("Struct", func(t *testing.T) {
p := NewPromise(dummyMethod, dummyPipelineCaller{})
p := NewPromise(dummyMethod, dummyPipelineCaller{}, nil)
defer p.ReleaseClients()
ans := p.Answer()
p.Reject(errors.New("omg bbq"))
Expand All @@ -36,7 +36,7 @@ func TestPromiseReject(t *testing.T) {
}
})
t.Run("Client", func(t *testing.T) {
p := NewPromise(dummyMethod, dummyPipelineCaller{})
p := NewPromise(dummyMethod, dummyPipelineCaller{}, nil)
defer p.ReleaseClients()
pc := p.Answer().Field(1, nil).Client()
p.Reject(errors.New("omg bbq"))
Expand All @@ -57,7 +57,7 @@ func TestPromiseFulfill(t *testing.T) {
t.Parallel()

t.Run("Done", func(t *testing.T) {
p := NewPromise(dummyMethod, dummyPipelineCaller{})
p := NewPromise(dummyMethod, dummyPipelineCaller{}, nil)
done := p.Answer().Done()
msg, seg, _ := NewMessage(SingleSegment(nil))
defer msg.Release()
Expand All @@ -72,7 +72,7 @@ func TestPromiseFulfill(t *testing.T) {
}
})
t.Run("Struct", func(t *testing.T) {
p := NewPromise(dummyMethod, dummyPipelineCaller{})
p := NewPromise(dummyMethod, dummyPipelineCaller{}, nil)
defer p.ReleaseClients()
ans := p.Answer()
msg, seg, _ := NewMessage(SingleSegment(nil))
Expand All @@ -92,7 +92,7 @@ func TestPromiseFulfill(t *testing.T) {
}
})
t.Run("Client", func(t *testing.T) {
p := NewPromise(dummyMethod, dummyPipelineCaller{})
p := NewPromise(dummyMethod, dummyPipelineCaller{}, nil)
defer p.ReleaseClients()
pc := p.Answer().Field(1, nil).Client()

Expand All @@ -103,7 +103,7 @@ func TestPromiseFulfill(t *testing.T) {
defer msg.Release()

res, _ := NewStruct(seg, ObjectSize{PointerCount: 3})
res.SetPtr(1, NewInterface(seg, msg.CapTable().Add(c.AddRef())).ToPtr())
res.SetPtr(1, NewInterface(seg, msg.CapTable().AddClient(c.AddRef())).ToPtr())

p.Fulfill(res.ToPtr())

Expand Down
2 changes: 1 addition & 1 deletion answerqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (sr *StructReturner) Answer(m Method, pcall PipelineCaller) (*Answer, Relea
}
}
}
sr.p = NewPromise(m, pcall)
sr.p = NewPromise(m, pcall, nil)
ans := sr.p.Answer()
return ans, func() {
<-ans.Done()
Expand Down
73 changes: 58 additions & 15 deletions capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,16 @@ func (i Interface) value(paddr address) rawPointer {
// or nil if the pointer is invalid.
func (i Interface) Client() (c Client) {
if msg := i.Message(); msg != nil {
c = msg.CapTable().Get(i)
c = msg.CapTable().GetClient(i)
}
return
}

// Snapshot is like Client except that it returns a snapshot.
func (i Interface) Snapshot() (c ClientSnapshot) {
if msg := i.Message(); msg != nil {
c = msg.CapTable().GetSnapshot(i)
}
return
}

Expand Down Expand Up @@ -550,6 +557,13 @@ func (c Client) AddRef() Client {
})
}

// Steal steals the receiver, and returns a new client for the same capability
// owned by the caller. This can be useful for tracking down ownership bugs.
func (c Client) Steal() Client {
defer c.Release()
return c.AddRef()
}

// WeakRef creates a new WeakClient that refers to the same capability
// as c. If c is nil or has resolved to null, then WeakRef returns nil.
func (c Client) WeakRef() WeakClient {
Expand All @@ -566,7 +580,9 @@ func (c Client) WeakRef() WeakClient {
// ClientSnapshot if c is nil, has resolved to null, or has been released.
func (c Client) Snapshot() ClientSnapshot {
h, _, _ := c.startCall()
return ClientSnapshot{hook: h}
s := ClientSnapshot{hook: h}
setupLeakReporting(s)
return s
}

// A Brand is an opaque value used to identify a capability.
Expand Down Expand Up @@ -606,6 +622,9 @@ func (cs ClientSnapshot) Recv(ctx context.Context, r Recv) PipelineCaller {

// Client returns a client pointing at the most-resolved version of the snapshot.
func (cs ClientSnapshot) Client() Client {
if !cs.IsValid() {
return Client{}
}
cursor := rc.NewRefInPlace(func(c *clientCursor) func() {
*c = clientCursor{hook: mutex.New(cs.hook.AddRef())}
c.compress()
Expand All @@ -630,23 +649,34 @@ func (cs ClientSnapshot) Brand() Brand {
// Return a the reference to the Metadata associated with this client hook.
// Callers may store whatever they need here.
func (cs ClientSnapshot) Metadata() *Metadata {
return &cs.hook.Value().metadata
if cs.hook.IsValid() {
return &cs.hook.Value().metadata
}
return nil
}

// Create a copy of the snapshot, with its own underlying reference.
func (cs ClientSnapshot) AddRef() ClientSnapshot {
cs.hook = cs.hook.AddRef()
setupLeakReporting(cs)
return cs
}

// Steal is like Client.Steal() but for snapshots.
func (cs ClientSnapshot) Steal() ClientSnapshot {
defer cs.Release()
return cs.AddRef()
}

// Release the reference to the hook.
func (cs ClientSnapshot) Release() {
func (cs *ClientSnapshot) Release() {
cs.hook.Release()
}

func (cs *ClientSnapshot) Resolve1(ctx context.Context) error {
var err error
cs.hook, _, err = resolve1ClientHook(ctx, cs.hook)
setupLeakReporting(*cs)
return err
}

Expand All @@ -658,6 +688,7 @@ func (cs *ClientSnapshot) resolve1(ctx context.Context) (more bool, err error) {
func (cs *ClientSnapshot) Resolve(ctx context.Context) error {
var err error
cs.hook, err = resolveClientHook(ctx, cs.hook)
setupLeakReporting(*cs)
return err
}

Expand Down Expand Up @@ -746,7 +777,7 @@ func (c Client) Release() {
}

func (c Client) EncodeAsPtr(seg *Segment) Ptr {
capId := seg.Message().CapTable().Add(c)
capId := seg.Message().CapTable().AddClient(c)
return NewInterface(seg, capId).ToPtr()
}

Expand All @@ -766,7 +797,7 @@ func (s *resolveState) isResolved() bool {
}
}

var setupLeakReporting func(Client) = func(Client) {}
var setupLeakReporting func(any) = func(any) {}

// SetClientLeakFunc sets a callback for reporting Clients that went
// out of scope without being released. The callback is not guaranteed
Expand All @@ -776,20 +807,32 @@ var setupLeakReporting func(Client) = func(Client) {}
// SetClientLeakFunc must not be called after any calls to NewClient or
// NewPromisedClient.
func SetClientLeakFunc(clientLeakFunc func(msg string)) {
setupLeakReporting = func(c Client) {
setupLeakReporting = func(v any) {
buf := bufferpool.Default.Get(1e6)
n := runtime.Stack(buf, false)
stack := string(buf[:n])
bufferpool.Default.Put(buf)
runtime.SetFinalizer(c.client, func(c *client) {
released := mutex.With1(&c.state, func(c *clientState) bool {
return c.released
switch c := v.(type) {
case Client:
runtime.SetFinalizer(c.client, func(c *client) {
released := mutex.With1(&c.state, func(c *clientState) bool {
return c.released
})
if released {
return
}
clientLeakFunc("leaked client created at:\n\n" + stack)
})
if released {
return
}
clientLeakFunc("leaked client created at:\n\n" + stack)
})
case ClientSnapshot:
runtime.SetFinalizer(c.hook, func(c *rc.Ref[clientHook]) {
if !c.IsValid() {
return
}
clientLeakFunc("leaked client snapshot created at:\n\n" + stack)
})
default:
panic("setupLeakReporting called on unrecognized type!")
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion capability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestResolve(t *testing.T) {
}
t.Run("Clients", func(t *testing.T) {
test(t, "Waits for the full chain", func(t *testing.T, p1, p2 Client, r1, r2 Resolver[Client]) {
r1.Fulfill(p2)
r1.Fulfill(p2.AddRef())
ctx, cancel := context.WithTimeout(context.Background(), time.Second/10)
defer cancel()
require.NotNil(t, p1.Resolve(ctx), "blocks on second promise")
Expand Down
2 changes: 1 addition & 1 deletion capnpc-go/templates/structCapabilityField
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ func (s {{.Node.Name}}) Set{{.Field.Name|title}}(c {{.FieldType}}) error {
return capnp.Struct(s).SetPtr({{.Field.Slot.Offset}}, capnp.Ptr{})
}
seg := s.Segment()
in := capnp.NewInterface(seg, seg.Message().CapTable().Add(c))
in := capnp.NewInterface(seg, seg.Message().CapTable().AddClient(c))
return capnp.Struct(s).SetPtr({{.Field.Slot.Offset}}, in.ToPtr())
}
2 changes: 1 addition & 1 deletion capnpc-go/templates/structInterfaceField
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func (s {{.Node.Name}}) Set{{.Field.Name|title}}(v {{.FieldType}}) error {
return capnp.Struct(s).SetPtr({{.Field.Slot.Offset}}, capnp.Ptr{})
}
seg := s.Segment()
in := capnp.NewInterface(seg, seg.Message().CapTable().Add(capnp.Client(v)))
in := capnp.NewInterface(seg, seg.Message().CapTable().AddClient(capnp.Client(v)))
return capnp.Struct(s).SetPtr({{.Field.Slot.Offset}}, in.ToPtr())
}

Loading