Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions x/sync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (c *client) GetRangeProof(
// [parseFn] parses the raw response.
// If the request is unsuccessful or the response can't be parsed,
// retries the request to a different peer until [ctx] expires.
// Returns [errAppRequestSendFailed] if we fail to send an AppRequest.
// Returns [errAppSendFailed] if we fail to send an AppRequest/AppResponse.
// This should be treated as a fatal error.
func getAndParse[T any](
ctx context.Context,
Expand All @@ -301,7 +301,7 @@ func getAndParse[T any](
}
}

if errors.Is(err, errAppRequestSendFailed) {
if errors.Is(err, errAppSendFailed) {
// Failing to send an AppRequest is a fatal error.
return nil, err
}
Expand Down Expand Up @@ -340,7 +340,7 @@ func getAndParse[T any](
// until the node receives a response, failure notification
// or [ctx] is canceled.
// Returns the peer's NodeID and response.
// Returns [errAppRequestSendFailed] if we failed to send an AppRequest.
// Returns [errAppSendFailed] if we failed to send an AppRequest/AppResponse.
// This should be treated as fatal.
// It's safe to call this method multiple times concurrently.
func (c *client) get(ctx context.Context, request []byte) (ids.NodeID, []byte, error) {
Expand Down
6 changes: 3 additions & 3 deletions x/sync/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,18 +794,18 @@ func TestAppRequestSendFailed(t *testing.T) {
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(ids.NodeID{}, nil, errAppRequestSendFailed).Times(2)
).Return(ids.NodeID{}, nil, errAppSendFailed).Times(2)

_, err := client.GetChangeProof(
context.Background(),
&pb.SyncGetChangeProofRequest{},
nil, // database is unused
)
require.ErrorIs(err, errAppRequestSendFailed)
require.ErrorIs(err, errAppSendFailed)

_, err = client.GetRangeProof(
context.Background(),
&pb.SyncGetRangeProofRequest{},
)
require.ErrorIs(err, errAppRequestSendFailed)
require.ErrorIs(err, errAppSendFailed)
}
17 changes: 13 additions & 4 deletions x/sync/network_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ const minRequestHandlingDuration = 100 * time.Millisecond
var (
_ NetworkClient = (*networkClient)(nil)

errAcquiringSemaphore = errors.New("error acquiring semaphore")
errRequestFailed = errors.New("request failed")
errAppRequestSendFailed = errors.New("failed to send AppRequest")
errAcquiringSemaphore = errors.New("error acquiring semaphore")
errRequestFailed = errors.New("request failed")
errAppSendFailed = errors.New("failed to send app message")
)

// NetworkClient defines ability to send request / response through the Network
Expand Down Expand Up @@ -189,6 +189,7 @@ func (c *networkClient) getRequestHandler(requestID uint32) (ResponseHandler, bo
return handler, true
}

// If [errAppSendFailed] is returned this should be considered fatal.
func (c *networkClient) RequestAny(
ctx context.Context,
minVersion *version.Application,
Expand All @@ -212,6 +213,7 @@ func (c *networkClient) RequestAny(
return nodeID, response, err
}

// If [errAppSendFailed] is returned this should be considered fatal.
func (c *networkClient) Request(
ctx context.Context,
nodeID ids.NodeID,
Expand All @@ -229,6 +231,7 @@ func (c *networkClient) Request(

// Sends [request] to [nodeID] and returns the response.
// Returns an error if the request failed or [ctx] is canceled.
// If [errAppSendFailed] is returned this should be considered fatal.
// Blocks until a response is received or the [ctx] is canceled fails.
// Releases active requests semaphore if there was an error in sending the request.
// Assumes [nodeID] is never [c.myNodeID] since we guarantee
Expand All @@ -254,7 +257,13 @@ func (c *networkClient) request(
// Send an app request to the peer.
if err := c.appSender.SendAppRequest(ctx, nodeIDs, requestID, request); err != nil {
c.lock.Unlock()
return nil, fmt.Errorf("%w: %s", errAppRequestSendFailed, err)
c.log.Fatal(
"failed to send app request",
zap.Stringer("nodeID", nodeID),
zap.Int("requestLen", len(request)),
zap.Error(err),
)
return nil, fmt.Errorf("%w: %s", errAppSendFailed, err)
}

handler := newResponseHandler()
Expand Down
66 changes: 50 additions & 16 deletions x/sync/network_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewNetworkServer(appSender common.AppSender, db DB, log logging.Logger) *Ne
}

// AppRequest is called by avalanchego -> VM when there is an incoming AppRequest from a peer.
// Never returns errors as they are considered fatal.
// Returns a non-nil error iff we fail to send an app message. This is a fatal error.
// Sends a response back to the sender if length of response returned by the handler > 0.
func (s *NetworkServer) AppRequest(
ctx context.Context,
Expand Down Expand Up @@ -121,14 +121,20 @@ func (s *NetworkServer) AppRequest(
return nil
}

if err != nil && !isTimeout(err) {
// log unexpected errors instead of returning them, since they are fatal.
s.log.Warn(
"unexpected error handling AppRequest",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Error(err),
)
if err != nil {
if errors.Is(err, errAppSendFailed) {
return err
}

if !isTimeout(err) {
// log unexpected errors instead of returning them, since they are fatal.
s.log.Warn(
"unexpected error handling AppRequest",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Error(err),
)
}
}
return nil
}
Expand All @@ -141,6 +147,7 @@ func maybeBytesToMaybe(mb *pb.MaybeBytes) maybe.Maybe[[]byte] {
}

// Generates a change proof and sends it to [nodeID].
// If [errAppSendFailed] is returned, this should be considered fatal.
func (s *NetworkServer) HandleChangeProofRequest(
ctx context.Context,
nodeID ids.NodeID,
Expand Down Expand Up @@ -212,8 +219,17 @@ func (s *NetworkServer) HandleChangeProofRequest(
return err
}

// TODO handle this fatal error
return s.appSender.SendAppResponse(ctx, nodeID, requestID, proofBytes)
if err := s.appSender.SendAppResponse(ctx, nodeID, requestID, proofBytes); err != nil {
s.log.Fatal(
"failed to send app response",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Int("responseLen", len(proofBytes)),
zap.Error(err),
)
return fmt.Errorf("%w: %s", errAppSendFailed, err)
}
return nil
}

// We generated a change proof. See if it's small enough.
Expand All @@ -227,8 +243,17 @@ func (s *NetworkServer) HandleChangeProofRequest(
}

if len(proofBytes) < bytesLimit {
// TODO handle this fatal error
return s.appSender.SendAppResponse(ctx, nodeID, requestID, proofBytes)
if err := s.appSender.SendAppResponse(ctx, nodeID, requestID, proofBytes); err != nil {
s.log.Fatal(
"failed to send app response",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Int("responseLen", len(proofBytes)),
zap.Error(err),
)
return fmt.Errorf("%w: %s", errAppSendFailed, err)
}
return nil
}

// The proof was too large. Try to shrink it.
Expand All @@ -238,7 +263,7 @@ func (s *NetworkServer) HandleChangeProofRequest(
}

// Generates a range proof and sends it to [nodeID].
// TODO danlaine how should we handle context cancellation?
// If [errAppSendFailed] is returned, this should be considered fatal.
func (s *NetworkServer) HandleRangeProofRequest(
ctx context.Context,
nodeID ids.NodeID,
Expand Down Expand Up @@ -276,8 +301,17 @@ func (s *NetworkServer) HandleRangeProofRequest(
if err != nil {
return err
}
// TODO handle this fatal error
return s.appSender.SendAppResponse(ctx, nodeID, requestID, proofBytes)
if err := s.appSender.SendAppResponse(ctx, nodeID, requestID, proofBytes); err != nil {
s.log.Fatal(
"failed to send app response",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Int("responseLen", len(proofBytes)),
zap.Error(err),
)
return fmt.Errorf("%w: %s", errAppSendFailed, err)
}
return nil
}

// Get the range proof specified by [req].
Expand Down
109 changes: 109 additions & 0 deletions x/sync/network_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,3 +337,112 @@ func Test_Server_GetChangeProof(t *testing.T) {
})
}
}

// Test that AppRequest returns a non-nil error if we fail to send
// an AppRequest or AppResponse.
func TestAppRequestErrAppSendFailed(t *testing.T) {
startRootID := ids.GenerateTestID()
endRootID := ids.GenerateTestID()

type test struct {
name string
request *pb.Request
handlerFunc func(*gomock.Controller) *NetworkServer
expectedErr error
}

tests := []test{
{
name: "GetChangeProof",
request: &pb.Request{
Message: &pb.Request_ChangeProofRequest{
ChangeProofRequest: &pb.SyncGetChangeProofRequest{
StartRootHash: startRootID[:],
EndRootHash: endRootID[:],
StartKey: []byte{1},
EndKey: &pb.MaybeBytes{Value: []byte{2}},
KeyLimit: 100,
BytesLimit: 100,
},
},
},
handlerFunc: func(ctrl *gomock.Controller) *NetworkServer {
sender := common.NewMockSender(ctrl)
sender.EXPECT().SendAppResponse(
gomock.Any(),
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(errAppSendFailed).AnyTimes()

db := merkledb.NewMockMerkleDB(ctrl)
db.EXPECT().GetChangeProof(
gomock.Any(),
gomock.Any(),
gomock.Any(),
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(&merkledb.ChangeProof{}, nil).Times(1)

return NewNetworkServer(sender, db, logging.NoLog{})
},
expectedErr: errAppSendFailed,
},
{
name: "GetRangeProof",
request: &pb.Request{
Message: &pb.Request_RangeProofRequest{
RangeProofRequest: &pb.SyncGetRangeProofRequest{
RootHash: endRootID[:],
StartKey: []byte{1},
EndKey: &pb.MaybeBytes{Value: []byte{2}},
KeyLimit: 100,
BytesLimit: 100,
},
},
},
handlerFunc: func(ctrl *gomock.Controller) *NetworkServer {
sender := common.NewMockSender(ctrl)
sender.EXPECT().SendAppResponse(
gomock.Any(),
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(errAppSendFailed).AnyTimes()

db := merkledb.NewMockMerkleDB(ctrl)
db.EXPECT().GetRangeProofAtRoot(
gomock.Any(),
gomock.Any(),
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(&merkledb.RangeProof{}, nil).Times(1)

return NewNetworkServer(sender, db, logging.NoLog{})
},
expectedErr: errAppSendFailed,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
ctrl := gomock.NewController(t)

handler := tt.handlerFunc(ctrl)
requestBytes, err := proto.Marshal(tt.request)
require.NoError(err)

err = handler.AppRequest(
context.Background(),
ids.EmptyNodeID,
0,
time.Now().Add(10*time.Second),
requestBytes,
)
require.ErrorIs(err, tt.expectedErr)
})
}
}