Skip to content

Commit 57ecde4

Browse files
authored
Merge branch 'apache:trunk' into HADOOP-18302
2 parents 839dc70 + 4890ba5 commit 57ecde4

File tree

67 files changed

+2155
-364
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+2155
-364
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,16 @@ default int maxReadSizeForVectorReads() {
114114
* As a result of the call, each range will have FileRange.setData(CompletableFuture)
115115
* called with a future that when complete will have a ByteBuffer with the
116116
* data from the file's range.
117+
* <p>
118+
* The position returned by getPos() after readVectored() is undefined.
119+
* </p>
120+
* <p>
121+
* If a file is changed while the readVectored() operation is in progress, the output is
122+
* undefined. Some ranges may have old data, some may have new and some may have both.
123+
* </p>
124+
* <p>
125+
* While a readVectored() operation is in progress, normal read api calls may block.
126+
* </p>
117127
* @param ranges the byte ranges to read
118128
* @param allocate the function to allocate ByteBuffer
119129
* @throws IOException any IOE.

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public interface AlignmentContext {
4646
void updateResponseState(RpcResponseHeaderProto.Builder header);
4747

4848
/**
49-
* This is the intended client method call to implement to recieve state info
49+
* This is the intended client method call to implement to receive state info
5050
* during RPC response processing.
5151
*
5252
* @param header The RPC response header.

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ public static AsyncGet<Message, Exception> getAsyncReturnMessage() {
8080
@Override
8181
@SuppressWarnings("unchecked")
8282
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
83-
ConnectionId connId, Configuration conf, SocketFactory factory)
84-
throws IOException {
85-
final Invoker invoker = new Invoker(protocol, connId, conf, factory);
83+
ConnectionId connId, Configuration conf, SocketFactory factory,
84+
AlignmentContext alignmentContext) throws IOException {
85+
final Invoker invoker = new Invoker(protocol, connId, conf, factory, alignmentContext);
8686
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
8787
protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
8888
}
@@ -126,7 +126,7 @@ public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
126126
return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
127127
(ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
128128
new Class[] { protocol }, new Invoker(protocol, connId, conf,
129-
factory)), false);
129+
factory, null)), false);
130130
}
131131

132132
protected static class Invoker implements RpcInvocationHandler {
@@ -147,9 +147,8 @@ protected Invoker(Class<?> protocol, InetSocketAddress addr,
147147
throws IOException {
148148
this(protocol, Client.ConnectionId.getConnectionId(
149149
addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
150-
conf, factory);
150+
conf, factory, alignmentContext);
151151
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
152-
this.alignmentContext = alignmentContext;
153152
}
154153

155154
/**
@@ -158,14 +157,16 @@ protected Invoker(Class<?> protocol, InetSocketAddress addr,
158157
* @param connId input connId.
159158
* @param conf input Configuration.
160159
* @param factory input factory.
160+
* @param alignmentContext Alignment context
161161
*/
162162
protected Invoker(Class<?> protocol, Client.ConnectionId connId,
163-
Configuration conf, SocketFactory factory) {
163+
Configuration conf, SocketFactory factory, AlignmentContext alignmentContext) {
164164
this.remoteId = connId;
165165
this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
166166
this.protocolName = RPC.getProtocolName(protocol);
167167
this.clientProtocolVersion = RPC
168168
.getProtocolVersion(protocol);
169+
this.alignmentContext = alignmentContext;
169170
}
170171

171172
private RequestHeaderProto constructRpcRequestHeader(Method method) {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,9 @@ public <T> ProtocolProxy<T> getProxy(
103103
@Override
104104
@SuppressWarnings("unchecked")
105105
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
106-
ConnectionId connId, Configuration conf, SocketFactory factory)
107-
throws IOException {
108-
final Invoker invoker = new Invoker(protocol, connId, conf, factory);
106+
ConnectionId connId, Configuration conf, SocketFactory factory,
107+
AlignmentContext alignmentContext) throws IOException {
108+
final Invoker invoker = new Invoker(protocol, connId, conf, factory, alignmentContext);
109109
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
110110
protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
111111
}
@@ -133,7 +133,7 @@ public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
133133
return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
134134
(ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
135135
new Class[]{protocol}, new Invoker(protocol, connId, conf,
136-
factory)), false);
136+
factory, null)), false);
137137
}
138138

139139
protected static class Invoker implements RpcInvocationHandler {
@@ -154,9 +154,8 @@ protected Invoker(Class<?> protocol, InetSocketAddress addr,
154154
throws IOException {
155155
this(protocol, Client.ConnectionId.getConnectionId(
156156
addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
157-
conf, factory);
157+
conf, factory, alignmentContext);
158158
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
159-
this.alignmentContext = alignmentContext;
160159
}
161160

162161
/**
@@ -166,14 +165,16 @@ protected Invoker(Class<?> protocol, InetSocketAddress addr,
166165
* @param connId input connId.
167166
* @param conf input Configuration.
168167
* @param factory input factory.
168+
* @param alignmentContext Alignment context
169169
*/
170170
protected Invoker(Class<?> protocol, Client.ConnectionId connId,
171-
Configuration conf, SocketFactory factory) {
171+
Configuration conf, SocketFactory factory, AlignmentContext alignmentContext) {
172172
this.remoteId = connId;
173173
this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
174174
this.protocolName = RPC.getProtocolName(protocol);
175175
this.clientProtocolVersion = RPC
176176
.getProtocolVersion(protocol);
177+
this.alignmentContext = alignmentContext;
177178
}
178179

179180
private RequestHeaderProto constructRpcRequestHeader(Method method) {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,11 +558,32 @@ public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
558558
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
559559
long clientVersion, ConnectionId connId, Configuration conf,
560560
SocketFactory factory) throws IOException {
561+
return getProtocolProxy(protocol, clientVersion, connId, conf,
562+
factory, null);
563+
}
564+
565+
/**
566+
* Get a protocol proxy that contains a proxy connection to a remote server
567+
* and a set of methods that are supported by the server.
568+
*
569+
* @param <T> Generics Type T
570+
* @param protocol protocol class
571+
* @param clientVersion client's version
572+
* @param connId client connection identifier
573+
* @param conf configuration
574+
* @param factory socket factory
575+
* @param alignmentContext StateID alignment context
576+
* @return the protocol proxy
577+
* @throws IOException if the far end through a RemoteException
578+
*/
579+
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
580+
long clientVersion, ConnectionId connId, Configuration conf,
581+
SocketFactory factory, AlignmentContext alignmentContext) throws IOException {
561582
if (UserGroupInformation.isSecurityEnabled()) {
562583
SaslRpcServer.init(conf);
563584
}
564585
return getProtocolEngine(protocol, conf).getProxy(
565-
protocol, clientVersion, connId, conf, factory);
586+
protocol, clientVersion, connId, conf, factory, alignmentContext);
566587
}
567588

568589
/**

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,13 @@ <T> ProtocolProxy<T> getProxy(Class<T> protocol,
6666
* @param connId input ConnectionId.
6767
* @param conf input Configuration.
6868
* @param factory input factory.
69+
* @param alignmentContext Alignment context
6970
* @throws IOException raised on errors performing I/O.
7071
* @return ProtocolProxy.
7172
*/
7273
<T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
73-
Client.ConnectionId connId, Configuration conf, SocketFactory factory)
74+
Client.ConnectionId connId, Configuration conf, SocketFactory factory,
75+
AlignmentContext alignmentContext)
7476
throws IOException;
7577

7678
/**

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -925,7 +925,7 @@ public static class Call implements Schedulable,
925925
private volatile String detailedMetricsName = "";
926926
final int callId; // the client's call id
927927
final int retryCount; // the retry count of the call
928-
long timestampNanos; // time the call was received
928+
private final long timestampNanos; // time the call was received
929929
long responseTimestampNanos; // time the call was served
930930
private AtomicInteger responseWaitCount = new AtomicInteger(1);
931931
final RPC.RpcKind rpcKind;
@@ -1107,6 +1107,10 @@ public void setDeferredResponse(Writable response) {
11071107

11081108
public void setDeferredError(Throwable t) {
11091109
}
1110+
1111+
public long getTimestampNanos() {
1112+
return timestampNanos;
1113+
}
11101114
}
11111115

11121116
/** A RPC extended call queued for handling. */
@@ -1188,7 +1192,7 @@ public Void run() throws Exception {
11881192

11891193
try {
11901194
value = call(
1191-
rpcKind, connection.protocolName, rpcRequest, timestampNanos);
1195+
rpcKind, connection.protocolName, rpcRequest, getTimestampNanos());
11921196
} catch (Throwable e) {
11931197
populateResponseParamsOnError(e, responseParams);
11941198
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,16 +315,18 @@ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
315315
* @param connId input ConnectionId.
316316
* @param conf input Configuration.
317317
* @param factory input factory.
318+
* @param alignmentContext Alignment context
318319
* @throws IOException raised on errors performing I/O.
319320
* @return ProtocolProxy.
320321
*/
321322
@Override
322323
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
323-
Client.ConnectionId connId, Configuration conf, SocketFactory factory)
324+
Client.ConnectionId connId, Configuration conf, SocketFactory factory,
325+
AlignmentContext alignmentContext)
324326
throws IOException {
325327
return getProxy(protocol, clientVersion, connId.getAddress(),
326328
connId.getTicket(), conf, factory, connId.getRpcTimeout(),
327-
connId.getRetryPolicy(), null, null);
329+
connId.getRetryPolicy(), null, alignmentContext);
328330
}
329331

330332
/**

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileBasedKeyStoresFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,8 +319,10 @@ String getPassword(Configuration conf, String alias, String defaultPass) {
319319
*/
320320
@Override
321321
public synchronized void destroy() {
322-
if (trustManager != null) {
322+
if (fileMonitoringTimer != null) {
323323
fileMonitoringTimer.cancel();
324+
}
325+
if (trustManager != null) {
324326
trustManager = null;
325327
keyManagers = null;
326328
trustManagers = null;

hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
9191
optional RPCTraceInfoProto traceInfo = 6; // tracing info
9292
optional RPCCallerContextProto callerContext = 7; // call context
9393
optional int64 stateId = 8; // The last seen Global State ID
94+
// Alignment context info for use with routers.
95+
// The client should not interpret these bytes, but only forward bytes
96+
// received from RpcResponseHeaderProto.routerFederatedState.
97+
optional bytes routerFederatedState = 9;
9498
}
9599

96100

@@ -157,6 +161,10 @@ message RpcResponseHeaderProto {
157161
optional bytes clientId = 7; // Globally unique client ID
158162
optional sint32 retryCount = 8 [default = -1];
159163
optional int64 stateId = 9; // The last written Global State ID
164+
// Alignment context info for use with routers.
165+
// The client should not interpret these bytes, but only
166+
// forward them to the router using RpcRequestHeaderProto.routerFederatedState.
167+
optional bytes routerFederatedState = 10;
160168
}
161169

162170
message RpcSaslProto {

0 commit comments

Comments
 (0)