Skip to content

Static core clj #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 30, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 44 additions & 8 deletions language-adaptors/rxjava-clojure/README.md
Original file line number Diff line number Diff line change
@@ -1,21 +1,57 @@
# Clojure Adaptor for RxJava

This adaptor provides functions and macros to ease Clojure/RxJava interop. In particular, there are functions and macros for turning Clojure functions and code into RxJava `Func*` and `Action*` interfaces without the tedium of manually reifying the interfaces.

This adaptor allows 'fn' functions to be used and RxJava will know how to invoke them.
# Basic Usage

This enables code such as:
## Requiring the interop namespace
The first thing to do is to require the namespace:

```clojure
(->
(Observable/toObservable ["one" "two" "three"])
(.take 2)
(.subscribe (fn [arg] (println arg))))
(ns my.namespace
(:require [rx.lang.clojure.interop :as rx])
(:import [rx Observable]))
```

This still dependes on Clojure using Java interop against the Java API.
or, at the REPL:

A future goal is a Clojure wrapper to expose the functions in a more idiomatic way.
```clojure
(require '[rx.lang.clojure.interop :as rx])
```

## Using rx/fn
Once the namespace is required, you can use the `rx/fn` macro anywhere RxJava wants a `rx.util.functions.Func` object. The syntax is exactly the same as `clojure.core/fn`:

```clojure
(-> my-observable
(.map (rx/fn [v] (* 2 v))))
```

If you already have a plain old Clojure function you'd like to use, you can pass it to the `rx/fn*` function to get a new object that implements `rx.util.functions.Func`:

```clojure
(-> my-numbers
(.reduce (rx/fn* +)))
```

## Using rx/action
The `rx/action` macro is identical to `rx/fn` except that the object returned implements `rx.util.functions.Action` interfaces. It's used in `subscribe` and other side-effect-y contexts:

```clojure
(-> my-observable
(.map (rx/fn* transform-data))
(.finallyDo (rx/action [] (println "Finished transform")))
(.subscribe (rx/action [v] (println "Got value" v))
(rx/action [e] (println "Get error" e))
(rx/action [] (println "Sequence complete"))))
```

# Gotchas
Here are a few things to keep in mind when using this interop:

* Keep in mind the (mostly empty) distinction between `Func` and `Action` and which is used in which contexts
* If there are multiple Java methods overloaded by `Func` arity, you'll need to use a type hint to let the compiler know which one to choose.
* Methods that take a predicate (like filter) expect the predicate to return a boolean value. A function that returns a non-boolean value will result in a `ClassCastException`.

# Binaries

Expand Down
9 changes: 3 additions & 6 deletions language-adaptors/rxjava-clojure/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ apply plugin: 'osgi'
dependencies {
compile project(':rxjava-core')

provided 'junit:junit-dep:4.10'
provided 'org.mockito:mockito-core:1.8.5'

// clojure
compile 'org.clojure:clojure:1.4.+'
compile 'clj-http:clj-http:0.6.4' // https://clojars.org/clj-http
//compile 'clj-http:clj-http:0.6.4' // https://clojars.org/clj-http
}

/*
Expand All @@ -20,7 +17,7 @@ warnOnReflection = true

buildscript {
repositories { maven { url "http://clojars.org/repo" } }
dependencies { classpath "clojuresque:clojuresque:1.5.4" }
dependencies { classpath "clojuresque:clojuresque:1.5.8" }
}

repositories {
Expand Down Expand Up @@ -52,4 +49,4 @@ jar {
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
instruction 'Fragment-Host', 'com.netflix.rxjava.core'
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
(ns rx.lang.clojure.examples.http-examples
(:require [rx.lang.clojure.interop :as rx]
[clj-http.client :as http])
(:import rx.Observable rx.subscriptions.Subscriptions))

; NOTE on naming conventions. I'm using camelCase names (against clojure convention)
; in this file as I'm purposefully keeping functions and methods across
; different language implementations in-sync for easy comparison.

(defn fetchWikipediaArticleAsynchronously [wikipediaArticleNames]
"Fetch a list of Wikipedia articles asynchronously.

return Observable<String> of HTML"
(Observable/create
(rx/fn [observer]
(let [f (future
(doseq [articleName wikipediaArticleNames]
(-> observer (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
; after sending response to onnext we complete the sequence
(-> observer .onCompleted))]
; a subscription that cancels the future if unsubscribed
(Subscriptions/create (rx/action [] (future-cancel f)))))))

; To see output
(comment
(-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"])
(.subscribe (rx/action [v] (println "--- Article ---\n" (subs (:body v) 0 125) "...")))))


; --------------------------------------------------
; Error Handling
; --------------------------------------------------

(defn fetchWikipediaArticleAsynchronouslyWithErrorHandling [wikipediaArticleNames]
"Fetch a list of Wikipedia articles asynchronously
with proper error handling.

return Observable<String> of HTML"
(Observable/create
(rx/fn [observer]
(let [f (future
(try
(doseq [articleName wikipediaArticleNames]
(-> observer (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
;(catch Exception e (prn "exception")))
(catch Exception e (-> observer (.onError e))))
; after sending response to onNext we complete the sequence
(-> observer .onCompleted))]
; a subscription that cancels the future if unsubscribed
(Subscriptions/create (rx/action [] (future-cancel f)))))))

; To see output
(comment
(-> (fetchWikipediaArticleAsynchronouslyWithErrorHandling ["Tiger" "NonExistentTitle" "Elephant"])
(.subscribe (rx/action [v] (println "--- Article ---\n" (subs (:body v) 0 125) "..."))
(rx/action [e] (println "--- Error ---\n" (.getMessage e))))))


Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns rx.lang.clojure.examples.rx-examples
(:import rx.Observable rx.subscriptions.Subscriptions)
(:require [clj-http.client :as http]))
(:require [rx.lang.clojure.interop :as rx])
(:import rx.Observable rx.subscriptions.Subscriptions))

; NOTE on naming conventions. I'm using camelCase names (against clojure convention)
; in this file as I'm purposefully keeping functions and methods across
Expand All @@ -12,8 +12,8 @@

(defn hello
[& args]
(-> (Observable/toObservable args)
(.subscribe #(println (str "Hello " % "!")))))
(-> (Observable/from args)
(.subscribe (rx/action [v] (println (str "Hello " v "!"))))))

; To see output
(comment
Expand All @@ -23,22 +23,13 @@
; Create Observable from Existing Data
; --------------------------------------------------

(defn existingDataFromNumbers []
(Observable/toObservable [1 2 3 4 5 6]))

(defn existingDataFromNumbersUsingFrom []
(Observable/from [1 2 3 4 5 6]))

(defn existingDataFromObjects []
(Observable/toObservable ["a" "b" "c"]))

(defn existingDataFromObjectsUsingFrom []
(Observable/from ["a" "b" "c"]))

(defn existingDataFromList []
(let [list [5, 6, 7, 8]]
(Observable/toObservable list)))

(defn existingDataFromListUsingFrom []
(let [list [5, 6, 7, 8]]
(Observable/from list)))
Expand All @@ -56,7 +47,7 @@

returns Observable<String>"
(Observable/create
(fn [observer]
(rx/fn [observer]
(doseq [x (range 50)] (-> observer (.onNext (str "value_" x))))
; after sending all values we complete the sequence
(-> observer .onCompleted)
Expand All @@ -66,46 +57,26 @@

; To see output
(comment
(.subscribe (customObservableBlocking) println))
(.subscribe (customObservableBlocking) (rx/action* println)))

(defn customObservableNonBlocking []
"This example shows a custom Observable that does not block
when subscribed to as it spawns a separate thread.

returns Observable<String>"
(Observable/create
(fn [observer]
(rx/fn [observer]
(let [f (future
(doseq [x (range 50)]
(-> observer (.onNext (str "anotherValue_" x))))
; after sending all values we complete the sequence
(-> observer .onCompleted))]
; return a subscription that cancels the future
(Subscriptions/create #(future-cancel f))))))

; To see output
(comment
(.subscribe (customObservableNonBlocking) println))


(defn fetchWikipediaArticleAsynchronously [wikipediaArticleNames]
"Fetch a list of Wikipedia articles asynchronously.

return Observable<String> of HTML"
(Observable/create
(fn [observer]
(let [f (future
(doseq [articleName wikipediaArticleNames]
(-> observer (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
; after sending response to onnext we complete the sequence
(-> observer .onCompleted))]
; a subscription that cancels the future if unsubscribed
(Subscriptions/create #(future-cancel f))))))
(Subscriptions/create (rx/action [] (future-cancel f)))))))

; To see output
(comment
(-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"])
(.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "..."))))
(.subscribe (customObservableNonBlocking) (rx/action* println)))


; --------------------------------------------------
Expand All @@ -119,8 +90,8 @@
(customObservableNonBlocking)
(.skip 10)
(.take 5)
(.map #(str % "_transformed"))
(.subscribe #(println "onNext =>" %))))
(.map (rx/fn [v] (str v "_transformed")))
(.subscribe (rx/action [v] (println "onNext =>" v)))))

; To see output
(comment
Expand All @@ -136,7 +107,7 @@

return Observable<Map>"
(Observable/create
(fn [observer]
(rx/fn [observer]
(let [f (future
(try
; simulate fetching user data via network service call with latency
Expand All @@ -147,14 +118,14 @@
(-> observer .onCompleted)
(catch Exception e (-> observer (.onError e))))) ]
; a subscription that cancels the future if unsubscribed
(Subscriptions/create #(future-cancel f))))))
(Subscriptions/create (rx/action [] (future-cancel f)))))))

(defn getVideoBookmark [userId, videoId]
"Asynchronously fetch bookmark for video

return Observable<Integer>"
(Observable/create
(fn [observer]
(rx/fn [observer]
(let [f (future
(try
; simulate fetching user data via network service call with latency
Expand All @@ -165,13 +136,13 @@
(-> observer .onCompleted)
(catch Exception e (-> observer (.onError e)))))]
; a subscription that cancels the future if unsubscribed
(Subscriptions/create #(future-cancel f))))))
(Subscriptions/create (rx/action [] (future-cancel f)))))))

(defn getVideoMetadata [videoId, preferredLanguage]
"Asynchronously fetch movie metadata for a given language
return Observable<Map>"
(Observable/create
(fn [observer]
(rx/fn [observer]
(let [f (future
(try
; simulate fetching video data via network service call with latency
Expand All @@ -190,7 +161,7 @@
(-> observer .onCompleted)
(catch Exception e (-> observer (.onError e))))) ]
; a subscription that cancels the future if unsubscribed
(Subscriptions/create #(future-cancel f))))))
(Subscriptions/create (rx/action [] (future-cancel f)))))))


(defn getVideoForUser [userId videoId]
Expand All @@ -200,24 +171,24 @@
- user data
return Observable<Map>"
(let [user-observable (-> (getUser userId)
(.map (fn [user] {:user-name (:name user)
(.map (rx/fn [user] {:user-name (:name user)
:language (:preferred-language user)})))
bookmark-observable (-> (getVideoBookmark userId videoId)
(.map (fn [bookmark] {:viewed-position (:position bookmark)})))
(.map (rx/fn [bookmark] {:viewed-position (:position bookmark)})))
; getVideoMetadata requires :language from user-observable so nest inside map function
video-metadata-observable (-> user-observable
(.mapMany
; fetch metadata after a response from user-observable is received
(fn [user-map]
(rx/fn [user-map]
(getVideoMetadata videoId (:language user-map)))))]
; now combine 3 async sequences using zip
(-> (Observable/zip bookmark-observable video-metadata-observable user-observable
(fn [bookmark-map metadata-map user-map]
(rx/fn [bookmark-map metadata-map user-map]
{:bookmark-map bookmark-map
:metadata-map metadata-map
:user-map user-map}))
; and transform into a single response object
(.map (fn [data]
(.map (rx/fn [data]
{:video-id videoId
:video-metadata (:metadata-map data)
:user-id userId
Expand All @@ -231,37 +202,7 @@
(comment
(-> (getVideoForUser 12345 78965)
(.subscribe
(fn [x] (println "--- Object ---\n" x))
(fn [e] (println "--- Error ---\n" e))
(fn [] (println "--- Completed ---")))))


; --------------------------------------------------
; Error Handling
; --------------------------------------------------

(defn fetchWikipediaArticleAsynchronouslyWithErrorHandling [wikipediaArticleNames]
"Fetch a list of Wikipedia articles asynchronously
with proper error handling.

return Observable<String> of HTML"
(Observable/create
(fn [observer]
(let [f (future
(try
(doseq [articleName wikipediaArticleNames]
(-> observer (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
;(catch Exception e (prn "exception")))
(catch Exception e (-> observer (.onError e))))
; after sending response to onNext we complete the sequence
(-> observer .onCompleted))]
; a subscription that cancels the future if unsubscribed
(Subscriptions/create #(future-cancel f))))))

; To see output
(comment
(-> (fetchWikipediaArticleAsynchronouslyWithErrorHandling ["Tiger" "NonExistentTitle" "Elephant"])
(.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "...")
#(println "--- Error ---\n" (.getMessage %)))))

(rx/action [x] (println "--- Object ---\n" x))
(rx/action [e] (println "--- Error ---\n" e))
(rx/action [] (println "--- Completed ---")))))

Loading