
Speculatively assign tasks to workers #3974

@mrocklin

Description

Motivation

Currently, when a task becomes ready to run the scheduler finds the best worker for that task and sends it to that worker to be enqueued. The worker then handles whatever communication is necessary to collect the dependencies for that task and, once they are in place, puts the task in a queue to be run in a local ThreadPoolExecutor. When the task finishes the worker informs the scheduler and moves on to its next task. When the scheduler receives news that the task has finished it marks all of that task's dependents, each of which goes through this process again.

This is occasionally sub-optimal. Consider the following graph:

A1 -> B1
A2 -> B2

If we have one worker then both of the A's will be sent to that worker. The worker will finish A1, report to the scheduler that it is finished, and begin work on A2. It will then learn from the scheduler that B1 is ready to compute and will work on that next. As a result, the order of execution looks like the following:

A1, A2, B1, B2

When really we would have preferred ...

A1, B1, A2, B2

Today we often avoid this situation by doing task fusion on the client side, so that we really only have two tasks

AB1
AB2

If we remove low-level task fusion (which we may want to do for performance reasons) then it would be nice to capture this same A, B, A, B behavior some other way.
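
As a concrete illustration of what that client-side fusion does today, here is a small sketch using dask.optimization.fuse (a real Dask helper) on a hand-written task graph; the key names mirror the example above and are otherwise illustrative:

```python
from operator import add

from dask.optimization import fuse

# Hand-written task graph: two independent linear chains A1 -> B1 and A2 -> B2.
dsk = {
    "A1": (add, 1, 1),
    "B1": (add, "A1", 10),
    "A2": (add, 2, 2),
    "B2": (add, "A2", 10),
}

# fuse() collapses each linear chain into a single task, so the scheduler
# only ever sees two independent tasks (conceptually "AB1" and "AB2").
fused, dependencies = fuse(dsk, keys=["B1", "B2"])
print(list(fused))  # two fused tasks, plus any aliases Dask keeps for B1/B2
```
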

Send tasks to workers before dependencies are set

One way to capture this same behavior is to send more tasks down to the worker, even before they are ready to run. Today we only send a task to a worker once we have high confidence that that is where it should run, which we typically only know after we know the sizes of all of its inputs. Those sizes are only known once the dependencies have finished, so today we only send tasks to workers once all of their dependencies are complete.

However, we could send not-yet-ready-to-run tasks to a worker with high confidence if all of that task's dependencies are also present or running on that worker. This happens to fully subsume the low-level task fusion case: if A1 is running on a worker then we believe with high probability that B1 will also run on that same worker.
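
A minimal sketch of that decision rule, using hypothetical names rather than the real scheduler's data structures:

```python
# A sketch of the proposed scheduler-side check, not actual distributed
# internals. `TaskState` and `decide_worker_speculatively` are hypothetical.
from typing import Optional


class TaskState:
    def __init__(self, key, dependencies):
        self.key = key
        self.dependencies = dependencies   # list of parent TaskStates
        self.worker = None                 # worker this task was assigned to, if any


def decide_worker_speculatively(ts: TaskState) -> Optional[str]:
    """Return a worker address if we can assign `ts` before it is ready.

    We only do this when every dependency is already assigned to (or has
    completed on) the same single worker, so the data will almost certainly
    live there and no transfer is needed.
    """
    workers = {dep.worker for dep in ts.dependencies}
    if len(workers) == 1 and None not in workers:
        return workers.pop()
    return None  # fall back to the normal decide-when-ready path
```
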

Changes

This would be a significant change to the worker. We would need to add states to track dependencies, much as we do in the dask.local scheduler, or like a very stripped-down version of the dask.distributed scheduler.
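
As a rough illustration of the kind of bookkeeping involved, here is a stripped-down sketch loosely modelled on dask.local; all names are hypothetical and this is not the real Worker class:

```python
from collections import defaultdict


class SpeculativeWorkerState:
    """Minimal dependency tracking a worker would need for speculative tasks."""

    def __init__(self):
        self.data = {}                      # key -> result already on this worker
        self.waiting_count = {}             # key -> number of unfinished dependencies
        self.dependents = defaultdict(set)  # key -> speculative tasks waiting on it
        self.ready = []                     # tasks whose dependencies are all local

    def add_speculative_task(self, key, dependencies):
        missing = [d for d in dependencies if d not in self.data]
        self.waiting_count[key] = len(missing)
        for dep in missing:
            self.dependents[dep].add(key)
        if not missing:
            self.ready.append(key)

    def task_finished(self, key, result):
        # Called when a task completes locally; release any speculative
        # dependents that were only waiting on this result.
        self.data[key] = result
        for dependent in self.dependents.pop(key, ()):
            self.waiting_count[dependent] -= 1
            if self.waiting_count[dependent] == 0:
                self.ready.append(dependent)
```
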

This will also require managing interactions with other parts of Dask like ...

  1. Work stealing / load balancing: What happens to a speculatively assigned dependent when its dependency is stolen by another worker? (My guess is that it goes back to the scheduler and we let the scheduler sort it out; a rough sketch of that escape hatch follows this list.)
  2. Resources / restrictions: We should only speculatively schedule a dependent on a worker if that worker satisfies the task's resource restrictions.
  3. Exceptions / failures: Probably we just kick the task back up to the scheduler if anything bad happens.
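
For cases 1 and 3, a minimal sketch of what "kick it back to the scheduler" could look like, building on the hypothetical SpeculativeWorkerState above; the function name and message format are made up for illustration:

```python
def release_speculative_task(state, key, send_to_scheduler, reason):
    # Hypothetical escape hatch: if a speculative task's dependency is stolen
    # or fails, forget the task locally and ask the scheduler to reschedule it
    # through the normal (non-speculative) path.
    state.waiting_count.pop(key, None)
    state.ready = [k for k in state.ready if k != key]
    for waiters in state.dependents.values():
        waiters.discard(key)
    send_to_scheduler({"op": "release-speculative-task", "key": key, "reason": reason})
```
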

Learning

I think that this task is probably a good learning task for someone who has some moderate exposure to the distributed scheduler and wants to level up a bit. Adding dependencies to the worker will, I think, force someone to think a lot about the systems that help make Dask run while mostly implementing minor versions of them. cc @quasiben @jacobtomlinson @madsbk
