Commit 942672b

Author: Pedro Bernardo
Commit message: Added sys path to guarantee imports | Added SparkConf to all files
1 parent 3ec564f, commit 942672b

18 files changed: +148 −124 lines changed
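Every file in this commit applies the same two-part change: prepend the working directory to sys.path so sibling packages such as commons.Utils resolve when a script is run from the repo root, and build the SparkContext from an explicit SparkConf rather than the positional SparkContext(master, appName) constructor. A minimal sketch of the pattern (the app name is a placeholder, not from the repo):

import sys
sys.path.insert(0, '.')  # make packages under the repo root (e.g. commons) importable
from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    # explicit configuration object replaces SparkContext("local", "appName")
    conf = SparkConf().setAppName("exampleApp").setMaster("local[*]")
    sc = SparkContext(conf=conf)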
Lines changed: 14 additions & 15 deletions

@@ -1,25 +1,24 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
-def filterResponseFromCanada(response, total, missingSalaryMidPoint):
-    splits = Utils.COMMA_DELIMITER.split(response)
-    total.add(1)
-    if not splits[14]:
-        missingSalaryMidPoint.add(1)
-    return splits[2] == "Canada"
-
 if __name__ == "__main__":
-    sc = SparkContext("local", "StackOverFlowSurvey")
-    sc.setLogLevel("ERROR")
-
+    conf = SparkConf().setAppName('StackOverFlowSurvey').setMaster("local[*]")
+    sc = SparkContext(conf = conf)
     total = sc.accumulator(0)
     missingSalaryMidPoint = sc.accumulator(0)
-
     responseRDD = sc.textFile("in/2016-stack-overflow-survey-responses.csv")
 
-    responseFromCanada = responseRDD.filter(lambda response: \
-        filterResponseFromCanada(response, total, missingSalaryMidPoint))
+    def filterResponseFromCanada(response):
+        splits = Utils.COMMA_DELIMITER.split(response)
+        total.add(1)
+        if not splits[14]:
+            missingSalaryMidPoint.add(1)
+        return splits[2] == "Canada"
 
+    responseFromCanada = responseRDD.filter(filterResponseFromCanada)
     print("Count of responses from Canada: {}".format(responseFromCanada.count()))
     print("Total count of responses: {}".format(total.value))
-    print("Count of responses missing salary middle point: {}".format(missingSalaryMidPoint.value))
+    print("Count of responses missing salary middle point: {}" \
+        .format(missingSalaryMidPoint.value))
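This file (and the follow-up below) moves filterResponseFromCanada inside the __main__ block so it can close over the accumulators instead of taking them as extra arguments. The accumulator contract is unchanged: tasks on executors may only add to an accumulator, and the driver reads .value after an action has run. A self-contained sketch of that contract (names are illustrative, not from this repo):

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    conf = SparkConf().setAppName("accumulatorSketch").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    evens = sc.accumulator(0)

    def markEven(x):
        if x % 2 == 0:
            evens.add(1)  # executors may only add, never read
        return x

    sc.parallelize(range(10)).map(markEven).count()  # the action runs the adds
    print(evens.value)  # read on the driver: prints 5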

advanced/accumulator/StackOverFlowSurveyFollowUp.py

Lines changed: 13 additions & 14 deletions

@@ -1,26 +1,25 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
-def filterResponseFromCanada(response, total, missingSalaryMidPoint, processedBytes):
-    processedBytes.add(len(response.encode('utf-8')))
-    splits = Utils.COMMA_DELIMITER.split(response)
-    total.add(1)
-    if not splits[14]:
-        missingSalaryMidPoint.add(1)
-    return splits[2] == "Canada"
-
 if __name__ == "__main__":
-    sc = SparkContext("local", "StackOverFlowSurvey")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName('StackOverFlowSurvey').setMaster("local[*]")
+    sc = SparkContext(conf = conf)
 
     total = sc.accumulator(0)
     missingSalaryMidPoint = sc.accumulator(0)
     processedBytes = sc.accumulator(0)
-
     responseRDD = sc.textFile("in/2016-stack-overflow-survey-responses.csv")
 
-    responseFromCanada = responseRDD.filter(lambda response: \
-        filterResponseFromCanada(response, total, missingSalaryMidPoint, processedBytes))
+    def filterResponseFromCanada(response):
+        processedBytes.add(len(response.encode('utf-8')))
+        splits = Utils.COMMA_DELIMITER.split(response)
+        total.add(1)
+        if not splits[14]:
+            missingSalaryMidPoint.add(1)
+        return splits[2] == "Canada"
+    responseFromCanada = responseRDD.filter(filterResponseFromCanada)
 
     print("Count of responses from Canada: {}".format(responseFromCanada.count()))
     print("Number of bytes processed: {}".format(processedBytes.value))

advanced/broadcast/UkMakerSpaces.py

Lines changed: 10 additions & 8 deletions

@@ -1,19 +1,21 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
-def getPostPrefix(line: str):
-    splits = Utils.COMMA_DELIMITER.split(line)
-    postcode = splits[4]
-    return None if not postcode else postcode.split(" ")[0]
-
 def loadPostCodeMap():
     lines = open("in/uk-postcode.csv", "r").read().split("\n")
     splitsForLines = [Utils.COMMA_DELIMITER.split(line) for line in lines if line != ""]
     return {splits[0]: splits[7] for splits in splitsForLines}
 
+def getPostPrefix(line: str):
+    splits = Utils.COMMA_DELIMITER.split(line)
+    postcode = splits[4]
+    return None if not postcode else postcode.split(" ")[0]
+
 if __name__ == "__main__":
-    sc = SparkContext("local", "UkMakerSpaces")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName('UkMakerSpaces').setMaster("local[*]")
+    sc = SparkContext(conf = conf)
 
     postCodeMap = sc.broadcast(loadPostCodeMap())
 
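In UkMakerSpaces.py the postcode map is broadcast once to every executor; code after this hunk would read it through the broadcast's .value attribute. A minimal illustration of that access pattern, with a made-up lookup table standing in for loadPostCodeMap():

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    conf = SparkConf().setAppName("broadcastSketch").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    # hypothetical prefix-to-region table
    postCodeMap = sc.broadcast({"AB1": "Aberdeen", "DD1": "Dundee"})
    regions = sc.parallelize(["AB1", "ZZ9"]) \
        .map(lambda prefix: postCodeMap.value.get(prefix, "Unknown"))
    print(regions.collect())  # ['Aberdeen', 'Unknown']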
Lines changed: 12 additions & 10 deletions

@@ -1,26 +1,28 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
-def getPostPrefixes(line: str):
-    postcode = Utils.COMMA_DELIMITER.split(line)[4]
-    cleanedPostCode = postcode.replace("\\s+", "")
-    return [cleanedPostCode[0:i] for i in range(0,len(cleanedPostCode)+1)]
-
 def loadPostCodeMap():
     lines = open("in/uk-postcode.csv", "r").read().split("\n")
     splitsForLines = [Utils.COMMA_DELIMITER.split(line) for line in lines if line != ""]
     return {splits[0]: splits[7] for splits in splitsForLines}
 
+def getPostPrefix(line: str):
+    splits = Utils.COMMA_DELIMITER.split(line)
+    postcode = splits[4]
+    return None if not postcode else postcode.split(" ")[0]
+
 if __name__ == "__main__":
-    sc = SparkContext("local", "UkMakerSpaces")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName('UkMakerSpaces').setMaster("local[*]")
+    sc = SparkContext(conf = conf)
     postCodeMap = loadPostCodeMap()
     makerSpaceRdd = sc.textFile("in/uk-makerspaces-identifiable-data.csv")
 
     regions = makerSpaceRdd \
         .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[0] != "Timestamp") \
-        .map(lambda line: next((postCodeMap[prefix] for prefix in getPostPrefixes(line) \
-            if prefix in postCodeMap), "Unknow"))
+        .map(lambda line: postCodeMap[getPostPrefix(line)] \
+            if getPostPrefix(line) in postCodeMap else "Unknow")
 
     for region, count in regions.countByValue().items():
         print("{} : {}".format(region, count))

pairRdd/aggregation/combinebykey/AverageHousePriceSolution.py

Lines changed: 3 additions & 4 deletions

@@ -1,9 +1,8 @@
-from pyspark import SparkContext
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "AverageHousePrice")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("AverageHousePrice").setMaster("local")
+    sc = SparkContext(conf = conf)
 
     lines = sc.textFile("in/RealEstate.csv")
     cleanedLines = lines.filter(lambda line: "Bedrooms" not in line)
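Only the setup lines change in the combineByKey solution; the aggregation itself sits outside this hunk. For context, a per-key average with combineByKey typically looks like the sketch below (an illustration with made-up data, not this file's exact code):

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    conf = SparkConf().setAppName("combineByKeySketch").setMaster("local")
    sc = SparkContext(conf=conf)
    prices = sc.parallelize([("3", 300000.0), ("3", 350000.0), ("2", 200000.0)])
    sumCount = prices.combineByKey(
        lambda price: (1, price),                         # createCombiner: first value seen for a key
        lambda acc, price: (acc[0] + 1, acc[1] + price),  # mergeValue: fold in another value
        lambda a, b: (a[0] + b[0], a[1] + b[1]))          # mergeCombiners: merge partition results
    print(sumCount.mapValues(lambda acc: acc[1] / acc[0]).collect())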
Lines changed: 11 additions & 9 deletions

@@ -1,24 +1,26 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
+from pairRdd.aggregation.reducebykey.housePrice.AvgCount import AvgCount
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "avgHousePrice")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("avgHousePrice").setMaster("local[3]")
+    sc = SparkContext(conf = conf)
 
     lines = sc.textFile("in/RealEstate.csv")
     cleanedLines = lines.filter(lambda line: "Bedrooms" not in line)
 
     housePricePairRdd = cleanedLines.map(lambda line: \
-        (line.split(",")[3], (1, float(line.split(",")[2]))))
+        (line.split(",")[3], AvgCount(1, float(line.split(",")[2]))))
 
     housePriceTotal = housePricePairRdd \
-        .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
+        .reduceByKey(lambda x, y: AvgCount(x.count + y.count, x.total + y.total))
 
     print("housePriceTotal: ")
-    for bedroom, total in housePriceTotal.collect():
-        print("{} : {}".format(bedroom, total))
+    for bedroom, avgCount in housePriceTotal.collect():
+        print("{} : ({}, {})".format(bedroom, avgCount.count, avgCount.total))
 
-    housePriceAvg = housePriceTotal.mapValues(lambda avgCount: avgCount[1] / avgCount[0])
+    housePriceAvg = housePriceTotal.mapValues(lambda avgCount: avgCount.total / avgCount.count)
 
     print("\nhousePriceAvg: ")
     for bedroom, avg in housePriceAvg.collect():
         print("{} : {}".format(bedroom, avg))

pairRdd/filter/AirportsNotInUsaSolution.py

Lines changed: 5 additions & 3 deletions

@@ -1,10 +1,12 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
 if __name__ == "__main__":
 
-    sc = SparkContext("local", "airports")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("airports").setMaster("local[*]")
+    sc = SparkContext(conf = conf)
 
     airportsRDD = sc.textFile("in/airports.text")
 
Lines changed: 6 additions & 4 deletions

@@ -1,10 +1,12 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
 if __name__ == "__main__":
 
-    sc = SparkContext("local", "airports")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("airports").setMaster("local[*]")
+    sc = SparkContext(conf = conf)
 
     lines = sc.textFile("in/airports.text")
 
@@ -15,4 +17,4 @@
     airportsByCountry = countryAndAirportNameAndPair.groupByKey()
 
     for country, airportName in airportsByCountry.collectAsMap().items():
-        print("{}: {}".format(country,list(airportName)))
+        print("{}: {}".format(country, list(airportName)))

pairRdd/mapValues/AirportsUppercaseSolution.py

Lines changed: 5 additions & 4 deletions

@@ -1,10 +1,11 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "airports")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("airports").setMaster("local[*]")
+    sc = SparkContext(conf = conf)
 
     airportsRDD = sc.textFile("in/airports.text")
 
pairRdd/sort/AverageHousePriceSolution.py

Lines changed: 5 additions & 5 deletions

@@ -1,11 +1,11 @@
+import sys
+sys.path.insert(0, '.')
 from pairRdd.aggregation.reducebykey.housePrice.AvgCount import AvgCount
-from pyspark import SparkContext
-
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "averageHousePriceSolution")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("averageHousePriceSolution").setMaster("local[*]")
+    sc = SparkContext(conf = conf)
 
     lines = sc.textFile("in/RealEstate.csv")
     cleanedLines = lines.filter(lambda line: "Bedrooms" not in line)
