From 4796b511fe1e0efdbdad7ab8a7f8a42345f1d0bd Mon Sep 17 00:00:00 2001 From: Francesco Di Chiara Date: Mon, 18 Dec 2023 10:24:05 +0100 Subject: [PATCH 1/5] Make jobCluster API calls return latest job in launched state instead of latest submitted (#518) * namedjobs return the last job in launched state instead of submitted * even for discovery return latest job in launched state * review comment --- .../java/io/mantisrx/client/MantisClient.java | 12 - .../master/client/MantisMasterClientApi.java | 3 +- .../mantisrx/master/IJobClustersManager.java | 1 + .../master/JobClustersManagerActor.java | 16 ++ .../JobClusterRouteHandlerAkkaImpl.java | 54 ++--- .../handlers/JobDiscoveryRouteHandler.java | 3 + .../JobDiscoveryRouteHandlerAkkaImpl.java | 105 ++++++--- .../api/akka/route/v0/JobDiscoveryRoute.java | 6 +- .../master/jobcluster/IJobClusterManager.java | 1 + .../master/jobcluster/JobClusterActor.java | 223 +++++------------- .../proto/JobClusterManagerProto.java | 30 +++ 11 files changed, 203 insertions(+), 251 deletions(-) diff --git a/mantis-client/src/main/java/io/mantisrx/client/MantisClient.java b/mantis-client/src/main/java/io/mantisrx/client/MantisClient.java index a61a2f220..5dbfa0b2a 100644 --- a/mantis-client/src/main/java/io/mantisrx/client/MantisClient.java +++ b/mantis-client/src/main/java/io/mantisrx/client/MantisClient.java @@ -146,18 +146,6 @@ public Observable> getSinkClientByJobName(final String jobName final AtomicReference lastJobIdRef = new AtomicReference<>(); return clientWrapper.getNamedJobsIds(jobName) .doOnUnsubscribe(() -> lastJobIdRef.set(null)) - // .lift(new Observable.Operator() { - // @Override - // public Subscriber call(Subscriber subscriber) { - // subscriber.add(Subscriptions.create(new Action0() { - // @Override - // public void call() { - // lastJobIdRef.set(null); - // } - // })); - // return subscriber; - // } - // }) .filter((String newJobId) -> { logger.info("Got job cluster's new jobId=" + newJobId); return newJobIdIsGreater(lastJobIdRef.get(), newJobId); diff --git a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterClientApi.java b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterClientApi.java index 3e619aee0..5a0f2167e 100644 --- a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterClientApi.java +++ b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterClientApi.java @@ -757,8 +757,7 @@ public Observable namedJobInfo(final String jobName) { .timeout(3 * MASTER_SCHED_INFO_HEARTBEAT_INTERVAL_SECS, TimeUnit.SECONDS) .filter(namedJobInfo -> namedJobInfo != null - && !JobSchedulingInfo.HB_JobId.equals(namedJobInfo.getName())) - ; + && !JobSchedulingInfo.HB_JobId.equals(namedJobInfo.getName())); })) .repeatWhen(repeatLogic) .retryWhen(retryLogic) diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/IJobClustersManager.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/IJobClustersManager.java index df6654a50..bc8475d3c 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/IJobClustersManager.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/IJobClustersManager.java @@ -78,6 +78,7 @@ public interface IJobClustersManager { // worker related messages void onGetLastSubmittedJobIdSubject(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest request); + void onGetLastLaunchedJobIdSubject(JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest request); void onWorkerEvent(WorkerEvent r); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java index 608f00e6e..6f4fcc9db 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java @@ -92,6 +92,8 @@ import io.mantisrx.master.jobcluster.job.JobState; import io.mantisrx.master.jobcluster.proto.BaseResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobDetailsRequest; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastLaunchedJobIdStreamResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.ResubmitWorkerRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateJobClusterArtifactRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateJobClusterLabelsRequest; @@ -234,6 +236,7 @@ private Receive getInitializedBehavior() { .match(GetJobClusterRequest.class, this::onJobClusterGet) .match(ListCompletedJobsInClusterRequest.class, this::onJobListCompleted) .match(GetLastSubmittedJobIdStreamRequest.class, this::onGetLastSubmittedJobIdSubject) + .match(GetLastLaunchedJobIdStreamRequest.class, this::onGetLastLaunchedJobIdSubject) .match(ListArchivedWorkersRequest.class, this::onListArchivedWorkers) // List Job Cluster related messages .match(ListJobClustersRequest.class, this::onJobClustersList) @@ -291,6 +294,7 @@ private Receive getInitializingBehavior() { .match(GetJobClusterRequest.class, (x) -> getSender().tell(new GetJobClusterResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), empty()), getSelf())) .match(ListCompletedJobsInClusterRequest.class, (x) -> logger.warn(genUnexpectedMsg(x.toString(), state))) .match(GetLastSubmittedJobIdStreamRequest.class, (x) -> getSender().tell(new GetLastSubmittedJobIdStreamResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), empty()), getSelf())) + .match(GetLastLaunchedJobIdStreamRequest.class, (x) -> getSender().tell(new GetLastLaunchedJobIdStreamResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), empty()), getSelf())) .match(ListArchivedWorkersRequest.class, (x) -> getSender().tell(new ListArchivedWorkersResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), Lists.newArrayList()), getSelf())) .match(ListJobClustersRequest.class, (x) -> getSender().tell(new ListJobClustersResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), Lists.newArrayList()), getSelf())) .match(ListJobsRequest.class, (x) -> getSender().tell(new ListJobsResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), Lists.newArrayList()), getSelf())) @@ -522,6 +526,18 @@ public void onGetLastSubmittedJobIdSubject(GetLastSubmittedJobIdStreamRequest r) sender.tell(new GetLastSubmittedJobIdStreamResponse(r.requestId, CLIENT_ERROR_NOT_FOUND, "No such Job cluster " + r.getClusterName(), empty()), getSelf()); } } + + @Override + public void onGetLastLaunchedJobIdSubject(GetLastLaunchedJobIdStreamRequest r) { + Optional jobClusterInfo = jobClusterInfoManager.getJobClusterInfo(r.getClusterName()); + ActorRef sender = getSender(); + if (jobClusterInfo.isPresent()) { + jobClusterInfo.get().jobClusterActor.forward(r, getContext()); + } else { + sender.tell(new GetLastLaunchedJobIdStreamResponse(r.requestId, CLIENT_ERROR_NOT_FOUND, "No such Job cluster " + r.getClusterName(), empty()), getSelf()); + } + } + @Override public void onWorkerEvent(WorkerEvent workerEvent) { if(logger.isDebugEnabled()) { logger.debug("Entering JobClusterManagerActor:onWorkerEvent {}", workerEvent); } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandlerAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandlerAkkaImpl.java index 44d977d5e..b90719a00 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandlerAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandlerAkkaImpl.java @@ -54,104 +54,90 @@ public JobClusterRouteHandlerAkkaImpl(ActorRef jobClusterManagerActor) { @Override public CompletionStage create(final JobClusterManagerProto.CreateJobClusterRequest request) { - CompletionStage response = ask(jobClustersManagerActor, request, timeout) + return ask(jobClustersManagerActor, request, timeout) .thenApply(JobClusterManagerProto.CreateJobClusterResponse.class::cast); - return response; } @Override public CompletionStage update(JobClusterManagerProto.UpdateJobClusterRequest request) { - CompletionStage response = ask(jobClustersManagerActor, request, timeout) + return ask(jobClustersManagerActor, request, timeout) .thenApply(JobClusterManagerProto.UpdateJobClusterResponse.class::cast); - return response; } @Override public CompletionStage delete(JobClusterManagerProto.DeleteJobClusterRequest request) { - CompletionStage response = ask(jobClustersManagerActor, request, timeout) + return ask(jobClustersManagerActor, request, timeout) .thenApply(JobClusterManagerProto.DeleteJobClusterResponse.class::cast); - return response; } @Override public CompletionStage disable(JobClusterManagerProto.DisableJobClusterRequest request) { - CompletionStage response = ask(jobClustersManagerActor, request, timeout) + return ask(jobClustersManagerActor, request, timeout) .thenApply(JobClusterManagerProto.DisableJobClusterResponse.class::cast); - return response; } @Override public CompletionStage enable(JobClusterManagerProto.EnableJobClusterRequest request) { - CompletionStage response = ask(jobClustersManagerActor, request, timeout) + return ask(jobClustersManagerActor, request, timeout) .thenApply(JobClusterManagerProto.EnableJobClusterResponse.class::cast); - return response; } @Override public CompletionStage updateArtifact(JobClusterManagerProto.UpdateJobClusterArtifactRequest request) { - CompletionStage response = ask(jobClustersManagerActor, request, timeout) + return ask(jobClustersManagerActor, request, timeout) .thenApply(JobClusterManagerProto.UpdateJobClusterArtifactResponse.class::cast); - return response; } @Override public CompletionStage updateSchedulingInfo(String clusterName, UpdateSchedulingInfoRequest request) { - CompletionStage response = - ask( - jobClustersManagerActor, - new UpdateSchedulingInfo(request.requestId, clusterName, request.getSchedulingInfo(), - request.getVersion()), - timeout) - .thenApply(UpdateSchedulingInfoResponse.class::cast); - return response; + return ask( + jobClustersManagerActor, + new UpdateSchedulingInfo(request.requestId, clusterName, request.getSchedulingInfo(), + request.getVersion()), + timeout) + .thenApply(UpdateSchedulingInfoResponse.class::cast); } @Override public CompletionStage updateSLA(JobClusterManagerProto.UpdateJobClusterSLARequest request) { - CompletionStage response = ask(jobClustersManagerActor, request, timeout) + return ask(jobClustersManagerActor, request, timeout) .thenApply(JobClusterManagerProto.UpdateJobClusterSLAResponse.class::cast); - return response; } @Override public CompletionStage updateWorkerMigrateStrategy(JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyRequest request) { - CompletionStage response = ask(jobClustersManagerActor, request, timeout) + return ask(jobClustersManagerActor, request, timeout) .thenApply(JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyResponse.class::cast); - return response; } @Override public CompletionStage updateLabels(JobClusterManagerProto.UpdateJobClusterLabelsRequest request) { - CompletionStage response = ask(jobClustersManagerActor, request, timeout) + return ask(jobClustersManagerActor, request, timeout) .thenApply(JobClusterManagerProto.UpdateJobClusterLabelsResponse.class::cast); - return response; } @Override public CompletionStage submit(JobClusterManagerProto.SubmitJobRequest request) { - CompletionStage response = ask(jobClustersManagerActor, request, timeout) + return ask(jobClustersManagerActor, request, timeout) .thenApply(JobClusterManagerProto.SubmitJobResponse.class::cast); - return response; } @Override public CompletionStage getJobClusterDetails(JobClusterManagerProto.GetJobClusterRequest request) { - CompletionStage response = ask(jobClustersManagerActor, request, timeout) + return ask(jobClustersManagerActor, request, timeout) .thenApply(JobClusterManagerProto.GetJobClusterResponse.class::cast); - return response; } @Override public CompletionStage getAllJobClusters(JobClusterManagerProto.ListJobClustersRequest request) { allJobClustersGET.increment(); - CompletionStage response = ask(jobClustersManagerActor, request, timeout) + return ask(jobClustersManagerActor, request, timeout) .thenApply(JobClusterManagerProto.ListJobClustersResponse.class::cast); - return response; } @Override public CompletionStage getLatestJobDiscoveryInfo(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest request) { - CompletionStage response = ask(jobClustersManagerActor, request, timeout) + return ask(jobClustersManagerActor, request, timeout) .thenApply(JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse.class::cast); - return response; } + } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobDiscoveryRouteHandler.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobDiscoveryRouteHandler.java index 83c9d89b5..43dc30ce7 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobDiscoveryRouteHandler.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobDiscoveryRouteHandler.java @@ -27,4 +27,7 @@ CompletionStage schedulingInfoStream(final JobClusterManagerP final boolean sendHeartbeats); CompletionStage lastSubmittedJobIdStream(final JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest request, final boolean sendHeartbeats); + + CompletionStage lastLaunchedJobIdStream(final JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest request, + final boolean sendHeartbeats); } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobDiscoveryRouteHandlerAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobDiscoveryRouteHandlerAkkaImpl.java index e7b45555a..f625987dc 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobDiscoveryRouteHandlerAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobDiscoveryRouteHandlerAkkaImpl.java @@ -27,9 +27,12 @@ import io.mantisrx.common.metrics.Metrics; import io.mantisrx.master.api.akka.route.proto.JobClusterInfo; import io.mantisrx.master.api.akka.route.proto.JobDiscoveryRouteProto; +import io.mantisrx.master.api.akka.route.proto.JobDiscoveryRouteProto.JobClusterInfoResponse; import io.mantisrx.master.jobcluster.proto.BaseResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobSchedInfoRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobSchedInfoResponse; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastLaunchedJobIdStreamResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse; import io.mantisrx.server.core.JobSchedulingInfo; @@ -58,9 +61,10 @@ public class JobDiscoveryRouteHandlerAkkaImpl implements JobDiscoveryRouteHandle private final Counter schedInfoStreamErrors; private final Counter lastSubmittedJobIdStreamErrors; - + private final Counter lastLaunchedJobIdStreamErrors; private final AsyncLoadingCache schedInfoCache; private final AsyncLoadingCache lastSubmittedJobIdStreamRespCache; + private final AsyncLoadingCache lastLaunchedJobIdStreamRespCache; public JobDiscoveryRouteHandlerAkkaImpl(ActorRef jobClustersManagerActor, Duration serverIdleTimeout) { this.jobClustersManagerActor = jobClustersManagerActor; @@ -77,13 +81,20 @@ public JobDiscoveryRouteHandlerAkkaImpl(ActorRef jobClustersManagerActor, Durati .maximumSize(500) .buildAsync(this::lastSubmittedJobId); + lastLaunchedJobIdStreamRespCache = Caffeine.newBuilder() + .expireAfterWrite(5, TimeUnit.SECONDS) + .maximumSize(500) + .buildAsync(this::lastLaunchedJobId); + Metrics m = new Metrics.Builder() .id("JobDiscoveryRouteHandlerAkkaImpl") .addCounter("schedInfoStreamErrors") .addCounter("lastSubmittedJobIdStreamErrors") + .addCounter("lastLaunchedJobIdStreamErrors") .build(); this.schedInfoStreamErrors = m.getCounter("schedInfoStreamErrors"); this.lastSubmittedJobIdStreamErrors = m.getCounter("lastSubmittedJobIdStreamErrors"); + this.lastLaunchedJobIdStreamErrors = m.getCounter("lastLaunchedJobIdStreamErrors"); } @@ -159,47 +170,77 @@ private CompletableFuture lastSubmittedJobI } @Override - public CompletionStage lastSubmittedJobIdStream(final GetLastSubmittedJobIdStreamRequest request, - final boolean sendHeartbeats) { - CompletionStage response = lastSubmittedJobIdStreamRespCache.get(request); + public CompletionStage lastSubmittedJobIdStream( + final GetLastSubmittedJobIdStreamRequest request, final boolean sendHeartbeats) { + try { - return response - .thenApply(lastSubmittedJobIdResp -> { - Optional> jobIdSubjectO = lastSubmittedJobIdResp.getjobIdBehaviorSubject(); - if (lastSubmittedJobIdResp.responseCode.equals(BaseResponse.ResponseCode.SUCCESS) && jobIdSubjectO.isPresent()) { - Observable jobClusterInfoObs = jobIdSubjectO.get().map(jobId -> new JobClusterInfo(jobId.getCluster(), jobId.getId())); + CompletionStage response = lastSubmittedJobIdStreamRespCache.get(request); + return response.thenApply(r -> streamJobIdBehaviorSubject(r, r.getjobIdBehaviorSubject(), sendHeartbeats, lastSubmittedJobIdStreamErrors)); + } catch (Exception e) { + logger.error("caught exception fetching lastSubmittedJobId stream for {}", request.getClusterName(), e); + lastSubmittedJobIdStreamErrors.increment(); + return CompletableFuture.completedFuture(new JobClusterInfoResponse( + 0, + BaseResponse.ResponseCode.SERVER_ERROR, + "Failed to get last submitted jobId stream for " + request.getClusterName() + " error: " + e.getMessage() + )); + } + } - Observable heartbeats = - Observable.interval(5, serverIdleConnectionTimeout.getSeconds() - 1, TimeUnit.SECONDS) - .map(x -> JOB_CLUSTER_INFO_HB_INSTANCE) - .takeWhile(x -> sendHeartbeats == true); + private CompletableFuture lastLaunchedJobId(final GetLastLaunchedJobIdStreamRequest request, Executor executor) { + return ask(jobClustersManagerActor, request, askTimeout) + .thenApply(GetLastLaunchedJobIdStreamResponse.class::cast) + .toCompletableFuture(); + } - Observable jobClusterInfoWithHB = Observable.merge(jobClusterInfoObs, heartbeats); - return new JobDiscoveryRouteProto.JobClusterInfoResponse( - lastSubmittedJobIdResp.requestId, - lastSubmittedJobIdResp.responseCode, - lastSubmittedJobIdResp.message, - jobClusterInfoWithHB - ); - } else { - logger.info("Failed to get lastSubmittedJobId stream for job cluster {}", request.getClusterName()); - lastSubmittedJobIdStreamErrors.increment(); - return new JobDiscoveryRouteProto.JobClusterInfoResponse( - lastSubmittedJobIdResp.requestId, - lastSubmittedJobIdResp.responseCode, - lastSubmittedJobIdResp.message - ); - } - }); + @Override + public CompletionStage lastLaunchedJobIdStream( + final GetLastLaunchedJobIdStreamRequest request, final boolean sendHeartbeats) { + try { + CompletionStage response = lastLaunchedJobIdStreamRespCache.get(request); + return response.thenApply(r -> streamJobIdBehaviorSubject(r, r.getjobIdBehaviorSubject(), sendHeartbeats, lastLaunchedJobIdStreamErrors)); } catch (Exception e) { logger.error("caught exception fetching lastSubmittedJobId stream for {}", request.getClusterName(), e); - lastSubmittedJobIdStreamErrors.increment(); - return CompletableFuture.completedFuture(new JobDiscoveryRouteProto.JobClusterInfoResponse( + lastLaunchedJobIdStreamErrors.increment(); + return CompletableFuture.completedFuture(new JobClusterInfoResponse( 0, BaseResponse.ResponseCode.SERVER_ERROR, "Failed to get last submitted jobId stream for " + request.getClusterName() + " error: " + e.getMessage() )); } } + + /** + * + * @param response response from actor + * @param jobIdSubjectO BehaviorSubject that exposes latest jobId for a jobCluster in Accepted/Launched state depending on endpoint + */ + private JobClusterInfoResponse streamJobIdBehaviorSubject( + BaseResponse response, Optional> jobIdSubjectO, + boolean sendHeartbeats, Counter counter) { + if (response.responseCode.equals(BaseResponse.ResponseCode.SUCCESS) && jobIdSubjectO.isPresent()) { + Observable jobClusterInfoObs = jobIdSubjectO.get().map(jobId -> new JobClusterInfo(jobId.getCluster(), jobId.getId())); + + Observable heartbeats = + Observable.interval(5, serverIdleConnectionTimeout.getSeconds() - 1, TimeUnit.SECONDS) + .map(x -> JOB_CLUSTER_INFO_HB_INSTANCE) + .takeWhile(x -> sendHeartbeats); + + Observable jobClusterInfoWithHB = Observable.merge(jobClusterInfoObs, heartbeats); + return new JobClusterInfoResponse( + response.requestId, + response.responseCode, + response.message, + jobClusterInfoWithHB + ); + } else { + counter.increment(); + return new JobClusterInfoResponse( + response.requestId, + response.responseCode, + response.message + ); + } + } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v0/JobDiscoveryRoute.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v0/JobDiscoveryRoute.java index b08a87231..b4ce6aebe 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v0/JobDiscoveryRoute.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v0/JobDiscoveryRoute.java @@ -134,12 +134,12 @@ private Route getJobDiscoveryRoutes() { "/namedjobs/{} called", jobCluster); jobClusterInfoStreamGET.increment(); - JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest req = - new JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest( + JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest req = + new JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest( jobCluster); CompletionStage jobClusterInfoRespCS = - jobDiscoveryRouteHandler.lastSubmittedJobIdStream( + jobDiscoveryRouteHandler.lastLaunchedJobIdStream( req, sendHeartbeats.orElse(false)); return completeAsync( diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/IJobClusterManager.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/IJobClusterManager.java index f8a7a50a3..31ae44494 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/IJobClusterManager.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/IJobClusterManager.java @@ -88,6 +88,7 @@ public interface IJobClusterManager { void onGetJobStatusSubject(JobClusterManagerProto.GetJobSchedInfoRequest request); void onGetLastSubmittedJobIdSubject(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest request); + void onGetLastLaunchedJobIdSubject(JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest request); void onEnforceSLARequest(EnforceSLARequest request); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java index 1f7848909..384e0feaa 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java @@ -73,6 +73,8 @@ import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobDetailsResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobSchedInfoRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobSchedInfoResponse; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastLaunchedJobIdStreamResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest; @@ -139,6 +141,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -150,6 +153,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -220,8 +224,8 @@ public static Props props( private final JobManager jobManager; private final MantisSchedulerFactory mantisSchedulerFactory; private final LifecycleEventPublisher eventPublisher; - private final BehaviorSubject jobIdSubmissionSubject; + private final BehaviorSubject jobIdLaunchedSubject; private final JobDefinitionResolver jobDefinitionResolver = new JobDefinitionResolver(); private final Metrics metrics; @@ -240,6 +244,7 @@ public JobClusterActor( this.jobManager = new JobManager(name, getContext(), mantisSchedulerFactory, eventPublisher, jobStore, costsCalculator); jobIdSubmissionSubject = BehaviorSubject.create(); + jobIdLaunchedSubject = BehaviorSubject.create(); initializedBehavior = buildInitializedBehavior(); disabledBehavior = buildDisabledBehavior(); @@ -422,6 +427,7 @@ private Receive buildDisabledBehavior() { .match(GetJobSchedInfoRequest.class, (x) -> getSender().tell(new GetJobSchedInfoResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), empty()), getSelf())) .match(GetLatestJobDiscoveryInfoRequest.class, (x) -> getSender().tell(new GetLatestJobDiscoveryInfoResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), empty()), getSelf())) .match(GetLastSubmittedJobIdStreamRequest.class, (x) -> getSender().tell(new GetLastSubmittedJobIdStreamResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), empty()), getSelf())) + .match(GetLastLaunchedJobIdStreamRequest.class, (x) -> getSender().tell(new GetLastLaunchedJobIdStreamResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), empty()), getSelf())) .match(ListJobIdsRequest.class, (x) -> getSender().tell(new ListJobIdsResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), new ArrayList()), getSelf())) .match(ListJobsRequest.class, (x) -> getSender().tell(new ListJobsResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), new ArrayList()), getSelf())) .match(ListWorkersRequest.class, (x) -> getSender().tell(new ListWorkersResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), new ArrayList()), getSelf())) @@ -516,6 +522,7 @@ private Receive buildInitialBehavior() { .match(GetJobSchedInfoRequest.class, (x) -> getSender().tell(new GetJobSchedInfoResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), empty()), getSelf())) .match(GetLatestJobDiscoveryInfoRequest.class, (x) -> getSender().tell(new GetLatestJobDiscoveryInfoResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), empty()), getSelf())) .match(GetLastSubmittedJobIdStreamRequest.class, (x) -> getSender().tell(new GetLastSubmittedJobIdStreamResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), empty()), getSelf())) + .match(GetLastLaunchedJobIdStreamRequest.class, (x) -> getSender().tell(new GetLastLaunchedJobIdStreamResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), empty()), getSelf())) .match(ListJobIdsRequest.class, (x) -> getSender().tell(new ListJobIdsResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), Lists.newArrayList()), getSelf())) .match(ListJobsRequest.class, (x) -> getSender().tell(new ListJobsResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), Lists.newArrayList()), getSelf())) .match(ListWorkersRequest.class, (x) -> getSender().tell(new ListWorkersResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), Lists.newArrayList()), getSelf())) @@ -610,6 +617,7 @@ private Receive buildInitializedBehavior() { .match(JobProto.JobInitialized.class, this::onJobInitialized) .match(JobStartedEvent.class, this::onJobStarted) .match(GetLastSubmittedJobIdStreamRequest.class, this::onGetLastSubmittedJobIdSubject) + .match(GetLastLaunchedJobIdStreamRequest.class, this::onGetLastLaunchedJobIdSubject) .match(ScaleStageRequest.class, this::onScaleStage) // EXPECTED MESSAGES END // // EXPECTED MESSAGES BEGIN // @@ -687,9 +695,6 @@ private void setBookkeepingTimer(long checkAgainInSecs) { public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest initReq) { ActorRef sender = getSender(); logger.info("In onJobClusterInitialize {}", this.name); - if (logger.isDebugEnabled()) { - logger.debug("Init Request {}", initReq); - } jobClusterMetadata = new JobClusterMetadataImpl.Builder() .withLastJobCount(initReq.lastJobNumber) .withIsDisabled(initReq.isDisabled) @@ -850,11 +855,14 @@ private void initRunningJobs(JobClusterProto.InitializeJobClusterRequest initReq ,() -> { // Push the last jobId - if(initReq.jobList.size() > 0) { + if (initReq.jobList.size() > 0) { JobId lastJobId = new JobId(this.name, initReq.lastJobNumber); this.jobIdSubmissionSubject.onNext(lastJobId); } - + initReq.jobList.stream() + .filter(m -> m.getState() == JobState.Launched) + .max(Comparator.comparingLong(m -> m.getJobId().getJobNum())) + .ifPresent(m -> this.jobIdLaunchedSubject.onNext(m.getJobId())); setBookkeepingTimer(BOOKKEEPING_INTERVAL_SECS); @@ -940,7 +948,6 @@ public void onJobClusterDelete(final JobClusterProto.DeleteJobClusterRequest req @Override public void onJobIdList(final ListJobIdsRequest request) { - if(logger.isTraceEnabled()) { logger.trace("Entering JCA:onJobIdList"); } final ActorRef sender = getSender(); Set jobIdsFilteredByLabelsSet = new HashSet<>(); // If labels criterion is given prefilter by labels @@ -949,7 +956,6 @@ public void onJobIdList(final ListJobIdsRequest request) { // Found no matching jobs for given labels exit if(jobIdsFilteredByLabelsSet.isEmpty()) { sender.tell(new ListJobIdsResponse(request.requestId, SUCCESS, "No JobIds match given Label criterion", new ArrayList<>()), sender); - if(logger.isTraceEnabled()) { logger.trace("Exit JCA:onJobIdList"); } return; } } @@ -966,7 +972,6 @@ public void onJobIdList(final ListJobIdsRequest request) { } sender.tell(new ListJobIdsResponse(request.requestId, SUCCESS, "", jobIdList), sender); - if(logger.isTraceEnabled()) { logger.trace("Exit JCA:onJobIdList"); } } @Override @@ -980,7 +985,6 @@ public void onJobList(final ListJobsRequest request) { jobIdsFilteredByLabelsSet = jobManager.getJobsMatchingLabels(request.getCriteria().getMatchingLabels(), request.getCriteria().getLabelsOperand()); // Found no jobs matching labels exit if(jobIdsFilteredByLabelsSet.isEmpty()) { - if(logger.isTraceEnabled()) { logger.trace("Exit JCA:onJobList {}" , jobIdsFilteredByLabelsSet.size()); } sender.tell(new ListJobsResponse(request.requestId, SUCCESS, "", new ArrayList<>()), self); return; } @@ -988,27 +992,22 @@ public void onJobList(final ListJobsRequest request) { // Found jobs matching labels or no labels criterion given. // Apply additional criterion to both active and completed jobs - getFilteredNonTerminalJobList(request.getCriteria(), jobIdsFilteredByLabelsSet).mergeWith( - getFilteredTerminalJobList(request.getCriteria(), jobIdsFilteredByLabelsSet)) - .collect(() -> Lists.newArrayList(), List::add) - .doOnNext(resultList -> { - if (logger.isTraceEnabled()) { - logger.trace("Exit JCA:onJobList {}", resultList.size()); - } - sender.tell(new ListJobsResponse(request.requestId, SUCCESS, "", resultList), self); - }) - .subscribe(); + getFilteredNonTerminalJobList(request.getCriteria(),jobIdsFilteredByLabelsSet).mergeWith( + getFilteredTerminalJobList(request.getCriteria(),jobIdsFilteredByLabelsSet)) + .collect(() -> Lists.newArrayList(), List::add) + .doOnNext(resultList -> { + sender.tell(new ListJobsResponse(request.requestId, SUCCESS, "", resultList), self); + }) + .subscribe(); } @Override public void onListArchivedWorkers(final ListArchivedWorkersRequest request) { - if(logger.isTraceEnabled()) { logger.trace("In onListArchiveWorkers {}", request); } try { List workerList = jobStore.getArchivedWorkers(request.getJobId().getId()); if(workerList.size() > request.getLimit()) { workerList = workerList.subList(0, request.getLimit()); } - if(logger.isTraceEnabled()) { logger.trace("Returning {} archived Workers", workerList.size()); } getSender().tell(new ListArchivedWorkersResponse(request.requestId, SUCCESS, "", workerList), getSelf()); } catch(Exception e) { logger.error("Exception listing archived workers", e); @@ -1017,7 +1016,6 @@ public void onListArchivedWorkers(final ListArchivedWorkersRequest request) { } public void onListActiveWorkers(final ListWorkersRequest r) { - if(logger.isTraceEnabled()) { logger.trace("Enter JobClusterActor:onListActiveWorkers {}", r); } Optional jobInfo = jobManager.getJobInfoForNonTerminalJob(r.getJobId()); if(jobInfo.isPresent()) { @@ -1026,15 +1024,12 @@ public void onListActiveWorkers(final ListWorkersRequest r) { logger.warn("No such active job {} ", r.getJobId()); getSender().tell(new ListWorkersResponse(r.requestId,CLIENT_ERROR,"No such active job " + r.getJobId(), Lists.newArrayList()),getSelf()); } - if(logger.isTraceEnabled()) { logger.trace("Exit JobClusterActor:onListActiveWorkers {}", r); } } private List getFilteredNonTerminalJobIdList(ListJobCriteria request, Set prefilteredJobIdSet) { - if(logger.isTraceEnabled()) { logger.trace("Enter JobClusterActor:getFilteredNonTerminalJobIdList {}", request); } if((request.getJobState().isPresent() && request.getJobState().get().equals(JobState.MetaState.Terminal))) { - if(logger.isTraceEnabled()) { logger.trace("Exit JobClusterActor:getFilteredNonTerminalJobIdList with empty"); } return Collections.emptyList(); } List jobInfoList; @@ -1058,18 +1053,13 @@ private List getFilteredNonTerminalJobIdList(ListJobCriteria request, .build()) .collect(Collectors.toList());; - if(logger.isTraceEnabled()) { logger.trace("Exit JobClusterActor:getFilteredNonTerminalJobIdList {}", jIdList.size()); } return jIdList; } private List getFilteredTerminalJobIdList(ListJobCriteria request, Set prefilteredJobIdSet) { - if(logger.isTraceEnabled()) { logger.trace("Enter JobClusterActor:getFilteredTerminalJobIdList {}", request); } - if((request.getJobState().isPresent() && !request.getJobState().get().equals(JobState.MetaState.Terminal))) { - if(logger.isTraceEnabled()) { logger.trace("Exit JobClusterActor:getFilteredTerminalJobIdList with empty"); } return Collections.emptyList(); } else if(!request.getJobState().isPresent() && (request.getActiveOnly().isPresent() && request.getActiveOnly().get())) { - if(logger.isTraceEnabled()) { logger.trace("Exit JobClusterActor:getFilteredTerminalJobIdList with empty"); } return Collections.emptyList(); } List completedJobsList; @@ -1095,18 +1085,14 @@ private List getFilteredTerminalJobIdList(ListJobCriteria request, Se .filter(Objects::nonNull) .collect(Collectors.toList()); - if(logger.isTraceEnabled()) { logger.trace("Exit JobClusterActor:getFilteredTerminalJobIdList {}", completedJobIdList.size()); } return completedJobIdList; } private Observable getFilteredNonTerminalJobList(ListJobCriteria request, Set prefilteredJobIdSet) { - if(logger.isTraceEnabled()) { logger.trace("Entering JobClusterActor:getFilteredNonTerminalJobList"); } Duration timeout = Duration.ofMillis(500); if((request.getJobState().isPresent() && request.getJobState().get().equals(JobState.MetaState.Terminal))) { - - if(logger.isTraceEnabled()) { logger.trace("Exit JobClusterActor:getFilteredNonTerminalJobList with empty"); } return Observable.empty(); } List jobInfoList; @@ -1156,13 +1142,9 @@ private Observable getFilteredNonTerminalJobList(ListJobC */ private Observable getFilteredTerminalJobList(ListJobCriteria request, Set jobIdSet) { - if(logger.isTraceEnabled()) { logger.trace("JobClusterActor:getFilteredTerminalJobList"); } - if((request.getJobState().isPresent() && !request.getJobState().get().equals(JobState.MetaState.Terminal))) { - if(logger.isTraceEnabled()) { logger.trace("Exit JobClusterActor:getFilteredTerminalJobList with empty"); } return Observable.empty(); } else if(!request.getJobState().isPresent() && (request.getActiveOnly().isPresent() && request.getActiveOnly().get())) { - if(logger.isTraceEnabled()) { logger.trace("Exit JobClusterActor:getFilteredTerminalJobList with empty"); } return Observable.empty(); } List jobInfoList; @@ -1200,15 +1182,12 @@ private Observable getFilteredTerminalJobList(ListJobCrit @Override public void onJobListCompleted(final ListCompletedJobsInClusterRequest request) { - if(logger.isTraceEnabled()) { logger.trace ("Enter onJobListCompleted {}", request); } final ActorRef sender = getSender(); List completedJobsList = jobManager.getCompletedJobsList(request.getLimit(), null); sender.tell(new ListCompletedJobsInClusterResponse(request.requestId, SUCCESS, "", completedJobsList), sender); - if(logger.isTraceEnabled()) { logger.trace ("Exit onJobListCompleted {}", completedJobsList.size()); } } @Override public void onJobClusterDisable(final DisableJobClusterRequest req) { - if(logger.isTraceEnabled()) { logger.trace("Enter onJobClusterDisable {}", req); } ActorRef sender = getSender(); try { IJobClusterMetadata jobClusterMetadata = new JobClusterMetadataImpl.Builder().withIsDisabled(true) @@ -1249,12 +1228,9 @@ public void onJobClusterDisable(final DisableJobClusterRequest req) { sender.tell(new DisableJobClusterResponse(req.requestId, SERVER_ERROR, errorMsg), getSelf()); numJobClusterDisableErrors.increment(); } - if(logger.isTraceEnabled()) { logger.trace("Exit onJobClusterDisable"); } - } @Override public void onJobClusterEnable(final EnableJobClusterRequest req) { - if(logger.isTraceEnabled()) { logger.trace("Enter onJobClusterEnable"); } ActorRef sender = getSender(); try { IJobClusterMetadata jobClusterMetadata = new JobClusterMetadataImpl.Builder().withIsDisabled(false) @@ -1287,19 +1263,16 @@ public void onJobClusterEnable(final EnableJobClusterRequest req) { sender.tell(new EnableJobClusterResponse(req.requestId, SERVER_ERROR, errorMsg), getSelf()); numJobClusterEnableErrors.increment(); } - if(logger.isTraceEnabled()) { logger.trace("Enter onJobClusterEnable"); } } @Override public void onJobClusterGet(final GetJobClusterRequest request) { final ActorRef sender = getSender(); - if(logger.isTraceEnabled()) { logger.trace("In JobCluster Get " + jobClusterMetadata); } if(this.name.equals(request.getJobClusterName())) { MantisJobClusterMetadataView clusterView = generateJobClusterMetadataView(this.jobClusterMetadata, this.jobClusterMetadata.isDisabled(), ofNullable(this.cronManager).map(x -> x.isCronActive).orElse(false)); sender.tell(new GetJobClusterResponse(request.requestId, SUCCESS, "", of(clusterView)), getSelf()); } else { sender.tell(new GetJobClusterResponse(request.requestId, CLIENT_ERROR, "Cluster Name " + request.getJobClusterName() + " in request Does not match cluster Name " + this.name + " of Job Cluster Actor", Optional.empty()), getSelf()); } - if(logger.isTraceEnabled()) { logger.trace("Exit onJobClusterGet"); } } private MantisJobClusterMetadataView generateJobClusterMetadataView(IJobClusterMetadata jobClusterMetadata, boolean isDisabled, boolean cronActive) { @@ -1526,7 +1499,6 @@ private JobDefinition fromJobClusterDefinition(String user, IJobClusterDefinitio private void submitJob(JobDefinition jobDefinition, ActorRef sender, String user) throws PersistException { - if (logger.isTraceEnabled()) { logger.trace("Enter submitJobb"); } JobId jId = null; try { validateJobDefinition(jobDefinition); @@ -1580,12 +1552,9 @@ private void submitJob(JobDefinition jobDefinition, ActorRef sender, String user numJobSubmissionFailures.increment(); throw new IllegalStateException(e); } - if(logger.isTraceEnabled()) { logger.trace("Exit submitJob"); } - } @Override public void onJobInitialized(JobProto.JobInitialized jobInited) { - if(logger.isTraceEnabled()) { logger.trace("Enter onJobInitialized"); } jobManager.markJobInitialized(jobInited.jobId, System.currentTimeMillis()); if(jobInited.responseCode == SUCCESS) { @@ -1602,9 +1571,7 @@ public void onJobInitialized(JobProto.JobInitialized jobInited) { } else { logger.warn("No such job found {}", jobInited.jobId); } - } - if(logger.isTraceEnabled()) { logger.trace("Exit onJobInitialized"); } } /** @@ -1615,18 +1582,17 @@ public void onJobInitialized(JobProto.JobInitialized jobInited) { public void onJobStarted(final JobStartedEvent startedEvent) { logger.info("job {} started event", startedEvent.jobid); - Optional jobInfoOp = jobManager.getJobInfoForNonTerminalJob(startedEvent.jobid); - - if(jobInfoOp.isPresent()) { - // enforce SLA - jobManager.markJobStarted(jobInfoOp.get()); - getSelf().tell(new JobClusterProto.EnforceSLARequest(Instant.now(), of(jobInfoOp.get().jobDefinition)), getSelf()); - } - + jobManager + .getJobInfoForNonTerminalJob(startedEvent.jobid) + .ifPresent(jobInfo -> { + jobIdLaunchedSubject.onNext(startedEvent.jobid); + jobManager.markJobStarted(jobInfo); + // Enforce SLA + getSelf().tell(new JobClusterProto.EnforceSLARequest(Instant.now(), of(jobInfo.jobDefinition)), getSelf()); + }); } private void cleanUpOnJobSubmitFailure(JobId jId) { - if(logger.isTraceEnabled()) { logger.trace("Enter cleanUpOnJobSubmitFailure {}", jId); } if(jId != null) { Optional jobInfoOp = jobManager.getJobInfoForNonTerminalJob(jId); if (jobInfoOp.isPresent()) { // ensure there is a record of this job @@ -1648,8 +1614,6 @@ private void cleanUpOnJobSubmitFailure(JobId jId) { } else { logger.warn("cleanup on Job Submit failure failed as there was no JobId"); } - if(logger.isTraceEnabled()) { logger.trace("Exit cleanUpOnJobSubmitFailure {}", jId); } - } /** @@ -1707,7 +1671,6 @@ private void validateConstraints(List softConstraints, List jobInfo = jobManager.getJobInfoForNonTerminalJob(r.getWorkerId().getJobId()); if(jobInfo.isPresent()) { @@ -1733,8 +1696,6 @@ public void onWorkerEvent(WorkerEvent r) { logger.warn("Terminal Event from worker {} has no valid running job. Ignoring event ", r.getWorkerId()); } } - if(logger.isTraceEnabled()) { logger.trace("Exit onWorkerEvent {}", r); } - } /** @@ -1742,9 +1703,7 @@ public void onWorkerEvent(WorkerEvent r) { */ @Override public void onResubmitWorkerRequest(ResubmitWorkerRequest req) { - if(logger.isTraceEnabled()) { logger.trace("Enter onResubmitWorkerRequest {}", req); } onResubmitWorker(req); - if(logger.isTraceEnabled()) { logger.trace("Exit onResubmitWorkerRequest {}", req); } } /** @@ -1784,7 +1743,6 @@ public void onJobKillRequest(KillJobRequest req) { */ @Override public void onKillJobResponse(JobClusterProto.KillJobResponse resp) { - if(logger.isTraceEnabled()) { logger.trace("Enter onKillJobResponse {}", resp); } if (resp.responseCode == SUCCESS) { Optional jInfo = jobManager.getJobInfoForNonTerminalJob(resp.jobId); @@ -1858,15 +1816,11 @@ public void onKillJobResponse(JobClusterProto.KillJobResponse resp) { getSelf()); } } - - if(logger.isTraceEnabled()) { logger.trace("Exit onKillJobResponse {}", resp); } - } @Override public void onGetJobDetailsRequest(GetJobDetailsRequest req) { - if(logger.isTraceEnabled()) { logger.trace("Enter GetJobDetails {}", req); } GetJobDetailsResponse response = new GetJobDetailsResponse(req.requestId, CLIENT_ERROR_NOT_FOUND, "Job " + req.getJobId() + " not found", empty()); Optional jInfo = jobManager.getJobInfoForNonTerminalJob(req.getJobId()); if(jInfo.isPresent()) { @@ -1897,15 +1851,13 @@ public void onGetJobDetailsRequest(GetJobDetailsRequest req) { } } getSender().tell(response, getSelf()); - if(logger.isTraceEnabled()) { logger.trace("Exit GetJobDetails {}", req); } } @Override public void onGetLatestJobDiscoveryInfo(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest request) { - if(logger.isTraceEnabled()) { logger.trace("Enter onGetLatestJobDiscoveryInfo {}", request); } ActorRef sender = getSender(); if(this.name.equals(request.getJobCluster())) { - JobId latestJobId = jobIdSubmissionSubject.getValue(); + JobId latestJobId = jobIdLaunchedSubject.getValue(); logger.debug("[{}] latest job Id for cluster: {}", name, latestJobId); if (latestJobId != null) { Optional jInfo = jobManager.getJobInfoForNonTerminalJob(latestJobId); @@ -1933,65 +1885,55 @@ public void onGetLatestJobDiscoveryInfo(JobClusterManagerProto.GetLatestJobDisco logger.warn(msg); sender.tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(request.requestId, SERVER_ERROR, msg, empty()), getSelf()); } - if(logger.isTraceEnabled()) { logger.trace("Exit onGetLatestJobDiscoveryInfo {}", request); } - } @Override public void onGetJobStatusSubject(GetJobSchedInfoRequest request) { - if(logger.isTraceEnabled()) { logger.trace("Enter onGetJobStatusSubject {}", request); } - Optional jInfo = jobManager.getJobInfoForNonTerminalJob(request.getJobId()); - if(jInfo.isPresent()) { - if(logger.isDebugEnabled()) { logger.debug("Forwarding getJobDetails to job actor for {}", request.getJobId()); } + if (jInfo.isPresent()) { jInfo.get().jobActor.forward(request, getContext()); - } else { // Could be a terminated job GetJobSchedInfoResponse response = new GetJobSchedInfoResponse(request.requestId, CLIENT_ERROR, "Job " + request.getJobId() + " not found or not active", empty()); getSender().tell(response, getSelf()); } - - if(logger.isTraceEnabled()) { logger.trace("Exit onGetJobStatusSubject "); } - } + @Override public void onGetLastSubmittedJobIdSubject(GetLastSubmittedJobIdStreamRequest request) { - if(logger.isTraceEnabled()) { logger.trace("Enter onGetLastSubmittedJobIdSubject {}", request); } ActorRef sender = getSender(); if(this.name.equals(request.getClusterName())) { - sender.tell(new GetLastSubmittedJobIdStreamResponse(request.requestId,SUCCESS,"",of(this.jobIdSubmissionSubject)),getSelf()); + sender.tell(new GetLastSubmittedJobIdStreamResponse(request.requestId, SUCCESS,"", of(this.jobIdSubmissionSubject)),getSelf()); } else { String msg = "Job Cluster " + request.getClusterName() + " In request does not match the name of this actor " + this.name; logger.warn(msg); sender.tell(new GetLastSubmittedJobIdStreamResponse(request.requestId,CLIENT_ERROR ,msg,empty()),getSelf()); } - if(logger.isTraceEnabled()) { logger.trace("Exit onGetLastSubmittedJobIdSubject {}", request); } - } + @Override + public void onGetLastLaunchedJobIdSubject(GetLastLaunchedJobIdStreamRequest request) { + ActorRef sender = getSender(); + if(this.name.equals(request.getClusterName())) { + sender.tell(new GetLastLaunchedJobIdStreamResponse(request.requestId, SUCCESS,"", of(this.jobIdLaunchedSubject)),getSelf()); + } else { + String msg = "Job Cluster " + request.getClusterName() + " In request does not match the name of this actor " + this.name; + logger.warn(msg); + sender.tell(new GetLastLaunchedJobIdStreamResponse(request.requestId, CLIENT_ERROR ,msg,empty()),getSelf()); + } + } @Override public void onBookkeepingRequest(JobClusterProto.BookkeepingRequest request) { - if(logger.isTraceEnabled()) { logger.trace("Enter onBookkeepingRequest for JobCluster {}", this.name); } // Enforce SLA if exists onEnforceSLARequest(new JobClusterProto.EnforceSLARequest()); // Tell all child jobs to migrate workers on disabled VMs (if any) jobManager.actorToJobIdMap.keySet().forEach(actorRef -> actorRef.tell(new JobProto.MigrateDisabledVmWorkersRequest(request.time), ActorRef.noSender())); - if(logger.isTraceEnabled()) { logger.trace("Exit onBookkeepingRequest for JobCluster {}", name); } } @Override public void onEnforceSLARequest(JobClusterProto.EnforceSLARequest request) { - if(logger.isTraceEnabled()) { logger.trace("Enter onEnforceSLA for JobCluster {} with request", this.name, request); } numSLAEnforcementExecutions.increment(); long now = request.timeOfEnforcement.toEpochMilli(); - List pendingInitializationJobsPriorToCutoff = jobManager.getJobActorsStuckInInit(now, getExpirePendingInitializeDelayMs()); - - List jobsStuckInAcceptedList = jobManager.getJobsStuckInAccepted(now, getExpireAcceptedDelayMs()); - - List jobsStuckInTerminatingList = jobManager.getJobsStuckInTerminating(now, getExpireAcceptedDelayMs()); - - if(!slaEnforcer.hasSLA()) { return; } @@ -2008,11 +1950,9 @@ public void onEnforceSLARequest(JobClusterProto.EnforceSLARequest request) { } for(int i=0; i< noOfJobsToLaunch; i++) { - getSelf().tell(new SubmitJobRequest(name, user, true,request.jobDefinitionOp), getSelf()); } - // enforce max. } else { List listOfJobs = new ArrayList<>(activeJobsCount + acceptedJobsCount); @@ -2029,7 +1969,6 @@ public void onEnforceSLARequest(JobClusterProto.EnforceSLARequest request) { } } - if(logger.isTraceEnabled()) { logger.trace("Exit onEnforceSLA for JobCluster {}", name); } } private long getExpireAcceptedDelayMs() { @@ -2059,7 +1998,6 @@ private Optional cloneToNewJobDefinitionWithoutArtifactNameAndVer return of(clonedJobDefn); } catch (Exception e) { logger.warn("Could not clone JobDefinition {} due to {}", jobDefinition, e.getMessage(), e); - e.printStackTrace(); } // should not get here @@ -2078,21 +2016,18 @@ private Optional cloneToNewJobDefinitionWithoutArtifactNameAndVer private Optional cloneJobDefinitionForQuickSubmitFromArchivedJobs(final List completedJobs, Optional jobDefinitionOp, MantisJobStore store) { - if(logger.isTraceEnabled()) { logger.trace("Enter createNewJobDefinitionFromLastSubmittedInheritSchedInfoAndParameters"); } Optional lastSubmittedJobDefn = getLastSubmittedJobDefinition(completedJobs, jobDefinitionOp, store); - if(lastSubmittedJobDefn.isPresent()) { - if(logger.isTraceEnabled()) { logger.trace("Exit createNewJobDefinitionFromLastSubmittedInheritSchedInfoAndParameters"); } + if (lastSubmittedJobDefn.isPresent()) { return cloneToNewJobDefinitionWithoutArtifactNameAndVersion(lastSubmittedJobDefn.get()); } - if(logger.isTraceEnabled()) { logger.trace("Exit createNewJobDefinitionFromLastSubmittedInheritSchedInfoAndParameters empty"); } return empty(); } private long getExpirePendingInitializeDelayMs() { // jobs older than 60 secs - return 60*1000; + return 60 * 1000; } /** @@ -2103,19 +2038,15 @@ private long getExpirePendingInitializeDelayMs() { */ @Override public void onTriggerCron(JobClusterProto.TriggerCronRequest request) { - if(logger.isTraceEnabled()) { logger.trace("Enter onTriggerCron for Job Cluster {}", this.name);} - if(jobClusterMetadata.getJobClusterDefinition().getSLA().getCronPolicy() != null) { - - if(jobClusterMetadata.getJobClusterDefinition().getSLA().getCronPolicy() == CronPolicy.KEEP_NEW || + if (jobClusterMetadata.getJobClusterDefinition().getSLA().getCronPolicy() != null) { + if (jobClusterMetadata.getJobClusterDefinition().getSLA().getCronPolicy() == CronPolicy.KEEP_NEW || this.jobManager.getAllNonTerminalJobsList().size() == 0) { getSelf().tell(new SubmitJobRequest(name, MANTIS_MASTER_USER, empty(), false), getSelf()); } else { - - // A job is already running skip resubmiting + // A job is already running skip resubmiting logger.info(name + ": Skipping submitting new job upon cron trigger, one exists already"); } } - if(logger.isTraceEnabled()) { logger.trace("Exit onTriggerCron Triggered for Job Cluster {}", this.name);} } private long getTerminatedJobToDeleteDelayHours() { @@ -2124,7 +2055,6 @@ private long getTerminatedJobToDeleteDelayHours() { @Override public void onJobClusterUpdateSLA(UpdateJobClusterSLARequest slaRequest) { - if(logger.isTraceEnabled()) { logger.trace("Enter onJobClusterUpdateSLA {}", slaRequest); } ActorRef sender = getSender(); try { SLA newSla = new SLA(slaRequest.getMin(), slaRequest.getMax(), slaRequest.getCronSpec(), slaRequest.getCronPolicy()); @@ -2132,7 +2062,7 @@ public void onJobClusterUpdateSLA(UpdateJobClusterSLARequest slaRequest) { .withSla(newSla) .build(); boolean isDisabled = jobClusterMetadata.isDisabled(); - if(slaRequest.isForceEnable() && jobClusterMetadata.isDisabled()) { + if (slaRequest.isForceEnable() && jobClusterMetadata.isDisabled()) { isDisabled = false; } IJobClusterMetadata jobCluster = new JobClusterMetadataImpl.Builder() @@ -2142,7 +2072,7 @@ public void onJobClusterUpdateSLA(UpdateJobClusterSLARequest slaRequest) { .build(); updateAndSaveJobCluster(jobCluster); - if(cronManager != null) + if (cronManager != null) cronManager.destroyCron(); this.cronManager = new CronManager(name, getSelf(), newSla); @@ -2153,19 +2083,17 @@ public void onJobClusterUpdateSLA(UpdateJobClusterSLARequest slaRequest) { jobClusterMetadata.getJobClusterDefinition().getName(), name+" SLA update") ); } catch(IllegalArgumentException e) { - logger.error("Invalid arguement job cluster not updated ", e); + logger.error("Invalid argument job cluster not updated ", e); sender.tell(new UpdateJobClusterSLAResponse(slaRequest.requestId, CLIENT_ERROR, name + " Job cluster SLA updation failed " + e.getMessage()), getSelf()); } catch(Exception e) { logger.error("job cluster not updated ", e); sender.tell(new UpdateJobClusterSLAResponse(slaRequest.requestId, SERVER_ERROR, name + " Job cluster SLA updation failed " + e.getMessage()), getSelf()); } - if(logger.isTraceEnabled()) { logger.trace("Exit onJobClusterUpdateSLA {}", slaRequest); } } @Override public void onJobClusterUpdateLabels(UpdateJobClusterLabelsRequest labelRequest) { - if(logger.isTraceEnabled()) { logger.trace("Enter onJobClusterUpdateLabels {}", labelRequest); } ActorRef sender = getSender(); try { JobClusterConfig newConfig = new JobClusterConfig.Builder().from(jobClusterMetadata.getJobClusterDefinition().getJobClusterConfig()) @@ -2194,15 +2122,13 @@ public void onJobClusterUpdateLabels(UpdateJobClusterLabelsRequest labelRequest) logger.error("job cluster labels not updated ", e); sender.tell(new UpdateJobClusterLabelsResponse(labelRequest.requestId, SERVER_ERROR, name + " labels updation failed " + e.getMessage()), getSelf()); } - if(logger.isTraceEnabled()) { logger.trace("Exit onJobClusterUpdateLabels {}", labelRequest); } } @Override public void onJobClusterUpdateArtifact(UpdateJobClusterArtifactRequest artifactReq) { - if(logger.isTraceEnabled()) { logger.trace("Entering JobClusterActor:onJobClusterUpdateArtifact"); } ActorRef sender = getSender(); try { - if(!isVersionUnique(artifactReq.getVersion(), jobClusterMetadata.getJobClusterDefinition().getJobClusterConfigs())) { + if (!isVersionUnique(artifactReq.getVersion(), jobClusterMetadata.getJobClusterDefinition().getJobClusterConfigs())) { String msg = String.format("job cluster %s not updated as the version %s is not unique", name,artifactReq.getVersion()); logger.error(msg); sender.tell(new UpdateJobClusterArtifactResponse(artifactReq.requestId, CLIENT_ERROR, msg), getSelf()); @@ -2223,7 +2149,6 @@ public void onJobClusterUpdateArtifact(UpdateJobClusterArtifactRequest artifactR logger.error("job cluster not updated ", e); sender.tell(new UpdateJobClusterArtifactResponse(artifactReq.requestId, SERVER_ERROR, name + " Job cluster artifact updation failed " + e.getMessage()), getSelf()); } - if(logger.isTraceEnabled()) { logger.trace("Exit JobClusterActor:onJobClusterUpdateArtifact"); } } private void updateJobClusterConfig(JobClusterConfig newConfig) throws Exception { @@ -2277,7 +2202,6 @@ public void onJobClusterUpdateSchedulingInfo(UpdateSchedulingInfo request) { } boolean isVersionUnique(String artifactVersion, List existingConfigs) { - if(logger.isTraceEnabled()) { logger.trace("Enter JobClusterActor {} isVersionnique {} existing versions {}",name,artifactVersion,existingConfigs);} for(JobClusterConfig config : existingConfigs) { if(config.getVersion().equals(artifactVersion)) { logger.info("Given Version {} is not unique during UpdateJobCluster {}",artifactVersion, name); @@ -2290,7 +2214,6 @@ boolean isVersionUnique(String artifactVersion, List existingC //TODO validate the migration config json @Override public void onJobClusterUpdateWorkerMigrationConfig(UpdateJobClusterWorkerMigrationStrategyRequest req) { - if(logger.isTraceEnabled()) { logger.trace("Entering JobClusterActor:onJobClusterUpdateWorkerMigrationConfig {}", req); } ActorRef sender = getSender(); try { @@ -2315,11 +2238,9 @@ public void onJobClusterUpdateWorkerMigrationConfig(UpdateJobClusterWorkerMigrat logger.error("job cluster migration config not updated ", e); sender.tell(new UpdateJobClusterWorkerMigrationStrategyResponse(req.requestId, SERVER_ERROR, name + " Job cluster worker migration config updation failed " + e.getMessage()), getSelf()); } - if(logger.isTraceEnabled()) { logger.trace("Exit JobClusterActor:onJobClusterUpdateWorkerMigrationConfig {}", req); } } private void updateAndSaveJobCluster(IJobClusterMetadata jobCluster) throws Exception { - if(logger.isTraceEnabled()) { logger.trace("Entering JobClusterActor:updateAndSaveJobCluster {}", jobCluster.getJobClusterDefinition().getName()); } jobStore.updateJobCluster(jobCluster); jobClusterMetadata = jobCluster; // enable cluster if @@ -2328,7 +2249,6 @@ private void updateAndSaveJobCluster(IJobClusterMetadata jobCluster) throws Exce } slaEnforcer = new SLAEnforcer(jobClusterMetadata.getJobClusterDefinition().getSLA()); logger.info("successfully saved job cluster"); - if(logger.isTraceEnabled()) { logger.trace("Exit JobClusterActor:updateAndSaveJobCluster {}", jobCluster.getJobClusterDefinition().getName()); } } /** @@ -2345,7 +2265,6 @@ private void updateAndSaveJobCluster(IJobClusterMetadata jobCluster) throws Exce private Optional getLastSubmittedJobDefinition(final List completedJobs, Optional jobDefinitionOp, MantisJobStore store) { - if(logger.isTraceEnabled()) { logger.trace("Entering getLastSubmittedJobDefinition"); } if(jobDefinitionOp.isPresent()) { return jobDefinitionOp; } @@ -2357,7 +2276,6 @@ private Optional getLastSubmittedJobDefinition(final List archivedJob = store.getArchivedJob(completedJob.get().getJobId()); if(archivedJob.isPresent()) { - if(logger.isTraceEnabled()) { logger.trace("Exit getLastSubmittedJobDefinition returning job {} with defn {}", archivedJob.get().getJobId(), archivedJob.get().getJobDefinition()); } return of(archivedJob.get().getJobDefinition()); } else { logger.warn("Could not find load archived Job {} for cluster {}", completedJob.get().getJobId(), name); @@ -2371,7 +2289,6 @@ private Optional getLastSubmittedJobDefinition(final List getLastSubmittedJobDefinition(final List jobInfo = jobManager.getJobInfoForNonTerminalJob(req.getJobId()); ActorRef sender = getSender(); if(jobInfo.isPresent()) { @@ -2398,12 +2314,10 @@ public void onScaleStage(ScaleStageRequest req) { } else { sender.tell(new ScaleStageResponse(req.requestId, CLIENT_ERROR, "Job " + req.getJobId() + " not found. Could not scale stage to " + req.getNumWorkers(), 0), getSelf()); } - if(logger.isTraceEnabled()) { logger.trace("Exit onScaleStage {}", req); } } @Override public void onResubmitWorker(ResubmitWorkerRequest req) { - if(logger.isTraceEnabled()) { logger.trace("Exit JCA:onResubmitWorker {}", req); } Optional jobInfo = jobManager.getJobInfoForNonTerminalJob(req.getJobId()); ActorRef sender = getSender(); if(jobInfo.isPresent()) { @@ -2411,12 +2325,9 @@ public void onResubmitWorker(ResubmitWorkerRequest req) { } else { sender.tell(new ResubmitWorkerResponse(req.requestId, CLIENT_ERROR, "Job " + req.getJobId() + " not found. Could not resubmit worker"), getSelf()); } - if(logger.isTraceEnabled()) { logger.trace("Exit JCA:onResubmitWorker {}", req); } } - static final class JobInfo { - final long submittedAt; public String version; volatile long initializeInitiatedAt = -1; @@ -2478,8 +2389,6 @@ public void setTerminatedAt(long terminatedAt) { this.terminatedAt = terminatedAt; } - - JobInfo(JobId jobId, JobDefinition jobDefinition, long submittedAt, ActorRef jobActor, JobState state, String user) { this(jobId, jobDefinition, submittedAt, jobActor, state, user, -1, -1); } @@ -2683,12 +2592,10 @@ JobInfo initJob(MantisJobMetadataImpl jobMeta, IJobClusterMetadata jobClusterMet jobInfo.jobActor.tell(new JobProto.InitJob(sender, true), context.self()); markJobInitializeInitiated(jobInfo, System.currentTimeMillis()); - return jobInfo; } JobInfo createJobInfoAndActorAndWatchActor(MantisJobMetadataImpl jobMeta, IJobClusterMetadata jobClusterMetadata) { - MantisScheduler scheduler1 = scheduler.forJob(jobMeta.getJobDefinition()); ActorRef jobActor = context.actorOf(JobActor.props(jobClusterMetadata.getJobClusterDefinition(), jobMeta, jobStore, scheduler1, publisher, costsCalculator), "JobActor-" + jobMeta.getJobId().getId()); @@ -2874,9 +2781,7 @@ Optional markCompleted(JobId jId, long completionTime, JobState st } List getAllNonTerminalJobsList() { - List allJobsList = new ArrayList<>(this.nonTerminalSortedJobSet); - if(logger.isTraceEnabled()) { logger.trace("Exiting JobClusterActor:getAllNonTerminatlJobsList {}", allJobsList); } return allJobsList; } @@ -2931,7 +2836,6 @@ List getTerminatingJobsList() { */ int acceptedJobsCount() { - return this.acceptedJobsMap.size(); } @@ -2940,7 +2844,6 @@ int acceptedJobsCount() { * @return no of active jobs */ int activeJobsCount() { - return this.activeJobsMap.size(); } @@ -2978,15 +2881,11 @@ Optional getJobDataForCompletedJob(String jId) { Optional getJobInfoForNonTerminalJob(JobId jId) { - if(logger.isTraceEnabled() ) { logger.trace("In getJobInfo {}", jId); } if(acceptedJobsMap.containsKey(jId)) { - if(logger.isDebugEnabled() ) { logger.debug("Found {} in accepted state", jId); } return of(acceptedJobsMap.get(jId)); } else if(activeJobsMap.containsKey(jId)) { - if(logger.isDebugEnabled() ) { logger.debug("Found {} in active state", jId); } return of(activeJobsMap.get(jId)); } else if(this.terminatingJobsMap.containsKey(jId)) { - if(logger.isDebugEnabled() ) { logger.debug("Found {} in terminating state", jId); } return of(terminatingJobsMap.get(jId)); } return empty(); @@ -3059,7 +2958,6 @@ public Set getJobsMatchingLabels(List