Skip to content

Commit 8e1539e

Browse files
committed
YARN-9266. General improvements in IntelFpgaOpenclPlugin. Contributed by Peter Bacsko.
1 parent 24793d2 commit 8e1539e

File tree

8 files changed

+615
-417
lines changed

8 files changed

+615
-417
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.hadoop.classification.InterfaceAudience;
2626
import org.apache.hadoop.classification.InterfaceStability;
2727
import org.apache.hadoop.conf.Configuration;
28-
import org.apache.hadoop.fs.Path;
2928
import org.apache.hadoop.util.StringUtils;
3029
import org.apache.hadoop.yarn.api.records.ContainerId;
3130
import org.apache.hadoop.yarn.api.records.Resource;
@@ -37,13 +36,13 @@
3736
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
3837
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
3938
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
39+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
4040
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.AbstractFpgaVendorPlugin;
4141
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDiscoverer;
4242

4343
import java.util.ArrayList;
4444
import java.util.Arrays;
4545
import java.util.List;
46-
import java.util.Map;
4746

4847
import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
4948

@@ -89,21 +88,25 @@ public String getRequestedIPID(Container container) {
8988
}
9089

9190
@Override
92-
public List<PrivilegedOperation> bootstrap(Configuration configuration) throws ResourceHandlerException {
91+
public List<PrivilegedOperation> bootstrap(Configuration configuration)
92+
throws ResourceHandlerException {
9393
// The plugin should be initilized by FpgaDiscoverer already
9494
if (!vendorPlugin.initPlugin(configuration)) {
95-
throw new ResourceHandlerException("FPGA plugin initialization failed", null);
95+
throw new ResourceHandlerException("FPGA plugin initialization failed");
9696
}
9797
LOG.info("FPGA Plugin bootstrap success.");
9898
// Get avialable devices minor numbers from toolchain or static configuration
99-
List<FpgaResourceAllocator.FpgaDevice> fpgaDeviceList = FpgaDiscoverer.getInstance().discover();
99+
List<FpgaResourceAllocator.FpgaDevice> fpgaDeviceList =
100+
FpgaDiscoverer.getInstance().discover();
100101
allocator.addFpga(vendorPlugin.getFpgaType(), fpgaDeviceList);
101-
this.cGroupsHandler.initializeCGroupController(CGroupsHandler.CGroupController.DEVICES);
102+
this.cGroupsHandler.initializeCGroupController(
103+
CGroupsHandler.CGroupController.DEVICES);
102104
return null;
103105
}
104106

105107
@Override
106-
public List<PrivilegedOperation> preStart(Container container) throws ResourceHandlerException {
108+
public List<PrivilegedOperation> preStart(Container container)
109+
throws ResourceHandlerException {
107110
// 1. Get requested FPGA type and count, choose corresponding FPGA plugin(s)
108111
// 2. Use allocator.assignFpga(type, count) to get FPGAAllocation
109112
// 3. If required, download to ensure IP file exists and configure IP file for all devices
@@ -126,15 +129,17 @@ public List<PrivilegedOperation> preStart(Container container) throws ResourceHa
126129
container, getRequestedIPID(container));
127130
LOG.info("FpgaAllocation:" + allocation);
128131

129-
PrivilegedOperation privilegedOperation = new PrivilegedOperation(PrivilegedOperation.OperationType.FPGA,
132+
PrivilegedOperation privilegedOperation =
133+
new PrivilegedOperation(PrivilegedOperation.OperationType.FPGA,
130134
Arrays.asList(CONTAINER_ID_CLI_OPTION, containerIdStr));
131135
if (!allocation.getDenied().isEmpty()) {
132136
List<Integer> denied = new ArrayList<>();
133137
allocation.getDenied().forEach(device -> denied.add(device.getMinor()));
134138
privilegedOperation.appendArgs(Arrays.asList(EXCLUDED_FPGAS_CLI_OPTION,
135139
StringUtils.join(",", denied)));
136140
}
137-
privilegedOperationExecutor.executePrivilegedOperation(privilegedOperation, true);
141+
privilegedOperationExecutor.executePrivilegedOperation(
142+
privilegedOperation, true);
138143

139144
if (deviceCount > 0) {
140145
/**
@@ -152,25 +157,30 @@ public List<PrivilegedOperation> preStart(Container container) throws ResourceHa
152157
* for different devices
153158
*
154159
* */
155-
ipFilePath = vendorPlugin.downloadIP(getRequestedIPID(container), container.getWorkDir(),
160+
ipFilePath = vendorPlugin.retrieveIPfilePath(
161+
getRequestedIPID(container),
162+
container.getWorkDir(),
156163
container.getResourceSet().getLocalizedResources());
157-
if (ipFilePath.isEmpty()) {
158-
LOG.warn("FPGA plugin failed to download IP but continue, please check the value of environment viable: " +
159-
REQUEST_FPGA_IP_ID_KEY + " if you want yarn to help");
164+
if (ipFilePath == null) {
165+
LOG.warn("FPGA plugin failed to downloaded IP, please check the" +
166+
" value of environment viable: " + REQUEST_FPGA_IP_ID_KEY +
167+
" if you want YARN to program the device");
160168
} else {
161169
LOG.info("IP file path:" + ipFilePath);
162170
List<FpgaResourceAllocator.FpgaDevice> allowed = allocation.getAllowed();
163171
String majorMinorNumber;
164172
for (int i = 0; i < allowed.size(); i++) {
165-
majorMinorNumber = allowed.get(i).getMajor() + ":" + allowed.get(i).getMinor();
166-
String currentIPID = allowed.get(i).getIPID();
173+
FpgaDevice device = allowed.get(i);
174+
majorMinorNumber = device.getMajor() + ":" + device.getMinor();
175+
String currentIPID = device.getIPID();
167176
if (null != currentIPID &&
168177
currentIPID.equalsIgnoreCase(getRequestedIPID(container))) {
169-
LOG.info("IP already in device \"" + allowed.get(i).getAliasDevName() + "," +
170-
majorMinorNumber + "\", skip reprogramming");
178+
LOG.info("IP already in device \"" +
179+
allowed.get(i).getAliasDevName() +
180+
"," + majorMinorNumber + "\", skip reprogramming");
171181
continue;
172182
}
173-
if (vendorPlugin.configureIP(ipFilePath, majorMinorNumber)) {
183+
if (vendorPlugin.configureIP(ipFilePath, device)) {
174184
// update the allocator that we update an IP of a device
175185
allocator.updateFpga(containerIdStr, allowed.get(i),
176186
getRequestedIPID(container));
@@ -186,7 +196,8 @@ public List<PrivilegedOperation> preStart(Container container) throws ResourceHa
186196
throw re;
187197
} catch (PrivilegedOperationException e) {
188198
allocator.cleanupAssignFpgas(containerIdStr);
189-
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES, containerIdStr);
199+
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES,
200+
containerIdStr);
190201
LOG.warn("Could not update cgroup for container", e);
191202
throw new ResourceHandlerException(e);
192203
}
@@ -200,7 +211,8 @@ public List<PrivilegedOperation> preStart(Container container) throws ResourceHa
200211
}
201212

202213
@Override
203-
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId) throws ResourceHandlerException {
214+
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
215+
throws ResourceHandlerException {
204216
allocator.recoverAssignedFpgas(containerId);
205217
return null;
206218
}
@@ -212,7 +224,8 @@ public List<PrivilegedOperation> updateContainer(Container container)
212224
}
213225

214226
@Override
215-
public List<PrivilegedOperation> postComplete(ContainerId containerId) throws ResourceHandlerException {
227+
public List<PrivilegedOperation> postComplete(ContainerId containerId)
228+
throws ResourceHandlerException {
216229
allocator.cleanupAssignFpgas(containerId.toString());
217230
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES,
218231
containerId.toString());

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/AbstractFpgaVendorPlugin.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121

2222
import org.apache.hadoop.classification.InterfaceAudience;
2323
import org.apache.hadoop.classification.InterfaceStability;
24-
import org.apache.hadoop.conf.Configurable;
2524
import org.apache.hadoop.conf.Configuration;
2625
import org.apache.hadoop.fs.Path;
2726
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator;
27+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
2828

2929
import java.util.List;
3030
import java.util.Map;
@@ -38,7 +38,7 @@
3838

3939
@InterfaceAudience.Private
4040
@InterfaceStability.Unstable
41-
public interface AbstractFpgaVendorPlugin extends Configurable{
41+
public interface AbstractFpgaVendorPlugin {
4242

4343
/**
4444
* Check vendor's toolchain and required environment
@@ -72,19 +72,14 @@ public interface AbstractFpgaVendorPlugin extends Configurable{
7272
* localized file path and value is soft link names
7373
* @return The absolute path string of IP file
7474
* */
75-
String downloadIP(String id, String dstDir, Map<Path, List<String>> localizedResources);
75+
String retrieveIPfilePath(String id, String dstDir,
76+
Map<Path, List<String>> localizedResources);
7677

7778
/**
7879
* The vendor plugin configure an IP file to a device
7980
* @param ipPath The absolute path of the IP file
80-
* @param majorMinorNumber The device in format &lt;major:minor&gt;
81+
* @param device The FPGA device object
8182
* @return configure device ok or not
8283
* */
83-
boolean configureIP(String ipPath, String majorMinorNumber);
84-
85-
@Override
86-
void setConf(Configuration conf);
87-
88-
@Override
89-
Configuration getConf();
84+
boolean configureIP(String ipPath, FpgaDevice device);
9085
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
19+
20+
import java.util.ArrayList;
21+
import java.util.Collections;
22+
import java.util.List;
23+
import java.util.regex.Matcher;
24+
import java.util.regex.Pattern;
25+
26+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
27+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin.InnerShellExecutor;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
final class AoclDiagnosticOutputParser {
32+
private AoclDiagnosticOutputParser() {
33+
// no instances
34+
}
35+
36+
private static final Logger LOG = LoggerFactory.getLogger(
37+
AoclDiagnosticOutputParser.class);
38+
39+
/**
40+
* One real sample output of Intel FPGA SDK 17.0's "aocl diagnose" is as below:
41+
* "
42+
* aocl diagnose: Running diagnose from /home/fpga/intelFPGA_pro/17.0/hld/board/nalla_pcie/linux64/libexec
43+
*
44+
* ------------------------- acl0 -------------------------
45+
* Vendor: Nallatech ltd
46+
*
47+
* Phys Dev Name Status Information
48+
*
49+
* aclnalla_pcie0Passed nalla_pcie (aclnalla_pcie0)
50+
* PCIe dev_id = 2494, bus:slot.func = 02:00.00, Gen3 x8
51+
* FPGA temperature = 54.4 degrees C.
52+
* Total Card Power Usage = 31.7 Watts.
53+
* Device Power Usage = 0.0 Watts.
54+
*
55+
* DIAGNOSTIC_PASSED
56+
* ---------------------------------------------------------
57+
* "
58+
*
59+
* While per Intel's guide, the output(should be outdated or prior SDK version's) is as below:
60+
*
61+
* "
62+
* aocl diagnose: Running diagnostic from ALTERAOCLSDKROOT/board/&lt;board_name&gt;/
63+
* &lt;platform&gt;/libexec
64+
* Verified that the kernel mode driver is installed on the host machine.
65+
* Using board package from vendor: &lt;board_vendor_name&gt;
66+
* Querying information for all supported devices that are installed on the host
67+
* machine ...
68+
*
69+
* device_name Status Information
70+
*
71+
* acl0 Passed &lt;descriptive_board_name&gt;
72+
* PCIe dev_id = &lt;device_ID&gt;, bus:slot.func = 02:00.00,
73+
* at Gen 2 with 8 lanes.
74+
* FPGA temperature=43.0 degrees C.
75+
* acl1 Passed &lt;descriptive_board_name&gt;
76+
* PCIe dev_id = &lt;device_ID&gt;, bus:slot.func = 03:00.00,
77+
* at Gen 2 with 8 lanes.
78+
* FPGA temperature = 35.0 degrees C.
79+
*
80+
* Found 2 active device(s) installed on the host machine, to perform a full
81+
* diagnostic on a specific device, please run aocl diagnose &lt;device_name&gt;
82+
*
83+
* DIAGNOSTIC_PASSED
84+
* "
85+
* But this method only support the first output
86+
* */
87+
public static List<FpgaDevice> parseDiagnosticOutput(
88+
String output, InnerShellExecutor shellExecutor, String fpgaType) {
89+
if (output.contains("DIAGNOSTIC_PASSED")) {
90+
List<FpgaDevice> devices = new ArrayList<>();
91+
Matcher headerStartMatcher = Pattern.compile("acl[0-31]")
92+
.matcher(output);
93+
Matcher headerEndMatcher = Pattern.compile("(?i)DIAGNOSTIC_PASSED")
94+
.matcher(output);
95+
int sectionStartIndex;
96+
int sectionEndIndex;
97+
String aliasName;
98+
99+
while (headerStartMatcher.find()) {
100+
sectionStartIndex = headerStartMatcher.end();
101+
String section = null;
102+
aliasName = headerStartMatcher.group();
103+
while (headerEndMatcher.find(sectionStartIndex)) {
104+
sectionEndIndex = headerEndMatcher.start();
105+
section = output.substring(sectionStartIndex, sectionEndIndex);
106+
break;
107+
}
108+
109+
if (section == null) {
110+
LOG.warn("Unsupported diagnose output");
111+
LOG.warn("aocl output is: " + output);
112+
return Collections.emptyList();
113+
}
114+
115+
// devName, \(.*\)
116+
// busNum, bus:slot.func\s=\s.*,
117+
// FPGA temperature\s=\s.*
118+
// Total\sCard\sPower\sUsage\s=\s.*
119+
String[] fieldRegexes = new String[]{"\\(.*\\)\n",
120+
"(?i)bus:slot.func\\s=\\s.*,",
121+
"(?i)FPGA temperature\\s=\\s.*",
122+
"(?i)Total\\sCard\\sPower\\sUsage\\s=\\s.*"};
123+
String[] fields = new String[4];
124+
String tempFieldValue;
125+
126+
for (int i = 0; i < fieldRegexes.length; i++) {
127+
Matcher fieldMatcher = Pattern.compile(fieldRegexes[i])
128+
.matcher(section);
129+
if (!fieldMatcher.find()) {
130+
LOG.warn("Couldn't find " + fieldRegexes[i] + " pattern");
131+
fields[i] = "";
132+
continue;
133+
}
134+
tempFieldValue = fieldMatcher.group().trim();
135+
if (i == 0) {
136+
// special case for Device name
137+
fields[i] = tempFieldValue.substring(1,
138+
tempFieldValue.length() - 1);
139+
} else {
140+
String ss = tempFieldValue.split("=")[1].trim();
141+
fields[i] = ss.substring(0, ss.length() - 1);
142+
}
143+
}
144+
145+
String majorMinorNumber = shellExecutor
146+
.getMajorAndMinorNumber(fields[0]);
147+
if (null != majorMinorNumber) {
148+
String[] mmn = majorMinorNumber.split(":");
149+
150+
devices.add(new FpgaDevice(fpgaType,
151+
Integer.parseInt(mmn[0]),
152+
Integer.parseInt(mmn[1]), null,
153+
fields[0], aliasName, fields[1], fields[2], fields[3]));
154+
} else {
155+
LOG.warn("Failed to retrieve major/minor number for device");
156+
}
157+
}
158+
159+
return devices;
160+
} else {
161+
LOG.warn("The diagnostic has failed");
162+
LOG.warn("Output of aocl is: " + output);
163+
return Collections.emptyList();
164+
}
165+
}
166+
}

0 commit comments

Comments
 (0)