
Commit 6ca34ed

Updated parts 3, 4, and 5
1 parent 8f2fb87 commit 6ca34ed

26 files changed: +1612 / -1225 lines changed

3_advanced/.ipynb_checkpoints/3_Join Operations Exercise-checkpoint.ipynb

Lines changed: 2 additions & 1 deletion
@@ -130,7 +130,8 @@
 "outputs": [],
 "source": [
 "# Join the streaming RDD and batch RDDs to filter out bad customers.\n",
-"dst = ds.transform(lambda rdd: rdd.join(customer_rdd)).filter(lambda (customer_id, (customer_data, is_good_customer)): is_good_customer)\n",
+"\n",
+"\n",
 "## END OF EXERCISE SECTION ==================================\n",
 "dst.pprint()"
 ]
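For reference, the solution line removed above relies on Python 2 tuple-parameter unpacking in its lambda, which is a syntax error under the notebook's Python 3 kernel. A minimal sketch of the same stream-dataset join in Python 3 form, assuming `ds` and `customer_rdd` are defined as in the notebook, might look like this:

```python
# Sketch of the removed solution, rewritten without Python 2 tuple-parameter
# unpacking. Assumes `ds` is the DStream of (customer_id, order_data) pairs
# and `customer_rdd` is the batch RDD of (customer_id, is_good_customer) pairs.
dst = (
    ds.transform(lambda rdd: rdd.join(customer_rdd))
      .filter(lambda kv: kv[1][1])  # kv == (customer_id, (order_data, is_good_customer))
)
dst.pprint()
```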

3_advanced/.ipynb_checkpoints/5_updateStateByKey Exercise-checkpoint.ipynb

Lines changed: 1 addition & 1 deletion
@@ -116,7 +116,7 @@
 "# - Counts how many times each number between 0 and 9 is seen.\n",
 "# - Update the state with the `updateFunction` using updateStateByKey\n",
 "\n",
-"dst = ds.map(lambda x: int(x) % 10).map(lambda x: (x,1)).updateStateByKey(updateFunction)\n",
+"\n",
 "\n",
 "##======================= END OF EXERCISE SECTION =======================\n",
 "\n",

3_advanced/.ipynb_checkpoints/7_Checkpointing Exercise-checkpoint.ipynb

Lines changed: 5 additions & 5 deletions
@@ -133,7 +133,7 @@
 "outputs": [],
 "source": [
 "## TODO: define a checkpoint Directory checkpointDir\n",
-"checkpointDir = 'checkpoint'\n",
+"\n",
 "#### END OF THIS EXERCISE SECTION ###### \n",
 "\n",
 "def functionToCreateContext():\n",
@@ -146,14 +146,14 @@
 " ds.pprint()\n",
 " ds.count().pprint()\n",
 " \n",
-" # TODO: Set up checkpoint for the checkpoint directory\n",
-" ssc.checkpoint(checkpointDir)\n",
+" # TODO: Set up checkpoint for the StreamingContext\n",
+"\n",
 " #### END OF THIS EXERCISE SECTION ######\n",
 " return ssc\n",
 "\n",
 "## TODO: Create Checkpoint for ssc with the `getOrCreate()` method for streaming contexts\n",
-"ssc = StreamingContext.getOrCreate(\n",
-" checkpointDir, functionToCreateContext)\n",
+"\n",
+"\n",
 "#### END OF THIS EXERCISE SECTION ######"
 ]
 },
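Putting the three removed pieces back together, a sketch of the checkpoint/`getOrCreate()` pattern this exercise builds toward, with the DStream definitions elided, might be:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

checkpointDir = 'checkpoint'  # directory where Spark persists streaming state

def functionToCreateContext():
    sc = SparkContext()
    ssc = StreamingContext(sc, 1)
    # ... define the DStreams on ssc here ...
    ssc.checkpoint(checkpointDir)  # enable checkpointing for this context
    return ssc

# Rebuild the context from the checkpoint directory if one exists,
# otherwise create a fresh one with functionToCreateContext.
ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext)
```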

3_advanced/.ipynb_checkpoints/9_Accumulators Exercise-checkpoint.ipynb

Lines changed: 11 additions & 10 deletions
@@ -57,18 +57,17 @@
 "#Class of functions that will be fed into the accumulator \n",
 "class DiffAccumulatorParam(AccumulatorParam):\n",
 " #TODO: Define function that zeroes out values in a dictionary using a loop\n",
-" def zero(self, value): \n",
-" dict1={} \n",
-" for i in range(0,len(value)): \n",
-" dict1[i]=0 \n",
-" return dict1\n",
+" def zero(self, value): \n",
+" \n",
+" \n",
+" \n",
 " #### END OF THIS EXERCISE SECTION ###### \n",
 " \n",
 " #TODO: Define function that adds corresponding values in a dictionary using a loop\n",
 " def addInPlace(self, val1, val2): \n",
-" for i in val1.keys():\n",
-" val1[i] += val2[i] \n",
-" return val1\n",
+" \n",
+" \n",
+" \n",
 " #### END OF THIS EXERCISE SECTION ###### "
 ]
 },
@@ -85,7 +84,8 @@
 "rdd1 = sc.parallelize(d)\n",
 " \n",
 "#TODO: Create an accumulator, `va` that takes in the DiffAccumulatorParam class\n",
-"va = sc.accumulator(c, DiffAccumulatorParam()) \n",
+"\n",
+"\n",
 "#### END OF THIS EXERCISE SECTION ###### "
 ]
 },
@@ -104,7 +104,8 @@
 "rdd1.foreach(diff) \n",
 "\n",
 "#TODO: print the value of accumulator \n",
-"print(va.value)\n",
+"\n",
+"\n",
 "#### END OF THIS EXERCISE SECTION ###### "
 ]
 },
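For orientation, the removed solution amounts to a custom `AccumulatorParam` whose `zero` returns an all-zero dictionary and whose `addInPlace` sums two dictionaries key by key. A sketch, with the notebook's `c`, `rdd1`, and `diff` left as assumptions defined in earlier cells, is:

```python
from pyspark.accumulators import AccumulatorParam

class DiffAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        # Build a dictionary of the same size with every value set to zero.
        dict1 = {}
        for i in range(len(value)):
            dict1[i] = 0
        return dict1

    def addInPlace(self, val1, val2):
        # Merge val2 into val1 by summing the values for each key.
        for i in val1.keys():
            val1[i] += val2[i]
        return val1

# Usage as in the notebook (c, rdd1 and diff come from earlier cells):
# va = sc.accumulator(c, DiffAccumulatorParam())
# rdd1.foreach(diff)
# print(va.value)
```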
Lines changed: 186 additions & 0 deletions
@@ -0,0 +1,186 @@
+{
+"cells": [
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"# Join Operations Exercise"
+]
+},
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"### Join Operations\n",
+"\n",
+"Finally, its worth highlighting how easily you can perform different kinds of joins in Spark Streaming.\n",
+"\n",
+"### Stream-stream joins\n",
+"\n",
+"Streams can be very easily joined with other streams.\n",
+"```python\n",
+"stream1 = ...\n",
+"stream2 = ...\n",
+"joinedStream = stream1.join(stream2)\n",
+"```\n",
+"Here, in each batch interval, the RDD generated by `stream1` will be joined with the RDD generated by `stream2`. You can also do `leftOuterJoin`, `rightOuterJoin`, `fullOuterJoin`. Furthermore, it is often very useful to do joins over windows of the streams. That is pretty easy as well.\n",
+"```python\n",
+"windowedStream1 = stream1.window(20)\n",
+"windowedStream2 = stream2.window(60)\n",
+"joinedStream = windowedStream1.join(windowedStream2)\n",
+"```\n",
+"\n",
+"### Stream-dataset joins\n",
+"\n",
+"This has already been shown earlier while explain `DStream.transform` operation. Here is yet another example of joining a windowed stream with a dataset.\n",
+"```python\n",
+"dataset = ... # some RDD\n",
+"windowedStream = stream.window(20)\n",
+"joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))\n",
+"```\n",
+"In fact, you can also dynamically change the `dataset` you want to join against. The function provided to `transform` is evaluated every batch interval and therefore will use the current dataset that `dataset` reference points to.\n",
+"\n",
+"The complete list of DStream transformations is available in the API documentation. For the Python API, see [DStream](https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.DStream).\n",
+"\n"
+]
+},
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"### Exercise\n",
+"Create a streaming app that can join the incoming orders with our previous knowledge of whether this customer is good or bad."
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"import findspark\n",
+"# TODO: your path will likely not have 'matthew' in it. Change it to reflect your path.\n",
+"findspark.init('/home/matthew/spark-2.1.0-bin-hadoop2.7')"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"from pyspark import SparkContext\n",
+"from pyspark.streaming import StreamingContext\n",
+"import time"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"sc = SparkContext()\n",
+"ssc = StreamingContext(sc, 1)"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"# For testing, create prepopulated QueueStream of streaming customer orders. \n",
+"transaction_rdd_queue = []\n",
+"for i in xrange(5): \n",
+" transactions = [(customer_id, None) for customer_id in xrange(10)]\n",
+" transaction_rdd = ssc.sparkContext.parallelize(transactions)\n",
+" transaction_rdd_queue.append(transaction_rdd)\n",
+"transaction_rdd_queue.pprint()"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"# Batch RDD of whether customers are good or bad. \n",
+"# (customer_id, is_good_customer)\n",
+"customers = [(0,True),(1,False), (2,True), (3,False), (4,True), (5,False), (6,True), (7,False), (8,True), (9,False)]\n",
+"customer_rdd = ssc.sparkContext.parallelize(customers)"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"# Creating queue stream\n",
+"ds = ssc.queueStream(transaction_rdd_queue)"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {
+"collapsed": true
+},
+"outputs": [],
+"source": [
+"# Join the streaming RDD and batch RDDs to filter out bad customers.\n",
+"dst = ds.transform(lambda rdd: rdd.join(customer_rdd)).filter(lambda (customer_id, (customer_data, is_good_customer)): is_good_customer)\n",
+"## END OF EXERCISE SECTION ==================================\n",
+"dst.pprint()"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"ssc.start()\n",
+"time.sleep(6)\n",
+"ssc.stop()"
+]
+},
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"## Reference\n",
+"1. https://spark.apache.org/docs/latest/streaming-programming-guide.html#join-operations"
+]
+},
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+" "
+]
+}
+],
+"metadata": {
+"kernelspec": {
+"display_name": "Python 3",
+"language": "python",
+"name": "python3"
+},
+"language_info": {
+"codemirror_mode": {
+"name": "ipython",
+"version": 3
+},
+"file_extension": ".py",
+"mimetype": "text/x-python",
+"name": "python",
+"nbconvert_exporter": "python",
+"pygments_lexer": "ipython3",
+"version": "3.5.2"
+}
+},
+"nbformat": 4,
+"nbformat_minor": 2
+}
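The markdown in this newly added notebook quotes the join snippets from the Spark Streaming programming guide. A small, self-contained sketch of the stream-stream join it describes, using queue streams as stand-in sources (all names here are illustrative, not part of the notebook), could be:

```python
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext()
ssc = StreamingContext(sc, 1)  # 1-second batch interval

# Two keyed streams fed from in-memory RDD queues for testing.
stream1 = ssc.queueStream([sc.parallelize([(k, k * 10) for k in range(5)])])
stream2 = ssc.queueStream([sc.parallelize([(k, k % 2 == 0) for k in range(5)])])

# Each batch interval, the RDD from stream1 is joined with the RDD from stream2.
joined = stream1.join(stream2)  # -> (k, (k * 10, k % 2 == 0))
joined.pprint()

# Joins over windows work the same way; window lengths must be
# multiples of the batch interval.
windowed_join = stream1.window(20).join(stream2.window(60))

ssc.start()
time.sleep(5)
ssc.stop()
```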

3_advanced/3_Join Operations Exercise.ipynb

Lines changed: 2 additions & 1 deletion
@@ -130,7 +130,8 @@
 "outputs": [],
 "source": [
 "# Join the streaming RDD and batch RDDs to filter out bad customers.\n",
-"dst = ds.transform(lambda rdd: rdd.join(customer_rdd)).filter(lambda (customer_id, (customer_data, is_good_customer)): is_good_customer)\n",
+"\n",
+"\n",
 "## END OF EXERCISE SECTION ==================================\n",
 "dst.pprint()"
 ]
