Skip to content

Commit 94b884a

Browse files
authored
HDFS-16262. Async refresh of cached locations in DFSInputStream (#3527)
1 parent 43153e8 commit 94b884a

File tree

10 files changed

+915
-265
lines changed

10 files changed

+915
-265
lines changed

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ public class ClientContext {
6969
*/
7070
private final String name;
7171

72+
/**
73+
* The client conf used to initialize context.
74+
*/
75+
private final DfsClientConf dfsClientConf;
76+
7277
/**
7378
* String representation of the configuration.
7479
*/
@@ -130,6 +135,17 @@ public class ClientContext {
130135
*/
131136
private volatile DeadNodeDetector deadNodeDetector = null;
132137

138+
/**
139+
* The switch for the {@link LocatedBlocksRefresher}.
140+
*/
141+
private final boolean locatedBlocksRefresherEnabled;
142+
143+
/**
144+
* Periodically refresh the {@link org.apache.hadoop.hdfs.protocol.LocatedBlocks} backing
145+
* registered {@link DFSInputStream}s, to take advantage of changes in block placement.
146+
*/
147+
private volatile LocatedBlocksRefresher locatedBlocksRefresher = null;
148+
133149
/**
134150
* Count the reference of ClientContext.
135151
*/
@@ -146,6 +162,7 @@ private ClientContext(String name, DfsClientConf conf,
146162
final ShortCircuitConf scConf = conf.getShortCircuitConf();
147163

148164
this.name = name;
165+
this.dfsClientConf = conf;
149166
this.confString = scConf.confAsString();
150167
this.clientShortCircuitNum = conf.getClientShortCircuitNum();
151168
this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];
@@ -164,6 +181,7 @@ private ClientContext(String name, DfsClientConf conf,
164181
this.byteArrayManager = ByteArrayManager.newInstance(
165182
conf.getWriteByteArrayManagerConf());
166183
this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled();
184+
this.locatedBlocksRefresherEnabled = conf.isLocatedBlocksRefresherEnabled();
167185
initTopologyResolution(config);
168186
}
169187

@@ -301,6 +319,21 @@ public DeadNodeDetector getDeadNodeDetector() {
301319
return deadNodeDetector;
302320
}
303321

322+
/**
323+
* If true, LocatedBlocksRefresher will be periodically refreshing LocatedBlocks
324+
* of registered DFSInputStreams.
325+
*/
326+
public boolean isLocatedBlocksRefresherEnabled() {
327+
return locatedBlocksRefresherEnabled;
328+
}
329+
330+
/**
331+
* Obtain LocatedBlocksRefresher of the current client.
332+
*/
333+
public LocatedBlocksRefresher getLocatedBlocksRefresher() {
334+
return locatedBlocksRefresher;
335+
}
336+
304337
/**
305338
* Increment the counter. Start the dead node detector thread if there is no
306339
* reference.
@@ -311,6 +344,10 @@ synchronized void reference() {
311344
deadNodeDetector = new DeadNodeDetector(name, configuration);
312345
deadNodeDetector.start();
313346
}
347+
if (locatedBlocksRefresherEnabled && locatedBlocksRefresher == null) {
348+
locatedBlocksRefresher = new LocatedBlocksRefresher(name, configuration, dfsClientConf);
349+
locatedBlocksRefresher.start();
350+
}
314351
}
315352

316353
/**
@@ -324,5 +361,10 @@ synchronized void unreference() {
324361
deadNodeDetector.shutdown();
325362
deadNodeDetector = null;
326363
}
364+
365+
if (counter == 0 && locatedBlocksRefresherEnabled && locatedBlocksRefresher != null) {
366+
locatedBlocksRefresher.shutdown();
367+
locatedBlocksRefresher = null;
368+
}
327369
}
328370
}

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -863,7 +863,7 @@ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
863863
}
864864

865865
public long getRefreshReadBlkLocationsInterval() {
866-
return dfsClientConf.getRefreshReadBlockLocationsMS();
866+
return dfsClientConf.getLocatedBlocksRefresherInterval();
867867
}
868868

869869
/**
@@ -3459,4 +3459,36 @@ private boolean isDeadNodeDetectionEnabled() {
34593459
public DeadNodeDetector getDeadNodeDetector() {
34603460
return clientContext.getDeadNodeDetector();
34613461
}
3462+
3463+
/**
3464+
* Obtain LocatedBlocksRefresher of the current client.
3465+
*/
3466+
public LocatedBlocksRefresher getLocatedBlockRefresher() {
3467+
return clientContext.getLocatedBlocksRefresher();
3468+
}
3469+
3470+
/**
3471+
* Adds the {@link DFSInputStream} to the {@link LocatedBlocksRefresher}, so that
3472+
* the underlying {@link LocatedBlocks} is periodically refreshed.
3473+
*/
3474+
public void addLocatedBlocksRefresh(DFSInputStream dfsInputStream) {
3475+
if (isLocatedBlocksRefresherEnabled()) {
3476+
clientContext.getLocatedBlocksRefresher().addInputStream(dfsInputStream);
3477+
}
3478+
}
3479+
3480+
/**
3481+
* Removes the {@link DFSInputStream} from the {@link LocatedBlocksRefresher}, so that
3482+
* the underlying {@link LocatedBlocks} is no longer periodically refreshed.
3483+
* @param dfsInputStream
3484+
*/
3485+
public void removeLocatedBlocksRefresh(DFSInputStream dfsInputStream) {
3486+
if (isLocatedBlocksRefresherEnabled()) {
3487+
clientContext.getLocatedBlocksRefresher().removeInputStream(dfsInputStream);
3488+
}
3489+
}
3490+
3491+
private boolean isLocatedBlocksRefresherEnabled() {
3492+
return clientContext.isLocatedBlocksRefresherEnabled();
3493+
}
34623494
}

0 commit comments

Comments
 (0)