
Commit bd2d382

Author: James Lee
Commit message: improve sort examples
1 parent: d119b95

3 files changed, +14 -34 lines

src/main/java/com/sparkTutorial/pairRdd/sort/sortbykey/AverageHousePriceSolution.java renamed to src/main/java/com/sparkTutorial/pairRdd/sort/AverageHousePriceSolution.java
Lines changed: 7 additions & 19 deletions

@@ -1,4 +1,4 @@
-package com.sparkTutorial.pairRdd.sort.sortbykey;
+package com.sparkTutorial.pairRdd.sort;
 
 
 import com.sparkTutorial.pairRdd.aggregation.reducebykey.housePrice.AvgCount;
@@ -8,38 +8,26 @@
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import scala.Tuple2;
 
-import java.util.Map;
-
 public class AverageHousePriceSolution {
 
     public static void main(String[] args) throws Exception {
-
         Logger.getLogger("org").setLevel(Level.ERROR);
         SparkConf conf = new SparkConf().setAppName("wordCounts").setMaster("local[3]");
         JavaSparkContext sc = new JavaSparkContext(conf);
 
         JavaRDD<String> lines = sc.textFile("in/RealEstate.csv");
-
         JavaRDD<String> cleanedLines = lines.filter(line -> !line.contains("Bedrooms"));
 
-        JavaPairRDD<String, AvgCount> housePricePairRdd = cleanedLines.mapToPair(
-                (PairFunction<String, String, AvgCount>) line ->
-                        new Tuple2<>(line.split(",")[3],
-                                new AvgCount(1, Double.parseDouble(line.split(",")[2]))));
-
-        JavaPairRDD<String, AvgCount> housePriceTotal = housePricePairRdd.reduceByKey(
-                (Function2<AvgCount, AvgCount, AvgCount>) (x, y) ->
-                        new AvgCount(x.getCount() + y.getCount(), x.getTotal() + y.getTotal()));
-
+        JavaPairRDD<Integer, AvgCount> housePricePairRdd = cleanedLines.mapToPair(
+                line -> new Tuple2<>(Integer.valueOf(line.split(",")[3]),
+                        new AvgCount(1, Double.parseDouble(line.split(",")[2]))));
 
-        JavaPairRDD<Integer, Double> housePriceAvg = housePriceTotal.mapToPair(
-                (PairFunction<Tuple2<String, AvgCount>, Integer, Double>) total ->
-                        new Tuple2<>(Integer.valueOf(total._1()), total._2().getTotal()/total._2().getCount()));
+        JavaPairRDD<Integer, AvgCount> housePriceTotal = housePricePairRdd.reduceByKey(
+                (x, y) -> new AvgCount(x.getCount() + y.getCount(), x.getTotal() + y.getTotal()));
 
+        JavaPairRDD<Integer, Double> housePriceAvg = housePriceTotal.mapValues(avgCount -> avgCount.getTotal()/avgCount.getCount());
 
         JavaPairRDD<Integer, Double> sortedHousePriceAvg = housePriceAvg.sortByKey();
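The substantive change in AverageHousePriceSolution is that the pair RDD is now keyed by Integer (the bedroom count from column 3) rather than String, so sortByKey orders the results numerically; the explicit PairFunction and Function2 casts become plain lambdas; and the average is derived with mapValues, which rewrites only the value side of each pair. A minimal, self-contained sketch of that reduceByKey + mapValues + sortByKey pattern on a few in-memory pairs (the class name and sample figures below are illustrative, not part of the tutorial sources) could look like this:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class MapValuesSortByKeySketch {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("mapValuesSortByKeySketch").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // (bedrooms, price) pairs standing in for the parsed rows of RealEstate.csv.
        JavaPairRDD<Integer, Double> prices = sc.parallelizePairs(Arrays.asList(
                new Tuple2<Integer, Double>(3, 239000.0),
                new Tuple2<Integer, Double>(2, 139000.0),
                new Tuple2<Integer, Double>(3, 365000.0),
                new Tuple2<Integer, Double>(2, 149000.0)));

        // Turn each price into a (count, total) accumulator, keeping the Integer key.
        JavaPairRDD<Integer, Tuple2<Integer, Double>> countAndTotal = prices
                .mapValues(price -> new Tuple2<Integer, Double>(1, price));

        // Sum counts and totals per key.
        JavaPairRDD<Integer, Tuple2<Integer, Double>> totals = countAndTotal
                .reduceByKey((x, y) -> new Tuple2<>(x._1() + y._1(), x._2() + y._2()));

        // Derive the average; only the value changes, the key stays as-is.
        JavaPairRDD<Integer, Double> averages = totals.mapValues(t -> t._2() / t._1());

        // Integer keys sort numerically, so 2 bedrooms prints before 3.
        for (Tuple2<Integer, Double> avg : averages.sortByKey().collect()) {
            System.out.println(avg._1() + " : " + avg._2());
        }

        sc.close();
    }
}

Using mapValues instead of mapToPair also signals that the keys are unchanged, so Spark can keep the existing partitioning of the pair RDD.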

src/main/java/com/sparkTutorial/pairRdd/sort/sortbykey/SortedWorldCountProblem.java renamed to src/main/java/com/sparkTutorial/pairRdd/sort/SortedWorldCountProblem.java
Lines changed: 1 addition & 1 deletion

@@ -1,4 +1,4 @@
-package com.sparkTutorial.pairRdd.sort.sortbykey;
+package com.sparkTutorial.pairRdd.sort;
 
 
 public class SortedWorldCountProblem {
Third changed file:

@@ -1,4 +1,4 @@
-package com.sparkTutorial.pairRdd.sort.sortbykey;
+package com.sparkTutorial.pairRdd.sort;
 
 
 import org.apache.log4j.Level;
@@ -7,8 +7,6 @@
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import scala.Tuple2;
 
 import java.util.Arrays;
@@ -24,24 +22,18 @@ public static void main(String[] args) throws Exception {
         JavaRDD<String> lines = sc.textFile("in/word_count.text");
         JavaRDD<String> wordRdd = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
 
-        JavaPairRDD<String, Integer> wordPairRdd = wordRdd.mapToPair(
-                (PairFunction<String, String, Integer>) word -> new Tuple2<>(word, 1));
-
-        JavaPairRDD<String, Integer> wordToCountPairs = wordPairRdd.reduceByKey((Function2<Integer, Integer, Integer>) (x, y) -> x + y);
-
-        JavaPairRDD<Integer, String> countToWordParis = wordToCountPairs.mapToPair(
-                (PairFunction<Tuple2<String, Integer>, Integer, String>) wordToCount -> new Tuple2<>(wordToCount._2(),
-                        wordToCount._1()));
+        JavaPairRDD<String, Integer> wordPairRdd = wordRdd.mapToPair(word -> new Tuple2<>(word, 1));
+        JavaPairRDD<String, Integer> wordToCountPairs = wordPairRdd.reduceByKey((x, y) -> x + y);
 
+        JavaPairRDD<Integer, String> countToWordParis = wordToCountPairs.mapToPair(wordToCount -> new Tuple2<>(wordToCount._2(),
+                wordToCount._1()));
         JavaPairRDD<Integer, String> sortedCountToWordParis = countToWordParis.sortByKey(false);
 
         JavaPairRDD<String, Integer> sortedWordToCountPairs = sortedCountToWordParis
-                .mapToPair((PairFunction<Tuple2<Integer, String>, String, Integer>) countToWord -> new Tuple2<>(countToWord._2(),
-                        countToWord._1()));
+                .mapToPair(countToWord -> new Tuple2<>(countToWord._2(), countToWord._1()));
 
         for (Tuple2<String, Integer> wordToCount : sortedWordToCountPairs.collect()) {
             System.out.println(wordToCount._1() + " : " + wordToCount._2());
-
         }
     }
 }
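The word-count solution gets the same cleanup: lambdas replace the cast-to-PairFunction and Function2 boilerplate, while the sort keeps the established pattern of flipping each (word, count) pair to (count, word), calling sortByKey(false) for descending order, and flipping back, since a pair RDD can only be sorted on its key. A small standalone sketch of that pattern (the class name and sample words below are illustrative, not part of the tutorial sources) might be:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class SortByCountSketch {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("sortByCountSketch").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // A tiny in-memory word list standing in for in/word_count.text.
        JavaRDD<String> words = sc.parallelize(
                Arrays.asList("spark", "java", "spark", "rdd", "spark", "java"));

        // Classic word count: (word, 1) pairs reduced by key.
        JavaPairRDD<String, Integer> counts = words
                .mapToPair(word -> new Tuple2<>(word, 1));
        JavaPairRDD<String, Integer> wordToCount = counts.reduceByKey((x, y) -> x + y);

        // sortByKey only orders on the key, so flip to (count, word) first.
        JavaPairRDD<Integer, String> countToWord = wordToCount
                .mapToPair(pair -> new Tuple2<>(pair._2(), pair._1()));

        // false = descending order on the Integer count.
        JavaPairRDD<Integer, String> sortedCountToWord = countToWord.sortByKey(false);

        // Flip back to (word, count) for printing.
        JavaPairRDD<String, Integer> sortedByCount = sortedCountToWord
                .mapToPair(pair -> new Tuple2<>(pair._2(), pair._1()));

        for (Tuple2<String, Integer> pair : sortedByCount.collect()) {
            System.out.println(pair._1() + " : " + pair._2());
        }

        sc.close();
    }
}

collect() pulls the whole result back to the driver, which is fine for a word list of this size; larger jobs would typically take(n) or write the sorted RDD out instead.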
