@@ -53,6 +53,7 @@
import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -307,6 +308,10 @@ private void connectToNNAndHandshake() throws IOException {
// info.
NamespaceInfo nsInfo = retrieveNamespaceInfo();

// Initialize the block pool lock during block pool initialization.
dn.getDataSetLockManager().addLock(LockLevel.BLOCK_POOl,
nsInfo.getBlockPoolID());

// Verify that this matches the other NN in this HA pair.
// This also initializes our block pool in the DN if we are
// the first NN connection for this BP.
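Taken together, the connectToNNAndHandshake() hunk above and the BlockSender/DataNode hunks below replace the old dataset-wide lock with a per-block-pool lock: the lock is registered once per block pool when the DataNode handshakes with the NameNode, and readers later acquire it at BLOCK_POOl level. A minimal sketch of that lifecycle, using only calls that appear in this diff (the DataSetLockManager constructor, addLock, readLock, lockLeakCheck); the class name, blockPoolId value, and the DataSetLockManager import path are illustrative assumptions, not taken from the patch:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
// Assumed package for DataSetLockManager; the patch constructs it inside DataNode.
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
import org.apache.hadoop.util.AutoCloseableLock;

public final class BlockPoolLockSketch {
  public static void main(String[] args) {
    // Build the lock manager the same way the DataNode constructors now do.
    DataSetLockManager lockManager = new DataSetLockManager(new Configuration());

    // Illustrative block pool id; in the patch it comes from nsInfo.getBlockPoolID().
    String blockPoolId = "BP-example";

    // Register the block pool lock once, as connectToNNAndHandshake() now does.
    lockManager.addLock(LockLevel.BLOCK_POOl, blockPoolId);

    // Readers scope their critical section to one block pool instead of the
    // whole dataset, mirroring the BlockSender change below.
    try (AutoCloseableLock lock =
        lockManager.readLock(LockLevel.BLOCK_POOl, blockPoolId)) {
      // ... read replica state for this block pool ...
    }

    // On shutdown the DataNode now calls lockLeakCheck() to flag locks that
    // were acquired but never released.
    lockManager.lockLeakCheck();
  }
}
```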
@@ -41,6 +41,7 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
@@ -256,7 +257,8 @@ class BlockSender implements java.io.Closeable {
// the append write.
ChunkChecksum chunkChecksum = null;
final long replicaVisibleLength;
try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
try (AutoCloseableLock lock = datanode.getDataSetLockManager().readLock(
LockLevel.BLOCK_POOl, block.getBlockPoolId())) {
replica = getReplica(block, datanode);
replicaVisibleLength = replica.getVisibleLength();
}
@@ -129,6 +129,7 @@
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -408,6 +409,7 @@ public static InetSocketAddress createSocketAddr(String target) {
.availableProcessors();
private static final double CONGESTION_RATIO = 1.5;
private DiskBalancer diskBalancer;
private DataSetLockManager dataSetLockManager;

private final ExecutorService xferService;

@@ -449,6 +451,7 @@ private static Tracer createTracer(Configuration conf) {
this.pipelineSupportSlownode = false;
this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
this.dnConf = new DNConf(this);
this.dataSetLockManager = new DataSetLockManager(conf);
initOOBTimeout();
storageLocationChecker = null;
volumeChecker = new DatasetVolumeChecker(conf, new Timer());
@@ -467,6 +470,7 @@ private static Tracer createTracer(Configuration conf) {
super(conf);
this.tracer = createTracer(conf);
this.fileIoProvider = new FileIoProvider(conf, this);
this.dataSetLockManager = new DataSetLockManager(conf);
this.blockScanner = new BlockScanner(this);
this.lastDiskErrorCheck = 0;
this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
@@ -2324,6 +2328,7 @@ public void shutdown() {
notifyAll();
}
tracer.close();
dataSetLockManager.lockLeakCheck();
}

/**
@@ -3230,7 +3235,8 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b,
final BlockConstructionStage stage;

//get replica information
try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
try (AutoCloseableLock lock = dataSetLockManager.readLock(
LockLevel.BLOCK_POOl, b.getBlockPoolId())) {
Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
b.getBlockId());
if (null == storedBlock) {
@@ -3947,6 +3953,10 @@ private static boolean isWrite(BlockConstructionStage stage) {
|| stage == PIPELINE_SETUP_APPEND_RECOVERY);
}

public DataSetLockManager getDataSetLockManager() {
return dataSetLockManager;
}

boolean isSlownodeByNameserviceId(String nsId) {
return blockPoolManager.isSlownodeByNameserviceId(nsId);
}
@@ -25,7 +25,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi
.FsVolumeReferences;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus
@@ -502,16 +501,13 @@ private void createWorkPlan(NodePlan plan) throws DiskBalancerException {
private Map<String, String> getStorageIDToVolumeBasePathMap()
throws DiskBalancerException {
Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
FsDatasetSpi.FsVolumeReferences references;
try {
try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) {
Contributor:

Suggest keeping this and improving it to use #dataSetLock as a separate improvement. We should create another JIRA to remove it if it is no longer necessary.

Contributor Author:

This method just takes a snapshot of the volumes, so there is no need to acquire the dataset lock.

references = this.dataset.getFsVolumeReferences();
for (int ndx = 0; ndx < references.size(); ndx++) {
FsVolumeSpi vol = references.get(ndx);
storageIDToVolBasePathMap.put(vol.getStorageID(),
vol.getBaseURI().getPath());
}
references.close();
// Get a snapshot of the volumes; no need to acquire the dataset lock.
try (FsDatasetSpi.FsVolumeReferences references = dataset.
getFsVolumeReferences()) {
for (int ndx = 0; ndx < references.size(); ndx++) {
FsVolumeSpi vol = references.get(ndx);
storageIDToVolBasePathMap.put(vol.getStorageID(),
vol.getBaseURI().getPath());
}
} catch (IOException ex) {
LOG.error("Disk Balancer - Internal Error.", ex);
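The author's reply above is the rationale for this DiskBalancer change: getStorageIDToVolumeBasePathMap() only needs a point-in-time snapshot of the volumes, and closing the FsVolumeReferences in a try-with-resources releases the volume references, so no dataset lock is required. A small self-contained sketch of that pattern, using only the FsDatasetSpi and FsVolumeSpi calls already present in the hunk; the class and method names are illustrative:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

public final class VolumeSnapshotSketch {

  private VolumeSnapshotSketch() {
  }

  /** Builds a storageID -> volume base path map from a volume snapshot. */
  static Map<String, String> storageIdToBasePath(FsDatasetSpi<?> dataset)
      throws IOException {
    Map<String, String> map = new HashMap<>();
    // The references object is a snapshot of the current volumes; the
    // try-with-resources releases it, so no dataset lock is held here.
    try (FsDatasetSpi.FsVolumeReferences references =
        dataset.getFsVolumeReferences()) {
      for (int ndx = 0; ndx < references.size(); ndx++) {
        FsVolumeSpi vol = references.get(ndx);
        map.put(vol.getStorageID(), vol.getBaseURI().getPath());
      }
    }
    return map;
  }
}
```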
@@ -35,8 +35,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -241,7 +242,7 @@ StorageReport[] getStorageReports(String bpid)
* Gets a list of references to the finalized blocks for the given block pool.
* <p>
* Callers of this function should call
* {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being
* {@link FsDatasetSpi#acquireDatasetLockManager} to avoid blocks' status being
* changed during list iteration.
* </p>
* @return a list of references to the finalized blocks for the given block
@@ -657,21 +658,12 @@ ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
FsVolumeSpi destination) throws IOException;

/**
* Acquire the lock of the data set. This prevents other threads from
* modifying the volume map structure inside the datanode, but other changes
* are still possible. For example modifying the genStamp of a block instance.
*/
AutoCloseableLock acquireDatasetLock();

/***
* Acquire the read lock of the data set. This prevents other threads from
* modifying the volume map structure inside the datanode, but other changes
* are still possible. For example modifying the genStamp of a block instance.
* Acquire the lock manager for the data set. This prevents other threads from
* modifying the volume map structure inside the datanode.
* @return The DataNodeLockManager instance for this data set.
*/
AutoCloseableLock acquireDatasetReadLock();

DataNodeLockManager<? extends AutoCloseDataSetLock> acquireDatasetLockManager();

/**
* Deep copy the replica info belonging to given block pool.
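With acquireDatasetLock() and acquireDatasetReadLock() removed from FsDatasetSpi, remaining callers are expected to go through the lock manager and take a lock scoped to a block pool, as the BlockSender and DataNode hunks above do. A hedged migration sketch; the helper class and method are illustrative, and it assumes readLock() on the interface returns an AutoCloseDataSetLock usable in try-with-resources, as the BlockSender hunk suggests:

```java
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

public final class LockMigrationSketch {

  private LockMigrationSketch() {
  }

  // Old pattern, removed by this patch:
  //   try (AutoCloseableLock lock = dataset.acquireDatasetReadLock()) {
  //     ... read volume map state ...
  //   }
  //
  // New pattern: fetch the lock manager once, then take a read lock scoped
  // to the block's block pool rather than the whole dataset.
  static void readUnderBlockPoolLock(FsDatasetSpi<?> dataset,
      ExtendedBlock block) {
    DataNodeLockManager<? extends AutoCloseDataSetLock> lockManager =
        dataset.acquireDatasetLockManager();
    try (AutoCloseDataSetLock lock =
        lockManager.readLock(LockLevel.BLOCK_POOl, block.getBlockPoolId())) {
      // ... read volume map state for this block pool ...
    }
  }
}
```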
@@ -43,7 +43,6 @@
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
@@ -914,7 +913,7 @@ void shutdown(BlockListAsLongs blocksListToPersist) {

private boolean readReplicasFromCache(ReplicaMap volumeMap,
final RamDiskReplicaTracker lazyWriteReplicaMap) {
ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
ReplicaMap tmpReplicaMap = new ReplicaMap();
File replicaFile = new File(replicaCacheDir, REPLICA_CACHE_FILE);
// Check whether the file exists or not.
if (!replicaFile.exists()) {