@@ -82,10 +82,12 @@ type Connection struct {
82
82
// inFlightState records the state of the incoming and outgoing calls on a
83
83
// Connection.
84
84
type inFlightState struct {
85
- closing bool // disallow enqueuing further requests, and close the Closer when transitioning to idle
86
- readErr error
85
+ closing bool // disallow enqueuing further requests, and close the Closer when transitioning to idle
86
+ readErr error
87
+ writeErr error
87
88
88
- outgoing map [ID ]* AsyncCall // calls only
89
+ outgoingCalls map [ID ]* AsyncCall // calls only
90
+ outgoingNotifications int // # of notifications awaiting "write"
89
91
90
92
// incoming stores the total number of incoming calls and notifications
91
93
// that have not yet written or processed a result.
@@ -104,7 +106,7 @@ type inFlightState struct {
104
106
105
107
// updateInFlight locks the state of the connection's in-flight requests, allows
106
108
// f to mutate that state, and closes the connection if it is idle and either
107
- // is closing or has a read error.
109
+ // is closing or has a read or write error.
108
110
func (c * Connection ) updateInFlight (f func (* inFlightState )) {
109
111
c .stateMu .Lock ()
110
112
defer c .stateMu .Unlock ()
@@ -113,8 +115,8 @@ func (c *Connection) updateInFlight(f func(*inFlightState)) {
113
115
114
116
f (s )
115
117
116
- idle := s . incoming == 0 && len ( s . outgoing ) == 0 && ! s .handlerRunning
117
- if idle && (s .closing || s .readErr != nil ) && ! s .closed {
118
+ idle := len ( s . outgoingCalls ) == 0 && s . outgoingNotifications == 0 && s . incoming == 0 && ! s .handlerRunning
119
+ if idle && (s .closing || s .readErr != nil || s . writeErr != nil ) && ! s .closed {
118
120
c .closeErr <- c .closer .Close ()
119
121
if c .onDone != nil {
120
122
c .onDone ()
@@ -181,20 +183,42 @@ func newConnection(bindCtx context.Context, rwc io.ReadWriteCloser, binder Binde
181
183
// Notify invokes the target method but does not wait for a response.
182
184
// The params will be marshaled to JSON before sending over the wire, and will
183
185
// be handed to the method invoked.
184
- func (c * Connection ) Notify (ctx context.Context , method string , params interface {}) error {
185
- notify , err := NewNotification (method , params )
186
- if err != nil {
187
- return fmt .Errorf ("marshaling notify parameters: %v" , err )
188
- }
186
+ func (c * Connection ) Notify (ctx context.Context , method string , params interface {}) (err error ) {
189
187
ctx , done := event .Start (ctx , method ,
190
188
tag .Method .Of (method ),
191
189
tag .RPCDirection .Of (tag .Outbound ),
192
190
)
191
+ attempted := false
192
+
193
+ defer func () {
194
+ labelStatus (ctx , err )
195
+ done ()
196
+ if attempted {
197
+ c .updateInFlight (func (s * inFlightState ) {
198
+ s .outgoingNotifications --
199
+ })
200
+ }
201
+ }()
202
+
203
+ c .updateInFlight (func (s * inFlightState ) {
204
+ if s .writeErr != nil {
205
+ err = fmt .Errorf ("%w: %v" , ErrClientClosing , s .writeErr )
206
+ return
207
+ }
208
+ s .outgoingNotifications ++
209
+ attempted = true
210
+ })
211
+ if err != nil {
212
+ return err
213
+ }
214
+
215
+ notify , err := NewNotification (method , params )
216
+ if err != nil {
217
+ return fmt .Errorf ("marshaling notify parameters: %v" , err )
218
+ }
219
+
193
220
event .Metric (ctx , tag .Started .Of (1 ))
194
- err = c .write (ctx , notify )
195
- labelStatus (ctx , err )
196
- done ()
197
- return err
221
+ return c .write (ctx , notify )
198
222
}
199
223
200
224
// Call invokes the target method and returns an object that can be used to await the response.
@@ -239,10 +263,18 @@ func (c *Connection) Call(ctx context.Context, method string, params interface{}
239
263
err = fmt .Errorf ("%w: %v" , ErrClientClosing , s .readErr )
240
264
return
241
265
}
242
- if s .outgoing == nil {
243
- s .outgoing = make (map [ID ]* AsyncCall )
266
+ if s .writeErr != nil {
267
+ // Don't start the call if the write end has failed, either.
268
+ // We have reason to believe that the write would not succeed,
269
+ // and if we avoid adding in-flight calls then eventually
270
+ // the connection will go idle and be closed.
271
+ err = fmt .Errorf ("%w: %v" , ErrClientClosing , s .writeErr )
272
+ return
273
+ }
274
+ if s .outgoingCalls == nil {
275
+ s .outgoingCalls = make (map [ID ]* AsyncCall )
244
276
}
245
- s .outgoing [ac .id ] = ac
277
+ s .outgoingCalls [ac .id ] = ac
246
278
})
247
279
if err != nil {
248
280
ac .retire (& Response {ID : id , Error : err })
@@ -254,8 +286,8 @@ func (c *Connection) Call(ctx context.Context, method string, params interface{}
254
286
// Sending failed. We will never get a response, so deliver a fake one if it
255
287
// wasn't already retired by the connection breaking.
256
288
c .updateInFlight (func (s * inFlightState ) {
257
- if s .outgoing [ac .id ] == ac {
258
- delete (s .outgoing , ac .id )
289
+ if s .outgoingCalls [ac .id ] == ac {
290
+ delete (s .outgoingCalls , ac .id )
259
291
ac .retire (& Response {ID : id , Error : err })
260
292
} else {
261
293
// ac was already retired by the readIncoming goroutine:
@@ -405,8 +437,8 @@ func (c *Connection) readIncoming(ctx context.Context, reader Reader, preempter
405
437
406
438
case * Response :
407
439
c .updateInFlight (func (s * inFlightState ) {
408
- if ac , ok := s .outgoing [msg .ID ]; ok {
409
- delete (s .outgoing , msg .ID )
440
+ if ac , ok := s .outgoingCalls [msg .ID ]; ok {
441
+ delete (s .outgoingCalls , msg .ID )
410
442
ac .retire (msg )
411
443
} else {
412
444
// TODO: How should we report unexpected responses?
@@ -423,10 +455,10 @@ func (c *Connection) readIncoming(ctx context.Context, reader Reader, preempter
423
455
424
456
// Retire any outgoing requests that were still in flight: with the Reader no
425
457
// longer being processed, they necessarily cannot receive a response.
426
- for id , ac := range s .outgoing {
458
+ for id , ac := range s .outgoingCalls {
427
459
ac .retire (& Response {ID : id , Error : err })
428
460
}
429
- s .outgoing = nil
461
+ s .outgoingCalls = nil
430
462
})
431
463
}
432
464
@@ -482,6 +514,14 @@ func (c *Connection) acceptRequest(ctx context.Context, msg *Request, msgBytes i
482
514
err = ErrServerClosing
483
515
return
484
516
}
517
+
518
+ if s .writeErr != nil {
519
+ // The write side of the connection appears to be broken,
520
+ // so we won't be able to write a response to this request.
521
+ // Avoid unnecessary work to compute it.
522
+ err = fmt .Errorf ("%w: %v" , ErrServerClosing , s .writeErr )
523
+ return
524
+ }
485
525
}
486
526
})
487
527
if err != nil {
@@ -557,12 +597,19 @@ func (c *Connection) handleAsync() {
557
597
return
558
598
}
559
599
560
- var result interface {}
561
- err := req .ctx .Err ()
562
- if err == nil {
563
- // Only deliver to the Handler if not already cancelled.
564
- result , err = c .handler .Handle (req .ctx , req .Request )
600
+ // Only deliver to the Handler if not already canceled.
601
+ if err := req .ctx .Err (); err != nil {
602
+ c .updateInFlight (func (s * inFlightState ) {
603
+ if s .writeErr != nil {
604
+ // Assume that req.ctx was canceled due to s.writeErr.
605
+ // TODO(#51365): use a Context API to plumb this through req.ctx.
606
+ err = fmt .Errorf ("%w: %v" , ErrServerClosing , s .writeErr )
607
+ }
608
+ })
609
+ c .processResult ("handleAsync" , req , nil , err )
565
610
}
611
+
612
+ result , err := c .handler .Handle (req .ctx , req .Request )
566
613
c .processResult (c .handler , req , result , err )
567
614
}
568
615
}
@@ -646,12 +693,24 @@ func (c *Connection) write(ctx context.Context, msg Message) error {
646
693
n , err := writer .Write (ctx , msg )
647
694
event .Metric (ctx , tag .SentBytes .Of (n ))
648
695
649
- // TODO: if err != nil, that suggests that future writes will not succeed,
650
- // so we cannot possibly write the results of incoming Call requests.
651
- // If the read side of the connection is also broken, we also might not have
652
- // a way to receive cancellation notifications.
653
- //
654
- // Should we cancel the pending calls implicitly?
696
+ if err != nil && ctx .Err () == nil {
697
+ // The call to Write failed, and since ctx.Err() is nil we can't attribute
698
+ // the failure (even indirectly) to Context cancellation. The writer appears
699
+ // to be broken, and future writes are likely to also fail.
700
+ //
701
+ // If the read side of the connection is also broken, we might not even be
702
+ // able to receive cancellation notifications. Since we can't reliably write
703
+ // the results of incoming calls and can't receive explicit cancellations,
704
+ // cancel the calls now.
705
+ c .updateInFlight (func (s * inFlightState ) {
706
+ if s .writeErr == nil {
707
+ s .writeErr = err
708
+ for _ , r := range s .incomingByID {
709
+ r .cancel ()
710
+ }
711
+ }
712
+ })
713
+ }
655
714
656
715
return err
657
716
}
0 commit comments