@@ -1953,7 +1953,7 @@ public static StorageReport convert(StorageReportProto p) {
return new StorageReport(p.hasStorage() ? convert(p.getStorage())
: new DatanodeStorage(p.getStorageUuid()), p.getFailed(),
p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
-     p.getBlockPoolUsed(), nonDfsUsed);
+     p.getBlockPoolUsed(), nonDfsUsed, p.getMount());
}

public static DatanodeStorage convert(DatanodeStorageProto s) {
@@ -2688,7 +2688,8 @@ public static StorageReportProto convert(StorageReport r) {
.setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining())
.setStorageUuid(r.getStorage().getStorageID())
.setStorage(convert(r.getStorage()))
-     .setNonDfsUsed(r.getNonDfsUsed());
+     .setNonDfsUsed(r.getNonDfsUsed())
+     .setMount(r.getMount());
return builder.build();
}

@@ -28,18 +28,27 @@ public class StorageReport {
private final long nonDfsUsed;
private final long remaining;
private final long blockPoolUsed;
+ private final String mount;

public static final StorageReport[] EMPTY_ARRAY = {};

public StorageReport(DatanodeStorage storage, boolean failed, long capacity,
long dfsUsed, long remaining, long bpUsed, long nonDfsUsed) {
+   this(storage, failed, capacity, dfsUsed,
+       remaining, bpUsed, nonDfsUsed, "");
+ }
+
+ public StorageReport(DatanodeStorage storage, boolean failed, long capacity,
+     long dfsUsed, long remaining, long bpUsed,
+     long nonDfsUsed, String mount) {
this.storage = storage;
this.failed = failed;
this.capacity = capacity;
this.dfsUsed = dfsUsed;
this.nonDfsUsed = nonDfsUsed;
this.remaining = remaining;
this.blockPoolUsed = bpUsed;
+   this.mount = mount;
}

public DatanodeStorage getStorage() {
@@ -69,4 +78,8 @@ public long getRemaining() {
public long getBlockPoolUsed() {
return blockPoolUsed;
}

+ public String getMount() {
+   return mount;
+ }
}
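For illustration: the seven-argument constructor stays source-compatible by delegating with an empty mount string, so getMount() returns "" for reports built by code that has not been updated. A minimal usage sketch (the storage ID and mount path are made up):

import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;

public class StorageReportMountExample {
  public static void main(String[] args) {
    DatanodeStorage storage = new DatanodeStorage("DS-example-uuid");
    // Legacy form: mount defaults to the empty string.
    StorageReport legacy = new StorageReport(storage, false,
        100L, 40L, 50L, 30L, 10L);
    // New form: the report carries the disk mount the volume lives on.
    StorageReport tiered = new StorageReport(storage, false,
        100L, 40L, 50L, 30L, 10L, "/mnt/disk1");
    System.out.println(legacy.getMount().isEmpty()); // true
    System.out.println(tiered.getMount());           // /mnt/disk1
  }
}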
@@ -158,6 +158,7 @@ message StorageReportProto {
optional uint64 blockPoolUsed = 6 [ default = 0 ];
optional DatanodeStorageProto storage = 7; // supersedes StorageUuid
optional uint64 nonDfsUsed = 8;
+ optional string mount = 9;
}

/**
@@ -1503,6 +1503,26 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_PROTECTED_SUBDIRECTORIES_ENABLE_DEFAULT =
false;

+ /**
+  * HDFS-15548 allows DISK/ARCHIVE to be configured on the same disk mount.
+  * The default ratio will be applied if DISK/ARCHIVE are configured
+  * on the same disk mount.
+  *
+  * Beware that capacity usage might exceed 100% if data blocks already
+  * exist and the configured ratio is small, which will prevent the
+  * volume from accepting new blocks until capacity is balanced out.
+  */
+ public static final String DFS_DATANODE_ALLOW_SAME_DISK_TIERING =
+     "dfs.datanode.same-disk-tiering.enabled";
+ public static final boolean DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT =
+     false;
+
+ public static final String
+     DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE =
+     "dfs.datanode.reserve-for-archive.default.percentage";
+ public static final double
+     DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE_DEFAULT = 0.0;

// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
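For illustration, a minimal sketch of enabling these keys programmatically; the 0.25 ratio is made up, and the sketch assumes (as the key name suggests) that the reserved fraction goes to the ARCHIVE volume while the DISK volume on the same mount reports the remainder:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class SameDiskTieringConfigExample {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // Off by default; required before DISK and ARCHIVE may share a mount.
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
    // Reserve 25% of each shared mount for the ARCHIVE volume.
    conf.setDouble(
        DFSConfigKeys.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
        0.25);
  }
}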
@@ -404,6 +404,7 @@ private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
long totalBlockPoolUsed = 0;
long totalDfsUsed = 0;
long totalNonDfsUsed = 0;
+ Set<String> visitedMount = new HashSet<>();
Set<DatanodeStorageInfo> failedStorageInfos = null;

// Decide if we should check for any missing StorageReport and mark it as
@@ -472,7 +473,17 @@ private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
totalRemaining += report.getRemaining();
totalBlockPoolUsed += report.getBlockPoolUsed();
totalDfsUsed += report.getDfsUsed();
- totalNonDfsUsed += report.getNonDfsUsed();
+ String mount = report.getMount();
+ // For volumes on the same mount, count nonDfsUsed only once,
+ // since they share the same underlying disk.
+ if (mount == null || mount.isEmpty()) {
+   totalNonDfsUsed += report.getNonDfsUsed();
+ } else {
+   if (!visitedMount.contains(mount)) {
+     totalNonDfsUsed += report.getNonDfsUsed();
+     visitedMount.add(mount);
+   }
+ }
}

// Update total metrics for the node.
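The aggregation above can be read on its own: nonDfsUsed is counted once per mount, because DISK and ARCHIVE volumes sharing a mount observe the same non-DFS usage. A standalone sketch of the same logic, using the StorageReport class as extended by this patch:

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.hdfs.server.protocol.StorageReport;

public class NonDfsUsedAggregation {
  static long totalNonDfsUsed(StorageReport[] reports) {
    long total = 0;
    Set<String> visitedMount = new HashSet<>();
    for (StorageReport report : reports) {
      String mount = report.getMount();
      if (mount == null || mount.isEmpty()) {
        // No mount info (e.g. reports from older DataNodes):
        // count every volume.
        total += report.getNonDfsUsed();
      } else if (visitedMount.add(mount)) {
        // Only the first report per mount contributes.
        total += report.getNonDfsUsed();
      }
    }
    return total;
  }
}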
@@ -171,7 +171,9 @@ public StorageReport[] getStorageReports(String bpid)
volume.getDfsUsed(),
volume.getAvailable(),
volume.getBlockPoolUsed(bpid),
-     volume.getNonDfsUsed());
+     volume.getNonDfsUsed(),
+     volume.getMount()
+ );
reports.add(sr);
} catch (ClosedChannelException e) {
continue;
@@ -190,6 +192,10 @@ public FsVolumeImpl getVolume(final ExtendedBlock b) {
}
}

+ MountVolumeMap getMountVolumeMap() {
+   return volumes.getMountVolumeMap();
+ }

@Override // FsDatasetSpi
public Block getStoredBlock(String bpid, long blkid)
throws IOException {
@@ -365,7 +371,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
RoundRobinVolumeChoosingPolicy.class,
VolumeChoosingPolicy.class), conf);
volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
-     blockChooserImpl);
+     blockChooserImpl, conf);
asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode, conf);
deletingBlock = new HashMap<String, Set<Long>>();
@@ -464,12 +470,27 @@ private void activateVolume(
LOG.error(errorMsg);
throw new IOException(errorMsg);
}
+ // Check whether a volume with the same storage type already exists
+ // on this mount. Only relevant when same-disk tiering is enabled.
+ FsVolumeImpl volumeImpl = (FsVolumeImpl) ref.getVolume();
+ FsVolumeReference checkRef = volumes
+     .getMountVolumeMap()
+     .getVolumeRefByMountAndStorageType(
+         volumeImpl.getMount(), volumeImpl.getStorageType());
+ if (checkRef != null) {
+   final String errorMsg = String.format(
+       "Storage type %s already exists on same mount: %s.",
+       volumeImpl.getStorageType(), volumeImpl.getMount());
+   checkRef.close();
+   LOG.error(errorMsg);
+   throw new IOException(errorMsg);
+ }
volumeMap.mergeAll(replicaMap);
storageMap.put(sd.getStorageUuid(),
new DatanodeStorage(sd.getStorageUuid(),
DatanodeStorage.State.NORMAL,
storageType));
- asyncDiskService.addVolume((FsVolumeImpl) ref.getVolume());
+ asyncDiskService.addVolume(volumeImpl);
volumes.addVolume(ref);
}
}
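MountVolumeMap itself is not part of this diff; the calls above imply a two-level index from mount to storage type to volume, where a non-null lookup is what makes activateVolume reject a second volume of the same type on one mount. A simplified, hypothetical sketch (the real class hands out FsVolumeReference handles and capacity ratios rather than raw volumes, and the method name below is made up):

import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.fs.StorageType;

class MountVolumeMapSketch {
  private final Map<String, EnumMap<StorageType, FsVolumeImpl>> byMount =
      new ConcurrentHashMap<>();

  void addVolume(FsVolumeImpl volume) {
    byMount.computeIfAbsent(volume.getMount(),
        m -> new EnumMap<>(StorageType.class))
        .put(volume.getStorageType(), volume);
  }

  void removeVolume(FsVolumeImpl volume) {
    Map<StorageType, FsVolumeImpl> volumes = byMount.get(volume.getMount());
    if (volumes != null) {
      volumes.remove(volume.getStorageType());
    }
  }

  // Null means no volume of that type is on the mount yet.
  FsVolumeImpl getVolumeByMountAndStorageType(String mount,
      StorageType storageType) {
    Map<StorageType, FsVolumeImpl> volumes = byMount.get(mount);
    return volumes == null ? null : volumes.get(storageType);
  }
}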
@@ -134,6 +134,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
private final FileIoProvider fileIoProvider;
private final DataNodeVolumeMetrics metrics;
private URI baseURI;
+ private boolean enableSameDiskTiering;
+ private final String mount;
+ private double reservedForArchive;

/**
* Per-volume worker pool that processes new blocks to cache.
@@ -190,6 +193,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
this.conf = conf;
this.fileIoProvider = fileIoProvider;
+   this.enableSameDiskTiering =
+       conf.getBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
+           DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT);
+   if (enableSameDiskTiering && usage != null) {
+     this.mount = usage.getMount();
+   } else {
+     mount = "";
+   }
}

+ String getMount() {
+   return mount;
+ }

protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
@@ -407,21 +422,34 @@ long getBlockPoolUsed(String bpid) throws IOException {
* Return either the configured capacity of the file system if configured; or
* the capacity of the file system excluding space reserved for non-HDFS.
*
+ * When same-disk tiering is turned on, the reported capacity
+ * takes the reservedForArchive value into account.
+ *
* @return the unreserved number of bytes left in this filesystem. May be
* zero.
*/
@VisibleForTesting
public long getCapacity() {
+ long capacity;
if (configuredCapacity < 0L) {
long remaining;
if (cachedCapacity > 0L) {
remaining = cachedCapacity - getReserved();
} else {
remaining = usage.getCapacity() - getReserved();
}
-     return Math.max(remaining, 0L);
+     capacity = Math.max(remaining, 0L);
+   } else {
+     capacity = configuredCapacity;
  }
- return configuredCapacity;
+
+ if (enableSameDiskTiering && dataset.getMountVolumeMap() != null) {
+   double capacityRatio = dataset.getMountVolumeMap()
+       .getCapacityRatioByMountAndStorageType(mount, storageType);
+   capacity = (long) (capacity * capacityRatio);
+ }
+
+ return capacity;
}

/**
@@ -452,7 +480,34 @@ public long getAvailable() throws IOException {
}

long getActualNonDfsUsed() throws IOException {
- return usage.getUsed() - getDfsUsed();
+ // DISK and ARCHIVE volumes on the same disk mount
+ // share the same reserved capacity.
+ // When calculating the actual non-DFS used space,
+ // exclude the DFS space used by the counterpart volume.
+ if (enableSameDiskTiering &&
+     (storageType == StorageType.DISK
+         || storageType == StorageType.ARCHIVE)) {
+   StorageType counterpartStorageType = storageType == StorageType.DISK
+       ? StorageType.ARCHIVE : StorageType.DISK;
+   FsVolumeReference counterpartRef = dataset
+       .getMountVolumeMap()
+       .getVolumeRefByMountAndStorageType(mount, counterpartStorageType);
+   if (counterpartRef != null) {
+     FsVolumeImpl counterpartVol = (FsVolumeImpl) counterpartRef.getVolume();
+     long used = getDfUsed() - getDfsUsed() - counterpartVol.getDfsUsed();
+     counterpartRef.close();
+     return used;
+   }
+ }
+ return getDfUsed() - getDfsUsed();
}

+ /**
+  * This method is only used for mocking in tests.
+  */
+ @VisibleForTesting
+ public long getDfUsed() {
+   return usage.getUsed();
+ }

private long getRemainingReserved() throws IOException {
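To make the two calculations above concrete, a worked example with made-up numbers, assuming (per the config key name) that the ARCHIVE volume receives the reserved fraction of the mount and the DISK volume the remainder; 0.25 is chosen because it is exact in binary floating point:

public class SameMountAccountingExample {
  public static void main(String[] args) {
    long mountCapacity = 10_000_000_000_000L; // 10 TB usable on the mount
    double reservedForArchive = 0.25;

    // getCapacity(): each volume reports its share of the mount.
    long archiveCapacity = (long) (mountCapacity * reservedForArchive);
    long diskCapacity = (long) (mountCapacity * (1 - reservedForArchive));
    System.out.println(archiveCapacity); // 2.5 TB
    System.out.println(diskCapacity);    // 7.5 TB

    // getActualNonDfsUsed() for the DISK volume: subtract both volumes'
    // block usage from the df-reported usage of the whole mount.
    long dfUsed = 5_000_000_000_000L;          // df usage of the mount
    long diskDfsUsed = 3_000_000_000_000L;     // DISK volume's blocks
    long archiveDfsUsed = 1_500_000_000_000L;  // ARCHIVE volume's blocks
    long actualNonDfsUsed = dfUsed - diskDfsUsed - archiveDfsUsed;
    System.out.println(actualNonDfsUsed);      // 0.5 TB
  }
}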
@@ -35,6 +35,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
+ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -62,9 +63,13 @@ class FsVolumeList {
private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
private final BlockScanner blockScanner;

+ private final boolean enableSameDiskTiering;
+ private final MountVolumeMap mountVolumeMap;

FsVolumeList(List<VolumeFailureInfo> initialVolumeFailureInfos,
BlockScanner blockScanner,
-     VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
+     VolumeChoosingPolicy<FsVolumeImpl> blockChooser,
+     Configuration config) {
this.blockChooser = blockChooser;
this.blockScanner = blockScanner;
this.checkDirsLock = new AutoCloseableLock();
@@ -73,6 +78,14 @@ class FsVolumeList {
volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
volumeFailureInfo);
}
+   enableSameDiskTiering = config.getBoolean(
+       DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
+       DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT);
+   mountVolumeMap = new MountVolumeMap(config);
}

+ MountVolumeMap getMountVolumeMap() {
+   return mountVolumeMap;
+ }

/**
@@ -291,6 +304,9 @@ void addVolume(FsVolumeReference ref) {
void addVolume(FsVolumeReference ref) {
FsVolumeImpl volume = (FsVolumeImpl) ref.getVolume();
volumes.add(volume);
+ if (isSameDiskTieringApplied(volume)) {
+   mountVolumeMap.addVolume(volume);
+ }
if (blockScanner != null) {
blockScanner.addVolumeScanner(ref);
} else {
@@ -311,6 +327,9 @@ void addVolume(FsVolumeReference ref) {
*/
private void removeVolume(FsVolumeImpl target) {
if (volumes.remove(target)) {
+ if (isSameDiskTieringApplied(target)) {
+   mountVolumeMap.removeVolume(target);
+ }
if (blockScanner != null) {
blockScanner.removeVolumeScanner(target);
}
@@ -331,6 +350,15 @@ private void removeVolume(FsVolumeImpl target) {
}
}

+ /**
+  * Check if same disk tiering is applied to the volume.
+  */
+ private boolean isSameDiskTieringApplied(FsVolumeImpl target) {
+   return enableSameDiskTiering &&
+       (target.getStorageType() == StorageType.DISK
+           || target.getStorageType() == StorageType.ARCHIVE);
+ }

/**
* Dynamically remove volume in the list.
* @param storageLocation {@link StorageLocation} of the volume to be removed.