diff --git a/dapr-spring/dapr-spring-workflows/src/main/java/io/dapr/spring/workflows/config/DaprWorkflowsConfiguration.java b/dapr-spring/dapr-spring-workflows/src/main/java/io/dapr/spring/workflows/config/DaprWorkflowsConfiguration.java
index 8629982a90..18e4024142 100644
--- a/dapr-spring/dapr-spring-workflows/src/main/java/io/dapr/spring/workflows/config/DaprWorkflowsConfiguration.java
+++ b/dapr-spring/dapr-spring-workflows/src/main/java/io/dapr/spring/workflows/config/DaprWorkflowsConfiguration.java
@@ -46,10 +46,9 @@ private void registerWorkflowsAndActivities(ApplicationContext applicationContex
workflowRuntimeBuilder.registerActivity(activity);
}
- try (WorkflowRuntime runtime = workflowRuntimeBuilder.build()) {
- LOGGER.info("Starting workflow runtime ... ");
- runtime.start(false);
- }
+ WorkflowRuntime runtime = workflowRuntimeBuilder.build();
+ LOGGER.info("Starting workflow runtime ... ");
+ runtime.start(false);
}
@Override
diff --git a/examples/src/main/java/io/dapr/examples/unittesting/DaprWorkflowExampleTest.java b/examples/src/main/java/io/dapr/examples/unittesting/DaprWorkflowExampleTest.java
index ef0e6702ad..b8ce0ef67c 100644
--- a/examples/src/main/java/io/dapr/examples/unittesting/DaprWorkflowExampleTest.java
+++ b/examples/src/main/java/io/dapr/examples/unittesting/DaprWorkflowExampleTest.java
@@ -13,8 +13,8 @@
package io.dapr.examples.unittesting;
-import com.microsoft.durabletask.Task;
-import com.microsoft.durabletask.TaskCanceledException;
+import io.dapr.durabletask.Task;
+import io.dapr.durabletask.TaskCanceledException;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowStub;
diff --git a/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainWorker.java b/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainWorker.java
index 61b1572bda..51fb9ae6aa 100644
--- a/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainWorker.java
+++ b/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainWorker.java
@@ -29,9 +29,8 @@ public static void main(String[] args) throws Exception {
builder.registerActivity(ToUpperCaseActivity.class);
// Build and then start the workflow runtime pulling and executing tasks
- try (WorkflowRuntime runtime = builder.build()) {
- System.out.println("Start workflow runtime");
- runtime.start();
- }
+ WorkflowRuntime runtime = builder.build();
+ System.out.println("Start workflow runtime");
+ runtime.start();
}
}
diff --git a/examples/src/main/java/io/dapr/examples/workflows/childworkflow/DemoChildWorkflowWorker.java b/examples/src/main/java/io/dapr/examples/workflows/childworkflow/DemoChildWorkflowWorker.java
index 81a9b4973c..0e692551e5 100644
--- a/examples/src/main/java/io/dapr/examples/workflows/childworkflow/DemoChildWorkflowWorker.java
+++ b/examples/src/main/java/io/dapr/examples/workflows/childworkflow/DemoChildWorkflowWorker.java
@@ -31,9 +31,7 @@ public static void main(String[] args) throws Exception {
builder.registerActivity(ReverseActivity.class);
// Build and then start the workflow runtime pulling and executing tasks
- try (WorkflowRuntime runtime = builder.build()) {
- System.out.println("Start workflow runtime");
- runtime.start();
- }
+ WorkflowRuntime runtime = builder.build();
+ System.out.println("Start workflow runtime");
}
}
diff --git a/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewWorker.java b/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewWorker.java
index 0ca050e874..43ef176a2e 100644
--- a/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewWorker.java
+++ b/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewWorker.java
@@ -16,6 +16,9 @@
import io.dapr.workflows.runtime.WorkflowRuntime;
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
public class DemoContinueAsNewWorker {
/**
* The main method of this app.
@@ -25,13 +28,14 @@ public class DemoContinueAsNewWorker {
*/
public static void main(String[] args) throws Exception {
// Register the Workflow with the builder.
- WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoContinueAsNewWorkflow.class);
+ WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().
+ registerWorkflow(DemoContinueAsNewWorkflow.class)
+ .withExecutorService(Executors.newFixedThreadPool(3));
builder.registerActivity(CleanUpActivity.class);
// Build and then start the workflow runtime pulling and executing tasks
- try (WorkflowRuntime runtime = builder.build()) {
- System.out.println("Start workflow runtime");
- runtime.start();
- }
+ WorkflowRuntime runtime = builder.build();
+ System.out.println("Start workflow runtime");
+ runtime.start();
}
}
diff --git a/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventWorker.java b/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventWorker.java
index aaa1e7c819..f7d0c8ebf5 100644
--- a/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventWorker.java
+++ b/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventWorker.java
@@ -30,9 +30,8 @@ public static void main(String[] args) throws Exception {
builder.registerActivity(DenyActivity.class);
// Build and then start the workflow runtime pulling and executing tasks
- try (WorkflowRuntime runtime = builder.build()) {
- System.out.println("Start workflow runtime");
- runtime.start();
- }
+ WorkflowRuntime runtime = builder.build();
+ System.out.println("Start workflow runtime");
+ runtime.start();
}
}
diff --git a/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorker.java b/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorker.java
index d5c6d14e7a..4c691dbc35 100644
--- a/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorker.java
+++ b/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorker.java
@@ -29,9 +29,8 @@ public static void main(String[] args) throws Exception {
builder.registerActivity(CountWordsActivity.class);
// Build and then start the workflow runtime pulling and executing tasks
- try (WorkflowRuntime runtime = builder.build()) {
- System.out.println("Start workflow runtime");
- runtime.start();
- }
+ WorkflowRuntime runtime = builder.build();
+ System.out.println("Start workflow runtime");
+ runtime.start(false);
}
}
diff --git a/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorkflow.java
index 1760a53b3e..611b1cac67 100644
--- a/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorkflow.java
+++ b/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorkflow.java
@@ -13,7 +13,7 @@
package io.dapr.examples.workflows.faninout;
-import com.microsoft.durabletask.Task;
+import io.dapr.durabletask.Task;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;
diff --git a/pom.xml b/pom.xml
index 51f5070027..19842bf1cf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,7 +36,7 @@
2.16.1
true
diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java
index ca8678e2d7..5c6a360c8a 100644
--- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java
+++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java
@@ -91,10 +91,9 @@ static void daprProperties(DynamicPropertyRegistry registry) {
*/
@BeforeEach
public void init() {
- try (WorkflowRuntime runtime = workflowRuntimeBuilder.build()) {
- System.out.println("Start workflow runtime");
- runtime.start(false);
- }
+ WorkflowRuntime runtime = workflowRuntimeBuilder.build();
+ System.out.println("Start workflow runtime");
+ runtime.start(false);
}
@Test
diff --git a/sdk-workflows/pom.xml b/sdk-workflows/pom.xml
index a5c4ae9495..d8c7ada903 100644
--- a/sdk-workflows/pom.xml
+++ b/sdk-workflows/pom.xml
@@ -45,14 +45,14 @@
test
- com.microsoft
+ io.dapr
durabletask-client
- 1.5.0
+ 1.5.2
com.fasterxml.jackson.core
diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java
index 9ed34fdc16..f649f0086d 100644
--- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java
+++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java
@@ -13,10 +13,10 @@
package io.dapr.workflows;
-import com.microsoft.durabletask.CompositeTaskFailedException;
-import com.microsoft.durabletask.Task;
-import com.microsoft.durabletask.TaskCanceledException;
-import com.microsoft.durabletask.TaskFailedException;
+import io.dapr.durabletask.CompositeTaskFailedException;
+import io.dapr.durabletask.Task;
+import io.dapr.durabletask.TaskCanceledException;
+import io.dapr.durabletask.TaskFailedException;
import org.slf4j.Logger;
import javax.annotation.Nullable;
diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java
index e40d0640ad..ab46dff796 100644
--- a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java
+++ b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java
@@ -13,12 +13,12 @@
package io.dapr.workflows.client;
-import com.microsoft.durabletask.DurableTaskClient;
-import com.microsoft.durabletask.DurableTaskGrpcClientBuilder;
-import com.microsoft.durabletask.NewOrchestrationInstanceOptions;
-import com.microsoft.durabletask.OrchestrationMetadata;
-import com.microsoft.durabletask.PurgeResult;
import io.dapr.config.Properties;
+import io.dapr.durabletask.DurableTaskClient;
+import io.dapr.durabletask.DurableTaskGrpcClientBuilder;
+import io.dapr.durabletask.NewOrchestrationInstanceOptions;
+import io.dapr.durabletask.OrchestrationMetadata;
+import io.dapr.durabletask.PurgeResult;
import io.dapr.utils.NetworkUtils;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.internal.ApiTokenClientInterceptor;
diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java
index 4a16d45f44..551c21a373 100644
--- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java
+++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java
@@ -13,7 +13,7 @@
package io.dapr.workflows.runtime;
-import com.microsoft.durabletask.TaskActivityContext;
+import io.dapr.durabletask.TaskActivityContext;
import io.dapr.workflows.WorkflowActivityContext;
/**
diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java
index 20572995ce..a6c09fe768 100644
--- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java
+++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java
@@ -13,12 +13,12 @@
package io.dapr.workflows.runtime;
-import com.microsoft.durabletask.CompositeTaskFailedException;
-import com.microsoft.durabletask.RetryPolicy;
-import com.microsoft.durabletask.Task;
-import com.microsoft.durabletask.TaskCanceledException;
-import com.microsoft.durabletask.TaskOptions;
-import com.microsoft.durabletask.TaskOrchestrationContext;
+import io.dapr.durabletask.CompositeTaskFailedException;
+import io.dapr.durabletask.RetryPolicy;
+import io.dapr.durabletask.Task;
+import io.dapr.durabletask.TaskCanceledException;
+import io.dapr.durabletask.TaskOptions;
+import io.dapr.durabletask.TaskOrchestrationContext;
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowTaskOptions;
import io.dapr.workflows.WorkflowTaskRetryPolicy;
diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowFailureDetails.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowFailureDetails.java
index 6919a510e7..c51fe7c3a7 100644
--- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowFailureDetails.java
+++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowFailureDetails.java
@@ -13,7 +13,7 @@
package io.dapr.workflows.runtime;
-import com.microsoft.durabletask.FailureDetails;
+import io.dapr.durabletask.FailureDetails;
import io.dapr.workflows.client.WorkflowFailureDetails;
/**
diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowInstanceStatus.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowInstanceStatus.java
index d1082adb8a..392357bc32 100644
--- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowInstanceStatus.java
+++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowInstanceStatus.java
@@ -13,9 +13,9 @@
package io.dapr.workflows.runtime;
-import com.microsoft.durabletask.FailureDetails;
-import com.microsoft.durabletask.OrchestrationMetadata;
-import com.microsoft.durabletask.OrchestrationRuntimeStatus;
+import io.dapr.durabletask.FailureDetails;
+import io.dapr.durabletask.OrchestrationMetadata;
+import io.dapr.durabletask.OrchestrationRuntimeStatus;
import io.dapr.workflows.client.WorkflowFailureDetails;
import io.dapr.workflows.client.WorkflowInstanceStatus;
import io.dapr.workflows.client.WorkflowRuntimeStatus;
diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapper.java
index 3dcb8ef6b6..0bbfd4beb9 100644
--- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapper.java
+++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapper.java
@@ -13,8 +13,8 @@
package io.dapr.workflows.runtime;
-import com.microsoft.durabletask.TaskActivity;
-import com.microsoft.durabletask.TaskActivityFactory;
+import io.dapr.durabletask.TaskActivity;
+import io.dapr.durabletask.TaskActivityFactory;
import io.dapr.workflows.WorkflowActivity;
import java.lang.reflect.Constructor;
diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapper.java
index 17d509924e..537427318b 100644
--- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapper.java
+++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapper.java
@@ -13,8 +13,8 @@
package io.dapr.workflows.runtime;
-import com.microsoft.durabletask.TaskActivity;
-import com.microsoft.durabletask.TaskActivityFactory;
+import io.dapr.durabletask.TaskActivity;
+import io.dapr.durabletask.TaskActivityFactory;
import io.dapr.workflows.WorkflowActivity;
/**
diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java
index 4fab3f9cdf..10b5248748 100644
--- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java
+++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java
@@ -13,8 +13,8 @@
package io.dapr.workflows.runtime;
-import com.microsoft.durabletask.TaskOrchestration;
-import com.microsoft.durabletask.TaskOrchestrationFactory;
+import io.dapr.durabletask.TaskOrchestration;
+import io.dapr.durabletask.TaskOrchestrationFactory;
import io.dapr.workflows.Workflow;
import java.lang.reflect.Constructor;
diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java
index ad3159406d..f803c49de7 100644
--- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java
+++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java
@@ -13,8 +13,8 @@
package io.dapr.workflows.runtime;
-import com.microsoft.durabletask.TaskOrchestration;
-import com.microsoft.durabletask.TaskOrchestrationFactory;
+import io.dapr.durabletask.TaskOrchestration;
+import io.dapr.durabletask.TaskOrchestrationFactory;
import io.dapr.workflows.Workflow;
/**
diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java
index 6754f675bd..7ac44e0001 100644
--- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java
+++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java
@@ -13,17 +13,34 @@
package io.dapr.workflows.runtime;
-import com.microsoft.durabletask.DurableTaskGrpcWorker;
+import io.dapr.durabletask.DurableTaskGrpcWorker;
+import io.grpc.ManagedChannel;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* Contains methods to register workflows and activities.
*/
public class WorkflowRuntime implements AutoCloseable {
- private DurableTaskGrpcWorker worker;
+ private final DurableTaskGrpcWorker worker;
+ private final ManagedChannel managedChannel;
+ private final ExecutorService executorService;
- public WorkflowRuntime(DurableTaskGrpcWorker worker) {
+ /**
+ * Constructor.
+ *
+ * @param worker grpcWorker processing activities.
+ * @param managedChannel grpc channel.
+ * @param executorService executor service responsible for running the threads.
+ */
+ public WorkflowRuntime(DurableTaskGrpcWorker worker,
+ ManagedChannel managedChannel,
+ ExecutorService executorService) {
this.worker = worker;
+ this.managedChannel = managedChannel;
+ this.executorService = executorService;
}
/**
@@ -50,11 +67,31 @@ public void start(boolean block) {
/**
* {@inheritDoc}
*/
- @Override
public void close() {
- if (this.worker != null) {
- this.worker.close();
- this.worker = null;
+ this.shutDownWorkerPool();
+ this.closeSideCarChannel();
+ }
+
+ private void closeSideCarChannel() {
+ this.managedChannel.shutdown();
+
+ try {
+ if (!this.managedChannel.awaitTermination(60, TimeUnit.SECONDS)) {
+ this.managedChannel.shutdownNow();
+ }
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void shutDownWorkerPool() {
+ this.executorService.shutdown();
+ try {
+ if (!this.executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+ this.executorService.shutdownNow();
+ }
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
}
}
}
\ No newline at end of file
diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java
index 397e58b30f..7f1147a0d7 100644
--- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java
+++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java
@@ -13,8 +13,8 @@
package io.dapr.workflows.runtime;
-import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder;
import io.dapr.config.Properties;
+import io.dapr.durabletask.DurableTaskGrpcWorkerBuilder;
import io.dapr.utils.NetworkUtils;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowActivity;
@@ -26,6 +26,8 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
public class WorkflowRuntimeBuilder {
private static final ClientInterceptor WORKFLOW_INTERCEPTOR = new ApiTokenClientInterceptor();
@@ -36,6 +38,8 @@ public class WorkflowRuntimeBuilder {
private final Set activitySet = Collections.synchronizedSet(new HashSet<>());
private final Set workflowSet = Collections.synchronizedSet(new HashSet<>());
private final DurableTaskGrpcWorkerBuilder builder;
+ private final ManagedChannel managedChannel;
+ private ExecutorService executorService;
/**
* Constructs the WorkflowRuntimeBuilder.
@@ -58,8 +62,8 @@ public WorkflowRuntimeBuilder(Logger logger) {
}
private WorkflowRuntimeBuilder(Properties properties, Logger logger) {
- ManagedChannel managedChannel = NetworkUtils.buildGrpcManagedChannel(properties, WORKFLOW_INTERCEPTOR);
- this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(managedChannel);
+ this.managedChannel = NetworkUtils.buildGrpcManagedChannel(properties, WORKFLOW_INTERCEPTOR);
+ this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(this.managedChannel);
this.logger = logger;
}
@@ -71,8 +75,11 @@ private WorkflowRuntimeBuilder(Properties properties, Logger logger) {
public WorkflowRuntime build() {
if (instance == null) {
synchronized (WorkflowRuntime.class) {
+ this.executorService = this.executorService == null ? Executors.newCachedThreadPool() : this.executorService;
if (instance == null) {
- instance = new WorkflowRuntime(this.builder.build());
+ instance = new WorkflowRuntime(
+ this.builder.withExecutorService(this.executorService).build(),
+ this.managedChannel, this.executorService);
}
}
}
@@ -84,6 +91,18 @@ public WorkflowRuntime build() {
return instance;
}
+ /**
+ * Register Executor Service to use with workflow.
+ *
+ * @param executorService to be used.
+ * @return {@link WorkflowRuntimeBuilder}.
+ */
+ public WorkflowRuntimeBuilder withExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ this.builder.withExecutorService(executorService);
+ return this;
+ }
+
/**
* Registers a Workflow object.
*
diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusConverter.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusConverter.java
index 198a1215c7..2900916aaf 100644
--- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusConverter.java
+++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusConverter.java
@@ -13,7 +13,7 @@
package io.dapr.workflows.runtime;
-import com.microsoft.durabletask.OrchestrationRuntimeStatus;
+import io.dapr.durabletask.OrchestrationRuntimeStatus;
import io.dapr.workflows.client.WorkflowRuntimeStatus;
public class WorkflowRuntimeStatusConverter {
diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java
index 61d153484c..0de864c887 100644
--- a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java
+++ b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java
@@ -13,11 +13,11 @@
package io.dapr.workflows;
-import com.microsoft.durabletask.CompositeTaskFailedException;
-import com.microsoft.durabletask.Task;
-import com.microsoft.durabletask.TaskCanceledException;
-import com.microsoft.durabletask.TaskOptions;
-import com.microsoft.durabletask.TaskOrchestrationContext;
+import io.dapr.durabletask.CompositeTaskFailedException;
+import io.dapr.durabletask.Task;
+import io.dapr.durabletask.TaskCanceledException;
+import io.dapr.durabletask.TaskOptions;
+import io.dapr.durabletask.TaskOrchestrationContext;
import io.dapr.workflows.runtime.DefaultWorkflowContext;
diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java
index 4b2b7ec520..3ad66877c5 100644
--- a/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java
+++ b/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java
@@ -13,10 +13,10 @@
package io.dapr.workflows.client;
-import com.microsoft.durabletask.DurableTaskClient;
-import com.microsoft.durabletask.NewOrchestrationInstanceOptions;
-import com.microsoft.durabletask.OrchestrationMetadata;
-import com.microsoft.durabletask.OrchestrationRuntimeStatus;
+import io.dapr.durabletask.DurableTaskClient;
+import io.dapr.durabletask.NewOrchestrationInstanceOptions;
+import io.dapr.durabletask.OrchestrationMetadata;
+import io.dapr.durabletask.OrchestrationRuntimeStatus;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowStub;
diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/client/WorkflowInstanceStatusTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/client/WorkflowInstanceStatusTest.java
index f4f40d0564..776e070812 100644
--- a/sdk-workflows/src/test/java/io/dapr/workflows/client/WorkflowInstanceStatusTest.java
+++ b/sdk-workflows/src/test/java/io/dapr/workflows/client/WorkflowInstanceStatusTest.java
@@ -13,9 +13,9 @@
package io.dapr.workflows.client;
-import com.microsoft.durabletask.FailureDetails;
-import com.microsoft.durabletask.OrchestrationMetadata;
-import com.microsoft.durabletask.OrchestrationRuntimeStatus;
+import io.dapr.durabletask.FailureDetails;
+import io.dapr.durabletask.OrchestrationMetadata;
+import io.dapr.durabletask.OrchestrationRuntimeStatus;
import io.dapr.workflows.runtime.DefaultWorkflowInstanceStatus;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java
index 0783176051..76a7e07af1 100644
--- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java
+++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java
@@ -1,6 +1,6 @@
package io.dapr.workflows.runtime;
-import com.microsoft.durabletask.TaskActivityContext;
+import io.dapr.durabletask.TaskActivityContext;
import io.dapr.workflows.WorkflowActivity;
import io.dapr.workflows.WorkflowActivityContext;
import org.junit.Test;
diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapperTest.java
index bd8788bbdd..0c680ea51e 100644
--- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapperTest.java
+++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapperTest.java
@@ -1,6 +1,6 @@
package io.dapr.workflows.runtime;
-import com.microsoft.durabletask.TaskActivityContext;
+import io.dapr.durabletask.TaskActivityContext;
import io.dapr.workflows.WorkflowActivity;
import io.dapr.workflows.WorkflowActivityContext;
import org.junit.Test;
diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java
index a73b616bc2..fd76cadaf4 100644
--- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java
+++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java
@@ -13,7 +13,7 @@
package io.dapr.workflows.runtime;
-import com.microsoft.durabletask.TaskOrchestrationContext;
+import io.dapr.durabletask.TaskOrchestrationContext;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowStub;
diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowInstanceWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowInstanceWrapperTest.java
index 22f315aa53..85849af212 100644
--- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowInstanceWrapperTest.java
+++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowInstanceWrapperTest.java
@@ -13,7 +13,7 @@
package io.dapr.workflows.runtime;
-import com.microsoft.durabletask.TaskOrchestrationContext;
+import io.dapr.durabletask.TaskOrchestrationContext;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowStub;
diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java
index c159930b91..2b3341fbf3 100644
--- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java
+++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java
@@ -67,7 +67,8 @@ public void registerValidWorkflowActivityInstance() {
@Test
public void buildTest() {
assertDoesNotThrow(() -> {
- try (WorkflowRuntime runtime = new WorkflowRuntimeBuilder().build()) {
+ try {
+ WorkflowRuntime runtime = new WorkflowRuntimeBuilder().build();
System.out.println("WorkflowRuntime created");
} catch (Exception e) {
throw new RuntimeException(e);
@@ -88,13 +89,11 @@ public void loggingOutputTest() {
WorkflowRuntimeBuilder workflowRuntimeBuilder = new WorkflowRuntimeBuilder();
- try (WorkflowRuntime runtime = workflowRuntimeBuilder.build()) {
- verify(testLogger, times(1))
- .info(eq("Registered Workflow: {}"), eq("TestWorkflow"));
+ WorkflowRuntime runtime = workflowRuntimeBuilder.build();
+ verify(testLogger, times(1))
+ .info(eq("Registered Workflow: {}"), eq("TestWorkflow"));
- verify(testLogger, times(1))
- .info(eq("Registered Activity: {}"), eq("TestActivity"));
- }
+ verify(testLogger, times(1))
+ .info(eq("Registered Activity: {}"), eq("TestActivity"));
}
-
}
diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusConverterTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusConverterTest.java
index 04783d961a..9e2d4f9835 100644
--- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusConverterTest.java
+++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusConverterTest.java
@@ -13,7 +13,7 @@
package io.dapr.workflows.runtime;
-import com.microsoft.durabletask.OrchestrationRuntimeStatus;
+import io.dapr.durabletask.OrchestrationRuntimeStatus;
import io.dapr.workflows.client.WorkflowRuntimeStatus;
import org.junit.jupiter.api.Test;
diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeTest.java
index d3bd40a437..9ee2710e79 100644
--- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeTest.java
+++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeTest.java
@@ -14,10 +14,15 @@
package io.dapr.workflows.runtime;
-import com.microsoft.durabletask.DurableTaskGrpcWorker;
-import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder;
+import io.dapr.durabletask.DurableTaskGrpcWorker;
+import io.dapr.durabletask.DurableTaskGrpcWorkerBuilder;
+import io.dapr.config.Properties;
+import io.dapr.utils.NetworkUtils;
import org.junit.jupiter.api.Test;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
public class WorkflowRuntimeTest {
@@ -25,15 +30,16 @@ public class WorkflowRuntimeTest {
@Test
public void startTest() {
DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder().build();
- try (WorkflowRuntime runtime = new WorkflowRuntime(worker)) {
- assertDoesNotThrow(() -> runtime.start(false));
- }
+ WorkflowRuntime runtime = new WorkflowRuntime(worker, NetworkUtils.buildGrpcManagedChannel(new Properties()),
+ Executors.newCachedThreadPool());
+ assertDoesNotThrow(() -> runtime.start(false));
}
@Test
public void closeWithoutStarting() {
DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder().build();
- try (WorkflowRuntime runtime = new WorkflowRuntime(worker)) {
+ try (WorkflowRuntime runtime = new WorkflowRuntime(worker, NetworkUtils.buildGrpcManagedChannel(new Properties()),
+ Executors.newCachedThreadPool())) {
assertDoesNotThrow(runtime::close);
}
}