diff --git a/dune-project b/dune-project index 5ad3b3a0ff3..7e580ae969d 100644 --- a/dune-project +++ b/dune-project @@ -350,6 +350,7 @@ astring base-threads base64 + bechamel (bos :with-test) cdrom (clock (= :version)) diff --git a/ocaml/libs/http-lib/dune b/ocaml/libs/http-lib/dune index 184fd5b2869..c30eac9699a 100644 --- a/ocaml/libs/http-lib/dune +++ b/ocaml/libs/http-lib/dune @@ -59,6 +59,7 @@ tracing_propagator uri xapi-backtrace + xapi-consts xapi-log xapi-stdext-pervasives xapi-stdext-threads diff --git a/ocaml/libs/http-lib/http.ml b/ocaml/libs/http-lib/http.ml index c979e1f7d98..b9c6166763a 100644 --- a/ocaml/libs/http-lib/http.ml +++ b/ocaml/libs/http-lib/http.ml @@ -676,14 +676,6 @@ module Request = struct let headers, body = to_headers_and_body x in let frame_header = if x.frame then make_frame_header headers else "" in frame_header ^ headers ^ body - - let with_originator_of req f = - Option.iter - (fun req -> - let originator = List.assoc_opt Hdr.originator req.additional_headers in - f originator - ) - req end module Response = struct diff --git a/ocaml/libs/http-lib/http.mli b/ocaml/libs/http-lib/http.mli index e0c972586c7..fe799b0b16c 100644 --- a/ocaml/libs/http-lib/http.mli +++ b/ocaml/libs/http-lib/http.mli @@ -129,8 +129,6 @@ module Request : sig val to_wire_string : t -> string (** [to_wire_string t] returns a string which could be sent to a server *) - - val with_originator_of : t option -> (string option -> unit) -> unit end (** Parsed form of the HTTP response *) @@ -229,6 +227,8 @@ module Hdr : sig val hsts : string (** Header used for HTTP Strict Transport Security *) + + val originator : string end val output_http : Unix.file_descr -> string list -> unit diff --git a/ocaml/libs/http-lib/http_svr.ml b/ocaml/libs/http-lib/http_svr.ml index 4db3df81d2a..37bb7b1a5ee 100644 --- a/ocaml/libs/http-lib/http_svr.ml +++ b/ocaml/libs/http-lib/http_svr.ml @@ -575,12 +575,31 @@ let handle_connection ~header_read_timeout ~header_total_timeout 
~max_length:max_header_length ss in - Http.Request.with_originator_of req Tgroup.of_req_originator ; - (* 2. now we attempt to process the request *) let finished = Option.fold ~none:true - ~some:(handle_one x ss x.Server.default_context) + ~some:(fun req -> + if !Constants.tgroups_enabled then ( + let open Xapi_stdext_threads.Threadext in + let originator = + List.assoc_opt Http.Hdr.originator + req.Http.Request.additional_headers + in + originator + |> Tgroup.of_req_originator + |> Option.iter (fun tgroup -> + ThreadRuntimeContext.get () + |> ThreadRuntimeContext.update (fun thread_ctx -> + {thread_ctx with tgroup} + ) + ) ; + + let thread_ctx = ThreadRuntimeContext.get () in + Tgroup.with_one_thread_of_group thread_ctx.tgroup @@ fun () -> + handle_one x ss x.Server.default_context req + ) else + handle_one x ss x.Server.default_context req + ) req in (* 3. do it again if the connection is kept open, but without timeouts *) diff --git a/ocaml/libs/tgroup/dune b/ocaml/libs/tgroup/dune index cff00ee1157..5b45a1bd047 100644 --- a/ocaml/libs/tgroup/dune +++ b/ocaml/libs/tgroup/dune @@ -2,7 +2,12 @@ (name tgroup) (modules tgroup) (public_name tgroup) - (libraries xapi-log xapi-stdext-unix xapi-stdext-std)) + (libraries + xapi-log + xapi-stdext-pervasives + xapi-stdext-unix + xapi-stdext-std) +) (test (name test_tgroup) diff --git a/ocaml/libs/tgroup/test_tgroup.ml b/ocaml/libs/tgroup/test_tgroup.ml index 7623a0c01ee..e5f69ed3ef5 100644 --- a/ocaml/libs/tgroup/test_tgroup.ml +++ b/ocaml/libs/tgroup/test_tgroup.ml @@ -20,7 +20,7 @@ let test_identity () = let test_make ((user_agent, subject_sid), expected_identity) = let actual_identity = - Tgroup.Group.Identity.(make ?user_agent subject_sid |> to_string) + Tgroup.Description.Identity.(make ?user_agent subject_sid |> to_string) in Alcotest.(check string) "Check expected identity" expected_identity actual_identity @@ -29,28 +29,28 @@ let test_identity () = let test_of_creator () = let dummy_identity = - 
Tgroup.Group.Identity.make ~user_agent:"XenCenter2024" "root" + Tgroup.Description.Identity.make ~user_agent:"XenCenter2024" "root" in let specs = [ ((None, None, None, None), "external/unauthenticated") ; ((Some true, None, None, None), "external/intrapool") ; ( ( Some true - , Some Tgroup.Group.Endpoint.External + , Some Tgroup.Description.Endpoint.External , Some dummy_identity , Some "sm" ) , "external/intrapool" ) ; ( ( Some true - , Some Tgroup.Group.Endpoint.Internal + , Some Tgroup.Description.Endpoint.Internal , Some dummy_identity , Some "sm" ) , "external/intrapool" ) ; ( ( None - , Some Tgroup.Group.Endpoint.Internal + , Some Tgroup.Description.Endpoint.Internal , Some dummy_identity , Some "cli" ) @@ -62,9 +62,11 @@ let test_of_creator () = ] in let test_make ((intrapool, endpoint, identity, originator), expected_group) = - let originator = Option.map Tgroup.Group.Originator.of_string originator in + let originator = + Option.map Tgroup.Description.Originator.of_string originator + in let actual_group = - Tgroup.Group.( + Tgroup.Description.( Creator.make ?intrapool ?endpoint ?identity ?originator () |> of_creator |> to_string diff --git a/ocaml/libs/tgroup/tgroup.ml b/ocaml/libs/tgroup/tgroup.ml index 071a9dfe0d2..75df3298655 100644 --- a/ocaml/libs/tgroup/tgroup.ml +++ b/ocaml/libs/tgroup/tgroup.ml @@ -18,7 +18,7 @@ open D let ( // ) = Filename.concat -module Group = struct +module Description = struct module Internal = struct type t @@ -99,24 +99,42 @@ module Group = struct let root_identity = make "root" end - type _ group = - | Internal_SM : (Internal.t * SM.t) group - | Internal_CLI : (Internal.t * CLI.t) group - | External_Intrapool : (External.t * External.Intrapool.t) group + type _ group_description = + | Internal_SM : (Internal.t * SM.t) group_description + | Internal_CLI : (Internal.t * CLI.t) group_description + | External_Intrapool : (External.t * External.Intrapool.t) group_description | External_Authenticated : Identity.t - -> 
(External.t * External.Authenticated.t) group - | External_Unauthenticated : (External.t * External.Unauthenticated.t) group - - type t = Group : 'a group -> t + -> (External.t * External.Authenticated.t) group_description + | External_Unauthenticated + : (External.t * External.Unauthenticated.t) group_description + + type t = Description : 'a group_description -> t + + (* Ideally this number would be percentages but not all of the groups have at + least a thread running at any moment of time. This numbers just mean that + the groups External_Authenticated, External_Intrapool, Internal_SM have a + total amount of time allocated in a timeslice 100/10 times more + than Internal_CLI and External_Unauthenticated do. *) + let get_share = function + | Description Internal_SM -> + 100 + | Description Internal_CLI -> + 10 + | Description External_Intrapool -> + 100 + | Description External_Unauthenticated -> + 10 + | Description (External_Authenticated _) -> + 100 let all = [ - Group Internal_SM - ; Group Internal_CLI - ; Group External_Intrapool - ; Group (External_Authenticated Identity.root_identity) - ; Group External_Unauthenticated + Description Internal_SM + ; Description Internal_CLI + ; Description External_Intrapool + ; Description (External_Authenticated Identity.root_identity) + ; Description External_Unauthenticated ] module Endpoint = struct type t = Internal | External end @@ -209,9 +227,9 @@ module Group = struct end let get_originator = function - | Group Internal_SM -> + | Description Internal_SM -> Originator.Internal_SM - | Group Internal_CLI -> + | Description Internal_CLI -> Originator.Internal_CLI | _ -> Originator.External @@ -224,21 +242,21 @@ module Group = struct ) with | _, _, Intrapool -> - Group External_Intrapool + Description External_Intrapool | Endpoint.Internal, Internal_SM, _ -> - Group Internal_SM + Description Internal_SM | Endpoint.Internal, Internal_CLI, _ -> - Group Internal_CLI + Description Internal_CLI | Endpoint.External, 
Internal_CLI, Authenticated identity | Endpoint.External, Internal_SM, Authenticated identity | _, External, Authenticated identity -> - Group (External_Authenticated identity) + Description (External_Authenticated identity) | Endpoint.External, Internal_CLI, Unautheticated | Endpoint.External, Internal_SM, Unautheticated | _, External, Unautheticated -> - Group External_Unauthenticated + Description External_Unauthenticated - let to_cgroup : type a. a group -> string = function + let to_cgroup : type a. a group_description -> string = function | Internal_SM -> Internal.name // SM.name | Internal_CLI -> @@ -252,12 +270,13 @@ module Group = struct | External_Unauthenticated -> External.name // External.Unauthenticated.name - let to_string g = match g with Group group -> to_cgroup group + let to_string g = + match g with Description group_description -> to_cgroup group_description let authenticated_root = of_creator (Creator.make ~identity:Identity.root_identity ()) - let unauthenticated = Group External_Unauthenticated + let unauthenticated = Description External_Unauthenticated end module Cgroup = struct @@ -267,9 +286,9 @@ module Cgroup = struct let dir_of group : t option = match group with - | Group.Group group -> + | Description.Description group -> Option.map - (fun dir -> dir // Group.to_cgroup group) + (fun dir -> dir // Description.to_cgroup group) (Atomic.get cgroup_dir) let with_dir dir f arg = @@ -302,31 +321,102 @@ module Cgroup = struct ) (dir_of group) - let set_cur_cgroup ~creator = attach_task (Group.of_creator creator) + let set_cur_cgroup ~creator = attach_task (Description.of_creator creator) let set_cgroup creator = set_cur_cgroup ~creator - let init dir = + let init dir groups = let () = Atomic.set cgroup_dir (Some dir) in - Group.all + groups |> List.filter_map dir_of |> List.iter (fun dir -> with_dir dir debug "created cgroup for: %s" dir) ; - set_cur_cgroup ~creator:Group.Creator.default_creator + set_cur_cgroup 
~creator:Description.Creator.default_creator end -let of_req_originator originator = - Option.iter - (fun _ -> - try - originator - |> Option.iter (fun originator -> - let originator = Group.Originator.of_string originator in - Group.Creator.make ~endpoint:Group.Endpoint.Internal ~originator - () - |> Cgroup.set_cgroup - ) - with _ -> () - ) - (Atomic.get Cgroup.cgroup_dir) +type t = { + group_descr: Description.t + ; tgroup_name: string + ; mutable tgroup_share: int + ; thread_count: int Atomic.t + ; mutable time_ideal: int +} + +let tgroups = Hashtbl.create 10 + +let add group_descr = + { + group_descr + ; tgroup_name= group_descr |> Description.to_string + ; tgroup_share= group_descr |> Description.get_share + ; thread_count= Atomic.make 0 + ; time_ideal= 0 + } + |> Hashtbl.replace tgroups group_descr + +let destroy () = Hashtbl.reset tgroups + +let group_of_description = function + | Description.Description Internal_CLI + | Description.Description External_Unauthenticated -> + Description.Description Internal_CLI |> Hashtbl.find_opt tgroups + | Description.Description (External_Authenticated _) + | Description.Description Internal_SM + | Description.Description External_Intrapool -> + Description.authenticated_root |> Hashtbl.find_opt tgroups + +let tgroups () = Hashtbl.to_seq_values tgroups |> List.of_seq + +let thread_starts_in_tgroup tg = Atomic.incr tg.thread_count + +let thread_stops_in_tgroup tg = Atomic.decr tg.thread_count + +let with_one_thread_in_tgroup tg f = + thread_starts_in_tgroup tg ; + Xapi_stdext_pervasives.Pervasiveext.finally f (fun () -> + thread_stops_in_tgroup tg + ) + +let with_one_fewer_thread_in_tgroup tg f = + (* when tgroup.thread_count < 1, then sched_global_slice will ignore this tgroup *) + let counted = Atomic.get tg.thread_count > 0 in + (* only give back what was actually taken: restoring the count + unconditionally would inflate it whenever it started at 0 *) + if counted then thread_stops_in_tgroup tg ; + Xapi_stdext_pervasives.Pervasiveext.finally + (fun () -> f tg) + (fun () -> if counted then thread_starts_in_tgroup tg) + +let with_one_thread_of_group group f = + match
group_of_description group with + | None -> + f () + | Some tg -> + with_one_thread_in_tgroup tg f + +let init dir = + Description.all |> Cgroup.init dir ; + Description.all |> List.iter add ; + Cgroup.set_cur_cgroup ~creator:Description.Creator.default_creator -let of_creator creator = creator |> Cgroup.set_cgroup +let of_req_originator originator = + let ( let* ) = Option.bind in + let* _ = Atomic.get Cgroup.cgroup_dir in + try + let* originator in + let originator = Description.Originator.of_string originator in + let creator = + Description.Creator.make ~endpoint:Description.Endpoint.Internal + ~originator () + in + let () = Cgroup.set_cgroup creator in + let group = Description.of_creator creator in + Some group + with exn -> + warn + "setting the tgroup based on http request header failed with\n\ + \ exception: %s" (Printexc.to_string exn) ; + None + +let of_creator creator = + let () = Cgroup.set_cgroup creator in + Description.of_creator creator diff --git a/ocaml/libs/tgroup/tgroup.mli b/ocaml/libs/tgroup/tgroup.mli index d89ef542ffd..0917a9356e8 100644 --- a/ocaml/libs/tgroup/tgroup.mli +++ b/ocaml/libs/tgroup/tgroup.mli @@ -12,9 +12,9 @@ * GNU Lesser General Public License for more details. *) -(** [Group] module helps with the classification of different xapi execution +(** [Description] module helps with the classification of different xapi execution threads.*) -module Group : sig +module Description : sig (** Abstract type that represents a group of execution threads in xapi. 
Each group corresponds to a Creator, and has a designated level of priority.*) type t @@ -87,29 +87,73 @@ module Group : sig end (** [Cgroup] module encapsulates different function for managing the cgroups - corresponding with [Groups].*) + corresponding with [Description.].*) module Cgroup : sig (** Represents one of the children of the cgroup directory.*) type t = string - val dir_of : Group.t -> t option - (** [dir_of group] returns the full path of the cgroup directory corresponding - to the group [group] as [Some dir]. + val cgroup_dir : string option Atomic.t - Returns [None] if [init dir] has not been called. *) + val dir_of : Description.t -> t option + (** [dir_of group] returns the full path of the cgroup directory corresponding + to the group [group] as [Some dir]. - val init : string -> unit - (** [init dir] initializes the hierachy of cgroups associated to all [Group.t] - types under the directory [dir].*) + Returns [None] if [init dir] has not been called. *) - val set_cgroup : Group.Creator.t -> unit + val set_cgroup : Description.Creator.t -> unit (** [set_cgroup c] sets the current xapi thread in a cgroup based on the - creator [c].*) + creator [c].*) end -val of_creator : Group.Creator.t -> unit +(** Type that represents a group of threads. Contains information about this + group. + + time_ideal is measured in nanoseconds.*) +type t = { + group_descr: Description.t + ; tgroup_name: string + ; mutable tgroup_share: int + ; thread_count: int Atomic.t + ; mutable time_ideal: int (*This represents the time in nanoseconds*) +} + +val tgroups : unit -> t list +(** [tgroups ()] return the list of groups currently set. *) + +val group_of_description : Description.t -> t option +(** [group_of_description descr] return [Some group] where [group] is the + group associated with description [descr]. 
+ + Returns [None] if there is not such group set.*) + +val add : Description.t -> unit +(** [add descr] adds a group to the list of tracked groups based on the + description [descr].*) + +val destroy : unit -> unit +(** [destroy ()] clears the list of groups tracked. Used for testing.*) + +val with_one_thread_in_tgroup : t -> (unit -> 'a) -> 'a +(** [with_one_thread_in_tgroup tg fn] increments the count of the number of + threads inside the group [tg] for the span of [fn]. *) + +val with_one_thread_of_group : Description.t -> (unit -> 'a) -> 'a +(** [with_one_thread_of_group descr fn] increments the count of the number of + threads inside the group corresponding to [descr] for the span of [fn]. + + No operation is done if there is no such group.*) + +val with_one_fewer_thread_in_tgroup : t -> (t -> 'a) -> 'a +(** [with_one_fewer_thread_in_tgroup tg fn] decrements the count of the number + of threads inside the group [tg] for the span of [fn], which is called with [tg]. *) + +val init : string -> unit +(** [init dir] initializes the hierarchy of cgroups and tgroups associated to + all [Description.t] types under the directory [dir].*) + +val of_creator : Description.Creator.t -> Description.t (** [of_creator g] classifies the current thread based based on the creator [c].*) -val of_req_originator : string option -> unit +val of_req_originator : string option -> Description.t option (** [of_req_originator o] same as [of_creator] but it classifies based on the http request header.*) diff --git a/ocaml/libs/timeslice/dune b/ocaml/libs/timeslice/dune index 94eff6b3a39..4ee21c80587 100644 --- a/ocaml/libs/timeslice/dune +++ b/ocaml/libs/timeslice/dune @@ -1,5 +1,14 @@ (library - (name xapi_timeslice) - (package xapi-idl) - (libraries threads.posix mtime mtime.clock.os xapi-log) + (name xapi_timeslice) + (package xapi-idl) + (libraries + bechamel.monotonic_clock + threads.posix + tgroup + mtime + mtime.clock.os + xapi-consts + xapi-log + xapi-stdext-unix + xapi-stdext-threads) ) diff --git
a/ocaml/libs/timeslice/timeslice.ml b/ocaml/libs/timeslice/timeslice.ml index c414b321d64..b14940a121a 100644 --- a/ocaml/libs/timeslice/timeslice.ml +++ b/ocaml/libs/timeslice/timeslice.ml @@ -31,24 +31,165 @@ let[@inline always] am_i_holding_locks () = let last = Atomic.get last_lock_holder in last <> invalid_holder && last = me () -let yield_interval = Atomic.make Mtime.Span.zero +let yield_interval = Atomic.make 0 (* TODO: use bechamel.monotonic-clock instead, which has lower overhead, but not in the right place in xs-opam yet *) -let last_yield = Atomic.make (Mtime_clock.counter ()) +let last_yield = Atomic.make 0 + +let thread_last_yield = Atomic.make 0 let failures = Atomic.make 0 +let[@inline always] with_time_counter_now time_counter f args = + let now = Monotonic_clock.now () |> Int64.to_int in + Atomic.set time_counter now ; + f args + +module Runtime = struct + let epoch_count = Atomic.make 0 + + let maybe_thread_yield ~global_slice_period = + let open Xapi_stdext_threads.Threadext in + let thread_ctx = ThreadRuntimeContext.get () in + let tgroup = thread_ctx.tgroup |> Tgroup.group_of_description in + match tgroup with + | None -> + () + | Some tgroup -> + let current_epoch = Atomic.get epoch_count in + ( if current_epoch <> thread_ctx.tepoch then + (* thread remembers that it is about to run in a new epoch *) + let () = thread_ctx.time_running <- 0 in + thread_ctx.tepoch <- current_epoch + else + (* thread remembers how long it is running in the current epoch *) + let time_running_since_last_yield = + (Monotonic_clock.now () |> Int64.to_int) + - Atomic.get thread_last_yield + in + let time_running = + thread_ctx.time_running + time_running_since_last_yield + in + thread_ctx.time_running <- time_running + ) ; + + let sleep_or_yield sleep_time (tgroup : Tgroup.t) = + (*todo: do not sleep if this is the last thread in the tgroup(s) *) + if tgroup.group_descr = Tgroup.Description.authenticated_root then + with_time_counter_now thread_last_yield 
Thread.yield () + else + with_time_counter_now thread_last_yield Unix.sleepf sleep_time + in + let is_to_sleep_or_yield delay_s = + if delay_s > 0. then + Tgroup.with_one_fewer_thread_in_tgroup tgroup + (sleep_or_yield delay_s) + in + + (* fair scheduling decision to check if thread time_running has exceeded + tgroup-mandated ideal time per thread *) + if thread_ctx.time_running > tgroup.time_ideal then + let since_last_global_slice = + (Monotonic_clock.now () |> Int64.to_int) - Atomic.get last_yield + in + let until_next_global_slice = + if since_last_global_slice < global_slice_period then + global_slice_period - since_last_global_slice + else + 0 + in + let thread_delay_s = + (until_next_global_slice |> float_of_int) *. 1e-9 + in + is_to_sleep_or_yield thread_delay_s + + let incr_epoch ~frequency = + let epoch = epoch_count |> Atomic.get in + if epoch mod frequency = 0 then + Atomic.set epoch_count 1 + else + Atomic.incr epoch_count + + let sched_global_slice ~global_slice_period = + (*refresh the epoch counter roughly every 10s for timeslices of 10ms*) + incr_epoch ~frequency:1024 ; + + (* goal is to recalculate the per-thread time_ideal of each thread group: *) + (* 1) fairness: each thread group gets the same amount of time inside the slice *) + (* 2) control : each thread group time is then weighted by its tgroup_share *) + (* 3) delegate: later, asynchronously, each thread decides to maybe yield based + on its thread group's idea of ideal time per thread *) + (* delegation via tgroups minimizes the number of synchronous global writes + here from O(threads) to O(groups) *) + let time_ideal_of_tgroups groups = + Tgroup.( + let group_share_total = + groups |> List.fold_left (fun xs x -> x.tgroup_share + xs) 0 + in + groups + |> List.iter (fun g -> + let group_share_ratio = + match group_share_total with + | 0 -> + 0. + | gst -> + (g.tgroup_share |> float_of_int) /. (gst |> float_of_int) + in + let group_time_ns = + group_share_ratio *.
(global_slice_period |> float_of_int) + in + let thread_time_ideal = + match g.thread_count |> Atomic.get with + | 0 -> + 0. + | gnt -> + group_time_ns /. (gnt |> float_of_int) + in + g.time_ideal <- thread_time_ideal |> int_of_float + ) + ) + in + let tgroups_with_threads = + List.fold_left + (fun xs x -> + if x.Tgroup.thread_count |> Atomic.get > 0 then + x :: xs + else + xs + ) + [] (Tgroup.tgroups ()) + in + (* reserve cpu time only to tgroups that have threads to run at the moment *) + tgroups_with_threads |> time_ideal_of_tgroups +end + let periodic_hook (_ : Gc.Memprof.allocation) = let () = try - if not (am_i_holding_locks ()) then - let elapsed = Mtime_clock.count (Atomic.get last_yield) in - if Mtime.Span.compare elapsed (Atomic.get yield_interval) > 0 then ( - let now = Mtime_clock.counter () in - Atomic.set last_yield now ; Thread.yield () - ) + let yield_interval = Atomic.get yield_interval in + if !Constants.tgroups_enabled && !Constants.runtime_sched then + if not (am_i_holding_locks ()) then + let elapsed = + (Monotonic_clock.now () |> Int64.to_int) - Atomic.get last_yield + in + if elapsed > yield_interval then ( + let now = Monotonic_clock.now () |> Int64.to_int in + Atomic.set last_yield now ; + Atomic.set thread_last_yield now ; + Runtime.sched_global_slice ~global_slice_period:yield_interval ; + Thread.yield () + ) else + Runtime.maybe_thread_yield ~global_slice_period:yield_interval + else + () + else if not (am_i_holding_locks ()) then + let elapsed = + (Monotonic_clock.now () |> Int64.to_int) - Atomic.get last_yield + in + if elapsed > yield_interval then + with_time_counter_now last_yield Thread.yield () with _ -> (* It is not safe to raise exceptions here, it'd require changing all code to be safe to asynchronous interrupts/exceptions, see https://guillaume.munch.name/software/ocaml/memprof-limits/index.html#isolation @@ -63,10 +204,9 @@ let periodic = {null_tracker with alloc_minor= periodic_hook; alloc_major= periodic_hook} let set 
?(sampling_rate = 1e-4) interval = - Atomic.set yield_interval - (Mtime.Span.of_float_ns @@ (interval *. 1e9) |> Option.get) ; + Atomic.set yield_interval (interval *. 1e9 |> int_of_float) ; Gc.Memprof.start ~sampling_rate ~callstack_size:0 periodic let clear () = Gc.Memprof.stop () ; - Atomic.set yield_interval Mtime.Span.zero + Atomic.set yield_interval 0 diff --git a/ocaml/libs/timeslice/timeslice.mli b/ocaml/libs/timeslice/timeslice.mli index 8fa54677b38..678e245dc2e 100644 --- a/ocaml/libs/timeslice/timeslice.mli +++ b/ocaml/libs/timeslice/timeslice.mli @@ -42,3 +42,9 @@ val lock_acquired : unit -> unit val lock_released : unit -> unit (** [lock_acquired ()] notifies about lock release. *) + +module Runtime : sig + val maybe_thread_yield : global_slice_period:int -> unit + + val sched_global_slice : global_slice_period:int -> unit +end diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml index c8d85d8b6c5..36f77d2ed91 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml @@ -104,7 +104,7 @@ module ThreadRuntimeContext = struct ; thread_name: string ; mutable time_running: int ; mutable tepoch: int - ; tgroup: Tgroup.Group.t + ; tgroup: Tgroup.Description.t } (*The documentation for Ambient_context_thread_local isn't really clear is @@ -117,7 +117,7 @@ module ThreadRuntimeContext = struct let ocaml_tid = Thread.self () |> Thread.id in let time_running = 0 in let tepoch = 0 in - let tgroup = Tgroup.Group.authenticated_root in + let tgroup = Tgroup.Description.authenticated_root in let tls = {thread_name; tgroup; ocaml_tid; time_running; tepoch} in let () = Ambient_context_thread_local.Thread_local.set thread_local_storage tls diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli index 7967e3fa573..89cf3df3d30 100644 
--- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli @@ -50,7 +50,7 @@ module ThreadRuntimeContext : sig ; thread_name: string ; mutable time_running: int ; mutable tepoch: int - ; tgroup: Tgroup.Group.t + ; tgroup: Tgroup.Description.t } val create : ?thread_name:string -> unit -> t diff --git a/ocaml/tests/bench/bench_timeslice.ml b/ocaml/tests/bench/bench_timeslice.ml new file mode 100644 index 00000000000..0eccde72091 --- /dev/null +++ b/ocaml/tests/bench/bench_timeslice.ml @@ -0,0 +1,70 @@ +open Bechamel + +let test_maybe_thread_yield () = + Sys.opaque_identity + (Xapi_timeslice.Timeslice.Runtime.maybe_thread_yield + ~global_slice_period:10_000_000 + ) + +let test_sched_global_slice () = + Sys.opaque_identity + (Xapi_timeslice.Timeslice.Runtime.sched_global_slice + ~global_slice_period:10_000_000 + ) + +let test_tgroups_on ~name f = + let allocate () = + let () = Atomic.set Tgroup.Cgroup.cgroup_dir (Some "") in + let g_cli = Some "cli" |> Tgroup.of_req_originator |> Option.get in + let () = Tgroup.add g_cli in + let tg_cli = Tgroup.group_of_description g_cli |> Option.get in + let _ = Atomic.fetch_and_add tg_cli.thread_count 10 in + let () = Tgroup.add Tgroup.Description.authenticated_root in + let tg_authenticated_root = + Tgroup.group_of_description Tgroup.Description.authenticated_root + |> Option.get + in + let _ = Atomic.fetch_and_add tg_authenticated_root.thread_count 5 in + () + in + let free = Tgroup.destroy in + Test.make_with_resource ~name ~allocate ~free Test.uniq f + +let test_with_thread_classified ~name f = + let allocate () = + let () = Atomic.set Tgroup.Cgroup.cgroup_dir (Some "") in + let g_cli = Some "cli" |> Tgroup.of_req_originator |> Option.get in + let () = Tgroup.add g_cli in + let tg_cli = Tgroup.group_of_description g_cli |> Option.get in + let _ = Atomic.fetch_and_add tg_cli.thread_count 10 in + let () = Tgroup.add Tgroup.Description.authenticated_root 
in + let tg_authenticated_root = + Tgroup.group_of_description Tgroup.Description.authenticated_root + |> Option.get + in + let () = Atomic.incr tg_authenticated_root.thread_count in + Xapi_stdext_threads.Threadext.ThreadRuntimeContext.( + let thread_ctx = get () in + update + (fun thread_ctx -> + {thread_ctx with tgroup= Tgroup.Description.authenticated_root} + ) + thread_ctx + ) + in + let free () = + Tgroup.destroy () ; + Xapi_stdext_threads.Threadext.ThreadRuntimeContext.remove () + in + Test.make_with_resource ~name ~allocate ~free Test.uniq f + +let benchmarks = + Test.make_grouped ~name:"timeslice" + [ + test_with_thread_classified ~name:"maybe_thread_yield" + (Staged.stage test_maybe_thread_yield) + ; test_tgroups_on ~name:"sched_global_slice" + (Staged.stage test_sched_global_slice) + ] + +let () = Bechamel_simple_cli.cli benchmarks diff --git a/ocaml/tests/bench/dune b/ocaml/tests/bench/dune index 61f92787759..c27b356fd48 100644 --- a/ocaml/tests/bench/dune +++ b/ocaml/tests/bench/dune @@ -1,5 +1,6 @@ (executables (names + bench_timeslice bench_tracing bench_uuid bench_throttle2 @@ -10,6 +11,7 @@ bechamel bechamel-notty notty.unix + tgroup tracing_export threads.posix fmt @@ -18,4 +20,6 @@ xapi_aux tests_common log - xapi_internal)) + xapi_internal + xapi_timeslice + xapi-stdext-threads)) diff --git a/ocaml/xapi-consts/constants.ml b/ocaml/xapi-consts/constants.ml index 3072a459c00..b9699108736 100644 --- a/ocaml/xapi-consts/constants.ml +++ b/ocaml/xapi-consts/constants.ml @@ -422,3 +422,5 @@ let observer_components_all = let tgroups_enabled = ref false let when_tgroups_enabled f = if !tgroups_enabled then f () else () + +let runtime_sched = ref false diff --git a/ocaml/xapi/helpers.ml b/ocaml/xapi/helpers.ml index 2ef16112053..96772a7c362 100644 --- a/ocaml/xapi/helpers.ml +++ b/ocaml/xapi/helpers.ml @@ -427,6 +427,23 @@ let make_rpc ~__context rpc : Rpc.response = in let http = xmlrpc ~subtask_of ~version:"1.1" path in let http = 
TraceHelper.inject_span_into_req tracing http in + let http = + if !Constants.tgroups_enabled then + let thread_ctx = + Xapi_stdext_threads.Threadext.ThreadRuntimeContext.get () + in + let originator = + thread_ctx.tgroup + |> Tgroup.Description.get_originator + |> Tgroup.Description.Originator.to_string + in + let additional_headers = + (Http.Hdr.originator, originator) :: http.additional_headers + in + {http with additional_headers} + else + http + in let transport = if Pool_role.is_master () then Unix Xapi_globs.unix_domain_socket diff --git a/ocaml/xapi/server_helpers.ml b/ocaml/xapi/server_helpers.ml index 04aae674472..9afe1b5fb3d 100644 --- a/ocaml/xapi/server_helpers.ml +++ b/ocaml/xapi/server_helpers.ml @@ -142,30 +142,6 @@ let do_dispatch ?session_id ?forward_op ?self:_ supports_async called_fn_name ~supports_async ~label ~http_req ~fd () in - Constants.when_tgroups_enabled (fun () -> - let identity = - try - Option.map - (fun session_id -> - let subject = - Db.Session.get_auth_user_sid ~__context ~self:session_id - in - Tgroup.Group.Identity.make ?user_agent:http_req.user_agent - subject - ) - ( if !Xapi_globs.slave_emergency_mode then - (* in emergency mode we cannot reach the coordinator, - and we must not attempt to make Db calls - *) - None - else - session_id - ) - with _ -> None - in - Tgroup.of_creator (Tgroup.Group.Creator.make ?identity ()) - ) ; - let sync () = let need_complete = not (Context.forwarded_task __context) in exec_with_context ~__context ~need_complete ~called_async @@ -186,14 +162,54 @@ let do_dispatch ?session_id ?forward_op ?self:_ supports_async called_fn_name (* Return task id immediately *) Rpc.success (API.rpc_of_ref_task (Context.get_task_id __context)) in - match sync_ty with - | `Sync -> - sync () - | `Async -> - let need_complete = not (Context.forwarded_task __context) in - async ~need_complete - | `InternalAsync -> - async ~need_complete:true + let f () = + match sync_ty with + | `Sync -> + sync () + | `Async -> + let
need_complete = not (Context.forwarded_task __context) in + async ~need_complete + | `InternalAsync -> + async ~need_complete:true + in + if !Constants.tgroups_enabled then ( + let identity = + try + Option.map + (fun session_id -> + let subject = + Db.Session.get_auth_user_sid ~__context ~self:session_id + in + Tgroup.Description.Identity.make ?user_agent:http_req.user_agent + subject + ) + ( if !Xapi_globs.slave_emergency_mode then + (* in emergency mode we cannot reach the coordinator, + and we must not attempt to make Db calls + *) + None + else + session_id + ) + with _ -> None + in + let tgroup = + Tgroup.of_creator (Tgroup.Description.Creator.make ?identity ()) + in + let open Xapi_stdext_threads.Threadext in + let thread_ctx = ThreadRuntimeContext.get () in + (* authenticated_root here should mean a group has not been set yet and + we should set one. otherwise go with what has already been set.*) + if + thread_ctx.tgroup = Tgroup.Description.authenticated_root + || thread_ctx.tgroup = Tgroup.Description.unauthenticated + then + ThreadRuntimeContext.update + (fun thread_ctx -> {thread_ctx with tgroup}) + thread_ctx ; + Tgroup.with_one_thread_of_group tgroup f + ) else + f () (* regardless of forwarding, we are expected to complete the task *) diff --git a/ocaml/xapi/xapi.ml b/ocaml/xapi/xapi.ml index f7ac9b546d3..44d835271de 100644 --- a/ocaml/xapi/xapi.ml +++ b/ocaml/xapi/xapi.ml @@ -1063,7 +1063,7 @@ let server_init () = , [] , fun () -> Constants.when_tgroups_enabled @@ fun () -> - Tgroup.Cgroup.init Xapi_globs.xapi_requests_cgroup + Tgroup.init Xapi_globs.xapi_requests_cgroup ) ; ( "Registering SMAPIv1 plugins" , [Startup.OnlyMaster] diff --git a/ocaml/xapi/xapi_globs.ml b/ocaml/xapi/xapi_globs.ml index 8bdeac10d06..ba6add0e1d1 100644 --- a/ocaml/xapi/xapi_globs.ml +++ b/ocaml/xapi/xapi_globs.ml @@ -1726,6 +1726,11 @@ let other_options = , (fun () -> string_of_bool !Constants.tgroups_enabled) , "Turn on tgroups classification" ) + ; ( "runtime-sched" + , 
Arg.Set Constants.runtime_sched + , (fun () -> string_of_bool !Constants.runtime_sched) + , "Turn on runtime scheduling" + ) ; event_from_entry ; event_from_task_entry ; event_next_entry diff --git a/ocaml/xapi/xapi_session.ml b/ocaml/xapi/xapi_session.ml index f7fcfdac7e9..9f2ae6cf20f 100644 --- a/ocaml/xapi/xapi_session.ml +++ b/ocaml/xapi/xapi_session.ml @@ -33,6 +33,20 @@ open Client open Auth_signature open Extauth +let update_thread_ctx tgroup = + if !Constants.tgroups_enabled then + let open Xapi_stdext_threads.Threadext in + let thread_ctx = ThreadRuntimeContext.get () in + (* authenticated_root or unauthenticated here should mean a group has not + been set yet and we should set one. Otherwise keep what is already set. *) + if + thread_ctx.tgroup = Tgroup.Description.authenticated_root + || thread_ctx.tgroup = Tgroup.Description.unauthenticated + then + ThreadRuntimeContext.update + (fun thread_ctx -> {thread_ctx with tgroup}) + thread_ctx + module AuthFail : sig (* stats are reset each time you query, so if there hasn't been a failed login attempt since the last time the stats @@ -744,16 +758,24 @@ let consider_touching_session rpc session_id = (* Make sure the pool secret matches *) let slave_login_common ~__context ~host_str ~psecret = Context.with_tracing ~__context __FUNCTION__ @@ fun __context -> - Constants.when_tgroups_enabled (fun () -> - Tgroup.of_creator (Tgroup.Group.Creator.make ~intrapool:true ()) - ) ; - - if not (Helpers.PoolSecret.is_authorized psecret) then ( - let msg = "Pool credentials invalid" in - debug "Failed to authenticate slave %s: %s" host_str msg ; - raise - Api_errors.(Server_error (session_authentication_failed, [host_str; msg])) - ) + let f () = + if not (Helpers.PoolSecret.is_authorized psecret) then ( + let msg = "Pool credentials invalid" in + debug "Failed to authenticate slave %s: %s" host_str msg ; + raise + Api_errors.( + Server_error (session_authentication_failed, [host_str; msg]) + ) + ) + in + if
!Constants.tgroups_enabled then ( + let tgroup = + Tgroup.of_creator (Tgroup.Description.Creator.make ~intrapool:true ()) + in + update_thread_ctx tgroup ; + Tgroup.with_one_thread_of_group tgroup f + ) else + f () (* Normal login, uses the master's database *) let slave_login ~__context ~host ~psecret = @@ -943,16 +965,22 @@ let login_with_password ~__context ~uname ~pwd ~version:_ ~originator = | Some `root -> (* in this case, the context origin of this login request is a unix socket bound locally to a filename *) (* we trust requests from local unix filename sockets, so no need to authenticate them before login *) - Constants.when_tgroups_enabled (fun () -> + let f () = + login_no_password_common ~__context ~uname:(Some uname) ~originator + ~host:(Helpers.get_localhost ~__context) + ~pool:false ~is_local_superuser:true ~subject:Ref.null + ~auth_user_sid:"" ~auth_user_name:uname ~rbac_permissions:[] + ~db_ref:None ~client_certificate:false + in + if !Constants.tgroups_enabled then ( + let tgroup = Tgroup.of_creator - Tgroup.Group.(Creator.make ~identity:Identity.root_identity ()) - ) ; - - login_no_password_common ~__context ~uname:(Some uname) ~originator - ~host:(Helpers.get_localhost ~__context) - ~pool:false ~is_local_superuser:true ~subject:Ref.null ~auth_user_sid:"" - ~auth_user_name:uname ~rbac_permissions:[] ~db_ref:None - ~client_certificate:false + Tgroup.Description.(Creator.make ~identity:Identity.root_identity ()) + in + update_thread_ctx tgroup ; + Tgroup.with_one_thread_of_group tgroup f + ) else + f () | Some `client_cert -> (* The session was authenticated by stunnel's verification of the client certificate, so we do not need to verify the username/password. 
Grant access to functions @@ -993,16 +1021,24 @@ let login_with_password ~__context ~uname ~pwd ~version:_ ~originator = debug "Success: local auth, user %s from %s" uname (Context.get_origin __context) ; - Constants.when_tgroups_enabled (fun () -> + let f () = + login_no_password_common ~__context ~uname:(Some uname) ~originator + ~host:(Helpers.get_localhost ~__context) + ~pool:false ~is_local_superuser:true ~subject:Ref.null + ~auth_user_sid:"" ~auth_user_name:uname ~rbac_permissions:[] + ~db_ref:None ~client_certificate:false + in + if !Constants.tgroups_enabled then ( + let tgroup = Tgroup.of_creator - Tgroup.Group.(Creator.make ~identity:Identity.root_identity ()) - ) ; - - login_no_password_common ~__context ~uname:(Some uname) ~originator - ~host:(Helpers.get_localhost ~__context) - ~pool:false ~is_local_superuser:true ~subject:Ref.null - ~auth_user_sid:"" ~auth_user_name:uname ~rbac_permissions:[] - ~db_ref:None ~client_certificate:false + Tgroup.Description.( + Creator.make ~identity:Identity.root_identity () + ) + in + update_thread_ctx tgroup ; + Tgroup.with_one_thread_of_group tgroup f + ) else + f () ) in let thread_delay_and_raise_error ~error uname msg = @@ -1294,21 +1330,27 @@ let login_with_password ~__context ~uname ~pwd ~version:_ ~originator = ~slow_path:query_external_auth in - Constants.when_tgroups_enabled (fun () -> + let f () = + login_no_password_common ~__context ~uname:(Some uname) + ~originator + ~host:(Helpers.get_localhost ~__context) + ~pool:false ~is_local_superuser:false ~subject + ~auth_user_sid:subject_identifier ~auth_user_name:subject_name + ~rbac_permissions ~db_ref:None ~client_certificate:false + in + if !Constants.tgroups_enabled then ( + let tgroup = Tgroup.of_creator - Tgroup.Group.( + Tgroup.Description.( Creator.make ~identity:(Identity.make subject_identifier) () ) - ) ; - - login_no_password_common ~__context ~uname:(Some uname) - ~originator - ~host:(Helpers.get_localhost ~__context) - ~pool:false 
~is_local_superuser:false ~subject - ~auth_user_sid:subject_identifier ~auth_user_name:subject_name - ~rbac_permissions ~db_ref:None ~client_certificate:false + in + update_thread_ctx tgroup ; + Tgroup.with_one_thread_of_group tgroup f + ) else + f () (* we only reach this point if for some reason a function above forgot to catch a possible exception in the Auth_signature module*) with | Not_found | Auth_signature.Subject_cannot_be_resolved -> diff --git a/opam/xapi.opam b/opam/xapi.opam index 06380ac4f8a..5c2cbb0c16d 100644 --- a/opam/xapi.opam +++ b/opam/xapi.opam @@ -16,6 +16,7 @@ depends: [ "astring" "base-threads" "base64" + "bechamel" "bos" {with-test} "cdrom" "clock" {= version}