Skip to content

Commit f8c0e26

Browse files
committed
Fix serial fsimage transfer during checkpoint with multiple namenodes
1 parent dd8fd7e commit f8c0e26

File tree

2 files changed

+19
-3
lines changed

2 files changed

+19
-3
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,9 +248,10 @@ private void doCheckpoint() throws InterruptedException, IOException {
248248
// Do this in a separate thread to avoid blocking transition to active, but don't allow more
249249
// than the expected number of tasks to run or queue up
250250
// See HDFS-4816
251-
ExecutorService executor = new ThreadPoolExecutor(0, activeNNAddresses.size(), 100,
252-
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),
253-
uploadThreadFactory);
251+
ExecutorService executor =
252+
new ThreadPoolExecutor(activeNNAddresses.size(), activeNNAddresses.size(), 100,
253+
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),
254+
uploadThreadFactory);
254255
// for right now, just match the upload to the nn address by convention. There is no need to
255256
// directly tie them together by adding a pair class.
256257
HashMap<String, Future<TransferFsImage.TransferResult>> uploads =

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,21 @@ public void testCheckpointCancellationDuringUpload() throws Exception {
458458
cluster.transitionToStandby(0);
459459
cluster.transitionToActive(1);
460460

461+
GenericTestUtils.waitFor(new Supplier<Boolean>() {
462+
@Override
463+
public Boolean get() {
464+
int transferThreadCount = 0;
465+
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
466+
ThreadInfo[] threads = threadBean.getThreadInfo(
467+
threadBean.getAllThreadIds(), 1);
468+
for (ThreadInfo thread: threads) {
469+
if (thread.getThreadName().startsWith("TransferFsImageUpload")) {
470+
transferThreadCount++;
471+
}
472+
}
473+
return transferThreadCount == NUM_NNS - 1;
474+
}
475+
}, 1000, 30000);
461476

462477
// Wait to make sure background TransferFsImageUpload thread was cancelled.
463478
// This needs to be done before the next test in the suite starts, so that a

0 commit comments

Comments
 (0)