Skip to content
13 changes: 12 additions & 1 deletion .github/workflows/pr-validation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ jobs:

- name: Install dependencies
run: go get .


- name: Install Protoc
uses: arduino/setup-protoc@v2

- name: Installing protoc-gen-go
run: |
go install google.golang.org/protobuf/cmd/[email protected]
go install google.golang.org/grpc/cmd/[email protected]

- name: Generate grpc code
run: protoc --go_out=. --go-grpc_out=. -I ./submodules/durabletask-protobuf/protos orchestrator_service.proto

- name: Run integration tests
run: go test ./tests/... -coverpkg ./api,./task,./client,./backend/...,./internal/helpers
74 changes: 70 additions & 4 deletions api/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ import (
)

var (
ErrInstanceNotFound = errors.New("no such instance exists")
ErrNotStarted = errors.New("orchestration has not started")
ErrNotCompleted = errors.New("orchestration has not yet completed")
ErrNoFailures = errors.New("orchestration did not report failure details")
ErrInstanceNotFound = errors.New("no such instance exists")
ErrNotStarted = errors.New("orchestration has not started")
ErrNotCompleted = errors.New("orchestration has not yet completed")
ErrNoFailures = errors.New("orchestration did not report failure details")
ErrDuplicateInstance = errors.New("orchestration instance already exists")

EmptyInstanceID = InstanceID("")
)
Expand All @@ -36,6 +37,32 @@ type OrchestrationMetadata struct {
FailureDetails *protos.TaskFailureDetails
}

type CreateOrchestrationAction int

const (
THROW CreateOrchestrationAction = iota
SKIP
TERMINATE
)

type OrchestrationStatus int

const (
RUNNING OrchestrationStatus = iota
COMPLETED
// CONTINUED_AS_NEW
FAILED
// CANCELED
TERMINATED
PENDING
// SUSPENDED
)

type OrchestrationIDReuseOption struct {
CreateOrchestrationAction CreateOrchestrationAction
OrchestrationStatuses []OrchestrationStatus
}

// NewOrchestrationOptions configures options for starting a new orchestration.
type NewOrchestrationOptions func(*protos.CreateInstanceRequest) error

Expand All @@ -57,6 +84,45 @@ func WithInstanceID(id InstanceID) NewOrchestrationOptions {
}
}

// WithOrchestrationReuseOption configures Orchestration ID reuse policy.
func WithOrchestrationReuseOption(option *OrchestrationIDReuseOption) NewOrchestrationOptions {
return func(req *protos.CreateInstanceRequest) error {
req.CreateInstanceOption = &protos.CreateInstanceOption{}
// set action
switch option.CreateOrchestrationAction {
case SKIP:
req.CreateInstanceOption.Action = protos.CreateOrchestrationAction_SKIP
case TERMINATE:
req.CreateInstanceOption.Action = protos.CreateOrchestrationAction_TERMINATE
case THROW:
req.CreateInstanceOption.Action = protos.CreateOrchestrationAction_THROW
}

// set status
for _, status := range option.OrchestrationStatuses {
switch status {
case RUNNING:
req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_RUNNING)
case COMPLETED:
req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED)
// case CONTINUED_AS_NEW:
// req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_CONTINUED_AS_NEW)
case FAILED:
req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED)
// case CANCELED:
// req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_CANCELED)
case TERMINATED:
req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_TERMINATED)
case PENDING:
req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_PENDING)
// case SUSPENDED:
// req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_SUSPENDED)
}
}
return nil
}
}

// WithInput configures an input for the orchestration. The specified input must be serializable.
func WithInput(input any) NewOrchestrationOptions {
return func(req *protos.CreateInstanceRequest) error {
Expand Down
5 changes: 5 additions & 0 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ type Backend interface {
// [api.ErrInstanceNotFound] is returned if the specified orchestration instance doesn't exist.
// [api.ErrNotCompleted] is returned if the specified orchestration instance is still running.
PurgeOrchestrationState(context.Context, api.InstanceID) error

// CleanupOrchestration clean up all records for the specified orchestration instance in the entire task hub.
//
// [api.ErrInstanceNotFound] is returned if the specified orchestration instance doesn't exist.
CleanupOrchestration(context.Context, api.InstanceID) error
}

// MarshalHistoryEvent serializes the [HistoryEvent] into a protobuf byte array.
Expand Down
49 changes: 47 additions & 2 deletions backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -310,14 +311,58 @@ func (g *grpcExecutor) StartInstance(ctx context.Context, req *protos.CreateInst
ctx, span := helpers.StartNewCreateOrchestrationSpan(ctx, req.Name, req.Version.GetValue(), instanceID)
defer span.End()

// retreive instance with instanceID
metadata, err := g.backend.GetOrchestrationMetadata(ctx, api.InstanceID(instanceID))
if err != nil {
// if the instance doesn't exist, create instance directly.
if errors.Is(err, api.ErrInstanceNotFound) {
return createInstance(ctx, g.backend, instanceID, req, span)
} else {
return nil, err
}
}

// build target status set
statusSet := convertStatusToSet(req.CreateInstanceOption.OperationStatus)

// if current status is not one of the target status, create instance directly
if !statusSet[metadata.RuntimeStatus] {
return createInstance(ctx, g.backend, instanceID, req, span)
} else {
if req.CreateInstanceOption.Action == protos.CreateOrchestrationAction_THROW {
// throw ErrDuplicateEvent since instance already exists and the status is in target status set
return nil, api.ErrDuplicateInstance
} else if req.CreateInstanceOption.Action == protos.CreateOrchestrationAction_SKIP {
// skip creating new instance
g.logger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", instanceID)
return &protos.CreateInstanceResponse{InstanceId: instanceID}, nil
} else {
// CreateInstanceAction_TERMINATE
// terminate existing instance and create a new one
if err := g.backend.CleanupOrchestration(ctx, api.InstanceID(instanceID)); err != nil {
return nil, err
}
return createInstance(ctx, g.backend, instanceID, req, span)
}
}
}

func createInstance(ctx context.Context, be Backend, instanceID string, req *protos.CreateInstanceRequest, span trace.Span) (*protos.CreateInstanceResponse, error) {
e := helpers.NewExecutionStartedEvent(req.Name, instanceID, req.Input, nil, helpers.TraceContextFromSpan(span))
if err := g.backend.CreateOrchestrationInstance(ctx, e); err != nil {
if err := be.CreateOrchestrationInstance(ctx, e); err != nil {
return nil, err
}

return &protos.CreateInstanceResponse{InstanceId: instanceID}, nil
}

func convertStatusToSet(statuses []protos.OrchestrationStatus) map[protos.OrchestrationStatus]bool {
statusSet := make(map[protos.OrchestrationStatus]bool)
for _, status := range statuses {
statusSet[status] = true
}
return statusSet
}

// TerminateInstance implements protos.TaskHubSidecarServiceServer
func (g *grpcExecutor) TerminateInstance(ctx context.Context, req *protos.TerminateRequest) (*protos.TerminateResponse, error) {
e := helpers.NewExecutionTerminatedEvent(req.Output, req.Recursive)
Expand Down
51 changes: 51 additions & 0 deletions backend/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,57 @@ func (be *sqliteBackend) PurgeOrchestrationState(ctx context.Context, id api.Ins
return nil
}

func (be *sqliteBackend) CleanupOrchestration(ctx context.Context, id api.InstanceID) error {
if err := be.ensureDB(); err != nil {
return err
}

tx, err := be.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()

row := tx.QueryRowContext(ctx, "SELECT 1 FROM Instances WHERE [InstanceID] = ?", string(id))
if err := row.Err(); err != nil {
return fmt.Errorf("failed to query for instance existence: %w", err)
}

var unused int
if err := row.Scan(&unused); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return api.ErrInstanceNotFound
} else {
return fmt.Errorf("failed to scan instance existence: %w", err)
}
}

_, err = tx.ExecContext(ctx, "DELETE FROM Instances WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from the Instances table: %w", err)
}

_, err = tx.ExecContext(ctx, "DELETE FROM History WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from History table: %w", err)
}

_, err = tx.ExecContext(ctx, "DELETE FROM NewEvents WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from NewEvents table: %w", err)
}

_, err = tx.ExecContext(ctx, "DELETE FROM NewTasks WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from NewTasks table: %w", err)
}

if err = tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}

// Start implements backend.Backend
func (*sqliteBackend) Start(context.Context) error {
return nil
Expand Down
2 changes: 1 addition & 1 deletion submodules/durabletask-protobuf
46 changes: 46 additions & 0 deletions tests/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,49 @@ func Test_Grpc_Terminate_Recursive(t *testing.T) {
})
}
}

func Test_Grpc_ReuseInstanceIDSkipOrTerminate(t *testing.T) {
delayTime := 4 * time.Second
r := task.NewTaskRegistry()
r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) {
var input string
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
ctx.CreateTimer(delayTime).Await(nil)
err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output)
return output, err
})
r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) {
var name string
if err := ctx.GetInput(&name); err != nil {
return nil, err
}
return fmt.Sprintf("Hello, %s!", name), nil
})

cancelListener := startGrpcListener(t, r)
defer cancelListener()
instanceIDs := api.InstanceID("SKIP_IF_RUNNING_OR_COMPLETED")
ReuseIdOption := &api.OrchestrationIDReuseOption{
CreateOrchestrationAction: api.SKIP,
OrchestrationStatuses: []api.OrchestrationStatus{
api.RUNNING,
api.COMPLETED,
api.PENDING,
},
}

id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceIDs))
require.NoError(t, err)
id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(id), api.WithOrchestrationReuseOption(ReuseIdOption))
require.NoError(t, err)
timeoutCtx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second)
defer cancelTimeout()
metadata, err := grpcClient.WaitForOrchestrationCompletion(timeoutCtx, id, api.WithFetchPayloads(true))
require.NoError(t, err)
assert.Equal(t, true, metadata.IsComplete())
assert.Equal(t, `"Hello, 世界!"`, metadata.SerializedOutput)
time.Sleep(1 * time.Second)
}
Loading