Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public boolean isRunning() {
@VisibleForTesting
@SuppressFBWarnings(value="NN_NAKED_NOTIFY",
justification="Used only for testing")
synchronized void processContainersNow() {
public synchronized void processContainersNow() {
notify();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,17 @@ private ContainerPlacementPolicyFactory() {

public static ContainerPlacementPolicy getPolicy(Configuration conf,
final NodeManager nodeManager, NetworkTopology clusterMap,
final boolean fallback) throws SCMException{
final boolean fallback, SCMContainerPlacementMetrics metrics)
throws SCMException{
final Class<? extends ContainerPlacementPolicy> placementClass = conf
.getClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT,
ContainerPlacementPolicy.class);
Constructor<? extends ContainerPlacementPolicy> constructor;
try {
constructor = placementClass.getDeclaredConstructor(NodeManager.class,
Configuration.class, NetworkTopology.class, boolean.class);
Configuration.class, NetworkTopology.class, boolean.class,
SCMContainerPlacementMetrics.class);
LOG.info("Create container placement policy of type " +
placementClass.getCanonicalName());
} catch (NoSuchMethodException e) {
Expand All @@ -64,7 +66,8 @@ public static ContainerPlacementPolicy getPolicy(Configuration conf,
}

try {
return constructor.newInstance(nodeManager, conf, clusterMap, fallback);
return constructor.newInstance(nodeManager, conf, clusterMap, fallback,
metrics);
} catch (Exception e) {
throw new RuntimeException("Failed to instantiate class " +
placementClass.getCanonicalName() + " for " + e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public final class SCMContainerPlacementCapacity extends SCMCommonPolicy {
*/
public SCMContainerPlacementCapacity(final NodeManager nodeManager,
final Configuration conf, final NetworkTopology networkTopology,
final boolean fallback) {
final boolean fallback, final SCMContainerPlacementMetrics metrics) {
super(nodeManager, conf);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container.placement.algorithms;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;

/**
* This class is for maintaining Topology aware container placement statistics.
*/
@Metrics(about="SCM Container Placement Metrics", context = "ozone")
public class SCMContainerPlacementMetrics implements MetricsSource {
public static final String SOURCE_NAME =
SCMContainerPlacementMetrics.class.getSimpleName();
private static final MetricsInfo RECORD_INFO = Interns.info(SOURCE_NAME,
"SCM Placement Metrics");
private static MetricsRegistry registry;

// total datanode allocation request count
@Metric private MutableCounterLong datanodeRequestCount;
// datanode allocation attempt count, including success, fallback and failed
@Metric private MutableCounterLong datanodeChooseAttemptCount;
// datanode successful allocation count
@Metric private MutableCounterLong datanodeChooseSuccessCount;
// datanode allocated with some allocation constrains compromised
@Metric private MutableCounterLong datanodeChooseFallbackCount;

public SCMContainerPlacementMetrics() {
}

public static SCMContainerPlacementMetrics create() {
MetricsSystem ms = DefaultMetricsSystem.instance();
registry = new MetricsRegistry(RECORD_INFO);
return ms.register(SOURCE_NAME, "SCM Placement Metrics",
new SCMContainerPlacementMetrics());
}

public void incrDatanodeRequestCount(long count) {
System.out.println("request + 1");
this.datanodeRequestCount.incr(count);
}

public void incrDatanodeChooseSuccessCount() {
System.out.println("success + 1");
this.datanodeChooseSuccessCount.incr(1);
}

public void incrDatanodeChooseFallbackCount() {
System.out.println("fallback + 1");
this.datanodeChooseFallbackCount.incr(1);
}

public void incrDatanodeChooseAttemptCount() {
System.out.println("attempt + 1");
this.datanodeChooseAttemptCount.incr(1);
}

public void unRegister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);
}

@VisibleForTesting
public long getDatanodeRequestCount() {
return this.datanodeRequestCount.value();
}

@VisibleForTesting
public long getDatanodeChooseSuccessCount() {
return this.datanodeChooseSuccessCount.value();
}

@VisibleForTesting
public long getDatanodeChooseFallbackCount() {
return this.datanodeChooseFallbackCount.value();
}

@VisibleForTesting
public long getDatanodeChooseAttemptCount() {
return this.datanodeChooseAttemptCount.value();
}

@Override
public void getMetrics(MetricsCollector collector, boolean all) {
registry.snapshot(collector.addRecord(registry.info().name()), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public final class SCMContainerPlacementRackAware extends SCMCommonPolicy {
private boolean fallback;
private static final int RACK_LEVEL = 1;
private static final int MAX_RETRY= 3;
private final SCMContainerPlacementMetrics metrics;

/**
* Constructs a Container Placement with rack awareness.
Expand All @@ -66,10 +67,11 @@ public final class SCMContainerPlacementRackAware extends SCMCommonPolicy {
*/
public SCMContainerPlacementRackAware(final NodeManager nodeManager,
final Configuration conf, final NetworkTopology networkTopology,
final boolean fallback) {
final boolean fallback, final SCMContainerPlacementMetrics metrics) {
super(nodeManager, conf);
this.networkTopology = networkTopology;
this.fallback = fallback;
this.metrics = metrics;
}

/**
Expand All @@ -93,7 +95,7 @@ public List<DatanodeDetails> chooseDatanodes(
List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes,
int nodesRequired, final long sizeRequired) throws SCMException {
Preconditions.checkArgument(nodesRequired > 0);

metrics.incrDatanodeRequestCount(nodesRequired);
int datanodeCount = networkTopology.getNumOfLeafNode(NetConstants.ROOT);
int excludedNodesCount = excludedNodes == null ? 0 : excludedNodes.size();
if (datanodeCount < nodesRequired + excludedNodesCount) {
Expand Down Expand Up @@ -241,16 +243,19 @@ private Node chooseNode(List<Node> excludedNodes, Node affinityNode,
int ancestorGen = RACK_LEVEL;
int maxRetry = MAX_RETRY;
List<Node> excludedNodesForCapacity = null;
boolean isFallbacked = false;
while(true) {
Node node = networkTopology.chooseRandom(NetConstants.ROOT, null,
excludedNodes, affinityNode, ancestorGen);
metrics.incrDatanodeChooseAttemptCount();
if (node == null) {
// cannot find the node which meets all constrains
LOG.warn("Failed to find the datanode. excludedNodes:" +
(excludedNodes == null ? "" : excludedNodes.toString()) +
", affinityNode:" +
(affinityNode == null ? "" : affinityNode.getNetworkFullPath()));
if (fallback) {
isFallbacked = true;
// fallback, don't consider the affinity node
if (affinityNode != null) {
affinityNode = null;
Expand All @@ -267,11 +272,15 @@ private Node chooseNode(List<Node> excludedNodes, Node affinityNode,
" excludedNodes and affinityNode constrains.", null);
}
if (hasEnoughSpace((DatanodeDetails)node, sizeRequired)) {
LOG.debug("Datanode {} is chosen. Required size is {}",
LOG.warn("Datanode {} is chosen. Required size is {}",
node.toString(), sizeRequired);
if (excludedNodes != null && excludedNodesForCapacity != null) {
excludedNodes.removeAll(excludedNodesForCapacity);
}
metrics.incrDatanodeChooseSuccessCount();
if (isFallbacked) {
metrics.incrDatanodeChooseFallbackCount();
}
return node;
} else {
maxRetry--;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public final class SCMContainerPlacementRandom extends SCMCommonPolicy
*/
public SCMContainerPlacementRandom(final NodeManager nodeManager,
final Configuration conf, final NetworkTopology networkTopology,
final boolean fallback) {
final boolean fallback, final SCMContainerPlacementMetrics metrics) {
super(nodeManager, conf);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler;
import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.SCMContainerPlacementMetrics;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.safemode.SafeModeHandler;
Expand Down Expand Up @@ -387,9 +389,11 @@ private void initializeSystemManagers(OzoneConfiguration conf,
conf, scmStorageConfig, eventQueue, clusterMap);
}

SCMContainerPlacementMetrics placementMetrics =
SCMContainerPlacementMetrics.create();
ContainerPlacementPolicy containerPlacementPolicy =
ContainerPlacementPolicyFactory.getPolicy(conf, scmNodeManager,
clusterMap, true);
clusterMap, true, placementMetrics);

if (configurator.getPipelineManager() != null) {
pipelineManager = configurator.getPipelineManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public void testRackAwarePolicy() throws IOException {
.thenReturn(new SCMNodeMetric(storageCapacity, 70L, 30L));

ContainerPlacementPolicy policy = ContainerPlacementPolicyFactory
.getPolicy(conf, nodeManager, cluster, true);
.getPolicy(conf, nodeManager, cluster, true,
SCMContainerPlacementMetrics.create());

int nodeNum = 3;
List<DatanodeDetails> datanodeDetails =
Expand All @@ -117,7 +118,7 @@ public void testRackAwarePolicy() throws IOException {
@Test
public void testDefaultPolicy() throws IOException {
ContainerPlacementPolicy policy = ContainerPlacementPolicyFactory
.getPolicy(conf, null, null, true);
.getPolicy(conf, null, null, true, null);
Assert.assertSame(SCMContainerPlacementRandom.class, policy.getClass());
}

Expand All @@ -138,14 +139,14 @@ public void testConstuctorNotFound() throws SCMException {
// set a placement class which does't have the right constructor implemented
conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
DummyImpl.class.getName());
ContainerPlacementPolicyFactory.getPolicy(conf, null, null, true);
ContainerPlacementPolicyFactory.getPolicy(conf, null, null, true, null);
}

@Test(expected = RuntimeException.class)
public void testClassNotImplemented() throws SCMException {
// set a placement class not implemented
conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
"org.apache.hadoop.hdds.scm.container.placement.algorithm.HelloWorld");
ContainerPlacementPolicyFactory.getPolicy(conf, null, null, true);
ContainerPlacementPolicyFactory.getPolicy(conf, null, null, true, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public void chooseDatanodes() throws SCMException {
.thenReturn(new SCMNodeMetric(100L, 70L, 30L));

SCMContainerPlacementCapacity scmContainerPlacementRandom =
new SCMContainerPlacementCapacity(mockNodeManager, conf, null, true);
new SCMContainerPlacementCapacity(mockNodeManager, conf, null, true,
null);

List<DatanodeDetails> existingNodes = new ArrayList<>();
existingNodes.add(datanodes.get(0));
Expand Down
Loading