Skip to content

Commit 4473c72

Browse files
author
James Lee
committed
improve accumulator and broadcast examples
1 parent 7d67d40 commit 4473c72

File tree

4 files changed

+23
-20
lines changed

4 files changed

+23
-20
lines changed

src/main/java/com/sparkTutorial/advanced/accumulator/StackOverFlowSurvey.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package com.sparkTutorial.advanced.accumulator;
22

3+
import com.sparkTutorial.rdd.commons.Utils;
4+
import org.apache.log4j.Level;
5+
import org.apache.log4j.Logger;
36
import org.apache.spark.SparkConf;
47
import org.apache.spark.SparkContext;
58
import org.apache.spark.api.java.JavaRDD;
@@ -10,11 +13,10 @@
1013
public class StackOverFlowSurvey {
1114

1215
public static void main(String[] args) throws Exception {
13-
16+
Logger.getLogger("org").setLevel(Level.ERROR);
1417
SparkConf conf = new SparkConf().setAppName("StackOverFlowSurvey").setMaster("local[1]");
1518

1619
SparkContext sparkContext = new SparkContext(conf);
17-
1820
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
1921

2022
final LongAccumulator total = new LongAccumulator();
@@ -26,11 +28,11 @@ public static void main(String[] args) throws Exception {
2628
JavaRDD<String> responseRDD = javaSparkContext.textFile("in/2016-stack-overflow-survey-responses.csv");
2729

2830
JavaRDD<String> responseFromCanada = responseRDD.filter(response -> {
29-
String[] splits = response.split(",", -1);
31+
String[] splits = response.split(Utils.COMMA_DELIMITER, -1);
3032

3133
total.add(1);
3234

33-
if (splits[14].equals("")) {
35+
if (splits[14].isEmpty()) {
3436
missingSalaryMidPoint.add(1);
3537
}
3638

src/main/java/com/sparkTutorial/advanced/accumulator/StackOverFlowSurveyFollowUp.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package com.sparkTutorial.advanced.accumulator;
22

3+
import com.sparkTutorial.rdd.commons.Utils;
4+
import org.apache.log4j.Level;
5+
import org.apache.log4j.Logger;
36
import org.apache.spark.SparkConf;
47
import org.apache.spark.SparkContext;
58
import org.apache.spark.api.java.JavaRDD;
@@ -10,11 +13,9 @@
1013
public class StackOverFlowSurveyFollowUp {
1114

1215
public static void main(String[] args) throws Exception {
13-
16+
Logger.getLogger("org").setLevel(Level.ERROR);
1417
SparkConf conf = new SparkConf().setAppName("StackOverFlowSurvey").setMaster("local[1]");
15-
1618
SparkContext sparkContext = new SparkContext(conf);
17-
1819
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
1920

2021
final LongAccumulator total = new LongAccumulator();
@@ -31,11 +32,11 @@ public static void main(String[] args) throws Exception {
3132

3233
processedBytes.add(response.getBytes().length);
3334

34-
String[] splits = response.split(",", -1);
35+
String[] splits = response.split(Utils.COMMA_DELIMITER, -1);
3536

3637
total.add(1);
3738

38-
if (splits[14].equals("")) {
39+
if (splits[14].isEmpty()) {
3940
missingSalaryMidPoint.add(1);
4041
}
4142

src/main/java/com/sparkTutorial/advanced/broadcast/UkMarketSpaces.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.sparkTutorial.advanced.broadcast;
22

3+
import com.sparkTutorial.rdd.commons.Utils;
34
import org.apache.log4j.Level;
45
import org.apache.log4j.Logger;
56
import org.apache.spark.SparkConf;
@@ -14,19 +15,16 @@
1415
public class UkMarketSpaces {
1516

1617
public static void main(String[] args) throws Exception {
17-
1818
Logger.getLogger("org").setLevel(Level.ERROR);
19-
2019
SparkConf conf = new SparkConf().setAppName("UkMarketSpaces").setMaster("local[1]");
21-
2220
JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
2321

2422
final Broadcast<Map<String, String>> postCodeMap = javaSparkContext.broadcast(loadPostCodeMap());
2523

2624
JavaRDD<String> marketsRdd = javaSparkContext.textFile("in/uk-market-spaces-identifiable-data.csv");
2725

2826
JavaRDD<String> regions = marketsRdd
29-
.filter(line -> !line.split(",", -1)[0].equals("Timestamp"))
27+
.filter(line -> !line.split(Utils.COMMA_DELIMITER, -1)[0].equals("Timestamp"))
3028
.map(line -> {
3129
Optional<String> postPrefix = getPostPrefix(line);
3230
if (postPrefix.isPresent() && postCodeMap.value().containsKey(postPrefix.get())) {
@@ -40,7 +38,7 @@ public static void main(String[] args) throws Exception {
4038
}
4139

4240
private static Optional<String> getPostPrefix(String line) {
43-
String[] splits = line.split(",", -1);
41+
String[] splits = line.split(Utils.COMMA_DELIMITER, -1);
4442
String postcode = splits[4];
4543
if (postcode.isEmpty()) {
4644
return Optional.empty();
@@ -53,7 +51,7 @@ private static Map<String, String> loadPostCodeMap() throws FileNotFoundExceptio
5351
Map<String, String> postCodeMap = new HashMap<>();
5452
while (postCode.hasNextLine()) {
5553
String line = postCode.nextLine();
56-
String[] splits = line.split(",", -1);
54+
String[] splits = line.split(Utils.COMMA_DELIMITER, -1);
5755
postCodeMap.put(splits[0], splits[7]);
5856
}
5957
return postCodeMap;

src/main/java/com/sparkTutorial/advanced/broadcast/UkMarketSpacesWithoutBroadcaset.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package com.sparkTutorial.advanced.broadcast;
22

3+
import com.sparkTutorial.rdd.commons.Utils;
4+
import org.apache.log4j.Level;
5+
import org.apache.log4j.Logger;
36
import org.apache.spark.SparkConf;
47
import org.apache.spark.api.java.JavaRDD;
58
import org.apache.spark.api.java.JavaSparkContext;
@@ -11,17 +14,16 @@
1114
public class UkMarketSpacesWithoutBroadcaset {
1215

1316
public static void main(String[] args) throws Exception {
14-
17+
Logger.getLogger("org").setLevel(Level.ERROR);
1518
SparkConf conf = new SparkConf().setAppName("UkMarketSpaces").setMaster("local[1]");
16-
1719
JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
1820

1921
final Map<String, String> postCodeMap = loadPostCodeMap();
2022

2123
JavaRDD<String> marketsRdd = javaSparkContext.textFile("in/uk-market-spaces-identifiable-data.csv");
2224

2325
JavaRDD<String> regions = marketsRdd
24-
.filter(line -> !line.split(",", -1)[0].equals("Timestamp"))
26+
.filter(line -> !line.split(Utils.COMMA_DELIMITER, -1)[0].equals("Timestamp"))
2527
.map(line -> {
2628
List<String> postCodePrefixes = getPostPrefixes(line);
2729
for (String postCodePrefix: postCodePrefixes) {
@@ -37,7 +39,7 @@ public static void main(String[] args) throws Exception {
3739
}
3840

3941
private static List<String> getPostPrefixes(String line) {
40-
String[] splits = line.split(",", -1);
42+
String[] splits = line.split(Utils.COMMA_DELIMITER, -1);
4143
String postcode = splits[4];
4244
String cleanedPostCode = postcode.replaceAll("\\s+", "");
4345
ArrayList<String> prefixes = new ArrayList<>();
@@ -52,7 +54,7 @@ private static Map<String, String> loadPostCodeMap() throws FileNotFoundExceptio
5254
Map<String, String> postCodeMap = new HashMap<>();
5355
while (postCode.hasNextLine()) {
5456
String line = postCode.nextLine();
55-
String[] splits = line.split(",", -1);
57+
String[] splits = line.split(Utils.COMMA_DELIMITER, -1);
5658
postCodeMap.put(splits[0], splits[7]);
5759
}
5860
return postCodeMap;

0 commit comments

Comments
 (0)