
Commit 9fe453e

Merge pull request jleetutorial#2 from jleetutorial/pedromb-scala_to_python
Scala to Python - sparkSql folder
2 parents 9d9066c + 3aeb5d8

14 files changed: +175 −263 lines

commons/Utils.py

Lines changed: 2 additions & 2 deletions
@@ -1,5 +1,5 @@
 import re
 
 class Utils():
-
-    COMMA_DELIMITER = re.compile(''',(?=(?:[^'"]|'[^']*'|"[^"]*")*$)''')
+
+    COMMA_DELIMITER = re.compile(''',(?=(?:[^"]*"[^"]*")*[^"]*$)''')
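As a quick illustration (a hypothetical snippet, not part of the commit): the new lookahead treats a comma as a delimiter only when an even number of double quotes follows it, i.e. when the comma sits outside a quoted field. Note the rewritten pattern only recognizes double-quoted fields, whereas the deleted version also tolerated single quotes.

    import re

    # The pattern added by this commit
    COMMA_DELIMITER = re.compile(''',(?=(?:[^"]*"[^"]*")*[^"]*$)''')

    # Commas inside double-quoted fields are not split on
    line = 'id1,"San Luis Obispo, CA",425000'
    print(COMMA_DELIMITER.split(line))
    # ['id1', '"San Luis Obispo, CA"', '425000']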

commons/Utils.scala

Lines changed: 0 additions & 6 deletions
This file was deleted.

sparkSql/HousePriceProblem.py

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
if __name__ == "__main__":

    '''
    Create a Spark program to read the house data from in/RealEstate.csv,
    group by location, aggregate the average price per SQ Ft and sort by average price per SQ Ft.

    The houses dataset contains a collection of recent real estate listings in San Luis Obispo county
    and around it.

    The dataset contains the following fields:
    1. MLS: Multiple listing service number for the house (unique ID).
    2. Location: city/town where the house is located. Most locations are in San Luis Obispo county and
       northern Santa Barbara county (Santa Maria-Orcutt, Lompoc, Guadalupe, Los Alamos), but there are
       some out-of-area locations as well.
    3. Price: the most recent listing price of the house (in dollars).
    4. Bedrooms: number of bedrooms.
    5. Bathrooms: number of bathrooms.
    6. Size: size of the house in square feet.
    7. Price/SQ.ft: price of the house per square foot.
    8. Status: type of sale. Three types are represented in the dataset: Short Sale, Foreclosure and Regular.

    Each field is comma separated.

    Sample output:

    +----------------+-----------------+
    |        Location| avg(Price SQ Ft)|
    +----------------+-----------------+
    |          Oceano|             95.0|
    |         Bradley|            206.0|
    | San Luis Obispo|            359.0|
    |      Santa Ynez|            491.4|
    |         Cayucos|            887.0|
    |................|.................|
    |................|.................|
    |................|.................|
    '''

sparkSql/HousePriceProblem.scala

Lines changed: 0 additions & 40 deletions
This file was deleted.

sparkSql/HousePriceSolution.py

Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
from pyspark.sql import SparkSession

PRICE_SQ_FT = "Price SQ Ft"

if __name__ == "__main__":

    session = SparkSession.builder.appName("HousePriceSolution").master("local").getOrCreate()
    session.sparkContext.setLogLevel("ERROR")
    realEstate = session.read \
        .option("header", "true") \
        .option("inferSchema", value=True) \
        .csv("in/RealEstate.csv")

    realEstate.groupBy("Location") \
        .avg(PRICE_SQ_FT) \
        .orderBy("avg(Price SQ Ft)") \
        .show()
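The orderBy call relies on the auto-generated column name avg(Price SQ Ft). A minimal alternative sketch (not part of the commit) gives the aggregate an explicit alias, so the sort does not depend on Spark's generated naming:

    import pyspark.sql.functions as F

    # Same aggregation, with an explicit alias for the average column
    realEstate.groupBy("Location") \
        .agg(F.avg(PRICE_SQ_FT).alias("avg_price_sq_ft")) \
        .orderBy("avg_price_sq_ft") \
        .show()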

sparkSql/HousePriceSolution.scala

Lines changed: 0 additions & 25 deletions
This file was deleted.

sparkSql/RddDataframeConversion.py

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
from pyspark.sql import SparkSession
from commons.Utils import Utils

def getColNames(line: str):
    splits = Utils.COMMA_DELIMITER.split(line)
    return [splits[2], splits[6], splits[9], splits[14]]

def mapResponseRdd(line: str):
    splits = Utils.COMMA_DELIMITER.split(line)
    double1 = None if not splits[6] else float(splits[6])
    double2 = None if not splits[14] else float(splits[14])
    return splits[2], double1, splits[9], double2

if __name__ == "__main__":

    session = SparkSession.builder.appName("StackOverFlowSurvey").master("local").getOrCreate()
    sc = session.sparkContext
    sc.setLogLevel("ERROR")

    lines = sc.textFile("in/2016-stack-overflow-survey-responses.csv")

    colNames = lines \
        .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[2] == "country") \
        .map(getColNames)

    responseRDD = lines \
        .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[2] != "country") \
        .map(mapResponseRdd)

    responseDataFrame = responseRDD.toDF(colNames.collect()[0])

    print("=== Print out schema ===")
    responseDataFrame.printSchema()

    print("=== Print 20 records of responses table ===")
    responseDataFrame.show(20)

    for response in responseDataFrame.rdd.collect():
        print(response)
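Here responseRDD.toDF(...) infers the column types from the tuples and takes the column names from the header row returned by colNames.collect()[0]. A hypothetical alternative sketch (not in the commit) declares the schema explicitly; the field names below assume the header columns at indices 2, 6, 9 and 14 are country, age_midpoint, occupation and salary_midpoint, as the companion StackOverFlowSurvey.py suggests:

    from pyspark.sql.types import StructType, StructField, StringType, DoubleType

    # Explicit schema matching the (str, float, str, float) tuples from mapResponseRdd
    schema = StructType([
        StructField("country", StringType(), True),
        StructField("age_midpoint", DoubleType(), True),
        StructField("occupation", StringType(), True),
        StructField("salary_midpoint", DoubleType(), True),
    ])
    responseDataFrame = session.createDataFrame(responseRDD, schema)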

sparkSql/RddDatasetConversion.scala

Lines changed: 0 additions & 42 deletions
This file was deleted.

sparkSql/Response.scala

Lines changed: 0 additions & 3 deletions
This file was deleted.

sparkSql/StackOverFlowSurvey.py

Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
from pyspark.sql import SparkSession

AGE_MIDPOINT = "age_midpoint"
SALARY_MIDPOINT = "salary_midpoint"
SALARY_MIDPOINT_BUCKET = "salary_midpoint_bucket"

if __name__ == "__main__":

    session = SparkSession.builder.appName("StackOverFlowSurvey").master("local").getOrCreate()
    session.sparkContext.setLogLevel("ERROR")
    dataFrameReader = session.read

    responses = dataFrameReader \
        .option("header", "true") \
        .option("inferSchema", value=True) \
        .csv("in/2016-stack-overflow-survey-responses.csv")

    print("=== Print out schema ===")
    responses.printSchema()

    responseWithSelectedColumns = responses.select("country", "occupation", AGE_MIDPOINT, SALARY_MIDPOINT)

    print("=== Print the selected columns of the table ===")
    responseWithSelectedColumns.show()

    print("=== Print records where the response is from Afghanistan ===")
    responseWithSelectedColumns.filter(responseWithSelectedColumns["country"] == "Afghanistan").show()

    print("=== Print the count of occupations ===")
    groupedDataset = responseWithSelectedColumns.groupBy("occupation")
    groupedDataset.count().show()

    print("=== Print records with average mid age less than 20 ===")
    responseWithSelectedColumns.filter(responseWithSelectedColumns[AGE_MIDPOINT] < 20).show()

    print("=== Print the result by salary middle point in descending order ===")
    responseWithSelectedColumns.orderBy(responseWithSelectedColumns[SALARY_MIDPOINT], ascending=False).show()

    print("=== Group by country and aggregate by average salary middle point ===")
    datasetGroupByCountry = responseWithSelectedColumns.groupBy("country")
    datasetGroupByCountry.avg(SALARY_MIDPOINT).show()

    responseWithSalaryBucket = responses.withColumn(SALARY_MIDPOINT_BUCKET,
        (responses[SALARY_MIDPOINT] / 20000).cast("integer") * 20000)

    print("=== With salary bucket column ===")
    responseWithSalaryBucket.select(SALARY_MIDPOINT, SALARY_MIDPOINT_BUCKET).show()

    print("=== Group by salary bucket ===")
    responseWithSalaryBucket.groupBy(SALARY_MIDPOINT_BUCKET).count().orderBy(SALARY_MIDPOINT_BUCKET).show()

    session.stop()
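The salary bucket expression floors each salary midpoint to a multiple of 20,000: divide by the bucket width, truncate to an integer, then multiply back. A tiny standalone illustration of the arithmetic (hypothetical, not part of the commit):

    # cast("integer") truncates toward zero, which floors positive salaries
    salary = 55000
    bucket = int(salary / 20000) * 20000   # 55000 / 20000 = 2.75 -> 2 -> 40000
    print(bucket)  # 40000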

sparkSql/StackOverFlowSurvey.scala

Lines changed: 0 additions & 60 deletions
This file was deleted.

sparkSql/TypedDataset.scala

Lines changed: 0 additions & 55 deletions
This file was deleted.
