Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions lib/src/main/java/com/palantir/computemodules/ComputeModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public final class ComputeModule {
private final Map<String, FunctionRunner<?, ?>> functions;
private final Client client;
private final ListeningExecutorService executor;
private final boolean reportsRestart;

public static ComputeModuleBuilder builder() {
return new ComputeModuleBuilder();
Expand All @@ -65,6 +66,10 @@ public static ComputeModuleBuilder builder() {
* Starts the client polling loop. This is blocking, run in the background needed.
*/
public Void start() {
// notify the runtime of (re)start of the module
if (reportsRestart) {
client.postRestart();
}
while (true) {
client.getJob().ifPresent(job -> {
ListenableFuture<Result> future = executor.submit(() -> execute(job));
Expand Down Expand Up @@ -120,10 +125,14 @@ private InputStream serializeException(Failed failed) {
}

private ComputeModule(
Client client, ListeningExecutorService executor, Map<String, FunctionRunner<?, ?>> functions) {
Client client,
ListeningExecutorService executor,
Map<String, FunctionRunner<?, ?>> functions,
boolean reportsRestart) {
this.client = client;
this.executor = executor;
this.functions = functions;
this.reportsRestart = reportsRestart;
}

public static final class ComputeModuleBuilder {
Expand All @@ -132,6 +141,7 @@ public static final class ComputeModuleBuilder {
Optional.empty(); // ComputeModuleClient construction is deferred due to env vars
private ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Executors.newVirtualThreadPerTaskExecutor());
private boolean reportsRestart = false;

private ComputeModuleBuilder() {
functions = new HashMap<>();
Expand Down Expand Up @@ -175,8 +185,14 @@ public ComputeModuleBuilder withExecutor(ExecutorService newExecutor) {
return this;
}

public ComputeModuleBuilder withReportsRestart() {
this.reportsRestart = true;
return this;
}

public ComputeModule build() {
return new ComputeModule(client.orElseGet(() -> new ComputeModuleClient()), executor, functions);
return new ComputeModule(
client.orElseGet(() -> new ComputeModuleClient()), executor, functions, reportsRestart);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ public interface Client {
Optional<ComputeModuleJob> getJob();

void postResult(String jobId, InputStream result);

void postRestart();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

public final class ComputeModuleClient implements Client {
private static final SafeLogger log = SafeLoggerFactory.get(ComputeModuleClient.class);
private static final Integer POST_RESULT_MAX_ATTEMPTS = 5;
private static final Integer POST_ERROR_MAX_ATTEMPTS = 3;

private final HttpClient client;
private final HttpRequest getRequest;
Expand Down Expand Up @@ -77,10 +79,63 @@ public void postResult(String jobId, InputStream result) {
.uri(URI.create("http://127.0.0.1:8946/results" + "/" + jobId))
.POST(BodyPublishers.ofInputStream(() -> result))
.build();
String error = "";
try {
client.send(request, BodyHandlers.ofString());
for (Integer i = 0; i < POST_RESULT_MAX_ATTEMPTS; i++) {
HttpResponse<String> response = client.send(request, BodyHandlers.ofString());
if (response.statusCode() == 204) {
log.info("Successfully posted result", SafeArg.of("jobId", jobId));
return;
}
error = new String(
"Failed to post error for jobId: " + jobId + ", statusCode: " + response.statusCode());
log.error("Failed to post error", SafeArg.of("jobId", error));
Thread.sleep(1000);
}
} catch (Exception e) {
error = new String("Failed to post error for jobId: " + jobId + "error: " + e.toString());
log.error("Failed to post result", SafeArg.of("jobId", jobId), e);
}
log.error(
"Failed to post result after several attempts. Now attempting to return the error as the result. ",
SafeArg.of("jobId", jobId),
SafeArg.of("error", error),
SafeArg.of("attempts", POST_RESULT_MAX_ATTEMPTS));
postError(jobId, error);
}

private void postError(String jobId, String errorString) {
HttpRequest request = postRequest
.copy()
.uri(URI.create("http://127.0.0.1:8946/results" + "/" + jobId))
.POST(BodyPublishers.ofString(errorString))
.build();
for (Integer i = 0; i < POST_ERROR_MAX_ATTEMPTS; i++) {
try {
HttpResponse<String> response = client.send(request, BodyHandlers.ofString());
if (response.statusCode() == 204) {
log.info("Successfully posted error", SafeArg.of("jobId", jobId));
return;
}
log.error("Failed to post error", SafeArg.of("jobId", jobId), SafeArg.of("response", response));
Thread.sleep(1000);
} catch (Exception e) {
log.error("Failed to post error", SafeArg.of("jobId", jobId), e);
}
}
}

@Override
public void postRestart() {
HttpRequest request = postRequest
.copy()
.uri(URI.create("http://127.0.0.1:8946/restart-notify"))
.POST(BodyPublishers.ofString(""))
.build();
try {
client.send(request, BodyHandlers.ofString());
} catch (Exception e) {
log.error("Failed to post restart", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,7 @@ public <O> O result(String jobId, Class<O> outputType) {
throw new RuntimeException(e);
}
}

@Override
public void postRestart() {}
}