Skip to content

Commit c6aa4f6

Browse files
omalley authored and GitHub AE committed
LIHADOOP-67839: Address resiliency when StateStore gets exceptions. (apache#19)
Addresses 'Cannot locate a registered namenode for XXX' errors.
1 parent b873f59 commit c6aa4f6

File tree

5 files changed

+166
-8
lines changed

5 files changed

+166
-8
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -123,9 +123,13 @@ public boolean loadCache(boolean force) {
123123
// Our cache depends on the store, update it first
124124
try {
125125
MembershipStore membership = getMembershipStore();
126-
membership.loadCache(force);
126+
if (!membership.loadCache(force)) {
127+
return false;
128+
}
127129
DisabledNameserviceStore disabled = getDisabledNameserviceStore();
128-
disabled.loadCache(force);
130+
if (!disabled.loadCache(force)) {
131+
return false;
132+
}
129133
} catch (IOException e) {
130134
LOG.error("Cannot update membership from the State Store", e);
131135
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -398,7 +398,9 @@ public boolean loadCache(boolean force) {
398398
try {
399399
// Our cache depends on the store, update it first
400400
MountTableStore mountTable = this.getMountTableStore();
401-
mountTable.loadCache(force);
401+
if (!mountTable.loadCache(force)) {
402+
return false;
403+
}
402404

403405
GetMountTableEntriesRequest request =
404406
GetMountTableEntriesRequest.newInstance("/");

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -100,7 +100,7 @@ protected CachedRecordStore(
100100
* @throws StateStoreUnavailableException If the cache is not initialized.
101101
*/
102102
private void checkCacheAvailable() throws StateStoreUnavailableException {
103-
if (!this.initialized) {
103+
if (!getDriver().isDriverReady() || !this.initialized) {
104104
throw new StateStoreUnavailableException(
105105
"Cached State Store not initialized, " +
106106
getRecordClass().getSimpleName() + " records not valid");
@@ -125,7 +125,6 @@ public boolean loadCache(boolean force) throws IOException {
125125
} catch (IOException e) {
126126
LOG.error("Cannot get \"{}\" records from the State Store",
127127
getRecordClass().getSimpleName());
128-
this.initialized = false;
129128
return false;
130129
}
131130

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -185,7 +185,9 @@ public NamenodeHeartbeatResponse namenodeHeartbeat(
185185

186186
@Override
187187
public boolean loadCache(boolean force) throws IOException {
188-
super.loadCache(force);
188+
if (!super.loadCache(force)) {
189+
return false;
190+
}
189191

190192
// Update local cache atomically
191193
cacheWriteLock.lock();

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java

Lines changed: 153 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
* Licensed to the Apache Software Foundation (ASF) under one
33
* or more contributor license agreements. See the NOTICE file
44
* distributed with this work for additional information
@@ -20,9 +20,23 @@
2020
import static org.junit.Assert.assertEquals;
2121

2222
import java.io.IOException;
23+
import java.util.ArrayList;
24+
import java.util.HashMap;
25+
import java.util.Iterator;
26+
import java.util.List;
27+
import java.util.Map;
2328

29+
import org.apache.hadoop.conf.Configuration;
30+
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
31+
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
32+
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
33+
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
2434
import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState;
35+
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
36+
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
37+
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
2538
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
39+
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreBaseImpl;
2640
import org.junit.Test;
2741

2842
/**
@@ -40,7 +54,7 @@ public class TestRouterState {
4054
private static final RouterServiceState STATE = RouterServiceState.RUNNING;
4155

4256

43-
private RouterState generateRecord() throws IOException {
57+
private RouterState generateRecord() {
4458
RouterState record = RouterState.newInstance(ADDRESS, START_TIME, STATE);
4559
record.setVersion(VERSION);
4660
record.setCompileInfo(COMPILE_INFO);
@@ -82,4 +96,141 @@ public void testSerialization() throws IOException {
8296

8397
validateRecord(newRecord);
8498
}
99+
100+
/**
101+
* A mock StateStoreDriver that runs in memory and can cause errors.
102+
*/
103+
public static class MockStateStoreDriver extends StateStoreBaseImpl {
104+
boolean giveErrors = false;
105+
boolean initialized = false;
106+
Map<String, Map<String, BaseRecord>> valueMap = new HashMap<>();
107+
108+
@Override
109+
public boolean initDriver() {
110+
initialized = true;
111+
return true;
112+
}
113+
114+
@Override
115+
public <T extends BaseRecord> boolean initRecordStorage(String className,
116+
Class<T> clazz) {
117+
return true;
118+
}
119+
120+
@Override
121+
public boolean isDriverReady() {
122+
return initialized;
123+
}
124+
125+
@Override
126+
public void close() throws Exception {
127+
valueMap.clear();
128+
initialized = false;
129+
}
130+
131+
private void checkErrors() throws IOException {
132+
if (giveErrors) {
133+
throw new IOException("Induced errors");
134+
}
135+
}
136+
137+
@Override
138+
@SuppressWarnings({"rawtypes", "unchecked"})
139+
public <T extends BaseRecord> QueryResult get(Class<T> clazz) throws IOException {
140+
checkErrors();
141+
Map<String, BaseRecord> map = valueMap.get(StateStoreUtils.getRecordName(clazz));
142+
List<BaseRecord> results = map != null
143+
? new ArrayList<>(map.values()) : new ArrayList<>();
144+
return new QueryResult<>(results, System.currentTimeMillis());
145+
}
146+
147+
@Override
148+
public <T extends BaseRecord> boolean putAll(List<T> records,
149+
boolean allowUpdate,
150+
boolean errorIfExists)
151+
throws IOException {
152+
checkErrors();
153+
for (T record: records) {
154+
Map<String, BaseRecord> map =
155+
valueMap.computeIfAbsent(StateStoreUtils.getRecordName(record.getClass()),
156+
k -> new HashMap<>());
157+
String key = record.getPrimaryKey();
158+
BaseRecord oldRecord = map.get(key);
159+
if (oldRecord == null || allowUpdate) {
160+
map.put(key, record);
161+
} else if (errorIfExists) {
162+
throw new IOException("Record already exists for " + record.getClass()
163+
+ ": " + key);
164+
}
165+
}
166+
return true;
167+
}
168+
169+
@Override
170+
public <T extends BaseRecord> boolean removeAll(Class<T> clazz) throws IOException {
171+
checkErrors();
172+
valueMap.remove(StateStoreUtils.getRecordName(clazz));
173+
return true;
174+
}
175+
176+
@Override
177+
@SuppressWarnings("unchecked")
178+
public <T extends BaseRecord> int remove(Class<T> clazz,
179+
Query<T> query)
180+
throws IOException {
181+
checkErrors();
182+
int result = 0;
183+
Map<String, BaseRecord> map =
184+
valueMap.get(StateStoreUtils.getRecordName(clazz));
185+
if (map != null) {
186+
for (Iterator<BaseRecord> itr = map.values().iterator(); itr.hasNext(); ) {
187+
BaseRecord record = itr.next();
188+
if (query.matches((T) record)) {
189+
itr.remove();
190+
result += 1;
191+
}
192+
}
193+
}
194+
return result;
195+
}
196+
}
197+
198+
@Test
199+
public void testStateStoreResilience() throws Exception {
200+
StateStoreService service = new StateStoreService();
201+
Configuration conf = new Configuration();
202+
conf.setClass(RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS,
203+
MockStateStoreDriver.class,
204+
StateStoreDriver.class);
205+
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, false);
206+
service.init(conf);
207+
MockStateStoreDriver driver = (MockStateStoreDriver) service.getDriver();
208+
// Add two records for block1
209+
driver.put(MembershipState.newInstance("routerId", "ns1",
210+
"ns1-ha1", "cluster1", "block1", "rpc1",
211+
"service1", "lifeline1", "https", "nn01",
212+
FederationNamenodeServiceState.ACTIVE, false), false, false);
213+
driver.put(MembershipState.newInstance("routerId", "ns1",
214+
"ns1-ha2", "cluster1", "block1", "rpc2",
215+
"service2", "lifeline2", "https", "nn02",
216+
FederationNamenodeServiceState.STANDBY, false), false, false);
217+
// load the cache
218+
service.loadDriver();
219+
MembershipNamenodeResolver resolver = new MembershipNamenodeResolver(conf, service);
220+
service.refreshCaches(true);
221+
222+
// look up block1
223+
List<? extends FederationNamenodeContext> result =
224+
resolver.getNamenodesForBlockPoolId("block1");
225+
assertEquals(2, result.size());
226+
227+
// cause io errors and then reload the cache
228+
driver.giveErrors = true;
229+
service.refreshCaches(true);
230+
231+
// make sure the old cache is still there
232+
result = resolver.getNamenodesForBlockPoolId("block1");
233+
assertEquals(2, result.size());
234+
service.stop();
235+
}
85236
}

0 commit comments

Comments (0)