

 class SparkContext(object):
+
     """
     Main entry point for Spark functionality. A SparkContext represents the
     connection to a Spark cluster, and can be used to create L{RDD}s and
@@ -59,7 +60,8 @@ class SparkContext(object):
     _next_accum_id = 0
     _active_spark_context = None
     _lock = Lock()
-    _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
+    # zip and egg files that need to be added to PYTHONPATH
+    _python_includes = None
     _default_batch_size_for_serialized_input = 10

     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
@@ -99,13 +101,15 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
             self._callsite = rdd._extract_concise_traceback()
         else:
             tempNamedTuple = namedtuple("Callsite", "function file linenum")
-            self._callsite = tempNamedTuple(function=None, file=None, linenum=None)
+            self._callsite = tempNamedTuple(
+                function=None, file=None, linenum=None)
         SparkContext._ensure_initialized(self, gateway=gateway)
         try:
             self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
                           conf)
         except:
-            # If an error occurs, clean up in order to allow future SparkContext creation:
+            # If an error occurs, clean up in order to allow future
+            # SparkContext creation:
             self.stop()
             raise

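The try/except reflowed above exists so that a failed constructor fully tears the context down and a new SparkContext can be created later in the same process. A minimal usage sketch of the same contract from application code, assuming a local PySpark installation; the master string and app name are illustrative:

    from pyspark import SparkContext

    sc = SparkContext("local[2]", "ContextCleanupExample")
    try:
        print(sc.parallelize([1, 2, 3]).count())
    finally:
        # stop() releases the JVM gateway, mirroring the cleanup done in
        # __init__ above, so another SparkContext can be created afterwards.
        sc.stop()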
@@ -138,7 +142,8 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         if not self._conf.contains("spark.master"):
             raise Exception("A master URL must be set in your configuration")
         if not self._conf.contains("spark.app.name"):
-            raise Exception("An application name must be set in your configuration")
+            raise Exception(
+                "An application name must be set in your configuration")

         # Read back our properties from the conf in case we loaded some of them from
         # the classpath or an external config file
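These two checks are why spark.master and spark.app.name must reach the SparkConf before _do_init runs. A short sketch of setting them explicitly, with illustrative values:

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setMaster("local[2]")       # satisfies the spark.master check
            .setAppName("ConfExample"))  # satisfies the spark.app.name check
    sc = SparkContext(conf=conf)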
@@ -179,7 +184,8 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
             self.addPyFile(path)

         # Deploy code dependencies set by spark-submit; these will already have been added
-        # with SparkContext.addFile, so we just need to add them to the PYTHONPATH
+        # with SparkContext.addFile, so we just need to add them to the
+        # PYTHONPATH
         for path in self._conf.get("spark.submit.pyFiles", "").split(","):
             if path != "":
                 (dirname, filename) = os.path.split(path)
@@ -189,9 +195,11 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
                     sys.path.append(dirname)

         # Create a temporary directory inside spark.local.dir:
-        local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
+        local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(
+            self._jsc.sc().conf())
         self._temp_dir = \
-            self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
+            self._jvm.org.apache.spark.util.Utils.createTempDir(
+                local_dir).getAbsolutePath()

     def _initialize_context(self, jconf):
         """
@@ -213,7 +221,7 @@ def _ensure_initialized(cls, instance=None, gateway=None):

         if instance:
             if (SparkContext._active_spark_context and
-                SparkContext._active_spark_context != instance):
+                    SparkContext._active_spark_context != instance):
                 currentMaster = SparkContext._active_spark_context.master
                 currentAppName = SparkContext._active_spark_context.appName
                 callsite = SparkContext._active_spark_context._callsite
@@ -284,7 +292,8 @@ def parallelize(self, c, numSlices=None):
         # because it sends O(n) Py4J commands. As an alternative, serialized
         # objects are written to a file and loaded through textFile().
         tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
-        # Make sure we distribute data evenly if it's smaller than self.batchSize
+        # Make sure we distribute data evenly if it's smaller than
+        # self.batchSize
         if "__len__" not in dir(c):
             c = list(c)  # Make it a list so we can compute its length
         batchSize = min(len(c) // numSlices, self._batchSize)
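The line above caps each pickle batch at len(c) // numSlices so that a collection smaller than the configured batch size is still spread over every requested slice. A standalone sketch of that arithmetic; effective_batch_size is a hypothetical helper, not part of PySpark:

    def effective_batch_size(num_elements, num_slices, configured_batch_size):
        # Mirrors batchSize = min(len(c) // numSlices, self._batchSize) above.
        return min(num_elements // num_slices, configured_batch_size)

    # 10 elements over 4 slices with a configured batch size of 1024 still
    # gives batches of 2, so no slice is starved of data.
    print(effective_batch_size(10, 4, 1024))  # 2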
@@ -403,10 +412,12 @@ def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
                Java object. (default sc._default_batch_size_for_serialized_input)
         """
         minSplits = minSplits or min(self.defaultParallelism, 2)
-        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
+        batchSize = max(
+            1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (
+            batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
-                                               keyConverter, valueConverter, minSplits, batchSize)
+                                                keyConverter, valueConverter, minSplits, batchSize)
         return RDD(jrdd, self, ser)

     def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
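sequenceFile and the Hadoop loaders below all repeat the same two reflowed lines: clamp batchSize to at least 1, then batch the PickleSerializer only when batchSize is greater than 1. A standalone sketch of that selection; choose_serializer is a hypothetical helper, while the serializer classes are the real ones from pyspark.serializers:

    from pyspark.serializers import BatchedSerializer, PickleSerializer

    def choose_serializer(batch_size, default_batch_size=10):
        # Mirrors the batchSize/ser lines in sequenceFile and the hadoop* methods.
        batch_size = max(1, batch_size or default_batch_size)
        ser = BatchedSerializer(PickleSerializer()) if batch_size > 1 else PickleSerializer()
        return batch_size, ser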
@@ -434,10 +445,13 @@ def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConv
                Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
-        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
-                                                    valueClass, keyConverter, valueConverter, jconf, batchSize)
+        batchSize = max(
+            1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (
+            batchSize > 1) else PickleSerializer()
+        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(
+            self._jsc, path, inputFormatClass, keyClass,
+            valueClass, keyConverter, valueConverter, jconf, batchSize)
         return RDD(jrdd, self, ser)

     def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -462,10 +476,13 @@ def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=N
                Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
-        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
-                                                   valueClass, keyConverter, valueConverter, jconf, batchSize)
+        batchSize = max(
+            1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (
+            batchSize > 1) else PickleSerializer()
+        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(
+            self._jsc, inputFormatClass, keyClass,
+            valueClass, keyConverter, valueConverter, jconf, batchSize)
         return RDD(jrdd, self, ser)

     def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -493,10 +510,13 @@ def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=
                Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
-        jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
-                                              valueClass, keyConverter, valueConverter, jconf, batchSize)
+        batchSize = max(
+            1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (
+            batchSize > 1) else PickleSerializer()
+        jrdd = self._jvm.PythonRDD.hadoopFile(
+            self._jsc, path, inputFormatClass, keyClass,
+            valueClass, keyConverter, valueConverter, jconf, batchSize)
         return RDD(jrdd, self, ser)

     def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -521,10 +541,12 @@ def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
                Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
+        batchSize = max(
+            1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (
+            batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
-                                            keyConverter, valueConverter, jconf, batchSize)
+                                             keyConverter, valueConverter, jconf, batchSize)
         return RDD(jrdd, self, ser)

     def _checkpointFile(self, name, input_deserializer):
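All four loaders reformatted above delegate to PythonRDD on the JVM side and differ only in which Hadoop API generation and InputFormat they bind. A hedged usage sketch, assuming an existing SparkContext sc; the path and the Writable/InputFormat class names are illustrative:

    # New-style loader with explicit key/value classes.
    pairs = sc.sequenceFile("hdfs:///tmp/example.seq",
                            keyClass="org.apache.hadoop.io.Text",
                            valueClass="org.apache.hadoop.io.IntWritable")

    # The same data through the old Hadoop API entry point.
    pairs_old = sc.hadoopFile("hdfs:///tmp/example.seq",
                              "org.apache.hadoop.mapred.SequenceFileInputFormat",
                              "org.apache.hadoop.io.Text",
                              "org.apache.hadoop.io.IntWritable")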
@@ -587,7 +609,8 @@ def accumulator(self, value, accum_param=None):
         elif isinstance(value, complex):
             accum_param = accumulators.COMPLEX_ACCUMULATOR_PARAM
         else:
-            raise Exception("No default accumulator param for type %s" % type(value))
+            raise Exception(
+                "No default accumulator param for type %s" % type(value))
         SparkContext._next_accum_id += 1
         return Accumulator(SparkContext._next_accum_id - 1, value, accum_param)

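accumulator() only has default AccumulatorParams for numeric values such as int, float, and complex; anything else must pass accum_param explicitly or hits the reflowed raise above. A short usage sketch, assuming an existing SparkContext sc:

    acc = sc.accumulator(0)  # int value, so a default accumulator param is picked
    sc.parallelize([1, 2, 3, 4]).foreach(lambda x: acc.add(x))
    print(acc.value)  # 10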
@@ -632,12 +655,14 @@ def addPyFile(self, path):
         HTTP, HTTPS or FTP URI.
         """
         self.addFile(path)
-        (dirname, filename) = os.path.split(path)  # dirname may be directory or HDFS/S3 prefix
+        # dirname may be directory or HDFS/S3 prefix
+        (dirname, filename) = os.path.split(path)

         if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
             self._python_includes.append(filename)
             # for tests in local mode
-            sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename))
+            sys.path.append(
+                os.path.join(SparkFiles.getRootDirectory(), filename))

     def setCheckpointDir(self, dirName):
         """
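addPyFile ships the file via addFile and, for .zip/.ZIP/.egg archives, also appends it to the driver's sys.path (the "for tests in local mode" branch above). A usage sketch with an illustrative dependency archive, assuming an existing SparkContext sc:

    sc.addPyFile("hdfs:///tmp/deps.egg")  # placeholder path to an egg of helpers
    # Tasks (and, in local mode, the driver) can now import modules from deps.egg.
    rdd = sc.parallelize([1, 2, 3]).map(lambda x: x + 1)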
@@ -651,7 +676,8 @@ def _getJavaStorageLevel(self, storageLevel):
         Returns a Java StorageLevel based on a pyspark.StorageLevel.
         """
         if not isinstance(storageLevel, StorageLevel):
-            raise Exception("storageLevel must be of type pyspark.StorageLevel")
+            raise Exception(
+                "storageLevel must be of type pyspark.StorageLevel")

         newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
         return newStorageLevel(storageLevel.useDisk,
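_getJavaStorageLevel is the translation step RDD.persist() goes through when it hands a pyspark.StorageLevel to the JVM. A short usage sketch, assuming an existing SparkContext sc:

    from pyspark import StorageLevel

    rdd = sc.parallelize(range(100))
    rdd.persist(StorageLevel.MEMORY_AND_DISK)  # converted via _getJavaStorageLevel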
@@ -754,13 +780,15 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
         """
         if partitions is None:
             partitions = range(rdd._jrdd.partitions().size())
-        javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client)
+        javaPartitions = ListConverter().convert(
+            partitions, self._gateway._gateway_client)

         # Implementation note: This is implemented as a mapPartitions followed
         # by runJob() in order to avoid having to pass a Python lambda into
         # SparkContext#runJob.
         mappedRDD = rdd.mapPartitions(partitionFunc)
-        it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
+        it = self._jvm.PythonRDD.runJob(
+            self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
         return list(mappedRDD._collect_iterator_through_file(it))


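As the implementation note says, runJob wraps partitionFunc in mapPartitions so no Python lambda has to be passed into SparkContext#runJob on the JVM; partitionFunc receives one iterator per partition and returns an iterable. A usage sketch, assuming an existing SparkContext sc:

    rdd = sc.parallelize(range(8), 4)
    # Sum only partitions 0 and 2; with an even split this is typically [1, 9].
    print(sc.runJob(rdd, lambda it: [sum(it)], partitions=[0, 2]))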
@@ -772,7 +800,8 @@ def _test():
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
     globs['tempdir'] = tempfile.mkdtemp()
     atexit.register(lambda: shutil.rmtree(globs['tempdir']))
-    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+    (failure_count, test_count) = doctest.testmod(
+        globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:
         exit(-1)
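_test() follows the usual module self-test pattern: build a globals dict holding a live SparkContext, run the module's doctests against it, and exit non-zero on any failure. A minimal standalone sketch of the same pattern with a toy function (no Spark required):

    import doctest


    def double(x):
        """
        >>> double(21)
        42
        """
        return x * 2


    if __name__ == "__main__":
        (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
        if failure_count:
            exit(-1)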