| 
142 | 142 |   [g [pid io-id :as coord] msgs] (g/inject g coord msgs))  | 
143 | 143 | 
 
  | 
144 | 144 | (defn process  | 
145 |  | -  "Given a function of four arities (0-3), aka the 'step-fn', or a map  | 
146 |  | -  of functions corresponding thereto (described below), returns a  | 
147 |  | -  launcher that creates a process compliant with the process  | 
 | 145 | +  "Given a function of four arities (0-3), aka the 'step-fn',  | 
 | 146 | +  returns a launcher that creates a process compliant with the process  | 
148 | 147 |   protocol (see the spi/ProcLauncher doc).  | 
149 | 148 | 
  | 
150 |  | -  The possible arities/entries for the step-fn/map are  | 
 | 149 | +  The possible arities for the step-fn are  | 
151 | 150 | 
  | 
152 |  | -  0 - :describe,  | 
153 |  | -  1 - :init,  | 
154 |  | -  2 - :transition  | 
155 |  | -  3 - :transform.  | 
 | 151 | +  0 - 'describe',   () -> description  | 
 | 152 | +  1 - 'init',       (arg-map) -> initial-state  | 
 | 153 | +  2 - 'transition', (state transition) -> state'  | 
 | 154 | +  3 - 'transform',  (state input msg) -> [state' output-map]  | 
156 | 155 | 
  | 
157 | 156 |   This is the core facility for defining the logic for processes via  | 
158 | 157 |   ordinary functions. Using a var holding a fn as the 'step-fn' is the  | 
159 | 158 |   preferred method for defining a proc, as it enables  | 
160 | 159 |   hot-code-reloading of the proc logic in a flow, and better names in  | 
161 |  | -  datafy. You can use the map form to compose the proc logic from  | 
162 |  | -  disparate functions or to leverage the optionality of some of the  | 
163 |  | -  entry points.  | 
 | 160 | +  datafy.  | 
164 | 161 | 
  | 
165 |  | -  arity 0, or :describe - required, () -> description  | 
 | 162 | +  arity 0 - 'describe', () -> description  | 
166 | 163 |   where description is a map with keys :params :ins and :outs, each of which  | 
167 | 164 |   in turn is a map of keyword to doc string, and :workload with  | 
168 | 165 |   possible values of :mixed :io :compute. All entries in the describe  | 
 | 
182 | 179 |   the proc. It will also be called by the impl in order to discover  | 
183 | 180 |   what channels are needed.  | 
184 | 181 | 
  | 
185 |  | -  arity 1, or :init - optional, (arg-map) -> initial-state  | 
 | 182 | +  arity 1 - 'init', (arg-map) -> initial-state  | 
186 | 183 |     | 
187 |  | -  init will be called once by the process to establish any initial  | 
 | 184 | +  The init arity will be called once by the process to establish any initial  | 
188 | 185 |   state. The arg-map will be a map of param->val, as supplied in the  | 
189 | 186 |   flow def. The key ::flow/pid will be added, mapped to the pid  | 
190 | 187 |   associated with the process (useful e.g. if the process wants to  | 
191 |  | -  refer to itself in reply-to coordinates). init must be provided if  | 
192 |  | -  'describe' returns :params.  | 
 | 188 | +  refer to itself in reply-to coordinates).   | 
193 | 189 | 
  | 
194 | 190 |   Optionally, a returned init state may contain the  | 
195 | 191 |   keys ::flow/in-ports and/or ::flow/out-ports. These should be maps  | 
 | 
200 | 196 |   outside of it. Use :transition to coordinate the lifecycle of these  | 
201 | 197 |   external channels.  | 
202 | 198 | 
  | 
203 |  | -  Optionally, _any_ returned state, whether from :init, :transition  | 
204 |  | -  or :transform, may contain the key ::flow/input-filter, a predicate  | 
 | 199 | +  Optionally, _any_ returned state, whether from init, transition  | 
 | 200 | +  or transform, may contain the key ::flow/input-filter, a predicate  | 
205 | 201 |   of cid. Only inputs (including in-ports) satisfying the predicate  | 
206 | 202 |   will be part of the next channel read set. In the absence of this  | 
207 | 203 |   predicate all inputs are read.  | 
208 | 204 | 
  | 
209 |  | -  arity 2, or :transition - optional, (state transition) -> state'  | 
 | 205 | +  arity 2 - 'transition', (state transition) -> state'  | 
210 | 206 | 
  | 
211 |  | -  transition will be called when the process makes a state transition,  | 
212 |  | -  transition being one of ::flow/resume, ::flow/pause or ::flow/stop  | 
 | 207 | +  The transition arity will be called when the process makes a state  | 
 | 208 | +  transition, transition being one of ::flow/resume, ::flow/pause  | 
 | 209 | +  or ::flow/stop  | 
213 | 210 | 
  | 
214 |  | -  With this fn a process impl can track changes and coordinate  | 
 | 211 | +  With this a process impl can track changes and coordinate  | 
215 | 212 |   resources, especially cleaning up any resources on :stop, since the  | 
216 | 213 |   process will no longer be used following that. See the SPI for  | 
217 | 214 |   details. state' will be the state supplied to subsequent calls.  | 
218 | 215 | 
  | 
219 |  | -  arity 3, or :transform - required, (state in-name msg) -> [state' output]  | 
 | 216 | +  arity 3 - 'transform', (state in-name msg) -> [state' output]  | 
220 | 217 |   where output is a map of outid->[msgs*]  | 
221 | 218 | 
  | 
222 |  | -  The transform fn will be called every time a message arrives at any  | 
 | 219 | +  The transform arity will be called every time a message arrives at any  | 
223 | 220 |   of the inputs. Output can be sent to none, any or all of the :outs  | 
224 | 221 |   enumerated, and/or an input named by a [pid inid] tuple (e.g. for  | 
225 | 222 |   reply-to), and/or to the ::flow/report output. A step need not  | 
226 | 223 |   output at all (output or msgs can be empyt/nil), however an output _message_  | 
227 | 224 |   may never be nil (per core.async channels). state' will be the state  | 
228 | 225 |   supplied to subsequent calls.  | 
229 | 226 | 
  | 
230 |  | -  process accepts an option map with keys:  | 
 | 227 | +  process also accepts an option map with keys:  | 
231 | 228 |   :workload - one of :mixed, :io or :compute  | 
232 | 229 |   :compute-timeout-ms - if :workload is :compute, this timeout (default 5000 msec)  | 
233 | 230 |                 will be used when getting the return from the future - see below  | 
 | 
242 | 239 | 
  | 
243 | 240 |   When :io is specified, transform should not do extensive computation.  | 
244 | 241 | 
  | 
245 |  | -  When :compute is specified (only allowed for :transform), each call  | 
246 |  | -  to transform will be run in a separate thread. The process loop will  | 
247 |  | -  run in an :io context (since it no longer directly calls transform,  | 
248 |  | -  all it does is I/O) and it will submit transform to the :compute  | 
249 |  | -  executor then await (blocking, for compute-timeout-ms) the  | 
250 |  | -  completion of the future returned by the executor. If the future  | 
251 |  | -  times out it will be reported on ::flow/error.  | 
 | 242 | +  When :compute is specified, each call to transform will be run in a  | 
 | 243 | +  separate thread. The process loop will run in an :io context (since  | 
 | 244 | +  it no longer directly calls transform, all it does is I/O) and it  | 
 | 245 | +  will submit transform to the :compute executor then await (blocking,  | 
 | 246 | +  for compute-timeout-ms) the completion of the future returned by the  | 
 | 247 | +  executor. If the future times out it will be reported  | 
 | 248 | +  on ::flow/error.  | 
252 | 249 | 
  | 
253 | 250 |   When :compute is specified transform must not block!"  | 
254 |  | -  ([fn-or-map] (process fn-or-map nil))  | 
255 |  | -  ([fn-or-map {:keys [workload compute-timeout-ms] :as opts}]  | 
256 |  | -   (impl/proc fn-or-map opts)))  | 
 | 251 | +  ([step-fn] (process step-fn nil))  | 
 | 252 | +  ([step-fn {:keys [workload compute-timeout-ms] :as opts}]  | 
 | 253 | +   (impl/proc step-fn opts)))  | 
 | 254 | + | 
 | 255 | +(defn map->step  | 
 | 256 | +  "given a map of functions corresponding to step fn arities (see  | 
 | 257 | +  'process'), returns a step fn suitable for passing to 'process'. You  | 
 | 258 | +  can use this map form to compose the proc logic from disparate  | 
 | 259 | +  functions or to leverage the optionality of some of the entry  | 
 | 260 | +  points.  | 
 | 261 | +
  | 
 | 262 | +  The keys in the map are:  | 
 | 263 | +  :describe, arity 0 - required  | 
 | 264 | +  :init, arity 1 - optional, but should be provided if 'describe' returns :params.  | 
 | 265 | +  :transition, arity 2 - optional  | 
 | 266 | +  :transform, arity 3 - required"  | 
 | 267 | +  [{:keys [describe init transition transform]}]  | 
 | 268 | +  (assert (and describe transform) "must provide :describe and :transform")  | 
 | 269 | +  (fn  | 
 | 270 | +    ([] (describe))  | 
 | 271 | +    ([arg-map] (when init (init arg-map)))  | 
 | 272 | +    ([state trans] (if transition (transition state trans) state))  | 
 | 273 | +    ([state input msg] (transform state input msg))))  | 
257 | 274 | 
 
  | 
258 | 275 | (defn lift*->step  | 
259 | 276 |   "given a fn f taking one arg and returning a collection of non-nil  | 
 | 
263 | 280 |   (fn  | 
264 | 281 |     ([] {:ins {:in (str "the argument to " f)}  | 
265 | 282 |          :outs {:out (str "the return of " f)}})  | 
266 |  | -    ([_] nil)  | 
267 |  | -    ([_ _] nil)  | 
268 |  | -    ([_ _ msg] [nil {:out (f msg)}])))  | 
 | 283 | +    ([arg-map] nil)  | 
 | 284 | +    ([state transition] nil)  | 
 | 285 | +    ([state input msg] [nil {:out (f msg)}])))  | 
269 | 286 | 
 
  | 
270 | 287 | (defn lift1->step  | 
271 | 288 |   "like lift*->step except taking a fn returning one value, which when  | 
272 | 289 |   nil will yield no output."  | 
273 | 290 |   [f]  | 
274 |  | -  (lift*->step #(when-some [m (f %)] (vector m))))  | 
 | 291 | +  (fn  | 
 | 292 | +    ([] {:ins {:in (str "the argument to " f)}  | 
 | 293 | +         :outs {:out (str "the return of " f)}})  | 
 | 294 | +    ([arg-map] nil)  | 
 | 295 | +    ([state transition] nil)  | 
 | 296 | +    ([state input msg] [nil (when-some [m (f msg)] {:out (vector m)})])))  | 
275 | 297 | 
 
  | 
276 | 298 | (defn futurize  | 
277 | 299 |   "Takes a fn f and returns a fn that takes the same arguments as f  | 
 | 
0 commit comments