Skip to content

Commit 6cb8556

Browse files
Initial Commit
First commit of the README, the lecture slides, the directory structure, and the first Python files for Lecture 1.
0 parents  commit 6cb8556

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+273
-0
lines changed

README.md

Lines changed: 170 additions & 0 deletions

section1/lectures/lecture1.pdf

702 KB
Binary file not shown.

section1/lectures/lecture1.pptx

9.52 MB
Binary file not shown.

section1/lectures/lecture1/Slide1.PNG

1.02 MB

section1/lectures/lecture1/Slide2.PNG

1020 KB

section1/lectures/lecture1/Slide3.PNG

1.01 MB

section1/lectures/lecture1/Slide4.PNG

1.04 MB

section1/lectures/lecture1/Slide5.PNG

1.06 MB

section1/lectures/lecture1/Slide6.PNG

1.01 MB

section1/lectures/lecture1/Slide7.PNG

1.04 MB

section1/python/spark_app.py

Lines changed: 54 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,54 @@
1+
from pyspark import SparkConf,SparkContext
2+
from pyspark.streaming import StreamingContext
3+
from pyspark.sql import Row,SQLContext
4+
import sys
5+
import requests
6+
7+
# Spark configuration and contexts shared by the rest of this script.
conf = SparkConf()
conf.setAppName("TwitterStreamApp")
sc = SparkContext(conf=conf)
# Restrict Spark's own logging to errors so the per-batch output printed by
# this script stays readable.
sc.setLogLevel("ERROR")
# Streaming context with a batch interval of 300 seconds.
ssc = StreamingContext(sc, 300)
# Stateful operations such as updateStateByKey need a checkpoint directory.
ssc.checkpoint("checkpoint_TwitterApp")
# Read newline-delimited text from the TCP server on localhost:9009.
dataStream = ssc.socketTextStream("localhost", 9009)
14+
15+
16+
def aggregate_tags_count(new_values, total_sum):
    """Return the updated running total for a single key.

    Args:
        new_values: Iterable of counts seen for the key in the current batch.
        total_sum: Total accumulated so far, or None when the key has no
            previous state.

    Returns:
        The sum of ``new_values`` added to the previous total, where a
        missing previous total counts as 0.
    """
    return sum(new_values) + (total_sum or 0)
18+
19+
20+
def get_sql_context_instance(spark_context):
    """Return the module-wide SQLContext, creating it on first use.

    The instance is cached in this module's globals under the key
    ``sqlContextSingletonInstance`` so that every call reuses the same
    SQLContext instead of building a new one.

    Args:
        spark_context: SparkContext used to build the SQLContext the first
            time this function is called; ignored on later calls.

    Returns:
        The cached SQLContext instance.
    """
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
    return globals()['sqlContextSingletonInstance']
24+
25+
26+
def send_df_to_dashboard(df):
    """POST the hashtags and counts held in *df* to the local dashboard.

    Args:
        df: DataFrame exposing ``hashtag`` and ``hashtag_count`` columns.

    The labels and the counts are sent as the string forms of two parallel
    lists in the ``label`` and ``data`` form fields.
    """
    # Collect once so that labels and counts are taken from the same rows;
    # two independent collects may order tied counts differently and pair a
    # hashtag with the wrong number.
    rows = df.select("hashtag", "hashtag_count").collect()
    top_tags = [str(row.hashtag) for row in rows]
    tags_count = [row.hashtag_count for row in rows]
    url = 'http://localhost:5001/updateData'
    request_data = {'label': str(top_tags), 'data': str(tags_count)}
    # Bound the call so an unresponsive dashboard cannot block the batch
    # indefinitely.
    requests.post(url, data=request_data, timeout=10)
32+
33+
34+
def process_rdd(time, rdd):
    """Print the top five hashtags of one batch and push them to the dashboard.

    Args:
        time: Batch time supplied by ``foreachRDD``; only printed.
        rdd: RDD of ``(hashtag, count)`` pairs for the batch.

    Failures are reported on stdout and never propagated, so one bad batch
    does not stop the streaming job.
    """
    print("----------- %s -----------" % str(time))
    try:
        # Nothing to rank for an empty batch; skip it instead of handing an
        # empty RDD to createDataFrame.
        if rdd.isEmpty():
            return
        sql_context = get_sql_context_instance(rdd.context)
        row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1]))
        hashtags_df = sql_context.createDataFrame(row_rdd)
        # NOTE(review): registerTempTable is deprecated in newer Spark
        # releases in favour of createOrReplaceTempView; confirm the target
        # Spark version before switching.
        hashtags_df.registerTempTable("hashtags")
        hashtag_counts_df = sql_context.sql(
            "select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 5"
        )
        hashtag_counts_df.show()
        send_df_to_dashboard(hashtag_counts_df)
    except Exception as e:
        # Catch Exception rather than everything so Ctrl-C / SystemExit still
        # work, and include the message, not only the exception class.
        print("Error: %s: %s" % (type(e).__name__, e))
47+
48+
# Split every incoming line on single spaces.
words = dataStream.flatMap(lambda line: line.split(" "))
# NOTE(review): every token is paired with 1 here, not only tokens that
# contain '#'; confirm whether a hashtag filter is intended before this map.
hashtags = words.map(lambda x: (x, 1))
# Keep a running total per key across batches.
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
# Rank and publish the totals once per batch.
tags_totals.foreachRDD(process_rdd)

# Start the streaming computation and block until it terminates.
ssc.start()
ssc.awaitTermination()

section1/python/twitter_app.py

Lines changed: 49 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,49 @@
1+
import os

# Twitter OAuth1 credentials.
# SECURITY: these must never be committed to source control. They are read
# from the environment instead; any value that was previously hard-coded here
# should be treated as leaked and revoked/regenerated.
# An unset variable yields an empty string, so authentication will not
# succeed until all four are provided.
ACCESS_TOKEN = os.environ.get('TWITTER_ACCESS_TOKEN', '')
ACCESS_SECRET = os.environ.get('TWITTER_ACCESS_SECRET', '')
CONSUMER_KEY = os.environ.get('TWITTER_CONSUMER_KEY', '')
CONSUMER_SECRET = os.environ.get('TWITTER_CONSUMER_SECRET', '')
5+
6+
7+
import socket
8+
import sys
9+
import requests
10+
import requests_oauthlib
11+
import json
12+
13+
14+
# OAuth1 signer attached to every request sent to the Twitter API.
my_auth = requests_oauthlib.OAuth1(
    CONSUMER_KEY,
    CONSUMER_SECRET,
    ACCESS_TOKEN,
    ACCESS_SECRET,
)
15+
16+
17+
def send_tweets_to_spark(http_resp, tcp_connection):
    """Forward the text of each streamed tweet over a TCP connection.

    Args:
        http_resp: Streaming HTTP response whose ``iter_lines()`` yields one
            JSON document per line.
        tcp_connection: Connected socket; each tweet text is written to it
            as one UTF-8 encoded, newline-terminated line.

    Lines that cannot be parsed, that carry no ``text`` field, or that fail
    to send are reported on stdout and skipped.
    """
    for line in http_resp.iter_lines():
        # Blank lines carry no payload; skip them instead of reporting a
        # JSON parse error for each one.
        if not line:
            continue
        try:
            full_tweet = json.loads(line)
            tweet_text = full_tweet['text']
            print("Tweet Text: " + tweet_text)
            print("------------------------------------------")
            # Sockets carry bytes, so encode explicitly; sendall keeps
            # writing until the whole line has been sent.
            tcp_connection.sendall((tweet_text + '\n').encode('utf-8'))
        except Exception as e:
            # Catch Exception rather than everything so Ctrl-C can still
            # stop the loop, and include the message in the report.
            print("Error: %s: %s" % (type(e).__name__, e))
28+
29+
30+
def get_tweets():
    """Open the Twitter streaming filter endpoint and return the response.

    Returns:
        The streaming ``requests`` response; the caller reads it line by
        line.
    """
    url = 'https://stream.twitter.com/1.1/statuses/filter.json'
    query_data = [('locations', '-130,-20,100,50'), ('track', '#')]
    # Let requests build and percent-encode the query string. Concatenating
    # it by hand leaves the '#' of the track value unescaped, which turns the
    # rest of the URL into a fragment and drops the value from the request.
    response = requests.get(url, params=query_data, auth=my_auth, stream=True)
    # response.url is the final, encoded URL that was actually requested.
    print(response.url, response)
    return response
37+
38+
39+
# Address on which this script waits for a single client connection.
TCP_IP = "localhost"
TCP_PORT = 9009
conn = None
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    s.bind((TCP_IP, TCP_PORT))
    s.listen(1)
    print("Waiting for TCP connection...")
    # Block until one client connects, then stream tweets to it.
    conn, addr = s.accept()
    print("Connected... Starting getting tweets.")
    resp = get_tweets()
    send_tweets_to_spark(resp, conn)
finally:
    # Release both sockets on normal completion and on error.
    if conn is not None:
        conn.close()
    s.close()

section2/lectures/lecture2.pdf

702 KB
Binary file not shown.

section2/lectures/lecture2.pptx

9.52 MB
Binary file not shown.

section2/lectures/lecture2/Slide1.PNG

1.02 MB

section2/lectures/lecture2/Slide2.PNG

1020 KB

section2/lectures/lecture2/Slide3.PNG

1.01 MB

section2/lectures/lecture2/Slide4.PNG

1.04 MB

section2/lectures/lecture2/Slide5.PNG

1.06 MB

section2/lectures/lecture2/Slide6.PNG

1.01 MB

section2/lectures/lecture2/Slide7.PNG

1.04 MB

section3/lectures/lecture3.pdf

702 KB
Binary file not shown.

section3/lectures/lecture3.pptx

9.52 MB
Binary file not shown.

section3/lectures/lecture3/Slide1.PNG

1.02 MB

section3/lectures/lecture3/Slide2.PNG

1020 KB

section3/lectures/lecture3/Slide3.PNG

1.01 MB

section3/lectures/lecture3/Slide4.PNG

1.04 MB

section3/lectures/lecture3/Slide5.PNG

1.06 MB

section3/lectures/lecture3/Slide6.PNG

1.01 MB

section3/lectures/lecture3/Slide7.PNG

1.04 MB

section4/lectures/lecture4.pdf

702 KB
Binary file not shown.

section4/lectures/lecture4.pptx

9.52 MB
Binary file not shown.

section4/lectures/lecture4/Slide1.PNG

1.02 MB

section4/lectures/lecture4/Slide2.PNG

1020 KB

section4/lectures/lecture4/Slide3.PNG

1.01 MB

section4/lectures/lecture4/Slide4.PNG

1.04 MB

section4/lectures/lecture4/Slide5.PNG

1.06 MB

section4/lectures/lecture4/Slide6.PNG

1.01 MB

section4/lectures/lecture4/Slide7.PNG

1.04 MB

section5/lectures/lecture5.pdf

702 KB
Binary file not shown.

section5/lectures/lecture5.pptx

9.52 MB
Binary file not shown.

section5/lectures/lecture5/Slide1.PNG

1.02 MB

section5/lectures/lecture5/Slide2.PNG

1020 KB

section5/lectures/lecture5/Slide3.PNG

1.01 MB

section5/lectures/lecture5/Slide4.PNG

1.04 MB

section5/lectures/lecture5/Slide5.PNG

1.06 MB

section5/lectures/lecture5/Slide6.PNG

1.01 MB

section5/lectures/lecture5/Slide7.PNG

1.04 MB

0 commit comments

Comments
 (0)