"""Consume a socket text stream of tweets with Spark Streaming and print each batch.

Run under ``spark-submit`` (or from within ``pyspark``). Expects a tweet
source writing plain text lines to localhost:9009.

NOTE(review): this file was reconstructed from a commit-diff rendering; this
is the post-commit version. The previous revision (visible on the removed
lines of the diff) counted hashtags with ``updateStateByKey`` and POSTed the
top 5 to a dashboard at http://localhost:5001/updateData; that logic was
dropped in favor of a plain ``pprint()`` of the raw stream.
"""

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# NOTE(review): Row, SQLContext, sys and requests are no longer used now that
# the aggregation/dashboard code was removed in this commit. Kept to avoid a
# behavior change in this review pass — candidates for deletion.
from pyspark.sql import Row, SQLContext
import sys
import requests

conf = SparkConf()
conf.setAppName("TwitterStreamApp")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")  # silence Spark's INFO/WARN chatter on the console

# 300-second (5-minute) micro-batch interval.
ssc = StreamingContext(sc, 300)

# Checkpoint directory; required by stateful operations and useful for recovery.
ssc.checkpoint("checkpoint_TwitterApp")

# Connect to the tweet source on localhost, port 9009.
# (The original inline comment said "port 5555", contradicting the code — fixed.)
dataStream = ssc.socketTextStream("localhost", 9009)

dataStream.pprint()  # print each incoming batch of tweets to the console

ssc.start()             # start consuming the stream
ssc.awaitTermination()  # block until the streaming job is stopped or fails
0 commit comments