Skip to content

Commit e8ae632

Browse files
elekanuengineer
authored andcommitted
HDDS-2068. Make StorageContainerDatanodeProtocolService message based
Signed-off-by: Anu Engineer <[email protected]>
1 parent 0d2d6f9 commit e8ae632

File tree

8 files changed

+265
-156
lines changed

8 files changed

+265
-156
lines changed

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import org.apache.hadoop.hdds.protocol.proto
2525
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
2626

27+
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeRequest;
28+
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeRequest.Builder;
29+
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeResponse;
2730
import org.apache.hadoop.hdds.protocol.proto
2831
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
2932
import org.apache.hadoop.hdds.protocol.proto
@@ -38,13 +41,15 @@
3841
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
3942
import org.apache.hadoop.hdds.protocol.proto
4043
.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
44+
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
4145
import org.apache.hadoop.ipc.ProtobufHelper;
4246
import org.apache.hadoop.ipc.ProtocolTranslator;
4347
import org.apache.hadoop.ipc.RPC;
4448
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
4549

4650
import java.io.Closeable;
4751
import java.io.IOException;
52+
import java.util.function.Consumer;
4853

4954
/**
5055
* This class is the client-side translator to translate the requests made on
@@ -96,6 +101,25 @@ public Object getUnderlyingProxyObject() {
96101
return rpcProxy;
97102
}
98103

104+
/**
105+
* Helper method to wrap the request and send the message.
106+
*/
107+
private SCMDatanodeResponse submitRequest(Type type,
108+
Consumer<SCMDatanodeRequest.Builder> builderConsumer) throws IOException {
109+
final SCMDatanodeResponse response;
110+
try {
111+
Builder builder = SCMDatanodeRequest.newBuilder()
112+
.setCmdType(type);
113+
builderConsumer.accept(builder);
114+
SCMDatanodeRequest wrapper = builder.build();
115+
116+
response = rpcProxy.submitRequest(NULL_RPC_CONTROLLER, wrapper);
117+
} catch (ServiceException ex) {
118+
throw ProtobufHelper.getRemoteException(ex);
119+
}
120+
return response;
121+
}
122+
99123
/**
100124
* Returns SCM version.
101125
*
@@ -104,16 +128,11 @@ public Object getUnderlyingProxyObject() {
104128
*/
105129
@Override
106130
public SCMVersionResponseProto getVersion(SCMVersionRequestProto
107-
unused) throws IOException {
108-
SCMVersionRequestProto request =
109-
SCMVersionRequestProto.newBuilder().build();
110-
final SCMVersionResponseProto response;
111-
try {
112-
response = rpcProxy.getVersion(NULL_RPC_CONTROLLER, request);
113-
} catch (ServiceException ex) {
114-
throw ProtobufHelper.getRemoteException(ex);
115-
}
116-
return response;
131+
request) throws IOException {
132+
return submitRequest(Type.GetVersion,
133+
(builder) -> builder
134+
.setGetVersionRequest(SCMVersionRequestProto.newBuilder().build()))
135+
.getGetVersionResponse();
117136
}
118137

119138
/**
@@ -126,13 +145,9 @@ public SCMVersionResponseProto getVersion(SCMVersionRequestProto
126145
@Override
127146
public SCMHeartbeatResponseProto sendHeartbeat(
128147
SCMHeartbeatRequestProto heartbeat) throws IOException {
129-
final SCMHeartbeatResponseProto resp;
130-
try {
131-
resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, heartbeat);
132-
} catch (ServiceException e) {
133-
throw ProtobufHelper.getRemoteException(e);
134-
}
135-
return resp;
148+
return submitRequest(Type.SendHeartbeat,
149+
(builder) -> builder.setSendHeartbeatRequest(heartbeat))
150+
.getSendHeartbeatResponse();
136151
}
137152

138153
/**
@@ -155,13 +170,8 @@ public SCMRegisteredResponseProto register(
155170
req.setContainerReport(containerReportsRequestProto);
156171
req.setPipelineReports(pipelineReportsProto);
157172
req.setNodeReport(nodeReport);
158-
final SCMRegisteredResponseProto response;
159-
try {
160-
response = rpcProxy.register(NULL_RPC_CONTROLLER, req.build());
161-
} catch (ServiceException e) {
162-
throw ProtobufHelper.getRemoteException(e);
163-
}
164-
return response;
173+
return submitRequest(Type.Register,
174+
(builder) -> builder.setRegisterRequest(req))
175+
.getRegisterResponse();
165176
}
166-
167177
}

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java

Lines changed: 67 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,24 @@
1616
*/
1717
package org.apache.hadoop.ozone.protocolPB;
1818

19-
import com.google.protobuf.RpcController;
20-
import com.google.protobuf.ServiceException;
21-
import org.apache.hadoop.hdds.protocol.proto
22-
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
23-
import org.apache.hadoop.hdds.protocol.proto
24-
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
25-
import org.apache.hadoop.hdds.protocol.proto
26-
.StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto;
27-
import org.apache.hadoop.hdds.protocol.proto
28-
.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
29-
import org.apache.hadoop.hdds.protocol.proto
30-
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
31-
import org.apache.hadoop.hdds.protocol.proto
32-
.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
33-
import org.apache.hadoop.hdds.protocol.proto
34-
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
35-
import org.apache.hadoop.hdds.protocol.proto
36-
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
37-
import org.apache.hadoop.hdds.protocol.proto
38-
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
19+
import java.io.IOException;
20+
21+
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
22+
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
23+
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
24+
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeRequest;
25+
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeResponse;
26+
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto;
27+
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
28+
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.Status;
29+
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
30+
import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
3931
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
4032

41-
import java.io.IOException;
33+
import com.google.protobuf.RpcController;
34+
import com.google.protobuf.ServiceException;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
4237

4338
/**
4439
* This class is the server-side translator that forwards requests received on
@@ -48,47 +43,71 @@
4843
public class StorageContainerDatanodeProtocolServerSideTranslatorPB
4944
implements StorageContainerDatanodeProtocolPB {
5045

46+
private static final Logger LOG = LoggerFactory
47+
.getLogger(StorageContainerDatanodeProtocolServerSideTranslatorPB.class);
48+
5149
private final StorageContainerDatanodeProtocol impl;
50+
private final OzoneProtocolMessageDispatcher<SCMDatanodeRequest,
51+
SCMDatanodeResponse> dispatcher;
5252

5353
public StorageContainerDatanodeProtocolServerSideTranslatorPB(
54-
StorageContainerDatanodeProtocol impl) {
54+
StorageContainerDatanodeProtocol impl,
55+
ProtocolMessageMetrics protocolMessageMetrics) {
5556
this.impl = impl;
57+
dispatcher =
58+
new OzoneProtocolMessageDispatcher<>("SCMDatanodeProtocol",
59+
protocolMessageMetrics,
60+
LOG);
5661
}
5762

58-
@Override
59-
public SCMVersionResponseProto getVersion(RpcController controller,
60-
SCMVersionRequestProto request)
61-
throws ServiceException {
62-
try {
63-
return impl.getVersion(request);
64-
} catch (IOException e) {
65-
throw new ServiceException(e);
66-
}
63+
public SCMRegisteredResponseProto register(
64+
SCMRegisterRequestProto request) throws IOException {
65+
ContainerReportsProto containerRequestProto = request
66+
.getContainerReport();
67+
NodeReportProto dnNodeReport = request.getNodeReport();
68+
PipelineReportsProto pipelineReport = request.getPipelineReports();
69+
return impl.register(request.getDatanodeDetails(), dnNodeReport,
70+
containerRequestProto, pipelineReport);
71+
6772
}
6873

6974
@Override
70-
public SCMRegisteredResponseProto register(RpcController controller,
71-
SCMRegisterRequestProto request) throws ServiceException {
72-
try {
73-
ContainerReportsProto containerRequestProto = request
74-
.getContainerReport();
75-
NodeReportProto dnNodeReport = request.getNodeReport();
76-
PipelineReportsProto pipelineReport = request.getPipelineReports();
77-
return impl.register(request.getDatanodeDetails(), dnNodeReport,
78-
containerRequestProto, pipelineReport);
79-
} catch (IOException e) {
80-
throw new ServiceException(e);
81-
}
75+
public SCMDatanodeResponse submitRequest(RpcController controller,
76+
SCMDatanodeRequest request) throws ServiceException {
77+
return dispatcher.processRequest(request, this::processMessage,
78+
request.getCmdType(), request.getTraceID());
8279
}
8380

84-
@Override
85-
public SCMHeartbeatResponseProto sendHeartbeat(RpcController controller,
86-
SCMHeartbeatRequestProto request) throws ServiceException {
81+
public SCMDatanodeResponse processMessage(SCMDatanodeRequest request)
82+
throws ServiceException {
8783
try {
88-
return impl.sendHeartbeat(request);
84+
Type cmdType = request.getCmdType();
85+
switch (cmdType) {
86+
case GetVersion:
87+
return SCMDatanodeResponse.newBuilder()
88+
.setCmdType(cmdType)
89+
.setStatus(Status.OK)
90+
.setGetVersionResponse(
91+
impl.getVersion(request.getGetVersionRequest()))
92+
.build();
93+
case SendHeartbeat:
94+
return SCMDatanodeResponse.newBuilder()
95+
.setCmdType(cmdType)
96+
.setStatus(Status.OK)
97+
.setSendHeartbeatResponse(
98+
impl.sendHeartbeat(request.getSendHeartbeatRequest()))
99+
.build();
100+
case Register:
101+
return SCMDatanodeResponse.newBuilder()
102+
.setCmdType(cmdType)
103+
.setStatus(Status.OK)
104+
.setRegisterResponse(register(request.getRegisterRequest()))
105+
.build();
106+
default:
107+
throw new ServiceException("Unknown command type: " + cmdType);
108+
}
89109
} catch (IOException e) {
90110
throw new ServiceException(e);
91111
}
92112
}
93-
94113
}

hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,45 @@ package hadoop.hdds;
3434

3535
import "hdds.proto";
3636

37+
38+
message SCMDatanodeRequest {
39+
required Type cmdType = 1; // Type of the command
40+
41+
optional string traceID = 2;
42+
43+
optional SCMVersionRequestProto getVersionRequest = 3;
44+
optional SCMRegisterRequestProto registerRequest = 4;
45+
optional SCMHeartbeatRequestProto sendHeartbeatRequest = 5;
46+
}
47+
48+
message SCMDatanodeResponse {
49+
required Type cmdType = 1; // Type of the command
50+
51+
optional string traceID = 2;
52+
53+
optional bool success = 3 [default = true];
54+
55+
optional string message = 4;
56+
57+
required Status status = 5;
58+
59+
optional SCMVersionResponseProto getVersionResponse = 6;
60+
optional SCMRegisteredResponseProto registerResponse = 7;
61+
optional SCMHeartbeatResponseProto sendHeartbeatResponse = 8;
62+
63+
}
64+
65+
enum Type {
66+
GetVersion = 1;
67+
Register = 2;
68+
SendHeartbeat = 3;
69+
}
70+
71+
enum Status {
72+
OK = 1;
73+
ERROR = 2;
74+
}
75+
3776
/**
3877
* Request for version info of the software stack on the server.
3978
*/
@@ -385,21 +424,6 @@ message ReplicateContainerCommandProto {
385424
*/
386425
service StorageContainerDatanodeProtocolService {
387426

388-
/**
389-
* Gets the version information from the SCM.
390-
*/
391-
rpc getVersion (SCMVersionRequestProto) returns (SCMVersionResponseProto);
392-
393-
/**
394-
* Registers a data node with SCM.
395-
*/
396-
rpc register (SCMRegisterRequestProto) returns (SCMRegisteredResponseProto);
397-
398-
/**
399-
* Send heartbeat from datanode to SCM. HB's under SCM looks more
400-
* like life line protocol than HB's under HDFS. In other words, it is
401-
* extremely light weight and contains no data payload.
402-
*/
403-
rpc sendHeartbeat (SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto);
404-
427+
//Message sent from Datanode to SCM as a heartbeat.
428+
rpc submitRequest (SCMDatanodeRequest) returns (SCMDatanodeResponse);
405429
}

hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@
2929
import org.apache.hadoop.ipc.ProtobufRpcEngine;
3030
import org.apache.hadoop.ipc.RPC;
3131
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
32+
import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
3233
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
3334
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
3435
import org.apache.hadoop.test.GenericTestUtils;
3536

3637
import com.google.protobuf.BlockingService;
3738
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
39+
import org.mockito.Mockito;
3840

3941
/**
4042
* Test Endpoint class.
@@ -91,7 +93,7 @@ public static RPC.Server startScmRpcServer(Configuration configuration,
9193
StorageContainerDatanodeProtocolService.
9294
newReflectiveBlockingService(
9395
new StorageContainerDatanodeProtocolServerSideTranslatorPB(
94-
server));
96+
server, Mockito.mock(ProtocolMessageMetrics.class)));
9597

9698
RPC.Server scmServer = startRpcServer(configuration, rpcServerAddresss,
9799
StorageContainerDatanodeProtocolPB.class, scmDatanodeService,

0 commit comments

Comments
 (0)