5757import java .util .concurrent .ThreadFactory ;
5858import java .util .concurrent .ThreadPoolExecutor ;
5959import java .util .concurrent .TimeUnit ;
60+ import java .util .concurrent .atomic .LongAccumulator ;
6061import java .util .concurrent .atomic .LongAdder ;
6162import java .util .regex .Matcher ;
6263import java .util .regex .Pattern ;
8687import org .apache .hadoop .net .NetUtils ;
8788import org .apache .hadoop .security .UserGroupInformation ;
8889import org .apache .hadoop .util .StringUtils ;
90+ import org .apache .hadoop .util .Time ;
8991import org .eclipse .jetty .util .ajax .JSON ;
9092import org .slf4j .Logger ;
9193import org .slf4j .LoggerFactory ;
@@ -136,6 +138,14 @@ public class RouterRpcClient {
136138 private final boolean observerReadEnabledDefault ;
137139 /** Nameservice specific overrides of the default setting for enabling observer reads. */
138140 private HashSet <String > observerReadEnabledOverrides = new HashSet <>();
141+ /**
142+ * Period to refresh namespace stateID using active namenode.
143+ * This ensures the namespace stateID is fresh even when an
144+ * observer is trailing behind.
145+ */
146+ private long activeNNStateIdRefreshPeriodMs ;
147+ /** Last msync times for each namespace. */
148+ private final ConcurrentHashMap <String , LongAccumulator > lastActiveNNRefreshTimes ;
139149
140150 /** Pattern to parse a stack trace line. */
141151 private static final Pattern STACK_TRACE_PATTERN =
@@ -211,13 +221,25 @@ public RouterRpcClient(Configuration conf, Router router,
211221 this .observerReadEnabledDefault = conf .getBoolean (
212222 RBFConfigKeys .DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY ,
213223 RBFConfigKeys .DFS_ROUTER_OBSERVER_READ_DEFAULT_VALUE );
214- String [] observerReadOverrides = conf .getStrings (RBFConfigKeys .DFS_ROUTER_OBSERVER_READ_OVERRIDES );
224+ String [] observerReadOverrides =
225+ conf .getStrings (RBFConfigKeys .DFS_ROUTER_OBSERVER_READ_OVERRIDES );
215226 if (observerReadOverrides != null ) {
216227 observerReadEnabledOverrides .addAll (Arrays .asList (observerReadOverrides ));
217228 }
218229 if (this .observerReadEnabledDefault ) {
219230 LOG .info ("Observer read is enabled for router." );
220231 }
232+ this .activeNNStateIdRefreshPeriodMs = conf .getTimeDuration (
233+ RBFConfigKeys .DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY ,
234+ RBFConfigKeys .DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_DEFAULT ,
235+ TimeUnit .SECONDS , TimeUnit .MILLISECONDS );
236+ if (activeNNStateIdRefreshPeriodMs < 0 ) {
237+ LOG .info ("Periodic stateId freshness check is disabled"
238+ + " since '{}' is {}ms, which is less than 0." ,
239+ RBFConfigKeys .DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY ,
240+ activeNNStateIdRefreshPeriodMs );
241+ }
242+ this .lastActiveNNRefreshTimes = new ConcurrentHashMap <>();
221243 }
222244
223245 /**
@@ -1707,10 +1729,13 @@ private List<? extends FederationNamenodeContext> getOrderedNamenodes(String nsI
17071729 boolean isObserverRead ) throws IOException {
17081730 final List <? extends FederationNamenodeContext > namenodes ;
17091731
1710- if (RouterStateIdContext .getClientStateIdFromCurrentCall (nsId ) > Long .MIN_VALUE ) {
1711- namenodes = namenodeResolver .getNamenodesForNameserviceId (nsId , isObserverRead );
1712- } else {
1713- namenodes = namenodeResolver .getNamenodesForNameserviceId (nsId , false );
1732+ boolean listObserverNamenodesFirst = isObserverRead
1733+ && isNamespaceStateIdFresh (nsId )
1734+ && (RouterStateIdContext .getClientStateIdFromCurrentCall (nsId ) > Long .MIN_VALUE );
1735+ namenodes = namenodeResolver .getNamenodesForNameserviceId (nsId , listObserverNamenodesFirst );
1736+ if (!listObserverNamenodesFirst ) {
1737+ // Refresh time of last call to active NameNode.
1738+ getTimeOfLastCallToActive (nsId ).accumulate (Time .monotonicNow ());
17141739 }
17151740
17161741 if (namenodes == null || namenodes .isEmpty ()) {
@@ -1721,7 +1746,8 @@ private List<? extends FederationNamenodeContext> getOrderedNamenodes(String nsI
17211746 }
17221747
17231748 private boolean isObserverReadEligible (String nsId , Method method ) {
1724- boolean isReadEnabledForNamespace = observerReadEnabledDefault != observerReadEnabledOverrides .contains (nsId );
1749+ boolean isReadEnabledForNamespace =
1750+ observerReadEnabledDefault != observerReadEnabledOverrides .contains (nsId );
17251751 return isReadEnabledForNamespace && isReadCall (method );
17261752 }
17271753
@@ -1735,4 +1761,24 @@ private static boolean isReadCall(Method method) {
17351761 }
17361762 return !method .getAnnotationsByType (ReadOnly .class )[0 ].activeOnly ();
17371763 }
1764+
1765+ /**
1766+ * Checks and sets last refresh time for a namespace's stateId.
1767+ * Returns true if refresh time is newer than threshold.
1768+ * Otherwise, return false and call should be handled by active namenode.
1769+ * @param nsId namespaceID
1770+ */
1771+ @ VisibleForTesting
1772+ boolean isNamespaceStateIdFresh (String nsId ) {
1773+ if (activeNNStateIdRefreshPeriodMs < 0 ) {
1774+ return true ;
1775+ }
1776+ long timeSinceRefreshMs = Time .monotonicNow () - getTimeOfLastCallToActive (nsId ).get ();
1777+ return (timeSinceRefreshMs <= activeNNStateIdRefreshPeriodMs );
1778+ }
1779+
1780+ private LongAccumulator getTimeOfLastCallToActive (String namespaceId ) {
1781+ return lastActiveNNRefreshTimes
1782+ .computeIfAbsent (namespaceId , key -> new LongAccumulator (Math ::max , 0 ));
1783+ }
17381784}
0 commit comments