|
241 | 241 | outs (into (or outs {}) (::flow/out-ports state)) |
242 | 242 | io-id (zipmap (concat (vals ins) (vals outs)) (concat (keys ins) (keys outs))) |
243 | 243 | control (::flow/control ins) |
244 | | - ;;TODO rotate/randomize after control per normal alts? |
245 | | - read-chans (vec (-> ins (dissoc ::flow/control) vals)) |
| 244 | + read-ins (dissoc ins ::flow/control) |
246 | 245 | run |
247 | | - #(loop [status :paused, state state, count 0, read-chans read-chans] |
| 246 | + #(loop [status :paused, state state, count 0, read-ins read-ins] |
248 | 247 | (let [pong (fn [] |
249 | 248 | (let [pins (dissoc ins ::flow/control) |
250 | 249 | pouts (dissoc outs ::flow/error ::flow/report)] |
|
254 | 253 | :ins (zipmap (keys pins) (map chan->data (vals pins))) |
255 | 254 | :outs (zipmap (keys pouts) (map chan->data (vals pouts)))}))) |
256 | 255 | handle-command (partial handle-command pid pong) |
257 | | - [nstatus nstate count read-chans] |
| 256 | + [nstatus nstate count read-ins] |
258 | 257 | (try |
259 | 258 | (if (= status :paused) |
260 | 259 | (let [nstatus (handle-command status (async/<!! control)) |
261 | 260 | nstate (handle-transition transition status nstatus state)] |
262 | | - [nstatus nstate count read-chans]) |
| 261 | + [nstatus nstate count read-ins]) |
263 | 262 | ;;:running |
264 | | - (let [[msg c] (async/alts!! (into [control] read-chans) :priority true) |
| 263 | + (let [;;TODO rotate/randomize after control per normal alts? |
| 264 | + read-chans (let [ipred (or (::flow/input-filter state) identity)] |
| 265 | + (reduce-kv (fn [ret cid chan] |
| 266 | + (if (ipred cid) |
| 267 | + (conj ret chan) |
| 268 | + ret)) |
| 269 | + [control] read-ins)) |
| 270 | + [msg c] (async/alts!! read-chans :priority true) |
265 | 271 | cid (io-id c)] |
266 | 272 | (if (= c control) |
267 | 273 | (let [nstatus (handle-command status msg) |
268 | 274 | nstate (handle-transition transition status nstatus state)] |
269 | | - [nstatus nstate count read-chans]) |
| 275 | + [nstatus nstate count read-ins]) |
270 | 276 | (try |
271 | 277 | (let [[nstate outputs] (transform state cid msg) |
272 | 278 | [nstatus nstate] |
273 | 279 | (send-outputs status nstate outputs outs |
274 | 280 | resolver control handle-command transition)] |
275 | 281 | [nstatus nstate (inc count) (if (some? msg) |
276 | | - read-chans |
277 | | - (vec (remove #{c} read-chans)))]) |
| 282 | + read-ins |
| 283 | + (dissoc read-ins cid))]) |
278 | 284 | (catch Throwable ex |
279 | 285 | (async/>!! (outs ::flow/error) |
280 | 286 | #::flow{:pid pid, :status status, :state state, |
281 | 287 | :count (inc count), :cid cid, :msg msg :op :step, :ex ex}) |
282 | | - [status state count read-chans]))))) |
| 288 | + [status state count read-ins]))))) |
283 | 289 | (catch Throwable ex |
284 | 290 | (async/>!! (outs ::flow/error) |
285 | 291 | #::flow{:pid pid, :status status, :state state, :count (inc count), :ex ex}) |
286 | | - [status state count read-chans]))] |
| 292 | + [status state count read-ins]))] |
287 | 293 | (when-not (= nstatus :exit) ;;fall out |
288 | | - (recur nstatus nstate (long count) read-chans))))] |
| 294 | + (recur nstatus nstate (long count) read-ins))))] |
289 | 295 | ((futurize run {:exec exs}))))))) |
0 commit comments