3333
3434import java .io .IOException ;
3535import java .io .Serializable ;
36- import java .util .*;
36+ import java .util .Collections ;
37+ import java .util .HashMap ;
38+ import java .util .Iterator ;
39+ import java .util .LinkedList ;
40+ import java .util .List ;
41+ import java .util .Map ;
3742
3843import static org .apache .hadoop .yarn .api .records .ResourceInformation .FPGA_URI ;
3944
40-
4145/**
4246 * This FPGA resource allocator tends to be used by different FPGA vendor's plugin
4347 * A "type" parameter is taken into consideration when allocation
@@ -50,20 +54,21 @@ public class FpgaResourceAllocator {
5054 private List <FpgaDevice > allowedFpgas = new LinkedList <>();
5155
5256 //key is resource type of FPGA, vendor plugin supported ID
53- private LinkedHashMap <String , List <FpgaDevice >> availableFpga = new LinkedHashMap <>();
57+ private Map <String , List <FpgaDevice >> availableFpgas = new HashMap <>();
5458
55- //key is requestor, aka. container ID
56- private LinkedHashMap <String , List <FpgaDevice >> usedFpgaByRequestor = new LinkedHashMap <>();
59+ //key is the container ID
60+ private Map <String , List <FpgaDevice >> containerToFpgaMapping =
61+ new HashMap <>();
5762
5863 private Context nmContext ;
5964
6065 @ VisibleForTesting
61- public HashMap <String , List <FpgaDevice >> getAvailableFpga () {
62- return availableFpga ;
66+ Map <String , List <FpgaDevice >> getAvailableFpga () {
67+ return availableFpgas ;
6368 }
6469
6570 @ VisibleForTesting
66- public List <FpgaDevice > getAllowedFpga () {
71+ List <FpgaDevice > getAllowedFpga () {
6772 return allowedFpgas ;
6873 }
6974
@@ -72,25 +77,31 @@ public FpgaResourceAllocator(Context ctx) {
7277 }
7378
7479 @ VisibleForTesting
75- public int getAvailableFpgaCount () {
80+ int getAvailableFpgaCount () {
7681 int count = 0 ;
77- for (List <FpgaDevice > l : availableFpga .values ()) {
78- count += l .size ();
79- }
82+
83+ count = availableFpgas .values ()
84+ .stream ()
85+ .mapToInt (i -> i .size ())
86+ .sum ();
87+
8088 return count ;
8189 }
8290
8391 @ VisibleForTesting
84- public HashMap <String , List <FpgaDevice >> getUsedFpga () {
85- return usedFpgaByRequestor ;
92+ Map <String , List <FpgaDevice >> getUsedFpga () {
93+ return containerToFpgaMapping ;
8694 }
8795
8896 @ VisibleForTesting
89- public int getUsedFpgaCount () {
97+ int getUsedFpgaCount () {
9098 int count = 0 ;
91- for (List <FpgaDevice > l : usedFpgaByRequestor .values ()) {
92- count += l .size ();
93- }
99+
100+ count = containerToFpgaMapping .values ()
101+ .stream ()
102+ .mapToInt (i -> i .size ())
103+ .sum ();
104+
94105 return count ;
95106 }
96107
@@ -252,42 +263,31 @@ public String toString() {
252263 }
253264 }
254265
255- public synchronized void addFpga (String type , List <FpgaDevice > list ) {
256- availableFpga .putIfAbsent (type , new LinkedList <>());
266+ // called once during initialization
267+ public synchronized void addFpgaDevices (String type , List <FpgaDevice > list ) {
268+ availableFpgas .putIfAbsent (type , new LinkedList <>());
269+ List <FpgaDevice > fpgaDevices = new LinkedList <>();
270+
257271 for (FpgaDevice device : list ) {
258272 if (!allowedFpgas .contains (device )) {
259- allowedFpgas .add (device );
260- availableFpga .get (type ).add (device );
273+ fpgaDevices .add (device );
274+ availableFpgas .get (type ).add (device );
275+ } else {
276+ LOG .warn ("Duplicate device found: " + device + ". Ignored" );
261277 }
262278 }
263- LOG .info ("Add a list of FPGA Devices: " + list );
279+
280+ allowedFpgas = ImmutableList .copyOf (fpgaDevices );
281+ LOG .info ("Added a list of FPGA Devices: " + allowedFpgas );
264282 }
265283
266284 public synchronized void updateFpga (String requestor ,
267285 FpgaDevice device , String newIPID , String newHash ) {
268- List <FpgaDevice > usedFpgas = usedFpgaByRequestor .get (requestor );
269- int index = findMatchedFpga (usedFpgas , device );
270- if (-1 != index ) {
271- usedFpgas .get (index ).setIPID (newIPID );
272- FpgaDevice fpga = usedFpgas .get (index );
273- fpga .setIPID (newIPID );
274- fpga .setAocxHash (newHash );
275- } else {
276- LOG .warn ("Failed to update FPGA due to unknown reason " +
277- "that no record for this allocated device:" + device );
278- }
286+ device .setIPID (newIPID );
287+ device .setAocxHash (newHash );
279288 LOG .info ("Update IPID to " + newIPID +
280- " for this allocated device:" + device );
281- }
282-
283- private synchronized int findMatchedFpga (List <FpgaDevice > devices , FpgaDevice item ) {
284- int i = 0 ;
285- for (; i < devices .size (); i ++) {
286- if (devices .get (i ) == item ) {
287- return i ;
288- }
289- }
290- return -1 ;
289+ " for this allocated device: " + device );
290+ LOG .info ("Update IP hash to " + newHash );
291291 }
292292
293293 /**
@@ -301,7 +301,8 @@ private synchronized int findMatchedFpga(List<FpgaDevice> devices, FpgaDevice it
301301 * */
302302 public synchronized FpgaAllocation assignFpga (String type , long count ,
303303 Container container , String ipidHash ) throws ResourceHandlerException {
304- List <FpgaDevice > currentAvailableFpga = availableFpga .get (type );
304+ List <FpgaDevice > currentAvailableFpga = availableFpgas .get (type );
305+
305306 String requestor = container .getContainerId ().toString ();
306307 if (null == currentAvailableFpga ) {
307308 throw new ResourceHandlerException ("No such type of FPGA resource available: " + type );
@@ -341,8 +342,8 @@ public synchronized FpgaAllocation assignFpga(String type, long count,
341342 }
342343
343344 // update state store success, update internal used FPGAs
344- usedFpgaByRequestor .putIfAbsent (requestor , new LinkedList <>());
345- usedFpgaByRequestor .get (requestor ).addAll (assignedFpgas );
345+ containerToFpgaMapping .putIfAbsent (requestor , new LinkedList <>());
346+ containerToFpgaMapping .get (requestor ).addAll (assignedFpgas );
346347 }
347348
348349 return new FpgaAllocation (assignedFpgas , currentAvailableFpga );
@@ -390,14 +391,13 @@ public synchronized void recoverAssignedFpgas(ContainerId containerId) throws Re
390391 }
391392
392393 public synchronized void cleanupAssignFpgas (String requestor ) {
393- List <FpgaDevice > usedFpgas = usedFpgaByRequestor .get (requestor );
394+ List <FpgaDevice > usedFpgas = containerToFpgaMapping .get (requestor );
394395 if (usedFpgas != null ) {
395396 for (FpgaDevice device : usedFpgas ) {
396397 // Add back to availableFpga
397- availableFpga .get (device .getType ()).add (device );
398+ availableFpgas .get (device .getType ()).add (device );
398399 }
399- usedFpgaByRequestor .remove (requestor );
400+ containerToFpgaMapping .remove (requestor );
400401 }
401402 }
402-
403403}
0 commit comments