diff --git a/lib/src/main/java/com/palantir/computemodules/ComputeModule.java b/lib/src/main/java/com/palantir/computemodules/ComputeModule.java index e236072..fd2c682 100644 --- a/lib/src/main/java/com/palantir/computemodules/ComputeModule.java +++ b/lib/src/main/java/com/palantir/computemodules/ComputeModule.java @@ -56,6 +56,7 @@ public final class ComputeModule { private final Map> functions; private final Client client; private final ListeningExecutorService executor; + private final boolean reportsRestart; public static ComputeModuleBuilder builder() { return new ComputeModuleBuilder(); @@ -65,6 +66,10 @@ public static ComputeModuleBuilder builder() { * Starts the client polling loop. This is blocking, run in the background needed. */ public Void start() { + // notify the runtime of (re)start of the module + if (reportsRestart) { + client.postRestart(); + } while (true) { client.getJob().ifPresent(job -> { ListenableFuture future = executor.submit(() -> execute(job)); @@ -120,10 +125,14 @@ private InputStream serializeException(Failed failed) { } private ComputeModule( - Client client, ListeningExecutorService executor, Map> functions) { + Client client, + ListeningExecutorService executor, + Map> functions, + boolean reportsRestart) { this.client = client; this.executor = executor; this.functions = functions; + this.reportsRestart = reportsRestart; } public static final class ComputeModuleBuilder { @@ -132,6 +141,7 @@ public static final class ComputeModuleBuilder { Optional.empty(); // ComputeModuleClient construction is deferred due to env vars private ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newVirtualThreadPerTaskExecutor()); + private boolean reportsRestart = false; private ComputeModuleBuilder() { functions = new HashMap<>(); @@ -175,8 +185,14 @@ public ComputeModuleBuilder withExecutor(ExecutorService newExecutor) { return this; } + public ComputeModuleBuilder withReportsRestart() { + this.reportsRestart = true; + return this; + } + public ComputeModule build() { - return new ComputeModule(client.orElseGet(() -> new ComputeModuleClient()), executor, functions); + return new ComputeModule( + client.orElseGet(() -> new ComputeModuleClient()), executor, functions, reportsRestart); } } } diff --git a/lib/src/main/java/com/palantir/computemodules/client/Client.java b/lib/src/main/java/com/palantir/computemodules/client/Client.java index 3d4cd22..93713b3 100644 --- a/lib/src/main/java/com/palantir/computemodules/client/Client.java +++ b/lib/src/main/java/com/palantir/computemodules/client/Client.java @@ -23,4 +23,6 @@ public interface Client { Optional getJob(); void postResult(String jobId, InputStream result); + + void postRestart(); } diff --git a/lib/src/main/java/com/palantir/computemodules/client/ComputeModuleClient.java b/lib/src/main/java/com/palantir/computemodules/client/ComputeModuleClient.java index 3da276d..a169c43 100644 --- a/lib/src/main/java/com/palantir/computemodules/client/ComputeModuleClient.java +++ b/lib/src/main/java/com/palantir/computemodules/client/ComputeModuleClient.java @@ -31,6 +31,8 @@ public final class ComputeModuleClient implements Client { private static final SafeLogger log = SafeLoggerFactory.get(ComputeModuleClient.class); + private static final Integer POST_RESULT_MAX_ATTEMPTS = 5; + private static final Integer POST_ERROR_MAX_ATTEMPTS = 3; private final HttpClient client; private final HttpRequest getRequest; @@ -77,10 +79,63 @@ public void postResult(String jobId, InputStream result) { .uri(URI.create("http://127.0.0.1:8946/results" + "/" + jobId)) .POST(BodyPublishers.ofInputStream(() -> result)) .build(); + String error = ""; try { - client.send(request, BodyHandlers.ofString()); + for (Integer i = 0; i < POST_RESULT_MAX_ATTEMPTS; i++) { + HttpResponse response = client.send(request, BodyHandlers.ofString()); + if (response.statusCode() == 204) { + log.info("Successfully posted result", SafeArg.of("jobId", jobId)); + return; + } + error = new String( + "Failed to post error for jobId: " + jobId + ", statusCode: " + response.statusCode()); + log.error("Failed to post error", SafeArg.of("jobId", error)); + Thread.sleep(1000); + } } catch (Exception e) { + error = new String("Failed to post error for jobId: " + jobId + "error: " + e.toString()); log.error("Failed to post result", SafeArg.of("jobId", jobId), e); } + log.error( + "Failed to post result after several attempts. Now attempting to return the error as the result. ", + SafeArg.of("jobId", jobId), + SafeArg.of("error", error), + SafeArg.of("attempts", POST_RESULT_MAX_ATTEMPTS)); + postError(jobId, error); + } + + private void postError(String jobId, String errorString) { + HttpRequest request = postRequest + .copy() + .uri(URI.create("http://127.0.0.1:8946/results" + "/" + jobId)) + .POST(BodyPublishers.ofString(errorString)) + .build(); + for (Integer i = 0; i < POST_ERROR_MAX_ATTEMPTS; i++) { + try { + HttpResponse response = client.send(request, BodyHandlers.ofString()); + if (response.statusCode() == 204) { + log.info("Successfully posted error", SafeArg.of("jobId", jobId)); + return; + } + log.error("Failed to post error", SafeArg.of("jobId", jobId), SafeArg.of("response", response)); + Thread.sleep(1000); + } catch (Exception e) { + log.error("Failed to post error", SafeArg.of("jobId", jobId), e); + } + } + } + + @Override + public void postRestart() { + HttpRequest request = postRequest + .copy() + .uri(URI.create("http://127.0.0.1:8946/restart-notify")) + .POST(BodyPublishers.ofString("")) + .build(); + try { + client.send(request, BodyHandlers.ofString()); + } catch (Exception e) { + log.error("Failed to post restart", e); + } } } diff --git a/lib/src/main/java/com/palantir/computemodules/client/TestClient.java b/lib/src/main/java/com/palantir/computemodules/client/TestClient.java index 4fb5ca4..f444cf6 100644 --- a/lib/src/main/java/com/palantir/computemodules/client/TestClient.java +++ b/lib/src/main/java/com/palantir/computemodules/client/TestClient.java @@ -109,4 +109,7 @@ public O result(String jobId, Class outputType) { throw new RuntimeException(e); } } + + @Override + public void postRestart() {} }