 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
-        PairDeserializer
+    PairDeserializer
 from pyspark.storagelevel import StorageLevel
 from pyspark import rdd
 from pyspark.rdd import RDD
@@ -50,12 +50,11 @@ class SparkContext(object):
     _next_accum_id = 0
     _active_spark_context = None
     _lock = Lock()
-    _python_includes = None # zip and egg files that need to be added to PYTHONPATH
-
+    _python_includes = None  # zip and egg files that need to be added to PYTHONPATH

     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
-        environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
-        gateway=None):
+                 environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
+                 gateway=None):
         """
         Create a new SparkContext. At least the master and app name should be set,
         either through the named parameters here or through C{conf}.
@@ -138,8 +137,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         self._accumulatorServer = accumulators._start_update_server()
         (host, port) = self._accumulatorServer.server_address
         self._javaAccumulator = self._jsc.accumulator(
-                self._jvm.java.util.ArrayList(),
-                self._jvm.PythonAccumulatorParam(host, port))
+            self._jvm.java.util.ArrayList(),
+            self._jvm.PythonAccumulatorParam(host, port))

         self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')

@@ -165,7 +164,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                 (dirname, filename) = os.path.split(path)
                 self._python_includes.append(filename)
                 sys.path.append(path)
-                if not dirname in sys.path:
+                if dirname not in sys.path:
                     sys.path.append(dirname)

         # Create a temporary directory inside spark.local.dir:
@@ -192,15 +191,19 @@ def _ensure_initialized(cls, instance=None, gateway=None):
                 SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile

             if instance:
-                if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
+                if (SparkContext._active_spark_context and
+                        SparkContext._active_spark_context != instance):
                     currentMaster = SparkContext._active_spark_context.master
                     currentAppName = SparkContext._active_spark_context.appName
                     callsite = SparkContext._active_spark_context._callsite

                     # Raise error if there is already a running Spark context
-                    raise ValueError("Cannot run multiple SparkContexts at once; existing SparkContext(app=%s, master=%s)" \
-                        " created by %s at %s:%s " \
-                        % (currentAppName, currentMaster, callsite.function, callsite.file, callsite.linenum))
+                    raise ValueError(
+                        "Cannot run multiple SparkContexts at once; "
+                        "existing SparkContext(app=%s, master=%s)"
+                        " created by %s at %s:%s "
+                        % (currentAppName, currentMaster,
+                            callsite.function, callsite.file, callsite.linenum))
                 else:
                     SparkContext._active_spark_context = instance

@@ -290,7 +293,7 @@ def textFile(self, name, minPartitions=None):
         Read a text file from HDFS, a local file system (available on all
         nodes), or any Hadoop-supported file system URI, and return it as an
         RDD of Strings.
-
+
         >>> path = os.path.join(tempdir, "sample-text.txt")
         >>> with open(path, "w") as testFile:
         ...    testFile.write("Hello world!")
@@ -584,11 +587,12 @@ def addPyFile(self, path):
         HTTP, HTTPS or FTP URI.
         """
         self.addFile(path)
-        (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix
+        (dirname, filename) = os.path.split(path)  # dirname may be directory or HDFS/S3 prefix

         if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
             self._python_includes.append(filename)
-            sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode
+            # for tests in local mode
+            sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename))

     def setCheckpointDir(self, dirName):
         """
@@ -649,9 +653,9 @@ def setJobGroup(self, groupId, description, interruptOnCancel=False):
         Cancelled

         If interruptOnCancel is set to true for the job group, then job cancellation will result
-        in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
-        that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
-        where HDFS may respond to Thread.interrupt() by marking nodes as dead.
+        in Thread.interrupt() being called on the job's executor threads. This is useful to help
+        ensure that the tasks are actually stopped in a timely manner, but is off by default due
+        to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
         """
         self._jsc.setJobGroup(groupId, description, interruptOnCancel)

@@ -688,7 +692,7 @@ def cancelAllJobs(self):
688692 """
689693 self ._jsc .sc ().cancelAllJobs ()
690694
691- def runJob (self , rdd , partitionFunc , partitions = None , allowLocal = False ):
695+ def runJob (self , rdd , partitionFunc , partitions = None , allowLocal = False ):
692696 """
693697 Executes the given partitionFunc on the specified set of partitions,
694698 returning the result as an array of elements.
@@ -703,7 +707,7 @@ def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False):
         >>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
         [0, 1, 16, 25]
         """
-        if partitions == None:
+        if partitions is None:
             partitions = range(rdd._jrdd.partitions().size())
         javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client)

@@ -714,6 +718,7 @@ def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False):
         it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
         return list(mappedRDD._collect_iterator_through_file(it))

+
 def _test():
     import atexit
     import doctest
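
For reference, a minimal usage sketch of the setJobGroup and runJob APIs touched by this diff. The local master URL, app name, group id, and sample data below are illustrative assumptions, not part of the patch; the partition selection mirrors the doctest above.

# Minimal sketch, assuming a local PySpark installation; values are illustrative.
from pyspark import SparkContext

sc = SparkContext("local[2]", "runjob-sketch")  # assumed master/app name
rdd = sc.parallelize(range(6), 3)               # sample data in 3 partitions

# Tag subsequently submitted jobs so they can be cancelled as a group.
sc.setJobGroup("demo-group", "squares over selected partitions", interruptOnCancel=False)

# Run only on partitions 0 and 2, as in the doctest above.
result = sc.runJob(rdd, lambda part: [x * x for x in part], [0, 2], True)
print(result)  # [0, 1, 16, 25]

sc.cancelJobGroup("demo-group")  # no-op here; shown for completeness
sc.stop()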