Skip to content

sketch proposal / brainstorming for the Domainslib API #92

@gasche

Description

@gasche

In #77 we discussed the API of domainslib, and the fact that one could consider several changes, small or large. On my commute back today I tried to brainstorm what another API for domainslib could look like -- the same capabilities as today, but with a clearer separation of concerns between the various pieces of the implementation. The result is included below for discussion.

module Workset : sig
  (** A lower-level "job pool" where jobs can be added or retrieved.

      It is used internally in the implementation of domain pools, and
      also in the Task runner (asynchronous actions are sent to the
      workset, and tasks are retrieved from the workset while waiting on
      a promise). This intermediate abstraction gives expressivity for
      more complex scheduling scenarios: for example, two domain pools
      can share their workset to "union" them. *)

  type job = unit -> unit
  (** A unit of work: a thunk that is run for its side effects. *)

  type t (* = job Multi_channel.t *)
  (** A workset is a multi-producer multi-consumer unordered queue
      of jobs to be performed. *)

  val make : unit -> t
  (** [make ()] creates a new workset. *)

  val send : t -> job -> unit
  (** [send ws j] adds the job [j] to the workset [ws]. *)

  val recv : t -> job
  (** [recv ws] retrieves a job from the workset [ws].
      {!recv_poll} is the non-blocking variant of this operation. *)

  val recv_poll : t -> job option
  (** [recv_poll ws] is the non-blocking variant of {!recv}: it returns
      [Some j] with a job taken from [ws], or [None] if no job can be
      retrieved without blocking. *)
end
module Pool : sig
  (** A pool of worker domains that are dedicated to performing jobs
      received from a workset. It is only necessary to deal with Pool
      for semi-advanced Task scheduling policies, for example if you
      want to share the same domain pool between many short-running
      tasks. *)

  type t
  (** Type of domain pool. *)

  val spawn : num_worker_domains:int -> workset:Workset.t -> t
  (** [spawn ~num_worker_domains ~workset] sets up a task execution pool
      with [num_worker_domains] worker domains, which immediately start
      pulling jobs from the given [workset] and performing them,
      continuing until {!teardown} is called.

      When [num_worker_domains] is 0, the new pool will have no domains
      and will not perform any work.

      @raise Invalid_argument when [num_worker_domains] is less than 0. *)

  val get_num_worker_domains : t -> int
  (** [get_num_worker_domains p] is the number of worker domains of [p]. *)

  val get_workset : t -> Workset.t
  (** [get_workset p] is the workset associated with the pool [p]. *)

  val set_workset : t -> Workset.t -> t
  (** [set_workset p ws] associates the workset [ws] with the pool [p].

      NOTE(review): this returns a [t] rather than [unit]; the contract
      should state whether [p] is updated in place or a new pool handle
      is returned. *)

  val teardown : t -> unit
  (** Tears down the task execution pool. Each worker will finish its
      ongoing job, but jobs still in the workset after that will
      remain there. *)
end
module Task : sig
  (** A "task" abstraction for expressing sub-computations to be
      performed in parallel.

      Tasks are effectful programs that use the effectful operations
      provided by the Task module ([fork_join], [parallel_for],
      [async], [await], etc.). They must run in the dynamic scope of
      a handler, that is, one of {!run} or {!run_with_workset}. *)

  type 'a task = unit -> 'a
  (** Type of task: a suspended computation producing a value of
      type ['a]. *)

  (** {2 Handlers} *)

  val run : ?num_worker_domains:int -> 'a task -> 'a
  (** [run t] runs the task [t] synchronously, using a temporary pool of
      worker domains to offload asynchronous tasks in parallel.

      By default the number of worker domains created is
      [Domain.recommended_domain_count () - 1]. *)

  val run_with_workset : Workset.t -> 'a task -> 'a
  (** [run_with_workset ws t] is an advanced version of [run t] that
      does not spawn a temporary domain pool, but instead parks some
      asynchronous tasks in the workset [ws] and performs tasks from
      [ws] during idle times. This gives maximum flexibility to offload
      computation. Note that the calling domain may run jobs from the
      workset that are unrelated to the task.

      This advanced handler is especially useful when nesting several
      task handlers. For example, the main handler may be invoked with
      [run], but some local sub-part of the computation may run under
      a nested [run_with_workset] invocation, to be offloaded to a
      different workset feeding a dedicated domain pool. *)

  (** {2 Effectful operations}

      The operations below must run in the dynamic scope of a handler. *)

  val fork_join : 'a task -> 'b task -> 'a * 'b
  (** [fork_join f g] runs the tasks [f] and [g], possibly in parallel,
      and returns both of their results. *)

  val fork_join_u : unit task -> unit task -> unit
  (** [fork_join_u f g] is {!fork_join} specialised to two [unit]
      tasks; it returns once both have completed. *)

  val parallel_for : ?chunk_size:int -> start:int -> finish:int ->
                     (int -> unit) -> unit
  (** [parallel_for ~start ~finish body] applies [body] to the indices
      from [start] to [finish], possibly in parallel, splitting the range
      into chunks of [chunk_size] indices.

      NOTE(review): specify whether [finish] is inclusive and what the
      default [chunk_size] is. *)

  val parallel_for_reduce : ?chunk_size:int -> start:int -> finish:int ->
                            ('a -> 'a -> 'a) -> 'a -> (int -> 'a) -> 'a
  (** [parallel_for_reduce ~start ~finish reduce init body] computes
      [body i] for each index in the range and combines the results with
      [reduce], starting from [init].

      NOTE(review): [reduce] is presumably required to be associative,
      since the combination order is unspecified; confirm and document. *)

  val parallel_scan : ('a -> 'a -> 'a) -> 'a array -> 'a array
  (** [parallel_scan op a] computes the prefix combinations of [a] with
      [op], in parallel.

      NOTE(review): specify whether the scan is inclusive or exclusive
      and whether [op] must be associative. *)

  val parallel_find : ?chunk_size:int -> start:int -> finish:int ->
                      (int -> 'a option) -> 'a option
  (** [parallel_find ~start ~finish f] searches the index range, possibly
      in parallel, for an index [i] such that [f i] is [Some v], and
      returns that result, or [None] if there is none.

      NOTE(review): specify which result is returned when several
      indices match. *)

  type !'a promise
  (** Type of promises. *)

  val async : 'a task -> 'a promise
  (** [async t] runs the task [t] asynchronously. The function returns
      a promise [r] in which the result of the task [t] will be
      stored. *)

  val await : 'a promise -> 'a
  (** [await r] waits for the promise [r] to be resolved. Other
      asynchronous tasks may be performed in the meantime. If the task
      associated with the promise has completed successfully, then the
      result of the task is returned. If the task has raised an
      exception, then [await] raises the same exception. *)
end

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancement — New feature or request

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions