Commit e51b646

James Lee committed:
improve AirportsByCountrySolution and GroupByKeyVsReduceByKey

1 parent: 858fca1

2 files changed, +14 -25 lines
AirportsByCountrySolution.java (5 additions, 12 deletions)

@@ -1,17 +1,15 @@
 package com.sparkTutorial.pairRdd.groupbykey;
 
-import com.google.common.collect.Iterables;
 import com.sparkTutorial.rdd.commons.Utils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
 import scala.Tuple2;
 
-import java.util.Arrays;
+import java.util.Map;
 
 public class AirportsByCountrySolution {
 
@@ -23,18 +21,13 @@ public static void main(String[] args) throws Exception {
         JavaRDD<String> lines = sc.textFile("in/airports.text");
 
         JavaPairRDD<String, String> CountryAndAirportNameAndPair =
-                lines.mapToPair((PairFunction<String, String, String>) airport ->
-                        new Tuple2<>(airport.split(Utils.COMMA_DELIMITER)[3],
-                                airport.split(Utils.COMMA_DELIMITER)[1]));
+                lines.mapToPair( airport -> new Tuple2<>(airport.split(Utils.COMMA_DELIMITER)[3],
+                        airport.split(Utils.COMMA_DELIMITER)[1]));
 
         JavaPairRDD<String, Iterable<String>> AirportsByCountry = CountryAndAirportNameAndPair.groupByKey();
 
-        for (Tuple2<String, Iterable<String>> airports : AirportsByCountry.collect()) {
-            System.out.println(airports._1() + " : " + iterableToString(airports._2()));
+        for (Map.Entry<String, Iterable<String>> airports : AirportsByCountry.collectAsMap().entrySet()) {
+            System.out.println(airports.getKey() + " : " + airports.getValue());
         }
     }
-
-    private static String iterableToString(Iterable<String> iterable) {
-        return Arrays.toString(Iterables.toArray(iterable, String.class));
-    }
 }
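The new version of AirportsByCountrySolution drops the explicit PairFunction cast in favor of a plain lambda and swaps the collect()-plus-iterableToString loop for collectAsMap(). Since groupByKey emits exactly one pair per distinct key, collectAsMap() cannot silently drop entries here, and in practice the grouped Iterable already prints in readable [a, b] form, which is what made the Guava-based iterableToString helper redundant. Below is a minimal standalone sketch of that pattern, assuming a local Spark setup; the class name and sample pairs are illustrative, not from the commit.

import java.util.Arrays;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class GroupByKeyCollectAsMapSketch {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("GroupByKeyCollectAsMapSketch").setMaster("local[*]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Hypothetical sample pairs standing in for the airports file.
            JavaPairRDD<String, String> countryAndAirport = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<>("Canada", "Bagotville"),
                    new Tuple2<>("Canada", "Halifax"),
                    new Tuple2<>("Iceland", "Akureyri")));

            // groupByKey produces one (key, Iterable<value>) pair per distinct
            // key, so collectAsMap() keeps every group; no values are lost.
            Map<String, Iterable<String>> airportsByCountry =
                    countryAndAirport.groupByKey().collectAsMap();

            for (Map.Entry<String, Iterable<String>> entry : airportsByCountry.entrySet()) {
                System.out.println(entry.getKey() + " : " + entry.getValue());
            }
        }
    }
}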
GroupByKeyVsReduceByKey.java (9 additions, 13 deletions)

@@ -1,5 +1,8 @@
 package com.sparkTutorial.pairRdd.groupbykey;
 
+import com.google.common.collect.Iterables;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -11,26 +14,19 @@
 public class GroupByKeyVsReduceByKey {
 
     public static void main(String[] args) throws Exception {
-
+        Logger.getLogger("org").setLevel(Level.ERROR);
         SparkConf conf = new SparkConf().setAppName("GroupByKeyVsReduceByKey").setMaster("local[*]");
         JavaSparkContext sc = new JavaSparkContext(conf);
 
         List<String> words = Arrays.asList("one", "two", "two", "three", "three", "three");
-
         JavaPairRDD<String, Integer> wordsPairRdd = sc.parallelize(words).mapToPair(word -> new Tuple2<>(word, 1));
 
-        List<Tuple2<String, Integer>> wordCountsWithReduce = wordsPairRdd.reduceByKey((x, y) -> x + y).collect();
-
-        List<Tuple2<String, Integer>> wordCountsWithGroup = wordsPairRdd.groupByKey()
-                .mapToPair(word -> new Tuple2<>(word._1(), getSum(word._2()))).collect();
-    }
+        List<Tuple2<String, Integer>> wordCountsWithReduceByKey = wordsPairRdd.reduceByKey((x, y) -> x + y).collect();
+        System.out.println("wordCountsWithReduceByKey: " + wordCountsWithReduceByKey);
 
-    private static int getSum(Iterable<Integer> integers) {
-        int sum = 0;
-        for (Integer integer : integers) {
-            sum = + integer;
-        }
-        return sum;
+        List<Tuple2<String, Integer>> wordCountsWithGroupByKey = wordsPairRdd.groupByKey()
+                .mapValues(intIterable -> Iterables.size(intIterable)).collect();
+        System.out.println("wordCountsWithGroupByKey: " + wordCountsWithGroupByKey);
     }
 }
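GroupByKeyVsReduceByKey now silences log noise, prints both results, and counts the grouped values with Guava's Iterables.size instead of the removed getSum helper. That also retires a latent bug: getSum wrote `sum = + integer`, overwriting the accumulator on each iteration instead of adding to it. The contrast the class is named for is that reduceByKey combines values within each partition before the shuffle, while groupByKey ships every raw pair across the network and only then lets the caller fold them. A minimal sketch of that contrast follows, assuming a local Spark setup; the class name and the in-line fold are illustrative, not from the commit.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCountShuffleSketch {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("WordCountShuffleSketch").setMaster("local[*]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            List<String> words = Arrays.asList("one", "two", "two", "three", "three", "three");
            JavaPairRDD<String, Integer> pairs =
                    sc.parallelize(words).mapToPair(word -> new Tuple2<>(word, 1));

            // reduceByKey merges values inside each partition first (map-side
            // combine), so only one partial count per key crosses the shuffle.
            List<Tuple2<String, Integer>> withReduce =
                    pairs.reduceByKey(Integer::sum).collect();

            // groupByKey moves every (word, 1) pair across the shuffle and only
            // afterwards folds the Iterable down to a count.
            List<Tuple2<String, Integer>> withGroup = pairs.groupByKey()
                    .mapValues(ones -> {
                        int count = 0;
                        for (int one : ones) count += one;
                        return count;
                    }).collect();

            System.out.println("reduceByKey: " + withReduce);
            System.out.println("groupByKey:  " + withGroup);
        }
    }
}

Both variants produce the same counts, up to ordering; the difference is shuffle volume, which is why reduceByKey is the usual choice for aggregations.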