Skip to content
This repository was archived by the owner on Apr 4, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions falcon-regression/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ Trunk (Unreleased)
INCOMPATIBLE CHANGES

NEW FEATURES
FALCON-1566 Add test for SLA monitoring API (Pragya Mittal)

FALCON-1567 Test case for Lifecycle feature (Pragya Mittal)

FALCON-1784 Add regression test for for FALCON-1647 (Paul Isaychuk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ public ServiceResponse getEntityLineage(String params)
if (StringUtils.isNotEmpty(params)){
url += colo.isEmpty() ? "?" + params : "&" + params;
}
return Util.sendRequestLineage(createUrl(url), "get", null, null);
return Util.sendJSONRequest(createUrl(url), "get", null, null);
}

/**
Expand All @@ -715,4 +715,19 @@ public InstanceDependencyResult getInstanceDependencies(
.createAndSendRequestProcessInstance(url, params, allColo, user);
}

/**
* Retrieves sla alerts.
* @param params list of optional parameters
* @return instances with sla missed.
*/
public ServiceResponse getSlaAlert(String params)
throws URISyntaxException, AuthenticationException, InterruptedException, IOException {
String url = createUrl(this.hostname + URLS.SLA.getValue(),
getEntityType());
if (StringUtils.isNotEmpty(params)) {
url += params;
}
return Util.sendJSONRequest(createUrl(url), "get", null, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.EntitySummaryResult;
import org.apache.falcon.resource.LineageGraphResult;
import org.apache.falcon.resource.SchedulableEntityInstanceResult;
import org.apache.http.HttpResponse;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -121,4 +122,13 @@ public LineageGraphResult getLineageGraphResult() {
return lineageGraphResult;
}

/**
* Retrieves SchedulableEntityInstanceResult from a message if possible.
* @return SchedulableEntityInstanceResult
*/
public SchedulableEntityInstanceResult getSlaResult() {
SchedulableEntityInstanceResult schedulableEntityInstanceResult = new GsonBuilder().create().fromJson(message,
SchedulableEntityInstanceResult.class);
return schedulableEntityInstanceResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ public enum URLS {
STATUS_URL("/api/entities/status"),
ENTITY_SUMMARY("/api/entities/summary"),
SUBMIT_AND_SCHEDULE_URL("/api/entities/submitAndSchedule"),
SLA("/api/entities/sla-alert"),
ENTITY_LINEAGE("/api/metadata/lineage/entities"),
INSTANCE_RUNNING("/api/instance/running"),
INSTANCE_STATUS("/api/instance/status"),
Expand Down Expand Up @@ -595,7 +596,7 @@ public static Document convertStringToDocument(String xmlStr) {
* @throws URISyntaxException
* @throws AuthenticationException
*/
public static ServiceResponse sendRequestLineage(String url, String method, String data, String user)
public static ServiceResponse sendJSONRequest(String url, String method, String data, String user)
throws IOException, URISyntaxException, AuthenticationException, InterruptedException {
BaseRequest request = new BaseRequest(url, method, user, data);
request.addHeader(RequestKeys.CONTENT_TYPE_HEADER, RequestKeys.JSON_CONTENT_TYPE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.falcon.regression.SLA;

import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.regression.Entities.FeedMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.response.ServiceResponse;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.AssertUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.falcon.resource.SchedulableEntityInstance;
import org.apache.falcon.resource.SchedulableEntityInstanceResult;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.List;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.ArrayList;
import java.util.Collections;


/**
* Feed SLA monitoring tests.
* Test assumes following properties are set in startup.properties of server :
* *.feed.sla.statusCheck.frequency.seconds=60
* *.feed.sla.lookAheadWindow.millis=60000
*/
@Test(groups = { "distributed", "embedded" })
public class FeedSLAMonitoringTest extends BaseTestClass {

private ColoHelper cluster = servers.get(0);
private FileSystem clusterFS = serverFS.get(0);
private String baseTestHDFSDir = cleanAndGetTestDir();
private String feedInputPath = baseTestHDFSDir + "/input" + MINUTE_DATE_PATTERN;
private List<String> slaFeedNames;
private List<Frequency> slaFeedFrequencies;
private String clusterName;
private static final Logger LOGGER = Logger.getLogger(FeedSLAMonitoringTest.class);

private String startTime;
private String endTime;
private String slaStartTime;
private String slaEndTime;
private int noOfFeeds;
private int statusCheckFrequency;

private static final Comparator<SchedulableEntityInstance> DEPENDENCY_COMPARATOR =
new Comparator<SchedulableEntityInstance>() {
@Override
public int compare(SchedulableEntityInstance o1, SchedulableEntityInstance o2) {
return o1.compareTo(o2);
}
};

/**
* Submitting 3 feeds with different frequencies and sla values.
* @throws Exception
*/
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {

bundles[0] = BundleUtil.readELBundle();
bundles[0] = new Bundle(bundles[0], cluster);
bundles[0].generateUniqueBundle(this);
bundles[0].setInputFeedDataPath(feedInputPath);
clusterName = bundles[0].getClusterNames().get(0);
ServiceResponse response =
prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0));
AssertUtil.assertSucceeded(response);

startTime = TimeUtil.getTimeWrtSystemTime(-10);
endTime = TimeUtil.addMinsToTime(startTime, 20);
noOfFeeds=3;

LOGGER.info("Time range between : " + startTime + " and " + endTime);
final String oldFeedName = bundles[0].getInputFeedNameFromBundle();
slaFeedFrequencies = Arrays.asList(new Frequency("1", Frequency.TimeUnit.minutes),
new Frequency("2", Frequency.TimeUnit.minutes),
new Frequency("4", Frequency.TimeUnit.minutes));

slaFeedNames = Arrays.asList(oldFeedName + "-1", oldFeedName + "-2", oldFeedName + "-3");

//Submit 3 feeds with different frequencies and sla values.
for (int bIndex = 0; bIndex < noOfFeeds; ++bIndex) {
final FeedMerlin ipFeed = new FeedMerlin(bundles[0].getInputFeedFromBundle());

ipFeed.setValidity(startTime, endTime);
ipFeed.setAvailabilityFlag("_SUCCESS");

//set slaLow and slaHigh
ipFeed.setSla(new Frequency("1", Frequency.TimeUnit.minutes),
new Frequency("2", Frequency.TimeUnit.minutes));
ipFeed.setName(slaFeedNames.get(bIndex));
ipFeed.setFrequency(slaFeedFrequencies.get(bIndex));

LOGGER.info("Feed is : " + ipFeed.toString());

AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(ipFeed.toString()));
}
}

@AfterMethod(alwaysRun = true)
public void tearDown() throws IOException {
cleanTestsDirs();
removeTestClassEntities();
}

/**
* The following test submits 3 feeds, checks the slaAlert for a given time range and validates its output.
* It also checks the sla status when feed is deleted , data created with/without _SUCCESS folder.
* @throws Exception
*/
@Test
public void feedSLATest() throws Exception {
/**TEST : Check sla response for a given time range
*/

statusCheckFrequency=60; // 60 seconds

// Map of instanceDate and corresponding list of SchedulableEntityInstance
Map<String, List<SchedulableEntityInstance>> instanceEntityMap = new HashMap<>();

slaStartTime = startTime;
slaEndTime = TimeUtil.addMinsToTime(slaStartTime, 10);
DateTime slaStartDate = TimeUtil.oozieDateToDate(slaStartTime);
DateTime slaEndDate = TimeUtil.oozieDateToDate(slaEndTime);

List<SchedulableEntityInstance> expectedInstances = new ArrayList<>();
SchedulableEntityInstance expectedSchedulableEntityInstance;

for (int index = 0; index < noOfFeeds; ++index) {

DateTime dt = new DateTime(slaStartDate);
while (!dt.isAfter(slaEndDate)) {

expectedSchedulableEntityInstance = new SchedulableEntityInstance(slaFeedNames.get(index),
clusterName, dt.toDate(), EntityType.FEED);
expectedSchedulableEntityInstance.setTags("Missed SLA High");
expectedInstances.add(expectedSchedulableEntityInstance);

if (!instanceEntityMap.containsKey(dt.toString())) {
instanceEntityMap.put(dt.toString(), new ArrayList<SchedulableEntityInstance>());
}
instanceEntityMap.get(dt.toString()).add(expectedSchedulableEntityInstance);
dt = dt.plusMinutes(slaFeedFrequencies.get(index).getFrequencyAsInt());

}
}

TimeUtil.sleepSeconds(statusCheckFrequency);

SchedulableEntityInstanceResult response = prism.getFeedHelper().getSlaAlert(
"?start=" + slaStartTime + "&end=" + slaEndTime).getSlaResult();

LOGGER.info(response.getMessage());

validateInstances(response, expectedInstances);

/**TEST : Create missing dependencies with _SUCCESS directory and check sla response
*/

String dateEntry = (String) instanceEntityMap.keySet().toArray()[1];
LOGGER.info(dateEntry + "/" + instanceEntityMap.get(dateEntry));
List<String> dataDates = InstanceUtil.getMinuteDatesToPath(dateEntry, dateEntry, 0);

HadoopUtil.createFolders(clusterFS, baseTestHDFSDir + "/input/", dataDates);

//sla response for feeds when _SUCCESS file is missing from dataPath
response = prism.getFeedHelper().getSlaAlert("?start=" + slaStartTime + "&end=" + slaEndTime).getSlaResult();

// Response does not change as it checks for _SUCCESS file
validateInstances(response, expectedInstances);

//Create _SUCCESS file
HadoopUtil.recreateDir(clusterFS, baseTestHDFSDir + "/input/" + dataDates.get(0) + "/_SUCCESS");
for (SchedulableEntityInstance instance : instanceEntityMap.get(dateEntry)) {
expectedInstances.remove(instance);
}
instanceEntityMap.remove(dateEntry);

TimeUtil.sleepSeconds(statusCheckFrequency);

//sla response for feeds when _SUCCESS file is available in dataPath
response = prism.getFeedHelper().getSlaAlert("?start=" + slaStartTime + "&end=" + slaEndTime).getSlaResult();
validateInstances(response, expectedInstances);

/** TEST : Delete feed and check sla response
*/
String deletedFeed = slaFeedNames.get(0);
prism.getFeedHelper().deleteByName(deletedFeed, null);

for (Map.Entry<String, List<SchedulableEntityInstance>> entry : instanceEntityMap.entrySet())
{
LOGGER.info(entry.getKey() + "/" + entry.getValue());
for (SchedulableEntityInstance instance : entry.getValue()) {
if (instance.getEntityName().equals(deletedFeed)) {
expectedInstances.remove(instance);
}
}

}
TimeUtil.sleepSeconds(statusCheckFrequency);
response = prism.getFeedHelper().getSlaAlert("?start=" + slaStartTime + "&end=" + slaEndTime).getSlaResult();
validateInstances(response, expectedInstances);

}

/**
* Validating expected response with actual response.
* @param response SchedulableEntityInstanceResult response
* @param expectedInstances List of expected instances
*/
private static void validateInstances(SchedulableEntityInstanceResult response,
List<SchedulableEntityInstance> expectedInstances) {

List<SchedulableEntityInstance> actualInstances = Arrays.asList(response.getInstances());

for (SchedulableEntityInstance instance : actualInstances) {
instance.setTags("Missed SLA High");
}

Collections.sort(expectedInstances, DEPENDENCY_COMPARATOR);
Collections.sort(actualInstances, DEPENDENCY_COMPARATOR);

Assert.assertEquals(actualInstances, expectedInstances, "Instances mismatch for");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.response.ServiceResponse;
import org.apache.falcon.regression.core.util.*;
import org.apache.falcon.regression.core.util.AssertUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.log4j.Logger;
import org.apache.oozie.client.OozieClient;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
Expand Down