diff --git a/pairRdd/aggregation/combinebykey/AverageHousePriceSolution.py b/pairRdd/aggregation/combinebykey/AverageHousePriceSolution.py new file mode 100644 index 00000000..4885ccbe --- /dev/null +++ b/pairRdd/aggregation/combinebykey/AverageHousePriceSolution.py @@ -0,0 +1,21 @@ +from pyspark import SparkContext + +if __name__ == "__main__": + + sc = SparkContext("local", "AverageHousePrice") + sc.setLogLevel("ERROR") + + lines = sc.textFile("in/RealEstate.csv") + cleanedLines = lines.filter(lambda line: "Bedrooms" not in line) + + housePricePairRdd = cleanedLines.map(lambda line: (line.split(",")[3], float(line.split(",")[2]))) + + createCombiner = lambda x: (1, x) + mergeValue = lambda avgCount, x: (avgCount[0] + 1, avgCount[1] + x) + mergeCombiners = lambda avgCountA, avgCountB: (avgCountA[0] + avgCountB[0], avgCountA[1] + avgCountB[1]) + + housePriceTotal = housePricePairRdd.combineByKey(createCombiner, mergeValue, mergeCombiners) + + housePriceAvg = housePriceTotal.mapValues(lambda avgCount: avgCount[1] / avgCount[0]) + for bedrooms, avgPrice in housePriceAvg.collect(): + print("{} : {}".format(bedrooms, avgPrice)) diff --git a/pairRdd/aggregation/combinebykey/AverageHousePriceSolution.scala b/pairRdd/aggregation/combinebykey/AverageHousePriceSolution.scala deleted file mode 100644 index e744546f..00000000 --- a/pairRdd/aggregation/combinebykey/AverageHousePriceSolution.scala +++ /dev/null @@ -1,30 +0,0 @@ -package com.sparkTutorial.pairRdd.aggregation.combinebykey - -import org.apache.log4j.{Level, Logger} -import org.apache.spark.{SparkConf, SparkContext} - -object AverageHousePriceSolution { - - def main(args: Array[String]) { - - Logger.getLogger("org").setLevel(Level.ERROR) - val conf = new SparkConf().setAppName("wordCounts").setMaster("local[3]") - val sc = new SparkContext(conf) - - val lines = sc.textFile("in/RealEstate.csv") - val cleanedLines = lines.filter(line => !line.contains("Bedrooms")) - - val housePricePairRdd = cleanedLines.map(line => (line.split(",")(3), line.split(",")(2).toDouble)) - - val createCombiner = (x: Double) => (1, x) - val mergeValue = (avgCount: AvgCount, x: Double) => (avgCount._1 + 1, avgCount._2 + x) - val mergeCombiners = (avgCountA: AvgCount, avgCountB: AvgCount) => (avgCountA._1 + avgCountB._1, avgCountA._2 + avgCountB._2) - - val housePriceTotal = housePricePairRdd.combineByKey(createCombiner, mergeValue, mergeCombiners) - - val housePriceAvg = housePriceTotal.mapValues(avgCount => avgCount._2 / avgCount._1) - for ((bedrooms, avgPrice) <- housePriceAvg.collect()) println(bedrooms + " : " + avgPrice) - } - - type AvgCount = (Int, Double) -} diff --git a/pairRdd/aggregation/reducebykey/WordCount.py b/pairRdd/aggregation/reducebykey/WordCount.py new file mode 100644 index 00000000..3a00f380 --- /dev/null +++ b/pairRdd/aggregation/reducebykey/WordCount.py @@ -0,0 +1,14 @@ +from pyspark import SparkContext + +if __name__ == "__main__": + + sc = SparkContext("local", "wordCounts") + sc.setLogLevel("ERROR") + + lines = sc.textFile("in/word_count.text") + wordRdd = lines.flatMap(lambda line: line.split(" ")) + wordPairRdd = wordRdd.map(lambda word: (word, 1)) + + wordCounts = wordPairRdd.reduceByKey(lambda x, y: x + y) + for word, count in wordCounts.collect(): + print("{} : {}".format(word, count)) diff --git a/pairRdd/aggregation/reducebykey/WordCount.scala b/pairRdd/aggregation/reducebykey/WordCount.scala deleted file mode 100644 index 89d51803..00000000 --- a/pairRdd/aggregation/reducebykey/WordCount.scala +++ /dev/null @@ -1,20 +0,0 @@ 
-package com.sparkTutorial.pairRdd.aggregation.reducebykey - -import org.apache.log4j.{Level, Logger} -import org.apache.spark.{SparkConf, SparkContext} - -object WordCount { - - def main(args: Array[String]) { - Logger.getLogger("org").setLevel(Level.ERROR) - val conf = new SparkConf().setAppName("wordCounts").setMaster("local[3]") - val sc = new SparkContext(conf) - - val lines = sc.textFile("in/word_count.text") - val wordRdd = lines.flatMap(line => line.split(" ")) - val wordPairRdd = wordRdd.map(word => (word, 1)) - - val wordCounts = wordPairRdd.reduceByKey((x, y) => x + y) - for ((word, count) <- wordCounts.collect()) println(word + " : " + count) - } -} diff --git a/pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceProblem.scala b/pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceProblem.py similarity index 72% rename from pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceProblem.scala rename to pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceProblem.py index 1d433a6b..caf05e89 100644 --- a/pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceProblem.scala +++ b/pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceProblem.py @@ -1,11 +1,10 @@ -package com.sparkTutorial.pairRdd.aggregation.reducebykey.housePrice +from pyspark import SparkContext -object AverageHousePriceProblem { +if __name__ == "__main__": - def main(args: Array[String]) { - - /* Create a Spark program to read the house data from in/RealEstate.csv, - output the average price for houses with different number of bedrooms. + ''' + Create a Spark program to read the house data from in/RealEstate.csv, + output the average price for houses with different number of bedrooms. The houses dataset contains a collection of recent real estate listings in San Luis Obispo county and around it.  @@ -31,8 +30,6 @@ def main(args: Array[String]) { (2, 325000) ... - 3, 1 and 2 mean the number of bedrooms. 325000 means the average price of houses with 3 bedrooms is 325000. - */ - } + 3, 1 and 2 mean the number of bedrooms. 325000 means the average price of houses with 3 bedrooms is 325000. 
-} + ''' diff --git a/pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceSolution.py b/pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceSolution.py new file mode 100644 index 00000000..acb633e2 --- /dev/null +++ b/pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceSolution.py @@ -0,0 +1,24 @@ +from pyspark import SparkContext + +if __name__ == "__main__": + + sc = SparkContext("local", "avgHousePrice") + sc.setLogLevel("ERROR") + + lines = sc.textFile("in/RealEstate.csv") + cleanedLines = lines.filter(lambda line: "Bedrooms" not in line) + + housePricePairRdd = cleanedLines.map(lambda line: \ + (line.split(",")[3], (1, float(line.split(",")[2])))) + + housePriceTotal = housePricePairRdd \ + .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) + + print("housePriceTotal: ") + for bedroom, total in housePriceTotal.collect(): + print("{} : {}".format(bedroom, total)) + + housePriceAvg = housePriceTotal.mapValues(lambda avgCount: avgCount[1] / avgCount[0]) + print("\nhousePriceAvg: ") + for bedroom, avg in housePriceAvg.collect(): + print("{} : {}".format(bedroom, avg)) diff --git a/pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceSolution.scala b/pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceSolution.scala deleted file mode 100644 index 29cf8b38..00000000 --- a/pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceSolution.scala +++ /dev/null @@ -1,27 +0,0 @@ -package com.sparkTutorial.pairRdd.aggregation.reducebykey.housePrice - -import org.apache.log4j.{Level, Logger} -import org.apache.spark.{SparkConf, SparkContext} - -object AverageHousePriceSolution { - - def main(args: Array[String]) { - Logger.getLogger("org").setLevel(Level.ERROR) - val conf = new SparkConf().setAppName("avgHousePrice").setMaster("local[3]") - val sc = new SparkContext(conf) - - val lines = sc.textFile("in/RealEstate.csv") - val cleanedLines = lines.filter(line => !line.contains("Bedrooms")) - - val housePricePairRdd = cleanedLines.map(line => (line.split(",")(3), (1, line.split(",")(2).toDouble))) - - val housePriceTotal = housePricePairRdd.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)) - - println("housePriceTotal: ") - for ((bedroom, total) <- housePriceTotal.collect()) println(bedroom + " : " + total) - - val housePriceAvg = housePriceTotal.mapValues(avgCount => avgCount._2 / avgCount._1) - println("housePriceAvg: ") - for ((bedroom, avg) <- housePriceAvg.collect()) println(bedroom + " : " + avg) - } -} diff --git a/pairRdd/aggregation/reducebykey/housePrice/AvgCount.py b/pairRdd/aggregation/reducebykey/housePrice/AvgCount.py new file mode 100644 index 00000000..37a59ede --- /dev/null +++ b/pairRdd/aggregation/reducebykey/housePrice/AvgCount.py @@ -0,0 +1,7 @@ +class AvgCount(): + + def __init__(self, count: int, total: float): + self.count = count + self.total = total + + diff --git a/pairRdd/aggregation/reducebykey/housePrice/AvgCount.scala b/pairRdd/aggregation/reducebykey/housePrice/AvgCount.scala deleted file mode 100644 index 8eb0ea9a..00000000 --- a/pairRdd/aggregation/reducebykey/housePrice/AvgCount.scala +++ /dev/null @@ -1,4 +0,0 @@ -package com.sparkTutorial.pairRdd.aggregation.reducebykey.housePrice - -case class AvgCount(count: Int, total: Double) - diff --git a/pairRdd/create/PairRddFromRegularRdd.py b/pairRdd/create/PairRddFromRegularRdd.py new file mode 100644 index 00000000..bfd6f187 --- /dev/null +++ b/pairRdd/create/PairRddFromRegularRdd.py @@ -0,0 +1,12 @@ +from pyspark import SparkContext + +if __name__ == "__main__": + + sc = 
SparkContext("local", "create") + sc.setLogLevel("ERROR") + + inputStrings = ["Lily 23", "Jack 29", "Mary 29", "James 8"] + regularRDDs = sc.parallelize(inputStrings) + + pairRDD = regularRDDs.map(lambda s: (s.split(" ")[0], s.split(" ")[1])) + pairRDD.coalesce(1).saveAsTextFile("out/pair_rdd_from_regular_rdd") diff --git a/pairRdd/create/PairRddFromRegularRdd.scala b/pairRdd/create/PairRddFromRegularRdd.scala deleted file mode 100644 index 5fa8ec05..00000000 --- a/pairRdd/create/PairRddFromRegularRdd.scala +++ /dev/null @@ -1,18 +0,0 @@ -package com.sparkTutorial.pairRdd.create - -import org.apache.spark.{SparkConf, SparkContext} - -object PairRddFromRegularRdd { - - def main(args: Array[String]) { - - val conf = new SparkConf().setAppName("create").setMaster("local[1]") - val sc = new SparkContext(conf) - - val inputStrings = List("Lily 23", "Jack 29", "Mary 29", "James 8") - val regularRDDs = sc.parallelize(inputStrings) - - val pairRDD = regularRDDs.map(s => (s.split(" ")(0), s.split(" ")(1))) - pairRDD.coalesce(1).saveAsTextFile("out/pair_rdd_from_regular_rdd") - } -} diff --git a/pairRdd/create/PairRddFromTupleList.py b/pairRdd/create/PairRddFromTupleList.py new file mode 100644 index 00000000..c728d005 --- /dev/null +++ b/pairRdd/create/PairRddFromTupleList.py @@ -0,0 +1,11 @@ +from pyspark import SparkContext + +if __name__ == "__main__": + + sc = SparkContext("local", "create") + sc.setLogLevel("ERROR") + + tuples = [("Lily", 23), ("Jack", 29), ("Mary", 29), ("James", 8)] + pairRDD = sc.parallelize(tuples) + + pairRDD.coalesce(1).saveAsTextFile("out/pair_rdd_from_tuple_list") diff --git a/pairRdd/create/PairRddFromTupleList.scala b/pairRdd/create/PairRddFromTupleList.scala deleted file mode 100644 index 3b9a7632..00000000 --- a/pairRdd/create/PairRddFromTupleList.scala +++ /dev/null @@ -1,17 +0,0 @@ -package com.sparkTutorial.pairRdd.create - -import org.apache.spark.{SparkConf, SparkContext} - -object PairRddFromTupleList { - - def main(args: Array[String]) { - - val conf = new SparkConf().setAppName("create").setMaster("local[1]") - val sc = new SparkContext(conf) - - val tuple = List(("Lily", 23), ("Jack", 29), ("Mary", 29), ("James", 8)) - val pairRDD = sc.parallelize(tuple) - - pairRDD.coalesce(1).saveAsTextFile("out/pair_rdd_from_tuple_list") - } -} diff --git a/pairRdd/filter/AirportsNotInUsaProblem.py b/pairRdd/filter/AirportsNotInUsaProblem.py new file mode 100644 index 00000000..d0e2da3a --- /dev/null +++ b/pairRdd/filter/AirportsNotInUsaProblem.py @@ -0,0 +1,20 @@ +from pyspark import SparkContext + +if __name__ == "__main__": + + ''' + Create a Spark program to read the airport data from in/airports.text; + generate a pair RDD with airport name being the key and country name being the value. + Then remove all the airports which are located in United States and output the pair RDD to out/airports_not_in_usa_pair_rdd.text + + Each row of the input file contains the following columns: + Airport ID, Name of airport, Main city served by airport, Country where airport is located, + IATA/FAA code, ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format + + Sample output: + + ("Kamloops", "Canada") + ("Wewak Intl", "Papua New Guinea") + ... 
+ + ''' diff --git a/pairRdd/filter/AirportsNotInUsaProblem.scala b/pairRdd/filter/AirportsNotInUsaProblem.scala deleted file mode 100644 index b1613b53..00000000 --- a/pairRdd/filter/AirportsNotInUsaProblem.scala +++ /dev/null @@ -1,22 +0,0 @@ -package com.sparkTutorial.pairRdd.filter - -object AirportsNotInUsaProblem { - - def main(args: Array[String]) { - - /* Create a Spark program to read the airport data from in/airports.text; - generate a pair RDD with airport name being the key and country name being the value. - Then remove all the airports which are located in United States and output the pair RDD to out/airports_not_in_usa_pair_rdd.text - - Each row of the input file contains the following columns: - Airport ID, Name of airport, Main city served by airport, Country where airport is located, - IATA/FAA code, ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format - - Sample output: - - ("Kamloops", "Canada") - ("Wewak Intl", "Papua New Guinea") - ... - */ - } -} diff --git a/pairRdd/filter/AirportsNotInUsaSolution.py b/pairRdd/filter/AirportsNotInUsaSolution.py new file mode 100644 index 00000000..fd9c2fe9 --- /dev/null +++ b/pairRdd/filter/AirportsNotInUsaSolution.py @@ -0,0 +1,16 @@ +from pyspark import SparkContext +from commons.Utils import Utils + +if __name__ == "__main__": + + sc = SparkContext("local", "airports") + sc.setLogLevel("ERROR") + + airportsRDD = sc.textFile("in/airports.text") + + airportPairRDD = airportsRDD.map(lambda line: \ + (Utils.COMMA_DELIMITER.split(line)[1], + Utils.COMMA_DELIMITER.split(line)[3])) + airportsNotInUSA = airportPairRDD.filter(lambda keyValue: keyValue[1] != "\"United States\"") + + airportsNotInUSA.saveAsTextFile("out/airports_not_in_usa_pair_rdd.text") diff --git a/pairRdd/filter/AirportsNotInUsaSolution.scala b/pairRdd/filter/AirportsNotInUsaSolution.scala deleted file mode 100644 index b014ffec..00000000 --- a/pairRdd/filter/AirportsNotInUsaSolution.scala +++ /dev/null @@ -1,21 +0,0 @@ -package com.sparkTutorial.pairRdd.filter - -import com.sparkTutorial.commons.Utils -import org.apache.spark.{SparkConf, SparkContext} - -object AirportsNotInUsaSolution { - - def main(args: Array[String]) { - - val conf = new SparkConf().setAppName("airports").setMaster("local") - val sc = new SparkContext(conf) - - val airportsRDD = sc.textFile("in/airports.text") - - val airportPairRDD = airportsRDD.map(line => (line.split(Utils.COMMA_DELIMITER)(1), - line.split(Utils.COMMA_DELIMITER)(3))) - val airportsNotInUSA = airportPairRDD.filter(keyValue => keyValue._2 != "\"United States\"") - - airportsNotInUSA.saveAsTextFile("out/airports_not_in_usa_pair_rdd.text") - } -} diff --git a/pairRdd/groupbykey/AirportsByCountryProblem.py b/pairRdd/groupbykey/AirportsByCountryProblem.py new file mode 100644 index 00000000..6e4b1e24 --- /dev/null +++ b/pairRdd/groupbykey/AirportsByCountryProblem.py @@ -0,0 +1,23 @@ +from pyspark import SparkContext + +if __name__ == "__main__": + + ''' + Create a Spark program to read the airport data from in/airports.text, + output the list of the names of the airports located in each country. + + Each row of the input file contains the following columns: + Airport ID, Name of airport, Main city served by airport, Country where airport is located, IATA/FAA code, + ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format + + Sample output: + + "Canada", ["Bagotville", "Montreal", "Coronation", ...] + "Norway" : ["Vigra", "Andenes", "Alta", "Bomoen", "Bronnoy",..]
+ "Papua New Guinea", ["Goroka", "Madang", ...] + ... + + ''' + + + \ No newline at end of file diff --git a/pairRdd/groupbykey/AirportsByCountryProblem.scala b/pairRdd/groupbykey/AirportsByCountryProblem.scala deleted file mode 100644 index 5bfd7f3d..00000000 --- a/pairRdd/groupbykey/AirportsByCountryProblem.scala +++ /dev/null @@ -1,22 +0,0 @@ -package com.sparkTutorial.pairRdd.groupbykey - -object AirportsByCountryProblem { - - def main(args: Array[String]) { - - /* Create a Spark program to read the airport data from in/airports.text, - output the the list of the names of the airports located in each country. - - Each row of the input file contains the following columns: - Airport ID, Name of airport, Main city served by airport, Country where airport is located, IATA/FAA code, - ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format - - Sample output: - - "Canada", List("Bagotville", "Montreal", "Coronation", ...) - "Norway" : List("Vigra", "Andenes", "Alta", "Bomoen", "Bronnoy",..) - "Papua New Guinea", List("Goroka", "Madang", ...) - ... - */ - } -} diff --git a/pairRdd/groupbykey/AirportsByCountrySolution.py b/pairRdd/groupbykey/AirportsByCountrySolution.py new file mode 100644 index 00000000..0cc7017e --- /dev/null +++ b/pairRdd/groupbykey/AirportsByCountrySolution.py @@ -0,0 +1,18 @@ +from pyspark import SparkContext +from commons.Utils import Utils + +if __name__ == "__main__": + + sc = SparkContext("local", "airports") + sc.setLogLevel("ERROR") + + lines = sc.textFile("in/airports.text") + + countryAndAirportNameAndPair = lines.map(lambda airport:\ + (Utils.COMMA_DELIMITER.split(airport)[3], + Utils.COMMA_DELIMITER.split(airport)[1])) + + airportsByCountry = countryAndAirportNameAndPair.groupByKey() + + for country, airportName in airportsByCountry.collectAsMap().items(): + print("{}: {}".format(country,list(airportName))) diff --git a/pairRdd/groupbykey/AirportsByCountrySolution.scala b/pairRdd/groupbykey/AirportsByCountrySolution.scala deleted file mode 100644 index 09aa25fa..00000000 --- a/pairRdd/groupbykey/AirportsByCountrySolution.scala +++ /dev/null @@ -1,23 +0,0 @@ -package com.sparkTutorial.pairRdd.groupbykey - -import com.sparkTutorial.commons.Utils -import org.apache.log4j.{Level, Logger} -import org.apache.spark.{SparkConf, SparkContext} - -object AirportsByCountrySolution { - - def main(args: Array[String]) { - Logger.getLogger("org").setLevel(Level.ERROR) - val conf = new SparkConf().setAppName("airports").setMaster("local[*]") - val sc = new SparkContext(conf) - - val lines = sc.textFile("in/airports.text") - - val countryAndAirportNameAndPair = lines.map(airport => (airport.split(Utils.COMMA_DELIMITER)(3), - airport.split(Utils.COMMA_DELIMITER)(1))) - - val airportsByCountry = countryAndAirportNameAndPair.groupByKey() - - for ((country, airportName) <- airportsByCountry.collectAsMap()) println(country + ": " + airportName.toList) - } -} diff --git a/pairRdd/groupbykey/GroupByKeyVsReduceByKey.py b/pairRdd/groupbykey/GroupByKeyVsReduceByKey.py new file mode 100644 index 00000000..99eb96df --- /dev/null +++ b/pairRdd/groupbykey/GroupByKeyVsReduceByKey.py @@ -0,0 +1,18 @@ +from pyspark import SparkContext + +if __name__ == "__main__": + + sc = SparkContext("local", "GroupByKeyVsReduceByKey") + sc.setLogLevel("ERROR") + + words = ["one", "two", "two", "three", "three", "three"] + wordsPairRdd = sc.parallelize(words).map(lambda word: (word, 1)) + + wordCountsWithReduceByKey = wordsPairRdd.reduceByKey(lambda x, y: x + y).collect() + 
print("wordCountsWithReduceByKey: {}".format(list(wordCountsWithReduceByKey))) + + wordCountsWithGroupByKey = wordsPairRdd \ + .groupByKey() \ + .mapValues(lambda intIterable: len(intIterable)) \ + .collect() + print("wordCountsWithGroupByKey: {}".format(list(wordCountsWithGroupByKey))) diff --git a/pairRdd/groupbykey/GroupByKeyVsReduceByKey.scala b/pairRdd/groupbykey/GroupByKeyVsReduceByKey.scala deleted file mode 100644 index 8ae2f601..00000000 --- a/pairRdd/groupbykey/GroupByKeyVsReduceByKey.scala +++ /dev/null @@ -1,23 +0,0 @@ -package com.sparkTutorial.pairRdd.groupbykey - -import org.apache.log4j.{Level, Logger} -import org.apache.spark.{SparkConf, SparkContext} - -object GroupByKeyVsReduceByKey { - - def main(args: Array[String]) { - Logger.getLogger("org").setLevel(Level.ERROR) - val conf = new SparkConf().setAppName("GroupByKeyVsReduceByKey").setMaster("local[*]") - val sc = new SparkContext(conf) - - val words = List("one", "two", "two", "three", "three", "three") - val wordsPairRdd = sc.parallelize(words).map(word => (word, 1)) - - val wordCountsWithReduceByKey = wordsPairRdd.reduceByKey((x, y) => x + y).collect() - println("wordCountsWithReduceByKey: " + wordCountsWithReduceByKey.toList) - - val wordCountsWithGroupByKey = wordsPairRdd.groupByKey().mapValues(intIterable => intIterable.size).collect() - println("wordCountsWithGroupByKey: " + wordCountsWithGroupByKey.toList) - } -} - diff --git a/pairRdd/join/JoinOperations.py b/pairRdd/join/JoinOperations.py new file mode 100644 index 00000000..250f9007 --- /dev/null +++ b/pairRdd/join/JoinOperations.py @@ -0,0 +1,21 @@ +from pyspark import SparkContext + +if __name__ == "__main__": + + sc = SparkContext("local", "JoinOperations") + sc.setLogLevel("ERROR") + + ages = sc.parallelize([("Tom", 29), ("John", 22)]) + addresses = sc.parallelize([("James", "USA"), ("John", "UK")]) + + join = ages.join(addresses) + join.saveAsTextFile("out/age_address_join.text") + + leftOuterJoin = ages.leftOuterJoin(addresses) + leftOuterJoin.saveAsTextFile("out/age_address_left_out_join.text") + + rightOuterJoin = ages.rightOuterJoin(addresses) + rightOuterJoin.saveAsTextFile("out/age_address_right_out_join.text") + + fullOuterJoin = ages.fullOuterJoin(addresses) + fullOuterJoin.saveAsTextFile("out/age_address_full_out_join.text") diff --git a/pairRdd/join/JoinOperations.scala b/pairRdd/join/JoinOperations.scala deleted file mode 100644 index 549a5745..00000000 --- a/pairRdd/join/JoinOperations.scala +++ /dev/null @@ -1,27 +0,0 @@ -package com.sparkTutorial.pairRdd.join - -import org.apache.spark.{SparkConf, SparkContext} - -object JoinOperations { - - def main(args: Array[String]) { - - val conf = new SparkConf().setAppName("JoinOperations").setMaster("local[1]") - val sc = new SparkContext(conf) - - val ages = sc.parallelize(List(("Tom", 29),("John", 22))) - val addresses = sc.parallelize(List(("James", "USA"), ("John", "UK"))) - - val join = ages.join(addresses) - join.saveAsTextFile("out/age_address_join.text") - - val leftOuterJoin = ages.leftOuterJoin(addresses) - leftOuterJoin.saveAsTextFile("out/age_address_left_out_join.text") - - val rightOuterJoin = ages.rightOuterJoin(addresses) - rightOuterJoin.saveAsTextFile("out/age_address_right_out_join.text") - - val fullOuterJoin = ages.fullOuterJoin(addresses) - fullOuterJoin.saveAsTextFile("out/age_address_full_out_join.text") - } -} diff --git a/pairRdd/mapValues/AirportsUppercaseProblem.py b/pairRdd/mapValues/AirportsUppercaseProblem.py new file mode 100644 index 00000000..124b37f0 --- /dev/null 
+++ b/pairRdd/mapValues/AirportsUppercaseProblem.py @@ -0,0 +1,24 @@ +from pyspark import SparkContext + +if __name__ == "__main__": + + ''' + Create a Spark program to read the airport data from in/airports.text, generate a pair RDD with airport name + being the key and country name being the value. Then convert the country name to uppercase and + output the pair RDD to out/airports_uppercase.text + + Each row of the input file contains the following columns: + + Airport ID, Name of airport, Main city served by airport, Country where airport is located, IATA/FAA code, + ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format + + Sample output: + + ("Kamloops", "CANADA") + ("Wewak Intl", "PAPUA NEW GUINEA") + ... + + ''' + + + \ No newline at end of file diff --git a/pairRdd/mapValues/AirportsUppercaseProblem.scala b/pairRdd/mapValues/AirportsUppercaseProblem.scala deleted file mode 100644 index ac9dac2f..00000000 --- a/pairRdd/mapValues/AirportsUppercaseProblem.scala +++ /dev/null @@ -1,23 +0,0 @@ -package com.sparkTutorial.pairRdd.mapValues - -object AirportsUppercaseProblem { - - def main(args: Array[String]) { - - /* Create a Spark program to read the airport data from in/airports.text, generate a pair RDD with airport name - being the key and country name being the value. Then convert the country name to uppercase and - output the pair RDD to out/airports_uppercase.text - - Each row of the input file contains the following columns: - - Airport ID, Name of airport, Main city served by airport, Country where airport is located, IATA/FAA code, - ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format - - Sample output: - - ("Kamloops", "CANADA") - ("Wewak Intl", "PAPUA NEW GUINEA") - ... - */ - } -} diff --git a/pairRdd/mapValues/AirportsUppercaseSolution.py b/pairRdd/mapValues/AirportsUppercaseSolution.py new file mode 100644 index 00000000..3c9fa201 --- /dev/null +++ b/pairRdd/mapValues/AirportsUppercaseSolution.py @@ -0,0 +1,17 @@ +from pyspark import SparkContext +from commons.Utils import Utils + +if __name__ == "__main__": + + sc = SparkContext("local", "airports") + sc.setLogLevel("ERROR") + + airportsRDD = sc.textFile("in/airports.text") + + airportPairRDD = airportsRDD.map(lambda line: \ + (Utils.COMMA_DELIMITER.split(line)[1], \ + Utils.COMMA_DELIMITER.split(line)[3])) + + upperCase = airportPairRDD.mapValues(lambda countryName: countryName.upper()) + + upperCase.saveAsTextFile("out/airports_uppercase.text") diff --git a/pairRdd/mapValues/AirportsUppercaseSolution.scala b/pairRdd/mapValues/AirportsUppercaseSolution.scala deleted file mode 100644 index 093fb193..00000000 --- a/pairRdd/mapValues/AirportsUppercaseSolution.scala +++ /dev/null @@ -1,22 +0,0 @@ -package com.sparkTutorial.pairRdd.mapValues - -import com.sparkTutorial.commons.Utils -import org.apache.spark.{SparkConf, SparkContext} - -object AirportsUppercaseSolution { - - def main(args: Array[String]) { - - val conf = new SparkConf().setAppName("airports").setMaster("local") - val sc = new SparkContext(conf) - - val airportsRDD = sc.textFile("in/airports.text") - - val airportPairRDD = airportsRDD.map((line: String) => (line.split(Utils.COMMA_DELIMITER)(1), - line.split(Utils.COMMA_DELIMITER)(3))) - - val upperCase = airportPairRDD.mapValues(countryName => countryName.toUpperCase) - - upperCase.saveAsTextFile("out/airports_uppercase.text") - } -} diff --git a/pairRdd/sort/AverageHousePriceSolution.py b/pairRdd/sort/AverageHousePriceSolution.py new file mode 100644 
index 00000000..4306fdbc --- /dev/null +++ b/pairRdd/sort/AverageHousePriceSolution.py @@ -0,0 +1,23 @@ +from pairRdd.aggregation.reducebykey.housePrice.AvgCount import AvgCount +from pyspark import SparkContext + + +if __name__ == "__main__": + + sc = SparkContext("local", "averageHousePriceSolution") + sc.setLogLevel("ERROR") + + lines = sc.textFile("in/RealEstate.csv") + cleanedLines = lines.filter(lambda line: "Bedrooms" not in line) + housePricePairRdd = cleanedLines.map(lambda line: \ + ((int(float(line.split(",")[3]))), AvgCount(1, float(line.split(",")[2])))) + + housePriceTotal = housePricePairRdd.reduceByKey(lambda x, y: \ + AvgCount(x.count + y.count, x.total + y.total)) + + housePriceAvg = housePriceTotal.mapValues(lambda avgCount: avgCount.total / avgCount.count) + + sortedHousePriceAvg = housePriceAvg.sortByKey() + + for bedrooms, avgPrice in sortedHousePriceAvg.collect(): + print("{} : {}".format(bedrooms, avgPrice)) diff --git a/pairRdd/sort/AverageHousePriceSolution.scala b/pairRdd/sort/AverageHousePriceSolution.scala deleted file mode 100644 index 56eb591b..00000000 --- a/pairRdd/sort/AverageHousePriceSolution.scala +++ /dev/null @@ -1,28 +0,0 @@ -package com.sparkTutorial.pairRdd.sort - -import com.sparkTutorial.pairRdd.aggregation.reducebykey.housePrice.AvgCount -import org.apache.log4j.{Level, Logger} -import org.apache.spark.{SparkConf, SparkContext} - -object AverageHousePriceSolution { - - def main(args: Array[String]) { - Logger.getLogger("org").setLevel(Level.ERROR) - val conf = new SparkConf().setAppName("averageHousePriceSolution").setMaster("local[3]") - val sc = new SparkContext(conf) - - val lines = sc.textFile("in/RealEstate.csv") - val cleanedLines = lines.filter(line => !line.contains("Bedrooms")) - val housePricePairRdd = cleanedLines.map( - line => (line.split(",")(3).toInt, AvgCount(1, line.split(",")(2).toDouble))) - - val housePriceTotal = housePricePairRdd.reduceByKey((x, y) => AvgCount(x.count + y.count, x.total + y.total)) - - val housePriceAvg = housePriceTotal.mapValues(avgCount => avgCount.total / avgCount.count) - - val sortedHousePriceAvg = housePriceAvg.sortByKey() - - for ((bedrooms, avgPrice) <- sortedHousePriceAvg.collect()) println(bedrooms + " : " + avgPrice) - } - -} diff --git a/pairRdd/sort/SortedWordCountProblem.py b/pairRdd/sort/SortedWordCountProblem.py new file mode 100644 index 00000000..bc7817b4 --- /dev/null +++ b/pairRdd/sort/SortedWordCountProblem.py @@ -0,0 +1,16 @@ +from pyspark import SparkContext + +if __name__ == "__main__": + + ''' + Create a Spark program to read an article from in/word_count.text, + output the number of occurrences of each word in descending order. + + Sample output: + + apple : 200 + shoes : 193 + bag : 176 + ... + + ''' diff --git a/pairRdd/sort/SortedWordCountProblem.scala b/pairRdd/sort/SortedWordCountProblem.scala deleted file mode 100644 index 19cd5569..00000000 --- a/pairRdd/sort/SortedWordCountProblem.scala +++ /dev/null @@ -1,17 +0,0 @@ -package com.sparkTutorial.pairRdd.sort - - -object SortedWordCountProblem { - - /* Create a Spark program to read the an article from in/word_count.text, - output the number of occurrence of each word in descending order. - - Sample output: - - apple : 200 - shoes : 193 - bag : 176 - ...
- */ -} - diff --git a/pairRdd/sort/SortedWordCountSolution.py b/pairRdd/sort/SortedWordCountSolution.py new file mode 100644 index 00000000..398c57ae --- /dev/null +++ b/pairRdd/sort/SortedWordCountSolution.py @@ -0,0 +1,20 @@ +from pyspark import SparkContext + +if __name__ == "__main__": + + sc = SparkContext("local", "wordCounts") + sc.setLogLevel("ERROR") + lines = sc.textFile("in/word_count.text") + wordRdd = lines.flatMap(lambda line: line.split(" ")) + + wordPairRdd = wordRdd.map(lambda word: (word, 1)) + wordToCountPairs = wordPairRdd.reduceByKey(lambda x, y: x + y) + + countToWordPairs = wordToCountPairs.map(lambda wordToCount: (wordToCount[1], wordToCount[0])) + + sortedCountToWordPairs = countToWordPairs.sortByKey(ascending=False) + + sortedWordToCountPairs = sortedCountToWordPairs.map(lambda countToWord: (countToWord[1], countToWord[0])) + + for word, count in sortedWordToCountPairs.collect(): + print("{} : {}".format(word, count)) diff --git a/pairRdd/sort/SortedWordCountSolution.scala b/pairRdd/sort/SortedWordCountSolution.scala deleted file mode 100644 index 38e8abcb..00000000 --- a/pairRdd/sort/SortedWordCountSolution.scala +++ /dev/null @@ -1,28 +0,0 @@ -package com.sparkTutorial.pairRdd.sort - -import org.apache.log4j.{Level, Logger} -import org.apache.spark.{SparkConf, SparkContext} - -object SortedWordCountSolution { - - def main(args: Array[String]) { - - Logger.getLogger("org").setLevel(Level.ERROR) - val conf = new SparkConf().setAppName("wordCounts").setMaster("local[3]") - val sc = new SparkContext(conf) - - val lines = sc.textFile("in/word_count.text") - val wordRdd = lines.flatMap(line => line.split(" ")) - - val wordPairRdd = wordRdd.map(word => (word, 1)) - val wordToCountPairs = wordPairRdd.reduceByKey((x, y) => x + y) - - val countToWordParis = wordToCountPairs.map(wordToCount => (wordToCount._2, wordToCount._1)) - - val sortedCountToWordParis = countToWordParis.sortByKey(ascending = false) - - val sortedWordToCountPairs = sortedCountToWordParis.map(countToWord => (countToWord._2, countToWord._1)) - - for ((word, count) <- sortedWordToCountPairs.collect()) println(word + " : " + count) - } -}
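
Porting note: the combineByKey-based average in pairRdd/aggregation/combinebykey/AverageHousePriceSolution.py can be smoke-tested without in/RealEstate.csv by running the same three combiner functions over a small in-memory pair RDD. The sketch below is only an illustration under that assumption; the sample (bedrooms, price) data and the "combineByKeySmokeTest" app name are invented for the example and are not part of this change set.

from pyspark import SparkContext

if __name__ == "__main__":

    sc = SparkContext("local", "combineByKeySmokeTest")
    sc.setLogLevel("ERROR")

    # made-up (bedrooms, price) pairs standing in for the parsed RealEstate.csv rows
    samplePrices = sc.parallelize([("3", 300000.0), ("3", 350000.0), ("2", 200000.0)])

    createCombiner = lambda x: (1, x)                         # first price seen for a key -> (count, total)
    mergeValue = lambda acc, x: (acc[0] + 1, acc[1] + x)      # fold another price into (count, total)
    mergeCombiners = lambda a, b: (a[0] + b[0], a[1] + b[1])  # merge partial (count, total) pairs across partitions

    housePriceAvg = samplePrices \
        .combineByKey(createCombiner, mergeValue, mergeCombiners) \
        .mapValues(lambda countTotal: countTotal[1] / countTotal[0])

    # expected: 3 : 325000.0 and 2 : 200000.0 (key order may vary)
    for bedrooms, avgPrice in housePriceAvg.collect():
        print("{} : {}".format(bedrooms, avgPrice))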