
Commit 837cc9b

Merge branch 'apache:trunk' into YARN-11093
2 parents a5c1e3c + 6bd2444 commit 837cc9b

9 files changed, +93 -49 lines changed


hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java

Lines changed: 13 additions & 26 deletions
@@ -20,7 +20,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.PropertyConfigurator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -104,8 +104,6 @@ public class KMSConfiguration {
 
   public static final boolean KEY_AUTHORIZATION_ENABLE_DEFAULT = true;
 
-  private static final String LOG4J_PROPERTIES = "kms-log4j.properties";
-
   static {
     Configuration.addDefaultResource(KMS_DEFAULT_XML);
     Configuration.addDefaultResource(KMS_SITE_XML);
@@ -163,31 +161,20 @@ public static boolean isACLsFileNewer(long time) {
     return newer;
   }
 
-  public static void initLogging() {
-    String confDir = System.getProperty(KMS_CONFIG_DIR);
-    if (confDir == null) {
-      throw new RuntimeException("System property '" +
-          KMSConfiguration.KMS_CONFIG_DIR + "' not defined");
+  /**
+   * Validate whether "kms.config.dir" and "log4j.configuration" are defined in the System
+   * properties. If not, abort the KMS WebServer.
+   */
+  public static void validateSystemProps() {
+    if (System.getProperty(KMS_CONFIG_DIR) == null) {
+      String errorMsg = "System property '" + KMS_CONFIG_DIR + "' not defined";
+      System.err.println("Aborting KMSWebServer because " + errorMsg);
+      throw new RuntimeException(errorMsg);
     }
     if (System.getProperty("log4j.configuration") == null) {
-      System.setProperty("log4j.defaultInitOverride", "true");
-      boolean fromClasspath = true;
-      File log4jConf = new File(confDir, LOG4J_PROPERTIES).getAbsoluteFile();
-      if (log4jConf.exists()) {
-        PropertyConfigurator.configureAndWatch(log4jConf.getPath(), 1000);
-        fromClasspath = false;
-      } else {
-        ClassLoader cl = Thread.currentThread().getContextClassLoader();
-        URL log4jUrl = cl.getResource(LOG4J_PROPERTIES);
-        if (log4jUrl != null) {
-          PropertyConfigurator.configure(log4jUrl);
-        }
-      }
-      LOG.debug("KMS log starting");
-      if (fromClasspath) {
-        LOG.warn("Log4j configuration file '{}' not found", LOG4J_PROPERTIES);
-        LOG.warn("Logging with INFO level to standard output");
-      }
+      String errorMsg = "System property 'log4j.configuration' not defined";
+      System.err.println("Aborting KMSWebServer because " + errorMsg);
+      throw new RuntimeException(errorMsg);
    }
  }
 }
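
Note: validateSystemProps() above replaces the old programmatic log4j setup with a fail-fast check on required JVM system properties. A minimal standalone sketch of that pattern, using only the two property names shown in this diff; the class name and main method are illustrative and not part of the commit:

import java.util.Arrays;
import java.util.List;

// Minimal sketch: fail fast when required JVM system properties are missing,
// mirroring the check KMSConfiguration.validateSystemProps() now performs.
public class RequiredSystemPropsCheck {
  public static void main(String[] args) {
    List<String> required = Arrays.asList("kms.config.dir", "log4j.configuration");
    for (String prop : required) {
      if (System.getProperty(prop) == null) {
        String errorMsg = "System property '" + prop + "' not defined";
        System.err.println("Aborting because " + errorMsg);
        throw new RuntimeException(errorMsg);
      }
    }
    System.out.println("All required system properties are set");
  }
}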

hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebServer.java

Lines changed: 1 addition & 1 deletion
@@ -185,7 +185,7 @@ public URL getKMSUrl() {
   }
 
   public static void main(String[] args) throws Exception {
-    KMSConfiguration.initLogging();
+    KMSConfiguration.validateSystemProps();
     StringUtils.startupShutdownMessage(KMSWebServer.class, args, LOG);
     Configuration conf = KMSConfiguration.getKMSConf();
     Configuration sslConf = SSLFactory.readSSLConfiguration(conf, SSLFactory.Mode.SERVER);

hadoop-common-project/hadoop-kms/src/main/libexec/shellprofile.d/hadoop-kms.sh

Lines changed: 2 additions & 0 deletions
@@ -49,6 +49,8 @@ function hadoop_subcommand_kms
     "-Dkms.config.dir=${HADOOP_CONF_DIR}"
   hadoop_add_param HADOOP_OPTS "-Dkms.log.dir=" \
     "-Dkms.log.dir=${HADOOP_LOG_DIR}"
+  hadoop_add_param HADOOP_OPTS "-Dlog4j.configuration=" \
+    "-Dlog4j.configuration=file:${HADOOP_CONF_DIR}/kms-log4j.properties"
 
   if [[ "${HADOOP_DAEMON_MODE}" == "default" ]] ||
      [[ "${HADOOP_DAEMON_MODE}" == "start" ]]; then

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

Lines changed: 29 additions & 5 deletions
@@ -224,7 +224,7 @@ boolean deadNodesContain(DatanodeInfo nodeInfo) {
   }
 
   /**
-   * Grab the open-file info from namenode
+   * Grab the open-file info from namenode.
    * @param refreshLocatedBlocks whether to re-fetch locatedblocks
    */
   void openInfo(boolean refreshLocatedBlocks) throws IOException {
@@ -940,7 +940,8 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
    * @return Returns chosen DNAddrPair; Can be null if refetchIfRequired is
    * false.
    */
-  private DNAddrPair chooseDataNode(LocatedBlock block,
+  @VisibleForTesting
+  DNAddrPair chooseDataNode(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes, boolean refetchIfRequired)
       throws IOException {
     while (true) {
@@ -955,6 +956,14 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
     }
   }
 
+  /**
+   * RefetchLocations should only be called when there are no active requests
+   * to datanodes. In the hedged read case this means futures should be empty.
+   * @param block The locatedBlock to get new datanode locations for.
+   * @param ignoredNodes A list of ignored nodes. This list can be null and can be cleared.
+   * @return the locatedBlock with updated datanode locations.
+   * @throws IOException
+   */
   private LocatedBlock refetchLocations(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) throws IOException {
     String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
@@ -999,13 +1008,24 @@ private LocatedBlock refetchLocations(LocatedBlock block,
       throw new InterruptedIOException(
          "Interrupted while choosing DataNode for read.");
     }
-    clearLocalDeadNodes(); //2nd option is to remove only nodes[blockId]
+    clearCachedNodeState(ignoredNodes);
     openInfo(true);
     block = refreshLocatedBlock(block);
     failures++;
     return block;
   }
 
+  /**
+   * Clear both the dead nodes and the ignored nodes
+   * @param ignoredNodes is cleared
+   */
+  private void clearCachedNodeState(Collection<DatanodeInfo> ignoredNodes) {
+    clearLocalDeadNodes(); //2nd option is to remove only nodes[blockId]
+    if (ignoredNodes != null) {
+      ignoredNodes.clear();
+    }
+  }
+
   /**
    * Get the best node from which to stream the data.
    * @param block LocatedBlock, containing nodes in priority order.
@@ -1337,8 +1357,12 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
         } catch (InterruptedException ie) {
           // Ignore and retry
         }
-        if (refetch) {
-          refetchLocations(block, ignored);
+        // If refetch is true, then all nodes are in deadNodes or ignoredNodes.
+        // We should loop through all futures and remove them, so we do not
+        // have concurrent requests to the same node.
+        // Once all futures are cleared, we can clear the ignoredNodes and retry.
+        if (refetch && futures.isEmpty()) {
+          block = refetchLocations(block, ignored);
         }
         // We got here if exception. Ignore this node on next go around IFF
         // we found a chosenNode to hedge read against.
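
Note: the new guard (refetch && futures.isEmpty()) above means block locations are only refetched once every outstanding hedged request has finished, and only then is the cached dead/ignored node state cleared. A simplified, self-contained sketch of that wait-then-clear bookkeeping, using plain java.util.concurrent futures and string node names in place of Hadoop's own types (all names here are illustrative):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of the hedged-read retry bookkeeping: drain every in-flight request
// before clearing the ignored-node set and refetching block locations.
public class HedgedRetrySketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<Future<String>> futures = new ArrayList<>();
    List<String> ignoredNodes = new ArrayList<>(Arrays.asList("dn1", "dn2"));

    futures.add(pool.submit(() -> "hedged read against dn1 failed"));
    futures.add(pool.submit(() -> "hedged read against dn2 failed"));

    boolean refetch = true;              // all nodes are dead or ignored
    for (Future<String> f : futures) {   // wait out outstanding requests so no
      System.out.println(f.get());       // node gets two concurrent requests
    }
    futures.clear();

    if (refetch && futures.isEmpty()) {  // mirrors the new guard in the diff
      ignoredNodes.clear();              // analogous to clearCachedNodeState()
      System.out.println("ignored nodes cleared; safe to refetch locations");
    }
    pool.shutdown();
  }
}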

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Lines changed: 1 addition & 1 deletion
@@ -4174,7 +4174,7 @@ DirectoryListing getListing(String src, byte[] startAfter,
       logAuditEvent(false, operationName, src);
       throw e;
     }
-    if (needLocation && isObserver()) {
+    if (dl != null && needLocation && isObserver()) {
       for (HdfsFileStatus fs : dl.getPartialListing()) {
         if (fs instanceof HdfsLocatedFileStatus) {
           LocatedBlocks lbs = ((HdfsLocatedFileStatus) fs).getLocatedBlocks();

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java

Lines changed: 23 additions & 0 deletions
@@ -27,6 +27,7 @@
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -35,11 +36,14 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -200,6 +204,25 @@ public void testDeferredRegistrationGetAllBlocks() throws IOException {
     testWithRegistrationMethod(DFSInputStream::getAllBlocks);
   }
 
+  /**
+   * If the ignoreList contains all datanodes, the ignoredList should be cleared to take advantage
+   * of retries built into chooseDataNode. This is needed for hedged reads
+   * @throws IOException
+   */
+  @Test
+  public void testClearIgnoreListChooseDataNode() throws IOException {
+    final String fileName = "/test_cache_locations";
+    filePath = createFile(fileName);
+
+    try (DFSInputStream fin = dfsClient.open(fileName)) {
+      LocatedBlocks existing = fin.locatedBlocks;
+      LocatedBlock block = existing.getLastLocatedBlock();
+      ArrayList<DatanodeInfo> ignoreList = new ArrayList<>(Arrays.asList(block.getLocations()));
+      Assert.assertNotNull(fin.chooseDataNode(block, ignoreList, true));
+      Assert.assertEquals(0, ignoreList.size());
+    }
+  }
+
   @FunctionalInterface
   interface ThrowingConsumer {
     void accept(DFSInputStream fin) throws IOException;

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java

Lines changed: 3 additions & 1 deletion
@@ -603,7 +603,9 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
       input.read(0, buffer, 0, 1024);
       Assert.fail("Reading the block should have thrown BlockMissingException");
     } catch (BlockMissingException e) {
-      assertEquals(3, input.getHedgedReadOpsLoopNumForTesting());
+      // The result of 9 is due to 2 blocks by 4 iterations plus one because
+      // hedgedReadOpsLoopNumForTesting is incremented at start of the loop.
+      assertEquals(9, input.getHedgedReadOpsLoopNumForTesting());
       assertTrue(metrics.getHedgedReadOps() == 0);
     } finally {
       Mockito.reset(injector);

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

Lines changed: 9 additions & 15 deletions
@@ -1075,16 +1075,14 @@ public void testCleanShutdownOfVolume() throws Exception {
   @Test(timeout = 30000)
   public void testReportBadBlocks() throws Exception {
     boolean threwException = false;
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration config = new HdfsConfiguration();
-      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+    final Configuration config = new HdfsConfiguration();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(config)
+        .numDataNodes(1).build()) {
       cluster.waitActive();
 
       Assert.assertEquals(0, cluster.getNamesystem().getCorruptReplicaBlocks());
       DataNode dataNode = cluster.getDataNodes().get(0);
-      ExtendedBlock block =
-          new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(), 0);
+      ExtendedBlock block = new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(), 0);
       try {
         // Test the reportBadBlocks when the volume is null
         dataNode.reportBadBlocks(block);
@@ -1101,15 +1099,11 @@ public void testReportBadBlocks() throws Exception {
 
       block = DFSTestUtil.getFirstBlock(fs, filePath);
       // Test for the overloaded method reportBadBlocks
-      dataNode.reportBadBlocks(block, dataNode.getFSDataset()
-          .getFsVolumeReferences().get(0));
-      Thread.sleep(3000);
-      BlockManagerTestUtil.updateState(cluster.getNamesystem()
-          .getBlockManager());
-      // Verify the bad block has been reported to namenode
-      Assert.assertEquals(1, cluster.getNamesystem().getCorruptReplicaBlocks());
-    } finally {
-      cluster.shutdown();
+      dataNode.reportBadBlocks(block, dataNode.getFSDataset().getFsVolumeReferences().get(0));
+      DataNodeTestUtils.triggerHeartbeat(dataNode);
+      BlockManagerTestUtil.updateState(cluster.getNamesystem().getBlockManager());
+      assertEquals("Corrupt replica blocks could not be reflected with the heartbeat", 1,
+          cluster.getNamesystem().getCorruptReplicaBlocks());
     }
   }

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java

Lines changed: 12 additions & 0 deletions
@@ -71,6 +71,7 @@
 import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.junit.After;
@@ -652,6 +653,17 @@ public void run() {
     }
   }
 
+  @Test
+  public void testGetListingForDeletedDir() throws Exception {
+    Path path = new Path("/dir1/dir2/testFile");
+    dfs.create(path).close();
+
+    assertTrue(dfs.delete(new Path("/dir1/dir2"), true));
+
+    LambdaTestUtils.intercept(FileNotFoundException.class,
+        () -> dfs.listLocatedStatus(new Path("/dir1/dir2")));
+  }
+
   @Test
   public void testSimpleReadEmptyDirOrFile() throws IOException {
     // read empty dir

0 commit comments
