
Commit 0b09554

Merge branch 'trunk' into YARN-11320
2 parents 7c516fb + 17035da commit 0b09554

17 files changed: +1317 −147 lines


hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

Lines changed: 16 additions & 3 deletions
@@ -1654,18 +1654,31 @@ synchronized void logEdit(final int length, final byte[] data) {
     endTransaction(start);
   }
 
+  void recoverUnclosedStreams() throws IOException {
+    recoverUnclosedStreams(false);
+  }
+
   /**
    * Run recovery on all journals to recover any unclosed segments
    */
-  synchronized void recoverUnclosedStreams() {
+  synchronized void recoverUnclosedStreams(boolean terminateOnFailure) throws IOException {
     Preconditions.checkState(
         state == State.BETWEEN_LOG_SEGMENTS,
         "May not recover segments - wrong state: %s", state);
     try {
       journalSet.recoverUnfinalizedSegments();
     } catch (IOException ex) {
-      // All journals have failed, it is handled in logSync.
-      // TODO: are we sure this is OK?
+      if (terminateOnFailure) {
+        final String msg = "Unable to recover log segments: "
+            + "too few journals successfully recovered.";
+        LOG.error(msg, ex);
+        synchronized (journalSetLock) {
+          IOUtils.cleanupWithLogger(LOG, journalSet);
+        }
+        terminate(1, msg);
+      } else {
+        throw ex;
+      }
     }
   }
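
Taken together with the FSNamesystem change below, the new boolean makes the behavior explicit at each call site: the active NameNode startup path keeps the old fail-fast semantics by passing true, while the no-argument overload now propagates the IOException to its caller instead of swallowing it. A minimal sketch of the two call patterns; the surrounding method names are illustrative, not part of the commit:

  // Hypothetical callers, sketched to show the intended semantics of the new flag.
  void becomeActiveWriter(FSEditLog editLog) throws IOException {
    // Fail fast: on recovery failure this logs, closes the journal set,
    // and calls terminate(1, msg), so the process exits.
    editLog.recoverUnclosedStreams(true);
  }

  void recoverWithoutTerminating(FSEditLog editLog) throws IOException {
    // Delegates to recoverUnclosedStreams(false): a recovery failure is now
    // rethrown as an IOException rather than silently ignored.
    editLog.recoverUnclosedStreams();
  }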

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Lines changed: 1 addition & 1 deletion
@@ -1389,7 +1389,7 @@ void startActiveServices() throws IOException {
     // During startup, we're already open for write during initialization.
     editLog.initJournalsForWrite();
     // May need to recover
-    editLog.recoverUnclosedStreams();
+    editLog.recoverUnclosedStreams(true);
 
     LOG.info("Catching up to latest edits from old active before " +
         "taking over writer role in edits logs");

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java

Lines changed: 8 additions & 3 deletions
@@ -311,7 +311,8 @@ public Void run() throws Exception {
             startTime - lastLoadTimeMs);
         // It is already under the name system lock and the checkpointer
         // thread is already stopped. No need to acquire any other lock.
-        editsTailed = doTailEdits();
+        // HDFS-16689. Disable inProgress to use the streaming mechanism
+        editsTailed = doTailEdits(false);
       } catch (InterruptedException e) {
         throw new IOException(e);
       } finally {
@@ -323,9 +324,13 @@ public Void run() throws Exception {
       }
     });
   }
-
+
   @VisibleForTesting
   public long doTailEdits() throws IOException, InterruptedException {
+    return doTailEdits(inProgressOk);
+  }
+
+  private long doTailEdits(boolean enableInProgress) throws IOException, InterruptedException {
     Collection<EditLogInputStream> streams;
     FSImage image = namesystem.getFSImage();
 
@@ -334,7 +339,7 @@ public long doTailEdits() throws IOException, InterruptedException {
     long startTime = timer.monotonicNow();
     try {
       streams = editLog.selectInputStreams(lastTxnId + 1, 0,
-          null, inProgressOk, true);
+          null, enableInProgress, true);
     } catch (IOException ioe) {
       // This is acceptable. If we try to tail edits in the middle of an edits
       // log roll, i.e. the last one has been finalized but the new inprogress
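
The effect, as far as this diff shows, is that the lock-holding catch-up path during failover now always tails finalized segments via doTailEdits(false), while the public no-argument doTailEdits() still honors the configured inProgressOk flag. A small illustrative sketch of the two entry points; the caller names are assumptions, not code from this commit:

  // Inside EditLogTailer (sketch): failover catch-up vs. the regular tailing loop.
  long catchUpUnderNamesystemLock() throws IOException, InterruptedException {
    // HDFS-16689: force the streaming mechanism, never read in-progress segments here.
    return doTailEdits(false);
  }

  long regularTailingIteration() throws IOException, InterruptedException {
    // Normal tailing keeps the configured behavior.
    return doTailEdits();   // delegates to doTailEdits(inProgressOk)
  }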

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java

Lines changed: 2 additions & 4 deletions
@@ -33,7 +33,6 @@
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.util.ExitUtil.ExitException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -197,10 +196,9 @@ public void testMismatchedNNIsRejected() throws Exception {
           .manageNameDfsDirs(false).format(false).checkExitOnShutdown(false)
           .build();
       fail("New NN with different namespace should have been rejected");
-    } catch (ExitException ee) {
+    } catch (IOException ioe) {
       GenericTestUtils.assertExceptionContains(
-          "Unable to start log segment 1: too few journals", ee);
-      assertTrue("Didn't terminate properly ", ExitUtil.terminateCalled());
+          "recoverUnfinalizedSegments failed for too many journals", ioe);
     }
   }
 }
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/SpyQJournalUtil.java

Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Semaphore;
+
+/**
+ * One Util class to mock QJM for some UTs not in this package.
+ */
+public final class SpyQJournalUtil {
+
+  private SpyQJournalUtil() {
+  }
+
+  /**
+   * Mock a QuorumJournalManager with input uri, nsInfo and namServiceId.
+   * @param conf input configuration.
+   * @param uri input uri.
+   * @param nsInfo input nameservice info.
+   * @param nameServiceId input nameservice Id.
+   * @return one mocked QuorumJournalManager.
+   * @throws IOException throw IOException.
+   */
+  public static QuorumJournalManager createSpyingQJM(Configuration conf,
+      URI uri, NamespaceInfo nsInfo, String nameServiceId) throws IOException {
+    AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
+      @Override
+      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+          String journalId, String nameServiceId, InetSocketAddress addr) {
+        AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId,
+            nameServiceId, addr) {
+          protected ExecutorService createSingleThreadExecutor() {
+            // Don't parallelize calls to the quorum in the tests.
+            // This makes the tests more deterministic.
+            return new DirectExecutorService();
+          }
+        };
+        return Mockito.spy(logger);
+      }
+    };
+    return new QuorumJournalManager(conf, uri, nsInfo, nameServiceId, spyFactory);
+  }
+
+  /**
+   * Mock Journals with different response for getJournaledEdits rpc with the input startTxid.
+   * 1. First journal with one empty response.
+   * 2. Second journal with one normal response.
+   * 3. Third journal with one slow response.
+   * @param manager input QuorumJournalManager.
+   * @param startTxid input start txid.
+   */
+  public static void mockJNWithEmptyOrSlowResponse(QuorumJournalManager manager, long startTxid) {
+    List<AsyncLogger> spies = manager.getLoggerSetForTests().getLoggersForTests();
+    Semaphore semaphore = new Semaphore(0);
+
+    // Mock JN0 return an empty response.
+    Mockito.doAnswer(invocation -> {
+      semaphore.release();
+      return GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build();
+    }).when(spies.get(0))
+        .getJournaledEdits(startTxid, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+
+    // Mock JN1 return a normal response.
+    spyGetJournaledEdits(spies, 1, startTxid, () -> semaphore.release(1));
+
+    // Mock JN2 return a slow response
+    spyGetJournaledEdits(spies, 2, startTxid, () -> semaphore.acquireUninterruptibly(2));
+  }
+
+  public static void spyGetJournaledEdits(List<AsyncLogger> spies,
+      int jnSpyIdx, long fromTxId, Runnable preHook) {
+    Mockito.doAnswer((Answer<ListenableFuture<GetJournaledEditsResponseProto>>) invocation -> {
+      preHook.run();
+      @SuppressWarnings("unchecked")
+      ListenableFuture<GetJournaledEditsResponseProto> result =
+          (ListenableFuture<GetJournaledEditsResponseProto>) invocation.callRealMethod();
+      return result;
+    }).when(spies.get(jnSpyIdx)).getJournaledEdits(fromTxId,
+        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+  }
+}
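
For readers outside this package, a hedged sketch of how a test might wire the new helper up; the cluster, JID, and FAKE_NSINFO names below mirror the existing QJM tests and are assumptions rather than part of this file:

  // Sketch: build a spying QJM against a MiniJournalCluster and shape the
  // getJournaledEdits responses of the three JournalNodes.
  QuorumJournalManager qjm = SpyQJournalUtil.createSpyingQJM(
      conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, null);
  qjm.recoverUnfinalizedSegments();

  // JN0 answers with an empty response, JN1 answers normally, and JN2 blocks
  // until the other two have released the shared semaphore.
  SpyQJournalUtil.mockJNWithEmptyOrSlowResponse(qjm, 1);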

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java

Lines changed: 8 additions & 39 deletions
@@ -22,6 +22,7 @@
 import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
 import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
 import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeTxns;
+import static org.apache.hadoop.hdfs.qjournal.client.SpyQJournalUtil.spyGetJournaledEdits;
 import static org.apache.hadoop.hdfs.qjournal.client.TestQuorumJournalManagerUnit.futureThrows;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -34,12 +35,10 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -59,7 +58,6 @@
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
 import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
 import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
@@ -68,7 +66,6 @@
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine2;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -1135,9 +1132,9 @@ public void testSelectViaRPCAfterJNJitter() throws Exception {
     writeTxns(stm, 21, 20);
 
     Semaphore semaphore = new Semaphore(0);
-    spyGetJournaledEdits(0, 21, () -> semaphore.release(1));
-    spyGetJournaledEdits(1, 21, () -> semaphore.release(1));
-    spyGetJournaledEdits(2, 21, () -> semaphore.acquireUninterruptibly(2));
+    spyGetJournaledEdits(spies, 0, 21, () -> semaphore.release(1));
+    spyGetJournaledEdits(spies, 1, 21, () -> semaphore.release(1));
+    spyGetJournaledEdits(spies, 2, 21, () -> semaphore.acquireUninterruptibly(2));
 
     List<EditLogInputStream> streams = new ArrayList<>();
     qjm.selectInputStreams(streams, 21, true, true);
@@ -1147,17 +1144,6 @@ public void testSelectViaRPCAfterJNJitter() throws Exception {
     assertEquals(40, streams.get(0).getLastTxId());
   }
 
-  private void spyGetJournaledEdits(int jnSpyIdx, long fromTxId, Runnable preHook) {
-    Mockito.doAnswer((Answer<ListenableFuture<GetJournaledEditsResponseProto>>) invocation -> {
-      preHook.run();
-      @SuppressWarnings("unchecked")
-      ListenableFuture<GetJournaledEditsResponseProto> result =
-          (ListenableFuture<GetJournaledEditsResponseProto>) invocation.callRealMethod();
-      return result;
-    }).when(spies.get(jnSpyIdx)).getJournaledEdits(fromTxId,
-        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
-  }
-
   @Test
   public void testSelectViaRpcAfterJNRestart() throws Exception {
     EditLogOutputStream stm =
@@ -1210,27 +1196,10 @@ public void testGetJournalAddressListWithResolution() throws Exception {
       // expected
     }
   }
-
-  private QuorumJournalManager createSpyingQJM()
-      throws IOException, URISyntaxException {
-    AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
-      @Override
-      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
-          String journalId, String nameServiceId, InetSocketAddress addr) {
-        AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId,
-            nameServiceId, addr) {
-          protected ExecutorService createSingleThreadExecutor() {
-            // Don't parallelize calls to the quorum in the tests.
-            // This makes the tests more deterministic.
-            return new DirectExecutorService();
-          }
-        };
-
-        return Mockito.spy(logger);
-      }
-    };
-    return closeLater(new QuorumJournalManager(
-        conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, spyFactory));
+
+  private QuorumJournalManager createSpyingQJM() throws IOException {
+    return closeLater(SpyQJournalUtil.createSpyingQJM(
+        conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, null));
   }
 
   private static void waitForAllPendingCalls(AsyncLoggerSet als)
