Skip to content

Trace propagation that depends on durabletask-java and durabletask-go #1485

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion sdk-workflows/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@
<dependency>
<groupId>io.dapr</groupId>
<artifactId>durabletask-client</artifactId>
<version>1.5.6</version>
<version>1.5.8-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<!--
manually declare durabletask-client's jackson dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
import io.dapr.durabletask.PurgeResult;
import io.dapr.utils.NetworkUtils;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.internal.ApiTokenClientInterceptor;
import io.dapr.workflows.runtime.DefaultWorkflowInstanceStatus;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.opentelemetry.context.Context;

import javax.annotation.Nullable;

Expand All @@ -37,7 +36,6 @@
*/
public class DaprWorkflowClient implements AutoCloseable {

private ClientInterceptor workflowApiTokenInterceptor;
private DurableTaskClient innerClient;
private ManagedChannel grpcChannel;

Expand All @@ -54,7 +52,7 @@ public DaprWorkflowClient() {
* @param properties Properties for the GRPC Channel.
*/
public DaprWorkflowClient(Properties properties) {
this(NetworkUtils.buildGrpcManagedChannel(properties, new ApiTokenClientInterceptor(properties)));
this(NetworkUtils.buildGrpcManagedChannel(properties));
}

/**
Expand Down Expand Up @@ -100,6 +98,11 @@ public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz, Object in
return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input);
}

public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz, Object input, String instanceId,
Context context) {
return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input, instanceId, context);
}

/**
* Schedules a new workflow using DurableTask client.
*
Expand Down Expand Up @@ -128,6 +131,23 @@ public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz, NewWorkfl
orchestrationInstanceOptions);
}

/**
* Schedules a new workflow with a specified set of options for execution.
*
* @param <T> any Workflow type
* @param clazz Class extending Workflow to start an instance of.
* @param options the options for the new workflow, including input, instance ID, etc.
* @param context otel Context for trace propagation.
* @return the <code>instanceId</code> parameter value.
*/
public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz, NewWorkflowOptions options,
Context context) {
NewOrchestrationInstanceOptions orchestrationInstanceOptions = fromNewWorkflowOptions(options);

return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(),
orchestrationInstanceOptions, context);
}

/**
* Suspend the workflow associated with the provided instance id.
*
Expand Down Expand Up @@ -278,8 +298,10 @@ public void close() throws InterruptedException {
* @return a new instance of a DurableTaskClient with a GRPC channel.
*/
private static DurableTaskClient createDurableTaskClient(ManagedChannel grpcChannel) {

return new DurableTaskGrpcClientBuilder()
.grpcChannel(grpcChannel)
.interceptor(new io.dapr.durabletask.interceptors.DaprWorkflowClientGrpcInterceptors())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@

import io.dapr.config.Properties;
import io.dapr.durabletask.DurableTaskGrpcWorkerBuilder;
import io.dapr.durabletask.interceptors.DaprWorkflowClientGrpcInterceptors;
import io.dapr.utils.NetworkUtils;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowActivity;
import io.dapr.workflows.internal.ApiTokenClientInterceptor;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.opentelemetry.context.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -65,7 +67,8 @@ public WorkflowRuntimeBuilder(Logger logger) {
private WorkflowRuntimeBuilder(Properties properties, Logger logger) {
this.workflowApiTokenInterceptor = new ApiTokenClientInterceptor(properties);
this.managedChannel = NetworkUtils.buildGrpcManagedChannel(properties, workflowApiTokenInterceptor);
this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(this.managedChannel);
this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(this.managedChannel)
.interceptors(new DaprWorkflowClientGrpcInterceptors());
this.logger = logger;
}

Expand All @@ -77,7 +80,8 @@ private WorkflowRuntimeBuilder(Properties properties, Logger logger) {
public WorkflowRuntime build() {
if (instance == null) {
synchronized (WorkflowRuntime.class) {
this.executorService = this.executorService == null ? Executors.newCachedThreadPool() : this.executorService;
this.executorService = Context.taskWrapping(this.executorService == null ? Executors.newCachedThreadPool()
: this.executorService);
if (instance == null) {
instance = new WorkflowRuntime(
this.builder.withExecutorService(this.executorService).build(),
Expand Down
Loading