Skip to content

Commit 6f4ffd6

Browse files
author
limingxiang02
committed
HDFS-16534. Split FsDatasetImpl from block pool locks to volume grain locks.
1 parent 15a5ea2 commit 6f4ffd6

File tree

5 files changed

+95
-41
lines changed

5 files changed

+95
-41
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

Lines changed: 70 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -432,6 +432,9 @@ private synchronized void activateVolume(
432432
ReplicaMap replicaMap,
433433
Storage.StorageDirectory sd, StorageType storageType,
434434
FsVolumeReference ref) throws IOException {
435+
for (String bp : volumeMap.getBlockPoolList()) {
436+
lockManager.addLock(LockLevel.VOLUME, bp, ref.getVolume().getStorageID());
437+
}
435438
DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
436439
if (dnStorage != null) {
437440
final String errorMsg = String.format(
@@ -629,6 +632,9 @@ public void removeVolumes(
629632
synchronized (this) {
630633
for (String storageUuid : storageToRemove) {
631634
storageMap.remove(storageUuid);
635+
for (String bp : volumeMap.getBlockPoolList()) {
636+
lockManager.removeLock(LockLevel.VOLUME, bp, storageUuid);
637+
}
632638
}
633639
}
634640
}
@@ -906,8 +912,8 @@ ReplicaInfo getReplicaInfo(String bpid, long blkid)
906912
@Override // FsDatasetSpi
907913
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
908914
long blkOffset, long metaOffset) throws IOException {
909-
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl,
910-
b.getBlockPoolId())) {
915+
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.VOLUME,
916+
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
911917
ReplicaInfo info = getReplicaInfo(b);
912918
FsVolumeReference ref = info.getVolume().obtainReference();
913919
try {
@@ -1372,8 +1378,8 @@ static void computeChecksum(ReplicaInfo srcReplica, File dstMeta,
13721378
@Override // FsDatasetSpi
13731379
public ReplicaHandler append(ExtendedBlock b,
13741380
long newGS, long expectedBlockLen) throws IOException {
1375-
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
1376-
b.getBlockPoolId())) {
1381+
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
1382+
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
13771383
// If the block was successfully finalized because all packets
13781384
// were successfully processed at the Datanode but the ack for
13791385
// some of the packets were not received by the client. The client
@@ -1425,7 +1431,8 @@ public ReplicaHandler append(ExtendedBlock b,
14251431
private ReplicaInPipeline append(String bpid,
14261432
ReplicaInfo replicaInfo, long newGS, long estimateBlockLen)
14271433
throws IOException {
1428-
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
1434+
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
1435+
bpid, replicaInfo.getStorageUuid())) {
14291436
// If the block is cached, start uncaching it.
14301437
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
14311438
throw new IOException("Only a Finalized replica can be appended to; "
@@ -1554,8 +1561,8 @@ public Replica recoverClose(ExtendedBlock b, long newGS,
15541561
LOG.info("Recover failed close " + b);
15551562
while (true) {
15561563
try {
1557-
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
1558-
b.getBlockPoolId())) {
1564+
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
1565+
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
15591566
// check replica's state
15601567
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
15611568
// bump the replica's GS
@@ -1578,7 +1585,7 @@ public ReplicaHandler createRbw(
15781585
StorageType storageType, String storageId, ExtendedBlock b,
15791586
boolean allowLazyPersist) throws IOException {
15801587
long startTimeMs = Time.monotonicNow();
1581-
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
1588+
try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
15821589
b.getBlockPoolId())) {
15831590
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
15841591
b.getBlockId());
@@ -1626,20 +1633,20 @@ public ReplicaHandler createRbw(
16261633
}
16271634

16281635
ReplicaInPipeline newReplicaInfo;
1629-
try {
1636+
try (AutoCloseableLock l = lockManager.writeLock(LockLevel.VOLUME,
1637+
b.getBlockPoolId(), v.getStorageID())) {
16301638
newReplicaInfo = v.createRbw(b);
16311639
if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) {
16321640
throw new IOException("CreateRBW returned a replica of state "
16331641
+ newReplicaInfo.getReplicaInfo().getState()
16341642
+ " for block " + b.getBlockId());
16351643
}
1644+
volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
1645+
return new ReplicaHandler(newReplicaInfo, ref);
16361646
} catch (IOException e) {
16371647
IOUtils.cleanupWithLogger(null, ref);
16381648
throw e;
16391649
}
1640-
1641-
volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
1642-
return new ReplicaHandler(newReplicaInfo, ref);
16431650
} finally {
16441651
if (dataNodeMetrics != null) {
16451652
long createRbwMs = Time.monotonicNow() - startTimeMs;
@@ -1657,8 +1664,8 @@ public ReplicaHandler recoverRbw(
16571664
try {
16581665
while (true) {
16591666
try {
1660-
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
1661-
b.getBlockPoolId())) {
1667+
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
1668+
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
16621669
ReplicaInfo replicaInfo =
16631670
getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
16641671
// check the replica's state
@@ -1689,8 +1696,8 @@ public ReplicaHandler recoverRbw(
16891696
private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
16901697
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
16911698
throws IOException {
1692-
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
1693-
b.getBlockPoolId())) {
1699+
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
1700+
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
16941701
// check generation stamp
16951702
long replicaGenerationStamp = rbw.getGenerationStamp();
16961703
if (replicaGenerationStamp < b.getGenerationStamp() ||
@@ -1751,8 +1758,8 @@ private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
17511758
public ReplicaInPipeline convertTemporaryToRbw(
17521759
final ExtendedBlock b) throws IOException {
17531760
long startTimeMs = Time.monotonicNow();
1754-
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
1755-
b.getBlockPoolId())) {
1761+
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
1762+
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
17561763
final long blockId = b.getBlockId();
17571764
final long expectedGs = b.getGenerationStamp();
17581765
final long visible = b.getNumBytes();
@@ -1887,12 +1894,12 @@ public ReplicaHandler createTemporary(StorageType storageType,
18871894
false);
18881895
}
18891896
long startHoldLockTimeMs = Time.monotonicNow();
1890-
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
1891-
b.getBlockPoolId())) {
1892-
FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
1893-
.getNumBytes());
1894-
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
1895-
ReplicaInPipeline newReplicaInfo;
1897+
FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
1898+
.getNumBytes());
1899+
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
1900+
ReplicaInPipeline newReplicaInfo;
1901+
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
1902+
b.getBlockPoolId(), v.getStorageID())) {
18961903
try {
18971904
newReplicaInfo = v.createTemporary(b);
18981905
LOG.debug("creating temporary for block: {} on volume: {}",
@@ -1949,8 +1956,8 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
19491956
ReplicaInfo replicaInfo = null;
19501957
ReplicaInfo finalizedReplicaInfo = null;
19511958
long startTimeMs = Time.monotonicNow();
1952-
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
1953-
b.getBlockPoolId())) {
1959+
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
1960+
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
19541961
if (Thread.interrupted()) {
19551962
// Don't allow data modifications from interrupted threads
19561963
throw new IOException("Cannot finalize block from Interrupted Thread");
@@ -1986,7 +1993,8 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
19861993

19871994
private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
19881995
throws IOException {
1989-
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
1996+
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
1997+
bpid, replicaInfo.getStorageUuid())) {
19901998
// Compare generation stamp of old and new replica before finalizing
19911999
if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
19922000
> replicaInfo.getGenerationStamp()) {
@@ -2032,8 +2040,8 @@ private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
20322040
@Override // FsDatasetSpi
20332041
public void unfinalizeBlock(ExtendedBlock b) throws IOException {
20342042
long startTimeMs = Time.monotonicNow();
2035-
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
2036-
b.getBlockPoolId())) {
2043+
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
2044+
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
20372045
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
20382046
b.getLocalBlock());
20392047
if (replicaInfo != null &&
@@ -2423,10 +2431,17 @@ private void cacheBlock(String bpid, long blockId) {
24232431
long length, genstamp;
24242432
Executor volumeExecutor;
24252433

2426-
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
2427-
ReplicaInfo info = volumeMap.get(bpid, blockId);
2434+
ReplicaInfo info = volumeMap.get(bpid, blockId);
2435+
if (info == null) {
2436+
LOG.warn("Failed to cache block with id " + blockId + ", pool " +
2437+
bpid + ": ReplicaInfo not found.");
2438+
return;
2439+
}
2440+
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid,
2441+
info.getStorageUuid())) {
24282442
boolean success = false;
24292443
try {
2444+
info = volumeMap.get(bpid, blockId);
24302445
if (info == null) {
24312446
LOG.warn("Failed to cache block with id " + blockId + ", pool " +
24322447
bpid + ": ReplicaInfo not found.");
@@ -2619,7 +2634,8 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
26192634
curDirScannerNotifyCount = 0;
26202635
lastDirScannerNotifyTime = startTimeMs;
26212636
}
2622-
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
2637+
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid,
2638+
vol.getStorageID())) {
26232639
memBlockInfo = volumeMap.get(bpid, blockId);
26242640
if (memBlockInfo != null &&
26252641
memBlockInfo.getState() != ReplicaState.FINALIZED) {
@@ -2860,7 +2876,14 @@ ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
28602876
Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
28612877
while (true) {
28622878
try {
2863-
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
2879+
ReplicaInfo replica = map.get(bpid, block.getBlockId());
2880+
if (replica == null) {
2881+
return null;
2882+
}
2883+
LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
2884+
+ ", replica=" + replica);
2885+
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.VOLUME, bpid,
2886+
replica.getStorageUuid())) {
28642887
return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
28652888
}
28662889
} catch (MustStopExistingWriter e) {
@@ -2875,7 +2898,14 @@ static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
28752898
lockManager) throws IOException {
28762899
while (true) {
28772900
try {
2878-
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
2901+
ReplicaInfo replica = map.get(bpid, block.getBlockId());
2902+
if (replica == null) {
2903+
return null;
2904+
}
2905+
LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
2906+
+ ", replica=" + replica);
2907+
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.VOLUME, bpid,
2908+
replica.getStorageUuid())) {
28792909
return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
28802910
}
28812911
} catch (MustStopExistingWriter e) {
@@ -2888,9 +2918,6 @@ static ReplicaRecoveryInfo initReplicaRecoveryImpl(String bpid, ReplicaMap map,
28882918
Block block, long recoveryId)
28892919
throws IOException, MustStopExistingWriter {
28902920
final ReplicaInfo replica = map.get(bpid, block.getBlockId());
2891-
LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
2892-
+ ", replica=" + replica);
2893-
28942921
//check replica
28952922
if (replica == null) {
28962923
return null;
@@ -2964,8 +2991,8 @@ public Replica updateReplicaUnderRecovery(
29642991
final long newBlockId,
29652992
final long newlength) throws IOException {
29662993
long startTimeMs = Time.monotonicNow();
2967-
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
2968-
oldBlock.getBlockPoolId())) {
2994+
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
2995+
oldBlock.getBlockPoolId(), getReplicaInfo(oldBlock).getStorageUuid())) {
29692996
//get replica
29702997
final String bpid = oldBlock.getBlockPoolId();
29712998
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
@@ -3109,6 +3136,10 @@ public void addBlockPool(String bpid, Configuration conf)
31093136
volumeExceptions.mergeException(e);
31103137
}
31113138
volumeMap.initBlockPool(bpid);
3139+
Set<String> vols = storageMap.keySet();
3140+
for (String v : vols) {
3141+
lockManager.addLock(LockLevel.VOLUME, bpid, v);
3142+
}
31123143
}
31133144
try {
31143145
volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -88,6 +88,7 @@
8888
import org.apache.hadoop.util.Lists;
8989
import org.apache.hadoop.util.Time;
9090
import org.junit.Before;
91+
import org.junit.After;
9192
import org.junit.Test;
9293
import org.mockito.ArgumentCaptor;
9394
import org.mockito.Mockito;
@@ -157,6 +158,13 @@ public void setupMocks() throws Exception {
157158
Mockito.doReturn(dataSetLockManager).when(mockDn).getDataSetLockManager();
158159
}
159160

161+
@After
162+
public void checkDataSetLockManager() {
163+
dataSetLockManager.lockLeakCheck();
164+
// make sure no lock Leak.
165+
assertNull(dataSetLockManager.getLastException());
166+
}
167+
160168
/**
161169
* Set up a mock NN with the bare minimum for a DN to register to it.
162170
*/

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,7 @@
3737
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
3838

3939
public class ExternalVolumeImpl implements FsVolumeSpi {
40+
private final String defaultStroageId = "test";
4041
@Override
4142
public FsVolumeReference obtainReference() throws ClosedChannelException {
4243
return null;
@@ -54,7 +55,7 @@ public long getAvailable() throws IOException {
5455

5556
@Override
5657
public String getStorageID() {
57-
return null;
58+
return defaultStroageId;
5859
}
5960

6061
@Override

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -80,6 +80,7 @@
8080
import org.apache.hadoop.util.StringUtils;
8181
import org.junit.Assert;
8282
import org.junit.Before;
83+
import org.junit.After;
8384
import org.junit.Rule;
8485
import org.junit.Test;
8586
import org.junit.rules.TestName;
@@ -236,6 +237,13 @@ public void setUp() throws IOException {
236237
assertEquals(0, dataset.getNumFailedVolumes());
237238
}
238239

240+
@After
241+
public void checkDataSetLockManager() {
242+
manager.lockLeakCheck();
243+
// make sure no lock Leak.
244+
assertNull(manager.getLastException());
245+
}
246+
239247
@Test
240248
public void testAddVolumes() throws IOException {
241249
final int numNewVolumes = 3;
@@ -687,6 +695,7 @@ public void testAddVolumeFailureReleasesInUseLock() throws IOException {
687695
FsDatasetImpl spyDataset = spy(dataset);
688696
FsVolumeImpl mockVolume = mock(FsVolumeImpl.class);
689697
File badDir = new File(BASE_DIR, "bad");
698+
when(mockVolume.getStorageID()).thenReturn("test");
690699
badDir.mkdirs();
691700
doReturn(mockVolume).when(spyDataset)
692701
.createFsVolume(anyString(), any(StorageDirectory.class),

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,7 @@
5151
import org.apache.hadoop.hdfs.server.datanode.Replica;
5252
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
5353
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
54+
import org.apache.hadoop.hdfs.server.datanode.extdataset.ExternalVolumeImpl;
5455
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
5556
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
5657
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
@@ -218,7 +219,7 @@ private void checkBlockMetaDataInfo(boolean useDnHostname) throws Exception {
218219
}
219220

220221
private static ReplicaInfo createReplicaInfo(Block b) {
221-
return new FinalizedReplica(b, null, null);
222+
return new FinalizedReplica(b, new ExternalVolumeImpl(), null);
222223
}
223224

224225
private static void assertEquals(ReplicaInfo originalInfo, ReplicaRecoveryInfo recoveryInfo) {
@@ -318,6 +319,10 @@ public void testInitReplicaRecovery() throws IOException {
318319
"replica.getGenerationStamp() < block.getGenerationStamp(), block=");
319320
}
320321
}
322+
323+
manager.lockLeakCheck();
324+
// make sure no lock Leak.
325+
assertNull(manager.getLastException());
321326
}
322327

323328
/**

0 commit comments

Comments
 (0)