-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HDFS-15548. Allow configuring DISK/ARCHIVE storage types on same device mount #2288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
f6467d2
c95742f
e73ac9d
74a4a82
551a908
7d61d22
55031be
db0fb34
db2129e
0441fdf
9d08923
b2708f1
cc9e623
9f2250e
1e91d78
46d01b7
8ef9e0c
95aedec
b177dc6
57c52d6
4381dec
33f0006
21441d0
e456c2b
97f7c03
2dd5209
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -134,6 +134,9 @@ public class FsVolumeImpl implements FsVolumeSpi { | |
| private final FileIoProvider fileIoProvider; | ||
| private final DataNodeVolumeMetrics metrics; | ||
| private URI baseURI; | ||
| private boolean enableSameDiskArchival; | ||
| private final String device; | ||
|
||
| private double reservedForArchive; | ||
|
|
||
| /** | ||
| * Per-volume worker pool that processes new blocks to cache. | ||
|
|
@@ -190,6 +193,26 @@ public class FsVolumeImpl implements FsVolumeSpi { | |
| } | ||
| this.conf = conf; | ||
| this.fileIoProvider = fileIoProvider; | ||
| this.enableSameDiskArchival = | ||
| conf.getBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, | ||
| DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT); | ||
| if (enableSameDiskArchival) { | ||
| this.device = usage.getMount(); | ||
| reservedForArchive = conf.getDouble( | ||
|
||
| DFSConfigKeys.DFS_DATANODE_RESERVE_FOR_ARCHIVE_PERCENTAGE, | ||
| DFSConfigKeys.DFS_DATANODE_RESERVE_FOR_ARCHIVE_PERCENTAGE_DEFAULT); | ||
| if (reservedForArchive >= 1) { | ||
| FsDatasetImpl.LOG.warn("Value of reserve-for-archival is >= 100% for " | ||
| + currentDir + ". Setting it to 99%."); | ||
| reservedForArchive = 0.99; | ||
|
||
| } | ||
| } else { | ||
| device = ""; | ||
| } | ||
| } | ||
|
|
||
| String getDevice() { | ||
| return device; | ||
| } | ||
|
|
||
| protected ThreadPoolExecutor initializeCacheExecutor(File parent) { | ||
|
|
@@ -412,16 +435,31 @@ long getBlockPoolUsed(String bpid) throws IOException { | |
| */ | ||
| @VisibleForTesting | ||
| public long getCapacity() { | ||
| long capacity; | ||
| if (configuredCapacity < 0L) { | ||
| long remaining; | ||
| if (cachedCapacity > 0L) { | ||
| remaining = cachedCapacity - getReserved(); | ||
| } else { | ||
| remaining = usage.getCapacity() - getReserved(); | ||
| } | ||
| return Math.max(remaining, 0L); | ||
| capacity = Math.max(remaining, 0L); | ||
| } else { | ||
| capacity = configuredCapacity; | ||
| } | ||
|
|
||
| if (enableSameDiskArchival) { | ||
|
||
| double reservedForArchival = conf.getDouble( | ||
LeonGao91 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| DFSConfigKeys.DFS_DATANODE_RESERVE_FOR_ARCHIVE_PERCENTAGE, | ||
| DFSConfigKeys.DFS_DATANODE_RESERVE_FOR_ARCHIVE_PERCENTAGE_DEFAULT); | ||
| if (storageType == StorageType.ARCHIVE) { | ||
| capacity = (long) (capacity * reservedForArchival); | ||
| } else { | ||
| capacity = (long) (capacity * (1 - reservedForArchival)); | ||
| } | ||
| } | ||
| return configuredCapacity; | ||
|
|
||
| return capacity; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -452,7 +490,33 @@ public long getAvailable() throws IOException { | |
| } | ||
|
|
||
| long getActualNonDfsUsed() throws IOException { | ||
| return usage.getUsed() - getDfsUsed(); | ||
| // DISK and ARCHIVAL on same disk | ||
LeonGao91 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // should share the same amount of reserved capacity. | ||
| // When calculating actual non dfs used, | ||
| // exclude DFS used capacity by another volume. | ||
| if (enableSameDiskArchival && | ||
| (storageType == StorageType.DISK | ||
| || storageType == StorageType.ARCHIVE)) { | ||
| StorageType counterpartStorageType = storageType == StorageType.DISK | ||
| ? StorageType.ARCHIVE : StorageType.DISK; | ||
| FsVolumeReference counterpartRef = dataset | ||
| .getVolumeRef(device, counterpartStorageType); | ||
| if (counterpartRef != null) { | ||
| FsVolumeImpl counterpartVol = (FsVolumeImpl) counterpartRef.getVolume(); | ||
| long used = getDfUsed() - getDfsUsed() - counterpartVol.getDfsUsed(); | ||
| counterpartRef.close(); | ||
| return used; | ||
| } | ||
| } | ||
| return getDfUsed() - getDfsUsed(); | ||
| } | ||
|
|
||
| /** | ||
| * This function is only used for Mock. | ||
| */ | ||
| @VisibleForTesting | ||
| public long getDfUsed() { | ||
| return usage.getUsed(); | ||
| } | ||
|
|
||
| private long getRemainingReserved() throws IOException { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,12 +29,14 @@ | |
| import java.util.Map; | ||
| import java.util.TreeMap; | ||
| import java.util.Set; | ||
| import java.util.concurrent.ConcurrentMap; | ||
| import java.util.concurrent.CopyOnWriteArrayList; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.locks.Condition; | ||
|
|
||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.fs.StorageType; | ||
| import org.apache.hadoop.hdfs.DFSConfigKeys; | ||
| import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; | ||
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; | ||
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; | ||
|
|
@@ -62,9 +64,14 @@ class FsVolumeList { | |
| private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser; | ||
| private final BlockScanner blockScanner; | ||
|
|
||
| private boolean enableSameDiskTiering; | ||
LeonGao91 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| private ConcurrentMap<String, Map<StorageType, FsVolumeImpl>> | ||
| deviceVolumeMapping; | ||
|
|
||
| FsVolumeList(List<VolumeFailureInfo> initialVolumeFailureInfos, | ||
| BlockScanner blockScanner, | ||
| VolumeChoosingPolicy<FsVolumeImpl> blockChooser) { | ||
| VolumeChoosingPolicy<FsVolumeImpl> blockChooser, | ||
| Configuration config) { | ||
| this.blockChooser = blockChooser; | ||
| this.blockScanner = blockScanner; | ||
| this.checkDirsLock = new AutoCloseableLock(); | ||
|
|
@@ -73,6 +80,12 @@ class FsVolumeList { | |
| volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(), | ||
| volumeFailureInfo); | ||
| } | ||
| enableSameDiskTiering = config.getBoolean( | ||
| DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, | ||
| DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT); | ||
| if (enableSameDiskTiering) { | ||
| deviceVolumeMapping = new ConcurrentHashMap<>(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -82,6 +95,29 @@ List<FsVolumeImpl> getVolumes() { | |
| return Collections.unmodifiableList(volumes); | ||
| } | ||
|
|
||
| /** | ||
| * Get vol by device and storage type. | ||
| * This is used when same-disk-tiering is enabled. | ||
| */ | ||
| FsVolumeReference getVolumeRefByDeviceAndStorageType(String device, | ||
| StorageType storageType) { | ||
| if (deviceVolumeMapping != null | ||
| && deviceVolumeMapping.containsKey(device)) { | ||
| try { | ||
| FsVolumeImpl volume = deviceVolumeMapping | ||
| .get(device).getOrDefault(storageType, null); | ||
| if (volume != null) { | ||
| return volume.obtainReference(); | ||
| } | ||
| } catch (ClosedChannelException e) { | ||
| FsDatasetImpl.LOG.warn("Volume closed when getting volume" + | ||
| " by device and storage type: " | ||
| + device + ", " + storageType); | ||
| } | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| private FsVolumeReference chooseVolume(List<FsVolumeImpl> list, | ||
| long blockSize, String storageId) throws IOException { | ||
| while (true) { | ||
|
|
@@ -291,6 +327,23 @@ public String toString() { | |
| void addVolume(FsVolumeReference ref) { | ||
| FsVolumeImpl volume = (FsVolumeImpl) ref.getVolume(); | ||
| volumes.add(volume); | ||
| if (enableSameDiskTiering && | ||
|
||
| (volume.getStorageType() == StorageType.DISK | ||
| || volume.getStorageType() == StorageType.ARCHIVE)) { | ||
| String device = volume.getDevice(); | ||
| if (!device.isEmpty()) { | ||
| Map<StorageType, FsVolumeImpl> storageTypeMap = | ||
| deviceVolumeMapping | ||
| .getOrDefault(device, new ConcurrentHashMap<>()); | ||
| if (storageTypeMap.containsKey(volume.getStorageType())) { | ||
| FsDatasetImpl.LOG.error("Found storage type already exist." + | ||
| " Skipping for now. Please check disk configuration"); | ||
| } else { | ||
| storageTypeMap.put(volume.getStorageType(), volume); | ||
| deviceVolumeMapping.put(device, storageTypeMap); | ||
| } | ||
| } | ||
| } | ||
| if (blockScanner != null) { | ||
| blockScanner.addVolumeScanner(ref); | ||
| } else { | ||
|
|
@@ -311,6 +364,18 @@ void addVolume(FsVolumeReference ref) { | |
| */ | ||
| private void removeVolume(FsVolumeImpl target) { | ||
| if (volumes.remove(target)) { | ||
| if (enableSameDiskTiering && | ||
| (target.getStorageType() == StorageType.DISK | ||
| || target.getStorageType() == StorageType.ARCHIVE)) { | ||
| String device = target.getDevice(); | ||
| if (!device.isEmpty()) { | ||
| Map storageTypeMap = deviceVolumeMapping.get(device); | ||
| storageTypeMap.remove(target.getStorageType()); | ||
| if (storageTypeMap.isEmpty()) { | ||
| deviceVolumeMapping.remove(device); | ||
| } | ||
| } | ||
| } | ||
| if (blockScanner != null) { | ||
| blockScanner.removeVolumeScanner(target); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taking another look at the patch, I think it may be better to have the percentage as a tag added to the configuration "dfs.datanode.data.dir", immediately following the storage type tag. That way, different mount points on the same datanode could have different percentage settings. What do you think?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The intention is for this configuration to act as a "default value" for all disks, since in the normal case a datanode server comes with the same type of HDDs. This keeps the DN configuration less verbose for most use cases.
However, you are right that we should allow users to configure different values, and it is a good idea to put it under "dfs.datanode.data.dir".
I will create a follow-up JIRA to address it, since that could involve quite a lot of change, and handling it separately keeps this PR from growing too big.