Skip to content

Commit 4885f7c

Browse files
Bryan C. Millsgopherbot
Bryan C. Mills
authored andcommitted
internal/jsonrpc2_v2: eliminate a temporary connection leak in (*Server).run
Prior to this CL, (*Server).run only filters out inactive connections when it accepts a new connection. If existing connections complete, their associated resources can't be garbage-collected until either the next connection is accepted or the Listener is closed. This change moves the open-connection accounting to an explicit hook passed to newConnection, eliminating the need to call Wait entirely. For golang/go#46047 Change-Id: I3732cb463fcea0c142f17f2b1510fdfd2dbc81da Reviewed-on: https://go-review.googlesource.com/c/tools/+/388774 Auto-Submit: Bryan Mills <[email protected]> Reviewed-by: Ian Cottrell <[email protected]> gopls-CI: kokoro <[email protected]> Run-TryBot: Bryan Mills <[email protected]> TryBot-Result: Gopher Robot <[email protected]>
1 parent 739f55d commit 4885f7c

File tree

2 files changed

+22
-31
lines changed

2 files changed

+22
-31
lines changed

internal/jsonrpc2_v2/conn.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ type Connection struct {
7474
handler Handler
7575

7676
onInternalError func(error)
77+
onDone func()
7778
}
7879

7980
// inFlightState records the state of the incoming and outgoing calls on a
@@ -113,6 +114,9 @@ func (c *Connection) updateInFlight(f func(*inFlightState)) {
113114
idle := s.incoming == 0 && len(s.outgoing) == 0 && !s.handlerRunning
114115
if idle && (s.closing || s.readErr != nil) && !s.closed {
115116
c.closeErr <- c.closer.Close()
117+
if c.onDone != nil {
118+
c.onDone()
119+
}
116120
s.closed = true
117121
}
118122
}
@@ -131,8 +135,13 @@ func (o ConnectionOptions) Bind(context.Context, *Connection) (ConnectionOptions
131135
}
132136

133137
// newConnection creates a new connection and runs it.
138+
//
134139
// This is used by the Dial and Serve functions to build the actual connection.
135-
func newConnection(bindCtx context.Context, rwc io.ReadWriteCloser, binder Binder) (*Connection, error) {
140+
//
141+
// The connection is closed automatically (and its resources cleaned up) when
142+
// the last request has completed after the underlying ReadWriteCloser breaks,
143+
// but it may be stopped earlier by calling Close (for a clean shutdown).
144+
func newConnection(bindCtx context.Context, rwc io.ReadWriteCloser, binder Binder, onDone func()) (*Connection, error) {
136145
// TODO: Should we create a new event span here?
137146
// This will propagate cancellation from ctx; should it?
138147
ctx := notDone{bindCtx}
@@ -141,6 +150,7 @@ func newConnection(bindCtx context.Context, rwc io.ReadWriteCloser, binder Binde
141150
closer: rwc,
142151
closeErr: make(chan error, 1),
143152
writer: make(chan Writer, 1),
153+
onDone: onDone,
144154
}
145155

146156
options, err := binder.Bind(bindCtx, c)

internal/jsonrpc2_v2/serve.go

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func Dial(ctx context.Context, dialer Dialer, binder Binder) (*Connection, error
5454
if err != nil {
5555
return nil, err
5656
}
57-
return newConnection(ctx, rwc, binder)
57+
return newConnection(ctx, rwc, binder, nil)
5858
}
5959

6060
// Serve starts a new server listening for incoming connections and returns
@@ -84,53 +84,34 @@ func (s *Server) Wait() error {
8484
// duration, otherwise it exits only on error.
8585
func (s *Server) run(ctx context.Context) {
8686
defer s.async.done()
87-
var activeConns []*Connection
87+
88+
var activeConns sync.WaitGroup
8889
for {
89-
// we never close the accepted connection, we rely on the other end
90-
// closing or the socket closing itself naturally
90+
// We never close the accepted connection we rely on the other end
91+
// closing or the socket closing itself naturally.
9192
rwc, err := s.listener.Accept(ctx)
9293
if err != nil {
9394
if !isClosingError(err) {
9495
s.async.setError(err)
9596
}
96-
// we are done generating new connections for good
97+
// We are done generating new connections for good.
9798
break
9899
}
99100

100-
// see if any connections were closed while we were waiting
101-
activeConns = onlyActive(activeConns)
102-
103-
// a new inbound connection,
104-
conn, err := newConnection(ctx, rwc, s.binder)
101+
// A new inbound connection.
102+
activeConns.Add(1)
103+
_, err = newConnection(ctx, rwc, s.binder, activeConns.Done)
105104
if err != nil {
105+
activeConns.Done()
106106
if !isClosingError(err) {
107107
s.async.setError(err)
108108
s.listener.Close()
109109
break
110110
}
111111
continue
112112
}
113-
activeConns = append(activeConns, conn)
114-
}
115-
116-
// wait for all active conns to finish
117-
for _, c := range activeConns {
118-
c.Wait()
119-
}
120-
}
121-
122-
func onlyActive(conns []*Connection) []*Connection {
123-
i := 0
124-
for _, c := range conns {
125-
c.updateInFlight(func(s *inFlightState) {
126-
if !s.closed {
127-
conns[i] = c
128-
i++
129-
}
130-
})
131113
}
132-
// trim the slice down
133-
return conns[:i]
114+
activeConns.Wait()
134115
}
135116

136117
// isClosingError reports if the error occurs normally during the process of

0 commit comments

Comments
 (0)