@@ -2,50 +2,126 @@ use std::sync::{Arc, Mutex};
22use std:: thread;
33use std:: time:: Duration ;
44
5+ use once_cell:: sync:: Lazy ;
6+
57use crate :: rt;
68use crate :: task;
7- use crate :: utils:: abort_on_panic;
8-
9- pub fn spawn_thread ( ) {
10- thread:: Builder :: new ( )
11- . name ( "async-std/monitor" . to_string ( ) )
12- . spawn ( || {
13- const PROBING_DURATION_MS : u64 = 500 ;
14- const SCALING_DOWN_SEC : u64 = 1 * 60 ; // 1 minute
15-
16- abort_on_panic ( || {
17- let running = & Arc :: new ( Mutex :: new ( false ) ) ;
18-
19- {
20- let running = Arc :: clone ( running) ;
21- task:: spawn ( async move {
22- loop {
23- * running. lock ( ) . unwrap ( ) = true ;
24- task:: sleep ( Duration :: from_millis ( PROBING_DURATION_MS ) ) . await ;
25- }
26- } ) ;
27- }
28-
29- {
30- task:: spawn ( async {
31- loop {
32- task:: sleep ( Duration :: from_secs ( SCALING_DOWN_SEC ) ) . await ;
33- rt:: scale_down ( ) ;
34- }
35- } ) ;
36- }
37-
38- loop {
39- * running. lock ( ) . unwrap ( ) = false ;
40- thread:: sleep ( Duration :: from_millis ( PROBING_DURATION_MS * 2 ) ) ;
41- if !* running. lock ( ) . unwrap ( ) {
42- eprintln ! (
43- "WARNING: You are blocking the runtime, please use spawn_blocking"
44- ) ;
45- rt:: scale_up ( ) ;
46- }
47- }
48- } )
49- } )
50- . expect ( "cannot start a monitor thread" ) ;
9+
10+ pub ( crate ) static MONITOR : Lazy < Mutex < & ' static Monitor > > =
11+ Lazy :: new ( || Mutex :: new ( & DEFAULT_MONITOR ) ) ;
12+
13+ /// Monitor function.
14+ #[ derive( Debug ) ]
15+ pub struct Monitor ( pub fn ( ) ) ;
16+
17+ impl Monitor {
18+ pub ( crate ) fn run ( & self ) {
19+ self . 0 ( ) ;
20+ }
21+ }
22+
23+ /// Replace the monitor.
24+ ///
25+ /// This will replace monitor function used by monitor thread by runtime.
26+ ///
27+ /// Monitor thread is special thread that can be used to monitor wether
28+ /// the runtime is blocked or not.
29+ ///
30+ /// Default monitor is [`DEFAULT_MONITOR`]
31+ ///
32+ /// [`DEFAULT_MONITOR`]: static.DEFAULT_MONITOR.html
33+ pub fn replace_monitor ( new : & ' static Monitor ) -> & ' static Monitor {
34+ let mut m = MONITOR . lock ( ) . unwrap ( ) ;
35+ let old = & m as & ' static Monitor ;
36+ * m = new;
37+ old
38+ }
39+
40+ /// Default monitor
41+ ///
42+ /// Whenever runtime is blocked, print warning and scale up the runtime.
43+ /// This also try to scale down when there are too many worker thread.
44+ pub static DEFAULT_MONITOR : Monitor = Monitor ( default_monitor) ;
45+
46+ /// Abort on blocking monitor.
47+ ///
48+ /// Whenever runtime is blocked, abort the program.
49+ pub static ABORT_ON_BLOCKING_MONITOR : Monitor = Monitor ( abort_on_blocking_monitor) ;
50+
51+ async fn repeater ( interval : Duration , before : impl Fn ( ) , after : impl Fn ( ) ) {
52+ loop {
53+ before ( ) ;
54+ task:: sleep ( interval) . await ;
55+ after ( ) ;
56+ }
57+ }
58+
59+ fn repeater_blocking ( interval : Duration , before : impl Fn ( ) , after : impl Fn ( ) ) {
60+ loop {
61+ before ( ) ;
62+ thread:: sleep ( interval) ;
63+ after ( ) ;
64+ }
65+ }
66+
67+ fn default_monitor ( ) {
68+ const PROB_INTERVAL : Duration = Duration :: from_millis ( 500 ) ;
69+ const SCALE_DOWN_INTERVAL : Duration = Duration :: from_secs ( 60 ) ;
70+
71+ let running = & Arc :: new ( Mutex :: new ( false ) ) ;
72+
73+ {
74+ let running = Arc :: clone ( running) ;
75+ task:: spawn ( repeater (
76+ PROB_INTERVAL ,
77+ move || {
78+ * running. lock ( ) . unwrap ( ) = true ;
79+ } ,
80+ || { } ,
81+ ) ) ;
82+ }
83+
84+ task:: spawn ( repeater ( SCALE_DOWN_INTERVAL , || rt:: scale_down ( ) , || { } ) ) ;
85+
86+ repeater_blocking (
87+ PROB_INTERVAL + Duration :: from_millis ( 50 ) ,
88+ || {
89+ * running. lock ( ) . unwrap ( ) = false ;
90+ } ,
91+ || {
92+ if !* running. lock ( ) . unwrap ( ) {
93+ eprintln ! ( "WARNING: You are blocking the runtime, please use spawn_blocking" ) ;
94+ rt:: scale_up ( ) ;
95+ }
96+ } ,
97+ ) ;
98+ }
99+
100+ fn abort_on_blocking_monitor ( ) {
101+ const PROB_INTERVAL : Duration = Duration :: from_secs ( 1 ) ;
102+
103+ let running = & Arc :: new ( Mutex :: new ( false ) ) ;
104+
105+ {
106+ let running = Arc :: clone ( running) ;
107+ task:: spawn ( repeater (
108+ PROB_INTERVAL ,
109+ move || {
110+ * running. lock ( ) . unwrap ( ) = true ;
111+ } ,
112+ || { } ,
113+ ) ) ;
114+ }
115+
116+ repeater_blocking (
117+ PROB_INTERVAL + Duration :: from_millis ( 50 ) ,
118+ || {
119+ * running. lock ( ) . unwrap ( ) = false ;
120+ } ,
121+ || {
122+ if !* running. lock ( ) . unwrap ( ) {
123+ panic ! ( "FATAL: You are blocking the runtime, please use spawn_blocking" ) ;
124+ }
125+ } ,
126+ ) ;
51127}
0 commit comments