
Integrate Celery Tasks better with Long Callbacks #2124

Open

@achimgaedke

Description

Motivation

Dash introduced me to Celery, and I now use it to organise compute-intensive calculations behind a dashboard. So I'd love to create groups / chains / chords and build a library of tasks that I can use inside a callback and maintain independently of dash as pure celery tasks.

My first approach was to use AsyncResult.get(), which led me into the "anti-patterns" described in https://docs.celeryq.dev/en/stable/userguide/tasks.html#avoid-launching-synchronous-subtasks, including setting disable_sync_subtasks=False, just to get the task's result value back to dash.
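
For illustration, here is a minimal sketch of that blocking approach; the part and merge tasks and the component ids are hypothetical, and app is assumed to be a dash.Dash instance already wired to a CeleryLongCallbackManager:

from celery import Celery, chord
from dash import Input, Output

celery_app = Celery(__name__, broker="redis://localhost", backend="redis://localhost")

@celery_app.task
def part(i):
    return i * i

@celery_app.task
def merge(results):
    return sum(results)

# app: a dash.Dash instance configured with a CeleryLongCallbackManager (setup omitted)
@app.long_callback(output=Output("out", "children"), inputs=Input("go", "n_clicks"))
def blocking_callback(n_clicks):
    async_result = chord(part.s(i) for i in range(10))(merge.s())
    # disable_sync_subtasks=False silences Celery's RuntimeError for calling
    # .get() inside a running task; the worker then busy-waits until the
    # chord finishes -- exactly the documented anti-pattern
    return async_result.get(disable_sync_subtasks=False)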

The dash community has been wondering about this for a while as well; see https://community.plotly.com/t/triggering-callback-from-within-python/23321.

Proposed change

After poking around for a while, I'd like to suggest amending the CeleryLongCallbackManager's task-creation function _make_job_fn (code as of release 2.5.1; it seems to be in the middle of a refactor on the develop branch) in the file https://github.com/plotly/dash/blob/v2.5.1/dash/long_callback/managers/celery_manager.py:

import hashlib
import inspect
import json

from _plotly_utils.utils import PlotlyJSONEncoder
from celery import Task
from celery.canvas import Signature


def _make_job_fn_async(fn, celery_app, progress, args_deps):
    cache = celery_app.backend

    # Hash the function source to create a unique (but stable) celery task name
    fn_source = inspect.getsource(fn)
    fn_hash = hashlib.sha1(fn_source.encode("utf-8")).hexdigest()

    @celery_app.task(name=f"long_callback_output_{fn_hash}")
    def job_result_fn(user_callback_output, result_key):
        # Linked task: receives the parent task's return value as its first
        # argument and writes it to the result cache for dash to pick up
        cache.set(result_key, json.dumps(user_callback_output, cls=PlotlyJSONEncoder))

    @celery_app.task(name=f"long_callback_{fn_hash}")
    def job_fn(result_key, progress_key, user_callback_args, fn=fn):
        def _set_progress(progress_value):
            cache.set(progress_key, json.dumps(progress_value, cls=PlotlyJSONEncoder))

        maybe_progress = [_set_progress] if progress else []
        if isinstance(args_deps, dict):
            user_callback_output = fn(*maybe_progress, **user_callback_args)
        elif isinstance(args_deps, (list, tuple)):
            user_callback_output = fn(*maybe_progress, *user_callback_args)
        else:
            user_callback_output = fn(*maybe_progress, user_callback_args)

        # Added cases for celery Task and Signature:
        # deliver the result value via a linked / chained job_result_fn task
        if isinstance(user_callback_output, Task):
            user_callback_output.apply_async(link=job_result_fn.s(result_key))
        elif isinstance(user_callback_output, Signature):
            (user_callback_output | job_result_fn.s(result_key))()
        # Otherwise do everything within this callback as before
        else:
            cache.set(result_key, json.dumps(user_callback_output, cls=PlotlyJSONEncoder))

    return job_fn

The main additions are:

  • the task function job_result_fn, and
  • the isinstance checks for Task and Signature near the bottom, which chain the job_result_fn task onto the user-defined task/signature.

If the user callback returns a Celery Task or Signature, the result will be written to the cache by a separate task that runs once the user-defined tasks are done. The callback function returns immediately once those tasks are created and scheduled, instead of being blocked in a busy wait; the job_result_fn task eventually delivers the result to dash.
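
For example, with this change a callback could return an unexecuted chord signature instead of a final value. A hedged sketch, reusing the hypothetical part and merge tasks from above (app setup omitted):

from celery import chord
from dash import Input, Output

@app.long_callback(output=Output("out", "children"), inputs=Input("go", "n_clicks"))
def workflow_callback(n_clicks):
    # Return the unexecuted chord Signature; the amended job_fn chains
    # job_result_fn onto it, schedules it and returns without blocking
    return chord((part.s(i) for i in range(10)), merge.s())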

This change allows the long callback to work as before, while celery power users can leverage the task and workflow canvas system.

Example Project

I've uploaded an example project at https://github.com/achimgaedke/dash-celery-integration:

The class CeleryManagerTasks in the module celery_manager_tasks can easily be used as a drop-in replacement for the CeleryLongCallbackManager.
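
Assuming it accepts the same constructor arguments as CeleryLongCallbackManager, wiring it up could look like this (a sketch, not taken verbatim from the repository):

import dash
from celery import Celery
from celery_manager_tasks import CeleryManagerTasks

celery_app = Celery(__name__, broker="redis://localhost", backend="redis://localhost")
long_callback_manager = CeleryManagerTasks(celery_app)
app = dash.Dash(__name__, long_callback_manager=long_callback_manager)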

The file celery_integration.py is a working example, utilising a chord of 100 embarassing_parallel tasks merged by the collect task; it simply counts the number of workers used, identified by their process ids.
