@@ -175,9 +175,25 @@ func (c *Connection) Call(ctx context.Context, method string, params interface{}
175
175
// are racing the response.
176
176
// rchan is buffered in case the response arrives without a listener.
177
177
result .response = make (chan * Response , 1 )
178
- pending := <- c .outgoingBox
179
- pending [result .id ] = result .response
180
- c .outgoingBox <- pending
178
+ outgoing , ok := <- c .outgoingBox
179
+ if ! ok {
180
+ // If the call failed due to (say) an I/O error or broken pipe, attribute it
181
+ // as such. (If the error is nil, then the connection must have been shut
182
+ // down cleanly.)
183
+ err := c .async .wait ()
184
+ if err == nil {
185
+ err = ErrClientClosing
186
+ }
187
+
188
+ resp , respErr := NewResponse (result .id , nil , err )
189
+ if respErr != nil {
190
+ panic (fmt .Errorf ("unexpected error from NewResponse: %w" , respErr ))
191
+ }
192
+ result .response <- resp
193
+ return result
194
+ }
195
+ outgoing [result .id ] = result .response
196
+ c .outgoingBox <- outgoing
181
197
// now we are ready to send
182
198
if err := c .write (ctx , call ); err != nil {
183
199
// sending failed, we will never get a response, so deliver a fake one
@@ -290,8 +306,19 @@ func (c *Connection) Close() error {
290
306
291
307
// readIncoming collects inbound messages from the reader and delivers them, either responding
292
308
// to outgoing calls or feeding requests to the queue.
293
- func (c * Connection ) readIncoming (ctx context.Context , reader Reader , toQueue chan <- * incoming ) {
294
- defer close (toQueue )
309
+ func (c * Connection ) readIncoming (ctx context.Context , reader Reader , toQueue chan <- * incoming ) (err error ) {
310
+ defer func () {
311
+ // Retire any outgoing requests that were still in flight.
312
+ // With the Reader no longer being processed, they necessarily cannot receive a response.
313
+ outgoing := <- c .outgoingBox
314
+ close (c .outgoingBox ) // Prevent new outgoing requests, which would deadlock.
315
+ for id , responseBox := range outgoing {
316
+ responseBox <- & Response {ID : id , Error : err }
317
+ }
318
+
319
+ close (toQueue )
320
+ }()
321
+
295
322
for {
296
323
// get the next message
297
324
// no lock is needed, this is the only reader
@@ -301,7 +328,7 @@ func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue ch
301
328
if ! isClosingError (err ) {
302
329
c .async .setError (err )
303
330
}
304
- return
331
+ return err
305
332
}
306
333
switch msg := msg .(type ) {
307
334
case * Request :
@@ -340,12 +367,12 @@ func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue ch
340
367
}
341
368
342
369
func (c * Connection ) incomingResponse (msg * Response ) {
343
- pending := <- c .outgoingBox
344
- response , ok := pending [msg .ID ]
345
- if ok {
346
- delete (pending , msg .ID )
370
+ var response chan <- * Response
371
+ if outgoing , ok := <- c .outgoingBox ; ok {
372
+ response = outgoing [msg .ID ]
373
+ delete (outgoing , msg .ID )
374
+ c .outgoingBox <- outgoing
347
375
}
348
- c .outgoingBox <- pending
349
376
if response != nil {
350
377
response <- msg
351
378
}
0 commit comments