
Commit 8f2fb87

Updated Part 3 Demos & Exercises
1 parent a9e4f71 commit 8f2fb87

21 files changed, +1434 -452 lines changed

3_advanced/.ipynb_checkpoints/1_Stream-Stream Join Demo-checkpoint.ipynb

Lines changed: 75 additions & 27 deletions
@@ -7,18 +7,6 @@
 "# Stream-Stream Join Demo"
 ]
 },
-{
-"cell_type": "markdown",
-"metadata": {},
-"source": [
-"Different types of Join\n",
-"Stream-stream joins\n",
-"Stream-dataset joins\n",
-"DEMO: Do a demo with Stream-stream joins\n",
-"DEMO: Do a demo with Stream-dataset joins\n",
-"EXERCISE: Give an exercise with Stream-stream joins or Stream-dataset joins\n"
-]
-},
 {
 "cell_type": "markdown",
 "metadata": {},
@@ -40,20 +28,7 @@
 "windowedStream1 = stream1.window(20)\n",
 "windowedStream2 = stream2.window(60)\n",
 "joinedStream = windowedStream1.join(windowedStream2)\n",
-"```\n",
-"\n",
-"### Stream-dataset joins\n",
-"\n",
-"This has already been shown earlier while explain `DStream.transform` operation. Here is yet another example of joining a windowed stream with a dataset.\n",
-"```python\n",
-"dataset = ... # some RDD\n",
-"windowedStream = stream.window(20)\n",
-"joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))\n",
-"```\n",
-"In fact, you can also dynamically change the `dataset` you want to join against. The function provided to `transform` is evaluated every batch interval and therefore will use the current dataset that `dataset` reference points to.\n",
-"\n",
-"The complete list of DStream transformations is available in the API documentation. For the Python API, see [DStream](https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.DStream).\n",
-"\n"
+"```"
 ]
 },
 {
@@ -81,7 +56,80 @@
 "collapsed": true
 },
 "outputs": [],
-"source": []
+"source": [
+"from pyspark import SparkContext\n",
+"from pyspark.streaming import StreamingContext\n",
+"from pprint import pprint\n",
+"import time"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"sc = SparkContext()\n",
+"ssc = StreamingContext(sc, 1)"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"rdd_queue = []\n",
+"for i in xrange(5): \n",
+"    rdd_data = xrange(1000)\n",
+"    rdd = ssc.sparkContext.parallelize(rdd_data)\n",
+"    rdd_queue.append(rdd)\n",
+"pprint(rdd_queue)\n",
+"\n",
+"# Creating queue stream # 1\n",
+"ds1 = ssc.queueStream(rdd_queue).map(lambda x: (x % 10, 1)).window(4).reduceByKey(lambda v1,v2:v1+v2)\n",
+"ds1.pprint()"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"# Creating queue stream # 2\n",
+"ds2 = ssc.queueStream(rdd_queue).map(lambda x: (x % 5, 1)).window(windowDuration=20).reduceByKey(lambda v1,v2:v1+v2)\n",
+"ds2.pprint()"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"# Crossing the Streams\n",
+"joinedStream = ds1.join(ds2)\n",
+"joinedStream.pprint()"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"ssc.start()"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"ssc.stop()"
+]
 },
 {
 "cell_type": "markdown",

3_advanced/.ipynb_checkpoints/2_Stream-Dataset Join Demo-checkpoint.ipynb

Lines changed: 87 additions & 32 deletions
@@ -11,37 +11,6 @@
 "cell_type": "markdown",
 "metadata": {},
 "source": [
-"Different types of Join\n",
-"Stream-stream joins\n",
-"Stream-dataset joins\n",
-"DEMO: Do a demo with Stream-stream joins\n",
-"DEMO: Do a demo with Stream-dataset joins\n",
-"EXERCISE: Give an exercise with Stream-stream joins or Stream-dataset joins\n"
-]
-},
-{
-"cell_type": "markdown",
-"metadata": {},
-"source": [
-"### Join Operations\n",
-"\n",
-"Finally, its worth highlighting how easily you can perform different kinds of joins in Spark Streaming.\n",
-"\n",
-"### Stream-stream joins\n",
-"\n",
-"Streams can be very easily joined with other streams.\n",
-"```python\n",
-"stream1 = ...\n",
-"stream2 = ...\n",
-"joinedStream = stream1.join(stream2)\n",
-"```\n",
-"Here, in each batch interval, the RDD generated by `stream1` will be joined with the RDD generated by `stream2`. You can also do `leftOuterJoin`, `rightOuterJoin`, `fullOuterJoin`. Furthermore, it is often very useful to do joins over windows of the streams. That is pretty easy as well.\n",
-"```python\n",
-"windowedStream1 = stream1.window(20)\n",
-"windowedStream2 = stream2.window(60)\n",
-"joinedStream = windowedStream1.join(windowedStream2)\n",
-"```\n",
-"\n",
 "### Stream-dataset joins\n",
 "\n",
 "This has already been shown earlier while explaining the `DStream.transform` operation. Here is yet another example of joining a windowed stream with a dataset.\n",
@@ -74,14 +43,100 @@
 "findspark.init('/home/matthew/spark-2.1.0-bin-hadoop2.7')"
 ]
 },
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"from pyspark import SparkContext\n",
+"from pyspark.streaming import StreamingContext\n",
+"import time"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"sc = SparkContext(\"local[2]\", \"IP-Matcher\")\n",
+"ssc = StreamingContext(sc, 2)"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"ips_rdd = sc.parallelize(set())\n",
+"lines_ds = ssc.socketTextStream(\"localhost\", 9999)"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"# split each line into IPs\n",
+"ips_ds = lines_ds.flatMap(lambda line: line.split(\" \"))\n",
+"pairs_ds = ips_ds.map(lambda ip: (ip, 1))"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"# join with the IPs RDD\n",
+"matches_ds = pairs_ds.transform(lambda rdd: rdd.join(ips_rdd))\n",
+"matches_ds.pprint()"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"# In another window run:\n",
+"# $ nc -lk 9999\n",
+"# Then enter IP addresses separated by spaces into the nc window"
+]
+},
 {
 "cell_type": "code",
 "execution_count": null,
 "metadata": {
 "collapsed": true
 },
 "outputs": [],
-"source": []
+"source": [
+"ssc.start()\n",
+"\n",
+"# alternate between two sets of IP addresses for the RDD\n",
+"IP_FILES = ('data/ip_file1.txt', 'data/ip_file2.txt')\n",
+"file_index = 0\n",
+"while True:\n",
+"    with open(IP_FILES[file_index]) as f:\n",
+"        ips = f.read().splitlines()\n",
+"    ips_rdd = sc.parallelize(ips).map(lambda ip: (ip, 1))\n",
+"    print \"using\", IP_FILES[file_index]\n",
+"    file_index = (file_index + 1) % len(IP_FILES)\n",
+"    time.sleep(8)"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"ssc.stop()"
+]
 },
 {
 "cell_type": "markdown",

3_advanced/.ipynb_checkpoints/3_Join Operations Exercise-checkpoint.ipynb

Lines changed: 77 additions & 14 deletions
@@ -7,18 +7,6 @@
 "# Join Operations Exercise"
 ]
 },
-{
-"cell_type": "markdown",
-"metadata": {},
-"source": [
-"Different types of Join\n",
-"Stream-stream joins\n",
-"Stream-dataset joins\n",
-"DEMO: Do a demo with Stream-stream joins\n",
-"DEMO: Do a demo with Stream-dataset joins\n",
-"EXERCISE: Give an exercise with Stream-stream joins or Stream-dataset joins\n"
-]
-},
 {
 "cell_type": "markdown",
 "metadata": {},
@@ -60,7 +48,8 @@
 "cell_type": "markdown",
 "metadata": {},
 "source": [
-"### Exercise"
+"### Exercise\n",
+"Create a streaming app that joins incoming orders with our existing knowledge of whether each customer is good or bad."
 ]
 },
 {
@@ -74,14 +63,88 @@
 "findspark.init('/home/matthew/spark-2.1.0-bin-hadoop2.7')"
 ]
 },
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"from pyspark import SparkContext\n",
+"from pyspark.streaming import StreamingContext\n",
+"import time"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"sc = SparkContext()\n",
+"ssc = StreamingContext(sc, 1)"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"# For testing, create prepopulated QueueStream of streaming customer orders. \n",
+"transaction_rdd_queue = []\n",
+"for i in xrange(5): \n",
+"    transactions = [(customer_id, None) for customer_id in xrange(10)]\n",
+"    transaction_rdd = ssc.sparkContext.parallelize(transactions)\n",
+"    transaction_rdd_queue.append(transaction_rdd)\n",
+"print(transaction_rdd_queue)"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"# Batch RDD of whether customers are good or bad. \n",
+"# (customer_id, is_good_customer)\n",
+"customers = [(0,True),(1,False), (2,True), (3,False), (4,True), (5,False), (6,True), (7,False), (8,True), (9,False)]\n",
+"customer_rdd = ssc.sparkContext.parallelize(customers)"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"# Creating queue stream\n",
+"ds = ssc.queueStream(transaction_rdd_queue)"
+]
+},
 {
 "cell_type": "code",
 "execution_count": null,
 "metadata": {
 "collapsed": true
 },
 "outputs": [],
-"source": []
+"source": [
+"# Join the streaming RDD and batch RDDs to filter out bad customers.\n",
+"dst = ds.transform(lambda rdd: rdd.join(customer_rdd)).filter(lambda (customer_id, (customer_data, is_good_customer)): is_good_customer)\n",
+"## END OF EXERCISE SECTION ==================================\n",
+"dst.pprint()"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"ssc.start()\n",
+"time.sleep(6)\n",
+"ssc.stop()"
+]
 },
 {
 "cell_type": "markdown",
