1 change: 1 addition & 0 deletions apt/curio.service
@@ -10,6 +10,7 @@ LimitNOFILE=1000000
Restart=always
RestartSec=10
EnvironmentFile=/etc/curio.env
RestartForceExitStatus=100

[Install]
WantedBy=multi-user.target
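The new RestartForceExitStatus=100 directive makes systemd restart the unit whenever the curio process exits with status 100. Restart=always already restarts on any exit, so strictly this is redundant, but it makes the restart-on-100 contract with harmonytask's new ExitStatusRestartRequest explicit. A minimal sketch of the process side of that contract (illustrative only; the real call site is restartIfNoTasksPending in harmonytask.go below):

```go
package main

import "os"

// Mirrors harmonytask.ExitStatusRestartRequest; exiting with status 100 asks
// systemd (via RestartForceExitStatus=100) to bring the service back up.
const exitStatusRestartRequest = 100

func main() {
	// ... drain owned tasks and clear the restart_request flag, then:
	os.Exit(exitStatusRestartRequest)
}
```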
1 change: 1 addition & 0 deletions documentation/en/curio-service.md
@@ -27,6 +27,7 @@ LimitNOFILE=1000000
Restart=always
RestartSec=10
EnvironmentFile=/etc/curio.env
RestartForceExitStatus=100

[Install]
WantedBy=multi-user.target
2 changes: 2 additions & 0 deletions harmony/harmonydb/sql/20230719-harmony.sql
@@ -19,6 +19,8 @@ CREATE TABLE harmony_task (
previous_task INTEGER,
name varchar(16) NOT NULL
-- retries INTEGER NOT NULL DEFAULT 0 -- added later
-- unschedulable BOOLEAN DEFAULT FALSE -- added in 20250111-machine-maintenance.sql
-- restart_request TIMESTAMP WITH TIME ZONE -- added in 20250818-restart-request.sql
);
COMMENT ON COLUMN harmony_task.initiated_by IS 'The task ID whose completion occasioned this task.';
COMMENT ON COLUMN harmony_task.owner_id IS 'The foreign key to harmony_machines.';
2 changes: 2 additions & 0 deletions harmony/harmonydb/sql/20250818-restart-request.sql
@@ -0,0 +1,2 @@
ALTER TABLE harmony_machines
ADD COLUMN restart_request TIMESTAMP WITH TIME ZONE;
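This migration only adds the column; nothing in this diff sets it, so the trigger (CLI, web UI, or manual SQL) is assumed to live elsewhere. A sketch of what such a trigger could look like, reusing the harmonydb Exec shape seen elsewhere in this PR (the helper name and its caller are hypothetical):

```go
package example

import (
	"context"

	"github.com/filecoin-project/curio/harmony/harmonydb"
)

// markNodeForRestart cordons a machine and stamps a restart request.
// harmonytask's poller will then drain the machine's tasks and exit with
// status 100 once nothing is owned by it.
func markNodeForRestart(ctx context.Context, db *harmonydb.DB, hostAndPort string) error {
	_, err := db.Exec(ctx, `UPDATE harmony_machines
		SET unschedulable = TRUE, restart_request = NOW()
		WHERE host_and_port = $1`, hostAndPort)
	return err
}
```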
97 changes: 89 additions & 8 deletions harmony/harmonytask/harmonytask.go
@@ -3,6 +3,7 @@ package harmonytask
import (
"context"
"fmt"
"os"
"strconv"
"sync/atomic"
"time"
@@ -23,6 +24,8 @@ var POLL_NEXT_DURATION = 100 * time.Millisecond // After scheduling a task, wait
var CLEANUP_FREQUENCY = 5 * time.Minute // Check for dead workers this often * everyone
var FOLLOW_FREQUENCY = 1 * time.Minute // Check for work to follow this often

var ExitStatusRestartRequest = 100

type TaskTypeDetails struct {
// Max returns how many tasks this machine can run of this type.
// Nil (default)/Zero or less means unrestricted.
@@ -57,6 +60,16 @@ type TaskTypeDetails struct {
// CanAccept() can read taskEngine's WorkOrigin string to learn about a task.
// Ex: make new CC sectors, clean-up, or retrying pipelines that failed in later states.
IAmBored func(AddTaskFunc) error

// CanYield is true if the task should yield when the node is not schedulable.
// This is implied for background tasks.
CanYield bool

// SchedulingOverrides is a map of task names which, when running on this machine while the node is
// not schedulable, allow this task type to continue being scheduled. This is useful in pipelines
// where a long-running task would otherwise block a short-running task from being scheduled,
// stalling related pipelines on other machines.
SchedulingOverrides map[string]bool
}

// TaskInterface must be implemented in order to have a task used by harmonytask.
@@ -126,6 +139,9 @@ type TaskEngine struct {
follows map[string][]followStruct
hostAndPort string

// runtime flags
yieldBackground atomic.Bool

// synchronous to the single-threaded poller
lastFollowTime time.Time
lastCleanup atomic.Value
@@ -283,20 +299,24 @@ func (e *TaskEngine) poller() {
nextWait = POLL_DURATION

// Check if the machine is schedulable
schedulable, err := e.schedulable()
schedulable, err := e.checkNodeFlags()
if err != nil {
log.Error("Unable to check schedulable status: ", err)
continue
}

e.yieldBackground.Store(!schedulable)

accepted := e.pollerTryAllWork(schedulable)
if accepted {
nextWait = POLL_NEXT_DURATION
}

if !schedulable {
log.Debugf("Machine %s is not schedulable. Please check the cordon status.", e.hostAndPort)
continue
}

accepted := e.pollerTryAllWork()
if accepted {
nextWait = POLL_NEXT_DURATION
}
if time.Since(e.lastFollowTime) > FOLLOW_FREQUENCY {
e.followWorkInDB()
}
@@ -361,12 +381,40 @@ func (e *TaskEngine) followWorkInDB() {
}

// pollerTryAllWork starts the next 1 task
func (e *TaskEngine) pollerTryAllWork() bool {
func (e *TaskEngine) pollerTryAllWork(schedulable bool) bool {
if time.Since(e.lastCleanup.Load().(time.Time)) > CLEANUP_FREQUENCY {
e.lastCleanup.Store(time.Now())
resources.CleanupMachines(e.ctx, e.db)
}
for _, v := range e.handlers {
if !schedulable {
if v.TaskTypeDetails.SchedulingOverrides == nil {
continue
}

// Override the schedulable flag if the task has any assigned overrides
var foundOverride bool
for relatedTaskName := range v.TaskTypeDetails.SchedulingOverrides {
var assignedOverrideTasks []int
err := e.db.Select(e.ctx, &assignedOverrideTasks, `SELECT id
FROM harmony_task
WHERE owner_id = $1 AND name=$2
ORDER BY update_time LIMIT 1`, e.ownerID, relatedTaskName)
if err != nil {
log.Error("Unable to read assigned overrides ", err)
break
}
if len(assignedOverrideTasks) > 0 {
log.Infow("found override, scheduling despite schedulable=false flag", "ownerID", e.ownerID, "relatedTaskName", relatedTaskName, "assignedOverrideTasks", assignedOverrideTasks)
foundOverride = true
break
}
}
if !foundOverride {
continue
}
}

if err := v.AssertMachineHasCapacity(); err != nil {
log.Debugf("skipped scheduling %s type tasks on due to %s", v.Name, err.Error())
continue
@@ -407,6 +455,11 @@ func (e *TaskEngine) pollerTryAllWork() bool {
log.Warn("Work not accepted for " + strconv.Itoa(len(unownedTasks)) + " " + v.Name + " task(s)")
}
}

if !schedulable {
return false
}

// if no work was accepted, are we bored? Then find work in priority order.
for _, v := range e.handlers {
v := v
@@ -462,15 +515,43 @@ func (e *TaskEngine) Host() string {
return e.hostAndPort
}

func (e *TaskEngine) schedulable() (bool, error) {
func (e *TaskEngine) checkNodeFlags() (bool, error) {
var unschedulable bool
err := e.db.QueryRow(e.ctx, `SELECT unschedulable FROM harmony_machines WHERE host_and_port=$1`, e.hostAndPort).Scan(&unschedulable)
var restartRequest *time.Time
err := e.db.QueryRow(e.ctx, `SELECT unschedulable, restart_request FROM harmony_machines WHERE host_and_port=$1`, e.hostAndPort).Scan(&unschedulable, &restartRequest)
if err != nil {
return false, err
}

if restartRequest != nil {
e.restartIfNoTasksPending(*restartRequest)
}

return !unschedulable, nil
}

func (e *TaskEngine) restartIfNoTasksPending(pendingSince time.Time) {
var tasksPending int
err := e.db.QueryRow(e.ctx, `SELECT COUNT(*) FROM harmony_task WHERE owner_id=$1`, e.ownerID).Scan(&tasksPending)
if err != nil {
log.Error("Unable to check for tasks pending: ", err)
return
}
if tasksPending == 0 {
log.Infow("no tasks pending, restarting", "ownerID", e.ownerID, "pendingSince", pendingSince, "took", time.Since(pendingSince))

// unset the flags first
_, err = e.db.Exec(e.ctx, `UPDATE harmony_machines SET restart_request=NULL, unschedulable=FALSE WHERE host_and_port=$1`, e.hostAndPort)
if err != nil {
log.Error("Unable to unset restart request: ", err)
return
}

// then exit
os.Exit(ExitStatusRestartRequest)
}
}

// About the Registry
// This registry exists for the benefit of "static methods" of TaskInterface extensions.
// For example, GetSPID(db, taskID) (int, err) is a static method that can be called
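Taken together, the harmonytask changes implement a drain-and-restart flow. Once a machine's row has unschedulable and restart_request set, the poller records the cordoned state in yieldBackground so background and CanYield tasks bail out of their Do loops, and pollerTryAllWork stops accepting new work except for task types whose SchedulingOverrides name a task still assigned to this machine. Once no tasks remain owned by the machine, restartIfNoTasksPending clears both flags and exits with ExitStatusRestartRequest (100), and systemd restarts the service.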
29 changes: 29 additions & 0 deletions harmony/harmonytask/metrics.go
@@ -1,15 +1,21 @@
package harmonytask

import (
"context"
"time"

promclient "github.com/prometheus/client_golang/prometheus"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"

curiobuild "github.com/filecoin-project/curio/build"
)

var (
taskNameTag, _ = tag.NewKey("task_name")
sourceTag, _ = tag.NewKey("source")
versionTag, _ = tag.NewKey("version")
pre = "harmonytask_"

// tasks can be short, but can extend to hours
Expand All @@ -18,6 +24,7 @@ var (

// TaskMeasures groups all harmonytask metrics.
var TaskMeasures = struct {
Uptime *stats.Int64Measure
TasksStarted *stats.Int64Measure
TasksCompleted *stats.Int64Measure
TasksFailed *stats.Int64Measure
Expand All @@ -29,6 +36,7 @@ var TaskMeasures = struct {
PollerIterations *stats.Int64Measure
AddedTasks *stats.Int64Measure
}{
Uptime: stats.Int64(pre+"uptime", "Total uptime of the node in seconds.", stats.UnitSeconds),
TasksStarted: stats.Int64(pre+"tasks_started", "Total number of tasks started.", stats.UnitDimensionless),
TasksCompleted: stats.Int64(pre+"tasks_completed", "Total number of tasks completed successfully.", stats.UnitDimensionless),
TasksFailed: stats.Int64(pre+"tasks_failed", "Total number of tasks that failed.", stats.UnitDimensionless),
Expand All @@ -48,6 +56,11 @@ var TaskMeasures = struct {
// TaskViews groups all harmonytask-related default views.
func init() {
err := view.Register(
&view.View{
Measure: TaskMeasures.Uptime,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{versionTag},
},
&view.View{
Measure: TaskMeasures.TasksStarted,
Aggregation: view.Sum(),
@@ -102,4 +115,20 @@ func init() {
if err != nil {
panic(err)
}

// record uptime every 10 seconds
go func() {
v := curiobuild.UserVersion()
bootTime := time.Now()

for {
time.Sleep(10 * time.Second)
err := stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(versionTag, v),
}, TaskMeasures.Uptime.M(int64(time.Since(bootTime).Seconds())))
if err != nil {
log.Errorw("Could not record uptime", "error", err)
}
}
}()
}
8 changes: 8 additions & 0 deletions harmony/harmonytask/task_type_handler.go
@@ -16,6 +16,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/taskhelp"
)

var log = logging.Logger("harmonytask")
@@ -215,6 +216,13 @@ canAcceptAgain:
}()

done, doErr = h.Do(*tID, func() bool {
if taskhelp.IsBackgroundTask(h.Name) || h.CanYield {
if h.TaskEngine.yieldBackground.Load() {
log.Infow("yielding background task", "name", h.Name, "id", *tID)
return false
}
}

var owner int
// Background here because we don't want GracefulRestart to block this save.
err := h.TaskEngine.db.QueryRow(context.Background(),
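The yield check above is delivered through the stillOwned callback passed to Do, so a background or CanYield task only yields if its Do loop actually polls that callback between units of work. A sketch of the expected pattern (ExampleTask is hypothetical; compare the F3 and proofshare poll tasks below):

```go
package example

import (
	"golang.org/x/xerrors"

	"github.com/filecoin-project/curio/harmony/harmonytask"
)

// ExampleTask stands in for any background / CanYield task.
type ExampleTask struct{}

func (t *ExampleTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
	for {
		if !stillOwned() {
			// The engine is yielding us (node cordoned) or ownership moved;
			// return done=false so the work is picked up again later.
			return false, xerrors.Errorf("yield")
		}
		// ... perform one unit of background work ...
	}
}
```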
1 change: 1 addition & 0 deletions harmony/taskhelp/common.go
@@ -23,6 +23,7 @@ func SliceIfFound[T any](slice []T, f func(T) bool) ([]T, bool) {
// BackgroundTask are tasks that:
// * Always run in the background
// * Never finish "successfully"
// When a node is cordoned (not schedulable), background tasks MUST yield.
func BackgroundTask(name string) string {
return "bg:" + name
}
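task_type_handler.go above gates the yield on taskhelp.IsBackgroundTask(h.Name), which is not shown in this diff; given the "bg:" naming convention here, it presumably reduces to a prefix check. A sketch of that assumption:

```go
package taskhelp

import "strings"

// IsBackgroundTask reports whether a task name was produced by BackgroundTask,
// i.e. carries the "bg:" prefix. Sketch of the assumed existing helper.
func IsBackgroundTask(name string) bool {
	return strings.HasPrefix(name, "bg:")
}
```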
2 changes: 1 addition & 1 deletion tasks/f3/f3_task.go
@@ -112,7 +112,7 @@ func (f *F3Task) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done boo
// When participateLoop returns, we go back to get a new ticket
}

return false, xerrors.Errorf("f3 task is background task")
return false, xerrors.Errorf("yield")
}

func (f *F3Task) tryGetF3ParticipationTicket(ctx context.Context, stillOwned func() bool, participant address.Address, previousTicket []byte) (api.F3ParticipationTicket, error) {
3 changes: 2 additions & 1 deletion tasks/proofshare/task_client_poll.go
@@ -197,7 +197,8 @@ func (t *TaskClientPoll) Do(taskID harmonytask.TaskID, stillOwned func() bool) (
// TypeDetails implements harmonytask.TaskInterface.
func (t *TaskClientPoll) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Name: "PSClientPoll",
Name: "PSClientPoll",
CanYield: true,
Cost: resources.Resources{
Cpu: 0,
Ram: 4 << 20,
1 change: 0 additions & 1 deletion tasks/proofshare/task_client_send.go
@@ -142,7 +142,6 @@ func (t *TaskClientSend) Do(taskID harmonytask.TaskID, stillOwned func() bool) (
return false, xerrors.Errorf("failed to create payment: %w", err)
}
if !match {
log.Infow("no applicable requests found", "taskID", taskID, "wallet", walletID)
time.Sleep(time.Duration(100+rand.Intn(150)) * time.Second / 100)
continue
}
19 changes: 19 additions & 0 deletions tasks/seal/task_finalize.go
@@ -2,6 +2,7 @@ package seal

import (
"context"
"fmt"

"golang.org/x/xerrors"

@@ -233,7 +234,25 @@ func (f *FinalizeTask) TypeDetails() harmonytask.TaskTypeDetails {
Ram: 100 << 20,
},
MaxFailures: 10,

// Allow finalize to be scheduled while batch tasks are still running, even when the node is not schedulable.
// This lets finalize unblock move-storage and PoRep during the multiple hours in which the node is technically
// not schedulable but is still finishing another batch. In most cases this enables nearly zero-waste restarts
// of supraseal nodes.
SchedulingOverrides: batchTaskNameGrid(),
}
}

func batchTaskNameGrid() map[string]bool {
batchSizes := []int{128, 64, 32, 16, 8}
sectorSizes := []string{"32G", "64G"}

out := map[string]bool{}
for _, batchSize := range batchSizes {
for _, sectorSize := range sectorSizes {
out[fmt.Sprintf("Batch%d-%s", batchSize, sectorSize)] = true
}
}
return out
}

func (f *FinalizeTask) Adder(taskFunc harmonytask.AddTaskFunc) {
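For reference, batchTaskNameGrid expands to ten keys (Batch128-32G, Batch128-64G, Batch64-32G, ..., Batch8-64G), so FinalizeTask keeps being scheduled on a cordoned machine for as long as any task with one of those names is still assigned to it.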