From b50ffb4f2ce6f00d735b5968bdfebe0f7a1f93b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 18 Aug 2025 11:06:17 +0200 Subject: [PATCH 01/12] cordon: yield bg tasks --- harmony/harmonytask/harmonytask.go | 5 +++++ harmony/harmonytask/task_type_handler.go | 8 ++++++++ tasks/f3/f3_task.go | 2 +- tasks/proofshare/task_client_send.go | 1 - 4 files changed, 14 insertions(+), 2 deletions(-) diff --git a/harmony/harmonytask/harmonytask.go b/harmony/harmonytask/harmonytask.go index df8c016b3..4230e6759 100644 --- a/harmony/harmonytask/harmonytask.go +++ b/harmony/harmonytask/harmonytask.go @@ -126,6 +126,9 @@ type TaskEngine struct { follows map[string][]followStruct hostAndPort string + // runtime flags + yieldBackground atomic.Bool + // synchronous to the single-threaded poller lastFollowTime time.Time lastCleanup atomic.Value @@ -288,6 +291,8 @@ func (e *TaskEngine) poller() { log.Error("Unable to check schedulable status: ", err) continue } + + e.yieldBackground.Store(!schedulable) if !schedulable { log.Debugf("Machine %s is not schedulable. Please check the cordon status.", e.hostAndPort) continue diff --git a/harmony/harmonytask/task_type_handler.go b/harmony/harmonytask/task_type_handler.go index 6db86bb23..3ecc45a1d 100644 --- a/harmony/harmonytask/task_type_handler.go +++ b/harmony/harmonytask/task_type_handler.go @@ -16,6 +16,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/curio/harmony/harmonydb" + "github.com/filecoin-project/curio/harmony/taskhelp" ) var log = logging.Logger("harmonytask") @@ -215,6 +216,13 @@ canAcceptAgain: }() done, doErr = h.Do(*tID, func() bool { + if taskhelp.IsBackgroundTask(h.Name) { + if h.TaskEngine.yieldBackground.Load() { + log.Infow("yielding background task", "name", h.Name, "id", *tID) + return false + } + } + var owner int // Background here because we don't want GracefulRestart to block this save. 
err := h.TaskEngine.db.QueryRow(context.Background(), diff --git a/tasks/f3/f3_task.go b/tasks/f3/f3_task.go index 5a9437c71..86d280ae8 100644 --- a/tasks/f3/f3_task.go +++ b/tasks/f3/f3_task.go @@ -112,7 +112,7 @@ func (f *F3Task) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done boo // When participateLoop returns, we go back to get a new ticket } - return false, xerrors.Errorf("f3 task is background task") + return false, xerrors.Errorf("yield") } func (f *F3Task) tryGetF3ParticipationTicket(ctx context.Context, stillOwned func() bool, participant address.Address, previousTicket []byte) (api.F3ParticipationTicket, error) { diff --git a/tasks/proofshare/task_client_send.go b/tasks/proofshare/task_client_send.go index 77b2c256c..074a9b943 100644 --- a/tasks/proofshare/task_client_send.go +++ b/tasks/proofshare/task_client_send.go @@ -142,7 +142,6 @@ func (t *TaskClientSend) Do(taskID harmonytask.TaskID, stillOwned func() bool) ( return false, xerrors.Errorf("failed to create payment: %w", err) } if !match { - log.Infow("no applicable requests found", "taskID", taskID, "wallet", walletID) time.Sleep(time.Duration(100+rand.Intn(150)) * time.Second / 100) continue } From d9f787c0f65d6b296b2f338ecb85b6f052c47a83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 18 Aug 2025 11:16:52 +0200 Subject: [PATCH 02/12] make PSClientPoll yield as well --- harmony/harmonytask/harmonytask.go | 4 ++++ harmony/harmonytask/task_type_handler.go | 2 +- harmony/taskhelp/common.go | 1 + tasks/proofshare/task_client_poll.go | 1 + 4 files changed, 7 insertions(+), 1 deletion(-) diff --git a/harmony/harmonytask/harmonytask.go b/harmony/harmonytask/harmonytask.go index 4230e6759..b4fbd9d85 100644 --- a/harmony/harmonytask/harmonytask.go +++ b/harmony/harmonytask/harmonytask.go @@ -57,6 +57,10 @@ type TaskTypeDetails struct { // CanAccept() can read taskEngine's WorkOrigin string to learn about a task. // Ex: make new CC sectors, clean-up, or retrying pipelines that failed in later states. IAmBored func(AddTaskFunc) error + + // CanYield is true if the task should yield when the node is not schedulable. + // This is implied for background tasks. + CanYield bool } // TaskInterface must be implemented in order to have a task used by harmonytask. diff --git a/harmony/harmonytask/task_type_handler.go b/harmony/harmonytask/task_type_handler.go index 3ecc45a1d..f3af543c3 100644 --- a/harmony/harmonytask/task_type_handler.go +++ b/harmony/harmonytask/task_type_handler.go @@ -216,7 +216,7 @@ canAcceptAgain: }() done, doErr = h.Do(*tID, func() bool { - if taskhelp.IsBackgroundTask(h.Name) { + if taskhelp.IsBackgroundTask(h.Name) || h.CanYield { if h.TaskEngine.yieldBackground.Load() { log.Infow("yielding background task", "name", h.Name, "id", *tID) return false diff --git a/harmony/taskhelp/common.go b/harmony/taskhelp/common.go index 3ee1869fc..10a930d8f 100644 --- a/harmony/taskhelp/common.go +++ b/harmony/taskhelp/common.go @@ -23,6 +23,7 @@ func SliceIfFound[T any](slice []T, f func(T) bool) ([]T, bool) { // BackgroundTask are tasks that: // * Always run in the background // * Never finish "successfully" +// When a node is cordoned (not schedulable), background tasks MUST yield. 
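+// A yield is signaled through the stillOwned callback passed to Do (see
+// task_type_handler.go), which begins returning false.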
 func BackgroundTask(name string) string {
 	return "bg:" + name
 }
diff --git a/tasks/proofshare/task_client_poll.go b/tasks/proofshare/task_client_poll.go
index c25fd7518..6eff3c417 100644
--- a/tasks/proofshare/task_client_poll.go
+++ b/tasks/proofshare/task_client_poll.go
@@ -198,6 +198,7 @@ func (t *TaskClientPoll) Do(taskID harmonytask.TaskID, stillOwned func() bool) (
 func (t *TaskClientPoll) TypeDetails() harmonytask.TaskTypeDetails {
 	return harmonytask.TaskTypeDetails{
 		Name: "PSClientPoll",
+		CanYield: true,
 		Cost: resources.Resources{
 			Cpu: 0,
 			Ram: 4 << 20,

From 536761b05f10febb7d2e3c4455968b7c87c45615 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Mon, 18 Aug 2025 11:35:26 +0200
Subject: [PATCH 03/12] harmonytask: Scheduling Overrides

---
 harmony/harmonytask/harmonytask.go | 51 +++++++++++++++++++++++++++---
 tasks/seal/task_finalize.go        | 19 +++++++++++
 2 files changed, 65 insertions(+), 5 deletions(-)

diff --git a/harmony/harmonytask/harmonytask.go b/harmony/harmonytask/harmonytask.go
index b4fbd9d85..6c27e0841 100644
--- a/harmony/harmonytask/harmonytask.go
+++ b/harmony/harmonytask/harmonytask.go
@@ -61,6 +61,12 @@ type TaskTypeDetails struct {
 	// CanYield is true if the task should yield when the node is not schedulable.
 	// This is implied for background tasks.
 	CanYield bool
+
+	// SchedulingOverrides is a map of task names which, when running while the node is not schedulable,
+	// allow this task to continue being scheduled. This is useful in pipelines where a long-running
+	// task would otherwise block a short-running task from being scheduled, stalling related
+	// pipelines on other machines.
+	SchedulingOverrides map[string]bool
 }
 
 // TaskInterface must be implemented in order to have a task used by harmonytask.
@@ -297,15 +303,17 @@ func (e *TaskEngine) poller() {
 		}
 
 		e.yieldBackground.Store(!schedulable)
-		if !schedulable {
-			log.Debugf("Machine %s is not schedulable. Please check the cordon status.", e.hostAndPort)
-			continue
-		}
 
 		accepted := e.pollerTryAllWork()
 		if accepted {
 			nextWait = POLL_NEXT_DURATION
 		}
+
+		if !schedulable {
+			log.Debugf("Machine %s is not schedulable. 
Please check the cordon status.", e.hostAndPort) + continue + } + if time.Since(e.lastFollowTime) > FOLLOW_FREQUENCY { e.followWorkInDB() } @@ -370,12 +378,40 @@ func (e *TaskEngine) followWorkInDB() { } // pollerTryAllWork starts the next 1 task -func (e *TaskEngine) pollerTryAllWork() bool { +func (e *TaskEngine) pollerTryAllWork(schedulable bool) bool { if time.Since(e.lastCleanup.Load().(time.Time)) > CLEANUP_FREQUENCY { e.lastCleanup.Store(time.Now()) resources.CleanupMachines(e.ctx, e.db) } for _, v := range e.handlers { + if !schedulable { + if v.TaskTypeDetails.SchedulingOverrides == nil { + continue + } + + // Override the schedulable flag if the task has any assigned overrides + var foundOverride bool + for relatedTaskName := range v.TaskTypeDetails.SchedulingOverrides { + var assignedOverrideTasks []int + err := e.db.Select(e.ctx, &assignedOverrideTasks, `SELECT id + FROM harmony_task + WHERE owner_id = $1 AND name=$2 + ORDER BY update_time LIMIT 1`, e.ownerID, relatedTaskName) + if err != nil { + log.Error("Unable to read assigned overrides ", err) + break + } + if len(assignedOverrideTasks) > 0 { + log.Infow("found override, scheduling despite schedulable=false flag", "ownerID", e.ownerID, "relatedTaskName", relatedTaskName, "assignedOverrideTasks", assignedOverrideTasks) + foundOverride = true + break + } + } + if !foundOverride { + continue + } + } + if err := v.AssertMachineHasCapacity(); err != nil { log.Debugf("skipped scheduling %s type tasks on due to %s", v.Name, err.Error()) continue @@ -416,6 +452,11 @@ func (e *TaskEngine) pollerTryAllWork() bool { log.Warn("Work not accepted for " + strconv.Itoa(len(unownedTasks)) + " " + v.Name + " task(s)") } } + + if !schedulable { + return false + } + // if no work was accepted, are we bored? Then find work in priority order. for _, v := range e.handlers { v := v diff --git a/tasks/seal/task_finalize.go b/tasks/seal/task_finalize.go index 0f1fac0cd..aeb512243 100644 --- a/tasks/seal/task_finalize.go +++ b/tasks/seal/task_finalize.go @@ -2,6 +2,7 @@ package seal import ( "context" + "fmt" "golang.org/x/xerrors" @@ -233,7 +234,25 @@ func (f *FinalizeTask) TypeDetails() harmonytask.TaskTypeDetails { Ram: 100 << 20, }, MaxFailures: 10, + + // Allow finalize to be scheduled when batch tasks are still running even when the node is not schedulable. + // This allows finalize to unblock move-storage and PoRep for multiple hours while the node is technically not schedulable, + // but is still finishing another batch. In most cases this behavior enables nearly zero-waste restarts of supraseal nodes. 
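+	//
+	// Note that an override only takes effect while a task with a matching name
+	// is actually assigned to this node (see pollerTryAllWork in harmonytask.go).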
+ SchedulingOverrides: batchTaskNameGrid(), + } +} + +func batchTaskNameGrid() map[string]bool { + batchSizes := []int{128, 64, 32, 16, 8} + sectorSizes := []string{"32G", "64G"} + + out := map[string]bool{} + for _, batchSize := range batchSizes { + for _, sectorSize := range sectorSizes { + out[fmt.Sprintf("Batch%d-%s", batchSize, sectorSize)] = true + } } + return out } func (f *FinalizeTask) Adder(taskFunc harmonytask.AddTaskFunc) { From 6c2cef346d7741d1044e77cb0c43c1b4390e1bd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 18 Aug 2025 11:43:22 +0200 Subject: [PATCH 04/12] metrics: record version --- harmony/harmonytask/metrics.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/harmony/harmonytask/metrics.go b/harmony/harmonytask/metrics.go index 76a6b2e5a..656c5bfca 100644 --- a/harmony/harmonytask/metrics.go +++ b/harmony/harmonytask/metrics.go @@ -1,6 +1,10 @@ package harmonytask import ( + "context" + "time" + + curiobuild "github.com/filecoin-project/curio/build" promclient "github.com/prometheus/client_golang/prometheus" "go.opencensus.io/stats" "go.opencensus.io/stats/view" @@ -10,6 +14,7 @@ import ( var ( taskNameTag, _ = tag.NewKey("task_name") sourceTag, _ = tag.NewKey("source") + versionTag, _ = tag.NewKey("version") pre = "harmonytask_" // tasks can be short, but can extend to hours @@ -18,6 +23,7 @@ var ( // TaskMeasures groups all harmonytask metrics. var TaskMeasures = struct { + Uptime *stats.Int64Measure TasksStarted *stats.Int64Measure TasksCompleted *stats.Int64Measure TasksFailed *stats.Int64Measure @@ -29,6 +35,7 @@ var TaskMeasures = struct { PollerIterations *stats.Int64Measure AddedTasks *stats.Int64Measure }{ + Uptime: stats.Int64(pre+"uptime", "Total uptime of the node in seconds.", stats.UnitSeconds), TasksStarted: stats.Int64(pre+"tasks_started", "Total number of tasks started.", stats.UnitDimensionless), TasksCompleted: stats.Int64(pre+"tasks_completed", "Total number of tasks completed successfully.", stats.UnitDimensionless), TasksFailed: stats.Int64(pre+"tasks_failed", "Total number of tasks that failed.", stats.UnitDimensionless), @@ -48,6 +55,11 @@ var TaskMeasures = struct { // TaskViews groups all harmonytask-related default views. 
func init() { err := view.Register( + &view.View{ + Measure: TaskMeasures.Uptime, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{versionTag}, + }, &view.View{ Measure: TaskMeasures.TasksStarted, Aggregation: view.Sum(), @@ -102,4 +114,20 @@ func init() { if err != nil { panic(err) } + + // record uptime every 10 seconds + go func() { + v := curiobuild.UserVersion() + bootTime := time.Now() + + for { + time.Sleep(10 * time.Second) + err := stats.RecordWithTags(context.Background(), []tag.Mutator{ + tag.Upsert(versionTag, v), + }, TaskMeasures.Uptime.M(int64(time.Since(bootTime).Seconds()))) + if err != nil { + log.Errorw("Could not record uptime", "error", err) + } + } + }() } From 56939482b06a323ed00d8a3da077b2e53d533b12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 18 Aug 2025 11:43:55 +0200 Subject: [PATCH 05/12] fix build --- harmony/harmonytask/harmonytask.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/harmony/harmonytask/harmonytask.go b/harmony/harmonytask/harmonytask.go index 6c27e0841..72fcdf5d3 100644 --- a/harmony/harmonytask/harmonytask.go +++ b/harmony/harmonytask/harmonytask.go @@ -304,7 +304,7 @@ func (e *TaskEngine) poller() { e.yieldBackground.Store(!schedulable) - accepted := e.pollerTryAllWork() + accepted := e.pollerTryAllWork(schedulable) if accepted { nextWait = POLL_NEXT_DURATION } From b1704dfcd9b05e910f3a444974cb1d026c1a9a11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 18 Aug 2025 12:09:55 +0200 Subject: [PATCH 06/12] smart restart --- apt/curio.service | 1 + documentation/en/curio-service.md | 1 + harmony/harmonydb/sql/20230719-harmony.sql | 2 ++ harmony/harmonytask/harmonytask.go | 37 ++++++++++++++++++++-- 4 files changed, 38 insertions(+), 3 deletions(-) diff --git a/apt/curio.service b/apt/curio.service index 02223f65f..a2fa3dc7b 100644 --- a/apt/curio.service +++ b/apt/curio.service @@ -10,6 +10,7 @@ LimitNOFILE=1000000 Restart=always RestartSec=10 EnvironmentFile=/etc/curio.env +RestartForceExitStatus=100 [Install] WantedBy=multi-user.target diff --git a/documentation/en/curio-service.md b/documentation/en/curio-service.md index 97f0ccf53..70c918139 100644 --- a/documentation/en/curio-service.md +++ b/documentation/en/curio-service.md @@ -27,6 +27,7 @@ LimitNOFILE=1000000 Restart=always RestartSec=10 EnvironmentFile=/etc/curio.env +RestartForceExitStatus=100 [Install] WantedBy=multi-user.target diff --git a/harmony/harmonydb/sql/20230719-harmony.sql b/harmony/harmonydb/sql/20230719-harmony.sql index 5b3d8b265..969b2e56b 100644 --- a/harmony/harmonydb/sql/20230719-harmony.sql +++ b/harmony/harmonydb/sql/20230719-harmony.sql @@ -19,6 +19,8 @@ CREATE TABLE harmony_task ( previous_task INTEGER, name varchar(16) NOT NULL -- retries INTEGER NOT NULL DEFAULT 0 -- added later + -- unschedulable BOOLEAN DEFAULT FALSE -- added in 20250111-machine-maintenance.sql + -- restart_request TIMESTAMP WITH TIME ZONE -- added in 20250818-restart-request.sql ); COMMENT ON COLUMN harmony_task.initiated_by IS 'The task ID whose completion occasioned this task.'; COMMENT ON COLUMN harmony_task.owner_id IS 'The foreign key to harmony_machines.'; diff --git a/harmony/harmonytask/harmonytask.go b/harmony/harmonytask/harmonytask.go index 72fcdf5d3..48f045f80 100644 --- a/harmony/harmonytask/harmonytask.go +++ b/harmony/harmonytask/harmonytask.go @@ -3,6 +3,7 @@ package harmonytask import ( "context" "fmt" + "os" "strconv" "sync/atomic" "time" @@ -23,6 +24,8 @@ var POLL_NEXT_DURATION = 100 * 
time.Millisecond // After scheduling a task, wait var CLEANUP_FREQUENCY = 5 * time.Minute // Check for dead workers this often * everyone var FOLLOW_FREQUENCY = 1 * time.Minute // Check for work to follow this often +var ExitStatusRestartRequest = 100 + type TaskTypeDetails struct { // Max returns how many tasks this machine can run of this type. // Nil (default)/Zero or less means unrestricted. @@ -296,7 +299,7 @@ func (e *TaskEngine) poller() { nextWait = POLL_DURATION // Check if the machine is schedulable - schedulable, err := e.schedulable() + schedulable, err := e.checkNodeFlags() if err != nil { log.Error("Unable to check schedulable status: ", err) continue @@ -512,15 +515,43 @@ func (e *TaskEngine) Host() string { return e.hostAndPort } -func (e *TaskEngine) schedulable() (bool, error) { +func (e *TaskEngine) checkNodeFlags() (bool, error) { var unschedulable bool - err := e.db.QueryRow(e.ctx, `SELECT unschedulable FROM harmony_machines WHERE host_and_port=$1`, e.hostAndPort).Scan(&unschedulable) + var restartRequest *time.Time + err := e.db.QueryRow(e.ctx, `SELECT unschedulable, restart_request FROM harmony_machines WHERE host_and_port=$1`, e.hostAndPort).Scan(&unschedulable, &restartRequest) if err != nil { return false, err } + + if restartRequest != nil { + e.restartIfNoTasksPending(*restartRequest) + } + return !unschedulable, nil } +func (e *TaskEngine) restartIfNoTasksPending(pendingSince time.Time) { + var tasksPending int + err := e.db.QueryRow(e.ctx, `SELECT COUNT(*) FROM harmony_task WHERE owner_id=$1`, e.ownerID).Scan(&tasksPending) + if err != nil { + log.Error("Unable to check for tasks pending: ", err) + return + } + if tasksPending == 0 { + log.Infow("no tasks pending, restarting", "ownerID", e.ownerID, "pendingSince", pendingSince, "took", time.Since(pendingSince)) + + // unset the flags first + _, err = e.db.Exec(e.ctx, `UPDATE harmony_machines SET restart_request=NULL, unschedulable=FALSE WHERE host_and_port=$1`, e.hostAndPort) + if err != nil { + log.Error("Unable to unset restart request: ", err) + return + } + + // then exit + os.Exit(ExitStatusRestartRequest) + } +} + // About the Registry // This registry exists for the benefit of "static methods" of TaskInterface extensions. 
// For example, GetSPID(db, taskID) (int, err) is a static method that can be called From 8771dced8ce5536753876f02337a171ff21932b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 18 Aug 2025 12:19:24 +0200 Subject: [PATCH 07/12] webui: Jsonrpc reconnect fixes --- web/static/lib/jsonrpc.mjs | 102 +++++++++++++++++++++++++++++-------- 1 file changed, 82 insertions(+), 20 deletions(-) diff --git a/web/static/lib/jsonrpc.mjs b/web/static/lib/jsonrpc.mjs index 4f560f0bb..1aff5eaa2 100644 --- a/web/static/lib/jsonrpc.mjs +++ b/web/static/lib/jsonrpc.mjs @@ -20,31 +20,60 @@ class JsonRpcClient { this.url = url; this.requestId = 0; this.pendingRequests = new Map(); + + // Reconnection state + this._connectPromise = null; + this._reconnectTimer = null; + this._shouldReconnect = true; } async connect() { - return new Promise((resolve, reject) => { - this.ws = new WebSocket(this.url); - - this.ws.onopen = () => { - console.log("Connected to the server"); - resolve(); - }; - - this.ws.onclose = () => { - console.log("Connection closed, attempting to reconnect..."); - setTimeout(() => this.connect().then(resolve, reject), 1000); // Reconnect after 1 second - }; + if (this._connectPromise) { + return this._connectPromise; + } - this.ws.onerror = (error) => { - console.error("WebSocket error:", error); - reject(error); + this._shouldReconnect = true; + + this._connectPromise = new Promise((resolve) => { + const attempt = () => { + this.ws = new WebSocket(this.url); + + this.ws.onopen = () => { + console.log("Connected to the server"); + // Reset backoff on successful connect + this._clearReconnectTimer(); + // Resolve initial connect promise (subsequent reconnects are transparent) + if (this._connectPromise) { + // Resolve only once + resolve(); + this._connectPromise = null; + } + }; + + this.ws.onclose = () => { + console.log("Connection closed, attempting to reconnect..."); + // Reject all in-flight RPC calls + this._rejectAllPending(new Error('WebSocket disconnected')); + if (this._shouldReconnect) { + this._scheduleReconnect(attempt); + } + }; + + this.ws.onerror = (error) => { + console.error("WebSocket error:", error); + // Force close to unify handling in onclose + try { this.ws.close(); } catch (_) {} + }; + + this.ws.onmessage = (message) => { + this.handleMessage(message); + }; }; - this.ws.onmessage = (message) => { - this.handleMessage(message); - }; + attempt(); }); + + return this._connectPromise; } handleMessage(message) { @@ -74,13 +103,46 @@ class JsonRpcClient { return new Promise((resolve, reject) => { this.pendingRequests.set(id, { resolve, reject }); - if (this.ws.readyState === WebSocket.OPEN) { - this.ws.send(JSON.stringify(request)); + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + try { + this.ws.send(JSON.stringify(request)); + } catch (e) { + this.pendingRequests.delete(id); + reject(e); + } } else { + this.pendingRequests.delete(id); reject('WebSocket is not open'); } }); } + + _scheduleReconnect(attempt) { + if (this._reconnectTimer) { + return; + } + this._reconnectTimer = setTimeout(() => { + this._reconnectTimer = null; + attempt(); + }, 1000); + } + + _clearReconnectTimer() { + if (this._reconnectTimer) { + clearTimeout(this._reconnectTimer); + this._reconnectTimer = null; + } + } + + _rejectAllPending(error) { + const err = error instanceof Error ? 
error : new Error(String(error || 'WebSocket disconnected')); + for (const [id, resolver] of this.pendingRequests.entries()) { + try { + resolver.reject(err); + } catch (_) {} + } + this.pendingRequests.clear(); + } } async function init() { From 31e304083915d6af24ad416cfd742396d647415f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 18 Aug 2025 12:31:18 +0200 Subject: [PATCH 08/12] webui: restart requests --- web/api/webrpc/cluster.go | 28 +++++++++++++++++++++++++++- web/static/cluster-machines.mjs | 32 +++++++++++++++++++++++++++++++- 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/web/api/webrpc/cluster.go b/web/api/webrpc/cluster.go index 04e8cb60e..f5de09e5b 100644 --- a/web/api/webrpc/cluster.go +++ b/web/api/webrpc/cluster.go @@ -24,6 +24,8 @@ type MachineSummary struct { Layers string Uptime string Unschedulable bool + RestartRequest string + Restarting bool RunningTasks int } @@ -38,6 +40,7 @@ func (a *WebRPC) ClusterMachines(ctx context.Context) ([]MachineSummary, error) hm.ram, hm.gpu, hm.unschedulable, + hm.restart_request, hmd.machine_name, hmd.tasks, hmd.layers, @@ -59,8 +62,9 @@ func (a *WebRPC) ClusterMachines(ctx context.Context) ([]MachineSummary, error) var lastContact time.Duration var ram int64 var uptime time.Time + var restartRequest *time.Time - if err := rows.Scan(&m.ID, &m.Address, &lastContact, &m.Cpu, &ram, &m.Gpu, &m.Unschedulable, &m.Name, &m.Tasks, &m.Layers, &uptime); err != nil { + if err := rows.Scan(&m.ID, &m.Address, &lastContact, &m.Cpu, &ram, &m.Gpu, &m.Unschedulable, &restartRequest, &m.Name, &m.Tasks, &m.Layers, &uptime); err != nil { return nil, err // Handle error } m.SinceContact = lastContact.Round(time.Second).String() @@ -76,6 +80,12 @@ func (a *WebRPC) ClusterMachines(ctx context.Context) ([]MachineSummary, error) } m.RunningTasks = runningTasks } + + if restartRequest != nil { + m.RestartRequest = humanize.Time(*restartRequest) + m.Restarting = true + } + summaries = append(summaries, m) } return summaries, nil @@ -409,3 +419,19 @@ func (a *WebRPC) Uncordon(ctx context.Context, id int64) error { } return nil } + +func (a *WebRPC) Restart(ctx context.Context, id int64) error { + _, err := a.deps.DB.Exec(ctx, `UPDATE harmony_machines SET restart_request = NOW() WHERE id = $1`, id) + if err != nil { + return xerrors.Errorf("restart failed: %w", err) + } + return nil +} + +func (a *WebRPC) AbortRestart(ctx context.Context, id int64) error { + _, err := a.deps.DB.Exec(ctx, `UPDATE harmony_machines SET restart_request = NULL WHERE id = $1`, id) + if err != nil { + return xerrors.Errorf("abort restart failed: %w", err) + } + return nil +} \ No newline at end of file diff --git a/web/static/cluster-machines.mjs b/web/static/cluster-machines.mjs index 839f4acea..976e3e529 100644 --- a/web/static/cluster-machines.mjs +++ b/web/static/cluster-machines.mjs @@ -34,6 +34,16 @@ customElements.define('cluster-machines', class ClusterMachines extends LitEleme this.loadData(); } + async _restart(id) { + await RPCCall('Restart', [id]); + this.loadData(); + } + + async _abortRestart(id) { + await RPCCall('AbortRestart', [id]); + this.loadData(); + } + render() { return html` ${item.SinceContact} ${item.Uptime} - + this._cordon(item.ID)} style="${item.Unschedulable ? 'opacity: 0.3; pointer-events: none;' : ''}" > @@ -121,6 +131,26 @@ customElements.define('cluster-machines', class ClusterMachines extends LitEleme + ${!item.Restarting ? html` + this._restart(item.ID)} style="${!item.Unschedulable ? 
'opacity: 0.3; pointer-events: none;' : ''}"> + + + + + + ` : html` + this._abortRestart(item.ID)}> + + + + + + `} + + ${item.Restarting + ? html`restarting (since ${item.RestartRequest})` + : '' + } ${!item.Unschedulable ? html`enabled` : html` From d8396504bc71b29403ad03fd4ed13b19ea457744 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 18 Aug 2025 12:55:59 +0200 Subject: [PATCH 09/12] make gen --- harmony/harmonytask/metrics.go | 5 +++-- tasks/proofshare/task_client_poll.go | 2 +- tasks/seal/task_finalize.go | 2 +- web/api/webrpc/cluster.go | 20 ++++++++++---------- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/harmony/harmonytask/metrics.go b/harmony/harmonytask/metrics.go index 656c5bfca..45de3c3cb 100644 --- a/harmony/harmonytask/metrics.go +++ b/harmony/harmonytask/metrics.go @@ -4,11 +4,12 @@ import ( "context" "time" - curiobuild "github.com/filecoin-project/curio/build" promclient "github.com/prometheus/client_golang/prometheus" "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" + + curiobuild "github.com/filecoin-project/curio/build" ) var ( @@ -23,7 +24,7 @@ var ( // TaskMeasures groups all harmonytask metrics. var TaskMeasures = struct { - Uptime *stats.Int64Measure + Uptime *stats.Int64Measure TasksStarted *stats.Int64Measure TasksCompleted *stats.Int64Measure TasksFailed *stats.Int64Measure diff --git a/tasks/proofshare/task_client_poll.go b/tasks/proofshare/task_client_poll.go index 6eff3c417..220ff0bcb 100644 --- a/tasks/proofshare/task_client_poll.go +++ b/tasks/proofshare/task_client_poll.go @@ -197,7 +197,7 @@ func (t *TaskClientPoll) Do(taskID harmonytask.TaskID, stillOwned func() bool) ( // TypeDetails implements harmonytask.TaskInterface. func (t *TaskClientPoll) TypeDetails() harmonytask.TaskTypeDetails { return harmonytask.TaskTypeDetails{ - Name: "PSClientPoll", + Name: "PSClientPoll", CanYield: true, Cost: resources.Resources{ Cpu: 0, diff --git a/tasks/seal/task_finalize.go b/tasks/seal/task_finalize.go index aeb512243..7691d5cee 100644 --- a/tasks/seal/task_finalize.go +++ b/tasks/seal/task_finalize.go @@ -237,7 +237,7 @@ func (f *FinalizeTask) TypeDetails() harmonytask.TaskTypeDetails { // Allow finalize to be scheduled when batch tasks are still running even when the node is not schedulable. // This allows finalize to unblock move-storage and PoRep for multiple hours while the node is technically not schedulable, - // but is still finishing another batch. In most cases this behavior enables nearly zero-waste restarts of supraseal nodes. + // but is still finishing another batch. In most cases this behavior enables nearly zero-waste restarts of supraseal nodes. 
SchedulingOverrides: batchTaskNameGrid(), } } diff --git a/web/api/webrpc/cluster.go b/web/api/webrpc/cluster.go index f5de09e5b..a056513d1 100644 --- a/web/api/webrpc/cluster.go +++ b/web/api/webrpc/cluster.go @@ -17,16 +17,16 @@ type MachineSummary struct { Name string SinceContact string - Tasks string - Cpu int - RamHumanized string - Gpu int - Layers string - Uptime string - Unschedulable bool + Tasks string + Cpu int + RamHumanized string + Gpu int + Layers string + Uptime string + Unschedulable bool RestartRequest string - Restarting bool - RunningTasks int + Restarting bool + RunningTasks int } func (a *WebRPC) ClusterMachines(ctx context.Context) ([]MachineSummary, error) { @@ -434,4 +434,4 @@ func (a *WebRPC) AbortRestart(ctx context.Context, id int64) error { return xerrors.Errorf("abort restart failed: %w", err) } return nil -} \ No newline at end of file +} From 6b1cf17201514bbbafa7a50bb63782e81f76f9c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 18 Aug 2025 14:17:37 +0200 Subject: [PATCH 10/12] jrpc reject on conn fail --- web/static/lib/jsonrpc.mjs | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/web/static/lib/jsonrpc.mjs b/web/static/lib/jsonrpc.mjs index 1aff5eaa2..118af569a 100644 --- a/web/static/lib/jsonrpc.mjs +++ b/web/static/lib/jsonrpc.mjs @@ -7,7 +7,11 @@ class JsonRpcClient { const client = new JsonRpcClient('/api/webrpc/v0'); await client.connect(); return client; - })(); + })().catch((err) => { + // Reset cached instance so future calls can retry cleanly + JsonRpcClient.instance = null; + throw err; + }); } return await JsonRpcClient.instance; } @@ -34,17 +38,17 @@ class JsonRpcClient { this._shouldReconnect = true; - this._connectPromise = new Promise((resolve) => { + this._connectPromise = new Promise((resolve, reject) => { + let hasOpened = false; + const attempt = () => { this.ws = new WebSocket(this.url); this.ws.onopen = () => { + hasOpened = true; console.log("Connected to the server"); - // Reset backoff on successful connect this._clearReconnectTimer(); - // Resolve initial connect promise (subsequent reconnects are transparent) if (this._connectPromise) { - // Resolve only once resolve(); this._connectPromise = null; } @@ -52,8 +56,17 @@ class JsonRpcClient { this.ws.onclose = () => { console.log("Connection closed, attempting to reconnect..."); - // Reject all in-flight RPC calls this._rejectAllPending(new Error('WebSocket disconnected')); + if (!hasOpened) { + // Initial connection attempt failed: propagate error and stop reconnecting here + this._shouldReconnect = false; + this._clearReconnectTimer(); + if (this._connectPromise) { + reject(new Error('WebSocket initial connection failed')); + this._connectPromise = null; + } + return; + } if (this._shouldReconnect) { this._scheduleReconnect(attempt); } @@ -61,7 +74,6 @@ class JsonRpcClient { this.ws.onerror = (error) => { console.error("WebSocket error:", error); - // Force close to unify handling in onclose try { this.ws.close(); } catch (_) {} }; From 8cef7ca263e5ab070b4a54e38dce76d3ac579883 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 18 Aug 2025 14:31:40 +0200 Subject: [PATCH 11/12] missing schema file --- harmony/harmonydb/sql/20250818-restart-request.sql | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 harmony/harmonydb/sql/20250818-restart-request.sql diff --git a/harmony/harmonydb/sql/20250818-restart-request.sql b/harmony/harmonydb/sql/20250818-restart-request.sql new file 
mode 100644
index 000000000..743868cb7
--- /dev/null
+++ b/harmony/harmonydb/sql/20250818-restart-request.sql
@@ -0,0 +1,2 @@
+ALTER TABLE harmony_machines
+ADD COLUMN restart_request TIMESTAMP WITH TIME ZONE;

From 4a99f0544c767b242a7ad6f7b95abc345a5c77e3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Mon, 18 Aug 2025 16:15:09 +0200
Subject: [PATCH 12/12] webui: drop underscore prefixes from method names

---
 web/static/cluster-machines.mjs | 20 ++++++------
 web/static/lib/jsonrpc.mjs      | 54 ++++++++++++++++-----------------
 2 files changed, 37 insertions(+), 37 deletions(-)

diff --git a/web/static/cluster-machines.mjs b/web/static/cluster-machines.mjs
index 976e3e529..d2ff9ddfc 100644
--- a/web/static/cluster-machines.mjs
+++ b/web/static/cluster-machines.mjs
@@ -20,26 +20,26 @@ customElements.define('cluster-machines', class ClusterMachines extends LitEleme
 		this.requestUpdate();
 	}
 
-	_toggleDetailed(e) {
+	toggleDetailed(e) {
 		this.detailed = e.target.checked;
 	}
 
-	async _cordon(id) {
+	async cordon(id) {
 		await RPCCall('Cordon', [id]);
 		this.loadData();
 	}
 
-	async _uncordon(id) {
+	async uncordon(id) {
 		await RPCCall('Uncordon', [id]);
 		this.loadData();
 	}
 
-	async _restart(id) {
+	async restart(id) {
 		await RPCCall('Restart', [id]);
 		this.loadData();
 	}
 
-	async _abortRestart(id) {
+	async abortRestart(id) {
 		await RPCCall('AbortRestart', [id]);
 		this.loadData();
 	}
@@ -64,7 +64,7 @@ customElements.define('cluster-machines', class ClusterMachines extends LitEleme
 							type="checkbox"
 							id="detailedCheckbox"
 							.checked=${this.detailed}
-							@change=${this._toggleDetailed}
+							@change=${this.toggleDetailed}
 						/>
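For reference, a minimal sketch of how a task type could adopt the scheduling
controls introduced in this series. MyPollTask, its task name, the override
entry, and the resource figures are hypothetical; CanYield and
SchedulingOverrides are the TaskTypeDetails fields added in patches 02 and 03,
and the yield-via-stillOwned pattern follows the F3 task change in patch 01.
CanAccept and Adder are omitted for brevity, so this is not a complete
TaskInterface implementation.

package example

import (
	"time"

	"golang.org/x/xerrors"

	"github.com/filecoin-project/curio/harmony/harmonytask"
	"github.com/filecoin-project/curio/harmony/resources"
)

// MyPollTask is a hypothetical long-polling task used only to illustrate the
// new scheduling knobs.
type MyPollTask struct{}

func (t *MyPollTask) TypeDetails() harmonytask.TaskTypeDetails {
	return harmonytask.TaskTypeDetails{
		Name: "MyPoll",
		// Yield when the node is cordoned, like PSClientPoll (patch 02).
		CanYield: true,
		// Keep being scheduled while a matching batch task is still assigned
		// to this cordoned node, mirroring FinalizeTask (patch 03).
		SchedulingOverrides: map[string]bool{"Batch32-32G": true},
		Cost: resources.Resources{
			Cpu: 0,
			Ram: 4 << 20,
		},
		MaxFailures: 10,
	}
}

func (t *MyPollTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (bool, error) {
	// stillOwned begins returning false once the engine wants this task to
	// yield; returning a non-fatal error hands the work back, the same way
	// the F3 task returns xerrors.Errorf("yield").
	for stillOwned() {
		time.Sleep(time.Second) // one unit of polling work would go here
	}
	return false, xerrors.Errorf("yield")
}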