Skip to content

Commit e001f8e

Browse files
authored
HADOOP-17814. Provide fallbacks for identity/cost providers and backoff enable (#3230)
Reviewed-by: Wei-Chiu Chuang <[email protected]> Signed-off-by: Takanobu Asanuma <[email protected]>
1 parent f2b6c03 commit e001f8e

File tree

5 files changed

+154
-2
lines changed

5 files changed

+154
-2
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,18 @@ private CostProvider parseCostProvider(String ns, Configuration conf) {
283283
ns + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
284284
CostProvider.class);
285285

286+
if (providers.size() < 1) {
287+
String[] nsPort = ns.split("\\.");
288+
if (nsPort.length == 2) {
289+
// Only if ns is split with ".", we can separate namespace and port.
290+
// In the absence of "ipc.<port>.cost-provider.impl" property,
291+
// we look up "ipc.cost-provider.impl" property.
292+
providers = conf.getInstances(
293+
nsPort[0] + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
294+
CostProvider.class);
295+
}
296+
}
297+
286298
if (providers.size() < 1) {
287299
LOG.info("CostProvider not specified, defaulting to DefaultCostProvider");
288300
return new DefaultCostProvider();
@@ -303,6 +315,18 @@ private IdentityProvider parseIdentityProvider(String ns,
303315
ns + "." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
304316
IdentityProvider.class);
305317

318+
if (providers.size() < 1) {
319+
String[] nsPort = ns.split("\\.");
320+
if (nsPort.length == 2) {
321+
// Only if ns is split with ".", we can separate namespace and port.
322+
// In the absence of "ipc.<port>.identity-provider.impl" property,
323+
// we look up "ipc.identity-provider.impl" property.
324+
providers = conf.getInstances(
325+
nsPort[0] + "." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
326+
IdentityProvider.class);
327+
}
328+
}
329+
306330
if (providers.size() < 1) {
307331
LOG.info("IdentityProvider not specified, " +
308332
"defaulting to UserIdentityProvider");

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -831,12 +831,14 @@ public synchronized void refreshCallQueue(Configuration conf) {
831831
getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
832832
getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
833833
maxQueueSize, prefix, conf);
834-
callQueue.setClientBackoffEnabled(getClientBackoffEnable(prefix, conf));
834+
callQueue.setClientBackoffEnabled(getClientBackoffEnable(
835+
CommonConfigurationKeys.IPC_NAMESPACE, port, conf));
835836
}
836837

837838
/**
838839
* Get from config if client backoff is enabled on that port.
839840
*/
841+
@Deprecated
840842
static boolean getClientBackoffEnable(
841843
String prefix, Configuration conf) {
842844
String name = prefix + "." +
@@ -845,6 +847,32 @@ static boolean getClientBackoffEnable(
845847
CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);
846848
}
847849

850+
/**
851+
* Return boolean value configured by property 'ipc.<port>.backoff.enable'
852+
* if it is present. If the config is not present, default config
853+
* (without port) is used to derive class i.e 'ipc.backoff.enable',
854+
* and derived value is returned if configured. Otherwise, default value
855+
* {@link CommonConfigurationKeys#IPC_BACKOFF_ENABLE_DEFAULT} is returned.
856+
*
857+
* @param namespace Namespace "ipc".
858+
* @param port Server's listener port.
859+
* @param conf Configuration properties.
860+
* @return Value returned based on configuration.
861+
*/
862+
static boolean getClientBackoffEnable(
863+
String namespace, int port, Configuration conf) {
864+
String name = namespace + "." + port + "." +
865+
CommonConfigurationKeys.IPC_BACKOFF_ENABLE;
866+
boolean valueWithPort = conf.getBoolean(name,
867+
CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);
868+
if (valueWithPort != CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT) {
869+
return valueWithPort;
870+
}
871+
return conf.getBoolean(namespace + "."
872+
+ CommonConfigurationKeys.IPC_BACKOFF_ENABLE,
873+
CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);
874+
}
875+
848876
/** A generic call queued for handling. */
849877
public static class Call implements Schedulable,
850878
PrivilegedExceptionAction<Void> {
@@ -3184,7 +3212,8 @@ protected Server(String bindAddress, int port,
31843212
this.callQueue = new CallQueueManager<>(
31853213
getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
31863214
getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
3187-
getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);
3215+
getClientBackoffEnable(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
3216+
maxQueueSize, prefix, conf);
31883217

31893218
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
31903219
this.authorize =

hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2504,6 +2504,17 @@
25042504
</description>
25052505
</property>
25062506

2507+
<property>
2508+
<name>ipc.backoff.enable</name>
2509+
<value>false</value>
2510+
<description>
2511+
This property is used as fallback property in case
2512+
"ipc.[port_number].backoff.enable" is not defined.
2513+
It determines whether or not to enable client backoff when
2514+
a queue is full.
2515+
</description>
2516+
</property>
2517+
25072518
<property>
25082519
<name>ipc.[port_number].callqueue.impl</name>
25092520
<value>java.util.concurrent.LinkedBlockingQueue</value>
@@ -2586,6 +2597,17 @@
25862597
</description>
25872598
</property>
25882599

2600+
<property>
2601+
<name>ipc.identity-provider.impl</name>
2602+
<value>org.apache.hadoop.ipc.UserIdentityProvider</value>
2603+
<description>
2604+
This property is used as fallback property in case
2605+
"ipc.[port_number].identity-provider.impl" is not defined.
2606+
The identity provider mapping user requests to their identity.
2607+
This property applies to DecayRpcScheduler.
2608+
</description>
2609+
</property>
2610+
25892611
<property>
25902612
<name>ipc.[port_number].cost-provider.impl</name>
25912613
<value>org.apache.hadoop.ipc.DefaultCostProvider</value>
@@ -2596,6 +2618,19 @@
25962618
</description>
25972619
</property>
25982620

2621+
<property>
2622+
<name>ipc.cost-provider.impl</name>
2623+
<value>org.apache.hadoop.ipc.DefaultCostProvider</value>
2624+
<description>
2625+
This property is used as fallback property in case
2626+
"ipc.[port_number].cost-provider.impl" is not defined.
2627+
The cost provider mapping user requests to their cost. To
2628+
enable determination of cost based on processing time, use
2629+
org.apache.hadoop.ipc.WeightedTimeCostProvider.
2630+
This property applies to DecayRpcScheduler.
2631+
</description>
2632+
</property>
2633+
25992634
<property>
26002635
<name>ipc.[port_number].decay-scheduler.period-ms</name>
26012636
<value>5000</value>

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ public void initializeMemberVariables() {
156156

157157
// FairCallQueue configs that includes dynamic ports in its keys
158158
xmlPropsToSkipCompare.add("ipc.[port_number].backoff.enable");
159+
xmlPropsToSkipCompare.add("ipc.backoff.enable");
159160
xmlPropsToSkipCompare.add("ipc.[port_number].callqueue.impl");
160161
xmlPropsToSkipCompare.add("ipc.callqueue.impl");
161162
xmlPropsToSkipCompare.add("ipc.[port_number].scheduler.impl");
@@ -164,7 +165,9 @@ public void initializeMemberVariables() {
164165
xmlPropsToSkipCompare.add(
165166
"ipc.[port_number].faircallqueue.multiplexer.weights");
166167
xmlPropsToSkipCompare.add("ipc.[port_number].identity-provider.impl");
168+
xmlPropsToSkipCompare.add("ipc.identity-provider.impl");
167169
xmlPropsToSkipCompare.add("ipc.[port_number].cost-provider.impl");
170+
xmlPropsToSkipCompare.add("ipc.cost-provider.impl");
168171
xmlPropsToSkipCompare.add("ipc.[port_number].decay-scheduler.period-ms");
169172
xmlPropsToSkipCompare.add("ipc.[port_number].decay-scheduler.decay-factor");
170173
xmlPropsToSkipCompare.add("ipc.[port_number].decay-scheduler.thresholds");

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,29 @@ private Schedulable mockCall(String id) {
5555
return mockCall;
5656
}
5757

58+
private static class TestIdentityProvider implements IdentityProvider {
59+
public String makeIdentity(Schedulable obj) {
60+
UserGroupInformation ugi = obj.getUserGroupInformation();
61+
if (ugi == null) {
62+
return null;
63+
}
64+
return ugi.getShortUserName();
65+
}
66+
}
67+
68+
private static class TestCostProvider implements CostProvider {
69+
70+
@Override
71+
public void init(String namespace, Configuration conf) {
72+
// No-op
73+
}
74+
75+
@Override
76+
public long getCost(ProcessingDetails details) {
77+
return 1;
78+
}
79+
}
80+
5881
private DecayRpcScheduler scheduler;
5982

6083
@Test(expected=IllegalArgumentException.class)
@@ -83,6 +106,44 @@ public void testParsePeriod() {
83106
assertEquals(1058L, scheduler.getDecayPeriodMillis());
84107
}
85108

109+
@Test
110+
@SuppressWarnings("deprecation")
111+
public void testParsePeriodWithPortLessIdentityProvider() {
112+
// By default
113+
scheduler = new DecayRpcScheduler(1, "ipc.50", new Configuration());
114+
assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT,
115+
scheduler.getDecayPeriodMillis());
116+
117+
// Custom
118+
Configuration conf = new Configuration();
119+
conf.setLong("ipc.51." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
120+
1058);
121+
conf.unset("ipc.51." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY);
122+
conf.set("ipc." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
123+
"org.apache.hadoop.ipc.TestDecayRpcScheduler$TestIdentityProvider");
124+
scheduler = new DecayRpcScheduler(1, "ipc.51", conf);
125+
assertEquals(1058L, scheduler.getDecayPeriodMillis());
126+
}
127+
128+
@Test
129+
@SuppressWarnings("deprecation")
130+
public void testParsePeriodWithPortLessCostProvider() {
131+
// By default
132+
scheduler = new DecayRpcScheduler(1, "ipc.52", new Configuration());
133+
assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT,
134+
scheduler.getDecayPeriodMillis());
135+
136+
// Custom
137+
Configuration conf = new Configuration();
138+
conf.setLong("ipc.52." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
139+
1058);
140+
conf.unset("ipc.52." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY);
141+
conf.set("ipc." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
142+
"org.apache.hadoop.ipc.TestDecayRpcScheduler$TestCostProvider");
143+
scheduler = new DecayRpcScheduler(1, "ipc.52", conf);
144+
assertEquals(1058L, scheduler.getDecayPeriodMillis());
145+
}
146+
86147
@Test
87148
@SuppressWarnings("deprecation")
88149
public void testParseFactor() {

0 commit comments

Comments
 (0)