Skip to content

Commit 74917a5

Browse files
committed
internal/workflow: make persistence easier
Loading arbitrarily-typed values from a database is hard -- you have to know the type you're trying to load. The workflow host isn't in a great position to know that. The workflow itself is. Do serialization in the workflow. We can require our results to be JSON-serializable, but not our errors. Fortunately, right now we don't look at the value of errors, just their presence, so we can afford to lose their types. Change-Id: I7ff16ad381d0290bf9dd38aaff192c70537bb283 Reviewed-on: https://go-review.googlesource.com/c/build/+/350450 Trust: Heschi Kreinick <[email protected]> Trust: Alexander Rakoczy <[email protected]> Run-TryBot: Heschi Kreinick <[email protected]> TryBot-Result: Go Bot <[email protected]> Reviewed-by: Alexander Rakoczy <[email protected]>
1 parent 5eaa6e0 commit 74917a5

File tree

2 files changed

+56
-29
lines changed

2 files changed

+56
-29
lines changed

internal/workflow/workflow.go

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ package workflow
2929

3030
import (
3131
"context"
32+
"encoding/json"
3233
"fmt"
3334
"reflect"
3435
"sync"
@@ -202,10 +203,11 @@ type Listener interface {
202203
// TaskState contains the state of a task in a running workflow. Once Finished
203204
// is true, either Result or Error will be populated.
204205
type TaskState struct {
205-
Name string
206-
Finished bool
207-
Result interface{}
208-
Error error
206+
Name string
207+
Finished bool
208+
Result interface{}
209+
SerializedResult []byte
210+
Error string
209211
}
210212

211213
// WorkflowState contains the shallow state of a running workflow.
@@ -251,12 +253,13 @@ type Workflow struct {
251253
}
252254

253255
type taskState struct {
254-
def *taskDefinition
255-
w *Workflow
256-
started bool
257-
finished bool
258-
result interface{}
259-
err error
256+
def *taskDefinition
257+
w *Workflow
258+
started bool
259+
finished bool
260+
result interface{}
261+
serializedResult []byte
262+
err error
260263
}
261264

262265
func (t *taskState) args() ([]reflect.Value, bool) {
@@ -273,12 +276,16 @@ func (t *taskState) args() ([]reflect.Value, bool) {
273276
}
274277

275278
func (t *taskState) toExported() *TaskState {
276-
return &TaskState{
277-
Name: t.def.name,
278-
Finished: t.finished,
279-
Result: t.result,
280-
Error: t.err,
279+
state := &TaskState{
280+
Name: t.def.name,
281+
Finished: t.finished,
282+
Result: t.result,
283+
SerializedResult: append([]byte(nil), t.serializedResult...),
281284
}
285+
if t.err != nil {
286+
state.Error = t.err.Error()
287+
}
288+
return state
282289
}
283290

284291
// Start instantiates a workflow with the given parameters.
@@ -318,10 +325,12 @@ func (w *Workflow) validate() error {
318325
return nil
319326
}
320327

321-
// Resume restores a workflow from stored state. The WorkflowState can be
322-
// constructed by the host. TaskStates should be saved from Listener calls.
323-
// Tasks that had not finished will be restarted, but tasks that finished in
324-
// errors will not be retried.
328+
// Resume restores a workflow from stored state. Tasks that had not finished
329+
// will be restarted, but tasks that finished in errors will not be retried.
330+
//
331+
// The host must create the WorkflowState. TaskStates should be saved from
332+
// listener callbacks, but for ease of stoage, their Result field does not
333+
// need to be populated.
325334
func Resume(def *Definition, state *WorkflowState, taskStates map[string]*TaskState) (*Workflow, error) {
326335
w := &Workflow{
327336
ID: state.ID,
@@ -337,16 +346,25 @@ func Resume(def *Definition, state *WorkflowState, taskStates map[string]*TaskSt
337346
if !ok {
338347
return nil, fmt.Errorf("task state for %q not found", taskDef.name)
339348
}
340-
w.tasks[taskDef] = &taskState{
341-
def: taskDef,
342-
w: w,
343-
started: tState.Finished, // Can't resume tasks, so either it's new or done.
344-
finished: tState.Finished,
345-
result: tState.Result,
346-
err: tState.Error,
349+
state := &taskState{
350+
def: taskDef,
351+
w: w,
352+
started: tState.Finished, // Can't resume tasks, so either it's new or done.
353+
finished: tState.Finished,
354+
serializedResult: tState.SerializedResult,
355+
}
356+
if state.serializedResult != nil {
357+
ptr := reflect.New(reflect.ValueOf(taskDef.f).Type().Out(0))
358+
if err := json.Unmarshal(tState.SerializedResult, ptr.Interface()); err != nil {
359+
return nil, fmt.Errorf("failed to unmarshal result of %v: %v", taskDef.name, err)
360+
}
361+
state.result = ptr.Elem().Interface()
362+
}
363+
if tState.Error != "" {
364+
state.err = fmt.Errorf("serialized error: %v", tState.Error) // untyped, but hopefully that doesn't matter.
347365
}
366+
w.tasks[taskDef] = state
348367
}
349-
350368
return w, nil
351369
}
352370

@@ -412,6 +430,9 @@ func (w *Workflow) runTask(ctx context.Context, listener Listener, state taskSta
412430
}
413431
state.finished = true
414432
state.result, state.err = out[0].Interface(), err
433+
if err == nil {
434+
state.serializedResult, state.err = json.Marshal(state.result)
435+
}
415436
return state
416437
}
417438

internal/workflow/workflow_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"time"
1616

1717
"github.com/google/go-cmp/cmp"
18+
"github.com/google/go-cmp/cmp/cmpopts"
1819
"golang.org/x/build/internal/workflow"
1920
)
2021

@@ -200,6 +201,9 @@ func TestResume(t *testing.T) {
200201
wfState := &workflow.WorkflowState{ID: w.ID, Params: nil}
201202
taskStates := storage.states[w.ID]
202203
w2, err := workflow.Resume(wd, wfState, taskStates)
204+
if err != nil {
205+
t.Fatal(err)
206+
}
203207
out := runWorkflow(t, w2, storage)
204208
if got, want := out["output"], "not blocked"; got != want {
205209
t.Errorf("output from maybeBlock was %q, wanted %q", got, want)
@@ -230,12 +234,13 @@ func (l *mapListener) TaskStateChanged(workflowID, taskID string, state *workflo
230234
}
231235

232236
func (l *mapListener) assertState(t *testing.T, w *workflow.Workflow, want map[string]*workflow.TaskState) {
233-
if diff := cmp.Diff(l.states[w.ID], want); diff != "" {
237+
if diff := cmp.Diff(l.states[w.ID], want, cmpopts.IgnoreFields(workflow.TaskState{}, "SerializedResult")); diff != "" {
234238
t.Errorf("task state didn't match expections: %v", diff)
235239
}
236240
}
237241

238242
func startWorkflow(t *testing.T, wd *workflow.Definition, params map[string]string) *workflow.Workflow {
243+
t.Helper()
239244
w, err := workflow.Start(wd, params)
240245
if err != nil {
241246
t.Fatal(err)
@@ -244,6 +249,7 @@ func startWorkflow(t *testing.T, wd *workflow.Definition, params map[string]stri
244249
}
245250

246251
func runWorkflow(t *testing.T, w *workflow.Workflow, listener workflow.Listener) map[string]interface{} {
252+
t.Helper()
247253
if listener == nil {
248254
listener = &verboseListener{t}
249255
}
@@ -260,7 +266,7 @@ func (l *verboseListener) TaskStateChanged(_, _ string, st *workflow.TaskState)
260266
switch {
261267
case !st.Finished:
262268
l.t.Logf("task %-10v: started", st.Name)
263-
case st.Error != nil:
269+
case st.Error != "":
264270
l.t.Logf("task %-10v: error: %v", st.Name, st.Error)
265271
default:
266272
l.t.Logf("task %-10v: done: %v", st.Name, st.Result)

0 commit comments

Comments
 (0)