
Commit a32cfc2

HDFS-15382. Split one FsDatasetImpl lock to block pool grain locks. (#3941). Contributed by limingxiang.
Signed-off-by: He Xiaoqiao <[email protected]>
Signed-off-by: litao <[email protected]>
1 parent: 672e380

22 files changed: +384 −447 lines
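Before the per-file hunks, a minimal sketch of the locking pattern this commit moves the DataNode to. It uses only calls that appear in the hunks below; the wrapper class and method are hypothetical scaffolding, not code from the commit. Note that BLOCK_POOl (lower-case final letter) is the constant's spelling as committed.

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.util.AutoCloseableLock;

class LockPatternSketch {
  // Before: readers serialized on one dataset-wide lock, e.g.
  //   try (AutoCloseableLock l = datanode.data.acquireDatasetReadLock()) { ... }
  // After: readers lock only the block pool that owns the replica, so
  // independent block pools no longer contend.
  static void readUnderBlockPoolLock(DataNode datanode, ExtendedBlock block) {
    try (AutoCloseableLock lock = datanode.getDataSetLockManager()
        .readLock(LockLevel.BLOCK_POOl, block.getBlockPoolId())) {
      // Read replica state for this block pool only.
    }
  }
}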

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

Lines changed: 5 additions & 0 deletions
@@ -55,6 +55,7 @@
 import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
+import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -309,6 +310,10 @@ private void connectToNNAndHandshake() throws IOException {
     // info.
     NamespaceInfo nsInfo = retrieveNamespaceInfo();
 
+    // Register the block pool lock during initialization.
+    dn.getDataSetLockManager().addLock(LockLevel.BLOCK_POOl,
+        nsInfo.getBlockPoolID());
+
     // Verify that this matches the other NN in this HA pair.
     // This also initializes our block pool in the DN if we are
     // the first NN connection for this BP.

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

Lines changed: 3 additions & 1 deletion
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
@@ -256,7 +257,8 @@ class BlockSender implements java.io.Closeable {
       // the append write.
       ChunkChecksum chunkChecksum = null;
       final long replicaVisibleLength;
-      try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
+      try (AutoCloseableLock lock = datanode.getDataSetLockManager().readLock(
+          LockLevel.BLOCK_POOl, block.getBlockPoolId())) {
        replica = getReplica(block, datanode);
        replicaVisibleLength = replica.getVisibleLength();
      }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Lines changed: 11 additions & 1 deletion
@@ -146,6 +146,7 @@
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
 import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
 import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -433,6 +434,7 @@ public static InetSocketAddress createSocketAddr(String target) {
       .availableProcessors();
   private static final double CONGESTION_RATIO = 1.5;
   private DiskBalancer diskBalancer;
+  private DataSetLockManager dataSetLockManager;
 
   private final ExecutorService xferService;
 
@@ -474,6 +476,7 @@ private static Tracer createTracer(Configuration conf) {
     this.pipelineSupportSlownode = false;
     this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
     this.dnConf = new DNConf(this);
+    this.dataSetLockManager = new DataSetLockManager(conf);
     initOOBTimeout();
     storageLocationChecker = null;
     volumeChecker = new DatasetVolumeChecker(conf, new Timer());
@@ -492,6 +495,7 @@ private static Tracer createTracer(Configuration conf) {
     super(conf);
     this.tracer = createTracer(conf);
     this.fileIoProvider = new FileIoProvider(conf, this);
+    this.dataSetLockManager = new DataSetLockManager(conf);
     this.blockScanner = new BlockScanner(this);
     this.lastDiskErrorCheck = 0;
     this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
@@ -2461,6 +2465,7 @@ public void shutdown() {
       notifyAll();
     }
     tracer.close();
+    dataSetLockManager.lockLeakCheck();
   }
 
   /**
@@ -3367,7 +3372,8 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b,
     final BlockConstructionStage stage;
 
     //get replica information
-    try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
+    try (AutoCloseableLock lock = dataSetLockManager.readLock(
+        LockLevel.BLOCK_POOl, b.getBlockPoolId())) {
       Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
           b.getBlockId());
       if (null == storedBlock) {
@@ -4084,6 +4090,10 @@ private static boolean isWrite(BlockConstructionStage stage) {
         || stage == PIPELINE_SETUP_APPEND_RECOVERY);
   }
 
+  public DataSetLockManager getDataSetLockManager() {
+    return dataSetLockManager;
+  }
+
   boolean isSlownodeByNameserviceId(String nsId) {
     return blockPoolManager.isSlownodeByNameserviceId(nsId);
   }
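The DataNode hunks above create the manager, expose it, and check it at shutdown; BPServiceActor (first file) registers a lock per block pool after the NameNode handshake. A condensed, hypothetical sketch of that lifecycle using only calls shown in the hunks:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;

class LockManagerLifecycleSketch {
  static void lifecycle(Configuration conf, String bpid) {
    // Constructed once in each DataNode constructor.
    DataSetLockManager mgr = new DataSetLockManager(conf);
    // Registered per block pool once the pool id is known (NN handshake).
    mgr.addLock(LockLevel.BLOCK_POOl, bpid);
    // DataNode#shutdown() asks the manager to flag any leaked locks.
    mgr.lockLeakCheck();
  }
}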

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java

Lines changed: 7 additions & 11 deletions
@@ -25,7 +25,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi
     .FsVolumeReferences;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus
@@ -502,16 +501,13 @@ private void createWorkPlan(NodePlan plan) throws DiskBalancerException {
   private Map<String, String> getStorageIDToVolumeBasePathMap()
       throws DiskBalancerException {
     Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
-    FsDatasetSpi.FsVolumeReferences references;
-    try {
-      try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) {
-        references = this.dataset.getFsVolumeReferences();
-        for (int ndx = 0; ndx < references.size(); ndx++) {
-          FsVolumeSpi vol = references.get(ndx);
-          storageIDToVolBasePathMap.put(vol.getStorageID(),
-              vol.getBaseURI().getPath());
-        }
-        references.close();
+    // The references are a volumes snapshot, so no dataset lock is needed.
+    try (FsDatasetSpi.FsVolumeReferences references = dataset.
+        getFsVolumeReferences()) {
+      for (int ndx = 0; ndx < references.size(); ndx++) {
+        FsVolumeSpi vol = references.get(ndx);
+        storageIDToVolBasePathMap.put(vol.getStorageID(),
+            vol.getBaseURI().getPath());
       }
     } catch (IOException ex) {
       LOG.error("Disk Balancer - Internal Error.", ex);

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

Lines changed: 6 additions & 14 deletions
@@ -35,8 +35,9 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
+import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -241,7 +242,7 @@ StorageReport[] getStorageReports(String bpid)
    * Gets a list of references to the finalized blocks for the given block pool.
    * <p>
    * Callers of this function should call
-   * {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being
+   * {@link FsDatasetSpi#acquireDatasetLockManager} to avoid blocks' status being
    * changed during list iteration.
    * </p>
    * @return a list of references to the finalized blocks for the given block
@@ -657,21 +658,12 @@ ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
   ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
       FsVolumeSpi destination) throws IOException;
 
-  /**
-   * Acquire the lock of the data set. This prevents other threads from
-   * modifying the volume map structure inside the datanode, but other changes
-   * are still possible. For example modifying the genStamp of a block instance.
-   */
-  AutoCloseableLock acquireDatasetLock();
-
   /***
-   * Acquire the read lock of the data set. This prevents other threads from
-   * modifying the volume map structure inside the datanode, but other changes
-   * are still possible. For example modifying the genStamp of a block instance.
+   * Acquire the lock manager for the data set. This prevents other threads
+   * from modifying the volume map structure inside the datanode.
    * @return The AutoCloseable read lock instance.
    */
-  AutoCloseableLock acquireDatasetReadLock();
-
+  DataNodeLockManager<? extends AutoCloseDataSetLock> acquireDatasetLockManager();
 
  /**
   * Deep copy the replica info belonging to given block pool.
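For callers migrating off the two deleted methods, a hypothetical sketch of the replacement declared above (bpid is a placeholder block pool id; this is an illustration, not code from the commit):

import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

class DatasetLockSketch {
  static void readLocked(FsDatasetSpi<?> dataset, String bpid) {
    // One manager hands out locks at block pool granularity in place of the
    // old dataset-wide read/write lock.
    DataNodeLockManager<? extends AutoCloseDataSetLock> lm =
        dataset.acquireDatasetLockManager();
    try (AutoCloseDataSetLock lock = lm.readLock(LockLevel.BLOCK_POOl, bpid)) {
      // e.g. iterate getFinalizedBlocks(bpid) while replica status is stable
    }
  }
}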

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java

Lines changed: 1 addition & 2 deletions
@@ -43,7 +43,6 @@
 import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.RecursiveAction;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
@@ -914,7 +913,7 @@ void shutdown(BlockListAsLongs blocksListToPersist) {
 
   private boolean readReplicasFromCache(ReplicaMap volumeMap,
       final RamDiskReplicaTracker lazyWriteReplicaMap) {
-    ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
+    ReplicaMap tmpReplicaMap = new ReplicaMap();
     File replicaFile = new File(replicaCacheDir, REPLICA_CACHE_FILE);
     // Check whether the file exists or not.
     if (!replicaFile.exists()) {
