
Commit f27836c

Merge pull request jleetutorial#4 from jleetutorial/pedromb-scala_to_python
Scala to Python - pairRdd folder
2 parents: 76ba21d + ee71add

35 files changed: +312 -382 lines changed
pairRdd/aggregation/combinebykey/AverageHousePriceSolution.py

Lines changed: 21 additions & 0 deletions

from pyspark import SparkContext

if __name__ == "__main__":

    sc = SparkContext("local", "AverageHousePrice")
    sc.setLogLevel("ERROR")

    lines = sc.textFile("in/RealEstate.csv")
    cleanedLines = lines.filter(lambda line: "Bedrooms" not in line)

    housePricePairRdd = cleanedLines.map(lambda line: (line.split(",")[3], float(line.split(",")[2])))

    createCombiner = lambda x: (1, x)
    mergeValue = lambda avgCount, x: (avgCount[0] + 1, avgCount[1] + x)
    mergeCombiners = lambda avgCountA, avgCountB: (avgCountA[0] + avgCountB[0], avgCountA[1] + avgCountB[1])

    housePriceTotal = housePricePairRdd.combineByKey(createCombiner, mergeValue, mergeCombiners)

    housePriceAvg = housePriceTotal.mapValues(lambda avgCount: avgCount[1] / avgCount[0])
    for bedrooms, avgPrice in housePriceAvg.collect():
        print("{} : {}".format(bedrooms, avgPrice))

pairRdd/aggregation/combinebykey/AverageHousePriceSolution.scala

Lines changed: 0 additions & 30 deletions
This file was deleted.
pairRdd/aggregation/reducebykey/WordCount.py

Lines changed: 14 additions & 0 deletions

from pyspark import SparkContext

if __name__ == "__main__":

    sc = SparkContext("local", "wordCounts")
    sc.setLogLevel("ERROR")

    lines = sc.textFile("in/word_count.text")
    wordRdd = lines.flatMap(lambda line: line.split(" "))
    wordPairRdd = wordRdd.map(lambda word: (word, 1))

    wordCounts = wordPairRdd.reduceByKey(lambda x, y: x + y)
    for word, count in wordCounts.collect():
        print("{} : {}".format(word, count))

pairRdd/aggregation/reducebykey/WordCount.scala

Lines changed: 0 additions & 20 deletions
This file was deleted.

pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceProblem.scala renamed to pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceProblem.py

Lines changed: 7 additions & 10 deletions

@@ -1,11 +1,10 @@
-package com.sparkTutorial.pairRdd.aggregation.reducebykey.housePrice
+from pyspark import SparkContext
 
-object AverageHousePriceProblem {
+if __name__ == "__main__":
 
-  def main(args: Array[String]) {
-
-    /* Create a Spark program to read the house data from in/RealEstate.csv,
-    output the average price for houses with different number of bedrooms.
+    '''
+    Create a Spark program to read the house data from in/RealEstate.csv,
+    output the average price for houses with different number of bedrooms.
 
     The houses dataset contains a collection of recent real estate listings in San Luis Obispo county and
     around it.
@@ -31,8 +30,6 @@ def main(args: Array[String]) {
     (2, 325000)
     ...
 
-    3, 1 and 2 mean the number of bedrooms. 325000 means the average price of houses with 3 bedrooms is 325000.
-    */
-  }
+    3, 1 and 2 mean the number of bedrooms. 325000 means the average price of houses with 3 bedrooms is 325000.
 
-}
+    '''
pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceSolution.py

Lines changed: 24 additions & 0 deletions

from pyspark import SparkContext

if __name__ == "__main__":

    sc = SparkContext("local", "avgHousePrice")
    sc.setLogLevel("ERROR")

    lines = sc.textFile("in/RealEstate.csv")
    cleanedLines = lines.filter(lambda line: "Bedrooms" not in line)

    housePricePairRdd = cleanedLines.map(lambda line: \
        (line.split(",")[3], (1, float(line.split(",")[2]))))

    housePriceTotal = housePricePairRdd \
        .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

    print("housePriceTotal: ")
    for bedroom, total in housePriceTotal.collect():
        print("{} : {}".format(bedroom, total))

    housePriceAvg = housePriceTotal.mapValues(lambda avgCount: avgCount[1] / avgCount[0])
    print("\nhousePriceAvg: ")
    for bedroom, avg in housePriceAvg.collect():
        print("{} : {}".format(bedroom, avg))

pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceSolution.scala

Lines changed: 0 additions & 27 deletions
This file was deleted.
pairRdd/aggregation/reducebykey/housePrice/AvgCount.py

Lines changed: 7 additions & 0 deletions

class AvgCount():

    def __init__(self, count: int, total: float):
        self.count = count
        self.total = total
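Note (editor's sketch, not part of this commit): AvgCount is not referenced elsewhere in this diff; it appears to offer a named alternative to the bare (count, total) tuple. A hypothetical wiring into the reduceByKey solution above (the import path assumes the repository layout shown in this commit):

from pyspark import SparkContext
from pairRdd.aggregation.reducebykey.housePrice.AvgCount import AvgCount

if __name__ == "__main__":

    sc = SparkContext("local", "avgHousePriceAvgCount")
    sc.setLogLevel("ERROR")

    lines = sc.textFile("in/RealEstate.csv")
    cleanedLines = lines.filter(lambda line: "Bedrooms" not in line)

    # Wrap each price in an AvgCount instead of a (1, price) tuple.
    housePricePairRdd = cleanedLines.map(lambda line:
        (line.split(",")[3], AvgCount(1, float(line.split(",")[2]))))

    housePriceTotal = housePricePairRdd.reduceByKey(
        lambda x, y: AvgCount(x.count + y.count, x.total + y.total))

    housePriceAvg = housePriceTotal.mapValues(
        lambda avgCount: avgCount.total / avgCount.count)
    for bedroom, avg in housePriceAvg.collect():
        print("{} : {}".format(bedroom, avg))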
pairRdd/aggregation/reducebykey/housePrice/AvgCount.scala

Lines changed: 0 additions & 4 deletions
This file was deleted.
pairRdd/create/PairRddFromRegularRdd.py

Lines changed: 12 additions & 0 deletions

from pyspark import SparkContext

if __name__ == "__main__":

    sc = SparkContext("local", "create")
    sc.setLogLevel("ERROR")

    inputStrings = ["Lily 23", "Jack 29", "Mary 29", "James 8"]
    regularRDDs = sc.parallelize(inputStrings)

    pairRDD = regularRDDs.map(lambda s: (s.split(" ")[0], s.split(" ")[1]))
    pairRDD.coalesce(1).saveAsTextFile("out/pair_rdd_from_regular_rdd")
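Note (editor's sketch, not part of this commit): coalesce(1) collapses the RDD to a single partition so one part-00000 file is written, and each pair is stored as its Python str(), e.g. ('Lily', '23'); saveAsTextFile also fails if the output directory already exists. Reading the pairs back could look like this:

from ast import literal_eval

from pyspark import SparkContext

if __name__ == "__main__":

    sc = SparkContext("local", "readBack")
    sc.setLogLevel("ERROR")

    # Each saved line looks like "('Lily', '23')"; literal_eval restores the tuple.
    pairs = sc.textFile("out/pair_rdd_from_regular_rdd").map(literal_eval)
    print(pairs.collect())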

pairRdd/create/PairRddFromRegularRdd.scala

Lines changed: 0 additions & 18 deletions
This file was deleted.
pairRdd/create/PairRddFromTupleList.py

Lines changed: 11 additions & 0 deletions

from pyspark import SparkContext

if __name__ == "__main__":

    sc = SparkContext("local", "create")
    sc.setLogLevel("ERROR")

    tuples = [("Lily", 23), ("Jack", 29), ("Mary", 29), ("James", 8)]
    pairRDD = sc.parallelize(tuples)

    pairRDD.coalesce(1).saveAsTextFile("out/pair_rdd_from_tuple_list")

pairRdd/create/PairRddFromTupleList.scala

Lines changed: 0 additions & 17 deletions
This file was deleted.
pairRdd/filter/AirportsNotInUsaProblem.py

Lines changed: 20 additions & 0 deletions

from pyspark import SparkContext

if __name__ == "__main__":

    '''
    Create a Spark program to read the airport data from in/airports.text;
    generate a pair RDD with airport name being the key and country name being the value.
    Then remove all the airports which are located in United States and output the pair RDD to
    out/airports_not_in_usa_pair_rdd.text.

    Each row of the input file contains the following columns:
    Airport ID, Name of airport, Main city served by airport, Country where airport is located,
    IATA/FAA code, ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format

    Sample output:

    ("Kamloops", "Canada")
    ("Wewak Intl", "Papua New Guinea")
    ...
    '''

pairRdd/filter/AirportsNotInUsaProblem.scala

Lines changed: 0 additions & 22 deletions
This file was deleted.
pairRdd/filter/AirportsNotInUsaSolution.py

Lines changed: 16 additions & 0 deletions

from pyspark import SparkContext
from commons.Utils import Utils

if __name__ == "__main__":

    sc = SparkContext("local", "airports")
    sc.setLogLevel("ERROR")

    airportsRDD = sc.textFile("in/airports.text")

    airportPairRDD = airportsRDD.map(lambda line: \
        (Utils.COMMA_DELIMITER.split(line)[1],
         Utils.COMMA_DELIMITER.split(line)[3]))
    airportsNotInUSA = airportPairRDD.filter(lambda keyValue: keyValue[1] != "\"United States\"")

    airportsNotInUSA.saveAsTextFile("out/airports_not_in_usa_pair_rdd.text")
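Note (editor's sketch, not part of this commit): commons/Utils is imported here but is not included in this diff. Presumably it exposes a COMMA_DELIMITER regex that splits a CSV row only on commas outside double quotes, since airport and city names can contain commas. Under that assumption, the helper might look like:

import re

class Utils():

    # Matches a comma only when an even number of double quotes follows it,
    # i.e. the comma sits outside any quoted field.
    COMMA_DELIMITER = re.compile(r''',(?=(?:[^"]*"[^"]*")*[^"]*$)''')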

pairRdd/filter/AirportsNotInUsaSolution.scala

Lines changed: 0 additions & 21 deletions
This file was deleted.
pairRdd/groupbykey/AirportsByCountryProblem.py

Lines changed: 23 additions & 0 deletions

from pyspark import SparkContext

if __name__ == "__main__":

    '''
    Create a Spark program to read the airport data from in/airports.text,
    output the list of the names of the airports located in each country.

    Each row of the input file contains the following columns:
    Airport ID, Name of airport, Main city served by airport, Country where airport is located, IATA/FAA code,
    ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format

    Sample output:

    "Canada", ["Bagotville", "Montreal", "Coronation", ...]
    "Norway", ["Vigra", "Andenes", "Alta", "Bomoen", "Bronnoy", ...]
    "Papua New Guinea", ["Goroka", "Madang", ...]
    ...
    '''

pairRdd/groupbykey/AirportsByCountryProblem.scala

Lines changed: 0 additions & 22 deletions
This file was deleted.
pairRdd/groupbykey/AirportsByCountrySolution.py

Lines changed: 18 additions & 0 deletions

from pyspark import SparkContext
from commons.Utils import Utils

if __name__ == "__main__":

    sc = SparkContext("local", "airports")
    sc.setLogLevel("ERROR")

    lines = sc.textFile("in/airports.text")

    countryAndAirportNameAndPair = lines.map(lambda airport: \
        (Utils.COMMA_DELIMITER.split(airport)[3],
         Utils.COMMA_DELIMITER.split(airport)[1]))

    airportsByCountry = countryAndAirportNameAndPair.groupByKey()

    for country, airportName in airportsByCountry.collectAsMap().items():
        print("{}: {}".format(country, list(airportName)))
