Skip to content

Commit 6a7508d

Browse files
author
James Lee
committed
improve AverageHousePriceSolution
1 parent 9eaec29 commit 6a7508d

File tree

1 file changed

+4
-15
lines changed

1 file changed

+4
-15
lines changed

src/main/java/com/sparkTutorial/pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceSolution.java

Lines changed: 4 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -7,48 +7,37 @@
77
import org.apache.spark.api.java.JavaPairRDD;
88
import org.apache.spark.api.java.JavaRDD;
99
import org.apache.spark.api.java.JavaSparkContext;
10-
import org.apache.spark.api.java.function.Function2;
11-
import org.apache.spark.api.java.function.PairFunction;
1210
import scala.Tuple2;
1311

1412
import java.util.Map;
1513

1614
public class AverageHousePriceSolution {
1715

1816
public static void main(String[] args) throws Exception {
19-
2017
Logger.getLogger("org").setLevel(Level.ERROR);
2118
SparkConf conf = new SparkConf().setAppName("wordCounts").setMaster("local[3]");
2219
JavaSparkContext sc = new JavaSparkContext(conf);
2320

2421
JavaRDD<String> lines = sc.textFile("in/RealEstate.csv");
25-
2622
JavaRDD<String> cleanedLines = lines.filter(line -> !line.contains("Bedrooms"));
2723

2824
JavaPairRDD<String, AvgCount> housePricePairRdd = cleanedLines.mapToPair(
29-
(PairFunction<String, String, AvgCount>) line ->
30-
new Tuple2<>(line.split(",")[3],
31-
new AvgCount(1, Double.parseDouble(line.split(",")[2]))));
25+
line -> new Tuple2<>(line.split(",")[3],
26+
new AvgCount(1, Double.parseDouble(line.split(",")[2]))));
3227

3328
JavaPairRDD<String, AvgCount> housePriceTotal = housePricePairRdd.reduceByKey(
34-
(Function2<AvgCount, AvgCount, AvgCount>) (x, y) ->
35-
new AvgCount(x.getCount() + y.getCount(), x.getTotal() + y.getTotal()));
29+
(x, y) -> new AvgCount(x.getCount() + y.getCount(), x.getTotal() + y.getTotal()));
3630

3731
System.out.println("housePriceTotal: ");
3832
for (Map.Entry<String, AvgCount> housePriceTotalPair : housePriceTotal.collectAsMap().entrySet()) {
3933
System.out.println(housePriceTotalPair.getKey() + " : " + housePriceTotalPair.getValue());
40-
4134
}
4235

43-
JavaPairRDD<String, Double> housePriceAvg = housePriceTotal.mapToPair(
44-
(PairFunction<Tuple2<String, AvgCount>, String, Double>) total ->
45-
new Tuple2<>(total._1(), total._2().getTotal()/total._2().getCount()));
46-
36+
JavaPairRDD<String, Double> housePriceAvg = housePriceTotal.mapValues(avgCount -> avgCount.getTotal()/avgCount.getCount());
4737
System.out.println("housePriceAvg: ");
4838
for (Map.Entry<String, Double> housePriceAvgPair : housePriceAvg.collectAsMap().entrySet()) {
4939
System.out.println(housePriceAvgPair.getKey() + " : " + housePriceAvgPair.getValue());
5040

5141
}
5242
}
53-
5443
}

0 commit comments

Comments
 (0)