Skip to content

Commit 4b038de

Browse files
author
Pedro Bernardo
committed
Added sparkSql/RddDataframeConversion.py
1 parent e6e21af commit 4b038de

File tree

1 file changed

+39
-0
lines changed

1 file changed

+39
-0
lines changed

sparkSql/RddDataframeConversion.py

Lines changed: 39 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,39 @@
1+
from pyspark.sql import SparkSession
2+
from commons.Utils import Utils
3+
4+
def getColNames(line: str):
    """Extract the four column headers used by this job from a CSV header row.

    Splits *line* on the project's CSV delimiter and returns the fields
    at positions 2, 6, 9 and 14 as a list.
    """
    fields = Utils.COMMA_DELIMITER.split(line)
    return [fields[i] for i in (2, 6, 9, 14)]
7+
8+
def mapResponseRdd(line: str):
    """Convert one CSV response row into a 4-tuple of selected fields.

    Fields 6 and 14 are parsed as floats; an empty cell maps to None
    so the row never raises ValueError. Fields 2 and 9 stay strings.
    """
    fields = Utils.COMMA_DELIMITER.split(line)

    def _as_float(raw: str):
        # Empty string is falsy -> keep None instead of float("").
        return float(raw) if raw else None

    return fields[2], _as_float(fields[6]), fields[9], _as_float(fields[14])
13+
14+
if __name__ == "__main__":
    """Load the 2016 Stack Overflow survey CSV, convert the response RDD
    to a DataFrame named by the header row, and print schema + rows."""

    session = SparkSession.builder.appName("StackOverFlowSurvey").master("local").getOrCreate()
    sc = session.sparkContext
    sc.setLogLevel("ERROR")

    lines = sc.textFile("in/2016-stack-overflow-survey-responses.csv")

    # Header row: its third field is the literal column title "country".
    colNames = lines \
        .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[2] == "country") \
        .map(getColNames)

    # Every other row is an actual survey response.
    # (idiom fix: `!=` instead of `not ... == ...`)
    responseRDD = lines \
        .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[2] != "country") \
        .map(mapResponseRdd)

    # first() fetches just the single header element; collect()[0] would
    # pull the whole RDD to the driver only to index it.
    responseDataFrame = responseRDD.toDF(colNames.first())

    print("=== Print out schema ===")
    responseDataFrame.printSchema()

    print("=== Print 20 records of responses table ===")
    responseDataFrame.show(20)

    for response in responseDataFrame.rdd.collect():
        print(response)

    # Release local Spark resources explicitly (was missing).
    session.stop()

0 commit comments

Comments
 (0)