Skip to content

Commit 5f9071a

Browse files
author
Pedro Bernardo
committed
Added advanced/accumulator/*.py
1 parent 3aeb5d8 commit 5f9071a

File tree

2 files changed

+53
-0
lines changed

2 files changed

+53
-0
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from pyspark import SparkContext
2+
from commons.Utils import Utils
3+
4+
def filterResponseFromCanada(response, total, missingSalaryMidPoint):
5+
splits = Utils.COMMA_DELIMITER.split(response)
6+
total.add(1)
7+
if not splits[14]:
8+
missingSalaryMidPoint.add(1)
9+
return splits[2] == "Canada"
10+
11+
if __name__ == "__main__":
12+
sc = SparkContext("local", "StackOverFlowSurvey")
13+
sc.setLogLevel("ERROR")
14+
15+
total = sc.accumulator(0)
16+
missingSalaryMidPoint = sc.accumulator(0)
17+
18+
responseRDD = sc.textFile("in/2016-stack-overflow-survey-responses.csv")
19+
20+
responseFromCanada = responseRDD.filter(lambda response: \
21+
filterResponseFromCanada(response, total, missingSalaryMidPoint))
22+
23+
print("Count of responses from Canada: {}".format(responseFromCanada.count()))
24+
print("Total count of responses: {}".format(total.value))
25+
print("Count of responses missing salary middle point: {}".format(missingSalaryMidPoint.value))
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from pyspark import SparkContext
2+
from commons.Utils import Utils
3+
4+
def filterResponseFromCanada(response, total, missingSalaryMidPoint, processedBytes):
5+
processedBytes.add(len(response.encode('utf-8')))
6+
splits = Utils.COMMA_DELIMITER.split(response)
7+
total.add(1)
8+
if not splits[14]:
9+
missingSalaryMidPoint.add(1)
10+
return splits[2] == "Canada"
11+
12+
if __name__ == "__main__":
13+
sc = SparkContext("local", "StackOverFlowSurvey")
14+
sc.setLogLevel("ERROR")
15+
16+
total = sc.accumulator(0)
17+
missingSalaryMidPoint = sc.accumulator(0)
18+
processedBytes = sc.accumulator(0)
19+
20+
responseRDD = sc.textFile("in/2016-stack-overflow-survey-responses.csv")
21+
22+
responseFromCanada = responseRDD.filter(lambda response: \
23+
filterResponseFromCanada(response, total, missingSalaryMidPoint, processedBytes))
24+
25+
print("Count of responses from Canada: {}".format(responseFromCanada.count()))
26+
print("Number of bytes processed: {}".format(processedBytes.value))
27+
print("Total count of responses: {}".format(total.value))
28+
print("Count of responses missing salary middle point: {}".format(missingSalaryMidPoint.value))

0 commit comments

Comments
 (0)