1919
2020package org .apache .hadoop .yarn .server .nodemanager .containermanager .resourceplugin .fpga ;
2121
22- import com .google .common .annotations .VisibleForTesting ;
22+ import java .io .File ;
23+ import java .io .IOException ;
24+ import java .util .List ;
25+ import java .util .Optional ;
26+ import java .util .Set ;
27+ import java .util .function .Function ;
28+ import java .util .stream .Collectors ;
29+
2330import org .apache .hadoop .conf .Configuration ;
31+ import org .apache .hadoop .fs .FileUtil ;
32+ import org .apache .hadoop .util .Shell .ShellCommandExecutor ;
2433import org .apache .hadoop .yarn .conf .YarnConfiguration ;
2534import org .apache .hadoop .yarn .exceptions .YarnException ;
2635import org .apache .hadoop .yarn .server .nodemanager .containermanager .linux .resources .ResourceHandlerException ;
2736import org .apache .hadoop .yarn .server .nodemanager .containermanager .linux .resources .fpga .FpgaResourceAllocator ;
37+ import org .apache .hadoop .yarn .server .nodemanager .containermanager .resourceplugin .fpga .discovery .AoclOutputBasedDiscoveryStrategy ;
38+ import org .apache .hadoop .yarn .server .nodemanager .containermanager .resourceplugin .fpga .discovery .FPGADiscoveryStrategy ;
39+ import org .apache .hadoop .yarn .server .nodemanager .containermanager .resourceplugin .fpga .discovery .ScriptBasedFPGADiscoveryStrategy ;
40+ import org .apache .hadoop .yarn .server .nodemanager .containermanager .resourceplugin .fpga .discovery .SettingsBasedFPGADiscoveryStrategy ;
2841import org .slf4j .Logger ;
2942import org .slf4j .LoggerFactory ;
3043
31- import java .util .Iterator ;
32- import java .util .List ;
44+ import com .google .common .annotations .VisibleForTesting ;
45+ import com .google .common .collect .ImmutableList ;
46+ import com .google .common .collect .Sets ;
3347
3448public class FpgaDiscoverer {
35-
36- public static final Logger LOG = LoggerFactory .getLogger (
49+ private static final Logger LOG = LoggerFactory .getLogger (
3750 FpgaDiscoverer .class );
3851
3952 private static FpgaDiscoverer instance ;
@@ -44,8 +57,10 @@ public class FpgaDiscoverer {
4457
4558 private List <FpgaResourceAllocator .FpgaDevice > currentFpgaInfo = null ;
4659
60+ private Function <String , Optional <String >> scriptRunner = this ::runScript ;
61+
4762 // shell command timeout
48- private static final int MAX_EXEC_TIMEOUT_MS = 10 * 1000 ;
63+ public static final int MAX_EXEC_TIMEOUT_MS = 10 * 1000 ;
4964
5065 static {
5166 instance = new FpgaDiscoverer ();
@@ -56,31 +71,41 @@ public static FpgaDiscoverer getInstance() {
5671 }
5772
5873 @ VisibleForTesting
59- public synchronized static FpgaDiscoverer setInstance (FpgaDiscoverer newInstance ) {
74+ void setScriptRunner (Function <String , Optional <String >> scriptRunner ) {
75+ this .scriptRunner = scriptRunner ;
76+ }
77+
78+ @ VisibleForTesting
79+ static void reset () {
80+ instance = new FpgaDiscoverer ();
81+ }
82+
83+ @ VisibleForTesting
84+ public static FpgaDiscoverer setInstance (FpgaDiscoverer newInstance ) {
6085 instance = newInstance ;
6186 return instance ;
6287 }
6388
6489 @ VisibleForTesting
65- public synchronized void setConf (Configuration conf ) {
66- this .conf = conf ;
90+ public void setConf (Configuration configuration ) {
91+ this .conf = configuration ;
6792 }
6893
6994 public List <FpgaResourceAllocator .FpgaDevice > getCurrentFpgaInfo () {
7095 return currentFpgaInfo ;
7196 }
7297
73- public synchronized void setResourceHanderPlugin (AbstractFpgaVendorPlugin plugin ) {
74- this .plugin = plugin ;
98+ public void setResourceHanderPlugin (AbstractFpgaVendorPlugin vendorPlugin ) {
99+ this .plugin = vendorPlugin ;
75100 }
76101
77- public synchronized boolean diagnose () {
102+ public boolean diagnose () {
78103 return this .plugin .diagnose (MAX_EXEC_TIMEOUT_MS );
79104 }
80105
81- public synchronized void initialize (Configuration conf ) throws YarnException {
82- this .conf = conf ;
83- this .plugin .initPlugin (conf );
106+ public void initialize (Configuration config ) throws YarnException {
107+ this .conf = config ;
108+ this .plugin .initPlugin (config );
84109 // Try to diagnose FPGA
85110 LOG .info ("Trying to diagnose FPGA information ..." );
86111 if (!diagnose ()) {
@@ -91,40 +116,45 @@ public synchronized void initialize(Configuration conf) throws YarnException {
91116 /**
92117 * get avialable devices minor numbers from toolchain or static configuration
93118 * */
94- public synchronized List <FpgaResourceAllocator .FpgaDevice > discover () throws ResourceHandlerException {
119+ public List <FpgaResourceAllocator .FpgaDevice > discover ()
120+ throws ResourceHandlerException {
95121 List <FpgaResourceAllocator .FpgaDevice > list ;
96122 String allowed = this .conf .get (YarnConfiguration .NM_FPGA_ALLOWED_DEVICES );
97- // whatever static or auto discover, we always needs
98- // the vendor plugin to discover. For instance, IntelFpgaOpenclPlugin need to
99- // setup a mapping of <major:minor> to <aliasDevName>
100- list = this .plugin .discover (MAX_EXEC_TIMEOUT_MS );
101- if (0 == list .size ()) {
102- throw new ResourceHandlerException ("No FPGA devices detected!" );
123+
124+ String availableDevices = conf .get (
125+ YarnConfiguration .NM_FPGA_AVAILABLE_DEVICES );
126+ String discoveryScript = conf .get (
127+ YarnConfiguration .NM_FPGA_DEVICE_DISCOVERY_SCRIPT );
128+
129+ FPGADiscoveryStrategy discoveryStrategy ;
130+ if (availableDevices != null ) {
131+ discoveryStrategy =
132+ new SettingsBasedFPGADiscoveryStrategy (
133+ plugin .getFpgaType (), availableDevices );
134+ } else if (discoveryScript != null ) {
135+ discoveryStrategy =
136+ new ScriptBasedFPGADiscoveryStrategy (
137+ plugin .getFpgaType (), scriptRunner , discoveryScript );
138+ } else {
139+ discoveryStrategy = new AoclOutputBasedDiscoveryStrategy (plugin );
103140 }
104- currentFpgaInfo = list ;
105- if (allowed .equalsIgnoreCase (
141+
142+ list = discoveryStrategy .discover ();
143+
144+ if (allowed == null || allowed .equalsIgnoreCase (
106145 YarnConfiguration .AUTOMATICALLY_DISCOVER_GPU_DEVICES )) {
107- return list ;
146+ return list ;
108147 } else if (allowed .matches ("(\\ d,)*\\ d" )){
109- String [] minors = allowed .split ("," );
110- Iterator <FpgaResourceAllocator .FpgaDevice > iterator = list .iterator ();
111- // remove the non-configured minor numbers
112- FpgaResourceAllocator .FpgaDevice t ;
113- while (iterator .hasNext ()) {
114- boolean valid = false ;
115- t = iterator .next ();
116- for (String minorNumber : minors ) {
117- if (t .getMinor ().toString ().equals (minorNumber )) {
118- valid = true ;
119- break ;
120- }
121- }
122- if (!valid ) {
123- iterator .remove ();
124- }
125- }
148+ Set <String > minors = Sets .newHashSet (allowed .split ("," ));
149+
150+ // Replace list with a filtered one
151+ list = list
152+ .stream ()
153+ .filter (dev -> minors .contains (dev .getMinor ().toString ()))
154+ .collect (Collectors .toList ());
155+
126156 // if the count of user configured is still larger than actual
127- if (list .size () != minors .length ) {
157+ if (list .size () != minors .size () ) {
128158 LOG .warn ("We continue although there're mistakes in user's configuration " +
129159 YarnConfiguration .NM_FPGA_ALLOWED_DEVICES +
130160 "user configured:" + allowed + ", while the real:" + list .toString ());
@@ -133,7 +163,41 @@ public synchronized List<FpgaResourceAllocator.FpgaDevice> discover() throws Res
133163 throw new ResourceHandlerException ("Invalid value configured for " +
134164 YarnConfiguration .NM_FPGA_ALLOWED_DEVICES + ":\" " + allowed + "\" " );
135165 }
166+
167+ currentFpgaInfo = ImmutableList .copyOf (list );
168+
136169 return list ;
137170 }
138171
172+ private Optional <String > runScript (String path ) {
173+ if (path == null || path .trim ().isEmpty ()) {
174+ LOG .error ("Undefined script" );
175+ return Optional .empty ();
176+ }
177+
178+ File f = new File (path );
179+ if (!f .exists ()) {
180+ LOG .error ("Script does not exist" );
181+ return Optional .empty ();
182+ }
183+
184+ if (!FileUtil .canExecute (f )) {
185+ LOG .error ("Script is not executable" );
186+ return Optional .empty ();
187+ }
188+
189+ ShellCommandExecutor shell = new ShellCommandExecutor (
190+ new String [] {path },
191+ null ,
192+ null ,
193+ MAX_EXEC_TIMEOUT_MS );
194+ try {
195+ shell .execute ();
196+ String output = shell .getOutput ();
197+ return Optional .of (output );
198+ } catch (IOException e ) {
199+ LOG .error ("Cannot execute script" , e );
200+ return Optional .empty ();
201+ }
202+ }
139203}
0 commit comments