|
10 | 10 | (:require [clojure.core.async :as async] |
11 | 11 | [clojure.core.async.flow :as-alias flow] |
12 | 12 | [clojure.core.async.flow.spi :as spi] |
13 | | - [clojure.core.async.flow.impl.graph :as graph]) |
| 13 | + [clojure.core.async.flow.impl.graph :as graph] |
| 14 | + [clojure.walk :as walk] |
| 15 | + [clojure.datafy :as datafy]) |
14 | 16 | (:import [java.util.concurrent Future Executors ExecutorService TimeUnit] |
15 | 17 | [java.util.concurrent.locks ReentrantLock])) |
16 | 18 |
|
|
21 | 23 | (defonce io-exec clojure.lang.Agent/soloExecutor) |
22 | 24 | (defonce compute-exec clojure.lang.Agent/pooledExecutor) |
23 | 25 |
|
| 26 | +(defn oid [x] |
| 27 | + (symbol (str (-> x class .getSimpleName) "@" (-> x System/identityHashCode Integer/toHexString)))) |
| 28 | + |
24 | 29 | (defn chan->data |
25 | 30 | [^clojure.core.async.impl.channels.ManyToManyChannel c] |
26 | | - (let [b (.buf c)] |
27 | | - {:buffer-type (if b |
28 | | - (-> b class .getSimpleName symbol) |
29 | | - :none) |
30 | | - :buffer-count (count b) |
31 | | - :put-count (count (.puts c)) |
32 | | - :take-count (count (.takes c)) |
33 | | - :closed? (clojure.core.async.impl.protocols/closed? c)})) |
| 31 | + (let [b (.buf c)] |
| 32 | + {:buffer (if (some? b) (oid b) :none) |
| 33 | + :buffer-count (count b) |
| 34 | + :put-count (count (.puts c)) |
| 35 | + :take-count (count (.takes c)) |
| 36 | + :closed? (clojure.core.async.impl.protocols/closed? c)})) |
| 37 | + |
| 38 | +(defn exec->data [exec] |
| 39 | + (let [ess (as-> (str exec) ^String es |
| 40 | + (.substring es (inc (.lastIndexOf es "[")) (.lastIndexOf es "]")) |
| 41 | + (.split es ","))] |
| 42 | + (merge {:id (oid exec) |
| 43 | + :status (first ess)} ;;TODO less fragile |
| 44 | + (zipmap [:pool-size :active-threads :queued-tasks :completed-tasks] |
| 45 | + (map #(-> ^String % (.substring (inc (.lastIndexOf ^String % " "))) Long.) (rest ess)))))) |
| 46 | + |
| 47 | +(defn datafy [x] |
| 48 | + (condp instance? x |
| 49 | + clojure.lang.Fn (-> x str symbol) |
| 50 | + ExecutorService (exec->data x) |
| 51 | + clojure.lang.Var (symbol x) |
| 52 | + clojure.core.async.impl.channels.ManyToManyChannel (chan->data x) |
| 53 | + (datafy/datafy x))) |
34 | 54 |
|
35 | 55 | (defn futurize ^Future [f {:keys [exec]}] |
36 | 56 | (fn [& args] |
|
92 | 112 | ret)))] |
93 | 113 | (if (= to ::flow/all) ret (-> ret vals first))))] |
94 | 114 | (reify |
| 115 | + clojure.core.protocols/Datafiable |
| 116 | + (datafy [_] |
| 117 | + (walk/postwalk datafy {:procs procs, :conns conns, :execs execs |
| 118 | + :chans (select-keys @chans [:ins :outs :error :report])})) |
| 119 | + |
95 | 120 | clojure.core.async.flow.impl.graph.Graph |
96 | 121 | (start [_] |
97 | 122 | (.lock lock) |
|
237 | 262 | workload (or workload (:workload desc) :mixed)] |
238 | 263 | (assert (or (not params) init) "must have :init if :params") |
239 | 264 | (reify |
240 | | - clojure.core.protocols/Datafiable |
241 | | - (datafy [_] |
242 | | - (let [{:keys [params ins outs]} desc] |
243 | | - {:impl impl :params (-> params keys vec) :ins (-> ins keys vec) :outs (-> outs keys vec)})) |
244 | | - spi/ProcLauncher |
245 | | - (describe [_] desc) |
246 | | - (start [_ {:keys [pid args ins outs resolver]}] |
247 | | - (assert (or (not params) args) "must provide :args if :params") |
248 | | - (let [comp? (= workload :compute) |
249 | | - transform (cond-> transform (= workload :compute) |
250 | | - #(.get (futurize transform {:exec (spi/get-exec resolver :compute)}) |
251 | | - compute-timeout-ms TimeUnit/MILLISECONDS)) |
252 | | - exs (spi/get-exec resolver (if (= workload :mixed) :mixed :io)) |
253 | | - state (when init (init args)) |
254 | | - ins (into (or ins {}) (::flow/in-ports state)) |
255 | | - outs (into (or outs {}) (::flow/out-ports state)) |
256 | | - io-id (zipmap (concat (vals ins) (vals outs)) (concat (keys ins) (keys outs))) |
257 | | - control (::flow/control ins) |
258 | | - read-ins (dissoc ins ::flow/control) |
259 | | - run |
260 | | - #(loop [status :paused, state state, count 0, read-ins read-ins] |
261 | | - (let [pong (fn [c] |
262 | | - (let [pins (dissoc ins ::flow/control) |
263 | | - pouts (dissoc outs ::flow/error ::flow/report)] |
264 | | - (async/>!! c ;;(outs ::flow/report) |
265 | | - #::flow{:report :ping, :pid pid, :status status |
266 | | - :state state, :count count |
267 | | - :ins (zipmap (keys pins) (map chan->data (vals pins))) |
268 | | - :outs (zipmap (keys pouts) (map chan->data (vals pouts)))}))) |
269 | | - handle-command (partial handle-command pid pong) |
270 | | - [nstatus nstate count read-ins] |
271 | | - (try |
272 | | - (if (= status :paused) |
273 | | - (let [nstatus (handle-command status (async/<!! control)) |
| 265 | + clojure.core.protocols/Datafiable |
| 266 | + (datafy [_] |
| 267 | + (let [{:keys [params ins outs]} desc] |
| 268 | + (walk/postwalk datafy {:impl impl :params (-> params keys vec) |
| 269 | + :ins (-> ins keys vec) :outs (-> outs keys vec)}))) |
| 270 | + spi/ProcLauncher |
| 271 | + (describe [_] desc) |
| 272 | + (start [_ {:keys [pid args ins outs resolver]}] |
| 273 | + (assert (or (not params) args) "must provide :args if :params") |
| 274 | + (let [comp? (= workload :compute) |
| 275 | + transform (cond-> transform (= workload :compute) |
| 276 | + #(.get (futurize transform {:exec (spi/get-exec resolver :compute)}) |
| 277 | + compute-timeout-ms TimeUnit/MILLISECONDS)) |
| 278 | + exs (spi/get-exec resolver (if (= workload :mixed) :mixed :io)) |
| 279 | + state (when init (init args)) |
| 280 | + ins (into (or ins {}) (::flow/in-ports state)) |
| 281 | + outs (into (or outs {}) (::flow/out-ports state)) |
| 282 | + io-id (zipmap (concat (vals ins) (vals outs)) (concat (keys ins) (keys outs))) |
| 283 | + control (::flow/control ins) |
| 284 | + read-ins (dissoc ins ::flow/control) |
| 285 | + run |
| 286 | + #(loop [status :paused, state state, count 0, read-ins read-ins] |
| 287 | + (let [pong (fn [c] |
| 288 | + (let [pins (dissoc ins ::flow/control) |
| 289 | + pouts (dissoc outs ::flow/error ::flow/report)] |
| 290 | + (async/>!! c (walk/postwalk datafy |
| 291 | + #::flow{:pid pid, :status status |
| 292 | + :state state, :count count |
| 293 | + :ins pins :outs pouts})))) |
| 294 | + handle-command (partial handle-command pid pong) |
| 295 | + [nstatus nstate count read-ins] |
| 296 | + (try |
| 297 | + (if (= status :paused) |
| 298 | + (let [nstatus (handle-command status (async/<!! control)) |
| 299 | + nstate (handle-transition transition status nstatus state)] |
| 300 | + [nstatus nstate count read-ins]) |
| 301 | + ;;:running |
| 302 | + (let [ ;;TODO rotate/randomize after control per normal alts? |
| 303 | + read-chans (let [ipred (or (::flow/input-filter state) identity)] |
| 304 | + (reduce-kv (fn [ret cid chan] |
| 305 | + (if (ipred cid) |
| 306 | + (conj ret chan) |
| 307 | + ret)) |
| 308 | + [control] read-ins)) |
| 309 | + [msg c] (async/alts!! read-chans :priority true) |
| 310 | + cid (io-id c)] |
| 311 | + (if (= c control) |
| 312 | + (let [nstatus (handle-command status msg) |
274 | 313 | nstate (handle-transition transition status nstatus state)] |
275 | 314 | [nstatus nstate count read-ins]) |
276 | | - ;;:running |
277 | | - (let [;;TODO rotate/randomize after control per normal alts? |
278 | | - read-chans (let [ipred (or (::flow/input-filter state) identity)] |
279 | | - (reduce-kv (fn [ret cid chan] |
280 | | - (if (ipred cid) |
281 | | - (conj ret chan) |
282 | | - ret)) |
283 | | - [control] read-ins)) |
284 | | - [msg c] (async/alts!! read-chans :priority true) |
285 | | - cid (io-id c)] |
286 | | - (if (= c control) |
287 | | - (let [nstatus (handle-command status msg) |
288 | | - nstate (handle-transition transition status nstatus state)] |
289 | | - [nstatus nstate count read-ins]) |
290 | | - (try |
291 | | - (let [[nstate outputs] (transform state cid msg) |
292 | | - [nstatus nstate] |
293 | | - (send-outputs status nstate outputs outs |
294 | | - resolver control handle-command transition)] |
295 | | - [nstatus nstate (inc count) (if (some? msg) |
296 | | - read-ins |
297 | | - (dissoc read-ins cid))]) |
298 | | - (catch Throwable ex |
299 | | - (async/>!! (outs ::flow/error) |
300 | | - #::flow{:pid pid, :status status, :state state, |
301 | | - :count (inc count), :cid cid, :msg msg :op :step, :ex ex}) |
302 | | - [status state count read-ins]))))) |
303 | | - (catch Throwable ex |
304 | | - (async/>!! (outs ::flow/error) |
305 | | - #::flow{:pid pid, :status status, :state state, :count (inc count), :ex ex}) |
306 | | - [status state count read-ins]))] |
307 | | - (when-not (= nstatus :exit) ;;fall out |
308 | | - (recur nstatus nstate (long count) read-ins))))] |
309 | | - ((futurize run {:exec exs}))))))) |
| 315 | + (try |
| 316 | + (let [[nstate outputs] (transform state cid msg) |
| 317 | + [nstatus nstate] |
| 318 | + (send-outputs status nstate outputs outs |
| 319 | + resolver control handle-command transition)] |
| 320 | + [nstatus nstate (inc count) (if (some? msg) |
| 321 | + read-ins |
| 322 | + (dissoc read-ins cid))]) |
| 323 | + (catch Throwable ex |
| 324 | + (async/>!! (outs ::flow/error) |
| 325 | + #::flow{:pid pid, :status status, :state state, |
| 326 | + :count (inc count), :cid cid, :msg msg :op :step, :ex ex}) |
| 327 | + [status state count read-ins]))))) |
| 328 | + (catch Throwable ex |
| 329 | + (async/>!! (outs ::flow/error) |
| 330 | + #::flow{:pid pid, :status status, :state state, :count (inc count), :ex ex}) |
| 331 | + [status state count read-ins]))] |
| 332 | + (when-not (= nstatus :exit) ;;fall out |
| 333 | + (recur nstatus nstate (long count) read-ins))))] |
| 334 | + ((futurize run {:exec exs}))))))) |
0 commit comments