
Commit 3586f00

limingxiang02 committed
HDFS-15382. Split one FsDatasetImpl lock to block pool grain locks.
1 parent 02f6bad commit 3586f00

22 files changed: +384 −447 lines changed
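
Before this change, readers and writers of FsDatasetImpl state contended on a single dataset-wide lock; after it, each block pool registered on a DataNode gets its own read/write lock managed by the new DataSetLockManager. A minimal standalone sketch of that pattern, using only the calls visible in the diffs below (the block pool id is a hypothetical placeholder, and the lower-case 'l' in LockLevel.BLOCK_POOl is the constant's actual spelling as committed):

// Sketch only: standalone wiring of the per-block-pool lock pattern this
// commit introduces. DataSetLockManager, LockLevel.BLOCK_POOl, addLock and
// readLock are all taken from the diffs below.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
import org.apache.hadoop.util.AutoCloseableLock;

public class BlockPoolLockSketch {
  public static void main(String[] args) {
    DataSetLockManager lockManager = new DataSetLockManager(new Configuration());
    String bpid = "BP-1-127.0.0.1-1"; // hypothetical block pool id
    // Register the pool's lock once (BPServiceActor does this on handshake).
    lockManager.addLock(LockLevel.BLOCK_POOl, bpid);
    // Take a pool-scoped read lock; pools no longer contend with each other.
    try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
      // read replica state belonging to this block pool only
    }
  }
}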

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

Lines changed: 5 additions & 0 deletions
@@ -53,6 +53,7 @@
 import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
+import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -307,6 +308,10 @@ private void connectToNNAndHandshake() throws IOException {
     // info.
     NamespaceInfo nsInfo = retrieveNamespaceInfo();
 
+    // init block pool lock when init.
+    dn.getDataSetLockManager().addLock(LockLevel.BLOCK_POOl,
+        nsInfo.getBlockPoolID());
+
     // Verify that this matches the other NN in this HA pair.
     // This also initializes our block pool in the DN if we are
     // the first NN connection for this BP.

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

Lines changed: 3 additions & 1 deletion
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
@@ -256,7 +257,8 @@ class BlockSender implements java.io.Closeable {
     // the append write.
     ChunkChecksum chunkChecksum = null;
     final long replicaVisibleLength;
-    try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
+    try (AutoCloseableLock lock = datanode.getDataSetLockManager().readLock(
+        LockLevel.BLOCK_POOl, block.getBlockPoolId())) {
       replica = getReplica(block, datanode);
       replicaVisibleLength = replica.getVisibleLength();
     }
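
The practical payoff of the hunk above: a BlockSender preparing to stream a replica now takes only its own pool's read lock, so readers in different block pools no longer serialize against each other. A hedged illustration of that independence, reusing only calls from this commit (pool ids hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
import org.apache.hadoop.util.AutoCloseableLock;

public class PerPoolReadersSketch {
  public static void main(String[] args) throws InterruptedException {
    DataSetLockManager lm = new DataSetLockManager(new Configuration());
    String bp1 = "BP-1-127.0.0.1-1"; // hypothetical block pool ids
    String bp2 = "BP-2-127.0.0.1-2";
    lm.addLock(LockLevel.BLOCK_POOl, bp1);
    lm.addLock(LockLevel.BLOCK_POOl, bp2);
    Thread reader1 = new Thread(() -> {
      try (AutoCloseableLock l = lm.readLock(LockLevel.BLOCK_POOl, bp1)) {
        // read replica metadata for pool 1
      }
    });
    Thread reader2 = new Thread(() -> {
      try (AutoCloseableLock l = lm.readLock(LockLevel.BLOCK_POOl, bp2)) {
        // read replica metadata for pool 2; independent of pool 1's lock
      }
    });
    reader1.start();
    reader2.start();
    reader1.join();
    reader2.join();
  }
}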

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Lines changed: 11 additions & 1 deletion
@@ -129,6 +129,7 @@
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
 import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
 import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -408,6 +409,7 @@ public static InetSocketAddress createSocketAddr(String target) {
       .availableProcessors();
   private static final double CONGESTION_RATIO = 1.5;
   private DiskBalancer diskBalancer;
+  private DataSetLockManager dataSetLockManager;
 
   private final ExecutorService xferService;
 
@@ -449,6 +451,7 @@ private static Tracer createTracer(Configuration conf) {
     this.pipelineSupportSlownode = false;
     this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
     this.dnConf = new DNConf(this);
+    this.dataSetLockManager = new DataSetLockManager(conf);
     initOOBTimeout();
     storageLocationChecker = null;
     volumeChecker = new DatasetVolumeChecker(conf, new Timer());
@@ -467,6 +470,7 @@ private static Tracer createTracer(Configuration conf) {
     super(conf);
     this.tracer = createTracer(conf);
     this.fileIoProvider = new FileIoProvider(conf, this);
+    this.dataSetLockManager = new DataSetLockManager(conf);
     this.blockScanner = new BlockScanner(this);
     this.lastDiskErrorCheck = 0;
     this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
@@ -2324,6 +2328,7 @@ public void shutdown() {
       notifyAll();
     }
     tracer.close();
+    dataSetLockManager.lockLeakCheck();
   }
 
   /**
@@ -3230,7 +3235,8 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b,
     final BlockConstructionStage stage;
 
     //get replica information
-    try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
+    try (AutoCloseableLock lock = dataSetLockManager.readLock(
+        LockLevel.BLOCK_POOl, b.getBlockPoolId())) {
       Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
           b.getBlockId());
       if (null == storedBlock) {
@@ -3947,6 +3953,10 @@ private static boolean isWrite(BlockConstructionStage stage) {
         || stage == PIPELINE_SETUP_APPEND_RECOVERY);
   }
 
+  public DataSetLockManager getDataSetLockManager() {
+    return dataSetLockManager;
+  }
+
   boolean isSlownodeByNameserviceId(String nsId) {
     return blockPoolManager.isSlownodeByNameserviceId(nsId);
   }
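
Taken together, these DataNode hunks give the lock manager a clear lifecycle: constructed alongside the DataNode, exposed through getDataSetLockManager(), and audited by lockLeakCheck() at shutdown, which presumably flags locks that were acquired but never released. A compressed sketch of that lifecycle (block pool id hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
import org.apache.hadoop.util.AutoCloseableLock;

public class LockLifecycleSketch {
  public static void main(String[] args) {
    // Construction mirrors `this.dataSetLockManager = new DataSetLockManager(conf)`.
    DataSetLockManager manager = new DataSetLockManager(new Configuration());
    String bpid = "BP-9-127.0.0.1-9"; // hypothetical block pool id
    manager.addLock(LockLevel.BLOCK_POOl, bpid);

    // Normal use: try-with-resources guarantees the lock is released.
    try (AutoCloseableLock lock = manager.readLock(LockLevel.BLOCK_POOl, bpid)) {
      // guarded read
    }

    // Shutdown mirrors `dataSetLockManager.lockLeakCheck()` in DataNode#shutdown.
    manager.lockLeakCheck();
  }
}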

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java

Lines changed: 7 additions & 11 deletions
@@ -25,7 +25,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi
     .FsVolumeReferences;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus
@@ -502,16 +501,13 @@ private void createWorkPlan(NodePlan plan) throws DiskBalancerException {
   private Map<String, String> getStorageIDToVolumeBasePathMap()
       throws DiskBalancerException {
     Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
-    FsDatasetSpi.FsVolumeReferences references;
-    try {
-      try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) {
-        references = this.dataset.getFsVolumeReferences();
-        for (int ndx = 0; ndx < references.size(); ndx++) {
-          FsVolumeSpi vol = references.get(ndx);
-          storageIDToVolBasePathMap.put(vol.getStorageID(),
-              vol.getBaseURI().getPath());
-        }
-        references.close();
+    // Get volumes snapshot so no need to acquire dataset lock.
+    try (FsDatasetSpi.FsVolumeReferences references = dataset.
+        getFsVolumeReferences()) {
+      for (int ndx = 0; ndx < references.size(); ndx++) {
+        FsVolumeSpi vol = references.get(ndx);
+        storageIDToVolBasePathMap.put(vol.getStorageID(),
+            vol.getBaseURI().getPath());
       }
     } catch (IOException ex) {
       LOG.error("Disk Balancer - Internal Error.", ex);
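
The simplification works because getFsVolumeReferences() already returns a self-contained snapshot of volume references that is itself closeable, so the dataset lock adds nothing here and try-with-resources replaces the manual references.close(). A sketch of the resulting pattern in isolation (the helper method and its name are illustrative, not from the commit):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

public class VolumeSnapshotSketch {
  // Iterate a snapshot of volume references without any dataset-wide lock;
  // closing the references releases the snapshot.
  static Map<String, String> mapStorageIdsToPaths(FsDatasetSpi<?> dataset)
      throws IOException {
    Map<String, String> map = new HashMap<>();
    try (FsDatasetSpi.FsVolumeReferences refs = dataset.getFsVolumeReferences()) {
      for (int i = 0; i < refs.size(); i++) {
        FsVolumeSpi vol = refs.get(i);
        map.put(vol.getStorageID(), vol.getBaseURI().getPath());
      }
    }
    return map;
  }
}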

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

Lines changed: 6 additions & 14 deletions
@@ -35,8 +35,9 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
+import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -241,7 +242,7 @@ StorageReport[] getStorageReports(String bpid)
    * Gets a list of references to the finalized blocks for the given block pool.
    * <p>
    * Callers of this function should call
-   * {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being
+   * {@link FsDatasetSpi#acquireDatasetLockManager} to avoid blocks' status being
    * changed during list iteration.
    * </p>
    * @return a list of references to the finalized blocks for the given block
@@ -657,21 +658,12 @@ ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
   ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
       FsVolumeSpi destination) throws IOException;
 
-  /**
-   * Acquire the lock of the data set. This prevents other threads from
-   * modifying the volume map structure inside the datanode, but other changes
-   * are still possible. For example modifying the genStamp of a block instance.
-   */
-  AutoCloseableLock acquireDatasetLock();
-
   /***
-   * Acquire the read lock of the data set. This prevents other threads from
-   * modifying the volume map structure inside the datanode, but other changes
-   * are still possible. For example modifying the genStamp of a block instance.
+   * Acquire lock Manager for the data set. This prevents other threads from
+   * modifying the volume map structure inside the datanode.
    * @return The AutoClosable read lock instance.
    */
-  AutoCloseableLock acquireDatasetReadLock();
-
+  DataNodeLockManager<? extends AutoCloseDataSetLock> acquireDatasetLockManager();
 
   /**
    * Deep copy the replica info belonging to given block pool.
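
For FsDatasetSpi implementors, the two lock-acquisition methods collapse into one accessor that hands back the lock manager itself. A hedged migration sketch follows; the readLock signature is inferred from the DataSetLockManager calls elsewhere in this commit, and it assumes AutoCloseDataSetLock is auto-closeable like the AutoCloseableLock it replaces (pool id hypothetical):

import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

public class DatasetLockMigrationSketch {
  static void readGuarded(FsDatasetSpi<?> dataset, String bpid) {
    // Before: try (AutoCloseableLock l = dataset.acquireDatasetReadLock()) { ... }
    // After: fetch the manager once, then take a pool-scoped lock.
    DataNodeLockManager<? extends AutoCloseDataSetLock> manager =
        dataset.acquireDatasetLockManager();
    try (AutoCloseDataSetLock l = manager.readLock(LockLevel.BLOCK_POOl, bpid)) {
      // read state for this block pool only
    }
  }
}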

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java

Lines changed: 1 addition & 2 deletions
@@ -43,7 +43,6 @@
 import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.RecursiveAction;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
@@ -914,7 +913,7 @@ void shutdown(BlockListAsLongs blocksListToPersist) {
 
   private boolean readReplicasFromCache(ReplicaMap volumeMap,
       final RamDiskReplicaTracker lazyWriteReplicaMap) {
-    ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
+    ReplicaMap tmpReplicaMap = new ReplicaMap();
     File replicaFile = new File(replicaCacheDir, REPLICA_CACHE_FILE);
     // Check whether the file exists or not.
     if (!replicaFile.exists()) {
