3030import java .util .Comparator ;
3131import java .util .stream .Collectors ;
3232
33- import org .apache .commons .lang3 .NotImplementedException ;
3433import org .apache .curator .framework .recipes .shared .SharedCount ;
3534import org .apache .curator .framework .recipes .shared .VersionedValue ;
3635import org .apache .hadoop .classification .VisibleForTesting ;
4342import org .apache .hadoop .yarn .federation .proto .YarnServerFederationProtos .SubClusterIdProto ;
4443import org .apache .hadoop .yarn .federation .proto .YarnServerFederationProtos .SubClusterInfoProto ;
4544import org .apache .hadoop .yarn .federation .proto .YarnServerFederationProtos .SubClusterPolicyConfigurationProto ;
45+ import org .apache .hadoop .yarn .proto .YarnServerCommonProtos .VersionProto ;
4646import org .apache .hadoop .yarn .security .client .YARNDelegationTokenIdentifier ;
4747import org .apache .hadoop .yarn .server .federation .store .FederationStateStore ;
48+ import org .apache .hadoop .yarn .server .federation .store .exception .FederationStateVersionIncompatibleException ;
4849import org .apache .hadoop .yarn .server .federation .store .records .AddApplicationHomeSubClusterRequest ;
4950import org .apache .hadoop .yarn .server .federation .store .records .AddApplicationHomeSubClusterResponse ;
5051import org .apache .hadoop .yarn .server .federation .store .records .ApplicationHomeSubCluster ;
104105import org .apache .hadoop .yarn .server .federation .store .utils .FederationRouterRMTokenInputValidator ;
105106import org .apache .hadoop .yarn .server .records .Version ;
106107import org .apache .hadoop .yarn .api .records .ReservationId ;
108+ import org .apache .hadoop .yarn .server .records .impl .pb .VersionPBImpl ;
107109import org .apache .hadoop .yarn .util .Clock ;
108110import org .apache .hadoop .yarn .util .Records ;
109111import org .apache .hadoop .yarn .util .SystemClock ;
@@ -154,6 +156,8 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
154156 private final static String ROOT_ZNODE_NAME_POLICY = "policies" ;
155157 private final static String ROOT_ZNODE_NAME_RESERVATION = "reservation" ;
156158
159+ protected static final String ROOT_ZNODE_NAME_VERSION = "version" ;
160+
157161 /** Store Delegation Token Node. */
158162 private final static String ROUTER_RM_DT_SECRET_MANAGER_ROOT = "router_rm_dt_secret_manager_root" ;
159163 private static final String ROUTER_RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
@@ -184,6 +188,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
184188 private String membershipZNode ;
185189 private String policiesZNode ;
186190 private String reservationsZNode ;
191+ private String versionNode ;
187192 private int maxAppsInStateStore ;
188193
189194 /** Directory to store the delegation token data. **/
@@ -195,6 +200,8 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
195200
196201 private volatile Clock clock = SystemClock .getInstance ();
197202
203+ protected static final Version CURRENT_VERSION_INFO = Version .newInstance (1 , 1 );
204+
198205 @ VisibleForTesting
199206 private ZKFederationStateStoreOpDurations opDurations =
200207 ZKFederationStateStoreOpDurations .getInstance ();
@@ -223,6 +230,7 @@ public void init(Configuration conf) throws YarnException {
223230 appsZNode = getNodePath (baseZNode , ROOT_ZNODE_NAME_APPLICATION );
224231 policiesZNode = getNodePath (baseZNode , ROOT_ZNODE_NAME_POLICY );
225232 reservationsZNode = getNodePath (baseZNode , ROOT_ZNODE_NAME_RESERVATION );
233+ versionNode = getNodePath (baseZNode , ROOT_ZNODE_NAME_VERSION );
226234
227235 // delegation token znodes
228236 routerRMDTSecretManagerRoot = getNodePath (baseZNode , ROUTER_RM_DT_SECRET_MANAGER_ROOT );
@@ -245,6 +253,7 @@ public void init(Configuration conf) throws YarnException {
245253 zkManager .createRootDirRecursively (routerRMDTSecretManagerRoot , zkAcl );
246254 zkManager .createRootDirRecursively (routerRMDTMasterKeysRootPath , zkAcl );
247255 zkManager .createRootDirRecursively (routerRMDelegationTokensRootPath , zkAcl );
256+ zkManager .createRootDirRecursively (versionNode , zkAcl );
248257 } catch (Exception e ) {
249258 String errMsg = "Cannot create base directories: " + e .getMessage ();
250259 FederationStateStoreUtils .logAndThrowStoreException (LOG , errMsg );
@@ -643,22 +652,49 @@ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
643652
644653 @ Override
645654 public Version getCurrentVersion () {
646- throw new NotImplementedException ( "Code is not implemented" ) ;
655+ return CURRENT_VERSION_INFO ;
647656 }
648657
649658 @ Override
650659 public Version loadVersion () throws Exception {
651- throw new NotImplementedException ("Code is not implemented" );
660+ if (exists (versionNode )) {
661+ byte [] data = get (versionNode );
662+ if (data != null ) {
663+ return new VersionPBImpl (VersionProto .parseFrom (data ));
664+ }
665+ }
666+ return null ;
652667 }
653668
654669 @ Override
655670 public void storeVersion () throws Exception {
656- throw new NotImplementedException ("Code is not implemented" );
671+ byte [] data = ((VersionPBImpl ) CURRENT_VERSION_INFO ).getProto ().toByteArray ();
672+ boolean isUpdate = exists (versionNode );
673+ put (versionNode , data , isUpdate );
657674 }
658675
659676 @ Override
660677 public void checkVersion () throws Exception {
661- throw new NotImplementedException ("Code is not implemented" );
678+ Version loadedVersion = loadVersion ();
679+ LOG .info ("Loaded Router State Version Info = {}." , loadedVersion );
680+ Version currentVersion = getCurrentVersion ();
681+ if (loadedVersion != null && loadedVersion .equals (currentVersion )) {
682+ return ;
683+ }
684+
685+ // if there is no version info, treat it as CURRENT_VERSION_INFO;
686+ if (loadedVersion == null ) {
687+ loadedVersion = currentVersion ;
688+ }
689+
690+ if (loadedVersion .isCompatibleTo (currentVersion )) {
691+ LOG .info ("Storing Router State Version Info {}." , currentVersion );
692+ storeVersion ();
693+ } else {
694+ throw new FederationStateVersionIncompatibleException (
695+ "Expecting Router state version " + currentVersion +
696+ ", but loading version " + loadedVersion );
697+ }
662698 }
663699
664700 /**
0 commit comments