Skip to content

Commit 0799051

Browse files
committed
Merge remote-tracking branch 'origin/main' into handle-resolve
2 parents a1028be + 6ed9a14 commit 0799051

File tree

8 files changed

+65
-171
lines changed

8 files changed

+65
-171
lines changed

capability.go

Lines changed: 17 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,6 @@ type ClientKind = struct {
117117
}
118118

119119
type client struct {
120-
creatorFunc int
121-
creatorFile string
122-
creatorStack string
123-
creatorLine int
124-
125120
mu sync.Mutex // protects the struct
126121
limiter flowcontrol.FlowLimiter
127122
h *clientHook // nil if resolved to nil or released
@@ -156,19 +151,6 @@ type clientHook struct {
156151
resolvedHook *clientHook // valid only if resolved is closed
157152
}
158153

159-
func (c Client) setupLeakReporting(creatorFunc int) {
160-
if clientLeakFunc == nil {
161-
return
162-
}
163-
c.creatorFunc = creatorFunc
164-
_, c.creatorFile, c.creatorLine, _ = runtime.Caller(2)
165-
buf := bufferpool.Default.Get(1e6)
166-
n := runtime.Stack(buf, false)
167-
c.creatorStack = string(buf[:n])
168-
bufferpool.Default.Put(buf)
169-
c.setFinalizer()
170-
}
171-
172154
// NewClient creates the first reference to a capability.
173155
// If hook is nil, then NewClient returns nil.
174156
//
@@ -187,7 +169,7 @@ func NewClient(hook ClientHook) Client {
187169
}
188170
h.resolvedHook = h
189171
c := Client{client: &client{h: h}}
190-
c.setupLeakReporting(1)
172+
setupLeakReporting(c)
191173
return c
192174
}
193175

@@ -216,7 +198,7 @@ func newPromisedClient(hook ClientHook) (Client, *clientPromise) {
216198
metadata: *NewMetadata(),
217199
}
218200
c := Client{client: &client{h: h}}
219-
c.setupLeakReporting(2)
201+
setupLeakReporting(c)
220202
return c, &clientPromise{h: h}
221203
}
222204

@@ -534,7 +516,7 @@ func (c Client) AddRef() Client {
534516
c.h.refs++
535517
c.h.mu.Unlock()
536518
d := Client{client: &client{h: c.h}}
537-
d.setupLeakReporting(3)
519+
setupLeakReporting(d)
538520
return d
539521
}
540522

@@ -682,7 +664,7 @@ func (ch *clientHook) isResolved() bool {
682664
}
683665
}
684666

685-
var clientLeakFunc func(string)
667+
var setupLeakReporting func(Client) = func(Client) {}
686668

687669
// SetClientLeakFunc sets a callback for reporting Clients that went
688670
// out of scope without being released. The callback is not guaranteed
@@ -691,45 +673,19 @@ var clientLeakFunc func(string)
691673
//
692674
// SetClientLeakFunc must not be called after any calls to NewClient or
693675
// NewPromisedClient.
694-
func SetClientLeakFunc(f func(msg string)) {
695-
clientLeakFunc = f
696-
}
697-
698-
func (c Client) setFinalizer() {
699-
runtime.SetFinalizer(c.client, finalizeClient)
700-
}
701-
702-
func finalizeClient(c *client) {
703-
// Since there are no other references to c, then we don't have to
704-
// acquire the mutex to read.
705-
if c.released {
706-
return
707-
}
708-
709-
var fname string
710-
switch c.creatorFunc {
711-
case 1:
712-
fname = "NewClient"
713-
case 2:
714-
fname = "NewPromisedClient"
715-
case 3:
716-
fname = "AddRef"
717-
default:
718-
fname = "<???>"
719-
}
720-
var msg string
721-
if c.creatorFile == "" {
722-
msg = "leaked client created by " + fname
723-
} else {
724-
msg = "leaked client created by " + fname + " on " +
725-
c.creatorFile + ":" + str.Itod(c.creatorLine)
726-
}
727-
if c.creatorStack != "" {
728-
msg += "\nCreation stack trace:\n" + c.creatorStack + "\n"
676+
func SetClientLeakFunc(clientLeakFunc func(msg string)) {
677+
setupLeakReporting = func(c Client) {
678+
buf := bufferpool.Default.Get(1e6)
679+
n := runtime.Stack(buf, false)
680+
stack := string(buf[:n])
681+
bufferpool.Default.Put(buf)
682+
runtime.SetFinalizer(c.client, func(c *client) {
683+
if c.released {
684+
return
685+
}
686+
clientLeakFunc("leaked client created at:\n\n" + stack)
687+
})
729688
}
730-
731-
// finalizeClient will only be called if clientLeakFunc != nil.
732-
go clientLeakFunc(msg)
733689
}
734690

735691
// A ClientPromise resolves the identity of a client created by NewPromisedClient.
@@ -830,11 +786,7 @@ func (wc *WeakClient) AddRef() (c Client, ok bool) {
830786
wc.h.refs++
831787
wc.h.mu.Unlock()
832788
c = Client{client: &client{h: wc.h}}
833-
if clientLeakFunc != nil {
834-
c.creatorFunc = 3
835-
_, c.creatorFile, c.creatorLine, _ = runtime.Caller(1)
836-
c.setFinalizer()
837-
}
789+
setupLeakReporting(c)
838790
return c, true
839791
}
840792

message.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/binary"
66
"errors"
77
"io"
8+
"net"
89
"sync"
910
"sync/atomic"
1011

@@ -653,6 +654,11 @@ func (e *Encoder) Encode(m *Message) error {
653654
return nil
654655
}
655656

657+
func (e *Encoder) write(bufs [][]byte) error {
658+
_, err := (*net.Buffers)(&bufs).WriteTo(e.w)
659+
return err
660+
}
661+
656662
// Marshal concatenates the segments in the message into a single byte
657663
// slice including framing.
658664
func (m *Message) Marshal() ([]byte, error) {

message_go18.go

Lines changed: 0 additions & 11 deletions
This file was deleted.

message_other.go

Lines changed: 0 additions & 13 deletions
This file was deleted.

rpc/level0_test.go

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -918,11 +918,7 @@ func TestRecvBootstrapCall(t *testing.T) {
918918
})
919919
defer func() {
920920
finishTest(t, conn, p2)
921-
select {
922-
case <-srvShutdown:
923-
default:
924-
t.Error("Bootstrap client still alive after Close returned")
925-
}
921+
<-srvShutdown // Hangs if bootstrap client is never shut down.
926922
}()
927923

928924
ctx := context.Background()
@@ -1233,11 +1229,7 @@ func TestRecvBootstrapPipelineCall(t *testing.T) {
12331229
})
12341230
defer func() {
12351231
finishTest(t, conn, p2)
1236-
select {
1237-
case <-srvShutdown:
1238-
default:
1239-
t.Error("Bootstrap client still alive after Close returned")
1240-
}
1232+
<-srvShutdown // Will hang if closing does not shut down the client.
12411233
}()
12421234
ctx := context.Background()
12431235

@@ -1699,13 +1691,9 @@ func TestRecvCancel(t *testing.T) {
16991691
}
17001692

17011693
// 8. Verify that returned capability was shut down.
1702-
// There's no guarantee when the release/shutdown will happen, other
1703-
// than it will be released before Close returns.
1704-
select {
1705-
case <-retcapShutdown:
1706-
default:
1707-
t.Error("returned capability was not shut down")
1708-
}
1694+
// There's no guarantee exactly when the release/shutdown will happen,
1695+
// but Close should trigger it. Otherwise, this will hang:
1696+
<-retcapShutdown
17091697
}
17101698

17111699
// TestSendCancel makes a call, cancels the Context, then checks to

rpc/senderpromise_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,15 +153,15 @@ func TestResolveUnimplementedDrop(t *testing.T) {
153153
assert.Equal(t, rpccp.CapDescriptor_Which_senderPromise, desc.Which)
154154
emptyExportID = desc.SenderPromise
155155
}
156-
// 7. Fulfill promise
156+
// 6. Fulfill promise
157157
{
158158
pp := testcapnp.Empty_ServerToClient(emptyShutdowner{
159159
onShutdown: onShutdown,
160160
})
161161
r.Fulfill(pp)
162162
pp.Release()
163163
}
164-
// 8. Receive resolve, send unimplemented
164+
// 7. Receive resolve, send unimplemented
165165
{
166166
rmsg, release, err := recvMessage(ctx, p2)
167167
assert.NoError(t, err)
@@ -176,12 +176,12 @@ func TestResolveUnimplementedDrop(t *testing.T) {
176176
Unimplemented: rmsg,
177177
}))
178178
}
179-
// 9. Drop the promise on our side. Otherwise it will stay alive because of
179+
// 8. Drop the promise on our side. Otherwise it will stay alive because of
180180
// the bootstrap interface:
181181
{
182182
p.Release()
183183
}
184-
// 6. Send finish
184+
// 9. Send finish
185185
{
186186
assert.NoError(t, sendMessage(ctx, p2, &rpcMessage{
187187
Which: rpccp.Message_Which_finish,

server/server.go

Lines changed: 20 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func (c *Call) Go() {
6969
return
7070
}
7171
c.acked = true
72-
go c.srv.handleCalls(c.srv.handleCallsCtx)
72+
go c.srv.handleCalls()
7373
}
7474

7575
// Shutdowner is the interface that wraps the Shutdown method.
@@ -84,15 +84,6 @@ type Server struct {
8484
brand any
8585
shutdown Shutdowner
8686

87-
// Cancels handleCallsCtx
88-
cancelHandleCalls context.CancelFunc
89-
90-
// Context used by the goroutine running handleCalls(). Note
91-
// the calls themselves will have different contexts, which
92-
// are not children of this context, but are supplied by
93-
// start().
94-
handleCallsCtx context.Context
95-
9687
// wg is incremented each time a method is queued, and
9788
// decremented after it is handled.
9889
wg sync.WaitGroup
@@ -114,19 +105,15 @@ func (s *Server) String() string {
114105
// guarantees message delivery order by blocking each call on the
115106
// return of the previous call or a call to Call.Go.
116107
func New(methods []Method, brand any, shutdown Shutdowner) *Server {
117-
ctx, cancel := context.WithCancel(context.Background())
118-
119108
srv := &Server{
120-
methods: make(sortedMethods, len(methods)),
121-
brand: brand,
122-
shutdown: shutdown,
123-
callQueue: mpsc.New[*Call](),
124-
cancelHandleCalls: cancel,
125-
handleCallsCtx: ctx,
109+
methods: make(sortedMethods, len(methods)),
110+
brand: brand,
111+
shutdown: shutdown,
112+
callQueue: mpsc.New[*Call](),
126113
}
127114
copy(srv.methods, methods)
128115
sort.Sort(srv.methods)
129-
go srv.handleCalls(ctx)
116+
go srv.handleCalls()
130117
return srv
131118
}
132119

@@ -171,38 +158,20 @@ func (srv *Server) Recv(ctx context.Context, r capnp.Recv) capnp.PipelineCaller
171158
return srv.start(ctx, mm, r)
172159
}
173160

174-
func (srv *Server) handleCalls(ctx context.Context) {
161+
func (srv *Server) handleCalls() {
162+
ctx := context.Background()
175163
for {
176164
call, err := srv.callQueue.Recv(ctx)
177165
if err != nil {
178-
// Context has been canceled; drain the rest of the queue,
179-
// invoking handleCall() with the cancelled context to
180-
// trigger cleanup.
181-
var ok bool
182-
call, ok = srv.callQueue.TryRecv()
183-
if !ok {
184-
return
166+
// Queue closed; wait for outstanding calls and shut down.
167+
if srv.shutdown != nil {
168+
srv.wg.Wait()
169+
srv.shutdown.Shutdown()
185170
}
171+
return
186172
}
187173

188-
// The context for the individual call is not necessarily
189-
// related to the context managing the server's lifetime
190-
// (ctx); we need to monitor both and pass the call a
191-
// context that will be canceled if *either* context is
192-
// cancelled.
193-
callCtx, cancelCall := context.WithCancel(call.ctx)
194-
go func() {
195-
defer cancelCall()
196-
select {
197-
case <-callCtx.Done():
198-
case <-ctx.Done():
199-
}
200-
}()
201-
func() {
202-
defer cancelCall()
203-
srv.handleCall(callCtx, call)
204-
}()
205-
174+
srv.handleCall(call)
206175
if call.acked {
207176
// Another goroutine has taken over; time
208177
// to retire.
@@ -211,10 +180,10 @@ func (srv *Server) handleCalls(ctx context.Context) {
211180
}
212181
}
213182

214-
func (srv *Server) handleCall(ctx context.Context, c *Call) {
183+
func (srv *Server) handleCall(c *Call) {
215184
defer srv.wg.Done()
216185

217-
err := c.method.Impl(ctx, c)
186+
err := c.method.Impl(c.ctx, c)
218187

219188
c.recv.ReleaseArgs()
220189
c.recv.Returner.PrepareReturn(err)
@@ -246,15 +215,11 @@ func (srv *Server) Brand() capnp.Brand {
246215
return capnp.Brand{Value: serverBrand{srv.brand}}
247216
}
248217

249-
// Shutdown waits for ongoing calls to finish and calls Shutdown on the
250-
// Shutdowner passed into NewServer. Shutdown must not be called more
251-
// than once.
218+
// Shutdown arranges for Shutdown to be called on the Shutdowner passed
219+
// into NewServer after outstanding all calls have been serviced.
220+
// Shutdown must not be called more than once.
252221
func (srv *Server) Shutdown() {
253-
srv.cancelHandleCalls()
254-
srv.wg.Wait()
255-
if srv.shutdown != nil {
256-
srv.shutdown.Shutdown()
257-
}
222+
srv.callQueue.Close()
258223
}
259224

260225
// IsServer reports whether a brand returned by capnp.Client.Brand

0 commit comments

Comments
 (0)