|
8 | 8 |
|
9 | 9 | (ns ^{:skip-wiki true} |
10 | 10 | clojure.core.async.impl.dispatch |
11 | | - (:require [clojure.core.async.impl.protocols :as impl] |
12 | | - [clojure.core.async.impl.exec.threadpool :as tp]) |
13 | | - (:import [java.util.concurrent Executors ExecutorService])) |
| 11 | + (:require [clojure.core.async.impl.protocols :as impl]) |
| 12 | + (:import [java.util.concurrent Executors ExecutorService ThreadFactory])) |
14 | 13 |
|
15 | 14 | (set! *warn-on-reflection* true) |
16 | 15 |
|
| 16 | +(defn counted-thread-factory |
| 17 | + "Create a ThreadFactory that maintains a counter for naming Threads. |
| 18 | + name-format specifies thread names - use %d to include counter |
| 19 | + daemon is a flag for whether threads are daemons or not |
| 20 | + opts is an options map: |
| 21 | + init-fn - function to run when thread is created" |
| 22 | + ([name-format daemon] |
| 23 | + (counted-thread-factory name-format daemon nil)) |
| 24 | + ([name-format daemon {:keys [init-fn] :as opts}] |
| 25 | + (let [counter (atom 0)] |
| 26 | + (reify |
| 27 | + ThreadFactory |
| 28 | + (newThread [_this runnable] |
| 29 | + (let [body (if init-fn |
| 30 | + (fn [] (init-fn) (.run ^Runnable runnable)) |
| 31 | + runnable) |
| 32 | + t (Thread. ^Runnable body)] |
| 33 | + (doto t |
| 34 | + (.setName (format name-format (swap! counter inc))) |
| 35 | + (.setDaemon daemon)))))))) |
| 36 | + |
| 37 | +(defonce |
| 38 | + ^{:doc "Number of processors reported by the JVM"} |
| 39 | + processors (.availableProcessors (Runtime/getRuntime))) |
| 40 | + |
| 41 | +(def ^:private pool-size |
| 42 | + "Value is set via clojure.core.async.pool-size system property; defaults to 8; uses a |
| 43 | + delay so property can be set from code after core.async namespace is loaded but before |
| 44 | + any use of the async thread pool." |
| 45 | + (delay (or (Long/getLong "clojure.core.async.pool-size") 8))) |
| 46 | + |
| 47 | +(defn thread-pool-executor |
| 48 | + ([] |
| 49 | + (thread-pool-executor nil)) |
| 50 | + ([init-fn] |
| 51 | + (let [executor-svc (Executors/newFixedThreadPool |
| 52 | + @pool-size |
| 53 | + (counted-thread-factory "async-dispatch-%d" true |
| 54 | + {:init-fn init-fn}))] |
| 55 | + (reify impl/Executor |
| 56 | + (impl/exec [_ r] |
| 57 | + (.execute executor-svc ^Runnable r)))))) |
| 58 | + |
17 | 59 | (defonce ^:private in-dispatch (ThreadLocal.)) |
18 | 60 |
|
19 | 61 | (defonce executor |
20 | | - (delay (tp/thread-pool-executor #(.set ^ThreadLocal in-dispatch true)))) |
| 62 | + (delay (thread-pool-executor #(.set ^ThreadLocal in-dispatch true)))) |
21 | 63 |
|
22 | 64 | (defn in-dispatch-thread? |
23 | 65 | "Returns true if the current thread is a go block dispatch pool thread" |
|
39 | 81 | nil) |
40 | 82 |
|
41 | 83 | (defonce ^ExecutorService mixed-executor |
42 | | - (Executors/newCachedThreadPool (conc/counted-thread-factory "async-mixed-%d" true))) |
| 84 | + (Executors/newCachedThreadPool (counted-thread-factory "async-mixed-%d" true))) |
43 | 85 |
|
44 | 86 | (defonce ^ExecutorService io-executor |
45 | | - (Executors/newCachedThreadPool (conc/counted-thread-factory "async-io-%d" true))) |
| 87 | + (Executors/newCachedThreadPool (counted-thread-factory "async-io-%d" true))) |
46 | 88 |
|
47 | 89 | (defonce ^ExecutorService compute-executor |
48 | | - (Executors/newCachedThreadPool (conc/counted-thread-factory "async-compute-%d" true))) |
| 90 | + (Executors/newCachedThreadPool (counted-thread-factory "async-compute-%d" true))) |
49 | 91 |
|
50 | 92 | (defn run |
51 | 93 | "Runs Runnable r on current thread when :on-caller? meta true, else in a thread pool thread." |
|
57 | 99 | (defn exec |
58 | 100 | [f exec] |
59 | 101 | (let [^ExecutorService e (case exec |
60 | | - :compute tp/compute-executor |
61 | | - :io tp/io-executor |
62 | | - tp/mixed-executor)] |
| 102 | + :compute compute-executor |
| 103 | + :io io-executor |
| 104 | + mixed-executor)] |
63 | 105 | (.execute e f))) |
0 commit comments