Skip to content

Commit 3142f6b

Browse files
committed
HDDS-1586. Allow Ozone RPC client to read with topology awareness.
1 parent 15d82fc commit 3142f6b

File tree

38 files changed

+814
-106
lines changed

38 files changed

+814
-106
lines changed

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.hadoop.conf.Configuration;
2424
import org.apache.hadoop.hdds.HddsUtils;
2525
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
26+
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
2627
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
2728
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
2829
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
@@ -64,7 +65,7 @@
6465
import java.util.concurrent.TimeoutException;
6566

6667
/**
67-
* A Client for the storageContainer protocol.
68+
* A Client for the storageContainer protocol for reading object data.
6869
*/
6970
public class XceiverClientGrpc extends XceiverClientSpi {
7071
static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
@@ -76,6 +77,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
7677
private final Semaphore semaphore;
7778
private boolean closed = false;
7879
private SecurityConfig secConfig;
80+
private final boolean topologyAwareRead;
7981

8082
/**
8183
* Constructs a client that can communicate with the Container framework on
@@ -96,16 +98,20 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
9698
this.metrics = XceiverClientManager.getXceiverClientMetrics();
9799
this.channels = new HashMap<>();
98100
this.asyncStubs = new HashMap<>();
101+
this.topologyAwareRead = Boolean.parseBoolean(config.get(
102+
ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED,
103+
ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT));
99104
}
100105

101106
/**
102107
* To be used when grpc token is not enabled.
103108
*/
104109
@Override
105110
public void connect() throws Exception {
106-
// leader by default is the 1st datanode in the datanode list of the pipeline
107-
DatanodeDetails dn = this.pipeline.getFirstNode();
108-
// just make a connection to the 1st datanode at the beginning
111+
// connect to the closest node; if the closest node doesn't exist, fall back to
112+
// the first node, which is usually the leader in the pipeline.
113+
DatanodeDetails dn = this.pipeline.getClosestNode();
114+
// just make a connection to the picked datanode at the beginning
109115
connectToDatanode(dn, null);
110116
}
111117

@@ -114,9 +120,11 @@ public void connect() throws Exception {
114120
*/
115121
@Override
116122
public void connect(String encodedToken) throws Exception {
117-
// leader by default is the 1st datanode in the datanode list of the pipeline
118-
DatanodeDetails dn = this.pipeline.getFirstNode();
119-
// just make a connection to the 1st datanode at the beginning
123+
// connect to the closest node; if the closest node doesn't exist, fall back to
124+
// the first node, which is usually the leader in the pipeline.
125+
DatanodeDetails dn;
126+
dn = this.pipeline.getClosestNode();
127+
// just make a connection to the picked datanode at the beginning
120128
connectToDatanode(dn, encodedToken);
121129
}
122130

@@ -132,7 +140,8 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken)
132140

133141
// Add credential context to the client call
134142
String userName = UserGroupInformation.getCurrentUser().getShortUserName();
135-
LOG.debug("Connecting to server Port : " + dn.getIpAddress());
143+
LOG.debug("Nodes in pipeline : {}", pipeline.getNodes().toString());
144+
LOG.debug("Connecting to server : {}", dn.getIpAddress());
136145
NettyChannelBuilder channelBuilder =
137146
NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
138147
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
@@ -252,7 +261,15 @@ private XceiverClientReply sendCommandWithRetry(
252261
// TODO: cache the correct leader info in here, so that any subsequent calls
253262
// should first go to leader
254263
XceiverClientReply reply = new XceiverClientReply(null);
255-
for (DatanodeDetails dn : pipeline.getNodes()) {
264+
List<DatanodeDetails> datanodeList;
265+
if ((request.getCmdType() == ContainerProtos.Type.ReadChunk ||
266+
request.getCmdType() == ContainerProtos.Type.GetSmallFile) &&
267+
topologyAwareRead) {
268+
datanodeList = pipeline.getNodesInOrder();
269+
} else {
270+
datanodeList = pipeline.getNodes();
271+
}
272+
for (DatanodeDetails dn : datanodeList) {
256273
try {
257274
LOG.debug("Executing command " + request + " on datanode " + dn);
258275
// In case the command gets retried on a 2nd datanode,
@@ -349,6 +366,8 @@ private XceiverClientReply sendCommandAsync(
349366
reconnect(dn, token);
350367
}
351368

369+
LOG.debug("Send command {} to datanode {}", request.getCmdType().toString(),
370+
dn.getNetworkFullPath());
352371
final CompletableFuture<ContainerCommandResponseProto> replyFuture =
353372
new CompletableFuture<>();
354373
semaphore.acquire();

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
3030
import org.apache.hadoop.ozone.OzoneSecurityUtil;
3131
import org.apache.hadoop.security.UserGroupInformation;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
3234

3335
import java.io.Closeable;
3436
import java.io.IOException;
@@ -57,14 +59,16 @@
5759
* not being used for a period of time.
5860
*/
5961
public class XceiverClientManager implements Closeable {
60-
62+
private static final Logger LOG =
63+
LoggerFactory.getLogger(XceiverClientManager.class);
6164
//TODO : change this to SCM configuration class
6265
private final Configuration conf;
6366
private final Cache<String, XceiverClientSpi> clientCache;
6467
private final boolean useRatis;
6568

6669
private static XceiverClientMetrics metrics;
6770
private boolean isSecurityEnabled;
71+
private final boolean topologyAwareRead;
6872
/**
6973
* Creates a new XceiverClientManager.
7074
*
@@ -98,6 +102,9 @@ public void onRemoval(
98102
}
99103
}
100104
}).build();
105+
topologyAwareRead = Boolean.parseBoolean(conf.get(
106+
ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED,
107+
ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT));
101108
}
102109

103110
@VisibleForTesting
@@ -118,12 +125,32 @@ public Cache<String, XceiverClientSpi> getClientCache() {
118125
*/
119126
public XceiverClientSpi acquireClient(Pipeline pipeline)
120127
throws IOException {
128+
return acquireClient(pipeline, false);
129+
}
130+
131+
/**
132+
* Acquires a XceiverClientSpi connected to a container for read.
133+
*
134+
* If there is already a cached XceiverClientSpi, simply return
135+
* the cached one; otherwise create a new one.
136+
*
137+
* @param pipeline the container pipeline for the client connection
138+
* @return XceiverClientSpi connected to a container
139+
* @throws IOException if a XceiverClientSpi cannot be acquired
140+
*/
141+
public XceiverClientSpi acquireClientForReadData(Pipeline pipeline)
142+
throws IOException {
143+
return acquireClient(pipeline, true);
144+
}
145+
146+
private XceiverClientSpi acquireClient(Pipeline pipeline, boolean read)
147+
throws IOException {
121148
Preconditions.checkNotNull(pipeline);
122149
Preconditions.checkArgument(pipeline.getNodes() != null);
123150
Preconditions.checkArgument(!pipeline.getNodes().isEmpty());
124151

125152
synchronized (clientCache) {
126-
XceiverClientSpi info = getClient(pipeline);
153+
XceiverClientSpi info = getClient(pipeline, read);
127154
info.incrementReference();
128155
return info;
129156
}
@@ -136,12 +163,28 @@ public XceiverClientSpi acquireClient(Pipeline pipeline)
136163
* @param invalidateClient if true, invalidates the client in cache
137164
*/
138165
public void releaseClient(XceiverClientSpi client, boolean invalidateClient) {
166+
releaseClient(client, invalidateClient, false);
167+
}
168+
169+
/**
170+
* Releases a read XceiverClientSpi after use.
171+
*
172+
* @param client client to release
173+
* @param invalidateClient if true, invalidates the client in cache
174+
*/
175+
public void releaseClientForReadData(XceiverClientSpi client,
176+
boolean invalidateClient) {
177+
releaseClient(client, invalidateClient, true);
178+
}
179+
180+
private void releaseClient(XceiverClientSpi client, boolean invalidateClient,
181+
boolean read) {
139182
Preconditions.checkNotNull(client);
140183
synchronized (clientCache) {
141184
client.decrementReference();
142185
if (invalidateClient) {
143186
Pipeline pipeline = client.getPipeline();
144-
String key = pipeline.getId().getId().toString() + pipeline.getType();
187+
String key = getPipelineCacheKey(pipeline, read);
145188
XceiverClientSpi cachedClient = clientCache.getIfPresent(key);
146189
if (cachedClient == client) {
147190
clientCache.invalidate(key);
@@ -150,11 +193,13 @@ public void releaseClient(XceiverClientSpi client, boolean invalidateClient) {
150193
}
151194
}
152195

153-
private XceiverClientSpi getClient(Pipeline pipeline)
196+
private XceiverClientSpi getClient(Pipeline pipeline, boolean forRead)
154197
throws IOException {
155198
HddsProtos.ReplicationType type = pipeline.getType();
156199
try {
157-
String key = pipeline.getId().getId().toString() + type;
200+
// use a separate cached client for reads, since the pipeline node that is
201+
// read from depends on network topology
202+
String key = getPipelineCacheKey(pipeline, forRead);
158203
// Append user short name to key to prevent a different user
159204
// from using same instance of xceiverClient.
160205
key = isSecurityEnabled ?
@@ -184,6 +229,19 @@ public XceiverClientSpi call() throws Exception {
184229
}
185230
}
186231

232+
private String getPipelineCacheKey(Pipeline pipeline, boolean forRead) {
233+
String key = pipeline.getId().getId().toString() + pipeline.getType();
234+
if (topologyAwareRead && forRead) {
235+
try {
236+
key += pipeline.getClosestNode().getHostName();
237+
} catch (IOException e) {
238+
LOG.error("Failed to get closest node to create pipeline cache key:" +
239+
e.getMessage());
240+
}
241+
}
242+
return key;
243+
}
244+
187245
/**
188246
* Close and remove all the cached clients.
189247
*/

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ protected List<ChunkInfo> getChunkInfos() throws IOException {
151151
pipeline = Pipeline.newBuilder(pipeline)
152152
.setType(HddsProtos.ReplicationType.STAND_ALONE).build();
153153
}
154-
xceiverClient = xceiverClientManager.acquireClient(pipeline);
154+
xceiverClient = xceiverClientManager.acquireClientForReadData(pipeline);
155155
boolean success = false;
156156
List<ChunkInfo> chunks;
157157
try {
@@ -170,7 +170,7 @@ protected List<ChunkInfo> getChunkInfos() throws IOException {
170170
success = true;
171171
} finally {
172172
if (!success) {
173-
xceiverClientManager.releaseClient(xceiverClient, false);
173+
xceiverClientManager.releaseClientForReadData(xceiverClient, false);
174174
}
175175
}
176176

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ protected DatanodeDetails(DatanodeDetails datanodeDetails) {
7676
this.ipAddress = datanodeDetails.ipAddress;
7777
this.hostName = datanodeDetails.hostName;
7878
this.ports = datanodeDetails.ports;
79+
this.setNetworkName(datanodeDetails.getNetworkName());
7980
}
8081

8182
/**
@@ -192,6 +193,12 @@ public static DatanodeDetails getFromProtoBuf(
192193
builder.addPort(newPort(
193194
Port.Name.valueOf(port.getName().toUpperCase()), port.getValue()));
194195
}
196+
if (datanodeDetailsProto.hasNetworkLocation()) {
197+
builder.setNetworkLocation(datanodeDetailsProto.getNetworkLocation());
198+
}
199+
if (datanodeDetailsProto.hasNetworkName()) {
200+
builder.setNetworkName(datanodeDetailsProto.getNetworkName());
201+
}
195202
return builder.build();
196203
}
197204

@@ -213,6 +220,7 @@ public HddsProtos.DatanodeDetailsProto getProtoBufMessage() {
213220
builder.setCertSerialId(certSerialId);
214221
}
215222
builder.setNetworkLocation(getNetworkLocation());
223+
builder.setNetworkName(getNetworkName());
216224

217225
for (Port port : ports) {
218226
builder.addPorts(HddsProtos.Port.newBuilder()
@@ -268,6 +276,7 @@ public static final class Builder {
268276
private String id;
269277
private String ipAddress;
270278
private String hostName;
279+
private String networkName;
271280
private String networkLocation;
272281
private List<Port> ports;
273282
private String certSerialId;
@@ -313,6 +322,17 @@ public Builder setHostName(String host) {
313322
return this;
314323
}
315324

325+
/**
326+
* Sets the network name of DataNode.
327+
*
328+
* @param name network name
329+
* @return DatanodeDetails.Builder
330+
*/
331+
public Builder setNetworkName(String name) {
332+
this.networkName = name;
333+
return this;
334+
}
335+
316336
/**
317337
* Sets the network location of DataNode.
318338
*
@@ -358,8 +378,12 @@ public DatanodeDetails build() {
358378
if (networkLocation == null) {
359379
networkLocation = NetConstants.DEFAULT_RACK;
360380
}
361-
return new DatanodeDetails(id, ipAddress, hostName, networkLocation,
362-
ports, certSerialId);
381+
DatanodeDetails dn = new DatanodeDetails(id, ipAddress, hostName,
382+
networkLocation, ports, certSerialId);
383+
if (networkName != null) {
384+
dn.setNetworkName(networkName);
385+
}
386+
return dn;
363387
}
364388
}
365389

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,10 @@ public final class ScmConfigKeys {
368368
"ozone.scm.network.topology.schema.file";
369369
public static final String OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT =
370370
"network-topology-default.xml";
371+
public static final String DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED =
372+
"dfs.network.topology.aware.read.enable";
373+
public static final String DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT =
374+
"true";
371375

372376
public static final String HDDS_TRACING_ENABLED = "hdds.tracing.enabled";
373377
public static final boolean HDDS_TRACING_ENABLED_DEFAULT = true;

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hdds.scm.net;
1919

2020
import java.util.Collection;
21+
import java.util.List;
2122

2223
/**
2324
* The interface defines a network topology.
@@ -246,5 +247,6 @@ Node getNode(int leafIndex, String scope, String excludedScope,
246247
* @param nodes Available replicas with the requested data
247248
* @param activeLen Number of active nodes at the front of the array
248249
*/
249-
void sortByDistanceCost(Node reader, Node[] nodes, int activeLen);
250+
List<? extends Node> sortByDistanceCost(Node reader,
251+
List<? extends Node> nodes, int activeLen);
250252
}

0 commit comments

Comments
 (0)