Skip to content

Commit d119b95

Browse files
author
James Lee
committed
improve AverageHousePriceSolution
1 parent fd3b64d commit d119b95

File tree

1 file changed

+9
-17
lines changed

1 file changed

+9
-17
lines changed

src/main/java/com/sparkTutorial/pairRdd/aggregation/combinebykey/AverageHousePriceSolution.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import org.apache.spark.api.java.JavaSparkContext;
1010
import org.apache.spark.api.java.function.Function;
1111
import org.apache.spark.api.java.function.Function2;
12-
import org.apache.spark.api.java.function.PairFunction;
1312
import scala.Tuple2;
1413

1514
import java.util.Map;
@@ -23,33 +22,26 @@ public static void main(String[] args) throws Exception {
2322
JavaSparkContext sc = new JavaSparkContext(conf);
2423

2524
JavaRDD<String> lines = sc.textFile("in/RealEstate.csv");
26-
2725
JavaRDD<String> cleanedLines = lines.filter(line -> !line.contains("Bedrooms"));
2826

2927
JavaPairRDD<String, Double> housePricePairRdd = cleanedLines.mapToPair(
30-
(PairFunction<String, String, Double>) line ->
31-
new Tuple2<>(line.split(",")[3],
32-
Double.parseDouble(line.split(",")[2])));
28+
line -> new Tuple2<>(line.split(",")[3],
29+
Double.parseDouble(line.split(",")[2])));
3330

3431
JavaPairRDD<String, AvgCount> housePriceTotal= housePricePairRdd.combineByKey(createCombiner, mergeValue, mergeCombiners);
3532

36-
JavaPairRDD<String, Double> housePriceAvg = housePriceTotal.mapToPair(
37-
(PairFunction<Tuple2<String, AvgCount>, String, Double>) total ->
38-
new Tuple2<>(total._1(), total._2().getTotal()/total._2().getCount()));
33+
JavaPairRDD<String, Double> housePriceAvg = housePriceTotal.mapValues(avgCount -> avgCount.getTotal()/avgCount.getCount());
3934

4035
for (Map.Entry<String, Double> housePriceAvgPair : housePriceAvg.collectAsMap().entrySet()) {
4136
System.out.println(housePriceAvgPair.getKey() + " : " + housePriceAvgPair.getValue());
42-
4337
}
4438
}
4539

46-
static Function<Double, AvgCount> createCombiner = (Function<Double, AvgCount>) x -> new AvgCount(1, x);
47-
48-
static Function2<AvgCount, Double, AvgCount> mergeValue
49-
= (Function2<AvgCount, Double, AvgCount>) (avgCount, x) -> new AvgCount(avgCount.getCount() + 1,
50-
avgCount.getTotal() + x);
51-
static Function2<AvgCount, AvgCount, AvgCount> mergeCombiners
52-
= (Function2<AvgCount, AvgCount, AvgCount>) (avgCountA, avgCountB) -> new AvgCount(avgCountA.getCount() + avgCountB.getCount(),
53-
avgCountA.getTotal() + avgCountB.getTotal());
40+
static Function<Double, AvgCount> createCombiner = x -> new AvgCount(1, x);
5441

42+
static Function2<AvgCount, Double, AvgCount> mergeValue = (avgCount, x) -> new AvgCount(avgCount.getCount() + 1,
43+
avgCount.getTotal() + x);
44+
static Function2<AvgCount, AvgCount, AvgCount> mergeCombiners =
45+
(avgCountA, avgCountB) -> new AvgCount(avgCountA.getCount() + avgCountB.getCount(),
46+
avgCountA.getTotal() + avgCountB.getTotal());
5547
}

0 commit comments

Comments
 (0)