Skip to content

Commit e2b15d4

Browse files
elekRogPodge
authored andcommitted
HDDS-2022. Add additional freon tests
Closes apache#1341
1 parent 3aed995 commit e2b15d4

File tree

13 files changed

+1265
-1
lines changed

13 files changed

+1265
-1
lines changed

hadoop-ozone/tools/pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,15 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
9191
<artifactId>jmh-generator-annprocess</artifactId>
9292
<version>1.19</version>
9393
</dependency>
94+
<dependency>
95+
<groupId>io.dropwizard.metrics</groupId>
96+
<artifactId>metrics-core</artifactId>
97+
</dependency>
98+
<dependency>
99+
<groupId>com.amazonaws</groupId>
100+
<artifactId>aws-java-sdk-s3</artifactId>
101+
<version>1.11.615</version>
102+
</dependency>
94103
<dependency>
95104
<groupId>com.google.code.findbugs</groupId>
96105
<artifactId>findbugs</artifactId>
Lines changed: 333 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,333 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to you under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
* <p>
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* <p>
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations under
15+
* the License.
16+
*/
17+
package org.apache.hadoop.ozone.freon;
18+
19+
import java.io.IOException;
20+
import java.io.InputStream;
21+
import java.net.InetSocketAddress;
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicLong;
26+
import java.util.regex.Matcher;
27+
import java.util.regex.Pattern;
28+
29+
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
30+
import org.apache.hadoop.ipc.Client;
31+
import org.apache.hadoop.ipc.ProtobufRpcEngine;
32+
import org.apache.hadoop.ipc.RPC;
33+
import org.apache.hadoop.net.NetUtils;
34+
import org.apache.hadoop.ozone.OmUtils;
35+
import org.apache.hadoop.ozone.client.OzoneClient;
36+
import org.apache.hadoop.ozone.client.OzoneClientFactory;
37+
import org.apache.hadoop.ozone.client.OzoneVolume;
38+
import org.apache.hadoop.ozone.om.exceptions.OMException;
39+
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
40+
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
41+
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
42+
import org.apache.hadoop.security.UserGroupInformation;
43+
44+
import com.codahale.metrics.ConsoleReporter;
45+
import com.codahale.metrics.MetricRegistry;
46+
import org.apache.commons.codec.digest.DigestUtils;
47+
import org.apache.commons.lang3.RandomStringUtils;
48+
import org.apache.ratis.protocol.ClientId;
49+
import org.slf4j.Logger;
50+
import org.slf4j.LoggerFactory;
51+
import picocli.CommandLine.Option;
52+
import picocli.CommandLine.ParentCommand;
53+
54+
/**
55+
* Base class for simplified performance tests.
56+
*/
57+
public class BaseFreonGenerator {
58+
59+
private static final Logger LOG =
60+
LoggerFactory.getLogger(BaseFreonGenerator.class);
61+
62+
private static final int CHECK_INTERVAL_MILLIS = 1000;
63+
64+
private static final String DIGEST_ALGORITHM = "MD5";
65+
66+
private static final Pattern ENV_VARIABLE_IN_PATTERN =
67+
Pattern.compile("__(.+?)__");
68+
69+
@ParentCommand
70+
private Freon freonCommand;
71+
72+
@Option(names = {"-n", "--number-of-tests"},
73+
description = "Number of the generated objects.",
74+
defaultValue = "1000")
75+
private long testNo = 1000;
76+
77+
@Option(names = {"-t", "--threads", "--thread"},
78+
description = "Number of threads used to execute",
79+
defaultValue = "10")
80+
private int threadNo;
81+
82+
@Option(names = {"-f", "--fail-at-end"},
83+
description = "If turned on, all the tasks will be executed even if "
84+
+ "there are failures.")
85+
private boolean failAtEnd;
86+
87+
@Option(names = {"-p", "--prefix"},
88+
description = "Unique identifier of the test execution. Usually used as"
89+
+ " a prefix of the generated object names. If empty, a random name"
90+
+ " will be generated",
91+
defaultValue = "")
92+
private String prefix = "";
93+
94+
private MetricRegistry metrics = new MetricRegistry();
95+
96+
private ExecutorService executor;
97+
98+
private AtomicLong successCounter;
99+
100+
private AtomicLong failureCounter;
101+
102+
private long startTime;
103+
104+
private PathSchema pathSchema;
105+
106+
/**
107+
* The main logic to execute a test generator.
108+
*
109+
* @param provider creates the new steps to execute.
110+
*/
111+
public void runTests(TaskProvider provider) {
112+
113+
executor = Executors.newFixedThreadPool(threadNo);
114+
115+
ProgressBar progressBar =
116+
new ProgressBar(System.out, testNo, successCounter::get);
117+
progressBar.start();
118+
119+
startTime = System.currentTimeMillis();
120+
//schedule the execution of all the tasks.
121+
122+
for (long i = 0; i < testNo; i++) {
123+
124+
final long counter = i;
125+
126+
executor.execute(() -> {
127+
try {
128+
129+
//in case of an other failed test, we shouldn't execute more tasks.
130+
if (!failAtEnd && failureCounter.get() > 0) {
131+
return;
132+
}
133+
134+
provider.executeNextTask(counter);
135+
successCounter.incrementAndGet();
136+
} catch (Exception e) {
137+
failureCounter.incrementAndGet();
138+
LOG.error("Error on executing task", e);
139+
}
140+
});
141+
}
142+
143+
// wait until all tasks are executed
144+
145+
while (successCounter.get() + failureCounter.get() < testNo && (
146+
failureCounter.get() == 0 || failAtEnd)) {
147+
try {
148+
Thread.sleep(CHECK_INTERVAL_MILLIS);
149+
} catch (InterruptedException e) {
150+
throw new RuntimeException(e);
151+
}
152+
}
153+
154+
//shutdown everything
155+
if (failureCounter.get() > 0 && !failAtEnd) {
156+
progressBar.terminate();
157+
} else {
158+
progressBar.shutdown();
159+
}
160+
executor.shutdown();
161+
try {
162+
executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
163+
} catch (Exception ex) {
164+
ex.printStackTrace();
165+
}
166+
167+
if (failureCounter.get() > 0) {
168+
throw new RuntimeException("One ore more freon test is failed.");
169+
}
170+
}
171+
172+
/**
173+
* Initialize internal counters, and variables. Call it before runTests.
174+
*/
175+
public void init() {
176+
177+
successCounter = new AtomicLong(0);
178+
failureCounter = new AtomicLong(0);
179+
180+
if (prefix.length() == 0) {
181+
prefix = RandomStringUtils.randomAlphanumeric(10);
182+
} else {
183+
//replace environment variables to support multi-node execution
184+
prefix = resolvePrefix(prefix);
185+
}
186+
LOG.info("Executing test with prefix {}", prefix);
187+
188+
pathSchema = new PathSchema(prefix);
189+
190+
Runtime.getRuntime().addShutdownHook(
191+
new Thread(this::printReport));
192+
}
193+
194+
/**
195+
* Resolve environment variables in the prefixes.
196+
*/
197+
public String resolvePrefix(String inputPrefix) {
198+
Matcher m = ENV_VARIABLE_IN_PATTERN.matcher(inputPrefix);
199+
StringBuffer sb = new StringBuffer();
200+
while (m.find()) {
201+
String environment = System.getenv(m.group(1));
202+
m.appendReplacement(sb, environment != null ? environment : "");
203+
}
204+
m.appendTail(sb);
205+
return sb.toString();
206+
}
207+
208+
/**
209+
* Print out reports from the executed tests.
210+
*/
211+
public void printReport() {
212+
ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).build();
213+
reporter.report();
214+
System.out.println("Total execution time (sec): " + Math
215+
.round((System.currentTimeMillis() - startTime) / 1000.0));
216+
System.out.println("Failures: " + failureCounter.get());
217+
System.out.println("Successful executions: " + successCounter.get());
218+
}
219+
220+
/**
221+
* Create the OM RPC client to use it for testing.
222+
*/
223+
public OzoneManagerProtocolClientSideTranslatorPB createOmClient(
224+
OzoneConfiguration conf) throws IOException {
225+
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
226+
long omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
227+
InetSocketAddress omAddress = OmUtils.getOmAddressForClients(conf);
228+
RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class,
229+
ProtobufRpcEngine.class);
230+
String clientId = ClientId.randomId().toString();
231+
return new OzoneManagerProtocolClientSideTranslatorPB(
232+
RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress,
233+
ugi, conf, NetUtils.getDefaultSocketFactory(conf),
234+
Client.getRpcTimeout(conf)), clientId);
235+
}
236+
237+
/**
238+
* Generate a key/file name based on the prefix and counter.
239+
*/
240+
public String generateObjectName(long counter) {
241+
return pathSchema.getPath(counter);
242+
}
243+
244+
/**
245+
* Create missing target volume/bucket.
246+
*/
247+
public void ensureVolumeAndBucketExist(OzoneConfiguration ozoneConfiguration,
248+
String volumeName, String bucketName) throws IOException {
249+
250+
try (OzoneClient rpcClient = OzoneClientFactory
251+
.getRpcClient(ozoneConfiguration)) {
252+
253+
OzoneVolume volume = null;
254+
try {
255+
volume = rpcClient.getObjectStore().getVolume(volumeName);
256+
} catch (OMException ex) {
257+
if (ex.getResult() == ResultCodes.VOLUME_NOT_FOUND) {
258+
rpcClient.getObjectStore().createVolume(volumeName);
259+
volume = rpcClient.getObjectStore().getVolume(volumeName);
260+
} else {
261+
throw ex;
262+
}
263+
}
264+
265+
try {
266+
volume.getBucket(bucketName);
267+
} catch (OMException ex) {
268+
if (ex.getResult() == ResultCodes.BUCKET_NOT_FOUND) {
269+
volume.createBucket(bucketName);
270+
}
271+
throw ex;
272+
}
273+
}
274+
}
275+
276+
/**
277+
* Create missing target volume.
278+
*/
279+
public void ensureVolumeExists(
280+
OzoneConfiguration ozoneConfiguration,
281+
String volumeName) throws IOException {
282+
try (OzoneClient rpcClient = OzoneClientFactory
283+
.getRpcClient(ozoneConfiguration)) {
284+
285+
try {
286+
rpcClient.getObjectStore().getVolume(volumeName);
287+
} catch (OMException ex) {
288+
if (ex.getResult() == ResultCodes.VOLUME_NOT_FOUND) {
289+
rpcClient.getObjectStore().createVolume(volumeName);
290+
}
291+
}
292+
293+
}
294+
}
295+
296+
/**
297+
* Calculate checksum of a byte array.
298+
*/
299+
public byte[] getDigest(byte[] content) throws IOException {
300+
DigestUtils dig = new DigestUtils(DIGEST_ALGORITHM);
301+
dig.getMessageDigest().reset();
302+
return dig.digest(content);
303+
}
304+
305+
/**
306+
* Calculate checksum of an Input stream.
307+
*/
308+
public byte[] getDigest(InputStream stream) throws IOException {
309+
DigestUtils dig = new DigestUtils(DIGEST_ALGORITHM);
310+
dig.getMessageDigest().reset();
311+
return dig.digest(stream);
312+
}
313+
314+
public String getPrefix() {
315+
return prefix;
316+
}
317+
318+
public MetricRegistry getMetrics() {
319+
return metrics;
320+
}
321+
322+
public OzoneConfiguration createOzoneConfiguration() {
323+
return freonCommand.createOzoneConfiguration();
324+
}
325+
/**
326+
* Simple contract to execute a new step during a freon test.
327+
*/
328+
@FunctionalInterface
329+
public interface TaskProvider {
330+
void executeNextTask(long step) throws Exception;
331+
}
332+
333+
}

0 commit comments

Comments
 (0)