Skip to content

Commit a5bf8ba

Browse files
clojure example code
- examples are in /src/examples/clojure - placeholder for unit tests in /src/tests
1 parent 456f512 commit a5bf8ba

File tree

3 files changed

+287
-0
lines changed

3 files changed

+287
-0
lines changed

language-adaptors/rxjava-clojure/build.gradle

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,21 @@ dependencies {
99
provided 'org.mockito:mockito-core:1.8.5'
1010
}
1111

12+
// include /src/examples folder
13+
sourceSets {
14+
examples
15+
}
16+
17+
// make 'examples' use the same classpath
18+
configurations {
19+
examplesCompile.extendsFrom compile
20+
examplesRuntime.extendsFrom runtime
21+
}
22+
23+
// include 'examples' in build task
24+
build.dependsOn examplesClasses
25+
26+
// setup Eclipse
1227
eclipse {
1328
classpath {
1429
//you can tweak the classpath of the Eclipse project by adding extra configurations:
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
(ns rx.lang.clojure.examples.rx-examples
2+
(import rx.Observable)
3+
(:require [clj-http.client :as http]))
4+
5+
; NOTE on naming conventions. I'm using camelCase names (against clojure convention)
6+
; in this file as I'm purposefully keeping functions and methods across
7+
; different language implementations in-sync for easy comparison.
8+
9+
; --------------------------------------------------
10+
; Hello World!
11+
; --------------------------------------------------
12+
13+
(defn hello
14+
[& args]
15+
(-> (Observable/toObservable args)
16+
(.subscribe #(println (str "Hello " % "!")))))
17+
18+
; To see output
19+
(comment
20+
(hello "Ben" "George"))
21+
22+
; --------------------------------------------------
23+
; Create Observable from Existing Data
24+
; --------------------------------------------------
25+
26+
(defn existingDataFromNumbers []
27+
(Observable/toObservable [1 2 3 4 5 6]))
28+
29+
(defn existingDataFromNumbersUsingFrom []
30+
(Observable/from [1 2 3 4 5 6]))
31+
32+
(defn existingDataFromObjects []
33+
(Observable/toObservable ["a" "b" "c"]))
34+
35+
(defn existingDataFromObjectsUsingFrom []
36+
(Observable/from ["a" "b" "c"]))
37+
38+
(defn existingDataFromList []
39+
(let [list [5, 6, 7, 8]]
40+
(Observable/toObservable list)))
41+
42+
(defn existingDataFromListUsingFrom []
43+
(let [list [5, 6, 7, 8]]
44+
(Observable/from list)))
45+
46+
(defn existingDataWithJust []
47+
(Observable/just "one object"))
48+
49+
; --------------------------------------------------
50+
; Custom Observable
51+
; --------------------------------------------------
52+
53+
(defn customObservableBlocking []
54+
"This example shows a custom Observable that blocks
55+
when subscribed to (does not spawn an extra thread).
56+
57+
returns Observable<String>"
58+
(Observable/create
59+
(fn [observer]
60+
(doseq [x (range 50)] (-> observer (.onNext (str "value_" x))))
61+
; after sending all values we complete the sequence
62+
(-> observer .onCompleted)
63+
; return a NoOpSubsription since this blocks and thus
64+
; can't be unsubscribed from
65+
(Observable/noOpSubscription))))
66+
67+
; To see output
68+
(comment
69+
(.subscribe (customObservableBlocking) println))
70+
71+
(defn customObservableNonBlocking []
72+
"This example shows a custom Observable that does not block
73+
when subscribed to as it spawns a separate thread.
74+
75+
returns Observable<String>"
76+
(Observable/create
77+
(fn [observer]
78+
(let [f (future
79+
(doseq [x (range 50)]
80+
(-> observer (.onNext (str "anotherValue_" x))))
81+
; after sending all values we complete the sequence
82+
(-> observer .onCompleted))]
83+
; return a subscription that cancels the future
84+
(Observable/createSubscription #(future-cancel f))))))
85+
86+
; To see output
87+
(comment
88+
(.subscribe (customObservableNonBlocking) println))
89+
90+
91+
(defn fetchWikipediaArticleAsynchronously [wikipediaArticleNames]
92+
"Fetch a list of Wikipedia articles asynchronously.
93+
94+
return Observable<String> of HTML"
95+
(Observable/create
96+
(fn [observer]
97+
(let [f (future
98+
(doseq [articleName wikipediaArticleNames]
99+
(-> observer (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
100+
; after sending response to onnext we complete the sequence
101+
(-> observer .onCompleted))]
102+
; a subscription that cancels the future if unsubscribed
103+
(Observable/createSubscription #(future-cancel f))))))
104+
105+
; To see output
106+
(comment
107+
(-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"])
108+
(.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "..."))))
109+
110+
111+
; --------------------------------------------------
112+
; Composition - Simple
113+
; --------------------------------------------------
114+
115+
(defn simpleComposition []
116+
"Asynchronously calls 'customObservableNonBlocking' and defines
117+
a chain of operators to apply to the callback sequence."
118+
(->
119+
(customObservableNonBlocking)
120+
(.skip 10)
121+
(.take 5)
122+
(.map #(str % "_transformed"))
123+
(.subscribe #(println "onNext =>" %))))
124+
125+
; To see output
126+
(comment
127+
(simpleComposition))
128+
129+
130+
; --------------------------------------------------
131+
; Composition - Multiple async calls combined
132+
; --------------------------------------------------
133+
134+
(defn getUser [userId]
135+
"Asynchronously fetch user data
136+
137+
return Observable<Map>"
138+
(Observable/create
139+
(fn [observer]
140+
(let [f (future
141+
(try
142+
; simulate fetching user data via network service call with latency
143+
(Thread/sleep 60)
144+
(-> observer (.onNext {:user-id userId
145+
:name "Sam Harris"
146+
:preferred-language (if (= 0 (rand-int 2)) "en-us" "es-us") }))
147+
(-> observer .onCompleted)
148+
(catch Exception e (-> observer (.onError e))))) ]
149+
; a subscription that cancels the future if unsubscribed
150+
(Observable/createSubscription #(future-cancel f))))))
151+
152+
(defn getVideoBookmark [userId, videoId]
153+
"Asynchronously fetch bookmark for video
154+
155+
return Observable<Integer>"
156+
(Observable/create
157+
(fn [observer]
158+
(let [f (future
159+
(try
160+
; simulate fetching user data via network service call with latency
161+
(Thread/sleep 20)
162+
(-> observer (.onNext {:video-id videoId
163+
; 50/50 chance of giving back position 0 or 0-2500
164+
:position (if (= 0 (rand-int 2)) 0 (rand-int 2500))}))
165+
(-> observer .onCompleted)
166+
(catch Exception e (-> observer (.onError e)))))]
167+
; a subscription that cancels the future if unsubscribed
168+
(Observable/createSubscription #(future-cancel f))))))
169+
170+
(defn getVideoMetadata [videoId, preferredLanguage]
171+
"Asynchronously fetch movie metadata for a given language
172+
return Observable<Map>"
173+
(Observable/create
174+
(fn [observer]
175+
(let [f (future
176+
(try
177+
; simulate fetching video data via network service call with latency
178+
(Thread/sleep 50)
179+
; contrived metadata for en-us or es-us
180+
(if (= "en-us" preferredLanguage)
181+
(-> observer (.onNext {:video-id videoId
182+
:title "House of Cards: Episode 1"
183+
:director "David Fincher"
184+
:duration 3365})))
185+
(if (= "es-us" preferredLanguage)
186+
(-> observer (.onNext {:video-id videoId
187+
:title "Cámara de Tarjetas: Episodio 1"
188+
:director "David Fincher"
189+
:duration 3365})))
190+
(-> observer .onCompleted)
191+
(catch Exception e (-> observer (.onError e))))) ]
192+
; a subscription that cancels the future if unsubscribed
193+
(Observable/createSubscription #(future-cancel f))))))
194+
195+
196+
(defn getVideoForUser [userId videoId]
197+
"Get video metadata for a given userId
198+
- video metadata
199+
- video bookmark position
200+
- user data
201+
return Observable<Map>"
202+
(let [user-observable (-> (getUser userId)
203+
(.map (fn [user] {:user-name (:name user)
204+
:language (:preferred-language user)})))
205+
bookmark-observable (-> (getVideoBookmark userId videoId)
206+
(.map (fn [bookmark] {:viewed-position (:position bookmark)})))
207+
; getVideoMetadata requires :language from user-observable so nest inside map function
208+
video-metadata-observable (-> user-observable
209+
(.mapMany
210+
; fetch metadata after a response from user-observable is received
211+
(fn [user-map]
212+
(getVideoMetadata videoId (:language user-map)))))]
213+
; now combine 3 async sequences using zip
214+
(-> (Observable/zip bookmark-observable video-metadata-observable user-observable
215+
(fn [bookmark-map metadata-map user-map]
216+
{:bookmark-map bookmark-map
217+
:metadata-map metadata-map
218+
:user-map user-map}))
219+
; and transform into a single response object
220+
(.map (fn [data]
221+
{:video-id videoId
222+
:video-metadata (:metadata-map data)
223+
:user-id userId
224+
:language (:language (:user-map data))
225+
:bookmark (:viewed-position (:bookmark-map data)) })))))
226+
227+
; To see output like this:
228+
; {:video-id 78965, :video-metadata {:video-id 78965, :title Cámara de Tarjetas: Episodio 1,
229+
; :director David Fincher, :duration 3365}, :user-id 12345, :language es-us, :bookmark 0}
230+
;
231+
(comment
232+
(-> (getVideoForUser 12345 78965)
233+
(.subscribe
234+
(fn [x] (println "--- Object ---\n" x))
235+
(fn [e] (println "--- Error ---\n" e))
236+
(fn [] (println "--- Completed ---")))))
237+
238+
239+
; --------------------------------------------------
240+
; Error Handling
241+
; --------------------------------------------------
242+
243+
(defn fetchWikipediaArticleAsynchronouslyWithErrorHandling [wikipediaArticleNames]
244+
"Fetch a list of Wikipedia articles asynchronously
245+
with proper error handling.
246+
247+
return Observable<String> of HTML"
248+
(Observable/create
249+
(fn [observer]
250+
(let [f (future
251+
(try
252+
(doseq [articleName wikipediaArticleNames]
253+
(-> observer (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
254+
;(catch Exception e (prn "exception")))
255+
(catch Exception e (-> observer (.onError e))))
256+
; after sending response to onNext we complete the sequence
257+
(-> observer .onCompleted))]
258+
; a subscription that cancels the future if unsubscribed
259+
(Observable/createSubscription #(future-cancel f))))))
260+
261+
; To see output
262+
(comment
263+
(-> (fetchWikipediaArticleAsynchronouslyWithErrorHandling ["Tiger" "NonExistentTitle" "Elephant"])
264+
(.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "...")
265+
#(println "--- Error ---\n" (.getMessage %)))))
266+
267+
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
(ns rx.lang.clojure.examples.rx-examples
2+
(import rx.Observable))
3+
4+
;; still need to get this wired up in build.gradle to run as tests
5+
; (-> (rx.Observable/toObservable [\"one\" \"two\" \"three\"]) (.take 2) (.subscribe (fn [arg] (println arg))))

0 commit comments

Comments
 (0)