diff --git a/appConf/logback.xml b/appConf/logback.xml index 77f83d0..bdf18bc 100644 --- a/appConf/logback.xml +++ b/appConf/logback.xml @@ -3,6 +3,11 @@ + LOG %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n diff --git a/build.sbt b/build.sbt index ee96bea..0d9d7b4 100644 --- a/build.sbt +++ b/build.sbt @@ -187,6 +187,8 @@ libraryDependencies in ThisBuild ++= { /*http*/ //"org.apache.httpcomponents" % "httpcore" % "4.1.2", "org.apache.httpcomponents" % "httpcore" % "4.4.15", + "org.apache.httpcomponents.core5" % "httpcore5" % "5.1.1", + "org.apache.httpcomponents.client5" % "httpclient5" % "5.1.2", /*memcached*/ //"spy" % "spymemcached" % "2.6", "net.spy" % "spymemcached" % "2.12.1", diff --git a/src/main/scala/metl/dependencies/utils/AsyncHttp.scala b/src/main/scala/metl/dependencies/utils/AsyncHttp.scala new file mode 100644 index 0000000..3bb7a52 --- /dev/null +++ b/src/main/scala/metl/dependencies/utils/AsyncHttp.scala @@ -0,0 +1,513 @@ +package com.metl.utils + +import java.util.concurrent.{Future,TimeUnit} + +import org.apache.hc.client5.http.async.methods.{ SimpleHttpRequest, SimpleHttpResponse, SimpleRequestBuilder, SimpleRequestProducer, SimpleResponseConsumer } +import org.apache.hc.client5.http.impl.async.{ CloseableHttpAsyncClient, HttpAsyncClients } +import org.apache.hc.client5.http.config.{RequestConfig} +import org.apache.hc.core5.concurrent.FutureCallback +import org.apache.hc.core5.http.{HttpHost,Header,ContentType} +import org.apache.hc.core5.http.message.{StatusLine,BasicHeader} +import org.apache.hc.core5.io.CloseMode +import org.apache.hc.core5.reactor.IOReactorConfig +import org.apache.hc.core5.util.Timeout +import org.apache.hc.core5.http2.HttpVersionPolicy + +import java.net.URI +import org.apache.commons.io.IOUtils + +import java.util.Date +import net.liftweb.common.Logger +import net.liftweb.util.Helpers._ + +import org.apache.http.conn.ManagedClientConnection + +trait IMeTLAsyncHttpClient { + def start:Unit = {} + def stop:Unit = {} + def addAuthorization(domain: String, username: String, password: String): Unit + + def get(uri: String,callback:String=>Unit): Unit = get(uri, List.empty[(String, String)],callback) + def get(uri: String, additionalHeaders: List[(String, String)],callback:String=>Unit): Unit + def getExpectingHTTPResponse( + uri: String, + additionalHeaders: List[(String, String)] = List.empty[(String, String)], + retriesSoFar: Int = 0, + redirectsSoFar: Int = 0, + exceptions: List[Throwable] = List.empty[Throwable], + startTime: Long = new Date().getTime, + callback: HTTPResponse => Unit):Unit + + def getAsString(uri: String,callback:String => Unit):Unit = + getAsString(uri, List.empty[(String, String)],callback) + def getAsString(uri: String, + additionalHeaders: List[(String, String)],callback:String => Unit): Unit + + def getAsBytes(uri: String,callback:Array[Byte]=>Unit):Unit = + getAsBytes(uri, List.empty[(String, String)],callback) + def getAsBytes(uri: String, + additionalHeaders: List[(String, String)],callback: Array[Byte]=>Unit):Unit + + def postBytes(uri: String, bytes: Array[Byte],callback:Array[Byte]=>Unit): Unit = + postBytes(uri, bytes, List.empty[(String, String)],callback) + def postBytes(uri: String, + bytes: Array[Byte], + additionalHeaders: List[(String, String)],callback: Array[Byte]=>Unit):Unit + def postBytesExpectingHTTPResponse( + uri: String, + bytes: Array[Byte], + additionalHeaders: List[(String, String)],callback: HTTPResponse=>Unit):Unit + + def postForm(uri: String, postItemList: List[(String, String)],callback: Array[Byte]=>Unit):Unit = + postForm(uri, postItemList, List.empty[(String, String)],callback) + def postForm(uri: String, + postItemList: List[(String, String)], + additionalHeaders: List[(String, String)], callback: Array[Byte]=>Unit):Unit + def postFormExpectingHTTPResponse( + uri: String, + postItemList: List[(String, String)], + additionalHeaders: List[(String, String)],callback: HTTPResponse=>Unit):Unit + + def postUnencodedForm(uri: String, + postItemList: List[(String, String)],callback: Array[Byte]=>Unit):Unit = + postUnencodedForm(uri, postItemList, List.empty[(String, String)],callback) + def postUnencodedForm(uri: String, + postItemList: List[(String, String)], + additionalHeaders: List[(String, String)],callback: Array[Byte]=>Unit):Unit + def postUnencodedFormExpectingHttpResponse( + uri: String, + postItemList: List[(String, String)], + additionalHeaders: List[(String, String)],callback: HTTPResponse=>Unit):Unit + + def setCookies(cookies: Map[String, Header]): Unit + def getCookies: Map[String, Header] + + def setHttpHeaders(headers: List[Header]): Unit + def getHttpHeaders: List[Header] +} + +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder +import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder +import org.apache.hc.core5.http.ssl.TLS +import org.apache.hc.core5.pool.PoolConcurrencyPolicy +import org.apache.hc.core5.pool.PoolReusePolicy + +object AsyncHelpers { + protected class TrustAllHosts extends javax.net.ssl.HostnameVerifier { + override def verify(_host: String, _session: javax.net.ssl.SSLSession) = true + } + lazy val verifyingTlsStrategy = ClientTlsStrategyBuilder.create() + .setTlsVersions(TLS.V_1_3, TLS.V_1_2) + .build() + lazy val trustingTlsStrategy = ClientTlsStrategyBuilder.create() + .setHostnameVerifier(new TrustAllHosts) + .setSslContext(org.apache.hc.core5.ssl.SSLContexts.custom() + .loadTrustMaterial(new org.apache.hc.client5.http.ssl.TrustAllStrategy) + .build() + ) + .setTlsVersions(TLS.V_1_3, TLS.V_1_2) + .build() +} + +class CleanAsyncHttpClient(checkCerts:Boolean = false) + extends IMeTLAsyncHttpClient + with Logger { + protected val connectionTimeout = 120 + //protected val connectionTimeout = 30 + protected val keepAliveTimeout = 120 + protected val readTimeout = 240000 + protected val maxRedirects = 20 + protected val maxRetries = 2 + protected val enableHttp2 = false + + + protected val connMgr = { + PoolingAsyncClientConnectionManagerBuilder.create() + .setTlsStrategy(checkCerts match { + case true => AsyncHelpers.verifyingTlsStrategy + case false => AsyncHelpers.trustingTlsStrategy + }) + .setPoolConcurrencyPolicy(PoolConcurrencyPolicy.STRICT) + .setConnPoolPolicy(PoolReusePolicy.LIFO) + .setConnectionTimeToLive(org.apache.hc.core5.util.TimeValue.ofMinutes(1L)) + .build(); + } + protected val client = { + HttpAsyncClients.custom() + .setConnectionManager(connMgr) + .setIOReactorConfig(IOReactorConfig.custom() + .setSoTimeout(Timeout.ofSeconds(connectionTimeout)) + .build() + ) + .evictExpiredConnections() + .setVersionPolicy(enableHttp2 match { + case false => HttpVersionPolicy.FORCE_HTTP_1 + case true => HttpVersionPolicy.NEGOTIATE + }) + .evictIdleConnections(org.apache.hc.core5.util.TimeValue.ofMinutes(5L)) + .build() + } + override def start:Unit = { + client.start() + } + override def stop:Unit = { + client.close(CloseMode.IMMEDIATE) + } + private var authorizations: Map[String, (String, String)] = Map + .empty[String, (String, String)] + .withDefault((location) => ("anonymous", "unauthorized")) + protected var cookies = Map.empty[String, Header] + protected var httpHeaders = { + Array[Header]() + } + protected val noAction = (mcc:ManagedClientConnection,s:String,s2:String) => {} + protected val reqConfig = RequestConfig.custom() + .setConnectTimeout(connectionTimeout,TimeUnit.SECONDS) + .setDefaultKeepAlive(keepAliveTimeout,TimeUnit.SECONDS) + .setHardCancellationEnabled(true) +// .setMaxRedirects(maxRedirects) +// .setRedirectsEnabled(true) + .setRedirectsEnabled(false) + .setResponseTimeout(readTimeout,TimeUnit.MILLISECONDS) +// .setCircularRedirectsAllowed(true) + .build() + override def setCookies(cook: Map[String, Header]): Unit = cookies = cook + override def getCookies: Map[String, Header] = cookies + + def addHttpHeader(name: String, value: String): Unit = + setHttpHeaders(getHttpHeaders ::: List(new BasicHeader(name, value))) + override def setHttpHeaders(headers: List[Header]): Unit = + httpHeaders = headers.toArray + override def getHttpHeaders: List[Header] = httpHeaders.toList + + override def addAuthorization(domain: String, + username: String, + password: String): Unit = { + authorizations = authorizations.updated(domain, (username, password)) + } + + protected def executeHttpCall( + method:String, + uri:String, + headers:List[Tuple2[String,String]], + body:Option[Either[Array[Byte],List[Tuple2[String,String]]]], + callback:HTTPResponse=>Unit):Unit = { + doExecuteHttpCall(method,uri,headers,body,callback,0,0) + } + protected def defaultHeaders(uri:String):List[Tuple2[String,String]] = { + val u = new URI(uri) + val port = u.getPort + val host = u.getHost + val blacklist = List(80,443,-1) + List( + ("Host",blacklist.contains(port) match { + case true => host + case false => "%s:%s".format(host,port.toString) + }) + ) + } + protected def doExecuteHttpCall( + method:String, + uri:String, + headers:List[Tuple2[String,String]], + body:Option[Either[Array[Byte],List[Tuple2[String,String]]]], + callback:HTTPResponse=>Unit, + retryNumber:Int = 0, + redirectNumber:Int = 0, + exceptionsSoFar: List[Throwable] = List.empty[Throwable], + start: Long = new Date().getTime + ):Unit = { + try { + if ((maxRedirects > 0) && (redirectNumber > maxRedirects || exceptionsSoFar + .filter(e => e.isInstanceOf[RedirectException]) + .length > maxRedirects)) { + throw new RedirectException( + "exceeded configured maximum number of redirects (%s) when requesting: %s" + .format(maxRedirects, uri), + exceptionsSoFar) + } + if ((maxRetries > 0) && (retryNumber > maxRetries)) { + throw new RetryException( + "exceed maximum number of retries (%s) when requesting: %s" + .format(maxRetries, uri), + exceptionsSoFar) + } + val initReq = (method.trim.toLowerCase match { + case "get" => SimpleRequestBuilder.get() + case "post" => SimpleRequestBuilder.post() + case "put" => SimpleRequestBuilder.put() + case "patch" => SimpleRequestBuilder.patch() + case "delete" => SimpleRequestBuilder.delete() + case "head" => SimpleRequestBuilder.head() + case "trace" => SimpleRequestBuilder.trace() + case "options" => SimpleRequestBuilder.options() + }).setUri(uri).setHeaders((getHttpHeaders ::: ((defaultHeaders(uri) ::: headers).map(h => { + new BasicHeader(h._1,h._2) + }))):_*).setRequestConfig(reqConfig) + val req = body.map{ + case Left(bytes) => { + val contentType = headers.find(_._1.toLowerCase.trim == "content-type").map(h => ContentType.parse(h._2)).getOrElse(ContentType.APPLICATION_OCTET_STREAM) + initReq.setBody(bytes,contentType)/*.setHeader("Content-Type",contentType.toString).setHeader("Content-Length",bytes.length.toString)*/ + } + case Right(formItems) => { + val contentType = ContentType.parse("application/x-www-form-urlencoded") //ContentType.APPLICATION_FORM_URLENCODED + val form = formItems.map(fi => { + "%s=%s".format(urlEncode(fi._1),urlEncode(fi._2)) + }).mkString("&") + initReq.setBody(form,contentType)/*.setHeader("Content-Type",contentType.toString).setHeader("Content-Length",form.length.toString)*/ + } + }.getOrElse(initReq) + val request = req.build() + val futureResp = client.execute( + SimpleRequestProducer.create(request), + SimpleResponseConsumer.create(), + new FutureCallback[SimpleHttpResponse](){ + override def completed(resp:SimpleHttpResponse) = { + val hResp = HTTPResponse( + uri, + noAction, + resp.getBodyBytes(), + resp.getCode(), + Map(resp.getHeaders().toList.map(h => { + Tuple2(h.getName,h.getValue) + }):_*), + start, + new Date().getTime() + ) + respondToResponse(hResp,Nil,(hr2:HTTPResponse) => { + callback(hr2) + }) + } + override def failed(ex:Exception) = { + throw ex + } + override def cancelled() = { + throw new Exception("cancelled") + } + } + ) + futureResp.get + } catch { + case ex: RetryException => + throw new RetryException(ex.getMessage, ex.exceptions) + case ex: RedirectException => + throw new RedirectException(ex.getMessage, ex.exceptions) + case ex: Throwable => throw ex + } + } + + override def postBytes(uri: String, + bytes: Array[Byte], + additionalHeaders: List[(String, String)] = + List.empty[(String, String)], callback:Array[Byte] => Unit): Unit = { + postBytesExpectingHTTPResponse(uri, bytes, additionalHeaders,(hr:HTTPResponse) => { + respondToResponse(hr,additionalHeaders,(hr2:HTTPResponse) => { + callback(hr2.bytes) + }) + }) + } + override def postBytesExpectingHTTPResponse( + uri: String, + bytes: Array[Byte], + additionalHeaders: List[(String, String)] = List.empty[(String, String)], + callback:HTTPResponse=>Unit) + : Unit = { + executeHttpCall("post",uri, additionalHeaders, Some(Left(bytes)), callback) + } + override def postForm(uri: String, + postItemList: List[(String, String)], + additionalHeaders: List[(String, String)] = + List.empty[(String, String)], + callback: Array[Byte]=>Unit):Unit = { + postFormExpectingHTTPResponse(uri, postItemList, additionalHeaders,(hr:HTTPResponse) => { + respondToResponse(hr,additionalHeaders,(hr2:HTTPResponse) => { + callback(hr2.bytes) + }) + }) + } + override def postFormExpectingHTTPResponse( + uri: String, + postItemList: List[(String, String)], + additionalHeaders: List[(String, String)] = List.empty[(String, String)], + callback:HTTPResponse=>Unit):Unit = { + executeHttpCall("post", uri, additionalHeaders, Some(Right(postItemList)), callback) + } + override def postUnencodedForm(uri: String, + postItemList: List[(String, String)], + additionalHeaders: List[(String, String)] = + List.empty[(String, String)], + callback: Array[Byte]=>Unit):Unit = { + postUnencodedFormExpectingHttpResponse(uri, postItemList, additionalHeaders,(hr:HTTPResponse) => { + respondToResponse(hr,additionalHeaders,(hr2:HTTPResponse) => { + callback(hr2.bytes) + }) + }) + } + override def postUnencodedFormExpectingHttpResponse( + uri: String, + postItemList: List[(String, String)], + additionalHeaders: List[(String, String)] = List.empty[(String, String)], + callback:HTTPResponse=>Unit) + : Unit = { + val postForm = postItemList + .map(postItem => postItem._1 + "=" + postItem._2) + .mkString("&") + val bytes = postForm.getBytes("UTF-8") + executeHttpCall("post",uri, additionalHeaders ::: List( + Tuple2("Content-Type", """application/x-www-form-urlencoded"""), + Tuple2("Content-Length",bytes.length.toString) + ),Some(Left(bytes)), callback) + } + override def get(uri: String, + additionalHeaders: List[(String, String)] = + List.empty[(String, String)], + callback:String=>Unit): Unit = { + getAsString(uri, additionalHeaders,callback) + } + override def getAsString(uri: String, + additionalHeaders: List[(String, String)] = + List.empty[(String, String)], + callback:String => Unit): Unit = { + getAsBytes(uri,additionalHeaders,(hr:Array[Byte]) => { + callback(IOUtils.toString(hr)) + }) + } + override def getAsBytes(uri: String, + additionalHeaders: List[(String, String)] = + List.empty[(String, String)],callback: Array[Byte]=>Unit):Unit = { + getExpectingHTTPResponse(uri, additionalHeaders,0,0,Nil,new Date().getTime(),(hr:HTTPResponse) => { + respondToResponse(hr,additionalHeaders,(hr2:HTTPResponse) => { + callback(hr2.bytes) + }) + }) + } + override def getExpectingHTTPResponse( + uri: String, + additionalHeaders: List[(String, String)] = List.empty[(String, String)], + retriesSoFar: Int = 0, + redirectsSoFar: Int = 0, + exceptions: List[Throwable] = List.empty[Throwable], + startTime: Long = new Date().getTime, + callback:HTTPResponse => Unit): Unit = { + doExecuteHttpCall("get",uri,additionalHeaders,None,callback,retriesSoFar,redirectsSoFar,exceptions,startTime) + } + def respondToResponse(response: HTTPResponse, + additionalHeaders: List[(String, String)] = List.empty[(String, String)], + callback:HTTPResponse=>Unit):Unit = { + val uri = response.requestUrl + val tempOutput = response.bytes + response.statusCode match { + case 200 => callback(response) + case 300 | 301 | 302 | 303 => { + val newLoc = response.headers("Location") + val newLocUri = new URI(newLoc) + val oldLoc = new URI(uri) + val newLocString = if (newLocUri.getHost == null) { + oldLoc.resolve(newLocUri).toString + } else { + newLoc + } + getExpectingHTTPResponse( + newLocString, + additionalHeaders, + response.numberOfRetries, + response.numberOfRedirects + 1, + response.exceptions ::: List( + new RedirectException( + "healthy redirect from %s to %s".format(uri, newLocString), + response.exceptions)), + response.startMilis, + callback + ) + } + case 307 => { + val newLoc = response.headers("Location") + val newLocUri = new URI(newLoc) + val oldLoc = new URI(uri) + val newLocString = if (newLocUri.getHost == null) { + oldLoc.resolve(newLocUri).toString + } else { + newLoc + } + getExpectingHTTPResponse( + newLocString, + additionalHeaders, + response.numberOfRetries, + response.numberOfRedirects + 1, + response.exceptions ::: List( + new RedirectException( + "healthy redirect from %s to %s".format(uri, newLocString), + response.exceptions)), + response.startMilis, + callback + ) + } + /* + case 400 => + throw new WebException( + "bad request sent to %s: %s".format(uri, tempOutput), + 400, + uri) + case 401 => + throw new WebException( + "access to object at %s requires authentication".format(uri), + 401, + uri) + case 403 => + throw new WebException("access forbidden to object at %s".format(uri), + 403, + uri) + case 404 => + throw new WebException("object not found at %s".format(uri), 404, uri) + case 500 => + throw new WebException("server error encountered at %s: %s" + .format(uri, response.responseAsString), + 500, + uri) + case other => + throw new WebException( + "http status code (%s) not yet implemented, returned from %s" + .format(other, uri), + other, + uri) + */ + case other => callback(response) + } + } +} + +object AsyncHttp { + protected val checkCerts = false + def getClient = Stopwatch.time("AsyncHttp.getClient", { + new CleanAsyncHttpClient(checkCerts) + }) + def getAuthedClient(username: String, + password: String, + domain: String = "*") = { + Stopwatch.time("AsyncHttp.getAuthedClient", { + val client = new CleanAsyncHttpClient(checkCerts) + client.addAuthorization(domain, username, password) + client + }) + } + def cloneClient(incoming: CleanAsyncHttpClient): CleanAsyncHttpClient = { + Stopwatch.time( "AsyncHttp.cloneClient", { + val client = new CleanAsyncHttpClient(checkCerts) + client.setCookies(incoming.getCookies) + client.setHttpHeaders(incoming.getHttpHeaders) + client + }) + } + def getClient(headers: List[(String, String)]): CleanAsyncHttpClient = { + Stopwatch.time( "AsyncHttp.getClient(headers)", { + val newHeaders = headers.map(tup => new BasicHeader(tup._1, tup._2)).toList + val client = new CleanAsyncHttpClient(checkCerts) { + override val connectionTimeout = 3600 + override val keepAliveTimeout = 5400 + override val readTimeout = 7200000 + } + client.setHttpHeaders(newHeaders) + client + }) + } +} diff --git a/src/main/scala/metl/dependencies/utils/Http.scala b/src/main/scala/metl/dependencies/utils/Http.scala index 7bd9817..e2d7096 100644 --- a/src/main/scala/metl/dependencies/utils/Http.scala +++ b/src/main/scala/metl/dependencies/utils/Http.scala @@ -48,6 +48,8 @@ case class WebException(message: String, code: Int, path: String) extends Exception(message) {} trait IMeTLHttpClient { + def start:Unit = {} + def stop:Unit = {} def addAuthorization(domain: String, username: String, password: String): Unit def get(uri: String): String = get(uri, List.empty[(String, String)]) @@ -196,7 +198,7 @@ class CleanHttpClient(connMgr: ClientConnectionManager) uri: URI): Unit = { val port = determinePort(uri) val host = determineHost(uri) - val blacklist = List(80, 443) + val blacklist = List(80, 443, -1) if (blacklist.contains(port)) message.addHeader(new BasicHeader("Host", host)) else diff --git a/src/main/scala/metl/model/ServiceCheckConfigurator.scala b/src/main/scala/metl/model/ServiceCheckConfigurator.scala index 6810d22..b50318c 100644 --- a/src/main/scala/metl/model/ServiceCheckConfigurator.scala +++ b/src/main/scala/metl/model/ServiceCheckConfigurator.scala @@ -309,6 +309,63 @@ object ServiceCheckConfigurator extends ConfigFileReader with Logger { "password not specified") SvnSensor(metadata, host, username, password, period) } + case "sync_http" => { + val url = + getOrError(getText(sc, "url"), "", "url not specified") + val matcher: HTTPResponseMatcher = + HTTPResponseMatchers.configureFromXml( + {getNodes(sc,"thresholds")}) + val additionalHeaders = getNodes(sc, "header") + .map(c => { + (getText(c, "name").getOrElse(""), + getText(c, "value").getOrElse("")) + }) + .filter(h => { + h match { + case (n: String, v: String) + if n.length > 0 && v.length > 0 => + true + case _ => false + } + }) + .toList + SyncHttpSensor(metadata, url, additionalHeaders, matcher, period) + } + case "sync_http_with_credentials" => { + val url = + getOrError(getText(sc, "url"), "", "url not specified") + val username = getOrError(getText(sc, "username"), + "", + "username not specified") + val password = getOrError(getText(sc, "password"), + "", + "password not specified") + val matcher: HTTPResponseMatcher = + HTTPResponseMatchers.configureFromXml( + {getNodes(sc,"thresholds")}) + val additionalHeaders = getNodes(sc, "header") + .map(c => { + (getText(c, "name").getOrElse(""), + getText(c, "value").getOrElse("")) + }) + .filter(h => { + h match { + case (n: String, v: String) + if n.length > 0 && v.length > 0 => + true + case _ => false + } + }) + .toList + SyncHttpSensorWithBasicAuth(metadata, + url, + username, + password, + additionalHeaders, + matcher, + period) + } + case "http" => { val url = getOrError(getText(sc, "url"), "", "url not specified") diff --git a/src/main/scala/metl/model/ServiceChecks.scala b/src/main/scala/metl/model/ServiceChecks.scala index 995161e..3889e68 100644 --- a/src/main/scala/metl/model/ServiceChecks.scala +++ b/src/main/scala/metl/model/ServiceChecks.scala @@ -239,7 +239,6 @@ abstract class Sensor(metadata: SensorMetaData) success = false, duration = checkDuration) addCheckResult(cr, currentFailures >= failureTolerance) - } def calculateCheckDuration(timeTaken: Box[Double] = Empty): Double = { val now = new Date() @@ -327,21 +326,22 @@ abstract class Sensor(metadata: SensorMetaData) } private var isStopped = true def isRunning: Boolean = !isStopped - protected def performCheck = {} + protected def performCheck(after:()=>Unit) = {} private var isPerformingCheck = false - protected def privatePerformCheck = { + protected def privatePerformCheck(after:() => Unit) = { if (!isPerformingCheck) { isPerformingCheck = true lastCheckBegin = Full(new Date()) - performCheck - isPerformingCheck = false + performCheck(() => { + isPerformingCheck = false + after() + }) } } override def messageHandler = { case Check => { if (!isStopped) { - privatePerformCheck - schedule() + privatePerformCheck(() => schedule()) } } case StopSensor => { @@ -369,14 +369,17 @@ case class CheckUnexceptional(metadata: SensorMetaData, extends Sensor(metadata) { override val pollInterval = time def status = condition() - override def performCheck = succeed("Was expected") + override def performCheck(after:()=>Unit) = { + succeed("Was expected") + after() + } } case class CheckDoesnt(metadata: SensorMetaData, condition: Function0[Option[String]], time: TimeSpan = 5 seconds) extends Sensor(metadata) { override val pollInterval = 5 seconds - override def performCheck = { + override def performCheck(after:() => Unit) = { condition() match { case None => { succeed("Ok") @@ -384,6 +387,7 @@ case class CheckDoesnt(metadata: SensorMetaData, case Some(error) => throw new DashboardException("checkDoesn't failed", error) } + after() } } case class MatcherCheck(metadata: SensorMetaData, @@ -394,12 +398,16 @@ case class MatcherCheck(metadata: SensorMetaData, failureTolerance = 3 def status = "%s is %s".format(matcher.describe, matcher.verify(true).toString) - override def performCheck = succeed(status) + override def performCheck(after:() => Unit) = { + succeed(status) + after() + } } /* case class InvertedCheck(pinger:Pinger) extends Pinger(pinger.name,pinger.label,pinger.mode,pinger.severity) { - override def performCheck = { + override def performCheck(after:() => Unit) = { pinger.performCheck + after() } } */ diff --git a/src/main/scala/metl/model/sensor/DependencySensor.scala b/src/main/scala/metl/model/sensor/DependencySensor.scala index 030aa58..6bc0c22 100644 --- a/src/main/scala/metl/model/sensor/DependencySensor.scala +++ b/src/main/scala/metl/model/sensor/DependencySensor.scala @@ -37,5 +37,8 @@ case class DependencySensor( .mkString(" AND ") } } - override def performCheck = succeed(status) + override def performCheck(after:() => Unit) = { + succeed(status) + after() + } } diff --git a/src/main/scala/metl/model/sensor/HttpSensor.scala b/src/main/scala/metl/model/sensor/HttpSensor.scala index 5858452..0c9555a 100644 --- a/src/main/scala/metl/model/sensor/HttpSensor.scala +++ b/src/main/scala/metl/model/sensor/HttpSensor.scala @@ -1,6 +1,6 @@ package metl.model.sensor -import com.metl.utils.{HTTPResponse, Http} +import com.metl.utils.{HTTPResponse, Http, AsyncHttp} import metl.model._ import net.liftweb.common.Full import net.liftweb.util.Helpers._ @@ -150,11 +150,76 @@ case class HttpSensor( time: TimeSpan = 5 seconds) extends Sensor(metadata) { override val pollInterval = time + def getClient = AsyncHttp.getClient + var client = getClient + headers.foreach(h => client.addHttpHeader(h._1, h._2)) + override def resetEnvironment = { + client.stop + client = getClient + client.start + headers.foreach(h => client.addHttpHeader(h._1, h._2)) + } + override def performCheck(after:() => Unit) = { + client.getExpectingHTTPResponse(uri,Nil,0,0,Nil,new java.util.Date().getTime(),(response1:HTTPResponse) => { + client.respondToResponse(response1,Nil,(response:HTTPResponse) => { + val verificationResponse = matcher.verify(response) + if (!verificationResponse.success) { + throw new DashboardException("HTTP Verification failed", + verificationResponse.errors.mkString("\r\n")) + } + succeed(response.toString, Full(response.duration.toDouble)) + after() + }) + }) + } +} +case class HttpSensorWithBasicAuth( + metadata: SensorMetaData, + uri: String, + username: String, + password: String, + headers: List[Tuple2[String, String]] = List.empty[Tuple2[String, String]], + matcher: HTTPResponseMatcher = HTTPResponseMatchers.default, + time: TimeSpan = 5 seconds) + extends Sensor(metadata) { + override val pollInterval = time + def getClient = AsyncHttp.getAuthedClient(username, password) + var client = getClient + headers.foreach(h => client.addHttpHeader(h._1, h._2)) + override def resetEnvironment = { + client = getClient + headers.foreach(h => client.addHttpHeader(h._1, h._2)) + } + override def performCheck(after:() => Unit) = { + client.getExpectingHTTPResponse(uri,Nil,0,0,Nil,new java.util.Date().getTime(),(response1:HTTPResponse) => { + client.respondToResponse(response1,Nil,(response:HTTPResponse) => { + val verificationResponse = matcher.verify(response) + if (!verificationResponse.success) { + throw new DashboardException("HTTP Verification failed", + verificationResponse.errors.mkString("\r\n")) + } + succeed(response.toString, Full(response.duration.toDouble)) + after() + }) + }) + } +} + +case class SyncHttpSensor( + metadata: SensorMetaData, + uri: String, + headers: List[Tuple2[String, String]] = List.empty[Tuple2[String, String]], + matcher: HTTPResponseMatcher = HTTPResponseMatchers.default, + time: TimeSpan = 5 seconds) + extends Sensor(metadata) { + override val pollInterval = time def getClient = Http.getClient var client = getClient headers.foreach(h => client.addHttpHeader(h._1, h._2)) override def resetEnvironment = { + client.stop client = getClient + client.start headers.foreach(h => client.addHttpHeader(h._1, h._2)) } def status = { @@ -162,14 +227,18 @@ case class HttpSensor( client.respondToResponse(client.getExpectingHTTPResponse(uri)) val verificationResponse = matcher.verify(response) if (!verificationResponse.success) { - throw new DashboardException("HTTP Verification failed", + throw new DashboardException("HTTPwithAuth Verification failed", verificationResponse.errors.mkString("\r\n")) } (response, Full(response.duration.toDouble)) } - override def performCheck = succeed(status._1.toString, status._2) + override def performCheck(after:() => Unit) = { + val s = status + succeed(s._1.toString, s._2) + after() + } } -case class HttpSensorWithBasicAuth( +case class SyncHttpSensorWithBasicAuth( metadata: SensorMetaData, uri: String, username: String, @@ -196,5 +265,9 @@ case class HttpSensorWithBasicAuth( } (response, Full(response.duration.toDouble)) } - override def performCheck = succeed(status._1.toString, status._2) + override def performCheck(after:() => Unit) = { + val s = status + succeed(s._1.toString, s._2) + after() + } } diff --git a/src/main/scala/metl/model/sensor/LdapSensor.scala b/src/main/scala/metl/model/sensor/LdapSensor.scala index 928515e..11eadd6 100644 --- a/src/main/scala/metl/model/sensor/LdapSensor.scala +++ b/src/main/scala/metl/model/sensor/LdapSensor.scala @@ -60,5 +60,8 @@ class LdapSensor(metadata: SensorMetaData, ctx.close output } - override def performCheck = succeed(status.toString) + override def performCheck(after:() => Unit) = { + succeed(status.toString) + after() + } } diff --git a/src/main/scala/metl/model/sensor/MemCachedSensor.scala b/src/main/scala/metl/model/sensor/MemCachedSensor.scala index eb8aa01..f4e168d 100644 --- a/src/main/scala/metl/model/sensor/MemCachedSensor.scala +++ b/src/main/scala/metl/model/sensor/MemCachedSensor.scala @@ -35,5 +35,8 @@ case class PingMemCached(metadata: SensorMetaData, schedule() } }: PartialFunction[Throwable, Unit]) orElse super.exceptionHandler - override def performCheck = succeed(status.toString) + override def performCheck(after:() => Unit) = { + succeed(status.toString) + after() + } } diff --git a/src/main/scala/metl/model/sensor/MockSensor.scala b/src/main/scala/metl/model/sensor/MockSensor.scala index b8a6317..b28d78a 100644 --- a/src/main/scala/metl/model/sensor/MockSensor.scala +++ b/src/main/scala/metl/model/sensor/MockSensor.scala @@ -15,7 +15,10 @@ case class CountingSensor(metadata: SensorMetaData, time: TimeSpan = 5 seconds) count += 1 count.toString } - override def performCheck = succeed(status.toString) + override def performCheck(after:() => Unit) = { + succeed(status.toString) + after + } } case class WaitingSensor(metadata: SensorMetaData, @@ -29,5 +32,8 @@ case class WaitingSensor(metadata: SensorMetaData, Thread.sleep(waitTime) "waited %s".format(waitTime) } - override def performCheck = succeed(status.toString) + override def performCheck(after:() => Unit) = { + succeed(status.toString) + after() + } } diff --git a/src/main/scala/metl/model/sensor/MongoSensor.scala b/src/main/scala/metl/model/sensor/MongoSensor.scala index 27bbbde..9da8f07 100644 --- a/src/main/scala/metl/model/sensor/MongoSensor.scala +++ b/src/main/scala/metl/model/sensor/MongoSensor.scala @@ -23,5 +23,8 @@ case class PingMongo(metadata: SensorMetaData, mongo.close output } - override def performCheck: Unit = succeed(status.toString) + override def performCheck(after:() => Unit): Unit = { + succeed(status.toString) + after() + } } diff --git a/src/main/scala/metl/model/sensor/PingIcmpSensor.scala b/src/main/scala/metl/model/sensor/PingIcmpSensor.scala index 73af40f..de06e8e 100644 --- a/src/main/scala/metl/model/sensor/PingIcmpSensor.scala +++ b/src/main/scala/metl/model/sensor/PingIcmpSensor.scala @@ -99,12 +99,15 @@ case class PingICMPSensor(metadata: SensorMetaData, if (output.contains("cannot resolve") || output.contains("Unknown host") || output .contains("could not find host")) throw DashboardException("Ping failed", "Unknown host: " + output) - if (!(output.contains(" 0% packet loss") || output.contains("(0% loss)"))) + if (!(output.contains(" 0% packet loss") || output.contains("(0% loss)") || output.contains(" 0.0% packet loss") || output.contains("(0.0% loss)"))) throw DashboardException("Ping failed", "Packet loss recognised: " + output) val stringOutput = output.toString val timeTaken = pingTimeExtractor(stringOutput) (stringOutput, timeTaken) } - override def performCheck = succeed(status._1, status._2) + override def performCheck(after:() => Unit) = { + succeed(status._1, status._2) + after() + } } diff --git a/src/main/scala/metl/model/sensor/SambaSensor.scala b/src/main/scala/metl/model/sensor/SambaSensor.scala index 04e91dd..bf4d28f 100644 --- a/src/main/scala/metl/model/sensor/SambaSensor.scala +++ b/src/main/scala/metl/model/sensor/SambaSensor.scala @@ -47,5 +47,8 @@ case class SambaSensor(metadata: SensorMetaData, fileList.mkString(", ") } } - override def performCheck = succeed(status.toString) + override def performCheck(after:() => Unit) = { + succeed(status.toString) + after() + } } diff --git a/src/main/scala/metl/model/sensor/ScriptServiceCheckConfigurator.scala b/src/main/scala/metl/model/sensor/ScriptServiceCheckConfigurator.scala index 384a799..eb02a05 100644 --- a/src/main/scala/metl/model/sensor/ScriptServiceCheckConfigurator.scala +++ b/src/main/scala/metl/model/sensor/ScriptServiceCheckConfigurator.scala @@ -18,6 +18,7 @@ object FunctionalServiceCheck extends ConfigFileReader { (key, value) } }); + body = getNodes(mn, "body").headOption.map(_.text); headers = Map(getNodes(mn, "header").flatMap(hn => { for (key <- getAttr(hn, "key"); value <- getAttr(hn, "value")) yield { @@ -27,7 +28,29 @@ object FunctionalServiceCheck extends ConfigFileReader { matcher: HTTPResponseMatcher = HTTPResponseMatchers .configureFromXml( {getNodes(mn,"thresholds")})) - yield HttpFunctionalCheck(method, url, params, headers, matcher) + yield AsyncHttpFunctionalCheck(method, url, params, headers, body, matcher) + } + + case "syncHttp" => { + for (url <- getAttr(mn, "url"); + method <- getAttr(mn, "method"); + params = getNodes(mn, "parameter").flatMap(pn => { + for (key <- getAttr(pn, "key"); + value <- getAttr(pn, "value")) yield { + (key, value) + } + }); + body = getNodes(mn, "body").headOption.map(_.text); + headers = Map(getNodes(mn, "header").flatMap(hn => { + for (key <- getAttr(hn, "key"); + value <- getAttr(hn, "value")) yield { + (key, value) + } + }): _*); + matcher: HTTPResponseMatcher = HTTPResponseMatchers + .configureFromXml( + {getNodes(mn,"thresholds")})) + yield HttpFunctionalCheck(method, url, params, headers, body, matcher) } case "sql" => { for (driver <- getAttr(mn, "driver"); @@ -423,16 +446,13 @@ abstract class FunctionalServiceCheck extends Logger { see = Some(newSee) } protected def innerAct(previousResult: FunctionalCheckReturn, - interpolator: Interpolator): FunctionalCheckReturn + interpolator: Interpolator, + callback: Either[Throwable, FunctionalCheckReturn] => Unit):Unit def act( previousResult: FunctionalCheckReturn, - interpolator: Interpolator): Either[Exception, FunctionalCheckReturn] = - try { - trace( - "STEP: %s \r\n (env: %s)".format(this, - previousResult.updatedEnvironment)) - Right(innerAct(previousResult, interpolator)) - } catch { - case e: Exception => Left(e) - } + interpolator: Interpolator, + callback: Either[Throwable, FunctionalCheckReturn] => Unit):Unit = { + trace( "STEP: %s \r\n (env: %s)".format(this, previousResult.updatedEnvironment)) + innerAct(previousResult, interpolator,callback) + } } diff --git a/src/main/scala/metl/model/sensor/ScriptedSensor.scala b/src/main/scala/metl/model/sensor/ScriptedSensor.scala index c4dae23..a108512 100644 --- a/src/main/scala/metl/model/sensor/ScriptedSensor.scala +++ b/src/main/scala/metl/model/sensor/ScriptedSensor.scala @@ -7,7 +7,7 @@ import javax.naming.Context import javax.naming.directory.{InitialDirContext, SearchControls} import metl.model.GraphableData._ -import com.metl.utils.{CleanHttpClient, HTTPResponse, Http} +import com.metl.utils.{CleanHttpClient, HTTPResponse, Http, AsyncHttp, CleanAsyncHttpClient} import net.liftweb.common.{Box, Empty, Full, Logger} import net.liftweb.util.Helpers.{now, tryo} import net.liftweb.util.Helpers._ @@ -67,7 +67,8 @@ case class HttpAddBasicAuthorization(domain: String, password: String) extends FunctionalServiceCheck { override protected def innerAct(previousResult: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { see.foreach(s => { s.httpClient.addAuthorization( interpolator.interpolate(domain, previousResult.updatedEnvironment), @@ -75,7 +76,7 @@ case class HttpAddBasicAuthorization(domain: String, interpolator.interpolate(password, previousResult.updatedEnvironment) ) }) - previousResult + callback(Right(previousResult)) } } case class ICMPFunctionalCheck(uri: String, ipv6: Boolean = false) @@ -144,7 +145,8 @@ case class ICMPFunctionalCheck(uri: String, ipv6: Boolean = false) Empty } override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment @@ -164,14 +166,14 @@ case class ICMPFunctionalCheck(uri: String, ipv6: Boolean = false) } pingProcess.destroy if (output.length == 0) - throw DashboardException("Ping failed", - "ping command failed - no response from OS") + callback(Left(DashboardException("Ping failed", + "ping command failed - no response from OS"))) if (output.contains("cannot resolve") || output.contains("Unknown host") || output .contains("could not find host")) - throw DashboardException("Ping failed", "Unknown host: " + output) + callback(Left(DashboardException("Ping failed", "Unknown host: " + output))) if (!(output.contains(" 0% packet loss") || output.contains("(0% loss)"))) - throw DashboardException("Ping failed", - "Packet loss recognised: " + output) + callback(Left(DashboardException("Ping failed", + "Packet loss recognised: " + output))) val stringOutput = output.toString val timeTaken = pingTimeExtractor(stringOutput) val reportedTimeTaken: Double = timeTaken.openOr(0.0) @@ -182,11 +184,11 @@ case class ICMPFunctionalCheck(uri: String, ipv6: Boolean = false) "ipv6" -> ipv6, "timeTaken" -> reportedTimeTaken )) - FunctionalCheckReturn( + callback(Right(FunctionalCheckReturn( ScriptStepResult(body = stringOutput, duration = timeTaken.openOr(0)), totalDuration + reportedTimeTaken, environment, - newData :: fcr.data) + newData :: fcr.data))) } } @@ -220,7 +222,8 @@ case class JDBCFunctionalCheck( extends FunctionalServiceCheck { JDBCFunctionalCheckDriverInitializations.initialize(driver) override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment @@ -294,9 +297,9 @@ case class JDBCFunctionalCheck( } } if (errors.length == 1) { - throw errors.head + callback(Left(errors.head)) } else if (errors.length > 0) { - throw new CombinedExceptionsException(errors) + callback(Left(new CombinedExceptionsException(errors))) } else { val newData: Tuple2[Long, Map[String, GraphableDatum]] = (now.getTime, @@ -305,14 +308,14 @@ case class JDBCFunctionalCheck( tt => { ("timeTaken", GraphableDouble(tt)) }): _*)) - FunctionalCheckReturn( + callback(Right(FunctionalCheckReturn( result = ScriptStepResult(body = output.toString, duration = timeTaken.openOr(0)), duration = totalDuration + timeTaken.openOr(0.0), updatedEnvironment = environment, sqlResult = Some(output), data = newData :: fcr.data - ) + ))) } } } @@ -328,7 +331,8 @@ case class LdapFunctionalCheck(host: String, query: String) extends FunctionalServiceCheck { override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment @@ -392,7 +396,7 @@ case class LdapFunctionalCheck(host: String, "checkType" -> "ldap", "timeTaken" -> duration )) - FunctionalCheckReturn( + callback(Right(FunctionalCheckReturn( result = ScriptStepResult(output.toString, Map.empty[String, String], 0, @@ -401,77 +405,139 @@ case class LdapFunctionalCheck(host: String, updatedEnvironment = environment, ldapResults = Some(output), data = newData :: fcr.data - ) + ))) } } -case class HttpFunctionalCheck( +case class AsyncHttpFunctionalCheck( method: String, url: String, parameters: List[Tuple2[String, String]] = Nil, headers: Map[String, String] = Map.empty[String, String], + body: Option[String] = None, matcher: HTTPResponseMatcher = HTTPResponseMatchers.empty) extends FunctionalServiceCheck { override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment - val client = see - .map(_.httpClient) - .getOrElse({ - throw new Exception("no available httpClient") - }) - val interpolatedHeaders = - headers.map(h => (h._1, interpolator.interpolate(h._2, environment))) - interpolatedHeaders.foreach(h => - client.addHttpHeader(h._1, interpolator.interpolate(h._2, environment))) - val interpolatedUrl = interpolator.interpolate(url, environment) - val interpolatedParameters = parameters.map( - p => - (interpolator.interpolate(p._1, environment), - interpolator.interpolate(p._2, environment))) - val innerResponse = method.trim.toLowerCase match { - case "get" => client.getExpectingHTTPResponse(interpolatedUrl) - case "post" => - client.postFormExpectingHTTPResponse(interpolatedUrl, - interpolatedParameters) - case unsupportedMethod => - throw new DashboardException( - "HTTP method not supported", - "%s [%s => %s] ([%s => %s],%s)".format(unsupportedMethod, - url, - interpolatedUrl, - parameters, - interpolatedParameters, - headers)) - } - val response = client.respondToResponse(innerResponse) - val verificationResponse = matcher.verify(response) - if (!verificationResponse.success) { - throw new DashboardException("HTTP Verification failed", - verificationResponse.errors.mkString("\r\n")) - } - val newData: Tuple2[Long, Map[String, GraphableDatum]] = - (now.getTime, - Map( - "checkType" -> "http", - "timeTaken" -> response.duration.toDouble, - "statusCode" -> response.statusCode - )) - FunctionalCheckReturn( - result = ScriptStepResult(response.responseAsString, - response.headers, - response.statusCode, - response.duration.toDouble), - duration = totalDuration + response.duration, - updatedEnvironment = environment, - httpResult = Some(response), - data = newData :: fcr.data - ) + see.map(_.asyncHttpClient) match { + case Some(client) => { + val interpolatedHeaders = + headers.map(h => (h._1, interpolator.interpolate(h._2, environment))) + interpolatedHeaders.foreach(h => + client.addHttpHeader(h._1, interpolator.interpolate(h._2, environment))) + val interpolatedUrl = interpolator.interpolate(url, environment) + val interpolatedParameters = parameters.map( + p => + (interpolator.interpolate(p._1, environment), + interpolator.interpolate(p._2, environment))) + val cb = (innerResponse:HTTPResponse) => { + client.respondToResponse(innerResponse,headers.toList,(response:HTTPResponse) => { + val verificationResponse = matcher.verify(response) + if (!verificationResponse.success) { + callback(Left(new DashboardException("HTTP Verification failed", + verificationResponse.errors.mkString("\r\n")))) + } else { + val newData: Tuple2[Long, Map[String, GraphableDatum]] = + (now.getTime, + Map( + "checkType" -> "http", + "timeTaken" -> response.duration.toDouble, + "statusCode" -> response.statusCode + )) + callback(Right(FunctionalCheckReturn( + result = ScriptStepResult(response.responseAsString, + response.headers, + response.statusCode, + response.duration.toDouble), + duration = totalDuration + response.duration, + updatedEnvironment = environment, + httpResult = Some(response), + data = newData :: fcr.data + ))) + } + }) + } + try { + method.trim.toLowerCase match { + case "get" => client.getExpectingHTTPResponse(interpolatedUrl,headers.toList,0,0,Nil,new Date().getTime(),cb) + case "post" => body.map(b => { + val interpolatedBody = interpolator.interpolate(b,environment) + client.postBytesExpectingHTTPResponse(interpolatedUrl,interpolatedBody.getBytes("UTF-8"),headers.toList,cb) + }).getOrElse({ + client.postFormExpectingHTTPResponse(interpolatedUrl,interpolatedParameters,headers.toList,cb) + }) + case unsupportedMethod => + callback(Left(new DashboardException( + "HTTP method not supported", + "%s [%s => %s] ([%s => %s],%s)".format(unsupportedMethod, + url, + interpolatedUrl, + parameters, + interpolatedParameters, + headers)))) + } + } catch { + case e:Exception => { + callback(Left(e)) + } + } + } + case None => callback(Left(new Exception("no available httpClient"))) + } } } +case class HttpFunctionalCheck(method:String,url:String,parameters:List[Tuple2[String,String]] = Nil,headers:Map[String,String] = Map.empty[String,String],body:Option[String],matcher:HTTPResponseMatcher = HTTPResponseMatchers.empty) extends FunctionalServiceCheck { + override protected def innerAct(fcr:FunctionalCheckReturn,interpolator:Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { + try { + val previousResult = fcr.result + val totalDuration = fcr.duration + val environment = fcr.updatedEnvironment + val client = see.map(_.httpClient).getOrElse({ + throw new Exception("no available httpClient") + }) + val interpolatedHeaders = headers.map(h => (h._1,interpolator.interpolate(h._2,environment))) + interpolatedHeaders.foreach(h => client.addHttpHeader(h._1,interpolator.interpolate(h._2,environment))) + val interpolatedUrl = interpolator.interpolate(url,environment) + val interpolatedParameters = parameters.map(p => (interpolator.interpolate(p._1,environment),interpolator.interpolate(p._2,environment))) + val innerResponse = method.trim.toLowerCase match { + case "get" => client.getExpectingHTTPResponse(interpolatedUrl) + case "post" => body.map(b => { + val interpolatedBody = interpolator.interpolate(b,environment) + client.postBytesExpectingHTTPResponse(interpolatedUrl,interpolatedBody.getBytes("UTF-8"),headers.toList) + }).getOrElse({ + client.postFormExpectingHTTPResponse(interpolatedUrl,interpolatedParameters,headers.toList) + }) + case unsupportedMethod => throw new DashboardException("HTTP method not supported","%s [%s => %s] ([%s => %s],%s)".format(unsupportedMethod,url,interpolatedUrl,parameters,interpolatedParameters,headers)) + } + val response = client.respondToResponse(innerResponse) + val verificationResponse = matcher.verify(response) + if (!verificationResponse.success){ + throw new DashboardException("HTTP Verification failed",verificationResponse.errors.mkString("\r\n")) + } + val newData:Tuple2[Long,Map[String,GraphableDatum]] = (now.getTime,Map( + "checkType" -> "http", + "timeTaken" -> response.duration.toDouble, + "statusCode" -> response.statusCode + )) + callback(Right(FunctionalCheckReturn( + result = ScriptStepResult(response.responseAsString,response.headers,response.statusCode,response.duration.toDouble), + duration = totalDuration + response.duration, + updatedEnvironment = environment, + httpResult = Some(response), + data = newData :: fcr.data + ))) + } catch { + case e:Exception => callback(Left(e)) + } + } +} + class TelnetFunctionalCheck[A](host: String, port: Int) extends FunctionalServiceCheck { protected val commandResponseTerminator: Option[String] = None @@ -511,7 +577,8 @@ class TelnetFunctionalCheck[A](host: String, port: Int) protected def convert(in: A): Map[String, GraphableDatum] = Map.empty[String, GraphableDatum] override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val now = new Date().getTime val tc = new TelnetClient() tc.connect(interpolator.interpolate(host, fcr.updatedEnvironment), port) @@ -527,7 +594,7 @@ class TelnetFunctionalCheck[A](host: String, port: Int) "timeTaken" -> GraphableDouble(duration), "statusCode" -> GraphableInt(200) ) ++ carrierData) - FunctionalCheckReturn( + callback(Right(FunctionalCheckReturn( result = ScriptStepResult(output._1.mkString("\r\n"), Map.empty[String, String], 200, @@ -535,7 +602,7 @@ class TelnetFunctionalCheck[A](host: String, port: Int) duration = fcr.duration + duration, updatedEnvironment = fcr.updatedEnvironment, data = newData :: fcr.data - ) + ))) } } @@ -682,7 +749,8 @@ case class JmxFunctionalCheck( import collection.JavaConverters._ override def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment @@ -800,7 +868,7 @@ case class JmxFunctionalCheck( "nonHeapMax" -> remoteMemory.nonHeap.max, "nonHeapUsed" -> remoteMemory.nonHeap.used )) - FunctionalCheckReturn( + callback(Right(FunctionalCheckReturn( result = ScriptStepResult(jmxResult.toString, Map.empty[String, String], 200, @@ -809,7 +877,7 @@ case class JmxFunctionalCheck( updatedEnvironment = environment, jmxResults = Some(jmxResult), data = newData :: fcr.data - ) + ))) } } @@ -852,16 +920,17 @@ abstract class JmxExtractingEnvironmentMutator extends FunctionalServiceCheck { environment: Map[String, String], interpolator: Interpolator): Map[String, String] override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment - fcr.jmxResults + callback(Right(fcr.jmxResults .map( jmxResult => fcr.copy( updatedEnvironment = mutate(jmxResult, environment, interpolator))) - .getOrElse(fcr) + .getOrElse(fcr))) } } @@ -1037,15 +1106,16 @@ case class ResultValidator(description: String, validateResult: ScriptStepResult => Boolean) extends FunctionalServiceCheck { override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment if (validateResult(previousResult)) { - fcr + callback(Right(fcr)) } else { - throw new DashboardException("Result failed validation", - environment.toString) + callback(Left(new DashboardException("Result failed validation", + environment.toString))) } } } @@ -1055,15 +1125,16 @@ case class EnvironmentValidator( validateEnvironment: Map[String, String] => Boolean) extends FunctionalServiceCheck { override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment if (validateEnvironment(environment)) { - fcr + callback(Right(fcr)) } else { - throw new DashboardException("Environment failed validation", - environment.toString) + callback(Left(new DashboardException("Environment failed validation", + environment.toString))) } } } @@ -1072,22 +1143,24 @@ abstract class EnvironmentMutator extends FunctionalServiceCheck { environment: Map[String, String], interpolator: Interpolator): Map[String, String] override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment - fcr.copy( - updatedEnvironment = mutate(previousResult, environment, interpolator)) + callback(Right(fcr.copy( + updatedEnvironment = mutate(previousResult, environment, interpolator)))) } } case class LastDataExtractor(key: String, dataAttribute: String) extends FunctionalServiceCheck { override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment - fcr.data.headOption + callback(Right(fcr.data.headOption .flatMap(td => td._2.get(interpolator.interpolate(dataAttribute, environment))) .map(nv => { @@ -1096,17 +1169,18 @@ case class LastDataExtractor(key: String, dataAttribute: String) interpolator.interpolate(key, environment), interpolator.interpolate(nv.toString, environment))) }) - .getOrElse(fcr) + .getOrElse(fcr))) } } case class LatestDataExtractor(key: String, dataAttribute: String) extends FunctionalServiceCheck { override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment - fcr.data + callback(Right(fcr.data .flatMap(td => td._2 .get(interpolator.interpolate(dataAttribute, fcr.updatedEnvironment)) @@ -1118,7 +1192,7 @@ case class LatestDataExtractor(key: String, dataAttribute: String) interpolator.interpolate(key, fcr.updatedEnvironment), interpolator.interpolate(nv._2.toString, fcr.updatedEnvironment))) }) - .getOrElse(fcr) + .getOrElse(fcr))) } } @@ -1127,16 +1201,17 @@ abstract class HttpExtractingEnvironmentMutator extends FunctionalServiceCheck { environment: Map[String, String], interpolator: Interpolator): Map[String, String] override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment - fcr.httpResult + callback(Right(fcr.httpResult .map( httpResult => fcr.copy( updatedEnvironment = mutate(httpResult, environment, interpolator))) - .getOrElse(fcr) + .getOrElse(fcr))) } } @@ -1232,16 +1307,17 @@ abstract class SqlExtractingEnvironmentMutator environment: Map[String, String], interpolator: Interpolator): Map[String, String] override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment - fcr.sqlResult + callback(Right(fcr.sqlResult .map( sqlResult => fcr.copy( updatedEnvironment = mutate(sqlResult, environment, interpolator))) - .getOrElse(fcr) + .getOrElse(fcr))) } } case class StoreSqlResultSet(key: String) @@ -1335,16 +1411,17 @@ abstract class LdapExtractingEnvironmentMutator environment: Map[String, String], interpolator: Interpolator): Map[String, String] override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment - fcr.ldapResults + callback(Right(fcr.ldapResults .map( ldapResult => fcr.copy( updatedEnvironment = mutate(ldapResult, environment, interpolator))) - .getOrElse(fcr) + .getOrElse(fcr))) } } case class StoreLdapResults(key: String) @@ -1428,11 +1505,12 @@ abstract class ResultMutator extends FunctionalServiceCheck { environment: Map[String, String], interpolator: Interpolator): ScriptStepResult override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment - fcr.copy(result = mutate(previousResult, environment, interpolator)) + callback(Right(fcr.copy(result = mutate(previousResult, environment, interpolator)))) } } @@ -1484,13 +1562,40 @@ case class StatusCodeStorer(key: String) extends EnvironmentMutator { } } + +case class Sequence(funcs:List[FunctionalServiceCheck]) extends FunctionalServiceCheck { + protected def doFuncs(s:Either[Throwable,FunctionalCheckReturn],interpolator:Interpolator,funcs:List[FunctionalServiceCheck],callback:Either[Throwable,FunctionalCheckReturn]=>Unit):Unit = { + s match { + case Left(e) => callback(s) + case Right(fcr) => { + funcs match { + case Nil => callback(s) + case head :: rest => { + head.act(fcr, interpolator, (ret:Either[Throwable,FunctionalCheckReturn]) => { + doFuncs(ret,interpolator, rest, callback) + }) + } + } + } + } + } + override protected def innerAct(fcr: FunctionalCheckReturn, + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { + doFuncs(Right(fcr),interpolator,funcs,callback) + } +} + case class Cond(key: String, value: String, thenFuncs: List[FunctionalServiceCheck], elseFuncs: List[FunctionalServiceCheck]) extends FunctionalServiceCheck { + protected lazy val tf = Sequence(thenFuncs) + protected lazy val ef = Sequence(elseFuncs) override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment @@ -1498,19 +1603,10 @@ case class Cond(key: String, if (environment .get(interpolator.interpolate(key, environment)) .exists(_ == interpolator.interpolate(value, environment))) { - thenFuncs.foreach(tf => { - state.right.toOption.map(s => { - state = tf.act(s, interpolator) - }) - }) + tf.act(fcr,interpolator, callback) } else { - elseFuncs.foreach(tf => { - state.right.toOption.map(s => { - state = tf.act(s, interpolator) - }) - }) + ef.act(fcr,interpolator, callback) } - state.left.map(e => throw e).right.toOption.get } } @@ -1518,24 +1614,34 @@ case class WhileLoop(key: String, value: String, funcs: List[FunctionalServiceCheck]) extends FunctionalServiceCheck { + protected def doWhile(s:Either[Throwable,FunctionalCheckReturn],check:Either[Throwable,FunctionalCheckReturn]=>Boolean,interpolator:Interpolator,callback:Either[Throwable,FunctionalCheckReturn]=>Unit):Unit = { + if (check(s)){ + s match { + case Left(e) => callback(s) + case Right(fcr) => { + funcs match { + case Nil => callback(s) + case other => { + Sequence(funcs).act(fcr,interpolator,(ret:Either[Throwable,FunctionalCheckReturn]) => { + doWhile(ret,check,interpolator,callback) + }) + } + } + } + } + } else { + callback(s) + } + } override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment - var state: Either[Exception, FunctionalCheckReturn] = Right(fcr) - while (state.right.toOption.exists(s => - s.updatedEnvironment - .get(interpolator.interpolate(key, s.updatedEnvironment)) - .exists( - _ == interpolator.interpolate(value, s.updatedEnvironment)))) { - funcs.foreach(tf => { - state.right.toOption.map(s => { - state = tf.act(s, interpolator) - }) - }) - } - state.left.map(e => throw e).right.toOption.get + doWhile(Right(fcr),(state:Either[Throwable,FunctionalCheckReturn]) => { + state.right.toOption.exists(s => s.updatedEnvironment.get(interpolator.interpolate(key, s.updatedEnvironment)) .exists(_ == interpolator.interpolate(value, s.updatedEnvironment))) + },interpolator,callback) } } @@ -1545,39 +1651,55 @@ case class ForLoop(key: String, incrementing: Boolean, funcs: List[FunctionalServiceCheck]) extends FunctionalServiceCheck { + protected lazy val fs = Sequence(funcs) + protected def doWhile(s:Either[Throwable,FunctionalCheckReturn],interpolator:Interpolator,callback:Either[Throwable,FunctionalCheckReturn]=>Unit, current:Int):Unit = { + if ((incrementing && start < end) || ((!incrementing) && start > end)) { + s match { + case Left(e) => callback(s) + case Right(fcr) => { + funcs match { + case Nil => callback(s) + case _other => { + fs.act(fcr,interpolator,(ret:Either[Throwable,FunctionalCheckReturn]) => { + val newVal = incrementing match { + case true => current + 1 + case false => current - 1 + } + ret match { + case Right(nfcr) => { + val updatedRet = nfcr.copy(updatedEnvironment = nfcr.updatedEnvironment.updated( + interpolator.interpolate(key, nfcr.updatedEnvironment), + newVal.toString + )) + doWhile(Right(updatedRet),interpolator,callback,newVal) + } + case Left(e) => { + callback(ret) + } + } + }) + } + } + } + } + } else { + callback(s) + } + } + override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment - var counter = start - var state: Either[Exception, FunctionalCheckReturn] = Right( - fcr.copy(updatedEnvironment = environment.updated(key, counter.toString))) - if ((incrementing && start < end) || ((!incrementing) && start > end)) { - while (counter != end) { - state = state.right.map( - s => - s.copy( - updatedEnvironment = s.updatedEnvironment.updated( - interpolator.interpolate(key, s.updatedEnvironment), - counter.toString))) - funcs.foreach(tf => { - state.right.toOption.map(s => { - state = tf.act(s, interpolator) - }) - }) - if (incrementing) { - counter += 1 - } else { - counter -= 1 - } - } - } - state = state.right.map( - s => - s.copy(updatedEnvironment = s.updatedEnvironment - interpolator - .interpolate(key, s.updatedEnvironment))) - state.left.map(e => throw e).right.toOption.get + val state: Either[Exception, FunctionalCheckReturn] = Right( + fcr.copy(updatedEnvironment = environment.updated(key, start.toString))) + doWhile(state,interpolator,(ret:Either[Throwable,FunctionalCheckReturn]) => { + callback(ret.right.map(nfcr => { + nfcr.copy(updatedEnvironment = nfcr.updatedEnvironment - interpolator.interpolate(key, nfcr.updatedEnvironment)) + })) + },start) } } @@ -1585,33 +1707,49 @@ case class ForeachRegexFromResult(key: String, regex: String, funcs: List[FunctionalServiceCheck]) extends FunctionalServiceCheck { + protected val fs = Sequence(funcs) + protected def doForeach(s:Either[Throwable,FunctionalCheckReturn],items:List[String],interpolator:Interpolator,callback:Either[Throwable,FunctionalCheckReturn]=>Unit):Unit = { + s match { + case Left(e) => callback(s) + case Right(fcr) => { + items match { + case Nil => callback(s) + case m :: rest => { + val newValue = fcr.copy(updatedEnvironment = fcr.updatedEnvironment.updated(interpolator.interpolate(key, fcr.updatedEnvironment), m)) + fs.act(newValue,interpolator,(ret:Either[Throwable,FunctionalCheckReturn]) => { + ret match { + case Right(nfcr) => { + val newRet = nfcr.copy(updatedEnvironment = nfcr.updatedEnvironment - interpolator.interpolate(key, nfcr.updatedEnvironment)) + doForeach(Right(newRet),rest, interpolator,callback) + } + case Left(e) => { + callback(ret) + } + } + }) + } + } + } + } + } + override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment val Pattern = interpolator.interpolate(regex, environment).r.unanchored - var state: Either[Exception, FunctionalCheckReturn] = Right(fcr) previousResult.body match { case Pattern(matches @ _*) => { - matches.foreach(m => { - state = state.right.map(s => - s.copy(updatedEnvironment = s.updatedEnvironment - .updated(interpolator.interpolate(key, s.updatedEnvironment), m))) - funcs.foreach(tf => { - state.right.toOption.map(s => { - state = tf.act(s, interpolator) - }) - }) - }) - } - case _ => {} + doForeach(Right(fcr),matches.toList,interpolator,(nfcr:Either[Throwable,FunctionalCheckReturn]) => { + callback(nfcr.right.map(nfcrv => nfcrv.copy(updatedEnvironment = nfcrv.updatedEnvironment - interpolator.interpolate(key, nfcrv.updatedEnvironment)))) + }) + } + case _ => { + callback(Right(fcr)) + } } - state = state.right.map( - s => - s.copy(updatedEnvironment = s.updatedEnvironment - interpolator - .interpolate(key, s.updatedEnvironment))) - state.left.map(e => throw e).right.toOption.get } } @@ -1657,7 +1795,8 @@ case class RegexFromResult(key: String, regex: String) case class Delay(delay: Long, randomize: Boolean = false) extends FunctionalServiceCheck { override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment @@ -1668,7 +1807,7 @@ case class Delay(delay: Long, randomize: Boolean = false) case false => delay } Thread.sleep(amount) - fcr + callback(Right(fcr)) } } @@ -1676,9 +1815,35 @@ case class ForeachXPathFromResult(key: String, xPath: String, funcs: List[FunctionalServiceCheck]) extends FunctionalServiceCheck { + protected val fs = Sequence(funcs) + protected def doForeach(s:Either[Throwable,FunctionalCheckReturn],items:List[String],interpolator:Interpolator,callback:Either[Throwable,FunctionalCheckReturn]=>Unit):Unit = { + s match { + case Left(e) => callback(s) + case Right(fcr) => { + items match { + case Nil => callback(s) + case m :: rest => { + val newValue = fcr.copy(updatedEnvironment = fcr.updatedEnvironment.updated(interpolator.interpolate(key, fcr.updatedEnvironment), m)) + fs.act(newValue,interpolator,(ret:Either[Throwable,FunctionalCheckReturn]) => { + ret match { + case Right(nfcr) => { + val newRet = nfcr.copy(updatedEnvironment = nfcr.updatedEnvironment - interpolator.interpolate(key, nfcr.updatedEnvironment)) + doForeach(Right(newRet),rest, interpolator,callback) + } + case Left(e) => { + callback(ret) + } + } + }) + } + } + } + } + } import org.htmlcleaner._ override protected def innerAct(fcr: FunctionalCheckReturn, - interpolator: Interpolator) = { + interpolator: Interpolator, + callback:Either[Throwable,FunctionalCheckReturn] => Unit):Unit = { val previousResult = fcr.result val totalDuration = fcr.duration val environment = fcr.updatedEnvironment @@ -1688,22 +1853,15 @@ case class ForeachXPathFromResult(key: String, .evaluateXPath(interpolator.interpolate(xPath, environment)) .toList .map(_.toString) - matches.foreach(m => { - state = state.right.map( - s => - s.copy(updatedEnvironment = s.updatedEnvironment - .updated(interpolator.interpolate(key, s.updatedEnvironment), m))) - funcs.foreach(tf => { - state.right.toOption.map(s => { - state = tf.act(s, interpolator) - }) - }) - }) - state = state.right.map( - s => - s.copy(updatedEnvironment = s.updatedEnvironment - interpolator - .interpolate(key, s.updatedEnvironment))) - state.left.map(e => throw e).right.toOption.get + + matches match { + case Nil => callback(Right(fcr)) + case _other => { + doForeach(Right(fcr),matches.toList,interpolator,(nfcr:Either[Throwable,FunctionalCheckReturn]) => { + callback(nfcr.right.map(nfcrv => nfcrv.copy(updatedEnvironment = nfcrv.updatedEnvironment - interpolator.interpolate(key, nfcrv.updatedEnvironment)))) + }) + } + } } } @@ -1917,22 +2075,39 @@ case class EscapedCharKeyedStringInterpolator(startTag: String, class ScriptExecutionEnvironment { lazy val httpClient: CleanHttpClient = Http.getClient + lazy val asyncHttpClient: CleanAsyncHttpClient = AsyncHttp.getClient + def start = { + httpClient.start + asyncHttpClient.start + } + def stop = { + httpClient.stop + asyncHttpClient.stop + } } class ScriptEngine(interpolator: Interpolator) { - def execute(sequence: List[FunctionalServiceCheck]): FunctionalCheckReturn = { - val see = new ScriptExecutionEnvironment() - sequence.foldLeft(FunctionalCheckReturn.empty)((acc, i) => { - i.attachScriptExecutionEnvironment(see) - i.act(acc, interpolator) match { - case Left(e) => { - throw e - } - case Right(fcr) => { - fcr - } - } - }) + def execute(sequence: List[FunctionalServiceCheck],callback:Either[Throwable,FunctionalCheckReturn] => Unit, env:Option[ScriptExecutionEnvironment] = None,fcr:Option[FunctionalCheckReturn] = None): Unit = { + sequence match { + case Nil => callback(Right(fcr.getOrElse(FunctionalCheckReturn.empty))) + case head :: rest => { + val see = env.getOrElse(new ScriptExecutionEnvironment()) + see.start + val finalCallback = (finalFcr:Either[Throwable,FunctionalCheckReturn]) => { + see.stop + callback(finalFcr) + } + head.attachScriptExecutionEnvironment(see) + head.act(fcr.getOrElse(FunctionalCheckReturn.empty),interpolator,(v:Either[Throwable,FunctionalCheckReturn]) => v match { + case Left(e) => { + finalCallback(Left(e)) + } + case Right(fcr) => { + execute(rest,finalCallback,Some(see),Some(fcr)) + } + }) + } + } } } @@ -1943,12 +2118,21 @@ case class ScriptedSensor(metadata: SensorMetaData, extends Sensor(metadata) { override val pollInterval = time val scriptEngine = new ScriptEngine(interpolator) - def status = { - val fcr = scriptEngine.execute(sequence) - val finalResult = fcr.result - val totalDuration = fcr.duration - val data = fcr.data - (finalResult, Full(totalDuration), data) - } - override def performCheck = succeed(status._1.body, status._2, status._3) + override def performCheck(after:() => Unit) = { + scriptEngine.execute(sequence,(fcre:Either[Throwable,FunctionalCheckReturn]) => { + fcre match { + case Right(fcr) => { + val finalResult = fcr.result + val totalDuration = fcr.duration + val data = fcr.data + succeed(finalResult.body, Full(totalDuration), data) + after() + } + case Left(e) => { + fail(e.getMessage) + after() + } + } + }) + } } diff --git a/src/main/scala/metl/model/sensor/SqlSensor.scala b/src/main/scala/metl/model/sensor/SqlSensor.scala index 15ca286..3dd85d0 100644 --- a/src/main/scala/metl/model/sensor/SqlSensor.scala +++ b/src/main/scala/metl/model/sensor/SqlSensor.scala @@ -218,7 +218,7 @@ case class OracleSensor(metadata: SensorMetaData, override val pollInterval = time OracleSetup.initialize protected val connectionCreationTimeout = 10000L - override def performCheck = { + override def performCheck(after:() => Unit) = { var output = SQLResultSet(Map.empty[Int, SQLRow]) var errors = List.empty[Throwable] try { @@ -294,6 +294,7 @@ case class OracleSensor(metadata: SensorMetaData, } else if (errors.length > 0) { throw new CombinedExceptionsException(errors) } + after() } } @@ -362,7 +363,10 @@ case class MySQLSensor(metadata: SensorMetaData, } output } - override def performCheck = succeed(status.toString) + override def performCheck(after:() => Unit) = { + succeed(status.toString) + after() + } } case class SQLSensor(metadata: SensorMetaData, @@ -429,5 +433,8 @@ case class SQLSensor(metadata: SensorMetaData, } output } - override def performCheck = succeed(status.toString) + override def performCheck(after:() => Unit) = { + succeed(status.toString) + after() + } } diff --git a/src/main/scala/metl/model/sensor/SvnSensor.scala b/src/main/scala/metl/model/sensor/SvnSensor.scala index 1426e94..d82108d 100644 --- a/src/main/scala/metl/model/sensor/SvnSensor.scala +++ b/src/main/scala/metl/model/sensor/SvnSensor.scala @@ -27,5 +27,8 @@ case class SvnSensor(metadata: SensorMetaData, repo.closeSession logEntries.toString } - override def performCheck = succeed(status.toString) + override def performCheck(after:() => Unit) = { + succeed(status.toString) + after() + } } diff --git a/src/main/scala/metl/model/sensor/TelnetSensor.scala b/src/main/scala/metl/model/sensor/TelnetSensor.scala index 862680f..0072a6f 100644 --- a/src/main/scala/metl/model/sensor/TelnetSensor.scala +++ b/src/main/scala/metl/model/sensor/TelnetSensor.scala @@ -53,5 +53,8 @@ class TelnetSensor(metadata: SensorMetaData, tc.disconnect output.mkString("\n") } - override def performCheck = succeed(status) + override def performCheck(after:() => Unit) = { + succeed(status) + after() + } } diff --git a/src/main/scala/metl/model/sensor/XmppSensor.scala b/src/main/scala/metl/model/sensor/XmppSensor.scala index fc16010..475ea6f 100644 --- a/src/main/scala/metl/model/sensor/XmppSensor.scala +++ b/src/main/scala/metl/model/sensor/XmppSensor.scala @@ -80,7 +80,8 @@ case class XmppSensor(metadata: SensorMetaData, "Xmpp connection to %s didn't successfully connect".format(hostname)) } } - override def performCheck = { + override def performCheck(after:() => Unit) = { succeed(status.toString) + after() } }