|
21 | 21 | (defonce io-exec clojure.lang.Agent/soloExecutor) |
22 | 22 | (defonce compute-exec clojure.lang.Agent/pooledExecutor) |
23 | 23 |
|
| 24 | +(defn chan->data |
| 25 | + [^clojure.core.async.impl.channels.ManyToManyChannel c] |
| 26 | + (let [b (.buf c)] |
| 27 | + {:buffer-type (if b |
| 28 | + (-> b class .getSimpleName symbol) |
| 29 | + :none) |
| 30 | + :buffer-count (count b) |
| 31 | + :put-count (count (.puts c)) |
| 32 | + :take-count (count (.takes c)) |
| 33 | + :closed? (clojure.core.async.impl.protocols/closed? c)})) |
| 34 | + |
24 | 35 | (defn futurize ^Future [f {:keys [exec]}] |
25 | 36 | (fn [& args] |
26 | 37 | (^[Callable] ExecutorService/.submit |
|
231 | 242 | read-chans (into [control] (-> ins (dissoc ::flow/control) vals)) |
232 | 243 | run |
233 | 244 | #(loop [status :paused, state (when init (init args)), count 0] |
234 | | - (let [pong (fn [] (async/>!! (outs ::flow/report) |
235 | | - #::flow{:report :ping, :pid pid, :status status |
236 | | - :state state, :count count})) |
| 245 | + (let [pong (fn [] |
| 246 | + (let [pins (dissoc ins ::flow/control) |
| 247 | + pouts (dissoc outs ::flow/error ::flow/report)] |
| 248 | + (async/>!! (outs ::flow/report) |
| 249 | + #::flow{:report :ping, :pid pid, :status status |
| 250 | + :state state, :count count |
| 251 | + :ins (zipmap (keys pins) (map chan->data (vals pins))) |
| 252 | + :outs (zipmap (keys pouts) (map chan->data (vals pouts)))}))) |
237 | 253 | handle-command (partial handle-command pid pong) |
238 | 254 | [nstatus nstate count] |
239 | 255 | (try |
|
262 | 278 | (catch Throwable ex |
263 | 279 | (async/>!! (outs ::flow/error) |
264 | 280 | #::flow{:pid pid, :status status, :state state, |
265 | | - :count (inc count), :cid cid, :msg msg :op :step, :ex ex}) |
| 281 | + :count (inc count), :cid cid, :msg msg :op :step, :ex ex}) |
266 | 282 | [status state count]))))) |
267 | 283 | (catch Throwable ex |
268 | 284 | (async/>!! (outs ::flow/error) |
|
0 commit comments