diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index c7b8f5f3078fc..164ea5fed0b1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -1953,7 +1953,7 @@ public static StorageReport convert(StorageReportProto p) {
     return new StorageReport(p.hasStorage() ?
         convert(p.getStorage()) : new DatanodeStorage(p.getStorageUuid()),
         p.getFailed(), p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
-        p.getBlockPoolUsed(), nonDfsUsed);
+        p.getBlockPoolUsed(), nonDfsUsed, p.getMount());
   }
 
   public static DatanodeStorage convert(DatanodeStorageProto s) {
@@ -2688,7 +2688,8 @@ public static StorageReportProto convert(StorageReport r) {
         .setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining())
         .setStorageUuid(r.getStorage().getStorageID())
         .setStorage(convert(r.getStorage()))
-        .setNonDfsUsed(r.getNonDfsUsed());
+        .setNonDfsUsed(r.getNonDfsUsed())
+        .setMount(r.getMount());
     return builder.build();
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java
index 7042b58a2f4b2..7c189b1f39fea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java
@@ -28,11 +28,19 @@ public class StorageReport {
   private final long nonDfsUsed;
   private final long remaining;
   private final long blockPoolUsed;
+  private final String mount;
 
   public static final StorageReport[] EMPTY_ARRAY = {};
 
   public StorageReport(DatanodeStorage storage, boolean failed, long capacity,
       long dfsUsed, long remaining, long bpUsed, long nonDfsUsed) {
+    this(storage, failed, capacity, dfsUsed,
+        remaining, bpUsed, nonDfsUsed, "");
+  }
+
+  public StorageReport(DatanodeStorage storage, boolean failed, long capacity,
+      long dfsUsed, long remaining, long bpUsed,
+      long nonDfsUsed, String mount) {
     this.storage = storage;
     this.failed = failed;
     this.capacity = capacity;
@@ -40,6 +48,7 @@ public StorageReport(DatanodeStorage storage, boolean failed, long capacity,
     this.nonDfsUsed = nonDfsUsed;
     this.remaining = remaining;
     this.blockPoolUsed = bpUsed;
+    this.mount = mount;
   }
 
   public DatanodeStorage getStorage() {
@@ -69,4 +78,8 @@ public long getRemaining() {
   public long getBlockPoolUsed() {
     return blockPoolUsed;
   }
+
+  public String getMount() {
+    return mount;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 7c49c2335c268..77f42afddefa3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -158,6 +158,7 @@ message StorageReportProto {
   optional uint64 blockPoolUsed = 6 [ default = 0 ];
   optional DatanodeStorageProto storage = 7; // supersedes StorageUuid
   optional uint64 nonDfsUsed = 8;
+  optional string mount = 9;
 }
 
 /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 07ab0aa3269ab..efc315cbaeafb 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1503,6 +1503,26 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean
       DFS_PROTECTED_SUBDIRECTORIES_ENABLE_DEFAULT = false;
 
+  /**
+   * HDFS-15548 allows DISK/ARCHIVE to be configured on the same disk mount.
+   * The default ratio will be applied if DISK/ARCHIVE are configured
+   * on the same disk mount.
+   *
+   * Beware that capacity usage might be larger than 100% if data blocks
+   * already exist and the configured ratio is small, which will
+   * prevent the volume from taking new blocks until capacity is balanced out.
+   */
+  public static final String DFS_DATANODE_ALLOW_SAME_DISK_TIERING =
+      "dfs.datanode.same-disk-tiering.enabled";
+  public static final boolean DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT =
+      false;
+
+  public static final String
+      DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE =
+      "dfs.datanode.reserve-for-archive.default.percentage";
+  public static final double
+      DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE_DEFAULT = 0.0;
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
   @Deprecated
   public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 3fa9b3ad51dd2..7742f374c518c 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -404,6 +404,7 @@ private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
     long totalBlockPoolUsed = 0;
     long totalDfsUsed = 0;
     long totalNonDfsUsed = 0;
+    Set<String> visitedMount = new HashSet<>();
     Set<DatanodeStorageInfo> failedStorageInfos = null;
 
     // Decide if we should check for any missing StorageReport and mark it as
@@ -472,7 +473,17 @@ private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
       totalRemaining += report.getRemaining();
       totalBlockPoolUsed += report.getBlockPoolUsed();
       totalDfsUsed += report.getDfsUsed();
-      totalNonDfsUsed += report.getNonDfsUsed();
+      String mount = report.getMount();
+      // For volumes on the same mount,
+      // count nonDfsUsed only once per mount.
+      if (mount == null || mount.isEmpty()) {
+        totalNonDfsUsed += report.getNonDfsUsed();
+      } else {
+        if (!visitedMount.contains(mount)) {
+          totalNonDfsUsed += report.getNonDfsUsed();
+          visitedMount.add(mount);
+        }
+      }
     }
 
     // Update total metrics for the node.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index de898e93432f9..2914789fe7c7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -171,7 +171,9 @@ public StorageReport[] getStorageReports(String bpid)
             volume.getDfsUsed(),
             volume.getAvailable(),
             volume.getBlockPoolUsed(bpid),
-            volume.getNonDfsUsed());
+            volume.getNonDfsUsed(),
+            volume.getMount()
+        );
         reports.add(sr);
       } catch (ClosedChannelException e) {
         continue;
@@ -190,6 +192,10 @@ public FsVolumeImpl getVolume(final ExtendedBlock b) {
     }
   }
 
+  MountVolumeMap getMountVolumeMap() {
+    return volumes.getMountVolumeMap();
+  }
+
   @Override // FsDatasetSpi
   public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
@@ -365,7 +371,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
         RoundRobinVolumeChoosingPolicy.class,
         VolumeChoosingPolicy.class), conf);
     volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
-        blockChooserImpl);
+        blockChooserImpl, conf);
     asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
     asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode, conf);
     deletingBlock = new HashMap<String, Set<Long>>();
@@ -464,12 +470,27 @@ private void activateVolume(
       LOG.error(errorMsg);
       throw new IOException(errorMsg);
     }
+    // Check whether the same storage type already exists on the mount.
+    // Only useful when same-disk tiering is turned on.
+    FsVolumeImpl volumeImpl = (FsVolumeImpl) ref.getVolume();
+    FsVolumeReference checkRef = volumes
+        .getMountVolumeMap()
+        .getVolumeRefByMountAndStorageType(
+            volumeImpl.getMount(), volumeImpl.getStorageType());
+    if (checkRef != null) {
+      final String errorMsg = String.format(
+          "Storage type %s already exists on same mount: %s.",
+          volumeImpl.getStorageType(), volumeImpl.getMount());
+      checkRef.close();
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
     volumeMap.mergeAll(replicaMap);
     storageMap.put(sd.getStorageUuid(),
         new DatanodeStorage(sd.getStorageUuid(),
            DatanodeStorage.State.NORMAL, storageType));
-    asyncDiskService.addVolume((FsVolumeImpl) ref.getVolume());
+    asyncDiskService.addVolume(volumeImpl);
     volumes.addVolume(ref);
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 5843c7d6696b7..d5900fa2566c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -134,6 +134,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
   private final FileIoProvider fileIoProvider;
   private final DataNodeVolumeMetrics metrics;
   private URI baseURI;
+  private boolean enableSameDiskTiering;
+  private final String mount;
+  private double reservedForArchive;
 
   /**
    * Per-volume worker pool that processes new blocks to cache.
@@ -190,6 +193,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
     }
     this.conf = conf;
     this.fileIoProvider = fileIoProvider;
+    this.enableSameDiskTiering =
+        conf.getBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
+            DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT);
+    if (enableSameDiskTiering && usage != null) {
+      this.mount = usage.getMount();
+    } else {
+      mount = "";
+    }
+  }
+
+  String getMount() {
+    return mount;
   }
 
   protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
@@ -407,11 +422,15 @@ long getBlockPoolUsed(String bpid) throws IOException {
    * Return either the configured capacity of the file system if configured; or
    * the capacity of the file system excluding space reserved for non-HDFS.
    *
+   * When same-disk tiering is turned on, the reported capacity
+   * takes the reservedForArchive value into consideration.
+   *
    * @return the unreserved number of bytes left in this filesystem. May be
    *         zero.
    */
   @VisibleForTesting
   public long getCapacity() {
+    long capacity;
     if (configuredCapacity < 0L) {
       long remaining;
       if (cachedCapacity > 0L) {
@@ -419,9 +438,18 @@ public long getCapacity() {
       } else {
         remaining = usage.getCapacity() - getReserved();
       }
-      return Math.max(remaining, 0L);
+      capacity = Math.max(remaining, 0L);
+    } else {
+      capacity = configuredCapacity;
+    }
+
+    if (enableSameDiskTiering && dataset.getMountVolumeMap() != null) {
+      double capacityRatio = dataset.getMountVolumeMap()
+          .getCapacityRatioByMountAndStorageType(mount, storageType);
+      capacity = (long) (capacity * capacityRatio);
     }
-    return configuredCapacity;
+
+    return capacity;
   }
 
   /**
@@ -452,7 +480,34 @@ public long getAvailable() throws IOException {
   }
 
   long getActualNonDfsUsed() throws IOException {
-    return usage.getUsed() - getDfsUsed();
+    // DISK and ARCHIVE volumes on the same disk
+    // should share the same amount of reserved capacity.
+    // When calculating the actual non-DFS used space,
+    // exclude the DFS space used by the other volume.
+    if (enableSameDiskTiering &&
+        (storageType == StorageType.DISK
+            || storageType == StorageType.ARCHIVE)) {
+      StorageType counterpartStorageType = storageType == StorageType.DISK
+          ? StorageType.ARCHIVE : StorageType.DISK;
+      FsVolumeReference counterpartRef = dataset
+          .getMountVolumeMap()
+          .getVolumeRefByMountAndStorageType(mount, counterpartStorageType);
+      if (counterpartRef != null) {
+        FsVolumeImpl counterpartVol = (FsVolumeImpl) counterpartRef.getVolume();
+        long used = getDfUsed() - getDfsUsed() - counterpartVol.getDfsUsed();
+        counterpartRef.close();
+        return used;
+      }
+    }
+    return getDfUsed() - getDfsUsed();
+  }
+
+  /**
+   * This function is only used for mocking in tests.
+   */
+  @VisibleForTesting
+  public long getDfUsed() {
+    return usage.getUsed();
   }
 
   private long getRemainingReserved() throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 049654b567dd5..2d6593df9bd04 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -62,9 +63,13 @@ class FsVolumeList {
   private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
   private final BlockScanner blockScanner;
 
+  private final boolean enableSameDiskTiering;
+  private final MountVolumeMap mountVolumeMap;
+
   FsVolumeList(List<VolumeFailureInfo> initialVolumeFailureInfos,
       BlockScanner blockScanner,
-      VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
+      VolumeChoosingPolicy<FsVolumeImpl> blockChooser,
+      Configuration config) {
     this.blockChooser = blockChooser;
     this.blockScanner = blockScanner;
     this.checkDirsLock = new AutoCloseableLock();
@@ -73,6 +78,14 @@ class FsVolumeList {
       volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
           volumeFailureInfo);
     }
+    enableSameDiskTiering = config.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
+        DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT);
+    mountVolumeMap = new MountVolumeMap(config);
+  }
+
+  MountVolumeMap getMountVolumeMap() {
+    return mountVolumeMap;
   }
 
   /**
@@ -291,6 +304,9 @@ public String toString() {
   void addVolume(FsVolumeReference ref) {
     FsVolumeImpl volume = (FsVolumeImpl) ref.getVolume();
     volumes.add(volume);
+    if (isSameDiskTieringApplied(volume)) {
+      mountVolumeMap.addVolume(volume);
+    }
     if (blockScanner != null) {
       blockScanner.addVolumeScanner(ref);
     } else {
@@ -311,6 +327,9 @@ void addVolume(FsVolumeReference ref) {
    */
   private void removeVolume(FsVolumeImpl target) {
     if (volumes.remove(target)) {
+      if (isSameDiskTieringApplied(target)) {
+        mountVolumeMap.removeVolume(target);
+      }
       if (blockScanner != null) {
         blockScanner.removeVolumeScanner(target);
       }
@@ -331,6 +350,15 @@ private void removeVolume(FsVolumeImpl target) {
     }
   }
 
+  /**
+   * Check whether same-disk tiering applies to the volume.
+   */
+  private boolean isSameDiskTieringApplied(FsVolumeImpl target) {
+    return enableSameDiskTiering &&
+        (target.getStorageType() == StorageType.DISK
+            || target.getStorageType() == StorageType.ARCHIVE);
+  }
+
   /**
    * Dynamically remove volume in the list.
    * @param storageLocation {@link StorageLocation} of the volume to be removed.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MountVolumeInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MountVolumeInfo.java
new file mode 100644
index 0000000000000..660cae26e40a3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MountVolumeInfo.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * MountVolumeInfo wraps detailed volume information
+ * for MountVolumeMap.
+ */
+@InterfaceAudience.Private
+class MountVolumeInfo {
+  private final ConcurrentMap<StorageType, FsVolumeImpl>
+      storageTypeVolumeMap;
+  private double reservedForArchiveDefault;
+
+  MountVolumeInfo(Configuration conf) {
+    storageTypeVolumeMap = new ConcurrentHashMap<>();
+    reservedForArchiveDefault = conf.getDouble(
+        DFSConfigKeys.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
+        DFSConfigKeys
+            .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE_DEFAULT);
+    if (reservedForArchiveDefault > 1) {
+      FsDatasetImpl.LOG.warn("Value of reserve-for-archival is > 100%."
+          + " Setting it to 100%.");
+      reservedForArchiveDefault = 1;
+    }
+    if (reservedForArchiveDefault < 0) {
+      FsDatasetImpl.LOG.warn("Value of reserve-for-archival is < 0."
+          + " Setting it to 0.0");
+      reservedForArchiveDefault = 0;
+    }
+  }
+
+  FsVolumeReference getVolumeRef(StorageType storageType) {
+    try {
+      FsVolumeImpl volumeImpl = storageTypeVolumeMap
+          .getOrDefault(storageType, null);
+      if (volumeImpl != null) {
+        return volumeImpl.obtainReference();
+      }
+    } catch (ClosedChannelException e) {
+      FsDatasetImpl.LOG.warn("Volume closed when getting volume"
+          + " by storage type: " + storageType);
+    }
+    return null;
+  }
+
+  /**
+   * Return the configured capacity ratio.
+   * If the volume is the only one on the mount,
+   * return 1 to avoid unnecessary allocation.
+   *
+   * TODO: We should support customized capacity ratios for volumes.
+   */
+  double getCapacityRatio(StorageType storageType) {
+    if (storageTypeVolumeMap.containsKey(storageType)
+        && storageTypeVolumeMap.size() > 1) {
+      if (storageType == StorageType.ARCHIVE) {
+        return reservedForArchiveDefault;
+      } else if (storageType == StorageType.DISK) {
+        return 1 - reservedForArchiveDefault;
+      }
+    }
+    return 1;
+  }
+
+  /**
+   * Add a volume to the mapping.
+   * If a volume with the same storage type already exists on the mount,
+   * skip this volume.
+   */
+  boolean addVolume(FsVolumeImpl volume) {
+    if (storageTypeVolumeMap.containsKey(volume.getStorageType())) {
+      FsDatasetImpl.LOG.error("Found storage type that already exists."
+          + " Skipping for now. Please check disk configuration.");
+      return false;
+    }
+    storageTypeVolumeMap.put(volume.getStorageType(), volume);
+    return true;
+  }
+
+
+  void removeVolume(FsVolumeImpl target) {
+    storageTypeVolumeMap.remove(target.getStorageType());
+  }
+
+  int size() {
+    return storageTypeVolumeMap.size();
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MountVolumeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MountVolumeMap.java
new file mode 100644
index 0000000000000..6fe4d3a690a5d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MountVolumeMap.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * MountVolumeMap contains information about the relationship
+ * between underlying filesystem mounts and datanode volumes.
+ *
+ * This is useful when configuring block tiering on the same disk mount
+ * (HDFS-15548). For now,
+ * we don't configure multiple volumes with the same storage type on one mount.
+ */
+@InterfaceAudience.Private
+class MountVolumeMap {
+  private final ConcurrentMap<String, MountVolumeInfo>
+      mountVolumeMapping;
+  private final Configuration conf;
+
+  MountVolumeMap(Configuration conf) {
+    mountVolumeMapping = new ConcurrentHashMap<>();
+    this.conf = conf;
+  }
+
+  FsVolumeReference getVolumeRefByMountAndStorageType(String mount,
+      StorageType storageType) {
+    if (mountVolumeMapping.containsKey(mount)) {
+      return mountVolumeMapping
+          .get(mount).getVolumeRef(storageType);
+    }
+    return null;
+  }
+
+  /**
+   * Return the capacity ratio.
+   * If no ratio exists for the mount, return 1 to use the full capacity.
+   */
+  double getCapacityRatioByMountAndStorageType(String mount,
+      StorageType storageType) {
+    if (mountVolumeMapping.containsKey(mount)) {
+      return mountVolumeMapping.get(mount).getCapacityRatio(storageType);
+    }
+    return 1;
+  }
+
+  void addVolume(FsVolumeImpl volume) {
+    String mount = volume.getMount();
+    if (!mount.isEmpty()) {
+      MountVolumeInfo info;
+      if (mountVolumeMapping.containsKey(mount)) {
+        info = mountVolumeMapping.get(mount);
+      } else {
+        info = new MountVolumeInfo(conf);
+        mountVolumeMapping.put(mount, info);
+      }
+      info.addVolume(volume);
+    }
+  }
+
+  void removeVolume(FsVolumeImpl target) {
+    String mount = target.getMount();
+    if (!mount.isEmpty()) {
+      MountVolumeInfo info = mountVolumeMapping.get(mount);
+      info.removeVolume(target);
+      if (info.size() == 0) {
+        mountVolumeMapping.remove(mount);
+      }
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index db152e60bd65a..ab420c5068e19 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5975,4 +5975,29 @@
 </property>
 
+
+<property>
+  <name>dfs.datanode.same-disk-tiering.enabled</name>
+  <value>false</value>
+  <description>
+    HDFS-15548 allows DISK/ARCHIVE to be
+    configured on the same disk mount to manage disk IO.
+    When this is enabled, the datanode will control the capacity
+    of DISK/ARCHIVE based on dfs.datanode.reserve-for-archive.default.percentage.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.reserve-for-archive.default.percentage</name>
+  <value>0.0</value>
+  <description>
+    Default disk capacity ratio of the ARCHIVE volume;
+    the value is expected to be between 0 and 1.
+    This will be applied when DISK/ARCHIVE volumes are configured
+    on the same mount, which is detected by the datanode.
+    Beware that capacity usage might be larger than 100% if data blocks
+    already exist and the configured ratio is small, which will
+    prevent the volume from taking new blocks
+    until capacity is balanced out.
+  </description>
+</property>
 </configuration>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 626cceefea01e..fc4c07bd24583 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -163,6 +163,26 @@ private static void createStorageDirs(DataStorage storage, Configuration conf,
     when(storage.getNumStorageDirs()).thenReturn(numDirs);
   }
 
+  private static StorageLocation createStorageWithStorageType(String subDir,
+      StorageType storageType, Configuration conf, DataStorage storage,
+      DataNode dataNode) throws IOException {
+    String archiveStorageType = "[" + storageType + "]";
+    String path = BASE_DIR + subDir;
+    new File(path).mkdirs();
+    String pathUri = new Path(path).toUri().toString();
+    StorageLocation loc = StorageLocation.parse(archiveStorageType + pathUri);
+    Storage.StorageDirectory sd = new Storage.StorageDirectory(
+        loc);
+    DataStorage.createStorageID(sd, false, conf);
+
+    DataStorage.VolumeBuilder builder =
+        new DataStorage.VolumeBuilder(storage, sd);
+    when(storage.prepareVolume(eq(dataNode), eq(loc),
+        anyList()))
+        .thenReturn(builder);
+    return loc;
+  }
+
   private int getNumVolumes() {
     try (FsDatasetSpi.FsVolumeReferences volumes =
         dataset.getFsVolumeReferences()) {
@@ -339,6 +359,57 @@ public void testAddVolumes() throws IOException {
     assertTrue(actualVolumes.containsAll(expectedVolumes));
   }
 
+  // When same-disk tiering is turned on,
+  // we should prevent the misconfiguration where
+  // volumes with the same storage type are created on the same mount.
+  @Test
+  public void testAddVolumeWithSameDiskTiering() throws IOException {
+    datanode = mock(DataNode.class);
+    storage = mock(DataStorage.class);
+    this.conf = new Configuration();
+    this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
+    this.conf.set(DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_ROOT_DIR_KEY,
+        replicaCacheRootDir);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
+        true);
+    conf.setDouble(DFSConfigKeys
+            .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
+        0.5);
+
+    when(datanode.getConf()).thenReturn(conf);
+    final DNConf dnConf = new DNConf(datanode);
+    when(datanode.getDnConf()).thenReturn(dnConf);
+    final BlockScanner disabledBlockScanner = new BlockScanner(datanode);
+    when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
+    final ShortCircuitRegistry shortCircuitRegistry =
+        new ShortCircuitRegistry(conf);
+    when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);
+
+    createStorageDirs(storage, conf, 1);
+    dataset = new FsDatasetImpl(datanode, storage, conf);
+
+    List<NamespaceInfo> nsInfos = Lists.newArrayList();
+    for (String bpid : BLOCK_POOL_IDS) {
+      nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
+    }
+    dataset.addVolume(
+        createStorageWithStorageType("archive1",
+            StorageType.ARCHIVE, conf, storage, datanode), nsInfos);
+    assertEquals(2, dataset.getVolumeCount());
+
+    // Adding a second ARCHIVE volume on the same mount should fail.
+    try {
+      dataset.addVolume(
+          createStorageWithStorageType("archive2",
+              StorageType.ARCHIVE, conf, storage, datanode), nsInfos);
+      fail("Should throw an exception because the same storage type"
+          + " already exists on the same mount.");
+    } catch (IOException e) {
+      assertTrue(e.getMessage()
+          .startsWith("Storage type ARCHIVE already exists on same mount:"));
+    }
+  }
+
   @Test
   public void testAddVolumeWithSameStorageUuid() throws IOException {
     HdfsConfiguration config = new HdfsConfiguration();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
index ab74b8db28c23..41109cf4b6c63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
@@ -88,7 +88,8 @@ public void setUp() {
   @Test(timeout=30000)
   public void testGetNextVolumeWithClosedVolume() throws IOException {
     FsVolumeList volumeList = new FsVolumeList(
-        Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
+        Collections.<VolumeFailureInfo>emptyList(),
+        blockScanner, blockChooser, conf);
     final List<FsVolumeSpi> volumes = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
       File curDir = new File(baseDir, "nextvolume-" + i);
@@ -131,7 +132,7 @@ public Boolean get() {
   @Test(timeout=30000)
   public void testReleaseVolumeRefIfNoBlockScanner() throws IOException {
     FsVolumeList volumeList = new FsVolumeList(
-        Collections.<VolumeFailureInfo>emptyList(), null, blockChooser);
+        Collections.<VolumeFailureInfo>emptyList(), null, blockChooser, conf);
     File volDir = new File(baseDir, "volume-0");
     volDir.mkdirs();
     FsVolumeImpl volume = new FsVolumeImplBuilder()
@@ -452,4 +453,145 @@ public void testGetCachedVolumeCapacity() throws IOException {
     conf.setBoolean(DFSConfigKeys.DFS_DATANODE_FIXED_VOLUME_SIZE_KEY,
         DFSConfigKeys.DFS_DATANODE_FIXED_VOLUME_SIZE_DEFAULT);
   }
+
+  // Test basics with same-disk archival turned on.
+  @Test
+  public void testGetVolumeWithSameDiskArchival() throws Exception {
+    File diskVolDir = new File(baseDir, "volume-disk");
+    File archivalVolDir = new File(baseDir, "volume-archival");
+    diskVolDir.mkdirs();
+    archivalVolDir.mkdirs();
+    double reservedForArchival = 0.75;
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
+        true);
+    conf.setDouble(DFSConfigKeys
+            .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
+        reservedForArchival);
+    FsVolumeImpl diskVolume = new FsVolumeImplBuilder()
+        .setConf(conf)
+        .setDataset(dataset)
+        .setStorageID("storage-id")
+        .setStorageDirectory(
+            new StorageDirectory(
+                StorageLocation.parse(diskVolDir.getPath())))
+        .build();
+    FsVolumeImpl archivalVolume = new FsVolumeImplBuilder()
+        .setConf(conf)
+        .setDataset(dataset)
+        .setStorageID("storage-id")
+        .setStorageDirectory(
+            new StorageDirectory(StorageLocation
+                .parse("[ARCHIVE]" + archivalVolDir.getPath())))
+        .build();
+    FsVolumeList volumeList = new FsVolumeList(
+        Collections.<VolumeFailureInfo>emptyList(),
+        blockScanner, blockChooser, conf);
+    volumeList.addVolume(archivalVolume.obtainReference());
+    volumeList.addVolume(diskVolume.obtainReference());
+
+    assertEquals(diskVolume.getMount(), archivalVolume.getMount());
+    String device = diskVolume.getMount();
+
+    // 1) getVolumeRef should return the correct reference.
+    assertEquals(diskVolume,
+        volumeList.getMountVolumeMap()
+            .getVolumeRefByMountAndStorageType(
+                device, StorageType.DISK).getVolume());
+    assertEquals(archivalVolume,
+        volumeList.getMountVolumeMap()
+            .getVolumeRefByMountAndStorageType(
+                device, StorageType.ARCHIVE).getVolume());
+
+    // 2) removeVolume should work as expected.
+    volumeList.removeVolume(diskVolume.getStorageLocation(), true);
+    assertNull(volumeList.getMountVolumeMap()
+        .getVolumeRefByMountAndStorageType(
+            device, StorageType.DISK));
+    assertEquals(archivalVolume, volumeList.getMountVolumeMap()
+        .getVolumeRefByMountAndStorageType(
+            device, StorageType.ARCHIVE).getVolume());
+  }
+
+  // Test DFS usage stats with same-disk archival.
+  @Test
+  public void testDfsUsageStatWithSameDiskArchival() throws Exception {
+    File diskVolDir = new File(baseDir, "volume-disk");
+    File archivalVolDir = new File(baseDir, "volume-archival");
+    diskVolDir.mkdirs();
+    archivalVolDir.mkdirs();
+
+    long dfCapacity = 1100L;
+    double reservedForArchival = 0.75;
+    // DISK and ARCHIVE share the same du reserved value.
+    long duReserved = 100L;
+    long diskDfsUsage = 100L;
+    long archivalDfsUsage = 200L;
+    long dfUsage = 700L;
+    long dfAvailable = 300L;
+
+    // Set up DISK and ARCHIVE volumes and capacity.
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, duReserved);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
+        true);
+    conf.setDouble(DFSConfigKeys
+            .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
+        reservedForArchival);
+    FsVolumeImpl diskVolume = new FsVolumeImplBuilder()
+        .setConf(conf)
+        .setDataset(dataset)
+        .setStorageID("storage-id")
+        .setStorageDirectory(
+            new StorageDirectory(StorageLocation.parse(diskVolDir.getPath())))
+        .build();
+    FsVolumeImpl archivalVolume = new FsVolumeImplBuilder()
+        .setConf(conf)
+        .setDataset(dataset)
+        .setStorageID("storage-id")
+        .setStorageDirectory(
+            new StorageDirectory(
+                StorageLocation.parse("[ARCHIVE]" + archivalVolDir.getPath())))
+        .build();
+    FsVolumeImpl spyDiskVolume = Mockito.spy(diskVolume);
+    FsVolumeImpl spyArchivalVolume = Mockito.spy(archivalVolume);
+    long testDfCapacity = dfCapacity - duReserved;
+    spyDiskVolume.setCapacityForTesting(testDfCapacity);
+    spyArchivalVolume.setCapacityForTesting(testDfCapacity);
+    Mockito.doReturn(dfAvailable).when(spyDiskVolume).getDfAvailable();
+    Mockito.doReturn(dfAvailable).when(spyArchivalVolume).getDfAvailable();
+
+    MountVolumeMap mountVolumeMap = new MountVolumeMap(conf);
+    mountVolumeMap.addVolume(spyDiskVolume);
+    mountVolumeMap.addVolume(spyArchivalVolume);
+    Mockito.doReturn(mountVolumeMap).when(dataset).getMountVolumeMap();
+
+    // 1) getCapacity() should reflect the configured archive storage percentage.
+    long diskStorageTypeCapacity =
+        (long) ((dfCapacity - duReserved) * (1 - reservedForArchival));
+    assertEquals(diskStorageTypeCapacity, spyDiskVolume.getCapacity());
+    long archiveStorageTypeCapacity =
+        (long) ((dfCapacity - duReserved) * (reservedForArchival));
+    assertEquals(archiveStorageTypeCapacity, spyArchivalVolume.getCapacity());
+
+    // 2) getActualNonDfsUsed() should take both DISK and ARCHIVE usage into account.
+    // expectedActualNonDfsUsage =
+    //   dfUsage - archivalDfsUsage - diskDfsUsage
+    long expectedActualNonDfsUsage = 400L;
+    Mockito.doReturn(diskDfsUsage)
+        .when(spyDiskVolume).getDfsUsed();
+    Mockito.doReturn(archivalDfsUsage)
+        .when(spyArchivalVolume).getDfsUsed();
+    Mockito.doReturn(dfUsage)
+        .when(spyDiskVolume).getDfUsed();
+    Mockito.doReturn(dfUsage)
+        .when(spyArchivalVolume).getDfUsed();
+    assertEquals(expectedActualNonDfsUsage,
+        spyDiskVolume.getActualNonDfsUsed());
+    assertEquals(expectedActualNonDfsUsage,
+        spyArchivalVolume.getActualNonDfsUsed());
+
+    // 3) When there is only one volume on a disk mount,
+    // we allocate the full disk capacity regardless of the default ratio.
+    mountVolumeMap.removeVolume(spyArchivalVolume);
+    assertEquals(dfCapacity - duReserved, spyDiskVolume.getCapacity());
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
index c3abc12bf91c0..a7155b5703cf7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
@@ -25,6 +25,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.fs.StorageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -185,6 +186,58 @@ public void testVolumeSize() throws Exception {
           (namesystem.getCapacityUsed() + namesystem.getCapacityRemaining()
               + namesystem.getNonDfsUsedSpace() + fileCount * fs
               .getDefaultBlockSize()) - configCapacity < 1 * 1024);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Split the disk into DISK/ARCHIVE volumes and test whether the NameNode
+   * gets the correct stats.
+   */
+  @Test
+  public void testVolumeSizeWithSameDiskTiering() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+
+    // Set aside a fifth of the total capacity as reserved.
+    long reserved = 10000;
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, reserved);
+
+    try {
+      double reserveForArchive = 0.3;
+      conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
+          true);
+      conf.setDouble(DFSConfigKeys
+              .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
+          reserveForArchive);
+      cluster = new MiniDFSCluster.Builder(conf).storageTypes(
+          new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}).build();
+      cluster.waitActive();
+
+      final FsDatasetTestUtils utils = cluster.getFsDatasetTestUtils(0);
+
+      long configCapacity = cluster.getNamesystem().getCapacityTotal();
+
+      // The computed disk capacity should be just the raw capacity,
+      // as the two volumes share the capacity of the disk.
+      long rawCapacity = utils.getRawCapacity();
+      long diskCapacity = (long) ((rawCapacity - reserved) * reserveForArchive)
+          + (long) ((rawCapacity - reserved) * (1 - reserveForArchive))
+          + reserved;
+
+      // Ensure reserved space is not double counted.
+      assertEquals(configCapacity, diskCapacity - reserved);
+
+      DataNode dn = cluster.getDataNodes().get(0);
+      // Ensure nonDfsUsed is not double counted.
+      long singleVolumeUsed = dn.getFSDataset()
+          .getStorageReports(cluster.getNamesystem().getBlockPoolId())[0]
+          .getNonDfsUsed();
+      cluster.triggerHeartbeats();
+      assertTrue(cluster.getNamesystem().getCapacityUsed()
+          < singleVolumeUsed * 2);
     } finally {
       if (cluster != null) {