Skip to content

Commit 074c7da

Browse files
committed
Add binary logger option for client and server
1 parent 36e4810 commit 074c7da

File tree

7 files changed

+301
-92
lines changed

7 files changed

+301
-92
lines changed

default_dial_option_server_option_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func (s) TestAddExtraDialOptions(t *testing.T) {
3838

3939
// Set and check the DialOptions
4040
opts := []DialOption{WithTransportCredentials(insecure.NewCredentials()), WithTransportCredentials(insecure.NewCredentials()), WithTransportCredentials(insecure.NewCredentials())}
41-
internal.AddExtraDialOptions.(func(opt ...DialOption))(opts...)
41+
internal.AddGlobalDialOptions.(func(opt ...DialOption))(opts...)
4242
for i, opt := range opts {
4343
if extraDialOptions[i] != opt {
4444
t.Fatalf("Unexpected extra dial option at index %d: %v != %v", i, extraDialOptions[i], opt)
@@ -52,7 +52,7 @@ func (s) TestAddExtraDialOptions(t *testing.T) {
5252
cc.Close()
5353
}
5454

55-
internal.ClearExtraDialOptions()
55+
internal.ClearGlobalDialOptions()
5656
if len(extraDialOptions) != 0 {
5757
t.Fatalf("Unexpected len of extraDialOptions: %d != 0", len(extraDialOptions))
5858
}
@@ -62,7 +62,7 @@ func (s) TestAddExtraServerOptions(t *testing.T) {
6262
const maxRecvSize = 998765
6363
// Set and check the ServerOptions
6464
opts := []ServerOption{Creds(insecure.NewCredentials()), MaxRecvMsgSize(maxRecvSize)}
65-
internal.AddExtraServerOptions.(func(opt ...ServerOption))(opts...)
65+
internal.AddGlobalServerOptions.(func(opt ...ServerOption))(opts...)
6666
for i, opt := range opts {
6767
if extraServerOptions[i] != opt {
6868
t.Fatalf("Unexpected extra server option at index %d: %v != %v", i, extraServerOptions[i], opt)
@@ -75,7 +75,7 @@ func (s) TestAddExtraServerOptions(t *testing.T) {
7575
t.Fatalf("Unexpected s.opts.maxReceiveMessageSize: %d != %d", s.opts.maxReceiveMessageSize, maxRecvSize)
7676
}
7777

78-
internal.ClearExtraServerOptions()
78+
internal.ClearGlobalServerOptions()
7979
if len(extraServerOptions) != 0 {
8080
t.Fatalf("Unexpected len of extraServerOptions: %d != 0", len(extraServerOptions))
8181
}

dialoptions.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,18 @@ import (
2929
"google.golang.org/grpc/credentials/insecure"
3030
"google.golang.org/grpc/internal"
3131
internalbackoff "google.golang.org/grpc/internal/backoff"
32+
"google.golang.org/grpc/internal/binarylog"
3233
"google.golang.org/grpc/internal/transport"
3334
"google.golang.org/grpc/keepalive"
3435
"google.golang.org/grpc/resolver"
3536
"google.golang.org/grpc/stats"
3637
)
3738

3839
func init() {
39-
internal.AddExtraDialOptions = func(opt ...DialOption) {
40+
internal.AddGlobalDialOptions = func(opt ...DialOption) {
4041
extraDialOptions = append(extraDialOptions, opt...)
4142
}
42-
internal.ClearExtraDialOptions = func() {
43+
internal.ClearGlobalDialOptions = func() {
4344
extraDialOptions = nil
4445
}
4546
}
@@ -61,6 +62,7 @@ type dialOptions struct {
6162
timeout time.Duration
6263
scChan <-chan ServiceConfig
6364
authority string
65+
binaryLogger binarylog.Logger
6466
copts transport.ConnectOptions
6567
callOptions []CallOption
6668
channelzParentID *channelz.Identifier
@@ -401,6 +403,14 @@ func WithStatsHandler(h stats.Handler) DialOption {
401403
})
402404
}
403405

406+
// WithBinaryLogger returns a DialOption that specifies the binary logger for
407+
// this ClientConn.
408+
func WithBinaryLogger(bl binarylog.Logger) DialOption {
409+
return newFuncDialOption(func(o *dialOptions) {
410+
o.binaryLogger = bl
411+
})
412+
}
413+
404414
// FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on
405415
// non-temporary dial errors. If f is true, and dialer returns a non-temporary
406416
// error, gRPC will fail the connection to the network address and won't try to

gcp/observability/opencensus.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ func startOpenCensus(config *config) error {
117117
}
118118

119119
// Only register default StatsHandlers if other things are setup correctly.
120-
internal.AddExtraServerOptions.(func(opt ...grpc.ServerOption))(grpc.StatsHandler(&ocgrpc.ServerHandler{StartOptions: so}))
121-
internal.AddExtraDialOptions.(func(opt ...grpc.DialOption))(grpc.WithStatsHandler(&ocgrpc.ClientHandler{StartOptions: so}))
120+
internal.AddGlobalServerOptions.(func(opt ...grpc.ServerOption))(grpc.StatsHandler(&ocgrpc.ServerHandler{StartOptions: so}))
121+
internal.AddGlobalDialOptions.(func(opt ...grpc.DialOption))(grpc.WithStatsHandler(&ocgrpc.ClientHandler{StartOptions: so}))
122122
logger.Infof("Enabled OpenCensus StatsHandlers for clients and servers")
123123

124124
return nil
@@ -128,8 +128,8 @@ func startOpenCensus(config *config) error {
128128
// packages if exporter was created.
129129
func stopOpenCensus() {
130130
if exporter != nil {
131-
internal.ClearExtraDialOptions()
132-
internal.ClearExtraServerOptions()
131+
internal.ClearGlobalDialOptions()
132+
internal.ClearGlobalServerOptions()
133133

134134
// Call these unconditionally, doesn't matter if not registered, will be
135135
// a noop if not registered.

internal/internal.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,20 +63,20 @@ var (
6363
// xDS-enabled server invokes this method on a grpc.Server when a particular
6464
// listener moves to "not-serving" mode.
6565
DrainServerTransports interface{} // func(*grpc.Server, string)
66-
// AddExtraServerOptions adds an array of ServerOption that will be
66+
// AddGlobalServerOptions adds an array of ServerOption that will be
6767
// effective globally for newly created servers. The priority will be: 1.
6868
// user-provided; 2. this method; 3. default values.
69-
AddExtraServerOptions interface{} // func(opt ...ServerOption)
70-
// ClearExtraServerOptions clears the array of extra ServerOption. This
69+
AddGlobalServerOptions interface{} // func(opt ...ServerOption)
70+
// ClearGlobalServerOptions clears the array of extra ServerOption. This
7171
// method is useful in testing and benchmarking.
72-
ClearExtraServerOptions func()
73-
// AddExtraDialOptions adds an array of DialOption that will be effective
72+
ClearGlobalServerOptions func()
73+
// AddGlobalDialOptions adds an array of DialOption that will be effective
7474
// globally for newly created client channels. The priority will be: 1.
7575
// user-provided; 2. this method; 3. default values.
76-
AddExtraDialOptions interface{} // func(opt ...DialOption)
77-
// ClearExtraDialOptions clears the array of extra DialOption. This
76+
AddGlobalDialOptions interface{} // func(opt ...DialOption)
77+
// ClearGlobalDialOptions clears the array of extra DialOption. This
7878
// method is useful in testing and benchmarking.
79-
ClearExtraDialOptions func()
79+
ClearGlobalDialOptions func()
8080
// JoinServerOptions combines the server options passed as arguments into a
8181
// single server option.
8282
JoinServerOptions interface{} // func(...grpc.ServerOption) grpc.ServerOption

server.go

Lines changed: 90 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,10 @@ func init() {
7373
internal.DrainServerTransports = func(srv *Server, addr string) {
7474
srv.drainServerTransports(addr)
7575
}
76-
internal.AddExtraServerOptions = func(opt ...ServerOption) {
76+
internal.AddGlobalServerOptions = func(opt ...ServerOption) {
7777
extraServerOptions = opt
7878
}
79-
internal.ClearExtraServerOptions = func() {
79+
internal.ClearGlobalServerOptions = func() {
8080
extraServerOptions = nil
8181
}
8282
internal.JoinServerOptions = newJoinServerOption
@@ -156,6 +156,7 @@ type serverOptions struct {
156156
streamInt StreamServerInterceptor
157157
chainUnaryInts []UnaryServerInterceptor
158158
chainStreamInts []StreamServerInterceptor
159+
binaryLogger binarylog.Logger
159160
inTapHandle tap.ServerInHandle
160161
statsHandlers []stats.Handler
161162
maxConcurrentStreams uint32
@@ -469,6 +470,14 @@ func StatsHandler(h stats.Handler) ServerOption {
469470
})
470471
}
471472

473+
// BinaryLogger returns a ServerOption that can set the binary logger for the
474+
// server.
475+
func BinaryLogger(bl binarylog.Logger) ServerOption {
476+
return newFuncServerOption(func(o *serverOptions) {
477+
o.binaryLogger = bl
478+
})
479+
}
480+
472481
// UnknownServiceHandler returns a ServerOption that allows for adding a custom
473482
// unknown service handler. The provided method is a bidi-streaming RPC service
474483
// handler that will be invoked instead of returning the "unimplemented" gRPC
@@ -1216,9 +1225,16 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
12161225
}
12171226
}()
12181227
}
1219-
1220-
binlog := binarylog.GetMethodLogger(stream.Method())
1221-
if binlog != nil {
1228+
var binlogs []binarylog.MethodLogger
1229+
if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
1230+
binlogs = append(binlogs, ml)
1231+
}
1232+
if s.opts.binaryLogger != nil {
1233+
if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
1234+
binlogs = append(binlogs, ml)
1235+
}
1236+
}
1237+
if len(binlogs) != 0 {
12221238
ctx := stream.Context()
12231239
md, _ := metadata.FromIncomingContext(ctx)
12241240
logEntry := &binarylog.ClientHeader{
@@ -1238,7 +1254,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
12381254
if peer, ok := peer.FromContext(ctx); ok {
12391255
logEntry.PeerAddr = peer.Addr
12401256
}
1241-
binlog.Log(logEntry)
1257+
for _, binlog := range binlogs {
1258+
binlog.Log(logEntry)
1259+
}
12421260
}
12431261

12441262
// comp and cp are used for compression. decomp and dc are used for
@@ -1278,7 +1296,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
12781296
}
12791297

12801298
var payInfo *payloadInfo
1281-
if len(shs) != 0 || binlog != nil {
1299+
if len(shs) != 0 || len(binlogs) != 0 {
12821300
payInfo = &payloadInfo{}
12831301
}
12841302
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
@@ -1304,10 +1322,13 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
13041322
Length: len(d),
13051323
})
13061324
}
1307-
if binlog != nil {
1308-
binlog.Log(&binarylog.ClientMessage{
1325+
if len(binlogs) != 0 {
1326+
cm := &binarylog.ClientMessage{
13091327
Message: d,
1310-
})
1328+
}
1329+
for _, binlog := range binlogs {
1330+
binlog.Log(cm)
1331+
}
13111332
}
13121333
if trInfo != nil {
13131334
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
@@ -1331,18 +1352,24 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
13311352
if e := t.WriteStatus(stream, appStatus); e != nil {
13321353
channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
13331354
}
1334-
if binlog != nil {
1355+
if len(binlogs) != 0 {
13351356
if h, _ := stream.Header(); h.Len() > 0 {
13361357
// Only log serverHeader if there was header. Otherwise it can
13371358
// be trailer only.
1338-
binlog.Log(&binarylog.ServerHeader{
1359+
sh := &binarylog.ServerHeader{
13391360
Header: h,
1340-
})
1361+
}
1362+
for _, binlog := range binlogs {
1363+
binlog.Log(sh)
1364+
}
13411365
}
1342-
binlog.Log(&binarylog.ServerTrailer{
1366+
st := &binarylog.ServerTrailer{
13431367
Trailer: stream.Trailer(),
13441368
Err: appErr,
1345-
})
1369+
}
1370+
for _, binlog := range binlogs {
1371+
binlog.Log(st)
1372+
}
13461373
}
13471374
return appErr
13481375
}
@@ -1368,26 +1395,34 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
13681395
panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
13691396
}
13701397
}
1371-
if binlog != nil {
1398+
if len(binlogs) != 0 {
13721399
h, _ := stream.Header()
1373-
binlog.Log(&binarylog.ServerHeader{
1400+
sh := &binarylog.ServerHeader{
13741401
Header: h,
1375-
})
1376-
binlog.Log(&binarylog.ServerTrailer{
1402+
}
1403+
st := &binarylog.ServerTrailer{
13771404
Trailer: stream.Trailer(),
13781405
Err: appErr,
1379-
})
1406+
}
1407+
for _, binlog := range binlogs {
1408+
binlog.Log(sh)
1409+
binlog.Log(st)
1410+
}
13801411
}
13811412
return err
13821413
}
1383-
if binlog != nil {
1414+
if len(binlogs) != 0 {
13841415
h, _ := stream.Header()
1385-
binlog.Log(&binarylog.ServerHeader{
1416+
sh := &binarylog.ServerHeader{
13861417
Header: h,
1387-
})
1388-
binlog.Log(&binarylog.ServerMessage{
1418+
}
1419+
sm := &binarylog.ServerMessage{
13891420
Message: reply,
1390-
})
1421+
}
1422+
for _, binlog := range binlogs {
1423+
binlog.Log(sh)
1424+
binlog.Log(sm)
1425+
}
13911426
}
13921427
if channelz.IsOn() {
13931428
t.IncrMsgSent()
@@ -1399,11 +1434,14 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
13991434
// Should the logging be in WriteStatus? Should we ignore the WriteStatus
14001435
// error or allow the stats handler to see it?
14011436
err = t.WriteStatus(stream, statusOK)
1402-
if binlog != nil {
1403-
binlog.Log(&binarylog.ServerTrailer{
1437+
if len(binlogs) != 0 {
1438+
st := &binarylog.ServerTrailer{
14041439
Trailer: stream.Trailer(),
14051440
Err: appErr,
1406-
})
1441+
}
1442+
for _, binlog := range binlogs {
1443+
binlog.Log(st)
1444+
}
14071445
}
14081446
return err
14091447
}
@@ -1516,8 +1554,15 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
15161554
}()
15171555
}
15181556

1519-
ss.binlog = binarylog.GetMethodLogger(stream.Method())
1520-
if ss.binlog != nil {
1557+
if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
1558+
ss.binlogs = append(ss.binlogs, ml)
1559+
}
1560+
if s.opts.binaryLogger != nil {
1561+
if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
1562+
ss.binlogs = append(ss.binlogs, ml)
1563+
}
1564+
}
1565+
if len(ss.binlogs) != 0 {
15211566
md, _ := metadata.FromIncomingContext(ctx)
15221567
logEntry := &binarylog.ClientHeader{
15231568
Header: md,
@@ -1536,7 +1581,9 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
15361581
if peer, ok := peer.FromContext(ss.Context()); ok {
15371582
logEntry.PeerAddr = peer.Addr
15381583
}
1539-
ss.binlog.Log(logEntry)
1584+
for _, binlog := range ss.binlogs {
1585+
binlog.Log(logEntry)
1586+
}
15401587
}
15411588

15421589
// If dc is set and matches the stream's compression, use it. Otherwise, try
@@ -1602,11 +1649,14 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
16021649
ss.mu.Unlock()
16031650
}
16041651
t.WriteStatus(ss.s, appStatus)
1605-
if ss.binlog != nil {
1606-
ss.binlog.Log(&binarylog.ServerTrailer{
1652+
if len(ss.binlogs) != 0 {
1653+
st := &binarylog.ServerTrailer{
16071654
Trailer: ss.s.Trailer(),
16081655
Err: appErr,
1609-
})
1656+
}
1657+
for _, binlog := range ss.binlogs {
1658+
binlog.Log(st)
1659+
}
16101660
}
16111661
// TODO: Should we log an error from WriteStatus here and below?
16121662
return appErr
@@ -1617,11 +1667,14 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
16171667
ss.mu.Unlock()
16181668
}
16191669
err = t.WriteStatus(ss.s, statusOK)
1620-
if ss.binlog != nil {
1621-
ss.binlog.Log(&binarylog.ServerTrailer{
1670+
if len(ss.binlogs) != 0 {
1671+
st := &binarylog.ServerTrailer{
16221672
Trailer: ss.s.Trailer(),
16231673
Err: appErr,
1624-
})
1674+
}
1675+
for _, binlog := range ss.binlogs {
1676+
binlog.Log(st)
1677+
}
16251678
}
16261679
return err
16271680
}

0 commit comments

Comments
 (0)