Skip to content

Commit 76ba21d

Browse files
authored
Merge pull request jleetutorial#3 from jleetutorial/pedromb-scala_to_python
Scala to Python - advanced folder
2 parents 9fe453e + 35b653c commit 76ba21d

8 files changed

+108
-153
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from pyspark import SparkContext
2+
from commons.Utils import Utils
3+
4+
def filterResponseFromCanada(response, total, missingSalaryMidPoint):
    """Predicate for RDD.filter(): count every survey response, track rows
    with a missing salary mid point, and keep only Canadian responses.

    Args:
        response: one raw CSV line from the survey file.
        total: Spark accumulator incremented once per processed response.
        missingSalaryMidPoint: Spark accumulator incremented when the
            salary-mid-point column (index 14) is empty.

    Returns:
        True when the country column (index 2) equals "Canada".
    """
    total.add(1)
    fields = Utils.COMMA_DELIMITER.split(response)
    if not fields[14]:
        missingSalaryMidPoint.add(1)
    return fields[2] == "Canada"
10+
11+
if __name__ == "__main__":
    # Local-mode driver demonstrating Spark accumulators on the
    # Stack Overflow survey data set.
    sc = SparkContext("local", "StackOverFlowSurvey")
    sc.setLogLevel("ERROR")

    # Accumulators updated inside the filter; read on the driver
    # after count() has triggered the job.
    total = sc.accumulator(0)
    missingSalaryMidPoint = sc.accumulator(0)

    responseRDD = sc.textFile("in/2016-stack-overflow-survey-responses.csv")

    responseFromCanada = responseRDD.filter(
        lambda response: filterResponseFromCanada(
            response, total, missingSalaryMidPoint))

    canadaCount = responseFromCanada.count()
    print("Count of responses from Canada: {}".format(canadaCount))
    print("Total count of responses: {}".format(total.value))
    print("Count of responses missing salary middle point: {}".format(missingSalaryMidPoint.value))

advanced/accumulator/StackOverFlowSurvey.scala

Lines changed: 0 additions & 34 deletions
This file was deleted.
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from pyspark import SparkContext
2+
from commons.Utils import Utils
3+
4+
def filterResponseFromCanada(response, total, missingSalaryMidPoint, processedBytes):
    """Predicate for RDD.filter(): track processed bytes and response
    counts, and keep only Canadian responses.

    Args:
        response: one raw CSV line from the survey file.
        total: accumulator incremented once per processed response.
        missingSalaryMidPoint: accumulator incremented when the
            salary-mid-point column (index 14) is empty.
        processedBytes: accumulator summing the UTF-8 byte length of
            every processed line.

    Returns:
        True when the country column (index 2) equals "Canada".
    """
    processedBytes.add(len(response.encode('utf-8')))
    total.add(1)
    fields = Utils.COMMA_DELIMITER.split(response)
    if not fields[14]:
        missingSalaryMidPoint.add(1)
    return fields[2] == "Canada"
11+
12+
if __name__ == "__main__":
    # Follow-up accumulator demo: also measures how many bytes the
    # filter actually processed.
    sc = SparkContext("local", "StackOverFlowSurvey")
    sc.setLogLevel("ERROR")

    total = sc.accumulator(0)
    missingSalaryMidPoint = sc.accumulator(0)
    processedBytes = sc.accumulator(0)

    responseRDD = sc.textFile("in/2016-stack-overflow-survey-responses.csv")

    responseFromCanada = responseRDD.filter(
        lambda response: filterResponseFromCanada(
            response, total, missingSalaryMidPoint, processedBytes))

    # count() runs the job; only then do the accumulator values below
    # reflect the full data set.
    canadaCount = responseFromCanada.count()
    print("Count of responses from Canada: {}".format(canadaCount))
    print("Number of bytes processed: {}".format(processedBytes.value))
    print("Total count of responses: {}".format(total.value))
    print("Count of responses missing salary middle point: {}".format(missingSalaryMidPoint.value))

advanced/accumulator/StackOverFlowSurveyFollowUp.scala

Lines changed: 0 additions & 39 deletions
This file was deleted.

advanced/broadcast/UkMakerSpaces.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from pyspark import SparkContext
2+
from commons.Utils import Utils
3+
4+
def getPostPrefix(line: str):
    """Return the outward code (first space-separated token) of the
    postcode in column 4 of a CSV line, or None when the field is empty.
    """
    postcode = Utils.COMMA_DELIMITER.split(line)[4]
    if not postcode:
        return None
    return postcode.split(" ")[0]
8+
9+
def loadPostCodeMap(path="in/uk-postcode.csv"):
    """Build a postcode-prefix -> region-name lookup from a UK postcode CSV.

    Args:
        path: CSV file to read; defaults to the previously hard-coded path,
            so existing callers are unaffected.

    Returns:
        dict mapping column 0 (prefix) to column 7 (region) for every
        non-empty line.
    """
    # Use a context manager so the file handle is always closed; the
    # original left the handle from open() dangling.
    with open(path, "r") as f:
        lines = f.read().split("\n")
    splitsForLines = [Utils.COMMA_DELIMITER.split(line) for line in lines if line != ""]
    return {splits[0]: splits[7] for splits in splitsForLines}
13+
14+
if __name__ == "__main__":
    # Broadcast-variable demo: ship the postcode->region lookup to
    # executors once instead of with every task.
    sc = SparkContext("local", "UkMakerSpaces")
    sc.setLogLevel("ERROR")

    postCodeMap = sc.broadcast(loadPostCodeMap())

    makerSpaceRdd = sc.textFile("in/uk-makerspaces-identifiable-data.csv")

    # dict.get with a default replaces the original
    # "x[k] if k in x else default" expression, which evaluated
    # getPostPrefix(line) twice per record; also fixes the "Unknow"
    # typo in the fallback label.
    regions = makerSpaceRdd \
        .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[0] != "Timestamp") \
        .filter(lambda line: getPostPrefix(line) is not None) \
        .map(lambda line: postCodeMap.value.get(getPostPrefix(line), "Unknown"))

    for region, count in regions.countByValue().items():
        print("{} : {}".format(region, count))

advanced/broadcast/UkMakerSpaces.scala

Lines changed: 0 additions & 40 deletions
This file was deleted.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from pyspark import SparkContext
2+
from commons.Utils import Utils
3+
4+
def getPostPrefixes(line: str):
    """Return every prefix of the whitespace-stripped postcode in column 4.

    For a postcode "AB1 2CD" this yields
    ["", "A", "AB", "AB1", "AB12", "AB12C", "AB12CD"], matching the
    original's range(0, len+1) (the empty prefix is included).

    Args:
        line: one raw CSV line.

    Returns:
        List of prefixes of the cleaned postcode, shortest first.
    """
    postcode = Utils.COMMA_DELIMITER.split(line)[4]
    # BUG FIX: str.replace is literal, so replace("\\s+", "") never
    # removed anything (the Scala original used regex replaceAll).
    # split()/join strips all whitespace as intended.
    cleanedPostCode = "".join(postcode.split())
    return [cleanedPostCode[0:i] for i in range(0, len(cleanedPostCode) + 1)]
8+
9+
def loadPostCodeMap(path="in/uk-postcode.csv"):
    """Build a postcode-prefix -> region-name lookup from a UK postcode CSV.

    Args:
        path: CSV file to read; defaults to the previously hard-coded path,
            so existing callers are unaffected.

    Returns:
        dict mapping column 0 (prefix) to column 7 (region) for every
        non-empty line.
    """
    # Use a context manager so the file handle is always closed; the
    # original left the handle from open() dangling.
    with open(path, "r") as f:
        lines = f.read().split("\n")
    splitsForLines = [Utils.COMMA_DELIMITER.split(line) for line in lines if line != ""]
    return {splits[0]: splits[7] for splits in splitsForLines}
13+
14+
if __name__ == "__main__":
    # Counterpart to the broadcast demo: the plain dict below is captured
    # in the map lambda's closure, so it is serialized with every task.
    sc = SparkContext("local", "UkMakerSpaces")
    sc.setLogLevel("ERROR")

    postCodeMap = loadPostCodeMap()

    makerSpaceRdd = sc.textFile("in/uk-makerspaces-identifiable-data.csv")

    # next() takes the region of the first postcode prefix present in the
    # map, falling back to "Unknown" (fixes the original "Unknow" typo
    # in the user-visible label).
    regions = makerSpaceRdd \
        .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[0] != "Timestamp") \
        .map(lambda line: next((postCodeMap[prefix] for prefix in getPostPrefixes(line)
                                if prefix in postCodeMap), "Unknown"))

    for region, count in regions.countByValue().items():
        print("{} : {}".format(region, count))

advanced/broadcast/UkMakerSpacesWithoutBroadcast.scala

Lines changed: 0 additions & 40 deletions
This file was deleted.

0 commit comments

Comments
 (0)