Skip to content

Commit e28dc52

Browse files
simbadzinaomalley
authored andcommitted
HDFS-16669: Enhance client protocol to propagate last seen state IDs for multiple nameservices.
Fixes #4584 Signed-off-by: Owen O'Malley <[email protected]>
1 parent 4138661 commit e28dc52

File tree

5 files changed

+127
-3
lines changed

5 files changed

+127
-3
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public interface AlignmentContext {
4646
void updateResponseState(RpcResponseHeaderProto.Builder header);
4747

4848
/**
49-
* This is the intended client method call to implement to recieve state info
49+
* This is the intended client method call to implement to receive state info
5050
* during RPC response processing.
5151
*
5252
* @param header The RPC response header.

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -820,7 +820,7 @@ public static class Call implements Schedulable,
820820
private volatile String detailedMetricsName = "";
821821
final int callId; // the client's call id
822822
final int retryCount; // the retry count of the call
823-
long timestampNanos; // time the call was received
823+
private final long timestampNanos; // time the call was received
824824
long responseTimestampNanos; // time the call was served
825825
private AtomicInteger responseWaitCount = new AtomicInteger(1);
826826
final RPC.RpcKind rpcKind;
@@ -1002,6 +1002,10 @@ public void setDeferredResponse(Writable response) {
10021002

10031003
public void setDeferredError(Throwable t) {
10041004
}
1005+
1006+
public long getTimestampNanos() {
1007+
return timestampNanos;
1008+
}
10051009
}
10061010

10071011
/** A RPC extended call queued for handling. */
@@ -1083,7 +1087,7 @@ public Void run() throws Exception {
10831087

10841088
try {
10851089
value = call(
1086-
rpcKind, connection.protocolName, rpcRequest, timestampNanos);
1090+
rpcKind, connection.protocolName, rpcRequest, getTimestampNanos());
10871091
} catch (Throwable e) {
10881092
populateResponseParamsOnError(e, responseParams);
10891093
}

hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
9191
optional RPCTraceInfoProto traceInfo = 6; // tracing info
9292
optional RPCCallerContextProto callerContext = 7; // call context
9393
optional int64 stateId = 8; // The last seen Global State ID
94+
// Alignment context info for use with routers.
95+
// The client should not interpret these bytes, but only forward bytes
96+
// received from RpcResponseHeaderProto.routerFederatedState.
97+
optional bytes routerFederatedState = 9;
9498
}
9599

96100

@@ -157,6 +161,10 @@ message RpcResponseHeaderProto {
157161
optional bytes clientId = 7; // Globally unique client ID
158162
optional sint32 retryCount = 8 [default = -1];
159163
optional int64 stateId = 9; // The last written Global State ID
164+
// Alignment context info for use with routers.
165+
// The client should not interpret these bytes, but only
166+
// forward them to the router using RpcRequestHeaderProto.routerFederatedState.
167+
optional bytes routerFederatedState = 10;
160168
}
161169

162170
message RpcSaslProto {

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,3 +306,16 @@ message GetDisabledNameservicesRequestProto {
306306
message GetDisabledNameservicesResponseProto {
307307
repeated string nameServiceIds = 1;
308308
}
309+
310+
/////////////////////////////////////////////////
311+
// Alignment state for namespaces.
312+
/////////////////////////////////////////////////
313+
314+
/**
315+
* Clients should receive this message in RPC responses and forward it
316+
* in RPC requests without interpreting it. It should be encoded
317+
* as an obscure byte array when being sent to clients.
318+
*/
319+
message RouterFederatedStateProto {
320+
map<string, int64> namespaceStateIds = 1; // Last seen state IDs for multiple namespaces.
321+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router;
19+
20+
import java.io.IOException;
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
import org.apache.hadoop.ipc.AlignmentContext;
24+
import org.apache.hadoop.ipc.ClientId;
25+
import org.apache.hadoop.ipc.RPC;
26+
import org.apache.hadoop.ipc.RpcConstants;
27+
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
28+
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto;
29+
import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
30+
import org.apache.hadoop.util.ProtoUtil;
31+
import org.junit.Test;
32+
33+
import static org.junit.Assert.*;
34+
35+
36+
public class TestRouterFederatedState {
37+
38+
@Test
39+
public void testRpcRouterFederatedState() throws InvalidProtocolBufferException {
40+
byte[] uuid = ClientId.getClientId();
41+
Map<String, Long> expectedStateIds = new HashMap<String, Long>() {{
42+
put("namespace1", 11L );
43+
put("namespace2", 22L);
44+
}};
45+
46+
AlignmentContext alignmentContext = new AlignmentContextWithRouterState(expectedStateIds);
47+
48+
RpcHeaderProtos.RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
49+
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcHeaderProtos.RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET, 0,
50+
RpcConstants.INVALID_RETRY_COUNT, uuid, alignmentContext);
51+
52+
Map<String, Long> stateIdsFromHeader =
53+
RouterFederatedStateProto.parseFrom(
54+
header.getRouterFederatedState().toByteArray()
55+
).getNamespaceStateIdsMap();
56+
57+
assertEquals(expectedStateIds, stateIdsFromHeader);
58+
}
59+
60+
private static class AlignmentContextWithRouterState implements AlignmentContext {
61+
62+
Map<String, Long> routerFederatedState;
63+
64+
public AlignmentContextWithRouterState(Map<String, Long> namespaceStates) {
65+
this.routerFederatedState = namespaceStates;
66+
}
67+
68+
@Override
69+
public void updateRequestState(RpcHeaderProtos.RpcRequestHeaderProto.Builder header) {
70+
RouterFederatedStateProto fedState = RouterFederatedStateProto
71+
.newBuilder()
72+
.putAllNamespaceStateIds(routerFederatedState)
73+
.build();
74+
75+
header.setRouterFederatedState(fedState.toByteString());
76+
}
77+
78+
@Override
79+
public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder header) {}
80+
81+
@Override
82+
public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) {}
83+
84+
@Override
85+
public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold) throws IOException {
86+
return 0;
87+
}
88+
89+
@Override
90+
public long getLastSeenStateId() {
91+
return 0;
92+
}
93+
94+
@Override
95+
public boolean isCoordinatedCall(String protocolName, String method) {
96+
return false;
97+
}
98+
}
99+
}

0 commit comments

Comments
 (0)