2020
2121import static org .apache .hadoop .fs .CreateFlag .CREATE ;
2222import static org .apache .hadoop .fs .CreateFlag .OVERWRITE ;
23+ import static org .apache .hadoop .yarn .conf .YarnConfiguration .numaAwarenessEnabled ;
2324
2425import org .apache .hadoop .classification .VisibleForTesting ;
2526import java .io .DataOutputStream ;
2829import java .io .IOException ;
2930import java .io .PrintStream ;
3031import java .net .InetSocketAddress ;
31- import java .util .ArrayList ;
3232import java .util .Arrays ;
33+ import java .util .ArrayList ;
3334import java .util .EnumSet ;
3435import java .util .List ;
3536import java .util .Map ;
3637import java .util .Optional ;
38+
3739import org .apache .commons .lang3 .RandomUtils ;
3840import org .apache .hadoop .classification .InterfaceAudience .Private ;
3941import org .apache .hadoop .fs .FileContext ;
5153import org .apache .hadoop .yarn .api .records .Resource ;
5254import org .apache .hadoop .yarn .conf .YarnConfiguration ;
5355import org .apache .hadoop .yarn .exceptions .ConfigurationException ;
56+ import org .apache .hadoop .yarn .exceptions .YarnException ;
5457import org .apache .hadoop .yarn .factory .providers .RecordFactoryProvider ;
5558import org .apache .hadoop .yarn .server .nodemanager .containermanager .container .Container ;
5659import org .apache .hadoop .yarn .server .nodemanager .containermanager .container .ContainerDiagnosticsUpdateEvent ;
5760import org .apache .hadoop .yarn .server .nodemanager .containermanager .launcher .ContainerLaunch ;
61+ import org .apache .hadoop .yarn .server .nodemanager .containermanager .linux .resources .ResourceHandlerException ;
62+ import org .apache .hadoop .yarn .server .nodemanager .containermanager .linux .resources .numa .NumaResourceAllocation ;
63+ import org .apache .hadoop .yarn .server .nodemanager .containermanager .linux .resources .numa .NumaResourceAllocator ;
5864import org .apache .hadoop .yarn .server .nodemanager .containermanager .localizer .ContainerLocalizer ;
5965import org .apache .hadoop .yarn .server .nodemanager .containermanager .runtime .ContainerExecutionException ;
6066import org .apache .hadoop .yarn .server .nodemanager .executor .ContainerExecContext ;
6167import org .apache .hadoop .yarn .server .nodemanager .executor .ContainerLivenessContext ;
68+ import org .apache .hadoop .yarn .server .nodemanager .executor .ContainerReacquisitionContext ;
6269import org .apache .hadoop .yarn .server .nodemanager .executor .ContainerReapContext ;
6370import org .apache .hadoop .yarn .server .nodemanager .executor .ContainerSignalContext ;
6471import org .apache .hadoop .yarn .server .nodemanager .executor .ContainerStartContext ;
@@ -86,6 +93,8 @@ public class DefaultContainerExecutor extends ContainerExecutor {
8693
8794 private String logDirPermissions = null ;
8895
96+ private NumaResourceAllocator numaResourceAllocator ;
97+
8998 /**
9099 * Default constructor for use in testing.
91100 */
@@ -137,7 +146,17 @@ protected void setScriptExecutable(Path script, String owner)
137146
138147 @ Override
139148 public void init (Context nmContext ) throws IOException {
140- // nothing to do or verify here
149+ if (numaAwarenessEnabled (getConf ())) {
150+ numaResourceAllocator = new NumaResourceAllocator (nmContext );
151+ try {
152+ numaResourceAllocator .init (this .getConf ());
153+ LOG .info ("NUMA resources allocation is enabled in DefaultContainer Executor," +
154+ " Successfully initialized NUMA resources allocator." );
155+ } catch (YarnException e ) {
156+ LOG .warn ("Improper NUMA configuration provided." , e );
157+ throw new IOException ("Failed to initialize configured numa subsystem!" );
158+ }
159+ }
141160 }
142161
143162 @ Override
@@ -300,11 +319,28 @@ public int launchContainer(ContainerStartContext ctx)
300319 setScriptExecutable (launchDst , user );
301320 setScriptExecutable (sb .getWrapperScriptPath (), user );
302321
322+ // adding numa commands based on configuration
323+ String [] numaCommands = new String []{};
324+
325+ if (numaResourceAllocator != null ) {
326+ try {
327+ NumaResourceAllocation numaResourceAllocation =
328+ numaResourceAllocator .allocateNumaNodes (container );
329+ if (numaResourceAllocation != null ) {
330+ numaCommands = getNumaCommands (numaResourceAllocation );
331+ }
332+ } catch (ResourceHandlerException e ) {
333+ LOG .error ("NumaResource Allocation failed!" , e );
334+ throw new IOException ("NumaResource Allocation Error!" , e );
335+ }
336+ }
337+
303338 shExec = buildCommandExecutor (sb .getWrapperScriptPath ().toString (),
304- containerIdStr , user , pidFile , container .getResource (),
305- new File (containerWorkDir .toUri ().getPath ()),
306- container .getLaunchContext ().getEnvironment ());
307-
339+ containerIdStr , user , pidFile , container .getResource (),
340+ new File (containerWorkDir .toUri ().getPath ()),
341+ container .getLaunchContext ().getEnvironment (),
342+ numaCommands );
343+
308344 if (isContainerActive (containerId )) {
309345 shExec .execute ();
310346 } else {
@@ -350,6 +386,7 @@ public int launchContainer(ContainerStartContext ctx)
350386 return exitCode ;
351387 } finally {
352388 if (shExec != null ) shExec .close ();
389+ postComplete (containerId );
353390 }
354391 return 0 ;
355392 }
@@ -372,16 +409,19 @@ public int relaunchContainer(ContainerStartContext ctx)
372409 * as the current working directory for the command. If null,
373410 * the current working directory is not modified.
374411 * @param environment the container environment
412+ * @param numaCommands list of prefix numa commands
375413 * @return the new {@link ShellCommandExecutor}
376414 * @see ShellCommandExecutor
377415 */
378- protected CommandExecutor buildCommandExecutor (String wrapperScriptPath ,
379- String containerIdStr , String user , Path pidFile , Resource resource ,
380- File workDir , Map <String , String > environment ) {
381-
416+ protected CommandExecutor buildCommandExecutor (String wrapperScriptPath ,
417+ String containerIdStr , String user , Path pidFile , Resource resource ,
418+ File workDir , Map <String , String > environment , String [] numaCommands ) {
419+
382420 String [] command = getRunCommand (wrapperScriptPath ,
383421 containerIdStr , user , pidFile , this .getConf (), resource );
384422
423+ command = concatStringCommands (command , numaCommands );
424+
385425 LOG .info ("launchContainer: {}" , Arrays .toString (command ));
386426 return new ShellCommandExecutor (
387427 command ,
@@ -1040,4 +1080,92 @@ public void updateYarnSysFS(Context ctx, String user,
10401080 String appId , String spec ) throws IOException {
10411081 throw new ServiceStateException ("Implementation unavailable" );
10421082 }
1083+
1084+ @ Override
1085+ public int reacquireContainer (ContainerReacquisitionContext ctx )
1086+ throws IOException , InterruptedException {
1087+ try {
1088+ if (numaResourceAllocator != null ) {
1089+ numaResourceAllocator .recoverNumaResource (ctx .getContainerId ());
1090+ }
1091+ return super .reacquireContainer (ctx );
1092+ } finally {
1093+ postComplete (ctx .getContainerId ());
1094+ }
1095+ }
1096+
1097+ /**
1098+ * clean up and release of resources.
1099+ *
1100+ * @param containerId containerId of running container
1101+ */
1102+ public void postComplete (final ContainerId containerId ) {
1103+ if (numaResourceAllocator != null ) {
1104+ try {
1105+ numaResourceAllocator .releaseNumaResource (containerId );
1106+ } catch (ResourceHandlerException e ) {
1107+ LOG .warn ("NumaResource release failed for " +
1108+ "containerId: {}. Exception: " , containerId , e );
1109+ }
1110+ }
1111+ }
1112+
1113+ /**
1114+ * @param resourceAllocation NonNull NumaResourceAllocation object reference
1115+ * @return Array of numa specific commands
1116+ */
1117+ String [] getNumaCommands (NumaResourceAllocation resourceAllocation ) {
1118+ String [] numaCommand = new String [3 ];
1119+ numaCommand [0 ] = this .getConf ().get (YarnConfiguration .NM_NUMA_AWARENESS_NUMACTL_CMD ,
1120+ YarnConfiguration .DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD );
1121+ numaCommand [1 ] = "--interleave=" + String .join ("," , resourceAllocation .getMemNodes ());
1122+ numaCommand [2 ] = "--cpunodebind=" + String .join ("," , resourceAllocation .getCpuNodes ());
1123+ return numaCommand ;
1124+
1125+ }
1126+
1127+ /**
1128+ * @param firstStringArray Array of String
1129+ * @param secondStringArray Array of String
1130+ * @return combined array of string where first elements are from firstStringArray
1131+ * and later are the elements from secondStringArray
1132+ */
1133+ String [] concatStringCommands (String [] firstStringArray , String [] secondStringArray ) {
1134+
1135+ if (firstStringArray == null && secondStringArray == null ) {
1136+ return null ;
1137+ }
1138+
1139+ int len = 0 ;
1140+
1141+ if (firstStringArray != null ){
1142+ len = len + firstStringArray .length ;
1143+ }
1144+
1145+ if (secondStringArray != null ){
1146+ len = len + secondStringArray .length ;
1147+ }
1148+
1149+ if (len == 0 ) {
1150+ return new String []{};
1151+ }
1152+
1153+ String [] ret = new String [len ];
1154+ int idx = 0 ;
1155+ for (int i =0 ; firstStringArray !=null && i < firstStringArray .length ; i ++) {
1156+ ret [idx ] = firstStringArray [i ];
1157+ idx ++;
1158+ }
1159+ for (int i =0 ; secondStringArray !=null && i < secondStringArray .length ; i ++) {
1160+ ret [idx ] = secondStringArray [i ];
1161+ idx ++;
1162+ }
1163+ return ret ;
1164+ }
1165+
1166+ @ VisibleForTesting
1167+ public void setNumaResourceAllocator (NumaResourceAllocator numaResourceAllocator ) {
1168+ this .numaResourceAllocator = numaResourceAllocator ;
1169+ }
1170+
10431171}
0 commit comments