@@ -51,17 +51,17 @@ type Connection struct {
51
51
closeOnce sync.Once
52
52
closer io.Closer
53
53
54
- writerBox chan Writer
55
- outgoingBox chan map [ID ]chan <- * Response
56
- incomingBox chan map [ID ]* incoming
57
- async * async
54
+ writer chan Writer
55
+ outgoing chan map [ID ]chan <- * Response
56
+ incoming chan map [ID ]* incoming
57
+ async * async
58
58
}
59
59
60
60
type AsyncCall struct {
61
- id ID
62
- response chan * Response // the channel a response will be delivered on
63
- resultBox chan asyncResult
64
- endSpan func () // close the tracing span when all processing for the message is complete
61
+ id ID
62
+ response chan * Response // the channel a response will be delivered on
63
+ result chan asyncResult
64
+ endSpan func () // close the tracing span when all processing for the message is complete
65
65
}
66
66
67
67
type asyncResult struct {
@@ -87,11 +87,11 @@ func (o ConnectionOptions) Bind(context.Context, *Connection) (ConnectionOptions
87
87
// This is used by the Dial and Serve functions to build the actual connection.
88
88
func newConnection (ctx context.Context , rwc io.ReadWriteCloser , binder Binder ) (* Connection , error ) {
89
89
c := & Connection {
90
- closer : rwc ,
91
- writerBox : make (chan Writer , 1 ),
92
- outgoingBox : make (chan map [ID ]chan <- * Response , 1 ),
93
- incomingBox : make (chan map [ID ]* incoming , 1 ),
94
- async : newAsync (),
90
+ closer : rwc ,
91
+ writer : make (chan Writer , 1 ),
92
+ outgoing : make (chan map [ID ]chan <- * Response , 1 ),
93
+ incoming : make (chan map [ID ]* incoming , 1 ),
94
+ async : newAsync (),
95
95
}
96
96
97
97
options , err := binder .Bind (ctx , c )
@@ -107,8 +107,8 @@ func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) (
107
107
if options .Handler == nil {
108
108
options .Handler = defaultHandler {}
109
109
}
110
- c .outgoingBox <- make (map [ID ]chan <- * Response )
111
- c .incomingBox <- make (map [ID ]* incoming )
110
+ c .outgoing <- make (map [ID ]chan <- * Response )
111
+ c .incoming <- make (map [ID ]* incoming )
112
112
// the goroutines started here will continue until the underlying stream is closed
113
113
reader := options .Framer .Reader (rwc )
114
114
readToQueue := make (chan * incoming )
@@ -119,7 +119,7 @@ func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) (
119
119
120
120
// releaseing the writer must be the last thing we do in case any requests
121
121
// are blocked waiting for the connection to be ready
122
- c .writerBox <- options .Framer .Writer (rwc )
122
+ c .writer <- options .Framer .Writer (rwc )
123
123
return c , nil
124
124
}
125
125
@@ -154,14 +154,14 @@ func (c *Connection) Notify(ctx context.Context, method string, params interface
154
154
// If sending the call failed, the response will be ready and have the error in it.
155
155
func (c * Connection ) Call (ctx context.Context , method string , params interface {}) * AsyncCall {
156
156
result := & AsyncCall {
157
- id : Int64ID (atomic .AddInt64 (& c .seq , 1 )),
158
- resultBox : make (chan asyncResult , 1 ),
157
+ id : Int64ID (atomic .AddInt64 (& c .seq , 1 )),
158
+ result : make (chan asyncResult , 1 ),
159
159
}
160
160
// generate a new request identifier
161
161
call , err := NewCall (result .id , method , params )
162
162
if err != nil {
163
163
//set the result to failed
164
- result .resultBox <- asyncResult {err : fmt .Errorf ("marshaling call parameters: %w" , err )}
164
+ result .result <- asyncResult {err : fmt .Errorf ("marshaling call parameters: %w" , err )}
165
165
return result
166
166
}
167
167
ctx , endSpan := event .Start (ctx , method ,
@@ -175,7 +175,7 @@ func (c *Connection) Call(ctx context.Context, method string, params interface{}
175
175
// are racing the response.
176
176
// rchan is buffered in case the response arrives without a listener.
177
177
result .response = make (chan * Response , 1 )
178
- outgoing , ok := <- c .outgoingBox
178
+ outgoing , ok := <- c .outgoing
179
179
if ! ok {
180
180
// If the call failed due to (say) an I/O error or broken pipe, attribute it
181
181
// as such. (If the error is nil, then the connection must have been shut
@@ -193,7 +193,7 @@ func (c *Connection) Call(ctx context.Context, method string, params interface{}
193
193
return result
194
194
}
195
195
outgoing [result .id ] = result .response
196
- c .outgoingBox <- outgoing
196
+ c .outgoing <- outgoing
197
197
// now we are ready to send
198
198
if err := c .write (ctx , call ); err != nil {
199
199
// sending failed, we will never get a response, so deliver a fake one
@@ -212,8 +212,8 @@ func (a *AsyncCall) ID() ID { return a.id }
212
212
// returned, or a call that failed to send in the first place.
213
213
func (a * AsyncCall ) IsReady () bool {
214
214
select {
215
- case r := <- a .resultBox :
216
- a .resultBox <- r
215
+ case r := <- a .result :
216
+ a .result <- r
217
217
return true
218
218
default :
219
219
return false
@@ -236,14 +236,14 @@ func (a *AsyncCall) Await(ctx context.Context, result interface{}) error {
236
236
r .result = response .Result
237
237
event .Label (ctx , tag .StatusCode .Of ("OK" ))
238
238
}
239
- case r = <- a .resultBox :
239
+ case r = <- a .result :
240
240
// result already available
241
241
case <- ctx .Done ():
242
242
event .Label (ctx , tag .StatusCode .Of ("CANCELLED" ))
243
243
return ctx .Err ()
244
244
}
245
245
// refill the box for the next caller
246
- a .resultBox <- r
246
+ a .result <- r
247
247
// and unpack the result
248
248
if r .err != nil {
249
249
return r .err
@@ -259,8 +259,8 @@ func (a *AsyncCall) Await(ctx context.Context, result interface{}) error {
259
259
// Respond must be called exactly once for any message for which a handler
260
260
// returns ErrAsyncResponse. It must not be called for any other message.
261
261
func (c * Connection ) Respond (id ID , result interface {}, rerr error ) error {
262
- pending := <- c .incomingBox
263
- defer func () { c .incomingBox <- pending }()
262
+ pending := <- c .incoming
263
+ defer func () { c .incoming <- pending }()
264
264
entry , found := pending [id ]
265
265
if ! found {
266
266
return nil
@@ -277,8 +277,8 @@ func (c *Connection) Respond(id ID, result interface{}, rerr error) error {
277
277
// not cause any messages that have not arrived yet with that ID to be
278
278
// cancelled.
279
279
func (c * Connection ) Cancel (id ID ) {
280
- pending := <- c .incomingBox
281
- defer func () { c .incomingBox <- pending }()
280
+ pending := <- c .incoming
281
+ defer func () { c .incoming <- pending }()
282
282
if entry , found := pending [id ]; found && entry .cancel != nil {
283
283
entry .cancel ()
284
284
entry .cancel = nil
@@ -310,10 +310,10 @@ func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue ch
310
310
defer func () {
311
311
// Retire any outgoing requests that were still in flight.
312
312
// With the Reader no longer being processed, they necessarily cannot receive a response.
313
- outgoing := <- c .outgoingBox
314
- close (c .outgoingBox ) // Prevent new outgoing requests, which would deadlock.
315
- for id , responseBox := range outgoing {
316
- responseBox <- & Response {ID : id , Error : err }
313
+ outgoing := <- c .outgoing
314
+ close (c .outgoing ) // Prevent new outgoing requests, which would deadlock.
315
+ for id , response := range outgoing {
316
+ response <- & Response {ID : id , Error : err }
317
317
}
318
318
319
319
close (toQueue )
@@ -352,9 +352,9 @@ func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue ch
352
352
// if the request is a call, add it to the incoming map so it can be
353
353
// cancelled by id
354
354
if msg .IsCall () {
355
- pending := <- c .incomingBox
355
+ pending := <- c .incoming
356
356
pending [msg .ID ] = entry
357
- c .incomingBox <- pending
357
+ c .incoming <- pending
358
358
}
359
359
// send the message to the incoming queue
360
360
toQueue <- entry
@@ -368,10 +368,10 @@ func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue ch
368
368
369
369
func (c * Connection ) incomingResponse (msg * Response ) {
370
370
var response chan <- * Response
371
- if outgoing , ok := <- c .outgoingBox ; ok {
371
+ if outgoing , ok := <- c .outgoing ; ok {
372
372
response = outgoing [msg .ID ]
373
373
delete (outgoing , msg .ID )
374
- c .outgoingBox <- outgoing
374
+ c .outgoing <- outgoing
375
375
}
376
376
if response != nil {
377
377
response <- msg
@@ -468,8 +468,8 @@ func (c *Connection) deliverMessages(ctx context.Context, handler Handler, fromQ
468
468
func (c * Connection ) reply (entry * incoming , result interface {}, rerr error ) {
469
469
if entry .request .IsCall () {
470
470
// we have a call finishing, remove it from the incoming map
471
- pending := <- c .incomingBox
472
- defer func () { c .incomingBox <- pending }()
471
+ pending := <- c .incoming
472
+ defer func () { c .incoming <- pending }()
473
473
delete (pending , entry .request .ID )
474
474
}
475
475
if err := c .respond (entry , result , rerr ); err != nil {
@@ -527,8 +527,8 @@ func (c *Connection) respond(entry *incoming, result interface{}, rerr error) er
527
527
// write is used by all things that write outgoing messages, including replies.
528
528
// it makes sure that writes are atomic
529
529
func (c * Connection ) write (ctx context.Context , msg Message ) error {
530
- writer := <- c .writerBox
531
- defer func () { c .writerBox <- writer }()
530
+ writer := <- c .writer
531
+ defer func () { c .writer <- writer }()
532
532
n , err := writer .Write (ctx , msg )
533
533
event .Metric (ctx , tag .SentBytes .Of (n ))
534
534
return err
0 commit comments