2020 [clojure.tools.analyzer.passes.jvm.warn-on-reflection :refer [warn-on-reflection]]
2121 [clojure.tools.analyzer.jvm :as an-jvm]
2222 [clojure.core.async.impl.protocols :as impl]
23- [clojure.core.async.impl.dispatch :as dispatch]
24- [clojure.core.async.impl.runtime :as rt]
2523 [clojure.set :as set])
26- (:import [java.util.concurrent.atomic AtomicReferenceArray ]
27- [clojure.lang Var ]))
24+ (:import [java.util.concurrent.locks Lock ]
25+ [java.util.concurrent.atomic AtomicReferenceArray ]))
2826
2927(defn debug [x]
3028 (pprint x)
3129 x )
3230
31+ (def ^{:const true :tag 'long} FN-IDX 0 )
32+ (def ^{:const true :tag 'long} STATE-IDX 1 )
33+ (def ^{:const true :tag 'long} VALUE-IDX 2 )
34+ (def ^{:const true :tag 'long} BINDINGS-IDX 3 )
35+ (def ^{:const true :tag 'long} EXCEPTION-FRAMES 4 )
36+ (def ^{:const true :tag 'long} USER-START-IDX 5 )
37+
38+ (defn aset-object [^AtomicReferenceArray arr ^long idx o]
39+ (.set arr idx o))
40+
41+ (defn aget-object [^AtomicReferenceArray arr ^long idx]
42+ (.get arr idx))
43+
44+ (defmacro aset-all!
45+ [arr & more]
46+ (assert (even? (count more)) " Must give an even number of args to aset-all!" )
47+ (let [bindings (partition 2 more)
48+ arr-sym (gensym " statearr-" )]
49+ `(let [~arr-sym ~arr]
50+ ~@(map
51+ (fn [[idx val]]
52+ `(aset-object ~arr-sym ~idx ~val))
53+ bindings)
54+ ~arr-sym)))
55+
3356; ; State monad stuff, used only in SSA construction
3457
3558(defmacro gen-plan
194217 IEmittableInstruction
195218 (emit-instruction [this state-sym]
196219 (if (= value ::value )
197- `[~(:id this) (rt/ aget-object ~state-sym ~rt/ VALUE-IDX)]
220+ `[~(:id this) (aget-object ~state-sym ~VALUE-IDX)]
198221 `[~(:id this) ~value])))
199222
200223(defrecord RawCode [ast locals]
294317 (terminate-block [_this state-sym _]
295318 `(do (case ~val-id
296319 ~@(concat (mapcat (fn [test blk]
297- `[~test (rt/ aset-all! ~state-sym ~rt/ STATE-IDX ~blk)])
320+ `[~test (aset-all! ~state-sym ~STATE-IDX ~blk)])
298321 test-vals jmp-blocks)
299322 (when default-block
300- `[(do (rt/ aset-all! ~state-sym ~rt/ STATE-IDX ~default-block)
323+ `[(do (aset-all! ~state-sym ~STATE-IDX ~default-block)
301324 :recur )])))
302325 :recur )))
303326
328351 (block-references [_this] [block])
329352 ITerminator
330353 (terminate-block [_this state-sym _]
331- `(do (rt/ aset-all! ~state-sym ~rt/ VALUE-IDX ~value ~rt/ STATE-IDX ~block)
354+ `(do (aset-all! ~state-sym ~VALUE-IDX ~value ~STATE-IDX ~block)
332355 :recur )))
333356
334357(defrecord Return [value]
341364 (terminate-block [this state-sym custom-terminators]
342365 (if-let [f (get custom-terminators (terminator-code this))]
343366 `(~f ~state-sym ~value)
344- `(do (rt/ aset-all! ~state-sym ~rt/ VALUE-IDX ~value ~rt/ STATE-IDX ::finished )
367+ `(do (aset-all! ~state-sym ~VALUE-IDX ~value ~STATE-IDX ::finished )
345368 nil ))))
346369
347370(defrecord CondBr [test then-block else-block]
352375 ITerminator
353376 (terminate-block [_this state-sym _]
354377 `(do (if ~test
355- (rt/ aset-all! ~state-sym ~rt/ STATE-IDX ~then-block)
356- (rt/ aset-all! ~state-sym ~rt/ STATE-IDX ~else-block))
378+ (aset-all! ~state-sym ~STATE-IDX ~then-block)
379+ (aset-all! ~state-sym ~STATE-IDX ~else-block))
357380 :recur )))
358381
359382(defrecord PushTry [catch-block]
363386 (block-references [_this] [catch-block])
364387 IEmittableInstruction
365388 (emit-instruction [_this state-sym]
366- `[~'_ (rt/ aset-all! ~state-sym ~rt/ EXCEPTION-FRAMES (cons ~catch-block (rt/ aget-object ~state-sym ~rt/ EXCEPTION-FRAMES)))]))
389+ `[~'_ (aset-all! ~state-sym ~EXCEPTION-FRAMES (cons ~catch-block (aget-object ~state-sym ~EXCEPTION-FRAMES)))]))
367390
368391(defrecord PopTry []
369392 IInstruction
372395 (block-references [_this] [])
373396 IEmittableInstruction
374397 (emit-instruction [_this state-sym]
375- `[~'_ (rt/ aset-all! ~state-sym ~rt/ EXCEPTION-FRAMES (rest (rt/ aget-object ~state-sym ~rt/ EXCEPTION-FRAMES)))]))
398+ `[~'_ (aset-all! ~state-sym ~EXCEPTION-FRAMES (rest (aget-object ~state-sym ~EXCEPTION-FRAMES)))]))
376399
377400(defrecord CatchHandler [catches]
378401 IInstruction
382405 ITerminator
383406 (terminate-block [_this state-sym _]
384407 (let [ex (gensym 'ex)]
385- `(let [~ex (rt/ aget-object ~state-sym ~rt/ VALUE-IDX)]
408+ `(let [~ex (aget-object ~state-sym ~VALUE-IDX)]
386409 (cond
387410 ~@(for [[handler-idx type] catches
388- i [`(instance? ~type ~ex) `(rt/ aset-all! ~state-sym ~rt/ STATE-IDX ~handler-idx)]]
411+ i [`(instance? ~type ~ex) `(aset-all! ~state-sym ~STATE-IDX ~handler-idx)]]
389412 i)
390413 :else (throw ~ex))
391414 :recur ))))
865888 (if (empty? args)
866889 []
867890 (mapcat (fn [sym]
868- `[~sym (rt/ aget-object ~state-sym ~(id-for-inst local-map sym))])
891+ `[~sym (aget-object ~state-sym ~(id-for-inst local-map sym))])
869892 args))))
870893
871894(defn- build-block-body [state-sym blk]
882905 blk)
883906 results (interleave (map (partial id-for-inst local-map) results) results)]
884907 (if-not (empty? results)
885- [state-sym `(rt/ aset-all! ~state-sym ~@results)]
908+ [state-sym `(aset-all! ~state-sym ~@results)]
886909 [])))
887910
888911(defn- emit-state-machine [machine num-user-params custom-terminators]
889912 (let [index (index-state-machine machine)
890913 state-sym (with-meta (gensym " state_" )
891914 {:tag 'objects})
892- local-start-idx (+ num-user-params rt/ USER-START-IDX)
915+ local-start-idx (+ num-user-params USER-START-IDX)
893916 state-arr-size (+ local-start-idx (count-persistent-values index))
894917 local-map (atom {::next-idx local-start-idx})
895918 block-catches (:block-catches machine)]
896919 `(fn state-machine#
897- ([] (rt/ aset-all! (AtomicReferenceArray. ~state-arr-size)
898- ~rt/ FN-IDX state-machine#
899- ~rt/ STATE-IDX ~(:start-block machine)))
920+ ([] (aset-all! (AtomicReferenceArray. ~state-arr-size)
921+ ~FN-IDX state-machine#
922+ ~STATE-IDX ~(:start-block machine)))
900923 ([~state-sym]
901924 (let [old-frame# (clojure.lang.Var/getThreadBindingFrame )
902925 ret-value# (try
903- (clojure.lang.Var/resetThreadBindingFrame (rt/ aget-object ~state-sym ~rt/ BINDINGS-IDX))
926+ (clojure.lang.Var/resetThreadBindingFrame (aget-object ~state-sym ~BINDINGS-IDX))
904927 (loop []
905- (let [result# (case (int (rt/ aget-object ~state-sym ~rt/ STATE-IDX))
928+ (let [result# (case (int (aget-object ~state-sym ~STATE-IDX))
906929 ~@(mapcat
907930 (fn [[id blk]]
908931 [id `(let [~@(concat (build-block-preamble local-map index state-sym blk)
914937 (recur )
915938 result#)))
916939 (catch Throwable ex#
917- (rt/ aset-all! ~state-sym ~rt/ VALUE-IDX ex#)
918- (if (seq (rt/ aget-object ~state-sym ~rt/ EXCEPTION-FRAMES))
919- (rt/ aset-all! ~state-sym ~rt/ STATE-IDX (first (rt/ aget-object ~state-sym ~rt/ EXCEPTION-FRAMES)))
940+ (aset-all! ~state-sym ~VALUE-IDX ex#)
941+ (if (seq (aget-object ~state-sym ~EXCEPTION-FRAMES))
942+ (aset-all! ~state-sym ~STATE-IDX (first (aget-object ~state-sym ~EXCEPTION-FRAMES)))
920943 (throw ex#))
921944 :recur )
922945 (finally
923- (rt/ aset-object ~state-sym ~rt/ BINDINGS-IDX (clojure.lang.Var/getThreadBindingFrame ))
946+ (aset-object ~state-sym ~BINDINGS-IDX (clojure.lang.Var/getThreadBindingFrame ))
924947 (clojure.lang.Var/resetThreadBindingFrame old-frame#)))]
925948 (if (identical? ret-value# :recur )
926949 (recur ~state-sym)
927950 ret-value#))))))
928951
952+ (defn finished?
953+ " Returns true if the machine is in a finished state"
954+ [state-array]
955+ (identical? (aget-object state-array STATE-IDX) ::finished ))
956+
957+ (defn- fn-handler
958+ [f]
959+ (reify
960+ Lock
961+ (lock [_])
962+ (unlock [_])
963+
964+ impl/Handler
965+ (active? [_] true )
966+ (blockable? [_] true )
967+ (lock-id [_] 0 )
968+ (commit [_] f)))
969+
970+
971+ (defn run-state-machine [state]
972+ ((aget-object state FN-IDX) state))
973+
974+ (defn run-state-machine-wrapped [state]
975+ (try
976+ (run-state-machine state)
977+ (catch Throwable ex
978+ (impl/close! (aget-object state USER-START-IDX))
979+ (throw ex))))
980+
981+ (defn take! [state blk c]
982+ (if-let [cb (impl/take! c (fn-handler
983+ (fn [x]
984+ (aset-all! state VALUE-IDX x STATE-IDX blk)
985+ (run-state-machine-wrapped state))))]
986+ (do (aset-all! state VALUE-IDX @cb STATE-IDX blk)
987+ :recur )
988+ nil ))
989+
990+ (defn put! [state blk c val]
991+ (if-let [cb (impl/put! c val (fn-handler (fn [ret-val]
992+ (aset-all! state VALUE-IDX ret-val STATE-IDX blk)
993+ (run-state-machine-wrapped state))))]
994+ (do (aset-all! state VALUE-IDX @cb STATE-IDX blk)
995+ :recur )
996+ nil ))
997+
998+ (defn return-chan [state value]
999+ (let [c (aget-object state USER-START-IDX)]
1000+ (when-not (nil? value)
1001+ (impl/put! c value (fn-handler (fn [_] nil ))))
1002+ (impl/close! c)
1003+ c))
1004+
1005+ (def async-custom-terminators
1006+ {'clojure.core.async/<! `take!
1007+ 'clojure.core.async/>! `put!
1008+ 'clojure.core.async/alts! 'clojure.core.async/ioc-alts!
1009+ :Return `return-chan})
1010+
9291011(defn mark-transitions
9301012 {:pass-info {:walk :post :depends #{} :after an-jvm/default-passes}}
9311013 [{:keys [op fn ] :as ast}]
10281110 (parse-to-state-machine user-transitions)
10291111 second
10301112 (emit-state-machine num-user-params user-transitions))))
1031-
1032- (defn go-impl
1033- [env body]
1034- (let [crossing-env (zipmap (keys env) (repeatedly gensym))]
1035- `(let [c# (clojure.core.async/chan 1 )
1036- captured-bindings# (Var/getThreadBindingFrame )]
1037- (dispatch/run
1038- (^:once fn* []
1039- (let [~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~(vary-meta l dissoc :tag ))]) crossing-env)
1040- f# ~(state-machine
1041- `(do ~@body) 1 [crossing-env env] rt/async-custom-terminators)
1042- state# (-> (f# )
1043- (rt/aset-all! rt/USER-START-IDX c#
1044- rt/BINDINGS-IDX captured-bindings#))]
1045- (rt/run-state-machine-wrapped state#))))
1046- c#)))
0 commit comments