diff --git a/advanced/accumulator/StackOverFlowSurvey.py b/advanced/accumulator/StackOverFlowSurvey.py index be9b0b3a..6a54b44b 100644 --- a/advanced/accumulator/StackOverFlowSurvey.py +++ b/advanced/accumulator/StackOverFlowSurvey.py @@ -1,25 +1,24 @@ -from pyspark import SparkContext +import sys +sys.path.insert(0, '.') +from pyspark import SparkContext, SparkConf from commons.Utils import Utils -def filterResponseFromCanada(response, total, missingSalaryMidPoint): - splits = Utils.COMMA_DELIMITER.split(response) - total.add(1) - if not splits[14]: - missingSalaryMidPoint.add(1) - return splits[2] == "Canada" - if __name__ == "__main__": - sc = SparkContext("local", "StackOverFlowSurvey") - sc.setLogLevel("ERROR") - + conf = SparkConf().setAppName('StackOverFlowSurvey').setMaster("local[*]") + sc = SparkContext(conf = conf) total = sc.accumulator(0) missingSalaryMidPoint = sc.accumulator(0) - responseRDD = sc.textFile("in/2016-stack-overflow-survey-responses.csv") - responseFromCanada = responseRDD.filter(lambda response: \ - filterResponseFromCanada(response, total, missingSalaryMidPoint)) + def filterResponseFromCanada(response): + splits = Utils.COMMA_DELIMITER.split(response) + total.add(1) + if not splits[14]: + missingSalaryMidPoint.add(1) + return splits[2] == "Canada" + responseFromCanada = responseRDD.filter(filterResponseFromCanada) print("Count of responses from Canada: {}".format(responseFromCanada.count())) print("Total count of responses: {}".format(total.value)) - print("Count of responses missing salary middle point: {}".format(missingSalaryMidPoint.value)) + print("Count of responses missing salary middle point: {}" \ + .format(missingSalaryMidPoint.value)) diff --git a/advanced/accumulator/StackOverFlowSurveyFollowUp.py b/advanced/accumulator/StackOverFlowSurveyFollowUp.py index 8db80d1f..03f3909a 100644 --- a/advanced/accumulator/StackOverFlowSurveyFollowUp.py +++ b/advanced/accumulator/StackOverFlowSurveyFollowUp.py @@ -1,26 +1,25 @@ -from pyspark import SparkContext +import sys +sys.path.insert(0, '.') +from pyspark import SparkContext, SparkConf from commons.Utils import Utils -def filterResponseFromCanada(response, total, missingSalaryMidPoint, processedBytes): - processedBytes.add(len(response.encode('utf-8'))) - splits = Utils.COMMA_DELIMITER.split(response) - total.add(1) - if not splits[14]: - missingSalaryMidPoint.add(1) - return splits[2] == "Canada" - if __name__ == "__main__": - sc = SparkContext("local", "StackOverFlowSurvey") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName('StackOverFlowSurvey').setMaster("local[*]") + sc = SparkContext(conf = conf) total = sc.accumulator(0) missingSalaryMidPoint = sc.accumulator(0) processedBytes = sc.accumulator(0) - responseRDD = sc.textFile("in/2016-stack-overflow-survey-responses.csv") - responseFromCanada = responseRDD.filter(lambda response: \ - filterResponseFromCanada(response, total, missingSalaryMidPoint, processedBytes)) + def filterResponseFromCanada(response): + processedBytes.add(len(response.encode('utf-8'))) + splits = Utils.COMMA_DELIMITER.split(response) + total.add(1) + if not splits[14]: + missingSalaryMidPoint.add(1) + return splits[2] == "Canada" + responseFromCanada = responseRDD.filter(filterResponseFromCanada) print("Count of responses from Canada: {}".format(responseFromCanada.count())) print("Number of bytes processed: {}".format(processedBytes.value)) diff --git a/advanced/broadcast/UkMakerSpaces.py b/advanced/broadcast/UkMakerSpaces.py index 7cc172d0..9323590e 100644 --- 
a/advanced/broadcast/UkMakerSpaces.py +++ b/advanced/broadcast/UkMakerSpaces.py @@ -1,19 +1,21 @@ -from pyspark import SparkContext +import sys +sys.path.insert(0, '.') +from pyspark import SparkContext, SparkConf from commons.Utils import Utils -def getPostPrefix(line: str): - splits = Utils.COMMA_DELIMITER.split(line) - postcode = splits[4] - return None if not postcode else postcode.split(" ")[0] - def loadPostCodeMap(): lines = open("in/uk-postcode.csv", "r").read().split("\n") splitsForLines = [Utils.COMMA_DELIMITER.split(line) for line in lines if line != ""] return {splits[0]: splits[7] for splits in splitsForLines} +def getPostPrefix(line: str): + splits = Utils.COMMA_DELIMITER.split(line) + postcode = splits[4] + return None if not postcode else postcode.split(" ")[0] + if __name__ == "__main__": - sc = SparkContext("local", "UkMakerSpaces") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName('UkMakerSpaces').setMaster("local[*]") + sc = SparkContext(conf = conf) postCodeMap = sc.broadcast(loadPostCodeMap()) diff --git a/advanced/broadcast/UkMakerSpacesWithoutBroadcast.py b/advanced/broadcast/UkMakerSpacesWithoutBroadcast.py index 4854f417..17e79d36 100644 --- a/advanced/broadcast/UkMakerSpacesWithoutBroadcast.py +++ b/advanced/broadcast/UkMakerSpacesWithoutBroadcast.py @@ -1,26 +1,28 @@ -from pyspark import SparkContext +import sys +sys.path.insert(0, '.') +from pyspark import SparkContext, SparkConf from commons.Utils import Utils -def getPostPrefixes(line: str): - postcode = Utils.COMMA_DELIMITER.split(line)[4] - cleanedPostCode = postcode.replace("\\s+", "") - return [cleanedPostCode[0:i] for i in range(0,len(cleanedPostCode)+1)] - def loadPostCodeMap(): lines = open("in/uk-postcode.csv", "r").read().split("\n") splitsForLines = [Utils.COMMA_DELIMITER.split(line) for line in lines if line != ""] return {splits[0]: splits[7] for splits in splitsForLines} +def getPostPrefix(line: str): + splits = Utils.COMMA_DELIMITER.split(line) + postcode = splits[4] + return None if not postcode else postcode.split(" ")[0] + if __name__ == "__main__": - sc = SparkContext("local", "UkMakerSpaces") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName('UkMakerSpaces').setMaster("local[*]") + sc = SparkContext(conf = conf) postCodeMap = loadPostCodeMap() makerSpaceRdd = sc.textFile("in/uk-makerspaces-identifiable-data.csv") regions = makerSpaceRdd \ .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[0] != "Timestamp") \ - .map(lambda line: next((postCodeMap[prefix] for prefix in getPostPrefixes(line) \ - if prefix in postCodeMap), "Unknow")) + .map(lambda line: postCodeMap[getPostPrefix(line)] \ + if getPostPrefix(line) in postCodeMap else "Unknow") for region, count in regions.countByValue().items(): print("{} : {}".format(region, count)) diff --git a/build.gradle b/build.gradle deleted file mode 100644 index 3aa9e64f..00000000 --- a/build.gradle +++ /dev/null @@ -1,33 +0,0 @@ -group 'jameslee' -version '1.0-SNAPSHOT' - -apply plugin: 'scala' -apply plugin: 'idea' -apply plugin: 'eclipse' - -repositories { - mavenCentral() -} - -dependencies { - compile group: 'org.apache.spark', name: 'spark-core_2.10', version: '2.1.0' - compile group: 'org.apache.spark', name: 'spark-sql_2.10', version: '2.1.0' - compile group: 'org.scala-lang', name: 'scala-library', version: '2.10.6' -} - - -jar { - zip64 true - archiveName = "StackOverFlowSurvey-spark.jar" - from { - configurations.compile.collect { - it.isDirectory() ? 
it : zipTree(it) - } - } - manifest { - attributes 'Main-Class': 'com.sparkTutorial.sparkSql.StackOverFlowSurvey' - } - - exclude 'META-INF/*.RSA', 'META-INF/*.SF','META-INF/*.DSA' - -} diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar deleted file mode 100644 index 94114481..00000000 Binary files a/gradle/wrapper/gradle-wrapper.jar and /dev/null differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties deleted file mode 100644 index 0753a920..00000000 --- a/gradle/wrapper/gradle-wrapper.properties +++ /dev/null @@ -1,6 +0,0 @@ -#Tue Oct 25 21:49:41 BST 2016 -distributionBase=GRADLE_USER_HOME -distributionPath=wrapper/dists -zipStoreBase=GRADLE_USER_HOME -zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-3.2.1-bin.zip diff --git a/gradlew b/gradlew deleted file mode 100755 index 9d82f789..00000000 --- a/gradlew +++ /dev/null @@ -1,160 +0,0 @@ -#!/usr/bin/env bash - -############################################################################## -## -## Gradle start up script for UN*X -## -############################################################################## - -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS="" - -APP_NAME="Gradle" -APP_BASE_NAME=`basename "$0"` - -# Use the maximum available, or set MAX_FD != -1 to use that value. -MAX_FD="maximum" - -warn ( ) { - echo "$*" -} - -die ( ) { - echo - echo "$*" - echo - exit 1 -} - -# OS specific support (must be 'true' or 'false'). -cygwin=false -msys=false -darwin=false -case "`uname`" in - CYGWIN* ) - cygwin=true - ;; - Darwin* ) - darwin=true - ;; - MINGW* ) - msys=true - ;; -esac - -# Attempt to set APP_HOME -# Resolve links: $0 may be a link -PRG="$0" -# Need this for relative symlinks. -while [ -h "$PRG" ] ; do - ls=`ls -ld "$PRG"` - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '/.*' > /dev/null; then - PRG="$link" - else - PRG=`dirname "$PRG"`"/$link" - fi -done -SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" >/dev/null -APP_HOME="`pwd -P`" -cd "$SAVED" >/dev/null - -CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar - -# Determine the Java command to use to start the JVM. -if [ -n "$JAVA_HOME" ] ; then - if [ -x "$JAVA_HOME/jre/sh/java" ] ; then - # IBM's JDK on AIX uses strange locations for the executables - JAVACMD="$JAVA_HOME/jre/sh/java" - else - JAVACMD="$JAVA_HOME/bin/java" - fi - if [ ! -x "$JAVACMD" ] ; then - die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME - -Please set the JAVA_HOME variable in your environment to match the -location of your Java installation." - fi -else - JAVACMD="java" - which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. - -Please set the JAVA_HOME variable in your environment to match the -location of your Java installation." -fi - -# Increase the maximum file descriptors if we can. -if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then - MAX_FD_LIMIT=`ulimit -H -n` - if [ $? -eq 0 ] ; then - if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then - MAX_FD="$MAX_FD_LIMIT" - fi - ulimit -n $MAX_FD - if [ $? 
-ne 0 ] ; then - warn "Could not set maximum file descriptor limit: $MAX_FD" - fi - else - warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" - fi -fi - -# For Darwin, add options to specify how the application appears in the dock -if $darwin; then - GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" -fi - -# For Cygwin, switch paths to Windows format before running java -if $cygwin ; then - APP_HOME=`cygpath --path --mixed "$APP_HOME"` - CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` - JAVACMD=`cygpath --unix "$JAVACMD"` - - # We build the pattern for arguments to be converted via cygpath - ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` - SEP="" - for dir in $ROOTDIRSRAW ; do - ROOTDIRS="$ROOTDIRS$SEP$dir" - SEP="|" - done - OURCYGPATTERN="(^($ROOTDIRS))" - # Add a user-defined pattern to the cygpath arguments - if [ "$GRADLE_CYGPATTERN" != "" ] ; then - OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" - fi - # Now convert the arguments - kludge to limit ourselves to /bin/sh - i=0 - for arg in "$@" ; do - CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` - CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option - - if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition - eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` - else - eval `echo args$i`="\"$arg\"" - fi - i=$((i+1)) - done - case $i in - (0) set -- ;; - (1) set -- "$args0" ;; - (2) set -- "$args0" "$args1" ;; - (3) set -- "$args0" "$args1" "$args2" ;; - (4) set -- "$args0" "$args1" "$args2" "$args3" ;; - (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; - (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; - (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; - (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; - (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; - esac -fi - -# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules -function splitJvmOpts() { - JVM_OPTS=("$@") -} -eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS -JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME" - -exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@" diff --git a/gradlew.bat b/gradlew.bat deleted file mode 100755 index aec99730..00000000 --- a/gradlew.bat +++ /dev/null @@ -1,90 +0,0 @@ -@if "%DEBUG%" == "" @echo off -@rem ########################################################################## -@rem -@rem Gradle startup script for Windows -@rem -@rem ########################################################################## - -@rem Set local scope for the variables with windows NT shell -if "%OS%"=="Windows_NT" setlocal - -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS= - -set DIRNAME=%~dp0 -if "%DIRNAME%" == "" set DIRNAME=. -set APP_BASE_NAME=%~n0 -set APP_HOME=%DIRNAME% - -@rem Find java.exe -if defined JAVA_HOME goto findJavaFromJavaHome - -set JAVA_EXE=java.exe -%JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto init - -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. 
- -goto fail - -:findJavaFromJavaHome -set JAVA_HOME=%JAVA_HOME:"=% -set JAVA_EXE=%JAVA_HOME%/bin/java.exe - -if exist "%JAVA_EXE%" goto init - -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:init -@rem Get command-line arguments, handling Windowz variants - -if not "%OS%" == "Windows_NT" goto win9xME_args -if "%@eval[2+2]" == "4" goto 4NT_args - -:win9xME_args -@rem Slurp the command line arguments. -set CMD_LINE_ARGS= -set _SKIP=2 - -:win9xME_args_slurp -if "x%~1" == "x" goto execute - -set CMD_LINE_ARGS=%* -goto execute - -:4NT_args -@rem Get arguments from the 4NT Shell from JP Software -set CMD_LINE_ARGS=%$ - -:execute -@rem Setup the command line - -set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar - -@rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% - -:end -@rem End local scope for the variables with windows NT shell -if "%ERRORLEVEL%"=="0" goto mainEnd - -:fail -rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of -rem the _cmd.exe /c_ return code! -if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 -exit /b 1 - -:mainEnd -if "%OS%"=="Windows_NT" endlocal - -:omega diff --git a/pairRdd/aggregation/combinebykey/AverageHousePriceSolution.py b/pairRdd/aggregation/combinebykey/AverageHousePriceSolution.py index 4885ccbe..00b82eba 100644 --- a/pairRdd/aggregation/combinebykey/AverageHousePriceSolution.py +++ b/pairRdd/aggregation/combinebykey/AverageHousePriceSolution.py @@ -1,9 +1,8 @@ -from pyspark import SparkContext +from pyspark import SparkContext, SparkConf if __name__ == "__main__": - - sc = SparkContext("local", "AverageHousePrice") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName("AverageHousePrice").setMaster("local") + sc = SparkContext(conf = conf) lines = sc.textFile("in/RealEstate.csv") cleanedLines = lines.filter(lambda line: "Bedrooms" not in line) diff --git a/pairRdd/aggregation/reducebykey/WordCount.py b/pairRdd/aggregation/reducebykey/WordCount.py index 3a00f380..2bb4f1c0 100644 --- a/pairRdd/aggregation/reducebykey/WordCount.py +++ b/pairRdd/aggregation/reducebykey/WordCount.py @@ -1,9 +1,8 @@ -from pyspark import SparkContext +from pyspark import SparkContext, SparkConf if __name__ == "__main__": - - sc = SparkContext("local", "wordCounts") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName("wordCounts").setMaster("local[3]") + sc = SparkContext(conf = conf) lines = sc.textFile("in/word_count.text") wordRdd = lines.flatMap(lambda line: line.split(" ")) diff --git a/pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceSolution.py b/pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceSolution.py index acb633e2..69ecc9ae 100644 --- a/pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceSolution.py +++ b/pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceSolution.py @@ -1,24 +1,26 @@ -from pyspark import SparkContext +import sys +sys.path.insert(0, '.') +from pyspark import SparkContext, SparkConf +from pairRdd.aggregation.reducebykey.housePrice.AvgCount import AvgCount if __name__ == "__main__": - - sc = SparkContext("local", "avgHousePrice") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName("avgHousePrice").setMaster("local[3]") + sc = SparkContext(conf = conf) lines = 
sc.textFile("in/RealEstate.csv") cleanedLines = lines.filter(lambda line: "Bedrooms" not in line) housePricePairRdd = cleanedLines.map(lambda line: \ - (line.split(",")[3], (1, float(line.split(",")[2])))) + (line.split(",")[3], AvgCount(1, float(line.split(",")[2])))) housePriceTotal = housePricePairRdd \ - .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) + .reduceByKey(lambda x, y: AvgCount(x.count + y.count, x.total + y.total)) print("housePriceTotal: ") - for bedroom, total in housePriceTotal.collect(): - print("{} : {}".format(bedroom, total)) + for bedroom, avgCount in housePriceTotal.collect(): + print("{} : ({}, {})".format(bedroom, avgCount.count, avgCount.total)) - housePriceAvg = housePriceTotal.mapValues(lambda avgCount: avgCount[1] / avgCount[0]) + housePriceAvg = housePriceTotal.mapValues(lambda avgCount: avgCount.total / avgCount.count) print("\nhousePriceAvg: ") for bedroom, avg in housePriceAvg.collect(): print("{} : {}".format(bedroom, avg)) diff --git a/pairRdd/create/PairRddFromRegularRdd.py b/pairRdd/create/PairRddFromRegularRdd.py index bfd6f187..a699e982 100644 --- a/pairRdd/create/PairRddFromRegularRdd.py +++ b/pairRdd/create/PairRddFromRegularRdd.py @@ -1,9 +1,8 @@ -from pyspark import SparkContext +from pyspark import SparkContext, SparkConf if __name__ == "__main__": - - sc = SparkContext("local", "create") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName("create").setMaster("local") + sc = SparkContext(conf = conf) inputStrings = ["Lily 23", "Jack 29", "Mary 29", "James 8"] regularRDDs = sc.parallelize(inputStrings) diff --git a/pairRdd/create/PairRddFromTupleList.py b/pairRdd/create/PairRddFromTupleList.py index c728d005..0c72ba06 100644 --- a/pairRdd/create/PairRddFromTupleList.py +++ b/pairRdd/create/PairRddFromTupleList.py @@ -1,9 +1,8 @@ -from pyspark import SparkContext +from pyspark import SparkContext, SparkConf if __name__ == "__main__": - - sc = SparkContext("local", "create") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName("create").setMaster("local") + sc = SparkContext(conf = conf) tuples = [("Lily", 23), ("Jack", 29), ("Mary", 29), ("James", 8)] pairRDD = sc.parallelize(tuples) diff --git a/pairRdd/filter/AirportsNotInUsaSolution.py b/pairRdd/filter/AirportsNotInUsaSolution.py index fd9c2fe9..e30b4835 100644 --- a/pairRdd/filter/AirportsNotInUsaSolution.py +++ b/pairRdd/filter/AirportsNotInUsaSolution.py @@ -1,10 +1,12 @@ -from pyspark import SparkContext +import sys +sys.path.insert(0, '.') +from pyspark import SparkContext, SparkConf from commons.Utils import Utils if __name__ == "__main__": - sc = SparkContext("local", "airports") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName("airports").setMaster("local[*]") + sc = SparkContext(conf = conf) airportsRDD = sc.textFile("in/airports.text") diff --git a/pairRdd/groupbykey/AirportsByCountrySolution.py b/pairRdd/groupbykey/AirportsByCountrySolution.py index 0cc7017e..a5670f6c 100644 --- a/pairRdd/groupbykey/AirportsByCountrySolution.py +++ b/pairRdd/groupbykey/AirportsByCountrySolution.py @@ -1,10 +1,12 @@ -from pyspark import SparkContext +import sys +sys.path.insert(0, '.') +from pyspark import SparkContext, SparkConf from commons.Utils import Utils if __name__ == "__main__": - sc = SparkContext("local", "airports") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName("airports").setMaster("local[*]") + sc = SparkContext(conf = conf) lines = sc.textFile("in/airports.text") @@ -15,4 +17,4 @@ airportsByCountry = countryAndAirportNameAndPair.groupByKey() for 
country, airportName in airportsByCountry.collectAsMap().items(): - print("{}: {}".format(country,list(airportName))) + print("{}: {}".format(country, list(airportName))) diff --git a/pairRdd/groupbykey/GroupByKeyVsReduceByKey.py b/pairRdd/groupbykey/GroupByKeyVsReduceByKey.py index 99eb96df..bbc80f92 100644 --- a/pairRdd/groupbykey/GroupByKeyVsReduceByKey.py +++ b/pairRdd/groupbykey/GroupByKeyVsReduceByKey.py @@ -1,18 +1,21 @@ -from pyspark import SparkContext +from pyspark import SparkContext, SparkConf if __name__ == "__main__": - - sc = SparkContext("local", "GroupByKeyVsReduceByKey") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName('GroupByKeyVsReduceByKey').setMaster("local[*]") + sc = SparkContext(conf = conf) words = ["one", "two", "two", "three", "three", "three"] wordsPairRdd = sc.parallelize(words).map(lambda word: (word, 1)) - wordCountsWithReduceByKey = wordsPairRdd.reduceByKey(lambda x, y: x + y).collect() + wordCountsWithReduceByKey = wordsPairRdd \ + .reduceByKey(lambda x, y: x + y) \ + .collect() print("wordCountsWithReduceByKey: {}".format(list(wordCountsWithReduceByKey))) wordCountsWithGroupByKey = wordsPairRdd \ .groupByKey() \ - .mapValues(lambda intIterable: len(intIterable)) \ + .mapValues(len) \ .collect() print("wordCountsWithGroupByKey: {}".format(list(wordCountsWithGroupByKey))) + + diff --git a/pairRdd/join/JoinOperations.py b/pairRdd/join/JoinOperations.py index 250f9007..47f04219 100644 --- a/pairRdd/join/JoinOperations.py +++ b/pairRdd/join/JoinOperations.py @@ -1,9 +1,8 @@ -from pyspark import SparkContext +from pyspark import SparkContext, SparkConf if __name__ == "__main__": - - sc = SparkContext("local", "JoinOperations") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName("JoinOperations").setMaster("local[1]") + sc = SparkContext(conf = conf) ages = sc.parallelize([("Tom", 29), ("John", 22)]) addresses = sc.parallelize([("James", "USA"), ("John", "UK")]) diff --git a/pairRdd/mapValues/AirportsUppercaseSolution.py b/pairRdd/mapValues/AirportsUppercaseSolution.py index 3c9fa201..fded76e7 100644 --- a/pairRdd/mapValues/AirportsUppercaseSolution.py +++ b/pairRdd/mapValues/AirportsUppercaseSolution.py @@ -1,10 +1,11 @@ -from pyspark import SparkContext +import sys +sys.path.insert(0, '.') +from pyspark import SparkContext, SparkConf from commons.Utils import Utils if __name__ == "__main__": - - sc = SparkContext("local", "airports") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName("airports").setMaster("local[*]") + sc = SparkContext(conf = conf) airportsRDD = sc.textFile("in/airports.text") diff --git a/pairRdd/sort/AverageHousePriceSolution.py b/pairRdd/sort/AverageHousePriceSolution.py index 4306fdbc..1fda400e 100644 --- a/pairRdd/sort/AverageHousePriceSolution.py +++ b/pairRdd/sort/AverageHousePriceSolution.py @@ -1,11 +1,11 @@ +import sys +sys.path.insert(0, '.') from pairRdd.aggregation.reducebykey.housePrice.AvgCount import AvgCount -from pyspark import SparkContext - +from pyspark import SparkContext, SparkConf if __name__ == "__main__": - - sc = SparkContext("local", "averageHousePriceSolution") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName("averageHousePriceSolution").setMaster("local[*]") + sc = SparkContext(conf = conf) lines = sc.textFile("in/RealEstate.csv") cleanedLines = lines.filter(lambda line: "Bedrooms" not in line) diff --git a/pairRdd/sort/SortedWordCountSolution.py b/pairRdd/sort/SortedWordCountSolution.py index 398c57ae..31d137c4 100644 --- a/pairRdd/sort/SortedWordCountSolution.py +++ 
b/pairRdd/sort/SortedWordCountSolution.py @@ -1,20 +1,18 @@ -from pyspark import SparkContext +from pyspark import SparkContext, SparkConf if __name__ == "__main__": - - sc = SparkContext("local", "wordCounts") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName("wordCounts").setMaster("local[*]") + sc = SparkContext(conf = conf) + lines = sc.textFile("in/word_count.text") wordRdd = lines.flatMap(lambda line: line.split(" ")) wordPairRdd = wordRdd.map(lambda word: (word, 1)) wordToCountPairs = wordPairRdd.reduceByKey(lambda x, y: x + y) - countToWordParis = wordToCountPairs.map(lambda wordToCount: (wordToCount[1], wordToCount[0])) - - sortedCountToWordParis = countToWordParis.sortByKey(ascending=False) + sortedWordCountPairs = wordToCountPairs \ + .sortBy(lambda wordCount: wordCount[1], ascending=False) - sortedWordToCountPairs = sortedCountToWordParis.map(lambda countToWord: (countToWord[1], countToWord[0])) - - for word, count in sortedWordToCountPairs.collect(): + for word, count in sortedWordCountPairs.collect(): print("{} : {}".format(word, count)) + diff --git a/rdd/WordCount.py b/rdd/WordCount.py index c95a04f1..ee7d9256 100644 --- a/rdd/WordCount.py +++ b/rdd/WordCount.py @@ -1,11 +1,15 @@ -import sys -from pyspark import SparkContext +from pyspark import SparkContext, SparkConf if __name__ == "__main__": - sc = SparkContext("local", "word count") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName("word count").setMaster("local[3]") + sc = SparkContext(conf = conf) + lines = sc.textFile("in/word_count.text") + words = lines.flatMap(lambda line: line.split(" ")) + wordCounts = words.countByValue() + for word, count in wordCounts.items(): - print(word, count) + print("{} : {}".format(word, count)) + diff --git a/rdd/airports/AirportsByLatitudeSolution.py b/rdd/airports/AirportsByLatitudeSolution.py index 2ce476d5..70c1db13 100644 --- a/rdd/airports/AirportsByLatitudeSolution.py +++ b/rdd/airports/AirportsByLatitudeSolution.py @@ -1,4 +1,6 @@ -from pyspark import SparkContext +import sys +sys.path.insert(0, '.') +from pyspark import SparkContext, SparkConf from commons.Utils import Utils def splitComma(line: str): @@ -6,7 +8,8 @@ def splitComma(line: str): return "{}, {}".format(splits[1], splits[6]) if __name__ == "__main__": - sc = SparkContext("local", "airports") + conf = SparkConf().setAppName("airports").setMaster("local[*]") + sc = SparkContext(conf = conf) airports = sc.textFile("in/airports.text") diff --git a/rdd/airports/AirportsInUsaSolution.py b/rdd/airports/AirportsInUsaSolution.py index 96ec733b..1fa72b57 100644 --- a/rdd/airports/AirportsInUsaSolution.py +++ b/rdd/airports/AirportsInUsaSolution.py @@ -1,4 +1,6 @@ -from pyspark import SparkContext +import sys +sys.path.insert(0, '.') +from pyspark import SparkContext, SparkConf from commons.Utils import Utils def splitComma(line: str): @@ -6,7 +8,8 @@ def splitComma(line: str): return "{}, {}".format(splits[1], splits[2]) if __name__ == "__main__": - sc = SparkContext("local", "airports") + conf = SparkConf().setAppName("airports").setMaster("local[*]") + sc = SparkContext(conf = conf) airports = sc.textFile("in/airports.text") airportsInUSA = airports.filter(lambda line : Utils.COMMA_DELIMITER.split(line)[3] == "\"United States\"") diff --git a/rdd/collect/CollectExample.py b/rdd/collect/CollectExample.py index fe01119d..832dceaf 100644 --- a/rdd/collect/CollectExample.py +++ b/rdd/collect/CollectExample.py @@ -1,10 +1,15 @@ -from pyspark import SparkContext +from pyspark import SparkContext, SparkConf if 
__name__ == "__main__": - sc = SparkContext("local", "collect") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName("collect").setMaster("local[*]") + sc = SparkContext(conf = conf) + inputWords = ["spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop"] + wordRdd = sc.parallelize(inputWords) + words = wordRdd.collect() + for word in words: - print(word) \ No newline at end of file + print(word) + diff --git a/rdd/count/CountExample.py b/rdd/count/CountExample.py index 93bbf45e..54cd69a2 100644 --- a/rdd/count/CountExample.py +++ b/rdd/count/CountExample.py @@ -1,12 +1,16 @@ -from pyspark import SparkContext +from pyspark import SparkContext, SparkConf if __name__ == "__main__": - sc = SparkContext("local", "count") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName("count").setMaster("local[*]") + sc = SparkContext(conf = conf) + inputWords = ["spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop"] + wordRdd = sc.parallelize(inputWords) print("Count: {}".format(wordRdd.count())) + worldCountByValue = wordRdd.countByValue() print("CountByValue: ") for word, count in worldCountByValue.items(): print("{} : {}".format(word, count)) + diff --git a/rdd/nasaApacheWebLogs/SameHostsSolution.py b/rdd/nasaApacheWebLogs/SameHostsSolution.py index 7081dce9..c1efb27d 100644 --- a/rdd/nasaApacheWebLogs/SameHostsSolution.py +++ b/rdd/nasaApacheWebLogs/SameHostsSolution.py @@ -1,7 +1,8 @@ -from pyspark import SparkContext +from pyspark import SparkContext, SparkConf if __name__ == "__main__": - sc = SparkContext("local", "sameHosts") + conf = SparkConf().setAppName("sameHosts").setMaster("local[1]") + sc = SparkContext(conf = conf) julyFirstLogs = sc.textFile("in/nasa_19950701.tsv") augustFirstLogs = sc.textFile("in/nasa_19950801.tsv") diff --git a/rdd/nasaApacheWebLogs/UnionLogSolutions.py b/rdd/nasaApacheWebLogs/UnionLogSolutions.py index c69c0332..790ead42 100644 --- a/rdd/nasaApacheWebLogs/UnionLogSolutions.py +++ b/rdd/nasaApacheWebLogs/UnionLogSolutions.py @@ -1,10 +1,11 @@ -from pyspark import SparkContext +from pyspark import SparkContext, SparkConf def isNotHeader(line: str): return not (line.startswith("host") and "bytes" in line) if __name__ == "__main__": - sc = SparkContext("local", "unionLogs") + conf = SparkConf().setAppName("unionLogs").setMaster("local[*]") + sc = SparkContext(conf = conf) julyFirstLogs = sc.textFile("in/nasa_19950701.tsv") augustFirstLogs = sc.textFile("in/nasa_19950801.tsv") @@ -14,4 +15,5 @@ def isNotHeader(line: str): cleanLogLines = aggregatedLogLines.filter(isNotHeader) sample = cleanLogLines.sample(withReplacement = True, fraction = 0.1) - sample.saveAsTextFile("out/sample_nasa_logs.csv") \ No newline at end of file + sample.saveAsTextFile("out/sample_nasa_logs.csv") + diff --git a/rdd/persist/PersistExample.py b/rdd/persist/PersistExample.py index 947a6e14..9d522ed2 100644 --- a/rdd/persist/PersistExample.py +++ b/rdd/persist/PersistExample.py @@ -1,9 +1,14 @@ -from pyspark import SparkContext, StorageLevel +from pyspark import SparkContext, SparkConf, StorageLevel if __name__ == "__main__": - sc = SparkContext("local", "persist") + conf = SparkConf().setAppName("persist").setMaster("local[*]") + sc = SparkContext(conf = conf) + inputIntegers = [1, 2, 3, 4, 5] integerRdd = sc.parallelize(inputIntegers) + integerRdd.persist(StorageLevel.MEMORY_ONLY) + integerRdd.reduce(lambda x, y: x*y) + integerRdd.count() diff --git a/rdd/reduce/ReduceExample.py b/rdd/reduce/ReduceExample.py index 7eb8392a..49a72831 100644 --- 
a/rdd/reduce/ReduceExample.py +++ b/rdd/reduce/ReduceExample.py @@ -1,9 +1,11 @@ -from pyspark import SparkContext +from pyspark import SparkContext, SparkConf if __name__ == "__main__": - sc = SparkContext("local", "reduce") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName("reduce").setMaster("local[*]") + sc = SparkContext(conf = conf) + inputIntegers = [1, 2, 3, 4, 5] integerRdd = sc.parallelize(inputIntegers) + product = integerRdd.reduce(lambda x, y: x * y) print("product is :{}".format(product)) diff --git a/rdd/sumOfNumbers/SumOfNumbersSolution.py b/rdd/sumOfNumbers/SumOfNumbersSolution.py index b0315c51..437fa4f2 100644 --- a/rdd/sumOfNumbers/SumOfNumbersSolution.py +++ b/rdd/sumOfNumbers/SumOfNumbersSolution.py @@ -1,12 +1,15 @@ -import sys -from pyspark import SparkContext +from pyspark import SparkContext, SparkConf if __name__ == "__main__": - sc = SparkContext("local", "primeNumbers") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName("primeNumbers").setMaster("local[*]") + sc = SparkContext(conf = conf) + lines = sc.textFile("in/prime_nums.text") numbers = lines.flatMap(lambda line: line.split("\t")) + validNumbers = numbers.filter(lambda number: number) + intNumbers = validNumbers.map(lambda number: int(number)) - print("Sum is: ") - print(intNumbers.reduce(lambda x, y: x + y)) + + print("Sum is: {}".format(intNumbers.reduce(lambda x, y: x + y))) + diff --git a/rdd/take/TakeExample.py b/rdd/take/TakeExample.py index fc73a781..b008984a 100644 --- a/rdd/take/TakeExample.py +++ b/rdd/take/TakeExample.py @@ -1,11 +1,12 @@ -import sys -from pyspark import SparkContext +from pyspark import SparkContext, SparkConf if __name__ == "__main__": - sc = SparkContext("local", "take") - sc.setLogLevel("ERROR") + conf = SparkConf().setAppName("take").setMaster("local[*]") + sc = SparkContext(conf = conf) + inputWords = ["spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop"] wordRdd = sc.parallelize(inputWords) + words = wordRdd.take(3) for word in words: print(word) diff --git a/sparkSql/HousePriceProblem.py b/sparkSql/HousePriceProblem.py index a5f88bfa..3122c3eb 100644 --- a/sparkSql/HousePriceProblem.py +++ b/sparkSql/HousePriceProblem.py @@ -4,20 +4,21 @@ Create a Spark program to read the house data from in/RealEstate.csv, group by location, aggregate the average price per SQ Ft and sort by average price per SQ Ft. - The houses dataset contains a collection of recent real estate listings in San Luis Obispo county and - around it. + The houses dataset contains a collection of recent real estate listings in + San Luis Obispo county and around it. The dataset contains the following fields: 1. MLS: Multiple listing service number for the house (unique ID). - 2. Location: city/town where the house is located. Most locations are in San Luis Obispo county and - northern Santa Barbara county (Santa Maria-Orcutt, Lompoc, Guadelupe, Los Alamos), but there - some out of area locations as well. + 2. Location: city/town where the house is located. Most locations are in + San Luis Obispo county and northern Santa Barbara county (Santa Maria-Orcutt, Lompoc, + Guadelupe, Los Alamos), but there are some out of area locations as well. 3. Price: the most recent listing price of the house (in dollars). 4. Bedrooms: number of bedrooms. 5. Bathrooms: number of bathrooms. 6. Size: size of the house in square feet. 7. Price/SQ.ft: price of the house per square foot. - 8.
Status: type of sale. Three types are represented in the dataset: Short Sale, + Foreclosure and Regular. Each field is comma separated. diff --git a/sparkSql/HousePriceSolution.py b/sparkSql/HousePriceSolution.py index d6982f4c..cf4f2e28 100644 --- a/sparkSql/HousePriceSolution.py +++ b/sparkSql/HousePriceSolution.py @@ -4,8 +4,8 @@ if __name__ == "__main__": - session = SparkSession.builder.appName("HousePriceSolution").master("local").getOrCreate() - session.sparkContext.setLogLevel("ERROR") + session = SparkSession.builder.appName("HousePriceSolution").master("local[*]").getOrCreate() + realEstate = session.read \ .option("header","true") \ .option("inferSchema", value=True) \ diff --git a/sparkSql/RddDataframeConversion.py b/sparkSql/RddDataframeConversion.py index 76359eff..8b99156a 100644 --- a/sparkSql/RddDataframeConversion.py +++ b/sparkSql/RddDataframeConversion.py @@ -1,32 +1,33 @@ +import sys +sys.path.insert(0, '.') from pyspark.sql import SparkSession from commons.Utils import Utils -def getColNames(line: str): - splits = Utils.COMMA_DELIMITER.split(line) - return [splits[2], splits[6], splits[9], splits[14]] - def mapResponseRdd(line: str): splits = Utils.COMMA_DELIMITER.split(line) double1 = None if not splits[6] else float(splits[6]) double2 = None if not splits[14] else float(splits[14]) return splits[2], double1, splits[9], double2 +def getColNames(line: str): + splits = Utils.COMMA_DELIMITER.split(line) + return [splits[2], splits[6], splits[9], splits[14]] + if __name__ == "__main__": - session = SparkSession.builder.appName("StackOverFlowSurvey").master("local").getOrCreate() + session = SparkSession.builder.appName("StackOverFlowSurvey").master("local[*]").getOrCreate() sc = session.sparkContext - sc.setLogLevel("ERROR") lines = sc.textFile("in/2016-stack-overflow-survey-responses.csv") - colNames = lines \ - .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[2] == "country") \ - .map(getColNames) - responseRDD = lines \ .filter(lambda line: not Utils.COMMA_DELIMITER.split(line)[2] == "country") \ .map(mapResponseRdd) + colNames = lines \ + .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[2] == "country") \ + .map(getColNames) + responseDataFrame = responseRDD.toDF(colNames.collect()[0]) print("=== Print out schema ===") @@ -35,5 +36,5 @@ def mapResponseRdd(line: str): print("=== Print 20 records of responses table ===") responseDataFrame.show(20) - for response in responseDataFrame.rdd.collect(): - print(response) + for response in responseDataFrame.rdd.take(10): + print(response) \ No newline at end of file diff --git a/sparkSql/StackOverFlowSurvey.py b/sparkSql/StackOverFlowSurvey.py index e7e9d94a..a27c7eb9 100644 --- a/sparkSql/StackOverFlowSurvey.py +++ b/sparkSql/StackOverFlowSurvey.py @@ -6,39 +6,43 @@ if __name__ == "__main__": - session = SparkSession.builder.appName("StackOverFlowSurvey").master("local").getOrCreate() - session.sparkContext.setLogLevel("ERROR") + session = SparkSession.builder.appName("StackOverFlowSurvey").getOrCreate() + dataFrameReader = session.read responses = dataFrameReader \ .option("header", "true") \ .option("inferSchema", value = True) \ .csv("in/2016-stack-overflow-survey-responses.csv") - + print("=== Print out schema ===") responses.printSchema() - responseWithSelectedColumns = responses.select("country", "occupation", AGE_MIDPOINT, SALARY_MIDPOINT) + responseWithSelectedColumns = responses.select("country", "occupation", + AGE_MIDPOINT, SALARY_MIDPOINT) print("=== Print the selected columns of the table ===")
responseWithSelectedColumns.show() print("=== Print records where the response is from Afghanistan ===") - responseWithSelectedColumns.filter(responseWithSelectedColumns["country"] == "Afghanistan").show() + responseWithSelectedColumns\ + .filter(responseWithSelectedColumns["country"] == "Afghanistan").show() print("=== Print the count of occupations ===") - groupedDataset = responseWithSelectedColumns.groupBy("occupation") - groupedDataset.count().show() + groupedData = responseWithSelectedColumns.groupBy("occupation") + groupedData.count().show() print("=== Print records with average mid age less than 20 ===") - responseWithSelectedColumns.filter(responseWithSelectedColumns[AGE_MIDPOINT] < 20).show() + responseWithSelectedColumns\ + .filter(responseWithSelectedColumns[AGE_MIDPOINT] < 20).show() print("=== Print the result by salary middle point in descending order ===") - responseWithSelectedColumns.orderBy(responseWithSelectedColumns[SALARY_MIDPOINT], ascending=False).show() + responseWithSelectedColumns\ + .orderBy(responseWithSelectedColumns[SALARY_MIDPOINT], ascending = False).show() print("=== Group by country and aggregate by average salary middle point ===") - datasetGroupByCountry = responseWithSelectedColumns.groupBy("country") - datasetGroupByCountry.avg(SALARY_MIDPOINT).show() + dataGroupByCountry = responseWithSelectedColumns.groupBy("country") + dataGroupByCountry.avg(SALARY_MIDPOINT).show() responseWithSalaryBucket = responses.withColumn(SALARY_MIDPOINT_BUCKET, ((responses[SALARY_MIDPOINT]/20000).cast("integer")*20000)) @@ -47,6 +51,10 @@ responseWithSalaryBucket.select(SALARY_MIDPOINT, SALARY_MIDPOINT_BUCKET).show() print("=== Group by salary bucket ===") - responseWithSalaryBucket.groupBy(SALARY_MIDPOINT_BUCKET).count().orderBy(SALARY_MIDPOINT_BUCKET).show() + responseWithSalaryBucket \ + .groupBy(SALARY_MIDPOINT_BUCKET) \ + .count() \ + .orderBy(SALARY_MIDPOINT_BUCKET) \ + .show() session.stop() diff --git a/sparkSql/join/UkMakerSpaces.py b/sparkSql/join/UkMakerSpaces.py index 446cbf6c..f1f8a526 100644 --- a/sparkSql/join/UkMakerSpaces.py +++ b/sparkSql/join/UkMakerSpaces.py @@ -1,27 +1,22 @@ from pyspark.sql import SparkSession, functions as fs if __name__ == "__main__": + session = SparkSession.builder.appName("UkMakerSpaces").master("local[*]").getOrCreate() - session = SparkSession.builder.appName("UkMakerSpaces").master("local").getOrCreate() - sc = session.sparkContext - sc.setLogLevel("ERROR") - - makerSpace = session.read \ - .option("header", "true") \ + makerSpace = session.read.option("header", "true") \ .csv("in/uk-makerspaces-identifiable-data.csv") - postCode = session.read \ - .option("header", "true") \ - .csv("in/uk-postcode.csv") \ + postCode = session.read.option("header", "true").csv("in/uk-postcode.csv") \ .withColumn("PostCode", fs.concat_ws("", fs.col("PostCode"), fs.lit(" "))) print("=== Print 20 records of makerspace table ===") makerSpace.select("Name of makerspace", "Postcode").show() print("=== Print 20 records of postcode table ===") - postCode.show() + postCode.select("PostCode", "Region").show() - joined = makerSpace.join(postCode, makerSpace["Postcode"].startswith(postCode["Postcode"]), "left_outer") + joined = makerSpace \ + .join(postCode, makerSpace["Postcode"].startswith(postCode["Postcode"]), "left_outer") print("=== Group by Region ===") joined.groupBy("Region").count().show(200) \ No newline at end of file
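
The change repeated throughout this patch is the move from the positional SparkContext("local", "appName") constructor to an explicit SparkConf (usually with setMaster("local[*]")), dropping sc.setLogLevel("ERROR") and defining the filter functions inside the __main__ block as closures over the accumulators. Below is a minimal, self-contained sketch of that pattern, assuming only a local pyspark installation; the app name, sample data, and blank-line counter are illustrative and are not files or values from this repository.

# Illustrative sketch only, not part of the patch above.
from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    # The SparkConf pattern these files now share.
    conf = SparkConf().setAppName("confPatternSketch").setMaster("local[*]")
    sc = SparkContext(conf = conf)
    sc.setLogLevel("ERROR")  # optional; the replaced versions of these scripts set it explicitly

    blankLines = sc.accumulator(0)

    # Defined inside __main__, as in StackOverFlowSurvey.py above, so it closes over
    # the accumulator instead of taking it as an extra argument.
    def keepAndCount(line):
        if not line.strip():
            blankLines.add(1)
        return True

    lines = sc.parallelize(["spark", "", "hadoop", ""])
    print("kept {} lines".format(lines.filter(keepAndCount).count()))
    print("blank lines seen: {}".format(blankLines.value))

As in the accumulator examples above, the counter is only safe to read on the driver after an action (count here) has actually run.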