Skip to content

Commit dc4883a

Browse files
committed
first cut of broadcast system
1 parent 69fa631 commit dc4883a

File tree

2 files changed

+111
-58
lines changed

2 files changed

+111
-58
lines changed

src/main/clojure/clojure/core/async/flow.clj

Lines changed: 54 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,17 @@
3333
details. The flow configuration provides a centralized place for
3434
policy decisions regarding process settings, threading, buffering etc.
3535
36+
Flow also provides a subsystem for broadcast communication of
37+
out-of-band messages without explicit connections or
38+
declarations. This could for example be used to communicate the
39+
passage of (real or virtual) time. Broadcast messages are associated
40+
with (otherwise undeclared) signal-ids, and will be received by
41+
processes selecting those ids. Broadcasts messages will arrive along
42+
with messages from process inputs, so signal-ids must not conflict
43+
with any process input-id. Thus namespaced keywords, UUIDs etc or
44+
tuples thereof are recommended as signal-ids. See process
45+
describe/transform and inject below for details.
46+
3647
It is expected that applications will rarely define instances of the
3748
process protocol but instead use the API function 'process' that
3849
implements the process protocol in terms of calls to ordinary
@@ -71,16 +82,22 @@
7182
7283
:proc - a function that starts a process
7384
:args - a map of param->val which will be passed to the process ctor
74-
:chan-opts - a map of in-or-out-id->{:keys [buf-or-n xform]}, where buf-or-n
85+
:chan-opts - a map of io-id->{:keys [buf-or-n xform]},
86+
where io-id is an input/output name, and buf-or-n
7587
and xform have their meanings per core.async/chan
76-
the default is {:buf-or-n 10}
88+
the default for inputs and outputs is {:buf-or-n 10}
7789
7890
:conns - a collection of [[from-pid outid] [to-pid inid]] tuples.
7991
8092
Inputs and outputs support multiple connections. When an output is
81-
connected multiple times every connection will get every message,
82-
as per a core.async/mult.
93+
connected multiple times every connection will get every message, as
94+
per a core.async/mult. Note that non-multed outputs do not have
95+
corresponding channels and thus any chan-opts will be ignored.
8396
97+
Broadcast signals are conveyed to a process via a channel with an
98+
async/sliding-buffer of size 100, thus signals not handled in a
99+
timely manner will be dropped in favor of later arriving signals.
100+
84101
:mixed-exec/:io-exec/:compute-exec -> ExecutorService
85102
These can be used to specify the ExecutorService to use for the
86103
corresonding workload, in lieu of the lib defaults.
@@ -136,9 +153,11 @@
136153
(g/ping-proc g pid timeout-ms))
137154

138155
(defn inject
139-
"asynchronously puts the messages on the channel corresponding to the
140-
input or output of the process, returning a future that will
141-
complete when done."
156+
"asynchronously puts the messages on the channel corresponding to
157+
the input or output of the process, returning a future that will
158+
complete when done. You can broadcast messages on a signal using the
159+
special coord [::flow/cast a-signal-id]. Note that signals cannot be
160+
sent to a particular process."
142161
[g [pid io-id :as coord] msgs] (g/inject g coord msgs))
143162

144163
(defn process
@@ -160,20 +179,25 @@
160179
datafy.
161180
162181
arity 0 - 'describe', () -> description
163-
where description is a map with keys :params :ins and :outs, each of which
164-
in turn is a map of keyword to doc string, and :workload with
165-
possible values of :mixed :io :compute. All entries in the describe
166-
return map are optional.
182+
where description is a map with possible keys:
183+
:params :ins and :outs, each of which in turn is a map of keyword to doc string
184+
:signal-select - a predicate of a signal-id. Messages on approved
185+
signals will appear in the transform arity (see below)
186+
For the simple case of enumerated signal-ids, use a set,
187+
e.g. #{:this/signal :that/signal}
188+
If no :signal-select is provided, no signals will be received
189+
:workload with possible values of :mixed :io :compute.
190+
All entries in the describe return map are optional.
167191
168192
:params describes the initial arguments to setup the state for the function.
169-
:ins enumerates the input[s], for which the flow will create channels
170-
:outs enumerates the output[s], for which the flow may create channels.
193+
:ins enumerates the process input[s], for which the flow will create channels
194+
:outs enumerates the process output[s], for which the flow _may_ create channels.
171195
:workload - describes the nature of the workload, one of :mixed :io or :compute
172196
an :io workload should not do extended computation
173197
a :compute workload should never block
174198
175-
No key may be present in both :ins and :outs, allowing for a uniform
176-
channel coordinate system of [:process-id :channel-id]. The
199+
No io-id key may be present in both :ins and :outs, allowing for a
200+
uniform channel coordinate system of [:process-id :channel-id]. The
177201
ins/outs/params returned will be the ins/outs/params of the
178202
process. describe may be called by users to understand how to use
179203
the proc. It will also be called by the impl in order to discover
@@ -213,16 +237,23 @@
213237
process will no longer be used following that. See the SPI for
214238
details. state' will be the state supplied to subsequent calls.
215239
216-
arity 3 - 'transform', (state in-name msg) -> [state' output]
240+
arity 3 - 'transform', (state in-or-signal-id msg) -> [state' output]
217241
where output is a map of outid->[msgs*]
218242
219-
The transform arity will be called every time a message arrives at any
220-
of the inputs. Output can be sent to none, any or all of the :outs
221-
enumerated, and/or an input named by a [pid inid] tuple (e.g. for
222-
reply-to), and/or to the ::flow/report output. A step need not
223-
output at all (output or msgs can be empyt/nil), however an output _message_
224-
may never be nil (per core.async channels). state' will be the state
225-
supplied to subsequent calls.
243+
The transform arity will be called every time a message arrives at
244+
any of the inputs or signals (selected via :signal-select in
245+
describe), identified by the id. Output can be sent to none, any or
246+
all of the :outs enumerated, and/or an input named by a [pid in-id]
247+
coord tuple (e.g. for reply-to), and/or to the ::flow/report
248+
output.
249+
250+
You can broadcast output to all processes selecting a signal via
251+
the special coord [::flow/cast a-signal-id] Note that signals cannot
252+
be sent to a particular process.
253+
254+
A step need not output at all (output or msgs can be empty/nil),
255+
however an output _message_ may never be nil (per core.async
256+
channels). state' will be the state supplied to subsequent calls.
226257
227258
process also accepts an option map with keys:
228259
:workload - one of :mixed, :io or :compute

src/main/clojure/clojure/core/async/flow/impl.clj

Lines changed: 57 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
(.submit e ^Callable #(apply f args)))))
3535

3636
(defn prep-proc [ret pid {:keys [proc, args, chan-opts] :or {chan-opts {}}}]
37-
(let [{:keys [ins outs]} (spi/describe proc)
37+
(let [{:keys [ins outs signal-select]} (spi/describe proc)
3838
copts (fn [cs]
3939
(zipmap (keys cs) (map #(chan-opts %) (keys cs))))
4040
inopts (copts ins)
@@ -43,7 +43,7 @@
4343
(some (partial contains? outopts) (keys inopts)))
4444
(throw (ex-info ":ins and :outs cannot share ids within a process"
4545
{:pid pid :ins (keys inopts) :outs (keys outopts)})))
46-
(assoc ret pid {:pid pid :proc proc :ins inopts :outs outopts :args args})))
46+
(assoc ret pid {:pid pid :proc proc :ins inopts :outs outopts :args args :signal-select signal-select})))
4747

4848
(defn create-flow
4949
"see lib ns for docs"
@@ -119,7 +119,18 @@
119119
(needs-mult? coord conns) (make-chan co)
120120
;;direct connect 1:1
121121
:else (in-chans (first conns)))))
122-
outopts))
122+
outopts))
123+
;;pid->{:select :chan}
124+
castees (reduce (fn [ret {:keys [pid signal-select]}]
125+
(assoc ret pid
126+
{:select signal-select
127+
:chan (async/chan (async/sliding-buffer 100))}))
128+
{} (vals pdescs))
129+
cast (fn [sigid msgs]
130+
(doseq [{:keys [select chan]} (vals castees)]
131+
(when (and select (select sigid))
132+
(doseq [m msgs]
133+
(async/>!! chan [sigid m])))))
123134
;;mults
124135
_ (doseq [[out ins] conn-map]
125136
(when (needs-mult? out ins)
@@ -128,7 +139,7 @@
128139
(async/tap m (in-chans in))))))
129140
write-chan #(if-let [[_ c] (or (find in-chans %) (find out-chans %))]
130141
c
131-
(throw (ex-info "can't resolve channel with coord" {:coord %})))
142+
(throw (ex-info "can't resolve channel with io-id" {:io-id %})))
132143
resolver (reify spi/Resolver
133144
(get-write-chan [_ coord]
134145
(write-chan coord))
@@ -139,9 +150,11 @@
139150
(let [chan-map (fn [ks coll] (zipmap (keys ks) (map #(coll [pid %]) (keys ks))))
140151
control-tap (async/chan 10)]
141152
(async/tap control-mult control-tap)
142-
(spi/start proc {:pid pid :args (assoc args ::flow/pid pid) :resolver resolver
153+
(spi/start proc {:pid pid :args (assoc args ::flow/pid pid)
154+
:resolver resolver :cast cast
143155
:ins (assoc (chan-map ins in-chans)
144-
::flow/control control-tap)
156+
::flow/control control-tap
157+
::flow/casts (-> pid castees :chan))
145158
:outs (assoc (chan-map outs out-chans)
146159
::flow/error error-chan
147160
::flow/report report-chan)}))
@@ -151,7 +164,7 @@
151164
(doseq [p (vals pdescs)]
152165
(start-proc p))
153166
;;the only connection to a running flow is via channels
154-
(reset! chans {:control control-chan :resolver resolver
167+
(reset! chans {:control control-chan :resolver resolver :cast cast
155168
:report report-chan, :error error-chan
156169
:ins in-chans, :outs out-chans})
157170
{:report-chan report-chan :error-chan error-chan}))
@@ -172,12 +185,14 @@
172185
(pause-proc [_ pid] (send-command ::flow/pause pid))
173186
(resume-proc [_ pid] (send-command ::flow/resume pid))
174187
(ping-proc [_ pid timeout-ms] (handle-ping pid timeout-ms))
175-
(inject [_ coord msgs]
176-
(let [{:keys [resolver]} (running-chans)
177-
chan (spi/get-write-chan resolver coord)]
178-
((futurize #(doseq [m msgs]
179-
(async/>!! chan m))
180-
{:exec :io})))))))
188+
(inject [_ [target id :as coord] msgs]
189+
(let [{:keys [resolver cast]} (running-chans)
190+
do-io (if (= target ::flow/cast)
191+
#(cast id msgs)
192+
(let [chan (spi/get-write-chan resolver coord)]
193+
#(doseq [m msgs]
194+
(async/>!! chan m))))]
195+
((futurize do-io {:exec :io})))))))
181196

182197
(defn handle-command
183198
[pid pong status cmd]
@@ -199,26 +214,29 @@
199214
:paused ::flow/pause))
200215
state))
201216

202-
(defn send-outputs [status state outputs outs resolver control handle-command transition]
217+
(defn send-outputs [status state outputs outs resolver control handle-command transition cast]
203218
(loop [nstatus status, nstate state, outputs (seq outputs)]
204219
(if (or (nil? outputs) (= nstatus :exit))
205220
[nstatus nstate]
206221
(let [[output msgs] (first outputs)]
207-
(if-let [outc (or (outs output) (spi/get-write-chan resolver output))]
208-
(let [[nstatus nstate]
209-
(loop [nstatus nstatus, nstate nstate, msgs (seq msgs)]
210-
(if (or (nil? msgs) (= nstatus :exit))
211-
[nstatus nstate]
212-
(let [[v c] (async/alts!!
213-
[control [outc (first msgs)]]
214-
:priority true)]
215-
(if (= c control)
216-
(let [nnstatus (handle-command nstatus v)
217-
nnstate (handle-transition transition nstatus nnstatus nstate)]
218-
(recur nnstatus nnstate msgs))
219-
(recur nstatus nstate (next msgs))))))]
220-
(recur nstatus nstate (next outputs)))
221-
(recur nstatus nstate (next outputs)))))))
222+
(if (and (vector? output) (= (first output) ::flow/cast))
223+
(do (cast (second output) msgs)
224+
(recur nstatus nstate (next outputs)))
225+
(if-let [outc (or (outs output) (spi/get-write-chan resolver output))]
226+
(let [[nstatus nstate]
227+
(loop [nstatus nstatus, nstate nstate, msgs (seq msgs)]
228+
(if (or (nil? msgs) (= nstatus :exit))
229+
[nstatus nstate]
230+
(let [[v c] (async/alts!!
231+
[control [outc (first msgs)]]
232+
:priority true)]
233+
(if (= c control)
234+
(let [nnstatus (handle-command nstatus v)
235+
nnstate (handle-transition transition nstatus nnstatus nstate)]
236+
(recur nnstatus nnstate msgs))
237+
(recur nstatus nstate (next msgs))))))]
238+
(recur nstatus nstate (next outputs)))
239+
(recur nstatus nstate (next outputs))))))))
222240

223241
(defn proc
224242
"see lib ns for docs"
@@ -233,7 +251,7 @@
233251
(walk/postwalk datafy {:step step :desc desc})))
234252
spi/ProcLauncher
235253
(describe [_] desc)
236-
(start [_ {:keys [pid args ins outs resolver]}]
254+
(start [_ {:keys [pid args ins outs resolver cast]}]
237255
(assert (or (not params) args) "must provide :args if :params")
238256
(let [transform (if (= workload :compute)
239257
#(.get ^Future ((futurize step {:exec (spi/get-exec resolver :compute)}) %1 %2 %3)
@@ -245,11 +263,12 @@
245263
outs (into (or outs {}) (::flow/out-ports state))
246264
io-id (zipmap (concat (vals ins) (vals outs)) (concat (keys ins) (keys outs)))
247265
control (::flow/control ins)
248-
read-ins (dissoc ins ::flow/control)
266+
casts (::flow/casts ins)
267+
read-ins (dissoc ins ::flow/control ::flow/casts)
249268
run
250269
#(loop [status :paused, state state, count 0, read-ins read-ins]
251270
(let [pong (fn [c]
252-
(let [pins (dissoc ins ::flow/control)
271+
(let [pins (dissoc ins ::flow/control ::flow/casts)
253272
pouts (dissoc outs ::flow/error ::flow/report)]
254273
(async/>!! c (walk/postwalk datafy
255274
#::flow{:pid pid, :status status
@@ -269,18 +288,21 @@
269288
(if (ipred cid)
270289
(conj ret chan)
271290
ret))
272-
[control] read-ins))
291+
[control casts] read-ins))
273292
[msg c] (async/alts!! read-chans :priority true)
274293
cid (io-id c)]
275294
(if (= c control)
276295
(let [nstatus (handle-command status msg)
277296
nstate (handle-transition step status nstatus state)]
278297
[nstatus nstate count read-ins])
279298
(try
280-
(let [[nstate outputs] (transform state cid msg)
299+
(let [[nstate outputs]
300+
(if (= c casts) ;;[sigid msg]
301+
(transform state (first msg) (second msg))
302+
(transform state cid msg))
281303
[nstatus nstate]
282304
(send-outputs status nstate outputs outs
283-
resolver control handle-command step)]
305+
resolver control handle-command step cast)]
284306
[nstatus nstate (inc count) (if (some? msg)
285307
read-ins
286308
(dissoc read-ins cid))])

0 commit comments

Comments
 (0)