@@ -3,6 +3,7 @@ package harmonytask
33import (
44 "context"
55 "fmt"
6+ "os"
67 "strconv"
78 "sync/atomic"
89 "time"
@@ -23,6 +24,8 @@ var POLL_NEXT_DURATION = 100 * time.Millisecond // After scheduling a task, wait
2324var CLEANUP_FREQUENCY = 5 * time .Minute // Check for dead workers this often * everyone
2425var FOLLOW_FREQUENCY = 1 * time .Minute // Check for work to follow this often
2526
27+ var ExitStatusRestartRequest = 100
28+
2629type TaskTypeDetails struct {
2730 // Max returns how many tasks this machine can run of this type.
2831 // Nil (default)/Zero or less means unrestricted.
@@ -57,6 +60,16 @@ type TaskTypeDetails struct {
5760 // CanAccept() can read taskEngine's WorkOrigin string to learn about a task.
5861 // Ex: make new CC sectors, clean-up, or retrying pipelines that failed in later states.
5962 IAmBored func (AddTaskFunc ) error
63+
64+ // CanYield is true if the task should yield when the node is not schedulable.
65+ // This is implied for background tasks.
66+ CanYield bool
67+
68+ // SchedOverrides is a map of task names which, when running while the node is not schedulable,
69+ // allow this task to continue being scheduled. This is useful in pipelines where a long-running
70+ // task would block a short-running task from being scheduled, blocking other related pipelines on
71+ // other machines.
72+ SchedulingOverrides map [string ]bool
6073}
6174
6275// TaskInterface must be implemented in order to have a task used by harmonytask.
@@ -126,6 +139,9 @@ type TaskEngine struct {
126139 follows map [string ][]followStruct
127140 hostAndPort string
128141
142+ // runtime flags
143+ yieldBackground atomic.Bool
144+
129145 // synchronous to the single-threaded poller
130146 lastFollowTime time.Time
131147 lastCleanup atomic.Value
@@ -283,20 +299,24 @@ func (e *TaskEngine) poller() {
283299 nextWait = POLL_DURATION
284300
285301 // Check if the machine is schedulable
286- schedulable , err := e .schedulable ()
302+ schedulable , err := e .checkNodeFlags ()
287303 if err != nil {
288304 log .Error ("Unable to check schedulable status: " , err )
289305 continue
290306 }
307+
308+ e .yieldBackground .Store (! schedulable )
309+
310+ accepted := e .pollerTryAllWork (schedulable )
311+ if accepted {
312+ nextWait = POLL_NEXT_DURATION
313+ }
314+
291315 if ! schedulable {
292316 log .Debugf ("Machine %s is not schedulable. Please check the cordon status." , e .hostAndPort )
293317 continue
294318 }
295319
296- accepted := e .pollerTryAllWork ()
297- if accepted {
298- nextWait = POLL_NEXT_DURATION
299- }
300320 if time .Since (e .lastFollowTime ) > FOLLOW_FREQUENCY {
301321 e .followWorkInDB ()
302322 }
@@ -361,12 +381,40 @@ func (e *TaskEngine) followWorkInDB() {
361381}
362382
363383// pollerTryAllWork starts the next 1 task
364- func (e * TaskEngine ) pollerTryAllWork () bool {
384+ func (e * TaskEngine ) pollerTryAllWork (schedulable bool ) bool {
365385 if time .Since (e .lastCleanup .Load ().(time.Time )) > CLEANUP_FREQUENCY {
366386 e .lastCleanup .Store (time .Now ())
367387 resources .CleanupMachines (e .ctx , e .db )
368388 }
369389 for _ , v := range e .handlers {
390+ if ! schedulable {
391+ if v .TaskTypeDetails .SchedulingOverrides == nil {
392+ continue
393+ }
394+
395+ // Override the schedulable flag if the task has any assigned overrides
396+ var foundOverride bool
397+ for relatedTaskName := range v .TaskTypeDetails .SchedulingOverrides {
398+ var assignedOverrideTasks []int
399+ err := e .db .Select (e .ctx , & assignedOverrideTasks , `SELECT id
400+ FROM harmony_task
401+ WHERE owner_id = $1 AND name=$2
402+ ORDER BY update_time LIMIT 1` , e .ownerID , relatedTaskName )
403+ if err != nil {
404+ log .Error ("Unable to read assigned overrides " , err )
405+ break
406+ }
407+ if len (assignedOverrideTasks ) > 0 {
408+ log .Infow ("found override, scheduling despite schedulable=false flag" , "ownerID" , e .ownerID , "relatedTaskName" , relatedTaskName , "assignedOverrideTasks" , assignedOverrideTasks )
409+ foundOverride = true
410+ break
411+ }
412+ }
413+ if ! foundOverride {
414+ continue
415+ }
416+ }
417+
370418 if err := v .AssertMachineHasCapacity (); err != nil {
371419 log .Debugf ("skipped scheduling %s type tasks on due to %s" , v .Name , err .Error ())
372420 continue
@@ -407,6 +455,11 @@ func (e *TaskEngine) pollerTryAllWork() bool {
407455 log .Warn ("Work not accepted for " + strconv .Itoa (len (unownedTasks )) + " " + v .Name + " task(s)" )
408456 }
409457 }
458+
459+ if ! schedulable {
460+ return false
461+ }
462+
410463 // if no work was accepted, are we bored? Then find work in priority order.
411464 for _ , v := range e .handlers {
412465 v := v
@@ -462,15 +515,43 @@ func (e *TaskEngine) Host() string {
462515 return e .hostAndPort
463516}
464517
465- func (e * TaskEngine ) schedulable () (bool , error ) {
518+ func (e * TaskEngine ) checkNodeFlags () (bool , error ) {
466519 var unschedulable bool
467- err := e .db .QueryRow (e .ctx , `SELECT unschedulable FROM harmony_machines WHERE host_and_port=$1` , e .hostAndPort ).Scan (& unschedulable )
520+ var restartRequest * time.Time
521+ err := e .db .QueryRow (e .ctx , `SELECT unschedulable, restart_request FROM harmony_machines WHERE host_and_port=$1` , e .hostAndPort ).Scan (& unschedulable , & restartRequest )
468522 if err != nil {
469523 return false , err
470524 }
525+
526+ if restartRequest != nil {
527+ e .restartIfNoTasksPending (* restartRequest )
528+ }
529+
471530 return ! unschedulable , nil
472531}
473532
533+ func (e * TaskEngine ) restartIfNoTasksPending (pendingSince time.Time ) {
534+ var tasksPending int
535+ err := e .db .QueryRow (e .ctx , `SELECT COUNT(*) FROM harmony_task WHERE owner_id=$1` , e .ownerID ).Scan (& tasksPending )
536+ if err != nil {
537+ log .Error ("Unable to check for tasks pending: " , err )
538+ return
539+ }
540+ if tasksPending == 0 {
541+ log .Infow ("no tasks pending, restarting" , "ownerID" , e .ownerID , "pendingSince" , pendingSince , "took" , time .Since (pendingSince ))
542+
543+ // unset the flags first
544+ _ , err = e .db .Exec (e .ctx , `UPDATE harmony_machines SET restart_request=NULL, unschedulable=FALSE WHERE host_and_port=$1` , e .hostAndPort )
545+ if err != nil {
546+ log .Error ("Unable to unset restart request: " , err )
547+ return
548+ }
549+
550+ // then exit
551+ os .Exit (ExitStatusRestartRequest )
552+ }
553+ }
554+
474555// About the Registry
475556// This registry exists for the benefit of "static methods" of TaskInterface extensions.
476557// For example, GetSPID(db, taskID) (int, err) is a static method that can be called
0 commit comments