|
80 | 80 | (.uncaughtException (Thread/currentThread) ex)) |
81 | 81 | nil) |
82 | 82 |
|
83 | | -(def ^:private workflow->es-factory-props |
84 | | - {:compute {:sys-prop "clojure.core.async.compute-es-fn" |
85 | | - :default #(Executors/newCachedThreadPool (counted-thread-factory "async-compute-%d" true))} |
86 | | - :io {:sys-prop "clojure.core.async.io-es-fn" |
87 | | - :default #(Executors/newCachedThreadPool (counted-thread-factory "async-io-%d" true))} |
88 | | - :mixed {:sys-prop "clojure.core.async.mixed-es-fn" |
89 | | - :default #(Executors/newCachedThreadPool (counted-thread-factory "async-mixed-%d" true))}}) |
90 | | - |
91 | | -(defn construct-es |
92 | | - [workload] |
93 | | - (let [{:keys [sys-prop default]} (workflow->es-factory-props workload) |
94 | | - es-fn (or (when-let [esf (and sys-prop (System/getProperty sys-prop))] |
95 | | - (requiring-resolve (symbol esf))) |
96 | | - default)] |
97 | | - (if es-fn |
98 | | - (es-fn) |
99 | | - (throw (IllegalArgumentException. (str "Illegal workload tag " workload)))))) |
100 | | - |
101 | | -(defonce ^ExecutorService mixed-executor (construct-es :mixed)) |
| 83 | +(defn- executor-ctor |
| 84 | + [workflow] |
| 85 | + #(Executors/newCachedThreadPool (counted-thread-factory (str "async-" (name %) "-%d") true))) |
102 | 86 |
|
103 | | -(defonce ^ExecutorService io-executor (construct-es :io)) |
| 87 | +(def ^:private workflow->es-ctor |
| 88 | + {:compute (executor-ctor :compute) |
| 89 | + :io (executor-ctor :io) |
| 90 | + :mixed (executor-ctor :mixed)}) |
104 | 91 |
|
105 | | -(defonce ^ExecutorService compute-executor (construct-es :compute)) |
| 92 | +(defn construct-executor |
| 93 | + [workload] |
| 94 | + (let [default-ctor (workflow->es-ctor workload)] |
| 95 | + (if-let [sysprop-ctor (when-let [esf (System/getProperty "clojure.core.async.executor-factory")] |
| 96 | + (requiring-resolve (symbol esf)))] |
| 97 | + (or (sysprop-ctor workload) (default-ctor workload)) |
| 98 | + (default-ctor workload)))) |
106 | 99 |
|
107 | | -(defn es-for [workload] |
108 | | - (case workload |
109 | | - :compute compute-executor |
110 | | - :io io-executor |
111 | | - :mixed mixed-executor |
112 | | - nil)) |
| 100 | +(def executor-for |
| 101 | + {:compute (construct-executor :compute) |
| 102 | + :io (construct-executor :io) |
| 103 | + :mixed (construct-executor :mixed)}) |
113 | 104 |
|
114 | 105 | (defn exec |
115 | 106 | [^Runnable r workload] |
116 | | - (if-let [^ExecutorService e (es-for workload)] |
117 | | - (.execute e r) |
118 | | - (impl/exec @executor r))) |
| 107 | + (let [^ExecutorService e (executor-for workload)] |
| 108 | + (.execute e r))) |
119 | 109 |
|
120 | 110 | (defn run |
121 | 111 | "Runs Runnable r on current thread when :on-caller? meta true, else in a thread pool thread." |
122 | | - ([^Runnable r] |
123 | | - (if (-> r meta :on-caller?) |
124 | | - (try (.run r) (catch Throwable t (ex-handler t))) |
125 | | - (exec r nil))) |
126 | | - ([^Runnable r workload] |
127 | | - (exec r workload))) |
| 112 | + [^Runnable r] |
| 113 | + (if (-> r meta :on-caller?) |
| 114 | + (try (.run r) (catch Throwable t (ex-handler t))) |
| 115 | + (impl/exec @executor r))) |
128 | 116 |
|
0 commit comments