
Commit 9d9066c

Merge pull request #1 from jleetutorial/pedromb-scala_to_python
Scala to Python - rdd folder
2 parents: 50dcbc6 + f637b18

29 files changed: +196 -273 lines

commons/Utils.py

Lines changed: 5 additions & 0 deletions
import re

class Utils():

    COMMA_DELIMITER = re.compile(''',(?=(?:[^'"]|'[^']*'|"[^"]*")*$)''')
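For readers following along: this regex splits on commas only when they sit outside quoted fields, which is what the airport exercises below rely on. A minimal sketch of the behaviour, using an invented row in the airports.text layout (assumes the snippet is run from the repository root so that commons is importable):

from commons.Utils import Utils

# Hypothetical row (ID, name, city, country, ...); the city value is invented
# so that it contains a comma inside the quotes.
line = '1,"Goroka","Goroka, Eastern Highlands","Papua New Guinea","GKA","AYGA",-6.08,145.39'
splits = Utils.COMMA_DELIMITER.split(line)
print(splits[1])  # "Goroka"                     -> airport name, quotes kept
print(splits[2])  # "Goroka, Eastern Highlands"  -> the quoted comma is not treated as a delimiter
print(splits[3])  # "Papua New Guinea"           -> country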

rdd/WordCount.py

Lines changed: 7 additions & 6 deletions
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext("local", "word count")
    sc.setLogLevel("ERROR")  # line added by this commit; the surrounding logic is unchanged
    lines = sc.textFile("in/word_count.text")
    words = lines.flatMap(lambda line: line.split(" "))
    wordCounts = words.countByValue()
    for word, count in wordCounts.items():
        print(word, count)
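A side note on the API used here: countByValue() is an action that ships the full word-to-count map back to the driver. For large inputs the counts are usually kept distributed with reduceByKey instead; a minimal sketch of that alternative, appended after the words = ... line above (not part of this commit, and the output path is made up):

    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)   # result stays an RDD, merged across partitions
    wordCounts.saveAsTextFile("out/word_count_by_key")   # hypothetical path, for illustration only
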
rdd/airports/AirportsByLatitudeProblem.py

Lines changed: 17 additions & 0 deletions

from pyspark import SparkContext

if __name__ == "__main__":

    '''
    Create a Spark program to read the airport data from in/airports.text, find all the airports whose latitude is bigger than 40.
    Then output the airport's name and the airport's latitude to out/airports_by_latitude.text.

    Each row of the input file contains the following columns:
    Airport ID, Name of airport, Main city served by airport, Country where airport is located, IATA/FAA code,
    ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format

    Sample output:
    "St Anthony", 51.391944
    "Tofino", 49.082222
    ...
    '''

rdd/airports/AirportsByLatitudeProblem.scala

Lines changed: 0 additions & 20 deletions
This file was deleted.
rdd/airports/AirportsByLatitudeSolution.py

Lines changed: 17 additions & 0 deletions

from pyspark import SparkContext
from commons.Utils import Utils

def splitComma(line: str):
    splits = Utils.COMMA_DELIMITER.split(line)
    return "{}, {}".format(splits[1], splits[6])

if __name__ == "__main__":
    sc = SparkContext("local", "airports")

    airports = sc.textFile("in/airports.text")

    airportsInUSA = airports.filter(lambda line: float(Utils.COMMA_DELIMITER.split(line)[6]) > 40)

    airportsNameAndCityNames = airportsInUSA.map(splitComma)

    airportsNameAndCityNames.saveAsTextFile("out/airports_by_latitude.text")

rdd/airports/AirportsByLatitudeSolution.scala

Lines changed: 0 additions & 23 deletions
This file was deleted.

rdd/airports/AirportsInUsaProblem.py

Lines changed: 17 additions & 0 deletions
from pyspark import SparkContext

if __name__ == "__main__":

    '''
    Create a Spark program to read the airport data from in/airports.text, find all the airports which are located in the United States
    and output the airport's name and the city's name to out/airports_in_usa.text.

    Each row of the input file contains the following columns:
    Airport ID, Name of airport, Main city served by airport, Country where airport is located, IATA/FAA code,
    ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format

    Sample output:
    "Putnam County Airport", "Greencastle"
    "Dowagiac Municipal Airport", "Dowagiac"
    ...
    '''

rdd/airports/AirportsInUsaProblem.scala

Lines changed: 0 additions & 19 deletions
This file was deleted.

rdd/airports/AirportsInUsaSolution.py

Lines changed: 15 additions & 0 deletions
from pyspark import SparkContext
from commons.Utils import Utils

def splitComma(line: str):
    splits = Utils.COMMA_DELIMITER.split(line)
    return "{}, {}".format(splits[1], splits[2])

if __name__ == "__main__":
    sc = SparkContext("local", "airports")

    airports = sc.textFile("in/airports.text")
    airportsInUSA = airports.filter(lambda line: Utils.COMMA_DELIMITER.split(line)[3] == "\"United States\"")

    airportsNameAndCityNames = airportsInUSA.map(splitComma)
    airportsNameAndCityNames.saveAsTextFile("out/airports_in_usa.text")
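One detail worth calling out: COMMA_DELIMITER only splits, it does not strip the surrounding double quotes, so the country field is compared against the quoted literal. A quick illustration with an invented row (not data from the repository):

from commons.Utils import Utils

row = '3584,"Miami Intl","Miami","United States","MIA","KMIA",25.79,-80.29'  # hypothetical row
splits = Utils.COMMA_DELIMITER.split(row)
print(splits[3])                          # "United States"  -- the quotes are still part of the value
print(splits[3] == "\"United States\"")   # True, which is what the filter above tests
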

rdd/airports/AirportsInUsaSolution.scala

Lines changed: 0 additions & 22 deletions
This file was deleted.

rdd/collect/CollectExample.py

Lines changed: 10 additions & 0 deletions
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext("local", "collect")
    sc.setLogLevel("ERROR")
    inputWords = ["spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop"]
    wordRdd = sc.parallelize(inputWords)
    words = wordRdd.collect()
    for word in words:
        print(word)
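Worth remembering: collect() materialises the whole RDD as an ordinary Python list on the driver, so it is only safe when the data fits in driver memory; once collected, normal list operations apply. For instance, appended to the example above:

    print(len(words))          # 7 - every element, duplicates included
    print(sorted(set(words)))  # ['cassandra', 'hadoop', 'hive', 'pig', 'spark']
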

rdd/collect/CollectExample.scala

Lines changed: 0 additions & 20 deletions
This file was deleted.

rdd/count/CountExample.py

Lines changed: 12 additions & 0 deletions
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext("local", "count")
    sc.setLogLevel("ERROR")
    inputWords = ["spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop"]
    wordRdd = sc.parallelize(inputWords)
    print("Count: {}".format(wordRdd.count()))
    worldCountByValue = wordRdd.countByValue()
    print("CountByValue: ")
    for word, count in worldCountByValue.items():
        print("{} : {}".format(word, count))

rdd/count/CountExample.scala

Lines changed: 0 additions & 23 deletions
This file was deleted.
rdd/nasaApacheWebLogs/SameHostsProblem.py

Lines changed: 20 additions & 0 deletions

from pyspark import SparkContext

if __name__ == "__main__":

    '''
    "in/nasa_19950701.tsv" file contains 10000 log lines from one of NASA's Apache servers for July 1st, 1995.
    "in/nasa_19950801.tsv" file contains 10000 log lines for August 1st, 1995.
    Create a Spark program to generate a new RDD which contains the hosts which are accessed on BOTH days.
    Save the resulting RDD to "out/nasa_logs_same_hosts.csv" file.

    Example output:
    vagrant.vf.mmc.com
    www-a1.proxy.aol.com
    .....

    Keep in mind that the original log files contain the following header line.
    host logname time method url response bytes

    Make sure the header lines are removed in the resulting RDD.
    '''

rdd/nasaApacheWebLogs/SameHostsProblem.scala

Lines changed: 0 additions & 23 deletions
This file was deleted.
rdd/nasaApacheWebLogs/SameHostsSolution.py

Lines changed: 15 additions & 0 deletions

from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext("local", "sameHosts")

    julyFirstLogs = sc.textFile("in/nasa_19950701.tsv")
    augustFirstLogs = sc.textFile("in/nasa_19950801.tsv")

    julyFirstHosts = julyFirstLogs.map(lambda line: line.split("\t")[0])
    augustFirstHosts = augustFirstLogs.map(lambda line: line.split("\t")[0])

    intersection = julyFirstHosts.intersection(augustFirstHosts)

    cleanedHostIntersection = intersection.filter(lambda host: host != "host")
    cleanedHostIntersection.saveAsTextFile("out/nasa_logs_same_hosts.csv")
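Two small points about this solution: intersection() already returns each host at most once, so no extra distinct() call is needed, and the closing filter only exists to drop the literal "host" value contributed by the header line of each input file.
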

rdd/nasaApacheWebLogs/SameHostsSolution.scala

Lines changed: 0 additions & 23 deletions
This file was deleted.
rdd/nasaApacheWebLogs/UnionLogProblem.py

Lines changed: 15 additions & 0 deletions

from pyspark import SparkContext

if __name__ == "__main__":

    '''
    "in/nasa_19950701.tsv" file contains 10000 log lines from one of NASA's Apache servers for July 1st, 1995.
    "in/nasa_19950801.tsv" file contains 10000 log lines for August 1st, 1995.
    Create a Spark program to generate a new RDD which contains the log lines from both July 1st and August 1st,
    take a 0.1 sample of those log lines and save it to "out/sample_nasa_logs.tsv" file.

    Keep in mind that the original log files contain the following header line.
    host logname time method url response bytes

    Make sure the header lines are removed in the resulting RDD.
    '''

rdd/nasaApacheWebLogs/UnionLogProblem.scala

Lines changed: 0 additions & 18 deletions
This file was deleted.
Lines changed: 17 additions & 0 deletions

from pyspark import SparkContext

def isNotHeader(line: str):
    return not (line.startswith("host") and "bytes" in line)

if __name__ == "__main__":
    sc = SparkContext("local", "unionLogs")

    julyFirstLogs = sc.textFile("in/nasa_19950701.tsv")
    augustFirstLogs = sc.textFile("in/nasa_19950801.tsv")

    aggregatedLogLines = julyFirstLogs.union(augustFirstLogs)

    cleanLogLines = aggregatedLogLines.filter(isNotHeader)
    sample = cleanLogLines.sample(withReplacement = True, fraction = 0.1)

    sample.saveAsTextFile("out/sample_nasa_logs.csv")
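A note on the sampling call: with withReplacement=True Spark draws with replacement, so a single log line can appear more than once in the output. If a plain roughly-10% subset without repeats is wanted, a sketch of the alternative would be (not what this commit does; the seed value is an arbitrary choice for reproducibility):

    sample = cleanLogLines.sample(withReplacement=False, fraction=0.1, seed=42)
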
