|
75 | 75 | (throw (ex-info "invalid connection" {:conn conn})))) |
76 | 76 | {} conns) |
77 | 77 | running-chans #(or (deref chans) (throw (Exception. "flow not running"))) |
78 | | - send-command (fn [command to] |
79 | | - (let [{:keys [control]} (running-chans)] |
80 | | - (async/>!! control #::flow{:command command :to to})))] |
| 78 | + send-command (fn sc |
| 79 | + ([cmap] |
| 80 | + (let [{:keys [control]} (running-chans)] |
| 81 | + (async/>!! control cmap))) |
| 82 | + ([command to] (sc #::flow{:command command :to to}))) |
| 83 | + handle-ping (fn [to timeout-ms] |
| 84 | + (let [reply-chan (async/chan (count procs)) |
| 85 | + ret-chan (async/take (if (= to ::flow/all) (count procs) 1) reply-chan) |
| 86 | + timeout (async/timeout timeout-ms) |
| 87 | + _ (send-command #::flow{:command ::flow/ping, :to to, :reply-chan reply-chan}) |
| 88 | + ret (loop [ret nil] |
| 89 | + (let [[{::flow/keys [pid] :as m} c] (async/alts!! [ret-chan timeout])] |
| 90 | + (if (some? m) |
| 91 | + (recur (assoc ret pid m)) |
| 92 | + ret)))] |
| 93 | + (if (= to ::flow/all) ret (-> ret vals first))))] |
81 | 94 | (reify |
82 | 95 | clojure.core.async.flow.impl.graph.Graph |
83 | 96 | (start [_] |
|
156 | 169 | (finally (.unlock lock)))) |
157 | 170 | (pause [_] (send-command ::flow/pause ::flow/all)) |
158 | 171 | (resume [_] (send-command ::flow/resume ::flow/all)) |
159 | | - (ping [_] (send-command ::flow/ping ::flow/all)) |
| 172 | + (ping [_ timeout-ms] (handle-ping ::flow/all timeout-ms)) |
160 | 173 |
|
161 | 174 | (pause-proc [_ pid] (send-command ::flow/pause pid)) |
162 | 175 | (resume-proc [_ pid] (send-command ::flow/resume pid)) |
163 | | - (ping-proc [_ pid] (send-command ::flow/ping pid)) |
| 176 | + (ping-proc [_ pid timeout-ms] (handle-ping pid timeout-ms)) |
164 | 177 | (command-proc [_ pid command kvs] |
165 | 178 | (assert (and (namespace command) (not= (namespace ::flow/command) (namespace command))) |
166 | 179 | "extension commands must be in your own namespace") |
|
177 | 190 | (defn handle-command |
178 | 191 | [pid pong status cmd] |
179 | 192 | (let [transition #::flow{:stop :exit, :resume :running, :pause :paused} |
180 | | - {::flow/keys [to command]} cmd] |
| 193 | + {::flow/keys [to command reply-chan]} cmd] |
181 | 194 | (if (#{::flow/all pid} to) |
182 | 195 | (do |
183 | | - (when (= command ::flow/ping) (pong)) |
| 196 | + (when (= command ::flow/ping) (pong reply-chan)) |
184 | 197 | (or (transition command) status)) |
185 | 198 | status))) |
186 | 199 |
|
|
245 | 258 | read-ins (dissoc ins ::flow/control) |
246 | 259 | run |
247 | 260 | #(loop [status :paused, state state, count 0, read-ins read-ins] |
248 | | - (let [pong (fn [] |
| 261 | + (let [pong (fn [c] |
249 | 262 | (let [pins (dissoc ins ::flow/control) |
250 | 263 | pouts (dissoc outs ::flow/error ::flow/report)] |
251 | | - (async/>!! (outs ::flow/report) |
| 264 | + (async/>!! c ;;(outs ::flow/report) |
252 | 265 | #::flow{:report :ping, :pid pid, :status status |
253 | 266 | :state state, :count count |
254 | 267 | :ins (zipmap (keys pins) (map chan->data (vals pins))) |
|
0 commit comments