Commit 76f5cce

Author: Pedro Bernardo

Changed scripts to use the conf parameter on the SparkContext constructor & Removed setLogLevel
1 parent a6dc078 commit 76f5cce

File tree: 14 files changed, +77 −54 lines changed
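
The commit applies the same pattern to all 14 scripts: build a SparkConf first, then pass it to the SparkContext constructor. A minimal before/after sketch of the change (the app name "example" is illustrative; each script below picks its own name and master):

    # Before: master and app name as positional constructor arguments
    from pyspark import SparkContext

    sc = SparkContext("local", "example")
    sc.setLogLevel("ERROR")

    # After: configuration gathered in a SparkConf object.
    # "local[*]" runs one worker thread per core; "local[3]" would use three.
    from pyspark import SparkContext, SparkConf

    conf = SparkConf().setAppName("example").setMaster("local[*]")
    sc = SparkContext(conf = conf)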

pairRdd/aggregation/reducebykey/WordCount.py

Lines changed: 3 additions & 4 deletions

@@ -1,9 +1,8 @@
-from pyspark import SparkContext
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "wordCounts")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("wordCounts").setMaster("local[3]")
+    sc = SparkContext(conf = conf)
 
     lines = sc.textFile("in/word_count.text")
     wordRdd = lines.flatMap(lambda line: line.split(" "))

pairRdd/create/PairRddFromRegularRdd.py

Lines changed: 3 additions & 4 deletions

@@ -1,9 +1,8 @@
-from pyspark import SparkContext
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "create")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("create").setMaster("local")
+    sc = SparkContext(conf = conf)
 
     inputStrings = ["Lily 23", "Jack 29", "Mary 29", "James 8"]
     regularRDDs = sc.parallelize(inputStrings)

pairRdd/create/PairRddFromTupleList.py

Lines changed: 3 additions & 4 deletions

@@ -1,9 +1,8 @@
-from pyspark import SparkContext
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "create")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("create").setMaster("local")
+    sc = SparkContext(conf = conf)
 
     tuples = [("Lily", 23), ("Jack", 29), ("Mary", 29), ("James", 8)]
     pairRDD = sc.parallelize(tuples)
Lines changed: 9 additions & 6 deletions

@@ -1,18 +1,21 @@
-from pyspark import SparkContext
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "GroupByKeyVsReduceByKey")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName('GroupByKeyVsReduceByKey').setMaster("local[*]")
+    sc = SparkContext(conf = conf)
 
     words = ["one", "two", "two", "three", "three", "three"]
    wordsPairRdd = sc.parallelize(words).map(lambda word: (word, 1))
 
-    wordCountsWithReduceByKey = wordsPairRdd.reduceByKey(lambda x, y: x + y).collect()
+    wordCountsWithReduceByKey = wordsPairRdd \
+        .reduceByKey(lambda x, y: x + y) \
+        .collect()
     print("wordCountsWithReduceByKey: {}".format(list(wordCountsWithReduceByKey)))
 
     wordCountsWithGroupByKey = wordsPairRdd \
         .groupByKey() \
-        .mapValues(lambda intIterable: len(intIterable)) \
+        .mapValues(len) \
         .collect()
     print("wordCountsWithGroupByKey: {}".format(list(wordCountsWithGroupByKey)))
+
+

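A note on the diff above: reduceByKey and groupByKey produce the same counts here, but reduceByKey combines values on each partition before the shuffle, while groupByKey moves every (word, 1) pair through the shuffle first. A quick equivalence check, assuming an sc like the one created in the script:

    pairs = sc.parallelize(["one", "two", "two"]).map(lambda w: (w, 1))
    assert sorted(pairs.reduceByKey(lambda x, y: x + y).collect()) == \
           sorted(pairs.groupByKey().mapValues(len).collect())

The switch from .mapValues(lambda intIterable: len(intIterable)) to .mapValues(len) is behavior-preserving: len is already a one-argument callable.
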
pairRdd/join/JoinOperations.py

Lines changed: 3 additions & 4 deletions

@@ -1,9 +1,8 @@
-from pyspark import SparkContext
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "JoinOperations")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("JoinOperations").setMaster("local[1]")
+    sc = SparkContext(conf = conf)
 
     ages = sc.parallelize([("Tom", 29), ("John", 22)])
     addresses = sc.parallelize([("James", "USA"), ("John", "UK")])

rdd/WordCount.py

Lines changed: 9 additions & 5 deletions

@@ -1,11 +1,15 @@
-import sys
-from pyspark import SparkContext
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-    sc = SparkContext("local", "word count")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("word count").setMaster("local[3]")
+    sc = SparkContext(conf = conf)
+
     lines = sc.textFile("in/word_count.text")
+
     words = lines.flatMap(lambda line: line.split(" "))
+
     wordCounts = words.countByValue()
+
     for word, count in wordCounts.items():
-        print(word, count)
+        print("{} : {}".format(word, count))
+
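
Worth noting on this script: countByValue() is an action that returns the counts to the driver as a Python dict-like mapping, so the items() loop at the end is ordinary local Python rather than a Spark job. A minimal sketch of the same idea (illustrative data, same sc as the script):

    counts = sc.parallelize(["a", "b", "a"]).countByValue()
    print(counts["a"])  # 2 -- a plain local lookup, no further Spark work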

rdd/collect/CollectExample.py

Lines changed: 9 additions & 4 deletions

@@ -1,10 +1,15 @@
-from pyspark import SparkContext
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-    sc = SparkContext("local", "collect")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("collect").setMaster("local[*]")
+    sc = SparkContext(conf = conf)
+
     inputWords = ["spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop"]
+
     wordRdd = sc.parallelize(inputWords)
+
     words = wordRdd.collect()
+
     for word in words:
-        print(word)
+        print(word)
+

rdd/count/CountExample.py

Lines changed: 4 additions & 3 deletions

@@ -1,12 +1,13 @@
-from pyspark import SparkContext
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-    sc = SparkContext("local", "count")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("count").setMaster("local[*]")
+    sc = SparkContext(conf = conf)
     inputWords = ["spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop"]
     wordRdd = sc.parallelize(inputWords)
     print("Count: {}".format(wordRdd.count()))
     worldCountByValue = wordRdd.countByValue()
     print("CountByValue: ")
     for word, count in worldCountByValue.items():
         print("{} : {}".format(word, count))
+

rdd/nasaApacheWebLogs/SameHostsSolution.py

Lines changed: 3 additions & 2 deletions

@@ -1,7 +1,8 @@
-from pyspark import SparkContext
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-    sc = SparkContext("local", "sameHosts")
+    conf = SparkConf().setAppName("sameHosts").setMaster("local[1]")
+    sc = SparkContext(conf = conf)
 
     julyFirstLogs = sc.textFile("in/nasa_19950701.tsv")
     augustFirstLogs = sc.textFile("in/nasa_19950801.tsv")
Lines changed: 5 additions & 3 deletions

@@ -1,10 +1,11 @@
-from pyspark import SparkContext
+from pyspark import SparkContext, SparkConf
 
 def isNotHeader(line: str):
     return not (line.startswith("host") and "bytes" in line)
 
 if __name__ == "__main__":
-    sc = SparkContext("local", "unionLogs")
+    conf = SparkConf().setAppName("unionLogs").setMaster("local[*]")
+    sc = SparkContext(conf = conf)
 
     julyFirstLogs = sc.textFile("in/nasa_19950701.tsv")
     augustFirstLogs = sc.textFile("in/nasa_19950801.tsv")
@@ -14,4 +15,5 @@ def isNotHeader(line: str):
     cleanLogLines = aggregatedLogLines.filter(isNotHeader)
     sample = cleanLogLines.sample(withReplacement = True, fraction = 0.1)
 
-    sample.saveAsTextFile("out/sample_nasa_logs.csv")
+    sample.saveAsTextFile("out/sample_nasa_logs.csv")
+

rdd/persist/PersistExample.py

Lines changed: 7 additions & 2 deletions

@@ -1,9 +1,14 @@
-from pyspark import SparkContext, StorageLevel
+from pyspark import SparkContext, SparkConf, StorageLevel
 
 if __name__ == "__main__":
-    sc = SparkContext("local", "persist")
+    conf = SparkConf().setAppName("persist").setMaster("local[*]")
+    sc = SparkContext(conf = conf)
+
     inputIntegers = [1, 2, 3, 4, 5]
     integerRdd = sc.parallelize(inputIntegers)
+
     integerRdd.persist(StorageLevel.MEMORY_ONLY)
+
     integerRdd.reduce(lambda x, y: x*y)
+
     integerRdd.count()
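
A note on why this script persists: reduce() and count() are both actions, and each action replays the RDD's lineage unless the data is cached. A minimal sketch of the same idea (app name "persistDemo" and data are illustrative; MEMORY_ONLY keeps deserialized partitions in executor memory):

    from pyspark import SparkContext, SparkConf, StorageLevel

    conf = SparkConf().setAppName("persistDemo").setMaster("local[*]")
    sc = SparkContext(conf = conf)

    rdd = sc.parallelize([1, 2, 3, 4, 5]).persist(StorageLevel.MEMORY_ONLY)
    print(rdd.reduce(lambda x, y: x * y))  # 120 -- first action populates the cache
    print(rdd.count())                     # 5   -- reuses the cached partitions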

rdd/reduce/ReduceExample.py

Lines changed: 5 additions & 3 deletions

@@ -1,9 +1,11 @@
-from pyspark import SparkContext
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-    sc = SparkContext("local", "reduce")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("reduce").setMaster("local[*]")
+    sc = SparkContext(conf = conf)
+
     inputIntegers = [1, 2, 3, 4, 5]
     integerRdd = sc.parallelize(inputIntegers)
+
     product = integerRdd.reduce(lambda x, y: x * y)
     print("product is :{}".format(product))
Lines changed: 9 additions & 6 deletions

@@ -1,12 +1,15 @@
-import sys
-from pyspark import SparkContext
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-    sc = SparkContext("local", "primeNumbers")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("primeNumbers").setMaster("local[*]")
+    sc = SparkContext(conf = conf)
+
     lines = sc.textFile("in/prime_nums.text")
     numbers = lines.flatMap(lambda line: line.split("\t"))
+
     validNumbers = numbers.filter(lambda number: number)
+
     intNumbers = validNumbers.map(lambda number: int(number))
-    print("Sum is: ")
-    print(intNumbers.reduce(lambda x, y: x + y))
+
+    print("Sum is: {}".format(intNumbers.reduce(lambda x, y: x + y)))
+

rdd/take/TakeExample.py

Lines changed: 5 additions & 4 deletions

@@ -1,11 +1,12 @@
-import sys
-from pyspark import SparkContext
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-    sc = SparkContext("local", "take")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("take").setMaster("local[*]")
+    sc = SparkContext(conf = conf)
+
     inputWords = ["spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop"]
     wordRdd = sc.parallelize(inputWords)
+
     words = wordRdd.take(3)
     for word in words:
         print(word)
