Skip to content

Commit 19d8baf

Browse files
committed
cmd/relui,internal/relui: resume workflows
Resume all unfinished workflows on relui startup. Moves most workflow running logic to a new Worker type. This also adds a couple new columns to the workflow to record output and whether a workflow has completed. In order to actually resume workflows, we'll have to either add functionality to unmark them as "finished" (restarting manually, which seems reasonable), or not marking a workflow finished if it terminates with context.Cancelled. For golang/go#47401 Change-Id: I3e0ed021d7a47fb125f1034df83dc3c6d95887f8 Reviewed-on: https://go-review.googlesource.com/c/build/+/353553 Trust: Alexander Rakoczy <[email protected]> Run-TryBot: Alexander Rakoczy <[email protected]> TryBot-Result: Go Bot <[email protected]> Reviewed-by: Heschi Kreinick <[email protected]>
1 parent beddf46 commit 19d8baf

14 files changed

+667
-99
lines changed

cmd/relui/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,12 @@ func main() {
4141
log.Fatal(err)
4242
}
4343
defer db.Close()
44-
s := relui.NewServer(db)
44+
w := relui.NewWorker(db, relui.NewPGListener(db))
45+
go w.Run(ctx)
46+
if err := w.ResumeAll(ctx); err != nil {
47+
log.Printf("w.ResumeAll() = %v", err)
48+
}
49+
s := relui.NewServer(db, w)
4550
if err != nil {
4651
log.Fatalf("relui.NewServer() = %v", err)
4752
}

cmd/relui/sqlc.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,6 @@ overrides:
1010
- go_type: "database/sql.NullString"
1111
db_type: "jsonb"
1212
nullable: true
13+
- go_type: "string"
14+
db_type: "jsonb"
15+
nullable: false

internal/relui/db/models.go

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/relui/db/workflows.sql.go

Lines changed: 87 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/relui/listener.go

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,19 @@ import (
1919
"golang.org/x/build/internal/workflow"
2020
)
2121

22-
// listener implements workflow.Listener for recording workflow state.
23-
type listener struct {
22+
func NewPGListener(db *pgxpool.Pool) *PGListener {
23+
return &PGListener{db}
24+
}
25+
26+
// PGListener implements workflow.Listener for recording workflow state.
27+
type PGListener struct {
2428
db *pgxpool.Pool
2529
}
2630

2731
// TaskStateChanged is called whenever a task is updated by the
2832
// workflow. The workflow.TaskState is persisted as a db.Task,
2933
// creating or updating a row as necessary.
30-
func (l *listener) TaskStateChanged(workflowID uuid.UUID, taskName string, state *workflow.TaskState) error {
34+
func (l *PGListener) TaskStateChanged(workflowID uuid.UUID, taskName string, state *workflow.TaskState) error {
3135
log.Printf("TaskStateChanged(%q, %q, %v)", workflowID, taskName, state)
3236
ctx, cancel := context.WithCancel(context.Background())
3337
defer cancel()
@@ -55,7 +59,43 @@ func (l *listener) TaskStateChanged(workflowID uuid.UUID, taskName string, state
5559
return err
5660
}
5761

58-
func (l *listener) Logger(workflowID uuid.UUID, taskName string) workflow.Logger {
62+
// WorkflowStarted persists a new workflow execution in the database.
63+
func (l *PGListener) WorkflowStarted(ctx context.Context, workflowID uuid.UUID, name string, params map[string]string) error {
64+
q := db.New(l.db)
65+
m, err := json.Marshal(params)
66+
if err != nil {
67+
return err
68+
}
69+
updated := time.Now()
70+
_, err = q.CreateWorkflow(ctx, db.CreateWorkflowParams{
71+
ID: workflowID,
72+
Name: sql.NullString{String: name, Valid: true},
73+
Params: sql.NullString{String: string(m), Valid: len(m) > 0},
74+
CreatedAt: updated,
75+
UpdatedAt: updated,
76+
})
77+
return err
78+
}
79+
80+
// WorkflowFinished saves the final state of a workflow after its run
81+
// has completed.
82+
func (l *PGListener) WorkflowFinished(ctx context.Context, workflowID uuid.UUID, outputs map[string]interface{}, err error) error {
83+
log.Printf("WorkflowCompleted(%q, %v, %q)", workflowID, outputs, err)
84+
q := db.New(l.db)
85+
m, err := json.Marshal(outputs)
86+
if err != nil {
87+
return err
88+
}
89+
_, err = q.WorkflowFinished(ctx, db.WorkflowFinishedParams{
90+
ID: workflowID,
91+
Finished: true,
92+
Output: string(m),
93+
UpdatedAt: time.Now(),
94+
})
95+
return err
96+
}
97+
98+
func (l *PGListener) Logger(workflowID uuid.UUID, taskName string) workflow.Logger {
5999
return &postgresLogger{
60100
db: l.db,
61101
workflowID: workflowID,

internal/relui/listener_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func TestListenerTaskStateChanged(t *testing.T) {
2828
t.Fatalf("q.CreateWorkflow(%v, %v) = %v, wanted no error", ctx, wfp, err)
2929
}
3030

31-
l := &listener{db: dbp}
31+
l := &PGListener{db: dbp}
3232
state := &workflow.TaskState{
3333
Name: "TestTask",
3434
Finished: true,
@@ -75,7 +75,7 @@ func TestListenerLogger(t *testing.T) {
7575
t.Fatalf("q.UpsertTask(%v, %v) = %v, wanted no error", ctx, params, err)
7676
}
7777

78-
l := &listener{db: dbp}
78+
l := &PGListener{db: dbp}
7979
l.Logger(wf.ID, "TestTask").Printf("A fancy log line says %q", "hello")
8080

8181
logs, err := q.TaskLogs(ctx)
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
-- Copyright 2021 The Go Authors. All rights reserved.
2+
-- Use of this source code is governed by a BSD-style
3+
-- license that can be found in the LICENSE file.
4+
5+
BEGIN;
6+
7+
ALTER TABLE workflows DROP COLUMN finished;
8+
ALTER TABLE workflows DROP COLUMN output;
9+
10+
COMMIT;
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
-- Copyright 2021 The Go Authors. All rights reserved.
2+
-- Use of this source code is governed by a BSD-style
3+
-- license that can be found in the LICENSE file.
4+
5+
BEGIN;
6+
7+
ALTER TABLE workflows
8+
ADD COLUMN finished bool NOT NULL DEFAULT false;
9+
10+
CREATE INDEX workflows_finished_ix ON workflows (finished) WHERE finished = false;
11+
12+
ALTER TABLE workflows
13+
ADD COLUMN output jsonb NOT NULL DEFAULT jsonb_build_object();
14+
15+
ALTER TABLE workflows
16+
ADD COLUMN error text NOT NULL DEFAULT '';
17+
18+
UPDATE workflows
19+
SET finished = true
20+
WHERE workflows.id NOT IN (
21+
SELECT DISTINCT tasks.workflow_id
22+
FROM tasks
23+
WHERE finished = false
24+
);
25+
26+
COMMIT;

internal/relui/queries/workflows.sql

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
-- license that can be found in the LICENSE file.
44

55
-- name: Workflows :many
6-
SELECT id, params, name, created_at, updated_at
6+
SELECT *
77
FROM workflows
88
ORDER BY created_at DESC;
99

1010
-- name: Workflow :one
11-
SELECT id, params, name, created_at, updated_at
11+
SELECT *
1212
FROM workflows
1313
WHERE id = $1;
1414

@@ -41,7 +41,7 @@ ORDER BY created_at;
4141
-- name: TasksForWorkflow :many
4242
SELECT tasks.*
4343
FROM tasks
44-
WHERE workflow_id=$1
44+
WHERE workflow_id = $1
4545
ORDER BY created_at;
4646

4747
-- name: CreateTaskLog :one
@@ -52,10 +52,25 @@ RETURNING *;
5252
-- name: TaskLogsForTask :many
5353
SELECT task_logs.*
5454
FROM task_logs
55-
WHERE workflow_id=$1 AND task_name = $2
55+
WHERE workflow_id = $1
56+
AND task_name = $2
5657
ORDER BY created_at;
5758

5859
-- name: TaskLogs :many
5960
SELECT task_logs.*
6061
FROM task_logs
6162
ORDER BY created_at;
63+
64+
-- name: UnfinishedWorkflows :many
65+
SELECT workflows.*
66+
FROM workflows
67+
WHERE workflows.finished = false;
68+
69+
-- name: WorkflowFinished :one
70+
UPDATE workflows
71+
SET finished = $2,
72+
output = $3,
73+
updated_at = $4
74+
WHERE workflows.id = $1
75+
RETURNING *;
76+

0 commit comments

Comments
 (0)