|
140 | 140 | [g [pid io-id :as coord] msgs] (g/inject g coord msgs)) |
141 | 141 |
|
142 | 142 | (defn process |
143 | | - "Given a map of functions (described below), returns a launcher that |
| 143 | + "Given a function of four arities (0-3), or a map of functions |
| 144 | + corresponding thereto (described below), returns a launcher that |
144 | 145 | creates a process compliant with the process protocol (see the |
145 | | - spi/ProcLauncher doc). The possible entries for process-impl-map |
146 | | - are :describe, :init, :transition and :transform. This is |
147 | | - the core facility for defining the logic for processes via ordinary |
148 | | - functions. |
149 | | -
|
150 | | - :describe - required, () -> desc |
151 | | - where desc is a map with keys :params :ins and :outs, each of which |
| 146 | + spi/ProcLauncher doc). |
| 147 | +
|
| 148 | + The possible arities/entries for fn/map are 0 - :describe, 1 |
| 149 | + - :init, 2 - :transition and 3 - :transform. This is the core |
| 150 | + facility for defining the logic for processes via ordinary |
| 151 | + functions. Using a var holding a fn as the 'fn' is the preferred |
| 152 | + method for defining a proc, as it enables hot-code-reloading of the |
| 153 | + proc logic in a flow, and better names in datafy. You can use the |
| 154 | + map form to compose the proc logic from disparate functions or to |
| 155 | + leverage the optionality of some of the entry points. |
| 156 | +
|
| 157 | + arity 0, or :describe - required, () -> description |
| 158 | + where description is a map with keys :params :ins and :outs, each of which |
152 | 159 | in turn is a map of keyword to doc string, and :workload with |
153 | 160 | possible values of :mixed :io :compute. All entries in the describe |
154 | 161 | return map are optional. |
|
167 | 174 | the proc. It will also be called by the impl in order to discover |
168 | 175 | what channels are needed. |
169 | 176 |
|
170 | | - :init - optional, (arg-map) -> initial-state |
| 177 | + arity 1, or :init - optional, (arg-map) -> initial-state |
171 | 178 | |
172 | 179 | init will be called once by the process to establish any initial |
173 | 180 | state. The arg-map will be a map of param->val, as supplied in the |
|
188 | 195 | will be part of the next channel read set. In the absence of this |
189 | 196 | predicate all inputs are read. |
190 | 197 |
|
191 | | - :transition - optional, (state transition) -> state' |
| 198 | + arity 2, or :transition - optional, (state transition) -> state' |
192 | 199 |
|
193 | 200 | transition will be called when the process makes a state transition, |
194 | 201 | transition being one of ::flow/resume, ::flow/pause or ::flow/stop |
|
198 | 205 | process will no longer be used following that. See the SPI for |
199 | 206 | details. state' will be the state supplied to subsequent calls. |
200 | 207 |
|
201 | | - :transform - required, (state in-name msg) -> [state' output] |
| 208 | + arity 3, or :transform - required, (state in-name msg) -> [state' output] |
202 | 209 | where output is a map of outid->[msgs*] |
203 | 210 |
|
204 | 211 | The transform fn will be called every time a message arrives at any |
|
233 | 240 | times out it will be reported on ::flow/error. |
234 | 241 |
|
235 | 242 | When :compute is specified transform must not block!" |
236 | | - ([process-impl-map] (process process-impl-map nil)) |
237 | | - ([process-impl-map {:keys [workload timeout-ms] |
238 | | - :or {timeout-ms 5000} :as opts}] |
239 | | - (impl/proc process-impl-map opts))) |
240 | | - |
241 | | -(defn step-process |
242 | | - "Given a (e.g. communication-free) step function f of three |
243 | | - arities (described below), and the same opts as 'process', returns a |
244 | | - launcher that creates a process compliant with the process |
245 | | - protocol (see 'process'). |
| 243 | + ([fn-or-map] (process fn-or-map nil)) |
| 244 | + ([fn-or-map {:keys [workload timeout-ms] |
| 245 | + :or {timeout-ms 5000} :as opts}] |
| 246 | + (impl/proc fn-or-map opts))) |
246 | 247 |
|
247 | | - The arities of f are: |
248 | | -
|
249 | | - ()->desc |
250 | | - a function matching the semantics of process' :describe |
| 248 | +(defn lift*->step |
| 249 | + "given a fn f taking one arg and returning a collection of non-nil |
| 250 | + values, create a 'step' fn as needed by step-process, with one input |
| 251 | + and one output (named :in and :out), and no state." |
| 252 | + [f] |
| 253 | + (fn |
| 254 | + ([] {:ins {:in (str "the argument to " f)} |
| 255 | + :outs {:out (str "the return of " f)}}) |
| 256 | + ([_] nil) |
| 257 | + ([_ _] nil) |
| 258 | + ([_ _ msg] [nil {:out (f msg)}]))) |
251 | 259 |
|
252 | | - (arg-map)->initial-state |
253 | | - a function matching the semantics of process' :init |
254 | | - |
255 | | - (state in-name msg)->[state' output] |
256 | | - a function matching the semantics of process' :transform" |
257 | | - ([f] (step-process f nil)) |
258 | | - ([f opts] |
259 | | - (process {:describe f, :init f, :transform f} opts))) |
| 260 | +(defn lift1->step |
| 261 | + "like lift*->step except taking a fn returning one value, which when |
| 262 | + nil will yield no output." |
| 263 | + [f] |
| 264 | + (lift*->step #(when-some [m (f %)] (vector m)))) |
260 | 265 |
|
261 | 266 | (defn futurize |
262 | 267 | "Takes a fn f and returns a fn that takes the same arguments as f |
|
271 | 276 | [f & {:keys [exec] |
272 | 277 | :or {exec :mixed} :as opts}] |
273 | 278 | (impl/futurize f opts)) |
274 | | - |
275 | | -(defn lift*->step |
276 | | - "given a fn f taking one arg and returning a collection of non-nil |
277 | | - values, create a 'step' fn as needed by step-process, with one input |
278 | | - and one output (named :in and :out), and no state." |
279 | | - [f] |
280 | | - (fn |
281 | | - ([] {:ins {:in (str "the argument to " f)} |
282 | | - :outs {:out (str "the return of " f)}}) |
283 | | - ([_] nil) |
284 | | - ([_ _ msg] [nil {:out (f msg)}]))) |
285 | | - |
286 | | -(defn lift1->step |
287 | | - "like lift*->step except taking a fn returning one value, which, when |
288 | | - nil, will yield no output." |
289 | | - [f] |
290 | | - (lift*->step #(when-some [m (f %)] (vector m)))) |
0 commit comments