1010from pyspark .sql .types import FloatType , IntegerType , StringType
1111
1212
13+ # [START datascienceonramp_tripdurationudf]
1314def trip_duration_udf (duration ):
1415 """Convert trip duration to seconds. Return None if negative."""
1516 if not duration :
@@ -33,11 +34,17 @@ def trip_duration_udf(duration):
3334 return int (time )
3435
3536
37+ # [END datascienceonramp_tripdurationudf]
38+
39+ # [START datascienceonramp_stationnameudf]
def station_name_udf(name):
    """Replaces '/' with '&'.

    Returns None for empty or missing station names.
    """
    if not name:
        return None
    return name.replace("/", "&")
3943
4044
45+ # [END datascienceonramp_stationnameudf]
46+
47+ # [START datascienceonramp_usertypeudf]
4148def user_type_udf (user ):
4249 """Converts user type to 'Subscriber' or 'Customer'."""
4350 if not user :
@@ -49,17 +56,10 @@ def user_type_udf(user):
4956 return "Customer"
5057
5158
52- def gender_udf (gender ):
53- """Converts gender to 'Male' or 'Female'."""
54- if not gender :
55- return None
56-
57- if gender .lower ().startswith ("m" ):
58- return "Male"
59- elif gender .lower ().startswith ("f" ):
60- return "Female"
59+ # [END datascienceonramp_usertypeudf]
6160
6261
62+ # [START datascienceonramp_stationlocationudf]
6363def angle_udf (angle ):
6464 """Converts DMS notation to degrees. Return None if not in DMS or degrees notation."""
6565 if not angle :
@@ -74,6 +74,9 @@ def angle_udf(angle):
7474 return float (degrees [0 ])
7575
7676
77+ # [END datascienceonramp_stationlocationudf]
78+
79+ # [START datascienceonramp_timeconvertudf]
7780def compute_time (duration , start , end ):
7881 """Calculates duration, start time, and end time from each other if one value is null."""
7982 time_format = "%Y-%m-%dT%H:%M:%S"
@@ -94,15 +97,17 @@ def compute_time(duration, start, end):
9497 if duration :
9598 # Convert to timedelta
9699 duration = datetime .timedelta (seconds = duration )
97-
100+ # [END datascienceonramp_timeconvertudf]
101+ # [START datascienceonramp_timemissingvalueudf]
98102 # Calculate missing value
99103 if start and end and not duration :
100104 duration = end - start
101105 elif duration and end and not start :
102106 start = end - duration
103107 elif duration and start and not end :
104108 end = start + duration
105-
109+ # [END datascienceonramp_timemissingvalueudf]
110+ # [START datascienceonramp_timereturnudf]
106111 # Transform to primitive types
107112 if duration :
108113 duration = int (duration .total_seconds ())
@@ -114,6 +119,9 @@ def compute_time(duration, start, end):
114119 return (duration , start , end )
115120
116121
122+ # [END datascienceonramp_timereturnudf]
123+
124+ # [START datascienceonramp_timehelperudf]
def compute_duration_udf(duration, start, end):
    """Calculates duration from start and end time if null."""
    # Delegate to compute_time and keep only the duration component.
    computed = compute_time(duration, start, end)
    return computed[0]
@@ -129,9 +137,12 @@ def compute_end_udf(duration, start, end):
129137 return compute_time (duration , start , end )[2 ]
130138
131139
140+ # [END datascienceonramp_timehelperudf]
141+
142+ # [START datascienceonramp_sparksession]
132143if __name__ == "__main__" :
133- TABLE = sys .argv [1 ]
134- BUCKET_NAME = sys .argv [2 ]
144+ BUCKET_NAME = sys .argv [1 ]
145+ TABLE = sys .argv [2 ]
135146
136147 # Create a SparkSession, viewable via the Spark UI
137148 spark = SparkSession .builder .appName ("data_cleaning" ).getOrCreate ()
@@ -142,13 +153,20 @@ def compute_end_udf(duration, start, end):
142153 except Py4JJavaError as e :
143154 raise Exception (f"Error reading { TABLE } " ) from e
144155
156+ # [END datascienceonramp_sparksession]
157+
158+ # [START datascienceonramp_removecolumn]
159+ # remove unused column
160+ df = df .drop ("gender" )
161+ # [END datascienceonramp_removecolumn]
162+
163+ # [START datascienceonramp_sparksingleudfs]
145164 # Single-parameter udfs
146165 udfs = {
147166 "start_station_name" : UserDefinedFunction (station_name_udf , StringType ()),
148167 "end_station_name" : UserDefinedFunction (station_name_udf , StringType ()),
149168 "tripduration" : UserDefinedFunction (trip_duration_udf , IntegerType ()),
150169 "usertype" : UserDefinedFunction (user_type_udf , StringType ()),
151- "gender" : UserDefinedFunction (gender_udf , StringType ()),
152170 "start_station_latitude" : UserDefinedFunction (angle_udf , FloatType ()),
153171 "start_station_longitude" : UserDefinedFunction (angle_udf , FloatType ()),
154172 "end_station_latitude" : UserDefinedFunction (angle_udf , FloatType ()),
@@ -157,7 +175,8 @@ def compute_end_udf(duration, start, end):
157175
158176 for name , udf in udfs .items ():
159177 df = df .withColumn (name , udf (name ))
160-
178+ # [END datascienceonramp_sparksingleudfs]
179+ # [START datascienceonramp_sparkmultiudfs]
161180 # Multi-parameter udfs
162181 multi_udfs = {
163182 "tripduration" : {
@@ -176,10 +195,12 @@ def compute_end_udf(duration, start, end):
176195
177196 for name , obj in multi_udfs .items ():
178197 df = df .withColumn (name , obj ["udf" ](* obj ["params" ]))
179-
198+ # [END datascienceonramp_sparkmultiudfs]
199+ # [START datascienceonramp_displaysamplerows]
180200 # Display sample of rows
181201 df .show (n = 20 )
182-
202+ # [END datascienceonramp_displaysamplerows]
203+ # [START datascienceonramp_writetogcs]
183204 # Write results to GCS
184205 if "--dry-run" in sys .argv :
185206 print ("Data will not be uploaded to GCS" )
@@ -222,3 +243,4 @@ def compute_end_udf(duration, start, end):
222243 print (
223244 "Data successfully uploaded to " + "gs://" + BUCKET_NAME + "/" + final_path
224245 )
246+ # [END datascienceonramp_writetogcs]
0 commit comments