
Commit 83216e1

Merge pull request #7 from jleetutorial/pedro-changes-path
Added sys path to guarantee imports | Added SparkConf to all files
2 parents 3ec564f + ac8e586

18 files changed: 145 additions, 123 deletions
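Every file in this commit gets the same two adjustments: the project root is prepended to sys.path so that sibling packages such as commons.Utils keep resolving when a script is launched from the repository root, and the SparkContext is now built from an explicit SparkConf instead of the positional SparkContext(master, appName) form (which also drops the per-file setLogLevel("ERROR") call). A minimal sketch of the resulting boilerplate; the app name and the parallelized data are illustrative only, not taken from any of the files below:

import sys
sys.path.insert(0, '.')  # make top-level packages (e.g. commons) importable

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    # explicit configuration object replaces SparkContext("local", "someApp")
    conf = SparkConf().setAppName("someApp").setMaster("local[*]")
    sc = SparkContext(conf = conf)

    rdd = sc.parallelize(range(10))          # illustrative data
    print("Count: {}".format(rdd.count()))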
Lines changed: 14 additions & 15 deletions
@@ -1,25 +1,24 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
-def filterResponseFromCanada(response, total, missingSalaryMidPoint):
-    splits = Utils.COMMA_DELIMITER.split(response)
-    total.add(1)
-    if not splits[14]:
-        missingSalaryMidPoint.add(1)
-    return splits[2] == "Canada"
-
 if __name__ == "__main__":
-    sc = SparkContext("local", "StackOverFlowSurvey")
-    sc.setLogLevel("ERROR")
-
+    conf = SparkConf().setAppName('StackOverFlowSurvey').setMaster("local[*]")
+    sc = SparkContext(conf = conf)
     total = sc.accumulator(0)
     missingSalaryMidPoint = sc.accumulator(0)
-
     responseRDD = sc.textFile("in/2016-stack-overflow-survey-responses.csv")
 
-    responseFromCanada = responseRDD.filter(lambda response: \
-        filterResponseFromCanada(response, total, missingSalaryMidPoint))
+    def filterResponseFromCanada(response):
+        splits = Utils.COMMA_DELIMITER.split(response)
+        total.add(1)
+        if not splits[14]:
+            missingSalaryMidPoint.add(1)
+        return splits[2] == "Canada"
 
+    responseFromCanada = responseRDD.filter(filterResponseFromCanada)
     print("Count of responses from Canada: {}".format(responseFromCanada.count()))
     print("Total count of responses: {}".format(total.value))
-    print("Count of responses missing salary middle point: {}".format(missingSalaryMidPoint.value))
+    print("Count of responses missing salary middle point: {}" \
+        .format(missingSalaryMidPoint.value))
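Defining filterResponseFromCanada inside the __main__ block lets it close over the total and missingSalaryMidPoint accumulators, so it can be handed to filter directly instead of being wrapped in a lambda that forwards the accumulators as extra arguments. The same closure-over-accumulator pattern in isolation, with made-up data rather than the survey file:

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    conf = SparkConf().setAppName("accumulatorClosureDemo").setMaster("local[*]")
    sc = SparkContext(conf = conf)
    seen = sc.accumulator(0)

    def keepEven(n):
        seen.add(1)              # side effect, readable on the driver after an action
        return n % 2 == 0

    evens = sc.parallelize(range(10)).filter(keepEven)
    print(evens.count())         # 5; the action triggers evaluation
    print(seen.value)            # 10; every element was examined once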

advanced/accumulator/StackOverFlowSurveyFollowUp.py

Lines changed: 13 additions & 14 deletions
@@ -1,26 +1,25 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
-def filterResponseFromCanada(response, total, missingSalaryMidPoint, processedBytes):
-    processedBytes.add(len(response.encode('utf-8')))
-    splits = Utils.COMMA_DELIMITER.split(response)
-    total.add(1)
-    if not splits[14]:
-        missingSalaryMidPoint.add(1)
-    return splits[2] == "Canada"
-
 if __name__ == "__main__":
-    sc = SparkContext("local", "StackOverFlowSurvey")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName('StackOverFlowSurvey').setMaster("local[*]")
+    sc = SparkContext(conf = conf)
 
     total = sc.accumulator(0)
     missingSalaryMidPoint = sc.accumulator(0)
     processedBytes = sc.accumulator(0)
-
     responseRDD = sc.textFile("in/2016-stack-overflow-survey-responses.csv")
 
-    responseFromCanada = responseRDD.filter(lambda response: \
-        filterResponseFromCanada(response, total, missingSalaryMidPoint, processedBytes))
+    def filterResponseFromCanada(response):
+        processedBytes.add(len(response.encode('utf-8')))
+        splits = Utils.COMMA_DELIMITER.split(response)
+        total.add(1)
+        if not splits[14]:
+            missingSalaryMidPoint.add(1)
+        return splits[2] == "Canada"
+    responseFromCanada = responseRDD.filter(filterResponseFromCanada)
 
     print("Count of responses from Canada: {}".format(responseFromCanada.count()))
     print("Number of bytes processed: {}".format(processedBytes.value))

advanced/broadcast/UkMakerSpaces.py

Lines changed: 10 additions & 8 deletions
@@ -1,19 +1,21 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
-def getPostPrefix(line: str):
-    splits = Utils.COMMA_DELIMITER.split(line)
-    postcode = splits[4]
-    return None if not postcode else postcode.split(" ")[0]
-
 def loadPostCodeMap():
     lines = open("in/uk-postcode.csv", "r").read().split("\n")
     splitsForLines = [Utils.COMMA_DELIMITER.split(line) for line in lines if line != ""]
     return {splits[0]: splits[7] for splits in splitsForLines}
 
+def getPostPrefix(line: str):
+    splits = Utils.COMMA_DELIMITER.split(line)
+    postcode = splits[4]
+    return None if not postcode else postcode.split(" ")[0]
+
 if __name__ == "__main__":
-    sc = SparkContext("local", "UkMakerSpaces")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName('UkMakerSpaces').setMaster("local[*]")
+    sc = SparkContext(conf = conf)
 
     postCodeMap = sc.broadcast(loadPostCodeMap())
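The rest of this file is not shown in the hunk, but a map created with sc.broadcast() is read on the executors through its .value attribute. A hedged sketch of how the broadcast postCodeMap would typically be consumed; the RDD name and the output format are assumptions, not the file's remaining code:

    makerSpaceRdd = sc.textFile("in/uk-makerspaces-identifiable-data.csv")
    regions = makerSpaceRdd \
        .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[0] != "Timestamp") \
        .map(lambda line: postCodeMap.value.get(getPostPrefix(line), "Unknown"))

    for region, count in regions.countByValue().items():
        print("{} : {}".format(region, count))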

Lines changed: 12 additions & 10 deletions
@@ -1,26 +1,28 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
-def getPostPrefixes(line: str):
-    postcode = Utils.COMMA_DELIMITER.split(line)[4]
-    cleanedPostCode = postcode.replace("\\s+", "")
-    return [cleanedPostCode[0:i] for i in range(0,len(cleanedPostCode)+1)]
-
 def loadPostCodeMap():
     lines = open("in/uk-postcode.csv", "r").read().split("\n")
     splitsForLines = [Utils.COMMA_DELIMITER.split(line) for line in lines if line != ""]
     return {splits[0]: splits[7] for splits in splitsForLines}
 
+def getPostPrefix(line: str):
+    splits = Utils.COMMA_DELIMITER.split(line)
+    postcode = splits[4]
+    return None if not postcode else postcode.split(" ")[0]
+
 if __name__ == "__main__":
-    sc = SparkContext("local", "UkMakerSpaces")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName('UkMakerSpaces').setMaster("local[*]")
+    sc = SparkContext(conf = conf)
     postCodeMap = loadPostCodeMap()
     makerSpaceRdd = sc.textFile("in/uk-makerspaces-identifiable-data.csv")
 
     regions = makerSpaceRdd \
         .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[0] != "Timestamp") \
-        .map(lambda line: next((postCodeMap[prefix] for prefix in getPostPrefixes(line) \
-            if prefix in postCodeMap), "Unknow"))
+        .map(lambda line: postCodeMap[getPostPrefix(line)] \
+            if getPostPrefix(line) in postCodeMap else "Unknow")
 
     for region, count in regions.countByValue().items():
         print("{} : {}".format(region, count))

pairRdd/aggregation/combinebykey/AverageHousePriceSolution.py

Lines changed: 3 additions & 4 deletions
@@ -1,9 +1,8 @@
-from pyspark import SparkContext
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "AverageHousePrice")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("AverageHousePrice").setMaster("local")
+    sc = SparkContext(conf = conf)
 
     lines = sc.textFile("in/RealEstate.csv")
     cleanedLines = lines.filter(lambda line: "Bedrooms" not in line)
Lines changed: 11 additions & 9 deletions
@@ -1,24 +1,26 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
+from pairRdd.aggregation.reducebykey.housePrice.AvgCount import AvgCount
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "avgHousePrice")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("avgHousePrice").setMaster("local[3]")
+    sc = SparkContext(conf = conf)
 
     lines = sc.textFile("in/RealEstate.csv")
     cleanedLines = lines.filter(lambda line: "Bedrooms" not in line)
 
     housePricePairRdd = cleanedLines.map(lambda line: \
-        (line.split(",")[3], (1, float(line.split(",")[2]))))
+        (line.split(",")[3], AvgCount(1, float(line.split(",")[2]))))
 
     housePriceTotal = housePricePairRdd \
-        .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
+        .reduceByKey(lambda x, y: AvgCount(x.count + y.count, x.total + y.total))
 
     print("housePriceTotal: ")
-    for bedroom, total in housePriceTotal.collect():
-        print("{} : {}".format(bedroom, total))
+    for bedroom, avgCount in housePriceTotal.collect():
+        print("{} : ({}, {})".format(bedroom, avgCount.count, avgCount.total))
 
-    housePriceAvg = housePriceTotal.mapValues(lambda avgCount: avgCount[1] / avgCount[0])
+    housePriceAvg = housePriceTotal.mapValues(lambda avgCount: avgCount.total / avgCount.count)
     print("\nhousePriceAvg: ")
     for bedroom, avg in housePriceAvg.collect():
         print("{} : {}".format(bedroom, avg))

pairRdd/filter/AirportsNotInUsaSolution.py

Lines changed: 5 additions & 3 deletions
@@ -1,10 +1,12 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
 if __name__ == "__main__":
 
-    sc = SparkContext("local", "airports")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("airports").setMaster("local[*]")
+    sc = SparkContext(conf = conf)
 
     airportsRDD = sc.textFile("in/airports.text")

Lines changed: 6 additions & 4 deletions
@@ -1,10 +1,12 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
 if __name__ == "__main__":
 
-    sc = SparkContext("local", "airports")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("airports").setMaster("local[*]")
+    sc = SparkContext(conf = conf)
 
     lines = sc.textFile("in/airports.text")
 
@@ -15,4 +17,4 @@
     airportsByCountry = countryAndAirportNameAndPair.groupByKey()
 
     for country, airportName in airportsByCountry.collectAsMap().items():
-        print("{}: {}".format(country,list(airportName)))
+        print("{}: {}".format(country, list(airportName)))

pairRdd/mapValues/AirportsUppercaseSolution.py

Lines changed: 5 additions & 4 deletions
@@ -1,10 +1,11 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "airports")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("airports").setMaster("local[*]")
+    sc = SparkContext(conf = conf)
 
     airportsRDD = sc.textFile("in/airports.text")

pairRdd/sort/AverageHousePriceSolution.py

Lines changed: 5 additions & 5 deletions
@@ -1,11 +1,11 @@
+import sys
+sys.path.insert(0, '.')
 from pairRdd.aggregation.reducebykey.housePrice.AvgCount import AvgCount
-from pyspark import SparkContext
-
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "averageHousePriceSolution")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("averageHousePriceSolution").setMaster("local[*]")
+    sc = SparkContext(conf = conf)
 
     lines = sc.textFile("in/RealEstate.csv")
     cleanedLines = lines.filter(lambda line: "Bedrooms" not in line)

rdd/airports/AirportsByLatitudeSolution.py

Lines changed: 5 additions & 2 deletions
@@ -1,12 +1,15 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
 def splitComma(line: str):
     splits = Utils.COMMA_DELIMITER.split(line)
     return "{}, {}".format(splits[1], splits[6])
 
 if __name__ == "__main__":
-    sc = SparkContext("local", "airports")
+    conf = SparkConf().setAppName("airports").setMaster("local[*]")
+    sc = SparkContext(conf = conf)
 
     airports = sc.textFile("in/airports.text")

rdd/airports/AirportsInUsaSolution.py

Lines changed: 5 additions & 2 deletions
@@ -1,12 +1,15 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
 def splitComma(line: str):
     splits = Utils.COMMA_DELIMITER.split(line)
     return "{}, {}".format(splits[1], splits[2])
 
 if __name__ == "__main__":
-    sc = SparkContext("local", "airports")
+    conf = SparkConf().setAppName("airports").setMaster("local[*]")
+    sc = SparkContext(conf = conf)
 
     airports = sc.textFile("in/airports.text")
     airportsInUSA = airports.filter(lambda line : Utils.COMMA_DELIMITER.split(line)[3] == "\"United States\"")

rdd/count/CountExample.py

Lines changed: 3 additions & 0 deletions
@@ -3,9 +3,12 @@
 if __name__ == "__main__":
     conf = SparkConf().setAppName("count").setMaster("local[*]")
     sc = SparkContext(conf = conf)
+
     inputWords = ["spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop"]
+
     wordRdd = sc.parallelize(inputWords)
     print("Count: {}".format(wordRdd.count()))
+
     worldCountByValue = wordRdd.countByValue()
     print("CountByValue: ")
     for word, count in worldCountByValue.items():

sparkSql/HousePriceProblem.py

Lines changed: 7 additions & 6 deletions
@@ -4,20 +4,21 @@
 Create a Spark program to read the house data from in/RealEstate.csv,
 group by location, aggregate the average price per SQ Ft and sort by average price per SQ Ft.
 
-The houses dataset contains a collection of recent real estate listings in San Luis Obispo county and
-around it. 
+The houses dataset contains a collection of recent real estate listings in 
+San Luis Obispo county and around it. 
 
 The dataset contains the following fields:
 1. MLS: Multiple listing service number for the house (unique ID).
-2. Location: city/town where the house is located. Most locations are in San Luis Obispo county and
-northern Santa Barbara county (Santa Maria­Orcutt, Lompoc, Guadelupe, Los Alamos), but there
-some out of area locations as well.
+2. Location: city/town where the house is located. Most locations are in 
+San Luis Obispo county and northern Santa Barbara county (Santa Maria­Orcutt, Lompoc, 
+Guadelupe, Los Alamos), but there some out of area locations as well.
 3. Price: the most recent listing price of the house (in dollars).
 4. Bedrooms: number of bedrooms.
 5. Bathrooms: number of bathrooms.
 6. Size: size of the house in square feet.
 7. Price/SQ.ft: price of the house per square foot.
-8. Status: type of sale. Thee types are represented in the dataset: Short Sale, Foreclosure and Regular.
+8. Status: type of sale. Thee types are represented in the dataset: Short Sale, 
+Foreclosure and Regular.
 
 Each field is comma separated.
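The docstring above amounts to a group-by, an average, and a sort. One plausible DataFrame-based reading of it, in the same SparkSession style as HousePriceSolution.py below; the CSV column names ("Location", "Price SQ Ft") are assumptions about the file's header, and this is not the repository's own solution:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == "__main__":
    session = SparkSession.builder.appName("HousePriceProblem").master("local[*]").getOrCreate()

    realEstate = session.read \
        .option("header", "true") \
        .option("inferSchema", value=True) \
        .csv("in/RealEstate.csv")

    # group by location, average the price per square foot, sort ascending
    realEstate.groupBy("Location") \
        .agg(F.avg("Price SQ Ft").alias("avgPricePerSqFt")) \
        .orderBy("avgPricePerSqFt") \
        .show()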

sparkSql/HousePriceSolution.py

Lines changed: 2 additions & 2 deletions
@@ -4,8 +4,8 @@
 
 if __name__ == "__main__":
 
-    session = SparkSession.builder.appName("HousePriceSolution").master("local").getOrCreate()
-    session.sparkContext.setLogLevel("ERROR")
+    session = SparkSession.builder.appName("HousePriceSolution").master("local[*]").getOrCreate()
+
     realEstate = session.read \
         .option("header","true") \
         .option("inferSchema", value=True) \
